// oxirs_stream/stream_versioning.rs

//! Stream Versioning and Time-Travel Queries
//!
//! This module provides comprehensive stream versioning capabilities with
//! time-travel query support for historical data analysis and replay.
//!
//! # Features
//!
//! - **Version Management**: Track and manage stream data versions
//! - **Time-Travel Queries**: Query historical stream states
//! - **Snapshot Management**: Create and restore point-in-time snapshots
//! - **Branching**: Create branches for what-if analysis
//! - **Diff Operations**: Compare versions and generate changesets
//! - **Retention Policies**: Automatic version cleanup and archival
14
15use serde::{Deserialize, Serialize};
16use std::collections::{BTreeMap, HashMap, VecDeque};
17use std::sync::Arc;
18use std::time::{Duration, Instant, SystemTime};
19use tokio::sync::RwLock;
20
21use crate::error::StreamError;
22
/// Numeric identifier of a version in the stream history.
pub type VersionId = u64;

/// Textual identifier of a branch (the default branch is `"main"`).
pub type BranchId = String;
28
29/// Stream versioning configuration
30#[derive(Debug, Clone, Serialize, Deserialize)]
31pub struct VersioningConfig {
32    /// Maximum number of versions to retain
33    pub max_versions: usize,
34    /// Maximum age for version retention
35    pub max_age: Duration,
36    /// Enable automatic snapshots
37    pub auto_snapshot: bool,
38    /// Snapshot interval
39    pub snapshot_interval: Duration,
40    /// Enable compression for old versions
41    pub compress_old_versions: bool,
42    /// Compression threshold (versions older than this get compressed)
43    pub compression_threshold: Duration,
44    /// Enable branching support
45    pub enable_branching: bool,
46    /// Maximum number of branches
47    pub max_branches: usize,
48}
49
50impl Default for VersioningConfig {
51    fn default() -> Self {
52        Self {
53            max_versions: 1000,
54            max_age: Duration::from_secs(86400 * 7), // 7 days
55            auto_snapshot: true,
56            snapshot_interval: Duration::from_secs(3600), // 1 hour
57            compress_old_versions: true,
58            compression_threshold: Duration::from_secs(86400), // 1 day
59            enable_branching: true,
60            max_branches: 10,
61        }
62    }
63}
64
65/// Version metadata
66#[derive(Debug, Clone, Serialize, Deserialize)]
67pub struct VersionMetadata {
68    /// Version ID
69    pub version_id: VersionId,
70    /// Creation timestamp
71    pub created_at: SystemTime,
72    /// Parent version (None for root)
73    pub parent_version: Option<VersionId>,
74    /// Branch this version belongs to
75    pub branch_id: BranchId,
76    /// Version description
77    pub description: String,
78    /// Number of events in this version
79    pub event_count: usize,
80    /// Size in bytes
81    pub size_bytes: usize,
82    /// Is this version compressed
83    pub is_compressed: bool,
84    /// Custom tags
85    pub tags: HashMap<String, String>,
86}
87
88/// A versioned event with metadata
89#[derive(Debug, Clone, Serialize, Deserialize)]
90pub struct VersionedEvent<T> {
91    /// The event data
92    pub data: T,
93    /// Version this event was added in
94    pub version_id: VersionId,
95    /// Event timestamp
96    pub timestamp: SystemTime,
97    /// Event sequence number within version
98    pub sequence: u64,
99    /// Is this event deleted in a later version
100    pub is_deleted: bool,
101    /// Version where this event was deleted (if applicable)
102    pub deleted_in_version: Option<VersionId>,
103}
104
105/// A snapshot of stream state at a point in time
106#[derive(Debug, Clone, Serialize, Deserialize)]
107pub struct Snapshot<T> {
108    /// Snapshot identifier
109    pub snapshot_id: String,
110    /// Version at snapshot time
111    pub version_id: VersionId,
112    /// Snapshot timestamp
113    pub timestamp: SystemTime,
114    /// Events in the snapshot
115    pub events: Vec<VersionedEvent<T>>,
116    /// Snapshot metadata
117    pub metadata: HashMap<String, String>,
118    /// Size in bytes
119    pub size_bytes: usize,
120}
121
122/// Branch information
123#[derive(Debug, Clone, Serialize, Deserialize)]
124pub struct Branch {
125    /// Branch identifier
126    pub branch_id: BranchId,
127    /// Branch name
128    pub name: String,
129    /// Base version (where branch was created)
130    pub base_version: VersionId,
131    /// Current head version
132    pub head_version: VersionId,
133    /// Creation timestamp
134    pub created_at: SystemTime,
135    /// Last update timestamp
136    pub updated_at: SystemTime,
137    /// Branch description
138    pub description: String,
139    /// Is this the main branch
140    pub is_main: bool,
141}
142
143/// Time-travel query specification
144#[derive(Debug, Clone, Serialize, Deserialize)]
145pub struct TimeTravelQuery {
146    /// Target version or timestamp
147    pub target: TimeTravelTarget,
148    /// Branch to query
149    pub branch_id: Option<BranchId>,
150    /// Filter predicate
151    pub filter: Option<String>,
152    /// Projection fields
153    pub projection: Option<Vec<String>>,
154    /// Result limit
155    pub limit: Option<usize>,
156    /// Include deleted events
157    pub include_deleted: bool,
158}
159
160/// Target for time-travel query
161#[derive(Debug, Clone, Serialize, Deserialize)]
162pub enum TimeTravelTarget {
163    /// Specific version ID
164    Version(VersionId),
165    /// Point in time
166    Timestamp(SystemTime),
167    /// Relative time (e.g., 1 hour ago)
168    RelativeTime(Duration),
169    /// Latest version
170    Latest,
171    /// Specific snapshot
172    Snapshot(String),
173}
174
175/// Diff between two versions
176#[derive(Debug, Clone, Serialize, Deserialize)]
177pub struct VersionDiff<T> {
178    /// Source version
179    pub from_version: VersionId,
180    /// Target version
181    pub to_version: VersionId,
182    /// Added events
183    pub added: Vec<VersionedEvent<T>>,
184    /// Deleted events
185    pub deleted: Vec<VersionedEvent<T>>,
186    /// Modified events (old value, new value)
187    pub modified: Vec<(VersionedEvent<T>, VersionedEvent<T>)>,
188    /// Number of unchanged events
189    pub unchanged_count: usize,
190}
191
192/// Change operation type
193#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
194pub enum ChangeType {
195    /// Event added
196    Add,
197    /// Event deleted
198    Delete,
199    /// Event modified
200    Modify,
201}
202
203/// A single change in a changeset
204#[derive(Debug, Clone, Serialize, Deserialize)]
205pub struct Change<T> {
206    /// Change type
207    pub change_type: ChangeType,
208    /// Event data
209    pub data: T,
210    /// Previous value (for modifications)
211    pub previous: Option<T>,
212    /// Timestamp
213    pub timestamp: SystemTime,
214}
215
216/// Changeset between versions
217#[derive(Debug, Clone, Serialize, Deserialize)]
218pub struct Changeset<T> {
219    /// Source version
220    pub from_version: VersionId,
221    /// Target version
222    pub to_version: VersionId,
223    /// List of changes
224    pub changes: Vec<Change<T>>,
225    /// Creation timestamp
226    pub created_at: SystemTime,
227}
228
229/// Versioning statistics
230#[derive(Debug, Clone, Default, Serialize, Deserialize)]
231pub struct VersioningStats {
232    /// Total number of versions
233    pub total_versions: usize,
234    /// Total number of events across all versions
235    pub total_events: usize,
236    /// Total storage size
237    pub total_size_bytes: usize,
238    /// Number of snapshots
239    pub snapshot_count: usize,
240    /// Number of branches
241    pub branch_count: usize,
242    /// Oldest version timestamp
243    pub oldest_version: Option<SystemTime>,
244    /// Newest version timestamp
245    pub newest_version: Option<SystemTime>,
246    /// Average events per version
247    pub avg_events_per_version: f64,
248    /// Compression ratio
249    pub compression_ratio: f64,
250    /// Number of time-travel queries executed
251    pub time_travel_queries: u64,
252    /// Average query latency
253    pub avg_query_latency_ms: f64,
254}
255
256/// Stream versioning manager
257pub struct StreamVersioning<T>
258where
259    T: Clone + Send + Sync + Serialize + for<'de> Deserialize<'de> + 'static,
260{
261    /// Configuration
262    config: VersioningConfig,
263    /// Current version
264    current_version: Arc<RwLock<VersionId>>,
265    /// Version metadata index
266    versions: Arc<RwLock<BTreeMap<VersionId, VersionMetadata>>>,
267    /// Event storage by version
268    events: Arc<RwLock<HashMap<VersionId, Vec<VersionedEvent<T>>>>>,
269    /// Snapshots
270    snapshots: Arc<RwLock<HashMap<String, Snapshot<T>>>>,
271    /// Branches
272    branches: Arc<RwLock<HashMap<BranchId, Branch>>>,
273    /// Current branch
274    current_branch: Arc<RwLock<BranchId>>,
275    /// Statistics
276    stats: Arc<RwLock<VersioningStats>>,
277    /// Last snapshot time
278    last_snapshot: Arc<RwLock<Instant>>,
279    /// Query latencies for stats
280    query_latencies: Arc<RwLock<VecDeque<f64>>>,
281}
282
283impl<T> StreamVersioning<T>
284where
285    T: Clone + Send + Sync + Serialize + for<'de> Deserialize<'de> + 'static,
286{
287    /// Create a new stream versioning manager
288    pub fn new(config: VersioningConfig) -> Self {
289        let main_branch = Branch {
290            branch_id: "main".to_string(),
291            name: "Main Branch".to_string(),
292            base_version: 0,
293            head_version: 0,
294            created_at: SystemTime::now(),
295            updated_at: SystemTime::now(),
296            description: "Main development branch".to_string(),
297            is_main: true,
298        };
299
300        let mut branches = HashMap::new();
301        branches.insert("main".to_string(), main_branch);
302
303        let initial_version = VersionMetadata {
304            version_id: 0,
305            created_at: SystemTime::now(),
306            parent_version: None,
307            branch_id: "main".to_string(),
308            description: "Initial version".to_string(),
309            event_count: 0,
310            size_bytes: 0,
311            is_compressed: false,
312            tags: HashMap::new(),
313        };
314
315        let mut versions = BTreeMap::new();
316        versions.insert(0, initial_version);
317
318        Self {
319            config,
320            current_version: Arc::new(RwLock::new(0)),
321            versions: Arc::new(RwLock::new(versions)),
322            events: Arc::new(RwLock::new(HashMap::new())),
323            snapshots: Arc::new(RwLock::new(HashMap::new())),
324            branches: Arc::new(RwLock::new(branches)),
325            current_branch: Arc::new(RwLock::new("main".to_string())),
326            stats: Arc::new(RwLock::new(VersioningStats::default())),
327            last_snapshot: Arc::new(RwLock::new(Instant::now())),
328            query_latencies: Arc::new(RwLock::new(VecDeque::with_capacity(1000))),
329        }
330    }
331
332    /// Create a new version with events
333    pub async fn create_version(
334        &self,
335        events: Vec<T>,
336        description: &str,
337    ) -> Result<VersionId, StreamError> {
338        let mut current = self.current_version.write().await;
339        let mut versions = self.versions.write().await;
340        let mut event_storage = self.events.write().await;
341        let mut branches = self.branches.write().await;
342        let current_branch = self.current_branch.read().await.clone();
343
344        let new_version_id = *current + 1;
345
346        // Create versioned events
347        let versioned_events: Vec<VersionedEvent<T>> = events
348            .into_iter()
349            .enumerate()
350            .map(|(i, data)| VersionedEvent {
351                data,
352                version_id: new_version_id,
353                timestamp: SystemTime::now(),
354                sequence: i as u64,
355                is_deleted: false,
356                deleted_in_version: None,
357            })
358            .collect();
359
360        let event_count = versioned_events.len();
361        let size_bytes = self.estimate_size(&versioned_events);
362
363        // Create version metadata
364        let metadata = VersionMetadata {
365            version_id: new_version_id,
366            created_at: SystemTime::now(),
367            parent_version: Some(*current),
368            branch_id: current_branch.clone(),
369            description: description.to_string(),
370            event_count,
371            size_bytes,
372            is_compressed: false,
373            tags: HashMap::new(),
374        };
375
376        // Store version
377        versions.insert(new_version_id, metadata);
378        event_storage.insert(new_version_id, versioned_events);
379
380        // Update branch head
381        if let Some(branch) = branches.get_mut(&current_branch) {
382            branch.head_version = new_version_id;
383            branch.updated_at = SystemTime::now();
384        }
385
386        *current = new_version_id;
387
388        // Update stats
389        self.update_stats_after_create(event_count, size_bytes)
390            .await;
391
392        // Apply retention policy
393        self.apply_retention_policy(&mut versions, &mut event_storage)?;
394
395        // Drop locks before calling maybe_create_auto_snapshot which acquires its own locks
396        drop(versions);
397        drop(event_storage);
398        drop(branches);
399        drop(current);
400
401        // Check if we need to create auto-snapshot
402        if self.config.auto_snapshot {
403            self.maybe_create_auto_snapshot().await?;
404        }
405
406        Ok(new_version_id)
407    }
408
409    /// Add events to current version
410    pub async fn add_events(&self, events: Vec<T>) -> Result<usize, StreamError> {
411        let current = *self.current_version.read().await;
412        let mut event_storage = self.events.write().await;
413
414        let entry = event_storage.entry(current).or_insert_with(Vec::new);
415        let start_sequence = entry.len() as u64;
416
417        let versioned_events: Vec<VersionedEvent<T>> = events
418            .into_iter()
419            .enumerate()
420            .map(|(i, data)| VersionedEvent {
421                data,
422                version_id: current,
423                timestamp: SystemTime::now(),
424                sequence: start_sequence + i as u64,
425                is_deleted: false,
426                deleted_in_version: None,
427            })
428            .collect();
429
430        let count = versioned_events.len();
431        entry.extend(versioned_events);
432
433        // Update version metadata
434        let mut versions = self.versions.write().await;
435        if let Some(metadata) = versions.get_mut(&current) {
436            metadata.event_count = entry.len();
437            metadata.size_bytes = self.estimate_size(entry);
438        }
439
440        Ok(count)
441    }
442
443    /// Execute a time-travel query
444    pub async fn time_travel_query(
445        &self,
446        query: TimeTravelQuery,
447    ) -> Result<Vec<VersionedEvent<T>>, StreamError> {
448        let start = Instant::now();
449
450        let target_version = self.resolve_target(&query.target).await?;
451        let branch_id = query.branch_id.unwrap_or_else(|| "main".to_string());
452
453        // Collect all events up to target version for the branch
454        let versions = self.versions.read().await;
455        let event_storage = self.events.read().await;
456
457        let mut result_events = Vec::new();
458
459        for (version_id, metadata) in versions.iter() {
460            if *version_id > target_version {
461                break;
462            }
463
464            if metadata.branch_id != branch_id {
465                continue;
466            }
467
468            if let Some(events) = event_storage.get(version_id) {
469                for event in events {
470                    // Skip deleted events unless requested
471                    if event.is_deleted && !query.include_deleted {
472                        if let Some(deleted_version) = event.deleted_in_version {
473                            if deleted_version <= target_version {
474                                continue;
475                            }
476                        }
477                    }
478
479                    result_events.push(event.clone());
480                }
481            }
482        }
483
484        // Apply limit
485        if let Some(limit) = query.limit {
486            result_events.truncate(limit);
487        }
488
489        // Record query latency
490        let latency = start.elapsed().as_secs_f64() * 1000.0;
491        self.record_query_latency(latency).await;
492
493        Ok(result_events)
494    }
495
496    /// Get events at a specific version
497    pub async fn get_at_version(
498        &self,
499        version_id: VersionId,
500    ) -> Result<Vec<VersionedEvent<T>>, StreamError> {
501        let query = TimeTravelQuery {
502            target: TimeTravelTarget::Version(version_id),
503            branch_id: None,
504            filter: None,
505            projection: None,
506            limit: None,
507            include_deleted: false,
508        };
509
510        self.time_travel_query(query).await
511    }
512
513    /// Get events at a specific timestamp
514    pub async fn get_at_timestamp(
515        &self,
516        timestamp: SystemTime,
517    ) -> Result<Vec<VersionedEvent<T>>, StreamError> {
518        let query = TimeTravelQuery {
519            target: TimeTravelTarget::Timestamp(timestamp),
520            branch_id: None,
521            filter: None,
522            projection: None,
523            limit: None,
524            include_deleted: false,
525        };
526
527        self.time_travel_query(query).await
528    }
529
530    /// Create a snapshot
531    pub async fn create_snapshot(&self, name: &str) -> Result<String, StreamError> {
532        let current = *self.current_version.read().await;
533        let events = self.get_at_version(current).await?;
534
535        let snapshot_id = format!("{}_{}", name, current);
536        let size_bytes = self.estimate_size(&events);
537
538        let snapshot = Snapshot {
539            snapshot_id: snapshot_id.clone(),
540            version_id: current,
541            timestamp: SystemTime::now(),
542            events,
543            metadata: HashMap::new(),
544            size_bytes,
545        };
546
547        let mut snapshots = self.snapshots.write().await;
548        snapshots.insert(snapshot_id.clone(), snapshot);
549
550        // Update stats
551        let mut stats = self.stats.write().await;
552        stats.snapshot_count += 1;
553
554        Ok(snapshot_id)
555    }
556
557    /// Restore from snapshot
558    pub async fn restore_snapshot(&self, snapshot_id: &str) -> Result<VersionId, StreamError> {
559        let snapshots = self.snapshots.read().await;
560        let snapshot = snapshots
561            .get(snapshot_id)
562            .ok_or_else(|| StreamError::NotFound(format!("Snapshot not found: {}", snapshot_id)))?
563            .clone();
564        drop(snapshots);
565
566        // Create a new version from snapshot
567        let events: Vec<T> = snapshot.events.into_iter().map(|e| e.data).collect();
568
569        self.create_version(events, &format!("Restored from snapshot: {}", snapshot_id))
570            .await
571    }
572
573    /// Create a new branch
574    pub async fn create_branch(
575        &self,
576        name: &str,
577        description: &str,
578    ) -> Result<BranchId, StreamError> {
579        if !self.config.enable_branching {
580            return Err(StreamError::Configuration(
581                "Branching is not enabled".to_string(),
582            ));
583        }
584
585        let mut branches = self.branches.write().await;
586
587        if branches.len() >= self.config.max_branches {
588            return Err(StreamError::ResourceExhausted(
589                "Maximum number of branches reached".to_string(),
590            ));
591        }
592
593        let branch_id = format!("branch_{}", uuid::Uuid::new_v4());
594        let current = *self.current_version.read().await;
595
596        let branch = Branch {
597            branch_id: branch_id.clone(),
598            name: name.to_string(),
599            base_version: current,
600            head_version: current,
601            created_at: SystemTime::now(),
602            updated_at: SystemTime::now(),
603            description: description.to_string(),
604            is_main: false,
605        };
606
607        branches.insert(branch_id.clone(), branch);
608
609        // Update stats
610        let mut stats = self.stats.write().await;
611        stats.branch_count += 1;
612
613        Ok(branch_id)
614    }
615
616    /// Switch to a different branch
617    pub async fn switch_branch(&self, branch_id: &str) -> Result<(), StreamError> {
618        let branches = self.branches.read().await;
619
620        if !branches.contains_key(branch_id) {
621            return Err(StreamError::NotFound(format!(
622                "Branch not found: {}",
623                branch_id
624            )));
625        }
626
627        let head_version = branches.get(branch_id).unwrap().head_version;
628        drop(branches);
629
630        let mut current_branch = self.current_branch.write().await;
631        let mut current_version = self.current_version.write().await;
632
633        *current_branch = branch_id.to_string();
634        *current_version = head_version;
635
636        Ok(())
637    }
638
639    /// Merge a branch into the current branch
640    pub async fn merge_branch(&self, source_branch_id: &str) -> Result<VersionId, StreamError> {
641        let branches = self.branches.read().await;
642        let current_branch_id = self.current_branch.read().await.clone();
643
644        let source_branch = branches
645            .get(source_branch_id)
646            .ok_or_else(|| {
647                StreamError::NotFound(format!("Branch not found: {}", source_branch_id))
648            })?
649            .clone();
650
651        let target_branch = branches
652            .get(&current_branch_id)
653            .ok_or_else(|| {
654                StreamError::NotFound(format!("Branch not found: {}", current_branch_id))
655            })?
656            .clone();
657
658        drop(branches);
659
660        // Get events from source branch since divergence
661        let source_events = self
662            .get_branch_events_since(source_branch_id, source_branch.base_version)
663            .await?;
664
665        // Create merge version
666        let events: Vec<T> = source_events.into_iter().map(|e| e.data).collect();
667
668        self.create_version(
669            events,
670            &format!("Merge {} into {}", source_branch.name, target_branch.name),
671        )
672        .await
673    }
674
675    /// Get diff between two versions
676    pub async fn diff(
677        &self,
678        from_version: VersionId,
679        to_version: VersionId,
680    ) -> Result<VersionDiff<T>, StreamError> {
681        let from_events = self.get_at_version(from_version).await?;
682        let to_events = self.get_at_version(to_version).await?;
683
684        // Build event maps by sequence for comparison
685        let from_map: HashMap<u64, &VersionedEvent<T>> =
686            from_events.iter().map(|e| (e.sequence, e)).collect();
687        let to_map: HashMap<u64, &VersionedEvent<T>> =
688            to_events.iter().map(|e| (e.sequence, e)).collect();
689
690        let mut added = Vec::new();
691        let mut deleted = Vec::new();
692        let mut modified = Vec::new();
693        let mut unchanged_count = 0;
694
695        // Find added and modified
696        for (seq, event) in &to_map {
697            if let Some(from_event) = from_map.get(seq) {
698                // Check if modified (simplified - just compare version)
699                if event.version_id != from_event.version_id {
700                    modified.push(((*from_event).clone(), (*event).clone()));
701                } else {
702                    unchanged_count += 1;
703                }
704            } else {
705                added.push((*event).clone());
706            }
707        }
708
709        // Find deleted
710        for (seq, event) in &from_map {
711            if !to_map.contains_key(seq) {
712                deleted.push((*event).clone());
713            }
714        }
715
716        Ok(VersionDiff {
717            from_version,
718            to_version,
719            added,
720            deleted,
721            modified,
722            unchanged_count,
723        })
724    }
725
726    /// Generate changeset between versions
727    pub async fn generate_changeset(
728        &self,
729        from_version: VersionId,
730        to_version: VersionId,
731    ) -> Result<Changeset<T>, StreamError> {
732        let diff = self.diff(from_version, to_version).await?;
733
734        let mut changes = Vec::new();
735
736        // Add changes for added events
737        for event in diff.added {
738            changes.push(Change {
739                change_type: ChangeType::Add,
740                data: event.data,
741                previous: None,
742                timestamp: event.timestamp,
743            });
744        }
745
746        // Add changes for deleted events
747        for event in diff.deleted {
748            changes.push(Change {
749                change_type: ChangeType::Delete,
750                data: event.data,
751                previous: None,
752                timestamp: SystemTime::now(),
753            });
754        }
755
756        // Add changes for modified events
757        for (old, new) in diff.modified {
758            changes.push(Change {
759                change_type: ChangeType::Modify,
760                data: new.data,
761                previous: Some(old.data),
762                timestamp: new.timestamp,
763            });
764        }
765
766        Ok(Changeset {
767            from_version,
768            to_version,
769            changes,
770            created_at: SystemTime::now(),
771        })
772    }
773
774    /// Get version history
775    pub async fn get_version_history(&self) -> Vec<VersionMetadata> {
776        let versions = self.versions.read().await;
777        versions.values().cloned().collect()
778    }
779
780    /// Get all branches
781    pub async fn get_branches(&self) -> Vec<Branch> {
782        let branches = self.branches.read().await;
783        branches.values().cloned().collect()
784    }
785
786    /// Get current version
787    pub async fn current_version(&self) -> VersionId {
788        *self.current_version.read().await
789    }
790
791    /// Get current branch
792    pub async fn current_branch(&self) -> BranchId {
793        self.current_branch.read().await.clone()
794    }
795
796    /// Get statistics
797    pub async fn get_stats(&self) -> VersioningStats {
798        self.stats.read().await.clone()
799    }
800
801    /// Tag a version
802    pub async fn tag_version(
803        &self,
804        version_id: VersionId,
805        key: &str,
806        value: &str,
807    ) -> Result<(), StreamError> {
808        let mut versions = self.versions.write().await;
809
810        if let Some(metadata) = versions.get_mut(&version_id) {
811            metadata.tags.insert(key.to_string(), value.to_string());
812            Ok(())
813        } else {
814            Err(StreamError::NotFound(format!(
815                "Version not found: {}",
816                version_id
817            )))
818        }
819    }
820
821    /// Find versions by tag
822    pub async fn find_by_tag(&self, key: &str, value: &str) -> Vec<VersionId> {
823        let versions = self.versions.read().await;
824
825        versions
826            .iter()
827            .filter(|(_, m)| m.tags.get(key).map(|v| v == value).unwrap_or(false))
828            .map(|(id, _)| *id)
829            .collect()
830    }
831
832    /// Delete a branch
833    pub async fn delete_branch(&self, branch_id: &str) -> Result<(), StreamError> {
834        let mut branches = self.branches.write().await;
835
836        if let Some(branch) = branches.get(branch_id) {
837            if branch.is_main {
838                return Err(StreamError::InvalidOperation(
839                    "Cannot delete main branch".to_string(),
840                ));
841            }
842        } else {
843            return Err(StreamError::NotFound(format!(
844                "Branch not found: {}",
845                branch_id
846            )));
847        }
848
849        branches.remove(branch_id);
850
851        // Update stats
852        let mut stats = self.stats.write().await;
853        stats.branch_count = stats.branch_count.saturating_sub(1);
854
855        Ok(())
856    }
857
858    /// Compact old versions
859    pub async fn compact(&self) -> Result<usize, StreamError> {
860        let mut event_storage = self.events.write().await;
861        let mut versions = self.versions.write().await;
862
863        let threshold = SystemTime::now() - self.config.compression_threshold;
864        let mut compacted_count = 0;
865
866        for (version_id, metadata) in versions.iter_mut() {
867            if metadata.created_at < threshold && !metadata.is_compressed {
868                // In a real implementation, we would compress the events
869                // For now, we just mark them as compressed
870                metadata.is_compressed = true;
871                compacted_count += 1;
872
873                // Optionally reduce storage (simplified)
874                if let Some(events) = event_storage.get_mut(version_id) {
875                    // In reality, we'd compress the serialized form
876                    metadata.size_bytes = self.estimate_size(events) / 2; // Simulated compression
877                }
878            }
879        }
880
881        Ok(compacted_count)
882    }
883
884    // Private helper methods
885
886    fn resolve_target<'a>(
887        &'a self,
888        target: &'a TimeTravelTarget,
889    ) -> std::pin::Pin<
890        Box<dyn std::future::Future<Output = Result<VersionId, StreamError>> + Send + 'a>,
891    > {
892        Box::pin(async move {
893            match target {
894                TimeTravelTarget::Version(v) => Ok(*v),
895                TimeTravelTarget::Latest => Ok(*self.current_version.read().await),
896                TimeTravelTarget::Timestamp(ts) => {
897                    let versions = self.versions.read().await;
898                    let mut best_version = 0;
899
900                    for (version_id, metadata) in versions.iter() {
901                        if metadata.created_at <= *ts {
902                            best_version = *version_id;
903                        } else {
904                            break;
905                        }
906                    }
907
908                    Ok(best_version)
909                }
910                TimeTravelTarget::RelativeTime(duration) => {
911                    let target_time = SystemTime::now() - *duration;
912                    self.resolve_target(&TimeTravelTarget::Timestamp(target_time))
913                        .await
914                }
915                TimeTravelTarget::Snapshot(snapshot_id) => {
916                    let snapshots = self.snapshots.read().await;
917                    snapshots
918                        .get(snapshot_id)
919                        .map(|s| s.version_id)
920                        .ok_or_else(|| {
921                            StreamError::NotFound(format!("Snapshot not found: {}", snapshot_id))
922                        })
923                }
924            }
925        })
926    }
927
928    async fn get_branch_events_since(
929        &self,
930        branch_id: &str,
931        since_version: VersionId,
932    ) -> Result<Vec<VersionedEvent<T>>, StreamError> {
933        let versions = self.versions.read().await;
934        let event_storage = self.events.read().await;
935
936        let mut result = Vec::new();
937
938        for (version_id, metadata) in versions.iter() {
939            if *version_id <= since_version {
940                continue;
941            }
942
943            if metadata.branch_id != branch_id {
944                continue;
945            }
946
947            if let Some(events) = event_storage.get(version_id) {
948                result.extend(events.clone());
949            }
950        }
951
952        Ok(result)
953    }
954
955    fn estimate_size<S: Serialize>(&self, data: &S) -> usize {
956        // Rough estimate using serialization
957        serde_json::to_vec(data).map(|v| v.len()).unwrap_or(0)
958    }
959
960    async fn update_stats_after_create(&self, event_count: usize, size_bytes: usize) {
961        let mut stats = self.stats.write().await;
962        stats.total_versions += 1;
963        stats.total_events += event_count;
964        stats.total_size_bytes += size_bytes;
965        stats.newest_version = Some(SystemTime::now());
966
967        if stats.oldest_version.is_none() {
968            stats.oldest_version = Some(SystemTime::now());
969        }
970
971        if stats.total_versions > 0 {
972            stats.avg_events_per_version = stats.total_events as f64 / stats.total_versions as f64;
973        }
974    }
975
976    async fn record_query_latency(&self, latency_ms: f64) {
977        let mut latencies = self.query_latencies.write().await;
978        latencies.push_back(latency_ms);
979
980        if latencies.len() > 1000 {
981            latencies.pop_front();
982        }
983
984        // Update stats
985        let mut stats = self.stats.write().await;
986        stats.time_travel_queries += 1;
987
988        if !latencies.is_empty() {
989            stats.avg_query_latency_ms = latencies.iter().sum::<f64>() / latencies.len() as f64;
990        }
991    }
992
993    async fn maybe_create_auto_snapshot(&self) -> Result<(), StreamError> {
994        let last = *self.last_snapshot.read().await;
995
996        if last.elapsed() >= self.config.snapshot_interval {
997            let current = *self.current_version.read().await;
998            let name = format!("auto_{}", current);
999            self.create_snapshot(&name).await?;
1000
1001            let mut last_snapshot = self.last_snapshot.write().await;
1002            *last_snapshot = Instant::now();
1003        }
1004
1005        Ok(())
1006    }
1007
1008    fn apply_retention_policy(
1009        &self,
1010        versions: &mut BTreeMap<VersionId, VersionMetadata>,
1011        event_storage: &mut HashMap<VersionId, Vec<VersionedEvent<T>>>,
1012    ) -> Result<(), StreamError> {
1013        // Check max versions
1014        while versions.len() > self.config.max_versions {
1015            if let Some((&oldest_id, _)) = versions.iter().next() {
1016                // Don't delete version 0
1017                if oldest_id == 0 {
1018                    break;
1019                }
1020
1021                versions.remove(&oldest_id);
1022                event_storage.remove(&oldest_id);
1023            } else {
1024                break;
1025            }
1026        }
1027
1028        // Check max age
1029        let cutoff = SystemTime::now() - self.config.max_age;
1030        let mut to_remove = Vec::new();
1031
1032        for (version_id, metadata) in versions.iter() {
1033            if *version_id == 0 {
1034                continue; // Keep initial version
1035            }
1036
1037            if metadata.created_at < cutoff {
1038                to_remove.push(*version_id);
1039            }
1040        }
1041
1042        for version_id in to_remove {
1043            versions.remove(&version_id);
1044            event_storage.remove(&version_id);
1045        }
1046
1047        Ok(())
1048    }
1049}
1050
#[cfg(test)]
mod tests {
    use super::*;

