sqlitegraph 2.0.7

Embedded graph database with full ACID transactions, HNSW vector search, dual backend support, and comprehensive graph algorithms library
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
//! Publisher for in-process pub/sub system
//!
//! This module implements the event delivery mechanism using Rust channels.
//!
//! # Architecture
//!
//! - **Channel-based:** Uses `std::sync::mpsc` for in-process delivery
//! - **Multiple subscribers:** Each subscriber gets their own channel
//! - **Synchronous emit:** `emit()` is synchronous on the commit path (no background threads)
//! - **Best-effort delivery:** Events are dropped if the receiver is gone; channels are unbounded, so a send never blocks
//!
//! # Thread Safety
//!
//! The `Publisher` uses `Arc<Mutex<>>` to allow thread-safe access to the subscriber list.
//! This means multiple threads can subscribe/unsubscribe concurrently.

use crate::backend::native::v2::pubsub::{NodeMetadata, PubSubEvent, SubscriberId, SubscriptionFilter};
use std::sync::mpsc::{self, Receiver, Sender};
use std::sync::{Arc, Mutex};

/// Publishes events to subscribers via channels
///
/// The Publisher maintains a list of subscribers with their channels and filters.
/// When an event is emitted, it is delivered to all subscribers whose filters match.
///
/// # Best-Effort Delivery
///
/// This is a **best-effort** system:
/// - Each subscriber gets an unbounded `std::sync::mpsc` channel, so a send never
///   blocks and a channel is never "full"; a subscriber that does not drain its
///   receiver simply accumulates events in memory
/// - If a receiver has been dropped, the event is dropped
/// - No blocking, no retries, no guarantees
///
/// This design ensures that slow or dead subscribers cannot block the commit path.
///
/// # Thread Safety
///
/// Every method takes `&self` and the internal state is guarded by mutexes, so a
/// single `Publisher` can be shared between threads behind an `Arc<Publisher>`
/// (assuming the event and filter types are `Send` — TODO confirm). No extra
/// outer `Mutex` is required.
#[derive(Debug)]
pub struct Publisher {
    /// Channel senders for each subscriber
    ///
    /// Each tuple contains:
    /// - SubscriberId: Unique identifier for the subscriber
    /// - Sender: Channel end for sending events to this subscriber
    /// - SubscriptionFilter: Filter for which events this subscriber receives
    ///
    /// Entries are kept in subscription order; the mutex is held for the whole
    /// duration of an emit.
    senders: Arc<Mutex<Vec<(SubscriberId, Sender<PubSubEvent>, SubscriptionFilter)>>>,
    /// Next subscriber ID
    ///
    /// Starts at 1 and is incremented by one on every `subscribe()` call, so IDs
    /// are unique within this publisher. We don't use SubscriberId::new()
    /// here because we need to track IDs within the publisher context.
    next_id: Arc<Mutex<u64>>,
}

impl Publisher {
    /// Create a new publisher
    ///
    /// # Example
    ///
    /// ```rust
    /// use sqlitegraph::backend::native::v2::pubsub::Publisher;
    ///
    /// let publisher = Publisher::new();
    /// assert_eq!(publisher.subscriber_count(), 0);
    /// ```
    pub fn new() -> Self {
        Self {
            senders: Arc::new(Mutex::new(Vec::new())),
            next_id: Arc::new(Mutex::new(1)),
        }
    }

    /// Subscribe to events with a filter
    ///
    /// Creates a new channel for this subscriber and returns the receiver end.
    /// The subscriber will receive only events that match the provided filter.
    ///
    /// # Arguments
    ///
    /// * `filter` - Filter for which events this subscriber receives
    ///
    /// # Returns
    ///
    /// A tuple of:
    /// - `SubscriberId`: Unique identifier for this subscription
    /// - `Receiver<PubSubEvent>`: Channel end for receiving events
    ///
    /// # Example
    ///
    /// ```rust
    /// use sqlitegraph::backend::native::v2::pubsub::{Publisher, SubscriptionFilter};
    ///
    /// let publisher = Publisher::new();
    /// let filter = SubscriptionFilter::all();
    /// let (_id, _rx) = publisher.subscribe(filter);
    /// ```
    pub fn subscribe(&self, filter: SubscriptionFilter) -> (SubscriberId, Receiver<PubSubEvent>) {
        let (tx, rx) = mpsc::channel();
        let id = {
            let mut next = self.next_id.lock().unwrap();
            let id = SubscriberId::from_raw(*next);
            *next += 1;
            id
        };
        let mut senders = self.senders.lock().unwrap();
        senders.push((id, tx, filter));
        (id, rx)
    }

