1use anyhow::{anyhow, Result};
7use serde::{Deserialize, Serialize};
8use std::collections::{HashMap, HashSet};
9use std::mem;
10use std::sync::Arc;
11use std::time::{Duration, Instant};
12use tokio::sync::{broadcast, mpsc, RwLock};
13use tokio::time::interval;
14use tracing::{debug, error, info, warn};
15
16#[cfg(test)]
17use crate::StreamConfig;
18use crate::{EventMetadata, RdfPatch, StreamConsumer, StreamEvent, StreamProducer};
19
/// Detects changes in an RDF store and publishes them as stream events.
///
/// A configured [`ChangeDetectionStrategy`] feeds detected
/// [`StoreChange`]s into an internal buffer; a background flusher task
/// drains the buffer, optionally deduplicates, converts to
/// [`StreamEvent`]s, and publishes via the producer.
pub struct StoreChangeDetector {
    /// The RDF store being watched.
    store: Arc<dyn RdfStore>,
    /// How changes are discovered (log tailing, triggers, polling, ...).
    strategy: ChangeDetectionStrategy,
    /// Producer used to publish converted stream events.
    producer: Arc<RwLock<StreamProducer>>,
    /// Detected-but-not-yet-published changes awaiting the flusher.
    change_buffer: Arc<RwLock<Vec<StoreChange>>>,
    /// Buffering / dedup / compression tuning knobs.
    config: ChangeDetectorConfig,
    /// Running counters, shared with spawned background tasks.
    stats: Arc<RwLock<ChangeDetectorStats>>,
    /// Broadcast channel notifying observers of detector activity.
    change_notifier: broadcast::Sender<StoreChangeEvent>,
}
37
/// Abstraction over the underlying RDF store the detector watches.
///
/// Implementations must be `Send + Sync`: every method may be called
/// from spawned background tasks.
#[async_trait::async_trait]
pub trait RdfStore: Send + Sync {
    /// Returns the current end position of the transaction log.
    async fn get_transaction_log_position(&self) -> Result<u64>;

    /// Reads up to `limit` log entries starting at position `from`.
    async fn read_transaction_log(
        &self,
        from: u64,
        limit: usize,
    ) -> Result<Vec<TransactionLogEntry>>;

    /// Subscribes to the store's native change feed (trigger-based detection).
    async fn subscribe_changes(&self) -> Result<mpsc::Receiver<StoreChange>>;

    /// Returns aggregate statistics (used by snapshot polling).
    async fn get_statistics(&self) -> Result<StoreStatistics>;

    /// Applies an RDF patch to the store.
    async fn apply_patch(&self, patch: &RdfPatch) -> Result<()>;

    /// Executes an update statement.
    /// NOTE(review): query language not shown here — presumably SPARQL; confirm.
    async fn execute_update(&self, update: &str) -> Result<()>;

