tracing_modality/common/
ingest.rs

1pub use auxon_sdk::api::TimelineId;
2
3use crate::{
4    layer::{RecordMap, TracingValue},
5    Options,
6};
7use anyhow::Context;
8use auxon_sdk::{
9    api::{AttrVal, BigInt, LogicalTime, Nanoseconds, Uuid},
10    ingest_client::{BoundTimelineState, IngestClient, IngestError as SdkIngestError},
11    ingest_protocol::InternedAttrKey,
12};
13use once_cell::sync::Lazy;
14use std::{collections::HashMap, num::NonZeroU64, time::Duration};
15use thiserror::Error;
16use tokio::{
17    select,
18    sync::mpsc::{self, UnboundedReceiver, UnboundedSender},
19    sync::oneshot,
20};
21use tracing_core::Metadata;
22
23#[cfg(feature = "blocking")]
24use std::thread::{self, JoinHandle};
25#[cfg(feature = "blocking")]
26use tokio::runtime::Runtime;
27#[cfg(feature = "async")]
28use tokio::task;
29
thread_local! {
    // One timeline id per thread. The id is produced by `TimelineId::allocate` the first time
    // the owning thread dereferences the `Lazy` (see `current_timeline`), and is reused for
    // every later access from that thread.
    static THREAD_TIMELINE_ID: Lazy<TimelineId> = Lazy::new(TimelineId::allocate);
}
33
/// Errors returned while connecting to, and authenticating with, the ingest server.
#[derive(Debug, Error)]
pub enum ConnectError {
    /// No auth was provided in the options.
    #[error("Authentication required")]
    AuthRequired,
    /// Auth was provided, but was not accepted by modality. Wraps the SDK error that the
    /// authentication attempt returned.
    #[error("Authenticating with the provided auth failed")]
    AuthFailed(#[from] SdkIngestError),
    /// Any other failure. These are assumed to be impossible to handle without human
    /// intervention; consumers are expected to print them and carry on, or panic.
    #[error(transparent)]
    UnexpectedFailure(#[from] anyhow::Error),
}
47
/// Errors returned while turning a queued message into ingest protocol calls.
#[derive(Debug, Error)]
pub enum IngestError {
    /// Any failure. These are assumed to be impossible to handle without human intervention;
    /// consumers are expected to print them and carry on, or panic.
    #[error(transparent)]
    UnexpectedFailure(#[from] anyhow::Error),
}
55
56pub(crate) fn current_timeline() -> TimelineId {
57    THREAD_TIMELINE_ID.with(|id| **id)
58}
59
/// Numeric identifier of a span as carried by [`Message`] variants; never zero.
pub(crate) type SpanId = NonZeroU64;
61
/// A [`Message`] together with the timing and timeline information needed to ingest it.
#[derive(Debug)]
pub(crate) struct WrappedMessage {
    /// The payload describing what happened.
    pub message: Message,
    /// Used (in nanoseconds) as the ordering value of the emitted event and, when it fits in a
    /// `u64`, as the `event.internal.rs.tick` attribute.
    // NOTE(review): presumably the time elapsed since the program/layer started — confirm
    // against the code that produces these messages.
    pub tick: Duration,
    /// Wall-clock time of the message, if known. Used as the `event.timestamp` attribute when
    /// the records do not carry their own `timestamp` field.
    pub nanos_since_unix_epoch: Option<Nanoseconds>,
    /// Timeline the message belongs to; the ingest handler switches the client to this
    /// timeline before processing the message.
    pub timeline: TimelineId,
}
69
/// The kinds of occurrences that are queued for the ingest handler.
#[derive(Debug)]
pub(crate) enum Message {
    /// A timeline was created; the handler applies the global metadata (and this name, if the
    /// metadata has none) as timeline metadata.
    NewTimeline {
        name: String,
    },
    /// A span was created; the handler remembers its name and emits an event for it.
    NewSpan {
        id: SpanId,
        metadata: &'static Metadata<'static>,
        records: RecordMap,
    },
    /// Values were recorded on an existing span. Currently ignored by the handler.
    Record {
        span: SpanId,
        records: RecordMap,
    },
    /// A span was declared to follow from another. Currently ignored by the handler.
    RecordFollowsFrom {
        span: SpanId,
        follows: SpanId,
    },
    /// A standalone event; the handler packs its metadata and records into an ingest event.
    Event {
        metadata: &'static Metadata<'static>,
        records: RecordMap,
    },
    /// A span was entered; the handler emits a `span:enter` event.
    Enter {
        span: SpanId,
    },
    /// A span was exited; the handler emits a `span:exit` event.
    Exit {
        span: SpanId,
    },
    /// A span was closed; the handler forgets the name stored for it.
    Close {
        span: SpanId,
    },
    /// A span id changed; the handler copies the stored name from `old` to `new`.
    IdChange {
        old: SpanId,
        new: SpanId,
    },
}
106
/// Marker trait implemented by the ingest handle types in this module. It declares no methods.
pub trait ModalityIngestHandle {}
108
#[cfg(feature = "blocking")]
/// A handle to control the spawned ingest thread.
pub struct ModalityIngestThreadHandle {
    /// Sending half of the queue that the ingest thread reads messages from.
    pub(crate) ingest_sender: UnboundedSender<WrappedMessage>,
    /// One-shot signal telling the ingest thread to stop; `None` once it has been used.
    pub(crate) finish_sender: Option<oneshot::Sender<()>>,
    /// Join handle of the ingest thread; `None` once it has been joined.
    pub(crate) thread: Option<JoinHandle<()>>,
}

// Marks the thread handle as an ingest handle; the trait has no methods to implement.
#[cfg(feature = "blocking")]
impl ModalityIngestHandle for ModalityIngestThreadHandle {}
119
#[cfg(feature = "blocking")]
impl ModalityIngestThreadHandle {
    /// Signal the ingest thread to stop, then block until it has exited.
    ///
    /// Once signalled, the ingest thread stops accepting new trace events, processes the ones
    /// that are already queued and flushes the client before exiting. Call this at the end of
    /// your main thread so queued trace events get a chance to reach modality.
    ///
    /// # Panics
    ///
    /// This function uses [`std::thread::JoinHandle::join`], which may panic on some platforms
    /// if a thread attempts to join itself or would otherwise deadlock while joining. This
    /// should be incredibly unlikely, if not impossible, but cannot be statically guaranteed.
    pub fn finish(mut self) {
        let stop_signal = self.finish_sender.take();
        let worker = self.thread.take();

        // A failed send only means the receiving side is already gone; nothing to do about it.
        if let Some(stop_signal) = stop_signal {
            let _ = stop_signal.send(());
        }

        // The join result (including a panic payload from the worker) is deliberately dropped.
        if let Some(worker) = worker {
            let _ = worker.join();
        }
    }
}
143
#[cfg(feature = "async")]
/// A handle to control the spawned ingest task.
pub struct ModalityIngestTaskHandle {
    /// Sending half of the queue that the ingest task reads messages from.
    pub(crate) ingest_sender: UnboundedSender<WrappedMessage>,
    /// One-shot signal telling the ingest task to stop; `None` once it has been used.
    pub(crate) finish_sender: Option<oneshot::Sender<()>>,
    /// Join handle of the ingest task; `None` once it has been awaited.
    pub(crate) task: Option<task::JoinHandle<()>>,
}

// Marks the task handle as an ingest handle; the trait has no methods to implement.
#[cfg(feature = "async")]
impl ModalityIngestHandle for ModalityIngestTaskHandle {}
154
#[cfg(feature = "async")]
impl ModalityIngestTaskHandle {
    /// Signal the ingest task to stop, then wait until it has completed.
    ///
    /// Once signalled, the ingest task stops accepting new trace events, processes the ones
    /// that are already queued and flushes the client before completing. Await this before
    /// your program exits so queued trace events get a chance to reach modality.
    pub async fn finish(mut self) {
        let stop_signal = self.finish_sender.take();
        let worker = self.task.take();

        // A failed send only means the receiving side is already gone; nothing to do about it.
        if let Some(stop_signal) = stop_signal {
            let _ = stop_signal.send(());
        }

        // The join result (including a join error from the task) is deliberately dropped.
        if let Some(worker) = worker {
            let _ = worker.await;
        }
    }
}
171
/// State owned by the ingest worker: the connected client plus the caches it needs to turn
/// queued messages into ingest protocol calls.
pub(crate) struct ModalityIngest {
    /// Authenticated ingest client, always bound to some timeline.
    client: IngestClient<BoundTimelineState>,
    /// Metadata taken from the options; applied to every new timeline.
    global_metadata: Vec<(String, AttrVal)>,
    /// Event attribute keys that have already been declared, by key name.
    event_keys: HashMap<String, InternedAttrKey>,
    /// Timeline attribute keys that have already been declared, by key name.
    timeline_keys: HashMap<String, InternedAttrKey>,
    /// Names of the spans that have been created and not yet closed, used to label the
    /// enter/exit events.
    span_names: HashMap<NonZeroU64, String>,

    /// Runtime created by `connect`; `spawn_thread` moves it onto the ingest thread.
    #[cfg(feature = "blocking")]
    rt: Option<Runtime>,
}
182
183impl ModalityIngest {
184    #[cfg(feature = "blocking")]
185    pub(crate) fn connect(opts: Options) -> Result<Self, ConnectError> {
186        let rt = tokio::runtime::Builder::new_current_thread()
187            .enable_io()
188            .enable_time()
189            .build()
190            .expect("build intial tokio current thread runtime");
191
192        rt.block_on(async { Self::async_connect(opts).await })
193            .map(move |mut m| {
194                m.rt = Some(rt);
195                m
196            })
197    }
198
199    pub(crate) async fn async_connect(options: Options) -> Result<Self, ConnectError> {
200        let url = url::Url::parse(&format!("modality-ingest://{}/", options.server_addr)).unwrap();
201        let unauth_client = IngestClient::connect(&url, false)
202            .await
203            .context("init ingest client")?;
204
205        let auth_key = options.auth.ok_or(ConnectError::AuthRequired)?;
206        let client = unauth_client
207            .authenticate(auth_key)
208            .await
209            .map_err(ConnectError::AuthFailed)?;
210
211        // open a timeline for the current thread because we need to open something to make the
212        // types work
213        let timeline_id = current_timeline();
214        let client = client
215            .open_timeline(timeline_id)
216            .await
217            .context("open new timeline")?;
218
219        Ok(Self {
220            client,
221            global_metadata: options.metadata,
222            event_keys: HashMap::new(),
223            timeline_keys: HashMap::new(),
224            span_names: HashMap::new(),
225            #[cfg(feature = "blocking")]
226            rt: None,
227        })
228    }
229
230    #[cfg(feature = "blocking")]
231    pub(crate) fn spawn_thread(mut self) -> ModalityIngestThreadHandle {
232        let (sender, recv) = mpsc::unbounded_channel();
233        let (finish_sender, finish_receiver) = oneshot::channel();
234
235        let join_handle = thread::spawn(move || {
236            // ensure this thread doesn't send trace events to the global dispatcher
237            let _dispatch_guard = tracing::dispatcher::set_default(&tracing::Dispatch::none());
238
239            let rt = self.rt.take().unwrap_or_else(|| {
240                tokio::runtime::Builder::new_current_thread()
241                    .build()
242                    .expect("build local tokio current thread runtime")
243            });
244
245            rt.block_on(self.handler_task(recv, finish_receiver))
246        });
247
248        ModalityIngestThreadHandle {
249            ingest_sender: sender,
250            finish_sender: Some(finish_sender),
251            thread: Some(join_handle),
252        }
253    }
254
255    #[cfg(feature = "async")]
256    pub(crate) async fn spawn_task(self) -> ModalityIngestTaskHandle {
257        let (ingest_sender, recv) = mpsc::unbounded_channel();
258        let (finish_sender, finish_receiver) = oneshot::channel();
259
260        let task = tokio::spawn(self.handler_task(recv, finish_receiver));
261
262        ModalityIngestTaskHandle {
263            ingest_sender,
264            finish_sender: Some(finish_sender),
265            task: Some(task),
266        }
267    }
268
269    async fn handler_task(
270        mut self,
271        mut recv: UnboundedReceiver<WrappedMessage>,
272        mut finish: oneshot::Receiver<()>,
273    ) {
274        loop {
275            select! {
276                Some(message) = recv.recv() => {
277                    let _ = self.handle_packet(message).await;
278                },
279                _ = &mut finish => {
280                    break
281                }
282            }
283        }
284
285        // close channel and drain existing messages
286        recv.close();
287        while let Some(message) = recv.recv().await {
288            let _ = self.handle_packet(message).await;
289        }
290        let _ = self.client.flush().await;
291    }
292
293    async fn handle_packet(&mut self, message: WrappedMessage) -> Result<(), IngestError> {
294        let WrappedMessage {
295            message,
296            tick,
297            nanos_since_unix_epoch,
298            timeline,
299        } = message;
300
301        if self.client.bound_timeline() != timeline {
302            self.client
303                .open_timeline(timeline)
304                .await
305                .context("open new timeline")?;
306        }
307
308        match message {
309            Message::NewTimeline { name } => {
310                let mut timeline_metadata = self.global_metadata.clone();
311
312                if !timeline_metadata.iter().any(|(k, _v)| k == "name") {
313                    timeline_metadata.push(("timeline.name".to_string(), name.into()));
314                }
315
316                for (key, value) in timeline_metadata {
317                    let timeline_key_name = self
318                        .get_or_create_timeline_attr_key(key)
319                        .await
320                        .context("get or define timeline attr key")?;
321
322                    self.client
323                        .timeline_metadata([(timeline_key_name, value)])
324                        .await
325                        .context("apply timeline metadata")?;
326                }
327            }
328            Message::NewSpan {
329                id,
330                metadata,
331                mut records,
332            } => {
333                let name = {
334                    // store name for future use
335                    let name = records
336                        .get("name")
337                        .or_else(|| records.get("message"))
338                        .map(|n| format!("{:?}", n))
339                        .unwrap_or_else(|| metadata.name().to_string());
340
341                    self.span_names.insert(id, name.clone());
342
343                    name
344                };
345
346                let mut packed_attrs = Vec::new();
347
348                packed_attrs.push((
349                    self.get_or_create_event_attr_key("event.name".to_string())
350                        .await?,
351                    AttrVal::String(name.into()),
352                ));
353
354                let kind = records
355                    .remove("modality.kind")
356                    .map(tracing_value_to_attr_val)
357                    .unwrap_or_else(|| "span:defined".into());
358                packed_attrs.push((
359                    self.get_or_create_event_attr_key("event.internal.rs.kind".to_string())
360                        .await?,
361                    kind,
362                ));
363
364                let span_id = records
365                    .remove("modality.span_id")
366                    .map(tracing_value_to_attr_val)
367                    .unwrap_or_else(|| BigInt::new_attr_val(u64::from(id) as i128));
368                packed_attrs.push((
369                    self.get_or_create_event_attr_key("event.internal.rs.span_id".to_string())
370                        .await?,
371                    span_id,
372                ));
373
374                self.pack_common_attrs(
375                    &mut packed_attrs,
376                    metadata,
377                    records,
378                    tick,
379                    nanos_since_unix_epoch,
380                )
381                .await?;
382
383                self.client
384                    .event(tick.as_nanos(), packed_attrs)
385                    .await
386                    .context("send packed event")?;
387            }
388            Message::Record { span, records } => {
389                // TODO: span events can't be added to after being sent, impl this once we can use
390                // timelines to represent spans
391
392                let _ = span;
393                let _ = records;
394            }
395            Message::RecordFollowsFrom { span, follows } => {
396                // TODO: span events can't be added to after being sent, impl this once we can use
397                // timelines to represent spans
398
399                let _ = span;
400                let _ = follows;
401            }
402            Message::Event {
403                metadata,
404                mut records,
405            } => {
406                let mut packed_attrs = Vec::new();
407
408                let kind = records
409                    .remove("modality.kind")
410                    .map(tracing_value_to_attr_val)
411                    .unwrap_or_else(|| "event".into());
412                packed_attrs.push((
413                    self.get_or_create_event_attr_key("event.internal.rs.kind".to_string())
414                        .await?,
415                    kind,
416                ));
417
418                self.pack_common_attrs(
419                    &mut packed_attrs,
420                    metadata,
421                    records,
422                    tick,
423                    nanos_since_unix_epoch,
424                )
425                .await?;
426
427                self.client
428                    .event(tick.as_nanos(), packed_attrs)
429                    .await
430                    .context("send packed event")?;
431            }
432            Message::Enter { span } => {
433                let mut packed_attrs = Vec::new();
434
435                {
436                    // get stored span name
437                    let name = self.span_names.get(&span).map(|n| format!("enter: {}", n));
438
439                    if let Some(name) = name {
440                        packed_attrs.push((
441                            self.get_or_create_event_attr_key("event.name".to_string())
442                                .await?,
443                            AttrVal::String(name.into()),
444                        ));
445                    }
446                };
447
448                packed_attrs.push((
449                    self.get_or_create_event_attr_key("event.internal.rs.kind".to_string())
450                        .await?,
451                    AttrVal::String("span:enter".to_string().into()),
452                ));
453
454                packed_attrs.push((
455                    self.get_or_create_event_attr_key("event.internal.rs.span_id".to_string())
456                        .await?,
457                    BigInt::new_attr_val(u64::from(span).into()),
458                ));
459
460                // only record tick directly during the first ~5.8 centuries this program is running
461                if let Ok(tick) = TryInto::<u64>::try_into(tick.as_nanos()) {
462                    packed_attrs.push((
463                        self.get_or_create_event_attr_key("event.internal.rs.tick".to_string())
464                            .await?,
465                        AttrVal::LogicalTime(LogicalTime::unary(tick)),
466                    ));
467                }
468
469                self.client
470                    .event(tick.as_nanos(), packed_attrs)
471                    .await
472                    .context("send packed event")?;
473            }
474            Message::Exit { span } => {
475                let mut packed_attrs = Vec::new();
476
477                {
478                    // get stored span name
479                    let name = self.span_names.get(&span).map(|n| format!("exit: {}", n));
480
481                    if let Some(name) = name {
482                        packed_attrs.push((
483                            self.get_or_create_event_attr_key("event.name".to_string())
484                                .await?,
485                            AttrVal::String(name.into()),
486                        ));
487                    }
488                };
489
490                packed_attrs.push((
491                    self.get_or_create_event_attr_key("event.internal.rs.kind".to_string())
492                        .await?,
493                    AttrVal::String("span:exit".to_string().into()),
494                ));
495
496                packed_attrs.push((
497                    self.get_or_create_event_attr_key("event.internal.rs.span_id".to_string())
498                        .await?,
499                    BigInt::new_attr_val(u64::from(span).into()),
500                ));
501
502                // only record tick directly during the first ~5.8 centuries this program is running
503                if let Ok(tick) = TryInto::<u64>::try_into(tick.as_nanos()) {
504                    packed_attrs.push((
505                        self.get_or_create_event_attr_key("event.internal.rs.tick".to_string())
506                            .await?,
507                        AttrVal::LogicalTime(LogicalTime::unary(tick)),
508                    ));
509                }
510
511                self.client
512                    .event(tick.as_nanos(), packed_attrs)
513                    .await
514                    .context("send packed event")?;
515            }
516            Message::Close { span } => {
517                self.span_names.remove(&span);
518            }
519            Message::IdChange { old, new } => {
520                let name = self.span_names.get(&old).cloned();
521                if let Some(name) = name {
522                    self.span_names.insert(new, name);
523                }
524            }
525        }
526
527        Ok(())
528    }
529
530    async fn get_or_create_timeline_attr_key(
531        &mut self,
532        key: String,
533    ) -> Result<InternedAttrKey, IngestError> {
534        if let Some(id) = self.timeline_keys.get(&key) {
535            return Ok(*id);
536        }
537
538        let interned_key = self
539            .client
540            .declare_attr_key(key.clone())
541            .await
542            .context("define timeline attr key")?;
543
544        self.timeline_keys.insert(key, interned_key);
545
546        Ok(interned_key)
547    }
548
549    async fn get_or_create_event_attr_key(
550        &mut self,
551        key: String,
552    ) -> Result<InternedAttrKey, IngestError> {
553        let key = if key.starts_with("event.") {
554            key
555        } else {
556            format!("event.{key}")
557        };
558
559        if let Some(id) = self.event_keys.get(&key) {
560            return Ok(*id);
561        }
562
563        let interned_key = self
564            .client
565            .declare_attr_key(key.clone())
566            .await
567            .context("define event attr key")?;
568
569        self.event_keys.insert(key, interned_key);
570
571        Ok(interned_key)
572    }
573
574    async fn pack_common_attrs<'a>(
575        &mut self,
576        packed_attrs: &mut Vec<(InternedAttrKey, AttrVal)>,
577        metadata: &'a Metadata<'static>,
578        mut records: RecordMap,
579        tick: Duration,
580        maybe_nanos_since_unix_epoch: Option<Nanoseconds>,
581    ) -> Result<(), IngestError> {
582        let name = records
583            .remove("name")
584            .or_else(|| records.remove("message"))
585            .map(tracing_value_to_attr_val)
586            .unwrap_or_else(|| metadata.name().into());
587        packed_attrs.push((
588            self.get_or_create_event_attr_key("event.name".to_string())
589                .await?,
590            name,
591        ));
592
593        let severity = records
594            .remove("severity")
595            .map(tracing_value_to_attr_val)
596            .unwrap_or_else(|| format!("{}", metadata.level()).to_lowercase().into());
597        packed_attrs.push((
598            self.get_or_create_event_attr_key("event.severity".to_string())
599                .await?,
600            severity,
601        ));
602
603        let module_path = records
604            .remove("source.module")
605            .map(tracing_value_to_attr_val)
606            .or_else(|| metadata.module_path().map(|mp| mp.into()));
607        if let Some(module_path) = module_path {
608            packed_attrs.push((
609                self.get_or_create_event_attr_key("event.source.module".to_string())
610                    .await?,
611                module_path,
612            ));
613        }
614
615        let source_file = records
616            .remove("source.file")
617            .map(tracing_value_to_attr_val)
618            .or_else(|| metadata.file().map(|mp| mp.into()));
619        if let Some(source_file) = source_file {
620            packed_attrs.push((
621                self.get_or_create_event_attr_key("event.source.file".to_string())
622                    .await?,
623                source_file,
624            ));
625        }
626
627        let source_line = records
628            .remove("source.line")
629            .map(tracing_value_to_attr_val)
630            .or_else(|| metadata.line().map(|mp| (mp as i64).into()));
631        if let Some(source_line) = source_line {
632            packed_attrs.push((
633                self.get_or_create_event_attr_key("event.source.line".to_string())
634                    .await?,
635                source_line,
636            ));
637        }
638
639        // only record tick directly during the first ~5.8 centuries this program is running
640        if let Ok(tick) = TryInto::<u64>::try_into(tick.as_nanos()) {
641            packed_attrs.push((
642                self.get_or_create_event_attr_key("event.internal.rs.tick".to_string())
643                    .await?,
644                AttrVal::LogicalTime(LogicalTime::unary(tick)),
645            ));
646        }
647
648        // handle manually to type the AttrVal correctly
649        let remote_timeline_id = records
650            .remove("interaction.remote_timeline_id")
651            .map(tracing_value_to_attr_val);
652        if let Some(attrval) = remote_timeline_id {
653            let remote_timeline_id = if let AttrVal::String(string) = attrval {
654                use std::str::FromStr;
655                if let Ok(uuid) = Uuid::from_str(&string) {
656                    AttrVal::TimelineId(Box::new(uuid.into()))
657                } else {
658                    AttrVal::String(string)
659                }
660            } else {
661                attrval
662            };
663
664            packed_attrs.push((
665                self.get_or_create_event_attr_key("event.interaction.remote_timeline_id".into())
666                    .await?,
667                remote_timeline_id,
668            ));
669        }
670
671        // Manually retype the remote_timestamp
672        let remote_timestamp = records
673            .remove("interaction.remote_timestamp")
674            .map(tracing_value_to_attr_val);
675        if let Some(attrval) = remote_timestamp {
676            let remote_timestamp = match attrval {
677                AttrVal::Integer(i) if i >= 0 => AttrVal::Timestamp(Nanoseconds::from(i as u64)),
678                AttrVal::BigInt(i) if *i >= 0 && *i <= u64::MAX as i128 => {
679                    AttrVal::Timestamp(Nanoseconds::from(*i as u64))
680                }
681                AttrVal::Timestamp(t) => AttrVal::Timestamp(t),
682                x => x,
683            };
684
685            packed_attrs.push((
686                self.get_or_create_event_attr_key("event.interaction.remote_timestamp".into())
687                    .await?,
688                remote_timestamp,
689            ));
690        }
691
692        // Manually retype the local timestamp
693        let local_timestamp = records.remove("timestamp").map(tracing_value_to_attr_val);
694        if let Some(attrval) = local_timestamp {
695            let remote_timestamp = match attrval {
696                AttrVal::Integer(i) if i >= 0 => AttrVal::Timestamp(Nanoseconds::from(i as u64)),
697                AttrVal::BigInt(i) if *i >= 0 && *i <= u64::MAX as i128 => {
698                    AttrVal::Timestamp(Nanoseconds::from(*i as u64))
699                }
700                AttrVal::Timestamp(t) => AttrVal::Timestamp(t),
701                x => x,
702            };
703
704            packed_attrs.push((
705                self.get_or_create_event_attr_key("event.timestamp".into())
706                    .await?,
707                remote_timestamp,
708            ));
709        } else if let Some(nanos_since_unix_epoch) = maybe_nanos_since_unix_epoch {
710            packed_attrs.push((
711                self.get_or_create_event_attr_key("event.timestamp".into())
712                    .await?,
713                AttrVal::Timestamp(nanos_since_unix_epoch),
714            ));
715        }
716
717        // pack any remaining records
718        for (name, value) in records {
719            let attrval = tracing_value_to_attr_val(value);
720
721            let key = if name.starts_with("event.") {
722                name.to_string()
723            } else {
724                format!("event.{}", name.as_str())
725            };
726
727            packed_attrs.push((self.get_or_create_event_attr_key(key).await?, attrval));
728        }
729
730        Ok(())
731    }
732}
733
734fn tracing_value_to_attr_val(value: TracingValue) -> AttrVal {
735    match value {
736        TracingValue::String(s) => s.into(),
737        TracingValue::F64(n) => n.into(),
738        TracingValue::I64(n) => n.into(),
739        TracingValue::U64(n) => (n as i128).into(),
740        TracingValue::Bool(b) => b.into(),
741    }
742}