    /// Unsubscribe a subscriber
    ///
    /// Removes the subscriber's channel from the publisher.
    /// Any events in the subscriber's channel are lost when the channel is dropped.
    ///
    /// # Arguments
    ///
    /// * `id` - The subscriber ID to unsubscribe
    ///
    /// # Returns
    ///
    /// `true` if the subscriber existed and was removed, `false` if not found.
    ///
    /// # Example
    ///
    /// ```rust
    /// use sqlitegraph::backend::native::v2::pubsub::{Publisher, SubscriptionFilter};
    ///
    /// let publisher = Publisher::new();
    /// let filter = SubscriptionFilter::all();
    /// let (id, _rx) = publisher.subscribe(filter);
    ///
    /// assert!(publisher.unsubscribe(id));
    /// assert!(!publisher.unsubscribe(id)); // Already unsubscribed
    /// ```
    pub fn unsubscribe(&self, id: SubscriberId) -> bool {
        let mut senders = self.senders.lock().unwrap();
        let original_len = senders.len();
        senders.retain(|(sub_id, _, _)| *sub_id != id);
        senders.len() < original_len
    }

    /// Emit an event to all matching subscribers
    ///
    /// Iterates through all subscribers and sends the event to those whose
    /// filters match. The event is cloned for each subscriber.
    ///
    /// # Best-Effort Delivery
    ///
    /// - If a channel is full, the send fails silently
    /// - If a receiver has been dropped, the send fails silently
    /// - No blocking, no retries
    ///
    /// This ensures that a slow or dead subscriber cannot block the commit path.
    ///
    /// # Pattern-Based Subscriptions
    ///
    /// This method uses simple matching (ID-based only). For pattern-based
    /// subscriptions (kind_patterns, name_patterns), use `emit_with_metadata()`
    /// instead to provide node metadata for pattern matching.
    ///
    /// # Arguments
    ///
    /// * `event` - The event to emit
    ///
    /// # Example
    ///
    /// ```rust
    /// use sqlitegraph::backend::native::v2::pubsub::{Publisher, PubSubEvent, SubscriptionFilter};
    ///
    /// let publisher = Publisher::new();
    /// let filter = SubscriptionFilter::all();
    /// let (_id, rx) = publisher.subscribe(filter);
    ///
    /// publisher.emit(PubSubEvent::SnapshotCommitted { snapshot_id: 100 });
    /// ```
    pub fn emit(&self, event: PubSubEvent) {
        let senders = self.senders.lock().unwrap();
        for (_, sender, filter) in senders.iter() {
            // Check if event matches filter (simple matching, no pattern support)
            if filter.matches_simple(&event) {
                // Send, ignore errors (channel full/closed = best-effort)
                let _ = sender.send(event.clone());
            }
        }
    }

