1use crate::{Document, DocumentChunk, Metadata, RragError, RragResult};
7use serde::{Deserialize, Serialize};
8use std::collections::hash_map::DefaultHasher;
9use std::collections::{HashMap, HashSet};
10use std::hash::Hasher;
11use tokio::sync::RwLock;
12
/// Configuration for [`ChangeDetector`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ChangeDetectionConfig {
    /// Toggle for content-hash based change detection.
    pub enable_content_hash: bool,

    /// Toggle for metadata change detection.
    pub enable_metadata_detection: bool,

    /// Toggle for timestamp-based change detection.
    /// NOTE(review): not consulted by `ChangeDetector` in this file — confirm
    /// whether anything else reads it.
    pub enable_timestamp_detection: bool,

    /// Toggle for per-chunk change analysis in
    /// `ChangeDetector::detect_changes_with_chunks`.
    pub enable_chunk_detection: bool,

    /// Digest label used when hashing content and metadata.
    pub hash_algorithm: HashAlgorithm,

    /// How much text normalization is applied before hashing; lower
    /// sensitivity ignores more formatting differences.
    pub sensitivity: ChangeSensitivity,

    /// Maximum number of entries kept in the change history; the oldest
    /// entries are discarded first.
    pub max_change_history: usize,
}
37
/// Digest selection for change hashing.
///
/// NOTE: in this file every variant is computed with the standard library's
/// `DefaultHasher`; the non-`Default` variants only prefix the hex digest with
/// their name (`sha256:`, `blake3:`, `xxhash:`). They are not real SHA-256 /
/// BLAKE3 / xxHash digests. `DefaultHasher` output is not guaranteed to be
/// stable across Rust releases.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum HashAlgorithm {
    /// `DefaultHasher` digest as lowercase hex, no prefix.
    Default,
    /// `DefaultHasher` digest with a `sha256:` prefix.
    Sha256,
    /// `DefaultHasher` digest with a `blake3:` prefix.
    Blake3,
    /// `DefaultHasher` digest with an `xxhash:` prefix.
    Xxhash,
}
46
/// Text normalization applied before hashing; it decides which edits count
/// as changes.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ChangeSensitivity {
    /// Removes all whitespace and lowercases the text.
    Low,
    /// Collapses whitespace runs to single spaces (dropping leading and
    /// trailing whitespace) and lowercases the text.
    Medium,
    /// Lowercases the text only.
    High,
    /// Hashes the text exactly as given.
    Strict,
}
59
60impl Default for ChangeDetectionConfig {
61 fn default() -> Self {
62 Self {
63 enable_content_hash: true,
64 enable_metadata_detection: true,
65 enable_timestamp_detection: true,
66 enable_chunk_detection: true,
67 hash_algorithm: HashAlgorithm::Default,
68 sensitivity: ChangeSensitivity::Medium,
69 max_change_history: 1000,
70 }
71 }
72}
73
/// Kind of change observed for a document or a chunk.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum ChangeType {
    /// No cached state existed for this document (or chunk index) before.
    Added,
    /// Content hash differs from the previous check.
    ContentChanged,
    /// Metadata hash differs from the previous check.
    MetadataChanged,
    /// Relocation. NOTE(review): not produced by `ChangeDetector` in this file.
    Moved,
    /// Removed; used at chunk level when a previously seen chunk index no
    /// longer exists.
    Deleted,
    /// Nothing detected since the previous check.
    NoChange,
    /// More than one kind of change at once (e.g. content and metadata).
    Multiple(Vec<ChangeType>),
}
92
/// Outcome of one change-detection check for a document.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ChangeResult {
    /// What kind of change was detected.
    pub change_type: ChangeType,

    /// Id of the checked document.
    pub document_id: String,

    /// Content hash from the previous check; `None` for a newly added document.
    pub previous_hash: Option<String>,

    /// Content hash computed in this check.
    pub current_hash: String,

    /// Size-based estimate of the content change.
    pub delta: ContentDelta,

    /// Key-level metadata differences.
    pub metadata_changes: MetadataChanges,

    /// When the change was detected, plus timing relative to the previous check.
    pub timestamps: ChangeTimestamps,

    /// Per-chunk changes; only filled in by chunk-aware detection.
    pub chunk_changes: Vec<ChunkChange>,

    /// Heuristic confidence in the detected change (at most 1.0).
    pub confidence: f64,
}
123
/// Estimate of how much content changed.
///
/// Only lengths (not text) are cached between checks, so these values are
/// derived from length differences rather than from a real diff.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ContentDelta {
    /// Growth in content length since the previous check.
    pub added_chars: usize,

    /// Shrinkage in content length since the previous check.
    pub removed_chars: usize,

    /// Estimate of characters that may have been rewritten in place.
    pub modified_chars: usize,

    /// Content length at the previous check (0 when unknown or newly added).
    pub previous_size: usize,

    /// Content length at this check.
    pub current_size: usize,

    /// Estimated fraction of the content that changed, at most 1.0.
    pub change_percentage: f64,
}
145
/// Key-level differences between two metadata snapshots.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MetadataChanges {
    /// Keys present now but not at the previous check.
    pub added_keys: Vec<String>,

    /// Keys present at the previous check but not now.
    pub removed_keys: Vec<String>,

    /// Keys present in both snapshots whose values differ.
    pub modified_keys: Vec<String>,

    /// Metadata snapshot from the previous check (empty for new documents).
    pub previous_metadata: HashMap<String, serde_json::Value>,

    /// Metadata snapshot from this check.
    pub current_metadata: HashMap<String, serde_json::Value>,
}
164
/// Timing information attached to a change result.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ChangeTimestamps {
    /// When this check ran.
    pub detected_at: chrono::DateTime<chrono::Utc>,

    /// For newly added documents, the document's `created_at`; otherwise `None`.
    pub last_modified: Option<chrono::DateTime<chrono::Utc>>,

    /// When the document was previously checked, if it was.
    pub previous_check: Option<chrono::DateTime<chrono::Utc>>,

    /// Time elapsed since the previous check (not necessarily since the
    /// content last changed).
    pub time_since_change: Option<chrono::Duration>,
}
180
/// Change observed for a single chunk position.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ChunkChange {
    /// Position of the chunk in the chunk list.
    pub chunk_index: usize,

    /// What happened to the chunk at this position.
    pub change_type: ChangeType,

    /// Hash of the chunk at this position in the previous check, if any.
    pub previous_hash: Option<String>,

    /// Hash of the chunk now; empty for deleted chunks.
    pub current_hash: String,

    /// Estimated size change for this chunk.
    pub delta: ContentDelta,
}
199
/// Entry in the change history.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DocumentChange {
    /// Id of the changed document.
    pub document_id: String,

    /// Full detection result for this change.
    pub change_result: ChangeResult,

    /// Version number of the cached document state when the change was recorded.
    pub version: u64,

    /// Identifier of the recorder (`"change_detector"` for entries written by
    /// `ChangeDetector`).
    pub source: String,

    /// Extra context; left empty by `ChangeDetector`.
    pub context: HashMap<String, serde_json::Value>,
}
218
/// Detects document changes by comparing hashes with the state cached from
/// the previous check of the same document id.
pub struct ChangeDetector {
    /// Detection settings.
    config: ChangeDetectionConfig,

