1use crate::{Document, Metadata, RragResult};
7use serde::{Deserialize, Serialize};
8use std::collections::{HashMap, VecDeque};
9use std::sync::Arc;
10use tokio::sync::RwLock;
11use uuid::Uuid;
12
13#[derive(Debug, Clone, Serialize, Deserialize)]
15pub struct VersioningConfig {
16 pub max_versions_per_document: usize,
18
19 pub enable_auto_cleanup: bool,
21
22 pub retention_period_days: u32,
24
25 pub conflict_detection: ConflictDetectionStrategy,
27
28 pub default_resolution: ResolutionStrategy,
30
31 pub enable_version_compression: bool,
33
34 pub enable_change_tracking: bool,
36}
37
38#[derive(Debug, Clone, Serialize, Deserialize)]
40pub enum ConflictDetectionStrategy {
41 VersionNumber,
43 Timestamp,
45 ContentHash,
47 VersionVector,
49 Custom(String),
51}
52
53#[derive(Debug, Clone, Serialize, Deserialize)]
55pub enum ResolutionStrategy {
56 KeepNewer,
58 KeepOlder,
60 Merge,
62 Manual,
64 MetadataPriority(String),
66 Custom(String),
68}
69
70impl Default for VersioningConfig {
71 fn default() -> Self {
72 Self {
73 max_versions_per_document: 10,
74 enable_auto_cleanup: true,
75 retention_period_days: 30,
76 conflict_detection: ConflictDetectionStrategy::Timestamp,
77 default_resolution: ResolutionStrategy::KeepNewer,
78 enable_version_compression: true,
79 enable_change_tracking: true,
80 }
81 }
82}
83
84#[derive(Debug, Clone, Serialize, Deserialize)]
86pub struct DocumentVersion {
87 pub version_id: String,
89
90 pub document_id: String,
92
93 pub version_number: u64,
95
96 pub created_at: chrono::DateTime<chrono::Utc>,
98
99 pub author: String,
101
102 pub content_hash: String,
104
105 pub metadata_hash: String,
107
108 pub tags: Vec<String>,
110
111 pub change_summary: Option<ChangeSummary>,
113
114 pub parent_version: Option<String>,
116
117 pub branch: String,
119
120 pub metadata: Metadata,
122
123 pub compressed_data: Option<Vec<u8>>,
125
126 pub data_size_bytes: u64,
128}
129
130#[derive(Debug, Clone, Serialize, Deserialize)]
132pub struct ChangeSummary {
133 pub change_type: ChangeType,
135
136 pub additions: usize,
138
139 pub deletions: usize,
141
142 pub modifications: usize,
144
145 pub affected_sections: Vec<String>,
147
148 pub description: Option<String>,
150}
151
152#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
154pub enum ChangeType {
155 Major,
157 Minor,
159 Metadata,
161 Formatting,
163 Restructure,
165 Initial,
167}
168
169#[derive(Debug, Clone, Serialize, Deserialize)]
171pub struct VersionConflict {
172 pub conflict_id: String,
174
175 pub document_id: String,
177
178 pub conflicting_versions: Vec<String>,
180
181 pub detected_at: chrono::DateTime<chrono::Utc>,
183
184 pub conflict_type: ConflictType,
186
187 pub resolution_status: ResolutionStatus,
189
190 pub auto_resolution: Option<VersionResolution>,
192
193 pub manual_resolution: Option<VersionResolution>,
195
196 pub context: HashMap<String, serde_json::Value>,
198}
199
200#[derive(Debug, Clone, Serialize, Deserialize)]
202pub enum ConflictType {
203 ConcurrentModification,
205 VersionMismatch,
207 TimestampInconsistency,
209 ContentMismatch,
211 BranchMergeConflict,
213 DependencyConflict,
215}
216
217#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
219pub enum ResolutionStatus {
220 Unresolved,
222 AutoResolved,
224 ManuallyResolved,
226 InProgress,
228 Failed,
230}
231
232#[derive(Debug, Clone, Serialize, Deserialize)]
234pub struct VersionResolution {
235 pub strategy: ResolutionStrategy,
237
238 pub chosen_version: String,
240
241 pub resolved_at: chrono::DateTime<chrono::Utc>,
243
244 pub resolved_by: String,
246
247 pub notes: Option<String>,
249
250 pub merged_content: Option<Document>,
252
253 pub metadata: Metadata,
255}
256
257#[derive(Debug, Clone, Serialize, Deserialize)]
259pub struct VersionHistory {
260 pub document_id: String,
262
263 pub versions: VecDeque<DocumentVersion>,
265
266 pub current_version: String,
268
269 pub branches: HashMap<String, String>, pub version_tree: HashMap<String, Vec<String>>, pub metadata: Metadata,
277
278 pub last_updated: chrono::DateTime<chrono::Utc>,
280}
281
282pub struct VersionManager {
284 config: VersioningConfig,
286
287 version_histories: Arc<RwLock<HashMap<String, VersionHistory>>>,
289
290 conflicts: Arc<RwLock<HashMap<String, VersionConflict>>>,
292
293 stats: Arc<RwLock<VersionStats>>,
295
296 task_handles: Arc<tokio::sync::Mutex<Vec<tokio::task::JoinHandle<()>>>>,
298}
299
300#[derive(Debug, Clone, Serialize, Deserialize)]
302pub struct VersionStats {
303 pub total_documents: usize,
305
306 pub total_versions: u64,
308
309 pub avg_versions_per_document: f64,
311
312 pub total_conflicts: u64,
314
315 pub auto_resolved_conflicts: u64,
317
318 pub manually_resolved_conflicts: u64,
320
321 pub unresolved_conflicts: usize,
323
324 pub storage_usage_bytes: u64,
326
327 pub compression_ratio: f64,
329
330 pub last_updated: chrono::DateTime<chrono::Utc>,
332}
333
334impl VersionManager {
335 pub async fn new(config: VersioningConfig) -> RragResult<Self> {
337 let manager = Self {
338 config,
339 version_histories: Arc::new(RwLock::new(HashMap::new())),
340 conflicts: Arc::new(RwLock::new(HashMap::new())),
341 stats: Arc::new(RwLock::new(VersionStats {
342 total_documents: 0,
343 total_versions: 0,
344 avg_versions_per_document: 0.0,
345 total_conflicts: 0,
346 auto_resolved_conflicts: 0,
347 manually_resolved_conflicts: 0,
348 unresolved_conflicts: 0,
349 storage_usage_bytes: 0,
350 compression_ratio: 1.0,
351 last_updated: chrono::Utc::now(),
352 })),
353 task_handles: Arc::new(tokio::sync::Mutex::new(Vec::new())),
354 };
355
356 manager.start_background_tasks().await?;
357 Ok(manager)
358 }
359
360 pub async fn create_version(
362 &self,
363 document: &Document,
364 author: &str,
365 change_type: ChangeType,
366 branch: Option<&str>,
367 ) -> RragResult<DocumentVersion> {
368 let version_id = Uuid::new_v4().to_string();
369 let branch = branch.unwrap_or("main").to_string();
370
371 let mut histories = self.version_histories.write().await;
373 let history = histories
374 .entry(document.id.clone())
375 .or_insert_with(|| VersionHistory {
376 document_id: document.id.clone(),
377 versions: VecDeque::new(),
378 current_version: version_id.clone(),
379 branches: HashMap::new(),
380 version_tree: HashMap::new(),
381 metadata: HashMap::new(),
382 last_updated: chrono::Utc::now(),
383 });
384
385 let version_number = history.versions.len() as u64 + 1;
387
388 let parent_version = history.branches.get(&branch).cloned();
390
391 let content_hash = self.compute_hash(document.content_str()).await?;
393 let metadata_json = serde_json::to_string(&document.metadata)?;
394 let metadata_hash = self.compute_hash(&metadata_json).await?;
395
396 let change_summary = if self.config.enable_change_tracking {
398 self.compute_change_summary(document, &parent_version, change_type.clone(), history)
399 .await?
400 } else {
401 None
402 };
403
404 let (compressed_data, data_size) = if self.config.enable_version_compression {
406 let data = serde_json::to_vec(document)?;
407 let compressed = self.compress_data(&data).await?;
408 let size = compressed.len() as u64;
409 (Some(compressed), size)
410 } else {
411 let data = serde_json::to_vec(document)?;
412 (None, data.len() as u64)
413 };
414
415 let version = DocumentVersion {
417 version_id: version_id.clone(),
418 document_id: document.id.clone(),
419 version_number,
420 created_at: chrono::Utc::now(),
421 author: author.to_string(),
422 content_hash,
423 metadata_hash,
424 tags: Vec::new(),
425 change_summary,
426 parent_version: parent_version.clone(),
427 branch: branch.clone(),
428 metadata: document.metadata.clone(),
429 compressed_data,
430 data_size_bytes: data_size,
431 };
432
433 history.versions.push_back(version.clone());
435 history.current_version = version_id.clone();
436 history.branches.insert(branch, version_id.clone());
437 history.last_updated = chrono::Utc::now();
438
439 if let Some(parent) = &parent_version {
441 history
442 .version_tree
443 .entry(parent.clone())
444 .or_insert_with(Vec::new)
445 .push(version_id.clone());
446 }
447
448 if history.versions.len() > self.config.max_versions_per_document {
450 history.versions.pop_front();
451 }
452
453 {
455 let mut stats = self.stats.write().await;
456 stats.total_versions += 1;
457 stats.total_documents = histories.len();
458 stats.avg_versions_per_document =
459 stats.total_versions as f64 / stats.total_documents as f64;
460 stats.storage_usage_bytes += data_size;
461 stats.last_updated = chrono::Utc::now();
462 }
463
464 Ok(version)
465 }
466
467 pub async fn get_version(
469 &self,
470 document_id: &str,
471 version_id: &str,
472 ) -> RragResult<Option<DocumentVersion>> {
473 let histories = self.version_histories.read().await;
474
475 if let Some(history) = histories.get(document_id) {
476 for version in &history.versions {
477 if version.version_id == version_id {
478 return Ok(Some(version.clone()));
479 }
480 }
481 }
482
483 Ok(None)
484 }
485
486 pub async fn get_current_version(
488 &self,
489 document_id: &str,
490 ) -> RragResult<Option<DocumentVersion>> {
491 let histories = self.version_histories.read().await;
492
493 if let Some(history) = histories.get(document_id) {
494 return self
495 .get_version(document_id, &history.current_version)
496 .await;
497 }
498
499 Ok(None)
500 }
501
502 pub async fn get_version_history(
504 &self,
505 document_id: &str,
506 ) -> RragResult<Option<VersionHistory>> {
507 let histories = self.version_histories.read().await;
508 Ok(histories.get(document_id).cloned())
509 }
510
511 pub async fn detect_conflicts(
513 &self,
514 document: &Document,
515 expected_version: Option<&str>,
516 ) -> RragResult<Option<VersionConflict>> {
517 let histories = self.version_histories.read().await;
518
519 if let Some(history) = histories.get(&document.id) {
520 if let Some(expected) = expected_version {
521 if history.current_version != expected {
522 let conflict_id = Uuid::new_v4().to_string();
524 let conflict = VersionConflict {
525 conflict_id,
526 document_id: document.id.clone(),
527 conflicting_versions: vec![
528 history.current_version.clone(),
529 expected.to_string(),
530 ],
531 detected_at: chrono::Utc::now(),
532 conflict_type: ConflictType::VersionMismatch,
533 resolution_status: ResolutionStatus::Unresolved,
534 auto_resolution: None,
535 manual_resolution: None,
536 context: HashMap::new(),
537 };
538
539 return Ok(Some(conflict));
540 }
541 }
542 }
543
544 Ok(None)
545 }
546
547 pub async fn resolve_conflict(
549 &self,
550 conflict_id: &str,
551 resolution: VersionResolution,
552 ) -> RragResult<bool> {
553 let mut conflicts = self.conflicts.write().await;
554
555 if let Some(conflict) = conflicts.get_mut(conflict_id) {
556 conflict.manual_resolution = Some(resolution);
557 conflict.resolution_status = ResolutionStatus::ManuallyResolved;
558
559 {
561 let mut stats = self.stats.write().await;
562 stats.manually_resolved_conflicts += 1;
563 stats.unresolved_conflicts = conflicts
564 .values()
565 .filter(|c| c.resolution_status == ResolutionStatus::Unresolved)
566 .count();
567 }
568
569 return Ok(true);
570 }
571
572 Ok(false)
573 }
574
575 pub async fn get_unresolved_conflicts(&self) -> RragResult<Vec<VersionConflict>> {
577 let conflicts = self.conflicts.read().await;
578 Ok(conflicts
579 .values()
580 .filter(|c| c.resolution_status == ResolutionStatus::Unresolved)
581 .cloned()
582 .collect())
583 }
584
585 pub async fn get_stats(&self) -> VersionStats {
587 self.stats.read().await.clone()
588 }
589
590 pub async fn health_check(&self) -> RragResult<bool> {
592 let handles = self.task_handles.lock().await;
593 let all_running = handles.iter().all(|handle| !handle.is_finished());
594
595 let stats = self.get_stats().await;
596 let healthy_stats = stats.unresolved_conflicts < 1000; Ok(all_running && healthy_stats)
599 }
600
601 async fn start_background_tasks(&self) -> RragResult<()> {
603 let mut handles = self.task_handles.lock().await;
604
605 if self.config.enable_auto_cleanup {
606 handles.push(self.start_cleanup_task().await);
607 }
608
609 handles.push(self.start_conflict_auto_resolution_task().await);
610
611 Ok(())
612 }
613
614 async fn start_cleanup_task(&self) -> tokio::task::JoinHandle<()> {
616 let version_histories = Arc::clone(&self.version_histories);
617 let config = self.config.clone();
618
619 tokio::spawn(async move {
620 let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(3600)); loop {
623 interval.tick().await;
624
625 let cutoff_date = chrono::Utc::now()
626 - chrono::Duration::days(config.retention_period_days as i64);
627 let mut histories = version_histories.write().await;
628
629 for history in histories.values_mut() {
630 history
631 .versions
632 .retain(|version| version.created_at > cutoff_date);
633 }
634 }
635 })
636 }
637
638 async fn start_conflict_auto_resolution_task(&self) -> tokio::task::JoinHandle<()> {
640 let conflicts = Arc::clone(&self.conflicts);
641 let stats = Arc::clone(&self.stats);
642 let config = self.config.clone();
643
644 tokio::spawn(async move {
645 let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(300)); loop {
648 interval.tick().await;
649
650 let mut conflicts_guard = conflicts.write().await;
651 let mut resolved_count = 0;
652
653 for conflict in conflicts_guard.values_mut() {
654 if conflict.resolution_status == ResolutionStatus::Unresolved {
655 if let Some(auto_resolution) =
657 Self::apply_auto_resolution(conflict, &config.default_resolution).await
658 {
659 conflict.auto_resolution = Some(auto_resolution);
660 conflict.resolution_status = ResolutionStatus::AutoResolved;
661 resolved_count += 1;
662 }
663 }
664 }
665
666 if resolved_count > 0 {
668 let mut stats_guard = stats.write().await;
669 stats_guard.auto_resolved_conflicts += resolved_count;
670 stats_guard.unresolved_conflicts = conflicts_guard
671 .values()
672 .filter(|c| c.resolution_status == ResolutionStatus::Unresolved)
673 .count();
674 }
675 }
676 })
677 }
678
679 async fn apply_auto_resolution(
681 conflict: &VersionConflict,
682 strategy: &ResolutionStrategy,
683 ) -> Option<VersionResolution> {
684 match strategy {
685 ResolutionStrategy::KeepNewer => {
686 Some(VersionResolution {
689 strategy: strategy.clone(),
690 chosen_version: conflict.conflicting_versions[0].clone(), resolved_at: chrono::Utc::now(),
692 resolved_by: "auto_resolver".to_string(),
693 notes: Some("Automatically resolved by keeping newer version".to_string()),
694 merged_content: None,
695 metadata: HashMap::new(),
696 })
697 }
698 ResolutionStrategy::KeepOlder => Some(VersionResolution {
699 strategy: strategy.clone(),
700 chosen_version: conflict.conflicting_versions.last().unwrap().clone(),
701 resolved_at: chrono::Utc::now(),
702 resolved_by: "auto_resolver".to_string(),
703 notes: Some("Automatically resolved by keeping older version".to_string()),
704 merged_content: None,
705 metadata: HashMap::new(),
706 }),
707 ResolutionStrategy::Manual => None, _ => None, }
710 }
711
712 async fn compute_hash(&self, content: &str) -> RragResult<String> {
714 use std::collections::hash_map::DefaultHasher;
715 use std::hash::{Hash, Hasher};
716
717 let mut hasher = DefaultHasher::new();
718 content.hash(&mut hasher);
719 Ok(format!("{:x}", hasher.finish()))
720 }
721
722 async fn compute_change_summary(
724 &self,
725 document: &Document,
726 parent_version_id: &Option<String>,
727 change_type: ChangeType,
728 history: &VersionHistory,
729 ) -> RragResult<Option<ChangeSummary>> {
730 if let Some(parent_id) = parent_version_id {
731 if let Some(parent_version) =
733 history.versions.iter().find(|v| v.version_id == *parent_id)
734 {
735 let current_size = document.content_str().len();
737 let estimated_previous_size = parent_version.data_size_bytes as usize;
738
739 let (additions, deletions) = if current_size > estimated_previous_size {
740 (current_size - estimated_previous_size, 0)
741 } else {
742 (0, estimated_previous_size - current_size)
743 };
744
745 return Ok(Some(ChangeSummary {
746 change_type,
747 additions,
748 deletions,
749 modifications: std::cmp::min(current_size, estimated_previous_size),
750 affected_sections: Vec::new(), description: None,
752 }));
753 }
754 }
755
756 Ok(Some(ChangeSummary {
758 change_type: ChangeType::Initial,
759 additions: document.content_str().len(),
760 deletions: 0,
761 modifications: 0,
762 affected_sections: Vec::new(),
763 description: Some("Initial version".to_string()),
764 }))
765 }
766
767 async fn compress_data(&self, data: &[u8]) -> RragResult<Vec<u8>> {
769 Ok(data.to_vec())
771 }
772}
773
#[cfg(test)]
mod tests {
    use super::*;
    use crate::Document;

