1use std::collections::{BTreeMap, HashMap};
2
3use anyhow::{anyhow, Context, Error as AnyError};
4use tokio::sync::mpsc::{self};
5use tracing::instrument;
6
7use crate::context::{EventContext, SpanContext};
8use crate::filter::{
9 BasicEventFilter, BasicSpanFilter, BoundSearch, FilterPredicate, IndexedEventFilter,
10 IndexedEventFilterIterator, IndexedSpanFilter, IndexedSpanFilterIterator, Query,
11};
12use crate::index::{EventIndexes, SpanEventIndexes, SpanIndexes};
13use crate::models::{CloseSpanEvent, EnterSpanEvent, EventKey, FollowsSpanEvent};
14use crate::storage::Storage;
15use crate::subscription::{EventSubscription, SpanSubscription, Subscriber};
16use crate::{
17 ComposedEvent, ComposedSpan, CreateSpanEvent, DatasetStats, DeleteFilter, DeleteMetrics, Event,
18 FullSpanId, InstanceId, NewEvent, NewResource, NewSpanEvent, NewSpanEventKind, Resource,
19 ResourceKey, Span, SpanEvent, SpanEventKey, SpanEventKind, SpanKey, SubscriptionId, Timestamp,
20 UpdateSpanEvent, ValueOperator,
21};
22
/// Synchronous engine that pairs a storage backend with the in-memory indexes,
/// cached resources, and live subscriptions maintained on top of it.
pub struct SyncEngine<S> {
    /// Backend that resources, spans, span events, and events are read from and written to.
    pub(crate) storage: S,
    /// In-memory indexes over the stored spans.
    pub(crate) span_indexes: SpanIndexes,
    /// In-memory indexes over the stored span events.
    pub(crate) span_event_indexes: SpanEventIndexes,
    /// In-memory indexes over the stored events.
    pub(crate) event_indexes: EventIndexes,

    /// In-memory copy of every known resource, keyed by its resource key.
    resources: HashMap<ResourceKey, Resource>,

