Skip to main content

oxirs_stream/
time_travel.rs

1//! # Time-Travel Query System
2//!
3//! Advanced temporal query capabilities for OxiRS Stream, enabling querying
4//! data at any point in time, temporal analytics, and historical state reconstruction.
5
6use crate::event_sourcing::{EventStoreTrait, EventStream};
7use crate::StreamEvent;
8use anyhow::{anyhow, Result};
9use chrono::{DateTime, Duration as ChronoDuration, Utc};
10use serde::{Deserialize, Serialize};
11use std::collections::{BTreeMap, HashMap, HashSet};
12use std::sync::Arc;
13use std::time::{Duration, Instant};
14use tokio::sync::RwLock;
15use tracing::{debug, error, info, warn};
16use uuid::Uuid;
17
18/// Type alias for custom filter functions to reduce complexity
19pub type CustomFilterFn = Box<dyn Fn(&StreamEvent) -> bool + Send + Sync>;
20
21/// Time-travel query configuration
22#[derive(Debug, Clone, Serialize, Deserialize)]
23pub struct TimeTravelConfig {
24    /// Maximum time window for time-travel queries
25    pub max_time_window_days: u32,
26    /// Enable temporal indexing for faster queries
27    pub enable_temporal_indexing: bool,
28    /// Temporal index granularity (minutes)
29    pub index_granularity_minutes: u32,
30    /// Maximum concurrent time-travel queries
31    pub max_concurrent_queries: usize,
32    /// Query timeout in seconds
33    pub query_timeout_seconds: u64,
34    /// Enable result caching
35    pub enable_result_caching: bool,
36    /// Cache TTL in minutes
37    pub cache_ttl_minutes: u32,
38    /// Maximum cache size in MB
39    pub max_cache_size_mb: usize,
40}
41
42impl Default for TimeTravelConfig {
43    fn default() -> Self {
44        Self {
45            max_time_window_days: 365,
46            enable_temporal_indexing: true,
47            index_granularity_minutes: 60,
48            max_concurrent_queries: 100,
49            query_timeout_seconds: 300,
50            enable_result_caching: true,
51            cache_ttl_minutes: 60,
52            max_cache_size_mb: 1024,
53        }
54    }
55}
56
57/// Time point specification for queries
58#[derive(Debug, Clone, Serialize, Deserialize)]
59pub enum TimePoint {
60    /// Specific timestamp
61    Timestamp(DateTime<Utc>),
62    /// Relative time from now
63    RelativeTime(ChronoDuration),
64    /// Event version number
65    Version(u64),
66    /// Event ID
67    EventId(Uuid),
68    /// Named snapshot
69    Snapshot(String),
70}
71
72/// Time range specification for queries
73#[derive(Debug, Clone, Serialize, Deserialize)]
74pub struct TimeRange {
75    pub start: TimePoint,
76    pub end: TimePoint,
77}
78
79/// Temporal query specification
80#[derive(Debug, Clone)]
81pub struct TemporalQuery {
82    pub query_id: Uuid,
83    pub time_point: Option<TimePoint>,
84    pub time_range: Option<TimeRange>,
85    pub filter: TemporalFilter,
86    pub projection: TemporalProjection,
87    pub ordering: TemporalOrdering,
88    pub limit: Option<usize>,
89}
90
91impl Default for TemporalQuery {
92    fn default() -> Self {
93        Self::new()
94    }
95}
96
97impl TemporalQuery {
98    /// Create a new temporal query
99    pub fn new() -> Self {
100        Self {
101            query_id: Uuid::new_v4(),
102            time_point: None,
103            time_range: None,
104            filter: TemporalFilter::default(),
105            projection: TemporalProjection::default(),
106            ordering: TemporalOrdering::default(),
107            limit: None,
108        }
109    }
110
111    /// Query at specific time point
112    pub fn at_time(mut self, time_point: TimePoint) -> Self {
113        self.time_point = Some(time_point);
114        self
115    }
116
117    /// Query within time range
118    pub fn in_range(mut self, time_range: TimeRange) -> Self {
119        self.time_range = Some(time_range);
120        self
121    }
122
123    /// Add filter
124    pub fn filter(mut self, filter: TemporalFilter) -> Self {
125        self.filter = filter;
126        self
127    }
128
129    /// Set projection
130    pub fn project(mut self, projection: TemporalProjection) -> Self {
131        self.projection = projection;
132        self
133    }
134
135    /// Set ordering
136    pub fn order_by(mut self, ordering: TemporalOrdering) -> Self {
137        self.ordering = ordering;
138        self
139    }
140
141    /// Set limit
142    pub fn limit(mut self, limit: usize) -> Self {
143        self.limit = Some(limit);
144        self
145    }
146}
147
148/// Temporal filter for events
149#[derive(Default)]
150pub struct TemporalFilter {
151    pub event_types: Option<HashSet<String>>,
152    pub aggregate_ids: Option<HashSet<String>>,
153    pub user_ids: Option<HashSet<String>>,
154    pub sources: Option<HashSet<String>>,
155    pub custom_filters: Vec<CustomFilterFn>,
156}
157
158impl std::fmt::Debug for TemporalFilter {
159    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
160        f.debug_struct("TemporalFilter")
161            .field("event_types", &self.event_types)
162            .field("aggregate_ids", &self.aggregate_ids)
163            .field("user_ids", &self.user_ids)
164            .field("sources", &self.sources)
165            .field(
166                "custom_filters",
167                &format!("<{} filters>", self.custom_filters.len()),
168            )
169            .finish()
170    }
171}
172
173impl Clone for TemporalFilter {
174    fn clone(&self) -> Self {
175        Self {
176            event_types: self.event_types.clone(),
177            aggregate_ids: self.aggregate_ids.clone(),
178            user_ids: self.user_ids.clone(),
179            sources: self.sources.clone(),
180            custom_filters: Vec::new(), // Cannot clone function pointers
181        }
182    }
183}
184
185/// Temporal projection specification
186#[derive(Debug, Clone, Default)]
187pub enum TemporalProjection {
188    /// Return full events
189    #[default]
190    FullEvents,
191    /// Return only metadata
192    MetadataOnly,
193    /// Return specific fields
194    Fields(Vec<String>),
195    /// Return aggregated data
196    Aggregation(AggregationType),
197}
198
199/// Aggregation type for temporal queries
200#[derive(Debug, Clone)]
201pub enum AggregationType {
202    Count,
203    CountBy(String),
204    Timeline(ChronoDuration),
205    Statistics,
206}
207
208/// Temporal ordering specification
209#[derive(Debug, Clone, Default)]
210pub enum TemporalOrdering {
211    /// Order by timestamp ascending
212    TimeAscending,
213    /// Order by timestamp descending
214    #[default]
215    TimeDescending,
216    /// Order by version ascending
217    VersionAscending,
218    /// Order by version descending
219    VersionDescending,
220    /// Order by custom field
221    Custom(String, bool), // field, ascending
222}
223
224/// Result of a temporal query
225#[derive(Debug, Clone)]
226pub struct TemporalQueryResult {
227    pub query_id: Uuid,
228    pub events: Vec<StreamEvent>,
229    pub metadata: TemporalResultMetadata,
230    pub aggregations: Option<TemporalAggregations>,
231    pub execution_time: Duration,
232    pub from_cache: bool,
233}
234
235/// Metadata about temporal query results
236#[derive(Debug, Clone)]
237pub struct TemporalResultMetadata {
238    pub total_events: usize,
239    pub time_range_covered: Option<(DateTime<Utc>, DateTime<Utc>)>,
240    pub version_range_covered: Option<(u64, u64)>,
241    pub aggregates_scanned: HashSet<String>,
242    pub index_hits: usize,
243    pub index_misses: usize,
244}
245
246/// Aggregated data from temporal queries
247#[derive(Debug, Clone)]
248pub struct TemporalAggregations {
249    pub count: usize,
250    pub count_by_type: HashMap<String, usize>,
251    pub timeline: Vec<TimelinePoint>,
252    pub statistics: TemporalStatistics,
253}
254
255/// Point in timeline aggregation
256#[derive(Debug, Clone)]
257pub struct TimelinePoint {
258    pub timestamp: DateTime<Utc>,
259    pub count: usize,
260    pub event_types: HashMap<String, usize>,
261}
262
263/// Statistical data from temporal queries
264#[derive(Debug, Clone)]
265pub struct TemporalStatistics {
266    pub events_per_second: f64,
267    pub peak_throughput: f64,
268    pub average_event_size: f64,
269    pub unique_aggregates: usize,
270    pub unique_users: usize,
271    pub time_span: ChronoDuration,
272}
273
274/// Temporal index for efficient time-travel queries
275#[derive(Debug)]
276struct TemporalIndex {
277    /// Time-based index: timestamp -> event IDs
278    time_index: BTreeMap<DateTime<Utc>, Vec<Uuid>>,
279    /// Version-based index: version -> event metadata
280    version_index: BTreeMap<u64, EventIndexEntry>,
281    /// Aggregate-based index: aggregate_id -> time-ordered events
282    aggregate_index: HashMap<String, BTreeMap<DateTime<Utc>, Vec<Uuid>>>,
283    /// Type-based index: event_type -> time-ordered events
284    type_index: HashMap<String, BTreeMap<DateTime<Utc>, Vec<Uuid>>>,
285}
286
287#[derive(Debug, Clone)]
288struct EventIndexEntry {
289    pub event_id: Uuid,
290    pub timestamp: DateTime<Utc>,
291    pub aggregate_id: String,
292    pub event_type: String,
293    pub version: u64,
294}
295
296impl TemporalIndex {
297    fn new() -> Self {
298        Self {
299            time_index: BTreeMap::new(),
300            version_index: BTreeMap::new(),
301            aggregate_index: HashMap::new(),
302            type_index: HashMap::new(),
303        }
304    }
305
306    fn add_event(&mut self, event: &StreamEvent) {
307        let metadata = event.metadata();
308        let timestamp = metadata.timestamp;
309        let event_id = uuid::Uuid::parse_str(&metadata.event_id).unwrap_or(uuid::Uuid::new_v4());
310        let aggregate_id = metadata.context.clone().unwrap_or_default();
311        let event_type = format!("{event:?}");
312        let version = metadata.version.parse::<u64>().unwrap_or(0);
313
314        // Time index
315        self.time_index.entry(timestamp).or_default().push(event_id);
316
317        // Version index
318        self.version_index.insert(
319            version,
320            EventIndexEntry {
321                event_id,
322                timestamp,
323                aggregate_id: aggregate_id.clone(),
324                event_type: event_type.clone(),
325                version,
326            },
327        );
328
329        // Aggregate index
330        self.aggregate_index
331            .entry(aggregate_id)
332            .or_default()
333            .entry(timestamp)
334            .or_default()
335            .push(event_id);
336
337        // Type index
338        self.type_index
339            .entry(event_type)
340            .or_default()
341            .entry(timestamp)
342            .or_default()
343            .push(event_id);
344    }
345
346    fn find_events_by_time_range(&self, start: DateTime<Utc>, end: DateTime<Utc>) -> Vec<Uuid> {
347        let mut event_ids = Vec::new();
348
349        for (_, ids) in self.time_index.range(start..=end) {
350            event_ids.extend_from_slice(ids);
351        }
352
353        event_ids
354    }
355
356    fn find_events_by_version_range(&self, start: u64, end: u64) -> Vec<Uuid> {
357        let mut event_ids = Vec::new();
358
359        for (_, entry) in self.version_index.range(start..=end) {
360            event_ids.push(entry.event_id);
361        }
362
363        event_ids
364    }
365
366    fn find_events_by_aggregate(
367        &self,
368        aggregate_id: &str,
369        start: DateTime<Utc>,
370        end: DateTime<Utc>,
371    ) -> Vec<Uuid> {
372        if let Some(time_map) = self.aggregate_index.get(aggregate_id) {
373            let mut event_ids = Vec::new();
374            for (_, ids) in time_map.range(start..=end) {
375                event_ids.extend_from_slice(ids);
376            }
377            event_ids
378        } else {
379            Vec::new()
380        }
381    }
382}
383
384/// Time-travel query engine
385pub struct TimeTravelEngine {
386    config: TimeTravelConfig,
387    event_store: Arc<dyn EventStoreTrait>,
388    event_stream: Arc<dyn EventStream>,
389    temporal_index: Arc<RwLock<TemporalIndex>>,
390    query_cache: Arc<RwLock<QueryCache>>,
391    query_semaphore: Arc<tokio::sync::Semaphore>,
392    metrics: Arc<RwLock<TimeTravelMetrics>>,
393}
394
395impl TimeTravelEngine {
396    /// Create a new time-travel engine
397    pub fn new(
398        config: TimeTravelConfig,
399        event_store: Arc<dyn EventStoreTrait>,
400        event_stream: Arc<dyn EventStream>,
401    ) -> Self {
402        Self {
403            query_semaphore: Arc::new(tokio::sync::Semaphore::new(config.max_concurrent_queries)),
404            temporal_index: Arc::new(RwLock::new(TemporalIndex::new())),
405            query_cache: Arc::new(RwLock::new(QueryCache::new(config.clone()))),
406            config,
407            event_store,
408            event_stream,
409            metrics: Arc::new(RwLock::new(TimeTravelMetrics::default())),
410        }
411    }
412
413    /// Start the time-travel engine
414    pub async fn start(&self) -> Result<()> {
415        info!("Starting time-travel engine");
416
417        // Build initial index if enabled
418        if self.config.enable_temporal_indexing {
419            self.build_temporal_index().await?;
420        }
421
422        // Start index maintenance task
423        let index = Arc::clone(&self.temporal_index);
424        let event_stream = Arc::clone(&self.event_stream);
425
426        tokio::spawn(async move {
427            let mut interval = tokio::time::interval(Duration::from_secs(60));
428            loop {
429                interval.tick().await;
430                if let Err(e) =
431                    Self::update_index(Arc::clone(&index), Arc::clone(&event_stream)).await
432                {
433                    error!("Failed to update temporal index: {}", e);
434                }
435            }
436        });
437
438        info!("Time-travel engine started successfully");
439        Ok(())
440    }
441
442    /// Execute a temporal query
443    pub async fn execute_query(&self, query: TemporalQuery) -> Result<TemporalQueryResult> {
444        let start_time = Instant::now();
445        let query_id = query.query_id;
446
447        debug!("Executing temporal query {}", query_id);
448
449        // Acquire semaphore for concurrency control
450        let _permit = self.query_semaphore.acquire().await?;
451
452        // Update metrics
453        {
454            let mut metrics = self.metrics.write().await;
455            metrics.queries_executed += 1;
456            metrics.active_queries += 1;
457        }
458
459        // Check cache first
460        let cache_key = self.generate_cache_key(&query);
461        if self.config.enable_result_caching {
462            let cache = self.query_cache.read().await;
463            if let Some(cached_result) = cache.get(&cache_key) {
464                let mut metrics = self.metrics.write().await;
465                metrics.active_queries -= 1;
466                metrics.cache_hits += 1;
467
468                return Ok(TemporalQueryResult {
469                    query_id,
470                    events: cached_result.events,
471                    metadata: cached_result.metadata,
472                    aggregations: cached_result.aggregations,
473                    execution_time: start_time.elapsed(),
474                    from_cache: true,
475                });
476            }
477        }
478
479        let result = self.execute_query_internal(query).await;
480
481        // Update metrics
482        {
483            let mut metrics = self.metrics.write().await;
484            metrics.active_queries -= 1;
485            match &result {
486                Ok(_) => {
487                    metrics.queries_succeeded += 1;
488                    if !self.config.enable_result_caching {
489                        metrics.cache_misses += 1;
490                    }
491                }
492                Err(_) => metrics.queries_failed += 1,
493            }
494        }
495
496        let execution_time = start_time.elapsed();
497        debug!(
498            "Temporal query {} executed in {:?}",
499            query_id, execution_time
500        );
501
502        if let Ok(ref res) = result {
503            // Cache result if applicable
504            if self.config.enable_result_caching {
505                let mut cache = self.query_cache.write().await;
506                cache.set(cache_key, res.clone());
507            }
508        }
509
510        result.map(|mut r| {
511            r.execution_time = execution_time;
512            r.from_cache = false;
513            r
514        })
515    }
516
517    /// Execute query internally
518    async fn execute_query_internal(&self, query: TemporalQuery) -> Result<TemporalQueryResult> {
519        let query_id = query.query_id;
520
521        // Resolve time points to actual timestamps
522        let (start_time, end_time) = self.resolve_time_range(&query).await?;
523
524        // Find candidate events
525        let candidate_event_ids = if self.config.enable_temporal_indexing {
526            self.find_events_with_index(&query, start_time, end_time)
527                .await?
528        } else {
529            self.find_events_without_index(&query, start_time, end_time)
530                .await?
531        };
532
533        // Load full events
534        let mut events = Vec::new();
535        for event_id in candidate_event_ids {
536            if let Some(event) = self.load_event(event_id).await? {
537                if self.matches_filter(&event, &query.filter) {
538                    events.push(event);
539                }
540            }
541        }
542
543        // Apply ordering
544        self.apply_ordering(&mut events, &query.ordering);
545
546        // Apply limit
547        if let Some(limit) = query.limit {
548            events.truncate(limit);
549        }
550
551        // Generate metadata
552        let metadata = self.generate_result_metadata(&events, start_time, end_time);
553
554        // Generate aggregations if requested
555        let aggregations = match query.projection {
556            TemporalProjection::Aggregation(ref agg_type) => {
557                Some(self.generate_aggregations(&events, agg_type, start_time, end_time)?)
558            }
559            _ => None,
560        };
561
562        // Apply projection
563        let projected_events = self.apply_projection(events, &query.projection);
564
565        Ok(TemporalQueryResult {
566            query_id,
567            events: projected_events,
568            metadata,
569            aggregations,
570            execution_time: Duration::default(), // Will be set by caller
571            from_cache: false,
572        })
573    }
574
575    /// Query state at specific time point
576    pub async fn query_state_at_time(
577        &self,
578        aggregate_id: &str,
579        time_point: TimePoint,
580    ) -> Result<Vec<StreamEvent>> {
581        let query = TemporalQuery::new()
582            .at_time(time_point)
583            .filter(TemporalFilter {
584                aggregate_ids: Some(std::iter::once(aggregate_id.to_string()).collect()),
585                ..Default::default()
586            });
587
588        let result = self.execute_query(query).await?;
589        Ok(result.events)
590    }
591
592    /// Query changes between two time points
593    pub async fn query_changes_between(
594        &self,
595        start: TimePoint,
596        end: TimePoint,
597        filter: Option<TemporalFilter>,
598    ) -> Result<Vec<StreamEvent>> {
599        let query = TemporalQuery::new()
600            .in_range(TimeRange { start, end })
601            .filter(filter.unwrap_or_default());
602
603        let result = self.execute_query(query).await?;
604        Ok(result.events)
605    }
606
607    /// Query timeline aggregation
608    pub async fn query_timeline(
609        &self,
610        time_range: TimeRange,
611        granularity: ChronoDuration,
612        filter: Option<TemporalFilter>,
613    ) -> Result<Vec<TimelinePoint>> {
614        let query = TemporalQuery::new()
615            .in_range(time_range)
616            .filter(filter.unwrap_or_default())
617            .project(TemporalProjection::Aggregation(AggregationType::Timeline(
618                granularity,
619            )));
620
621        let result = self.execute_query(query).await?;
622        Ok(result.aggregations.map(|a| a.timeline).unwrap_or_default())
623    }
624
625    /// Build temporal index from existing events
626    async fn build_temporal_index(&self) -> Result<()> {
627        info!("Building temporal index");
628
629        let events = self
630            .event_stream
631            .read_events_from_position(0, usize::MAX)
632            .await?;
633        let mut index = self.temporal_index.write().await;
634
635        for stored_event in events {
636            index.add_event(&stored_event.event_data);
637        }
638
639        info!(
640            "Temporal index built with {} events",
641            index.time_index.len()
642        );
643        Ok(())
644    }
645
646    /// Update index with new events
647    async fn update_index(
648        index: Arc<RwLock<TemporalIndex>>,
649        event_stream: Arc<dyn EventStream>,
650    ) -> Result<()> {
651        // This would typically track the last processed position
652        // For simplicity, we'll just rebuild periodically
653        let events = event_stream.read_events_from_position(0, 10000).await?;
654        let mut idx = index.write().await;
655
656        for stored_event in events {
657            idx.add_event(&stored_event.event_data);
658        }
659
660        Ok(())
661    }
662
663    /// Resolve time range from query specification
664    async fn resolve_time_range(
665        &self,
666        query: &TemporalQuery,
667    ) -> Result<(DateTime<Utc>, DateTime<Utc>)> {
668        let now = Utc::now();
669
670        match (&query.time_point, &query.time_range) {
671            (Some(time_point), None) => {
672                let timestamp = self.resolve_time_point(time_point).await?;
673                Ok((timestamp, timestamp))
674            }
675            (None, Some(time_range)) => {
676                let start = self.resolve_time_point(&time_range.start).await?;
677                let end = self.resolve_time_point(&time_range.end).await?;
678                Ok((start, end))
679            }
680            (None, None) => {
681                // Default to last 24 hours
682                let start = now - ChronoDuration::hours(24);
683                Ok((start, now))
684            }
685            (Some(_), Some(_)) => Err(anyhow!("Cannot specify both time_point and time_range")),
686        }
687    }
688
689    /// Resolve a time point to an actual timestamp
690    async fn resolve_time_point(&self, time_point: &TimePoint) -> Result<DateTime<Utc>> {
691        match time_point {
692            TimePoint::Timestamp(timestamp) => Ok(*timestamp),
693            TimePoint::RelativeTime(duration) => Ok(Utc::now() + *duration),
694            TimePoint::Version(version) => {
695                // Find timestamp for this version
696                let index = self.temporal_index.read().await;
697                if let Some(entry) = index.version_index.get(version) {
698                    Ok(entry.timestamp)
699                } else {
700                    Err(anyhow!("Version {} not found", version))
701                }
702            }
703            TimePoint::EventId(event_id) => {
704                // Find timestamp for this event ID
705                if let Some(event) = self.load_event(*event_id).await? {
706                    Ok(event.metadata().timestamp)
707                } else {
708                    Err(anyhow!("Event {} not found", event_id))
709                }
710            }
711            TimePoint::Snapshot(name) => {
712                // This would integrate with snapshot store
713                Err(anyhow!("Snapshot resolution not implemented: {}", name))
714            }
715        }
716    }
717
718    /// Find events using temporal index
719    async fn find_events_with_index(
720        &self,
721        query: &TemporalQuery,
722        start_time: DateTime<Utc>,
723        end_time: DateTime<Utc>,
724    ) -> Result<Vec<Uuid>> {
725        let index = self.temporal_index.read().await;
726
727        // Use most specific index available
728        if let Some(ref aggregate_ids) = query.filter.aggregate_ids {
729            if aggregate_ids.len() == 1 {
730                let aggregate_id = aggregate_ids
731                    .iter()
732                    .next()
733                    .expect("aggregate_ids validated to have exactly 1 element");
734                return Ok(index.find_events_by_aggregate(aggregate_id, start_time, end_time));
735            }
736        }
737
738        Ok(index.find_events_by_time_range(start_time, end_time))
739    }
740
741    /// Find events without using index (sequential scan)
742    async fn find_events_without_index(
743        &self,
744        _query: &TemporalQuery,
745        _start_time: DateTime<Utc>,
746        _end_time: DateTime<Utc>,
747    ) -> Result<Vec<Uuid>> {
748        // This would scan all events in the time range
749        // For now, return empty set as this requires event store iteration
750        warn!("Sequential scan not implemented, returning empty result");
751        Ok(Vec::new())
752    }
753
754    /// Load a specific event by ID
755    async fn load_event(&self, _event_id: Uuid) -> Result<Option<StreamEvent>> {
756        // This would load from event store by ID
757        // For now, return None as this requires event store lookup by ID
758        Ok(None)
759    }
760
761    /// Check if event matches filter
762    fn matches_filter(&self, event: &StreamEvent, filter: &TemporalFilter) -> bool {
763        let metadata = event.metadata();
764        let event_type_str = format!("{event:?}");
765
766        if let Some(ref event_types) = filter.event_types {
767            if !event_types.contains(&event_type_str) {
768                return false;
769            }
770        }
771
772        if let Some(ref aggregate_ids) = filter.aggregate_ids {
773            if let Some(ref context) = metadata.context {
774                if !aggregate_ids.contains(context) {
775                    return false;
776                }
777            } else {
778                return false;
779            }
780        }
781
782        if let Some(ref user_ids) = filter.user_ids {
783            if let Some(ref user) = metadata.user {
784                if !user_ids.contains(user) {
785                    return false;
786                }
787            } else {
788                return false;
789            }
790        }
791
792        if let Some(ref sources) = filter.sources {
793            if !sources.contains(&metadata.source) {
794                return false;
795            }
796        }
797
798        // Apply custom filters
799        for custom_filter in &filter.custom_filters {
800            if !custom_filter(event) {
801                return false;
802            }
803        }
804
805        true
806    }
807
808    /// Apply ordering to events
809    fn apply_ordering(&self, events: &mut [StreamEvent], ordering: &TemporalOrdering) {
810        match ordering {
811            TemporalOrdering::TimeAscending => {
812                events.sort_by(|a, b| a.metadata().timestamp.cmp(&b.metadata().timestamp));
813            }
814            TemporalOrdering::TimeDescending => {
815                events.sort_by(|a, b| b.metadata().timestamp.cmp(&a.metadata().timestamp));
816            }
817            TemporalOrdering::VersionAscending => {
818                events.sort_by(|a, b| a.metadata().version.cmp(&b.metadata().version));
819            }
820            TemporalOrdering::VersionDescending => {
821                events.sort_by(|a, b| b.metadata().version.cmp(&a.metadata().version));
822            }
823            TemporalOrdering::Custom(_field, _ascending) => {
824                // Custom field ordering would be implemented here
825                warn!("Custom ordering not implemented");
826            }
827        }
828    }
829
830    /// Apply projection to events
831    fn apply_projection(
832        &self,
833        events: Vec<StreamEvent>,
834        projection: &TemporalProjection,
835    ) -> Vec<StreamEvent> {
836        match projection {
837            TemporalProjection::FullEvents => events,
838            TemporalProjection::MetadataOnly => {
839                // Return events with only metadata (simplified data)
840                // For metadata-only projection, we keep the event but could filter data in a real implementation
841                events
842            }
843            TemporalProjection::Fields(_fields) => {
844                // Field projection would be implemented here
845                warn!("Field projection not implemented");
846                events
847            }
848            TemporalProjection::Aggregation(_) => {
849                // Aggregation results are handled separately
850                Vec::new()
851            }
852        }
853    }
854
855    /// Generate result metadata
856    fn generate_result_metadata(
857        &self,
858        events: &[StreamEvent],
859        _start_time: DateTime<Utc>,
860        _end_time: DateTime<Utc>,
861    ) -> TemporalResultMetadata {
862        let total_events = events.len();
863
864        let time_range_covered = if !events.is_empty() {
865            let min_time = events
866                .iter()
867                .map(|e| e.metadata().timestamp)
868                .min()
869                .expect("events validated to be non-empty");
870            let max_time = events
871                .iter()
872                .map(|e| e.metadata().timestamp)
873                .max()
874                .expect("events validated to be non-empty");
875            Some((min_time, max_time))
876        } else {
877            None
878        };
879
880        let version_range_covered = if !events.is_empty() {
881            let min_version = events
882                .iter()
883                .filter_map(|e| e.metadata().version.parse::<u64>().ok())
884                .min();
885            let max_version = events
886                .iter()
887                .filter_map(|e| e.metadata().version.parse::<u64>().ok())
888                .max();
889            if let (Some(min), Some(max)) = (min_version, max_version) {
890                Some((min, max))
891            } else {
892                None
893            }
894        } else {
895            None
896        };
897
898        let aggregates_scanned: HashSet<String> = events
899            .iter()
900            .filter_map(|e| e.metadata().context.clone())
901            .collect();
902
903        TemporalResultMetadata {
904            total_events,
905            time_range_covered,
906            version_range_covered,
907            aggregates_scanned,
908            index_hits: 0, // Would be tracked during execution
909            index_misses: 0,
910        }
911    }
912
913    /// Generate aggregations
914    fn generate_aggregations(
915        &self,
916        events: &[StreamEvent],
917        agg_type: &AggregationType,
918        start_time: DateTime<Utc>,
919        end_time: DateTime<Utc>,
920    ) -> Result<TemporalAggregations> {
921        match agg_type {
922            AggregationType::Count => Ok(TemporalAggregations {
923                count: events.len(),
924                count_by_type: HashMap::new(),
925                timeline: Vec::new(),
926                statistics: self.calculate_statistics(events, start_time, end_time),
927            }),
928            AggregationType::CountBy(field) => {
929                let mut count_by_type = HashMap::new();
930                for event in events {
931                    if field == "event_type" {
932                        let event_type = format!("{event:?}");
933                        *count_by_type.entry(event_type).or_insert(0) += 1;
934                    }
935                    // Other fields would be handled here
936                }
937
938                Ok(TemporalAggregations {
939                    count: events.len(),
940                    count_by_type,
941                    timeline: Vec::new(),
942                    statistics: self.calculate_statistics(events, start_time, end_time),
943                })
944            }
945            AggregationType::Timeline(granularity) => {
946                let timeline = self.generate_timeline(events, *granularity, start_time, end_time);
947
948                Ok(TemporalAggregations {
949                    count: events.len(),
950                    count_by_type: HashMap::new(),
951                    timeline,
952                    statistics: self.calculate_statistics(events, start_time, end_time),
953                })
954            }
955            AggregationType::Statistics => Ok(TemporalAggregations {
956                count: events.len(),
957                count_by_type: HashMap::new(),
958                timeline: Vec::new(),
959                statistics: self.calculate_statistics(events, start_time, end_time),
960            }),
961        }
962    }
963
964    /// Generate timeline aggregation
965    fn generate_timeline(
966        &self,
967        events: &[StreamEvent],
968        granularity: ChronoDuration,
969        start_time: DateTime<Utc>,
970        end_time: DateTime<Utc>,
971    ) -> Vec<TimelinePoint> {
972        let mut timeline = Vec::new();
973        let mut current_time = start_time;
974
975        while current_time < end_time {
976            let window_end = current_time + granularity;
977
978            let events_in_window: Vec<_> = events
979                .iter()
980                .filter(|e| {
981                    e.metadata().timestamp >= current_time && e.metadata().timestamp < window_end
982                })
983                .collect();
984
985            let mut event_types = HashMap::new();
986            for event in &events_in_window {
987                let event_type = format!("{event:?}");
988                *event_types.entry(event_type).or_insert(0) += 1;
989            }
990
991            timeline.push(TimelinePoint {
992                timestamp: current_time,
993                count: events_in_window.len(),
994                event_types,
995            });
996
997            current_time = window_end;
998        }
999
1000        timeline
1001    }
1002
1003    /// Calculate temporal statistics
1004    fn calculate_statistics(
1005        &self,
1006        events: &[StreamEvent],
1007        start_time: DateTime<Utc>,
1008        end_time: DateTime<Utc>,
1009    ) -> TemporalStatistics {
1010        let time_span = end_time.signed_duration_since(start_time);
1011        let events_per_second = if time_span.num_seconds() > 0 {
1012            events.len() as f64 / time_span.num_seconds() as f64
1013        } else {
1014            0.0
1015        };
1016
1017        // Calculate peak throughput (events per second in busiest minute)
1018        let peak_throughput = if !events.is_empty() {
1019            let mut minute_counts = HashMap::new();
1020            for event in events {
1021                let minute = event
1022                    .metadata()
1023                    .timestamp
1024                    .format("%Y-%m-%d %H:%M")
1025                    .to_string();
1026                *minute_counts.entry(minute).or_insert(0) += 1;
1027            }
1028            minute_counts.values().max().copied().unwrap_or(0) as f64
1029        } else {
1030            0.0
1031        };
1032
1033        // Calculate average event size
1034        let total_size: usize = events.iter().map(|e| format!("{e:?}").len()).sum();
1035        let average_event_size = if !events.is_empty() {
1036            total_size as f64 / events.len() as f64
1037        } else {
1038            0.0
1039        };
1040
1041        let unique_aggregates = events
1042            .iter()
1043            .filter_map(|e| e.metadata().context.as_ref())
1044            .collect::<HashSet<_>>()
1045            .len();
1046
1047        let unique_users = events
1048            .iter()
1049            .filter_map(|e| e.metadata().user.as_ref())
1050            .collect::<HashSet<_>>()
1051            .len();
1052
1053        TemporalStatistics {
1054            events_per_second,
1055            peak_throughput,
1056            average_event_size,
1057            unique_aggregates,
1058            unique_users,
1059            time_span,
1060        }
1061    }
1062
1063    /// Generate cache key for query
1064    fn generate_cache_key(&self, query: &TemporalQuery) -> String {
1065        // Simple cache key based on query structure
1066        format!("temporal_query_{:?}", query.query_id)
1067    }
1068
1069    /// Get time-travel metrics
1070    pub async fn get_metrics(&self) -> TimeTravelMetrics {
1071        self.metrics.read().await.clone()
1072    }
1073}
1074
1075/// Query cache for temporal queries
1076#[derive(Debug)]
1077struct QueryCache {
1078    config: TimeTravelConfig,
1079    entries: HashMap<String, CachedResult>,
1080}
1081
1082#[derive(Debug, Clone)]
1083struct CachedResult {
1084    events: Vec<StreamEvent>,
1085    metadata: TemporalResultMetadata,
1086    aggregations: Option<TemporalAggregations>,
1087    cached_at: DateTime<Utc>,
1088}
1089
1090impl QueryCache {
1091    fn new(config: TimeTravelConfig) -> Self {
1092        Self {
1093            config,
1094            entries: HashMap::new(),
1095        }
1096    }
1097
1098    fn get(&self, key: &str) -> Option<CachedResult> {
1099        if let Some(entry) = self.entries.get(key) {
1100            let age = Utc::now().signed_duration_since(entry.cached_at);
1101            if age.num_minutes() < self.config.cache_ttl_minutes as i64 {
1102                return Some(entry.clone());
1103            }
1104        }
1105        None
1106    }
1107
1108    fn set(&mut self, key: String, result: TemporalQueryResult) {
1109        let entry = CachedResult {
1110            events: result.events,
1111            metadata: result.metadata,
1112            aggregations: result.aggregations,
1113            cached_at: Utc::now(),
1114        };
1115
1116        self.entries.insert(key, entry);
1117        self.evict_if_needed();
1118    }
1119
1120    fn evict_if_needed(&mut self) {
1121        // Remove expired entries
1122        let now = Utc::now();
1123        self.entries.retain(|_, entry| {
1124            let age = now.signed_duration_since(entry.cached_at);
1125            age.num_minutes() < self.config.cache_ttl_minutes as i64
1126        });
1127
1128        // Simple memory management (could be more sophisticated)
1129        while self.entries.len() > 1000 {
1130            if let Some(oldest_key) = self
1131                .entries
1132                .iter()
1133                .min_by_key(|(_, entry)| entry.cached_at)
1134                .map(|(key, _)| key.clone())
1135            {
1136                self.entries.remove(&oldest_key);
1137            } else {
1138                break;
1139            }
1140        }
1141    }
1142}
1143
1144/// Time-travel engine metrics
1145#[derive(Debug, Clone, Default)]
1146pub struct TimeTravelMetrics {
1147    pub queries_executed: u64,
1148    pub queries_succeeded: u64,
1149    pub queries_failed: u64,
1150    pub active_queries: u64,
1151    pub cache_hits: u64,
1152    pub cache_misses: u64,
1153    pub index_hits: u64,
1154    pub index_misses: u64,
1155    pub average_query_time_ms: f64,
1156}
1157
1158#[cfg(test)]
1159mod tests {
1160    use super::*;
1161
1162    #[tokio::test]
1163    async fn test_time_travel_config_defaults() {
1164        let config = TimeTravelConfig::default();
1165        assert_eq!(config.max_time_window_days, 365);
1166        assert!(config.enable_temporal_indexing);
1167        assert_eq!(config.index_granularity_minutes, 60);
1168    }
1169
1170    #[tokio::test]
1171    async fn test_temporal_query_builder() {
1172        let query = TemporalQuery::new()
1173            .at_time(TimePoint::Timestamp(Utc::now()))
1174            .filter(TemporalFilter::default())
1175            .order_by(TemporalOrdering::TimeDescending)
1176            .limit(100);
1177
1178        assert!(query.time_point.is_some());
1179        assert!(query.limit.is_some());
1180        assert_eq!(query.limit.unwrap(), 100);
1181    }
1182
1183    #[tokio::test]
1184    async fn test_time_point_resolution() {
1185        let now = Utc::now();
1186        let relative = TimePoint::RelativeTime(ChronoDuration::hours(-1));
1187
1188        match relative {
1189            TimePoint::RelativeTime(duration) => {
1190                let resolved = now + duration;
1191                assert!(resolved < now);
1192            }
1193            _ => panic!("Expected RelativeTime"),
1194        }
1195    }
1196
1197    #[tokio::test]
1198    async fn test_temporal_filter() {
1199        let filter = TemporalFilter {
1200            event_types: Some(std::iter::once("TestEvent".to_string()).collect()),
1201            ..Default::default()
1202        };
1203
1204        assert!(filter.event_types.is_some());
1205        assert!(filter.event_types.as_ref().unwrap().contains("TestEvent"));
1206    }
1207}