Skip to main content

oxirs_stream/
stream_versioning.rs

//! Stream Versioning and Time-Travel Queries
//!
//! This module provides stream versioning with time-travel query support
//! for historical data analysis and replay.
//!
//! # Features
//!
//! - **Version Management**: Track and manage stream data versions
//! - **Time-Travel Queries**: Query historical stream states
//! - **Snapshot Management**: Create and restore point-in-time snapshots
//! - **Branching**: Create branches for what-if analysis
//! - **Diff Operations**: Compare versions and generate changesets
//! - **Retention Policies**: Automatic version cleanup and archival
14
15use serde::{Deserialize, Serialize};
16use std::collections::{BTreeMap, HashMap, VecDeque};
17use std::sync::Arc;
18use std::time::{Duration, Instant, SystemTime};
19use tokio::sync::RwLock;
20
21use crate::error::StreamError;
22
/// Numeric identifier of a stream version.
pub type VersionId = u64;
25
/// String identifier of a branch (the main branch uses `"main"`).
pub type BranchId = String;
28
29/// Stream versioning configuration
30#[derive(Debug, Clone, Serialize, Deserialize)]
31pub struct VersioningConfig {
32    /// Maximum number of versions to retain
33    pub max_versions: usize,
34    /// Maximum age for version retention
35    pub max_age: Duration,
36    /// Enable automatic snapshots
37    pub auto_snapshot: bool,
38    /// Snapshot interval
39    pub snapshot_interval: Duration,
40    /// Enable compression for old versions
41    pub compress_old_versions: bool,
42    /// Compression threshold (versions older than this get compressed)
43    pub compression_threshold: Duration,
44    /// Enable branching support
45    pub enable_branching: bool,
46    /// Maximum number of branches
47    pub max_branches: usize,
48}
49
50impl Default for VersioningConfig {
51    fn default() -> Self {
52        Self {
53            max_versions: 1000,
54            max_age: Duration::from_secs(86400 * 7), // 7 days
55            auto_snapshot: true,
56            snapshot_interval: Duration::from_secs(3600), // 1 hour
57            compress_old_versions: true,
58            compression_threshold: Duration::from_secs(86400), // 1 day
59            enable_branching: true,
60            max_branches: 10,
61        }
62    }
63}
64
65/// Version metadata
66#[derive(Debug, Clone, Serialize, Deserialize)]
67pub struct VersionMetadata {
68    /// Version ID
69    pub version_id: VersionId,
70    /// Creation timestamp
71    pub created_at: SystemTime,
72    /// Parent version (None for root)
73    pub parent_version: Option<VersionId>,
74    /// Branch this version belongs to
75    pub branch_id: BranchId,
76    /// Version description
77    pub description: String,
78    /// Number of events in this version
79    pub event_count: usize,
80    /// Size in bytes
81    pub size_bytes: usize,
82    /// Is this version compressed
83    pub is_compressed: bool,
84    /// Custom tags
85    pub tags: HashMap<String, String>,
86}
87
88/// A versioned event with metadata
89#[derive(Debug, Clone, Serialize, Deserialize)]
90pub struct VersionedEvent<T> {
91    /// The event data
92    pub data: T,
93    /// Version this event was added in
94    pub version_id: VersionId,
95    /// Event timestamp
96    pub timestamp: SystemTime,
97    /// Event sequence number within version
98    pub sequence: u64,
99    /// Is this event deleted in a later version
100    pub is_deleted: bool,
101    /// Version where this event was deleted (if applicable)
102    pub deleted_in_version: Option<VersionId>,
103}
104
105/// A snapshot of stream state at a point in time
106#[derive(Debug, Clone, Serialize, Deserialize)]
107pub struct Snapshot<T> {
108    /// Snapshot identifier
109    pub snapshot_id: String,
110    /// Version at snapshot time
111    pub version_id: VersionId,
112    /// Snapshot timestamp
113    pub timestamp: SystemTime,
114    /// Events in the snapshot
115    pub events: Vec<VersionedEvent<T>>,
116    /// Snapshot metadata
117    pub metadata: HashMap<String, String>,
118    /// Size in bytes
119    pub size_bytes: usize,
120}
121
122/// Branch information
123#[derive(Debug, Clone, Serialize, Deserialize)]
124pub struct Branch {
125    /// Branch identifier
126    pub branch_id: BranchId,
127    /// Branch name
128    pub name: String,
129    /// Base version (where branch was created)
130    pub base_version: VersionId,
131    /// Current head version
132    pub head_version: VersionId,
133    /// Creation timestamp
134    pub created_at: SystemTime,
135    /// Last update timestamp
136    pub updated_at: SystemTime,
137    /// Branch description
138    pub description: String,
139    /// Is this the main branch
140    pub is_main: bool,
141}
142
143/// Time-travel query specification
144#[derive(Debug, Clone, Serialize, Deserialize)]
145pub struct TimeTravelQuery {
146    /// Target version or timestamp
147    pub target: TimeTravelTarget,
148    /// Branch to query
149    pub branch_id: Option<BranchId>,
150    /// Filter predicate
151    pub filter: Option<String>,
152    /// Projection fields
153    pub projection: Option<Vec<String>>,
154    /// Result limit
155    pub limit: Option<usize>,
156    /// Include deleted events
157    pub include_deleted: bool,
158}
159
160/// Target for time-travel query
161#[derive(Debug, Clone, Serialize, Deserialize)]
162pub enum TimeTravelTarget {
163    /// Specific version ID
164    Version(VersionId),
165    /// Point in time
166    Timestamp(SystemTime),
167    /// Relative time (e.g., 1 hour ago)
168    RelativeTime(Duration),
169    /// Latest version
170    Latest,
171    /// Specific snapshot
172    Snapshot(String),
173}
174
175/// Diff between two versions
176#[derive(Debug, Clone, Serialize, Deserialize)]
177pub struct VersionDiff<T> {
178    /// Source version
179    pub from_version: VersionId,
180    /// Target version
181    pub to_version: VersionId,
182    /// Added events
183    pub added: Vec<VersionedEvent<T>>,
184    /// Deleted events
185    pub deleted: Vec<VersionedEvent<T>>,
186    /// Modified events (old value, new value)
187    pub modified: Vec<(VersionedEvent<T>, VersionedEvent<T>)>,
188    /// Number of unchanged events
189    pub unchanged_count: usize,
190}
191
192/// Change operation type
193#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
194pub enum ChangeType {
195    /// Event added
196    Add,
197    /// Event deleted
198    Delete,
199    /// Event modified
200    Modify,
201}
202
203/// A single change in a changeset
204#[derive(Debug, Clone, Serialize, Deserialize)]
205pub struct Change<T> {
206    /// Change type
207    pub change_type: ChangeType,
208    /// Event data
209    pub data: T,
210    /// Previous value (for modifications)
211    pub previous: Option<T>,
212    /// Timestamp
213    pub timestamp: SystemTime,
214}
215
216/// Changeset between versions
217#[derive(Debug, Clone, Serialize, Deserialize)]
218pub struct Changeset<T> {
219    /// Source version
220    pub from_version: VersionId,
221    /// Target version
222    pub to_version: VersionId,
223    /// List of changes
224    pub changes: Vec<Change<T>>,
225    /// Creation timestamp
226    pub created_at: SystemTime,
227}
228
229/// Versioning statistics
230#[derive(Debug, Clone, Default, Serialize, Deserialize)]
231pub struct VersioningStats {
232    /// Total number of versions
233    pub total_versions: usize,
234    /// Total number of events across all versions
235    pub total_events: usize,
236    /// Total storage size
237    pub total_size_bytes: usize,
238    /// Number of snapshots
239    pub snapshot_count: usize,
240    /// Number of branches
241    pub branch_count: usize,
242    /// Oldest version timestamp
243    pub oldest_version: Option<SystemTime>,
244    /// Newest version timestamp
245    pub newest_version: Option<SystemTime>,
246    /// Average events per version
247    pub avg_events_per_version: f64,
248    /// Compression ratio
249    pub compression_ratio: f64,
250    /// Number of time-travel queries executed
251    pub time_travel_queries: u64,
252    /// Average query latency
253    pub avg_query_latency_ms: f64,
254}
255
256/// Stream versioning manager
257pub struct StreamVersioning<T>
258where
259    T: Clone + Send + Sync + Serialize + for<'de> Deserialize<'de> + 'static,
260{
261    /// Configuration
262    config: VersioningConfig,
263    /// Current version
264    current_version: Arc<RwLock<VersionId>>,
265    /// Version metadata index
266    versions: Arc<RwLock<BTreeMap<VersionId, VersionMetadata>>>,
267    /// Event storage by version
268    events: Arc<RwLock<HashMap<VersionId, Vec<VersionedEvent<T>>>>>,
269    /// Snapshots
270    snapshots: Arc<RwLock<HashMap<String, Snapshot<T>>>>,
271    /// Branches
272    branches: Arc<RwLock<HashMap<BranchId, Branch>>>,
273    /// Current branch
274    current_branch: Arc<RwLock<BranchId>>,
275    /// Statistics
276    stats: Arc<RwLock<VersioningStats>>,
277    /// Last snapshot time
278    last_snapshot: Arc<RwLock<Instant>>,
279    /// Query latencies for stats
280    query_latencies: Arc<RwLock<VecDeque<f64>>>,
281}
282
283impl<T> StreamVersioning<T>
284where
285    T: Clone + Send + Sync + Serialize + for<'de> Deserialize<'de> + 'static,
286{
287    /// Create a new stream versioning manager
288    pub fn new(config: VersioningConfig) -> Self {
289        let main_branch = Branch {
290            branch_id: "main".to_string(),
291            name: "Main Branch".to_string(),
292            base_version: 0,
293            head_version: 0,
294            created_at: SystemTime::now(),
295            updated_at: SystemTime::now(),
296            description: "Main development branch".to_string(),
297            is_main: true,
298        };
299
300        let mut branches = HashMap::new();
301        branches.insert("main".to_string(), main_branch);
302
303        let initial_version = VersionMetadata {
304            version_id: 0,
305            created_at: SystemTime::now(),
306            parent_version: None,
307            branch_id: "main".to_string(),
308            description: "Initial version".to_string(),
309            event_count: 0,
310            size_bytes: 0,
311            is_compressed: false,
312            tags: HashMap::new(),
313        };
314
315        let mut versions = BTreeMap::new();
316        versions.insert(0, initial_version);
317
318        Self {
319            config,
320            current_version: Arc::new(RwLock::new(0)),
321            versions: Arc::new(RwLock::new(versions)),
322            events: Arc::new(RwLock::new(HashMap::new())),
323            snapshots: Arc::new(RwLock::new(HashMap::new())),
324            branches: Arc::new(RwLock::new(branches)),
325            current_branch: Arc::new(RwLock::new("main".to_string())),
326            stats: Arc::new(RwLock::new(VersioningStats::default())),
327            last_snapshot: Arc::new(RwLock::new(Instant::now())),
328            query_latencies: Arc::new(RwLock::new(VecDeque::with_capacity(1000))),
329        }
330    }
331
332    /// Create a new version with events
333    pub async fn create_version(
334        &self,
335        events: Vec<T>,
336        description: &str,
337    ) -> Result<VersionId, StreamError> {
338        let mut current = self.current_version.write().await;
339        let mut versions = self.versions.write().await;
340        let mut event_storage = self.events.write().await;
341        let mut branches = self.branches.write().await;
342        let current_branch = self.current_branch.read().await.clone();
343
344        let new_version_id = *current + 1;
345
346        // Create versioned events
347        let versioned_events: Vec<VersionedEvent<T>> = events
348            .into_iter()
349            .enumerate()
350            .map(|(i, data)| VersionedEvent {
351                data,
352                version_id: new_version_id,
353                timestamp: SystemTime::now(),
354                sequence: i as u64,
355                is_deleted: false,
356                deleted_in_version: None,
357            })
358            .collect();
359
360        let event_count = versioned_events.len();
361        let size_bytes = self.estimate_size(&versioned_events);
362
363        // Create version metadata
364        let metadata = VersionMetadata {
365            version_id: new_version_id,
366            created_at: SystemTime::now(),
367            parent_version: Some(*current),
368            branch_id: current_branch.clone(),
369            description: description.to_string(),
370            event_count,
371            size_bytes,
372            is_compressed: false,
373            tags: HashMap::new(),
374        };
375
376        // Store version
377        versions.insert(new_version_id, metadata);
378        event_storage.insert(new_version_id, versioned_events);
379
380        // Update branch head
381        if let Some(branch) = branches.get_mut(&current_branch) {
382            branch.head_version = new_version_id;
383            branch.updated_at = SystemTime::now();
384        }
385
386        *current = new_version_id;
387
388        // Update stats
389        self.update_stats_after_create(event_count, size_bytes)
390            .await;
391
392        // Apply retention policy
393        self.apply_retention_policy(&mut versions, &mut event_storage)?;
394
395        // Drop locks before calling maybe_create_auto_snapshot which acquires its own locks
396        drop(versions);
397        drop(event_storage);
398        drop(branches);
399        drop(current);
400
401        // Check if we need to create auto-snapshot
402        if self.config.auto_snapshot {
403            self.maybe_create_auto_snapshot().await?;
404        }
405
406        Ok(new_version_id)
407    }
408
409    /// Add events to current version
410    pub async fn add_events(&self, events: Vec<T>) -> Result<usize, StreamError> {
411        let current = *self.current_version.read().await;
412        let mut event_storage = self.events.write().await;
413
414        let entry = event_storage.entry(current).or_insert_with(Vec::new);
415        let start_sequence = entry.len() as u64;
416
417        let versioned_events: Vec<VersionedEvent<T>> = events
418            .into_iter()
419            .enumerate()
420            .map(|(i, data)| VersionedEvent {
421                data,
422                version_id: current,
423                timestamp: SystemTime::now(),
424                sequence: start_sequence + i as u64,
425                is_deleted: false,
426                deleted_in_version: None,
427            })
428            .collect();
429
430        let count = versioned_events.len();
431        entry.extend(versioned_events);
432
433        // Update version metadata
434        let mut versions = self.versions.write().await;
435        if let Some(metadata) = versions.get_mut(&current) {
436            metadata.event_count = entry.len();
437            metadata.size_bytes = self.estimate_size(entry);
438        }
439
440        Ok(count)
441    }
442
443    /// Execute a time-travel query
444    pub async fn time_travel_query(
445        &self,
446        query: TimeTravelQuery,
447    ) -> Result<Vec<VersionedEvent<T>>, StreamError> {
448        let start = Instant::now();
449
450        let target_version = self.resolve_target(&query.target).await?;
451        let branch_id = query.branch_id.unwrap_or_else(|| "main".to_string());
452
453        // Collect all events up to target version for the branch
454        let versions = self.versions.read().await;
455        let event_storage = self.events.read().await;
456
457        let mut result_events = Vec::new();
458
459        for (version_id, metadata) in versions.iter() {
460            if *version_id > target_version {
461                break;
462            }
463
464            if metadata.branch_id != branch_id {
465                continue;
466            }
467
468            if let Some(events) = event_storage.get(version_id) {
469                for event in events {
470                    // Skip deleted events unless requested
471                    if event.is_deleted && !query.include_deleted {
472                        if let Some(deleted_version) = event.deleted_in_version {
473                            if deleted_version <= target_version {
474                                continue;
475                            }
476                        }
477                    }
478
479                    result_events.push(event.clone());
480                }
481            }
482        }
483
484        // Apply limit
485        if let Some(limit) = query.limit {
486            result_events.truncate(limit);
487        }
488
489        // Record query latency
490        let latency = start.elapsed().as_secs_f64() * 1000.0;
491        self.record_query_latency(latency).await;
492
493        Ok(result_events)
494    }
495
496    /// Get events at a specific version
497    pub async fn get_at_version(
498        &self,
499        version_id: VersionId,
500    ) -> Result<Vec<VersionedEvent<T>>, StreamError> {
501        let query = TimeTravelQuery {
502            target: TimeTravelTarget::Version(version_id),
503            branch_id: None,
504            filter: None,
505            projection: None,
506            limit: None,
507            include_deleted: false,
508        };
509
510        self.time_travel_query(query).await
511    }
512
513    /// Get events at a specific timestamp
514    pub async fn get_at_timestamp(
515        &self,
516        timestamp: SystemTime,
517    ) -> Result<Vec<VersionedEvent<T>>, StreamError> {
518        let query = TimeTravelQuery {
519            target: TimeTravelTarget::Timestamp(timestamp),
520            branch_id: None,
521            filter: None,
522            projection: None,
523            limit: None,
524            include_deleted: false,
525        };
526
527        self.time_travel_query(query).await
528    }
529
530    /// Create a snapshot
531    pub async fn create_snapshot(&self, name: &str) -> Result<String, StreamError> {
532        let current = *self.current_version.read().await;
533        let events = self.get_at_version(current).await?;
534
535        let snapshot_id = format!("{}_{}", name, current);
536        let size_bytes = self.estimate_size(&events);
537
538        let snapshot = Snapshot {
539            snapshot_id: snapshot_id.clone(),
540            version_id: current,
541            timestamp: SystemTime::now(),
542            events,
543            metadata: HashMap::new(),
544            size_bytes,
545        };
546
547        let mut snapshots = self.snapshots.write().await;
548        snapshots.insert(snapshot_id.clone(), snapshot);
549
550        // Update stats
551        let mut stats = self.stats.write().await;
552        stats.snapshot_count += 1;
553
554        Ok(snapshot_id)
555    }
556
557    /// Restore from snapshot
558    pub async fn restore_snapshot(&self, snapshot_id: &str) -> Result<VersionId, StreamError> {
559        let snapshots = self.snapshots.read().await;
560        let snapshot = snapshots
561            .get(snapshot_id)
562            .ok_or_else(|| StreamError::NotFound(format!("Snapshot not found: {}", snapshot_id)))?
563            .clone();
564        drop(snapshots);
565
566        // Create a new version from snapshot
567        let events: Vec<T> = snapshot.events.into_iter().map(|e| e.data).collect();
568
569        self.create_version(events, &format!("Restored from snapshot: {}", snapshot_id))
570            .await
571    }
572
573    /// Create a new branch
574    pub async fn create_branch(
575        &self,
576        name: &str,
577        description: &str,
578    ) -> Result<BranchId, StreamError> {
579        if !self.config.enable_branching {
580            return Err(StreamError::Configuration(
581                "Branching is not enabled".to_string(),
582            ));
583        }
584
585        let mut branches = self.branches.write().await;
586
587        if branches.len() >= self.config.max_branches {
588            return Err(StreamError::ResourceExhausted(
589                "Maximum number of branches reached".to_string(),
590            ));
591        }
592
593        let branch_id = format!("branch_{}", uuid::Uuid::new_v4());
594        let current = *self.current_version.read().await;
595
596        let branch = Branch {
597            branch_id: branch_id.clone(),
598            name: name.to_string(),
599            base_version: current,
600            head_version: current,
601            created_at: SystemTime::now(),
602            updated_at: SystemTime::now(),
603            description: description.to_string(),
604            is_main: false,
605        };
606
607        branches.insert(branch_id.clone(), branch);
608
609        // Update stats
610        let mut stats = self.stats.write().await;
611        stats.branch_count += 1;
612
613        Ok(branch_id)
614    }
615
616    /// Switch to a different branch
617    pub async fn switch_branch(&self, branch_id: &str) -> Result<(), StreamError> {
618        let branches = self.branches.read().await;
619
620        if !branches.contains_key(branch_id) {
621            return Err(StreamError::NotFound(format!(
622                "Branch not found: {}",
623                branch_id
624            )));
625        }
626
627        let head_version = branches
628            .get(branch_id)
629            .expect("branch should exist after contains_key check")
630            .head_version;
631        drop(branches);
632
633        let mut current_branch = self.current_branch.write().await;
634        let mut current_version = self.current_version.write().await;
635
636        *current_branch = branch_id.to_string();
637        *current_version = head_version;
638
639        Ok(())
640    }
641
642    /// Merge a branch into the current branch
643    pub async fn merge_branch(&self, source_branch_id: &str) -> Result<VersionId, StreamError> {
644        let branches = self.branches.read().await;
645        let current_branch_id = self.current_branch.read().await.clone();
646
647        let source_branch = branches
648            .get(source_branch_id)
649            .ok_or_else(|| {
650                StreamError::NotFound(format!("Branch not found: {}", source_branch_id))
651            })?
652            .clone();
653
654        let target_branch = branches
655            .get(&current_branch_id)
656            .ok_or_else(|| {
657                StreamError::NotFound(format!("Branch not found: {}", current_branch_id))
658            })?
659            .clone();
660
661        drop(branches);
662
663        // Get events from source branch since divergence
664        let source_events = self
665            .get_branch_events_since(source_branch_id, source_branch.base_version)
666            .await?;
667
668        // Create merge version
669        let events: Vec<T> = source_events.into_iter().map(|e| e.data).collect();
670
671        self.create_version(
672            events,
673            &format!("Merge {} into {}", source_branch.name, target_branch.name),
674        )
675        .await
676    }
677
678    /// Get diff between two versions
679    pub async fn diff(
680        &self,
681        from_version: VersionId,
682        to_version: VersionId,
683    ) -> Result<VersionDiff<T>, StreamError> {
684        let from_events = self.get_at_version(from_version).await?;
685        let to_events = self.get_at_version(to_version).await?;
686
687        // Build event maps by sequence for comparison
688        let from_map: HashMap<u64, &VersionedEvent<T>> =
689            from_events.iter().map(|e| (e.sequence, e)).collect();
690        let to_map: HashMap<u64, &VersionedEvent<T>> =
691            to_events.iter().map(|e| (e.sequence, e)).collect();
692
693        let mut added = Vec::new();
694        let mut deleted = Vec::new();
695        let mut modified = Vec::new();
696        let mut unchanged_count = 0;
697
698        // Find added and modified
699        for (seq, event) in &to_map {
700            if let Some(from_event) = from_map.get(seq) {
701                // Check if modified (simplified - just compare version)
702                if event.version_id != from_event.version_id {
703                    modified.push(((*from_event).clone(), (*event).clone()));
704                } else {
705                    unchanged_count += 1;
706                }
707            } else {
708                added.push((*event).clone());
709            }
710        }
711
712        // Find deleted
713        for (seq, event) in &from_map {
714            if !to_map.contains_key(seq) {
715                deleted.push((*event).clone());
716            }
717        }
718
719        Ok(VersionDiff {
720            from_version,
721            to_version,
722            added,
723            deleted,
724            modified,
725            unchanged_count,
726        })
727    }
728
729    /// Generate changeset between versions
730    pub async fn generate_changeset(
731        &self,
732        from_version: VersionId,
733        to_version: VersionId,
734    ) -> Result<Changeset<T>, StreamError> {
735        let diff = self.diff(from_version, to_version).await?;
736
737        let mut changes = Vec::new();
738
739        // Add changes for added events
740        for event in diff.added {
741            changes.push(Change {
742                change_type: ChangeType::Add,
743                data: event.data,
744                previous: None,
745                timestamp: event.timestamp,
746            });
747        }
748
749        // Add changes for deleted events
750        for event in diff.deleted {
751            changes.push(Change {
752                change_type: ChangeType::Delete,
753                data: event.data,
754                previous: None,
755                timestamp: SystemTime::now(),
756            });
757        }
758
759        // Add changes for modified events
760        for (old, new) in diff.modified {
761            changes.push(Change {
762                change_type: ChangeType::Modify,
763                data: new.data,
764                previous: Some(old.data),
765                timestamp: new.timestamp,
766            });
767        }
768
769        Ok(Changeset {
770            from_version,
771            to_version,
772            changes,
773            created_at: SystemTime::now(),
774        })
775    }
776
777    /// Get version history
778    pub async fn get_version_history(&self) -> Vec<VersionMetadata> {
779        let versions = self.versions.read().await;
780        versions.values().cloned().collect()
781    }
782
783    /// Get all branches
784    pub async fn get_branches(&self) -> Vec<Branch> {
785        let branches = self.branches.read().await;
786        branches.values().cloned().collect()
787    }
788
789    /// Get current version
790    pub async fn current_version(&self) -> VersionId {
791        *self.current_version.read().await
792    }
793
794    /// Get current branch
795    pub async fn current_branch(&self) -> BranchId {
796        self.current_branch.read().await.clone()
797    }
798
799    /// Get statistics
800    pub async fn get_stats(&self) -> VersioningStats {
801        self.stats.read().await.clone()
802    }
803
804    /// Tag a version
805    pub async fn tag_version(
806        &self,
807        version_id: VersionId,
808        key: &str,
809        value: &str,
810    ) -> Result<(), StreamError> {
811        let mut versions = self.versions.write().await;
812
813        if let Some(metadata) = versions.get_mut(&version_id) {
814            metadata.tags.insert(key.to_string(), value.to_string());
815            Ok(())
816        } else {
817            Err(StreamError::NotFound(format!(
818                "Version not found: {}",
819                version_id
820            )))
821        }
822    }
823
824    /// Find versions by tag
825    pub async fn find_by_tag(&self, key: &str, value: &str) -> Vec<VersionId> {
826        let versions = self.versions.read().await;
827
828        versions
829            .iter()
830            .filter(|(_, m)| m.tags.get(key).map(|v| v == value).unwrap_or(false))
831            .map(|(id, _)| *id)
832            .collect()
833    }
834
835    /// Delete a branch
836    pub async fn delete_branch(&self, branch_id: &str) -> Result<(), StreamError> {
837        let mut branches = self.branches.write().await;
838
839        if let Some(branch) = branches.get(branch_id) {
840            if branch.is_main {
841                return Err(StreamError::InvalidOperation(
842                    "Cannot delete main branch".to_string(),
843                ));
844            }
845        } else {
846            return Err(StreamError::NotFound(format!(
847                "Branch not found: {}",
848                branch_id
849            )));
850        }
851
852        branches.remove(branch_id);
853
854        // Update stats
855        let mut stats = self.stats.write().await;
856        stats.branch_count = stats.branch_count.saturating_sub(1);
857
858        Ok(())
859    }
860
861    /// Compact old versions
862    pub async fn compact(&self) -> Result<usize, StreamError> {
863        let mut event_storage = self.events.write().await;
864        let mut versions = self.versions.write().await;
865
866        let threshold = SystemTime::now() - self.config.compression_threshold;
867        let mut compacted_count = 0;
868
869        for (version_id, metadata) in versions.iter_mut() {
870            if metadata.created_at < threshold && !metadata.is_compressed {
871                // In a real implementation, we would compress the events
872                // For now, we just mark them as compressed
873                metadata.is_compressed = true;
874                compacted_count += 1;
875
876                // Optionally reduce storage (simplified)
877                if let Some(events) = event_storage.get_mut(version_id) {
878                    // In reality, we'd compress the serialized form
879                    metadata.size_bytes = self.estimate_size(events) / 2; // Simulated compression
880                }
881            }
882        }
883
884        Ok(compacted_count)
885    }
886
887    // Private helper methods
888
889    fn resolve_target<'a>(
890        &'a self,
891        target: &'a TimeTravelTarget,
892    ) -> std::pin::Pin<
893        Box<dyn std::future::Future<Output = Result<VersionId, StreamError>> + Send + 'a>,
894    > {
895        Box::pin(async move {
896            match target {
897                TimeTravelTarget::Version(v) => Ok(*v),
898                TimeTravelTarget::Latest => Ok(*self.current_version.read().await),
899                TimeTravelTarget::Timestamp(ts) => {
900                    let versions = self.versions.read().await;
901                    let mut best_version = 0;
902
903                    for (version_id, metadata) in versions.iter() {
904                        if metadata.created_at <= *ts {
905                            best_version = *version_id;
906                        } else {
907                            break;
908                        }
909                    }
910
911                    Ok(best_version)
912                }
913                TimeTravelTarget::RelativeTime(duration) => {
914                    let target_time = SystemTime::now() - *duration;
915                    self.resolve_target(&TimeTravelTarget::Timestamp(target_time))
916                        .await
917                }
918                TimeTravelTarget::Snapshot(snapshot_id) => {
919                    let snapshots = self.snapshots.read().await;
920                    snapshots
921                        .get(snapshot_id)
922                        .map(|s| s.version_id)
923                        .ok_or_else(|| {
924                            StreamError::NotFound(format!("Snapshot not found: {}", snapshot_id))
925                        })
926                }
927            }
928        })
929    }
930
931    async fn get_branch_events_since(
932        &self,
933        branch_id: &str,
934        since_version: VersionId,
935    ) -> Result<Vec<VersionedEvent<T>>, StreamError> {
936        let versions = self.versions.read().await;
937        let event_storage = self.events.read().await;
938
939        let mut result = Vec::new();
940
941        for (version_id, metadata) in versions.iter() {
942            if *version_id <= since_version {
943                continue;
944            }
945
946            if metadata.branch_id != branch_id {
947                continue;
948            }
949
950            if let Some(events) = event_storage.get(version_id) {
951                result.extend(events.clone());
952            }
953        }
954
955        Ok(result)
956    }
957
958    fn estimate_size<S: Serialize>(&self, data: &S) -> usize {
959        // Rough estimate using serialization
960        serde_json::to_vec(data).map(|v| v.len()).unwrap_or(0)
961    }
962
963    async fn update_stats_after_create(&self, event_count: usize, size_bytes: usize) {
964        let mut stats = self.stats.write().await;
965        stats.total_versions += 1;
966        stats.total_events += event_count;
967        stats.total_size_bytes += size_bytes;
968        stats.newest_version = Some(SystemTime::now());
969
970        if stats.oldest_version.is_none() {
971            stats.oldest_version = Some(SystemTime::now());
972        }
973
974        if stats.total_versions > 0 {
975            stats.avg_events_per_version = stats.total_events as f64 / stats.total_versions as f64;
976        }
977    }
978
979    async fn record_query_latency(&self, latency_ms: f64) {
980        let mut latencies = self.query_latencies.write().await;
981        latencies.push_back(latency_ms);
982
983        if latencies.len() > 1000 {
984            latencies.pop_front();
985        }
986
987        // Update stats
988        let mut stats = self.stats.write().await;
989        stats.time_travel_queries += 1;
990
991        if !latencies.is_empty() {
992            stats.avg_query_latency_ms = latencies.iter().sum::<f64>() / latencies.len() as f64;
993        }
994    }
995
996    async fn maybe_create_auto_snapshot(&self) -> Result<(), StreamError> {
997        let last = *self.last_snapshot.read().await;
998
999        if last.elapsed() >= self.config.snapshot_interval {
1000            let current = *self.current_version.read().await;
1001            let name = format!("auto_{}", current);
1002            self.create_snapshot(&name).await?;
1003
1004            let mut last_snapshot = self.last_snapshot.write().await;
1005            *last_snapshot = Instant::now();
1006        }
1007
1008        Ok(())
1009    }
1010
1011    fn apply_retention_policy(
1012        &self,
1013        versions: &mut BTreeMap<VersionId, VersionMetadata>,
1014        event_storage: &mut HashMap<VersionId, Vec<VersionedEvent<T>>>,
1015    ) -> Result<(), StreamError> {
1016        // Check max versions
1017        while versions.len() > self.config.max_versions {
1018            if let Some((&oldest_id, _)) = versions.iter().next() {
1019                // Don't delete version 0
1020                if oldest_id == 0 {
1021                    break;
1022                }
1023
1024                versions.remove(&oldest_id);
1025                event_storage.remove(&oldest_id);
1026            } else {
1027                break;
1028            }
1029        }
1030
1031        // Check max age
1032        let cutoff = SystemTime::now() - self.config.max_age;
1033        let mut to_remove = Vec::new();
1034
1035        for (version_id, metadata) in versions.iter() {
1036            if *version_id == 0 {
1037                continue; // Keep initial version
1038            }
1039
1040            if metadata.created_at < cutoff {
1041                to_remove.push(*version_id);
1042            }
1043        }
1044
1045        for version_id in to_remove {
1046            versions.remove(&version_id);
1047            event_storage.remove(&version_id);
1048        }
1049
1050        Ok(())
1051    }
1052}
1053
#[cfg(test)]
mod tests {
    use super::*;