    /// Last observed state per document id.
    document_cache: RwLock<HashMap<String, DocumentState>>,

    /// Bounded log of detected changes (`NoChange` results are not recorded).
    change_history: RwLock<Vec<DocumentChange>>,

    /// Aggregate detection statistics.
    stats: RwLock<ChangeDetectionStats>,
}
233
/// Snapshot of a document as seen at one check; cached for the next comparison.
#[derive(Debug, Clone)]
struct DocumentState {
    /// Hash of the content after sensitivity normalization.
    content_hash: String,

    /// Hash of the metadata serialized as JSON.
    metadata_hash: String,

    /// One hash per chunk; empty when the state was computed without chunks.
    chunk_hashes: Vec<String>,

    /// When this snapshot was taken.
    last_checked: chrono::DateTime<chrono::Utc>,

    /// Copy of the document metadata, used for key-level diffs.
    metadata_snapshot: Metadata,

    /// Length of the raw (un-normalized) content.
    content_size: usize,

    /// Version number of this snapshot.
    version: u64,
}
258
/// Aggregate statistics reported by `ChangeDetector::get_stats`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ChangeDetectionStats {
    /// Number of detection calls counted so far.
    pub total_processed: u64,

    /// Count of results per change type, keyed by the `Debug` form of `ChangeType`.
    pub changes_by_type: HashMap<String, u64>,

    /// Processing-time statistic in milliseconds.
    pub avg_processing_time_ms: f64,

    /// Cache hit rate statistic.
    pub cache_hit_rate: f64,

    /// False-positive rate. NOTE(review): not updated by `ChangeDetector` in
    /// this file.
    pub false_positive_rate: f64,

    /// When these statistics were last updated.
    pub last_updated: chrono::DateTime<chrono::Utc>,
}
280
281impl ChangeDetector {
282 pub async fn new(config: ChangeDetectionConfig) -> RragResult<Self> {
284 Ok(Self {
285 config,
286 document_cache: RwLock::new(HashMap::new()),
287 change_history: RwLock::new(Vec::new()),
288 stats: RwLock::new(ChangeDetectionStats {
289 total_processed: 0,
290 changes_by_type: HashMap::new(),
291 avg_processing_time_ms: 0.0,
292 cache_hit_rate: 0.0,
293 false_positive_rate: 0.0,
294 last_updated: chrono::Utc::now(),
295 }),
296 })
297 }
298
299 pub async fn detect_changes(&self, document: &Document) -> RragResult<ChangeResult> {
301 let start_time = std::time::Instant::now();
302
303 {
305 let mut stats = self.stats.write().await;
306 stats.total_processed += 1;
307 }
308
309 let current_state = self.compute_document_state(document, None).await?;
311
312 let cache = self.document_cache.read().await;
314 let previous_state = cache.get(&document.id);
315
316 let change_result = match previous_state {
317 Some(prev_state) => {
318 self.compare_states(&document.id, prev_state, ¤t_state)
319 .await?
320 }
321 None => {
322 ChangeResult {
324 change_type: ChangeType::Added,
325 document_id: document.id.clone(),
326 previous_hash: None,
327 current_hash: current_state.content_hash.clone(),
328 delta: ContentDelta {
329 added_chars: current_state.content_size,
330 removed_chars: 0,
331 modified_chars: 0,
332 previous_size: 0,
333 current_size: current_state.content_size,
334 change_percentage: 1.0,
335 },
336 metadata_changes: MetadataChanges {
337 added_keys: current_state.metadata_snapshot.keys().cloned().collect(),
338 removed_keys: Vec::new(),
339 modified_keys: Vec::new(),
340 previous_metadata: HashMap::new(),
341 current_metadata: current_state.metadata_snapshot.clone(),
342 },
343 timestamps: ChangeTimestamps {
344 detected_at: chrono::Utc::now(),
345 last_modified: Some(document.created_at),
346 previous_check: None,
347 time_since_change: None,
348 },
349 chunk_changes: Vec::new(),
350 confidence: 1.0,
351 }
352 }
353 };
354
355 drop(cache);
357 {
358 let mut cache = self.document_cache.write().await;
359 cache.insert(document.id.clone(), current_state.clone());
360 }
361
362 {
364 let mut stats = self.stats.write().await;
365 let change_type_str = format!("{:?}", change_result.change_type);
366 *stats.changes_by_type.entry(change_type_str).or_insert(0) += 1;
367
368 let processing_time = start_time.elapsed().as_millis() as f64;
369 stats.avg_processing_time_ms = (stats.avg_processing_time_ms + processing_time) / 2.0;
370 stats.last_updated = chrono::Utc::now();
371 }
372
373 if change_result.change_type != ChangeType::NoChange {
375 let document_change = DocumentChange {
376 document_id: document.id.clone(),
377 change_result: change_result.clone(),
378 version: current_state.version,
379 source: "change_detector".to_string(),
380 context: HashMap::new(),
381 };
382
383 let mut history = self.change_history.write().await;
384 history.push(document_change);
385
386 if history.len() > self.config.max_change_history {
388 history.remove(0);
389 }
390 }
391
392 Ok(change_result)
393 }
394
395 pub async fn detect_changes_with_chunks(
397 &self,
398 document: &Document,
399 chunks: &[DocumentChunk],
400 ) -> RragResult<ChangeResult> {
401 let _start_time = std::time::Instant::now();
402
403 let current_state = self.compute_document_state(document, Some(chunks)).await?;
405
406 let cache = self.document_cache.read().await;
408 let previous_state = cache.get(&document.id);
409
410 let mut change_result = match previous_state {
411 Some(prev_state) => {
412 self.compare_states(&document.id, prev_state, ¤t_state)
413 .await?
414 }
415 None => {
416 let chunk_changes: Vec<ChunkChange> = chunks
418 .iter()
419 .enumerate()
420 .map(|(i, chunk)| ChunkChange {
421 chunk_index: i,
422 change_type: ChangeType::Added,
423 previous_hash: None,
424 current_hash: current_state.chunk_hashes[i].clone(),
425 delta: ContentDelta {
426 added_chars: chunk.content.len(),
427 removed_chars: 0,
428 modified_chars: 0,
429 previous_size: 0,
430 current_size: chunk.content.len(),
431 change_percentage: 1.0,
432 },
433 })
434 .collect();
435
436 ChangeResult {
437 change_type: ChangeType::Added,
438 document_id: document.id.clone(),
439 previous_hash: None,
440 current_hash: current_state.content_hash.clone(),
441 delta: ContentDelta {
442 added_chars: current_state.content_size,
443 removed_chars: 0,
444 modified_chars: 0,
445 previous_size: 0,
446 current_size: current_state.content_size,
447 change_percentage: 1.0,
448 },
449 metadata_changes: MetadataChanges {
450 added_keys: current_state.metadata_snapshot.keys().cloned().collect(),
451 removed_keys: Vec::new(),
452 modified_keys: Vec::new(),
453 previous_metadata: HashMap::new(),
454 current_metadata: current_state.metadata_snapshot.clone(),
455 },
456 timestamps: ChangeTimestamps {
457 detected_at: chrono::Utc::now(),
458 last_modified: Some(document.created_at),
459 previous_check: None,
460 time_since_change: None,
461 },
462 chunk_changes,
463 confidence: 1.0,
464 }
465 }
466 };
467
468 if self.config.enable_chunk_detection && change_result.chunk_changes.is_empty() {
470 if let Some(prev_state) = previous_state {
471 change_result.chunk_changes = self
472 .analyze_chunk_changes(
473 &prev_state.chunk_hashes,
474 ¤t_state.chunk_hashes,
475 chunks,
476 )
477 .await?;
478 }
479 }
480
481 drop(cache);
483 {
484 let mut cache = self.document_cache.write().await;
485 cache.insert(document.id.clone(), current_state);
486 }
487
488 Ok(change_result)
489 }
490
491 pub async fn get_change_history(&self, document_id: &str) -> RragResult<Vec<DocumentChange>> {
493 let history = self.change_history.read().await;
494 Ok(history
495 .iter()
496 .filter(|change| change.document_id == document_id)
497 .cloned()
498 .collect())
499 }
500
501 pub async fn get_stats(&self) -> ChangeDetectionStats {
503 self.stats.read().await.clone()
504 }
505
506 pub async fn clear_history(&self) -> RragResult<()> {
508 let mut history = self.change_history.write().await;
509 history.clear();
510 Ok(())
511 }
512
513 pub async fn health_check(&self) -> RragResult<bool> {
515 let _cache = self.document_cache.read().await;
517 let _history = self.change_history.read().await;
518 let _stats = self.stats.read().await;
519 Ok(true)
520 }
521
522 async fn compute_document_state(
524 &self,
525 document: &Document,
526 chunks: Option<&[DocumentChunk]>,
527 ) -> RragResult<DocumentState> {
528 let content_hash = self.compute_hash(document.content_str()).await?;
530
531 let metadata_json = serde_json::to_string(&document.metadata).map_err(|e| {
533 RragError::serialization_with_message("document_metadata", e.to_string())
534 })?;
535 let metadata_hash = self.compute_hash(&metadata_json).await?;
536
537 let chunk_hashes = if let Some(chunks) = chunks {
539 let mut hashes = Vec::with_capacity(chunks.len());
540 for chunk in chunks {
541 let chunk_hash = self.compute_hash(&chunk.content).await?;
542 hashes.push(chunk_hash);
543 }
544 hashes
545 } else {
546 Vec::new()
547 };
548
549 Ok(DocumentState {
550 content_hash,
551 metadata_hash,
552 chunk_hashes,
553 last_checked: chrono::Utc::now(),
554 metadata_snapshot: document.metadata.clone(),
555 content_size: document.content_str().len(),
556 version: 1, })
558 }
559
560 async fn compare_states(
562 &self,
563 document_id: &str,
564 previous: &DocumentState,
565 current: &DocumentState,
566 ) -> RragResult<ChangeResult> {
567 let mut change_types = Vec::new();
568
569 if previous.content_hash != current.content_hash {
571 change_types.push(ChangeType::ContentChanged);
572 }
573
574 if previous.metadata_hash != current.metadata_hash {
576 change_types.push(ChangeType::MetadataChanged);
577 }
578
579 let change_type = match change_types.len() {
580 0 => ChangeType::NoChange,
581 1 => change_types.into_iter().next().unwrap(),
582 _ => ChangeType::Multiple(change_types),
583 };
584
585 let delta = self.compute_content_delta(previous, current).await?;
587
588 let metadata_changes = self
590 .compute_metadata_changes(&previous.metadata_snapshot, ¤t.metadata_snapshot)
591 .await?;
592
593 let confidence = self.compute_confidence(&change_type, &delta).await?;
595
596 Ok(ChangeResult {
597 change_type,
598 document_id: document_id.to_string(),
599 previous_hash: Some(previous.content_hash.clone()),
600 current_hash: current.content_hash.clone(),
601 delta,
602 metadata_changes,
603 timestamps: ChangeTimestamps {
604 detected_at: chrono::Utc::now(),
605 last_modified: None,
606 previous_check: Some(previous.last_checked),
607 time_since_change: Some(chrono::Utc::now() - previous.last_checked),
608 },
609 chunk_changes: Vec::new(), confidence,
611 })
612 }
613
614 async fn analyze_chunk_changes(
616 &self,
617 previous_hashes: &[String],
618 current_hashes: &[String],
619 current_chunks: &[DocumentChunk],
620 ) -> RragResult<Vec<ChunkChange>> {
621 let mut chunk_changes = Vec::new();
622
623 let max_len = std::cmp::max(previous_hashes.len(), current_hashes.len());
624
625 for i in 0..max_len {
626 let prev_hash = previous_hashes.get(i);
627 let curr_hash = current_hashes.get(i);
628 let chunk = current_chunks.get(i);
629
630 let (change_type, current_hash, delta) = match (prev_hash, curr_hash, chunk) {
631 (Some(prev), Some(curr), Some(chunk)) => {
632 if prev != curr {
633 let delta = ContentDelta {
634 added_chars: 0, removed_chars: 0,
636 modified_chars: chunk.content.len(),
637 previous_size: chunk.content.len(), current_size: chunk.content.len(),
639 change_percentage: 0.5, };
641 (ChangeType::ContentChanged, curr.clone(), delta)
642 } else {
643 continue; }
645 }
646 (None, Some(curr), Some(chunk)) => {
647 let delta = ContentDelta {
648 added_chars: chunk.content.len(),
649 removed_chars: 0,
650 modified_chars: 0,
651 previous_size: 0,
652 current_size: chunk.content.len(),
653 change_percentage: 1.0,
654 };
655 (ChangeType::Added, curr.clone(), delta)
656 }
657 (Some(_), None, _) => {
658 let delta = ContentDelta {
659 added_chars: 0,
660 removed_chars: 0, modified_chars: 0,
662 previous_size: 0,
663 current_size: 0,
664 change_percentage: 1.0,
665 };
666 (ChangeType::Deleted, String::new(), delta)
667 }
668 _ => continue,
669 };
670
671 chunk_changes.push(ChunkChange {
672 chunk_index: i,
673 change_type,
674 previous_hash: prev_hash.cloned(),
675 current_hash,
676 delta,
677 });
678 }
679
680 Ok(chunk_changes)
681 }
682
683 async fn compute_hash(&self, content: &str) -> RragResult<String> {
685 let normalized_content = match self.config.sensitivity {
686 ChangeSensitivity::Low => {
687 content
689 .chars()
690 .filter(|c| !c.is_whitespace())
691 .collect::<String>()
692 .to_lowercase()
693 }
694 ChangeSensitivity::Medium => {
695 content
697 .split_whitespace()
698 .collect::<Vec<_>>()
699 .join(" ")
700 .to_lowercase()
701 }
702 ChangeSensitivity::High => {
703 content.to_lowercase()
705 }
706 ChangeSensitivity::Strict => {
707 content.to_string()
709 }
710 };
711
712 match self.config.hash_algorithm {
713 HashAlgorithm::Default => {
714 let mut hasher = DefaultHasher::new();
715 hasher.write(normalized_content.as_bytes());
716 Ok(format!("{:x}", hasher.finish()))
717 }
718 HashAlgorithm::Sha256 => {
719 let mut hasher = DefaultHasher::new();
721 hasher.write(normalized_content.as_bytes());
722 Ok(format!("sha256:{:x}", hasher.finish()))
723 }
724 HashAlgorithm::Blake3 => {
725 let mut hasher = DefaultHasher::new();
727 hasher.write(normalized_content.as_bytes());
728 Ok(format!("blake3:{:x}", hasher.finish()))
729 }
730 HashAlgorithm::Xxhash => {
731 let mut hasher = DefaultHasher::new();
733 hasher.write(normalized_content.as_bytes());
734 Ok(format!("xxhash:{:x}", hasher.finish()))
735 }
736 }
737 }
738
739 async fn compute_content_delta(
741 &self,
742 previous: &DocumentState,
743 current: &DocumentState,
744 ) -> RragResult<ContentDelta> {
745 let size_diff = current.content_size as i64 - previous.content_size as i64;
746
747 let (added_chars, removed_chars) = if size_diff > 0 {
748 (size_diff as usize, 0)
749 } else {
750 (0, (-size_diff) as usize)
751 };
752
753 let change_percentage = if previous.content_size == 0 {
754 1.0
755 } else {
756 (size_diff.abs() as f64) / (previous.content_size as f64)
757 };
758
759 Ok(ContentDelta {
760 added_chars,
761 removed_chars,
762 modified_chars: std::cmp::min(previous.content_size, current.content_size),
763 previous_size: previous.content_size,
764 current_size: current.content_size,
765 change_percentage: change_percentage.min(1.0),
766 })
767 }
768
769 async fn compute_metadata_changes(
771 &self,
772 previous: &Metadata,
773 current: &Metadata,
774 ) -> RragResult<MetadataChanges> {
775 let prev_keys: HashSet<String> = previous.keys().cloned().collect();
776 let curr_keys: HashSet<String> = current.keys().cloned().collect();
777
778 let added_keys: Vec<String> = curr_keys.difference(&prev_keys).cloned().collect();
779 let removed_keys: Vec<String> = prev_keys.difference(&curr_keys).cloned().collect();
780
781 let mut modified_keys = Vec::new();
782 for key in prev_keys.intersection(&curr_keys) {
783 if previous.get(key) != current.get(key) {
784 modified_keys.push(key.clone());
785 }
786 }
787
788 Ok(MetadataChanges {
789 added_keys,
790 removed_keys,
791 modified_keys,
792 previous_metadata: previous.clone(),
793 current_metadata: current.clone(),
794 })
795 }
796
797 async fn compute_confidence(
799 &self,
800 change_type: &ChangeType,
801 delta: &ContentDelta,
802 ) -> RragResult<f64> {
803 let base_confidence = match change_type {
804 ChangeType::Added | ChangeType::Deleted => 1.0,
805 ChangeType::NoChange => 1.0,
806 ChangeType::ContentChanged => {
807 0.7 + (delta.change_percentage * 0.3)
809 }
810 ChangeType::MetadataChanged => 0.8,
811 ChangeType::Moved => 0.9,
812 ChangeType::Multiple(_) => 0.9,
813 };
814
815 Ok(base_confidence)
816 }
817}
818
#[cfg(test)]
mod tests {
    use super::*;

