Skip to main content

oxirs_stream/
store_integration.rs

1//! # OxiRS Store Integration
2//!
3//! This module provides deep integration between oxirs-stream and oxirs-core store
4//! with change detection, real-time updates, and bi-directional synchronization.
5
6use anyhow::{anyhow, Result};
7use serde::{Deserialize, Serialize};
8use std::collections::{HashMap, HashSet};
9use std::mem;
10use std::sync::Arc;
11use std::time::{Duration, Instant};
12use tokio::sync::{broadcast, mpsc, RwLock};
13use tokio::time::interval;
14use tracing::{debug, error, info, warn};
15
16#[cfg(test)]
17use crate::StreamConfig;
18use crate::{EventMetadata, RdfPatch, StreamConsumer, StreamEvent, StreamProducer};
19
/// Store change detector for monitoring RDF store changes.
///
/// Watches an [`RdfStore`] using a configurable [`ChangeDetectionStrategy`],
/// buffers detected [`StoreChange`]s, and publishes them downstream through a
/// shared [`StreamProducer`]. Progress can be observed via `get_stats` and
/// the broadcast notifier exposed by `subscribe`.
pub struct StoreChangeDetector {
    /// Store connection (trait object, so any backend can be monitored)
    store: Arc<dyn RdfStore>,
    /// Change detection strategy
    strategy: ChangeDetectionStrategy,
    /// Stream producer for publishing changes
    producer: Arc<RwLock<StreamProducer>>,
    /// Change buffer for batching; drained periodically by the buffer flusher
    change_buffer: Arc<RwLock<Vec<StoreChange>>>,
    /// Configuration
    config: ChangeDetectorConfig,
    /// Statistics
    stats: Arc<RwLock<ChangeDetectorStats>>,
    /// Change event notifier (receivers are created on demand via `subscribe`)
    change_notifier: broadcast::Sender<StoreChangeEvent>,
}
37
/// RDF Store trait for abstraction.
///
/// Implementors expose transaction-log access, a change-subscription channel,
/// aggregate statistics, and basic patch/update/query operations, so the
/// detector can monitor any compliant backend.
#[async_trait::async_trait]
pub trait RdfStore: Send + Sync {
    /// Get current transaction log position
    async fn get_transaction_log_position(&self) -> Result<u64>;

    /// Read up to `limit` transaction log entries starting at position `from`
    async fn read_transaction_log(
        &self,
        from: u64,
        limit: usize,
    ) -> Result<Vec<TransactionLogEntry>>;

    /// Subscribe to store changes (used by trigger-based detection)
    async fn subscribe_changes(&self) -> Result<mpsc::Receiver<StoreChange>>;

    /// Get store statistics
    async fn get_statistics(&self) -> Result<StoreStatistics>;

    /// Apply a patch to the store
    async fn apply_patch(&self, patch: &RdfPatch) -> Result<()>;

    /// Execute SPARQL update
    async fn execute_update(&self, update: &str) -> Result<()>;