    /// Builds a manager with the default configuration.
    async fn default_manager() -> VersionManager {
        VersionManager::new(VersioningConfig::default())
            .await
            .expect("default configuration must construct a manager")
    }

    #[tokio::test]
    async fn test_version_manager_creation() {
        let manager = default_manager().await;
        assert!(manager.health_check().await.unwrap());
    }

    #[tokio::test]
    async fn test_create_version() {
        let manager = default_manager().await;
        let doc = Document::new("Test content");

        let version = manager
            .create_version(&doc, "test_author", ChangeType::Initial, None)
            .await
            .unwrap();

        assert_eq!(version.document_id, doc.id);
        assert_eq!(version.version_number, 1);
        assert_eq!(version.author, "test_author");
        assert_eq!(version.branch, "main");
    }

    #[tokio::test]
    async fn test_version_retrieval() {
        let manager = default_manager().await;
        let doc = Document::new("Test content");

        let version = manager
            .create_version(&doc, "test_author", ChangeType::Initial, None)
            .await
            .unwrap();

        let retrieved = manager
            .get_version(&doc.id, &version.version_id)
            .await
            .unwrap()
            .unwrap();
        assert_eq!(retrieved.version_id, version.version_id);

        let current = manager.get_current_version(&doc.id).await.unwrap().unwrap();
        assert_eq!(current.version_id, version.version_id);
    }