    /// Minimal serializable payload used as the stream element in these tests.
    #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
    struct TestEvent {
        id: u64,
        value: String,
    }

    /// Shorthand constructor for a [`TestEvent`].
    fn event(id: u64, value: &str) -> TestEvent {
        TestEvent {
            id,
            value: value.to_string(),
        }
    }

    /// The first created version gets id 1 and becomes the current version.
    #[tokio::test]
    async fn test_create_version() {
        let versioning = StreamVersioning::<TestEvent>::new(VersioningConfig::default());

        let payload = vec![event(1, "first"), event(2, "second")];
        let version_id = versioning
            .create_version(payload, "Test version")
            .await
            .unwrap();

        assert_eq!(version_id, 1);
        assert_eq!(versioning.current_version().await, 1);
    }

    /// Reading at version 1 yields one event; reading at version 2 yields two.
    #[tokio::test]
    async fn test_time_travel_query() {
        let versioning = StreamVersioning::<TestEvent>::new(VersioningConfig::default());

        versioning
            .create_version(vec![event(1, "v1")], "Version 1")
            .await
            .unwrap();
        versioning
            .create_version(vec![event(2, "v2")], "Version 2")
            .await
            .unwrap();

        let at_v1 = versioning.get_at_version(1).await.unwrap();
        assert_eq!(at_v1.len(), 1);
        assert_eq!(at_v1[0].data.id, 1);

        let at_v2 = versioning.get_at_version(2).await.unwrap();
        assert_eq!(at_v2.len(), 2);
    }