    /// Query store
    async fn query(&self, query: &str) -> Result<QueryResult>;
}
66
/// Change detection strategies.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ChangeDetectionStrategy {
    /// Tail the store's transaction log, polling every `poll_interval` and
    /// reading at most `batch_size` entries per poll
    TransactionLog {
        poll_interval: Duration,
        batch_size: usize,
    },
    /// Trigger-based detection via the store's change-subscription channel
    TriggerBased { trigger_types: Vec<TriggerType> },
    /// Polling-based detection by diffing periodic statistics snapshots
    Polling {
        poll_interval: Duration,
        snapshot_interval: Duration,
    },
    /// Event sourcing from an external event store at the given URL
    EventSourcing { event_store_url: String },
    /// Hybrid approach: run `primary` and start `fallback` if the primary
    /// produces no published changes
    Hybrid {
        primary: Box<ChangeDetectionStrategy>,
        fallback: Box<ChangeDetectionStrategy>,
    },
}
90
/// Trigger types for trigger-based detection.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum TriggerType {
    /// Fire on triple insertion
    Insert,
    /// Fire on triple deletion
    Delete,
    /// Fire on triple update
    Update,
    /// Fire on graph-level changes
    GraphChange,
    /// Fire on schema changes
    SchemaChange,
}
100
/// A single detected store change.
#[derive(Debug, Clone)]
pub struct StoreChange {
    /// Kind of mutation that occurred
    pub change_type: ChangeType,
    /// When the change happened (UTC)
    pub timestamp: chrono::DateTime<chrono::Utc>,
    /// Originating transaction, when known
    pub transaction_id: Option<String>,
    /// User who made the change, when recorded in the log metadata
    pub user: Option<String>,
    /// Triples touched by the change (empty for graph-level changes)
    pub affected_triples: Vec<Triple>,
    /// Free-form metadata; known keys include "graph", "user" and "event_id"
    pub metadata: HashMap<String, String>,
}
111
/// Change types.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ChangeType {
    /// One or more triples inserted
    TripleAdded,
    /// One or more triples removed
    TripleRemoved,
    /// Named graph created
    GraphCreated,
    /// Named graph deleted
    GraphDeleted,
    /// Graph contents cleared
    GraphCleared,
    /// Aggregate change detected from snapshot diffs (no per-triple detail)
    BulkUpdate,
    /// Schema-level change
    SchemaUpdate,
}
123
/// Plain string representation of an RDF triple (or quad, when `graph` is set).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
pub struct Triple {
    pub subject: String,
    pub predicate: String,
    pub object: String,
    /// Named graph; `None` presumably denotes the default graph — TODO confirm
    pub graph: Option<String>,
}
132
/// Transaction log entry.
#[derive(Debug, Clone)]
pub struct TransactionLogEntry {
    /// Log position; tailing resumes from `position + 1`
    pub position: u64,
    pub timestamp: chrono::DateTime<chrono::Utc>,
    pub transaction_id: String,
    /// Operations performed by this transaction, in order
    pub operations: Vec<LogOperation>,
    /// Entry metadata; a "user" key, when present, attributes the change
    pub metadata: HashMap<String, String>,
}
142
/// Log operation types.
#[derive(Debug, Clone)]
pub enum LogOperation {
    /// Triple inserted
    Add(Triple),
    /// Triple removed
    Remove(Triple),
    /// Named graph created (value is the graph name)
    CreateGraph(String),
    /// Named graph deleted (value is the graph name)
    DeleteGraph(String),
    /// Graph cleared; `None` presumably targets the default graph — TODO confirm
    ClearGraph(Option<String>),
}
152
/// Aggregate store statistics, used for snapshot-based change detection.
#[derive(Debug, Clone)]
pub struct StoreStatistics {
    pub total_triples: u64,
    pub total_graphs: u64,
    pub transaction_count: u64,
    pub last_modified: chrono::DateTime<chrono::Utc>,
}
161
/// Query result placeholder (variable-name -> value bindings per solution).
#[derive(Debug, Clone)]
pub struct QueryResult {
    pub bindings: Vec<HashMap<String, String>>,
}
167
/// Change detector configuration.
#[derive(Debug, Clone)]
pub struct ChangeDetectorConfig {
    /// Maximum buffer size before flushing
    pub buffer_size: usize,
    /// Buffer flush interval (period of the flusher task)
    pub flush_interval: Duration,
    /// Enable deduplication of buffered changes before publishing
    pub enable_deduplication: bool,
    /// Deduplication window (how long a change is remembered)
    pub dedup_window: Duration,
    /// Enable change compression
    pub enable_compression: bool,
    /// Minimum batch size for compression
    pub compression_threshold: usize,
}
184
185impl Default for ChangeDetectorConfig {
186    fn default() -> Self {
187        Self {
188            buffer_size: 1000,
189            flush_interval: Duration::from_millis(100),
190            enable_deduplication: true,
191            dedup_window: Duration::from_secs(60),
192            enable_compression: true,
193            compression_threshold: 100,
194        }
195    }
196}
197
/// Change detector statistics (monotonic counters unless noted).
#[derive(Debug, Default, Clone)]
pub struct ChangeDetectorStats {
    /// Changes observed by any detection strategy
    pub changes_detected: u64,
    /// Changes successfully published to the stream
    pub changes_published: u64,
    /// Changes dropped by deduplication before publishing
    pub changes_deduplicated: u64,
    /// Batches compressed before publishing
    pub batches_compressed: u64,
    /// Detection / publish errors
    pub errors: u64,
    /// Last transaction-log position processed (gauge)
    pub last_position: u64,
    /// Observed detection lag in milliseconds (gauge)
    pub lag_ms: u64,
}
209
/// Store change event notifications, broadcast to `subscribe`rs.
#[derive(Debug, Clone)]
pub enum StoreChangeEvent {
    /// Changes detected and buffered
    ChangesDetected { count: usize },
    /// Changes published to the stream
    ChangesPublished { count: usize },
    /// Error occurred during detection or publishing
    Error { message: String },
    /// Lag detected between store and stream
    LagDetected { lag_ms: u64 },
}
222
223impl StoreChangeDetector {
224    /// Create a new store change detector
225    pub async fn new(
226        store: Arc<dyn RdfStore>,
227        strategy: ChangeDetectionStrategy,
228        producer: Arc<RwLock<StreamProducer>>,
229        config: ChangeDetectorConfig,
230    ) -> Result<Self> {
231        let (tx, _) = broadcast::channel(1000);
232
233        Ok(Self {
234            store,
235            strategy,
236            producer,
237            change_buffer: Arc::new(RwLock::new(Vec::new())),
238            config,
239            stats: Arc::new(RwLock::new(ChangeDetectorStats::default())),
240            change_notifier: tx,
241        })
242    }
243
244    /// Start change detection
245    pub fn start(
246        &self,
247    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<()>> + Send + '_>> {
248        Box::pin(async move {
249            match &self.strategy {
250                ChangeDetectionStrategy::TransactionLog {
251                    poll_interval,
252                    batch_size,
253                } => {
254                    self.start_transaction_log_tailing(*poll_interval, *batch_size)
255                        .await
256                }
257                ChangeDetectionStrategy::TriggerBased { .. } => {
258                    self.start_trigger_based_detection().await
259                }
260                ChangeDetectionStrategy::Polling { poll_interval, .. } => {
261                    self.start_polling_detection(*poll_interval).await
262                }
263                ChangeDetectionStrategy::EventSourcing { .. } => self.start_event_sourcing().await,
264                ChangeDetectionStrategy::Hybrid {
265                    primary: _,
266                    fallback: _,
267                } => self.start_hybrid_detection().await,
268            }
269        })
270    }
271
272    /// Start transaction log tailing
273    async fn start_transaction_log_tailing(
274        &self,
275        poll_interval: Duration,
276        batch_size: usize,
277    ) -> Result<()> {
278        let store = self.store.clone();
279        let buffer = self.change_buffer.clone();
280        let stats = self.stats.clone();
281        let notifier = self.change_notifier.clone();
282
283        tokio::spawn(async move {
284            let mut interval = interval(poll_interval);
285            let mut last_position = 0u64;
286
287            loop {
288                interval.tick().await;
289
290                match store.read_transaction_log(last_position, batch_size).await {
291                    Ok(entries) => {
292                        if entries.is_empty() {
293                            continue;
294                        }
295
296                        let mut changes = Vec::new();
297
298                        for entry in entries {
299                            last_position = last_position.max(entry.position + 1);
300
301                            for op in entry.operations {
302                                let change = match op {
303                                    LogOperation::Add(triple) => StoreChange {
304                                        change_type: ChangeType::TripleAdded,
305                                        timestamp: entry.timestamp,
306                                        transaction_id: Some(entry.transaction_id.clone()),
307                                        user: entry.metadata.get("user").cloned(),
308                                        affected_triples: vec![triple],
309                                        metadata: entry.metadata.clone(),
310                                    },
311                                    LogOperation::Remove(triple) => StoreChange {
312                                        change_type: ChangeType::TripleRemoved,
313                                        timestamp: entry.timestamp,
314                                        transaction_id: Some(entry.transaction_id.clone()),
315                                        user: entry.metadata.get("user").cloned(),
316                                        affected_triples: vec![triple],
317                                        metadata: entry.metadata.clone(),
318                                    },
319                                    LogOperation::CreateGraph(graph) => StoreChange {
320                                        change_type: ChangeType::GraphCreated,
321                                        timestamp: entry.timestamp,
322                                        transaction_id: Some(entry.transaction_id.clone()),
323                                        user: entry.metadata.get("user").cloned(),
324                                        affected_triples: vec![],
325                                        metadata: {
326                                            let mut meta = entry.metadata.clone();
327                                            meta.insert("graph".to_string(), graph);
328                                            meta
329                                        },
330                                    },
331                                    LogOperation::DeleteGraph(graph) => StoreChange {
332                                        change_type: ChangeType::GraphDeleted,
333                                        timestamp: entry.timestamp,
334                                        transaction_id: Some(entry.transaction_id.clone()),
335                                        user: entry.metadata.get("user").cloned(),
336                                        affected_triples: vec![],
337                                        metadata: {
338                                            let mut meta = entry.metadata.clone();
339                                            meta.insert("graph".to_string(), graph);
340                                            meta
341                                        },
342                                    },
343                                    LogOperation::ClearGraph(graph) => StoreChange {
344                                        change_type: ChangeType::GraphCleared,
345                                        timestamp: entry.timestamp,
346                                        transaction_id: Some(entry.transaction_id.clone()),
347                                        user: entry.metadata.get("user").cloned(),
348                                        affected_triples: vec![],
349                                        metadata: {
350                                            let mut meta = entry.metadata.clone();
351                                            if let Some(g) = graph {
352                                                meta.insert("graph".to_string(), g);
353                                            }
354                                            meta
355                                        },
356                                    },
357                                };
358
359                                changes.push(change);
360                            }
361                        }
362
363                        // Update statistics
364                        {
365                            let mut stats_guard = stats.write().await;
366                            stats_guard.changes_detected += changes.len() as u64;
367                            stats_guard.last_position = last_position;
368                        }
369
370                        // Add to buffer
371                        buffer.write().await.extend(changes.clone());
372
373                        // Notify
374                        let _ = notifier.send(StoreChangeEvent::ChangesDetected {
375                            count: changes.len(),
376                        });
377
378                        debug!("Detected {} changes from transaction log", changes.len());
379                    }
380                    Err(e) => {
381                        error!("Failed to read transaction log: {}", e);
382                        stats.write().await.errors += 1;
383                        let _ = notifier.send(StoreChangeEvent::Error {
384                            message: e.to_string(),
385                        });
386                    }
387                }
388            }
389        });
390
391        // Start buffer flusher
392        self.start_buffer_flusher().await;
393
394        Ok(())
395    }
396
397    /// Start trigger-based detection
398    async fn start_trigger_based_detection(&self) -> Result<()> {
399        let mut receiver = self.store.subscribe_changes().await?;
400        let buffer = self.change_buffer.clone();
401        let stats = self.stats.clone();
402        let notifier = self.change_notifier.clone();
403
404        tokio::spawn(async move {
405            while let Some(change) = receiver.recv().await {
406                // Update statistics
407                stats.write().await.changes_detected += 1;
408
409                // Add to buffer
410                buffer.write().await.push(change);
411
412                // Notify
413                let _ = notifier.send(StoreChangeEvent::ChangesDetected { count: 1 });
414            }
415        });
416
417        // Start buffer flusher
418        self.start_buffer_flusher().await;
419
420        Ok(())
421    }
422
423    /// Start polling-based detection
424    async fn start_polling_detection(&self, poll_interval: Duration) -> Result<()> {
425        let store = self.store.clone();
426        let buffer = self.change_buffer.clone();
427        let stats = self.stats.clone();
428
429        // This would implement snapshot-based change detection
430        // by comparing periodic snapshots of the store
431
432        tokio::spawn(async move {
433            let mut interval = interval(poll_interval);
434            let mut last_snapshot: Option<StoreSnapshot> = None;
435
436            loop {
437                interval.tick().await;
438
439                // Get current snapshot
440                match Self::take_snapshot(&store).await {
441                    Ok(current_snapshot) => {
442                        if let Some(last) = &last_snapshot {
443                            // Compare snapshots and detect changes
444                            let changes = Self::compare_snapshots(last, &current_snapshot);
445
446                            if !changes.is_empty() {
447                                stats.write().await.changes_detected += changes.len() as u64;
448                                buffer.write().await.extend(changes);
449                            }
450                        }
451
452                        last_snapshot = Some(current_snapshot);
453                    }
454                    Err(e) => {
455                        error!("Failed to take snapshot: {}", e);
456                        stats.write().await.errors += 1;
457                    }
458                }
459            }
460        });
461
462        Ok(())
463    }
464
465    /// Start event sourcing
466    async fn start_event_sourcing(&self) -> Result<()> {
467        let ChangeDetectionStrategy::EventSourcing { event_store_url } = &self.strategy else {
468            return Err(anyhow!("Invalid strategy for event sourcing"));
469        };
470
471        let _store = self.store.clone();
472        let producer = self.producer.clone();
473        let stats = self.stats.clone();
474        let notifier = self.change_notifier.clone();
475        let event_store_url_clone = event_store_url.clone();
476
477        tokio::spawn(async move {
478            let mut interval = interval(Duration::from_secs(5));
479            let mut last_event_id = 0u64;
480
481            loop {
482                interval.tick().await;
483
484                // Connect to event store and fetch events
485                match Self::fetch_events_from_store(&event_store_url_clone, last_event_id).await {
486                    Ok(events) => {
487                        if !events.is_empty() {
488                            debug!("Fetched {} events from event store", events.len());
489
490                            // Convert events to store changes
491                            let changes = Self::convert_events_to_changes(events);
492
493                            // Update last event ID from metadata if available
494                            if let Some(last_change) = changes.last() {
495                                if let Some(event_id_str) = last_change.metadata.get("event_id") {
496                                    if let Ok(event_id) = event_id_str.parse::<u64>() {
497                                        last_event_id = event_id;
498                                    }
499                                }
500                            }
501
502                            // Convert to stream events and publish
503                            let stream_events = Self::convert_to_stream_events(changes);
504                            let count = stream_events.len();
505
506                            match producer.write().await.publish_batch(stream_events).await {
507                                Ok(_) => {
508                                    stats.write().await.changes_published += count as u64;
509                                    let _ =
510                                        notifier.send(StoreChangeEvent::ChangesPublished { count });
511                                }
512                                Err(e) => {
513                                    error!("Failed to publish events from event store: {}", e);
514                                    stats.write().await.errors += 1;
515                                }
516                            }
517                        }
518                    }
519                    Err(e) => {
520                        error!("Failed to fetch events from event store: {}", e);
521                        stats.write().await.errors += 1;
522                    }
523                }
524            }
525        });
526
527        info!("Started event sourcing from: {}", event_store_url);
528        Ok(())
529    }
530
531    /// Start hybrid detection
532    async fn start_hybrid_detection(&self) -> Result<()> {
533        let ChangeDetectionStrategy::Hybrid { primary, fallback } = &self.strategy else {
534            return Err(anyhow!("Invalid strategy for hybrid detection"));
535        };
536
537        let primary_detector = Self::new(
538            self.store.clone(),
539            *primary.clone(),
540            self.producer.clone(),
541            self.config.clone(),
542        )
543        .await?;
544
545        let fallback_detector = Self::new(
546            self.store.clone(),
547            *fallback.clone(),
548            self.producer.clone(),
549            self.config.clone(),
550        )
551        .await?;
552
553        let stats = self.stats.clone();
554        let notifier = self.change_notifier.clone();
555
556        // Start primary strategy
557        if let Err(e) = primary_detector.start().await {
558            error!("Primary strategy failed: {}", e);
559            stats.write().await.errors += 1;
560            let _ = notifier.send(StoreChangeEvent::Error {
561                message: format!("Primary strategy failed: {e}"),
562            });
563        }
564
565        // Start fallback strategy with monitoring
566        let fallback_stats = self.stats.clone();
567        let fallback_notifier = self.change_notifier.clone();
568
569        tokio::spawn(async move {
570            // Wait a bit to see if primary strategy is working
571            tokio::time::sleep(Duration::from_secs(30)).await;
572
573            // Check if primary strategy is producing events
574            let primary_events = fallback_stats.read().await.changes_published;
575
576            // If no events in the last 30 seconds, start fallback
577            if primary_events == 0 {
578                info!("Primary strategy not producing events, starting fallback");
579
580                if let Err(e) = fallback_detector
581                    .start_polling_detection(Duration::from_secs(5))
582                    .await
583                {
584                    error!("Fallback strategy failed: {}", e);
585                    fallback_stats.write().await.errors += 1;
586                    let _ = fallback_notifier.send(StoreChangeEvent::Error {
587                        message: format!("Fallback strategy failed: {e}"),
588                    });
589                }
590            }
591        });
592
593        info!("Started hybrid detection with primary and fallback strategies");
594        Ok(())
595    }
596
597    /// Start buffer flusher
598    async fn start_buffer_flusher(&self) {
599        let buffer = self.change_buffer.clone();
600        let producer = self.producer.clone();
601        let config = self.config.clone();
602        let stats = self.stats.clone();
603        let notifier = self.change_notifier.clone();
604
605        tokio::spawn(async move {
606            let mut interval = interval(config.flush_interval);
607            let mut dedup_cache = if config.enable_deduplication {
608                Some(DedupCache::new(config.dedup_window))
609            } else {
610                None
611            };
612
613            loop {
614                interval.tick().await;
615
616                let mut changes = {
617                    let mut buffer_guard = buffer.write().await;
618                    std::mem::take(&mut *buffer_guard)
619                };
620
621                if changes.is_empty() {
622                    continue;
623                }
624
625                // Apply deduplication if enabled
626                if let Some(cache) = &mut dedup_cache {
627                    let original_count = changes.len();
628                    changes = cache.deduplicate(changes);
629                    let dedup_count = original_count - changes.len();
630
631                    if dedup_count > 0 {
632                        stats.write().await.changes_deduplicated += dedup_count as u64;
633                        debug!("Deduplicated {} changes", dedup_count);
634                    }
635                }
636
637                // Convert to stream events
638                let events = Self::convert_to_stream_events(changes);
639                let count = events.len();
640
641                // Publish events
642                match producer.write().await.publish_batch(events).await {
643                    Ok(_) => {
644                        stats.write().await.changes_published += count as u64;
645                        let _ = notifier.send(StoreChangeEvent::ChangesPublished { count });
646                        debug!("Published {} changes to stream", count);
647                    }
648                    Err(e) => {
649                        error!("Failed to publish changes: {}", e);
650                        stats.write().await.errors += 1;
651                        let _ = notifier.send(StoreChangeEvent::Error {
652                            message: e.to_string(),
653                        });
654                    }
655                }
656            }
657        });
658    }
659
660    /// Convert store changes to stream events
661    fn convert_to_stream_events(changes: Vec<StoreChange>) -> Vec<StreamEvent> {
662        changes
663            .into_iter()
664            .flat_map(|change| {
665                let metadata = EventMetadata {
666                    event_id: uuid::Uuid::new_v4().to_string(),
667                    timestamp: change.timestamp,
668                    source: "store-change-detector".to_string(),
669                    user: change.user,
670                    context: change.transaction_id.clone(),
671                    caused_by: None,
672                    version: "1.0".to_string(),
673                    properties: change.metadata.clone(),
674                    checksum: None,
675                };
676
677                match change.change_type {
678                    ChangeType::TripleAdded => change
679                        .affected_triples
680                        .into_iter()
681                        .map(|triple| StreamEvent::TripleAdded {
682                            subject: triple.subject,
683                            predicate: triple.predicate,
684                            object: triple.object,
685                            graph: triple.graph,
686                            metadata: metadata.clone(),
687                        })
688                        .collect(),
689                    ChangeType::TripleRemoved => change
690                        .affected_triples
691                        .into_iter()
692                        .map(|triple| StreamEvent::TripleRemoved {
693                            subject: triple.subject,
694                            predicate: triple.predicate,
695                            object: triple.object,
696                            graph: triple.graph,
697                            metadata: metadata.clone(),
698                        })
699                        .collect(),
700                    ChangeType::GraphCreated => {
701                        if let Some(graph) = change.metadata.get("graph") {
702                            vec![StreamEvent::GraphCreated {
703                                graph: graph.clone(),
704                                metadata,
705                            }]
706                        } else {
707                            vec![]
708                        }
709                    }
710                    ChangeType::GraphDeleted => {
711                        if let Some(graph) = change.metadata.get("graph") {
712                            vec![StreamEvent::GraphDeleted {
713                                graph: graph.clone(),
714                                metadata,
715                            }]
716                        } else {
717                            vec![]
718                        }
719                    }
720                    ChangeType::GraphCleared => {
721                        vec![StreamEvent::GraphCleared {
722                            graph: change.metadata.get("graph").cloned(),
723                            metadata,
724                        }]
725                    }
726                    _ => vec![],
727                }
728            })
729            .collect()
730    }
731
732    /// Take a snapshot of the store
733    async fn take_snapshot(store: &Arc<dyn RdfStore>) -> Result<StoreSnapshot> {
734        let stats = store.get_statistics().await?;
735
736        // Generate a basic content hash by querying key metrics
737        let checksum = format!(
738            "{}-{}-{}",
739            stats.total_triples,
740            stats.total_graphs,
741            stats.last_modified.timestamp()
742        );
743
744        Ok(StoreSnapshot {
745            timestamp: chrono::Utc::now(),
746            triple_count: stats.total_triples,
747            graph_count: stats.total_graphs,
748            checksum,
749        })
750    }
751
752    /// Compare two snapshots to detect changes
753    fn compare_snapshots(old: &StoreSnapshot, new: &StoreSnapshot) -> Vec<StoreChange> {
754        let mut changes = Vec::new();
755
756        // Detect triple count changes
757        if new.triple_count != old.triple_count {
758            let change = if new.triple_count > old.triple_count {
759                StoreChange {
760                    change_type: ChangeType::BulkUpdate,
761                    timestamp: new.timestamp,
762                    transaction_id: None,
763                    user: Some("system".to_string()),
764                    affected_triples: vec![],
765                    metadata: {
766                        let mut meta = HashMap::new();
767                        meta.insert("change_type".to_string(), "triples_added".to_string());
768                        meta.insert("old_count".to_string(), old.triple_count.to_string());
769                        meta.insert("new_count".to_string(), new.triple_count.to_string());
770                        meta
771                    },
772                }
773            } else {
774                StoreChange {
775                    change_type: ChangeType::BulkUpdate,
776                    timestamp: new.timestamp,
777                    transaction_id: None,
778                    user: Some("system".to_string()),
779                    affected_triples: vec![],
780                    metadata: {
781                        let mut meta = HashMap::new();
782                        meta.insert("change_type".to_string(), "triples_removed".to_string());
783                        meta.insert("old_count".to_string(), old.triple_count.to_string());
784                        meta.insert("new_count".to_string(), new.triple_count.to_string());
785                        meta
786                    },
787                }
788            };
789            changes.push(change);
790        }
791
792        // Detect graph count changes
793        if new.graph_count != old.graph_count {
794            let change = if new.graph_count > old.graph_count {
795                StoreChange {
796                    change_type: ChangeType::GraphCreated,
797                    timestamp: new.timestamp,
798                    transaction_id: None,
799                    user: Some("system".to_string()),
800                    affected_triples: vec![],
801                    metadata: {
802                        let mut meta = HashMap::new();
803                        meta.insert("change_type".to_string(), "graphs_added".to_string());
804                        meta.insert("old_count".to_string(), old.graph_count.to_string());
805                        meta.insert("new_count".to_string(), new.graph_count.to_string());
806                        meta
807                    },
808                }
809            } else {
810                StoreChange {
811                    change_type: ChangeType::GraphDeleted,
812                    timestamp: new.timestamp,
813                    transaction_id: None,
814                    user: Some("system".to_string()),
815                    affected_triples: vec![],
816                    metadata: {
817                        let mut meta = HashMap::new();
818                        meta.insert("change_type".to_string(), "graphs_removed".to_string());
819                        meta.insert("old_count".to_string(), old.graph_count.to_string());
820                        meta.insert("new_count".to_string(), new.graph_count.to_string());
821                        meta
822                    },
823                }
824            };
825            changes.push(change);
826        }
827
828        changes
829    }
830
    /// Get statistics
    ///
    /// Returns a point-in-time clone of the detector's counters; the lock is
    /// released before returning.
    pub async fn get_stats(&self) -> ChangeDetectorStats {
        self.stats.read().await.clone()
    }
835
    /// Subscribe to change events
    ///
    /// Backed by a `tokio::sync::broadcast` channel: each receiver sees every
    /// event broadcast after its subscription is created.
    pub fn subscribe(&self) -> broadcast::Receiver<StoreChangeEvent> {
        self.change_notifier.subscribe()
    }
840
841    /// Fetch events from external event store
842    async fn fetch_events_from_store(
843        event_store_url: &str,
844        from_id: u64,
845    ) -> Result<Vec<EventStoreEvent>> {
846        // This would make HTTP requests to fetch events from external event store
847        // For now, simulate fetching events
848
849        use reqwest;
850
851        let client = reqwest::Client::new();
852        let url = format!("{event_store_url}/events?from={from_id}&limit=100");
853
854        match client.get(&url).send().await {
855            Ok(response) => {
856                if response.status().is_success() {
857                    match response.json::<Vec<EventStoreEvent>>().await {
858                        Ok(events) => Ok(events),
859                        Err(e) => {
860                            warn!("Failed to parse events from event store: {}", e);
861                            Ok(vec![]) // Return empty vec instead of error
862                        }
863                    }
864                } else {
865                    warn!("Event store returned status: {}", response.status());
866                    Ok(vec![])
867                }
868            }
869            Err(e) => {
870                debug!("Failed to connect to event store: {}", e);
871                Ok(vec![]) // Return empty vec for connection issues
872            }
873        }
874    }
875
876    /// Convert event store events to store changes
877    fn convert_events_to_changes(events: Vec<EventStoreEvent>) -> Vec<StoreChange> {
878        events
879            .into_iter()
880            .filter_map(|event| match event.event_type.as_str() {
881                "triple_added" => Some(StoreChange {
882                    change_type: ChangeType::TripleAdded,
883                    affected_triples: vec![Triple {
884                        subject: event.data.get("subject")?.clone(),
885                        predicate: event.data.get("predicate")?.clone(),
886                        object: event.data.get("object")?.clone(),
887                        graph: event.data.get("graph").cloned(),
888                    }],
889                    timestamp: event.timestamp,
890                    transaction_id: event.transaction_id,
891                    user: Some(event.user.unwrap_or_else(|| "system".to_string())),
892                    metadata: event.metadata,
893                }),
894                "triple_removed" => Some(StoreChange {
895                    change_type: ChangeType::TripleRemoved,
896                    affected_triples: vec![Triple {
897                        subject: event.data.get("subject")?.clone(),
898                        predicate: event.data.get("predicate")?.clone(),
899                        object: event.data.get("object")?.clone(),
900                        graph: event.data.get("graph").cloned(),
901                    }],
902                    timestamp: event.timestamp,
903                    transaction_id: event.transaction_id,
904                    user: Some(event.user.unwrap_or_else(|| "system".to_string())),
905                    metadata: event.metadata,
906                }),
907                "graph_created" => Some(StoreChange {
908                    change_type: ChangeType::GraphCreated,
909                    affected_triples: vec![],
910                    timestamp: event.timestamp,
911                    transaction_id: event.transaction_id,
912                    user: Some(event.user.unwrap_or_else(|| "system".to_string())),
913                    metadata: {
914                        let mut meta = event.metadata;
915                        if let Some(graph) = event.data.get("graph") {
916                            meta.insert("graph".to_string(), graph.clone());
917                        }
918                        meta
919                    },
920                }),
921                _ => {
922                    debug!("Unknown event type: {}", event.event_type);
923                    None
924                }
925            })
926            .collect()
927    }
928}
929
/// Event store event from external event store
///
/// Wire format for entries fetched over HTTP by `fetch_events_from_store`;
/// deserialized from the event store's JSON response.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct EventStoreEvent {
    /// Event id assigned by the event store
    pub id: u64,
    /// Event discriminator: "triple_added", "triple_removed", "graph_created"
    pub event_type: String,
    /// Payload; triple events carry "subject"/"predicate"/"object" and an
    /// optional "graph" key
    pub data: HashMap<String, String>,
    /// Free-form metadata propagated into the resulting `StoreChange`
    pub metadata: HashMap<String, String>,
    /// When the event occurred (UTC)
    pub timestamp: chrono::DateTime<chrono::Utc>,
    /// Originating transaction, when known
    pub transaction_id: Option<String>,
    /// User who caused the event; conversion substitutes "system" when absent
    pub user: Option<String>,
}
941
/// Store snapshot for polling-based detection
///
/// Captured periodically; `compare_snapshots` diffs two of these to derive
/// coarse-grained changes from counter deltas.
#[derive(Debug, Clone)]
struct StoreSnapshot {
    /// When the snapshot was taken (UTC)
    timestamp: chrono::DateTime<chrono::Utc>,
    /// Total triples in the store at snapshot time
    triple_count: u64,
    /// Total graphs in the store at snapshot time
    graph_count: u64,
    /// Content checksum — NOTE(review): not consulted by `compare_snapshots`;
    /// presumably intended for cheap content-equality checks. Confirm use
    /// before removing.
    checksum: String,
}
950
/// Deduplication cache
///
/// Tracks keys of recently observed changes so duplicates arriving within
/// `window` can be suppressed. State is behind `Arc<RwLock<…>>` so it can be
/// shared across tasks.
struct DedupCache {
    /// Keys of changes already seen in the current window
    seen: Arc<RwLock<HashSet<String>>>,
    /// Length of the deduplication window
    window: Duration,
    /// When the `seen` set was last cleared
    last_cleanup: Arc<RwLock<Instant>>,
}
957
958impl DedupCache {
959    fn new(window: Duration) -> Self {
960        Self {
961            seen: Arc::new(RwLock::new(HashSet::new())),
962            window,
963            last_cleanup: Arc::new(RwLock::new(Instant::now())),
964        }
965    }
966
967    fn deduplicate(&mut self, changes: Vec<StoreChange>) -> Vec<StoreChange> {
968        // This would implement proper deduplication logic
969        // For now, return all changes
970        changes
971    }
972}
973
/// Real-time update manager for pushing updates to subscribers
///
/// Consumes events from a stream, batches them into notifications, and fans
/// them out to registered subscribers over their chosen channels.
pub struct RealtimeUpdateManager {
    /// Update subscribers, keyed by generated subscriber id
    subscribers: Arc<RwLock<HashMap<String, UpdateSubscriber>>>,
    /// Stream consumer the background task drains
    consumer: Arc<RwLock<StreamConsumer>>,
    /// Global update filters — NOTE(review): not consulted by `process_batch`
    /// yet (only per-subscriber filters are applied)
    filters: Arc<RwLock<Vec<UpdateFilter>>>,
    /// Configuration
    config: UpdateManagerConfig,
    /// Statistics
    stats: Arc<RwLock<UpdateManagerStats>>,
}
987
/// Update subscriber
#[derive(Debug)]
struct UpdateSubscriber {
    /// Generated UUID identifying this subscription
    id: String,
    /// Delivery channel for notifications
    channel: UpdateChannel,
    /// Per-subscriber filters; when non-empty (and filtering is enabled),
    /// only matching changes are delivered
    filters: Vec<UpdateFilter>,
    /// When the subscription was created
    created_at: Instant,
    /// Last delivery time — NOTE(review): never written by the manager
    last_update: Option<Instant>,
    /// Number of deliveries — NOTE(review): never incremented by the manager
    update_count: u64,
}
998
/// Update channel types
///
/// How a notification physically reaches a subscriber.
#[derive(Debug)]
pub enum UpdateChannel {
    /// WebSocket connection; notifications are pushed through the sender
    WebSocket(mpsc::Sender<UpdateNotification>),
    /// Server-sent events — NOTE(review): delivery not implemented yet
    /// (falls into the catch-all arm of `send_to_subscriber`)
    ServerSentEvents(mpsc::Sender<UpdateNotification>),
    /// Webhook: notification POSTed as JSON to `url` with extra `headers`
    Webhook {
        url: String,
        headers: HashMap<String, String>,
    },
    /// Message queue — NOTE(review): delivery not implemented yet
    MessageQueue { topic: String },
}
1014
/// Update filter
///
/// Criteria within one filter are combined with AND; a change is delivered
/// when ANY filter accepts it (see `matches_filters`).
#[derive(Debug, Clone)]
pub struct UpdateFilter {
    /// Filter by graph (exact match)
    pub graph_filter: Option<String>,
    /// Filter by subject pattern (regex)
    pub subject_pattern: Option<regex::Regex>,
    /// Filter by predicate (exact match)
    pub predicate_filter: Option<String>,
    /// Filter by change type — NOTE(review): not yet consulted by
    /// `matches_filters`
    pub change_types: Option<Vec<ChangeType>>,
}
1027
/// Update notification
///
/// One batched message delivered to subscribers, wrapping every change
/// collected in a single batch window.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct UpdateNotification {
    /// Generated UUID for this notification
    pub id: String,
    /// When the notification was assembled (UTC)
    pub timestamp: chrono::DateTime<chrono::Utc>,
    /// Individual changes contained in the batch
    pub changes: Vec<ChangeNotification>,
    /// Extra metadata (currently always empty at the construction site)
    pub metadata: HashMap<String, String>,
}
1036
/// Change notification
///
/// Flattened, string-typed view of a `StreamEvent`, suitable for serializing
/// to external subscribers. Fields are `None` when not applicable to the
/// change type.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ChangeNotification {
    /// One of "triple_added", "triple_removed", "graph_created", "unknown"
    pub change_type: String,
    pub subject: Option<String>,
    pub predicate: Option<String>,
    pub object: Option<String>,
    pub graph: Option<String>,
}
1046
/// Update manager configuration
#[derive(Debug, Clone)]
pub struct UpdateManagerConfig {
    /// Maximum subscribers
    pub max_subscribers: usize,
    /// Update batch size
    pub batch_size: usize,
    /// Batch timeout
    pub batch_timeout: Duration,
    /// Enable filtering
    pub enable_filtering: bool,
    /// Retry failed webhooks — NOTE(review): not yet honoured by delivery code
    pub retry_webhooks: bool,
    /// Webhook timeout
    pub webhook_timeout: Duration,
}

