1use serde::{Deserialize, Serialize};
16use std::collections::{BTreeMap, HashMap, VecDeque};
17use std::sync::Arc;
18use std::time::{Duration, Instant, SystemTime};
19use tokio::sync::RwLock;
20
21use crate::error::StreamError;
22
/// Numeric identifier of a version; `StreamVersioning::new` seeds the store with version `0`.
pub type VersionId = u64;
25
/// String identifier of a branch: `"main"` for the main branch, `"branch_<uuid>"` for
/// branches made by `create_branch`.
pub type BranchId = String;
28
/// Retention limits and feature switches for [`StreamVersioning`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct VersioningConfig {
    /// Number of versions the retention pass tries to stay within.
    pub max_versions: usize,
    /// Versions (other than version `0`) created longer ago than this are eligible for
    /// removal by the retention pass.
    pub max_age: Duration,
    /// When `true`, `create_version` runs the automatic-snapshot check after each new version.
    pub auto_snapshot: bool,
    /// Minimum time since the previous automatic snapshot before another one is taken.
    pub snapshot_interval: Duration,
    /// Whether old versions should be compressed.
    /// NOTE(review): no code visible in this file reads this flag — confirm where it is used.
    pub compress_old_versions: bool,
    /// Age beyond which `compact` marks a version as compressed.
    pub compression_threshold: Duration,
    /// When `false`, `create_branch` fails with a configuration error.
    pub enable_branching: bool,
    /// Maximum number of branches (the main branch counts towards it).
    pub max_branches: usize,
}
49
50impl Default for VersioningConfig {
51 fn default() -> Self {
52 Self {
53 max_versions: 1000,
54 max_age: Duration::from_secs(86400 * 7), auto_snapshot: true,
56 snapshot_interval: Duration::from_secs(3600), compress_old_versions: true,
58 compression_threshold: Duration::from_secs(86400), enable_branching: true,
60 max_branches: 10,
61 }
62 }
63}
64
/// Bookkeeping record kept for every version in the store.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct VersionMetadata {
    /// Identifier of this version.
    pub version_id: VersionId,
    /// Wall-clock time at which the version was created.
    pub created_at: SystemTime,
    /// Version that was current when this one was created; `None` for the initial version.
    pub parent_version: Option<VersionId>,
    /// Branch that was checked out when the version was created.
    pub branch_id: BranchId,
    /// Caller-supplied description.
    pub description: String,
    /// Number of events stored for this version (refreshed by `add_events`).
    pub event_count: usize,
    /// Estimated size of the version's events, measured as serialized JSON bytes;
    /// `compact` halves the estimate for versions it marks as compressed.
    pub size_bytes: usize,
    /// Set by `compact` once the version is older than the compression threshold.
    pub is_compressed: bool,
    /// Free-form key/value labels attached through `tag_version`.
    pub tags: HashMap<String, String>,
}
87
/// An event payload together with the versioning information recorded for it.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct VersionedEvent<T> {
    /// The caller's event payload.
    pub data: T,
    /// Version in which the event was recorded.
    pub version_id: VersionId,
    /// Wall-clock time at which the event was wrapped.
    pub timestamp: SystemTime,
    /// Position of the event inside its owning version; numbering starts at 0 in every
    /// version, so it is only unique together with `version_id`.
    pub sequence: u64,
    /// Tombstone flag consulted by `time_travel_query`.
    /// NOTE(review): nothing visible in this file ever sets it — confirm who does.
    pub is_deleted: bool,
    /// Version in which the event was deleted, if any.
    pub deleted_in_version: Option<VersionId>,
}
104
/// A stored copy of the events visible at one version.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Snapshot<T> {
    /// Identifier in the form `<name>_<version>` as built by `create_snapshot`.
    pub snapshot_id: String,
    /// Version that was current when the snapshot was taken.
    pub version_id: VersionId,
    /// Wall-clock time at which the snapshot was taken.
    pub timestamp: SystemTime,
    /// Events returned by `get_at_version` for `version_id` at snapshot time.
    pub events: Vec<VersionedEvent<T>>,
    /// Free-form annotations; `create_snapshot` leaves this empty.
    pub metadata: HashMap<String, String>,
    /// Estimated size of `events`, measured as serialized JSON bytes.
    pub size_bytes: usize,
}
121
/// A named line of versions.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Branch {
    /// Unique identifier used as the key in the branch table.
    pub branch_id: BranchId,
    /// Human-readable name.
    pub name: String,
    /// Version that was current when the branch was created.
    pub base_version: VersionId,
    /// Latest version created while this branch was checked out.
    pub head_version: VersionId,
    /// Wall-clock time at which the branch was created.
    pub created_at: SystemTime,
    /// Wall-clock time of the last head update.
    pub updated_at: SystemTime,
    /// Caller-supplied description.
    pub description: String,
    /// `true` only for the main branch, which `delete_branch` refuses to remove.
    pub is_main: bool,
}
142
/// Parameters of a historical read issued through `time_travel_query`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TimeTravelQuery {
    /// Point in history to read at.
    pub target: TimeTravelTarget,
    /// Branch to read from; `None` means `"main"`.
    pub branch_id: Option<BranchId>,
    /// Filter expression.
    /// NOTE(review): `time_travel_query` in this file does not apply it — confirm intent.
    pub filter: Option<String>,
    /// Field projection.
    /// NOTE(review): `time_travel_query` in this file does not apply it — confirm intent.
    pub projection: Option<Vec<String>>,
    /// Maximum number of events to return; extra events are cut off the end.
    pub limit: Option<usize>,
    /// When `true`, events deleted at or before the target version are returned as well.
    pub include_deleted: bool,
}
159
/// The ways a point in history can be addressed.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum TimeTravelTarget {
    /// An explicit version id, used as given.
    Version(VersionId),
    /// The last version created at or before this wall-clock time.
    Timestamp(SystemTime),
    /// Like `Timestamp`, for the time this long before now.
    RelativeTime(Duration),
    /// The store's current version.
    Latest,
    /// The version recorded in the snapshot with this id.
    Snapshot(String),
}
174
/// Result of comparing the events visible at two versions.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct VersionDiff<T> {
    /// Version the comparison starts from.
    pub from_version: VersionId,
    /// Version the comparison ends at.
    pub to_version: VersionId,
    /// Events found at `to_version` but not at `from_version`.
    pub added: Vec<VersionedEvent<T>>,
    /// Events found at `from_version` but not at `to_version`.
    pub deleted: Vec<VersionedEvent<T>>,
    /// Pairs of `(old, new)` for events present on both sides that differ.
    pub modified: Vec<(VersionedEvent<T>, VersionedEvent<T>)>,
    /// Number of events present and identical on both sides.
    pub unchanged_count: usize,
}
191
/// Kind of a single entry in a [`Changeset`].
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum ChangeType {
    /// The event was added.
    Add,
    /// The event was removed.
    Delete,
    /// The event was replaced by a different one.
    Modify,
}
202
/// One entry of a [`Changeset`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Change<T> {
    /// Whether this is an addition, a deletion or a modification.
    pub change_type: ChangeType,
    /// Payload the change refers to (the new payload for a modification).
    pub data: T,
    /// Previous payload; `generate_changeset` fills it only for modifications.
    pub previous: Option<T>,
    /// Time associated with the change.
    pub timestamp: SystemTime,
}
215
/// List of changes that lead from one version to another.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Changeset<T> {
    /// Version the changes start from.
    pub from_version: VersionId,
    /// Version the changes lead to.
    pub to_version: VersionId,
    /// The individual changes: additions first, then deletions, then modifications.
    pub changes: Vec<Change<T>>,
    /// Wall-clock time at which the changeset was generated.
    pub created_at: SystemTime,
}
228
/// Running counters maintained by [`StreamVersioning`] and returned by `get_stats`.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct VersioningStats {
    /// Number of `create_version` calls that completed (the initial version is not counted).
    pub total_versions: usize,
    /// Sum of the event counts passed to `create_version`.
    pub total_events: usize,
    /// Sum of the size estimates of all versions created.
    pub total_size_bytes: usize,
    /// Number of snapshots taken.
    pub snapshot_count: usize,
    /// Branches created minus branches deleted (the main branch is not counted).
    pub branch_count: usize,
    /// Time of the first recorded version creation.
    pub oldest_version: Option<SystemTime>,
    /// Time of the most recent version creation.
    pub newest_version: Option<SystemTime>,
    /// `total_events / total_versions`.
    pub avg_events_per_version: f64,
    /// Compression ratio.
    /// NOTE(review): never updated by code visible in this file — confirm where it is set.
    pub compression_ratio: f64,
    /// Number of time-travel queries served.
    pub time_travel_queries: u64,
    /// Mean latency in milliseconds over the retained window of recent queries.
    pub avg_query_latency_ms: f64,
}
255
/// In-memory, branch-aware version store for events of type `T`.
///
/// Each piece of state sits behind its own `Arc<RwLock<..>>` (tokio's async `RwLock`).
pub struct StreamVersioning<T>
where
    T: Clone + Send + Sync + Serialize + for<'de> Deserialize<'de> + 'static,
{
    /// Retention, snapshot and branching settings supplied at construction.
    config: VersioningConfig,
    /// Version that is currently checked out; changed by `create_version` and `switch_branch`.
    current_version: Arc<RwLock<VersionId>>,
    /// Metadata of every retained version, ordered by version id.
    versions: Arc<RwLock<BTreeMap<VersionId, VersionMetadata>>>,
    /// Events of every retained version, keyed by the owning version id.
    events: Arc<RwLock<HashMap<VersionId, Vec<VersionedEvent<T>>>>>,
    /// Stored snapshots keyed by snapshot id.
    snapshots: Arc<RwLock<HashMap<String, Snapshot<T>>>>,
    /// Known branches keyed by branch id; seeded with `"main"`.
    branches: Arc<RwLock<HashMap<BranchId, Branch>>>,
    /// Id of the branch that new versions are attributed to.
    current_branch: Arc<RwLock<BranchId>>,
    /// Running counters exposed through `get_stats`.
    stats: Arc<RwLock<VersioningStats>>,
    /// Monotonic instant of the last automatic snapshot (construction time initially).
    last_snapshot: Arc<RwLock<Instant>>,
    /// Latencies (ms) of the most recent queries, capped at 1000 samples.
    query_latencies: Arc<RwLock<VecDeque<f64>>>,
}
282
283impl<T> StreamVersioning<T>
284where
285 T: Clone + Send + Sync + Serialize + for<'de> Deserialize<'de> + 'static,
286{
287 pub fn new(config: VersioningConfig) -> Self {
289 let main_branch = Branch {
290 branch_id: "main".to_string(),
291 name: "Main Branch".to_string(),
292 base_version: 0,
293 head_version: 0,
294 created_at: SystemTime::now(),
295 updated_at: SystemTime::now(),
296 description: "Main development branch".to_string(),
297 is_main: true,
298 };
299
300 let mut branches = HashMap::new();
301 branches.insert("main".to_string(), main_branch);
302
303 let initial_version = VersionMetadata {
304 version_id: 0,
305 created_at: SystemTime::now(),
306 parent_version: None,
307 branch_id: "main".to_string(),
308 description: "Initial version".to_string(),
309 event_count: 0,
310 size_bytes: 0,
311 is_compressed: false,
312 tags: HashMap::new(),
313 };
314
315 let mut versions = BTreeMap::new();
316 versions.insert(0, initial_version);
317
318 Self {
319 config,
320 current_version: Arc::new(RwLock::new(0)),
321 versions: Arc::new(RwLock::new(versions)),
322 events: Arc::new(RwLock::new(HashMap::new())),
323 snapshots: Arc::new(RwLock::new(HashMap::new())),
324 branches: Arc::new(RwLock::new(branches)),
325 current_branch: Arc::new(RwLock::new("main".to_string())),
326 stats: Arc::new(RwLock::new(VersioningStats::default())),
327 last_snapshot: Arc::new(RwLock::new(Instant::now())),
328 query_latencies: Arc::new(RwLock::new(VecDeque::with_capacity(1000))),
329 }
330 }
331
332 pub async fn create_version(
334 &self,
335 events: Vec<T>,
336 description: &str,
337 ) -> Result<VersionId, StreamError> {
338 let mut current = self.current_version.write().await;
339 let mut versions = self.versions.write().await;
340 let mut event_storage = self.events.write().await;
341 let mut branches = self.branches.write().await;
342 let current_branch = self.current_branch.read().await.clone();
343
344 let new_version_id = *current + 1;
345
346 let versioned_events: Vec<VersionedEvent<T>> = events
348 .into_iter()
349 .enumerate()
350 .map(|(i, data)| VersionedEvent {
351 data,
352 version_id: new_version_id,
353 timestamp: SystemTime::now(),
354 sequence: i as u64,
355 is_deleted: false,
356 deleted_in_version: None,
357 })
358 .collect();
359
360 let event_count = versioned_events.len();
361 let size_bytes = self.estimate_size(&versioned_events);
362
363 let metadata = VersionMetadata {
365 version_id: new_version_id,
366 created_at: SystemTime::now(),
367 parent_version: Some(*current),
368 branch_id: current_branch.clone(),
369 description: description.to_string(),
370 event_count,
371 size_bytes,
372 is_compressed: false,
373 tags: HashMap::new(),
374 };
375
376 versions.insert(new_version_id, metadata);
378 event_storage.insert(new_version_id, versioned_events);
379
380 if let Some(branch) = branches.get_mut(¤t_branch) {
382 branch.head_version = new_version_id;
383 branch.updated_at = SystemTime::now();
384 }
385
386 *current = new_version_id;
387
388 self.update_stats_after_create(event_count, size_bytes)
390 .await;
391
392 self.apply_retention_policy(&mut versions, &mut event_storage)?;
394
395 drop(versions);
397 drop(event_storage);
398 drop(branches);
399 drop(current);
400
401 if self.config.auto_snapshot {
403 self.maybe_create_auto_snapshot().await?;
404 }
405
406 Ok(new_version_id)
407 }
408
409 pub async fn add_events(&self, events: Vec<T>) -> Result<usize, StreamError> {
411 let current = *self.current_version.read().await;
412 let mut event_storage = self.events.write().await;
413
414 let entry = event_storage.entry(current).or_insert_with(Vec::new);
415 let start_sequence = entry.len() as u64;
416
417 let versioned_events: Vec<VersionedEvent<T>> = events
418 .into_iter()
419 .enumerate()
420 .map(|(i, data)| VersionedEvent {
421 data,
422 version_id: current,
423 timestamp: SystemTime::now(),
424 sequence: start_sequence + i as u64,
425 is_deleted: false,
426 deleted_in_version: None,
427 })
428 .collect();
429
430 let count = versioned_events.len();
431 entry.extend(versioned_events);
432
433 let mut versions = self.versions.write().await;
435 if let Some(metadata) = versions.get_mut(¤t) {
436 metadata.event_count = entry.len();
437 metadata.size_bytes = self.estimate_size(entry);
438 }
439
440 Ok(count)
441 }
442
443 pub async fn time_travel_query(
445 &self,
446 query: TimeTravelQuery,
447 ) -> Result<Vec<VersionedEvent<T>>, StreamError> {
448 let start = Instant::now();
449
450 let target_version = self.resolve_target(&query.target).await?;
451 let branch_id = query.branch_id.unwrap_or_else(|| "main".to_string());
452
453 let versions = self.versions.read().await;
455 let event_storage = self.events.read().await;
456
457 let mut result_events = Vec::new();
458
459 for (version_id, metadata) in versions.iter() {
460 if *version_id > target_version {
461 break;
462 }
463
464 if metadata.branch_id != branch_id {
465 continue;
466 }
467
468 if let Some(events) = event_storage.get(version_id) {
469 for event in events {
470 if event.is_deleted && !query.include_deleted {
472 if let Some(deleted_version) = event.deleted_in_version {
473 if deleted_version <= target_version {
474 continue;
475 }
476 }
477 }
478
479 result_events.push(event.clone());
480 }
481 }
482 }
483
484 if let Some(limit) = query.limit {
486 result_events.truncate(limit);
487 }
488
489 let latency = start.elapsed().as_secs_f64() * 1000.0;
491 self.record_query_latency(latency).await;
492
493 Ok(result_events)
494 }
495
496 pub async fn get_at_version(
498 &self,
499 version_id: VersionId,
500 ) -> Result<Vec<VersionedEvent<T>>, StreamError> {
501 let query = TimeTravelQuery {
502 target: TimeTravelTarget::Version(version_id),
503 branch_id: None,
504 filter: None,
505 projection: None,
506 limit: None,
507 include_deleted: false,
508 };
509
510 self.time_travel_query(query).await
511 }
512
513 pub async fn get_at_timestamp(
515 &self,
516 timestamp: SystemTime,
517 ) -> Result<Vec<VersionedEvent<T>>, StreamError> {
518 let query = TimeTravelQuery {
519 target: TimeTravelTarget::Timestamp(timestamp),
520 branch_id: None,
521 filter: None,
522 projection: None,
523 limit: None,
524 include_deleted: false,
525 };
526
527 self.time_travel_query(query).await
528 }
529
530 pub async fn create_snapshot(&self, name: &str) -> Result<String, StreamError> {
532 let current = *self.current_version.read().await;
533 let events = self.get_at_version(current).await?;
534
535 let snapshot_id = format!("{}_{}", name, current);
536 let size_bytes = self.estimate_size(&events);
537
538 let snapshot = Snapshot {
539 snapshot_id: snapshot_id.clone(),
540 version_id: current,
541 timestamp: SystemTime::now(),
542 events,
543 metadata: HashMap::new(),
544 size_bytes,
545 };
546
547 let mut snapshots = self.snapshots.write().await;
548 snapshots.insert(snapshot_id.clone(), snapshot);
549
550 let mut stats = self.stats.write().await;
552 stats.snapshot_count += 1;
553
554 Ok(snapshot_id)
555 }
556
557 pub async fn restore_snapshot(&self, snapshot_id: &str) -> Result<VersionId, StreamError> {
559 let snapshots = self.snapshots.read().await;
560 let snapshot = snapshots
561 .get(snapshot_id)
562 .ok_or_else(|| StreamError::NotFound(format!("Snapshot not found: {}", snapshot_id)))?
563 .clone();
564 drop(snapshots);
565
566 let events: Vec<T> = snapshot.events.into_iter().map(|e| e.data).collect();
568
569 self.create_version(events, &format!("Restored from snapshot: {}", snapshot_id))
570 .await
571 }
572
573 pub async fn create_branch(
575 &self,
576 name: &str,
577 description: &str,
578 ) -> Result<BranchId, StreamError> {
579 if !self.config.enable_branching {
580 return Err(StreamError::Configuration(
581 "Branching is not enabled".to_string(),
582 ));
583 }
584
585 let mut branches = self.branches.write().await;
586
587 if branches.len() >= self.config.max_branches {
588 return Err(StreamError::ResourceExhausted(
589 "Maximum number of branches reached".to_string(),
590 ));
591 }
592
593 let branch_id = format!("branch_{}", uuid::Uuid::new_v4());
594 let current = *self.current_version.read().await;
595
596 let branch = Branch {
597 branch_id: branch_id.clone(),
598 name: name.to_string(),
599 base_version: current,
600 head_version: current,
601 created_at: SystemTime::now(),
602 updated_at: SystemTime::now(),
603 description: description.to_string(),
604 is_main: false,
605 };
606
607 branches.insert(branch_id.clone(), branch);
608
609 let mut stats = self.stats.write().await;
611 stats.branch_count += 1;
612
613 Ok(branch_id)
614 }
615
616 pub async fn switch_branch(&self, branch_id: &str) -> Result<(), StreamError> {
618 let branches = self.branches.read().await;
619
620 if !branches.contains_key(branch_id) {
621 return Err(StreamError::NotFound(format!(
622 "Branch not found: {}",
623 branch_id
624 )));
625 }
626
627 let head_version = branches
628 .get(branch_id)
629 .expect("branch should exist after contains_key check")
630 .head_version;
631 drop(branches);
632
633 let mut current_branch = self.current_branch.write().await;
634 let mut current_version = self.current_version.write().await;
635
636 *current_branch = branch_id.to_string();
637 *current_version = head_version;
638
639 Ok(())
640 }
641
642 pub async fn merge_branch(&self, source_branch_id: &str) -> Result<VersionId, StreamError> {
644 let branches = self.branches.read().await;
645 let current_branch_id = self.current_branch.read().await.clone();
646
647 let source_branch = branches
648 .get(source_branch_id)
649 .ok_or_else(|| {
650 StreamError::NotFound(format!("Branch not found: {}", source_branch_id))
651 })?
652 .clone();
653
654 let target_branch = branches
655 .get(¤t_branch_id)
656 .ok_or_else(|| {
657 StreamError::NotFound(format!("Branch not found: {}", current_branch_id))
658 })?
659 .clone();
660
661 drop(branches);
662
663 let source_events = self
665 .get_branch_events_since(source_branch_id, source_branch.base_version)
666 .await?;
667
668 let events: Vec<T> = source_events.into_iter().map(|e| e.data).collect();
670
671 self.create_version(
672 events,
673 &format!("Merge {} into {}", source_branch.name, target_branch.name),
674 )
675 .await
676 }
677
678 pub async fn diff(
680 &self,
681 from_version: VersionId,
682 to_version: VersionId,
683 ) -> Result<VersionDiff<T>, StreamError> {
684 let from_events = self.get_at_version(from_version).await?;
685 let to_events = self.get_at_version(to_version).await?;
686
687 let from_map: HashMap<u64, &VersionedEvent<T>> =
689 from_events.iter().map(|e| (e.sequence, e)).collect();
690 let to_map: HashMap<u64, &VersionedEvent<T>> =
691 to_events.iter().map(|e| (e.sequence, e)).collect();
692
693 let mut added = Vec::new();
694 let mut deleted = Vec::new();
695 let mut modified = Vec::new();
696 let mut unchanged_count = 0;
697
698 for (seq, event) in &to_map {
700 if let Some(from_event) = from_map.get(seq) {
701 if event.version_id != from_event.version_id {
703 modified.push(((*from_event).clone(), (*event).clone()));
704 } else {
705 unchanged_count += 1;
706 }
707 } else {
708 added.push((*event).clone());
709 }
710 }
711
712 for (seq, event) in &from_map {
714 if !to_map.contains_key(seq) {
715 deleted.push((*event).clone());
716 }
717 }
718
719 Ok(VersionDiff {
720 from_version,
721 to_version,
722 added,
723 deleted,
724 modified,
725 unchanged_count,
726 })
727 }
728
729 pub async fn generate_changeset(
731 &self,
732 from_version: VersionId,
733 to_version: VersionId,
734 ) -> Result<Changeset<T>, StreamError> {
735 let diff = self.diff(from_version, to_version).await?;
736
737 let mut changes = Vec::new();
738
739 for event in diff.added {
741 changes.push(Change {
742 change_type: ChangeType::Add,
743 data: event.data,
744 previous: None,
745 timestamp: event.timestamp,
746 });
747 }
748
749 for event in diff.deleted {
751 changes.push(Change {
752 change_type: ChangeType::Delete,
753 data: event.data,
754 previous: None,
755 timestamp: SystemTime::now(),
756 });
757 }
758
759 for (old, new) in diff.modified {
761 changes.push(Change {
762 change_type: ChangeType::Modify,
763 data: new.data,
764 previous: Some(old.data),
765 timestamp: new.timestamp,
766 });
767 }
768
769 Ok(Changeset {
770 from_version,
771 to_version,
772 changes,
773 created_at: SystemTime::now(),
774 })
775 }
776
777 pub async fn get_version_history(&self) -> Vec<VersionMetadata> {
779 let versions = self.versions.read().await;
780 versions.values().cloned().collect()
781 }
782
783 pub async fn get_branches(&self) -> Vec<Branch> {
785 let branches = self.branches.read().await;
786 branches.values().cloned().collect()
787 }
788
789 pub async fn current_version(&self) -> VersionId {
791 *self.current_version.read().await
792 }
793
794 pub async fn current_branch(&self) -> BranchId {
796 self.current_branch.read().await.clone()
797 }
798
799 pub async fn get_stats(&self) -> VersioningStats {
801 self.stats.read().await.clone()
802 }
803
804 pub async fn tag_version(
806 &self,
807 version_id: VersionId,
808 key: &str,
809 value: &str,
810 ) -> Result<(), StreamError> {
811 let mut versions = self.versions.write().await;
812
813 if let Some(metadata) = versions.get_mut(&version_id) {
814 metadata.tags.insert(key.to_string(), value.to_string());
815 Ok(())
816 } else {
817 Err(StreamError::NotFound(format!(
818 "Version not found: {}",
819 version_id
820 )))
821 }
822 }
823
824 pub async fn find_by_tag(&self, key: &str, value: &str) -> Vec<VersionId> {
826 let versions = self.versions.read().await;
827
828 versions
829 .iter()
830 .filter(|(_, m)| m.tags.get(key).map(|v| v == value).unwrap_or(false))
831 .map(|(id, _)| *id)
832 .collect()
833 }
834
835 pub async fn delete_branch(&self, branch_id: &str) -> Result<(), StreamError> {
837 let mut branches = self.branches.write().await;
838
839 if let Some(branch) = branches.get(branch_id) {
840 if branch.is_main {
841 return Err(StreamError::InvalidOperation(
842 "Cannot delete main branch".to_string(),
843 ));
844 }
845 } else {
846 return Err(StreamError::NotFound(format!(
847 "Branch not found: {}",
848 branch_id
849 )));
850 }
851
852 branches.remove(branch_id);
853
854 let mut stats = self.stats.write().await;
856 stats.branch_count = stats.branch_count.saturating_sub(1);
857
858 Ok(())
859 }
860
861 pub async fn compact(&self) -> Result<usize, StreamError> {
863 let mut event_storage = self.events.write().await;
864 let mut versions = self.versions.write().await;
865
866 let threshold = SystemTime::now() - self.config.compression_threshold;
867 let mut compacted_count = 0;
868
869 for (version_id, metadata) in versions.iter_mut() {
870 if metadata.created_at < threshold && !metadata.is_compressed {
871 metadata.is_compressed = true;
874 compacted_count += 1;
875
876 if let Some(events) = event_storage.get_mut(version_id) {
878 metadata.size_bytes = self.estimate_size(events) / 2; }
881 }
882 }
883
884 Ok(compacted_count)
885 }
886
887 fn resolve_target<'a>(
890 &'a self,
891 target: &'a TimeTravelTarget,
892 ) -> std::pin::Pin<
893 Box<dyn std::future::Future<Output = Result<VersionId, StreamError>> + Send + 'a>,
894 > {
895 Box::pin(async move {
896 match target {
897 TimeTravelTarget::Version(v) => Ok(*v),
898 TimeTravelTarget::Latest => Ok(*self.current_version.read().await),
899 TimeTravelTarget::Timestamp(ts) => {
900 let versions = self.versions.read().await;
901 let mut best_version = 0;
902
903 for (version_id, metadata) in versions.iter() {
904 if metadata.created_at <= *ts {
905 best_version = *version_id;
906 } else {
907 break;
908 }
909 }
910
911 Ok(best_version)
912 }
913 TimeTravelTarget::RelativeTime(duration) => {
914 let target_time = SystemTime::now() - *duration;
915 self.resolve_target(&TimeTravelTarget::Timestamp(target_time))
916 .await
917 }
918 TimeTravelTarget::Snapshot(snapshot_id) => {
919 let snapshots = self.snapshots.read().await;
920 snapshots
921 .get(snapshot_id)
922 .map(|s| s.version_id)
923 .ok_or_else(|| {
924 StreamError::NotFound(format!("Snapshot not found: {}", snapshot_id))
925 })
926 }
927 }
928 })
929 }
930
931 async fn get_branch_events_since(
932 &self,
933 branch_id: &str,
934 since_version: VersionId,
935 ) -> Result<Vec<VersionedEvent<T>>, StreamError> {
936 let versions = self.versions.read().await;
937 let event_storage = self.events.read().await;
938
939 let mut result = Vec::new();
940
941 for (version_id, metadata) in versions.iter() {
942 if *version_id <= since_version {
943 continue;
944 }
945
946 if metadata.branch_id != branch_id {
947 continue;
948 }
949
950 if let Some(events) = event_storage.get(version_id) {
951 result.extend(events.clone());
952 }
953 }
954
955 Ok(result)
956 }
957
958 fn estimate_size<S: Serialize>(&self, data: &S) -> usize {
959 serde_json::to_vec(data).map(|v| v.len()).unwrap_or(0)
961 }
962
963 async fn update_stats_after_create(&self, event_count: usize, size_bytes: usize) {
964 let mut stats = self.stats.write().await;
965 stats.total_versions += 1;
966 stats.total_events += event_count;
967 stats.total_size_bytes += size_bytes;
968 stats.newest_version = Some(SystemTime::now());
969
970 if stats.oldest_version.is_none() {
971 stats.oldest_version = Some(SystemTime::now());
972 }
973
974 if stats.total_versions > 0 {
975 stats.avg_events_per_version = stats.total_events as f64 / stats.total_versions as f64;
976 }
977 }
978
979 async fn record_query_latency(&self, latency_ms: f64) {
980 let mut latencies = self.query_latencies.write().await;
981 latencies.push_back(latency_ms);
982
983 if latencies.len() > 1000 {
984 latencies.pop_front();
985 }
986
987 let mut stats = self.stats.write().await;
989 stats.time_travel_queries += 1;
990
991 if !latencies.is_empty() {
992 stats.avg_query_latency_ms = latencies.iter().sum::<f64>() / latencies.len() as f64;
993 }
994 }
995
996 async fn maybe_create_auto_snapshot(&self) -> Result<(), StreamError> {
997 let last = *self.last_snapshot.read().await;
998
999 if last.elapsed() >= self.config.snapshot_interval {
1000 let current = *self.current_version.read().await;
1001 let name = format!("auto_{}", current);
1002 self.create_snapshot(&name).await?;
1003
1004 let mut last_snapshot = self.last_snapshot.write().await;
1005 *last_snapshot = Instant::now();
1006 }
1007
1008 Ok(())
1009 }
1010
1011 fn apply_retention_policy(
1012 &self,
1013 versions: &mut BTreeMap<VersionId, VersionMetadata>,
1014 event_storage: &mut HashMap<VersionId, Vec<VersionedEvent<T>>>,
1015 ) -> Result<(), StreamError> {
1016 while versions.len() > self.config.max_versions {
1018 if let Some((&oldest_id, _)) = versions.iter().next() {
1019 if oldest_id == 0 {
1021 break;
1022 }
1023
1024 versions.remove(&oldest_id);
1025 event_storage.remove(&oldest_id);
1026 } else {
1027 break;
1028 }
1029 }
1030
1031 let cutoff = SystemTime::now() - self.config.max_age;
1033 let mut to_remove = Vec::new();
1034
1035 for (version_id, metadata) in versions.iter() {
1036 if *version_id == 0 {
1037 continue; }
1039
1040 if metadata.created_at < cutoff {
1041 to_remove.push(*version_id);
1042 }
1043 }
1044
1045 for version_id in to_remove {
1046 versions.remove(&version_id);
1047 event_storage.remove(&version_id);
1048 }
1049
1050 Ok(())
1051 }
1052}
1053
1054#[cfg(test)]
1055mod tests {
1056 use super::*;
1057
    /// Minimal serializable payload used as the event type in these tests.
    #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
    struct TestEvent {
        /// Arbitrary numeric id used to tell events apart in assertions.
        id: u64,
        /// Arbitrary text payload.
        value: String,
    }
1063
1064 #[tokio::test]
1065 async fn test_create_version() {
1066 let versioning = StreamVersioning::<TestEvent>::new(VersioningConfig::default());
1067
1068 let events = vec![
1069 TestEvent {
1070 id: 1,
1071 value: "first".to_string(),
1072 },
1073 TestEvent {
1074 id: 2,
1075 value: "second".to_string(),
1076 },
1077 ];
1078
1079 let version_id = versioning
1080 .create_version(events, "Test version")
1081 .await
1082 .unwrap();
1083
1084 assert_eq!(version_id, 1);
1085 assert_eq!(versioning.current_version().await, 1);
1086 }
1087
1088 #[tokio::test]
1089 async fn test_time_travel_query() {
1090 let versioning = StreamVersioning::<TestEvent>::new(VersioningConfig::default());
1091
1092 let events1 = vec![TestEvent {
1094 id: 1,
1095 value: "v1".to_string(),
1096 }];
1097 versioning
1098 .create_version(events1, "Version 1")
1099 .await
1100 .unwrap();
1101
1102 let events2 = vec![TestEvent {
1104 id: 2,
1105 value: "v2".to_string(),
1106 }];
1107 versioning
1108 .create_version(events2, "Version 2")
1109 .await
1110 .unwrap();
1111
1112 let result = versioning.get_at_version(1).await.unwrap();
1114 assert_eq!(result.len(), 1);
1115 assert_eq!(result[0].data.id, 1);
1116
1117 let result = versioning.get_at_version(2).await.unwrap();
1119 assert_eq!(result.len(), 2);
1120 }
1121
1122 #[tokio::test]
1123 async fn test_snapshot_create_and_restore() {
1124 let versioning = StreamVersioning::<TestEvent>::new(VersioningConfig::default());
1125
1126 let events = vec![TestEvent {
1127 id: 1,
1128 value: "snapshot_test".to_string(),
1129 }];
1130 versioning
1131 .create_version(events, "Test version")
1132 .await
1133 .unwrap();
1134
1135 let snapshot_id = versioning.create_snapshot("test_snap").await.unwrap();
1137 assert!(snapshot_id.contains("test_snap"));
1138
1139 versioning
1141 .create_version(
1142 vec![TestEvent {
1143 id: 2,
1144 value: "after_snap".to_string(),
1145 }],
1146 "After snapshot",
1147 )
1148 .await
1149 .unwrap();
1150
1151 let restored_version = versioning.restore_snapshot(&snapshot_id).await.unwrap();
1153 assert!(restored_version > 0);
1154 }
1155
1156 #[tokio::test]
1157 async fn test_branching() {
1158 let versioning = StreamVersioning::<TestEvent>::new(VersioningConfig::default());
1159
1160 versioning
1162 .create_version(
1163 vec![TestEvent {
1164 id: 1,
1165 value: "main".to_string(),
1166 }],
1167 "Initial",
1168 )
1169 .await
1170 .unwrap();
1171
1172 let branch_id = versioning
1174 .create_branch("feature", "Feature branch")
1175 .await
1176 .unwrap();
1177
1178 versioning.switch_branch(&branch_id).await.unwrap();
1180 assert_eq!(versioning.current_branch().await, branch_id);
1181
1182 versioning
1184 .create_version(
1185 vec![TestEvent {
1186 id: 2,
1187 value: "feature".to_string(),
1188 }],
1189 "Feature work",
1190 )
1191 .await
1192 .unwrap();
1193
1194 versioning.switch_branch("main").await.unwrap();
1196 assert_eq!(versioning.current_branch().await, "main");
1197 }
1198
1199 #[tokio::test]
1200 async fn test_diff() {
1201 let versioning = StreamVersioning::<TestEvent>::new(VersioningConfig::default());
1202
1203 versioning
1205 .create_version(
1206 vec![TestEvent {
1207 id: 1,
1208 value: "v1".to_string(),
1209 }],
1210 "V1",
1211 )
1212 .await
1213 .unwrap();
1214
1215 versioning
1217 .create_version(
1218 vec![
1219 TestEvent {
1220 id: 1,
1221 value: "v1".to_string(),
1222 },
1223 TestEvent {
1224 id: 2,
1225 value: "v2".to_string(),
1226 },
1227 ],
1228 "V2",
1229 )
1230 .await
1231 .unwrap();
1232
1233 let diff = versioning.diff(1, 2).await.unwrap();
1234 assert_eq!(diff.from_version, 1);
1235 assert_eq!(diff.to_version, 2);
1236 assert!(!diff.added.is_empty());
1237 }
1238
1239 #[tokio::test]
1240 async fn test_changeset() {
1241 let versioning = StreamVersioning::<TestEvent>::new(VersioningConfig::default());
1242
1243 versioning
1244 .create_version(
1245 vec![TestEvent {
1246 id: 1,
1247 value: "initial".to_string(),
1248 }],
1249 "Initial",
1250 )
1251 .await
1252 .unwrap();
1253
1254 versioning
1255 .create_version(
1256 vec![
1257 TestEvent {
1258 id: 1,
1259 value: "initial".to_string(),
1260 },
1261 TestEvent {
1262 id: 2,
1263 value: "added".to_string(),
1264 },
1265 ],
1266 "Added",
1267 )
1268 .await
1269 .unwrap();
1270
1271 let changeset = versioning.generate_changeset(1, 2).await.unwrap();
1272 assert!(!changeset.changes.is_empty());
1273
1274 let add_changes: Vec<_> = changeset
1275 .changes
1276 .iter()
1277 .filter(|c| c.change_type == ChangeType::Add)
1278 .collect();
1279 assert!(!add_changes.is_empty());
1280 }
1281
1282 #[tokio::test]
1283 async fn test_tagging() {
1284 let versioning = StreamVersioning::<TestEvent>::new(VersioningConfig::default());
1285
1286 let version_id = versioning
1287 .create_version(
1288 vec![TestEvent {
1289 id: 1,
1290 value: "tagged".to_string(),
1291 }],
1292 "Tagged version",
1293 )
1294 .await
1295 .unwrap();
1296
1297 versioning
1298 .tag_version(version_id, "release", "v1.0.0")
1299 .await
1300 .unwrap();
1301
1302 let found = versioning.find_by_tag("release", "v1.0.0").await;
1303 assert!(found.contains(&version_id));
1304 }
1305
1306 #[tokio::test]
1307 async fn test_relative_time_query() {
1308 let versioning = StreamVersioning::<TestEvent>::new(VersioningConfig::default());
1309
1310 versioning
1311 .create_version(
1312 vec![TestEvent {
1313 id: 1,
1314 value: "recent".to_string(),
1315 }],
1316 "Recent",
1317 )
1318 .await
1319 .unwrap();
1320
1321 let query = TimeTravelQuery {
1323 target: TimeTravelTarget::RelativeTime(Duration::from_secs(0)),
1324 branch_id: None,
1325 filter: None,
1326 projection: None,
1327 limit: None,
1328 include_deleted: false,
1329 };
1330
1331 let result = versioning.time_travel_query(query).await.unwrap();
1332 assert!(!result.is_empty());
1333 }
1334
1335 #[tokio::test]
1336 async fn test_stats() {
1337 let versioning = StreamVersioning::<TestEvent>::new(VersioningConfig::default());
1338
1339 versioning
1340 .create_version(
1341 vec![TestEvent {
1342 id: 1,
1343 value: "test".to_string(),
1344 }],
1345 "Test",
1346 )
1347 .await
1348 .unwrap();
1349
1350 let stats = versioning.get_stats().await;
1351 assert!(stats.total_versions >= 1);
1352 assert!(stats.total_events >= 1);
1353 }
1354
1355 #[tokio::test]
1356 async fn test_compact() {
1357 let config = VersioningConfig {
1358 compression_threshold: Duration::from_secs(0), ..Default::default()
1360 };
1361
1362 let versioning = StreamVersioning::<TestEvent>::new(config);
1363
1364 versioning
1365 .create_version(
1366 vec![TestEvent {
1367 id: 1,
1368 value: "compact_test".to_string(),
1369 }],
1370 "To compact",
1371 )
1372 .await
1373 .unwrap();
1374
1375 let _compacted = versioning.compact().await.unwrap();
1376 }
1378
1379 #[tokio::test]
1380 async fn test_delete_branch() {
1381 let versioning = StreamVersioning::<TestEvent>::new(VersioningConfig::default());
1382
1383 let branch_id = versioning
1384 .create_branch("to_delete", "Will be deleted")
1385 .await
1386 .unwrap();
1387
1388 versioning.delete_branch(&branch_id).await.unwrap();
1389
1390 let branches = versioning.get_branches().await;
1391 assert!(!branches.iter().any(|b| b.branch_id == branch_id));
1392 }
1393
1394 #[tokio::test]
1395 async fn test_cannot_delete_main_branch() {
1396 let versioning = StreamVersioning::<TestEvent>::new(VersioningConfig::default());
1397
1398 let result = versioning.delete_branch("main").await;
1399 assert!(result.is_err());
1400 }
1401 #[tokio::test]
1402 async fn test_retention_policy_concurrency() {
1403 let config = VersioningConfig {
1405 max_versions: 1,
1406 ..Default::default()
1407 };
1408 let versioning = StreamVersioning::<TestEvent>::new(config);
1409
1410 versioning.create_version(vec![], "v1").await.unwrap();
1412
1413 versioning.create_version(vec![], "v2").await.unwrap();
1416 }
1417}