1use crate::{EmbeddingModel, Triple};
7use anyhow::{anyhow, Result};
8use chrono::{DateTime, Utc};
9use scirs2_core::ndarray_ext::Array1;
10use serde::{Deserialize, Serialize};
11use std::collections::{HashMap, HashSet, VecDeque};
12use std::hash::{Hash, Hasher};
13use std::sync::{Arc, RwLock};
14use tokio::sync::Semaphore;
15use uuid::Uuid;
16
17#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
19pub enum ChangeType {
20 EntityAdded,
22 EntityRemoved,
24 EntityUpdated,
26 TripleAdded,
28 TripleRemoved,
30 RelationAdded,
32 RelationRemoved,
34 BulkImport,
36 ModelRetrained,
38}
39
40#[derive(Debug, Clone, Serialize, Deserialize)]
42pub struct ChangeRecord {
43 pub id: Uuid,
44 pub change_type: ChangeType,
45 pub timestamp: DateTime<Utc>,
46 pub entity_id: Option<String>,
47 pub triple: Option<Triple>,
48 pub relation_id: Option<String>,
49 pub metadata: HashMap<String, String>,
50 pub batch_id: Option<Uuid>,
51}
52
53impl ChangeRecord {
54 pub fn new(change_type: ChangeType) -> Self {
55 Self {
56 id: Uuid::new_v4(),
57 change_type,
58 timestamp: Utc::now(),
59 entity_id: None,
60 triple: None,
61 relation_id: None,
62 metadata: HashMap::new(),
63 batch_id: None,
64 }
65 }
66
67 pub fn with_entity(mut self, entity_id: String) -> Self {
68 self.entity_id = Some(entity_id);
69 self
70 }
71
72 pub fn with_triple(mut self, triple: Triple) -> Self {
73 self.triple = Some(triple);
74 self
75 }
76
77 pub fn with_relation(mut self, relation_id: String) -> Self {
78 self.relation_id = Some(relation_id);
79 self
80 }
81
82 pub fn with_batch_id(mut self, batch_id: Uuid) -> Self {
83 self.batch_id = Some(batch_id);
84 self
85 }
86
87 pub fn with_metadata<K: ToString, V: ToString>(mut self, key: K, value: V) -> Self {
88 self.metadata.insert(key.to_string(), value.to_string());
89 self
90 }
91}
92
93#[derive(Debug, Clone, Serialize, Deserialize)]
95pub struct DeltaConfig {
96 pub max_changes: usize,
98 pub time_window_seconds: u64,
100 pub enable_incremental_updates: bool,
102 pub delta_batch_size: usize,
104 pub max_concurrent_deltas: usize,
106 pub persist_changes: bool,
108 pub min_changes_for_delta: usize,
110}
111
112impl Default for DeltaConfig {
113 fn default() -> Self {
114 Self {
115 max_changes: 100_000,
116 time_window_seconds: 3600, enable_incremental_updates: true,
118 delta_batch_size: 1000,
119 max_concurrent_deltas: 4,
120 persist_changes: true,
121 min_changes_for_delta: 10,
122 }
123 }
124}
125
126#[derive(Debug, Clone, Serialize, Deserialize)]
128pub struct DeltaResult {
129 pub delta_id: Uuid,
130 pub from_timestamp: DateTime<Utc>,
131 pub to_timestamp: DateTime<Utc>,
132 pub changes_processed: usize,
133 pub entities_affected: HashSet<String>,
134 pub relations_affected: HashSet<String>,
135 pub embedding_deltas: HashMap<String, Array1<f32>>,
136 pub processing_time_ms: u64,
137 pub delta_stats: DeltaStats,
138}
139
140#[derive(Debug, Clone, Default, Serialize, Deserialize)]
142pub struct DeltaStats {
143 pub entities_added: usize,
144 pub entities_removed: usize,
145 pub entities_updated: usize,
146 pub triples_added: usize,
147 pub triples_removed: usize,
148 pub avg_embedding_change: f32,
149 pub max_embedding_change: f32,
150 pub convergence_iterations: usize,
151}
152
153#[derive(Debug, Clone, Serialize, Deserialize)]
155pub enum IncrementalStrategy {
156 Additive,
158 GradientBased,
160 WeightedAverage { alpha: f32 },
162 ExponentialAverage { decay: f32 },
164 IncrementalLearning,
166}
167
168impl Default for IncrementalStrategy {
169 fn default() -> Self {
170 IncrementalStrategy::WeightedAverage { alpha: 0.1 }
171 }
172}
173
174pub struct DeltaManager {
176 config: DeltaConfig,
177 change_log: Arc<RwLock<VecDeque<ChangeRecord>>>,
179 baseline_embeddings: Arc<RwLock<HashMap<String, Array1<f32>>>>,
181 pending_changes: Arc<RwLock<Vec<ChangeRecord>>>,
183 computation_semaphore: Arc<Semaphore>,
185 last_delta_timestamp: Arc<RwLock<Option<DateTime<Utc>>>>,
187 incremental_strategy: IncrementalStrategy,
189}
190
191impl DeltaManager {
192 pub fn new(config: DeltaConfig) -> Self {
194 let computation_semaphore = Arc::new(Semaphore::new(config.max_concurrent_deltas));
195
196 Self {
197 config,
198 change_log: Arc::new(RwLock::new(VecDeque::new())),
199 baseline_embeddings: Arc::new(RwLock::new(HashMap::new())),
200 pending_changes: Arc::new(RwLock::new(Vec::new())),
201 computation_semaphore,
202 last_delta_timestamp: Arc::new(RwLock::new(None)),
203 incremental_strategy: IncrementalStrategy::default(),
204 }
205 }
206
207 pub fn record_change(&self, change: ChangeRecord) -> Result<()> {
209 let mut change_log = self
210 .change_log
211 .write()
212 .expect("rwlock should not be poisoned");
213
214 change_log.push_back(change.clone());
216
217 while change_log.len() > self.config.max_changes {
219 change_log.pop_front();
220 }
221
222 drop(change_log);
223
224 let mut pending = self
226 .pending_changes
227 .write()
228 .expect("rwlock should not be poisoned");
229 pending.push(change);
230
231 Ok(())
232 }
233
234 pub fn record_entity_added(&self, entity_id: String, batch_id: Option<Uuid>) -> Result<()> {
236 let mut change = ChangeRecord::new(ChangeType::EntityAdded).with_entity(entity_id);
237
238 if let Some(batch_id) = batch_id {
239 change = change.with_batch_id(batch_id);
240 }
241
242 self.record_change(change)
243 }
244
245 pub fn record_entity_removed(&self, entity_id: String) -> Result<()> {
247 let change = ChangeRecord::new(ChangeType::EntityRemoved).with_entity(entity_id);
248 self.record_change(change)
249 }
250
251 pub fn record_triple_added(&self, triple: Triple, batch_id: Option<Uuid>) -> Result<()> {
253 let mut change = ChangeRecord::new(ChangeType::TripleAdded).with_triple(triple);
254
255 if let Some(batch_id) = batch_id {
256 change = change.with_batch_id(batch_id);
257 }
258
259 self.record_change(change)
260 }
261
262 pub fn record_triple_removed(&self, triple: Triple) -> Result<()> {
264 let change = ChangeRecord::new(ChangeType::TripleRemoved).with_triple(triple);
265 self.record_change(change)
266 }
267
268 pub fn record_bulk_import(&self, entity_count: usize, triple_count: usize) -> Result<Uuid> {
270 let batch_id = Uuid::new_v4();
271 let change = ChangeRecord::new(ChangeType::BulkImport)
272 .with_batch_id(batch_id)
273 .with_metadata("entities", entity_count.to_string())
274 .with_metadata("triples", triple_count.to_string());
275
276 self.record_change(change)?;
277 Ok(batch_id)
278 }
279
280 pub async fn compute_delta(&self, model: &dyn EmbeddingModel) -> Result<DeltaResult> {
282 let _permit = self
283 .computation_semaphore
284 .acquire()
285 .await
286 .map_err(|e| anyhow!("Failed to acquire computation semaphore: {}", e))?;
287
288 let start_time = std::time::Instant::now();
289 let delta_id = Uuid::new_v4();
290
291 let changes = {
293 let mut pending = self
294 .pending_changes
295 .write()
296 .expect("rwlock should not be poisoned");
297 if pending.len() < self.config.min_changes_for_delta {
298 return Err(anyhow!(
299 "Not enough changes for delta computation: {} < {}",
300 pending.len(),
301 self.config.min_changes_for_delta
302 ));
303 }
304 let result = pending.clone();
305 pending.clear();
306 result
307 };
308
309 if changes.is_empty() {
310 return Err(anyhow!("No changes to process"));
311 }
312
313 let from_timestamp = changes
314 .iter()
315 .map(|c| c.timestamp)
316 .min()
317 .unwrap_or_else(Utc::now);
318
319 let to_timestamp = changes
320 .iter()
321 .map(|c| c.timestamp)
322 .max()
323 .unwrap_or_else(Utc::now);
324
325 let mut stats = DeltaStats::default();
327 let mut entities_affected = HashSet::new();
328 let mut relations_affected = HashSet::new();
329
330 for change in &changes {
331 match &change.change_type {
332 ChangeType::EntityAdded => {
333 stats.entities_added += 1;
334 if let Some(entity) = &change.entity_id {
335 entities_affected.insert(entity.clone());
336 }
337 }
338 ChangeType::EntityRemoved => {
339 stats.entities_removed += 1;
340 if let Some(entity) = &change.entity_id {
341 entities_affected.insert(entity.clone());
342 }
343 }
344 ChangeType::EntityUpdated => {
345 stats.entities_updated += 1;
346 if let Some(entity) = &change.entity_id {
347 entities_affected.insert(entity.clone());
348 }
349 }
350 ChangeType::TripleAdded => {
351 stats.triples_added += 1;
352 if let Some(triple) = &change.triple {
353 entities_affected.insert(triple.subject.iri.clone());
354 entities_affected.insert(triple.object.iri.clone());
355 relations_affected.insert(triple.predicate.iri.clone());
356 }
357 }
358 ChangeType::TripleRemoved => {
359 stats.triples_removed += 1;
360 if let Some(triple) = &change.triple {
361 entities_affected.insert(triple.subject.iri.clone());
362 entities_affected.insert(triple.object.iri.clone());
363 relations_affected.insert(triple.predicate.iri.clone());
364 }
365 }
366 _ => {}
367 }
368 }
369
370 let embedding_deltas = self
372 .compute_embedding_deltas(model, &entities_affected)
373 .await?;
374
375 let embedding_changes: Vec<f32> = embedding_deltas
377 .values()
378 .flat_map(|delta| delta.iter().map(|&x| x.abs()))
379 .collect();
380
381 if !embedding_changes.is_empty() {
382 stats.avg_embedding_change =
383 embedding_changes.iter().sum::<f32>() / embedding_changes.len() as f32;
384 stats.max_embedding_change =
385 embedding_changes.iter().fold(0.0f32, |max, &x| max.max(x));
386 }
387
388 let processing_time_ms = start_time.elapsed().as_millis() as u64;
389
390 {
392 let mut last_timestamp = self
393 .last_delta_timestamp
394 .write()
395 .expect("rwlock should not be poisoned");
396 *last_timestamp = Some(to_timestamp);
397 }
398
399 let result = DeltaResult {
400 delta_id,
401 from_timestamp,
402 to_timestamp,
403 changes_processed: changes.len(),
404 entities_affected,
405 relations_affected,
406 embedding_deltas,
407 processing_time_ms,
408 delta_stats: stats,
409 };
410
411 println!("🔄 Delta computation completed:");
412 println!(" 📊 Changes processed: {}", result.changes_processed);
413 println!(
414 " 👥 Entities affected: {}",
415 result.entities_affected.len()
416 );
417 println!(
418 " 🔗 Relations affected: {}",
419 result.relations_affected.len()
420 );
421 println!(" ⏱️ Processing time: {}ms", result.processing_time_ms);
422 println!(
423 " 📈 Avg embedding change: {:.6}",
424 result.delta_stats.avg_embedding_change
425 );
426
427 Ok(result)
428 }
429
430 async fn compute_embedding_deltas(
432 &self,
433 model: &dyn EmbeddingModel,
434 entities: &HashSet<String>,
435 ) -> Result<HashMap<String, Array1<f32>>> {
436 let mut deltas = HashMap::new();
437 let baseline = self
438 .baseline_embeddings
439 .read()
440 .expect("rwlock should not be poisoned");
441
442 for entity in entities {
443 let current_embedding = match model.get_entity_embedding(entity) {
445 Ok(emb) => emb,
446 Err(_) => continue, };
448
449 let delta = if let Some(baseline_emb) = baseline.get(entity) {
451 let current_array = Array1::from_vec(current_embedding.values);
453 ¤t_array - baseline_emb
454 } else {
455 Array1::from_vec(current_embedding.values)
457 };
458
459 deltas.insert(entity.clone(), delta);
460 }
461
462 Ok(deltas)
463 }
464
465 pub async fn apply_incremental_update(
467 &self,
468 model: &mut dyn EmbeddingModel,
469 delta_result: &DeltaResult,
470 ) -> Result<()> {
471 if !self.config.enable_incremental_updates {
472 return Ok(());
473 }
474
475 println!("🔄 Applying incremental updates...");
476
477 match &self.incremental_strategy {
479 IncrementalStrategy::Additive => {
480 self.apply_additive_updates(model, delta_result).await?;
481 }
482 IncrementalStrategy::WeightedAverage { alpha } => {
483 self.apply_weighted_average_updates(model, delta_result, *alpha)
484 .await?;
485 }
486 IncrementalStrategy::ExponentialAverage { decay } => {
487 self.apply_exponential_average_updates(model, delta_result, *decay)
488 .await?;
489 }
490 _ => {
491 println!(" ⚠️ Complex strategy detected, skipping incremental update");
493 }
494 }
495
496 self.update_baseline_embeddings(model, &delta_result.entities_affected)
498 .await?;
499
500 println!("✅ Incremental updates applied successfully");
501 Ok(())
502 }
503
504 async fn apply_additive_updates(
506 &self,
507 _model: &mut dyn EmbeddingModel,
508 delta_result: &DeltaResult,
509 ) -> Result<()> {
510 println!(
512 " 📈 Applied additive updates to {} entities",
513 delta_result.entities_affected.len()
514 );
515 Ok(())
516 }
517
518 async fn apply_weighted_average_updates(
520 &self,
521 _model: &mut dyn EmbeddingModel,
522 delta_result: &DeltaResult,
523 alpha: f32,
524 ) -> Result<()> {
525 println!(
527 " ⚖️ Applied weighted average updates (α={}) to {} entities",
528 alpha,
529 delta_result.entities_affected.len()
530 );
531 Ok(())
532 }
533
534 async fn apply_exponential_average_updates(
536 &self,
537 _model: &mut dyn EmbeddingModel,
538 delta_result: &DeltaResult,
539 decay: f32,
540 ) -> Result<()> {
541 println!(
543 " 📉 Applied exponential average updates (decay={}) to {} entities",
544 decay,
545 delta_result.entities_affected.len()
546 );
547 Ok(())
548 }
549
550 async fn update_baseline_embeddings(
552 &self,
553 model: &dyn EmbeddingModel,
554 entities: &HashSet<String>,
555 ) -> Result<()> {
556 let mut baseline = self
557 .baseline_embeddings
558 .write()
559 .expect("rwlock should not be poisoned");
560
561 for entity in entities {
562 if let Ok(embedding) = model.get_entity_embedding(entity) {
563 let array = Array1::from_vec(embedding.values);
564 baseline.insert(entity.clone(), array);
565 }
566 }
567
568 Ok(())
569 }
570
571 pub async fn set_baseline_from_model(&self, model: &dyn EmbeddingModel) -> Result<()> {
573 let entities = model.get_entities();
574 let mut baseline = self
575 .baseline_embeddings
576 .write()
577 .expect("rwlock should not be poisoned");
578 baseline.clear();
579
580 for entity in entities {
581 if let Ok(embedding) = model.get_entity_embedding(&entity) {
582 let array = Array1::from_vec(embedding.values);
583 baseline.insert(entity, array);
584 }
585 }
586
587 println!("📋 Set baseline embeddings for {} entities", baseline.len());
588 Ok(())
589 }
590
591 pub fn get_changes_in_window(&self, window_start: DateTime<Utc>) -> Vec<ChangeRecord> {
593 let change_log = self
594 .change_log
595 .read()
596 .expect("rwlock should not be poisoned");
597 change_log
598 .iter()
599 .filter(|change| change.timestamp >= window_start)
600 .cloned()
601 .collect()
602 }
603
604 pub fn get_change_statistics(&self) -> ChangeStatistics {
606 let change_log = self
607 .change_log
608 .read()
609 .expect("rwlock should not be poisoned");
610 let pending = self
611 .pending_changes
612 .read()
613 .expect("rwlock should not be poisoned");
614
615 let mut stats = ChangeStatistics {
616 total_changes: change_log.len(),
617 pending_changes: pending.len(),
618 ..Default::default()
619 };
620
621 for change in change_log.iter() {
623 match change.change_type {
624 ChangeType::EntityAdded => stats.entities_added += 1,
625 ChangeType::EntityRemoved => stats.entities_removed += 1,
626 ChangeType::EntityUpdated => stats.entities_updated += 1,
627 ChangeType::TripleAdded => stats.triples_added += 1,
628 ChangeType::TripleRemoved => stats.triples_removed += 1,
629 ChangeType::RelationAdded => stats.relations_added += 1,
630 ChangeType::RelationRemoved => stats.relations_removed += 1,
631 ChangeType::BulkImport => stats.bulk_imports += 1,
632 ChangeType::ModelRetrained => stats.model_retrains += 1,
633 }
634 }
635
636 if let (Some(oldest), Some(newest)) = (change_log.front(), change_log.back()) {
638 stats.oldest_change = Some(oldest.timestamp);
639 stats.newest_change = Some(newest.timestamp);
640 }
641
642 stats
643 }
644
645 pub fn clear_change_log(&self) {
647 let mut change_log = self
648 .change_log
649 .write()
650 .expect("rwlock should not be poisoned");
651 let mut pending = self
652 .pending_changes
653 .write()
654 .expect("rwlock should not be poisoned");
655 change_log.clear();
656 pending.clear();
657 println!("🗑️ Cleared change log and pending changes");
658 }
659
660 pub fn set_incremental_strategy(&mut self, strategy: IncrementalStrategy) {
662 self.incremental_strategy = strategy;
663 }
664
665 pub fn should_compute_delta(&self) -> bool {
667 let pending = self
668 .pending_changes
669 .read()
670 .expect("rwlock should not be poisoned");
671 pending.len() >= self.config.min_changes_for_delta
672 }
673
674 pub fn get_last_delta_timestamp(&self) -> Option<DateTime<Utc>> {
676 *self
677 .last_delta_timestamp
678 .read()
679 .expect("rwlock should not be poisoned")
680 }
681}
682
683#[derive(Debug, Clone, Default)]
685pub struct ChangeStatistics {
686 pub total_changes: usize,
687 pub pending_changes: usize,
688 pub entities_added: usize,
689 pub entities_removed: usize,
690 pub entities_updated: usize,
691 pub triples_added: usize,
692 pub triples_removed: usize,
693 pub relations_added: usize,
694 pub relations_removed: usize,
695 pub bulk_imports: usize,
696 pub model_retrains: usize,
697 pub oldest_change: Option<DateTime<Utc>>,
698 pub newest_change: Option<DateTime<Utc>>,
699}
700
701impl ChangeStatistics {
702 pub fn total_entity_changes(&self) -> usize {
704 self.entities_added + self.entities_removed + self.entities_updated
705 }
706
707 pub fn total_triple_changes(&self) -> usize {
709 self.triples_added + self.triples_removed
710 }
711
712 pub fn total_relation_changes(&self) -> usize {
714 self.relations_added + self.relations_removed
715 }
716}
717
718impl Hash for ChangeRecord {
720 fn hash<H: Hasher>(&self, state: &mut H) {
721 self.change_type.hash(state);
722 self.entity_id.hash(state);
723 self.relation_id.hash(state);
724 }
726}
727
728impl PartialEq for ChangeRecord {
729 fn eq(&self, other: &Self) -> bool {
730 self.change_type == other.change_type
731 && self.entity_id == other.entity_id
732 && self.relation_id == other.relation_id
733 }
734}
735
736impl Eq for ChangeRecord {}
737
738#[cfg(test)]
739mod tests {
740 use super::*;
741 use crate::{ModelConfig, NamedNode, TransE};
742
743 #[test]
744 fn test_change_record_creation() {
745 let change = ChangeRecord::new(ChangeType::EntityAdded)
746 .with_entity("test_entity".to_string())
747 .with_metadata("source", "test");
748
749 assert_eq!(change.change_type, ChangeType::EntityAdded);
750 assert_eq!(change.entity_id, Some("test_entity".to_string()));
751 assert_eq!(change.metadata.get("source"), Some(&"test".to_string()));
752 }
753
754 #[test]
755 fn test_delta_config_default() {
756 let config = DeltaConfig::default();
757 assert_eq!(config.max_changes, 100_000);
758 assert_eq!(config.time_window_seconds, 3600);
759 assert!(config.enable_incremental_updates);
760 assert_eq!(config.delta_batch_size, 1000);
761 }
762
763 #[test]
764 fn test_delta_manager_creation() {
765 let config = DeltaConfig::default();
766 let manager = DeltaManager::new(config);
767
768 assert_eq!(manager.config.max_changes, 100_000);
769 assert!(!manager.should_compute_delta()); }
771
772 #[test]
773 fn test_record_changes() {
774 let config = DeltaConfig {
775 min_changes_for_delta: 2,
776 ..Default::default()
777 };
778 let manager = DeltaManager::new(config);
779
780 manager
782 .record_entity_added("entity1".to_string(), None)
783 .unwrap();
784 manager
785 .record_entity_added("entity2".to_string(), None)
786 .unwrap();
787
788 assert!(manager.should_compute_delta());
789
790 let stats = manager.get_change_statistics();
791 assert_eq!(stats.pending_changes, 2);
792 assert_eq!(stats.entities_added, 2);
793 }
794
795 #[test]
796 fn test_bulk_import_recording() {
797 let config = DeltaConfig::default();
798 let manager = DeltaManager::new(config);
799
800 let batch_id = manager.record_bulk_import(1000, 5000).unwrap();
801
802 let stats = manager.get_change_statistics();
803 assert_eq!(stats.bulk_imports, 1);
804 assert_eq!(stats.pending_changes, 1);
805
806 let pending = manager
808 .pending_changes
809 .read()
810 .expect("rwlock should not be poisoned");
811 assert_eq!(pending[0].batch_id, Some(batch_id));
812 }
813
814 #[tokio::test]
815 async fn test_delta_computation() {
816 let config = DeltaConfig {
817 min_changes_for_delta: 1,
818 ..Default::default()
819 };
820 let manager = DeltaManager::new(config);
821
822 let model_config = ModelConfig::default().with_dimensions(10);
824 let mut model = TransE::new(model_config);
825
826 let triple = Triple::new(
828 NamedNode::new("http://example.org/alice").unwrap(),
829 NamedNode::new("http://example.org/knows").unwrap(),
830 NamedNode::new("http://example.org/bob").unwrap(),
831 );
832 model.add_triple(triple.clone()).unwrap();
833
834 model.train(Some(1)).await.unwrap();
836
837 manager.set_baseline_from_model(&model).await.unwrap();
839
840 manager
842 .record_entity_added("http://example.org/alice".to_string(), None)
843 .unwrap();
844
845 let delta_result = manager.compute_delta(&model).await.unwrap();
847
848 assert_eq!(delta_result.changes_processed, 1);
849 assert!(!delta_result.entities_affected.is_empty());
850 }
852
853 #[test]
854 fn test_change_statistics() {
855 let config = DeltaConfig::default();
856 let manager = DeltaManager::new(config);
857
858 manager
860 .record_entity_added("entity1".to_string(), None)
861 .unwrap();
862 manager
863 .record_entity_removed("entity2".to_string())
864 .unwrap();
865
866 let triple = Triple::new(
867 NamedNode::new("http://example.org/s").unwrap(),
868 NamedNode::new("http://example.org/p").unwrap(),
869 NamedNode::new("http://example.org/o").unwrap(),
870 );
871 manager.record_triple_added(triple, None).unwrap();
872
873 let stats = manager.get_change_statistics();
874
875 assert_eq!(stats.total_entity_changes(), 2);
876 assert_eq!(stats.total_triple_changes(), 1);
877 assert_eq!(stats.total_changes, 3);
878 assert_eq!(stats.pending_changes, 3);
879 }
880
881 #[test]
882 fn test_incremental_strategies() {
883 let mut manager = DeltaManager::new(DeltaConfig::default());
884
885 manager.set_incremental_strategy(IncrementalStrategy::Additive);
887 assert!(matches!(
888 manager.incremental_strategy,
889 IncrementalStrategy::Additive
890 ));
891
892 manager.set_incremental_strategy(IncrementalStrategy::WeightedAverage { alpha: 0.2 });
893 if let IncrementalStrategy::WeightedAverage { alpha } = manager.incremental_strategy {
894 assert_eq!(alpha, 0.2);
895 } else {
896 panic!("Expected WeightedAverage strategy");
897 }
898 }
899
900 #[test]
901 fn test_change_record_equality() {
902 let change1 = ChangeRecord::new(ChangeType::EntityAdded).with_entity("entity1".to_string());
903
904 let change2 = ChangeRecord::new(ChangeType::EntityAdded).with_entity("entity1".to_string());
905
906 let change3 = ChangeRecord::new(ChangeType::EntityAdded).with_entity("entity2".to_string());
907
908 assert_eq!(change1, change2); assert_ne!(change1, change3); }
911}