    /// A snapshot id embeds the requested name, and restoring it after further
    /// writes yields a non-zero version.
    #[tokio::test]
    async fn test_snapshot_create_and_restore() {
        let versioning = StreamVersioning::<TestEvent>::new(VersioningConfig::default());

        versioning
            .create_version(vec![event(1, "snapshot_test")], "Test version")
            .await
            .unwrap();

        let snapshot_id = versioning.create_snapshot("test_snap").await.unwrap();
        assert!(snapshot_id.contains("test_snap"));

        // Move the stream forward so the restore has something to roll back.
        versioning
            .create_version(vec![event(2, "after_snap")], "After snapshot")
            .await
            .unwrap();

        let restored_version = versioning.restore_snapshot(&snapshot_id).await.unwrap();
        assert!(restored_version > 0);
    }

    /// Switching to a new branch and back to `main` updates the current branch.
    #[tokio::test]
    async fn test_branching() {
        let versioning = StreamVersioning::<TestEvent>::new(VersioningConfig::default());

        versioning
            .create_version(vec![event(1, "main")], "Initial")
            .await
            .unwrap();

        let branch_id = versioning
            .create_branch("feature", "Feature branch")
            .await
            .unwrap();

        versioning.switch_branch(&branch_id).await.unwrap();
        assert_eq!(versioning.current_branch().await, branch_id);

        // Commit some work while the feature branch is active.
        versioning
            .create_version(vec![event(2, "feature")], "Feature work")
            .await
            .unwrap();

        versioning.switch_branch("main").await.unwrap();
        assert_eq!(versioning.current_branch().await, "main");
    }