    /// Emit an event to all matching subscribers (with pattern support)
    ///
    /// This method supports pattern-based subscriptions by accepting node metadata
    /// for NodeChanged events. For other event types, metadata is ignored.
    ///
    /// # Pattern-Based Subscriptions
    ///
    /// When any subscriber has `kind_patterns` or `name_patterns` filters, the
    /// caller must provide node metadata for NodeChanged events. If metadata is
    /// not provided, pattern-based subscribers will not receive the event.
    ///
    /// # Arguments
    ///
    /// * `event` - The event to emit
    /// * `node_metadata` - Optional node metadata for pattern matching (only used for NodeChanged events)
    ///
    /// # Example
    ///
    /// ```rust
    /// use sqlitegraph::backend::native::v2::pubsub::{Publisher, PubSubEvent, SubscriptionFilter, NodeMetadata};
    ///
    /// let publisher = Publisher::new();
    /// let filter = SubscriptionFilter::kind_patterns(vec!["agent:*".to_string()]);
    /// let (_id, rx) = publisher.subscribe(filter);
    ///
    /// let metadata = NodeMetadata::new("agent:worker".to_string(), "agent-123".to_string());
    /// publisher.emit_with_metadata(
    ///     PubSubEvent::NodeChanged { node_id: 1, snapshot_id: 100 },
    ///     Some(&metadata)
    /// );
    /// ```
    pub fn emit_with_metadata(&self, event: PubSubEvent, node_metadata: Option<&NodeMetadata>) {
        let senders = self.senders.lock().unwrap();
        for (_, sender, filter) in senders.iter() {
            // Check if event matches filter (with pattern support)
            if filter.matches(&event, node_metadata) {
                // Send, ignore errors (channel full/closed = best-effort)
                let _ = sender.send(event.clone());
            }
        }
    }

    /// Get current subscriber count
    ///
    /// Returns the number of active subscribers.
    ///
    /// # Example
    ///
    /// ```rust
    /// use sqlitegraph::backend::native::v2::pubsub::{Publisher, SubscriptionFilter};
    ///
    /// let publisher = Publisher::new();
    /// assert_eq!(publisher.subscriber_count(), 0);
    ///
    /// let (_id1, _rx1) = publisher.subscribe(SubscriptionFilter::all());
    /// assert_eq!(publisher.subscriber_count(), 1);
    ///
    /// let (_id2, _rx2) = publisher.subscribe(SubscriptionFilter::all());
    /// assert_eq!(publisher.subscriber_count(), 2);
    /// ```
    pub fn subscriber_count(&self) -> usize {
        self.senders.lock().unwrap().len()
    }
}

impl Default for Publisher {
    fn default() -> Self {
        Self::new()
    }
}

