1pub mod options;
2
3use anyhow::Context;
4use auxon_sdk::{
5 api::{AttrVal, BigInt, LogicalTime, Nanoseconds, Uuid},
6 ingest_client::{BoundTimelineState, IngestClient, IngestError as SdkIngestError},
7 ingest_protocol::InternedAttrKey,
8};
9use once_cell::sync::Lazy;
10use std::{
11 borrow::Borrow,
12 collections::HashMap,
13 ops::{Deref, DerefMut},
14 sync::RwLock,
15};
16use thiserror::Error;
17use tracing_serde_structured::{
18 DebugRecord, RecordMap, SerializeId, SerializeMetadata, SerializeRecord, SerializeRecordFields,
19 SerializeValue,
20};
21use tracing_serde_wire::{Packet, TWOther, TracingWire};
22
23pub use auxon_sdk::api::TimelineId;
24
25pub use options::Options;
26
27static SPAN_NAMES: Lazy<RwLock<HashMap<u64, String>>> = Lazy::new(|| RwLock::new(HashMap::new()));
29
30#[derive(Debug, Error)]
31pub enum ConnectError {
32 #[error("Authentication required")]
34 AuthRequired,
35 #[error("Authenticating with the provided auth failed")]
37 AuthFailed(SdkIngestError),
38 #[error(transparent)]
41 UnexpectedFailure(#[from] anyhow::Error),
42}
43
44#[derive(Debug, Error)]
45pub enum IngestError {
46 #[error(transparent)]
49 UnexpectedFailure(#[from] anyhow::Error),
50}
51
52pub struct TracingModality {
53 client: IngestClient<BoundTimelineState>,
54 event_keys: HashMap<String, InternedAttrKey>,
55 timeline_keys: HashMap<String, InternedAttrKey>,
56 timeline_id: TimelineId,
57}
58
59impl TracingModality {
60 pub async fn connect() -> Result<Self, ConnectError> {
61 let opt = Options::default();
62
63 Self::connect_with_options(opt).await
64 }
65
66 pub async fn connect_with_options(options: Options) -> Result<Self, ConnectError> {
67 let url = url::Url::parse(&format!("modality-ingest://{}/", options.server_addr)).unwrap();
68 let unauth_client = IngestClient::connect(&url, false)
69 .await
70 .context("init ingest client")?;
71
72 let auth_key = options.auth.ok_or(ConnectError::AuthRequired)?;
73 let client = unauth_client
74 .authenticate(auth_key)
75 .await
76 .map_err(ConnectError::AuthFailed)?;
77
78 let timeline_id = TimelineId::allocate();
79
80 let client = client
81 .open_timeline(timeline_id)
82 .await
83 .context("open new timeline")?;
84
85 let mut tracer = Self {
86 client,
87 event_keys: HashMap::new(),
88 timeline_keys: HashMap::new(),
89 timeline_id,
90 };
91
92 for (key, value) in options.metadata {
93 let timeline_key_name = tracer
94 .get_or_create_timeline_attr_key(key)
95 .await
96 .context("get or define timeline attr key")?;
97
98 tracer
99 .client
100 .timeline_metadata([(timeline_key_name, value)])
101 .await
102 .context("apply timeline metadata")?;
103 }
104
105 Ok(tracer)
106 }
107
108 pub fn timeline_id(&self) -> TimelineId {
109 self.timeline_id
110 }
111
112 pub async fn handle_packet<'a>(&mut self, pkt: Packet<'_>) -> Result<(), IngestError> {
113 match pkt.message {
114 TracingWire::NewSpan { id, attrs, values } => {
115 let mut records = match values {
116 SerializeRecord::Ser(_event) => {
117 unreachable!("this variant can't be sent")
118 }
119 SerializeRecord::De(record_map) => record_map,
120 };
121
122 let name = {
123 let name = records
125 .get(&"name".into())
126 .or_else(|| records.get(&"message".into()))
127 .map(|n| format!("{:?}", n))
128 .unwrap_or_else(|| attrs.metadata.name.to_string());
129
130 SPAN_NAMES
131 .write()
132 .expect("span name lock poisoned, this is a bug")
133 .deref_mut()
134 .insert(id.id.get(), name.clone());
135
136 name
137 };
138
139 let mut packed_attrs = Vec::new();
140
141 packed_attrs.push((
142 self.get_or_create_event_attr_key("event.name".to_string())
143 .await?,
144 AttrVal::String(name.into()),
145 ));
146
147 let kind = records
148 .remove(&"modality.kind".into())
149 .and_then(tracing_value_to_attr_val)
150 .unwrap_or_else(|| "span:defined".into());
151 packed_attrs.push((
152 self.get_or_create_event_attr_key("event.internal.rs.kind".to_string())
153 .await?,
154 kind,
155 ));
156
157 let span_id = records
158 .remove(&"modality.span_id".into())
159 .and_then(tracing_value_to_attr_val)
160 .unwrap_or_else(|| BigInt::new_attr_val(id.id.get() as i128));
161 packed_attrs.push((
162 self.get_or_create_event_attr_key("event.internal.rs.span_id".to_string())
163 .await?,
164 span_id,
165 ));
166
167 self.pack_common_attrs(&mut packed_attrs, attrs.metadata, records, pkt.tick)
168 .await?;
169
170 self.client
171 .event(pkt.tick.into(), packed_attrs)
172 .await
173 .context("send packed event")?;
174 }
175 TracingWire::Record { .. } => {
176 }
179 TracingWire::RecordFollowsFrom { .. } => {
180 }
183 TracingWire::Event(ev) => {
184 let mut packed_attrs = Vec::new();
185
186 let mut records = match ev.fields {
187 SerializeRecordFields::Ser(_event) => {
188 unreachable!("this variant can't be sent")
189 }
190 SerializeRecordFields::De(record_map) => record_map,
191 };
192
193 let kind = records
194 .remove(&"modality.kind".into())
195 .and_then(tracing_value_to_attr_val)
196 .unwrap_or_else(|| "event".into());
197 packed_attrs.push((
198 self.get_or_create_event_attr_key("event.internal.rs.kind".to_string())
199 .await?,
200 kind,
201 ));
202
203 self.pack_common_attrs(&mut packed_attrs, ev.metadata, records, pkt.tick)
204 .await?;
205
206 self.client
207 .event(pkt.tick.into(), packed_attrs)
208 .await
209 .context("send packed event")?;
210 }
211 TracingWire::Enter(SerializeId { id }) => {
212 let mut packed_attrs = Vec::new();
213
214 {
215 let name = SPAN_NAMES
217 .read()
218 .expect("span name lock poisoned, this is a bug")
219 .deref()
220 .get(&id.get())
221 .map(|n| format!("enter: {}", n));
222
223 if let Some(name) = name {
224 packed_attrs.push((
225 self.get_or_create_event_attr_key("event.name".to_string())
226 .await?,
227 AttrVal::String(name.into()),
228 ));
229 }
230 };
231
232 packed_attrs.push((
233 self.get_or_create_event_attr_key("event.internal.rs.kind".to_string())
234 .await?,
235 AttrVal::String("span:enter".to_string().into()),
236 ));
237
238 packed_attrs.push((
239 self.get_or_create_event_attr_key("event.internal.rs.span_id".to_string())
240 .await?,
241 BigInt::new_attr_val(u64::from(id).into()),
242 ));
243
244 packed_attrs.push((
245 self.get_or_create_event_attr_key("event.internal.rs.tick".to_string())
246 .await?,
247 AttrVal::LogicalTime(LogicalTime::unary(pkt.tick)),
248 ));
249
250 self.client
251 .event(pkt.tick.into(), packed_attrs)
252 .await
253 .context("send packed event")?;
254 }
255 TracingWire::Exit(SerializeId { id }) => {
256 let mut packed_attrs = Vec::new();
257
258 {
259 let name = SPAN_NAMES
261 .read()
262 .expect("span name lock poisoned, this is a bug")
263 .deref()
264 .get(&id.get())
265 .map(|n| format!("exit: {}", n));
266
267 if let Some(name) = name {
268 packed_attrs.push((
269 self.get_or_create_event_attr_key("event.name".to_string())
270 .await?,
271 AttrVal::String(name.into()),
272 ));
273 }
274 };
275
276 packed_attrs.push((
277 self.get_or_create_event_attr_key("event.internal.rs.kind".to_string())
278 .await?,
279 AttrVal::String("span:exit".to_string().into()),
280 ));
281
282 packed_attrs.push((
283 self.get_or_create_event_attr_key("event.internal.rs.span_id".to_string())
284 .await?,
285 BigInt::new_attr_val(u64::from(id).into()),
286 ));
287
288 packed_attrs.push((
289 self.get_or_create_event_attr_key("event.internal.rs.tick".to_string())
290 .await?,
291 AttrVal::LogicalTime(LogicalTime::unary(pkt.tick)),
292 ));
293
294 self.client
295 .event(pkt.tick.into(), packed_attrs)
296 .await
297 .context("send packed event")?;
298 }
299 TracingWire::Close(SerializeId { id }) => {
300 SPAN_NAMES
301 .write()
302 .expect("span name lock poisoned, this is a bug")
303 .deref_mut()
304 .remove(&id.get());
305 }
306 TracingWire::IdClone { old, new } => {
307 let mut span_names = SPAN_NAMES
308 .write()
309 .expect("span name lock poisoned, this is a bug");
310
311 let name = span_names.deref().get(&old.id.get()).cloned();
312 if let Some(name) = name {
313 span_names.deref_mut().insert(new.id.get(), name);
314 }
315 }
316 TracingWire::Other(two) => {
317 match two {
318 TWOther::MessageDiscarded => {
319 let mut packed_attrs = Vec::new();
320
321 packed_attrs.push((
322 self.get_or_create_event_attr_key("event.internal.rs.kind".to_string())
323 .await?,
324 AttrVal::String("message_discarded".to_string().into()),
325 ));
326 self.client
327 .event(pkt.tick.into(), packed_attrs)
328 .await
329 .context("send packed event")?;
330 }
331 TWOther::DeviceInfo {
332 clock_id,
333 ticks_per_sec,
334 device_id,
335 } => {
336 let mut packed_attrs = Vec::new();
337 packed_attrs.push((
338 self.get_or_create_timeline_attr_key(
339 "timeline.internal.rs.clock_id".to_string(),
340 )
341 .await?,
342 AttrVal::Integer(clock_id.into()),
343 ));
344 packed_attrs.push((
345 self.get_or_create_timeline_attr_key(
346 "timeline.ticks_per_sec".to_string(),
347 )
348 .await?,
349 AttrVal::Integer(ticks_per_sec.into()),
350 ));
351 packed_attrs.push((
352 self.get_or_create_timeline_attr_key(
353 "timeline.internal.rs.device_id".to_string(),
354 )
355 .await?,
356 AttrVal::String(format!("{:x?}", device_id).into()),
358 ));
359 self.client
360 .timeline_metadata(packed_attrs)
361 .await
362 .context("send packed timeline metadata")?;
363 }
364 }
365 }
366 _ => (),
367 }
368
369 Ok(())
370 }
371
372 async fn get_or_create_timeline_attr_key(
373 &mut self,
374 key: String,
375 ) -> Result<InternedAttrKey, IngestError> {
376 if let Some(id) = self.timeline_keys.get(&key) {
377 return Ok(*id);
378 }
379
380 let interned_key = self
381 .client
382 .declare_attr_key(key.clone())
383 .await
384 .context("define timeline attr key")?;
385
386 self.timeline_keys.insert(key, interned_key);
387
388 Ok(interned_key)
389 }
390
391 async fn get_or_create_event_attr_key(
392 &mut self,
393 key: String,
394 ) -> Result<InternedAttrKey, IngestError> {
395 let key = if key.starts_with("event.") {
396 key
397 } else {
398 format!("event.{key}")
399 };
400
401 if let Some(id) = self.event_keys.get(&key) {
402 return Ok(*id);
403 }
404
405 let interned_key = self
406 .client
407 .declare_attr_key(key.clone())
408 .await
409 .context("define event attr key")?;
410
411 self.event_keys.insert(key, interned_key);
412
413 Ok(interned_key)
414 }
415
416 async fn pack_common_attrs<'a>(
417 &mut self,
418 packed_attrs: &mut Vec<(InternedAttrKey, AttrVal)>,
419 metadata: SerializeMetadata<'a>,
420 mut records: RecordMap<'a>,
421 tick: u64,
422 ) -> Result<(), IngestError> {
423 let name = records
424 .remove(&"name".into())
425 .or_else(|| records.remove(&"message".into()))
426 .and_then(tracing_value_to_attr_val)
427 .unwrap_or_else(|| metadata.name.as_str().into());
428 packed_attrs.push((
429 self.get_or_create_event_attr_key("event.name".to_string())
430 .await?,
431 name,
432 ));
433
434 let severity = records
435 .remove(&"severity".into())
436 .and_then(tracing_value_to_attr_val)
437 .unwrap_or_else(|| format!("{:?}", metadata.level).to_lowercase().into());
438 packed_attrs.push((
439 self.get_or_create_event_attr_key("event.severity".to_string())
440 .await?,
441 severity,
442 ));
443
444 let module_path = records
445 .remove(&"source.module".into())
446 .and_then(tracing_value_to_attr_val)
447 .or_else(|| metadata.module_path.map(|mp| mp.as_str().into()));
448 if let Some(module_path) = module_path {
449 packed_attrs.push((
450 self.get_or_create_event_attr_key("event.source.module".to_string())
451 .await?,
452 module_path,
453 ));
454 }
455
456 let source_file = records
457 .remove(&"source.file".into())
458 .and_then(tracing_value_to_attr_val)
459 .or_else(|| metadata.file.map(|mp| mp.as_str().into()));
460 if let Some(source_file) = source_file {
461 packed_attrs.push((
462 self.get_or_create_event_attr_key("event.source.file".to_string())
463 .await?,
464 source_file,
465 ));
466 }
467
468 let source_line = records
469 .remove(&"source.line".into())
470 .and_then(tracing_value_to_attr_val)
471 .or_else(|| metadata.line.map(|mp| (mp as i64).into()));
472 if let Some(source_line) = source_line {
473 packed_attrs.push((
474 self.get_or_create_event_attr_key("event.source.line".to_string())
475 .await?,
476 source_line,
477 ));
478 }
479
480 packed_attrs.push((
481 self.get_or_create_event_attr_key("event.internal.rs.tick".to_string())
482 .await?,
483 AttrVal::LogicalTime(LogicalTime::unary(tick)),
484 ));
485
486 let remote_timeline_id = records
488 .remove(&"interaction.remote_timeline_id".into())
489 .and_then(tracing_value_to_attr_val);
490 if let Some(attrval) = remote_timeline_id {
491 let remote_timeline_id = if let AttrVal::String(string) = attrval {
492 use std::str::FromStr;
493 if let Ok(uuid) = Uuid::from_str(&string) {
494 AttrVal::TimelineId(Box::new(uuid.into()))
495 } else {
496 AttrVal::String(string)
497 }
498 } else {
499 attrval
500 };
501
502 packed_attrs.push((
503 self.get_or_create_event_attr_key("event.interaction.remote_timeline_id".into())
504 .await?,
505 remote_timeline_id,
506 ));
507 }
508
509 let remote_timestamp = records
511 .remove(&"interaction.remote_timestamp".into())
512 .and_then(tracing_value_to_attr_val);
513 if let Some(attrval) = remote_timestamp {
514 let remote_timestamp = match attrval {
515 AttrVal::Integer(i) if i >= 0 => AttrVal::Timestamp(Nanoseconds::from(i as u64)),
516 AttrVal::BigInt(i) if *i >= 0 && *i <= u64::MAX as i128 => {
517 AttrVal::Timestamp(Nanoseconds::from(*i as u64))
518 }
519 AttrVal::Timestamp(t) => AttrVal::Timestamp(t),
520 x => x,
521 };
522
523 packed_attrs.push((
524 self.get_or_create_event_attr_key("event.interaction.remote_timestamp".into())
525 .await?,
526 remote_timestamp,
527 ));
528 }
529
530 let local_timestamp = records
532 .remove(&"timestamp".into())
533 .and_then(tracing_value_to_attr_val);
534 if let Some(attrval) = local_timestamp {
535 let remote_timestamp = match attrval {
536 AttrVal::Integer(i) if i >= 0 => AttrVal::Timestamp(Nanoseconds::from(i as u64)),
537 AttrVal::BigInt(i) if *i >= 0 && *i <= u64::MAX as i128 => {
538 AttrVal::Timestamp(Nanoseconds::from(*i as u64))
539 }
540 AttrVal::Timestamp(t) => AttrVal::Timestamp(t),
541 x => x,
542 };
543
544 packed_attrs.push((
545 self.get_or_create_event_attr_key("event.timestamp".into())
546 .await?,
547 remote_timestamp,
548 ));
549 } else if let Ok(duration_since_epoch) =
550 std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH)
551 {
552 let duration_since_epoch_in_nanos_res: Result<u64, _> =
553 duration_since_epoch.as_nanos().try_into();
554 if let Ok(duration_since_epoch_in_nanos) = duration_since_epoch_in_nanos_res {
555 packed_attrs.push((
556 self.get_or_create_event_attr_key("event.timestamp".into())
557 .await?,
558 AttrVal::Timestamp(Nanoseconds::from(duration_since_epoch_in_nanos)),
559 ));
560 }
561 }
562
563 for (name, value) in records {
565 let attrval = if let Some(attrval) = tracing_value_to_attr_val(value) {
566 attrval
567 } else {
568 continue;
569 };
570
571 let key = if name.starts_with("event.") {
572 name.to_string()
573 } else {
574 format!("event.{}", name.as_str())
575 };
576
577 packed_attrs.push((self.get_or_create_event_attr_key(key).await?, attrval));
578 }
579
580 Ok(())
581 }
582}
583
584fn tracing_value_to_attr_val<'a, V: Borrow<SerializeValue<'a>>>(value: V) -> Option<AttrVal> {
587 Some(match value.borrow() {
588 SerializeValue::Debug(dr) => match dr {
589 DebugRecord::Ser(s) => AttrVal::String(s.to_string().into()),
592 DebugRecord::De(s) => AttrVal::String(s.to_string().into()),
593 },
594 SerializeValue::Str(s) => AttrVal::String(s.to_string().into()),
595 SerializeValue::F64(n) => AttrVal::Float((*n).into()),
596 SerializeValue::I64(n) => AttrVal::Integer(*n),
597 SerializeValue::U64(n) => BigInt::new_attr_val((*n).into()),
598 SerializeValue::Bool(b) => AttrVal::Bool(*b),
599 unknown_sv => {
600 if let Ok(sval) = serde_json::to_string(&unknown_sv) {
601 AttrVal::String(sval.into())
602 } else {
603 return None;
604 }
605 }
606 })
607}