impl Default for UpdateManagerConfig {
    /// Defaults tuned for low-latency fan-out: small 50 ms batches, filtering
    /// enabled, and a 5 s ceiling on webhook deliveries.
    fn default() -> Self {
        Self {
            // Delivery behaviour first...
            enable_filtering: true,
            retry_webhooks: true,
            webhook_timeout: Duration::from_secs(5),
            // ...then capacity/batching knobs.
            max_subscribers: 1000,
            batch_size: 100,
            batch_timeout: Duration::from_millis(50),
        }
    }
}
1076
/// Update manager statistics
#[derive(Debug, Default, Clone)]
pub struct UpdateManagerStats {
    /// Subscribers registered — NOTE(review): currently mirrors
    /// `active_subscribers` (both set to the current map size)
    pub total_subscribers: usize,
    /// Currently registered subscribers
    pub active_subscribers: usize,
    /// Notifications successfully delivered
    pub updates_sent: u64,
    /// Updates dropped by filters — NOTE(review): never incremented yet
    pub updates_filtered: u64,
    /// Failed webhook deliveries
    pub webhook_failures: u64,
    /// Average delivery latency in ms — NOTE(review): never updated yet
    pub avg_latency_ms: f64,
}
1087
1088impl RealtimeUpdateManager {
1089    /// Create a new realtime update manager
1090    pub async fn new(
1091        consumer: Arc<RwLock<StreamConsumer>>,
1092        config: UpdateManagerConfig,
1093    ) -> Result<Self> {
1094        Ok(Self {
1095            subscribers: Arc::new(RwLock::new(HashMap::new())),
1096            consumer,
1097            filters: Arc::new(RwLock::new(Vec::new())),
1098            config,
1099            stats: Arc::new(RwLock::new(UpdateManagerStats::default())),
1100        })
1101    }
1102
1103    /// Start processing updates
1104    pub async fn start(&self) -> Result<()> {
1105        let consumer = self.consumer.clone();
1106        let subscribers = self.subscribers.clone();
1107        let filters = self.filters.clone();
1108        let config = self.config.clone();
1109        let stats = self.stats.clone();
1110
1111        tokio::spawn(async move {
1112            let mut batch = Vec::new();
1113            let mut batch_timer = Instant::now();
1114
1115            loop {
1116                // Try to consume an event
1117                match tokio::time::timeout(
1118                    Duration::from_millis(10),
1119                    consumer.write().await.consume(),
1120                )
1121                .await
1122                {
1123                    Ok(Ok(Some(event))) => {
1124                        batch.push(event);
1125
1126                        // Check if batch is ready
1127                        if batch.len() >= config.batch_size
1128                            || batch_timer.elapsed() > config.batch_timeout
1129                        {
1130                            Self::process_batch(
1131                                mem::take(&mut batch),
1132                                &subscribers,
1133                                &filters,
1134                                &config,
1135                                &stats,
1136                            )
1137                            .await;
1138                            batch_timer = Instant::now();
1139                        }
1140                    }
1141                    Ok(Ok(None)) => {
1142                        // No event available
1143                        if !batch.is_empty() && batch_timer.elapsed() > config.batch_timeout {
1144                            Self::process_batch(
1145                                mem::take(&mut batch),
1146                                &subscribers,
1147                                &filters,
1148                                &config,
1149                                &stats,
1150                            )
1151                            .await;
1152                            batch_timer = Instant::now();
1153                        }
1154                    }
1155                    Ok(Err(e)) => {
1156                        error!("Consumer error: {}", e);
1157                    }
1158                    Err(_) => {
1159                        // Timeout - check batch
1160                        if !batch.is_empty() && batch_timer.elapsed() > config.batch_timeout {
1161                            Self::process_batch(
1162                                mem::take(&mut batch),
1163                                &subscribers,
1164                                &filters,
1165                                &config,
1166                                &stats,
1167                            )
1168                            .await;
1169                            batch_timer = Instant::now();
1170                        }
1171                    }
1172                }
1173            }
1174        });
1175
1176        Ok(())
1177    }
1178
1179    /// Process a batch of events
1180    async fn process_batch(
1181        events: Vec<StreamEvent>,
1182        subscribers: &Arc<RwLock<HashMap<String, UpdateSubscriber>>>,
1183        _filters: &Arc<RwLock<Vec<UpdateFilter>>>,
1184        config: &UpdateManagerConfig,
1185        stats: &Arc<RwLock<UpdateManagerStats>>,
1186    ) {
1187        let notification = UpdateNotification {
1188            id: uuid::Uuid::new_v4().to_string(),
1189            timestamp: chrono::Utc::now(),
1190            changes: events.iter().map(Self::event_to_notification).collect(),
1191            metadata: HashMap::new(),
1192        };
1193
1194        let subscribers_guard = subscribers.read().await;
1195        for (_id, subscriber) in subscribers_guard.iter() {
1196            // Apply filters if enabled
1197            if config.enable_filtering && !subscriber.filters.is_empty() {
1198                let filtered_changes: Vec<_> = notification
1199                    .changes
1200                    .iter()
1201                    .filter(|change| Self::matches_filters(change, &subscriber.filters))
1202                    .cloned()
1203                    .collect();
1204
1205                if filtered_changes.is_empty() {
1206                    continue;
1207                }
1208
1209                let filtered_notification = UpdateNotification {
1210                    changes: filtered_changes,
1211                    ..notification.clone()
1212                };
1213
1214                Self::send_to_subscriber(subscriber, filtered_notification, config, stats).await;
1215            } else {
1216                Self::send_to_subscriber(subscriber, notification.clone(), config, stats).await;
1217            }
1218        }
1219    }
1220
1221    /// Convert stream event to change notification
1222    fn event_to_notification(event: &StreamEvent) -> ChangeNotification {
1223        match event {
1224            StreamEvent::TripleAdded {
1225                subject,
1226                predicate,
1227                object,
1228                graph,
1229                ..
1230            } => ChangeNotification {
1231                change_type: "triple_added".to_string(),
1232                subject: Some(subject.clone()),
1233                predicate: Some(predicate.clone()),
1234                object: Some(object.clone()),
1235                graph: graph.clone(),
1236            },
1237            StreamEvent::TripleRemoved {
1238                subject,
1239                predicate,
1240                object,
1241                graph,
1242                ..
1243            } => ChangeNotification {
1244                change_type: "triple_removed".to_string(),
1245                subject: Some(subject.clone()),
1246                predicate: Some(predicate.clone()),
1247                object: Some(object.clone()),
1248                graph: graph.clone(),
1249            },
1250            StreamEvent::GraphCreated { graph, .. } => ChangeNotification {
1251                change_type: "graph_created".to_string(),
1252                subject: None,
1253                predicate: None,
1254                object: None,
1255                graph: Some(graph.clone()),
1256            },
1257            _ => ChangeNotification {
1258                change_type: "unknown".to_string(),
1259                subject: None,
1260                predicate: None,
1261                object: None,
1262                graph: None,
1263            },
1264        }
1265    }
1266
1267    /// Check if change matches filters
1268    fn matches_filters(change: &ChangeNotification, filters: &[UpdateFilter]) -> bool {
1269        filters.iter().any(|filter| {
1270            // Check graph filter
1271            if let Some(graph_filter) = &filter.graph_filter {
1272                if change.graph.as_ref() != Some(graph_filter) {
1273                    return false;
1274                }
1275            }
1276
1277            // Check subject pattern
1278            if let Some(pattern) = &filter.subject_pattern {
1279                if let Some(subject) = &change.subject {
1280                    if !pattern.is_match(subject) {
1281                        return false;
1282                    }
1283                }
1284            }
1285
1286            // Check predicate filter
1287            if let Some(pred_filter) = &filter.predicate_filter {
1288                if change.predicate.as_ref() != Some(pred_filter) {
1289                    return false;
1290                }
1291            }
1292
1293            true
1294        })
1295    }
1296
1297    /// Send notification to subscriber
1298    async fn send_to_subscriber(
1299        subscriber: &UpdateSubscriber,
1300        notification: UpdateNotification,
1301        _config: &UpdateManagerConfig,
1302        stats: &Arc<RwLock<UpdateManagerStats>>,
1303    ) {
1304        match &subscriber.channel {
1305            UpdateChannel::WebSocket(sender) => {
1306                if let Err(e) = sender.send(notification).await {
1307                    warn!(
1308                        "Failed to send to WebSocket subscriber {}: {}",
1309                        subscriber.id, e
1310                    );
1311                } else {
1312                    stats.write().await.updates_sent += 1;
1313                }
1314            }
1315            UpdateChannel::Webhook { url, headers } => {
1316                // Implement webhook delivery using reqwest
1317                let client = reqwest::Client::new();
1318                let mut request = client.post(url).json(&notification);
1319
1320                // Add custom headers
1321                for (key, value) in headers {
1322                    request = request.header(key, value);
1323                }
1324
1325                // Send webhook with timeout
1326                match tokio::time::timeout(Duration::from_secs(5), request.send()).await {
1327                    Ok(Ok(response)) => {
1328                        if response.status().is_success() {
1329                            stats.write().await.updates_sent += 1;
1330                            debug!("Webhook delivered successfully to {}", url);
1331                        } else {
1332                            warn!(
1333                                "Webhook delivery failed with status {}: {}",
1334                                response.status(),
1335                                url
1336                            );
1337                        }
1338                    }
1339                    Ok(Err(e)) => {
1340                        warn!("Webhook delivery error for {}: {}", url, e);
1341                    }
1342                    Err(_) => {
1343                        warn!("Webhook delivery timeout for {}", url);
1344                    }
1345                }
1346            }
1347            _ => {
1348                warn!("Update channel not implemented yet");
1349            }
1350        }
1351    }
1352
1353    /// Subscribe for updates
1354    pub async fn subscribe(
1355        &self,
1356        channel: UpdateChannel,
1357        filters: Vec<UpdateFilter>,
1358    ) -> Result<String> {
1359        let mut subscribers = self.subscribers.write().await;
1360
1361        if subscribers.len() >= self.config.max_subscribers {
1362            return Err(anyhow!("Maximum subscriber limit reached"));
1363        }
1364
1365        let id = uuid::Uuid::new_v4().to_string();
1366        let subscriber = UpdateSubscriber {
1367            id: id.clone(),
1368            channel,
1369            filters,
1370            created_at: Instant::now(),
1371            last_update: None,
1372            update_count: 0,
1373        };
1374
1375        subscribers.insert(id.clone(), subscriber);
1376
1377        self.stats.write().await.total_subscribers = subscribers.len();
1378        self.stats.write().await.active_subscribers = subscribers.len();
1379
1380        Ok(id)
1381    }
1382
1383    /// Unsubscribe from updates
1384    pub async fn unsubscribe(&self, id: &str) -> Result<()> {
1385        let mut subscribers = self.subscribers.write().await;
1386        subscribers
1387            .remove(id)
1388            .ok_or_else(|| anyhow!("Subscriber not found"))?;
1389
1390        self.stats.write().await.total_subscribers = subscribers.len();
1391        self.stats.write().await.active_subscribers = subscribers.len();
1392
1393        Ok(())
1394    }
1395
    /// Get statistics
    ///
    /// Returns a point-in-time clone of the manager's counters; the lock is
    /// released before returning.
    pub async fn get_stats(&self) -> UpdateManagerStats {
        self.stats.read().await.clone()
    }
1400}
1401
#[cfg(test)]
pub mod tests {
    use super::*;