    #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
    struct TestEvent {
        id: u64,
        value: String,
    }

    /// Shorthand constructor so each test only states the fields it cares about.
    fn make_event(id: u64, value: &str) -> TestEvent {
        TestEvent {
            id,
            value: value.to_string(),
        }
    }

    #[tokio::test]
    async fn test_create_version() {
        let versioning = StreamVersioning::<TestEvent>::new(VersioningConfig::default());

        let events = vec![make_event(1, "first"), make_event(2, "second")];
        let version_id = versioning
            .create_version(events, "Test version")
            .await
            .unwrap();

        // The first created version is numbered 1 and becomes current.
        assert_eq!(version_id, 1);
        assert_eq!(versioning.current_version().await, 1);
    }

    #[tokio::test]
    async fn test_time_travel_query() {
        let versioning = StreamVersioning::<TestEvent>::new(VersioningConfig::default());

        versioning
            .create_version(vec![make_event(1, "v1")], "Version 1")
            .await
            .unwrap();
        versioning
            .create_version(vec![make_event(2, "v2")], "Version 2")
            .await
            .unwrap();

        // The state at version 1 holds only the first event.
        let result = versioning.get_at_version(1).await.unwrap();
        assert_eq!(result.len(), 1);
        assert_eq!(result[0].data.id, 1);

        // The state at version 2 holds the events of both versions.
        let result = versioning.get_at_version(2).await.unwrap();
        assert_eq!(result.len(), 2);
    }

