venator_engine/engine/
sync_engine.rs

1use std::collections::{BTreeMap, HashMap};
2
3use anyhow::{anyhow, Context, Error as AnyError};
4use tokio::sync::mpsc::{self};
5use tracing::instrument;
6
7use crate::context::{EventContext, SpanContext};
8use crate::filter::{
9    BasicEventFilter, BasicSpanFilter, BoundSearch, FilterPredicate, IndexedEventFilter,
10    IndexedEventFilterIterator, IndexedSpanFilter, IndexedSpanFilterIterator, Query,
11};
12use crate::index::{EventIndexes, SpanEventIndexes, SpanIndexes};
13use crate::models::{CloseSpanEvent, EnterSpanEvent, EventKey, FollowsSpanEvent};
14use crate::storage::Storage;
15use crate::subscription::{EventSubscription, SpanSubscription, Subscriber};
16use crate::{
17    ComposedEvent, ComposedSpan, CreateSpanEvent, DatasetStats, DeleteFilter, DeleteMetrics, Event,
18    FullSpanId, InstanceId, NewEvent, NewResource, NewSpanEvent, NewSpanEventKind, Resource,
19    ResourceKey, Span, SpanEvent, SpanEventKey, SpanEventKind, SpanKey, SubscriptionId, Timestamp,
20    UpdateSpanEvent, ValueOperator,
21};
22
23/// Provides the core engine functionality.
24pub struct SyncEngine<S> {
25    pub(crate) storage: S,
26    pub(crate) span_indexes: SpanIndexes,
27    pub(crate) span_event_indexes: SpanEventIndexes,
28    pub(crate) event_indexes: EventIndexes,
29
30    resources: HashMap<ResourceKey, Resource>,
31
32    next_subscriber_id: usize,
33    span_subscribers: HashMap<usize, SpanSubscription>,
34    event_subscribers: HashMap<usize, EventSubscription>,
35}
36
37impl<S: Storage> SyncEngine<S> {
38    pub fn new(storage: S) -> Result<SyncEngine<S>, AnyError> {
39        let mut engine = SyncEngine {
40            storage,
41            span_indexes: SpanIndexes::new(),
42            span_event_indexes: SpanEventIndexes::new(),
43            event_indexes: EventIndexes::new(),
44
45            resources: HashMap::new(),
46
47            next_subscriber_id: 0,
48            span_subscribers: HashMap::new(),
49            event_subscribers: HashMap::new(),
50        };
51
52        tracing::info!("initializing engine");
53
54        let resources = engine
55            .storage
56            .get_all_resources()
57            .context("failed to load resources")?
58            .collect::<Vec<_>>();
59
60        for resource in resources {
61            let resource = resource.context("failed to load resource")?;
62            engine.insert_resource_bookeeping(&resource);
63        }
64
65        let indexes_result = engine
66            .storage
67            .as_index_storage()
68            .and_then(|s| s.get_indexes().transpose());
69
70        match indexes_result {
71            Some(Ok(indexes)) => {
72                tracing::info!("loaded indexes from storage");
73
74                let (span_indexes, span_event_indexes, event_indexes) = indexes;
75
76                engine.span_indexes = span_indexes;
77                engine.span_event_indexes = span_event_indexes;
78                engine.event_indexes = event_indexes;
79            }
80            Some(Err(err)) => {
81                tracing::warn!(?err, "failed to load indexes from storage");
82
83                let spans = engine
84                    .storage
85                    .get_all_spans()
86                    .context("failed to load spans")?
87                    .collect::<Vec<_>>();
88
89                for span in spans {
90                    let span = span.context("failed to load span")?;
91                    engine.insert_span_bookeeping(&span);
92                }
93
94                let span_events = engine
95                    .storage
96                    .get_all_span_events()
97                    .context("failed to load span events")?
98                    .collect::<Vec<_>>();
99
100                for span_event in span_events {
101                    let span_event = span_event.context("failed to load span event")?;
102                    engine.insert_span_event_bookeeping(&span_event);
103                }
104
105                let events = engine
106                    .storage
107                    .get_all_events()
108                    .context("failed to load events")?
109                    .collect::<Vec<_>>();
110
111                for event in events {
112                    let event = event.context("failed to load event")?;
113                    engine.insert_event_bookeeping(&event);
114                }
115            }
116            None => {
117                tracing::warn!("no indexes from storage");
118
119                let spans = engine
120                    .storage
121                    .get_all_spans()
122                    .context("failed to load spans")?
123                    .collect::<Vec<_>>();
124
125                for span in spans {
126                    let span = span.context("failed to load span")?;
127                    engine.insert_span_bookeeping(&span);
128                }
129
130                let span_events = engine
131                    .storage
132                    .get_all_span_events()
133                    .context("failed to load span events")?
134                    .collect::<Vec<_>>();
135
136                for span_event in span_events {
137                    let span_event = span_event.context("failed to load span event")?;
138                    engine.insert_span_event_bookeeping(&span_event);
139                }
140
141                let events = engine
142                    .storage
143                    .get_all_events()
144                    .context("failed to load events")?
145                    .collect::<Vec<_>>();
146
147                for event in events {
148                    let event = event.context("failed to load event")?;
149                    engine.insert_event_bookeeping(&event);
150                }
151            }
152        }
153
154        if !engine.span_indexes.durations.open.is_empty() {
155            let last_event = engine.event_indexes.all.last();
156            let last_span_event = engine.span_event_indexes.all.last();
157            let last_at = match (last_event, last_span_event) {
158                (Some(event), Some(span_event)) => Ord::max(*event, *span_event),
159                (None, Some(span_event)) => *span_event,
160                (Some(event), None) => *event,
161                (None, None) => panic!("not possible to have open span but no span events"),
162            };
163
164            let at = last_at.saturating_add(1);
165
166            for span_key in engine.span_indexes.durations.open.clone() {
167                engine.span_indexes.update_with_closed(span_key, at);
168                engine
169                    .storage
170                    .update_span_closed(span_key, at, None)
171                    .context("failed to close span")?;
172            }
173        }
174
175        tracing::info!("loaded {} spans", engine.span_indexes.all.len());
176        tracing::info!("loaded {} span events", engine.span_event_indexes.all.len());
177        tracing::info!("loaded {} events", engine.event_indexes.all.len());
178
179        Ok(engine)
180    }
181
182    #[instrument(level = tracing::Level::TRACE, skip_all)]
183    pub fn query_event(&self, query: Query) -> Vec<ComposedEvent> {
184        tracing::debug!(?query, "querying for events");
185
186        let limit = query.limit;
187        IndexedEventFilterIterator::new(query, self)
188            .take(limit)
189            .filter_map(|event_key| {
190                self.storage
191                    .get_event(event_key)
192                    .inspect_err(|err| tracing::warn!(?err, "failed to load event"))
193                    .ok()
194            })
195            .map(|event| EventContext::with_event(&event, &self.storage).render())
196            .collect()
197    }
198
199    #[instrument(level = tracing::Level::TRACE, skip_all)]
200    pub fn query_event_count(&self, query: Query) -> usize {
201        tracing::debug!(?query, "querying for event counts");
202
203        let event_iter = IndexedEventFilterIterator::new(query, self);
204
205        match event_iter.size_hint() {
206            (min, Some(max)) if min == max => min,
207            _ => event_iter.count(),
208        }
209    }
210
211    #[instrument(level = tracing::Level::TRACE, skip_all)]
212    pub fn query_span(&self, query: Query) -> Vec<ComposedSpan> {
213        tracing::debug!(?query, "querying for spans");
214
215        let limit = query.limit;
216        IndexedSpanFilterIterator::new(query, self)
217            .take(limit)
218            .filter_map(|span_key| {
219                self.storage
220                    .get_span(span_key)
221                    .inspect_err(|err| tracing::warn!(?err, "failed to load span"))
222                    .ok()
223            })
224            .map(|span| SpanContext::with_span(&span, &self.storage).render())
225            .collect()
226    }
227
228    #[instrument(level = tracing::Level::TRACE, skip_all)]
229    pub fn query_span_count(&self, query: Query) -> usize {
230        tracing::debug!(?query, "querying for span counts");
231
232        let span_iter = IndexedSpanFilterIterator::new(query, self);
233
234        match span_iter.size_hint() {
235            (min, Some(max)) if min == max => min,
236            _ => span_iter.count(),
237        }
238    }
239
240    #[instrument(level = tracing::Level::TRACE, skip_all)]
241    #[doc(hidden)]
242    pub fn query_span_event(&self, _query: Query) -> Vec<SpanEvent> {
243        unimplemented!()
244    }
245
246    #[instrument(level = tracing::Level::TRACE, skip_all)]
247    pub fn query_stats(&self) -> DatasetStats {
248        tracing::debug!("querying for stats");
249
250        let event_start = self.event_indexes.all.first().copied();
251        let event_end = self.event_indexes.all.last().copied();
252        let span_start = self.span_indexes.all.first().copied();
253        let span_end = self.span_indexes.all.last().copied(); // TODO: not technically right, but maybe okay
254
255        DatasetStats {
256            start: crate::filter::merge(event_start, span_start, Ord::min),
257            end: crate::filter::merge(event_end, span_end, Ord::max),
258            total_events: self.event_indexes.all.len(),
259            total_spans: self.span_indexes.all.len(),
260        }
261    }
262
263    #[instrument(level = tracing::Level::TRACE, skip_all)]
264    pub fn insert_resource(&mut self, resource: NewResource) -> Result<ResourceKey, AnyError> {
265        tracing::debug!(?resource, "inserting resource");
266
267        if let Some((key, _)) = self
268            .resources
269            .iter()
270            .find(|(_, r)| r.attributes == resource.attributes)
271        {
272            return Ok(*key);
273        }
274
275        let keys = self.resources.keys().copied().collect::<Vec<_>>();
276        let now = now();
277        let resource_key = get_unique_timestamp(now, &keys);
278        let resource = Resource {
279            created_at: resource_key,
280            attributes: resource.attributes,
281        };
282
283        self.insert_resource_bookeeping(&resource);
284        self.storage
285            .insert_resource(resource)
286            .context("failed to insert resource")?;
287
288        Ok(resource_key)
289    }
290
291    fn insert_resource_bookeeping(&mut self, resource: &Resource) {
292        self.resources.insert(resource.key(), resource.clone());
293    }
294
295    #[instrument(level = tracing::Level::TRACE, skip_all)]
296    pub fn disconnect_tracing_instance(&mut self, instance_id: InstanceId) -> Result<(), AnyError> {
297        tracing::debug!(instance_id, "disconnecting tracing instance");
298
299        let at = now();
300
301        let filter = IndexedSpanFilter::And(vec![
302            IndexedSpanFilter::Single(&self.span_indexes.durations.open, None),
303            IndexedSpanFilter::Single(
304                self.span_indexes
305                    .instances
306                    .get(&instance_id)
307                    .map(Vec::as_slice)
308                    .unwrap_or_default(),
309                None,
310            ),
311        ]);
312
313        let open_spans = IndexedSpanFilterIterator::new_internal(filter, self).collect::<Vec<_>>();
314
315        for span_key in open_spans {
316            self.span_indexes.update_with_closed(span_key, at);
317            self.storage
318                .update_span_closed(span_key, at, None)
319                .context("failed to close span")?;
320        }
321
322        Ok(())
323    }
324
325    #[instrument(level = tracing::Level::TRACE, skip_all)]
326    pub fn insert_span_event(
327        &mut self,
328        mut new_span_event: NewSpanEvent,
329    ) -> Result<SpanEventKey, AnyError> {
330        tracing::debug!(span_event = ?new_span_event, "inserting span event");
331
332        let span_event_key =
333            get_unique_timestamp(new_span_event.timestamp, &self.span_event_indexes.all);
334        new_span_event.timestamp = span_event_key;
335
336        match new_span_event.kind {
337            NewSpanEventKind::Create(new_create_event) => {
338                if self.span_indexes.ids.contains_key(&new_span_event.span_id) {
339                    return Err(anyhow::anyhow!("duplicate span id"));
340                }
341
342                // parent may not yet exist, that is ok
343                let parent_id = new_create_event.parent_id;
344                let parent_key = parent_id.and_then(|id| self.span_indexes.ids.get(&id).copied());
345
346                let created_at =
347                    get_unique_timestamp(new_span_event.timestamp, &self.span_indexes.all);
348
349                let span = Span {
350                    kind: new_create_event.kind,
351                    resource_key: new_create_event.resource_key,
352                    id: new_span_event.span_id,
353                    created_at,
354                    closed_at: None,
355                    busy: None,
356                    parent_id,
357                    parent_key,
358                    links: Vec::new(),
359                    name: new_create_event.name.clone(),
360                    namespace: new_create_event.namespace.clone(),
361                    function: new_create_event.function.clone(),
362                    level: new_create_event.level,
363                    file_name: new_create_event.file_name.clone(),
364                    file_line: new_create_event.file_line,
365                    file_column: new_create_event.file_column,
366                    instrumentation_attributes: new_create_event.instrumentation_attributes.clone(),
367                    attributes: new_create_event.attributes.clone(),
368                };
369
370                let span_event = SpanEvent {
371                    timestamp: new_span_event.timestamp,
372                    span_key: span.created_at,
373                    kind: SpanEventKind::Create(CreateSpanEvent {
374                        kind: new_create_event.kind,
375                        resource_key: new_create_event.resource_key,
376                        parent_key,
377                        name: new_create_event.name,
378                        namespace: new_create_event.namespace,
379                        function: new_create_event.function,
380                        level: new_create_event.level,
381                        file_name: new_create_event.file_name,
382                        file_line: new_create_event.file_line,
383                        file_column: new_create_event.file_column,
384                        instrumentation_attributes: new_create_event.instrumentation_attributes,
385                        attributes: new_create_event.attributes,
386                    }),
387                };
388
389                let (child_spans, child_events) = self.insert_span_bookeeping(&span);
390                self.storage
391                    .insert_span(span.clone())
392                    .context("failed to insert span")?;
393                self.storage
394                    .update_span_parents(span.key(), &child_spans)
395                    .context("failed to update span parents")?;
396                self.storage
397                    .update_event_parents(span.key(), &child_events)
398                    .context("failed to update event parents")?;
399
400                self.insert_span_event_bookeeping(&span_event);
401                self.storage
402                    .insert_span_event(span_event)
403                    .context("failed to insert span event")?;
404
405                if !self.event_subscribers.is_empty() {
406                    let root = SpanContext::with_span(&span, &self.storage).trace_root();
407                    let descendent_events = self
408                        .event_indexes
409                        .traces
410                        .get(&root)
411                        .map(Vec::as_slice)
412                        .unwrap_or_default()
413                        .iter()
414                        .copied()
415                        .filter(|key| {
416                            EventContext::new(*key, &self.storage)
417                                .parents()
418                                .any(|p| p.key() == span.key())
419                        });
420
421                    // update subscribers for events that may have been updated
422                    // by a new parent
423                    for event_key in descendent_events {
424                        let context = EventContext::new(event_key, &self.storage);
425                        for subscriber in self.event_subscribers.values_mut() {
426                            subscriber.on_event(&context);
427                        }
428                    }
429
430                    self.event_subscribers.retain(|_, s| s.connected());
431                }
432
433                if !self.span_subscribers.is_empty() {
434                    for subscriber in self.span_subscribers.values_mut() {
435                        subscriber.on_span(&SpanContext::with_span(&span, &self.storage));
436                    }
437
438                    let root = SpanContext::with_span(&span, &self.storage).trace_root();
439                    let descendent_spans = self
440                        .span_indexes
441                        .traces
442                        .get(&root)
443                        .map(Vec::as_slice)
444                        .unwrap_or_default()
445                        .iter()
446                        .copied()
447                        .filter(|key| {
448                            SpanContext::new(*key, &self.storage)
449                                .parents()
450                                .any(|p| p.key() == span.key())
451                        });
452
453                    // update subscribers for spans that may have been updated
454                    // by a new parent
455                    for span_key in descendent_spans {
456                        let context = SpanContext::new(span_key, &self.storage);
457                        for subscriber in self.span_subscribers.values_mut() {
458                            subscriber.on_span(&context);
459                        }
460                    }
461
462                    self.span_subscribers.retain(|_, s| s.connected());
463                }
464            }
465            NewSpanEventKind::Update(new_update_event) => {
466                let span_key = self
467                    .span_indexes
468                    .ids
469                    .get(&new_span_event.span_id)
470                    .copied()
471                    .ok_or(anyhow!("unknown span id"))?;
472
473                let span = SpanContext::new(span_key, &self.storage);
474                let trace = span.trace_root();
475
476                let update_event = UpdateSpanEvent {
477                    attributes: new_update_event.attributes.clone(),
478                };
479
480                let descendent_spans = self
481                    .span_indexes
482                    .traces
483                    .get(&trace)
484                    .map(Vec::as_slice)
485                    .unwrap_or_default()
486                    .iter()
487                    .cloned()
488                    .filter(|key| {
489                        SpanContext::new(*key, &self.storage)
490                            .parents()
491                            .any(|p| p.key() == span_key)
492                            || *key == span_key // also update self
493                    })
494                    .collect::<Vec<_>>();
495
496                for child_span_key in descendent_spans {
497                    // check if nested span attribute changed
498                    self.span_indexes.update_with_new_field_on_parent(
499                        &SpanContext::new(child_span_key, &self.storage),
500                        span_key,
501                        &update_event.attributes,
502                    );
503                }
504
505                let descendent_events = self
506                    .event_indexes
507                    .traces
508                    .get(&trace)
509                    .map(Vec::as_slice)
510                    .unwrap_or_default()
511                    .iter()
512                    .cloned()
513                    .filter(|key| {
514                        EventContext::new(*key, &self.storage)
515                            .parents()
516                            .any(|p| p.key() == span_key)
517                    })
518                    .collect::<Vec<_>>();
519
520                for event_key in descendent_events {
521                    // check if nested event attribute changed
522                    self.event_indexes.update_with_new_field_on_parent(
523                        &EventContext::new(event_key, &self.storage),
524                        span_key,
525                        &update_event.attributes,
526                    );
527                }
528
529                let span_event = SpanEvent {
530                    timestamp: new_span_event.timestamp,
531                    span_key,
532                    kind: SpanEventKind::Update(update_event),
533                };
534
535                self.storage
536                    .update_span_attributes(span_key, new_update_event.attributes)
537                    .context("failed to update span attributes")?;
538
539                self.insert_span_event_bookeeping(&span_event);
540                self.storage
541                    .insert_span_event(span_event)
542                    .context("failed to insert span event")?;
543
544                if !self.event_subscribers.is_empty() {
545                    let descendent_events = self
546                        .event_indexes
547                        .traces
548                        .get(&trace)
549                        .map(Vec::as_slice)
550                        .unwrap_or_default()
551                        .iter()
552                        .copied()
553                        .filter(|key| {
554                            EventContext::new(*key, &self.storage)
555                                .parents()
556                                .any(|p| p.key() == span_key)
557                        });
558
559                    // update subscribers for events that may have been updated
560                    // by an updated parent
561                    for event_key in descendent_events {
562                        let context = EventContext::new(event_key, &self.storage);
563                        for subscriber in self.event_subscribers.values_mut() {
564                            subscriber.on_event(&context);
565                        }
566                    }
567
568                    self.event_subscribers.retain(|_, s| s.connected());
569                }
570
571                if !self.span_subscribers.is_empty() {
572                    let descendent_spans = self
573                        .span_indexes
574                        .traces
575                        .get(&trace)
576                        .map(Vec::as_slice)
577                        .unwrap_or_default()
578                        .iter()
579                        .copied()
580                        .filter(|key| {
581                            SpanContext::new(*key, &self.storage)
582                                .parents()
583                                .any(|p| p.key() == span_key)
584                        });
585
586                    // update subscribers for spans that may have been updated
587                    // by an updated parent
588                    for span_key in descendent_spans {
589                        let context = SpanContext::new(span_key, &self.storage);
590                        for subscriber in self.span_subscribers.values_mut() {
591                            subscriber.on_span(&context);
592                        }
593                    }
594
595                    self.span_subscribers.retain(|_, s| s.connected());
596                }
597            }
598            NewSpanEventKind::Follows(new_follows_event) => {
599                let span_key = self
600                    .span_indexes
601                    .ids
602                    .get(&new_span_event.span_id)
603                    .copied()
604                    .ok_or(anyhow!("unknown span id"))?;
605
606                let FullSpanId::Tracing(instance_id, _) = new_span_event.span_id else {
607                    return Err(anyhow!("invalid span kind, expected tracing"));
608                };
609
610                let follows_span_id = FullSpanId::Tracing(instance_id, new_follows_event.follows);
611                let follows_span_key = self
612                    .span_indexes
613                    .ids
614                    .get(&follows_span_id)
615                    .copied()
616                    .ok_or(anyhow!("unknown span id"))?;
617
618                // TODO: check against circular following
619                // TODO: check against duplicates
620
621                let span_event = SpanEvent {
622                    timestamp: new_span_event.timestamp,
623                    span_key,
624                    kind: SpanEventKind::Follows(FollowsSpanEvent {
625                        follows: follows_span_key,
626                    }),
627                };
628
629                self.storage
630                    .update_span_link(span_key, follows_span_id, BTreeMap::new())
631                    .context("failed to update span link")?;
632
633                self.insert_span_event_bookeeping(&span_event);
634                self.storage
635                    .insert_span_event(span_event)
636                    .context("failed to insert span event")?;
637            }
638            NewSpanEventKind::Enter(new_enter_event) => {
639                let span_key = self
640                    .span_indexes
641                    .ids
642                    .get(&new_span_event.span_id)
643                    .copied()
644                    .ok_or(anyhow!("unknown span id"))?;
645
646                let span_event = SpanEvent {
647                    timestamp: new_span_event.timestamp,
648                    span_key,
649                    kind: SpanEventKind::Enter(EnterSpanEvent {
650                        thread_id: new_enter_event.thread_id,
651                    }),
652                };
653
654                self.insert_span_event_bookeeping(&span_event);
655                self.storage
656                    .insert_span_event(span_event)
657                    .context("failed to insert span event")?;
658            }
659            NewSpanEventKind::Exit => {
660                let span_key = self
661                    .span_indexes
662                    .ids
663                    .get(&new_span_event.span_id)
664                    .copied()
665                    .ok_or(anyhow!("unknown span id"))?;
666
667                let span_event = SpanEvent {
668                    timestamp: new_span_event.timestamp,
669                    span_key,
670                    kind: SpanEventKind::Exit,
671                };
672
673                self.insert_span_event_bookeeping(&span_event);
674                self.storage
675                    .insert_span_event(span_event)
676                    .context("failed to insert span event")?;
677            }
678            NewSpanEventKind::Close(new_close_event) => {
679                let span_key = self
680                    .span_indexes
681                    .ids
682                    .get(&new_span_event.span_id)
683                    .copied()
684                    .ok_or(anyhow!("unknown span id"))?;
685
686                let busy = if let Some(busy) = new_close_event.busy {
687                    Some(busy)
688                } else {
689                    let mut busy = 0;
690                    let mut last_enter = None;
691                    for span_event_key in &self.span_event_indexes.spans[&span_key] {
692                        let Ok(span_event) = self.storage.get_span_event(*span_event_key) else {
693                            tracing::warn!("failed to get span event, ignoring");
694                            continue;
695                        };
696                        match &span_event.kind {
697                            SpanEventKind::Enter(_) => {
698                                last_enter = Some(span_event.timestamp);
699                            }
700                            SpanEventKind::Exit => {
701                                if let Some(enter) = last_enter {
702                                    busy += span_event.timestamp.get() - enter.get();
703                                }
704                                last_enter = None;
705                            }
706                            _ => {}
707                        }
708                    }
709
710                    if busy == 0 {
711                        None
712                    } else {
713                        Some(busy)
714                    }
715                };
716
717                let span_event = SpanEvent {
718                    timestamp: new_span_event.timestamp,
719                    span_key,
720                    kind: SpanEventKind::Close(CloseSpanEvent { busy }),
721                };
722
723                let updated = self
724                    .span_indexes
725                    .update_with_closed(span_key, new_span_event.timestamp);
726
727                if !updated {
728                    return Err(anyhow::anyhow!("span already closed"));
729                }
730
731                self.storage
732                    .update_span_closed(span_key, new_span_event.timestamp, busy)
733                    .context("failed to close span")?;
734
735                self.insert_span_event_bookeeping(&span_event);
736                self.storage
737                    .insert_span_event(span_event)
738                    .context("failed to insert span event")?;
739            }
740        }
741
742        Ok(span_event_key)
743    }
744
745    fn insert_span_bookeeping(&mut self, span: &Span) -> (Vec<SpanKey>, Vec<EventKey>) {
746        let span_key = span.created_at;
747
748        let orphaned_spans = self
749            .span_indexes
750            .update_with_new_span(&SpanContext::with_span(span, &self.storage));
751
752        let trace = SpanContext::with_span(span, &self.storage).trace_root();
753
754        let descendent_spans = self
755            .span_indexes
756            .traces
757            .get(&trace)
758            .map(Vec::as_slice)
759            .unwrap_or_default()
760            .iter()
761            .cloned()
762            .filter(|key| *key != span_key)
763            .filter(|key| {
764                orphaned_spans.contains(key)
765                    || SpanContext::new(*key, &self.storage)
766                        .parents()
767                        .any(|p| orphaned_spans.contains(&p.key()))
768            })
769            .collect::<Vec<_>>();
770
771        for descendent in descendent_spans {
772            self.span_indexes.update_with_new_field_on_parent(
773                &SpanContext::new(descendent, &self.storage),
774                span.key(),
775                &span.attributes,
776            );
777        }
778
779        let orphaned_events = self
780            .event_indexes
781            .update_with_new_span(&SpanContext::with_span(span, &self.storage));
782
783        let descendent_events = self
784            .event_indexes
785            .traces
786            .get(&trace)
787            .map(Vec::as_slice)
788            .unwrap_or_default()
789            .iter()
790            .cloned()
791            .filter(|key| {
792                orphaned_events.contains(key)
793                    || EventContext::new(*key, &self.storage)
794                        .parents()
795                        .any(|p| orphaned_events.contains(&p.key()))
796            })
797            .collect::<Vec<_>>();
798
799        for descendent in descendent_events {
800            self.event_indexes.update_with_new_field_on_parent(
801                &EventContext::new(descendent, &self.storage),
802                span.key(),
803                &span.attributes,
804            );
805        }
806
807        (orphaned_spans, orphaned_events)
808    }
809
810    fn insert_span_event_bookeeping(&mut self, span_event: &SpanEvent) {
811        self.span_event_indexes
812            .update_with_new_span_event(span_event);
813    }
814
815    #[instrument(level = tracing::Level::TRACE, skip_all)]
816    pub fn insert_event(&mut self, mut new_event: NewEvent) -> Result<(), AnyError> {
817        let event_key = get_unique_timestamp(new_event.timestamp, &self.event_indexes.all);
818        new_event.timestamp = event_key;
819
820        // parent may not yet exist, that is ok
821        let parent_id = new_event.span_id;
822        let parent_key = parent_id.and_then(|id| self.span_indexes.ids.get(&id).copied());
823
824        let event = Event {
825            kind: new_event.kind,
826            resource_key: new_event.resource_key,
827            timestamp: new_event.timestamp,
828            parent_id,
829            parent_key,
830            content: new_event.content,
831            namespace: new_event.namespace,
832            function: new_event.function,
833            level: new_event.level,
834            file_name: new_event.file_name,
835            file_line: new_event.file_line,
836            file_column: new_event.file_column,
837            attributes: new_event.attributes,
838        };
839
840        self.insert_event_bookeeping(&event);
841        self.storage
842            .insert_event(event.clone())
843            .context("failed to insert event")?;
844
845        let context = EventContext::with_event(&event, &self.storage);
846        for subscriber in self.event_subscribers.values_mut() {
847            subscriber.on_event(&context);
848        }
849
850        self.event_subscribers.retain(|_, s| s.connected());
851
852        Ok(())
853    }
854
855    fn insert_event_bookeeping(&mut self, event: &Event) {
856        self.event_indexes
857            .update_with_new_event(&EventContext::with_event(event, &self.storage));
858    }
859
860    #[instrument(level = tracing::Level::TRACE, skip_all)]
861    pub fn delete(&mut self, filter: DeleteFilter) -> Result<DeleteMetrics, AnyError> {
862        let root_spans =
863            self.get_root_spans_in_range_filter(filter.start, filter.end, filter.inside);
864        let root_events =
865            self.get_root_events_in_range_filter(filter.start, filter.end, filter.inside);
866
867        let spans_from_root_spans = root_spans
868            .iter()
869            .flat_map(|root| {
870                self.span_indexes
871                    .traces
872                    .get(&SpanContext::new(*root, &self.storage).trace_root())
873                    .map(Vec::as_slice)
874                    .unwrap_or_default()
875                    .iter()
876                    .cloned()
877            })
878            .collect::<Vec<SpanKey>>();
879        let events_from_root_spans = root_spans
880            .iter()
881            .flat_map(|root| {
882                self.event_indexes
883                    .traces
884                    .get(&SpanContext::new(*root, &self.storage).trace_root())
885                    .map(Vec::as_slice)
886                    .unwrap_or_default()
887                    .iter()
888                    .cloned()
889            })
890            .collect::<Vec<EventKey>>();
891        let span_events = spans_from_root_spans
892            .iter()
893            .flat_map(|span| {
894                self.span_event_indexes
895                    .spans
896                    .get(span)
897                    .map(Vec::as_slice)
898                    .unwrap_or_default()
899                    .iter()
900                    .cloned()
901            })
902            .collect::<Vec<SpanEventKey>>();
903
904        if filter.dry_run {
905            return Ok(DeleteMetrics {
906                spans: spans_from_root_spans.len(),
907                span_events: span_events.len(),
908                events: root_events.len() + events_from_root_spans.len(),
909            });
910        }
911
912        let mut spans_to_delete = spans_from_root_spans;
913        let mut span_events_to_delete = span_events;
914        let mut events_to_delete = root_events;
915        events_to_delete.extend(events_from_root_spans);
916
917        spans_to_delete.sort();
918        span_events_to_delete.sort();
919        events_to_delete.sort();
920
921        // drop smaller scoped entities from storage first to avoid integrity
922        // issues if things go wrong
923
924        self.storage
925            .drop_events(&events_to_delete)
926            .context("failed to drop events")?;
927        self.storage
928            .drop_span_events(&span_events_to_delete)
929            .context("failed to drop span events")?;
930        self.storage
931            .drop_spans(&spans_to_delete)
932            .context("failed to drop spans")?;
933
934        // remove smaller scoped entities from indexes last for some efficiency
935
936        self.remove_spans_bookeeping(&spans_to_delete);
937        self.remove_span_events_bookeeping(&span_events_to_delete);
938        self.remove_events_bookeeping(&events_to_delete);
939
940        let resources_to_delete = self
941            .resources
942            .keys()
943            .copied()
944            .filter(|resource_key| {
945                let used_by_spans = self
946                    .span_indexes
947                    .resources
948                    .get(resource_key)
949                    .is_some_and(|r| !r.is_empty());
950
951                let used_by_events = self
952                    .event_indexes
953                    .resources
954                    .get(resource_key)
955                    .is_some_and(|r| !r.is_empty());
956
957                !used_by_spans && !used_by_events
958            })
959            .collect::<Vec<_>>();
960
961        self.storage
962            .drop_resources(&resources_to_delete)
963            .context("failed to drop events")?;
964
965        for resource_key in resources_to_delete {
966            self.resources.remove(&resource_key);
967        }
968
969        Ok(DeleteMetrics {
970            spans: spans_to_delete.len(),
971            span_events: span_events_to_delete.len(),
972            events: events_to_delete.len(),
973        })
974    }
975
976    fn get_root_spans_in_range_filter(
977        &self,
978        start: Timestamp,
979        end: Timestamp,
980        inside: bool,
981    ) -> Vec<SpanKey> {
982        let filter = if inside {
983            BasicSpanFilter::And(vec![
984                BasicSpanFilter::Created(ValueOperator::Lte, end),
985                BasicSpanFilter::Closed(ValueOperator::Gte, start),
986                BasicSpanFilter::Root,
987            ])
988        } else {
989            BasicSpanFilter::And(vec![
990                BasicSpanFilter::Or(vec![
991                    BasicSpanFilter::Created(ValueOperator::Gt, end),
992                    BasicSpanFilter::Closed(ValueOperator::Lt, start),
993                ]),
994                BasicSpanFilter::Root,
995            ])
996        };
997
998        let indexed_filter =
999            IndexedSpanFilter::build(Some(filter), &self.span_indexes, &self.storage);
1000        let iter = IndexedSpanFilterIterator::new_internal(indexed_filter, self);
1001
1002        iter.collect()
1003    }
1004
1005    fn get_root_events_in_range_filter(
1006        &self,
1007        start: Timestamp,
1008        end: Timestamp,
1009        inside: bool,
1010    ) -> Vec<SpanKey> {
1011        let filter = if inside {
1012            BasicEventFilter::And(vec![
1013                BasicEventFilter::Timestamp(ValueOperator::Lte, end),
1014                BasicEventFilter::Timestamp(ValueOperator::Gte, start),
1015                BasicEventFilter::Root,
1016            ])
1017        } else {
1018            BasicEventFilter::And(vec![
1019                BasicEventFilter::Or(vec![
1020                    BasicEventFilter::Timestamp(ValueOperator::Gt, end),
1021                    BasicEventFilter::Timestamp(ValueOperator::Lt, start),
1022                ]),
1023                BasicEventFilter::Root,
1024            ])
1025        };
1026
1027        let indexed_filter =
1028            IndexedEventFilter::build(Some(filter), &self.event_indexes, &self.storage);
1029        let iter = IndexedEventFilterIterator::new_internal(indexed_filter, self);
1030
1031        iter.collect()
1032    }
1033
1034    fn remove_spans_bookeeping(&mut self, spans: &[SpanKey]) {
1035        self.span_indexes.remove_spans(spans);
1036        self.span_event_indexes.remove_spans(spans);
1037        self.event_indexes.remove_spans(spans);
1038    }
1039
1040    fn remove_span_events_bookeeping(&mut self, span_events: &[SpanEventKey]) {
1041        self.span_event_indexes.remove_span_events(span_events);
1042    }
1043
1044    fn remove_events_bookeeping(&mut self, events: &[EventKey]) {
1045        self.event_indexes.remove_events(events);
1046    }
1047
1048    #[instrument(level = tracing::Level::TRACE, skip_all)]
1049    pub fn copy_dataset(&self, target_storage: &mut dyn Storage) -> Result<(), AnyError> {
1050        let resources = self
1051            .storage
1052            .get_all_resources()
1053            .context("failed to get resources")?
1054            .collect::<Vec<_>>();
1055
1056        for resource in resources {
1057            let resource = resource.context("failed to get resource")?;
1058            target_storage
1059                .insert_resource((*resource).clone())
1060                .context("failed to insert resource")?;
1061        }
1062
1063        let spans = self
1064            .storage
1065            .get_all_spans()
1066            .context("failed to get spans")?
1067            .collect::<Vec<_>>();
1068
1069        for span in spans {
1070            let span = span.context("failed to get span")?;
1071            target_storage
1072                .insert_span((*span).clone())
1073                .context("failed to insert span")?;
1074        }
1075
1076        let span_events = self
1077            .storage
1078            .get_all_span_events()
1079            .context("failed to get span events")?
1080            .collect::<Vec<_>>();
1081
1082        for span_event in span_events {
1083            let span_event = span_event.context("failed to get span event")?;
1084            target_storage
1085                .insert_span_event((*span_event).clone())
1086                .context("failed to insert span event")?;
1087        }
1088
1089        let events = self
1090            .storage
1091            .get_all_events()
1092            .context("failed to get events")?
1093            .collect::<Vec<_>>();
1094
1095        for event in events {
1096            let event = event.context("failed to get event")?;
1097            target_storage
1098                .insert_event((*event).clone())
1099                .context("failed to insert event")?;
1100        }
1101
1102        Ok(())
1103    }
1104
1105    #[instrument(level = tracing::Level::TRACE, skip_all)]
1106    pub fn subscribe_to_spans(
1107        &mut self,
1108        filter: Vec<FilterPredicate>,
1109    ) -> Result<Subscriber<ComposedSpan>, AnyError> {
1110        let mut filter = BasicSpanFilter::And(
1111            filter
1112                .into_iter()
1113                .map(|p| BasicSpanFilter::from_predicate(p, &self.span_indexes.ids))
1114                .collect::<Result<_, _>>()
1115                .context("invalid span filter")?,
1116        );
1117        filter.simplify();
1118
1119        let id = self.next_subscriber_id;
1120        self.next_subscriber_id += 1;
1121
1122        let (sender, receiver) = mpsc::unbounded_channel();
1123
1124        self.span_subscribers
1125            .insert(id, SpanSubscription::new(filter, sender));
1126
1127        Ok((id, receiver))
1128    }
1129
1130    #[instrument(level = tracing::Level::TRACE, skip_all)]
1131    pub fn unsubscribe_from_spans(&mut self, id: SubscriptionId) {
1132        self.span_subscribers.remove(&id);
1133    }
1134
1135    #[instrument(level = tracing::Level::TRACE, skip_all)]
1136    pub fn subscribe_to_events(
1137        &mut self,
1138        filter: Vec<FilterPredicate>,
1139    ) -> Result<Subscriber<ComposedEvent>, AnyError> {
1140        let mut filter = BasicEventFilter::And(
1141            filter
1142                .into_iter()
1143                .map(|p| BasicEventFilter::from_predicate(p, &self.span_indexes.ids))
1144                .collect::<Result<_, _>>()
1145                .context("invalid event filter")?,
1146        );
1147        filter.simplify();
1148
1149        let id = self.next_subscriber_id;
1150        self.next_subscriber_id += 1;
1151
1152        let (sender, receiver) = mpsc::unbounded_channel();
1153
1154        self.event_subscribers
1155            .insert(id, EventSubscription::new(filter, sender));
1156
1157        Ok((id, receiver))
1158    }
1159
1160    #[instrument(level = tracing::Level::TRACE, skip_all)]
1161    pub fn unsubscribe_from_events(&mut self, id: SubscriptionId) {
1162        self.event_subscribers.remove(&id);
1163    }
1164
1165    #[instrument(level = tracing::Level::TRACE, skip_all)]
1166    pub fn shutdown(&mut self) -> Result<(), AnyError> {
1167        if let Some(s) = self.storage.as_index_storage_mut() {
1168            s.update_indexes(
1169                &self.span_indexes,
1170                &self.span_event_indexes,
1171                &self.event_indexes,
1172            )
1173            .context("failed to update indexes")?;
1174        }
1175
1176        Ok(())
1177    }
1178}
1179
1180fn get_unique_timestamp(mut timestamp: Timestamp, existing: &[Timestamp]) -> Timestamp {
1181    let mut idx = existing.lower_bound(&timestamp);
1182
1183    while idx < existing.len() && timestamp == existing[idx] {
1184        idx += 1;
1185        timestamp = timestamp.saturating_add(1);
1186    }
1187
1188    timestamp
1189}
1190
1191fn now() -> Timestamp {
1192    #[cfg(test)]
1193    return Timestamp::new(1000).unwrap();
1194
1195    #[cfg(not(test))]
1196    {
1197        use std::time::{SystemTime, UNIX_EPOCH};
1198        let timestamp = SystemTime::now()
1199            .duration_since(UNIX_EPOCH)
1200            .expect("now should not be before the UNIX epoch")
1201            .as_micros();
1202
1203        let timestamp = u64::try_from(timestamp)
1204            .expect("microseconds shouldn't exceed a u64 until the year 586,912 AD");
1205
1206        Timestamp::new(timestamp).expect("now should not be at the UNIX epoch")
1207    }
1208}
1209
1210#[cfg(test)]
1211mod tests {
1212    use crate::filter::Order;
1213    use crate::models::{
1214        Level, NewCloseSpanEvent, NewCreateSpanEvent, NewUpdateSpanEvent, SourceKind,
1215    };
1216    use crate::storage::TransientStorage;
1217    use crate::Value;
1218
1219    use super::*;
1220
1221    #[test]
1222    fn test_event_filters() {
1223        let mut engine = SyncEngine::new(TransientStorage::new()).unwrap();
1224
1225        let resource_key = engine
1226            .insert_resource(NewResource {
1227                attributes: BTreeMap::new(),
1228            })
1229            .unwrap();
1230
1231        let simple = |id: u64, level: i32, attribute1: &str, attribute2: &str| -> NewEvent {
1232            NewEvent {
1233                kind: SourceKind::Tracing,
1234                resource_key,
1235                timestamp: id.try_into().unwrap(),
1236                span_id: None,
1237                content: Value::Str("event".to_owned()),
1238                namespace: Some("crate::storage::tests".to_owned()),
1239                function: Some("test".to_owned()),
1240                level: Level::from_tracing_level(level).unwrap(),
1241                file_name: None,
1242                file_line: None,
1243                file_column: None,
1244                attributes: BTreeMap::from_iter([
1245                    ("attribute1".to_owned(), Value::Str(attribute1.to_owned())),
1246                    ("attribute2".to_owned(), Value::Str(attribute2.to_owned())),
1247                ]),
1248            }
1249        };
1250
1251        engine.insert_event(simple(1, 4, "test", "A")).unwrap(); // excluded by timestamp
1252        engine.insert_event(simple(2, 1, "test", "A")).unwrap(); // excluded by level
1253        engine.insert_event(simple(3, 2, "test", "A")).unwrap(); // excluded by level
1254        engine.insert_event(simple(4, 3, "test", "A")).unwrap();
1255        engine.insert_event(simple(5, 4, "test", "A")).unwrap();
1256        engine.insert_event(simple(6, 4, "blah", "A")).unwrap(); // excluded by "blah"
1257        engine.insert_event(simple(7, 4, "test", "B")).unwrap(); // excluded by "B"
1258        engine.insert_event(simple(8, 4, "test", "C")).unwrap(); // excluded by "C"
1259        engine.insert_event(simple(9, 4, "test", "A")).unwrap(); // excluded by timestamp
1260
1261        let events = engine.query_event(Query {
1262            filter: FilterPredicate::parse(
1263                "#level: >=WARN @\"attribute1\": test @\"attribute2\": A",
1264            )
1265            .unwrap(),
1266            order: Order::Asc,
1267            limit: 3,
1268            start: Timestamp::new(2).unwrap(),
1269            end: Timestamp::new(8).unwrap(),
1270            previous: None,
1271        });
1272
1273        assert_eq!(events.len(), 2);
1274        assert_eq!(events[0].timestamp, Timestamp::new(4).unwrap());
1275        assert_eq!(events[1].timestamp, Timestamp::new(5).unwrap());
1276    }
1277
1278    #[test]
1279    fn test_span_filters() {
1280        let mut engine = SyncEngine::new(TransientStorage::new()).unwrap();
1281
1282        let resource_key = engine
1283            .insert_resource(NewResource {
1284                attributes: BTreeMap::new(),
1285            })
1286            .unwrap();
1287
1288        let simple_open =
1289            |open: u64, level: i32, attribute1: &str, attribute2: &str| -> NewSpanEvent {
1290                NewSpanEvent {
1291                    timestamp: Timestamp::new(open).unwrap(),
1292                    span_id: FullSpanId::Tracing(1.try_into().unwrap(), open),
1293                    kind: NewSpanEventKind::Create(NewCreateSpanEvent {
1294                        kind: SourceKind::Tracing,
1295                        resource_key,
1296                        parent_id: None,
1297                        name: "test".to_owned(),
1298                        namespace: Some("crate::storage::tests".to_owned()),
1299                        function: None,
1300                        level: Level::from_tracing_level(level).unwrap(),
1301                        file_name: None,
1302                        file_line: None,
1303                        file_column: None,
1304                        instrumentation_attributes: BTreeMap::default(),
1305                        attributes: BTreeMap::from_iter([
1306                            ("attribute1".to_owned(), Value::Str(attribute1.to_owned())),
1307                            ("attribute2".to_owned(), Value::Str(attribute2.to_owned())),
1308                        ]),
1309                    }),
1310                }
1311            };
1312
1313        let simple_close = |open: u64, close: u64| -> NewSpanEvent {
1314            NewSpanEvent {
1315                timestamp: Timestamp::new(close).unwrap(),
1316                span_id: FullSpanId::Tracing(1.try_into().unwrap(), open),
1317                kind: NewSpanEventKind::Close(NewCloseSpanEvent { busy: None }),
1318            }
1319        };
1320
1321        engine
1322            .insert_span_event(simple_open(1, 4, "test", "A"))
1323            .unwrap(); // excluded by timestamp
1324        engine.insert_span_event(simple_close(1, 2)).unwrap();
1325        engine
1326            .insert_span_event(simple_open(3, 1, "test", "A"))
1327            .unwrap(); // excluded by level
1328        engine.insert_span_event(simple_close(3, 6)).unwrap();
1329        engine
1330            .insert_span_event(simple_open(4, 2, "test", "A"))
1331            .unwrap(); // excluded by level
1332        engine.insert_span_event(simple_close(4, 7)).unwrap();
1333        engine
1334            .insert_span_event(simple_open(5, 3, "test", "A"))
1335            .unwrap();
1336        engine.insert_span_event(simple_close(5, 8)).unwrap();
1337        engine
1338            .insert_span_event(simple_open(9, 4, "test", "A"))
1339            .unwrap();
1340        engine
1341            .insert_span_event(simple_open(10, 4, "blah", "A"))
1342            .unwrap(); // excluded by "blah"
1343        engine
1344            .insert_span_event(simple_open(11, 4, "test", "B"))
1345            .unwrap(); // excluded by "B"
1346        engine
1347            .insert_span_event(simple_open(12, 4, "test", "C"))
1348            .unwrap(); // excluded by "C"
1349        engine
1350            .insert_span_event(simple_open(13, 4, "test", "A"))
1351            .unwrap(); // excluded by timestamp
1352
1353        let spans = engine.query_span(Query {
1354            filter: FilterPredicate::parse(
1355                "#level: >=WARN @\"attribute1\": test @\"attribute2\": A",
1356            )
1357            .unwrap(),
1358            order: Order::Asc,
1359            limit: 5,
1360            start: Timestamp::new(2).unwrap(),
1361            end: Timestamp::new(10).unwrap(),
1362            previous: None,
1363        });
1364
1365        assert_eq!(spans.len(), 2);
1366        assert_eq!(spans[0].created_at, Timestamp::new(5).unwrap());
1367        assert_eq!(spans[1].created_at, Timestamp::new(9).unwrap());
1368    }
1369
1370    #[test]
1371    fn event_found_with_resource_attribute() {
1372        let mut engine = SyncEngine::new(TransientStorage::new()).unwrap();
1373
1374        let resource_key = engine
1375            .insert_resource(NewResource {
1376                attributes: BTreeMap::from_iter([("attr1".to_owned(), Value::Str("A".to_owned()))]),
1377            })
1378            .unwrap();
1379
1380        let now = now();
1381        engine
1382            .insert_event(NewEvent {
1383                kind: SourceKind::Tracing,
1384                resource_key,
1385                timestamp: now.saturating_add(1),
1386                span_id: None,
1387                content: Value::Str("event".to_owned()),
1388                namespace: Some("crate::storage::tests".to_owned()),
1389                function: Some("test".to_owned()),
1390                level: Level::Error,
1391                file_name: None,
1392                file_line: None,
1393                file_column: None,
1394                attributes: BTreeMap::new(),
1395            })
1396            .unwrap();
1397
1398        let events = engine.query_event(Query {
1399            filter: FilterPredicate::parse("@\"attr1\": A").unwrap(),
1400            order: Order::Asc,
1401            limit: 5,
1402            start: now,
1403            end: now.saturating_add(2),
1404            previous: None,
1405        });
1406
1407        assert_eq!(events.len(), 1);
1408
1409        let events = engine.query_event(Query {
1410            filter: FilterPredicate::parse("@\"attr1\": B").unwrap(),
1411            order: Order::Asc,
1412            limit: 5,
1413            start: now,
1414            end: now.saturating_add(2),
1415            previous: None,
1416        });
1417
1418        assert_eq!(events.len(), 0);
1419    }
1420
1421    #[test]
1422    fn event_found_with_inherent_attribute() {
1423        let mut engine = SyncEngine::new(TransientStorage::new()).unwrap();
1424
1425        let resource_key = engine
1426            .insert_resource(NewResource {
1427                attributes: BTreeMap::from_iter([("attr1".to_owned(), Value::Str("A".to_owned()))]),
1428            })
1429            .unwrap();
1430
1431        let now = now();
1432        engine
1433            .insert_event(NewEvent {
1434                kind: SourceKind::Tracing,
1435                resource_key,
1436                timestamp: now.saturating_add(1),
1437                span_id: None,
1438                content: Value::Str("event".to_owned()),
1439                namespace: Some("crate::storage::tests".to_owned()),
1440                function: Some("test".to_owned()),
1441                level: Level::Error,
1442                file_name: None,
1443                file_line: None,
1444                file_column: None,
1445                attributes: BTreeMap::from_iter([("attr1".to_owned(), Value::Str("B".to_owned()))]),
1446            })
1447            .unwrap();
1448
1449        let events = engine.query_event(Query {
1450            filter: FilterPredicate::parse("@\"attr1\": A").unwrap(),
1451            order: Order::Asc,
1452            limit: 5,
1453            start: now,
1454            end: now.saturating_add(2),
1455            previous: None,
1456        });
1457
1458        assert_eq!(events.len(), 0);
1459
1460        let events = engine.query_event(Query {
1461            filter: FilterPredicate::parse("@\"attr1\": B").unwrap(),
1462            order: Order::Asc,
1463            limit: 5,
1464            start: now,
1465            end: now.saturating_add(2),
1466            previous: None,
1467        });
1468
1469        assert_eq!(events.len(), 1);
1470    }
1471
1472    #[test]
1473    fn event_found_with_span_attribute() {
1474        let mut engine = SyncEngine::new(TransientStorage::new()).unwrap();
1475
1476        let resource_key = engine
1477            .insert_resource(NewResource {
1478                attributes: BTreeMap::from_iter([("attr1".to_owned(), Value::Str("A".to_owned()))]),
1479            })
1480            .unwrap();
1481
1482        engine
1483            .insert_span_event(NewSpanEvent {
1484                timestamp: now(),
1485                span_id: FullSpanId::Tracing(1.try_into().unwrap(), 1),
1486                kind: NewSpanEventKind::Create(NewCreateSpanEvent {
1487                    kind: SourceKind::Tracing,
1488                    resource_key,
1489                    parent_id: None,
1490                    name: "test".to_owned(),
1491                    namespace: Some("crate::storage::tests".to_owned()),
1492                    function: None,
1493                    level: Level::Error,
1494                    file_name: None,
1495                    file_line: None,
1496                    file_column: None,
1497                    instrumentation_attributes: BTreeMap::default(),
1498                    attributes: BTreeMap::from_iter([(
1499                        "attr1".to_owned(),
1500                        Value::Str("C".to_owned()),
1501                    )]),
1502                }),
1503            })
1504            .unwrap();
1505
1506        let now = now();
1507        engine
1508            .insert_event(NewEvent {
1509                kind: SourceKind::Tracing,
1510                resource_key,
1511                timestamp: now.saturating_add(1),
1512                span_id: Some(FullSpanId::Tracing(1.try_into().unwrap(), 1)),
1513                content: Value::Str("event".to_owned()),
1514                namespace: Some("crate::storage::tests".to_owned()),
1515                function: Some("test".to_owned()),
1516                level: Level::Error,
1517                file_name: None,
1518                file_line: None,
1519                file_column: None,
1520                attributes: BTreeMap::new(),
1521            })
1522            .unwrap();
1523
1524        let events = engine.query_event(Query {
1525            filter: FilterPredicate::parse("@\"attr1\": A").unwrap(),
1526            order: Order::Asc,
1527            limit: 5,
1528            start: now,
1529            end: now.saturating_add(2),
1530            previous: None,
1531        });
1532
1533        assert_eq!(events.len(), 0);
1534
1535        let events = engine.query_event(Query {
1536            filter: FilterPredicate::parse("@\"attr1\": C").unwrap(),
1537            order: Order::Asc,
1538            limit: 5,
1539            start: now,
1540            end: now.saturating_add(2),
1541            previous: None,
1542        });
1543
1544        assert_eq!(events.len(), 1);
1545    }
1546
1547    #[test]
1548    fn event_found_with_nonindexed_updated_span_attribute() {
1549        let mut engine = SyncEngine::new(TransientStorage::new()).unwrap();
1550
1551        let resource_key = engine
1552            .insert_resource(NewResource {
1553                attributes: BTreeMap::from_iter([("attr1".to_owned(), Value::Str("A".to_owned()))]),
1554            })
1555            .unwrap();
1556
1557        engine
1558            .insert_span_event(NewSpanEvent {
1559                timestamp: now(),
1560                span_id: FullSpanId::Tracing(1.try_into().unwrap(), 1),
1561                kind: NewSpanEventKind::Create(NewCreateSpanEvent {
1562                    kind: SourceKind::Tracing,
1563                    resource_key,
1564                    parent_id: None,
1565                    name: "test".to_owned(),
1566                    namespace: Some("crate::storage::tests".to_owned()),
1567                    function: None,
1568                    level: Level::Error,
1569                    file_name: None,
1570                    file_line: None,
1571                    file_column: None,
1572                    instrumentation_attributes: BTreeMap::default(),
1573                    attributes: BTreeMap::new(),
1574                }),
1575            })
1576            .unwrap();
1577
1578        let now = now();
1579        engine
1580            .insert_event(NewEvent {
1581                kind: SourceKind::Tracing,
1582                resource_key,
1583                timestamp: now.saturating_add(1),
1584                span_id: Some(FullSpanId::Tracing(1.try_into().unwrap(), 1)),
1585                content: Value::Str("event".to_owned()),
1586                namespace: Some("crate::storage::tests".to_owned()),
1587                function: Some("test".to_owned()),
1588                level: Level::Error,
1589                file_name: None,
1590                file_line: None,
1591                file_column: None,
1592                attributes: BTreeMap::new(),
1593            })
1594            .unwrap();
1595
1596        engine
1597            .insert_span_event(NewSpanEvent {
1598                timestamp: super::now(),
1599                span_id: FullSpanId::Tracing(1.try_into().unwrap(), 1),
1600                kind: NewSpanEventKind::Update(NewUpdateSpanEvent {
1601                    attributes: BTreeMap::from_iter([(
1602                        "attr1".to_owned(),
1603                        Value::Str("C".to_owned()),
1604                    )]),
1605                }),
1606            })
1607            .unwrap();
1608
1609        let events = engine.query_event(Query {
1610            filter: FilterPredicate::parse("@\"attr1\": A").unwrap(),
1611            order: Order::Asc,
1612            limit: 5,
1613            start: now,
1614            end: now.saturating_add(2),
1615            previous: None,
1616        });
1617
1618        assert_eq!(events.len(), 0);
1619
1620        let events = engine.query_event(Query {
1621            filter: FilterPredicate::parse("@\"attr1\": C").unwrap(),
1622            order: Order::Asc,
1623            limit: 5,
1624            start: now,
1625            end: now.saturating_add(2),
1626            previous: None,
1627        });
1628
1629        assert_eq!(events.len(), 1);
1630    }
1631
1632    #[test]
1633    fn event_found_with_indexed_updated_span_attribute() {
1634        let mut engine = SyncEngine::new(TransientStorage::new()).unwrap();
1635
1636        let resource_key = engine
1637            .insert_resource(NewResource {
1638                attributes: BTreeMap::from_iter([("attr1".to_owned(), Value::Str("A".to_owned()))]),
1639            })
1640            .unwrap();
1641
1642        engine
1643            .insert_span_event(NewSpanEvent {
1644                timestamp: now(),
1645                span_id: FullSpanId::Tracing(1.try_into().unwrap(), 1),
1646                kind: NewSpanEventKind::Create(NewCreateSpanEvent {
1647                    kind: SourceKind::Tracing,
1648                    resource_key,
1649                    parent_id: None,
1650                    name: "test".to_owned(),
1651                    namespace: Some("crate::storage::tests".to_owned()),
1652                    function: None,
1653                    level: Level::Error,
1654                    file_name: None,
1655                    file_line: None,
1656                    file_column: None,
1657                    instrumentation_attributes: BTreeMap::default(),
1658                    attributes: BTreeMap::new(),
1659                }),
1660            })
1661            .unwrap();
1662
1663        let now = now();
1664        engine
1665            .insert_event(NewEvent {
1666                kind: SourceKind::Tracing,
1667                resource_key,
1668                timestamp: now.saturating_add(1),
1669                span_id: Some(FullSpanId::Tracing(1.try_into().unwrap(), 1)),
1670                content: Value::Str("event".to_owned()),
1671                namespace: Some("crate::storage::tests".to_owned()),
1672                function: Some("test".to_owned()),
1673                level: Level::Error,
1674                file_name: None,
1675                file_line: None,
1676                file_column: None,
1677                attributes: BTreeMap::new(),
1678            })
1679            .unwrap();
1680
1681        engine
1682            .insert_span_event(NewSpanEvent {
1683                timestamp: super::now(),
1684                span_id: FullSpanId::Tracing(1.try_into().unwrap(), 1),
1685                kind: NewSpanEventKind::Update(NewUpdateSpanEvent {
1686                    attributes: BTreeMap::from_iter([(
1687                        "attr1".to_owned(),
1688                        Value::Str("C".to_owned()),
1689                    )]),
1690                }),
1691            })
1692            .unwrap();
1693
1694        let events = engine.query_event(Query {
1695            filter: FilterPredicate::parse("@\"attr1\": A").unwrap(),
1696            order: Order::Asc,
1697            limit: 5,
1698            start: now,
1699            end: now.saturating_add(2),
1700            previous: None,
1701        });
1702
1703        assert_eq!(events.len(), 0);
1704
1705        let events = engine.query_event(Query {
1706            filter: FilterPredicate::parse("@\"attr1\": C").unwrap(),
1707            order: Order::Asc,
1708            limit: 5,
1709            start: now,
1710            end: now.saturating_add(2),
1711            previous: None,
1712        });
1713
1714        assert_eq!(events.len(), 1);
1715    }
1716
1717    #[test]
1718    fn empty_filter_returns_all_event_counts() {
1719        let mut engine = SyncEngine::new(TransientStorage::new()).unwrap();
1720
1721        let resource_key = engine
1722            .insert_resource(NewResource {
1723                attributes: BTreeMap::from_iter([("attr1".to_owned(), Value::Str("A".to_owned()))]),
1724            })
1725            .unwrap();
1726
1727        let now = now();
1728        engine
1729            .insert_event(NewEvent {
1730                kind: SourceKind::Tracing,
1731                resource_key,
1732                timestamp: now.saturating_add(1),
1733                span_id: None,
1734                content: Value::Str("event".to_owned()),
1735                namespace: None,
1736                function: None,
1737                level: Level::Error,
1738                file_name: None,
1739                file_line: None,
1740                file_column: None,
1741                attributes: BTreeMap::new(),
1742            })
1743            .unwrap();
1744
1745        let query = Query {
1746            filter: FilterPredicate::parse("").unwrap(),
1747            order: Order::Asc,
1748            limit: 0,
1749            start: Timestamp::MIN,
1750            end: Timestamp::MAX,
1751            previous: None,
1752        };
1753
1754        assert_eq!(engine.query_event_count(query.clone()), 1)
1755    }
1756
1757    #[test]
1758    fn span_found_with_updated_attribute() {
1759        let mut engine = SyncEngine::new(TransientStorage::new()).unwrap();
1760
1761        let resource_key = engine
1762            .insert_resource(NewResource {
1763                attributes: BTreeMap::from_iter([("attr1".to_owned(), Value::Str("A".to_owned()))]),
1764            })
1765            .unwrap();
1766
1767        engine
1768            .insert_span_event(NewSpanEvent {
1769                timestamp: now(),
1770                span_id: FullSpanId::Tracing(1.try_into().unwrap(), 1),
1771                kind: NewSpanEventKind::Create(NewCreateSpanEvent {
1772                    kind: SourceKind::Tracing,
1773                    resource_key,
1774                    parent_id: None,
1775                    name: "test".to_owned(),
1776                    namespace: Some("crate::storage::tests".to_owned()),
1777                    function: None,
1778                    level: Level::Error,
1779                    file_name: None,
1780                    file_line: None,
1781                    file_column: None,
1782                    instrumentation_attributes: BTreeMap::default(),
1783                    attributes: BTreeMap::new(),
1784                }),
1785            })
1786            .unwrap();
1787
1788        let now = now();
1789        engine
1790            .insert_span_event(NewSpanEvent {
1791                timestamp: now,
1792                span_id: FullSpanId::Tracing(1.try_into().unwrap(), 1),
1793                kind: NewSpanEventKind::Update(NewUpdateSpanEvent {
1794                    attributes: BTreeMap::from_iter([(
1795                        "attr1".to_owned(),
1796                        Value::Str("C".to_owned()),
1797                    )]),
1798                }),
1799            })
1800            .unwrap();
1801
1802        let spans = engine.query_span(Query {
1803            filter: FilterPredicate::parse("@\"attr1\": C").unwrap(),
1804            order: Order::Asc,
1805            limit: 5,
1806            start: now,
1807            end: now.saturating_add(2),
1808            previous: None,
1809        });
1810
1811        assert_eq!(spans.len(), 1);
1812    }
1813
1814    #[test]
1815    fn span_found_with_post_parent_attribute() {
1816        let mut engine = SyncEngine::new(TransientStorage::new()).unwrap();
1817
1818        let resource_key = engine
1819            .insert_resource(NewResource {
1820                attributes: BTreeMap::from_iter([]),
1821            })
1822            .unwrap();
1823
1824        engine
1825            .insert_span_event(NewSpanEvent {
1826                timestamp: 1001.try_into().unwrap(),
1827                span_id: FullSpanId::Opentelemetry(1, 2),
1828                kind: NewSpanEventKind::Create(NewCreateSpanEvent {
1829                    kind: SourceKind::Opentelemetry,
1830                    resource_key,
1831                    parent_id: Some(FullSpanId::Opentelemetry(1, 1)),
1832                    name: "test".to_owned(),
1833                    namespace: Some("crate::storage::tests".to_owned()),
1834                    function: None,
1835                    level: Level::Error,
1836                    file_name: None,
1837                    file_line: None,
1838                    file_column: None,
1839                    instrumentation_attributes: BTreeMap::default(),
1840                    attributes: BTreeMap::new(),
1841                }),
1842            })
1843            .unwrap();
1844
1845        engine
1846            .insert_span_event(NewSpanEvent {
1847                timestamp: 1000.try_into().unwrap(),
1848                span_id: FullSpanId::Opentelemetry(1, 1),
1849                kind: NewSpanEventKind::Create(NewCreateSpanEvent {
1850                    kind: SourceKind::Opentelemetry,
1851                    resource_key,
1852                    parent_id: None,
1853                    name: "test".to_owned(),
1854                    namespace: Some("crate::storage::tests".to_owned()),
1855                    function: None,
1856                    level: Level::Error,
1857                    file_name: None,
1858                    file_line: None,
1859                    file_column: None,
1860                    instrumentation_attributes: BTreeMap::default(),
1861                    attributes: BTreeMap::from_iter([(
1862                        "attr1".to_owned(),
1863                        Value::Str("C".to_owned()),
1864                    )]),
1865                }),
1866            })
1867            .unwrap();
1868
1869        let spans = engine.query_span(Query {
1870            filter: FilterPredicate::parse("@\"attr1\": C").unwrap(),
1871            order: Order::Asc,
1872            limit: 5,
1873            start: now(),
1874            end: now().saturating_add(2),
1875            previous: None,
1876        });
1877
1878        assert_eq!(spans.len(), 2);
1879    }
1880}