oxirs_embed/
delta.rs

//! Delta computation and incremental update system for embeddings.
//!
//! This module provides change tracking, delta computation over recorded
//! changes, and incremental updates for embedding models in large-scale systems.
5
6use crate::{EmbeddingModel, Triple};
7use anyhow::{anyhow, Result};
8use chrono::{DateTime, Utc};
9use scirs2_core::ndarray_ext::Array1;
10use serde::{Deserialize, Serialize};
11use std::collections::{HashMap, HashSet, VecDeque};
12use std::hash::{Hash, Hasher};
13use std::sync::{Arc, RwLock};
14use tokio::sync::Semaphore;
15use uuid::Uuid;
16
/// Kinds of modification that can be recorded in a [`ChangeRecord`].
///
/// The enum is fieldless; the details of a change (entity id, triple,
/// relation id, metadata) live on the owning `ChangeRecord`.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum ChangeType {
    /// New entity added
    EntityAdded,
    /// Entity removed
    EntityRemoved,
    /// Entity updated (e.g., new relations)
    EntityUpdated,
    /// New triple added
    TripleAdded,
    /// Triple removed
    TripleRemoved,
    /// Relation added
    RelationAdded,
    /// Relation removed
    RelationRemoved,
    /// Bulk data import; `DeltaManager::record_bulk_import` stores the entity
    /// and triple counts in the record's metadata under `entities` / `triples`.
    BulkImport,
    /// Model retrained
    ModelRetrained,
}
39
/// A single recorded modification, built with [`ChangeRecord::new`] and the
/// `with_*` builder methods.
///
/// Equality and hashing are implemented manually and ignore `id`,
/// `timestamp`, `metadata` and `batch_id`, so the same change recorded at
/// different times compares equal.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ChangeRecord {
    /// Unique identifier (a random v4 UUID assigned by `new`).
    pub id: Uuid,
    /// What kind of change this is.
    pub change_type: ChangeType,
    /// Creation time of the record (UTC, captured by `new`).
    pub timestamp: DateTime<Utc>,
    /// Entity the change refers to, if any.
    pub entity_id: Option<String>,
    /// Triple the change refers to, if any.
    pub triple: Option<Triple>,
    /// Relation the change refers to, if any.
    pub relation_id: Option<String>,
    /// Free-form key/value annotations.
    pub metadata: HashMap<String, String>,
    /// Groups records that belong to the same batch (e.g. a bulk import).
    pub batch_id: Option<Uuid>,
}
52
53impl ChangeRecord {
54    pub fn new(change_type: ChangeType) -> Self {
55        Self {
56            id: Uuid::new_v4(),
57            change_type,
58            timestamp: Utc::now(),
59            entity_id: None,
60            triple: None,
61            relation_id: None,
62            metadata: HashMap::new(),
63            batch_id: None,
64        }
65    }
66
67    pub fn with_entity(mut self, entity_id: String) -> Self {
68        self.entity_id = Some(entity_id);
69        self
70    }
71
72    pub fn with_triple(mut self, triple: Triple) -> Self {
73        self.triple = Some(triple);
74        self
75    }
76
77    pub fn with_relation(mut self, relation_id: String) -> Self {
78        self.relation_id = Some(relation_id);
79        self
80    }
81
82    pub fn with_batch_id(mut self, batch_id: Uuid) -> Self {
83        self.batch_id = Some(batch_id);
84        self
85    }
86
87    pub fn with_metadata<K: ToString, V: ToString>(mut self, key: K, value: V) -> Self {
88        self.metadata.insert(key.to_string(), value.to_string());
89        self
90    }
91}
92
/// Configuration for delta computation.
///
/// Not every field is consumed by [`DeltaManager`] in this file; see the
/// per-field notes.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DeltaConfig {
    /// Maximum number of changes to track in memory.
    ///
    /// `DeltaManager::record_change` evicts the oldest change-log entries once
    /// the log grows past this size. The queue of pending (not yet processed)
    /// changes is not limited by this value.
    pub max_changes: usize,
    /// Time window for delta computation (in seconds).
    ///
    /// NOTE(review): not read anywhere in this file — confirm where it is used.
    pub time_window_seconds: u64,
    /// Enable incremental model updates.
    ///
    /// When `false`, `DeltaManager::apply_incremental_update` returns `Ok(())`
    /// immediately without touching the model or the stored baseline.
    pub enable_incremental_updates: bool,
    /// Batch size for delta processing.
    ///
    /// NOTE(review): not read anywhere in this file — confirm where it is used.
    pub delta_batch_size: usize,
    /// Maximum concurrent delta computations; sizes the semaphore that guards
    /// `DeltaManager::compute_delta`.
    pub max_concurrent_deltas: usize,
    /// Enable change persistence.
    ///
    /// NOTE(review): not read anywhere in this file; changes are only kept in
    /// memory here — confirm where persistence is implemented.
    pub persist_changes: bool,
    /// Minimum number of pending changes required by
    /// `DeltaManager::compute_delta` (also the threshold used by
    /// `DeltaManager::should_compute_delta`).
    pub min_changes_for_delta: usize,
}
111
112impl Default for DeltaConfig {
113    fn default() -> Self {
114        Self {
115            max_changes: 100_000,
116            time_window_seconds: 3600, // 1 hour
117            enable_incremental_updates: true,
118            delta_batch_size: 1000,
119            max_concurrent_deltas: 4,
120            persist_changes: true,
121            min_changes_for_delta: 10,
122        }
123    }
124}
125
/// Delta computation result.
///
/// Produced by `DeltaManager::compute_delta` for one batch of pending changes.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DeltaResult {
    /// Random identifier of this delta computation.
    pub delta_id: Uuid,
    /// Earliest timestamp among the processed changes.
    pub from_timestamp: DateTime<Utc>,
    /// Latest timestamp among the processed changes.
    pub to_timestamp: DateTime<Utc>,
    /// Number of change records consumed.
    pub changes_processed: usize,
    /// Entity ids/IRIs touched by the processed changes.
    pub entities_affected: HashSet<String>,
    /// Relations touched by the processed changes (e.g. triple predicates).
    pub relations_affected: HashSet<String>,
    /// Per-entity embedding delta: current embedding minus the stored
    /// baseline, or the full current embedding when the entity had no usable
    /// baseline. Entities the model has no embedding for are absent.
    pub embedding_deltas: HashMap<String, Array1<f32>>,
    /// Wall-clock time spent in the computation, in milliseconds.
    pub processing_time_ms: u64,
    /// Per-type counts and embedding-change statistics.
    pub delta_stats: DeltaStats,
}
139
/// Statistics for one delta computation.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct DeltaStats {
    /// Number of `EntityAdded` changes processed.
    pub entities_added: usize,
    /// Number of `EntityRemoved` changes processed.
    pub entities_removed: usize,
    /// Number of `EntityUpdated` changes processed.
    pub entities_updated: usize,
    /// Number of `TripleAdded` changes processed.
    pub triples_added: usize,
    /// Number of `TripleRemoved` changes processed.
    pub triples_removed: usize,
    /// Mean of the absolute values of all components of all embedding deltas
    /// (0.0 when there are none).
    pub avg_embedding_change: f32,
    /// Largest absolute component value across all embedding deltas
    /// (0.0 when there are none).
    pub max_embedding_change: f32,
    /// NOTE(review): `DeltaManager::compute_delta` in this file never sets
    /// this, so it stays at its `Default` value of 0 — confirm whether other
    /// code populates it.
    pub convergence_iterations: usize,
}
152
/// Incremental update strategy.
///
/// Selected with `DeltaManager::set_incremental_strategy` and dispatched by
/// `DeltaManager::apply_incremental_update`. In this file the `Additive`,
/// `WeightedAverage` and `ExponentialAverage` handlers only log; they do not
/// modify the model. `GradientBased` and `IncrementalLearning` skip the update
/// step with a warning.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum IncrementalStrategy {
    /// Simple additive updates
    Additive,
    /// Gradient-based updates
    GradientBased,
    /// Weighted averaging (intended: `new = (1 - alpha) * old + alpha * delta`)
    WeightedAverage { alpha: f32 },
    /// Exponential moving average (intended: `new = decay * old + (1 - decay) * new`)
    ExponentialAverage { decay: f32 },
    /// Advanced incremental learning
    IncrementalLearning,
}
167
168impl Default for IncrementalStrategy {
169    fn default() -> Self {
170        IncrementalStrategy::WeightedAverage { alpha: 0.1 }
171    }
172}
173
/// Delta computation manager.
///
/// Records changes, keeps per-entity baseline embeddings, and computes and
/// applies embedding deltas. All mutable state sits behind `Arc<RwLock<_>>`,
/// so most methods take `&self` (`set_incremental_strategy` needs `&mut self`).
pub struct DeltaManager {
    /// Settings supplied to `DeltaManager::new`.
    config: DeltaConfig,
    /// Change log for tracking modifications; bounded by `config.max_changes`
    /// with the oldest entries evicted first.
    change_log: Arc<RwLock<VecDeque<ChangeRecord>>>,
    /// Current baseline embeddings keyed by entity; deltas are measured against these.
    baseline_embeddings: Arc<RwLock<HashMap<String, Array1<f32>>>>,
    /// Changes recorded since the last successful `compute_delta`, which drains them.
    pending_changes: Arc<RwLock<Vec<ChangeRecord>>>,
    /// Limits how many `compute_delta` calls run concurrently.
    computation_semaphore: Arc<Semaphore>,
    /// `to_timestamp` of the last successful delta computation, if any.
    last_delta_timestamp: Arc<RwLock<Option<DateTime<Utc>>>>,
    /// Strategy dispatched by `apply_incremental_update`.
    incremental_strategy: IncrementalStrategy,
}
190
191impl DeltaManager {
192    /// Create new delta manager
193    pub fn new(config: DeltaConfig) -> Self {
194        let computation_semaphore = Arc::new(Semaphore::new(config.max_concurrent_deltas));
195
196        Self {
197            config,
198            change_log: Arc::new(RwLock::new(VecDeque::new())),
199            baseline_embeddings: Arc::new(RwLock::new(HashMap::new())),
200            pending_changes: Arc::new(RwLock::new(Vec::new())),
201            computation_semaphore,
202            last_delta_timestamp: Arc::new(RwLock::new(None)),
203            incremental_strategy: IncrementalStrategy::default(),
204        }
205    }
206
207    /// Record a change in the system
208    pub fn record_change(&self, change: ChangeRecord) -> Result<()> {
209        let mut change_log = self
210            .change_log
211            .write()
212            .expect("rwlock should not be poisoned");
213
214        // Add to change log
215        change_log.push_back(change.clone());
216
217        // Maintain size limit
218        while change_log.len() > self.config.max_changes {
219            change_log.pop_front();
220        }
221
222        drop(change_log);
223
224        // Add to pending changes
225        let mut pending = self
226            .pending_changes
227            .write()
228            .expect("rwlock should not be poisoned");
229        pending.push(change);
230
231        Ok(())
232    }
233
234    /// Record entity addition
235    pub fn record_entity_added(&self, entity_id: String, batch_id: Option<Uuid>) -> Result<()> {
236        let mut change = ChangeRecord::new(ChangeType::EntityAdded).with_entity(entity_id);
237
238        if let Some(batch_id) = batch_id {
239            change = change.with_batch_id(batch_id);
240        }
241
242        self.record_change(change)
243    }
244
245    /// Record entity removal
246    pub fn record_entity_removed(&self, entity_id: String) -> Result<()> {
247        let change = ChangeRecord::new(ChangeType::EntityRemoved).with_entity(entity_id);
248        self.record_change(change)
249    }
250
251    /// Record triple addition
252    pub fn record_triple_added(&self, triple: Triple, batch_id: Option<Uuid>) -> Result<()> {
253        let mut change = ChangeRecord::new(ChangeType::TripleAdded).with_triple(triple);
254
255        if let Some(batch_id) = batch_id {
256            change = change.with_batch_id(batch_id);
257        }
258
259        self.record_change(change)
260    }
261
262    /// Record triple removal
263    pub fn record_triple_removed(&self, triple: Triple) -> Result<()> {
264        let change = ChangeRecord::new(ChangeType::TripleRemoved).with_triple(triple);
265        self.record_change(change)
266    }
267
268    /// Record bulk import operation
269    pub fn record_bulk_import(&self, entity_count: usize, triple_count: usize) -> Result<Uuid> {
270        let batch_id = Uuid::new_v4();
271        let change = ChangeRecord::new(ChangeType::BulkImport)
272            .with_batch_id(batch_id)
273            .with_metadata("entities", entity_count.to_string())
274            .with_metadata("triples", triple_count.to_string());
275
276        self.record_change(change)?;
277        Ok(batch_id)
278    }
279
280    /// Compute delta from baseline to current state
281    pub async fn compute_delta(&self, model: &dyn EmbeddingModel) -> Result<DeltaResult> {
282        let _permit = self
283            .computation_semaphore
284            .acquire()
285            .await
286            .map_err(|e| anyhow!("Failed to acquire computation semaphore: {}", e))?;
287
288        let start_time = std::time::Instant::now();
289        let delta_id = Uuid::new_v4();
290
291        // Get pending changes
292        let changes = {
293            let mut pending = self
294                .pending_changes
295                .write()
296                .expect("rwlock should not be poisoned");
297            if pending.len() < self.config.min_changes_for_delta {
298                return Err(anyhow!(
299                    "Not enough changes for delta computation: {} < {}",
300                    pending.len(),
301                    self.config.min_changes_for_delta
302                ));
303            }
304            let result = pending.clone();
305            pending.clear();
306            result
307        };
308
309        if changes.is_empty() {
310            return Err(anyhow!("No changes to process"));
311        }
312
313        let from_timestamp = changes
314            .iter()
315            .map(|c| c.timestamp)
316            .min()
317            .unwrap_or_else(Utc::now);
318
319        let to_timestamp = changes
320            .iter()
321            .map(|c| c.timestamp)
322            .max()
323            .unwrap_or_else(Utc::now);
324
325        // Analyze changes
326        let mut stats = DeltaStats::default();
327        let mut entities_affected = HashSet::new();
328        let mut relations_affected = HashSet::new();
329
330        for change in &changes {
331            match &change.change_type {
332                ChangeType::EntityAdded => {
333                    stats.entities_added += 1;
334                    if let Some(entity) = &change.entity_id {
335                        entities_affected.insert(entity.clone());
336                    }
337                }
338                ChangeType::EntityRemoved => {
339                    stats.entities_removed += 1;
340                    if let Some(entity) = &change.entity_id {
341                        entities_affected.insert(entity.clone());
342                    }
343                }
344                ChangeType::EntityUpdated => {
345                    stats.entities_updated += 1;
346                    if let Some(entity) = &change.entity_id {
347                        entities_affected.insert(entity.clone());
348                    }
349                }
350                ChangeType::TripleAdded => {
351                    stats.triples_added += 1;
352                    if let Some(triple) = &change.triple {
353                        entities_affected.insert(triple.subject.iri.clone());
354                        entities_affected.insert(triple.object.iri.clone());
355                        relations_affected.insert(triple.predicate.iri.clone());
356                    }
357                }
358                ChangeType::TripleRemoved => {
359                    stats.triples_removed += 1;
360                    if let Some(triple) = &change.triple {
361                        entities_affected.insert(triple.subject.iri.clone());
362                        entities_affected.insert(triple.object.iri.clone());
363                        relations_affected.insert(triple.predicate.iri.clone());
364                    }
365                }
366                _ => {}
367            }
368        }
369
370        // Compute embedding deltas
371        let embedding_deltas = self
372            .compute_embedding_deltas(model, &entities_affected)
373            .await?;
374
375        // Update statistics
376        let embedding_changes: Vec<f32> = embedding_deltas
377            .values()
378            .flat_map(|delta| delta.iter().map(|&x| x.abs()))
379            .collect();
380
381        if !embedding_changes.is_empty() {
382            stats.avg_embedding_change =
383                embedding_changes.iter().sum::<f32>() / embedding_changes.len() as f32;
384            stats.max_embedding_change =
385                embedding_changes.iter().fold(0.0f32, |max, &x| max.max(x));
386        }
387
388        let processing_time_ms = start_time.elapsed().as_millis() as u64;
389
390        // Update last delta timestamp
391        {
392            let mut last_timestamp = self
393                .last_delta_timestamp
394                .write()
395                .expect("rwlock should not be poisoned");
396            *last_timestamp = Some(to_timestamp);
397        }
398
399        let result = DeltaResult {
400            delta_id,
401            from_timestamp,
402            to_timestamp,
403            changes_processed: changes.len(),
404            entities_affected,
405            relations_affected,
406            embedding_deltas,
407            processing_time_ms,
408            delta_stats: stats,
409        };
410
411        println!("🔄 Delta computation completed:");
412        println!("   📊 Changes processed: {}", result.changes_processed);
413        println!(
414            "   👥 Entities affected: {}",
415            result.entities_affected.len()
416        );
417        println!(
418            "   🔗 Relations affected: {}",
419            result.relations_affected.len()
420        );
421        println!("   ⏱️  Processing time: {}ms", result.processing_time_ms);
422        println!(
423            "   📈 Avg embedding change: {:.6}",
424            result.delta_stats.avg_embedding_change
425        );
426
427        Ok(result)
428    }
429
430    /// Compute embedding deltas for affected entities
431    async fn compute_embedding_deltas(
432        &self,
433        model: &dyn EmbeddingModel,
434        entities: &HashSet<String>,
435    ) -> Result<HashMap<String, Array1<f32>>> {
436        let mut deltas = HashMap::new();
437        let baseline = self
438            .baseline_embeddings
439            .read()
440            .expect("rwlock should not be poisoned");
441
442        for entity in entities {
443            // Get current embedding
444            let current_embedding = match model.get_entity_embedding(entity) {
445                Ok(emb) => emb,
446                Err(_) => continue, // Skip entities that don't exist in model
447            };
448
449            // Get baseline embedding
450            let delta = if let Some(baseline_emb) = baseline.get(entity) {
451                // Compute delta from baseline
452                let current_array = Array1::from_vec(current_embedding.values);
453                &current_array - baseline_emb
454            } else {
455                // New entity - delta is the full embedding
456                Array1::from_vec(current_embedding.values)
457            };
458
459            deltas.insert(entity.clone(), delta);
460        }
461
462        Ok(deltas)
463    }
464
465    /// Apply incremental updates to the model
466    pub async fn apply_incremental_update(
467        &self,
468        model: &mut dyn EmbeddingModel,
469        delta_result: &DeltaResult,
470    ) -> Result<()> {
471        if !self.config.enable_incremental_updates {
472            return Ok(());
473        }
474
475        println!("🔄 Applying incremental updates...");
476
477        // Apply embedding deltas based on strategy
478        match &self.incremental_strategy {
479            IncrementalStrategy::Additive => {
480                self.apply_additive_updates(model, delta_result).await?;
481            }
482            IncrementalStrategy::WeightedAverage { alpha } => {
483                self.apply_weighted_average_updates(model, delta_result, *alpha)
484                    .await?;
485            }
486            IncrementalStrategy::ExponentialAverage { decay } => {
487                self.apply_exponential_average_updates(model, delta_result, *decay)
488                    .await?;
489            }
490            _ => {
491                // For complex strategies, fall back to full retraining
492                println!("   ⚠️  Complex strategy detected, skipping incremental update");
493            }
494        }
495
496        // Update baseline embeddings
497        self.update_baseline_embeddings(model, &delta_result.entities_affected)
498            .await?;
499
500        println!("✅ Incremental updates applied successfully");
501        Ok(())
502    }
503
504    /// Apply additive updates
505    async fn apply_additive_updates(
506        &self,
507        _model: &mut dyn EmbeddingModel,
508        delta_result: &DeltaResult,
509    ) -> Result<()> {
510        // In a real implementation, this would update the model's internal embeddings
511        println!(
512            "   📈 Applied additive updates to {} entities",
513            delta_result.entities_affected.len()
514        );
515        Ok(())
516    }
517
518    /// Apply weighted average updates
519    async fn apply_weighted_average_updates(
520        &self,
521        _model: &mut dyn EmbeddingModel,
522        delta_result: &DeltaResult,
523        alpha: f32,
524    ) -> Result<()> {
525        // new_embedding = (1 - alpha) * old_embedding + alpha * delta
526        println!(
527            "   ⚖️  Applied weighted average updates (α={}) to {} entities",
528            alpha,
529            delta_result.entities_affected.len()
530        );
531        Ok(())
532    }
533
534    /// Apply exponential moving average updates
535    async fn apply_exponential_average_updates(
536        &self,
537        _model: &mut dyn EmbeddingModel,
538        delta_result: &DeltaResult,
539        decay: f32,
540    ) -> Result<()> {
541        // new_embedding = decay * old_embedding + (1 - decay) * new_embedding
542        println!(
543            "   📉 Applied exponential average updates (decay={}) to {} entities",
544            decay,
545            delta_result.entities_affected.len()
546        );
547        Ok(())
548    }
549
550    /// Update baseline embeddings
551    async fn update_baseline_embeddings(
552        &self,
553        model: &dyn EmbeddingModel,
554        entities: &HashSet<String>,
555    ) -> Result<()> {
556        let mut baseline = self
557            .baseline_embeddings
558            .write()
559            .expect("rwlock should not be poisoned");
560
561        for entity in entities {
562            if let Ok(embedding) = model.get_entity_embedding(entity) {
563                let array = Array1::from_vec(embedding.values);
564                baseline.insert(entity.clone(), array);
565            }
566        }
567
568        Ok(())
569    }
570
571    /// Set baseline embeddings from current model state
572    pub async fn set_baseline_from_model(&self, model: &dyn EmbeddingModel) -> Result<()> {
573        let entities = model.get_entities();
574        let mut baseline = self
575            .baseline_embeddings
576            .write()
577            .expect("rwlock should not be poisoned");
578        baseline.clear();
579
580        for entity in entities {
581            if let Ok(embedding) = model.get_entity_embedding(&entity) {
582                let array = Array1::from_vec(embedding.values);
583                baseline.insert(entity, array);
584            }
585        }
586
587        println!("📋 Set baseline embeddings for {} entities", baseline.len());
588        Ok(())
589    }
590
591    /// Get change log within time window
592    pub fn get_changes_in_window(&self, window_start: DateTime<Utc>) -> Vec<ChangeRecord> {
593        let change_log = self
594            .change_log
595            .read()
596            .expect("rwlock should not be poisoned");
597        change_log
598            .iter()
599            .filter(|change| change.timestamp >= window_start)
600            .cloned()
601            .collect()
602    }
603
604    /// Get statistics about changes
605    pub fn get_change_statistics(&self) -> ChangeStatistics {
606        let change_log = self
607            .change_log
608            .read()
609            .expect("rwlock should not be poisoned");
610        let pending = self
611            .pending_changes
612            .read()
613            .expect("rwlock should not be poisoned");
614
615        let mut stats = ChangeStatistics {
616            total_changes: change_log.len(),
617            pending_changes: pending.len(),
618            ..Default::default()
619        };
620
621        // Count by type
622        for change in change_log.iter() {
623            match change.change_type {
624                ChangeType::EntityAdded => stats.entities_added += 1,
625                ChangeType::EntityRemoved => stats.entities_removed += 1,
626                ChangeType::EntityUpdated => stats.entities_updated += 1,
627                ChangeType::TripleAdded => stats.triples_added += 1,
628                ChangeType::TripleRemoved => stats.triples_removed += 1,
629                ChangeType::RelationAdded => stats.relations_added += 1,
630                ChangeType::RelationRemoved => stats.relations_removed += 1,
631                ChangeType::BulkImport => stats.bulk_imports += 1,
632                ChangeType::ModelRetrained => stats.model_retrains += 1,
633            }
634        }
635
636        // Get time range
637        if let (Some(oldest), Some(newest)) = (change_log.front(), change_log.back()) {
638            stats.oldest_change = Some(oldest.timestamp);
639            stats.newest_change = Some(newest.timestamp);
640        }
641
642        stats
643    }
644
645    /// Clear change log
646    pub fn clear_change_log(&self) {
647        let mut change_log = self
648            .change_log
649            .write()
650            .expect("rwlock should not be poisoned");
651        let mut pending = self
652            .pending_changes
653            .write()
654            .expect("rwlock should not be poisoned");
655        change_log.clear();
656        pending.clear();
657        println!("🗑️  Cleared change log and pending changes");
658    }
659
660    /// Set incremental update strategy
661    pub fn set_incremental_strategy(&mut self, strategy: IncrementalStrategy) {
662        self.incremental_strategy = strategy;
663    }
664
665    /// Check if delta computation is needed
666    pub fn should_compute_delta(&self) -> bool {
667        let pending = self
668            .pending_changes
669            .read()
670            .expect("rwlock should not be poisoned");
671        pending.len() >= self.config.min_changes_for_delta
672    }
673
674    /// Get last delta timestamp
675    pub fn get_last_delta_timestamp(&self) -> Option<DateTime<Utc>> {
676        *self
677            .last_delta_timestamp
678            .read()
679            .expect("rwlock should not be poisoned")
680    }
681}
682
/// Statistics about changes in the system.
///
/// Snapshot produced by `DeltaManager::get_change_statistics`. The per-type
/// counts cover the (bounded) change log only, not records already evicted.
#[derive(Debug, Clone, Default)]
pub struct ChangeStatistics {
    /// Number of records currently in the change log.
    pub total_changes: usize,
    /// Number of changes waiting for the next delta computation.
    pub pending_changes: usize,
    /// Logged `EntityAdded` records.
    pub entities_added: usize,
    /// Logged `EntityRemoved` records.
    pub entities_removed: usize,
    /// Logged `EntityUpdated` records.
    pub entities_updated: usize,
    /// Logged `TripleAdded` records.
    pub triples_added: usize,
    /// Logged `TripleRemoved` records.
    pub triples_removed: usize,
    /// Logged `RelationAdded` records.
    pub relations_added: usize,
    /// Logged `RelationRemoved` records.
    pub relations_removed: usize,
    /// Logged `BulkImport` records.
    pub bulk_imports: usize,
    /// Logged `ModelRetrained` records.
    pub model_retrains: usize,
    /// Timestamp of the first record in the log (insertion order).
    pub oldest_change: Option<DateTime<Utc>>,
    /// Timestamp of the last record in the log (insertion order).
    pub newest_change: Option<DateTime<Utc>>,
}
700
701impl ChangeStatistics {
702    /// Get total entity changes
703    pub fn total_entity_changes(&self) -> usize {
704        self.entities_added + self.entities_removed + self.entities_updated
705    }
706
707    /// Get total triple changes
708    pub fn total_triple_changes(&self) -> usize {
709        self.triples_added + self.triples_removed
710    }
711
712    /// Get total relation changes
713    pub fn total_relation_changes(&self) -> usize {
714        self.relations_added + self.relations_removed
715    }
716}
717
718/// Hash implementation for ChangeRecord to enable deduplication
719impl Hash for ChangeRecord {
720    fn hash<H: Hasher>(&self, state: &mut H) {
721        self.change_type.hash(state);
722        self.entity_id.hash(state);
723        self.relation_id.hash(state);
724        // Note: We don't hash timestamp to allow for deduplication of similar changes
725    }
726}
727
728impl PartialEq for ChangeRecord {
729    fn eq(&self, other: &Self) -> bool {
730        self.change_type == other.change_type
731            && self.entity_id == other.entity_id
732            && self.relation_id == other.relation_id
733    }
734}
735
736impl Eq for ChangeRecord {}
737
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{ModelConfig, NamedNode, TransE};

    #[test]
    fn test_change_record_creation() {
        let change = ChangeRecord::new(ChangeType::EntityAdded)
            .with_entity("test_entity".to_string())
            .with_metadata("source", "test");

        assert_eq!(change.change_type, ChangeType::EntityAdded);
        assert_eq!(change.entity_id, Some("test_entity".to_string()));
        assert_eq!(change.metadata.get("source"), Some(&"test".to_string()));
    }

    #[test]
    fn test_delta_config_default() {
        let config = DeltaConfig::default();
        assert_eq!(config.max_changes, 100_000);
        assert_eq!(config.time_window_seconds, 3600);
        assert!(config.enable_incremental_updates);
        assert_eq!(config.delta_batch_size, 1000);
    }

    #[test]
    fn test_delta_manager_creation() {
        let config = DeltaConfig::default();
        let manager = DeltaManager::new(config);

        assert_eq!(manager.config.max_changes, 100_000);
        assert!(!manager.should_compute_delta()); // No changes yet
        assert!(manager.get_last_delta_timestamp().is_none());
    }

    #[test]
    fn test_record_changes() {
        let config = DeltaConfig {
            min_changes_for_delta: 2,
            ..Default::default()
        };
        let manager = DeltaManager::new(config);

        manager
            .record_entity_added("entity1".to_string(), None)
            .unwrap();
        manager
            .record_entity_added("entity2".to_string(), None)
            .unwrap();

        assert!(manager.should_compute_delta());

        let stats = manager.get_change_statistics();
        assert_eq!(stats.pending_changes, 2);
        assert_eq!(stats.entities_added, 2);
    }

    #[test]
    fn test_change_log_respects_max_changes() {
        let config = DeltaConfig {
            max_changes: 2,
            ..Default::default()
        };
        let manager = DeltaManager::new(config);

        for name in ["entity1", "entity2", "entity3"].iter() {
            manager.record_entity_added(name.to_string(), None).unwrap();
        }

        let stats = manager.get_change_statistics();
        // Only the two most recent records stay in the log...
        assert_eq!(stats.total_changes, 2);
        // ...but every change is still pending for the next delta computation.
        assert_eq!(stats.pending_changes, 3);

        let window_start = stats.oldest_change.expect("log is non-empty");
        let logged: Vec<String> = manager
            .get_changes_in_window(window_start)
            .into_iter()
            .filter_map(|change| change.entity_id)
            .collect();
        assert_eq!(logged, vec!["entity2".to_string(), "entity3".to_string()]);
    }

    #[test]
    fn test_clear_change_log() {
        let manager = DeltaManager::new(DeltaConfig::default());
        manager
            .record_entity_added("entity1".to_string(), None)
            .unwrap();
        manager
            .record_entity_removed("entity2".to_string())
            .unwrap();

        manager.clear_change_log();

        let stats = manager.get_change_statistics();
        assert_eq!(stats.total_changes, 0);
        assert_eq!(stats.pending_changes, 0);
        assert!(stats.oldest_change.is_none());
        assert!(stats.newest_change.is_none());
    }

    #[test]
    fn test_bulk_import_recording() {
        let config = DeltaConfig::default();
        let manager = DeltaManager::new(config);

        let batch_id = manager.record_bulk_import(1000, 5000).unwrap();

        let stats = manager.get_change_statistics();
        assert_eq!(stats.bulk_imports, 1);
        assert_eq!(stats.pending_changes, 1);

        // Verify batch ID and counts are stored on the record
        let pending = manager
            .pending_changes
            .read()
            .expect("rwlock should not be poisoned");
        assert_eq!(pending[0].batch_id, Some(batch_id));
        assert_eq!(pending[0].metadata.get("entities"), Some(&"1000".to_string()));
        assert_eq!(pending[0].metadata.get("triples"), Some(&"5000".to_string()));
    }

    #[tokio::test]
    async fn test_delta_computation() {
        let config = DeltaConfig {
            min_changes_for_delta: 1,
            ..Default::default()
        };
        let manager = DeltaManager::new(config);

        // Create a simple model
        let model_config = ModelConfig::default().with_dimensions(10);
        let mut model = TransE::new(model_config);

        // Add a triple to have entities
        let triple = Triple::new(
            NamedNode::new("http://example.org/alice").unwrap(),
            NamedNode::new("http://example.org/knows").unwrap(),
            NamedNode::new("http://example.org/bob").unwrap(),
        );
        model.add_triple(triple.clone()).unwrap();

        // Train the model briefly
        model.train(Some(1)).await.unwrap();

        // Set baseline
        manager.set_baseline_from_model(&model).await.unwrap();

        // Record changes
        manager
            .record_entity_added("http://example.org/alice".to_string(), None)
            .unwrap();

        // Compute delta
        let delta_result = manager.compute_delta(&model).await.unwrap();

        assert_eq!(delta_result.changes_processed, 1);
        assert!(!delta_result.entities_affected.is_empty());

        // A successful computation drains the queue and records its end timestamp.
        assert_eq!(manager.get_change_statistics().pending_changes, 0);
        assert_eq!(
            manager.get_last_delta_timestamp(),
            Some(delta_result.to_timestamp)
        );
    }

    #[tokio::test]
    async fn test_compute_delta_requires_min_changes() {
        let config = DeltaConfig {
            min_changes_for_delta: 5,
            ..Default::default()
        };
        let manager = DeltaManager::new(config);
        let model = TransE::new(ModelConfig::default().with_dimensions(10));

        manager
            .record_entity_added("entity1".to_string(), None)
            .unwrap();

        assert!(manager.compute_delta(&model).await.is_err());
        // A rejected computation must not consume the pending changes.
        assert_eq!(manager.get_change_statistics().pending_changes, 1);
        assert!(manager.get_last_delta_timestamp().is_none());
    }

    #[test]
    fn test_change_statistics() {
        let config = DeltaConfig::default();
        let manager = DeltaManager::new(config);

        // Record various types of changes
        manager
            .record_entity_added("entity1".to_string(), None)
            .unwrap();
        manager
            .record_entity_removed("entity2".to_string())
            .unwrap();

        let triple = Triple::new(
            NamedNode::new("http://example.org/s").unwrap(),
            NamedNode::new("http://example.org/p").unwrap(),
            NamedNode::new("http://example.org/o").unwrap(),
        );
        manager.record_triple_added(triple, None).unwrap();

        let stats = manager.get_change_statistics();

        assert_eq!(stats.total_entity_changes(), 2);
        assert_eq!(stats.total_triple_changes(), 1);
        assert_eq!(stats.total_changes, 3);
        assert_eq!(stats.pending_changes, 3);
    }

    #[test]
    fn test_incremental_strategies() {
        let mut manager = DeltaManager::new(DeltaConfig::default());

        manager.set_incremental_strategy(IncrementalStrategy::Additive);
        assert!(matches!(
            manager.incremental_strategy,
            IncrementalStrategy::Additive
        ));

        manager.set_incremental_strategy(IncrementalStrategy::WeightedAverage { alpha: 0.2 });
        if let IncrementalStrategy::WeightedAverage { alpha } = manager.incremental_strategy {
            assert_eq!(alpha, 0.2);
        } else {
            panic!("Expected WeightedAverage strategy");
        }
    }

    #[test]
    fn test_change_record_equality() {
        let change1 = ChangeRecord::new(ChangeType::EntityAdded).with_entity("entity1".to_string());
        let change2 = ChangeRecord::new(ChangeType::EntityAdded).with_entity("entity1".to_string());
        let change3 = ChangeRecord::new(ChangeType::EntityAdded).with_entity("entity2".to_string());

        assert_eq!(change1, change2); // Same type and entity
        assert_ne!(change1, change3); // Different entity
    }
}