    #[tokio::test]
    async fn test_snapshot_create_and_restore() {
        let versioning = StreamVersioning::<TestEvent>::new(VersioningConfig::default());

        versioning
            .create_version(vec![make_event(1, "snapshot_test")], "Test version")
            .await
            .unwrap();

        let snapshot_id = versioning.create_snapshot("test_snap").await.unwrap();
        assert!(snapshot_id.contains("test_snap"));

        // Move past the snapshot, then restore it.
        versioning
            .create_version(vec![make_event(2, "after_snap")], "After snapshot")
            .await
            .unwrap();

        let restored_version = versioning.restore_snapshot(&snapshot_id).await.unwrap();
        assert!(restored_version > 0);
    }

    #[tokio::test]
    async fn test_branching() {
        let versioning = StreamVersioning::<TestEvent>::new(VersioningConfig::default());

        versioning
            .create_version(vec![make_event(1, "main")], "Initial")
            .await
            .unwrap();

        let branch_id = versioning
            .create_branch("feature", "Feature branch")
            .await
            .unwrap();

        versioning.switch_branch(&branch_id).await.unwrap();
        assert_eq!(versioning.current_branch().await, branch_id);

        // Work on the feature branch.
        versioning
            .create_version(vec![make_event(2, "feature")], "Feature work")
            .await
            .unwrap();

        versioning.switch_branch("main").await.unwrap();
        assert_eq!(versioning.current_branch().await, "main");
    }