    /// Detector built from the default configuration.
    async fn default_detector() -> ChangeDetector {
        ChangeDetector::new(ChangeDetectionConfig::default())
            .await
            .unwrap()
    }

    #[tokio::test]
    async fn test_change_detector_creation() {
        let detector = ChangeDetector::new(ChangeDetectionConfig::default())
            .await
            .unwrap();
        let healthy = detector.health_check().await.unwrap();
        assert!(healthy);
    }

    #[tokio::test]
    async fn test_new_document_detection() {
        let detector = default_detector().await;
        let doc = Document::new("Test content");

        let outcome = detector.detect_changes(&doc).await.unwrap();

        assert_eq!(outcome.change_type, ChangeType::Added);
        assert_eq!(outcome.document_id, doc.id);
        assert!(outcome.delta.added_chars > 0);
    }

    #[tokio::test]
    async fn test_no_change_detection() {
        let detector = default_detector().await;
        let doc = Document::new("Test content");

        // First sighting registers the document; an identical second one must not.
        let expected = [ChangeType::Added, ChangeType::NoChange];
        for want in expected.iter() {
            let got = detector.detect_changes(&doc).await.unwrap();
            assert_eq!(&got.change_type, want);
        }
    }

    #[tokio::test]
    async fn test_content_change_detection() {
        let detector = default_detector().await;
        let before = Document::with_id("test", "Original content");
        let after = Document::with_id("test", "Modified content");

        detector.detect_changes(&before).await.unwrap();
        let outcome = detector.detect_changes(&after).await.unwrap();

        assert_eq!(outcome.change_type, ChangeType::ContentChanged);
        assert!(outcome.delta.change_percentage > 0.0);
    }

