use crate::event_sourcing::{EventStoreTrait, EventStream};
use crate::StreamEvent;
use anyhow::{anyhow, Result};
use chrono::{DateTime, Duration as ChronoDuration, Utc};
use serde::{Deserialize, Serialize};
use std::collections::{BTreeMap, HashMap, HashSet};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::RwLock;
use tracing::{debug, error, info, warn};
use uuid::Uuid;

pub type CustomFilterFn = Box<dyn Fn(&StreamEvent) -> bool + Send + Sync>;

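/// Configuration for the time-travel engine.
///
/// A minimal usage sketch (the values below are illustrative, not
/// recommendations), showing individual limits overridden while the rest
/// keep their defaults:
///
/// ```rust,ignore
/// let config = TimeTravelConfig {
///     max_time_window_days: 30, // only look back one month
///     cache_ttl_minutes: 5,     // short-lived result cache
///     ..TimeTravelConfig::default()
/// };
/// assert!(config.enable_temporal_indexing);
/// ```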
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TimeTravelConfig {
    pub max_time_window_days: u32,
    pub enable_temporal_indexing: bool,
    pub index_granularity_minutes: u32,
    pub max_concurrent_queries: usize,
    pub query_timeout_seconds: u64,
    pub enable_result_caching: bool,
    pub cache_ttl_minutes: u32,
    pub max_cache_size_mb: usize,
}

impl Default for TimeTravelConfig {
    fn default() -> Self {
        Self {
            max_time_window_days: 365,
            enable_temporal_indexing: true,
            index_granularity_minutes: 60,
            max_concurrent_queries: 100,
            query_timeout_seconds: 300,
            enable_result_caching: true,
            cache_ttl_minutes: 60,
            max_cache_size_mb: 1024,
        }
    }
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum TimePoint {
    Timestamp(DateTime<Utc>),
    RelativeTime(ChronoDuration),
    Version(u64),
    EventId(Uuid),
    Snapshot(String),
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TimeRange {
    pub start: TimePoint,
    pub end: TimePoint,
}

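/// A temporal query over the event history, assembled with a fluent builder.
///
/// A sketch of composing a query (the aggregate id and window are made-up
/// values for illustration):
///
/// ```rust,ignore
/// let query = TemporalQuery::new()
///     .in_range(TimeRange {
///         start: TimePoint::RelativeTime(ChronoDuration::hours(-6)),
///         end: TimePoint::Timestamp(Utc::now()),
///     })
///     .filter(TemporalFilter {
///         aggregate_ids: Some(std::iter::once("order-42".to_string()).collect()),
///         ..Default::default()
///     })
///     .order_by(TemporalOrdering::TimeAscending)
///     .limit(500);
/// ```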
#[derive(Debug, Clone)]
pub struct TemporalQuery {
    pub query_id: Uuid,
    pub time_point: Option<TimePoint>,
    pub time_range: Option<TimeRange>,
    pub filter: TemporalFilter,
    pub projection: TemporalProjection,
    pub ordering: TemporalOrdering,
    pub limit: Option<usize>,
}

impl Default for TemporalQuery {
    fn default() -> Self {
        Self::new()
    }
}

impl TemporalQuery {
    pub fn new() -> Self {
        Self {
            query_id: Uuid::new_v4(),
            time_point: None,
            time_range: None,
            filter: TemporalFilter::default(),
            projection: TemporalProjection::default(),
            ordering: TemporalOrdering::default(),
            limit: None,
        }
    }

    pub fn at_time(mut self, time_point: TimePoint) -> Self {
        self.time_point = Some(time_point);
        self
    }

    pub fn in_range(mut self, time_range: TimeRange) -> Self {
        self.time_range = Some(time_range);
        self
    }

    pub fn filter(mut self, filter: TemporalFilter) -> Self {
        self.filter = filter;
        self
    }

    pub fn project(mut self, projection: TemporalProjection) -> Self {
        self.projection = projection;
        self
    }

    pub fn order_by(mut self, ordering: TemporalOrdering) -> Self {
        self.ordering = ordering;
        self
    }

    pub fn limit(mut self, limit: usize) -> Self {
        self.limit = Some(limit);
        self
    }
}

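/// Filter criteria applied to candidate events.
///
/// `custom_filters` holds boxed closures, so they are skipped by `Clone` and
/// only summarized by `Debug`. A sketch of attaching a custom predicate (the
/// payload check is illustrative and depends on the `StreamEvent` shape):
///
/// ```rust,ignore
/// let mut filter = TemporalFilter {
///     sources: Some(std::iter::once("ingest".to_string()).collect()),
///     ..Default::default()
/// };
/// filter.custom_filters.push(Box::new(|event: &StreamEvent| {
///     // keep only events carrying a non-empty user field
///     event.metadata().user.as_deref().map_or(false, |u| !u.is_empty())
/// }));
/// ```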
#[derive(Default)]
pub struct TemporalFilter {
    pub event_types: Option<HashSet<String>>,
    pub aggregate_ids: Option<HashSet<String>>,
    pub user_ids: Option<HashSet<String>>,
    pub sources: Option<HashSet<String>>,
    pub custom_filters: Vec<CustomFilterFn>,
}

impl std::fmt::Debug for TemporalFilter {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("TemporalFilter")
            .field("event_types", &self.event_types)
            .field("aggregate_ids", &self.aggregate_ids)
            .field("user_ids", &self.user_ids)
            .field("sources", &self.sources)
            .field(
                "custom_filters",
                &format!("<{} filters>", self.custom_filters.len()),
            )
            .finish()
    }
}

impl Clone for TemporalFilter {
    fn clone(&self) -> Self {
        Self {
            event_types: self.event_types.clone(),
            aggregate_ids: self.aggregate_ids.clone(),
            user_ids: self.user_ids.clone(),
            sources: self.sources.clone(),
            // Boxed filter closures cannot be cloned, so clones start with none.
            custom_filters: Vec::new(),
        }
    }
}

#[derive(Debug, Clone, Default)]
pub enum TemporalProjection {
    #[default]
    FullEvents,
    MetadataOnly,
    Fields(Vec<String>),
    Aggregation(AggregationType),
}

#[derive(Debug, Clone)]
pub enum AggregationType {
    Count,
    CountBy(String),
    Timeline(ChronoDuration),
    Statistics,
}

#[derive(Debug, Clone, Default)]
pub enum TemporalOrdering {
    TimeAscending,
    #[default]
    TimeDescending,
    VersionAscending,
    VersionDescending,
    Custom(String, bool),
}

#[derive(Debug, Clone)]
pub struct TemporalQueryResult {
    pub query_id: Uuid,
    pub events: Vec<StreamEvent>,
    pub metadata: TemporalResultMetadata,
    pub aggregations: Option<TemporalAggregations>,
    pub execution_time: Duration,
    pub from_cache: bool,
}

#[derive(Debug, Clone)]
pub struct TemporalResultMetadata {
    pub total_events: usize,
    pub time_range_covered: Option<(DateTime<Utc>, DateTime<Utc>)>,
    pub version_range_covered: Option<(u64, u64)>,
    pub aggregates_scanned: HashSet<String>,
    pub index_hits: usize,
    pub index_misses: usize,
}

#[derive(Debug, Clone)]
pub struct TemporalAggregations {
    pub count: usize,
    pub count_by_type: HashMap<String, usize>,
    pub timeline: Vec<TimelinePoint>,
    pub statistics: TemporalStatistics,
}

#[derive(Debug, Clone)]
pub struct TimelinePoint {
    pub timestamp: DateTime<Utc>,
    pub count: usize,
    pub event_types: HashMap<String, usize>,
}

#[derive(Debug, Clone)]
pub struct TemporalStatistics {
    pub events_per_second: f64,
    pub peak_throughput: f64,
    pub average_event_size: f64,
    pub unique_aggregates: usize,
    pub unique_users: usize,
    pub time_span: ChronoDuration,
}

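/// In-memory secondary indexes over the event history: a time-ordered index,
/// a version index, and per-aggregate / per-type time indexes.
///
/// A sketch of querying a populated index (construction of the `StreamEvent`s
/// that fill it is elided, since their shape lives elsewhere in the crate):
///
/// ```rust,ignore
/// let mut index = TemporalIndex::new();
/// for event in &events {
///     index.add_event(event);
/// }
/// let last_hour = index.find_events_by_time_range(
///     Utc::now() - ChronoDuration::hours(1),
///     Utc::now(),
/// );
/// let order_events = index.find_events_by_aggregate(
///     "order-42",
///     Utc::now() - ChronoDuration::days(1),
///     Utc::now(),
/// );
/// ```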
#[derive(Debug)]
struct TemporalIndex {
    time_index: BTreeMap<DateTime<Utc>, Vec<Uuid>>,
    version_index: BTreeMap<u64, EventIndexEntry>,
    aggregate_index: HashMap<String, BTreeMap<DateTime<Utc>, Vec<Uuid>>>,
    type_index: HashMap<String, BTreeMap<DateTime<Utc>, Vec<Uuid>>>,
}

#[derive(Debug, Clone)]
struct EventIndexEntry {
    pub event_id: Uuid,
    pub timestamp: DateTime<Utc>,
    pub aggregate_id: String,
    pub event_type: String,
    pub version: u64,
}

impl TemporalIndex {
    fn new() -> Self {
        Self {
            time_index: BTreeMap::new(),
            version_index: BTreeMap::new(),
            aggregate_index: HashMap::new(),
            type_index: HashMap::new(),
        }
    }

    fn add_event(&mut self, event: &StreamEvent) {
        let metadata = event.metadata();
        let timestamp = metadata.timestamp;
        let event_id = uuid::Uuid::parse_str(&metadata.event_id).unwrap_or(uuid::Uuid::new_v4());
        let aggregate_id = metadata.context.clone().unwrap_or_default();
        let event_type = format!("{event:?}");
        let version = metadata.version.parse::<u64>().unwrap_or(0);

        self.time_index.entry(timestamp).or_default().push(event_id);

        self.version_index.insert(
            version,
            EventIndexEntry {
                event_id,
                timestamp,
                aggregate_id: aggregate_id.clone(),
                event_type: event_type.clone(),
                version,
            },
        );

        self.aggregate_index
            .entry(aggregate_id)
            .or_default()
            .entry(timestamp)
            .or_default()
            .push(event_id);

        self.type_index
            .entry(event_type)
            .or_default()
            .entry(timestamp)
            .or_default()
            .push(event_id);
    }

    fn find_events_by_time_range(&self, start: DateTime<Utc>, end: DateTime<Utc>) -> Vec<Uuid> {
        let mut event_ids = Vec::new();

        for (_, ids) in self.time_index.range(start..=end) {
            event_ids.extend_from_slice(ids);
        }

        event_ids
    }

    fn find_events_by_version_range(&self, start: u64, end: u64) -> Vec<Uuid> {
        let mut event_ids = Vec::new();

        for (_, entry) in self.version_index.range(start..=end) {
            event_ids.push(entry.event_id);
        }

        event_ids
    }

    fn find_events_by_aggregate(
        &self,
        aggregate_id: &str,
        start: DateTime<Utc>,
        end: DateTime<Utc>,
    ) -> Vec<Uuid> {
        if let Some(time_map) = self.aggregate_index.get(aggregate_id) {
            let mut event_ids = Vec::new();
            for (_, ids) in time_map.range(start..=end) {
                event_ids.extend_from_slice(ids);
            }
            event_ids
        } else {
            Vec::new()
        }
    }
}

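/// Engine for time-travel queries over an event-sourced store.
///
/// A rough end-to-end sketch (the `store` and `stream` values stand in for
/// whatever `EventStoreTrait` / `EventStream` implementations the surrounding
/// crate provides; they are not constructed here):
///
/// ```rust,ignore
/// let engine = TimeTravelEngine::new(TimeTravelConfig::default(), store, stream);
/// engine.start().await?;
///
/// // Fetch an aggregate's events as of one hour ago.
/// let events = engine
///     .query_state_at_time("order-42", TimePoint::RelativeTime(ChronoDuration::hours(-1)))
///     .await?;
///
/// // Inspect query throughput and cache behaviour.
/// let metrics = engine.get_metrics().await;
/// println!("executed: {}, cache hits: {}", metrics.queries_executed, metrics.cache_hits);
/// ```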
pub struct TimeTravelEngine {
    config: TimeTravelConfig,
    event_store: Arc<dyn EventStoreTrait>,
    event_stream: Arc<dyn EventStream>,
    temporal_index: Arc<RwLock<TemporalIndex>>,
    query_cache: Arc<RwLock<QueryCache>>,
    query_semaphore: Arc<tokio::sync::Semaphore>,
    metrics: Arc<RwLock<TimeTravelMetrics>>,
}

impl TimeTravelEngine {
    pub fn new(
        config: TimeTravelConfig,
        event_store: Arc<dyn EventStoreTrait>,
        event_stream: Arc<dyn EventStream>,
    ) -> Self {
        Self {
            query_semaphore: Arc::new(tokio::sync::Semaphore::new(config.max_concurrent_queries)),
            temporal_index: Arc::new(RwLock::new(TemporalIndex::new())),
            query_cache: Arc::new(RwLock::new(QueryCache::new(config.clone()))),
            config,
            event_store,
            event_stream,
            metrics: Arc::new(RwLock::new(TimeTravelMetrics::default())),
        }
    }

    pub async fn start(&self) -> Result<()> {
        info!("Starting time-travel engine");

        if self.config.enable_temporal_indexing {
            self.build_temporal_index().await?;
        }

        let index = Arc::clone(&self.temporal_index);
        let event_stream = Arc::clone(&self.event_stream);

        tokio::spawn(async move {
            let mut interval = tokio::time::interval(Duration::from_secs(60));
            loop {
                interval.tick().await;
                if let Err(e) =
                    Self::update_index(Arc::clone(&index), Arc::clone(&event_stream)).await
                {
                    error!("Failed to update temporal index: {}", e);
                }
            }
        });

        info!("Time-travel engine started successfully");
        Ok(())
    }

    pub async fn execute_query(&self, query: TemporalQuery) -> Result<TemporalQueryResult> {
        let start_time = Instant::now();
        let query_id = query.query_id;

        debug!("Executing temporal query {}", query_id);

        let _permit = self.query_semaphore.acquire().await?;

        {
            let mut metrics = self.metrics.write().await;
            metrics.queries_executed += 1;
            metrics.active_queries += 1;
        }

        let cache_key = self.generate_cache_key(&query);
        if self.config.enable_result_caching {
            let cache = self.query_cache.read().await;
            if let Some(cached_result) = cache.get(&cache_key) {
                let mut metrics = self.metrics.write().await;
                metrics.active_queries -= 1;
                metrics.cache_hits += 1;

                return Ok(TemporalQueryResult {
                    query_id,
                    events: cached_result.events,
                    metadata: cached_result.metadata,
                    aggregations: cached_result.aggregations,
                    execution_time: start_time.elapsed(),
                    from_cache: true,
                });
            }
        }

        let result = self.execute_query_internal(query).await;

        {
            let mut metrics = self.metrics.write().await;
            metrics.active_queries -= 1;
            match &result {
                Ok(_) => {
                    metrics.queries_succeeded += 1;
                    // Reaching this point with caching enabled means the earlier
                    // cache lookup missed; with caching disabled there is no cache to miss.
                    if self.config.enable_result_caching {
                        metrics.cache_misses += 1;
                    }
                }
                Err(_) => metrics.queries_failed += 1,
            }
        }

        let execution_time = start_time.elapsed();
        debug!(
            "Temporal query {} executed in {:?}",
            query_id, execution_time
        );

        if let Ok(ref res) = result {
            if self.config.enable_result_caching {
                let mut cache = self.query_cache.write().await;
                cache.set(cache_key, res.clone());
            }
        }

        result.map(|mut r| {
            r.execution_time = execution_time;
            r.from_cache = false;
            r
        })
    }

    async fn execute_query_internal(&self, query: TemporalQuery) -> Result<TemporalQueryResult> {
        let query_id = query.query_id;

        let (start_time, end_time) = self.resolve_time_range(&query).await?;

        let candidate_event_ids = if self.config.enable_temporal_indexing {
            self.find_events_with_index(&query, start_time, end_time)
                .await?
        } else {
            self.find_events_without_index(&query, start_time, end_time)
                .await?
        };

        let mut events = Vec::new();
        for event_id in candidate_event_ids {
            if let Some(event) = self.load_event(event_id).await? {
                if self.matches_filter(&event, &query.filter) {
                    events.push(event);
                }
            }
        }

        self.apply_ordering(&mut events, &query.ordering);

        if let Some(limit) = query.limit {
            events.truncate(limit);
        }

        let metadata = self.generate_result_metadata(&events, start_time, end_time);

        let aggregations = match query.projection {
            TemporalProjection::Aggregation(ref agg_type) => {
                Some(self.generate_aggregations(&events, agg_type, start_time, end_time)?)
            }
            _ => None,
        };

        let projected_events = self.apply_projection(events, &query.projection);

        Ok(TemporalQueryResult {
            query_id,
            events: projected_events,
            metadata,
            aggregations,
            execution_time: Duration::default(),
            from_cache: false,
        })
    }

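    /// Fetches the events recorded for a single aggregate at the resolved time point.
    ///
    /// A sketch of point-in-time reconstruction (the aggregate id and window
    /// are illustrative, and folding events into state is left to whatever
    /// apply() logic the surrounding crate defines):
    ///
    /// ```rust,ignore
    /// let as_of = TimePoint::Timestamp(Utc::now() - ChronoDuration::days(7));
    /// let events = engine.query_state_at_time("order-42", as_of).await?;
    /// for event in &events {
    ///     // apply each event to the aggregate's state here
    /// }
    /// ```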
    pub async fn query_state_at_time(
        &self,
        aggregate_id: &str,
        time_point: TimePoint,
    ) -> Result<Vec<StreamEvent>> {
        let query = TemporalQuery::new()
            .at_time(time_point)
            .filter(TemporalFilter {
                aggregate_ids: Some(std::iter::once(aggregate_id.to_string()).collect()),
                ..Default::default()
            });

        let result = self.execute_query(query).await?;
        Ok(result.events)
    }

    pub async fn query_changes_between(
        &self,
        start: TimePoint,
        end: TimePoint,
        filter: Option<TemporalFilter>,
    ) -> Result<Vec<StreamEvent>> {
        let query = TemporalQuery::new()
            .in_range(TimeRange { start, end })
            .filter(filter.unwrap_or_default());

        let result = self.execute_query(query).await?;
        Ok(result.events)
    }

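    /// Buckets matching events into fixed-size windows and returns per-window counts.
    ///
    /// A usage sketch with hour-long buckets over the last day (values are
    /// illustrative):
    ///
    /// ```rust,ignore
    /// let timeline = engine
    ///     .query_timeline(
    ///         TimeRange {
    ///             start: TimePoint::RelativeTime(ChronoDuration::days(-1)),
    ///             end: TimePoint::Timestamp(Utc::now()),
    ///         },
    ///         ChronoDuration::hours(1),
    ///         None,
    ///     )
    ///     .await?;
    /// for point in timeline {
    ///     println!("{}: {} events", point.timestamp, point.count);
    /// }
    /// ```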
    pub async fn query_timeline(
        &self,
        time_range: TimeRange,
        granularity: ChronoDuration,
        filter: Option<TemporalFilter>,
    ) -> Result<Vec<TimelinePoint>> {
        let query = TemporalQuery::new()
            .in_range(time_range)
            .filter(filter.unwrap_or_default())
            .project(TemporalProjection::Aggregation(AggregationType::Timeline(
                granularity,
            )));

        let result = self.execute_query(query).await?;
        Ok(result.aggregations.map(|a| a.timeline).unwrap_or_default())
    }

    async fn build_temporal_index(&self) -> Result<()> {
        info!("Building temporal index");

        let events = self
            .event_stream
            .read_events_from_position(0, usize::MAX)
            .await?;
        let mut index = self.temporal_index.write().await;

        for stored_event in events {
            index.add_event(&stored_event.event_data);
        }

        info!(
            "Temporal index built with {} events",
            index.time_index.len()
        );
        Ok(())
    }

    async fn update_index(
        index: Arc<RwLock<TemporalIndex>>,
        event_stream: Arc<dyn EventStream>,
    ) -> Result<()> {
        let events = event_stream.read_events_from_position(0, 10000).await?;
        let mut idx = index.write().await;

        for stored_event in events {
            idx.add_event(&stored_event.event_data);
        }

        Ok(())
    }

    async fn resolve_time_range(
        &self,
        query: &TemporalQuery,
    ) -> Result<(DateTime<Utc>, DateTime<Utc>)> {
        let now = Utc::now();

        match (&query.time_point, &query.time_range) {
            (Some(time_point), None) => {
                let timestamp = self.resolve_time_point(time_point).await?;
                Ok((timestamp, timestamp))
            }
            (None, Some(time_range)) => {
                let start = self.resolve_time_point(&time_range.start).await?;
                let end = self.resolve_time_point(&time_range.end).await?;
                Ok((start, end))
            }
            (None, None) => {
                let start = now - ChronoDuration::hours(24);
                Ok((start, now))
            }
            (Some(_), Some(_)) => Err(anyhow!("Cannot specify both time_point and time_range")),
        }
    }

    async fn resolve_time_point(&self, time_point: &TimePoint) -> Result<DateTime<Utc>> {
        match time_point {
            TimePoint::Timestamp(timestamp) => Ok(*timestamp),
            TimePoint::RelativeTime(duration) => Ok(Utc::now() + *duration),
            TimePoint::Version(version) => {
                let index = self.temporal_index.read().await;
                if let Some(entry) = index.version_index.get(version) {
                    Ok(entry.timestamp)
                } else {
                    Err(anyhow!("Version {} not found", version))
                }
            }
            TimePoint::EventId(event_id) => {
                if let Some(event) = self.load_event(*event_id).await? {
                    Ok(event.metadata().timestamp)
                } else {
                    Err(anyhow!("Event {} not found", event_id))
                }
            }
            TimePoint::Snapshot(name) => {
                Err(anyhow!("Snapshot resolution not implemented: {}", name))
            }
        }
    }

    async fn find_events_with_index(
        &self,
        query: &TemporalQuery,
        start_time: DateTime<Utc>,
        end_time: DateTime<Utc>,
    ) -> Result<Vec<Uuid>> {
        let index = self.temporal_index.read().await;

        if let Some(ref aggregate_ids) = query.filter.aggregate_ids {
            if aggregate_ids.len() == 1 {
                let aggregate_id = aggregate_ids
                    .iter()
                    .next()
                    .expect("aggregate_ids validated to have exactly 1 element");
                return Ok(index.find_events_by_aggregate(aggregate_id, start_time, end_time));
            }
        }

        Ok(index.find_events_by_time_range(start_time, end_time))
    }

    async fn find_events_without_index(
        &self,
        _query: &TemporalQuery,
        _start_time: DateTime<Utc>,
        _end_time: DateTime<Utc>,
    ) -> Result<Vec<Uuid>> {
        warn!("Sequential scan not implemented, returning empty result");
        Ok(Vec::new())
    }

    async fn load_event(&self, _event_id: Uuid) -> Result<Option<StreamEvent>> {
        // Placeholder: loading individual events by id is not wired up yet, so
        // every candidate id resolves to no event.
        Ok(None)
    }

    fn matches_filter(&self, event: &StreamEvent, filter: &TemporalFilter) -> bool {
        let metadata = event.metadata();
        let event_type_str = format!("{event:?}");

        if let Some(ref event_types) = filter.event_types {
            if !event_types.contains(&event_type_str) {
                return false;
            }
        }

        if let Some(ref aggregate_ids) = filter.aggregate_ids {
            if let Some(ref context) = metadata.context {
                if !aggregate_ids.contains(context) {
                    return false;
                }
            } else {
                return false;
            }
        }

        if let Some(ref user_ids) = filter.user_ids {
            if let Some(ref user) = metadata.user {
                if !user_ids.contains(user) {
                    return false;
                }
            } else {
                return false;
            }
        }

        if let Some(ref sources) = filter.sources {
            if !sources.contains(&metadata.source) {
                return false;
            }
        }

        for custom_filter in &filter.custom_filters {
            if !custom_filter(event) {
                return false;
            }
        }

        true
    }

    fn apply_ordering(&self, events: &mut [StreamEvent], ordering: &TemporalOrdering) {
        match ordering {
            TemporalOrdering::TimeAscending => {
                events.sort_by(|a, b| a.metadata().timestamp.cmp(&b.metadata().timestamp));
            }
            TemporalOrdering::TimeDescending => {
                events.sort_by(|a, b| b.metadata().timestamp.cmp(&a.metadata().timestamp));
            }
            TemporalOrdering::VersionAscending => {
                events.sort_by(|a, b| a.metadata().version.cmp(&b.metadata().version));
            }
            TemporalOrdering::VersionDescending => {
                events.sort_by(|a, b| b.metadata().version.cmp(&a.metadata().version));
            }
            TemporalOrdering::Custom(_field, _ascending) => {
                warn!("Custom ordering not implemented");
            }
        }
    }

    fn apply_projection(
        &self,
        events: Vec<StreamEvent>,
        projection: &TemporalProjection,
    ) -> Vec<StreamEvent> {
        match projection {
            TemporalProjection::FullEvents => events,
            TemporalProjection::MetadataOnly => {
                // Metadata-only projection is not implemented yet; events pass through unchanged.
                events
            }
            TemporalProjection::Fields(_fields) => {
                warn!("Field projection not implemented");
                events
            }
            TemporalProjection::Aggregation(_) => {
                // Aggregations are returned separately; no raw events accompany them.
                Vec::new()
            }
        }
    }

    fn generate_result_metadata(
        &self,
        events: &[StreamEvent],
        _start_time: DateTime<Utc>,
        _end_time: DateTime<Utc>,
    ) -> TemporalResultMetadata {
        let total_events = events.len();

        let time_range_covered = if !events.is_empty() {
            let min_time = events
                .iter()
                .map(|e| e.metadata().timestamp)
                .min()
                .expect("events validated to be non-empty");
            let max_time = events
                .iter()
                .map(|e| e.metadata().timestamp)
                .max()
                .expect("events validated to be non-empty");
            Some((min_time, max_time))
        } else {
            None
        };

        let version_range_covered = if !events.is_empty() {
            let min_version = events
                .iter()
                .filter_map(|e| e.metadata().version.parse::<u64>().ok())
                .min();
            let max_version = events
                .iter()
                .filter_map(|e| e.metadata().version.parse::<u64>().ok())
                .max();
            if let (Some(min), Some(max)) = (min_version, max_version) {
                Some((min, max))
            } else {
                None
            }
        } else {
            None
        };

        let aggregates_scanned: HashSet<String> = events
            .iter()
            .filter_map(|e| e.metadata().context.clone())
            .collect();

        TemporalResultMetadata {
            total_events,
            time_range_covered,
            version_range_covered,
            aggregates_scanned,
            index_hits: 0,
            index_misses: 0,
        }
    }

    fn generate_aggregations(
        &self,
        events: &[StreamEvent],
        agg_type: &AggregationType,
        start_time: DateTime<Utc>,
        end_time: DateTime<Utc>,
    ) -> Result<TemporalAggregations> {
        match agg_type {
            AggregationType::Count => Ok(TemporalAggregations {
                count: events.len(),
                count_by_type: HashMap::new(),
                timeline: Vec::new(),
                statistics: self.calculate_statistics(events, start_time, end_time),
            }),
            AggregationType::CountBy(field) => {
                let mut count_by_type = HashMap::new();
                for event in events {
                    if field == "event_type" {
                        let event_type = format!("{event:?}");
                        *count_by_type.entry(event_type).or_insert(0) += 1;
                    }
                }

                Ok(TemporalAggregations {
                    count: events.len(),
                    count_by_type,
                    timeline: Vec::new(),
                    statistics: self.calculate_statistics(events, start_time, end_time),
                })
            }
            AggregationType::Timeline(granularity) => {
                let timeline = self.generate_timeline(events, *granularity, start_time, end_time);

                Ok(TemporalAggregations {
                    count: events.len(),
                    count_by_type: HashMap::new(),
                    timeline,
                    statistics: self.calculate_statistics(events, start_time, end_time),
                })
            }
            AggregationType::Statistics => Ok(TemporalAggregations {
                count: events.len(),
                count_by_type: HashMap::new(),
                timeline: Vec::new(),
                statistics: self.calculate_statistics(events, start_time, end_time),
            }),
        }
    }

    fn generate_timeline(
        &self,
        events: &[StreamEvent],
        granularity: ChronoDuration,
        start_time: DateTime<Utc>,
        end_time: DateTime<Utc>,
    ) -> Vec<TimelinePoint> {
        let mut timeline = Vec::new();
        let mut current_time = start_time;

        while current_time < end_time {
            let window_end = current_time + granularity;

            let events_in_window: Vec<_> = events
                .iter()
                .filter(|e| {
                    e.metadata().timestamp >= current_time && e.metadata().timestamp < window_end
                })
                .collect();

            let mut event_types = HashMap::new();
            for event in &events_in_window {
                let event_type = format!("{event:?}");
                *event_types.entry(event_type).or_insert(0) += 1;
            }

            timeline.push(TimelinePoint {
                timestamp: current_time,
                count: events_in_window.len(),
                event_types,
            });

            current_time = window_end;
        }

        timeline
    }

    fn calculate_statistics(
        &self,
        events: &[StreamEvent],
        start_time: DateTime<Utc>,
        end_time: DateTime<Utc>,
    ) -> TemporalStatistics {
        let time_span = end_time.signed_duration_since(start_time);
        let events_per_second = if time_span.num_seconds() > 0 {
            events.len() as f64 / time_span.num_seconds() as f64
        } else {
            0.0
        };

        let peak_throughput = if !events.is_empty() {
            let mut minute_counts = HashMap::new();
            for event in events {
                let minute = event
                    .metadata()
                    .timestamp
                    .format("%Y-%m-%d %H:%M")
                    .to_string();
                *minute_counts.entry(minute).or_insert(0) += 1;
            }
            minute_counts.values().max().copied().unwrap_or(0) as f64
        } else {
            0.0
        };

        let total_size: usize = events.iter().map(|e| format!("{e:?}").len()).sum();
        let average_event_size = if !events.is_empty() {
            total_size as f64 / events.len() as f64
        } else {
            0.0
        };

        let unique_aggregates = events
            .iter()
            .filter_map(|e| e.metadata().context.as_ref())
            .collect::<HashSet<_>>()
            .len();

        let unique_users = events
            .iter()
            .filter_map(|e| e.metadata().user.as_ref())
            .collect::<HashSet<_>>()
            .len();

        TemporalStatistics {
            events_per_second,
            peak_throughput,
            average_event_size,
            unique_aggregates,
            unique_users,
            time_span,
        }
    }

    fn generate_cache_key(&self, query: &TemporalQuery) -> String {
        // Keys are derived from the per-query UUID, so cached entries are only
        // reusable within the lifetime of that specific query object.
        format!("temporal_query_{:?}", query.query_id)
    }

    pub async fn get_metrics(&self) -> TimeTravelMetrics {
        self.metrics.read().await.clone()
    }
}

#[derive(Debug)]
struct QueryCache {
    config: TimeTravelConfig,
    entries: HashMap<String, CachedResult>,
}

#[derive(Debug, Clone)]
struct CachedResult {
    events: Vec<StreamEvent>,
    metadata: TemporalResultMetadata,
    aggregations: Option<TemporalAggregations>,
    cached_at: DateTime<Utc>,
}

impl QueryCache {
    fn new(config: TimeTravelConfig) -> Self {
        Self {
            config,
            entries: HashMap::new(),
        }
    }

    fn get(&self, key: &str) -> Option<CachedResult> {
        if let Some(entry) = self.entries.get(key) {
            let age = Utc::now().signed_duration_since(entry.cached_at);
            if age.num_minutes() < self.config.cache_ttl_minutes as i64 {
                return Some(entry.clone());
            }
        }
        None
    }

    fn set(&mut self, key: String, result: TemporalQueryResult) {
        let entry = CachedResult {
            events: result.events,
            metadata: result.metadata,
            aggregations: result.aggregations,
            cached_at: Utc::now(),
        };

        self.entries.insert(key, entry);
        self.evict_if_needed();
    }

    fn evict_if_needed(&mut self) {
        let now = Utc::now();
        // Drop entries past the configured TTL first.
        self.entries.retain(|_, entry| {
            let age = now.signed_duration_since(entry.cached_at);
            age.num_minutes() < self.config.cache_ttl_minutes as i64
        });

        // Then enforce a fixed entry cap by evicting the oldest entries
        // (max_cache_size_mb is not enforced here).
        while self.entries.len() > 1000 {
            if let Some(oldest_key) = self
                .entries
                .iter()
                .min_by_key(|(_, entry)| entry.cached_at)
                .map(|(key, _)| key.clone())
            {
                self.entries.remove(&oldest_key);
            } else {
                break;
            }
        }
    }
}

#[derive(Debug, Clone, Default)]
pub struct TimeTravelMetrics {
    pub queries_executed: u64,
    pub queries_succeeded: u64,
    pub queries_failed: u64,
    pub active_queries: u64,
    pub cache_hits: u64,
    pub cache_misses: u64,
    pub index_hits: u64,
    pub index_misses: u64,
    pub average_query_time_ms: f64,
}

#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_time_travel_config_defaults() {
        let config = TimeTravelConfig::default();
        assert_eq!(config.max_time_window_days, 365);
        assert!(config.enable_temporal_indexing);
        assert_eq!(config.index_granularity_minutes, 60);
    }

    #[tokio::test]
    async fn test_temporal_query_builder() {
        let query = TemporalQuery::new()
            .at_time(TimePoint::Timestamp(Utc::now()))
            .filter(TemporalFilter::default())
            .order_by(TemporalOrdering::TimeDescending)
            .limit(100);

        assert!(query.time_point.is_some());
        assert!(query.limit.is_some());
        assert_eq!(query.limit.unwrap(), 100);
    }

    #[tokio::test]
    async fn test_time_point_resolution() {
        let now = Utc::now();
        let relative = TimePoint::RelativeTime(ChronoDuration::hours(-1));

        match relative {
            TimePoint::RelativeTime(duration) => {
                let resolved = now + duration;
                assert!(resolved < now);
            }
            _ => panic!("Expected RelativeTime"),
        }
    }

    #[tokio::test]
    async fn test_temporal_filter() {
        let filter = TemporalFilter {
            event_types: Some(std::iter::once("TestEvent".to_string()).collect()),
            ..Default::default()
        };

        assert!(filter.event_types.is_some());
        assert!(filter.event_types.as_ref().unwrap().contains("TestEvent"));
    }
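
    // A small additional check of the TemporalFilter clone semantics defined
    // above; it exercises only the pure data types in this module (no event
    // store is involved).
    #[tokio::test]
    async fn test_custom_filter_is_dropped_on_clone() {
        let mut filter = TemporalFilter::default();
        filter
            .custom_filters
            .push(Box::new(|_event: &StreamEvent| true));
        assert_eq!(filter.custom_filters.len(), 1);

        // Boxed closures cannot be cloned, so clones start with an empty list.
        let cloned = filter.clone();
        assert!(cloned.custom_filters.is_empty());
    }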
}