    /// The diff between versions 1 and 2 reports both endpoints and at least
    /// one added entry.
    #[tokio::test]
    async fn test_diff() {
        let versioning = StreamVersioning::<TestEvent>::new(VersioningConfig::default());

        versioning
            .create_version(vec![event(1, "v1")], "V1")
            .await
            .unwrap();
        versioning
            .create_version(vec![event(1, "v1"), event(2, "v2")], "V2")
            .await
            .unwrap();

        let diff = versioning.diff(1, 2).await.unwrap();
        assert_eq!(diff.from_version, 1);
        assert_eq!(diff.to_version, 2);
        assert!(!diff.added.is_empty());
    }

    /// The changeset between versions 1 and 2 is non-empty and contains at
    /// least one `Add` change.
    #[tokio::test]
    async fn test_changeset() {
        let versioning = StreamVersioning::<TestEvent>::new(VersioningConfig::default());

        versioning
            .create_version(vec![event(1, "initial")], "Initial")
            .await
            .unwrap();
        versioning
            .create_version(vec![event(1, "initial"), event(2, "added")], "Added")
            .await
            .unwrap();

        let changeset = versioning.generate_changeset(1, 2).await.unwrap();
        assert!(!changeset.changes.is_empty());

        let has_add = changeset
            .changes
            .iter()
            .any(|c| c.change_type == ChangeType::Add);
        assert!(has_add);
    }

