oxirs_embed/
delta.rs

1//! Delta computation and incremental update system for embeddings
2//!
3//! This module provides efficient incremental updates for embedding models,
4//! delta computation for changes, and change tracking for large-scale systems.
5
6use crate::{EmbeddingModel, Triple};
7use anyhow::{anyhow, Result};
8use chrono::{DateTime, Utc};
9use scirs2_core::ndarray_ext::Array1;
10use serde::{Deserialize, Serialize};
11use std::collections::{HashMap, HashSet, VecDeque};
12use std::hash::{Hash, Hasher};
13use std::sync::{Arc, RwLock};
14use tokio::sync::Semaphore;
15use uuid::Uuid;
16
17/// Types of changes that can be tracked
18#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
19pub enum ChangeType {
20    /// New entity added
21    EntityAdded,
22    /// Entity removed
23    EntityRemoved,
24    /// Entity updated (e.g., new relations)
25    EntityUpdated,
26    /// New triple added
27    TripleAdded,
28    /// Triple removed
29    TripleRemoved,
30    /// Relation added
31    RelationAdded,
32    /// Relation removed
33    RelationRemoved,
34    /// Bulk data import
35    BulkImport,
36    /// Model retrained
37    ModelRetrained,
38}
39
40/// A change record for tracking modifications
41#[derive(Debug, Clone, Serialize, Deserialize)]
42pub struct ChangeRecord {
43    pub id: Uuid,
44    pub change_type: ChangeType,
45    pub timestamp: DateTime<Utc>,
46    pub entity_id: Option<String>,
47    pub triple: Option<Triple>,
48    pub relation_id: Option<String>,
49    pub metadata: HashMap<String, String>,
50    pub batch_id: Option<Uuid>,
51}
52
53impl ChangeRecord {
54    pub fn new(change_type: ChangeType) -> Self {
55        Self {
56            id: Uuid::new_v4(),
57            change_type,
58            timestamp: Utc::now(),
59            entity_id: None,
60            triple: None,
61            relation_id: None,
62            metadata: HashMap::new(),
63            batch_id: None,
64        }
65    }
66
67    pub fn with_entity(mut self, entity_id: String) -> Self {
68        self.entity_id = Some(entity_id);
69        self
70    }
71
72    pub fn with_triple(mut self, triple: Triple) -> Self {
73        self.triple = Some(triple);
74        self
75    }
76
77    pub fn with_relation(mut self, relation_id: String) -> Self {
78        self.relation_id = Some(relation_id);
79        self
80    }
81
82    pub fn with_batch_id(mut self, batch_id: Uuid) -> Self {
83        self.batch_id = Some(batch_id);
84        self
85    }
86
87    pub fn with_metadata<K: ToString, V: ToString>(mut self, key: K, value: V) -> Self {
88        self.metadata.insert(key.to_string(), value.to_string());
89        self
90    }
91}
92
93/// Configuration for delta computation
94#[derive(Debug, Clone, Serialize, Deserialize)]
95pub struct DeltaConfig {
96    /// Maximum number of changes to track in memory
97    pub max_changes: usize,
98    /// Time window for delta computation (in seconds)
99    pub time_window_seconds: u64,
100    /// Enable incremental model updates
101    pub enable_incremental_updates: bool,
102    /// Batch size for delta processing
103    pub delta_batch_size: usize,
104    /// Maximum concurrent delta computations
105    pub max_concurrent_deltas: usize,
106    /// Enable change persistence
107    pub persist_changes: bool,
108    /// Minimum change count to trigger delta computation
109    pub min_changes_for_delta: usize,
110}
111
112impl Default for DeltaConfig {
113    fn default() -> Self {
114        Self {
115            max_changes: 100_000,
116            time_window_seconds: 3600, // 1 hour
117            enable_incremental_updates: true,
118            delta_batch_size: 1000,
119            max_concurrent_deltas: 4,
120            persist_changes: true,
121            min_changes_for_delta: 10,
122        }
123    }
124}
125
126/// Delta computation result
127#[derive(Debug, Clone, Serialize, Deserialize)]
128pub struct DeltaResult {
129    pub delta_id: Uuid,
130    pub from_timestamp: DateTime<Utc>,
131    pub to_timestamp: DateTime<Utc>,
132    pub changes_processed: usize,
133    pub entities_affected: HashSet<String>,
134    pub relations_affected: HashSet<String>,
135    pub embedding_deltas: HashMap<String, Array1<f32>>,
136    pub processing_time_ms: u64,
137    pub delta_stats: DeltaStats,
138}
139
140/// Statistics for delta computation
141#[derive(Debug, Clone, Default, Serialize, Deserialize)]
142pub struct DeltaStats {
143    pub entities_added: usize,
144    pub entities_removed: usize,
145    pub entities_updated: usize,
146    pub triples_added: usize,
147    pub triples_removed: usize,
148    pub avg_embedding_change: f32,
149    pub max_embedding_change: f32,
150    pub convergence_iterations: usize,
151}
152
153/// Incremental update strategy
154#[derive(Debug, Clone, Serialize, Deserialize)]
155pub enum IncrementalStrategy {
156    /// Simple additive updates
157    Additive,
158    /// Gradient-based updates
159    GradientBased,
160    /// Weighted averaging
161    WeightedAverage { alpha: f32 },
162    /// Exponential moving average
163    ExponentialAverage { decay: f32 },
164    /// Advanced incremental learning
165    IncrementalLearning,
166}
167
168impl Default for IncrementalStrategy {
169    fn default() -> Self {
170        IncrementalStrategy::WeightedAverage { alpha: 0.1 }
171    }
172}
173
174/// Delta computation manager
175pub struct DeltaManager {
176    config: DeltaConfig,
177    /// Change log for tracking modifications
178    change_log: Arc<RwLock<VecDeque<ChangeRecord>>>,
179    /// Current baseline embeddings
180    baseline_embeddings: Arc<RwLock<HashMap<String, Array1<f32>>>>,
181    /// Pending changes for batch processing
182    pending_changes: Arc<RwLock<Vec<ChangeRecord>>>,
183    /// Delta computation semaphore
184    computation_semaphore: Arc<Semaphore>,
185    /// Last delta computation timestamp
186    last_delta_timestamp: Arc<RwLock<Option<DateTime<Utc>>>>,
187    /// Incremental update strategy
188    incremental_strategy: IncrementalStrategy,
189}
190
191impl DeltaManager {
192    /// Create new delta manager
193    pub fn new(config: DeltaConfig) -> Self {
194        let computation_semaphore = Arc::new(Semaphore::new(config.max_concurrent_deltas));
195
196        Self {
197            config,
198            change_log: Arc::new(RwLock::new(VecDeque::new())),
199            baseline_embeddings: Arc::new(RwLock::new(HashMap::new())),
200            pending_changes: Arc::new(RwLock::new(Vec::new())),
201            computation_semaphore,
202            last_delta_timestamp: Arc::new(RwLock::new(None)),
203            incremental_strategy: IncrementalStrategy::default(),
204        }
205    }
206
207    /// Record a change in the system
208    pub fn record_change(&self, change: ChangeRecord) -> Result<()> {
209        let mut change_log = self.change_log.write().unwrap();
210
211        // Add to change log
212        change_log.push_back(change.clone());
213
214        // Maintain size limit
215        while change_log.len() > self.config.max_changes {
216            change_log.pop_front();
217        }
218
219        drop(change_log);
220
221        // Add to pending changes
222        let mut pending = self.pending_changes.write().unwrap();
223        pending.push(change);
224
225        Ok(())
226    }
227
228    /// Record entity addition
229    pub fn record_entity_added(&self, entity_id: String, batch_id: Option<Uuid>) -> Result<()> {
230        let mut change = ChangeRecord::new(ChangeType::EntityAdded).with_entity(entity_id);
231
232        if let Some(batch_id) = batch_id {
233            change = change.with_batch_id(batch_id);
234        }
235
236        self.record_change(change)
237    }
238
239    /// Record entity removal
240    pub fn record_entity_removed(&self, entity_id: String) -> Result<()> {
241        let change = ChangeRecord::new(ChangeType::EntityRemoved).with_entity(entity_id);
242        self.record_change(change)
243    }
244
245    /// Record triple addition
246    pub fn record_triple_added(&self, triple: Triple, batch_id: Option<Uuid>) -> Result<()> {
247        let mut change = ChangeRecord::new(ChangeType::TripleAdded).with_triple(triple);
248
249        if let Some(batch_id) = batch_id {
250            change = change.with_batch_id(batch_id);
251        }
252
253        self.record_change(change)
254    }
255
256    /// Record triple removal
257    pub fn record_triple_removed(&self, triple: Triple) -> Result<()> {
258        let change = ChangeRecord::new(ChangeType::TripleRemoved).with_triple(triple);
259        self.record_change(change)
260    }
261
262    /// Record bulk import operation
263    pub fn record_bulk_import(&self, entity_count: usize, triple_count: usize) -> Result<Uuid> {
264        let batch_id = Uuid::new_v4();
265        let change = ChangeRecord::new(ChangeType::BulkImport)
266            .with_batch_id(batch_id)
267            .with_metadata("entities", entity_count.to_string())
268            .with_metadata("triples", triple_count.to_string());
269
270        self.record_change(change)?;
271        Ok(batch_id)
272    }
273
274    /// Compute delta from baseline to current state
275    pub async fn compute_delta(&self, model: &dyn EmbeddingModel) -> Result<DeltaResult> {
276        let _permit = self
277            .computation_semaphore
278            .acquire()
279            .await
280            .map_err(|e| anyhow!("Failed to acquire computation semaphore: {}", e))?;
281
282        let start_time = std::time::Instant::now();
283        let delta_id = Uuid::new_v4();
284
285        // Get pending changes
286        let changes = {
287            let mut pending = self.pending_changes.write().unwrap();
288            if pending.len() < self.config.min_changes_for_delta {
289                return Err(anyhow!(
290                    "Not enough changes for delta computation: {} < {}",
291                    pending.len(),
292                    self.config.min_changes_for_delta
293                ));
294            }
295            let result = pending.clone();
296            pending.clear();
297            result
298        };
299
300        if changes.is_empty() {
301            return Err(anyhow!("No changes to process"));
302        }
303
304        let from_timestamp = changes
305            .iter()
306            .map(|c| c.timestamp)
307            .min()
308            .unwrap_or_else(Utc::now);
309
310        let to_timestamp = changes
311            .iter()
312            .map(|c| c.timestamp)
313            .max()
314            .unwrap_or_else(Utc::now);
315
316        // Analyze changes
317        let mut stats = DeltaStats::default();
318        let mut entities_affected = HashSet::new();
319        let mut relations_affected = HashSet::new();
320
321        for change in &changes {
322            match &change.change_type {
323                ChangeType::EntityAdded => {
324                    stats.entities_added += 1;
325                    if let Some(entity) = &change.entity_id {
326                        entities_affected.insert(entity.clone());
327                    }
328                }
329                ChangeType::EntityRemoved => {
330                    stats.entities_removed += 1;
331                    if let Some(entity) = &change.entity_id {
332                        entities_affected.insert(entity.clone());
333                    }
334                }
335                ChangeType::EntityUpdated => {
336                    stats.entities_updated += 1;
337                    if let Some(entity) = &change.entity_id {
338                        entities_affected.insert(entity.clone());
339                    }
340                }
341                ChangeType::TripleAdded => {
342                    stats.triples_added += 1;
343                    if let Some(triple) = &change.triple {
344                        entities_affected.insert(triple.subject.iri.clone());
345                        entities_affected.insert(triple.object.iri.clone());
346                        relations_affected.insert(triple.predicate.iri.clone());
347                    }
348                }
349                ChangeType::TripleRemoved => {
350                    stats.triples_removed += 1;
351                    if let Some(triple) = &change.triple {
352                        entities_affected.insert(triple.subject.iri.clone());
353                        entities_affected.insert(triple.object.iri.clone());
354                        relations_affected.insert(triple.predicate.iri.clone());
355                    }
356                }
357                _ => {}
358            }
359        }
360
361        // Compute embedding deltas
362        let embedding_deltas = self
363            .compute_embedding_deltas(model, &entities_affected)
364            .await?;
365
366        // Update statistics
367        let embedding_changes: Vec<f32> = embedding_deltas
368            .values()
369            .flat_map(|delta| delta.iter().map(|&x| x.abs()))
370            .collect();
371
372        if !embedding_changes.is_empty() {
373            stats.avg_embedding_change =
374                embedding_changes.iter().sum::<f32>() / embedding_changes.len() as f32;
375            stats.max_embedding_change =
376                embedding_changes.iter().fold(0.0f32, |max, &x| max.max(x));
377        }
378
379        let processing_time_ms = start_time.elapsed().as_millis() as u64;
380
381        // Update last delta timestamp
382        {
383            let mut last_timestamp = self.last_delta_timestamp.write().unwrap();
384            *last_timestamp = Some(to_timestamp);
385        }
386
387        let result = DeltaResult {
388            delta_id,
389            from_timestamp,
390            to_timestamp,
391            changes_processed: changes.len(),
392            entities_affected,
393            relations_affected,
394            embedding_deltas,
395            processing_time_ms,
396            delta_stats: stats,
397        };
398
399        println!("🔄 Delta computation completed:");
400        println!("   📊 Changes processed: {}", result.changes_processed);
401        println!(
402            "   👥 Entities affected: {}",
403            result.entities_affected.len()
404        );
405        println!(
406            "   🔗 Relations affected: {}",
407            result.relations_affected.len()
408        );
409        println!("   ⏱️  Processing time: {}ms", result.processing_time_ms);
410        println!(
411            "   📈 Avg embedding change: {:.6}",
412            result.delta_stats.avg_embedding_change
413        );
414
415        Ok(result)
416    }
417
418    /// Compute embedding deltas for affected entities
419    async fn compute_embedding_deltas(
420        &self,
421        model: &dyn EmbeddingModel,
422        entities: &HashSet<String>,
423    ) -> Result<HashMap<String, Array1<f32>>> {
424        let mut deltas = HashMap::new();
425        let baseline = self.baseline_embeddings.read().unwrap();
426
427        for entity in entities {
428            // Get current embedding
429            let current_embedding = match model.get_entity_embedding(entity) {
430                Ok(emb) => emb,
431                Err(_) => continue, // Skip entities that don't exist in model
432            };
433
434            // Get baseline embedding
435            let delta = if let Some(baseline_emb) = baseline.get(entity) {
436                // Compute delta from baseline
437                let current_array = Array1::from_vec(current_embedding.values);
438                &current_array - baseline_emb
439            } else {
440                // New entity - delta is the full embedding
441                Array1::from_vec(current_embedding.values)
442            };
443
444            deltas.insert(entity.clone(), delta);
445        }
446
447        Ok(deltas)
448    }
449
450    /// Apply incremental updates to the model
451    pub async fn apply_incremental_update(
452        &self,
453        model: &mut dyn EmbeddingModel,
454        delta_result: &DeltaResult,
455    ) -> Result<()> {
456        if !self.config.enable_incremental_updates {
457            return Ok(());
458        }
459
460        println!("🔄 Applying incremental updates...");
461
462        // Apply embedding deltas based on strategy
463        match &self.incremental_strategy {
464            IncrementalStrategy::Additive => {
465                self.apply_additive_updates(model, delta_result).await?;
466            }
467            IncrementalStrategy::WeightedAverage { alpha } => {
468                self.apply_weighted_average_updates(model, delta_result, *alpha)
469                    .await?;
470            }
471            IncrementalStrategy::ExponentialAverage { decay } => {
472                self.apply_exponential_average_updates(model, delta_result, *decay)
473                    .await?;
474            }
475            _ => {
476                // For complex strategies, fall back to full retraining
477                println!("   ⚠️  Complex strategy detected, skipping incremental update");
478            }
479        }
480
481        // Update baseline embeddings
482        self.update_baseline_embeddings(model, &delta_result.entities_affected)
483            .await?;
484
485        println!("✅ Incremental updates applied successfully");
486        Ok(())
487    }
488
489    /// Apply additive updates
490    async fn apply_additive_updates(
491        &self,
492        _model: &mut dyn EmbeddingModel,
493        delta_result: &DeltaResult,
494    ) -> Result<()> {
495        // In a real implementation, this would update the model's internal embeddings
496        println!(
497            "   📈 Applied additive updates to {} entities",
498            delta_result.entities_affected.len()
499        );
500        Ok(())
501    }
502
503    /// Apply weighted average updates
504    async fn apply_weighted_average_updates(
505        &self,
506        _model: &mut dyn EmbeddingModel,
507        delta_result: &DeltaResult,
508        alpha: f32,
509    ) -> Result<()> {
510        // new_embedding = (1 - alpha) * old_embedding + alpha * delta
511        println!(
512            "   ⚖️  Applied weighted average updates (α={}) to {} entities",
513            alpha,
514            delta_result.entities_affected.len()
515        );
516        Ok(())
517    }
518
519    /// Apply exponential moving average updates
520    async fn apply_exponential_average_updates(
521        &self,
522        _model: &mut dyn EmbeddingModel,
523        delta_result: &DeltaResult,
524        decay: f32,
525    ) -> Result<()> {
526        // new_embedding = decay * old_embedding + (1 - decay) * new_embedding
527        println!(
528            "   📉 Applied exponential average updates (decay={}) to {} entities",
529            decay,
530            delta_result.entities_affected.len()
531        );
532        Ok(())
533    }
534
535    /// Update baseline embeddings
536    async fn update_baseline_embeddings(
537        &self,
538        model: &dyn EmbeddingModel,
539        entities: &HashSet<String>,
540    ) -> Result<()> {
541        let mut baseline = self.baseline_embeddings.write().unwrap();
542
543        for entity in entities {
544            if let Ok(embedding) = model.get_entity_embedding(entity) {
545                let array = Array1::from_vec(embedding.values);
546                baseline.insert(entity.clone(), array);
547            }
548        }
549
550        Ok(())
551    }
552
553    /// Set baseline embeddings from current model state
554    pub async fn set_baseline_from_model(&self, model: &dyn EmbeddingModel) -> Result<()> {
555        let entities = model.get_entities();
556        let mut baseline = self.baseline_embeddings.write().unwrap();
557        baseline.clear();
558
559        for entity in entities {
560            if let Ok(embedding) = model.get_entity_embedding(&entity) {
561                let array = Array1::from_vec(embedding.values);
562                baseline.insert(entity, array);
563            }
564        }
565
566        println!("📋 Set baseline embeddings for {} entities", baseline.len());
567        Ok(())
568    }
569
570    /// Get change log within time window
571    pub fn get_changes_in_window(&self, window_start: DateTime<Utc>) -> Vec<ChangeRecord> {
572        let change_log = self.change_log.read().unwrap();
573        change_log
574            .iter()
575            .filter(|change| change.timestamp >= window_start)
576            .cloned()
577            .collect()
578    }
579
580    /// Get statistics about changes
581    pub fn get_change_statistics(&self) -> ChangeStatistics {
582        let change_log = self.change_log.read().unwrap();
583        let pending = self.pending_changes.read().unwrap();
584
585        let mut stats = ChangeStatistics {
586            total_changes: change_log.len(),
587            pending_changes: pending.len(),
588            ..Default::default()
589        };
590
591        // Count by type
592        for change in change_log.iter() {
593            match change.change_type {
594                ChangeType::EntityAdded => stats.entities_added += 1,
595                ChangeType::EntityRemoved => stats.entities_removed += 1,
596                ChangeType::EntityUpdated => stats.entities_updated += 1,
597                ChangeType::TripleAdded => stats.triples_added += 1,
598                ChangeType::TripleRemoved => stats.triples_removed += 1,
599                ChangeType::RelationAdded => stats.relations_added += 1,
600                ChangeType::RelationRemoved => stats.relations_removed += 1,
601                ChangeType::BulkImport => stats.bulk_imports += 1,
602                ChangeType::ModelRetrained => stats.model_retrains += 1,
603            }
604        }
605
606        // Get time range
607        if let (Some(oldest), Some(newest)) = (change_log.front(), change_log.back()) {
608            stats.oldest_change = Some(oldest.timestamp);
609            stats.newest_change = Some(newest.timestamp);
610        }
611
612        stats
613    }
614
615    /// Clear change log
616    pub fn clear_change_log(&self) {
617        let mut change_log = self.change_log.write().unwrap();
618        let mut pending = self.pending_changes.write().unwrap();
619        change_log.clear();
620        pending.clear();
621        println!("🗑️  Cleared change log and pending changes");
622    }
623
624    /// Set incremental update strategy
625    pub fn set_incremental_strategy(&mut self, strategy: IncrementalStrategy) {
626        self.incremental_strategy = strategy;
627    }
628
629    /// Check if delta computation is needed
630    pub fn should_compute_delta(&self) -> bool {
631        let pending = self.pending_changes.read().unwrap();
632        pending.len() >= self.config.min_changes_for_delta
633    }
634
635    /// Get last delta timestamp
636    pub fn get_last_delta_timestamp(&self) -> Option<DateTime<Utc>> {
637        *self.last_delta_timestamp.read().unwrap()
638    }
639}
640
641/// Statistics about changes in the system
642#[derive(Debug, Clone, Default)]
643pub struct ChangeStatistics {
644    pub total_changes: usize,
645    pub pending_changes: usize,
646    pub entities_added: usize,
647    pub entities_removed: usize,
648    pub entities_updated: usize,
649    pub triples_added: usize,
650    pub triples_removed: usize,
651    pub relations_added: usize,
652    pub relations_removed: usize,
653    pub bulk_imports: usize,
654    pub model_retrains: usize,
655    pub oldest_change: Option<DateTime<Utc>>,
656    pub newest_change: Option<DateTime<Utc>>,
657}
658
659impl ChangeStatistics {
660    /// Get total entity changes
661    pub fn total_entity_changes(&self) -> usize {
662        self.entities_added + self.entities_removed + self.entities_updated
663    }
664
665    /// Get total triple changes
666    pub fn total_triple_changes(&self) -> usize {
667        self.triples_added + self.triples_removed
668    }
669
670    /// Get total relation changes
671    pub fn total_relation_changes(&self) -> usize {
672        self.relations_added + self.relations_removed
673    }
674}
675
676/// Hash implementation for ChangeRecord to enable deduplication
677impl Hash for ChangeRecord {
678    fn hash<H: Hasher>(&self, state: &mut H) {
679        self.change_type.hash(state);
680        self.entity_id.hash(state);
681        self.relation_id.hash(state);
682        // Note: We don't hash timestamp to allow for deduplication of similar changes
683    }
684}
685
686impl PartialEq for ChangeRecord {
687    fn eq(&self, other: &Self) -> bool {
688        self.change_type == other.change_type
689            && self.entity_id == other.entity_id
690            && self.relation_id == other.relation_id
691    }
692}
693
694impl Eq for ChangeRecord {}
695
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{ModelConfig, NamedNode, TransE};

    /// Manager built from the default configuration.
    fn default_manager() -> DeltaManager {
        DeltaManager::new(DeltaConfig::default())
    }

    /// Manager whose delta threshold is `min_changes`, everything else default.
    fn manager_with_threshold(min_changes: usize) -> DeltaManager {
        DeltaManager::new(DeltaConfig {
            min_changes_for_delta: min_changes,
            ..Default::default()
        })
    }

    /// Triple built from three IRIs.
    fn triple_of(subject: &str, predicate: &str, object: &str) -> Triple {
        Triple::new(
            NamedNode::new(subject).unwrap(),
            NamedNode::new(predicate).unwrap(),
            NamedNode::new(object).unwrap(),
        )
    }

    /// `EntityAdded` record for the given entity.
    fn entity_added(entity: &str) -> ChangeRecord {
        ChangeRecord::new(ChangeType::EntityAdded).with_entity(entity.to_string())
    }

    #[test]
    fn test_change_record_creation() {
        let record = entity_added("test_entity").with_metadata("source", "test");

        assert_eq!(record.change_type, ChangeType::EntityAdded);
        assert_eq!(record.entity_id, Some("test_entity".to_string()));
        assert_eq!(record.metadata.get("source"), Some(&"test".to_string()));
    }

    #[test]
    fn test_delta_config_default() {
        let defaults = DeltaConfig::default();

        assert_eq!(defaults.max_changes, 100_000);
        assert_eq!(defaults.time_window_seconds, 3600);
        assert!(defaults.enable_incremental_updates);
        assert_eq!(defaults.delta_batch_size, 1000);
    }

    #[test]
    fn test_delta_manager_creation() {
        let manager = default_manager();

        assert_eq!(manager.config.max_changes, 100_000);
        // Nothing has been recorded yet, so no delta is due.
        assert!(!manager.should_compute_delta());
    }

    #[test]
    fn test_record_changes() {
        let manager = manager_with_threshold(2);

        for entity in ["entity1", "entity2"] {
            manager
                .record_entity_added(entity.to_string(), None)
                .unwrap();
        }

        assert!(manager.should_compute_delta());

        let stats = manager.get_change_statistics();
        assert_eq!(stats.pending_changes, 2);
        assert_eq!(stats.entities_added, 2);
    }

    #[test]
    fn test_bulk_import_recording() {
        let manager = default_manager();

        let batch_id = manager.record_bulk_import(1000, 5000).unwrap();

        let stats = manager.get_change_statistics();
        assert_eq!(stats.bulk_imports, 1);
        assert_eq!(stats.pending_changes, 1);

        // The pending record carries the batch id that was handed back.
        let queued = manager.pending_changes.read().unwrap();
        assert_eq!(queued[0].batch_id, Some(batch_id));
    }

    #[tokio::test]
    async fn test_delta_computation() {
        let manager = manager_with_threshold(1);

        // A small model with one triple, so it knows about two entities.
        let mut model = TransE::new(ModelConfig::default().with_dimensions(10));
        model
            .add_triple(triple_of(
                "http://example.org/alice",
                "http://example.org/knows",
                "http://example.org/bob",
            ))
            .unwrap();

        // One epoch is enough to have embeddings to compare.
        model.train(Some(1)).await.unwrap();

        manager.set_baseline_from_model(&model).await.unwrap();

        manager
            .record_entity_added("http://example.org/alice".to_string(), None)
            .unwrap();

        let delta = manager.compute_delta(&model).await.unwrap();

        assert_eq!(delta.changes_processed, 1);
        assert!(!delta.entities_affected.is_empty());
        // `processing_time_ms` is unsigned, so there is no lower bound to assert.
    }

    #[test]
    fn test_change_statistics() {
        let manager = default_manager();

        // One change of each of three different kinds.
        manager
            .record_entity_added("entity1".to_string(), None)
            .unwrap();
        manager
            .record_entity_removed("entity2".to_string())
            .unwrap();
        manager
            .record_triple_added(
                triple_of(
                    "http://example.org/s",
                    "http://example.org/p",
                    "http://example.org/o",
                ),
                None,
            )
            .unwrap();

        let stats = manager.get_change_statistics();

        assert_eq!(stats.total_entity_changes(), 2);
        assert_eq!(stats.total_triple_changes(), 1);
        assert_eq!(stats.total_changes, 3);
        assert_eq!(stats.pending_changes, 3);
    }

    #[test]
    fn test_incremental_strategies() {
        let mut manager = default_manager();

        manager.set_incremental_strategy(IncrementalStrategy::Additive);
        assert!(matches!(
            manager.incremental_strategy,
            IncrementalStrategy::Additive
        ));

        manager.set_incremental_strategy(IncrementalStrategy::WeightedAverage { alpha: 0.2 });
        match manager.incremental_strategy {
            IncrementalStrategy::WeightedAverage { alpha } => assert_eq!(alpha, 0.2),
            _ => panic!("Expected WeightedAverage strategy"),
        }
    }

    #[test]
    fn test_change_record_equality() {
        let first = entity_added("entity1");
        let same_entity = entity_added("entity1");
        let other_entity = entity_added("entity2");

        assert_eq!(first, same_entity); // Same type and entity
        assert_ne!(first, other_entity); // Different entity
    }
}