use crate::{Document, DocumentChunk, Metadata, RragError, RragResult};
use serde::{Deserialize, Serialize};
use std::collections::hash_map::DefaultHasher;
use std::collections::{HashMap, HashSet};
use std::hash::Hasher;
use tokio::sync::RwLock;
/// Configuration controlling how document changes are detected.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ChangeDetectionConfig {
/// Compare content hashes between runs.
/// NOTE(review): not consulted anywhere in the visible code — confirm intent.
pub enable_content_hash: bool,
/// Compare metadata between runs.
/// NOTE(review): not consulted anywhere in the visible code — confirm intent.
pub enable_metadata_detection: bool,
/// Track timestamps of changes.
/// NOTE(review): not consulted anywhere in the visible code — confirm intent.
pub enable_timestamp_detection: bool,
/// Produce per-chunk change records in `detect_changes_with_chunks`.
pub enable_chunk_detection: bool,
/// Which hash function to use for content/metadata fingerprints.
pub hash_algorithm: HashAlgorithm,
/// How aggressively content is normalized before hashing.
pub sensitivity: ChangeSensitivity,
/// Maximum number of entries kept in the change history (oldest dropped first).
pub max_change_history: usize,
}
/// Selects the hash function used to fingerprint content.
///
/// NOTE(review): `compute_hash` currently implements every variant with
/// `std`'s `DefaultHasher` and only prefixes the digest with the algorithm
/// name — the named algorithms below are placeholders, not real SHA-256 /
/// BLAKE3 / xxHash digests.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum HashAlgorithm {
/// `std::collections::hash_map::DefaultHasher`, no prefix.
Default,
/// Placeholder; digest prefixed with `sha256:`.
Sha256,
/// Placeholder; digest prefixed with `blake3:`.
Blake3,
/// Placeholder; digest prefixed with `xxhash:`.
Xxhash,
}
/// Controls how content is normalized before hashing (see `compute_hash`):
/// the stricter the level, the more small edits register as changes.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ChangeSensitivity {
/// All whitespace removed and text lowercased before hashing.
Low,
/// Whitespace runs collapsed to single spaces, text lowercased.
Medium,
/// Text lowercased only; whitespace preserved.
High,
/// Content hashed exactly as-is.
Strict,
}
impl Default for ChangeDetectionConfig {
fn default() -> Self {
Self {
enable_content_hash: true,
enable_metadata_detection: true,
enable_timestamp_detection: true,
enable_chunk_detection: true,
hash_algorithm: HashAlgorithm::Default,
sensitivity: ChangeSensitivity::Medium,
max_change_history: 1000,
}
}
}
/// Classification of what changed for a document (or an individual chunk).
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum ChangeType {
/// Document/chunk seen for the first time.
Added,
/// Content hash differs from the cached snapshot.
ContentChanged,
/// Metadata hash differs from the cached snapshot.
MetadataChanged,
/// NOTE(review): never produced by the visible code — confirm whether
/// move detection is implemented elsewhere.
Moved,
/// A previously present chunk no longer exists.
Deleted,
/// Hashes match — nothing changed.
NoChange,
/// More than one kind of change detected in the same pass.
Multiple(Vec<ChangeType>),
}
/// Full result of one change-detection pass over a document.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ChangeResult {
/// What kind of change was detected.
pub change_type: ChangeType,
/// Id of the document this result describes.
pub document_id: String,
/// Content hash from the previous snapshot; `None` for a new document.
pub previous_hash: Option<String>,
/// Content hash of the current document state.
pub current_hash: String,
/// Size-based estimate of how much content changed.
pub delta: ContentDelta,
/// Key-level metadata diff plus both full snapshots.
pub metadata_changes: MetadataChanges,
/// When the change was detected / last checked.
pub timestamps: ChangeTimestamps,
/// Per-chunk changes; empty unless chunk detection ran.
pub chunk_changes: Vec<ChunkChange>,
/// Heuristic confidence in the classification, in [0, 1].
pub confidence: f64,
}
/// Size-based content delta.
///
/// NOTE(review): values are estimated from byte lengths only (see
/// `compute_content_delta`), not from an actual text diff.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ContentDelta {
/// Bytes added (size growth attributed entirely to additions).
pub added_chars: usize,
/// Bytes removed (size shrinkage attributed entirely to removals).
pub removed_chars: usize,
/// Upper bound on modified bytes (min of the two sizes).
pub modified_chars: usize,
/// Content size before the change, in bytes.
pub previous_size: usize,
/// Content size after the change, in bytes.
pub current_size: usize,
/// |size difference| / previous size, clamped to 1.0 (1.0 for new docs).
pub change_percentage: f64,
}
/// Key-level diff between two metadata maps, plus both full snapshots.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MetadataChanges {
/// Keys present now but not before.
pub added_keys: Vec<String>,
/// Keys present before but not now.
pub removed_keys: Vec<String>,
/// Keys present in both whose values differ.
pub modified_keys: Vec<String>,
/// Full metadata map before the change.
pub previous_metadata: HashMap<String, serde_json::Value>,
/// Full metadata map after the change.
pub current_metadata: HashMap<String, serde_json::Value>,
}
/// Timing information attached to a change result.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ChangeTimestamps {
/// When this detection pass ran.
pub detected_at: chrono::DateTime<chrono::Utc>,
/// Document's own modification time, when known (creation time for new docs).
pub last_modified: Option<chrono::DateTime<chrono::Utc>>,
/// When the previous snapshot was taken; `None` for a new document.
pub previous_check: Option<chrono::DateTime<chrono::Utc>>,
/// Elapsed time since the previous check; `None` for a new document.
pub time_since_change: Option<chrono::Duration>,
}
/// Change information for a single chunk, identified by position.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ChunkChange {
/// Position of the chunk within the document's chunk list.
pub chunk_index: usize,
/// What happened to this chunk (Added / ContentChanged / Deleted).
pub change_type: ChangeType,
/// Previous chunk hash, if the chunk existed before.
pub previous_hash: Option<String>,
/// Current chunk hash (empty string for deleted chunks).
pub current_hash: String,
/// Size-based delta for this chunk.
pub delta: ContentDelta,
}
/// History entry recorded whenever a non-`NoChange` result is produced.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DocumentChange {
/// Id of the changed document.
pub document_id: String,
/// The full detection result that triggered this entry.
pub change_result: ChangeResult,
/// Document state version at the time of the change.
pub version: u64,
/// Origin of the record (the visible code always writes "change_detector").
pub source: String,
/// Free-form extra context (empty in the visible code).
pub context: HashMap<String, serde_json::Value>,
}
/// Detects content / metadata / chunk-level changes between successive
/// versions of documents by caching a hash-based snapshot per document id.
pub struct ChangeDetector {
/// Detection switches and tuning knobs.
config: ChangeDetectionConfig,
/// document id -> last observed state (the comparison baseline).
document_cache: RwLock<HashMap<String, DocumentState>>,
/// Bounded FIFO of recorded changes (capped at `config.max_change_history`).
change_history: RwLock<Vec<DocumentChange>>,
/// Aggregate processing statistics.
stats: RwLock<ChangeDetectionStats>,
}
/// Cached per-document snapshot used as the baseline for the next
/// `detect_changes` call.
#[derive(Debug, Clone)]
struct DocumentState {
/// Hash of the (sensitivity-normalized) document content.
content_hash: String,
/// Hash of the JSON serialization of the metadata map.
metadata_hash: String,
/// One hash per chunk; empty when no chunks were supplied.
chunk_hashes: Vec<String>,
/// When this snapshot was taken.
last_checked: chrono::DateTime<chrono::Utc>,
/// Full copy of the metadata, kept for key-level diffing.
metadata_snapshot: Metadata,
/// Content length in bytes (`str::len`).
content_size: usize,
/// NOTE(review): always set to 1 and never incremented in the visible
/// code — confirm whether versioning is intended.
version: u64,
}
/// Aggregate counters describing detector activity.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ChangeDetectionStats {
/// Total documents run through `detect_changes`.
pub total_processed: u64,
/// Result counts keyed by the `Debug` rendering of `ChangeType`.
pub changes_by_type: HashMap<String, u64>,
/// Smoothed processing time (each new sample averaged pairwise with the
/// running value — not a true mean over all samples).
pub avg_processing_time_ms: f64,
/// NOTE(review): never updated by any code visible here.
pub cache_hit_rate: f64,
/// NOTE(review): never updated by any code visible here.
pub false_positive_rate: f64,
/// Timestamp of the last stats mutation.
pub last_updated: chrono::DateTime<chrono::Utc>,
}
impl ChangeDetector {
pub async fn new(config: ChangeDetectionConfig) -> RragResult<Self> {
Ok(Self {
config,
document_cache: RwLock::new(HashMap::new()),
change_history: RwLock::new(Vec::new()),
stats: RwLock::new(ChangeDetectionStats {
total_processed: 0,
changes_by_type: HashMap::new(),
avg_processing_time_ms: 0.0,
cache_hit_rate: 0.0,
false_positive_rate: 0.0,
last_updated: chrono::Utc::now(),
}),
})
}
pub async fn detect_changes(&self, document: &Document) -> RragResult<ChangeResult> {
let start_time = std::time::Instant::now();
{
let mut stats = self.stats.write().await;
stats.total_processed += 1;
}
let current_state = self.compute_document_state(document, None).await?;
let cache = self.document_cache.read().await;
let previous_state = cache.get(&document.id);
let change_result = match previous_state {
Some(prev_state) => {
self.compare_states(&document.id, prev_state, ¤t_state)
.await?
}
None => {
ChangeResult {
change_type: ChangeType::Added,
document_id: document.id.clone(),
previous_hash: None,
current_hash: current_state.content_hash.clone(),
delta: ContentDelta {
added_chars: current_state.content_size,
removed_chars: 0,
modified_chars: 0,
previous_size: 0,
current_size: current_state.content_size,
change_percentage: 1.0,
},
metadata_changes: MetadataChanges {
added_keys: current_state.metadata_snapshot.keys().cloned().collect(),
removed_keys: Vec::new(),
modified_keys: Vec::new(),
previous_metadata: HashMap::new(),
current_metadata: current_state.metadata_snapshot.clone(),
},
timestamps: ChangeTimestamps {
detected_at: chrono::Utc::now(),
last_modified: Some(document.created_at),
previous_check: None,
time_since_change: None,
},
chunk_changes: Vec::new(),
confidence: 1.0,
}
}
};
drop(cache);
{
let mut cache = self.document_cache.write().await;
cache.insert(document.id.clone(), current_state.clone());
}
{
let mut stats = self.stats.write().await;
let change_type_str = format!("{:?}", change_result.change_type);
*stats.changes_by_type.entry(change_type_str).or_insert(0) += 1;
let processing_time = start_time.elapsed().as_millis() as f64;
stats.avg_processing_time_ms = (stats.avg_processing_time_ms + processing_time) / 2.0;
stats.last_updated = chrono::Utc::now();
}
if change_result.change_type != ChangeType::NoChange {
let document_change = DocumentChange {
document_id: document.id.clone(),
change_result: change_result.clone(),
version: current_state.version,
source: "change_detector".to_string(),
context: HashMap::new(),
};
let mut history = self.change_history.write().await;
history.push(document_change);
if history.len() > self.config.max_change_history {
history.remove(0);
}
}
Ok(change_result)
}
pub async fn detect_changes_with_chunks(
&self,
document: &Document,
chunks: &[DocumentChunk],
) -> RragResult<ChangeResult> {
let _start_time = std::time::Instant::now();
let current_state = self.compute_document_state(document, Some(chunks)).await?;
let cache = self.document_cache.read().await;
let previous_state = cache.get(&document.id);
let mut change_result = match previous_state {
Some(prev_state) => {
self.compare_states(&document.id, prev_state, ¤t_state)
.await?
}
None => {
let chunk_changes: Vec<ChunkChange> = chunks
.iter()
.enumerate()
.map(|(i, chunk)| ChunkChange {
chunk_index: i,
change_type: ChangeType::Added,
previous_hash: None,
current_hash: current_state.chunk_hashes[i].clone(),
delta: ContentDelta {
added_chars: chunk.content.len(),
removed_chars: 0,
modified_chars: 0,
previous_size: 0,
current_size: chunk.content.len(),
change_percentage: 1.0,
},
})
.collect();
ChangeResult {
change_type: ChangeType::Added,
document_id: document.id.clone(),
previous_hash: None,
current_hash: current_state.content_hash.clone(),
delta: ContentDelta {
added_chars: current_state.content_size,
removed_chars: 0,
modified_chars: 0,
previous_size: 0,
current_size: current_state.content_size,
change_percentage: 1.0,
},
metadata_changes: MetadataChanges {
added_keys: current_state.metadata_snapshot.keys().cloned().collect(),
removed_keys: Vec::new(),
modified_keys: Vec::new(),
previous_metadata: HashMap::new(),
current_metadata: current_state.metadata_snapshot.clone(),
},
timestamps: ChangeTimestamps {
detected_at: chrono::Utc::now(),
last_modified: Some(document.created_at),
previous_check: None,
time_since_change: None,
},
chunk_changes,
confidence: 1.0,
}
}
};
if self.config.enable_chunk_detection && change_result.chunk_changes.is_empty() {
if let Some(prev_state) = previous_state {
change_result.chunk_changes = self
.analyze_chunk_changes(
&prev_state.chunk_hashes,
¤t_state.chunk_hashes,
chunks,
)
.await?;
}
}
drop(cache);
{
let mut cache = self.document_cache.write().await;
cache.insert(document.id.clone(), current_state);
}
Ok(change_result)
}
pub async fn get_change_history(&self, document_id: &str) -> RragResult<Vec<DocumentChange>> {
let history = self.change_history.read().await;
Ok(history
.iter()
.filter(|change| change.document_id == document_id)
.cloned()
.collect())
}
/// Returns a point-in-time copy of the detector's statistics.
pub async fn get_stats(&self) -> ChangeDetectionStats {
self.stats.read().await.clone()
}
pub async fn clear_history(&self) -> RragResult<()> {
let mut history = self.change_history.write().await;
history.clear();
Ok(())
}
pub async fn health_check(&self) -> RragResult<bool> {
let _cache = self.document_cache.read().await;
let _history = self.change_history.read().await;
let _stats = self.stats.read().await;
Ok(true)
}
async fn compute_document_state(
&self,
document: &Document,
chunks: Option<&[DocumentChunk]>,
) -> RragResult<DocumentState> {
let content_hash = self.compute_hash(document.content_str()).await?;
let metadata_json = serde_json::to_string(&document.metadata).map_err(|e| {
RragError::serialization_with_message("document_metadata", e.to_string())
})?;
let metadata_hash = self.compute_hash(&metadata_json).await?;
let chunk_hashes = if let Some(chunks) = chunks {
let mut hashes = Vec::with_capacity(chunks.len());
for chunk in chunks {
let chunk_hash = self.compute_hash(&chunk.content).await?;
hashes.push(chunk_hash);
}
hashes
} else {
Vec::new()
};
Ok(DocumentState {
content_hash,
metadata_hash,
chunk_hashes,
last_checked: chrono::Utc::now(),
metadata_snapshot: document.metadata.clone(),
content_size: document.content_str().len(),
version: 1, })
}
async fn compare_states(
&self,
document_id: &str,
previous: &DocumentState,
current: &DocumentState,
) -> RragResult<ChangeResult> {
let mut change_types = Vec::new();
if previous.content_hash != current.content_hash {
change_types.push(ChangeType::ContentChanged);
}
if previous.metadata_hash != current.metadata_hash {
change_types.push(ChangeType::MetadataChanged);
}
let change_type = match change_types.len() {
0 => ChangeType::NoChange,
1 => change_types.into_iter().next().unwrap(),
_ => ChangeType::Multiple(change_types),
};
let delta = self.compute_content_delta(previous, current).await?;
let metadata_changes = self
.compute_metadata_changes(&previous.metadata_snapshot, ¤t.metadata_snapshot)
.await?;
let confidence = self.compute_confidence(&change_type, &delta).await?;
Ok(ChangeResult {
change_type,
document_id: document_id.to_string(),
previous_hash: Some(previous.content_hash.clone()),
current_hash: current.content_hash.clone(),
delta,
metadata_changes,
timestamps: ChangeTimestamps {
detected_at: chrono::Utc::now(),
last_modified: None,
previous_check: Some(previous.last_checked),
time_since_change: Some(chrono::Utc::now() - previous.last_checked),
},
chunk_changes: Vec::new(), confidence,
})
}
/// Diffs two chunk-hash lists positionally (index i vs index i) and
/// reports Added / ContentChanged / Deleted per position; unchanged
/// positions are skipped.
///
/// NOTE(review): a purely positional comparison means an insertion at the
/// front marks every following chunk as changed — confirm that is the
/// intended granularity.
async fn analyze_chunk_changes(
&self,
previous_hashes: &[String],
current_hashes: &[String],
current_chunks: &[DocumentChunk],
) -> RragResult<Vec<ChunkChange>> {
let mut chunk_changes = Vec::new();
// Walk the longer of the two lists so trailing adds/deletes are seen.
let max_len = std::cmp::max(previous_hashes.len(), current_hashes.len());
for i in 0..max_len {
let prev_hash = previous_hashes.get(i);
let curr_hash = current_hashes.get(i);
let chunk = current_chunks.get(i);
let (change_type, current_hash, delta) = match (prev_hash, curr_hash, chunk) {
// Chunk exists on both sides: changed only if the hashes differ.
(Some(prev), Some(curr), Some(chunk)) => {
if prev != curr {
// Size-only estimate; previous chunk text is not retained, so
// previous_size reuses the current length and the 0.5
// change_percentage is a fixed heuristic.
let delta = ContentDelta {
added_chars: 0, removed_chars: 0,
modified_chars: chunk.content.len(),
previous_size: chunk.content.len(), current_size: chunk.content.len(),
change_percentage: 0.5, };
(ChangeType::ContentChanged, curr.clone(), delta)
} else {
continue; }
}
// Present now but not before: a new chunk.
(None, Some(curr), Some(chunk)) => {
let delta = ContentDelta {
added_chars: chunk.content.len(),
removed_chars: 0,
modified_chars: 0,
previous_size: 0,
current_size: chunk.content.len(),
change_percentage: 1.0,
};
(ChangeType::Added, curr.clone(), delta)
}
// Present before but gone now: deleted. Only the old hash survives,
// so sizes are reported as 0 and current_hash is empty.
(Some(_), None, _) => {
let delta = ContentDelta {
added_chars: 0,
removed_chars: 0, modified_chars: 0,
previous_size: 0,
current_size: 0,
change_percentage: 1.0,
};
(ChangeType::Deleted, String::new(), delta)
}
// Remaining combinations (e.g. hash present but chunk list shorter)
// carry nothing comparable — skip.
_ => continue,
};
chunk_changes.push(ChunkChange {
chunk_index: i,
change_type,
previous_hash: prev_hash.cloned(),
current_hash,
delta,
});
}
Ok(chunk_changes)
}
async fn compute_hash(&self, content: &str) -> RragResult<String> {
let normalized_content = match self.config.sensitivity {
ChangeSensitivity::Low => {
content
.chars()
.filter(|c| !c.is_whitespace())
.collect::<String>()
.to_lowercase()
}
ChangeSensitivity::Medium => {
content
.split_whitespace()
.collect::<Vec<_>>()
.join(" ")
.to_lowercase()
}
ChangeSensitivity::High => {
content.to_lowercase()
}
ChangeSensitivity::Strict => {
content.to_string()
}
};
match self.config.hash_algorithm {
HashAlgorithm::Default => {
let mut hasher = DefaultHasher::new();
hasher.write(normalized_content.as_bytes());
Ok(format!("{:x}", hasher.finish()))
}
HashAlgorithm::Sha256 => {
let mut hasher = DefaultHasher::new();
hasher.write(normalized_content.as_bytes());
Ok(format!("sha256:{:x}", hasher.finish()))
}
HashAlgorithm::Blake3 => {
let mut hasher = DefaultHasher::new();
hasher.write(normalized_content.as_bytes());
Ok(format!("blake3:{:x}", hasher.finish()))
}
HashAlgorithm::Xxhash => {
let mut hasher = DefaultHasher::new();
hasher.write(normalized_content.as_bytes());
Ok(format!("xxhash:{:x}", hasher.finish()))
}
}
}
async fn compute_content_delta(
&self,
previous: &DocumentState,
current: &DocumentState,
) -> RragResult<ContentDelta> {
let size_diff = current.content_size as i64 - previous.content_size as i64;
let (added_chars, removed_chars) = if size_diff > 0 {
(size_diff as usize, 0)
} else {
(0, (-size_diff) as usize)
};
let change_percentage = if previous.content_size == 0 {
1.0
} else {
(size_diff.abs() as f64) / (previous.content_size as f64)
};
Ok(ContentDelta {
added_chars,
removed_chars,
modified_chars: std::cmp::min(previous.content_size, current.content_size),
previous_size: previous.content_size,
current_size: current.content_size,
change_percentage: change_percentage.min(1.0),
})
}
async fn compute_metadata_changes(
&self,
previous: &Metadata,
current: &Metadata,
) -> RragResult<MetadataChanges> {
let prev_keys: HashSet<String> = previous.keys().cloned().collect();
let curr_keys: HashSet<String> = current.keys().cloned().collect();
let added_keys: Vec<String> = curr_keys.difference(&prev_keys).cloned().collect();
let removed_keys: Vec<String> = prev_keys.difference(&curr_keys).cloned().collect();
let mut modified_keys = Vec::new();
for key in prev_keys.intersection(&curr_keys) {
if previous.get(key) != current.get(key) {
modified_keys.push(key.clone());
}
}
Ok(MetadataChanges {
added_keys,
removed_keys,
modified_keys,
previous_metadata: previous.clone(),
current_metadata: current.clone(),
})
}
async fn compute_confidence(
&self,
change_type: &ChangeType,
delta: &ContentDelta,
) -> RragResult<f64> {
let base_confidence = match change_type {
ChangeType::Added | ChangeType::Deleted => 1.0,
ChangeType::NoChange => 1.0,
ChangeType::ContentChanged => {
0.7 + (delta.change_percentage * 0.3)
}
ChangeType::MetadataChanged => 0.8,
ChangeType::Moved => 0.9,
ChangeType::Multiple(_) => 0.9,
};
Ok(base_confidence)
}
}
#[cfg(test)]
mod tests {
use super::*;
// A fresh detector should construct cleanly and report healthy.
#[tokio::test]
async fn test_change_detector_creation() {
let config = ChangeDetectionConfig::default();
let detector = ChangeDetector::new(config).await.unwrap();
assert!(detector.health_check().await.unwrap());
}
// First sighting of a document must classify as Added with a positive delta.
#[tokio::test]
async fn test_new_document_detection() {
let detector = ChangeDetector::new(ChangeDetectionConfig::default())
.await
.unwrap();
let doc = Document::new("Test content");
let result = detector.detect_changes(&doc).await.unwrap();
assert_eq!(result.change_type, ChangeType::Added);
assert_eq!(result.document_id, doc.id);
assert!(result.delta.added_chars > 0);
}
// Re-submitting an identical document must yield NoChange.
#[tokio::test]
async fn test_no_change_detection() {
let detector = ChangeDetector::new(ChangeDetectionConfig::default())
.await
.unwrap();
let doc = Document::new("Test content");
let result1 = detector.detect_changes(&doc).await.unwrap();
assert_eq!(result1.change_type, ChangeType::Added);
let result2 = detector.detect_changes(&doc).await.unwrap();
assert_eq!(result2.change_type, ChangeType::NoChange);
}
// Same id, different content: must classify as ContentChanged.
#[tokio::test]
async fn test_content_change_detection() {
let detector = ChangeDetector::new(ChangeDetectionConfig::default())
.await
.unwrap();
let doc1 = Document::with_id("test", "Original content");
let doc2 = Document::with_id("test", "Modified content");
detector.detect_changes(&doc1).await.unwrap();
let result = detector.detect_changes(&doc2).await.unwrap();
assert_eq!(result.change_type, ChangeType::ContentChanged);
assert!(result.delta.change_percentage > 0.0);
}
// Same content, different metadata value: must classify as MetadataChanged
// and report the modified key.
#[tokio::test]
async fn test_metadata_change_detection() {
let detector = ChangeDetector::new(ChangeDetectionConfig::default())
.await
.unwrap();
let doc1 = Document::with_id("test", "Same content")
.with_metadata("key1", serde_json::Value::String("value1".to_string()));
let doc2 = Document::with_id("test", "Same content")
.with_metadata("key1", serde_json::Value::String("value2".to_string()));
detector.detect_changes(&doc1).await.unwrap();
let result = detector.detect_changes(&doc2).await.unwrap();
assert_eq!(result.change_type, ChangeType::MetadataChanged);
assert!(!result.metadata_changes.modified_keys.is_empty());
}
// Sanity check: distinct HashAlgorithm variants have distinct Debug forms.
#[test]
fn test_hash_algorithms() {
let config_default = ChangeDetectionConfig {
hash_algorithm: HashAlgorithm::Default,
..Default::default()
};
let config_sha256 = ChangeDetectionConfig {
hash_algorithm: HashAlgorithm::Sha256,
..Default::default()
};
assert_ne!(
format!("{:?}", config_default.hash_algorithm),
format!("{:?}", config_sha256.hash_algorithm)
);
}
// Sanity check: all sensitivity variants are pairwise distinguishable.
#[test]
fn test_change_sensitivity() {
let sensitivities = [
ChangeSensitivity::Low,
ChangeSensitivity::Medium,
ChangeSensitivity::High,
ChangeSensitivity::Strict,
];
for (i, sens1) in sensitivities.iter().enumerate() {
for (j, sens2) in sensitivities.iter().enumerate() {
if i != j {
assert_ne!(format!("{:?}", sens1), format!("{:?}", sens2));
}
}
}
}
}