    /// Runs a read query, returning variable bindings.
    async fn query(&self, query: &str) -> Result<QueryResult>;
}
66
/// How store changes are discovered.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ChangeDetectionStrategy {
    /// Tail the store's transaction log.
    TransactionLog {
        /// How often the log is polled.
        poll_interval: Duration,
        /// Maximum entries read per poll.
        batch_size: usize,
    },
    /// Rely on the store's native change notifications.
    TriggerBased { trigger_types: Vec<TriggerType> },
    /// Periodically snapshot aggregate statistics and diff them.
    Polling {
        poll_interval: Duration,
        /// NOTE(review): not consulted by `start_polling_detection` — confirm intent.
        snapshot_interval: Duration,
    },
    /// Poll an external HTTP event store.
    EventSourcing { event_store_url: String },
    /// Run `primary`; start a polling fallback if it stays silent.
    Hybrid {
        primary: Box<ChangeDetectionStrategy>,
        fallback: Box<ChangeDetectionStrategy>,
    },
}
90
/// Kinds of store triggers a trigger-based strategy may subscribe to.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum TriggerType {
    Insert,
    Delete,
    Update,
    GraphChange,
    SchemaChange,
}
100
/// A single detected store change, prior to conversion to stream events.
#[derive(Debug, Clone)]
pub struct StoreChange {
    pub change_type: ChangeType,
    pub timestamp: chrono::DateTime<chrono::Utc>,
    /// Originating transaction, when known (log-derived changes set this).
    pub transaction_id: Option<String>,
    pub user: Option<String>,
    /// Triples touched; empty for graph-level and bulk changes.
    pub affected_triples: Vec<Triple>,
    /// Free-form metadata; graph-level changes carry a "graph" key.
    pub metadata: HashMap<String, String>,
}
111
/// Category of a detected store change.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ChangeType {
    TripleAdded,
    TripleRemoved,
    GraphCreated,
    GraphDeleted,
    GraphCleared,
    /// Coarse change synthesized by snapshot polling (counts differ).
    BulkUpdate,
    SchemaUpdate,
}
123
/// An RDF triple (optionally scoped to a named graph).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
pub struct Triple {
    pub subject: String,
    pub predicate: String,
    pub object: String,
    /// `None` means the default graph.
    pub graph: Option<String>,
}
132
/// One entry of the store's transaction log.
#[derive(Debug, Clone)]
pub struct TransactionLogEntry {
    /// Monotonic log position; tailing resumes at `position + 1`.
    pub position: u64,
    pub timestamp: chrono::DateTime<chrono::Utc>,
    pub transaction_id: String,
    pub operations: Vec<LogOperation>,
    /// Free-form metadata; a "user" key is propagated to changes.
    pub metadata: HashMap<String, String>,
}
142
/// A single operation recorded in a transaction log entry.
#[derive(Debug, Clone)]
pub enum LogOperation {
    Add(Triple),
    Remove(Triple),
    CreateGraph(String),
    DeleteGraph(String),
    /// `None` clears the default graph.
    ClearGraph(Option<String>),
}
152
/// Aggregate store statistics used by snapshot polling.
#[derive(Debug, Clone)]
pub struct StoreStatistics {
    pub total_triples: u64,
    pub total_graphs: u64,
    pub transaction_count: u64,
    pub last_modified: chrono::DateTime<chrono::Utc>,
}
161
/// Result of a read query: one binding map per solution row.
#[derive(Debug, Clone)]
pub struct QueryResult {
    pub bindings: Vec<HashMap<String, String>>,
}
167
/// Tuning parameters for the change detector.
#[derive(Debug, Clone)]
pub struct ChangeDetectorConfig {
    /// Target size of the change buffer.
    pub buffer_size: usize,
    /// How often the buffer flusher runs.
    pub flush_interval: Duration,
    /// Whether duplicate changes within `dedup_window` are suppressed.
    pub enable_deduplication: bool,
    /// Sliding window for deduplication.
    pub dedup_window: Duration,
    /// NOTE(review): compression flags are not consulted anywhere in
    /// this file — confirm whether batch compression was implemented.
    pub enable_compression: bool,
    pub compression_threshold: usize,
}
184
impl Default for ChangeDetectorConfig {
    /// Defaults: 1000-entry buffer, 100 ms flush, dedup over a 60 s
    /// window, compression enabled above 100 changes.
    fn default() -> Self {
        Self {
            buffer_size: 1000,
            flush_interval: Duration::from_millis(100),
            enable_deduplication: true,
            dedup_window: Duration::from_secs(60),
            enable_compression: true,
            compression_threshold: 100,
        }
    }
}
197
/// Running counters maintained by the detector's background tasks.
#[derive(Debug, Default, Clone)]
pub struct ChangeDetectorStats {
    pub changes_detected: u64,
    pub changes_published: u64,
    pub changes_deduplicated: u64,
    /// NOTE(review): never incremented in this file — see compression note.
    pub batches_compressed: u64,
    pub errors: u64,
    /// Highest transaction-log position consumed so far.
    pub last_position: u64,
    /// NOTE(review): never updated in this file — confirm intent.
    pub lag_ms: u64,
}
209
/// Lifecycle notifications broadcast to detector observers.
#[derive(Debug, Clone)]
pub enum StoreChangeEvent {
    ChangesDetected { count: usize },
    ChangesPublished { count: usize },
    Error { message: String },
    /// NOTE(review): never emitted in this file — confirm intent.
    LagDetected { lag_ms: u64 },
}
222
223impl StoreChangeDetector {
224 pub async fn new(
226 store: Arc<dyn RdfStore>,
227 strategy: ChangeDetectionStrategy,
228 producer: Arc<RwLock<StreamProducer>>,
229 config: ChangeDetectorConfig,
230 ) -> Result<Self> {
231 let (tx, _) = broadcast::channel(1000);
232
233 Ok(Self {
234 store,
235 strategy,
236 producer,
237 change_buffer: Arc::new(RwLock::new(Vec::new())),
238 config,
239 stats: Arc::new(RwLock::new(ChangeDetectorStats::default())),
240 change_notifier: tx,
241 })
242 }
243
    /// Starts change detection according to the configured strategy.
    ///
    /// Returns a boxed future (rather than being an `async fn`) because
    /// the hybrid strategy recursively calls `start()` on sub-detectors,
    /// and a recursive `async fn` future type cannot be named.
    pub fn start(
        &self,
    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<()>> + Send + '_>> {
        Box::pin(async move {
            match &self.strategy {
                ChangeDetectionStrategy::TransactionLog {
                    poll_interval,
                    batch_size,
                } => {
                    self.start_transaction_log_tailing(*poll_interval, *batch_size)
                        .await
                }
                ChangeDetectionStrategy::TriggerBased { .. } => {
                    self.start_trigger_based_detection().await
                }
                ChangeDetectionStrategy::Polling { poll_interval, .. } => {
                    self.start_polling_detection(*poll_interval).await
                }
                ChangeDetectionStrategy::EventSourcing { .. } => self.start_event_sourcing().await,
                ChangeDetectionStrategy::Hybrid {
                    primary: _,
                    fallback: _,
                } => self.start_hybrid_detection().await,
            }
        })
    }
271
272 async fn start_transaction_log_tailing(
274 &self,
275 poll_interval: Duration,
276 batch_size: usize,
277 ) -> Result<()> {
278 let store = self.store.clone();
279 let buffer = self.change_buffer.clone();
280 let stats = self.stats.clone();
281 let notifier = self.change_notifier.clone();
282
283 tokio::spawn(async move {
284 let mut interval = interval(poll_interval);
285 let mut last_position = 0u64;
286
287 loop {
288 interval.tick().await;
289
290 match store.read_transaction_log(last_position, batch_size).await {
291 Ok(entries) => {
292 if entries.is_empty() {
293 continue;
294 }
295
296 let mut changes = Vec::new();
297
298 for entry in entries {
299 last_position = last_position.max(entry.position + 1);
300
301 for op in entry.operations {
302 let change = match op {
303 LogOperation::Add(triple) => StoreChange {
304 change_type: ChangeType::TripleAdded,
305 timestamp: entry.timestamp,
306 transaction_id: Some(entry.transaction_id.clone()),
307 user: entry.metadata.get("user").cloned(),
308 affected_triples: vec![triple],
309 metadata: entry.metadata.clone(),
310 },
311 LogOperation::Remove(triple) => StoreChange {
312 change_type: ChangeType::TripleRemoved,
313 timestamp: entry.timestamp,
314 transaction_id: Some(entry.transaction_id.clone()),
315 user: entry.metadata.get("user").cloned(),
316 affected_triples: vec![triple],
317 metadata: entry.metadata.clone(),
318 },
319 LogOperation::CreateGraph(graph) => StoreChange {
320 change_type: ChangeType::GraphCreated,
321 timestamp: entry.timestamp,
322 transaction_id: Some(entry.transaction_id.clone()),
323 user: entry.metadata.get("user").cloned(),
324 affected_triples: vec![],
325 metadata: {
326 let mut meta = entry.metadata.clone();
327 meta.insert("graph".to_string(), graph);
328 meta
329 },
330 },
331 LogOperation::DeleteGraph(graph) => StoreChange {
332 change_type: ChangeType::GraphDeleted,
333 timestamp: entry.timestamp,
334 transaction_id: Some(entry.transaction_id.clone()),
335 user: entry.metadata.get("user").cloned(),
336 affected_triples: vec![],
337 metadata: {
338 let mut meta = entry.metadata.clone();
339 meta.insert("graph".to_string(), graph);
340 meta
341 },
342 },
343 LogOperation::ClearGraph(graph) => StoreChange {
344 change_type: ChangeType::GraphCleared,
345 timestamp: entry.timestamp,
346 transaction_id: Some(entry.transaction_id.clone()),
347 user: entry.metadata.get("user").cloned(),
348 affected_triples: vec![],
349 metadata: {
350 let mut meta = entry.metadata.clone();
351 if let Some(g) = graph {
352 meta.insert("graph".to_string(), g);
353 }
354 meta
355 },
356 },
357 };
358
359 changes.push(change);
360 }
361 }
362
363 {
365 let mut stats_guard = stats.write().await;
366 stats_guard.changes_detected += changes.len() as u64;
367 stats_guard.last_position = last_position;
368 }
369
370 buffer.write().await.extend(changes.clone());
372
373 let _ = notifier.send(StoreChangeEvent::ChangesDetected {
375 count: changes.len(),
376 });
377
378 debug!("Detected {} changes from transaction log", changes.len());
379 }
380 Err(e) => {
381 error!("Failed to read transaction log: {}", e);
382 stats.write().await.errors += 1;
383 let _ = notifier.send(StoreChangeEvent::Error {
384 message: e.to_string(),
385 });
386 }
387 }
388 }
389 });
390
391 self.start_buffer_flusher().await;
393
394 Ok(())
395 }
396
397 async fn start_trigger_based_detection(&self) -> Result<()> {
399 let mut receiver = self.store.subscribe_changes().await?;
400 let buffer = self.change_buffer.clone();
401 let stats = self.stats.clone();
402 let notifier = self.change_notifier.clone();
403
404 tokio::spawn(async move {
405 while let Some(change) = receiver.recv().await {
406 stats.write().await.changes_detected += 1;
408
409 buffer.write().await.push(change);
411
412 let _ = notifier.send(StoreChangeEvent::ChangesDetected { count: 1 });
414 }
415 });
416
417 self.start_buffer_flusher().await;
419
420 Ok(())
421 }
422
423 async fn start_polling_detection(&self, poll_interval: Duration) -> Result<()> {
425 let store = self.store.clone();
426 let buffer = self.change_buffer.clone();
427 let stats = self.stats.clone();
428
429 tokio::spawn(async move {
433 let mut interval = interval(poll_interval);
434 let mut last_snapshot: Option<StoreSnapshot> = None;
435
436 loop {
437 interval.tick().await;
438
439 match Self::take_snapshot(&store).await {
441 Ok(current_snapshot) => {
442 if let Some(last) = &last_snapshot {
443 let changes = Self::compare_snapshots(last, ¤t_snapshot);
445
446 if !changes.is_empty() {
447 stats.write().await.changes_detected += changes.len() as u64;
448 buffer.write().await.extend(changes);
449 }
450 }
451
452 last_snapshot = Some(current_snapshot);
453 }
454 Err(e) => {
455 error!("Failed to take snapshot: {}", e);
456 stats.write().await.errors += 1;
457 }
458 }
459 }
460 });
461
462 Ok(())
463 }
464
465 async fn start_event_sourcing(&self) -> Result<()> {
467 let ChangeDetectionStrategy::EventSourcing { event_store_url } = &self.strategy else {
468 return Err(anyhow!("Invalid strategy for event sourcing"));
469 };
470
471 let _store = self.store.clone();
472 let producer = self.producer.clone();
473 let stats = self.stats.clone();
474 let notifier = self.change_notifier.clone();
475 let event_store_url_clone = event_store_url.clone();
476
477 tokio::spawn(async move {
478 let mut interval = interval(Duration::from_secs(5));
479 let mut last_event_id = 0u64;
480
481 loop {
482 interval.tick().await;
483
484 match Self::fetch_events_from_store(&event_store_url_clone, last_event_id).await {
486 Ok(events) => {
487 if !events.is_empty() {
488 debug!("Fetched {} events from event store", events.len());
489
490 let changes = Self::convert_events_to_changes(events);
492
493 if let Some(last_change) = changes.last() {
495 if let Some(event_id_str) = last_change.metadata.get("event_id") {
496 if let Ok(event_id) = event_id_str.parse::<u64>() {
497 last_event_id = event_id;
498 }
499 }
500 }
501
502 let stream_events = Self::convert_to_stream_events(changes);
504 let count = stream_events.len();
505
506 match producer.write().await.publish_batch(stream_events).await {
507 Ok(_) => {
508 stats.write().await.changes_published += count as u64;
509 let _ =
510 notifier.send(StoreChangeEvent::ChangesPublished { count });
511 }
512 Err(e) => {
513 error!("Failed to publish events from event store: {}", e);
514 stats.write().await.errors += 1;
515 }
516 }
517 }
518 }
519 Err(e) => {
520 error!("Failed to fetch events from event store: {}", e);
521 stats.write().await.errors += 1;
522 }
523 }
524 }
525 });
526
527 info!("Started event sourcing from: {}", event_store_url);
528 Ok(())
529 }
530
531 async fn start_hybrid_detection(&self) -> Result<()> {
533 let ChangeDetectionStrategy::Hybrid { primary, fallback } = &self.strategy else {
534 return Err(anyhow!("Invalid strategy for hybrid detection"));
535 };
536
537 let primary_detector = Self::new(
538 self.store.clone(),
539 *primary.clone(),
540 self.producer.clone(),
541 self.config.clone(),
542 )
543 .await?;
544
545 let fallback_detector = Self::new(
546 self.store.clone(),
547 *fallback.clone(),
548 self.producer.clone(),
549 self.config.clone(),
550 )
551 .await?;
552
553 let stats = self.stats.clone();
554 let notifier = self.change_notifier.clone();
555
556 if let Err(e) = primary_detector.start().await {
558 error!("Primary strategy failed: {}", e);
559 stats.write().await.errors += 1;
560 let _ = notifier.send(StoreChangeEvent::Error {
561 message: format!("Primary strategy failed: {e}"),
562 });
563 }
564
565 let fallback_stats = self.stats.clone();
567 let fallback_notifier = self.change_notifier.clone();
568
569 tokio::spawn(async move {
570 tokio::time::sleep(Duration::from_secs(30)).await;
572
573 let primary_events = fallback_stats.read().await.changes_published;
575
576 if primary_events == 0 {
578 info!("Primary strategy not producing events, starting fallback");
579
580 if let Err(e) = fallback_detector
581 .start_polling_detection(Duration::from_secs(5))
582 .await
583 {
584 error!("Fallback strategy failed: {}", e);
585 fallback_stats.write().await.errors += 1;
586 let _ = fallback_notifier.send(StoreChangeEvent::Error {
587 message: format!("Fallback strategy failed: {e}"),
588 });
589 }
590 }
591 });
592
593 info!("Started hybrid detection with primary and fallback strategies");
594 Ok(())
595 }
596
    /// Spawns the background task that drains the change buffer.
    ///
    /// Every `config.flush_interval` the buffer contents are swapped
    /// out under a short-lived lock, optionally deduplicated, converted
    /// to stream events, and published as one batch. Counters and
    /// broadcast notifications are updated on success and failure.
    ///
    /// NOTE(review): `config.enable_compression` /
    /// `compression_threshold` are never consulted here or anywhere
    /// visible in this file — confirm whether batch compression was
    /// meant to be applied before publishing.
    async fn start_buffer_flusher(&self) {
        let buffer = self.change_buffer.clone();
        let producer = self.producer.clone();
        let config = self.config.clone();
        let stats = self.stats.clone();
        let notifier = self.change_notifier.clone();

        tokio::spawn(async move {
            let mut interval = interval(config.flush_interval);
            // The dedup cache only exists when deduplication is enabled.
            let mut dedup_cache = if config.enable_deduplication {
                Some(DedupCache::new(config.dedup_window))
            } else {
                None
            };

            loop {
                interval.tick().await;

                // Swap the buffer out so the write lock is held briefly.
                let mut changes = {
                    let mut buffer_guard = buffer.write().await;
                    std::mem::take(&mut *buffer_guard)
                };

                if changes.is_empty() {
                    continue;
                }

                if let Some(cache) = &mut dedup_cache {
                    let original_count = changes.len();
                    changes = cache.deduplicate(changes);
                    let dedup_count = original_count - changes.len();

                    if dedup_count > 0 {
                        stats.write().await.changes_deduplicated += dedup_count as u64;
                        debug!("Deduplicated {} changes", dedup_count);
                    }
                }

                let events = Self::convert_to_stream_events(changes);
                let count = events.len();

                match producer.write().await.publish_batch(events).await {
                    Ok(_) => {
                        stats.write().await.changes_published += count as u64;
                        let _ = notifier.send(StoreChangeEvent::ChangesPublished { count });
                        debug!("Published {} changes to stream", count);
                    }
                    Err(e) => {
                        error!("Failed to publish changes: {}", e);
                        stats.write().await.errors += 1;
                        let _ = notifier.send(StoreChangeEvent::Error {
                            message: e.to_string(),
                        });
                    }
                }
            }
        });
    }
659
    /// Converts buffered store changes into producer-ready stream events.
    ///
    /// A change with N affected triples fans out into N triple events
    /// sharing cloned metadata. Graph-level changes map to one event
    /// each; `GraphCreated`/`GraphDeleted` require a `"graph"` metadata
    /// key and are dropped without it, while `GraphCleared` treats an
    /// absent key as the default graph. `BulkUpdate` and `SchemaUpdate`
    /// have no stream representation and are dropped.
    fn convert_to_stream_events(changes: Vec<StoreChange>) -> Vec<StreamEvent> {
        changes
            .into_iter()
            .flat_map(|change| {
                // One metadata record per change, cloned across the fan-out.
                let metadata = EventMetadata {
                    event_id: uuid::Uuid::new_v4().to_string(),
                    timestamp: change.timestamp,
                    source: "store-change-detector".to_string(),
                    user: change.user,
                    context: change.transaction_id.clone(),
                    caused_by: None,
                    version: "1.0".to_string(),
                    properties: change.metadata.clone(),
                    checksum: None,
                };

                match change.change_type {
                    ChangeType::TripleAdded => change
                        .affected_triples
                        .into_iter()
                        .map(|triple| StreamEvent::TripleAdded {
                            subject: triple.subject,
                            predicate: triple.predicate,
                            object: triple.object,
                            graph: triple.graph,
                            metadata: metadata.clone(),
                        })
                        .collect(),
                    ChangeType::TripleRemoved => change
                        .affected_triples
                        .into_iter()
                        .map(|triple| StreamEvent::TripleRemoved {
                            subject: triple.subject,
                            predicate: triple.predicate,
                            object: triple.object,
                            graph: triple.graph,
                            metadata: metadata.clone(),
                        })
                        .collect(),
                    ChangeType::GraphCreated => {
                        if let Some(graph) = change.metadata.get("graph") {
                            vec![StreamEvent::GraphCreated {
                                graph: graph.clone(),
                                metadata,
                            }]
                        } else {
                            vec![]
                        }
                    }
                    ChangeType::GraphDeleted => {
                        if let Some(graph) = change.metadata.get("graph") {
                            vec![StreamEvent::GraphDeleted {
                                graph: graph.clone(),
                                metadata,
                            }]
                        } else {
                            vec![]
                        }
                    }
                    ChangeType::GraphCleared => {
                        vec![StreamEvent::GraphCleared {
                            graph: change.metadata.get("graph").cloned(),
                            metadata,
                        }]
                    }
                    // BulkUpdate / SchemaUpdate: no stream equivalent.
                    _ => vec![],
                }
            })
            .collect()
    }
731
732 async fn take_snapshot(store: &Arc<dyn RdfStore>) -> Result<StoreSnapshot> {
734 let stats = store.get_statistics().await?;
735
736 let checksum = format!(
738 "{}-{}-{}",
739 stats.total_triples,
740 stats.total_graphs,
741 stats.last_modified.timestamp()
742 );
743
744 Ok(StoreSnapshot {
745 timestamp: chrono::Utc::now(),
746 triple_count: stats.total_triples,
747 graph_count: stats.total_graphs,
748 checksum,
749 })
750 }
751
752 fn compare_snapshots(old: &StoreSnapshot, new: &StoreSnapshot) -> Vec<StoreChange> {
754 let mut changes = Vec::new();
755
756 if new.triple_count != old.triple_count {
758 let change = if new.triple_count > old.triple_count {
759 StoreChange {
760 change_type: ChangeType::BulkUpdate,
761 timestamp: new.timestamp,
762 transaction_id: None,
763 user: Some("system".to_string()),
764 affected_triples: vec![],
765 metadata: {
766 let mut meta = HashMap::new();
767 meta.insert("change_type".to_string(), "triples_added".to_string());
768 meta.insert("old_count".to_string(), old.triple_count.to_string());
769 meta.insert("new_count".to_string(), new.triple_count.to_string());
770 meta
771 },
772 }
773 } else {
774 StoreChange {
775 change_type: ChangeType::BulkUpdate,
776 timestamp: new.timestamp,
777 transaction_id: None,
778 user: Some("system".to_string()),
779 affected_triples: vec![],
780 metadata: {
781 let mut meta = HashMap::new();
782 meta.insert("change_type".to_string(), "triples_removed".to_string());
783 meta.insert("old_count".to_string(), old.triple_count.to_string());
784 meta.insert("new_count".to_string(), new.triple_count.to_string());
785 meta
786 },
787 }
788 };
789 changes.push(change);
790 }
791
792 if new.graph_count != old.graph_count {
794 let change = if new.graph_count > old.graph_count {
795 StoreChange {
796 change_type: ChangeType::GraphCreated,
797 timestamp: new.timestamp,
798 transaction_id: None,
799 user: Some("system".to_string()),
800 affected_triples: vec![],
801 metadata: {
802 let mut meta = HashMap::new();
803 meta.insert("change_type".to_string(), "graphs_added".to_string());
804 meta.insert("old_count".to_string(), old.graph_count.to_string());
805 meta.insert("new_count".to_string(), new.graph_count.to_string());
806 meta
807 },
808 }
809 } else {
810 StoreChange {
811 change_type: ChangeType::GraphDeleted,
812 timestamp: new.timestamp,
813 transaction_id: None,
814 user: Some("system".to_string()),
815 affected_triples: vec![],
816 metadata: {
817 let mut meta = HashMap::new();
818 meta.insert("change_type".to_string(), "graphs_removed".to_string());
819 meta.insert("old_count".to_string(), old.graph_count.to_string());
820 meta.insert("new_count".to_string(), new.graph_count.to_string());
821 meta
822 },
823 }
824 };
825 changes.push(change);
826 }
827
828 changes
829 }
830
831 pub async fn get_stats(&self) -> ChangeDetectorStats {
833 self.stats.read().await.clone()
834 }
835
836 pub fn subscribe(&self) -> broadcast::Receiver<StoreChangeEvent> {
838 self.change_notifier.subscribe()
839 }
840
841 async fn fetch_events_from_store(
843 event_store_url: &str,
844 from_id: u64,
845 ) -> Result<Vec<EventStoreEvent>> {
846 use reqwest;
850
851 let client = reqwest::Client::new();
852 let url = format!("{event_store_url}/events?from={from_id}&limit=100");
853
854 match client.get(&url).send().await {
855 Ok(response) => {
856 if response.status().is_success() {
857 match response.json::<Vec<EventStoreEvent>>().await {
858 Ok(events) => Ok(events),
859 Err(e) => {
860 warn!("Failed to parse events from event store: {}", e);
861 Ok(vec![]) }
863 }
864 } else {
865 warn!("Event store returned status: {}", response.status());
866 Ok(vec![])
867 }
868 }
869 Err(e) => {
870 debug!("Failed to connect to event store: {}", e);
871 Ok(vec![]) }
873 }
874 }
875
    /// Converts raw event-store events into [`StoreChange`] records.
    ///
    /// Triple events missing a required `subject`/`predicate`/`object`
    /// key are silently skipped (the `?` short-circuits the `filter_map`
    /// closure), as are events with an unrecognized `event_type`. The
    /// `user` field defaults to `"system"` when absent.
    fn convert_events_to_changes(events: Vec<EventStoreEvent>) -> Vec<StoreChange> {
        events
            .into_iter()
            .filter_map(|event| match event.event_type.as_str() {
                "triple_added" => Some(StoreChange {
                    change_type: ChangeType::TripleAdded,
                    affected_triples: vec![Triple {
                        subject: event.data.get("subject")?.clone(),
                        predicate: event.data.get("predicate")?.clone(),
                        object: event.data.get("object")?.clone(),
                        graph: event.data.get("graph").cloned(),
                    }],
                    timestamp: event.timestamp,
                    transaction_id: event.transaction_id,
                    user: Some(event.user.unwrap_or_else(|| "system".to_string())),
                    metadata: event.metadata,
                }),
                "triple_removed" => Some(StoreChange {
                    change_type: ChangeType::TripleRemoved,
                    affected_triples: vec![Triple {
                        subject: event.data.get("subject")?.clone(),
                        predicate: event.data.get("predicate")?.clone(),
                        object: event.data.get("object")?.clone(),
                        graph: event.data.get("graph").cloned(),
                    }],
                    timestamp: event.timestamp,
                    transaction_id: event.transaction_id,
                    user: Some(event.user.unwrap_or_else(|| "system".to_string())),
                    metadata: event.metadata,
                }),
                "graph_created" => Some(StoreChange {
                    change_type: ChangeType::GraphCreated,
                    affected_triples: vec![],
                    timestamp: event.timestamp,
                    transaction_id: event.transaction_id,
                    user: Some(event.user.unwrap_or_else(|| "system".to_string())),
                    // Copy the graph name from event data into metadata so
                    // convert_to_stream_events can find its "graph" key.
                    metadata: {
                        let mut meta = event.metadata;
                        if let Some(graph) = event.data.get("graph") {
                            meta.insert("graph".to_string(), graph.clone());
                        }
                        meta
                    },
                }),
                _ => {
                    debug!("Unknown event type: {}", event.event_type);
                    None
                }
            })
            .collect()
    }
928}
929
/// Wire format of one event fetched from the external event store.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct EventStoreEvent {
    /// Monotonic event id; used as the polling cursor.
    pub id: u64,
    pub event_type: String,
    /// Payload fields ("subject", "predicate", "object", "graph", ...).
    pub data: HashMap<String, String>,
    pub metadata: HashMap<String, String>,
    pub timestamp: chrono::DateTime<chrono::Utc>,
    pub transaction_id: Option<String>,
    pub user: Option<String>,
}
941
/// Lightweight store fingerprint used by polling detection.
#[derive(Debug, Clone)]
struct StoreSnapshot {
    timestamp: chrono::DateTime<chrono::Utc>,
    triple_count: u64,
    graph_count: u64,
    /// Counts + last-modified fingerprint (see `take_snapshot`).
    checksum: String,
}
950
951struct DedupCache {
953 seen: Arc<RwLock<HashSet<String>>>,
954 window: Duration,
955 last_cleanup: Arc<RwLock<Instant>>,
956}
957
958impl DedupCache {
959 fn new(window: Duration) -> Self {
960 Self {
961 seen: Arc::new(RwLock::new(HashSet::new())),
962 window,
963 last_cleanup: Arc::new(RwLock::new(Instant::now())),
964 }
965 }
966
967 fn deduplicate(&mut self, changes: Vec<StoreChange>) -> Vec<StoreChange> {
968 changes
971 }
972}
973
/// Consumes stream events and fans them out to realtime subscribers
/// (WebSocket, SSE, webhooks, message queues) in filtered batches.
pub struct RealtimeUpdateManager {
    /// Active subscribers keyed by generated id.
    subscribers: Arc<RwLock<HashMap<String, UpdateSubscriber>>>,
    /// Source of stream events.
    consumer: Arc<RwLock<StreamConsumer>>,
    /// Global filters.
    /// NOTE(review): passed to `process_batch` but ignored there
    /// (`_filters`) — confirm intent.
    filters: Arc<RwLock<Vec<UpdateFilter>>>,
    /// Batching / filtering / webhook tuning knobs.
    config: UpdateManagerConfig,
    /// Running counters shared with the dispatch task.
    stats: Arc<RwLock<UpdateManagerStats>>,
}
987
/// One registered realtime subscriber.
#[derive(Debug)]
struct UpdateSubscriber {
    id: String,
    channel: UpdateChannel,
    /// Per-subscriber filters; empty means "receive everything".
    filters: Vec<UpdateFilter>,
    created_at: Instant,
    // NOTE(review): the two fields below are set at creation and never
    // updated in this file — confirm whether delivery should maintain them.
    last_update: Option<Instant>,
    update_count: u64,
}
998
/// Transport used to deliver notifications to a subscriber.
#[derive(Debug)]
pub enum UpdateChannel {
    WebSocket(mpsc::Sender<UpdateNotification>),
    ServerSentEvents(mpsc::Sender<UpdateNotification>),
    /// HTTP POST of the JSON notification with extra headers.
    Webhook {
        url: String,
        headers: HashMap<String, String>,
    },
    /// NOTE(review): not implemented in `send_to_subscriber` yet.
    MessageQueue { topic: String },
}
1014
/// Criteria a change must satisfy to reach a subscriber.
/// Absent criteria always pass; see `matches_filters`.
#[derive(Debug, Clone)]
pub struct UpdateFilter {
    /// Exact graph name to match.
    pub graph_filter: Option<String>,
    /// Regex applied to the change's subject (when present).
    pub subject_pattern: Option<regex::Regex>,
    /// Exact predicate to match.
    pub predicate_filter: Option<String>,
    /// NOTE(review): not consulted by `matches_filters` — confirm intent.
    pub change_types: Option<Vec<ChangeType>>,
}
1027
/// A batch of changes delivered to a subscriber as one message.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct UpdateNotification {
    pub id: String,
    pub timestamp: chrono::DateTime<chrono::Utc>,
    pub changes: Vec<ChangeNotification>,
    pub metadata: HashMap<String, String>,
}
1036
/// Flat wire representation of a single change.
/// Fields are optional because graph-level changes carry no triple.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ChangeNotification {
    /// e.g. "triple_added", "triple_removed", "graph_created", "unknown".
    pub change_type: String,
    pub subject: Option<String>,
    pub predicate: Option<String>,
    pub object: Option<String>,
    pub graph: Option<String>,
}
1046
/// Tuning parameters for the realtime update manager.
#[derive(Debug, Clone)]
pub struct UpdateManagerConfig {
    /// Hard cap on concurrent subscribers.
    pub max_subscribers: usize,
    /// Flush a batch once it reaches this many events.
    pub batch_size: usize,
    /// Flush a partial batch after this much time.
    pub batch_timeout: Duration,
    /// Whether per-subscriber filters are applied.
    pub enable_filtering: bool,
    /// NOTE(review): not consulted by `send_to_subscriber` — confirm intent.
    pub retry_webhooks: bool,
    /// Timeout for webhook HTTP deliveries.
    pub webhook_timeout: Duration,
}
1063
impl Default for UpdateManagerConfig {
    /// Defaults: 1000 subscribers, 100-event batches flushed within
    /// 50 ms, filtering on, webhook retry on, 5 s webhook timeout.
    fn default() -> Self {
        Self {
            max_subscribers: 1000,
            batch_size: 100,
            batch_timeout: Duration::from_millis(50),
            enable_filtering: true,
            retry_webhooks: true,
            webhook_timeout: Duration::from_secs(5),
        }
    }
}
1076
/// Running counters for the realtime update manager.
#[derive(Debug, Default, Clone)]
pub struct UpdateManagerStats {
    pub total_subscribers: usize,
    pub active_subscribers: usize,
    pub updates_sent: u64,
    /// NOTE(review): never incremented in this file — confirm intent.
    pub updates_filtered: u64,
    pub webhook_failures: u64,
    /// NOTE(review): never updated in this file — confirm intent.
    pub avg_latency_ms: f64,
}
1087
1088impl RealtimeUpdateManager {
1089 pub async fn new(
1091 consumer: Arc<RwLock<StreamConsumer>>,
1092 config: UpdateManagerConfig,
1093 ) -> Result<Self> {
1094 Ok(Self {
1095 subscribers: Arc::new(RwLock::new(HashMap::new())),
1096 consumer,
1097 filters: Arc::new(RwLock::new(Vec::new())),
1098 config,
1099 stats: Arc::new(RwLock::new(UpdateManagerStats::default())),
1100 })
1101 }
1102
    /// Starts the consume-and-dispatch loop in a background task.
    ///
    /// Events are pulled from the consumer with a 10 ms timeout so
    /// partial batches can still flush during quiet periods; batches
    /// flush when they reach `config.batch_size` or become older than
    /// `config.batch_timeout`.
    pub async fn start(&self) -> Result<()> {
        let consumer = self.consumer.clone();
        let subscribers = self.subscribers.clone();
        let filters = self.filters.clone();
        let config = self.config.clone();
        let stats = self.stats.clone();

        tokio::spawn(async move {
            let mut batch = Vec::new();
            let mut batch_timer = Instant::now();

            loop {
                // Note: `consumer.write().await` is awaited while the
                // timeout argument is evaluated, so only `consume()`
                // itself is bounded by the 10 ms timeout.
                match tokio::time::timeout(
                    Duration::from_millis(10),
                    consumer.write().await.consume(),
                )
                .await
                {
                    Ok(Ok(Some(event))) => {
                        batch.push(event);

                        // Flush when the batch is full or stale.
                        if batch.len() >= config.batch_size
                            || batch_timer.elapsed() > config.batch_timeout
                        {
                            Self::process_batch(
                                mem::take(&mut batch),
                                &subscribers,
                                &filters,
                                &config,
                                &stats,
                            )
                            .await;
                            batch_timer = Instant::now();
                        }
                    }
                    Ok(Ok(None)) => {
                        // No event available: flush a stale partial batch.
                        if !batch.is_empty() && batch_timer.elapsed() > config.batch_timeout {
                            Self::process_batch(
                                mem::take(&mut batch),
                                &subscribers,
                                &filters,
                                &config,
                                &stats,
                            )
                            .await;
                            batch_timer = Instant::now();
                        }
                    }
                    Ok(Err(e)) => {
                        error!("Consumer error: {}", e);
                    }
                    Err(_) => {
                        // Consume timed out: flush a stale partial batch.
                        if !batch.is_empty() && batch_timer.elapsed() > config.batch_timeout {
                            Self::process_batch(
                                mem::take(&mut batch),
                                &subscribers,
                                &filters,
                                &config,
                                &stats,
                            )
                            .await;
                            batch_timer = Instant::now();
                        }
                    }
                }
            }
        });

        Ok(())
    }
1178
    /// Fans one batch of stream events out to all current subscribers.
    ///
    /// Builds a single notification for the whole batch. When filtering
    /// is enabled and a subscriber has filters, a per-subscriber
    /// filtered copy is sent instead; subscribers whose filters match
    /// nothing are skipped.
    ///
    /// NOTE(review): only a read lock on `subscribers` is held, so
    /// `last_update`/`update_count` cannot be (and are not) maintained
    /// here — confirm whether that was intended.
    async fn process_batch(
        events: Vec<StreamEvent>,
        subscribers: &Arc<RwLock<HashMap<String, UpdateSubscriber>>>,
        _filters: &Arc<RwLock<Vec<UpdateFilter>>>,
        config: &UpdateManagerConfig,
        stats: &Arc<RwLock<UpdateManagerStats>>,
    ) {
        let notification = UpdateNotification {
            id: uuid::Uuid::new_v4().to_string(),
            timestamp: chrono::Utc::now(),
            changes: events.iter().map(Self::event_to_notification).collect(),
            metadata: HashMap::new(),
        };

        let subscribers_guard = subscribers.read().await;
        for (_id, subscriber) in subscribers_guard.iter() {
            if config.enable_filtering && !subscriber.filters.is_empty() {
                let filtered_changes: Vec<_> = notification
                    .changes
                    .iter()
                    .filter(|change| Self::matches_filters(change, &subscriber.filters))
                    .cloned()
                    .collect();

                if filtered_changes.is_empty() {
                    continue;
                }

                // Same envelope (id/timestamp), filtered change list.
                let filtered_notification = UpdateNotification {
                    changes: filtered_changes,
                    ..notification.clone()
                };

                Self::send_to_subscriber(subscriber, filtered_notification, config, stats).await;
            } else {
                Self::send_to_subscriber(subscriber, notification.clone(), config, stats).await;
            }
        }
    }
1220
1221 fn event_to_notification(event: &StreamEvent) -> ChangeNotification {
1223 match event {
1224 StreamEvent::TripleAdded {
1225 subject,
1226 predicate,
1227 object,
1228 graph,
1229 ..
1230 } => ChangeNotification {
1231 change_type: "triple_added".to_string(),
1232 subject: Some(subject.clone()),
1233 predicate: Some(predicate.clone()),
1234 object: Some(object.clone()),
1235 graph: graph.clone(),
1236 },
1237 StreamEvent::TripleRemoved {
1238 subject,
1239 predicate,
1240 object,
1241 graph,
1242 ..
1243 } => ChangeNotification {
1244 change_type: "triple_removed".to_string(),
1245 subject: Some(subject.clone()),
1246 predicate: Some(predicate.clone()),
1247 object: Some(object.clone()),
1248 graph: graph.clone(),
1249 },
1250 StreamEvent::GraphCreated { graph, .. } => ChangeNotification {
1251 change_type: "graph_created".to_string(),
1252 subject: None,
1253 predicate: None,
1254 object: None,
1255 graph: Some(graph.clone()),
1256 },
1257 _ => ChangeNotification {
1258 change_type: "unknown".to_string(),
1259 subject: None,
1260 predicate: None,
1261 object: None,
1262 graph: None,
1263 },
1264 }
1265 }
1266
1267 fn matches_filters(change: &ChangeNotification, filters: &[UpdateFilter]) -> bool {
1269 filters.iter().any(|filter| {
1270 if let Some(graph_filter) = &filter.graph_filter {
1272 if change.graph.as_ref() != Some(graph_filter) {
1273 return false;
1274 }
1275 }
1276
1277 if let Some(pattern) = &filter.subject_pattern {
1279 if let Some(subject) = &change.subject {
1280 if !pattern.is_match(subject) {
1281 return false;
1282 }
1283 }
1284 }
1285
1286 if let Some(pred_filter) = &filter.predicate_filter {
1288 if change.predicate.as_ref() != Some(pred_filter) {
1289 return false;
1290 }
1291 }
1292
1293 true
1294 })
1295 }
1296
1297 async fn send_to_subscriber(
1299 subscriber: &UpdateSubscriber,
1300 notification: UpdateNotification,
1301 _config: &UpdateManagerConfig,
1302 stats: &Arc<RwLock<UpdateManagerStats>>,
1303 ) {
1304 match &subscriber.channel {
1305 UpdateChannel::WebSocket(sender) => {
1306 if let Err(e) = sender.send(notification).await {
1307 warn!(
1308 "Failed to send to WebSocket subscriber {}: {}",
1309 subscriber.id, e
1310 );
1311 } else {
1312 stats.write().await.updates_sent += 1;
1313 }
1314 }
1315 UpdateChannel::Webhook { url, headers } => {
1316 let client = reqwest::Client::new();
1318 let mut request = client.post(url).json(¬ification);
1319
1320 for (key, value) in headers {
1322 request = request.header(key, value);
1323 }
1324
1325 match tokio::time::timeout(Duration::from_secs(5), request.send()).await {
1327 Ok(Ok(response)) => {
1328 if response.status().is_success() {
1329 stats.write().await.updates_sent += 1;
1330 debug!("Webhook delivered successfully to {}", url);
1331 } else {
1332 warn!(
1333 "Webhook delivery failed with status {}: {}",
1334 response.status(),
1335 url
1336 );
1337 }
1338 }
1339 Ok(Err(e)) => {
1340 warn!("Webhook delivery error for {}: {}", url, e);
1341 }
1342 Err(_) => {
1343 warn!("Webhook delivery timeout for {}", url);
1344 }
1345 }
1346 }
1347 _ => {
1348 warn!("Update channel not implemented yet");
1349 }
1350 }
1351 }
1352
1353 pub async fn subscribe(
1355 &self,
1356 channel: UpdateChannel,
1357 filters: Vec<UpdateFilter>,
1358 ) -> Result<String> {
1359 let mut subscribers = self.subscribers.write().await;
1360
1361 if subscribers.len() >= self.config.max_subscribers {
1362 return Err(anyhow!("Maximum subscriber limit reached"));
1363 }
1364
1365 let id = uuid::Uuid::new_v4().to_string();
1366 let subscriber = UpdateSubscriber {
1367 id: id.clone(),
1368 channel,
1369 filters,
1370 created_at: Instant::now(),
1371 last_update: None,
1372 update_count: 0,
1373 };
1374
1375 subscribers.insert(id.clone(), subscriber);
1376
1377 self.stats.write().await.total_subscribers = subscribers.len();
1378 self.stats.write().await.active_subscribers = subscribers.len();
1379
1380 Ok(id)
1381 }
1382
1383 pub async fn unsubscribe(&self, id: &str) -> Result<()> {
1385 let mut subscribers = self.subscribers.write().await;
1386 subscribers
1387 .remove(id)
1388 .ok_or_else(|| anyhow!("Subscriber not found"))?;
1389
1390 self.stats.write().await.total_subscribers = subscribers.len();
1391 self.stats.write().await.active_subscribers = subscribers.len();
1392
1393 Ok(())
1394 }
1395
1396 pub async fn get_stats(&self) -> UpdateManagerStats {
1398 self.stats.read().await.clone()
1399 }
1400}
1401
#[cfg(test)]
pub mod tests {
    use super::*;

