1use crate::{EmbeddingModel, Triple};
7use anyhow::{anyhow, Result};
8use chrono::{DateTime, Utc};
9use scirs2_core::ndarray_ext::Array1;
10use serde::{Deserialize, Serialize};
11use std::collections::{HashMap, HashSet, VecDeque};
12use std::hash::{Hash, Hasher};
13use std::sync::{Arc, RwLock};
14use tokio::sync::Semaphore;
15use uuid::Uuid;
16
17#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
19pub enum ChangeType {
20 EntityAdded,
22 EntityRemoved,
24 EntityUpdated,
26 TripleAdded,
28 TripleRemoved,
30 RelationAdded,
32 RelationRemoved,
34 BulkImport,
36 ModelRetrained,
38}
39
40#[derive(Debug, Clone, Serialize, Deserialize)]
42pub struct ChangeRecord {
43 pub id: Uuid,
44 pub change_type: ChangeType,
45 pub timestamp: DateTime<Utc>,
46 pub entity_id: Option<String>,
47 pub triple: Option<Triple>,
48 pub relation_id: Option<String>,
49 pub metadata: HashMap<String, String>,
50 pub batch_id: Option<Uuid>,
51}
52
53impl ChangeRecord {
54 pub fn new(change_type: ChangeType) -> Self {
55 Self {
56 id: Uuid::new_v4(),
57 change_type,
58 timestamp: Utc::now(),
59 entity_id: None,
60 triple: None,
61 relation_id: None,
62 metadata: HashMap::new(),
63 batch_id: None,
64 }
65 }
66
67 pub fn with_entity(mut self, entity_id: String) -> Self {
68 self.entity_id = Some(entity_id);
69 self
70 }
71
72 pub fn with_triple(mut self, triple: Triple) -> Self {
73 self.triple = Some(triple);
74 self
75 }
76
77 pub fn with_relation(mut self, relation_id: String) -> Self {
78 self.relation_id = Some(relation_id);
79 self
80 }
81
82 pub fn with_batch_id(mut self, batch_id: Uuid) -> Self {
83 self.batch_id = Some(batch_id);
84 self
85 }
86
87 pub fn with_metadata<K: ToString, V: ToString>(mut self, key: K, value: V) -> Self {
88 self.metadata.insert(key.to_string(), value.to_string());
89 self
90 }
91}
92
93#[derive(Debug, Clone, Serialize, Deserialize)]
95pub struct DeltaConfig {
96 pub max_changes: usize,
98 pub time_window_seconds: u64,
100 pub enable_incremental_updates: bool,
102 pub delta_batch_size: usize,
104 pub max_concurrent_deltas: usize,
106 pub persist_changes: bool,
108 pub min_changes_for_delta: usize,
110}
111
112impl Default for DeltaConfig {
113 fn default() -> Self {
114 Self {
115 max_changes: 100_000,
116 time_window_seconds: 3600, enable_incremental_updates: true,
118 delta_batch_size: 1000,
119 max_concurrent_deltas: 4,
120 persist_changes: true,
121 min_changes_for_delta: 10,
122 }
123 }
124}
125
126#[derive(Debug, Clone, Serialize, Deserialize)]
128pub struct DeltaResult {
129 pub delta_id: Uuid,
130 pub from_timestamp: DateTime<Utc>,
131 pub to_timestamp: DateTime<Utc>,
132 pub changes_processed: usize,
133 pub entities_affected: HashSet<String>,
134 pub relations_affected: HashSet<String>,
135 pub embedding_deltas: HashMap<String, Array1<f32>>,
136 pub processing_time_ms: u64,
137 pub delta_stats: DeltaStats,
138}
139
140#[derive(Debug, Clone, Default, Serialize, Deserialize)]
142pub struct DeltaStats {
143 pub entities_added: usize,
144 pub entities_removed: usize,
145 pub entities_updated: usize,
146 pub triples_added: usize,
147 pub triples_removed: usize,
148 pub avg_embedding_change: f32,
149 pub max_embedding_change: f32,
150 pub convergence_iterations: usize,
151}
152
153#[derive(Debug, Clone, Serialize, Deserialize)]
155pub enum IncrementalStrategy {
156 Additive,
158 GradientBased,
160 WeightedAverage { alpha: f32 },
162 ExponentialAverage { decay: f32 },
164 IncrementalLearning,
166}
167
168impl Default for IncrementalStrategy {
169 fn default() -> Self {
170 IncrementalStrategy::WeightedAverage { alpha: 0.1 }
171 }
172}
173
174pub struct DeltaManager {
176 config: DeltaConfig,
177 change_log: Arc<RwLock<VecDeque<ChangeRecord>>>,
179 baseline_embeddings: Arc<RwLock<HashMap<String, Array1<f32>>>>,
181 pending_changes: Arc<RwLock<Vec<ChangeRecord>>>,
183 computation_semaphore: Arc<Semaphore>,
185 last_delta_timestamp: Arc<RwLock<Option<DateTime<Utc>>>>,
187 incremental_strategy: IncrementalStrategy,
189}
190
191impl DeltaManager {
192 pub fn new(config: DeltaConfig) -> Self {
194 let computation_semaphore = Arc::new(Semaphore::new(config.max_concurrent_deltas));
195
196 Self {
197 config,
198 change_log: Arc::new(RwLock::new(VecDeque::new())),
199 baseline_embeddings: Arc::new(RwLock::new(HashMap::new())),
200 pending_changes: Arc::new(RwLock::new(Vec::new())),
201 computation_semaphore,
202 last_delta_timestamp: Arc::new(RwLock::new(None)),
203 incremental_strategy: IncrementalStrategy::default(),
204 }
205 }
206
207 pub fn record_change(&self, change: ChangeRecord) -> Result<()> {
209 let mut change_log = self.change_log.write().unwrap();
210
211 change_log.push_back(change.clone());
213
214 while change_log.len() > self.config.max_changes {
216 change_log.pop_front();
217 }
218
219 drop(change_log);
220
221 let mut pending = self.pending_changes.write().unwrap();
223 pending.push(change);
224
225 Ok(())
226 }
227
228 pub fn record_entity_added(&self, entity_id: String, batch_id: Option<Uuid>) -> Result<()> {
230 let mut change = ChangeRecord::new(ChangeType::EntityAdded).with_entity(entity_id);
231
232 if let Some(batch_id) = batch_id {
233 change = change.with_batch_id(batch_id);
234 }
235
236 self.record_change(change)
237 }
238
239 pub fn record_entity_removed(&self, entity_id: String) -> Result<()> {
241 let change = ChangeRecord::new(ChangeType::EntityRemoved).with_entity(entity_id);
242 self.record_change(change)
243 }
244
245 pub fn record_triple_added(&self, triple: Triple, batch_id: Option<Uuid>) -> Result<()> {
247 let mut change = ChangeRecord::new(ChangeType::TripleAdded).with_triple(triple);
248
249 if let Some(batch_id) = batch_id {
250 change = change.with_batch_id(batch_id);
251 }
252
253 self.record_change(change)
254 }
255
256 pub fn record_triple_removed(&self, triple: Triple) -> Result<()> {
258 let change = ChangeRecord::new(ChangeType::TripleRemoved).with_triple(triple);
259 self.record_change(change)
260 }
261
262 pub fn record_bulk_import(&self, entity_count: usize, triple_count: usize) -> Result<Uuid> {
264 let batch_id = Uuid::new_v4();
265 let change = ChangeRecord::new(ChangeType::BulkImport)
266 .with_batch_id(batch_id)
267 .with_metadata("entities", entity_count.to_string())
268 .with_metadata("triples", triple_count.to_string());
269
270 self.record_change(change)?;
271 Ok(batch_id)
272 }
273
274 pub async fn compute_delta(&self, model: &dyn EmbeddingModel) -> Result<DeltaResult> {
276 let _permit = self
277 .computation_semaphore
278 .acquire()
279 .await
280 .map_err(|e| anyhow!("Failed to acquire computation semaphore: {}", e))?;
281
282 let start_time = std::time::Instant::now();
283 let delta_id = Uuid::new_v4();
284
285 let changes = {
287 let mut pending = self.pending_changes.write().unwrap();
288 if pending.len() < self.config.min_changes_for_delta {
289 return Err(anyhow!(
290 "Not enough changes for delta computation: {} < {}",
291 pending.len(),
292 self.config.min_changes_for_delta
293 ));
294 }
295 let result = pending.clone();
296 pending.clear();
297 result
298 };
299
300 if changes.is_empty() {
301 return Err(anyhow!("No changes to process"));
302 }
303
304 let from_timestamp = changes
305 .iter()
306 .map(|c| c.timestamp)
307 .min()
308 .unwrap_or_else(Utc::now);
309
310 let to_timestamp = changes
311 .iter()
312 .map(|c| c.timestamp)
313 .max()
314 .unwrap_or_else(Utc::now);
315
316 let mut stats = DeltaStats::default();
318 let mut entities_affected = HashSet::new();
319 let mut relations_affected = HashSet::new();
320
321 for change in &changes {
322 match &change.change_type {
323 ChangeType::EntityAdded => {
324 stats.entities_added += 1;
325 if let Some(entity) = &change.entity_id {
326 entities_affected.insert(entity.clone());
327 }
328 }
329 ChangeType::EntityRemoved => {
330 stats.entities_removed += 1;
331 if let Some(entity) = &change.entity_id {
332 entities_affected.insert(entity.clone());
333 }
334 }
335 ChangeType::EntityUpdated => {
336 stats.entities_updated += 1;
337 if let Some(entity) = &change.entity_id {
338 entities_affected.insert(entity.clone());
339 }
340 }
341 ChangeType::TripleAdded => {
342 stats.triples_added += 1;
343 if let Some(triple) = &change.triple {
344 entities_affected.insert(triple.subject.iri.clone());
345 entities_affected.insert(triple.object.iri.clone());
346 relations_affected.insert(triple.predicate.iri.clone());
347 }
348 }
349 ChangeType::TripleRemoved => {
350 stats.triples_removed += 1;
351 if let Some(triple) = &change.triple {
352 entities_affected.insert(triple.subject.iri.clone());
353 entities_affected.insert(triple.object.iri.clone());
354 relations_affected.insert(triple.predicate.iri.clone());
355 }
356 }
357 _ => {}
358 }
359 }
360
361 let embedding_deltas = self
363 .compute_embedding_deltas(model, &entities_affected)
364 .await?;
365
366 let embedding_changes: Vec<f32> = embedding_deltas
368 .values()
369 .flat_map(|delta| delta.iter().map(|&x| x.abs()))
370 .collect();
371
372 if !embedding_changes.is_empty() {
373 stats.avg_embedding_change =
374 embedding_changes.iter().sum::<f32>() / embedding_changes.len() as f32;
375 stats.max_embedding_change =
376 embedding_changes.iter().fold(0.0f32, |max, &x| max.max(x));
377 }
378
379 let processing_time_ms = start_time.elapsed().as_millis() as u64;
380
381 {
383 let mut last_timestamp = self.last_delta_timestamp.write().unwrap();
384 *last_timestamp = Some(to_timestamp);
385 }
386
387 let result = DeltaResult {
388 delta_id,
389 from_timestamp,
390 to_timestamp,
391 changes_processed: changes.len(),
392 entities_affected,
393 relations_affected,
394 embedding_deltas,
395 processing_time_ms,
396 delta_stats: stats,
397 };
398
399 println!("🔄 Delta computation completed:");
400 println!(" 📊 Changes processed: {}", result.changes_processed);
401 println!(
402 " 👥 Entities affected: {}",
403 result.entities_affected.len()
404 );
405 println!(
406 " 🔗 Relations affected: {}",
407 result.relations_affected.len()
408 );
409 println!(" ⏱️ Processing time: {}ms", result.processing_time_ms);
410 println!(
411 " 📈 Avg embedding change: {:.6}",
412 result.delta_stats.avg_embedding_change
413 );
414
415 Ok(result)
416 }
417
418 async fn compute_embedding_deltas(
420 &self,
421 model: &dyn EmbeddingModel,
422 entities: &HashSet<String>,
423 ) -> Result<HashMap<String, Array1<f32>>> {
424 let mut deltas = HashMap::new();
425 let baseline = self.baseline_embeddings.read().unwrap();
426
427 for entity in entities {
428 let current_embedding = match model.get_entity_embedding(entity) {
430 Ok(emb) => emb,
431 Err(_) => continue, };
433
434 let delta = if let Some(baseline_emb) = baseline.get(entity) {
436 let current_array = Array1::from_vec(current_embedding.values);
438 ¤t_array - baseline_emb
439 } else {
440 Array1::from_vec(current_embedding.values)
442 };
443
444 deltas.insert(entity.clone(), delta);
445 }
446
447 Ok(deltas)
448 }
449
450 pub async fn apply_incremental_update(
452 &self,
453 model: &mut dyn EmbeddingModel,
454 delta_result: &DeltaResult,
455 ) -> Result<()> {
456 if !self.config.enable_incremental_updates {
457 return Ok(());
458 }
459
460 println!("🔄 Applying incremental updates...");
461
462 match &self.incremental_strategy {
464 IncrementalStrategy::Additive => {
465 self.apply_additive_updates(model, delta_result).await?;
466 }
467 IncrementalStrategy::WeightedAverage { alpha } => {
468 self.apply_weighted_average_updates(model, delta_result, *alpha)
469 .await?;
470 }
471 IncrementalStrategy::ExponentialAverage { decay } => {
472 self.apply_exponential_average_updates(model, delta_result, *decay)
473 .await?;
474 }
475 _ => {
476 println!(" ⚠️ Complex strategy detected, skipping incremental update");
478 }
479 }
480
481 self.update_baseline_embeddings(model, &delta_result.entities_affected)
483 .await?;
484
485 println!("✅ Incremental updates applied successfully");
486 Ok(())
487 }
488
489 async fn apply_additive_updates(
491 &self,
492 _model: &mut dyn EmbeddingModel,
493 delta_result: &DeltaResult,
494 ) -> Result<()> {
495 println!(
497 " 📈 Applied additive updates to {} entities",
498 delta_result.entities_affected.len()
499 );
500 Ok(())
501 }
502
503 async fn apply_weighted_average_updates(
505 &self,
506 _model: &mut dyn EmbeddingModel,
507 delta_result: &DeltaResult,
508 alpha: f32,
509 ) -> Result<()> {
510 println!(
512 " ⚖️ Applied weighted average updates (α={}) to {} entities",
513 alpha,
514 delta_result.entities_affected.len()
515 );
516 Ok(())
517 }
518
519 async fn apply_exponential_average_updates(
521 &self,
522 _model: &mut dyn EmbeddingModel,
523 delta_result: &DeltaResult,
524 decay: f32,
525 ) -> Result<()> {
526 println!(
528 " 📉 Applied exponential average updates (decay={}) to {} entities",
529 decay,
530 delta_result.entities_affected.len()
531 );
532 Ok(())
533 }
534
535 async fn update_baseline_embeddings(
537 &self,
538 model: &dyn EmbeddingModel,
539 entities: &HashSet<String>,
540 ) -> Result<()> {
541 let mut baseline = self.baseline_embeddings.write().unwrap();
542
543 for entity in entities {
544 if let Ok(embedding) = model.get_entity_embedding(entity) {
545 let array = Array1::from_vec(embedding.values);
546 baseline.insert(entity.clone(), array);
547 }
548 }
549
550 Ok(())
551 }
552
553 pub async fn set_baseline_from_model(&self, model: &dyn EmbeddingModel) -> Result<()> {
555 let entities = model.get_entities();
556 let mut baseline = self.baseline_embeddings.write().unwrap();
557 baseline.clear();
558
559 for entity in entities {
560 if let Ok(embedding) = model.get_entity_embedding(&entity) {
561 let array = Array1::from_vec(embedding.values);
562 baseline.insert(entity, array);
563 }
564 }
565
566 println!("📋 Set baseline embeddings for {} entities", baseline.len());
567 Ok(())
568 }
569
570 pub fn get_changes_in_window(&self, window_start: DateTime<Utc>) -> Vec<ChangeRecord> {
572 let change_log = self.change_log.read().unwrap();
573 change_log
574 .iter()
575 .filter(|change| change.timestamp >= window_start)
576 .cloned()
577 .collect()
578 }
579
580 pub fn get_change_statistics(&self) -> ChangeStatistics {
582 let change_log = self.change_log.read().unwrap();
583 let pending = self.pending_changes.read().unwrap();
584
585 let mut stats = ChangeStatistics {
586 total_changes: change_log.len(),
587 pending_changes: pending.len(),
588 ..Default::default()
589 };
590
591 for change in change_log.iter() {
593 match change.change_type {
594 ChangeType::EntityAdded => stats.entities_added += 1,
595 ChangeType::EntityRemoved => stats.entities_removed += 1,
596 ChangeType::EntityUpdated => stats.entities_updated += 1,
597 ChangeType::TripleAdded => stats.triples_added += 1,
598 ChangeType::TripleRemoved => stats.triples_removed += 1,
599 ChangeType::RelationAdded => stats.relations_added += 1,
600 ChangeType::RelationRemoved => stats.relations_removed += 1,
601 ChangeType::BulkImport => stats.bulk_imports += 1,
602 ChangeType::ModelRetrained => stats.model_retrains += 1,
603 }
604 }
605
606 if let (Some(oldest), Some(newest)) = (change_log.front(), change_log.back()) {
608 stats.oldest_change = Some(oldest.timestamp);
609 stats.newest_change = Some(newest.timestamp);
610 }
611
612 stats
613 }
614
615 pub fn clear_change_log(&self) {
617 let mut change_log = self.change_log.write().unwrap();
618 let mut pending = self.pending_changes.write().unwrap();
619 change_log.clear();
620 pending.clear();
621 println!("🗑️ Cleared change log and pending changes");
622 }
623
624 pub fn set_incremental_strategy(&mut self, strategy: IncrementalStrategy) {
626 self.incremental_strategy = strategy;
627 }
628
629 pub fn should_compute_delta(&self) -> bool {
631 let pending = self.pending_changes.read().unwrap();
632 pending.len() >= self.config.min_changes_for_delta
633 }
634
635 pub fn get_last_delta_timestamp(&self) -> Option<DateTime<Utc>> {
637 *self.last_delta_timestamp.read().unwrap()
638 }
639}
640
641#[derive(Debug, Clone, Default)]
643pub struct ChangeStatistics {
644 pub total_changes: usize,
645 pub pending_changes: usize,
646 pub entities_added: usize,
647 pub entities_removed: usize,
648 pub entities_updated: usize,
649 pub triples_added: usize,
650 pub triples_removed: usize,
651 pub relations_added: usize,
652 pub relations_removed: usize,
653 pub bulk_imports: usize,
654 pub model_retrains: usize,
655 pub oldest_change: Option<DateTime<Utc>>,
656 pub newest_change: Option<DateTime<Utc>>,
657}
658
659impl ChangeStatistics {
660 pub fn total_entity_changes(&self) -> usize {
662 self.entities_added + self.entities_removed + self.entities_updated
663 }
664
665 pub fn total_triple_changes(&self) -> usize {
667 self.triples_added + self.triples_removed
668 }
669
670 pub fn total_relation_changes(&self) -> usize {
672 self.relations_added + self.relations_removed
673 }
674}
675
676impl Hash for ChangeRecord {
678 fn hash<H: Hasher>(&self, state: &mut H) {
679 self.change_type.hash(state);
680 self.entity_id.hash(state);
681 self.relation_id.hash(state);
682 }
684}
685
686impl PartialEq for ChangeRecord {
687 fn eq(&self, other: &Self) -> bool {
688 self.change_type == other.change_type
689 && self.entity_id == other.entity_id
690 && self.relation_id == other.relation_id
691 }
692}
693
694impl Eq for ChangeRecord {}
695
696#[cfg(test)]
697mod tests {
698 use super::*;
699 use crate::{ModelConfig, NamedNode, TransE};
700
701 #[test]
702 fn test_change_record_creation() {
703 let change = ChangeRecord::new(ChangeType::EntityAdded)
704 .with_entity("test_entity".to_string())
705 .with_metadata("source", "test");
706
707 assert_eq!(change.change_type, ChangeType::EntityAdded);
708 assert_eq!(change.entity_id, Some("test_entity".to_string()));
709 assert_eq!(change.metadata.get("source"), Some(&"test".to_string()));
710 }
711
712 #[test]
713 fn test_delta_config_default() {
714 let config = DeltaConfig::default();
715 assert_eq!(config.max_changes, 100_000);
716 assert_eq!(config.time_window_seconds, 3600);
717 assert!(config.enable_incremental_updates);
718 assert_eq!(config.delta_batch_size, 1000);
719 }
720
721 #[test]
722 fn test_delta_manager_creation() {
723 let config = DeltaConfig::default();
724 let manager = DeltaManager::new(config);
725
726 assert_eq!(manager.config.max_changes, 100_000);
727 assert!(!manager.should_compute_delta()); }
729
730 #[test]
731 fn test_record_changes() {
732 let config = DeltaConfig {
733 min_changes_for_delta: 2,
734 ..Default::default()
735 };
736 let manager = DeltaManager::new(config);
737
738 manager
740 .record_entity_added("entity1".to_string(), None)
741 .unwrap();
742 manager
743 .record_entity_added("entity2".to_string(), None)
744 .unwrap();
745
746 assert!(manager.should_compute_delta());
747
748 let stats = manager.get_change_statistics();
749 assert_eq!(stats.pending_changes, 2);
750 assert_eq!(stats.entities_added, 2);
751 }
752
753 #[test]
754 fn test_bulk_import_recording() {
755 let config = DeltaConfig::default();
756 let manager = DeltaManager::new(config);
757
758 let batch_id = manager.record_bulk_import(1000, 5000).unwrap();
759
760 let stats = manager.get_change_statistics();
761 assert_eq!(stats.bulk_imports, 1);
762 assert_eq!(stats.pending_changes, 1);
763
764 let pending = manager.pending_changes.read().unwrap();
766 assert_eq!(pending[0].batch_id, Some(batch_id));
767 }
768
769 #[tokio::test]
770 async fn test_delta_computation() {
771 let config = DeltaConfig {
772 min_changes_for_delta: 1,
773 ..Default::default()
774 };
775 let manager = DeltaManager::new(config);
776
777 let model_config = ModelConfig::default().with_dimensions(10);
779 let mut model = TransE::new(model_config);
780
781 let triple = Triple::new(
783 NamedNode::new("http://example.org/alice").unwrap(),
784 NamedNode::new("http://example.org/knows").unwrap(),
785 NamedNode::new("http://example.org/bob").unwrap(),
786 );
787 model.add_triple(triple.clone()).unwrap();
788
789 model.train(Some(1)).await.unwrap();
791
792 manager.set_baseline_from_model(&model).await.unwrap();
794
795 manager
797 .record_entity_added("http://example.org/alice".to_string(), None)
798 .unwrap();
799
800 let delta_result = manager.compute_delta(&model).await.unwrap();
802
803 assert_eq!(delta_result.changes_processed, 1);
804 assert!(!delta_result.entities_affected.is_empty());
805 }
807
808 #[test]
809 fn test_change_statistics() {
810 let config = DeltaConfig::default();
811 let manager = DeltaManager::new(config);
812
813 manager
815 .record_entity_added("entity1".to_string(), None)
816 .unwrap();
817 manager
818 .record_entity_removed("entity2".to_string())
819 .unwrap();
820
821 let triple = Triple::new(
822 NamedNode::new("http://example.org/s").unwrap(),
823 NamedNode::new("http://example.org/p").unwrap(),
824 NamedNode::new("http://example.org/o").unwrap(),
825 );
826 manager.record_triple_added(triple, None).unwrap();
827
828 let stats = manager.get_change_statistics();
829
830 assert_eq!(stats.total_entity_changes(), 2);
831 assert_eq!(stats.total_triple_changes(), 1);
832 assert_eq!(stats.total_changes, 3);
833 assert_eq!(stats.pending_changes, 3);
834 }
835
836 #[test]
837 fn test_incremental_strategies() {
838 let mut manager = DeltaManager::new(DeltaConfig::default());
839
840 manager.set_incremental_strategy(IncrementalStrategy::Additive);
842 assert!(matches!(
843 manager.incremental_strategy,
844 IncrementalStrategy::Additive
845 ));
846
847 manager.set_incremental_strategy(IncrementalStrategy::WeightedAverage { alpha: 0.2 });
848 if let IncrementalStrategy::WeightedAverage { alpha } = manager.incremental_strategy {
849 assert_eq!(alpha, 0.2);
850 } else {
851 panic!("Expected WeightedAverage strategy");
852 }
853 }
854
855 #[test]
856 fn test_change_record_equality() {
857 let change1 = ChangeRecord::new(ChangeType::EntityAdded).with_entity("entity1".to_string());
858
859 let change2 = ChangeRecord::new(ChangeType::EntityAdded).with_entity("entity1".to_string());
860
861 let change3 = ChangeRecord::new(ChangeType::EntityAdded).with_entity("entity2".to_string());
862
863 assert_eq!(change1, change2); assert_ne!(change1, change3); }
866}