    /// A tagged version can be found again through its tag key and value.
    #[tokio::test]
    async fn test_tagging() {
        let versioning = StreamVersioning::<TestEvent>::new(VersioningConfig::default());

        let version_id = versioning
            .create_version(vec![event(1, "tagged")], "Tagged version")
            .await
            .unwrap();

        versioning
            .tag_version(version_id, "release", "v1.0.0")
            .await
            .unwrap();

        let found = versioning.find_by_tag("release", "v1.0.0").await;
        assert!(found.contains(&version_id));
    }

    /// A relative-time query with a zero offset returns the event just written.
    #[tokio::test]
    async fn test_relative_time_query() {
        let versioning = StreamVersioning::<TestEvent>::new(VersioningConfig::default());

        versioning
            .create_version(vec![event(1, "recent")], "Recent")
            .await
            .unwrap();

        let query = TimeTravelQuery {
            target: TimeTravelTarget::RelativeTime(Duration::from_secs(0)),
            branch_id: None,
            filter: None,
            projection: None,
            limit: None,
            include_deleted: false,
        };

        let result = versioning.time_travel_query(query).await.unwrap();
        assert!(!result.is_empty());
    }

    /// Statistics reflect at least the one version and one event just created.
    #[tokio::test]
    async fn test_stats() {
        let versioning = StreamVersioning::<TestEvent>::new(VersioningConfig::default());

        versioning
            .create_version(vec![event(1, "test")], "Test")
            .await
            .unwrap();

        let stats = versioning.get_stats().await;
        assert!(stats.total_versions >= 1);
        assert!(stats.total_events >= 1);
    }