    /// In-memory stand-in for a real RDF store, backed by a shared
    /// transaction-log position and a vector of log entries.
    pub struct MockRdfStore {
        pub log_position: Arc<RwLock<u64>>,
        pub changes: Arc<RwLock<Vec<TransactionLogEntry>>>,
    }

    #[async_trait::async_trait]
    impl RdfStore for MockRdfStore {
        async fn get_transaction_log_position(&self) -> Result<u64> {
            let position = self.log_position.read().await;
            Ok(*position)
        }

        async fn read_transaction_log(
            &self,
            from: u64,
            limit: usize,
        ) -> Result<Vec<TransactionLogEntry>> {
            // Collect up to `limit` entries whose position is at or past `from`.
            let log = self.changes.read().await;
            let mut selected = Vec::new();
            for entry in log.iter() {
                if selected.len() == limit {
                    break;
                }
                if entry.position >= from {
                    selected.push(entry.clone());
                }
            }
            Ok(selected)
        }

        async fn subscribe_changes(&self) -> Result<mpsc::Receiver<StoreChange>> {
            // The sender half is dropped immediately, so the returned
            // receiver is already closed — sufficient for tests that never
            // expect pushed changes.
            let (_tx, rx) = mpsc::channel(100);
            Ok(rx)
        }

