1pub use auxon_sdk::api::TimelineId;
2
3use crate::{
4 layer::{RecordMap, TracingValue},
5 Options,
6};
7use anyhow::Context;
8use auxon_sdk::{
9 api::{AttrVal, BigInt, LogicalTime, Nanoseconds, Uuid},
10 ingest_client::{BoundTimelineState, IngestClient, IngestError as SdkIngestError},
11 ingest_protocol::InternedAttrKey,
12};
13use once_cell::sync::Lazy;
14use std::{collections::HashMap, num::NonZeroU64, time::Duration};
15use thiserror::Error;
16use tokio::{
17 select,
18 sync::mpsc::{self, UnboundedReceiver, UnboundedSender},
19 sync::oneshot,
20};
21use tracing_core::Metadata;
22
23#[cfg(feature = "blocking")]
24use std::thread::{self, JoinHandle};
25#[cfg(feature = "blocking")]
26use tokio::runtime::Runtime;
27#[cfg(feature = "async")]
28use tokio::task;
29
thread_local! {
    // One timeline id per OS thread. The `Lazy` wrapper defers the call to
    // `TimelineId::allocate` until the id is first read on that thread.
    static THREAD_TIMELINE_ID: Lazy<TimelineId> = Lazy::new(TimelineId::allocate);
}
33
/// Errors returned while establishing the ingest connection.
#[derive(Debug, Error)]
pub enum ConnectError {
    /// No auth was present in the supplied options.
    #[error("Authentication required")]
    AuthRequired,
    /// The ingest client's `authenticate` call failed; wraps the SDK ingest error.
    #[error("Authenticating with the provided auth failed")]
    AuthFailed(#[from] SdkIngestError),
    /// Any other failure, carried as an `anyhow::Error` and displayed transparently.
    #[error(transparent)]
    UnexpectedFailure(#[from] anyhow::Error),
}
47
/// Errors returned while processing a queued message and forwarding it to the ingest client.
#[derive(Debug, Error)]
pub enum IngestError {
    /// Catch-all failure, carried as an `anyhow::Error` and displayed transparently.
    #[error(transparent)]
    UnexpectedFailure(#[from] anyhow::Error),
}
55
56pub(crate) fn current_timeline() -> TimelineId {
57 THREAD_TIMELINE_ID.with(|id| **id)
58}
59
/// Identifier of a tracing span as used in [`Message`]; a non-zero 64-bit integer.
pub(crate) type SpanId = NonZeroU64;
61
/// A [`Message`] together with the timing and timeline context it was produced in.
///
/// This is the item type carried by the channel that feeds the ingest handler.
#[derive(Debug)]
pub(crate) struct WrappedMessage {
    /// The payload to process.
    pub message: Message,
    /// Tick of the message; its nanosecond value is passed to the client's `event` call
    /// and, when it fits in a `u64`, recorded as `event.internal.rs.tick`.
    pub tick: Duration,
    /// Optional wall-clock time; used as `event.timestamp` when the records carry no
    /// explicit `timestamp` field.
    pub nanos_since_unix_epoch: Option<Nanoseconds>,
    /// Timeline the message belongs to; the handler opens it on the client if it is not
    /// the currently bound one.
    pub timeline: TimelineId,
}
69
/// Work items processed by `ModalityIngest::handle_packet`.
#[derive(Debug)]
pub(crate) enum Message {
    /// Send timeline metadata (the global metadata plus a `timeline.name`) for the
    /// message's timeline.
    NewTimeline {
        name: String,
    },
    /// A span was created: its name is remembered and an event (default kind
    /// `span:defined`) is emitted.
    NewSpan {
        id: SpanId,
        metadata: &'static Metadata<'static>,
        records: RecordMap,
    },
    /// Additional records for a span. Currently ignored by the handler.
    Record {
        span: SpanId,
        records: RecordMap,
    },
    /// A follows-from relation between two spans. Currently ignored by the handler.
    RecordFollowsFrom {
        span: SpanId,
        follows: SpanId,
    },
    /// A plain event (default kind `event`).
    Event {
        metadata: &'static Metadata<'static>,
        records: RecordMap,
    },
    /// A span was entered; emitted as a `span:enter` event.
    Enter {
        span: SpanId,
    },
    /// A span was exited; emitted as a `span:exit` event.
    Exit {
        span: SpanId,
    },
    /// A span was closed; its remembered name is dropped.
    Close {
        span: SpanId,
    },
    /// A span id changed; the remembered name of `old` is also stored under `new`.
    IdChange {
        old: SpanId,
        new: SpanId,
    },
}
106
/// Marker trait with no methods, implemented by the ingest handle types in this module.
pub trait ModalityIngestHandle {}
108
/// Handle to an ingest handler running on a dedicated OS thread
/// (created by `ModalityIngest::spawn_thread`).
#[cfg(feature = "blocking")]
pub struct ModalityIngestThreadHandle {
    /// Sending half of the channel that feeds messages to the handler.
    pub(crate) ingest_sender: UnboundedSender<WrappedMessage>,
    /// One-shot used to ask the handler to stop; `None` once it has been used.
    pub(crate) finish_sender: Option<oneshot::Sender<()>>,
    /// Join handle of the handler thread; `None` once it has been joined.
    pub(crate) thread: Option<JoinHandle<()>>,
}
116
// Marker impl: the thread-backed handle counts as a `ModalityIngestHandle`.
#[cfg(feature = "blocking")]
impl ModalityIngestHandle for ModalityIngestThreadHandle {}
119
#[cfg(feature = "blocking")]
impl ModalityIngestThreadHandle {
    /// Asks the ingest thread to stop and blocks until it has exited.
    ///
    /// Both steps are best-effort: a failed send (handler already gone) and a
    /// failed join (handler panicked) are ignored.
    pub fn finish(mut self) {
        let stop_signal = self.finish_sender.take();
        let worker = self.thread.take();

        if let Some(tx) = stop_signal {
            let _ = tx.send(());
        }

        if let Some(handle) = worker {
            let _ = handle.join();
        }
    }
}
143
/// Handle to an ingest handler running as a tokio task
/// (created by `ModalityIngest::spawn_task`).
#[cfg(feature = "async")]
pub struct ModalityIngestTaskHandle {
    /// Sending half of the channel that feeds messages to the handler.
    pub(crate) ingest_sender: UnboundedSender<WrappedMessage>,
    /// One-shot used to ask the handler to stop; `None` once it has been used.
    pub(crate) finish_sender: Option<oneshot::Sender<()>>,
    /// Join handle of the handler task; `None` once it has been awaited.
    pub(crate) task: Option<task::JoinHandle<()>>,
}
151
// Marker impl: the task-backed handle counts as a `ModalityIngestHandle`.
#[cfg(feature = "async")]
impl ModalityIngestHandle for ModalityIngestTaskHandle {}
154
#[cfg(feature = "async")]
impl ModalityIngestTaskHandle {
    /// Asks the ingest task to stop and waits for it to complete.
    ///
    /// Both steps are best-effort: a failed send (handler already gone) and a
    /// join error from the task are ignored.
    pub async fn finish(mut self) {
        let stop_signal = self.finish_sender.take();
        let worker = self.task.take();

        if let Some(tx) = stop_signal {
            let _ = tx.send(());
        }

        if let Some(join_handle) = worker {
            let _ = join_handle.await;
        }
    }
}
171
/// Owns the ingest client connection and the caches needed to translate
/// [`Message`]s into ingest calls.
pub(crate) struct ModalityIngest {
    /// Authenticated client with a timeline bound.
    client: IngestClient<BoundTimelineState>,
    /// Metadata taken from the options; cloned and sent for every `NewTimeline` message.
    global_metadata: Vec<(String, AttrVal)>,
    /// Cache of already declared event attribute keys, by key name.
    event_keys: HashMap<String, InternedAttrKey>,
    /// Cache of already declared timeline attribute keys, by key name.
    timeline_keys: HashMap<String, InternedAttrKey>,
    /// Names of spans seen via `NewSpan`, removed again on `Close`.
    span_names: HashMap<NonZeroU64, String>,

    /// Runtime created by `connect`, reused by `spawn_thread` when present.
    #[cfg(feature = "blocking")]
    rt: Option<Runtime>,
}
182
183impl ModalityIngest {
184 #[cfg(feature = "blocking")]
185 pub(crate) fn connect(opts: Options) -> Result<Self, ConnectError> {
186 let rt = tokio::runtime::Builder::new_current_thread()
187 .enable_io()
188 .enable_time()
189 .build()
190 .expect("build intial tokio current thread runtime");
191
192 rt.block_on(async { Self::async_connect(opts).await })
193 .map(move |mut m| {
194 m.rt = Some(rt);
195 m
196 })
197 }
198
199 pub(crate) async fn async_connect(options: Options) -> Result<Self, ConnectError> {
200 let url = url::Url::parse(&format!("modality-ingest://{}/", options.server_addr)).unwrap();
201 let unauth_client = IngestClient::connect(&url, false)
202 .await
203 .context("init ingest client")?;
204
205 let auth_key = options.auth.ok_or(ConnectError::AuthRequired)?;
206 let client = unauth_client
207 .authenticate(auth_key)
208 .await
209 .map_err(ConnectError::AuthFailed)?;
210
211 let timeline_id = current_timeline();
214 let client = client
215 .open_timeline(timeline_id)
216 .await
217 .context("open new timeline")?;
218
219 Ok(Self {
220 client,
221 global_metadata: options.metadata,
222 event_keys: HashMap::new(),
223 timeline_keys: HashMap::new(),
224 span_names: HashMap::new(),
225 #[cfg(feature = "blocking")]
226 rt: None,
227 })
228 }
229
230 #[cfg(feature = "blocking")]
231 pub(crate) fn spawn_thread(mut self) -> ModalityIngestThreadHandle {
232 let (sender, recv) = mpsc::unbounded_channel();
233 let (finish_sender, finish_receiver) = oneshot::channel();
234
235 let join_handle = thread::spawn(move || {
236 let _dispatch_guard = tracing::dispatcher::set_default(&tracing::Dispatch::none());
238
239 let rt = self.rt.take().unwrap_or_else(|| {
240 tokio::runtime::Builder::new_current_thread()
241 .build()
242 .expect("build local tokio current thread runtime")
243 });
244
245 rt.block_on(self.handler_task(recv, finish_receiver))
246 });
247
248 ModalityIngestThreadHandle {
249 ingest_sender: sender,
250 finish_sender: Some(finish_sender),
251 thread: Some(join_handle),
252 }
253 }
254
255 #[cfg(feature = "async")]
256 pub(crate) async fn spawn_task(self) -> ModalityIngestTaskHandle {
257 let (ingest_sender, recv) = mpsc::unbounded_channel();
258 let (finish_sender, finish_receiver) = oneshot::channel();
259
260 let task = tokio::spawn(self.handler_task(recv, finish_receiver));
261
262 ModalityIngestTaskHandle {
263 ingest_sender,
264 finish_sender: Some(finish_sender),
265 task: Some(task),
266 }
267 }
268
269 async fn handler_task(
270 mut self,
271 mut recv: UnboundedReceiver<WrappedMessage>,
272 mut finish: oneshot::Receiver<()>,
273 ) {
274 loop {
275 select! {
276 Some(message) = recv.recv() => {
277 let _ = self.handle_packet(message).await;
278 },
279 _ = &mut finish => {
280 break
281 }
282 }
283 }
284
285 recv.close();
287 while let Some(message) = recv.recv().await {
288 let _ = self.handle_packet(message).await;
289 }
290 let _ = self.client.flush().await;
291 }
292
293 async fn handle_packet(&mut self, message: WrappedMessage) -> Result<(), IngestError> {
294 let WrappedMessage {
295 message,
296 tick,
297 nanos_since_unix_epoch,
298 timeline,
299 } = message;
300
301 if self.client.bound_timeline() != timeline {
302 self.client
303 .open_timeline(timeline)
304 .await
305 .context("open new timeline")?;
306 }
307
308 match message {
309 Message::NewTimeline { name } => {
310 let mut timeline_metadata = self.global_metadata.clone();
311
312 if !timeline_metadata.iter().any(|(k, _v)| k == "name") {
313 timeline_metadata.push(("timeline.name".to_string(), name.into()));
314 }
315
316 for (key, value) in timeline_metadata {
317 let timeline_key_name = self
318 .get_or_create_timeline_attr_key(key)
319 .await
320 .context("get or define timeline attr key")?;
321
322 self.client
323 .timeline_metadata([(timeline_key_name, value)])
324 .await
325 .context("apply timeline metadata")?;
326 }
327 }
328 Message::NewSpan {
329 id,
330 metadata,
331 mut records,
332 } => {
333 let name = {
334 let name = records
336 .get("name")
337 .or_else(|| records.get("message"))
338 .map(|n| format!("{:?}", n))
339 .unwrap_or_else(|| metadata.name().to_string());
340
341 self.span_names.insert(id, name.clone());
342
343 name
344 };
345
346 let mut packed_attrs = Vec::new();
347
348 packed_attrs.push((
349 self.get_or_create_event_attr_key("event.name".to_string())
350 .await?,
351 AttrVal::String(name.into()),
352 ));
353
354 let kind = records
355 .remove("modality.kind")
356 .map(tracing_value_to_attr_val)
357 .unwrap_or_else(|| "span:defined".into());
358 packed_attrs.push((
359 self.get_or_create_event_attr_key("event.internal.rs.kind".to_string())
360 .await?,
361 kind,
362 ));
363
364 let span_id = records
365 .remove("modality.span_id")
366 .map(tracing_value_to_attr_val)
367 .unwrap_or_else(|| BigInt::new_attr_val(u64::from(id) as i128));
368 packed_attrs.push((
369 self.get_or_create_event_attr_key("event.internal.rs.span_id".to_string())
370 .await?,
371 span_id,
372 ));
373
374 self.pack_common_attrs(
375 &mut packed_attrs,
376 metadata,
377 records,
378 tick,
379 nanos_since_unix_epoch,
380 )
381 .await?;
382
383 self.client
384 .event(tick.as_nanos(), packed_attrs)
385 .await
386 .context("send packed event")?;
387 }
388 Message::Record { span, records } => {
389 let _ = span;
393 let _ = records;
394 }
395 Message::RecordFollowsFrom { span, follows } => {
396 let _ = span;
400 let _ = follows;
401 }
402 Message::Event {
403 metadata,
404 mut records,
405 } => {
406 let mut packed_attrs = Vec::new();
407
408 let kind = records
409 .remove("modality.kind")
410 .map(tracing_value_to_attr_val)
411 .unwrap_or_else(|| "event".into());
412 packed_attrs.push((
413 self.get_or_create_event_attr_key("event.internal.rs.kind".to_string())
414 .await?,
415 kind,
416 ));
417
418 self.pack_common_attrs(
419 &mut packed_attrs,
420 metadata,
421 records,
422 tick,
423 nanos_since_unix_epoch,
424 )
425 .await?;
426
427 self.client
428 .event(tick.as_nanos(), packed_attrs)
429 .await
430 .context("send packed event")?;
431 }
432 Message::Enter { span } => {
433 let mut packed_attrs = Vec::new();
434
435 {
436 let name = self.span_names.get(&span).map(|n| format!("enter: {}", n));
438
439 if let Some(name) = name {
440 packed_attrs.push((
441 self.get_or_create_event_attr_key("event.name".to_string())
442 .await?,
443 AttrVal::String(name.into()),
444 ));
445 }
446 };
447
448 packed_attrs.push((
449 self.get_or_create_event_attr_key("event.internal.rs.kind".to_string())
450 .await?,
451 AttrVal::String("span:enter".to_string().into()),
452 ));
453
454 packed_attrs.push((
455 self.get_or_create_event_attr_key("event.internal.rs.span_id".to_string())
456 .await?,
457 BigInt::new_attr_val(u64::from(span).into()),
458 ));
459
460 if let Ok(tick) = TryInto::<u64>::try_into(tick.as_nanos()) {
462 packed_attrs.push((
463 self.get_or_create_event_attr_key("event.internal.rs.tick".to_string())
464 .await?,
465 AttrVal::LogicalTime(LogicalTime::unary(tick)),
466 ));
467 }
468
469 self.client
470 .event(tick.as_nanos(), packed_attrs)
471 .await
472 .context("send packed event")?;
473 }
474 Message::Exit { span } => {
475 let mut packed_attrs = Vec::new();
476
477 {
478 let name = self.span_names.get(&span).map(|n| format!("exit: {}", n));
480
481 if let Some(name) = name {
482 packed_attrs.push((
483 self.get_or_create_event_attr_key("event.name".to_string())
484 .await?,
485 AttrVal::String(name.into()),
486 ));
487 }
488 };
489
490 packed_attrs.push((
491 self.get_or_create_event_attr_key("event.internal.rs.kind".to_string())
492 .await?,
493 AttrVal::String("span:exit".to_string().into()),
494 ));
495
496 packed_attrs.push((
497 self.get_or_create_event_attr_key("event.internal.rs.span_id".to_string())
498 .await?,
499 BigInt::new_attr_val(u64::from(span).into()),
500 ));
501
502 if let Ok(tick) = TryInto::<u64>::try_into(tick.as_nanos()) {
504 packed_attrs.push((
505 self.get_or_create_event_attr_key("event.internal.rs.tick".to_string())
506 .await?,
507 AttrVal::LogicalTime(LogicalTime::unary(tick)),
508 ));
509 }
510
511 self.client
512 .event(tick.as_nanos(), packed_attrs)
513 .await
514 .context("send packed event")?;
515 }
516 Message::Close { span } => {
517 self.span_names.remove(&span);
518 }
519 Message::IdChange { old, new } => {
520 let name = self.span_names.get(&old).cloned();
521 if let Some(name) = name {
522 self.span_names.insert(new, name);
523 }
524 }
525 }
526
527 Ok(())
528 }
529
530 async fn get_or_create_timeline_attr_key(
531 &mut self,
532 key: String,
533 ) -> Result<InternedAttrKey, IngestError> {
534 if let Some(id) = self.timeline_keys.get(&key) {
535 return Ok(*id);
536 }
537
538 let interned_key = self
539 .client
540 .declare_attr_key(key.clone())
541 .await
542 .context("define timeline attr key")?;
543
544 self.timeline_keys.insert(key, interned_key);
545
546 Ok(interned_key)
547 }
548
549 async fn get_or_create_event_attr_key(
550 &mut self,
551 key: String,
552 ) -> Result<InternedAttrKey, IngestError> {
553 let key = if key.starts_with("event.") {
554 key
555 } else {
556 format!("event.{key}")
557 };
558
559 if let Some(id) = self.event_keys.get(&key) {
560 return Ok(*id);
561 }
562
563 let interned_key = self
564 .client
565 .declare_attr_key(key.clone())
566 .await
567 .context("define event attr key")?;
568
569 self.event_keys.insert(key, interned_key);
570
571 Ok(interned_key)
572 }
573
574 async fn pack_common_attrs<'a>(
575 &mut self,
576 packed_attrs: &mut Vec<(InternedAttrKey, AttrVal)>,
577 metadata: &'a Metadata<'static>,
578 mut records: RecordMap,
579 tick: Duration,
580 maybe_nanos_since_unix_epoch: Option<Nanoseconds>,
581 ) -> Result<(), IngestError> {
582 let name = records
583 .remove("name")
584 .or_else(|| records.remove("message"))
585 .map(tracing_value_to_attr_val)
586 .unwrap_or_else(|| metadata.name().into());
587 packed_attrs.push((
588 self.get_or_create_event_attr_key("event.name".to_string())
589 .await?,
590 name,
591 ));
592
593 let severity = records
594 .remove("severity")
595 .map(tracing_value_to_attr_val)
596 .unwrap_or_else(|| format!("{}", metadata.level()).to_lowercase().into());
597 packed_attrs.push((
598 self.get_or_create_event_attr_key("event.severity".to_string())
599 .await?,
600 severity,
601 ));
602
603 let module_path = records
604 .remove("source.module")
605 .map(tracing_value_to_attr_val)
606 .or_else(|| metadata.module_path().map(|mp| mp.into()));
607 if let Some(module_path) = module_path {
608 packed_attrs.push((
609 self.get_or_create_event_attr_key("event.source.module".to_string())
610 .await?,
611 module_path,
612 ));
613 }
614
615 let source_file = records
616 .remove("source.file")
617 .map(tracing_value_to_attr_val)
618 .or_else(|| metadata.file().map(|mp| mp.into()));
619 if let Some(source_file) = source_file {
620 packed_attrs.push((
621 self.get_or_create_event_attr_key("event.source.file".to_string())
622 .await?,
623 source_file,
624 ));
625 }
626
627 let source_line = records
628 .remove("source.line")
629 .map(tracing_value_to_attr_val)
630 .or_else(|| metadata.line().map(|mp| (mp as i64).into()));
631 if let Some(source_line) = source_line {
632 packed_attrs.push((
633 self.get_or_create_event_attr_key("event.source.line".to_string())
634 .await?,
635 source_line,
636 ));
637 }
638
639 if let Ok(tick) = TryInto::<u64>::try_into(tick.as_nanos()) {
641 packed_attrs.push((
642 self.get_or_create_event_attr_key("event.internal.rs.tick".to_string())
643 .await?,
644 AttrVal::LogicalTime(LogicalTime::unary(tick)),
645 ));
646 }
647
648 let remote_timeline_id = records
650 .remove("interaction.remote_timeline_id")
651 .map(tracing_value_to_attr_val);
652 if let Some(attrval) = remote_timeline_id {
653 let remote_timeline_id = if let AttrVal::String(string) = attrval {
654 use std::str::FromStr;
655 if let Ok(uuid) = Uuid::from_str(&string) {
656 AttrVal::TimelineId(Box::new(uuid.into()))
657 } else {
658 AttrVal::String(string)
659 }
660 } else {
661 attrval
662 };
663
664 packed_attrs.push((
665 self.get_or_create_event_attr_key("event.interaction.remote_timeline_id".into())
666 .await?,
667 remote_timeline_id,
668 ));
669 }
670
671 let remote_timestamp = records
673 .remove("interaction.remote_timestamp")
674 .map(tracing_value_to_attr_val);
675 if let Some(attrval) = remote_timestamp {
676 let remote_timestamp = match attrval {
677 AttrVal::Integer(i) if i >= 0 => AttrVal::Timestamp(Nanoseconds::from(i as u64)),
678 AttrVal::BigInt(i) if *i >= 0 && *i <= u64::MAX as i128 => {
679 AttrVal::Timestamp(Nanoseconds::from(*i as u64))
680 }
681 AttrVal::Timestamp(t) => AttrVal::Timestamp(t),
682 x => x,
683 };
684
685 packed_attrs.push((
686 self.get_or_create_event_attr_key("event.interaction.remote_timestamp".into())
687 .await?,
688 remote_timestamp,
689 ));
690 }
691
692 let local_timestamp = records.remove("timestamp").map(tracing_value_to_attr_val);
694 if let Some(attrval) = local_timestamp {
695 let remote_timestamp = match attrval {
696 AttrVal::Integer(i) if i >= 0 => AttrVal::Timestamp(Nanoseconds::from(i as u64)),
697 AttrVal::BigInt(i) if *i >= 0 && *i <= u64::MAX as i128 => {
698 AttrVal::Timestamp(Nanoseconds::from(*i as u64))
699 }
700 AttrVal::Timestamp(t) => AttrVal::Timestamp(t),
701 x => x,
702 };
703
704 packed_attrs.push((
705 self.get_or_create_event_attr_key("event.timestamp".into())
706 .await?,
707 remote_timestamp,
708 ));
709 } else if let Some(nanos_since_unix_epoch) = maybe_nanos_since_unix_epoch {
710 packed_attrs.push((
711 self.get_or_create_event_attr_key("event.timestamp".into())
712 .await?,
713 AttrVal::Timestamp(nanos_since_unix_epoch),
714 ));
715 }
716
717 for (name, value) in records {
719 let attrval = tracing_value_to_attr_val(value);
720
721 let key = if name.starts_with("event.") {
722 name.to_string()
723 } else {
724 format!("event.{}", name.as_str())
725 };
726
727 packed_attrs.push((self.get_or_create_event_attr_key(key).await?, attrval));
728 }
729
730 Ok(())
731 }
732}
733
734fn tracing_value_to_attr_val(value: TracingValue) -> AttrVal {
735 match value {
736 TracingValue::String(s) => s.into(),
737 TracingValue::F64(n) => n.into(),
738 TracingValue::I64(n) => n.into(),
739 TracingValue::U64(n) => (n as i128).into(),
740 TracingValue::Bool(b) => b.into(),
741 }
742}