tracing_serde_modality_ingest/
lib.rs

1pub mod options;
2
3use anyhow::Context;
4use auxon_sdk::{
5    api::{AttrVal, BigInt, LogicalTime, Nanoseconds, Uuid},
6    ingest_client::{BoundTimelineState, IngestClient, IngestError as SdkIngestError},
7    ingest_protocol::InternedAttrKey,
8};
9use once_cell::sync::Lazy;
10use std::{
11    borrow::Borrow,
12    collections::HashMap,
13    ops::{Deref, DerefMut},
14    sync::RwLock,
15};
16use thiserror::Error;
17use tracing_serde_structured::{
18    DebugRecord, RecordMap, SerializeId, SerializeMetadata, SerializeRecord, SerializeRecordFields,
19    SerializeValue,
20};
21use tracing_serde_wire::{Packet, TWOther, TracingWire};
22
23pub use auxon_sdk::api::TimelineId;
24
25pub use options::Options;
26
27// spans can be defined on any thread and then sent to another and entered/etc, track globally
28static SPAN_NAMES: Lazy<RwLock<HashMap<u64, String>>> = Lazy::new(|| RwLock::new(HashMap::new()));
29
30#[derive(Debug, Error)]
31pub enum ConnectError {
32    /// No auth was provided
33    #[error("Authentication required")]
34    AuthRequired,
35    /// Auth was provided, but was not accepted by modality
36    #[error("Authenticating with the provided auth failed")]
37    AuthFailed(SdkIngestError),
38    /// Errors that it is assumed there is no way to handle without human intervention, meant for
39    /// consumers to just print and carry on or panic.
40    #[error(transparent)]
41    UnexpectedFailure(#[from] anyhow::Error),
42}
43
44#[derive(Debug, Error)]
45pub enum IngestError {
46    /// Errors that it is assumed there is no way to handle without human intervention, meant for
47    /// consumers to just print and carry on or panic.
48    #[error(transparent)]
49    UnexpectedFailure(#[from] anyhow::Error),
50}
51
52pub struct TracingModality {
53    client: IngestClient<BoundTimelineState>,
54    event_keys: HashMap<String, InternedAttrKey>,
55    timeline_keys: HashMap<String, InternedAttrKey>,
56    timeline_id: TimelineId,
57}
58
59impl TracingModality {
60    pub async fn connect() -> Result<Self, ConnectError> {
61        let opt = Options::default();
62
63        Self::connect_with_options(opt).await
64    }
65
66    pub async fn connect_with_options(options: Options) -> Result<Self, ConnectError> {
67        let url = url::Url::parse(&format!("modality-ingest://{}/", options.server_addr)).unwrap();
68        let unauth_client = IngestClient::connect(&url, false)
69            .await
70            .context("init ingest client")?;
71
72        let auth_key = options.auth.ok_or(ConnectError::AuthRequired)?;
73        let client = unauth_client
74            .authenticate(auth_key)
75            .await
76            .map_err(ConnectError::AuthFailed)?;
77
78        let timeline_id = TimelineId::allocate();
79
80        let client = client
81            .open_timeline(timeline_id)
82            .await
83            .context("open new timeline")?;
84
85        let mut tracer = Self {
86            client,
87            event_keys: HashMap::new(),
88            timeline_keys: HashMap::new(),
89            timeline_id,
90        };
91
92        for (key, value) in options.metadata {
93            let timeline_key_name = tracer
94                .get_or_create_timeline_attr_key(key)
95                .await
96                .context("get or define timeline attr key")?;
97
98            tracer
99                .client
100                .timeline_metadata([(timeline_key_name, value)])
101                .await
102                .context("apply timeline metadata")?;
103        }
104
105        Ok(tracer)
106    }
107
108    pub fn timeline_id(&self) -> TimelineId {
109        self.timeline_id
110    }
111
112    pub async fn handle_packet<'a>(&mut self, pkt: Packet<'_>) -> Result<(), IngestError> {
113        match pkt.message {
114            TracingWire::NewSpan { id, attrs, values } => {
115                let mut records = match values {
116                    SerializeRecord::Ser(_event) => {
117                        unreachable!("this variant can't be sent")
118                    }
119                    SerializeRecord::De(record_map) => record_map,
120                };
121
122                let name = {
123                    // store name for future use
124                    let name = records
125                        .get(&"name".into())
126                        .or_else(|| records.get(&"message".into()))
127                        .map(|n| format!("{:?}", n))
128                        .unwrap_or_else(|| attrs.metadata.name.to_string());
129
130                    SPAN_NAMES
131                        .write()
132                        .expect("span name lock poisoned, this is a bug")
133                        .deref_mut()
134                        .insert(id.id.get(), name.clone());
135
136                    name
137                };
138
139                let mut packed_attrs = Vec::new();
140
141                packed_attrs.push((
142                    self.get_or_create_event_attr_key("event.name".to_string())
143                        .await?,
144                    AttrVal::String(name.into()),
145                ));
146
147                let kind = records
148                    .remove(&"modality.kind".into())
149                    .and_then(tracing_value_to_attr_val)
150                    .unwrap_or_else(|| "span:defined".into());
151                packed_attrs.push((
152                    self.get_or_create_event_attr_key("event.internal.rs.kind".to_string())
153                        .await?,
154                    kind,
155                ));
156
157                let span_id = records
158                    .remove(&"modality.span_id".into())
159                    .and_then(tracing_value_to_attr_val)
160                    .unwrap_or_else(|| BigInt::new_attr_val(id.id.get() as i128));
161                packed_attrs.push((
162                    self.get_or_create_event_attr_key("event.internal.rs.span_id".to_string())
163                        .await?,
164                    span_id,
165                ));
166
167                self.pack_common_attrs(&mut packed_attrs, attrs.metadata, records, pkt.tick)
168                    .await?;
169
170                self.client
171                    .event(pkt.tick.into(), packed_attrs)
172                    .await
173                    .context("send packed event")?;
174            }
175            TracingWire::Record { .. } => {
176                // TODO: span events can't be added to after being sent, impl this once we can use
177                // timelines to represent spans
178            }
179            TracingWire::RecordFollowsFrom { .. } => {
180                // TODO: span events can't be added to after being sent, impl this once we can use
181                // timelines to represent spans
182            }
183            TracingWire::Event(ev) => {
184                let mut packed_attrs = Vec::new();
185
186                let mut records = match ev.fields {
187                    SerializeRecordFields::Ser(_event) => {
188                        unreachable!("this variant can't be sent")
189                    }
190                    SerializeRecordFields::De(record_map) => record_map,
191                };
192
193                let kind = records
194                    .remove(&"modality.kind".into())
195                    .and_then(tracing_value_to_attr_val)
196                    .unwrap_or_else(|| "event".into());
197                packed_attrs.push((
198                    self.get_or_create_event_attr_key("event.internal.rs.kind".to_string())
199                        .await?,
200                    kind,
201                ));
202
203                self.pack_common_attrs(&mut packed_attrs, ev.metadata, records, pkt.tick)
204                    .await?;
205
206                self.client
207                    .event(pkt.tick.into(), packed_attrs)
208                    .await
209                    .context("send packed event")?;
210            }
211            TracingWire::Enter(SerializeId { id }) => {
212                let mut packed_attrs = Vec::new();
213
214                {
215                    // get stored span name
216                    let name = SPAN_NAMES
217                        .read()
218                        .expect("span name lock poisoned, this is a bug")
219                        .deref()
220                        .get(&id.get())
221                        .map(|n| format!("enter: {}", n));
222
223                    if let Some(name) = name {
224                        packed_attrs.push((
225                            self.get_or_create_event_attr_key("event.name".to_string())
226                                .await?,
227                            AttrVal::String(name.into()),
228                        ));
229                    }
230                };
231
232                packed_attrs.push((
233                    self.get_or_create_event_attr_key("event.internal.rs.kind".to_string())
234                        .await?,
235                    AttrVal::String("span:enter".to_string().into()),
236                ));
237
238                packed_attrs.push((
239                    self.get_or_create_event_attr_key("event.internal.rs.span_id".to_string())
240                        .await?,
241                    BigInt::new_attr_val(u64::from(id).into()),
242                ));
243
244                packed_attrs.push((
245                    self.get_or_create_event_attr_key("event.internal.rs.tick".to_string())
246                        .await?,
247                    AttrVal::LogicalTime(LogicalTime::unary(pkt.tick)),
248                ));
249
250                self.client
251                    .event(pkt.tick.into(), packed_attrs)
252                    .await
253                    .context("send packed event")?;
254            }
255            TracingWire::Exit(SerializeId { id }) => {
256                let mut packed_attrs = Vec::new();
257
258                {
259                    // get stored span name
260                    let name = SPAN_NAMES
261                        .read()
262                        .expect("span name lock poisoned, this is a bug")
263                        .deref()
264                        .get(&id.get())
265                        .map(|n| format!("exit: {}", n));
266
267                    if let Some(name) = name {
268                        packed_attrs.push((
269                            self.get_or_create_event_attr_key("event.name".to_string())
270                                .await?,
271                            AttrVal::String(name.into()),
272                        ));
273                    }
274                };
275
276                packed_attrs.push((
277                    self.get_or_create_event_attr_key("event.internal.rs.kind".to_string())
278                        .await?,
279                    AttrVal::String("span:exit".to_string().into()),
280                ));
281
282                packed_attrs.push((
283                    self.get_or_create_event_attr_key("event.internal.rs.span_id".to_string())
284                        .await?,
285                    BigInt::new_attr_val(u64::from(id).into()),
286                ));
287
288                packed_attrs.push((
289                    self.get_or_create_event_attr_key("event.internal.rs.tick".to_string())
290                        .await?,
291                    AttrVal::LogicalTime(LogicalTime::unary(pkt.tick)),
292                ));
293
294                self.client
295                    .event(pkt.tick.into(), packed_attrs)
296                    .await
297                    .context("send packed event")?;
298            }
299            TracingWire::Close(SerializeId { id }) => {
300                SPAN_NAMES
301                    .write()
302                    .expect("span name lock poisoned, this is a bug")
303                    .deref_mut()
304                    .remove(&id.get());
305            }
306            TracingWire::IdClone { old, new } => {
307                let mut span_names = SPAN_NAMES
308                    .write()
309                    .expect("span name lock poisoned, this is a bug");
310
311                let name = span_names.deref().get(&old.id.get()).cloned();
312                if let Some(name) = name {
313                    span_names.deref_mut().insert(new.id.get(), name);
314                }
315            }
316            TracingWire::Other(two) => {
317                match two {
318                    TWOther::MessageDiscarded => {
319                        let mut packed_attrs = Vec::new();
320
321                        packed_attrs.push((
322                            self.get_or_create_event_attr_key("event.internal.rs.kind".to_string())
323                                .await?,
324                            AttrVal::String("message_discarded".to_string().into()),
325                        ));
326                        self.client
327                            .event(pkt.tick.into(), packed_attrs)
328                            .await
329                            .context("send packed event")?;
330                    }
331                    TWOther::DeviceInfo {
332                        clock_id,
333                        ticks_per_sec,
334                        device_id,
335                    } => {
336                        let mut packed_attrs = Vec::new();
337                        packed_attrs.push((
338                            self.get_or_create_timeline_attr_key(
339                                "timeline.internal.rs.clock_id".to_string(),
340                            )
341                            .await?,
342                            AttrVal::Integer(clock_id.into()),
343                        ));
344                        packed_attrs.push((
345                            self.get_or_create_timeline_attr_key(
346                                "timeline.ticks_per_sec".to_string(),
347                            )
348                            .await?,
349                            AttrVal::Integer(ticks_per_sec.into()),
350                        ));
351                        packed_attrs.push((
352                            self.get_or_create_timeline_attr_key(
353                                "timeline.internal.rs.device_id".to_string(),
354                            )
355                            .await?,
356                            // TODO: this includes array syntax in the ID
357                            AttrVal::String(format!("{:x?}", device_id).into()),
358                        ));
359                        self.client
360                            .timeline_metadata(packed_attrs)
361                            .await
362                            .context("send packed timeline metadata")?;
363                    }
364                }
365            }
366            _ => (),
367        }
368
369        Ok(())
370    }
371
372    async fn get_or_create_timeline_attr_key(
373        &mut self,
374        key: String,
375    ) -> Result<InternedAttrKey, IngestError> {
376        if let Some(id) = self.timeline_keys.get(&key) {
377            return Ok(*id);
378        }
379
380        let interned_key = self
381            .client
382            .declare_attr_key(key.clone())
383            .await
384            .context("define timeline attr key")?;
385
386        self.timeline_keys.insert(key, interned_key);
387
388        Ok(interned_key)
389    }
390
391    async fn get_or_create_event_attr_key(
392        &mut self,
393        key: String,
394    ) -> Result<InternedAttrKey, IngestError> {
395        let key = if key.starts_with("event.") {
396            key
397        } else {
398            format!("event.{key}")
399        };
400
401        if let Some(id) = self.event_keys.get(&key) {
402            return Ok(*id);
403        }
404
405        let interned_key = self
406            .client
407            .declare_attr_key(key.clone())
408            .await
409            .context("define event attr key")?;
410
411        self.event_keys.insert(key, interned_key);
412
413        Ok(interned_key)
414    }
415
416    async fn pack_common_attrs<'a>(
417        &mut self,
418        packed_attrs: &mut Vec<(InternedAttrKey, AttrVal)>,
419        metadata: SerializeMetadata<'a>,
420        mut records: RecordMap<'a>,
421        tick: u64,
422    ) -> Result<(), IngestError> {
423        let name = records
424            .remove(&"name".into())
425            .or_else(|| records.remove(&"message".into()))
426            .and_then(tracing_value_to_attr_val)
427            .unwrap_or_else(|| metadata.name.as_str().into());
428        packed_attrs.push((
429            self.get_or_create_event_attr_key("event.name".to_string())
430                .await?,
431            name,
432        ));
433
434        let severity = records
435            .remove(&"severity".into())
436            .and_then(tracing_value_to_attr_val)
437            .unwrap_or_else(|| format!("{:?}", metadata.level).to_lowercase().into());
438        packed_attrs.push((
439            self.get_or_create_event_attr_key("event.severity".to_string())
440                .await?,
441            severity,
442        ));
443
444        let module_path = records
445            .remove(&"source.module".into())
446            .and_then(tracing_value_to_attr_val)
447            .or_else(|| metadata.module_path.map(|mp| mp.as_str().into()));
448        if let Some(module_path) = module_path {
449            packed_attrs.push((
450                self.get_or_create_event_attr_key("event.source.module".to_string())
451                    .await?,
452                module_path,
453            ));
454        }
455
456        let source_file = records
457            .remove(&"source.file".into())
458            .and_then(tracing_value_to_attr_val)
459            .or_else(|| metadata.file.map(|mp| mp.as_str().into()));
460        if let Some(source_file) = source_file {
461            packed_attrs.push((
462                self.get_or_create_event_attr_key("event.source.file".to_string())
463                    .await?,
464                source_file,
465            ));
466        }
467
468        let source_line = records
469            .remove(&"source.line".into())
470            .and_then(tracing_value_to_attr_val)
471            .or_else(|| metadata.line.map(|mp| (mp as i64).into()));
472        if let Some(source_line) = source_line {
473            packed_attrs.push((
474                self.get_or_create_event_attr_key("event.source.line".to_string())
475                    .await?,
476                source_line,
477            ));
478        }
479
480        packed_attrs.push((
481            self.get_or_create_event_attr_key("event.internal.rs.tick".to_string())
482                .await?,
483            AttrVal::LogicalTime(LogicalTime::unary(tick)),
484        ));
485
486        // handle manually to type the AttrVal correctly
487        let remote_timeline_id = records
488            .remove(&"interaction.remote_timeline_id".into())
489            .and_then(tracing_value_to_attr_val);
490        if let Some(attrval) = remote_timeline_id {
491            let remote_timeline_id = if let AttrVal::String(string) = attrval {
492                use std::str::FromStr;
493                if let Ok(uuid) = Uuid::from_str(&string) {
494                    AttrVal::TimelineId(Box::new(uuid.into()))
495                } else {
496                    AttrVal::String(string)
497                }
498            } else {
499                attrval
500            };
501
502            packed_attrs.push((
503                self.get_or_create_event_attr_key("event.interaction.remote_timeline_id".into())
504                    .await?,
505                remote_timeline_id,
506            ));
507        }
508
509        // Manually retype the remote_timestamp
510        let remote_timestamp = records
511            .remove(&"interaction.remote_timestamp".into())
512            .and_then(tracing_value_to_attr_val);
513        if let Some(attrval) = remote_timestamp {
514            let remote_timestamp = match attrval {
515                AttrVal::Integer(i) if i >= 0 => AttrVal::Timestamp(Nanoseconds::from(i as u64)),
516                AttrVal::BigInt(i) if *i >= 0 && *i <= u64::MAX as i128 => {
517                    AttrVal::Timestamp(Nanoseconds::from(*i as u64))
518                }
519                AttrVal::Timestamp(t) => AttrVal::Timestamp(t),
520                x => x,
521            };
522
523            packed_attrs.push((
524                self.get_or_create_event_attr_key("event.interaction.remote_timestamp".into())
525                    .await?,
526                remote_timestamp,
527            ));
528        }
529
530        // Manually retype the local timestamp
531        let local_timestamp = records
532            .remove(&"timestamp".into())
533            .and_then(tracing_value_to_attr_val);
534        if let Some(attrval) = local_timestamp {
535            let remote_timestamp = match attrval {
536                AttrVal::Integer(i) if i >= 0 => AttrVal::Timestamp(Nanoseconds::from(i as u64)),
537                AttrVal::BigInt(i) if *i >= 0 && *i <= u64::MAX as i128 => {
538                    AttrVal::Timestamp(Nanoseconds::from(*i as u64))
539                }
540                AttrVal::Timestamp(t) => AttrVal::Timestamp(t),
541                x => x,
542            };
543
544            packed_attrs.push((
545                self.get_or_create_event_attr_key("event.timestamp".into())
546                    .await?,
547                remote_timestamp,
548            ));
549        } else if let Ok(duration_since_epoch) =
550            std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH)
551        {
552            let duration_since_epoch_in_nanos_res: Result<u64, _> =
553                duration_since_epoch.as_nanos().try_into();
554            if let Ok(duration_since_epoch_in_nanos) = duration_since_epoch_in_nanos_res {
555                packed_attrs.push((
556                    self.get_or_create_event_attr_key("event.timestamp".into())
557                        .await?,
558                    AttrVal::Timestamp(Nanoseconds::from(duration_since_epoch_in_nanos)),
559                ));
560            }
561        }
562
563        // pack any remaining records
564        for (name, value) in records {
565            let attrval = if let Some(attrval) = tracing_value_to_attr_val(value) {
566                attrval
567            } else {
568                continue;
569            };
570
571            let key = if name.starts_with("event.") {
572                name.to_string()
573            } else {
574                format!("event.{}", name.as_str())
575            };
576
577            packed_attrs.push((self.get_or_create_event_attr_key(key).await?, attrval));
578        }
579
580        Ok(())
581    }
582}
583
584// `SerializeValue` is `#[nonexhaustive]`, returns `None` if they add a type we don't handle and
585// fail to serialize it as a stringified json value
586fn tracing_value_to_attr_val<'a, V: Borrow<SerializeValue<'a>>>(value: V) -> Option<AttrVal> {
587    Some(match value.borrow() {
588        SerializeValue::Debug(dr) => match dr {
589            // TODO: there's an opertunity here to pull out message format
590            // parameters raw here instead of shipping a formatted string
591            DebugRecord::Ser(s) => AttrVal::String(s.to_string().into()),
592            DebugRecord::De(s) => AttrVal::String(s.to_string().into()),
593        },
594        SerializeValue::Str(s) => AttrVal::String(s.to_string().into()),
595        SerializeValue::F64(n) => AttrVal::Float((*n).into()),
596        SerializeValue::I64(n) => AttrVal::Integer(*n),
597        SerializeValue::U64(n) => BigInt::new_attr_val((*n).into()),
598        SerializeValue::Bool(b) => AttrVal::Bool(*b),
599        unknown_sv => {
600            if let Ok(sval) = serde_json::to_string(&unknown_sv) {
601                AttrVal::String(sval.into())
602            } else {
603                return None;
604            }
605        }
606    })
607}