    // Mock RDF store for testing
    //
    // Exposed as `pub` so tests in other modules can reuse it. State sits
    // behind `RwLock`s so tests can mutate position/entries while the
    // detector polls.
    pub struct MockRdfStore {
        // Position returned by `get_transaction_log_position`
        pub log_position: Arc<RwLock<u64>>,
        // Entries served by `read_transaction_log`
        pub changes: Arc<RwLock<Vec<TransactionLogEntry>>>,
    }

    #[async_trait::async_trait]
    impl RdfStore for MockRdfStore {
        async fn get_transaction_log_position(&self) -> Result<u64> {
            Ok(*self.log_position.read().await)
        }

        // Returns up to `limit` stored entries at or after `from`.
        async fn read_transaction_log(
            &self,
            from: u64,
            limit: usize,
        ) -> Result<Vec<TransactionLogEntry>> {
            let changes = self.changes.read().await;
            Ok(changes
                .iter()
                .filter(|e| e.position >= from)
                .take(limit)
                .cloned()
                .collect())
        }

        // The sender half is dropped immediately, so the returned receiver
        // observes a closed channel and never yields a change.
        async fn subscribe_changes(&self) -> Result<mpsc::Receiver<StoreChange>> {
            let (_tx, rx) = mpsc::channel(100);
            Ok(rx)
        }