    #[tokio::test]
    async fn test_diff() {
        let versioning = StreamVersioning::<TestEvent>::new(VersioningConfig::default());

        versioning
            .create_version(vec![make_event(1, "v1")], "V1")
            .await
            .unwrap();
        versioning
            .create_version(vec![make_event(1, "v1"), make_event(2, "v2")], "V2")
            .await
            .unwrap();

        let diff = versioning.diff(1, 2).await.unwrap();
        assert_eq!(diff.from_version, 1);
        assert_eq!(diff.to_version, 2);
        assert!(!diff.added.is_empty());
    }

    #[tokio::test]
    async fn test_changeset() {
        let versioning = StreamVersioning::<TestEvent>::new(VersioningConfig::default());

        versioning
            .create_version(vec![make_event(1, "initial")], "Initial")
            .await
            .unwrap();
        versioning
            .create_version(
                vec![make_event(1, "initial"), make_event(2, "added")],
                "Added",
            )
            .await
            .unwrap();

        let changeset = versioning.generate_changeset(1, 2).await.unwrap();
        assert!(!changeset.changes.is_empty());
        assert!(changeset
            .changes
            .iter()
            .any(|c| c.change_type == ChangeType::Add));
    }

    #[tokio::test]
    async fn test_tagging() {
        let versioning = StreamVersioning::<TestEvent>::new(VersioningConfig::default());

        let version_id = versioning
            .create_version(vec![make_event(1, "tagged")], "Tagged version")
            .await
            .unwrap();

        versioning
            .tag_version(version_id, "release", "v1.0.0")
            .await
            .unwrap();

        let found = versioning.find_by_tag("release", "v1.0.0").await;
        assert!(found.contains(&version_id));
    }