    #[tokio::test]
    async fn test_metadata_change_detection() {
        let detector = default_detector().await;
        let tagged = |value: &str| {
            Document::with_id("test", "Same content")
                .with_metadata("key1", serde_json::Value::String(value.to_string()))
        };
        let before = tagged("value1");
        let after = tagged("value2");

        detector.detect_changes(&before).await.unwrap();
        let outcome = detector.detect_changes(&after).await.unwrap();

        assert_eq!(outcome.change_type, ChangeType::MetadataChanged);
        assert!(!outcome.metadata_changes.modified_keys.is_empty());
    }

    #[test]
    fn test_hash_algorithms() {
        let default_algo = ChangeDetectionConfig {
            hash_algorithm: HashAlgorithm::Default,
            ..Default::default()
        }
        .hash_algorithm;
        let sha_algo = ChangeDetectionConfig {
            hash_algorithm: HashAlgorithm::Sha256,
            ..Default::default()
        }
        .hash_algorithm;

        assert_ne!(format!("{:?}", default_algo), format!("{:?}", sha_algo));
    }

    #[test]
    fn test_change_sensitivity() {
        let labels: Vec<String> = [
            ChangeSensitivity::Low,
            ChangeSensitivity::Medium,
            ChangeSensitivity::High,
            ChangeSensitivity::Strict,
        ]
        .iter()
        .map(|s| format!("{:?}", s))
        .collect();

        // Debug labels must be pairwise distinct.
        for (i, first) in labels.iter().enumerate() {
            for second in labels.iter().skip(i + 1) {
                assert_ne!(first, second);
            }
        }
    }
}