auxon_sdk/tracing/common/
ingest.rs

1use crate::{
2    api::{AttrVal, BigInt, LogicalTime, Nanoseconds, TimelineId, Uuid},
3    ingest_client::{BoundTimelineState, IngestClient, IngestError as SdkIngestError},
4    ingest_protocol::InternedAttrKey,
5    tracing::{
6        layer::{RecordMap, TracingValue},
7        Options,
8    },
9};
10use anyhow::Context;
11use once_cell::sync::Lazy;
12use std::{collections::HashMap, num::NonZeroU64, time::Duration};
13use thiserror::Error;
14use tokio::{
15    select,
16    sync::mpsc::{self, UnboundedReceiver, UnboundedSender},
17    sync::oneshot,
18};
19use tracing_core::Metadata;
20
21use std::thread::{self, JoinHandle};
22use tokio::runtime::Runtime;
23use tokio::task;
24
25thread_local! {
26    static THREAD_TIMELINE_ID: Lazy<TimelineId> = Lazy::new(TimelineId::allocate);
27}
28
29#[derive(Debug, Error)]
30pub enum ConnectError {
31    /// No auth was provided
32    #[error("Authentication required")]
33    AuthRequired,
34    /// Auth was provided, but was not accepted by modality
35    #[error("Authenticating with the provided auth failed")]
36    AuthFailed(#[from] SdkIngestError),
37    /// Errors that it is assumed there is no way to handle without human intervention, meant for
38    /// consumers to just print and carry on or panic.
39    #[error(transparent)]
40    UnexpectedFailure(#[from] anyhow::Error),
41}
42
43#[derive(Debug, Error)]
44pub enum IngestError {
45    /// Errors that it is assumed there is no way to handle without human intervention, meant for
46    /// consumers to just print and carry on or panic.
47    #[error(transparent)]
48    UnexpectedFailure(#[from] anyhow::Error),
49}
50
51pub(crate) fn current_timeline() -> TimelineId {
52    THREAD_TIMELINE_ID.with(|id| **id)
53}
54
/// Identifier of a tracing span as carried by the [`Message`] variants.
pub(crate) type SpanId = NonZeroU64;
56
57#[derive(Debug)]
58pub(crate) struct WrappedMessage {
59    pub message: Message,
60    pub tick: Duration,
61    pub nanos_since_unix_epoch: Option<Nanoseconds>,
62    pub timeline: TimelineId,
63}
64
65#[derive(Debug)]
66pub(crate) enum Message {
67    NewTimeline {
68        name: String,
69    },
70    NewSpan {
71        id: SpanId,
72        metadata: &'static Metadata<'static>,
73        records: RecordMap,
74    },
75    Record {
76        span: SpanId,
77        records: RecordMap,
78    },
79    RecordFollowsFrom {
80        span: SpanId,
81        follows: SpanId,
82    },
83    Event {
84        metadata: &'static Metadata<'static>,
85        records: RecordMap,
86    },
87    Enter {
88        span: SpanId,
89    },
90    Exit {
91        span: SpanId,
92    },
93    Close {
94        span: SpanId,
95    },
96    IdChange {
97        old: SpanId,
98        new: SpanId,
99    },
100}
101
102/// A handle to control the spawned ingest thread.
103pub struct ModalityIngestThreadHandle {
104    pub(crate) ingest_sender: UnboundedSender<WrappedMessage>,
105    pub(crate) finish_sender: Option<oneshot::Sender<()>>,
106    pub(crate) thread: Option<JoinHandle<()>>,
107}
108
109impl ModalityIngestThreadHandle {
110    /// Stop accepting new trace events, flush all existing events, and stop ingest thread.
111    ///
112    /// This function must be called at the end of your main thread to give the ingest thread a
113    /// chance to flush all queued trace events out to modality.
114    ///
115    /// # Panics
116    ///
117    /// This function uses [`std::thread::JoinHandle::join`] which may panic on some platforms if a
118    /// thread attempts to join itself or otherwise may create a deadlock with joining threads.
119    /// This case should be incredibly unlikely, if not impossible, but can not be statically
120    /// guarenteed.
121    pub fn finish(mut self) {
122        if let Some(finish) = self.finish_sender.take() {
123            let _ = finish.send(());
124        }
125
126        if let Some(thread) = self.thread.take() {
127            let _ = thread.join();
128        }
129    }
130}
131
132/// A handle to control the spawned ingest task.
133pub struct ModalityIngestTaskHandle {
134    pub(crate) ingest_sender: UnboundedSender<WrappedMessage>,
135    pub(crate) finish_sender: Option<oneshot::Sender<()>>,
136    pub(crate) task: Option<task::JoinHandle<()>>,
137}
138
139impl ModalityIngestTaskHandle {
140    /// Stop accepting new trace events, flush all existing events, and stop ingest thread.
141    ///
142    /// This function must be called at the end of your main thread to give the ingest thread a
143    /// chance to flush all queued trace events out to modality.
144    pub async fn finish(mut self) {
145        if let Some(finish) = self.finish_sender.take() {
146            let _ = finish.send(());
147        }
148
149        if let Some(task) = self.task.take() {
150            let _ = task.await;
151        }
152    }
153}
154
155pub(crate) struct ModalityIngest {
156    client: IngestClient<BoundTimelineState>,
157    global_metadata: Vec<(String, AttrVal)>,
158    event_keys: HashMap<String, InternedAttrKey>,
159    timeline_keys: HashMap<String, InternedAttrKey>,
160    span_names: HashMap<NonZeroU64, String>,
161
162    rt: Option<Runtime>,
163}
164
165impl ModalityIngest {
166    pub(crate) fn connect(opts: Options) -> Result<Self, ConnectError> {
167        let rt = tokio::runtime::Builder::new_current_thread()
168            .enable_io()
169            .enable_time()
170            .build()
171            .expect("build intial tokio current thread runtime");
172
173        rt.block_on(async { Self::async_connect(opts).await })
174            .map(move |mut m| {
175                m.rt = Some(rt);
176                m
177            })
178    }
179
180    pub(crate) async fn async_connect(options: Options) -> Result<Self, ConnectError> {
181        let url = url::Url::parse(&format!("modality-ingest://{}/", options.server_addr)).unwrap();
182        let unauth_client = IngestClient::connect(&url, false)
183            .await
184            .context("init ingest client")?;
185
186        let auth_key = options.auth.ok_or(ConnectError::AuthRequired)?;
187        let client = unauth_client
188            .authenticate(auth_key)
189            .await
190            .map_err(ConnectError::AuthFailed)?;
191
192        // open a timeline for the current thread because we need to open something to make the
193        // types work
194        let timeline_id = current_timeline();
195        let client = client
196            .open_timeline(timeline_id)
197            .await
198            .context("open new timeline")?;
199
200        Ok(Self {
201            client,
202            global_metadata: options.metadata,
203            event_keys: HashMap::new(),
204            timeline_keys: HashMap::new(),
205            span_names: HashMap::new(),
206            rt: None,
207        })
208    }
209
210    pub(crate) fn spawn_thread(mut self) -> ModalityIngestThreadHandle {
211        let (sender, recv) = mpsc::unbounded_channel();
212        let (finish_sender, finish_receiver) = oneshot::channel();
213
214        let join_handle = thread::spawn(move || {
215            // ensure this thread doesn't send trace events to the global dispatcher
216            let _dispatch_guard = tracing::dispatcher::set_default(&tracing::Dispatch::none());
217
218            let rt = self.rt.take().unwrap_or_else(|| {
219                tokio::runtime::Builder::new_current_thread()
220                    .build()
221                    .expect("build local tokio current thread runtime")
222            });
223
224            rt.block_on(self.handler_task(recv, finish_receiver))
225        });
226
227        ModalityIngestThreadHandle {
228            ingest_sender: sender,
229            finish_sender: Some(finish_sender),
230            thread: Some(join_handle),
231        }
232    }
233
234    pub(crate) async fn spawn_task(self) -> ModalityIngestTaskHandle {
235        let (ingest_sender, recv) = mpsc::unbounded_channel();
236        let (finish_sender, finish_receiver) = oneshot::channel();
237
238        let task = tokio::spawn(self.handler_task(recv, finish_receiver));
239
240        ModalityIngestTaskHandle {
241            ingest_sender,
242            finish_sender: Some(finish_sender),
243            task: Some(task),
244        }
245    }
246
247    async fn handler_task(
248        mut self,
249        mut recv: UnboundedReceiver<WrappedMessage>,
250        mut finish: oneshot::Receiver<()>,
251    ) {
252        loop {
253            select! {
254                Some(message) = recv.recv() => {
255                    let _ = self.handle_packet(message).await;
256                },
257                _ = &mut finish => {
258                    break
259                }
260            }
261        }
262
263        // close channel and drain existing messages
264        recv.close();
265        while let Some(message) = recv.recv().await {
266            let _ = self.handle_packet(message).await;
267        }
268        let _ = self.client.flush().await;
269    }
270
271    async fn handle_packet(&mut self, message: WrappedMessage) -> Result<(), IngestError> {
272        let WrappedMessage {
273            message,
274            tick,
275            nanos_since_unix_epoch,
276            timeline,
277        } = message;
278
279        if self.client.bound_timeline() != timeline {
280            self.client
281                .open_timeline(timeline)
282                .await
283                .context("open new timeline")?;
284        }
285
286        match message {
287            Message::NewTimeline { name } => {
288                let mut timeline_metadata = self.global_metadata.clone();
289
290                if !timeline_metadata.iter().any(|(k, _v)| k == "name") {
291                    timeline_metadata.push(("timeline.name".to_string(), name.into()));
292                }
293
294                for (key, value) in timeline_metadata {
295                    let timeline_key_name = self
296                        .get_or_create_timeline_attr_key(key)
297                        .await
298                        .context("get or define timeline attr key")?;
299
300                    self.client
301                        .timeline_metadata([(timeline_key_name, value)])
302                        .await
303                        .context("apply timeline metadata")?;
304                }
305            }
306            Message::NewSpan {
307                id,
308                metadata,
309                mut records,
310            } => {
311                let name = {
312                    // store name for future use
313                    let name = records
314                        .get("name")
315                        .or_else(|| records.get("message"))
316                        .map(|n| format!("{:?}", n))
317                        .unwrap_or_else(|| metadata.name().to_string());
318
319                    self.span_names.insert(id, name.clone());
320
321                    name
322                };
323
324                let mut packed_attrs = Vec::new();
325
326                packed_attrs.push((
327                    self.get_or_create_event_attr_key("event.name".to_string())
328                        .await?,
329                    AttrVal::String(name.into()),
330                ));
331
332                let kind = records
333                    .remove("modality.kind")
334                    .map(tracing_value_to_attr_val)
335                    .unwrap_or_else(|| "span:defined".into());
336                packed_attrs.push((
337                    self.get_or_create_event_attr_key("event.internal.rs.kind".to_string())
338                        .await?,
339                    kind,
340                ));
341
342                let span_id = records
343                    .remove("modality.span_id")
344                    .map(tracing_value_to_attr_val)
345                    .unwrap_or_else(|| BigInt::new_attr_val(u64::from(id) as i128));
346                packed_attrs.push((
347                    self.get_or_create_event_attr_key("event.internal.rs.span_id".to_string())
348                        .await?,
349                    span_id,
350                ));
351
352                self.pack_common_attrs(
353                    &mut packed_attrs,
354                    metadata,
355                    records,
356                    tick,
357                    nanos_since_unix_epoch,
358                )
359                .await?;
360
361                self.client
362                    .event(tick.as_nanos(), packed_attrs)
363                    .await
364                    .context("send packed event")?;
365            }
366            Message::Record { span, records } => {
367                // TODO: span events can't be added to after being sent, impl this once we can use
368                // timelines to represent spans
369
370                let _ = span;
371                let _ = records;
372            }
373            Message::RecordFollowsFrom { span, follows } => {
374                // TODO: span events can't be added to after being sent, impl this once we can use
375                // timelines to represent spans
376
377                let _ = span;
378                let _ = follows;
379            }
380            Message::Event {
381                metadata,
382                mut records,
383            } => {
384                let mut packed_attrs = Vec::new();
385
386                let kind = records
387                    .remove("modality.kind")
388                    .map(tracing_value_to_attr_val)
389                    .unwrap_or_else(|| "event".into());
390                packed_attrs.push((
391                    self.get_or_create_event_attr_key("event.internal.rs.kind".to_string())
392                        .await?,
393                    kind,
394                ));
395
396                self.pack_common_attrs(
397                    &mut packed_attrs,
398                    metadata,
399                    records,
400                    tick,
401                    nanos_since_unix_epoch,
402                )
403                .await?;
404
405                self.client
406                    .event(tick.as_nanos(), packed_attrs)
407                    .await
408                    .context("send packed event")?;
409            }
410            Message::Enter { span } => {
411                let mut packed_attrs = Vec::new();
412
413                {
414                    // get stored span name
415                    let name = self.span_names.get(&span).map(|n| format!("enter: {}", n));
416
417                    if let Some(name) = name {
418                        packed_attrs.push((
419                            self.get_or_create_event_attr_key("event.name".to_string())
420                                .await?,
421                            AttrVal::String(name.into()),
422                        ));
423                    }
424                };
425
426                packed_attrs.push((
427                    self.get_or_create_event_attr_key("event.internal.rs.kind".to_string())
428                        .await?,
429                    AttrVal::String("span:enter".to_string().into()),
430                ));
431
432                packed_attrs.push((
433                    self.get_or_create_event_attr_key("event.internal.rs.span_id".to_string())
434                        .await?,
435                    BigInt::new_attr_val(u64::from(span).into()),
436                ));
437
438                // only record tick directly during the first ~5.8 centuries this program is running
439                if let Ok(tick) = TryInto::<u64>::try_into(tick.as_nanos()) {
440                    packed_attrs.push((
441                        self.get_or_create_event_attr_key("event.internal.rs.tick".to_string())
442                            .await?,
443                        AttrVal::LogicalTime(LogicalTime::unary(tick)),
444                    ));
445                }
446
447                self.client
448                    .event(tick.as_nanos(), packed_attrs)
449                    .await
450                    .context("send packed event")?;
451            }
452            Message::Exit { span } => {
453                let mut packed_attrs = Vec::new();
454
455                {
456                    // get stored span name
457                    let name = self.span_names.get(&span).map(|n| format!("exit: {}", n));
458
459                    if let Some(name) = name {
460                        packed_attrs.push((
461                            self.get_or_create_event_attr_key("event.name".to_string())
462                                .await?,
463                            AttrVal::String(name.into()),
464                        ));
465                    }
466                };
467
468                packed_attrs.push((
469                    self.get_or_create_event_attr_key("event.internal.rs.kind".to_string())
470                        .await?,
471                    AttrVal::String("span:exit".to_string().into()),
472                ));
473
474                packed_attrs.push((
475                    self.get_or_create_event_attr_key("event.internal.rs.span_id".to_string())
476                        .await?,
477                    BigInt::new_attr_val(u64::from(span).into()),
478                ));
479
480                // only record tick directly during the first ~5.8 centuries this program is running
481                if let Ok(tick) = TryInto::<u64>::try_into(tick.as_nanos()) {
482                    packed_attrs.push((
483                        self.get_or_create_event_attr_key("event.internal.rs.tick".to_string())
484                            .await?,
485                        AttrVal::LogicalTime(LogicalTime::unary(tick)),
486                    ));
487                }
488
489                self.client
490                    .event(tick.as_nanos(), packed_attrs)
491                    .await
492                    .context("send packed event")?;
493            }
494            Message::Close { span } => {
495                self.span_names.remove(&span);
496            }
497            Message::IdChange { old, new } => {
498                let name = self.span_names.get(&old).cloned();
499                if let Some(name) = name {
500                    self.span_names.insert(new, name);
501                }
502            }
503        }
504
505        Ok(())
506    }
507
508    async fn get_or_create_timeline_attr_key(
509        &mut self,
510        key: String,
511    ) -> Result<InternedAttrKey, IngestError> {
512        if let Some(id) = self.timeline_keys.get(&key) {
513            return Ok(*id);
514        }
515
516        let interned_key = self
517            .client
518            .declare_attr_key(key.clone())
519            .await
520            .context("define timeline attr key")?;
521
522        self.timeline_keys.insert(key, interned_key);
523
524        Ok(interned_key)
525    }
526
527    async fn get_or_create_event_attr_key(
528        &mut self,
529        key: String,
530    ) -> Result<InternedAttrKey, IngestError> {
531        let key = if key.starts_with("event.") {
532            key
533        } else {
534            format!("event.{key}")
535        };
536
537        if let Some(id) = self.event_keys.get(&key) {
538            return Ok(*id);
539        }
540
541        let interned_key = self
542            .client
543            .declare_attr_key(key.clone())
544            .await
545            .context("define event attr key")?;
546
547        self.event_keys.insert(key, interned_key);
548
549        Ok(interned_key)
550    }
551
552    async fn pack_common_attrs<'a>(
553        &mut self,
554        packed_attrs: &mut Vec<(InternedAttrKey, AttrVal)>,
555        metadata: &'a Metadata<'static>,
556        mut records: RecordMap,
557        tick: Duration,
558        maybe_nanos_since_unix_epoch: Option<Nanoseconds>,
559    ) -> Result<(), IngestError> {
560        let name = records
561            .remove("name")
562            .or_else(|| records.remove("message"))
563            .map(tracing_value_to_attr_val)
564            .unwrap_or_else(|| metadata.name().into());
565        packed_attrs.push((
566            self.get_or_create_event_attr_key("event.name".to_string())
567                .await?,
568            name,
569        ));
570
571        let severity = records
572            .remove("severity")
573            .map(tracing_value_to_attr_val)
574            .unwrap_or_else(|| format!("{}", metadata.level()).to_lowercase().into());
575        packed_attrs.push((
576            self.get_or_create_event_attr_key("event.severity".to_string())
577                .await?,
578            severity,
579        ));
580
581        let module_path = records
582            .remove("source.module")
583            .map(tracing_value_to_attr_val)
584            .or_else(|| metadata.module_path().map(|mp| mp.into()));
585        if let Some(module_path) = module_path {
586            packed_attrs.push((
587                self.get_or_create_event_attr_key("event.source.module".to_string())
588                    .await?,
589                module_path,
590            ));
591        }
592
593        let source_file = records
594            .remove("source.file")
595            .map(tracing_value_to_attr_val)
596            .or_else(|| metadata.file().map(|mp| mp.into()));
597        if let Some(source_file) = source_file {
598            packed_attrs.push((
599                self.get_or_create_event_attr_key("event.source.file".to_string())
600                    .await?,
601                source_file,
602            ));
603        }
604
605        let source_line = records
606            .remove("source.line")
607            .map(tracing_value_to_attr_val)
608            .or_else(|| metadata.line().map(|mp| (mp as i64).into()));
609        if let Some(source_line) = source_line {
610            packed_attrs.push((
611                self.get_or_create_event_attr_key("event.source.line".to_string())
612                    .await?,
613                source_line,
614            ));
615        }
616
617        // only record tick directly during the first ~5.8 centuries this program is running
618        if let Ok(tick) = TryInto::<u64>::try_into(tick.as_nanos()) {
619            packed_attrs.push((
620                self.get_or_create_event_attr_key("event.internal.rs.tick".to_string())
621                    .await?,
622                AttrVal::LogicalTime(LogicalTime::unary(tick)),
623            ));
624        }
625
626        // handle manually to type the AttrVal correctly
627        let remote_timeline_id = records
628            .remove("interaction.remote_timeline_id")
629            .map(tracing_value_to_attr_val);
630        if let Some(attrval) = remote_timeline_id {
631            let remote_timeline_id = if let AttrVal::String(string) = attrval {
632                use std::str::FromStr;
633                if let Ok(uuid) = Uuid::from_str(&string) {
634                    AttrVal::TimelineId(Box::new(uuid.into()))
635                } else {
636                    AttrVal::String(string)
637                }
638            } else {
639                attrval
640            };
641
642            packed_attrs.push((
643                self.get_or_create_event_attr_key("event.interaction.remote_timeline_id".into())
644                    .await?,
645                remote_timeline_id,
646            ));
647        }
648
649        // Manually retype the remote_timestamp
650        let remote_timestamp = records
651            .remove("interaction.remote_timestamp")
652            .map(tracing_value_to_attr_val);
653        if let Some(attrval) = remote_timestamp {
654            let remote_timestamp = match attrval {
655                AttrVal::Integer(i) if i >= 0 => AttrVal::Timestamp(Nanoseconds::from(i as u64)),
656                AttrVal::BigInt(i) if *i >= 0 && *i <= u64::MAX as i128 => {
657                    AttrVal::Timestamp(Nanoseconds::from(*i as u64))
658                }
659                AttrVal::Timestamp(t) => AttrVal::Timestamp(t),
660                x => x,
661            };
662
663            packed_attrs.push((
664                self.get_or_create_event_attr_key("event.interaction.remote_timestamp".into())
665                    .await?,
666                remote_timestamp,
667            ));
668        }
669
670        // Manually retype the local timestamp
671        let local_timestamp = records.remove("timestamp").map(tracing_value_to_attr_val);
672        if let Some(attrval) = local_timestamp {
673            let remote_timestamp = match attrval {
674                AttrVal::Integer(i) if i >= 0 => AttrVal::Timestamp(Nanoseconds::from(i as u64)),
675                AttrVal::BigInt(i) if *i >= 0 && *i <= u64::MAX as i128 => {
676                    AttrVal::Timestamp(Nanoseconds::from(*i as u64))
677                }
678                AttrVal::Timestamp(t) => AttrVal::Timestamp(t),
679                x => x,
680            };
681
682            packed_attrs.push((
683                self.get_or_create_event_attr_key("event.timestamp".into())
684                    .await?,
685                remote_timestamp,
686            ));
687        } else if let Some(nanos_since_unix_epoch) = maybe_nanos_since_unix_epoch {
688            packed_attrs.push((
689                self.get_or_create_event_attr_key("event.timestamp".into())
690                    .await?,
691                AttrVal::Timestamp(nanos_since_unix_epoch),
692            ));
693        }
694
695        // pack any remaining records
696        for (name, value) in records {
697            let attrval = tracing_value_to_attr_val(value);
698
699            let key = if name.starts_with("event.") {
700                name.to_string()
701            } else {
702                format!("event.{}", name.as_str())
703            };
704
705            packed_attrs.push((self.get_or_create_event_attr_key(key).await?, attrval));
706        }
707
708        Ok(())
709    }
710}
711
712fn tracing_value_to_attr_val(value: TracingValue) -> AttrVal {
713    match value {
714        TracingValue::String(s) => s.into(),
715        TracingValue::F64(n) => n.into(),
716        TracingValue::I64(n) => n.into(),
717        TracingValue::U64(n) => (n as i128).into(),
718        TracingValue::Bool(b) => b.into(),
719    }
720}