    #[tokio::test]
    async fn test_relative_time_query() {
        let versioning = StreamVersioning::<TestEvent>::new(VersioningConfig::default());

        versioning
            .create_version(vec![make_event(1, "recent")], "Recent")
            .await
            .unwrap();

        // A zero offset targets "now", which must include the version just created.
        let query = TimeTravelQuery {
            target: TimeTravelTarget::RelativeTime(Duration::from_secs(0)),
            branch_id: None,
            filter: None,
            projection: None,
            limit: None,
            include_deleted: false,
        };

        let result = versioning.time_travel_query(query).await.unwrap();
        assert!(!result.is_empty());
    }

    #[tokio::test]
    async fn test_stats() {
        let versioning = StreamVersioning::<TestEvent>::new(VersioningConfig::default());

        versioning
            .create_version(vec![make_event(1, "test")], "Test")
            .await
            .unwrap();

        let stats = versioning.get_stats().await;
        assert!(stats.total_versions >= 1);
        assert!(stats.total_events >= 1);
    }

    #[tokio::test]
    async fn test_compact() {
        let config = VersioningConfig {
            compression_threshold: Duration::from_secs(0), // Immediate compression
            ..Default::default()
        };

        let versioning = StreamVersioning::<TestEvent>::new(config);

        versioning
            .create_version(vec![make_event(1, "compact_test")], "To compact")
            .await
            .unwrap();

        // Only checks that compaction succeeds; the returned count is not asserted.
        let _compacted = versioning.compact().await.unwrap();
    }

    #[tokio::test]
    async fn test_delete_branch() {
        let versioning = StreamVersioning::<TestEvent>::new(VersioningConfig::default());

        let branch_id = versioning
            .create_branch("to_delete", "Will be deleted")
            .await
            .unwrap();

        versioning.delete_branch(&branch_id).await.unwrap();

        let branches = versioning.get_branches().await;
        assert!(!branches.iter().any(|b| b.branch_id == branch_id));
    }

    #[tokio::test]
    async fn test_cannot_delete_main_branch() {
        let versioning = StreamVersioning::<TestEvent>::new(VersioningConfig::default());

        let result = versioning.delete_branch("main").await;
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn test_retention_policy_concurrency() {
        // A limit of one version makes the retention policy run on the second create.
        let config = VersioningConfig {
            max_versions: 1,
            ..Default::default()
        };
        let versioning = StreamVersioning::<TestEvent>::new(config);

        versioning.create_version(vec![], "v1").await.unwrap();

        // Regression guard: create_version applies the retention policy while
        // holding its locks. The policy must work on the maps it is handed and
        // not re-lock them, so this call has to complete instead of deadlocking.
        versioning.create_version(vec![], "v2").await.unwrap();
        assert_eq!(versioning.current_version().await, 2);
    }
}