    /// Compaction with a zero compression threshold completes without error.
    #[tokio::test]
    async fn test_compact() {
        // A zero threshold makes every version immediately eligible.
        let config = VersioningConfig {
            compression_threshold: Duration::from_secs(0),
            ..Default::default()
        };
        let versioning = StreamVersioning::<TestEvent>::new(config);

        versioning
            .create_version(vec![event(1, "compact_test")], "To compact")
            .await
            .unwrap();

        // Only success is checked; the returned count is not asserted.
        let _compacted = versioning.compact().await.unwrap();
    }

    /// A deleted branch no longer appears in the branch listing.
    #[tokio::test]
    async fn test_delete_branch() {
        let versioning = StreamVersioning::<TestEvent>::new(VersioningConfig::default());

        let branch_id = versioning
            .create_branch("to_delete", "Will be deleted")
            .await
            .unwrap();

        versioning.delete_branch(&branch_id).await.unwrap();

        let branches = versioning.get_branches().await;
        assert!(branches.iter().all(|b| b.branch_id != branch_id));
    }

    /// Deleting the `main` branch is rejected.
    #[tokio::test]
    async fn test_cannot_delete_main_branch() {
        let versioning = StreamVersioning::<TestEvent>::new(VersioningConfig::default());

        let result = versioning.delete_branch("main").await;
        assert!(result.is_err());
    }

    /// Regression guard: creating versions with `max_versions` set to 1 must
    /// complete rather than hang. The scenario targets a possible deadlock
    /// between `create_version` and the retention policy it invokes.
    #[tokio::test]
    async fn test_retention_policy_concurrency() {
        let config = VersioningConfig {
            max_versions: 1,
            ..Default::default()
        };
        let versioning = StreamVersioning::<TestEvent>::new(config);

        versioning.create_version(vec![], "v1").await.unwrap();

        // The second creation is the one intended to exercise retention.
        versioning.create_version(vec![], "v2").await.unwrap();
    }
}