        // Fixed statistics so tests can make deterministic assertions.
        async fn get_statistics(&self) -> Result<StoreStatistics> {
            Ok(StoreStatistics {
                total_triples: 1000,
                total_graphs: 10,
                transaction_count: 100,
                last_modified: chrono::Utc::now(),
            })
        }

        // Mutation entry points are accepted but ignored by the mock.
        async fn apply_patch(&self, _patch: &RdfPatch) -> Result<()> {
            Ok(())
        }

        async fn execute_update(&self, _update: &str) -> Result<()> {
            Ok(())
        }

        async fn query(&self, _query: &str) -> Result<QueryResult> {
            Ok(QueryResult { bindings: vec![] })
        }
    }

    // Smoke test: a detector started against an empty mock store reports zero
    // detected changes immediately after startup.
    #[tokio::test]
    async fn test_change_detection() {
        let store = Arc::new(MockRdfStore {
            log_position: Arc::new(RwLock::new(0)),
            changes: Arc::new(RwLock::new(vec![])),
        });

        let stream_config = StreamConfig::memory();
        let producer = Arc::new(RwLock::new(
            StreamProducer::new(stream_config).await.unwrap(),
        ));

        let strategy = ChangeDetectionStrategy::TransactionLog {
            poll_interval: Duration::from_millis(100),
            batch_size: 100,
        };

        let detector =
            StoreChangeDetector::new(store, strategy, producer, ChangeDetectorConfig::default())
                .await
                .unwrap();

        // Start detection
        detector.start().await.unwrap();

        // Verify statistics
        let stats = detector.get_stats().await;
        assert_eq!(stats.changes_detected, 0);
    }
}