    /// Id handed to the next subscriber; shared by span and event subscriptions
    /// and incremented on every new subscription.
    next_subscriber_id: usize,
    /// Active span subscriptions keyed by subscriber id.
    span_subscribers: HashMap<usize, SpanSubscription>,
    /// Active event subscriptions keyed by subscriber id.
    event_subscribers: HashMap<usize, EventSubscription>,
}
36
37impl<S: Storage> SyncEngine<S> {
38 pub fn new(storage: S) -> Result<SyncEngine<S>, AnyError> {
39 let mut engine = SyncEngine {
40 storage,
41 span_indexes: SpanIndexes::new(),
42 span_event_indexes: SpanEventIndexes::new(),
43 event_indexes: EventIndexes::new(),
44
45 resources: HashMap::new(),
46
47 next_subscriber_id: 0,
48 span_subscribers: HashMap::new(),
49 event_subscribers: HashMap::new(),
50 };
51
52 tracing::info!("initializing engine");
53
54 let resources = engine
55 .storage
56 .get_all_resources()
57 .context("failed to load resources")?
58 .collect::<Vec<_>>();
59
60 for resource in resources {
61 let resource = resource.context("failed to load resource")?;
62 engine.insert_resource_bookeeping(&resource);
63 }
64
65 let indexes_result = engine
66 .storage
67 .as_index_storage()
68 .and_then(|s| s.get_indexes().transpose());
69
70 match indexes_result {
71 Some(Ok(indexes)) => {
72 tracing::info!("loaded indexes from storage");
73
74 let (span_indexes, span_event_indexes, event_indexes) = indexes;
75
76 engine.span_indexes = span_indexes;
77 engine.span_event_indexes = span_event_indexes;
78 engine.event_indexes = event_indexes;
79 }
80 Some(Err(err)) => {
81 tracing::warn!(?err, "failed to load indexes from storage");
82
83 let spans = engine
84 .storage
85 .get_all_spans()
86 .context("failed to load spans")?
87 .collect::<Vec<_>>();
88
89 for span in spans {
90 let span = span.context("failed to load span")?;
91 engine.insert_span_bookeeping(&span);
92 }
93
94 let span_events = engine
95 .storage
96 .get_all_span_events()
97 .context("failed to load span events")?
98 .collect::<Vec<_>>();
99
100 for span_event in span_events {
101 let span_event = span_event.context("failed to load span event")?;
102 engine.insert_span_event_bookeeping(&span_event);
103 }
104
105 let events = engine
106 .storage
107 .get_all_events()
108 .context("failed to load events")?
109 .collect::<Vec<_>>();
110
111 for event in events {
112 let event = event.context("failed to load event")?;
113 engine.insert_event_bookeeping(&event);
114 }
115 }
116 None => {
117 tracing::warn!("no indexes from storage");
118
119 let spans = engine
120 .storage
121 .get_all_spans()
122 .context("failed to load spans")?
123 .collect::<Vec<_>>();
124
125 for span in spans {
126 let span = span.context("failed to load span")?;
127 engine.insert_span_bookeeping(&span);
128 }
129
130 let span_events = engine
131 .storage
132 .get_all_span_events()
133 .context("failed to load span events")?
134 .collect::<Vec<_>>();
135
136 for span_event in span_events {
137 let span_event = span_event.context("failed to load span event")?;
138 engine.insert_span_event_bookeeping(&span_event);
139 }
140
141 let events = engine
142 .storage
143 .get_all_events()
144 .context("failed to load events")?
145 .collect::<Vec<_>>();
146
147 for event in events {
148 let event = event.context("failed to load event")?;
149 engine.insert_event_bookeeping(&event);
150 }
151 }
152 }
153
154 if !engine.span_indexes.durations.open.is_empty() {
155 let last_event = engine.event_indexes.all.last();
156 let last_span_event = engine.span_event_indexes.all.last();
157 let last_at = match (last_event, last_span_event) {
158 (Some(event), Some(span_event)) => Ord::max(*event, *span_event),
159 (None, Some(span_event)) => *span_event,
160 (Some(event), None) => *event,
161 (None, None) => panic!("not possible to have open span but no span events"),
162 };
163
164 let at = last_at.saturating_add(1);
165
166 for span_key in engine.span_indexes.durations.open.clone() {
167 engine.span_indexes.update_with_closed(span_key, at);
168 engine
169 .storage
170 .update_span_closed(span_key, at, None)
171 .context("failed to close span")?;
172 }
173 }
174
175 tracing::info!("loaded {} spans", engine.span_indexes.all.len());
176 tracing::info!("loaded {} span events", engine.span_event_indexes.all.len());
177 tracing::info!("loaded {} events", engine.event_indexes.all.len());
178
179 Ok(engine)
180 }
181
182 #[instrument(level = tracing::Level::TRACE, skip_all)]
183 pub fn query_event(&self, query: Query) -> Vec<ComposedEvent> {
184 tracing::debug!(?query, "querying for events");
185
186 let limit = query.limit;
187 IndexedEventFilterIterator::new(query, self)
188 .take(limit)
189 .filter_map(|event_key| {
190 self.storage
191 .get_event(event_key)
192 .inspect_err(|err| tracing::warn!(?err, "failed to load event"))
193 .ok()
194 })
195 .map(|event| EventContext::with_event(&event, &self.storage).render())
196 .collect()
197 }
198
199 #[instrument(level = tracing::Level::TRACE, skip_all)]
200 pub fn query_event_count(&self, query: Query) -> usize {
201 tracing::debug!(?query, "querying for event counts");
202
203 let event_iter = IndexedEventFilterIterator::new(query, self);
204
205 match event_iter.size_hint() {
206 (min, Some(max)) if min == max => min,
207 _ => event_iter.count(),
208 }
209 }
210
211 #[instrument(level = tracing::Level::TRACE, skip_all)]
212 pub fn query_span(&self, query: Query) -> Vec<ComposedSpan> {
213 tracing::debug!(?query, "querying for spans");
214
215 let limit = query.limit;
216 IndexedSpanFilterIterator::new(query, self)
217 .take(limit)
218 .filter_map(|span_key| {
219 self.storage
220 .get_span(span_key)
221 .inspect_err(|err| tracing::warn!(?err, "failed to load span"))
222 .ok()
223 })
224 .map(|span| SpanContext::with_span(&span, &self.storage).render())
225 .collect()
226 }
227
228 #[instrument(level = tracing::Level::TRACE, skip_all)]
229 pub fn query_span_count(&self, query: Query) -> usize {
230 tracing::debug!(?query, "querying for span counts");
231
232 let span_iter = IndexedSpanFilterIterator::new(query, self);
233
234 match span_iter.size_hint() {
235 (min, Some(max)) if min == max => min,
236 _ => span_iter.count(),
237 }
238 }
239
    /// Placeholder for querying span events; the query is ignored.
    ///
    /// # Panics
    ///
    /// Always panics, because the body is `unimplemented!()`.
    #[instrument(level = tracing::Level::TRACE, skip_all)]
    #[doc(hidden)]
    pub fn query_span_event(&self, _query: Query) -> Vec<SpanEvent> {
        unimplemented!()
    }
245
246 #[instrument(level = tracing::Level::TRACE, skip_all)]
247 pub fn query_stats(&self) -> DatasetStats {
248 tracing::debug!("querying for stats");
249
250 let event_start = self.event_indexes.all.first().copied();
251 let event_end = self.event_indexes.all.last().copied();
252 let span_start = self.span_indexes.all.first().copied();
253 let span_end = self.span_indexes.all.last().copied(); DatasetStats {
256 start: crate::filter::merge(event_start, span_start, Ord::min),
257 end: crate::filter::merge(event_end, span_end, Ord::max),
258 total_events: self.event_indexes.all.len(),
259 total_spans: self.span_indexes.all.len(),
260 }
261 }
262
263 #[instrument(level = tracing::Level::TRACE, skip_all)]
264 pub fn insert_resource(&mut self, resource: NewResource) -> Result<ResourceKey, AnyError> {
265 tracing::debug!(?resource, "inserting resource");
266
267 if let Some((key, _)) = self
268 .resources
269 .iter()
270 .find(|(_, r)| r.attributes == resource.attributes)
271 {
272 return Ok(*key);
273 }
274
275 let keys = self.resources.keys().copied().collect::<Vec<_>>();
276 let now = now();
277 let resource_key = get_unique_timestamp(now, &keys);
278 let resource = Resource {
279 created_at: resource_key,
280 attributes: resource.attributes,
281 };
282
283 self.insert_resource_bookeeping(&resource);
284 self.storage
285 .insert_resource(resource)
286 .context("failed to insert resource")?;
287
288 Ok(resource_key)
289 }
290
291 fn insert_resource_bookeeping(&mut self, resource: &Resource) {
292 self.resources.insert(resource.key(), resource.clone());
293 }
294
295 #[instrument(level = tracing::Level::TRACE, skip_all)]
296 pub fn disconnect_tracing_instance(&mut self, instance_id: InstanceId) -> Result<(), AnyError> {
297 tracing::debug!(instance_id, "disconnecting tracing instance");
298
299 let at = now();
300
301 let filter = IndexedSpanFilter::And(vec![
302 IndexedSpanFilter::Single(&self.span_indexes.durations.open, None),
303 IndexedSpanFilter::Single(
304 self.span_indexes
305 .instances
306 .get(&instance_id)
307 .map(Vec::as_slice)
308 .unwrap_or_default(),
309 None,
310 ),
311 ]);
312
313 let open_spans = IndexedSpanFilterIterator::new_internal(filter, self).collect::<Vec<_>>();
314
315 for span_key in open_spans {
316 self.span_indexes.update_with_closed(span_key, at);
317 self.storage
318 .update_span_closed(span_key, at, None)
319 .context("failed to close span")?;
320 }
321
322 Ok(())
323 }
324
325 #[instrument(level = tracing::Level::TRACE, skip_all)]
326 pub fn insert_span_event(
327 &mut self,
328 mut new_span_event: NewSpanEvent,
329 ) -> Result<SpanEventKey, AnyError> {
330 tracing::debug!(span_event = ?new_span_event, "inserting span event");
331
332 let span_event_key =
333 get_unique_timestamp(new_span_event.timestamp, &self.span_event_indexes.all);
334 new_span_event.timestamp = span_event_key;
335
336 match new_span_event.kind {
337 NewSpanEventKind::Create(new_create_event) => {
338 if self.span_indexes.ids.contains_key(&new_span_event.span_id) {
339 return Err(anyhow::anyhow!("duplicate span id"));
340 }
341
342 let parent_id = new_create_event.parent_id;
344 let parent_key = parent_id.and_then(|id| self.span_indexes.ids.get(&id).copied());
345
346 let created_at =
347 get_unique_timestamp(new_span_event.timestamp, &self.span_indexes.all);
348
349 let span = Span {
350 kind: new_create_event.kind,
351 resource_key: new_create_event.resource_key,
352 id: new_span_event.span_id,
353 created_at,
354 closed_at: None,
355 busy: None,
356 parent_id,
357 parent_key,
358 links: Vec::new(),
359 name: new_create_event.name.clone(),
360 namespace: new_create_event.namespace.clone(),
361 function: new_create_event.function.clone(),
362 level: new_create_event.level,
363 file_name: new_create_event.file_name.clone(),
364 file_line: new_create_event.file_line,
365 file_column: new_create_event.file_column,
366 instrumentation_attributes: new_create_event.instrumentation_attributes.clone(),
367 attributes: new_create_event.attributes.clone(),
368 };
369
370 let span_event = SpanEvent {
371 timestamp: new_span_event.timestamp,
372 span_key: span.created_at,
373 kind: SpanEventKind::Create(CreateSpanEvent {
374 kind: new_create_event.kind,
375 resource_key: new_create_event.resource_key,
376 parent_key,
377 name: new_create_event.name,
378 namespace: new_create_event.namespace,
379 function: new_create_event.function,
380 level: new_create_event.level,
381 file_name: new_create_event.file_name,
382 file_line: new_create_event.file_line,
383 file_column: new_create_event.file_column,
384 instrumentation_attributes: new_create_event.instrumentation_attributes,
385 attributes: new_create_event.attributes,
386 }),
387 };
388
389 let (child_spans, child_events) = self.insert_span_bookeeping(&span);
390 self.storage
391 .insert_span(span.clone())
392 .context("failed to insert span")?;
393 self.storage
394 .update_span_parents(span.key(), &child_spans)
395 .context("failed to update span parents")?;
396 self.storage
397 .update_event_parents(span.key(), &child_events)
398 .context("failed to update event parents")?;
399
400 self.insert_span_event_bookeeping(&span_event);
401 self.storage
402 .insert_span_event(span_event)
403 .context("failed to insert span event")?;
404
405 if !self.event_subscribers.is_empty() {
406 let root = SpanContext::with_span(&span, &self.storage).trace_root();
407 let descendent_events = self
408 .event_indexes
409 .traces
410 .get(&root)
411 .map(Vec::as_slice)
412 .unwrap_or_default()
413 .iter()
414 .copied()
415 .filter(|key| {
416 EventContext::new(*key, &self.storage)
417 .parents()
418 .any(|p| p.key() == span.key())
419 });
420
421 for event_key in descendent_events {
424 let context = EventContext::new(event_key, &self.storage);
425 for subscriber in self.event_subscribers.values_mut() {
426 subscriber.on_event(&context);
427 }
428 }
429
430 self.event_subscribers.retain(|_, s| s.connected());
431 }
432
433 if !self.span_subscribers.is_empty() {
434 for subscriber in self.span_subscribers.values_mut() {
435 subscriber.on_span(&SpanContext::with_span(&span, &self.storage));
436 }
437
438 let root = SpanContext::with_span(&span, &self.storage).trace_root();
439 let descendent_spans = self
440 .span_indexes
441 .traces
442 .get(&root)
443 .map(Vec::as_slice)
444 .unwrap_or_default()
445 .iter()
446 .copied()
447 .filter(|key| {
448 SpanContext::new(*key, &self.storage)
449 .parents()
450 .any(|p| p.key() == span.key())
451 });
452
453 for span_key in descendent_spans {
456 let context = SpanContext::new(span_key, &self.storage);
457 for subscriber in self.span_subscribers.values_mut() {
458 subscriber.on_span(&context);
459 }
460 }
461
462 self.span_subscribers.retain(|_, s| s.connected());
463 }
464 }
465 NewSpanEventKind::Update(new_update_event) => {
466 let span_key = self
467 .span_indexes
468 .ids
469 .get(&new_span_event.span_id)
470 .copied()
471 .ok_or(anyhow!("unknown span id"))?;
472
473 let span = SpanContext::new(span_key, &self.storage);
474 let trace = span.trace_root();
475
476 let update_event = UpdateSpanEvent {
477 attributes: new_update_event.attributes.clone(),
478 };
479
480 let descendent_spans = self
481 .span_indexes
482 .traces
483 .get(&trace)
484 .map(Vec::as_slice)
485 .unwrap_or_default()
486 .iter()
487 .cloned()
488 .filter(|key| {
489 SpanContext::new(*key, &self.storage)
490 .parents()
491 .any(|p| p.key() == span_key)
492 || *key == span_key })
494 .collect::<Vec<_>>();
495
496 for child_span_key in descendent_spans {
497 self.span_indexes.update_with_new_field_on_parent(
499 &SpanContext::new(child_span_key, &self.storage),
500 span_key,
501 &update_event.attributes,
502 );
503 }
504
505 let descendent_events = self
506 .event_indexes
507 .traces
508 .get(&trace)
509 .map(Vec::as_slice)
510 .unwrap_or_default()
511 .iter()
512 .cloned()
513 .filter(|key| {
514 EventContext::new(*key, &self.storage)
515 .parents()
516 .any(|p| p.key() == span_key)
517 })
518 .collect::<Vec<_>>();
519
520 for event_key in descendent_events {
521 self.event_indexes.update_with_new_field_on_parent(
523 &EventContext::new(event_key, &self.storage),
524 span_key,
525 &update_event.attributes,
526 );
527 }
528
529 let span_event = SpanEvent {
530 timestamp: new_span_event.timestamp,
531 span_key,
532 kind: SpanEventKind::Update(update_event),
533 };
534
535 self.storage
536 .update_span_attributes(span_key, new_update_event.attributes)
537 .context("failed to update span attributes")?;
538
539 self.insert_span_event_bookeeping(&span_event);
540 self.storage
541 .insert_span_event(span_event)
542 .context("failed to insert span event")?;
543
544 if !self.event_subscribers.is_empty() {
545 let descendent_events = self
546 .event_indexes
547 .traces
548 .get(&trace)
549 .map(Vec::as_slice)
550 .unwrap_or_default()
551 .iter()
552 .copied()
553 .filter(|key| {
554 EventContext::new(*key, &self.storage)
555 .parents()
556 .any(|p| p.key() == span_key)
557 });
558
559 for event_key in descendent_events {
562 let context = EventContext::new(event_key, &self.storage);
563 for subscriber in self.event_subscribers.values_mut() {
564 subscriber.on_event(&context);
565 }
566 }
567
568 self.event_subscribers.retain(|_, s| s.connected());
569 }
570
571 if !self.span_subscribers.is_empty() {
572 let descendent_spans = self
573 .span_indexes
574 .traces
575 .get(&trace)
576 .map(Vec::as_slice)
577 .unwrap_or_default()
578 .iter()
579 .copied()
580 .filter(|key| {
581 SpanContext::new(*key, &self.storage)
582 .parents()
583 .any(|p| p.key() == span_key)
584 });
585
586 for span_key in descendent_spans {
589 let context = SpanContext::new(span_key, &self.storage);
590 for subscriber in self.span_subscribers.values_mut() {
591 subscriber.on_span(&context);
592 }
593 }
594
595 self.span_subscribers.retain(|_, s| s.connected());
596 }
597 }
598 NewSpanEventKind::Follows(new_follows_event) => {
599 let span_key = self
600 .span_indexes
601 .ids
602 .get(&new_span_event.span_id)
603 .copied()
604 .ok_or(anyhow!("unknown span id"))?;
605
606 let FullSpanId::Tracing(instance_id, _) = new_span_event.span_id else {
607 return Err(anyhow!("invalid span kind, expected tracing"));
608 };
609
610 let follows_span_id = FullSpanId::Tracing(instance_id, new_follows_event.follows);
611 let follows_span_key = self
612 .span_indexes
613 .ids
614 .get(&follows_span_id)
615 .copied()
616 .ok_or(anyhow!("unknown span id"))?;
617
618 let span_event = SpanEvent {
622 timestamp: new_span_event.timestamp,
623 span_key,
624 kind: SpanEventKind::Follows(FollowsSpanEvent {
625 follows: follows_span_key,
626 }),
627 };
628
629 self.storage
630 .update_span_link(span_key, follows_span_id, BTreeMap::new())
631 .context("failed to update span link")?;
632
633 self.insert_span_event_bookeeping(&span_event);
634 self.storage
635 .insert_span_event(span_event)
636 .context("failed to insert span event")?;
637 }
638 NewSpanEventKind::Enter(new_enter_event) => {
639 let span_key = self
640 .span_indexes
641 .ids
642 .get(&new_span_event.span_id)
643 .copied()
644 .ok_or(anyhow!("unknown span id"))?;
645
646 let span_event = SpanEvent {
647 timestamp: new_span_event.timestamp,
648 span_key,
649 kind: SpanEventKind::Enter(EnterSpanEvent {
650 thread_id: new_enter_event.thread_id,
651 }),
652 };
653
654 self.insert_span_event_bookeeping(&span_event);
655 self.storage
656 .insert_span_event(span_event)
657 .context("failed to insert span event")?;
658 }
659 NewSpanEventKind::Exit => {
660 let span_key = self
661 .span_indexes
662 .ids
663 .get(&new_span_event.span_id)
664 .copied()
665 .ok_or(anyhow!("unknown span id"))?;
666
667 let span_event = SpanEvent {
668 timestamp: new_span_event.timestamp,
669 span_key,
670 kind: SpanEventKind::Exit,
671 };
672
673 self.insert_span_event_bookeeping(&span_event);
674 self.storage
675 .insert_span_event(span_event)
676 .context("failed to insert span event")?;
677 }
678 NewSpanEventKind::Close(new_close_event) => {
679 let span_key = self
680 .span_indexes
681 .ids
682 .get(&new_span_event.span_id)
683 .copied()
684 .ok_or(anyhow!("unknown span id"))?;
685
686 let busy = if let Some(busy) = new_close_event.busy {
687 Some(busy)
688 } else {
689 let mut busy = 0;
690 let mut last_enter = None;
691 for span_event_key in &self.span_event_indexes.spans[&span_key] {
692 let Ok(span_event) = self.storage.get_span_event(*span_event_key) else {
693 tracing::warn!("failed to get span event, ignoring");
694 continue;
695 };
696 match &span_event.kind {
697 SpanEventKind::Enter(_) => {
698 last_enter = Some(span_event.timestamp);
699 }
700 SpanEventKind::Exit => {
701 if let Some(enter) = last_enter {
702 busy += span_event.timestamp.get() - enter.get();
703 }
704 last_enter = None;
705 }
706 _ => {}
707 }
708 }
709
710 if busy == 0 {
711 None
712 } else {
713 Some(busy)
714 }
715 };
716
717 let span_event = SpanEvent {
718 timestamp: new_span_event.timestamp,
719 span_key,
720 kind: SpanEventKind::Close(CloseSpanEvent { busy }),
721 };
722
723 let updated = self
724 .span_indexes
725 .update_with_closed(span_key, new_span_event.timestamp);
726
727 if !updated {
728 return Err(anyhow::anyhow!("span already closed"));
729 }
730
731 self.storage
732 .update_span_closed(span_key, new_span_event.timestamp, busy)
733 .context("failed to close span")?;
734
735 self.insert_span_event_bookeeping(&span_event);
736 self.storage
737 .insert_span_event(span_event)
738 .context("failed to insert span event")?;
739 }
740 }
741
742 Ok(span_event_key)
743 }
744
745 fn insert_span_bookeeping(&mut self, span: &Span) -> (Vec<SpanKey>, Vec<EventKey>) {
746 let span_key = span.created_at;
747
748 let orphaned_spans = self
749 .span_indexes
750 .update_with_new_span(&SpanContext::with_span(span, &self.storage));
751
752 let trace = SpanContext::with_span(span, &self.storage).trace_root();
753
754 let descendent_spans = self
755 .span_indexes
756 .traces
757 .get(&trace)
758 .map(Vec::as_slice)
759 .unwrap_or_default()
760 .iter()
761 .cloned()
762 .filter(|key| *key != span_key)
763 .filter(|key| {
764 orphaned_spans.contains(key)
765 || SpanContext::new(*key, &self.storage)
766 .parents()
767 .any(|p| orphaned_spans.contains(&p.key()))
768 })
769 .collect::<Vec<_>>();
770
771 for descendent in descendent_spans {
772 self.span_indexes.update_with_new_field_on_parent(
773 &SpanContext::new(descendent, &self.storage),
774 span.key(),
775 &span.attributes,
776 );
777 }
778
779 let orphaned_events = self
780 .event_indexes
781 .update_with_new_span(&SpanContext::with_span(span, &self.storage));
782
783 let descendent_events = self
784 .event_indexes
785 .traces
786 .get(&trace)
787 .map(Vec::as_slice)
788 .unwrap_or_default()
789 .iter()
790 .cloned()
791 .filter(|key| {
792 orphaned_events.contains(key)
793 || EventContext::new(*key, &self.storage)
794 .parents()
795 .any(|p| orphaned_events.contains(&p.key()))
796 })
797 .collect::<Vec<_>>();
798
799 for descendent in descendent_events {
800 self.event_indexes.update_with_new_field_on_parent(
801 &EventContext::new(descendent, &self.storage),
802 span.key(),
803 &span.attributes,
804 );
805 }
806
807 (orphaned_spans, orphaned_events)
808 }
809
810 fn insert_span_event_bookeeping(&mut self, span_event: &SpanEvent) {
811 self.span_event_indexes
812 .update_with_new_span_event(span_event);
813 }
814
815 #[instrument(level = tracing::Level::TRACE, skip_all)]
816 pub fn insert_event(&mut self, mut new_event: NewEvent) -> Result<(), AnyError> {
817 let event_key = get_unique_timestamp(new_event.timestamp, &self.event_indexes.all);
818 new_event.timestamp = event_key;
819
820 let parent_id = new_event.span_id;
822 let parent_key = parent_id.and_then(|id| self.span_indexes.ids.get(&id).copied());
823
824 let event = Event {
825 kind: new_event.kind,
826 resource_key: new_event.resource_key,
827 timestamp: new_event.timestamp,
828 parent_id,
829 parent_key,
830 content: new_event.content,
831 namespace: new_event.namespace,
832 function: new_event.function,
833 level: new_event.level,
834 file_name: new_event.file_name,
835 file_line: new_event.file_line,
836 file_column: new_event.file_column,
837 attributes: new_event.attributes,
838 };
839
840 self.insert_event_bookeeping(&event);
841 self.storage
842 .insert_event(event.clone())
843 .context("failed to insert event")?;
844
845 let context = EventContext::with_event(&event, &self.storage);
846 for subscriber in self.event_subscribers.values_mut() {
847 subscriber.on_event(&context);
848 }
849
850 self.event_subscribers.retain(|_, s| s.connected());
851
852 Ok(())
853 }
854
855 fn insert_event_bookeeping(&mut self, event: &Event) {
856 self.event_indexes
857 .update_with_new_event(&EventContext::with_event(event, &self.storage));
858 }
859
860 #[instrument(level = tracing::Level::TRACE, skip_all)]
861 pub fn delete(&mut self, filter: DeleteFilter) -> Result<DeleteMetrics, AnyError> {
862 let root_spans =
863 self.get_root_spans_in_range_filter(filter.start, filter.end, filter.inside);
864 let root_events =
865 self.get_root_events_in_range_filter(filter.start, filter.end, filter.inside);
866
867 let spans_from_root_spans = root_spans
868 .iter()
869 .flat_map(|root| {
870 self.span_indexes
871 .traces
872 .get(&SpanContext::new(*root, &self.storage).trace_root())
873 .map(Vec::as_slice)
874 .unwrap_or_default()
875 .iter()
876 .cloned()
877 })
878 .collect::<Vec<SpanKey>>();
879 let events_from_root_spans = root_spans
880 .iter()
881 .flat_map(|root| {
882 self.event_indexes
883 .traces
884 .get(&SpanContext::new(*root, &self.storage).trace_root())
885 .map(Vec::as_slice)
886 .unwrap_or_default()
887 .iter()
888 .cloned()
889 })
890 .collect::<Vec<EventKey>>();
891 let span_events = spans_from_root_spans
892 .iter()
893 .flat_map(|span| {
894 self.span_event_indexes
895 .spans
896 .get(span)
897 .map(Vec::as_slice)
898 .unwrap_or_default()
899 .iter()
900 .cloned()
901 })
902 .collect::<Vec<SpanEventKey>>();
903
904 if filter.dry_run {
905 return Ok(DeleteMetrics {
906 spans: spans_from_root_spans.len(),
907 span_events: span_events.len(),
908 events: root_events.len() + events_from_root_spans.len(),
909 });
910 }
911
912 let mut spans_to_delete = spans_from_root_spans;
913 let mut span_events_to_delete = span_events;
914 let mut events_to_delete = root_events;
915 events_to_delete.extend(events_from_root_spans);
916
917 spans_to_delete.sort();
918 span_events_to_delete.sort();
919 events_to_delete.sort();
920
921 self.storage
925 .drop_events(&events_to_delete)
926 .context("failed to drop events")?;
927 self.storage
928 .drop_span_events(&span_events_to_delete)
929 .context("failed to drop span events")?;
930 self.storage
931 .drop_spans(&spans_to_delete)
932 .context("failed to drop spans")?;
933
934 self.remove_spans_bookeeping(&spans_to_delete);
937 self.remove_span_events_bookeeping(&span_events_to_delete);
938 self.remove_events_bookeeping(&events_to_delete);
939
940 let resources_to_delete = self
941 .resources
942 .keys()
943 .copied()
944 .filter(|resource_key| {
945 let used_by_spans = self
946 .span_indexes
947 .resources
948 .get(resource_key)
949 .is_some_and(|r| !r.is_empty());
950
951 let used_by_events = self
952 .event_indexes
953 .resources
954 .get(resource_key)
955 .is_some_and(|r| !r.is_empty());
956
957 !used_by_spans && !used_by_events
958 })
959 .collect::<Vec<_>>();
960
961 self.storage
962 .drop_resources(&resources_to_delete)
963 .context("failed to drop events")?;
964
965 for resource_key in resources_to_delete {
966 self.resources.remove(&resource_key);
967 }
968
969 Ok(DeleteMetrics {
970 spans: spans_to_delete.len(),
971 span_events: span_events_to_delete.len(),
972 events: events_to_delete.len(),
973 })
974 }
975
976 fn get_root_spans_in_range_filter(
977 &self,
978 start: Timestamp,
979 end: Timestamp,
980 inside: bool,
981 ) -> Vec<SpanKey> {
982 let filter = if inside {
983 BasicSpanFilter::And(vec![
984 BasicSpanFilter::Created(ValueOperator::Lte, end),
985 BasicSpanFilter::Closed(ValueOperator::Gte, start),
986 BasicSpanFilter::Root,
987 ])
988 } else {
989 BasicSpanFilter::And(vec![
990 BasicSpanFilter::Or(vec![
991 BasicSpanFilter::Created(ValueOperator::Gt, end),
992 BasicSpanFilter::Closed(ValueOperator::Lt, start),
993 ]),
994 BasicSpanFilter::Root,
995 ])
996 };
997
998 let indexed_filter =
999 IndexedSpanFilter::build(Some(filter), &self.span_indexes, &self.storage);
1000 let iter = IndexedSpanFilterIterator::new_internal(indexed_filter, self);
1001
1002 iter.collect()
1003 }
1004
1005 fn get_root_events_in_range_filter(
1006 &self,
1007 start: Timestamp,
1008 end: Timestamp,
1009 inside: bool,
1010 ) -> Vec<SpanKey> {
1011 let filter = if inside {
1012 BasicEventFilter::And(vec![
1013 BasicEventFilter::Timestamp(ValueOperator::Lte, end),
1014 BasicEventFilter::Timestamp(ValueOperator::Gte, start),
1015 BasicEventFilter::Root,
1016 ])
1017 } else {
1018 BasicEventFilter::And(vec![
1019 BasicEventFilter::Or(vec![
1020 BasicEventFilter::Timestamp(ValueOperator::Gt, end),
1021 BasicEventFilter::Timestamp(ValueOperator::Lt, start),
1022 ]),
1023 BasicEventFilter::Root,
1024 ])
1025 };
1026
1027 let indexed_filter =
1028 IndexedEventFilter::build(Some(filter), &self.event_indexes, &self.storage);
1029 let iter = IndexedEventFilterIterator::new_internal(indexed_filter, self);
1030
1031 iter.collect()
1032 }
1033
1034 fn remove_spans_bookeeping(&mut self, spans: &[SpanKey]) {
1035 self.span_indexes.remove_spans(spans);
1036 self.span_event_indexes.remove_spans(spans);
1037 self.event_indexes.remove_spans(spans);
1038 }
1039
1040 fn remove_span_events_bookeeping(&mut self, span_events: &[SpanEventKey]) {
1041 self.span_event_indexes.remove_span_events(span_events);
1042 }
1043
1044 fn remove_events_bookeeping(&mut self, events: &[EventKey]) {
1045 self.event_indexes.remove_events(events);
1046 }
1047
1048 #[instrument(level = tracing::Level::TRACE, skip_all)]
1049 pub fn copy_dataset(&self, target_storage: &mut dyn Storage) -> Result<(), AnyError> {
1050 let resources = self
1051 .storage
1052 .get_all_resources()
1053 .context("failed to get resources")?
1054 .collect::<Vec<_>>();
1055
1056 for resource in resources {
1057 let resource = resource.context("failed to get resource")?;
1058 target_storage
1059 .insert_resource((*resource).clone())
1060 .context("failed to insert resource")?;
1061 }
1062
1063 let spans = self
1064 .storage
1065 .get_all_spans()
1066 .context("failed to get spans")?
1067 .collect::<Vec<_>>();
1068
1069 for span in spans {
1070 let span = span.context("failed to get span")?;
1071 target_storage
1072 .insert_span((*span).clone())
1073 .context("failed to insert span")?;
1074 }
1075
1076 let span_events = self
1077 .storage
1078 .get_all_span_events()
1079 .context("failed to get span events")?
1080 .collect::<Vec<_>>();
1081
1082 for span_event in span_events {
1083 let span_event = span_event.context("failed to get span event")?;
1084 target_storage
1085 .insert_span_event((*span_event).clone())
1086 .context("failed to insert span event")?;
1087 }
1088
1089 let events = self
1090 .storage
1091 .get_all_events()
1092 .context("failed to get events")?
1093 .collect::<Vec<_>>();
1094
1095 for event in events {
1096 let event = event.context("failed to get event")?;
1097 target_storage
1098 .insert_event((*event).clone())
1099 .context("failed to insert event")?;
1100 }
1101
1102 Ok(())
1103 }
1104
1105 #[instrument(level = tracing::Level::TRACE, skip_all)]
1106 pub fn subscribe_to_spans(
1107 &mut self,
1108 filter: Vec<FilterPredicate>,
1109 ) -> Result<Subscriber<ComposedSpan>, AnyError> {
1110 let mut filter = BasicSpanFilter::And(
1111 filter
1112 .into_iter()
1113 .map(|p| BasicSpanFilter::from_predicate(p, &self.span_indexes.ids))
1114 .collect::<Result<_, _>>()
1115 .context("invalid span filter")?,
1116 );
1117 filter.simplify();
1118
1119 let id = self.next_subscriber_id;
1120 self.next_subscriber_id += 1;
1121
1122 let (sender, receiver) = mpsc::unbounded_channel();
1123
1124 self.span_subscribers
1125 .insert(id, SpanSubscription::new(filter, sender));
1126
1127 Ok((id, receiver))
1128 }
1129
1130 #[instrument(level = tracing::Level::TRACE, skip_all)]
1131 pub fn unsubscribe_from_spans(&mut self, id: SubscriptionId) {
1132 self.span_subscribers.remove(&id);
1133 }
1134
1135 #[instrument(level = tracing::Level::TRACE, skip_all)]
1136 pub fn subscribe_to_events(
1137 &mut self,
1138 filter: Vec<FilterPredicate>,
1139 ) -> Result<Subscriber<ComposedEvent>, AnyError> {
1140 let mut filter = BasicEventFilter::And(
1141 filter
1142 .into_iter()
1143 .map(|p| BasicEventFilter::from_predicate(p, &self.span_indexes.ids))
1144 .collect::<Result<_, _>>()
1145 .context("invalid event filter")?,
1146 );
1147 filter.simplify();
1148
1149 let id = self.next_subscriber_id;
1150 self.next_subscriber_id += 1;
1151
1152 let (sender, receiver) = mpsc::unbounded_channel();
1153
1154 self.event_subscribers
1155 .insert(id, EventSubscription::new(filter, sender));
1156
1157 Ok((id, receiver))
1158 }
1159
1160 #[instrument(level = tracing::Level::TRACE, skip_all)]
1161 pub fn unsubscribe_from_events(&mut self, id: SubscriptionId) {
1162 self.event_subscribers.remove(&id);
1163 }
1164
1165 #[instrument(level = tracing::Level::TRACE, skip_all)]
1166 pub fn shutdown(&mut self) -> Result<(), AnyError> {
1167 if let Some(s) = self.storage.as_index_storage_mut() {
1168 s.update_indexes(
1169 &self.span_indexes,
1170 &self.span_event_indexes,
1171 &self.event_indexes,
1172 )
1173 .context("failed to update indexes")?;
1174 }
1175
1176 Ok(())
1177 }
1178}
1179
1180fn get_unique_timestamp(mut timestamp: Timestamp, existing: &[Timestamp]) -> Timestamp {
1181 let mut idx = existing.lower_bound(×tamp);
1182
1183 while idx < existing.len() && timestamp == existing[idx] {
1184 idx += 1;
1185 timestamp = timestamp.saturating_add(1);
1186 }
1187
1188 timestamp
1189}
1190
1191fn now() -> Timestamp {
1192 #[cfg(test)]
1193 return Timestamp::new(1000).unwrap();
1194
1195 #[cfg(not(test))]
1196 {
1197 use std::time::{SystemTime, UNIX_EPOCH};
1198 let timestamp = SystemTime::now()
1199 .duration_since(UNIX_EPOCH)
1200 .expect("now should not be before the UNIX epoch")
1201 .as_micros();
1202
1203 let timestamp = u64::try_from(timestamp)
1204 .expect("microseconds shouldn't exceed a u64 until the year 586,912 AD");
1205
1206 Timestamp::new(timestamp).expect("now should not be at the UNIX epoch")
1207 }
1208}
1209
1210#[cfg(test)]
1211mod tests {
1212 use crate::filter::Order;
1213 use crate::models::{
1214 Level, NewCloseSpanEvent, NewCreateSpanEvent, NewUpdateSpanEvent, SourceKind,
1215 };
1216 use crate::storage::TransientStorage;
1217 use crate::Value;
1218
1219 use super::*;
1220
/// Queries events by level, two attributes and a time window, and checks that
/// only the events satisfying every condition come back.
#[test]
fn test_event_filters() {
    let mut engine = SyncEngine::new(TransientStorage::new()).unwrap();

    let resource_key = engine
        .insert_resource(NewResource {
            attributes: BTreeMap::new(),
        })
        .unwrap();

    // Builds an event whose timestamp is `id`.
    let make_event = |id: u64, level: i32, attribute1: &str, attribute2: &str| -> NewEvent {
        let attributes = BTreeMap::from_iter([
            (String::from("attribute1"), Value::Str(String::from(attribute1))),
            (String::from("attribute2"), Value::Str(String::from(attribute2))),
        ]);

        NewEvent {
            kind: SourceKind::Tracing,
            resource_key,
            timestamp: id.try_into().unwrap(),
            span_id: None,
            content: Value::Str(String::from("event")),
            namespace: Some(String::from("crate::storage::tests")),
            function: Some(String::from("test")),
            level: Level::from_tracing_level(level).unwrap(),
            file_name: None,
            file_line: None,
            file_column: None,
            attributes,
        }
    };

    // (timestamp, tracing level, attribute1, attribute2)
    // Rows 1 and 9 lie outside the queried [2, 8] window; the remaining
    // non-matching rows differ in level or in one of the attributes.
    let fixtures: [(u64, i32, &str, &str); 9] = [
        (1, 4, "test", "A"),
        (2, 1, "test", "A"),
        (3, 2, "test", "A"),
        (4, 3, "test", "A"),
        (5, 4, "test", "A"),
        (6, 4, "blah", "A"),
        (7, 4, "test", "B"),
        (8, 4, "test", "C"),
        (9, 4, "test", "A"),
    ];

    for &(id, level, attribute1, attribute2) in fixtures.iter() {
        engine
            .insert_event(make_event(id, level, attribute1, attribute2))
            .unwrap();
    }

    let query = Query {
        filter: FilterPredicate::parse(
            "#level: >=WARN @\"attribute1\": test @\"attribute2\": A",
        )
        .unwrap(),
        order: Order::Asc,
        limit: 3,
        start: Timestamp::new(2).unwrap(),
        end: Timestamp::new(8).unwrap(),
        previous: None,
    };
    let events = engine.query_event(query);

    assert_eq!(events.len(), 2);
    assert_eq!(events[0].timestamp, Timestamp::new(4).unwrap());
    assert_eq!(events[1].timestamp, Timestamp::new(5).unwrap());
}
1277
/// Queries spans by level, two attributes and a time window, and checks that
/// only the spans satisfying every condition come back.
#[test]
fn test_span_filters() {
    let mut engine = SyncEngine::new(TransientStorage::new()).unwrap();

    let resource_key = engine
        .insert_resource(NewResource {
            attributes: BTreeMap::new(),
        })
        .unwrap();

    // Builds the create event for a span opened at `open`; the open time also
    // serves as the span's id.
    let open_span =
        |open: u64, level: i32, attribute1: &str, attribute2: &str| -> NewSpanEvent {
            let attributes = BTreeMap::from_iter([
                (String::from("attribute1"), Value::Str(String::from(attribute1))),
                (String::from("attribute2"), Value::Str(String::from(attribute2))),
            ]);

            NewSpanEvent {
                timestamp: Timestamp::new(open).unwrap(),
                span_id: FullSpanId::Tracing(1.try_into().unwrap(), open),
                kind: NewSpanEventKind::Create(NewCreateSpanEvent {
                    kind: SourceKind::Tracing,
                    resource_key,
                    parent_id: None,
                    name: String::from("test"),
                    namespace: Some(String::from("crate::storage::tests")),
                    function: None,
                    level: Level::from_tracing_level(level).unwrap(),
                    file_name: None,
                    file_line: None,
                    file_column: None,
                    instrumentation_attributes: BTreeMap::default(),
                    attributes,
                }),
            }
        };

    // Builds the close event, at time `close`, for the span opened at `open`.
    let close_span = |open: u64, close: u64| -> NewSpanEvent {
        NewSpanEvent {
            timestamp: Timestamp::new(close).unwrap(),
            span_id: FullSpanId::Tracing(1.try_into().unwrap(), open),
            kind: NewSpanEventKind::Close(NewCloseSpanEvent { busy: None }),
        }
    };

    // (open time, tracing level, attribute1, attribute2, optional close time)
    // Each span is opened and, where a close time is given, closed right
    // afterwards, before the next row is processed.
    let fixtures: [(u64, i32, &str, &str, Option<u64>); 9] = [
        (1, 4, "test", "A", Some(2)),
        (3, 1, "test", "A", Some(6)),
        (4, 2, "test", "A", Some(7)),
        (5, 3, "test", "A", Some(8)),
        (9, 4, "test", "A", None),
        (10, 4, "blah", "A", None),
        (11, 4, "test", "B", None),
        (12, 4, "test", "C", None),
        (13, 4, "test", "A", None),
    ];

    for &(open, level, attribute1, attribute2, close) in fixtures.iter() {
        engine
            .insert_span_event(open_span(open, level, attribute1, attribute2))
            .unwrap();

        if let Some(close) = close {
            engine.insert_span_event(close_span(open, close)).unwrap();
        }
    }

    let query = Query {
        filter: FilterPredicate::parse(
            "#level: >=WARN @\"attribute1\": test @\"attribute2\": A",
        )
        .unwrap(),
        order: Order::Asc,
        limit: 5,
        start: Timestamp::new(2).unwrap(),
        end: Timestamp::new(10).unwrap(),
        previous: None,
    };
    let spans = engine.query_span(query);

    assert_eq!(spans.len(), 2);
    assert_eq!(spans[0].created_at, Timestamp::new(5).unwrap());
    assert_eq!(spans[1].created_at, Timestamp::new(9).unwrap());
}
1369
/// An event with no attributes of its own is matched through an attribute set
/// on its resource.
#[test]
fn event_found_with_resource_attribute() {
    let mut engine = SyncEngine::new(TransientStorage::new()).unwrap();

    let resource_attributes =
        BTreeMap::from_iter([(String::from("attr1"), Value::Str(String::from("A")))]);
    let resource_key = engine
        .insert_resource(NewResource {
            attributes: resource_attributes,
        })
        .unwrap();

    let base = now();
    let event = NewEvent {
        kind: SourceKind::Tracing,
        resource_key,
        timestamp: base.saturating_add(1),
        span_id: None,
        content: Value::Str(String::from("event")),
        namespace: Some(String::from("crate::storage::tests")),
        function: Some(String::from("test")),
        level: Level::Error,
        file_name: None,
        file_line: None,
        file_column: None,
        attributes: BTreeMap::new(),
    };
    engine.insert_event(event).unwrap();

    // Same window and ordering for both lookups; only the filter text varies.
    let windowed = |filter: &str| Query {
        filter: FilterPredicate::parse(filter).unwrap(),
        order: Order::Asc,
        limit: 5,
        start: base,
        end: base.saturating_add(2),
        previous: None,
    };

    let matching = engine.query_event(windowed("@\"attr1\": A"));
    assert_eq!(matching.len(), 1);

    let missing = engine.query_event(windowed("@\"attr1\": B"));
    assert_eq!(missing.len(), 0);
}
1420
/// An attribute set directly on the event takes the place of the resource's
/// attribute of the same name when filtering.
#[test]
fn event_found_with_inherent_attribute() {
    let mut engine = SyncEngine::new(TransientStorage::new()).unwrap();

    let resource_attributes =
        BTreeMap::from_iter([(String::from("attr1"), Value::Str(String::from("A")))]);
    let resource_key = engine
        .insert_resource(NewResource {
            attributes: resource_attributes,
        })
        .unwrap();

    let base = now();
    let event = NewEvent {
        kind: SourceKind::Tracing,
        resource_key,
        timestamp: base.saturating_add(1),
        span_id: None,
        content: Value::Str(String::from("event")),
        namespace: Some(String::from("crate::storage::tests")),
        function: Some(String::from("test")),
        level: Level::Error,
        file_name: None,
        file_line: None,
        file_column: None,
        attributes: BTreeMap::from_iter([(
            String::from("attr1"),
            Value::Str(String::from("B")),
        )]),
    };
    engine.insert_event(event).unwrap();

    // Same window and ordering for both lookups; only the filter text varies.
    let windowed = |filter: &str| Query {
        filter: FilterPredicate::parse(filter).unwrap(),
        order: Order::Asc,
        limit: 5,
        start: base,
        end: base.saturating_add(2),
        previous: None,
    };

    // The resource's value is no longer visible on the event...
    let shadowed = engine.query_event(windowed("@\"attr1\": A"));
    assert_eq!(shadowed.len(), 0);

    // ...while the event's own value is.
    let inherent = engine.query_event(windowed("@\"attr1\": B"));
    assert_eq!(inherent.len(), 1);
}
1471
/// An event inside a span is matched through the span's attribute, which
/// takes the place of the resource's attribute of the same name.
#[test]
fn event_found_with_span_attribute() {
    let mut engine = SyncEngine::new(TransientStorage::new()).unwrap();

    let resource_attributes =
        BTreeMap::from_iter([(String::from("attr1"), Value::Str(String::from("A")))]);
    let resource_key = engine
        .insert_resource(NewResource {
            attributes: resource_attributes,
        })
        .unwrap();

    // Every use refers to the same span.
    let span_id = || FullSpanId::Tracing(1.try_into().unwrap(), 1);

    let create_span = NewSpanEvent {
        timestamp: now(),
        span_id: span_id(),
        kind: NewSpanEventKind::Create(NewCreateSpanEvent {
            kind: SourceKind::Tracing,
            resource_key,
            parent_id: None,
            name: String::from("test"),
            namespace: Some(String::from("crate::storage::tests")),
            function: None,
            level: Level::Error,
            file_name: None,
            file_line: None,
            file_column: None,
            instrumentation_attributes: BTreeMap::default(),
            attributes: BTreeMap::from_iter([(
                String::from("attr1"),
                Value::Str(String::from("C")),
            )]),
        }),
    };
    engine.insert_span_event(create_span).unwrap();

    let base = now();
    let event = NewEvent {
        kind: SourceKind::Tracing,
        resource_key,
        timestamp: base.saturating_add(1),
        span_id: Some(span_id()),
        content: Value::Str(String::from("event")),
        namespace: Some(String::from("crate::storage::tests")),
        function: Some(String::from("test")),
        level: Level::Error,
        file_name: None,
        file_line: None,
        file_column: None,
        attributes: BTreeMap::new(),
    };
    engine.insert_event(event).unwrap();

    // Same window and ordering for both lookups; only the filter text varies.
    let windowed = |filter: &str| Query {
        filter: FilterPredicate::parse(filter).unwrap(),
        order: Order::Asc,
        limit: 5,
        start: base,
        end: base.saturating_add(2),
        previous: None,
    };

    let via_resource = engine.query_event(windowed("@\"attr1\": A"));
    assert_eq!(via_resource.len(), 0);

    let via_span = engine.query_event(windowed("@\"attr1\": C"));
    assert_eq!(via_span.len(), 1);
}
1546
/// An attribute added to a span by a later update event is visible on an
/// event that was inserted into that span before the update.
#[test]
fn event_found_with_nonindexed_updated_span_attribute() {
    let mut engine = SyncEngine::new(TransientStorage::new()).unwrap();

    let resource_attributes =
        BTreeMap::from_iter([(String::from("attr1"), Value::Str(String::from("A")))]);
    let resource_key = engine
        .insert_resource(NewResource {
            attributes: resource_attributes,
        })
        .unwrap();

    // Every use refers to the same span.
    let span_id = || FullSpanId::Tracing(1.try_into().unwrap(), 1);

    // The span starts out with no attributes of its own.
    let create_span = NewSpanEvent {
        timestamp: now(),
        span_id: span_id(),
        kind: NewSpanEventKind::Create(NewCreateSpanEvent {
            kind: SourceKind::Tracing,
            resource_key,
            parent_id: None,
            name: String::from("test"),
            namespace: Some(String::from("crate::storage::tests")),
            function: None,
            level: Level::Error,
            file_name: None,
            file_line: None,
            file_column: None,
            instrumentation_attributes: BTreeMap::default(),
            attributes: BTreeMap::new(),
        }),
    };
    engine.insert_span_event(create_span).unwrap();

    let base = now();
    let event = NewEvent {
        kind: SourceKind::Tracing,
        resource_key,
        timestamp: base.saturating_add(1),
        span_id: Some(span_id()),
        content: Value::Str(String::from("event")),
        namespace: Some(String::from("crate::storage::tests")),
        function: Some(String::from("test")),
        level: Level::Error,
        file_name: None,
        file_line: None,
        file_column: None,
        attributes: BTreeMap::new(),
    };
    engine.insert_event(event).unwrap();

    // The update arrives only after the event is already stored.
    let update_span = NewSpanEvent {
        timestamp: now(),
        span_id: span_id(),
        kind: NewSpanEventKind::Update(NewUpdateSpanEvent {
            attributes: BTreeMap::from_iter([(
                String::from("attr1"),
                Value::Str(String::from("C")),
            )]),
        }),
    };
    engine.insert_span_event(update_span).unwrap();

    // Same window and ordering for both lookups; only the filter text varies.
    let windowed = |filter: &str| Query {
        filter: FilterPredicate::parse(filter).unwrap(),
        order: Order::Asc,
        limit: 5,
        start: base,
        end: base.saturating_add(2),
        previous: None,
    };

    let via_resource = engine.query_event(windowed("@\"attr1\": A"));
    assert_eq!(via_resource.len(), 0);

    let via_update = engine.query_event(windowed("@\"attr1\": C"));
    assert_eq!(via_update.len(), 1);
}
1631
/// An attribute added to a span by a later update event is visible on an
/// event that was inserted into that span before the update.
///
/// NOTE(review): the steps here are the same as in
/// `event_found_with_nonindexed_updated_span_attribute`; confirm whether a
/// step that makes the attribute "indexed" is missing.
#[test]
fn event_found_with_indexed_updated_span_attribute() {
    let mut engine = SyncEngine::new(TransientStorage::new()).unwrap();

    let resource_attributes =
        BTreeMap::from_iter([(String::from("attr1"), Value::Str(String::from("A")))]);
    let resource_key = engine
        .insert_resource(NewResource {
            attributes: resource_attributes,
        })
        .unwrap();

    // Every use refers to the same span.
    let span_id = || FullSpanId::Tracing(1.try_into().unwrap(), 1);

    // The span starts out with no attributes of its own.
    let create_span = NewSpanEvent {
        timestamp: now(),
        span_id: span_id(),
        kind: NewSpanEventKind::Create(NewCreateSpanEvent {
            kind: SourceKind::Tracing,
            resource_key,
            parent_id: None,
            name: String::from("test"),
            namespace: Some(String::from("crate::storage::tests")),
            function: None,
            level: Level::Error,
            file_name: None,
            file_line: None,
            file_column: None,
            instrumentation_attributes: BTreeMap::default(),
            attributes: BTreeMap::new(),
        }),
    };
    engine.insert_span_event(create_span).unwrap();

    let base = now();
    let event = NewEvent {
        kind: SourceKind::Tracing,
        resource_key,
        timestamp: base.saturating_add(1),
        span_id: Some(span_id()),
        content: Value::Str(String::from("event")),
        namespace: Some(String::from("crate::storage::tests")),
        function: Some(String::from("test")),
        level: Level::Error,
        file_name: None,
        file_line: None,
        file_column: None,
        attributes: BTreeMap::new(),
    };
    engine.insert_event(event).unwrap();

    // The update arrives only after the event is already stored.
    let update_span = NewSpanEvent {
        timestamp: now(),
        span_id: span_id(),
        kind: NewSpanEventKind::Update(NewUpdateSpanEvent {
            attributes: BTreeMap::from_iter([(
                String::from("attr1"),
                Value::Str(String::from("C")),
            )]),
        }),
    };
    engine.insert_span_event(update_span).unwrap();

    // Same window and ordering for both lookups; only the filter text varies.
    let windowed = |filter: &str| Query {
        filter: FilterPredicate::parse(filter).unwrap(),
        order: Order::Asc,
        limit: 5,
        start: base,
        end: base.saturating_add(2),
        previous: None,
    };

    let via_resource = engine.query_event(windowed("@\"attr1\": A"));
    assert_eq!(via_resource.len(), 0);

    let via_update = engine.query_event(windowed("@\"attr1\": C"));
    assert_eq!(via_update.len(), 1);
}
1716
/// Counting with an empty filter over the full time range includes every
/// stored event.
#[test]
fn empty_filter_returns_all_event_counts() {
    let mut engine = SyncEngine::new(TransientStorage::new()).unwrap();

    let resource_key = engine
        .insert_resource(NewResource {
            attributes: BTreeMap::from_iter([("attr1".to_owned(), Value::Str("A".to_owned()))]),
        })
        .unwrap();

    let now = now();
    engine
        .insert_event(NewEvent {
            kind: SourceKind::Tracing,
            resource_key,
            timestamp: now.saturating_add(1),
            span_id: None,
            content: Value::Str("event".to_owned()),
            namespace: None,
            function: None,
            level: Level::Error,
            file_name: None,
            file_line: None,
            file_column: None,
            attributes: BTreeMap::new(),
        })
        .unwrap();

    // An empty filter string spanning the whole timestamp range.
    let query = Query {
        filter: FilterPredicate::parse("").unwrap(),
        order: Order::Asc,
        limit: 0,
        start: Timestamp::MIN,
        end: Timestamp::MAX,
        previous: None,
    };

    // The query is not reused afterwards, so it is moved rather than cloned.
    assert_eq!(engine.query_event_count(query), 1);
}
1756
/// A span is matched through an attribute that was added by an update event
/// after the span was created.
#[test]
fn span_found_with_updated_attribute() {
    let mut engine = SyncEngine::new(TransientStorage::new()).unwrap();

    let resource_attributes =
        BTreeMap::from_iter([(String::from("attr1"), Value::Str(String::from("A")))]);
    let resource_key = engine
        .insert_resource(NewResource {
            attributes: resource_attributes,
        })
        .unwrap();

    // Every use refers to the same span.
    let span_id = || FullSpanId::Tracing(1.try_into().unwrap(), 1);

    // The span starts out with no attributes of its own.
    let create_span = NewSpanEvent {
        timestamp: now(),
        span_id: span_id(),
        kind: NewSpanEventKind::Create(NewCreateSpanEvent {
            kind: SourceKind::Tracing,
            resource_key,
            parent_id: None,
            name: String::from("test"),
            namespace: Some(String::from("crate::storage::tests")),
            function: None,
            level: Level::Error,
            file_name: None,
            file_line: None,
            file_column: None,
            instrumentation_attributes: BTreeMap::default(),
            attributes: BTreeMap::new(),
        }),
    };
    engine.insert_span_event(create_span).unwrap();

    let base = now();
    let update_span = NewSpanEvent {
        timestamp: base,
        span_id: span_id(),
        kind: NewSpanEventKind::Update(NewUpdateSpanEvent {
            attributes: BTreeMap::from_iter([(
                String::from("attr1"),
                Value::Str(String::from("C")),
            )]),
        }),
    };
    engine.insert_span_event(update_span).unwrap();

    let query = Query {
        filter: FilterPredicate::parse("@\"attr1\": C").unwrap(),
        order: Order::Asc,
        limit: 5,
        start: base,
        end: base.saturating_add(2),
        previous: None,
    };
    let spans = engine.query_span(query);

    assert_eq!(spans.len(), 1);
}
1813
/// A child span inserted before its parent still matches on the parent's
/// attribute once the parent arrives.
#[test]
fn span_found_with_post_parent_attribute() {
    let mut engine = SyncEngine::new(TransientStorage::new()).unwrap();

    let resource_key = engine
        .insert_resource(NewResource {
            attributes: BTreeMap::new(),
        })
        .unwrap();

    // The child names a parent that has not been inserted yet.
    let child = NewSpanEvent {
        timestamp: Timestamp::new(1001).unwrap(),
        span_id: FullSpanId::Opentelemetry(1, 2),
        kind: NewSpanEventKind::Create(NewCreateSpanEvent {
            kind: SourceKind::Opentelemetry,
            resource_key,
            parent_id: Some(FullSpanId::Opentelemetry(1, 1)),
            name: String::from("test"),
            namespace: Some(String::from("crate::storage::tests")),
            function: None,
            level: Level::Error,
            file_name: None,
            file_line: None,
            file_column: None,
            instrumentation_attributes: BTreeMap::default(),
            attributes: BTreeMap::new(),
        }),
    };

    // The parent is the only span that carries `attr1`.
    let parent = NewSpanEvent {
        timestamp: Timestamp::new(1000).unwrap(),
        span_id: FullSpanId::Opentelemetry(1, 1),
        kind: NewSpanEventKind::Create(NewCreateSpanEvent {
            kind: SourceKind::Opentelemetry,
            resource_key,
            parent_id: None,
            name: String::from("test"),
            namespace: Some(String::from("crate::storage::tests")),
            function: None,
            level: Level::Error,
            file_name: None,
            file_line: None,
            file_column: None,
            instrumentation_attributes: BTreeMap::default(),
            attributes: BTreeMap::from_iter([(
                String::from("attr1"),
                Value::Str(String::from("C")),
            )]),
        }),
    };

    // Child first, parent second.
    engine.insert_span_event(child).unwrap();
    engine.insert_span_event(parent).unwrap();

    let query = Query {
        filter: FilterPredicate::parse("@\"attr1\": C").unwrap(),
        order: Order::Asc,
        limit: 5,
        start: now(),
        end: now().saturating_add(2),
        previous: None,
    };
    let spans = engine.query_span(query);

    // Both the parent and the child are expected to match.
    assert_eq!(spans.len(), 2);
}
1880}