    #[tokio::test]
    async fn test_multiple_versions() {
        let manager = default_manager().await;
        let doc1 = Document::with_id("test_doc", "Initial content");
        let doc2 = Document::with_id("test_doc", "Updated content");

        let version1 = manager
            .create_version(&doc1, "author1", ChangeType::Initial, None)
            .await
            .unwrap();
        assert_eq!(version1.version_number, 1);

        let version2 = manager
            .create_version(&doc2, "author2", ChangeType::Major, None)
            .await
            .unwrap();
        assert_eq!(version2.version_number, 2);

        let current = manager
            .get_current_version("test_doc")
            .await
            .unwrap()
            .unwrap();
        assert_eq!(current.version_id, version2.version_id);

        let history = manager
            .get_version_history("test_doc")
            .await
            .unwrap()
            .unwrap();
        assert_eq!(history.versions.len(), 2);
    }

    #[tokio::test]
    async fn test_conflict_detection() {
        let manager = default_manager().await;
        let doc = Document::new("Test content");

        let version1 = manager
            .create_version(&doc, "author1", ChangeType::Initial, None)
            .await
            .unwrap();

        let conflict = manager
            .detect_conflicts(&doc, Some("wrong_version_id"))
            .await
            .unwrap()
            .expect("a stale expected version must be reported as a conflict");
        assert_eq!(conflict.document_id, doc.id);
        // `matches!` does not require `ConflictType: PartialEq`.
        assert!(matches!(conflict.conflict_type, ConflictType::VersionMismatch));

        let no_conflict = manager
            .detect_conflicts(&doc, Some(&version1.version_id))
            .await
            .unwrap();
        assert!(no_conflict.is_none());
    }