// Unit tests for `Publisher`.
//
// `emit()` is synchronous, so by the time it returns every delivered event is
// already queued. The tests therefore read with the non-blocking `try_recv()`:
// a missing event fails the test immediately instead of hanging forever in
// `recv()` (the publisher keeps the sender alive), and "nothing else was
// delivered" is checked without sleeping.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::backend::native::v2::pubsub::{PubSubEvent, PubSubEventType, SubscriptionFilter};

    /// Assert that `rx` has no queued event while its sender is still alive.
    fn assert_empty(rx: &Receiver<PubSubEvent>) {
        assert!(matches!(
            rx.try_recv(),
            Err(std::sync::mpsc::TryRecvError::Empty)
        ));
    }

    #[test]
    fn test_publisher_creation() {
        let pubber = Publisher::new();
        assert_eq!(pubber.subscriber_count(), 0);
    }

    #[test]
    fn test_default_publisher() {
        let pubber = Publisher::default();
        assert_eq!(pubber.subscriber_count(), 0);
    }

    #[test]
    fn test_subscribe_unsubscribe() {
        let pubber = Publisher::new();
        let filter = SubscriptionFilter::all();

        // Subscribe
        let (id, _rx) = pubber.subscribe(filter.clone());
        assert_eq!(pubber.subscriber_count(), 1);

        // Unsubscribe
        assert!(pubber.unsubscribe(id));
        assert_eq!(pubber.subscriber_count(), 0);

        // Unsubscribe non-existent
        assert!(!pubber.unsubscribe(id));
    }

    #[test]
    fn test_multiple_subscribers() {
        let pubber = Publisher::new();
        let filter = SubscriptionFilter::all();

        let (_id1, _rx1) = pubber.subscribe(filter.clone());
        assert_eq!(pubber.subscriber_count(), 1);

        let (_id2, _rx2) = pubber.subscribe(filter.clone());
        assert_eq!(pubber.subscriber_count(), 2);

        let (_id3, _rx3) = pubber.subscribe(filter);
        assert_eq!(pubber.subscriber_count(), 3);
    }

    #[test]
    fn test_emit_to_single_subscriber() {
        let pubber = Publisher::new();
        let filter = SubscriptionFilter::all();
        let (_id, rx) = pubber.subscribe(filter);

        pubber.emit(PubSubEvent::SnapshotCommitted { snapshot_id: 100 });

        let received = rx.try_recv().expect("event should be queued after emit");
        assert_eq!(received.snapshot_id(), 100);
    }

    #[test]
    fn test_emit_to_multiple_subscribers() {
        let pubber = Publisher::new();
        let filter = SubscriptionFilter::all();

        let (_id1, rx1) = pubber.subscribe(filter.clone());
        let (_id2, rx2) = pubber.subscribe(filter);

        pubber.emit(PubSubEvent::SnapshotCommitted { snapshot_id: 200 });

        assert_eq!(rx1.try_recv().unwrap().snapshot_id(), 200);
        assert_eq!(rx2.try_recv().unwrap().snapshot_id(), 200);
    }

    #[test]
    fn test_filter_by_event_type() {
        let pubber = Publisher::new();
        let node_filter = SubscriptionFilter::event_types(vec![PubSubEventType::Node]);
        let (_id, rx) = pubber.subscribe(node_filter);

        pubber.emit(PubSubEvent::NodeChanged {
            node_id: 1,
            snapshot_id: 100,
        });
        pubber.emit(PubSubEvent::EdgeChanged {
            edge_id: 1,
            snapshot_id: 100,
        });

        // Should receive NodeChanged but not EdgeChanged
        let received = rx.try_recv().expect("node event should be queued");
        assert!(received.is_node_event());

        // Channel should be empty (EdgeChanged was filtered)
        assert_empty(&rx);
    }

    #[test]
    fn test_filter_by_specific_node() {
        let pubber = Publisher::new();
        let filter = SubscriptionFilter::nodes(vec![42, 43]);
        let (_id, rx) = pubber.subscribe(filter);

        pubber.emit(PubSubEvent::NodeChanged {
            node_id: 42,
            snapshot_id: 100,
        });
        pubber.emit(PubSubEvent::NodeChanged {
            node_id: 99,
            snapshot_id: 101,
        });

        // Should receive node 42 but not node 99
        let received = rx.try_recv().expect("event for node 42 should be queued");
        assert_eq!(received.snapshot_id(), 100);

        // Channel should be empty (node 99 was filtered)
        assert_empty(&rx);
    }

    #[test]
    fn test_best_effort_delivery() {
        let pubber = Publisher::new();
        let filter = SubscriptionFilter::all();
        let (_id, rx) = pubber.subscribe(filter);

        // Drop receiver - next send should not block/fail
        drop(rx);

        // This should not panic (best-effort)
        pubber.emit(PubSubEvent::SnapshotCommitted { snapshot_id: 100 });
    }

    #[test]
    fn test_mixed_filters() {
        let pubber = Publisher::new();

        // Subscriber 1: Only node events
        let node_filter = SubscriptionFilter::event_types(vec![PubSubEventType::Node]);
        let (_id1, rx1) = pubber.subscribe(node_filter);

        // Subscriber 2: Only edge events
        let edge_filter = SubscriptionFilter::event_types(vec![PubSubEventType::Edge]);
        let (_id2, rx2) = pubber.subscribe(edge_filter);

        // Subscriber 3: All events
        let all_filter = SubscriptionFilter::all();
        let (_id3, rx3) = pubber.subscribe(all_filter);

        pubber.emit(PubSubEvent::NodeChanged {
            node_id: 1,
            snapshot_id: 100,
        });
        pubber.emit(PubSubEvent::EdgeChanged {
            edge_id: 1,
            snapshot_id: 100,
        });

        // Subscriber 1 should only receive node event
        assert!(rx1.try_recv().unwrap().is_node_event());
        assert_empty(&rx1);

        // Subscriber 2 should only receive edge event
        assert!(rx2.try_recv().unwrap().is_edge_event());
        assert_empty(&rx2);

        // Subscriber 3 should receive both events, in emit order
        assert!(rx3.try_recv().unwrap().is_node_event());
        assert!(rx3.try_recv().unwrap().is_edge_event());
        assert_empty(&rx3);
    }
}