1use serde::{Deserialize, Serialize};
16use std::collections::{BTreeMap, HashMap, VecDeque};
17use std::sync::Arc;
18use std::time::{Duration, Instant, SystemTime};
19use tokio::sync::RwLock;
20
21use crate::error::StreamError;
22
23pub type VersionId = u64;
25
26pub type BranchId = String;
28
29#[derive(Debug, Clone, Serialize, Deserialize)]
31pub struct VersioningConfig {
32 pub max_versions: usize,
34 pub max_age: Duration,
36 pub auto_snapshot: bool,
38 pub snapshot_interval: Duration,
40 pub compress_old_versions: bool,
42 pub compression_threshold: Duration,
44 pub enable_branching: bool,
46 pub max_branches: usize,
48}
49
50impl Default for VersioningConfig {
51 fn default() -> Self {
52 Self {
53 max_versions: 1000,
54 max_age: Duration::from_secs(86400 * 7), auto_snapshot: true,
56 snapshot_interval: Duration::from_secs(3600), compress_old_versions: true,
58 compression_threshold: Duration::from_secs(86400), enable_branching: true,
60 max_branches: 10,
61 }
62 }
63}
64
65#[derive(Debug, Clone, Serialize, Deserialize)]
67pub struct VersionMetadata {
68 pub version_id: VersionId,
70 pub created_at: SystemTime,
72 pub parent_version: Option<VersionId>,
74 pub branch_id: BranchId,
76 pub description: String,
78 pub event_count: usize,
80 pub size_bytes: usize,
82 pub is_compressed: bool,
84 pub tags: HashMap<String, String>,
86}
87
88#[derive(Debug, Clone, Serialize, Deserialize)]
90pub struct VersionedEvent<T> {
91 pub data: T,
93 pub version_id: VersionId,
95 pub timestamp: SystemTime,
97 pub sequence: u64,
99 pub is_deleted: bool,
101 pub deleted_in_version: Option<VersionId>,
103}
104
105#[derive(Debug, Clone, Serialize, Deserialize)]
107pub struct Snapshot<T> {
108 pub snapshot_id: String,
110 pub version_id: VersionId,
112 pub timestamp: SystemTime,
114 pub events: Vec<VersionedEvent<T>>,
116 pub metadata: HashMap<String, String>,
118 pub size_bytes: usize,
120}
121
122#[derive(Debug, Clone, Serialize, Deserialize)]
124pub struct Branch {
125 pub branch_id: BranchId,
127 pub name: String,
129 pub base_version: VersionId,
131 pub head_version: VersionId,
133 pub created_at: SystemTime,
135 pub updated_at: SystemTime,
137 pub description: String,
139 pub is_main: bool,
141}
142
143#[derive(Debug, Clone, Serialize, Deserialize)]
145pub struct TimeTravelQuery {
146 pub target: TimeTravelTarget,
148 pub branch_id: Option<BranchId>,
150 pub filter: Option<String>,
152 pub projection: Option<Vec<String>>,
154 pub limit: Option<usize>,
156 pub include_deleted: bool,
158}
159
160#[derive(Debug, Clone, Serialize, Deserialize)]
162pub enum TimeTravelTarget {
163 Version(VersionId),
165 Timestamp(SystemTime),
167 RelativeTime(Duration),
169 Latest,
171 Snapshot(String),
173}
174
175#[derive(Debug, Clone, Serialize, Deserialize)]
177pub struct VersionDiff<T> {
178 pub from_version: VersionId,
180 pub to_version: VersionId,
182 pub added: Vec<VersionedEvent<T>>,
184 pub deleted: Vec<VersionedEvent<T>>,
186 pub modified: Vec<(VersionedEvent<T>, VersionedEvent<T>)>,
188 pub unchanged_count: usize,
190}
191
192#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
194pub enum ChangeType {
195 Add,
197 Delete,
199 Modify,
201}
202
203#[derive(Debug, Clone, Serialize, Deserialize)]
205pub struct Change<T> {
206 pub change_type: ChangeType,
208 pub data: T,
210 pub previous: Option<T>,
212 pub timestamp: SystemTime,
214}
215
216#[derive(Debug, Clone, Serialize, Deserialize)]
218pub struct Changeset<T> {
219 pub from_version: VersionId,
221 pub to_version: VersionId,
223 pub changes: Vec<Change<T>>,
225 pub created_at: SystemTime,
227}
228
229#[derive(Debug, Clone, Default, Serialize, Deserialize)]
231pub struct VersioningStats {
232 pub total_versions: usize,
234 pub total_events: usize,
236 pub total_size_bytes: usize,
238 pub snapshot_count: usize,
240 pub branch_count: usize,
242 pub oldest_version: Option<SystemTime>,
244 pub newest_version: Option<SystemTime>,
246 pub avg_events_per_version: f64,
248 pub compression_ratio: f64,
250 pub time_travel_queries: u64,
252 pub avg_query_latency_ms: f64,
254}
255
256pub struct StreamVersioning<T>
258where
259 T: Clone + Send + Sync + Serialize + for<'de> Deserialize<'de> + 'static,
260{
261 config: VersioningConfig,
263 current_version: Arc<RwLock<VersionId>>,
265 versions: Arc<RwLock<BTreeMap<VersionId, VersionMetadata>>>,
267 events: Arc<RwLock<HashMap<VersionId, Vec<VersionedEvent<T>>>>>,
269 snapshots: Arc<RwLock<HashMap<String, Snapshot<T>>>>,
271 branches: Arc<RwLock<HashMap<BranchId, Branch>>>,
273 current_branch: Arc<RwLock<BranchId>>,
275 stats: Arc<RwLock<VersioningStats>>,
277 last_snapshot: Arc<RwLock<Instant>>,
279 query_latencies: Arc<RwLock<VecDeque<f64>>>,
281}
282
283impl<T> StreamVersioning<T>
284where
285 T: Clone + Send + Sync + Serialize + for<'de> Deserialize<'de> + 'static,
286{
287 pub fn new(config: VersioningConfig) -> Self {
289 let main_branch = Branch {
290 branch_id: "main".to_string(),
291 name: "Main Branch".to_string(),
292 base_version: 0,
293 head_version: 0,
294 created_at: SystemTime::now(),
295 updated_at: SystemTime::now(),
296 description: "Main development branch".to_string(),
297 is_main: true,
298 };
299
300 let mut branches = HashMap::new();
301 branches.insert("main".to_string(), main_branch);
302
303 let initial_version = VersionMetadata {
304 version_id: 0,
305 created_at: SystemTime::now(),
306 parent_version: None,
307 branch_id: "main".to_string(),
308 description: "Initial version".to_string(),
309 event_count: 0,
310 size_bytes: 0,
311 is_compressed: false,
312 tags: HashMap::new(),
313 };
314
315 let mut versions = BTreeMap::new();
316 versions.insert(0, initial_version);
317
318 Self {
319 config,
320 current_version: Arc::new(RwLock::new(0)),
321 versions: Arc::new(RwLock::new(versions)),
322 events: Arc::new(RwLock::new(HashMap::new())),
323 snapshots: Arc::new(RwLock::new(HashMap::new())),
324 branches: Arc::new(RwLock::new(branches)),
325 current_branch: Arc::new(RwLock::new("main".to_string())),
326 stats: Arc::new(RwLock::new(VersioningStats::default())),
327 last_snapshot: Arc::new(RwLock::new(Instant::now())),
328 query_latencies: Arc::new(RwLock::new(VecDeque::with_capacity(1000))),
329 }
330 }
331
332 pub async fn create_version(
334 &self,
335 events: Vec<T>,
336 description: &str,
337 ) -> Result<VersionId, StreamError> {
338 let mut current = self.current_version.write().await;
339 let mut versions = self.versions.write().await;
340 let mut event_storage = self.events.write().await;
341 let mut branches = self.branches.write().await;
342 let current_branch = self.current_branch.read().await.clone();
343
344 let new_version_id = *current + 1;
345
346 let versioned_events: Vec<VersionedEvent<T>> = events
348 .into_iter()
349 .enumerate()
350 .map(|(i, data)| VersionedEvent {
351 data,
352 version_id: new_version_id,
353 timestamp: SystemTime::now(),
354 sequence: i as u64,
355 is_deleted: false,
356 deleted_in_version: None,
357 })
358 .collect();
359
360 let event_count = versioned_events.len();
361 let size_bytes = self.estimate_size(&versioned_events);
362
363 let metadata = VersionMetadata {
365 version_id: new_version_id,
366 created_at: SystemTime::now(),
367 parent_version: Some(*current),
368 branch_id: current_branch.clone(),
369 description: description.to_string(),
370 event_count,
371 size_bytes,
372 is_compressed: false,
373 tags: HashMap::new(),
374 };
375
376 versions.insert(new_version_id, metadata);
378 event_storage.insert(new_version_id, versioned_events);
379
380 if let Some(branch) = branches.get_mut(¤t_branch) {
382 branch.head_version = new_version_id;
383 branch.updated_at = SystemTime::now();
384 }
385
386 *current = new_version_id;
387
388 self.update_stats_after_create(event_count, size_bytes)
390 .await;
391
392 self.apply_retention_policy(&mut versions, &mut event_storage)?;
394
395 drop(versions);
397 drop(event_storage);
398 drop(branches);
399 drop(current);
400
401 if self.config.auto_snapshot {
403 self.maybe_create_auto_snapshot().await?;
404 }
405
406 Ok(new_version_id)
407 }
408
409 pub async fn add_events(&self, events: Vec<T>) -> Result<usize, StreamError> {
411 let current = *self.current_version.read().await;
412 let mut event_storage = self.events.write().await;
413
414 let entry = event_storage.entry(current).or_insert_with(Vec::new);
415 let start_sequence = entry.len() as u64;
416
417 let versioned_events: Vec<VersionedEvent<T>> = events
418 .into_iter()
419 .enumerate()
420 .map(|(i, data)| VersionedEvent {
421 data,
422 version_id: current,
423 timestamp: SystemTime::now(),
424 sequence: start_sequence + i as u64,
425 is_deleted: false,
426 deleted_in_version: None,
427 })
428 .collect();
429
430 let count = versioned_events.len();
431 entry.extend(versioned_events);
432
433 let mut versions = self.versions.write().await;
435 if let Some(metadata) = versions.get_mut(¤t) {
436 metadata.event_count = entry.len();
437 metadata.size_bytes = self.estimate_size(entry);
438 }
439
440 Ok(count)
441 }
442
443 pub async fn time_travel_query(
445 &self,
446 query: TimeTravelQuery,
447 ) -> Result<Vec<VersionedEvent<T>>, StreamError> {
448 let start = Instant::now();
449
450 let target_version = self.resolve_target(&query.target).await?;
451 let branch_id = query.branch_id.unwrap_or_else(|| "main".to_string());
452
453 let versions = self.versions.read().await;
455 let event_storage = self.events.read().await;
456
457 let mut result_events = Vec::new();
458
459 for (version_id, metadata) in versions.iter() {
460 if *version_id > target_version {
461 break;
462 }
463
464 if metadata.branch_id != branch_id {
465 continue;
466 }
467
468 if let Some(events) = event_storage.get(version_id) {
469 for event in events {
470 if event.is_deleted && !query.include_deleted {
472 if let Some(deleted_version) = event.deleted_in_version {
473 if deleted_version <= target_version {
474 continue;
475 }
476 }
477 }
478
479 result_events.push(event.clone());
480 }
481 }
482 }
483
484 if let Some(limit) = query.limit {
486 result_events.truncate(limit);
487 }
488
489 let latency = start.elapsed().as_secs_f64() * 1000.0;
491 self.record_query_latency(latency).await;
492
493 Ok(result_events)
494 }
495
496 pub async fn get_at_version(
498 &self,
499 version_id: VersionId,
500 ) -> Result<Vec<VersionedEvent<T>>, StreamError> {
501 let query = TimeTravelQuery {
502 target: TimeTravelTarget::Version(version_id),
503 branch_id: None,
504 filter: None,
505 projection: None,
506 limit: None,
507 include_deleted: false,
508 };
509
510 self.time_travel_query(query).await
511 }
512
513 pub async fn get_at_timestamp(
515 &self,
516 timestamp: SystemTime,
517 ) -> Result<Vec<VersionedEvent<T>>, StreamError> {
518 let query = TimeTravelQuery {
519 target: TimeTravelTarget::Timestamp(timestamp),
520 branch_id: None,
521 filter: None,
522 projection: None,
523 limit: None,
524 include_deleted: false,
525 };
526
527 self.time_travel_query(query).await
528 }
529
530 pub async fn create_snapshot(&self, name: &str) -> Result<String, StreamError> {
532 let current = *self.current_version.read().await;
533 let events = self.get_at_version(current).await?;
534
535 let snapshot_id = format!("{}_{}", name, current);
536 let size_bytes = self.estimate_size(&events);
537
538 let snapshot = Snapshot {
539 snapshot_id: snapshot_id.clone(),
540 version_id: current,
541 timestamp: SystemTime::now(),
542 events,
543 metadata: HashMap::new(),
544 size_bytes,
545 };
546
547 let mut snapshots = self.snapshots.write().await;
548 snapshots.insert(snapshot_id.clone(), snapshot);
549
550 let mut stats = self.stats.write().await;
552 stats.snapshot_count += 1;
553
554 Ok(snapshot_id)
555 }
556
557 pub async fn restore_snapshot(&self, snapshot_id: &str) -> Result<VersionId, StreamError> {
559 let snapshots = self.snapshots.read().await;
560 let snapshot = snapshots
561 .get(snapshot_id)
562 .ok_or_else(|| StreamError::NotFound(format!("Snapshot not found: {}", snapshot_id)))?
563 .clone();
564 drop(snapshots);
565
566 let events: Vec<T> = snapshot.events.into_iter().map(|e| e.data).collect();
568
569 self.create_version(events, &format!("Restored from snapshot: {}", snapshot_id))
570 .await
571 }
572
573 pub async fn create_branch(
575 &self,
576 name: &str,
577 description: &str,
578 ) -> Result<BranchId, StreamError> {
579 if !self.config.enable_branching {
580 return Err(StreamError::Configuration(
581 "Branching is not enabled".to_string(),
582 ));
583 }
584
585 let mut branches = self.branches.write().await;
586
587 if branches.len() >= self.config.max_branches {
588 return Err(StreamError::ResourceExhausted(
589 "Maximum number of branches reached".to_string(),
590 ));
591 }
592
593 let branch_id = format!("branch_{}", uuid::Uuid::new_v4());
594 let current = *self.current_version.read().await;
595
596 let branch = Branch {
597 branch_id: branch_id.clone(),
598 name: name.to_string(),
599 base_version: current,
600 head_version: current,
601 created_at: SystemTime::now(),
602 updated_at: SystemTime::now(),
603 description: description.to_string(),
604 is_main: false,
605 };
606
607 branches.insert(branch_id.clone(), branch);
608
609 let mut stats = self.stats.write().await;
611 stats.branch_count += 1;
612
613 Ok(branch_id)
614 }
615
616 pub async fn switch_branch(&self, branch_id: &str) -> Result<(), StreamError> {
618 let branches = self.branches.read().await;
619
620 if !branches.contains_key(branch_id) {
621 return Err(StreamError::NotFound(format!(
622 "Branch not found: {}",
623 branch_id
624 )));
625 }
626
627 let head_version = branches.get(branch_id).unwrap().head_version;
628 drop(branches);
629
630 let mut current_branch = self.current_branch.write().await;
631 let mut current_version = self.current_version.write().await;
632
633 *current_branch = branch_id.to_string();
634 *current_version = head_version;
635
636 Ok(())
637 }
638
639 pub async fn merge_branch(&self, source_branch_id: &str) -> Result<VersionId, StreamError> {
641 let branches = self.branches.read().await;
642 let current_branch_id = self.current_branch.read().await.clone();
643
644 let source_branch = branches
645 .get(source_branch_id)
646 .ok_or_else(|| {
647 StreamError::NotFound(format!("Branch not found: {}", source_branch_id))
648 })?
649 .clone();
650
651 let target_branch = branches
652 .get(¤t_branch_id)
653 .ok_or_else(|| {
654 StreamError::NotFound(format!("Branch not found: {}", current_branch_id))
655 })?
656 .clone();
657
658 drop(branches);
659
660 let source_events = self
662 .get_branch_events_since(source_branch_id, source_branch.base_version)
663 .await?;
664
665 let events: Vec<T> = source_events.into_iter().map(|e| e.data).collect();
667
668 self.create_version(
669 events,
670 &format!("Merge {} into {}", source_branch.name, target_branch.name),
671 )
672 .await
673 }
674
675 pub async fn diff(
677 &self,
678 from_version: VersionId,
679 to_version: VersionId,
680 ) -> Result<VersionDiff<T>, StreamError> {
681 let from_events = self.get_at_version(from_version).await?;
682 let to_events = self.get_at_version(to_version).await?;
683
684 let from_map: HashMap<u64, &VersionedEvent<T>> =
686 from_events.iter().map(|e| (e.sequence, e)).collect();
687 let to_map: HashMap<u64, &VersionedEvent<T>> =
688 to_events.iter().map(|e| (e.sequence, e)).collect();
689
690 let mut added = Vec::new();
691 let mut deleted = Vec::new();
692 let mut modified = Vec::new();
693 let mut unchanged_count = 0;
694
695 for (seq, event) in &to_map {
697 if let Some(from_event) = from_map.get(seq) {
698 if event.version_id != from_event.version_id {
700 modified.push(((*from_event).clone(), (*event).clone()));
701 } else {
702 unchanged_count += 1;
703 }
704 } else {
705 added.push((*event).clone());
706 }
707 }
708
709 for (seq, event) in &from_map {
711 if !to_map.contains_key(seq) {
712 deleted.push((*event).clone());
713 }
714 }
715
716 Ok(VersionDiff {
717 from_version,
718 to_version,
719 added,
720 deleted,
721 modified,
722 unchanged_count,
723 })
724 }
725
726 pub async fn generate_changeset(
728 &self,
729 from_version: VersionId,
730 to_version: VersionId,
731 ) -> Result<Changeset<T>, StreamError> {
732 let diff = self.diff(from_version, to_version).await?;
733
734 let mut changes = Vec::new();
735
736 for event in diff.added {
738 changes.push(Change {
739 change_type: ChangeType::Add,
740 data: event.data,
741 previous: None,
742 timestamp: event.timestamp,
743 });
744 }
745
746 for event in diff.deleted {
748 changes.push(Change {
749 change_type: ChangeType::Delete,
750 data: event.data,
751 previous: None,
752 timestamp: SystemTime::now(),
753 });
754 }
755
756 for (old, new) in diff.modified {
758 changes.push(Change {
759 change_type: ChangeType::Modify,
760 data: new.data,
761 previous: Some(old.data),
762 timestamp: new.timestamp,
763 });
764 }
765
766 Ok(Changeset {
767 from_version,
768 to_version,
769 changes,
770 created_at: SystemTime::now(),
771 })
772 }
773
774 pub async fn get_version_history(&self) -> Vec<VersionMetadata> {
776 let versions = self.versions.read().await;
777 versions.values().cloned().collect()
778 }
779
780 pub async fn get_branches(&self) -> Vec<Branch> {
782 let branches = self.branches.read().await;
783 branches.values().cloned().collect()
784 }
785
786 pub async fn current_version(&self) -> VersionId {
788 *self.current_version.read().await
789 }
790
791 pub async fn current_branch(&self) -> BranchId {
793 self.current_branch.read().await.clone()
794 }
795
796 pub async fn get_stats(&self) -> VersioningStats {
798 self.stats.read().await.clone()
799 }
800
801 pub async fn tag_version(
803 &self,
804 version_id: VersionId,
805 key: &str,
806 value: &str,
807 ) -> Result<(), StreamError> {
808 let mut versions = self.versions.write().await;
809
810 if let Some(metadata) = versions.get_mut(&version_id) {
811 metadata.tags.insert(key.to_string(), value.to_string());
812 Ok(())
813 } else {
814 Err(StreamError::NotFound(format!(
815 "Version not found: {}",
816 version_id
817 )))
818 }
819 }
820
821 pub async fn find_by_tag(&self, key: &str, value: &str) -> Vec<VersionId> {
823 let versions = self.versions.read().await;
824
825 versions
826 .iter()
827 .filter(|(_, m)| m.tags.get(key).map(|v| v == value).unwrap_or(false))
828 .map(|(id, _)| *id)
829 .collect()
830 }
831
832 pub async fn delete_branch(&self, branch_id: &str) -> Result<(), StreamError> {
834 let mut branches = self.branches.write().await;
835
836 if let Some(branch) = branches.get(branch_id) {
837 if branch.is_main {
838 return Err(StreamError::InvalidOperation(
839 "Cannot delete main branch".to_string(),
840 ));
841 }
842 } else {
843 return Err(StreamError::NotFound(format!(
844 "Branch not found: {}",
845 branch_id
846 )));
847 }
848
849 branches.remove(branch_id);
850
851 let mut stats = self.stats.write().await;
853 stats.branch_count = stats.branch_count.saturating_sub(1);
854
855 Ok(())
856 }
857
858 pub async fn compact(&self) -> Result<usize, StreamError> {
860 let mut event_storage = self.events.write().await;
861 let mut versions = self.versions.write().await;
862
863 let threshold = SystemTime::now() - self.config.compression_threshold;
864 let mut compacted_count = 0;
865
866 for (version_id, metadata) in versions.iter_mut() {
867 if metadata.created_at < threshold && !metadata.is_compressed {
868 metadata.is_compressed = true;
871 compacted_count += 1;
872
873 if let Some(events) = event_storage.get_mut(version_id) {
875 metadata.size_bytes = self.estimate_size(events) / 2; }
878 }
879 }
880
881 Ok(compacted_count)
882 }
883
884 fn resolve_target<'a>(
887 &'a self,
888 target: &'a TimeTravelTarget,
889 ) -> std::pin::Pin<
890 Box<dyn std::future::Future<Output = Result<VersionId, StreamError>> + Send + 'a>,
891 > {
892 Box::pin(async move {
893 match target {
894 TimeTravelTarget::Version(v) => Ok(*v),
895 TimeTravelTarget::Latest => Ok(*self.current_version.read().await),
896 TimeTravelTarget::Timestamp(ts) => {
897 let versions = self.versions.read().await;
898 let mut best_version = 0;
899
900 for (version_id, metadata) in versions.iter() {
901 if metadata.created_at <= *ts {
902 best_version = *version_id;
903 } else {
904 break;
905 }
906 }
907
908 Ok(best_version)
909 }
910 TimeTravelTarget::RelativeTime(duration) => {
911 let target_time = SystemTime::now() - *duration;
912 self.resolve_target(&TimeTravelTarget::Timestamp(target_time))
913 .await
914 }
915 TimeTravelTarget::Snapshot(snapshot_id) => {
916 let snapshots = self.snapshots.read().await;
917 snapshots
918 .get(snapshot_id)
919 .map(|s| s.version_id)
920 .ok_or_else(|| {
921 StreamError::NotFound(format!("Snapshot not found: {}", snapshot_id))
922 })
923 }
924 }
925 })
926 }
927
928 async fn get_branch_events_since(
929 &self,
930 branch_id: &str,
931 since_version: VersionId,
932 ) -> Result<Vec<VersionedEvent<T>>, StreamError> {
933 let versions = self.versions.read().await;
934 let event_storage = self.events.read().await;
935
936 let mut result = Vec::new();
937
938 for (version_id, metadata) in versions.iter() {
939 if *version_id <= since_version {
940 continue;
941 }
942
943 if metadata.branch_id != branch_id {
944 continue;
945 }
946
947 if let Some(events) = event_storage.get(version_id) {
948 result.extend(events.clone());
949 }
950 }
951
952 Ok(result)
953 }
954
955 fn estimate_size<S: Serialize>(&self, data: &S) -> usize {
956 serde_json::to_vec(data).map(|v| v.len()).unwrap_or(0)
958 }
959
960 async fn update_stats_after_create(&self, event_count: usize, size_bytes: usize) {
961 let mut stats = self.stats.write().await;
962 stats.total_versions += 1;
963 stats.total_events += event_count;
964 stats.total_size_bytes += size_bytes;
965 stats.newest_version = Some(SystemTime::now());
966
967 if stats.oldest_version.is_none() {
968 stats.oldest_version = Some(SystemTime::now());
969 }
970
971 if stats.total_versions > 0 {
972 stats.avg_events_per_version = stats.total_events as f64 / stats.total_versions as f64;
973 }
974 }
975
976 async fn record_query_latency(&self, latency_ms: f64) {
977 let mut latencies = self.query_latencies.write().await;
978 latencies.push_back(latency_ms);
979
980 if latencies.len() > 1000 {
981 latencies.pop_front();
982 }
983
984 let mut stats = self.stats.write().await;
986 stats.time_travel_queries += 1;
987
988 if !latencies.is_empty() {
989 stats.avg_query_latency_ms = latencies.iter().sum::<f64>() / latencies.len() as f64;
990 }
991 }
992
993 async fn maybe_create_auto_snapshot(&self) -> Result<(), StreamError> {
994 let last = *self.last_snapshot.read().await;
995
996 if last.elapsed() >= self.config.snapshot_interval {
997 let current = *self.current_version.read().await;
998 let name = format!("auto_{}", current);
999 self.create_snapshot(&name).await?;
1000
1001 let mut last_snapshot = self.last_snapshot.write().await;
1002 *last_snapshot = Instant::now();
1003 }
1004
1005 Ok(())
1006 }
1007
1008 fn apply_retention_policy(
1009 &self,
1010 versions: &mut BTreeMap<VersionId, VersionMetadata>,
1011 event_storage: &mut HashMap<VersionId, Vec<VersionedEvent<T>>>,
1012 ) -> Result<(), StreamError> {
1013 while versions.len() > self.config.max_versions {
1015 if let Some((&oldest_id, _)) = versions.iter().next() {
1016 if oldest_id == 0 {
1018 break;
1019 }
1020
1021 versions.remove(&oldest_id);
1022 event_storage.remove(&oldest_id);
1023 } else {
1024 break;
1025 }
1026 }
1027
1028 let cutoff = SystemTime::now() - self.config.max_age;
1030 let mut to_remove = Vec::new();
1031
1032 for (version_id, metadata) in versions.iter() {
1033 if *version_id == 0 {
1034 continue; }
1036
1037 if metadata.created_at < cutoff {
1038 to_remove.push(*version_id);
1039 }
1040 }
1041
1042 for version_id in to_remove {
1043 versions.remove(&version_id);
1044 event_storage.remove(&version_id);
1045 }
1046
1047 Ok(())
1048 }
1049}
1050
1051#[cfg(test)]
1052mod tests {
1053 use super::*;
1054
1055 #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
1056 struct TestEvent {
1057 id: u64,
1058 value: String,
1059 }
1060
1061 #[tokio::test]
1062 async fn test_create_version() {
1063 let versioning = StreamVersioning::<TestEvent>::new(VersioningConfig::default());
1064
1065 let events = vec![
1066 TestEvent {
1067 id: 1,
1068 value: "first".to_string(),
1069 },
1070 TestEvent {
1071 id: 2,
1072 value: "second".to_string(),
1073 },
1074 ];
1075
1076 let version_id = versioning
1077 .create_version(events, "Test version")
1078 .await
1079 .unwrap();
1080
1081 assert_eq!(version_id, 1);
1082 assert_eq!(versioning.current_version().await, 1);
1083 }
1084
1085 #[tokio::test]
1086 async fn test_time_travel_query() {
1087 let versioning = StreamVersioning::<TestEvent>::new(VersioningConfig::default());
1088
1089 let events1 = vec![TestEvent {
1091 id: 1,
1092 value: "v1".to_string(),
1093 }];
1094 versioning
1095 .create_version(events1, "Version 1")
1096 .await
1097 .unwrap();
1098
1099 let events2 = vec![TestEvent {
1101 id: 2,
1102 value: "v2".to_string(),
1103 }];
1104 versioning
1105 .create_version(events2, "Version 2")
1106 .await
1107 .unwrap();
1108
1109 let result = versioning.get_at_version(1).await.unwrap();
1111 assert_eq!(result.len(), 1);
1112 assert_eq!(result[0].data.id, 1);
1113
1114 let result = versioning.get_at_version(2).await.unwrap();
1116 assert_eq!(result.len(), 2);
1117 }
1118
1119 #[tokio::test]
1120 async fn test_snapshot_create_and_restore() {
1121 let versioning = StreamVersioning::<TestEvent>::new(VersioningConfig::default());
1122
1123 let events = vec![TestEvent {
1124 id: 1,
1125 value: "snapshot_test".to_string(),
1126 }];
1127 versioning
1128 .create_version(events, "Test version")
1129 .await
1130 .unwrap();
1131
1132 let snapshot_id = versioning.create_snapshot("test_snap").await.unwrap();
1134 assert!(snapshot_id.contains("test_snap"));
1135
1136 versioning
1138 .create_version(
1139 vec![TestEvent {
1140 id: 2,
1141 value: "after_snap".to_string(),
1142 }],
1143 "After snapshot",
1144 )
1145 .await
1146 .unwrap();
1147
1148 let restored_version = versioning.restore_snapshot(&snapshot_id).await.unwrap();
1150 assert!(restored_version > 0);
1151 }
1152
1153 #[tokio::test]
1154 async fn test_branching() {
1155 let versioning = StreamVersioning::<TestEvent>::new(VersioningConfig::default());
1156
1157 versioning
1159 .create_version(
1160 vec![TestEvent {
1161 id: 1,
1162 value: "main".to_string(),
1163 }],
1164 "Initial",
1165 )
1166 .await
1167 .unwrap();
1168
1169 let branch_id = versioning
1171 .create_branch("feature", "Feature branch")
1172 .await
1173 .unwrap();
1174
1175 versioning.switch_branch(&branch_id).await.unwrap();
1177 assert_eq!(versioning.current_branch().await, branch_id);
1178
1179 versioning
1181 .create_version(
1182 vec![TestEvent {
1183 id: 2,
1184 value: "feature".to_string(),
1185 }],
1186 "Feature work",
1187 )
1188 .await
1189 .unwrap();
1190
1191 versioning.switch_branch("main").await.unwrap();
1193 assert_eq!(versioning.current_branch().await, "main");
1194 }
1195
1196 #[tokio::test]
1197 async fn test_diff() {
1198 let versioning = StreamVersioning::<TestEvent>::new(VersioningConfig::default());
1199
1200 versioning
1202 .create_version(
1203 vec![TestEvent {
1204 id: 1,
1205 value: "v1".to_string(),
1206 }],
1207 "V1",
1208 )
1209 .await
1210 .unwrap();
1211
1212 versioning
1214 .create_version(
1215 vec![
1216 TestEvent {
1217 id: 1,
1218 value: "v1".to_string(),
1219 },
1220 TestEvent {
1221 id: 2,
1222 value: "v2".to_string(),
1223 },
1224 ],
1225 "V2",
1226 )
1227 .await
1228 .unwrap();
1229
1230 let diff = versioning.diff(1, 2).await.unwrap();
1231 assert_eq!(diff.from_version, 1);
1232 assert_eq!(diff.to_version, 2);
1233 assert!(!diff.added.is_empty());
1234 }
1235
1236 #[tokio::test]
1237 async fn test_changeset() {
1238 let versioning = StreamVersioning::<TestEvent>::new(VersioningConfig::default());
1239
1240 versioning
1241 .create_version(
1242 vec![TestEvent {
1243 id: 1,
1244 value: "initial".to_string(),
1245 }],
1246 "Initial",
1247 )
1248 .await
1249 .unwrap();
1250
1251 versioning
1252 .create_version(
1253 vec![
1254 TestEvent {
1255 id: 1,
1256 value: "initial".to_string(),
1257 },
1258 TestEvent {
1259 id: 2,
1260 value: "added".to_string(),
1261 },
1262 ],
1263 "Added",
1264 )
1265 .await
1266 .unwrap();
1267
1268 let changeset = versioning.generate_changeset(1, 2).await.unwrap();
1269 assert!(!changeset.changes.is_empty());
1270
1271 let add_changes: Vec<_> = changeset
1272 .changes
1273 .iter()
1274 .filter(|c| c.change_type == ChangeType::Add)
1275 .collect();
1276 assert!(!add_changes.is_empty());
1277 }
1278
1279 #[tokio::test]
1280 async fn test_tagging() {
1281 let versioning = StreamVersioning::<TestEvent>::new(VersioningConfig::default());
1282
1283 let version_id = versioning
1284 .create_version(
1285 vec![TestEvent {
1286 id: 1,
1287 value: "tagged".to_string(),
1288 }],
1289 "Tagged version",
1290 )
1291 .await
1292 .unwrap();
1293
1294 versioning
1295 .tag_version(version_id, "release", "v1.0.0")
1296 .await
1297 .unwrap();
1298
1299 let found = versioning.find_by_tag("release", "v1.0.0").await;
1300 assert!(found.contains(&version_id));
1301 }
1302
1303 #[tokio::test]
1304 async fn test_relative_time_query() {
1305 let versioning = StreamVersioning::<TestEvent>::new(VersioningConfig::default());
1306
1307 versioning
1308 .create_version(
1309 vec![TestEvent {
1310 id: 1,
1311 value: "recent".to_string(),
1312 }],
1313 "Recent",
1314 )
1315 .await
1316 .unwrap();
1317
1318 let query = TimeTravelQuery {
1320 target: TimeTravelTarget::RelativeTime(Duration::from_secs(0)),
1321 branch_id: None,
1322 filter: None,
1323 projection: None,
1324 limit: None,
1325 include_deleted: false,
1326 };
1327
1328 let result = versioning.time_travel_query(query).await.unwrap();
1329 assert!(!result.is_empty());
1330 }
1331
1332 #[tokio::test]
1333 async fn test_stats() {
1334 let versioning = StreamVersioning::<TestEvent>::new(VersioningConfig::default());
1335
1336 versioning
1337 .create_version(
1338 vec![TestEvent {
1339 id: 1,
1340 value: "test".to_string(),
1341 }],
1342 "Test",
1343 )
1344 .await
1345 .unwrap();
1346
1347 let stats = versioning.get_stats().await;
1348 assert!(stats.total_versions >= 1);
1349 assert!(stats.total_events >= 1);
1350 }
1351
1352 #[tokio::test]
1353 async fn test_compact() {
1354 let config = VersioningConfig {
1355 compression_threshold: Duration::from_secs(0), ..Default::default()
1357 };
1358
1359 let versioning = StreamVersioning::<TestEvent>::new(config);
1360
1361 versioning
1362 .create_version(
1363 vec![TestEvent {
1364 id: 1,
1365 value: "compact_test".to_string(),
1366 }],
1367 "To compact",
1368 )
1369 .await
1370 .unwrap();
1371
1372 let _compacted = versioning.compact().await.unwrap();
1373 }
1375
1376 #[tokio::test]
1377 async fn test_delete_branch() {
1378 let versioning = StreamVersioning::<TestEvent>::new(VersioningConfig::default());
1379
1380 let branch_id = versioning
1381 .create_branch("to_delete", "Will be deleted")
1382 .await
1383 .unwrap();
1384
1385 versioning.delete_branch(&branch_id).await.unwrap();
1386
1387 let branches = versioning.get_branches().await;
1388 assert!(!branches.iter().any(|b| b.branch_id == branch_id));
1389 }
1390
1391 #[tokio::test]
1392 async fn test_cannot_delete_main_branch() {
1393 let versioning = StreamVersioning::<TestEvent>::new(VersioningConfig::default());
1394
1395 let result = versioning.delete_branch("main").await;
1396 assert!(result.is_err());
1397 }
1398 #[tokio::test]
1399 async fn test_retention_policy_concurrency() {
1400 let config = VersioningConfig {
1402 max_versions: 1,
1403 ..Default::default()
1404 };
1405 let versioning = StreamVersioning::<TestEvent>::new(config);
1406
1407 versioning.create_version(vec![], "v1").await.unwrap();
1409
1410 versioning.create_version(vec![], "v2").await.unwrap();
1413 }
1414}