        async fn get_statistics(&self) -> Result<StoreStatistics> {
            // Fixed, arbitrary figures for assertions against a known state.
            Ok(StoreStatistics {
                total_triples: 1000,
                total_graphs: 10,
                transaction_count: 100,
                last_modified: chrono::Utc::now(),
            })
        }

        async fn apply_patch(&self, _patch: &RdfPatch) -> Result<()> {
            Ok(())
        }

        async fn execute_update(&self, _update: &str) -> Result<()> {
            Ok(())
        }

        async fn query(&self, _query: &str) -> Result<QueryResult> {
            Ok(QueryResult { bindings: vec![] })
        }
    }

    #[tokio::test]
    async fn test_change_detection() {
        // Empty mock store: the detector must start cleanly and report
        // zero detected changes.
        let store = Arc::new(MockRdfStore {
            log_position: Arc::new(RwLock::new(0)),
            changes: Arc::new(RwLock::new(Vec::new())),
        });

        let producer = Arc::new(RwLock::new(
            StreamProducer::new(StreamConfig::memory()).await.unwrap(),
        ));

        let strategy = ChangeDetectionStrategy::TransactionLog {
            poll_interval: Duration::from_millis(100),
            batch_size: 100,
        };

        let detector =
            StoreChangeDetector::new(store, strategy, producer, ChangeDetectorConfig::default())
                .await
                .unwrap();

        detector.start().await.unwrap();

        let stats = detector.get_stats().await;
        assert_eq!(stats.changes_detected, 0);
    }
}