1use crate::{
2 api::{AttrVal, BigInt, LogicalTime, Nanoseconds, TimelineId, Uuid},
3 ingest_client::{BoundTimelineState, IngestClient, IngestError as SdkIngestError},
4 ingest_protocol::InternedAttrKey,
5 tracing::{
6 layer::{RecordMap, TracingValue},
7 Options,
8 },
9};
10use anyhow::Context;
11use once_cell::sync::Lazy;
12use std::{collections::HashMap, num::NonZeroU64, time::Duration};
13use thiserror::Error;
14use tokio::{
15 select,
16 sync::mpsc::{self, UnboundedReceiver, UnboundedSender},
17 sync::oneshot,
18};
19use tracing_core::Metadata;
20
21use std::thread::{self, JoinHandle};
22use tokio::runtime::Runtime;
23use tokio::task;
24
25thread_local! {
26 static THREAD_TIMELINE_ID: Lazy<TimelineId> = Lazy::new(TimelineId::allocate);
27}
28
29#[derive(Debug, Error)]
30pub enum ConnectError {
31 #[error("Authentication required")]
33 AuthRequired,
34 #[error("Authenticating with the provided auth failed")]
36 AuthFailed(#[from] SdkIngestError),
37 #[error(transparent)]
40 UnexpectedFailure(#[from] anyhow::Error),
41}
42
43#[derive(Debug, Error)]
44pub enum IngestError {
45 #[error(transparent)]
48 UnexpectedFailure(#[from] anyhow::Error),
49}
50
51pub(crate) fn current_timeline() -> TimelineId {
52 THREAD_TIMELINE_ID.with(|id| **id)
53}
54
55pub(crate) type SpanId = NonZeroU64;
56
57#[derive(Debug)]
58pub(crate) struct WrappedMessage {
59 pub message: Message,
60 pub tick: Duration,
61 pub nanos_since_unix_epoch: Option<Nanoseconds>,
62 pub timeline: TimelineId,
63}
64
65#[derive(Debug)]
66pub(crate) enum Message {
67 NewTimeline {
68 name: String,
69 },
70 NewSpan {
71 id: SpanId,
72 metadata: &'static Metadata<'static>,
73 records: RecordMap,
74 },
75 Record {
76 span: SpanId,
77 records: RecordMap,
78 },
79 RecordFollowsFrom {
80 span: SpanId,
81 follows: SpanId,
82 },
83 Event {
84 metadata: &'static Metadata<'static>,
85 records: RecordMap,
86 },
87 Enter {
88 span: SpanId,
89 },
90 Exit {
91 span: SpanId,
92 },
93 Close {
94 span: SpanId,
95 },
96 IdChange {
97 old: SpanId,
98 new: SpanId,
99 },
100}
101
102pub struct ModalityIngestThreadHandle {
104 pub(crate) ingest_sender: UnboundedSender<WrappedMessage>,
105 pub(crate) finish_sender: Option<oneshot::Sender<()>>,
106 pub(crate) thread: Option<JoinHandle<()>>,
107}
108
109impl ModalityIngestThreadHandle {
110 pub fn finish(mut self) {
122 if let Some(finish) = self.finish_sender.take() {
123 let _ = finish.send(());
124 }
125
126 if let Some(thread) = self.thread.take() {
127 let _ = thread.join();
128 }
129 }
130}
131
132pub struct ModalityIngestTaskHandle {
134 pub(crate) ingest_sender: UnboundedSender<WrappedMessage>,
135 pub(crate) finish_sender: Option<oneshot::Sender<()>>,
136 pub(crate) task: Option<task::JoinHandle<()>>,
137}
138
139impl ModalityIngestTaskHandle {
140 pub async fn finish(mut self) {
145 if let Some(finish) = self.finish_sender.take() {
146 let _ = finish.send(());
147 }
148
149 if let Some(task) = self.task.take() {
150 let _ = task.await;
151 }
152 }
153}
154
155pub(crate) struct ModalityIngest {
156 client: IngestClient<BoundTimelineState>,
157 global_metadata: Vec<(String, AttrVal)>,
158 event_keys: HashMap<String, InternedAttrKey>,
159 timeline_keys: HashMap<String, InternedAttrKey>,
160 span_names: HashMap<NonZeroU64, String>,
161
162 rt: Option<Runtime>,
163}
164
165impl ModalityIngest {
166 pub(crate) fn connect(opts: Options) -> Result<Self, ConnectError> {
167 let rt = tokio::runtime::Builder::new_current_thread()
168 .enable_io()
169 .enable_time()
170 .build()
171 .expect("build intial tokio current thread runtime");
172
173 rt.block_on(async { Self::async_connect(opts).await })
174 .map(move |mut m| {
175 m.rt = Some(rt);
176 m
177 })
178 }
179
180 pub(crate) async fn async_connect(options: Options) -> Result<Self, ConnectError> {
181 let url = url::Url::parse(&format!("modality-ingest://{}/", options.server_addr)).unwrap();
182 let unauth_client = IngestClient::connect(&url, false)
183 .await
184 .context("init ingest client")?;
185
186 let auth_key = options.auth.ok_or(ConnectError::AuthRequired)?;
187 let client = unauth_client
188 .authenticate(auth_key)
189 .await
190 .map_err(ConnectError::AuthFailed)?;
191
192 let timeline_id = current_timeline();
195 let client = client
196 .open_timeline(timeline_id)
197 .await
198 .context("open new timeline")?;
199
200 Ok(Self {
201 client,
202 global_metadata: options.metadata,
203 event_keys: HashMap::new(),
204 timeline_keys: HashMap::new(),
205 span_names: HashMap::new(),
206 rt: None,
207 })
208 }
209
210 pub(crate) fn spawn_thread(mut self) -> ModalityIngestThreadHandle {
211 let (sender, recv) = mpsc::unbounded_channel();
212 let (finish_sender, finish_receiver) = oneshot::channel();
213
214 let join_handle = thread::spawn(move || {
215 let _dispatch_guard = tracing::dispatcher::set_default(&tracing::Dispatch::none());
217
218 let rt = self.rt.take().unwrap_or_else(|| {
219 tokio::runtime::Builder::new_current_thread()
220 .build()
221 .expect("build local tokio current thread runtime")
222 });
223
224 rt.block_on(self.handler_task(recv, finish_receiver))
225 });
226
227 ModalityIngestThreadHandle {
228 ingest_sender: sender,
229 finish_sender: Some(finish_sender),
230 thread: Some(join_handle),
231 }
232 }
233
234 pub(crate) async fn spawn_task(self) -> ModalityIngestTaskHandle {
235 let (ingest_sender, recv) = mpsc::unbounded_channel();
236 let (finish_sender, finish_receiver) = oneshot::channel();
237
238 let task = tokio::spawn(self.handler_task(recv, finish_receiver));
239
240 ModalityIngestTaskHandle {
241 ingest_sender,
242 finish_sender: Some(finish_sender),
243 task: Some(task),
244 }
245 }
246
247 async fn handler_task(
248 mut self,
249 mut recv: UnboundedReceiver<WrappedMessage>,
250 mut finish: oneshot::Receiver<()>,
251 ) {
252 loop {
253 select! {
254 Some(message) = recv.recv() => {
255 let _ = self.handle_packet(message).await;
256 },
257 _ = &mut finish => {
258 break
259 }
260 }
261 }
262
263 recv.close();
265 while let Some(message) = recv.recv().await {
266 let _ = self.handle_packet(message).await;
267 }
268 let _ = self.client.flush().await;
269 }
270
271 async fn handle_packet(&mut self, message: WrappedMessage) -> Result<(), IngestError> {
272 let WrappedMessage {
273 message,
274 tick,
275 nanos_since_unix_epoch,
276 timeline,
277 } = message;
278
279 if self.client.bound_timeline() != timeline {
280 self.client
281 .open_timeline(timeline)
282 .await
283 .context("open new timeline")?;
284 }
285
286 match message {
287 Message::NewTimeline { name } => {
288 let mut timeline_metadata = self.global_metadata.clone();
289
290 if !timeline_metadata.iter().any(|(k, _v)| k == "name") {
291 timeline_metadata.push(("timeline.name".to_string(), name.into()));
292 }
293
294 for (key, value) in timeline_metadata {
295 let timeline_key_name = self
296 .get_or_create_timeline_attr_key(key)
297 .await
298 .context("get or define timeline attr key")?;
299
300 self.client
301 .timeline_metadata([(timeline_key_name, value)])
302 .await
303 .context("apply timeline metadata")?;
304 }
305 }
306 Message::NewSpan {
307 id,
308 metadata,
309 mut records,
310 } => {
311 let name = {
312 let name = records
314 .get("name")
315 .or_else(|| records.get("message"))
316 .map(|n| format!("{:?}", n))
317 .unwrap_or_else(|| metadata.name().to_string());
318
319 self.span_names.insert(id, name.clone());
320
321 name
322 };
323
324 let mut packed_attrs = Vec::new();
325
326 packed_attrs.push((
327 self.get_or_create_event_attr_key("event.name".to_string())
328 .await?,
329 AttrVal::String(name.into()),
330 ));
331
332 let kind = records
333 .remove("modality.kind")
334 .map(tracing_value_to_attr_val)
335 .unwrap_or_else(|| "span:defined".into());
336 packed_attrs.push((
337 self.get_or_create_event_attr_key("event.internal.rs.kind".to_string())
338 .await?,
339 kind,
340 ));
341
342 let span_id = records
343 .remove("modality.span_id")
344 .map(tracing_value_to_attr_val)
345 .unwrap_or_else(|| BigInt::new_attr_val(u64::from(id) as i128));
346 packed_attrs.push((
347 self.get_or_create_event_attr_key("event.internal.rs.span_id".to_string())
348 .await?,
349 span_id,
350 ));
351
352 self.pack_common_attrs(
353 &mut packed_attrs,
354 metadata,
355 records,
356 tick,
357 nanos_since_unix_epoch,
358 )
359 .await?;
360
361 self.client
362 .event(tick.as_nanos(), packed_attrs)
363 .await
364 .context("send packed event")?;
365 }
366 Message::Record { span, records } => {
367 let _ = span;
371 let _ = records;
372 }
373 Message::RecordFollowsFrom { span, follows } => {
374 let _ = span;
378 let _ = follows;
379 }
380 Message::Event {
381 metadata,
382 mut records,
383 } => {
384 let mut packed_attrs = Vec::new();
385
386 let kind = records
387 .remove("modality.kind")
388 .map(tracing_value_to_attr_val)
389 .unwrap_or_else(|| "event".into());
390 packed_attrs.push((
391 self.get_or_create_event_attr_key("event.internal.rs.kind".to_string())
392 .await?,
393 kind,
394 ));
395
396 self.pack_common_attrs(
397 &mut packed_attrs,
398 metadata,
399 records,
400 tick,
401 nanos_since_unix_epoch,
402 )
403 .await?;
404
405 self.client
406 .event(tick.as_nanos(), packed_attrs)
407 .await
408 .context("send packed event")?;
409 }
410 Message::Enter { span } => {
411 let mut packed_attrs = Vec::new();
412
413 {
414 let name = self.span_names.get(&span).map(|n| format!("enter: {}", n));
416
417 if let Some(name) = name {
418 packed_attrs.push((
419 self.get_or_create_event_attr_key("event.name".to_string())
420 .await?,
421 AttrVal::String(name.into()),
422 ));
423 }
424 };
425
426 packed_attrs.push((
427 self.get_or_create_event_attr_key("event.internal.rs.kind".to_string())
428 .await?,
429 AttrVal::String("span:enter".to_string().into()),
430 ));
431
432 packed_attrs.push((
433 self.get_or_create_event_attr_key("event.internal.rs.span_id".to_string())
434 .await?,
435 BigInt::new_attr_val(u64::from(span).into()),
436 ));
437
438 if let Ok(tick) = TryInto::<u64>::try_into(tick.as_nanos()) {
440 packed_attrs.push((
441 self.get_or_create_event_attr_key("event.internal.rs.tick".to_string())
442 .await?,
443 AttrVal::LogicalTime(LogicalTime::unary(tick)),
444 ));
445 }
446
447 self.client
448 .event(tick.as_nanos(), packed_attrs)
449 .await
450 .context("send packed event")?;
451 }
452 Message::Exit { span } => {
453 let mut packed_attrs = Vec::new();
454
455 {
456 let name = self.span_names.get(&span).map(|n| format!("exit: {}", n));
458
459 if let Some(name) = name {
460 packed_attrs.push((
461 self.get_or_create_event_attr_key("event.name".to_string())
462 .await?,
463 AttrVal::String(name.into()),
464 ));
465 }
466 };
467
468 packed_attrs.push((
469 self.get_or_create_event_attr_key("event.internal.rs.kind".to_string())
470 .await?,
471 AttrVal::String("span:exit".to_string().into()),
472 ));
473
474 packed_attrs.push((
475 self.get_or_create_event_attr_key("event.internal.rs.span_id".to_string())
476 .await?,
477 BigInt::new_attr_val(u64::from(span).into()),
478 ));
479
480 if let Ok(tick) = TryInto::<u64>::try_into(tick.as_nanos()) {
482 packed_attrs.push((
483 self.get_or_create_event_attr_key("event.internal.rs.tick".to_string())
484 .await?,
485 AttrVal::LogicalTime(LogicalTime::unary(tick)),
486 ));
487 }
488
489 self.client
490 .event(tick.as_nanos(), packed_attrs)
491 .await
492 .context("send packed event")?;
493 }
494 Message::Close { span } => {
495 self.span_names.remove(&span);
496 }
497 Message::IdChange { old, new } => {
498 let name = self.span_names.get(&old).cloned();
499 if let Some(name) = name {
500 self.span_names.insert(new, name);
501 }
502 }
503 }
504
505 Ok(())
506 }
507
508 async fn get_or_create_timeline_attr_key(
509 &mut self,
510 key: String,
511 ) -> Result<InternedAttrKey, IngestError> {
512 if let Some(id) = self.timeline_keys.get(&key) {
513 return Ok(*id);
514 }
515
516 let interned_key = self
517 .client
518 .declare_attr_key(key.clone())
519 .await
520 .context("define timeline attr key")?;
521
522 self.timeline_keys.insert(key, interned_key);
523
524 Ok(interned_key)
525 }
526
527 async fn get_or_create_event_attr_key(
528 &mut self,
529 key: String,
530 ) -> Result<InternedAttrKey, IngestError> {
531 let key = if key.starts_with("event.") {
532 key
533 } else {
534 format!("event.{key}")
535 };
536
537 if let Some(id) = self.event_keys.get(&key) {
538 return Ok(*id);
539 }
540
541 let interned_key = self
542 .client
543 .declare_attr_key(key.clone())
544 .await
545 .context("define event attr key")?;
546
547 self.event_keys.insert(key, interned_key);
548
549 Ok(interned_key)
550 }
551
552 async fn pack_common_attrs<'a>(
553 &mut self,
554 packed_attrs: &mut Vec<(InternedAttrKey, AttrVal)>,
555 metadata: &'a Metadata<'static>,
556 mut records: RecordMap,
557 tick: Duration,
558 maybe_nanos_since_unix_epoch: Option<Nanoseconds>,
559 ) -> Result<(), IngestError> {
560 let name = records
561 .remove("name")
562 .or_else(|| records.remove("message"))
563 .map(tracing_value_to_attr_val)
564 .unwrap_or_else(|| metadata.name().into());
565 packed_attrs.push((
566 self.get_or_create_event_attr_key("event.name".to_string())
567 .await?,
568 name,
569 ));
570
571 let severity = records
572 .remove("severity")
573 .map(tracing_value_to_attr_val)
574 .unwrap_or_else(|| format!("{}", metadata.level()).to_lowercase().into());
575 packed_attrs.push((
576 self.get_or_create_event_attr_key("event.severity".to_string())
577 .await?,
578 severity,
579 ));
580
581 let module_path = records
582 .remove("source.module")
583 .map(tracing_value_to_attr_val)
584 .or_else(|| metadata.module_path().map(|mp| mp.into()));
585 if let Some(module_path) = module_path {
586 packed_attrs.push((
587 self.get_or_create_event_attr_key("event.source.module".to_string())
588 .await?,
589 module_path,
590 ));
591 }
592
593 let source_file = records
594 .remove("source.file")
595 .map(tracing_value_to_attr_val)
596 .or_else(|| metadata.file().map(|mp| mp.into()));
597 if let Some(source_file) = source_file {
598 packed_attrs.push((
599 self.get_or_create_event_attr_key("event.source.file".to_string())
600 .await?,
601 source_file,
602 ));
603 }
604
605 let source_line = records
606 .remove("source.line")
607 .map(tracing_value_to_attr_val)
608 .or_else(|| metadata.line().map(|mp| (mp as i64).into()));
609 if let Some(source_line) = source_line {
610 packed_attrs.push((
611 self.get_or_create_event_attr_key("event.source.line".to_string())
612 .await?,
613 source_line,
614 ));
615 }
616
617 if let Ok(tick) = TryInto::<u64>::try_into(tick.as_nanos()) {
619 packed_attrs.push((
620 self.get_or_create_event_attr_key("event.internal.rs.tick".to_string())
621 .await?,
622 AttrVal::LogicalTime(LogicalTime::unary(tick)),
623 ));
624 }
625
626 let remote_timeline_id = records
628 .remove("interaction.remote_timeline_id")
629 .map(tracing_value_to_attr_val);
630 if let Some(attrval) = remote_timeline_id {
631 let remote_timeline_id = if let AttrVal::String(string) = attrval {
632 use std::str::FromStr;
633 if let Ok(uuid) = Uuid::from_str(&string) {
634 AttrVal::TimelineId(Box::new(uuid.into()))
635 } else {
636 AttrVal::String(string)
637 }
638 } else {
639 attrval
640 };
641
642 packed_attrs.push((
643 self.get_or_create_event_attr_key("event.interaction.remote_timeline_id".into())
644 .await?,
645 remote_timeline_id,
646 ));
647 }
648
649 let remote_timestamp = records
651 .remove("interaction.remote_timestamp")
652 .map(tracing_value_to_attr_val);
653 if let Some(attrval) = remote_timestamp {
654 let remote_timestamp = match attrval {
655 AttrVal::Integer(i) if i >= 0 => AttrVal::Timestamp(Nanoseconds::from(i as u64)),
656 AttrVal::BigInt(i) if *i >= 0 && *i <= u64::MAX as i128 => {
657 AttrVal::Timestamp(Nanoseconds::from(*i as u64))
658 }
659 AttrVal::Timestamp(t) => AttrVal::Timestamp(t),
660 x => x,
661 };
662
663 packed_attrs.push((
664 self.get_or_create_event_attr_key("event.interaction.remote_timestamp".into())
665 .await?,
666 remote_timestamp,
667 ));
668 }
669
670 let local_timestamp = records.remove("timestamp").map(tracing_value_to_attr_val);
672 if let Some(attrval) = local_timestamp {
673 let remote_timestamp = match attrval {
674 AttrVal::Integer(i) if i >= 0 => AttrVal::Timestamp(Nanoseconds::from(i as u64)),
675 AttrVal::BigInt(i) if *i >= 0 && *i <= u64::MAX as i128 => {
676 AttrVal::Timestamp(Nanoseconds::from(*i as u64))
677 }
678 AttrVal::Timestamp(t) => AttrVal::Timestamp(t),
679 x => x,
680 };
681
682 packed_attrs.push((
683 self.get_or_create_event_attr_key("event.timestamp".into())
684 .await?,
685 remote_timestamp,
686 ));
687 } else if let Some(nanos_since_unix_epoch) = maybe_nanos_since_unix_epoch {
688 packed_attrs.push((
689 self.get_or_create_event_attr_key("event.timestamp".into())
690 .await?,
691 AttrVal::Timestamp(nanos_since_unix_epoch),
692 ));
693 }
694
695 for (name, value) in records {
697 let attrval = tracing_value_to_attr_val(value);
698
699 let key = if name.starts_with("event.") {
700 name.to_string()
701 } else {
702 format!("event.{}", name.as_str())
703 };
704
705 packed_attrs.push((self.get_or_create_event_attr_key(key).await?, attrval));
706 }
707
708 Ok(())
709 }
710}
711
712fn tracing_value_to_attr_val(value: TracingValue) -> AttrVal {
713 match value {
714 TracingValue::String(s) => s.into(),
715 TracingValue::F64(n) => n.into(),
716 TracingValue::I64(n) => n.into(),
717 TracingValue::U64(n) => (n as i128).into(),
718 TracingValue::Bool(b) => b.into(),
719 }
720}