    #[tokio::test]
    async fn test_missing_lookups_return_none() {
        let manager = default_manager().await;

        assert!(manager.get_version("missing", "missing").await.unwrap().is_none());
        assert!(manager.get_current_version("missing").await.unwrap().is_none());
        assert!(manager.get_version_history("missing").await.unwrap().is_none());

        let doc = Document::new("Test content");
        assert!(manager.detect_conflicts(&doc, None).await.unwrap().is_none());
    }

    #[test]
    fn test_change_types() {
        let change_types = [
            ChangeType::Major,
            ChangeType::Minor,
            ChangeType::Metadata,
            ChangeType::Formatting,
            ChangeType::Restructure,
            ChangeType::Initial,
        ];

        for (i, type1) in change_types.iter().enumerate() {
            for (j, type2) in change_types.iter().enumerate() {
                if i != j {
                    assert_ne!(type1, type2);
                }
            }
        }
    }

    #[test]
    fn test_resolution_strategies() {
        let strategies = [
            ResolutionStrategy::KeepNewer,
            ResolutionStrategy::KeepOlder,
            ResolutionStrategy::Merge,
            ResolutionStrategy::Manual,
            ResolutionStrategy::MetadataPriority("priority".to_string()),
            ResolutionStrategy::Custom("custom_logic".to_string()),
        ];

        for (i, strategy1) in strategies.iter().enumerate() {
            for (j, strategy2) in strategies.iter().enumerate() {
                if i != j {
                    assert_ne!(format!("{:?}", strategy1), format!("{:?}", strategy2));
                }
            }
        }
    }
}