Skip to main content

sqry_core/graph/unified/concurrent/
channel.rs

1//! `UpdateChannel`: Single-writer serialization for graph updates.
2//!
3//! This module implements a channel-based write serialization pattern
4//! that ensures all graph mutations are processed by a single writer thread.
5//!
6//! # Design
7//!
8//! Multiple producers can send update commands through the channel, but a
9//! single consumer thread processes them sequentially. This pattern:
10//!
11//! - Eliminates write contention on the graph
12//! - Guarantees FIFO ordering of updates
13//! - Enables batching of updates for efficiency
14//! - Simplifies reasoning about concurrent modifications
15//!
16//! # Usage
17//!
18//! ```rust,ignore
19//! use sqry_core::graph::unified::concurrent::channel::{UpdateChannel, GraphUpdate};
20//!
21//! let (channel, receiver) = UpdateChannel::new();
22//!
23//! // Send updates from multiple threads
24//! channel.send(GraphUpdate::AddNode { ... })?;
25//!
26//! // Process updates in single writer thread
27//! while let Ok(update) = receiver.recv() {
28//!     graph.apply(update);
29//! }
30//! ```
31
32use std::sync::Arc;
33use std::sync::mpsc::{self, Receiver, RecvError, SendError, Sender, TryRecvError};
34
35use super::super::edge::kind::EdgeKind;
36use super::super::file::FileId;
37use super::super::node::{NodeId, NodeKind};
38
39/// Graph update operations that can be sent through the channel.
40#[derive(Debug, Clone)]
41pub enum GraphUpdate {
42    /// Add a new node to the graph.
43    AddNode {
44        /// The kind of node to add.
45        kind: NodeKind,
46        /// Name of the node (will be interned).
47        name: String,
48        /// File the node belongs to.
49        file: FileId,
50    },
51
52    /// Remove a node from the graph.
53    RemoveNode {
54        /// ID of the node to remove.
55        node: NodeId,
56    },
57
58    /// Add an edge between nodes.
59    AddEdge {
60        /// Source node of the edge.
61        source: NodeId,
62        /// Target node of the edge.
63        target: NodeId,
64        /// Kind of relationship.
65        kind: EdgeKind,
66        /// File where the edge is defined.
67        file: FileId,
68    },
69
70    /// Remove an edge between nodes.
71    RemoveEdge {
72        /// Source node of the edge.
73        source: NodeId,
74        /// Target node of the edge.
75        target: NodeId,
76        /// Kind of relationship.
77        kind: EdgeKind,
78        /// File where the edge was defined.
79        file: FileId,
80    },
81
82    /// Clear all data for a specific file.
83    ClearFile {
84        /// File to clear.
85        file: FileId,
86    },
87
88    /// Trigger compaction of the delta buffer.
89    TriggerCompaction,
90
91    /// Shutdown the update processor.
92    Shutdown,
93}
94
95/// Error type for update channel operations.
96#[derive(Debug, Clone, PartialEq, Eq)]
97pub enum ChannelError {
98    /// The channel has been disconnected.
99    Disconnected,
100    /// The channel is full (for bounded channels).
101    Full,
102}
103
104impl std::fmt::Display for ChannelError {
105    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
106        match self {
107            Self::Disconnected => write!(f, "channel disconnected"),
108            Self::Full => write!(f, "channel full"),
109        }
110    }
111}
112
113impl std::error::Error for ChannelError {}
114
115impl<T> From<SendError<T>> for ChannelError {
116    fn from(_: SendError<T>) -> Self {
117        Self::Disconnected
118    }
119}
120
121impl From<RecvError> for ChannelError {
122    fn from(_: RecvError) -> Self {
123        Self::Disconnected
124    }
125}
126
127/// Handle for sending graph updates through the channel.
128///
129/// This handle is cheaply cloneable and can be shared across threads.
130/// All updates sent through any clone will be serialized by the receiver.
131#[derive(Debug, Clone)]
132pub struct UpdateChannel {
133    sender: Sender<GraphUpdate>,
134    /// Counter for tracking sent updates (for testing/debugging).
135    updates_sent: Arc<std::sync::atomic::AtomicU64>,
136}
137
138impl UpdateChannel {
139    /// Creates a new update channel pair.
140    ///
141    /// Returns the sender handle and receiver. The receiver should be
142    /// processed by a single thread to ensure serialization.
143    #[must_use]
144    pub fn new() -> (Self, UpdateReceiver) {
145        let (sender, receiver) = mpsc::channel();
146        let updates_sent = Arc::new(std::sync::atomic::AtomicU64::new(0));
147        let updates_received = Arc::new(std::sync::atomic::AtomicU64::new(0));
148
149        (
150            Self {
151                sender,
152                updates_sent: Arc::clone(&updates_sent),
153            },
154            UpdateReceiver {
155                receiver,
156                updates_received,
157            },
158        )
159    }
160
161    /// Creates a bounded update channel pair.
162    ///
163    /// A bounded channel will block senders when the buffer is full,
164    /// providing natural back-pressure.
165    #[must_use]
166    pub fn bounded(capacity: usize) -> (Self, UpdateReceiver) {
167        let (sender, receiver) = mpsc::sync_channel(capacity);
168        let updates_sent = Arc::new(std::sync::atomic::AtomicU64::new(0));
169        let updates_received = Arc::new(std::sync::atomic::AtomicU64::new(0));
170
171        (
172            Self {
173                sender: {
174                    // Convert sync_channel sender to regular sender interface
175                    // by wrapping in a new unbounded channel that forwards
176                    let (tx, rx) = mpsc::channel();
177                    std::thread::spawn(move || {
178                        while let Ok(update) = rx.recv() {
179                            if sender.send(update).is_err() {
180                                break;
181                            }
182                        }
183                    });
184                    tx
185                },
186                updates_sent: Arc::clone(&updates_sent),
187            },
188            UpdateReceiver {
189                receiver,
190                updates_received,
191            },
192        )
193    }
194
195    /// Sends an update through the channel.
196    ///
197    /// # Errors
198    ///
199    /// Returns `ChannelError::Disconnected` if the receiver has been dropped.
200    pub fn send(&self, update: GraphUpdate) -> Result<(), ChannelError> {
201        self.sender.send(update)?;
202        self.updates_sent
203            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
204        Ok(())
205    }
206
207    /// Returns the number of updates sent through this channel.
208    #[must_use]
209    pub fn updates_sent(&self) -> u64 {
210        self.updates_sent.load(std::sync::atomic::Ordering::Relaxed)
211    }
212}
213
214impl Default for UpdateChannel {
215    fn default() -> Self {
216        Self::new().0
217    }
218}
219
220/// Receiver for processing graph updates.
221///
222/// This should be owned by a single thread that processes updates
223/// sequentially to ensure serialization.
224#[derive(Debug)]
225pub struct UpdateReceiver {
226    receiver: Receiver<GraphUpdate>,
227    /// Counter for tracking received updates.
228    updates_received: Arc<std::sync::atomic::AtomicU64>,
229}
230
231impl UpdateReceiver {
232    /// Receives the next update, blocking until one is available.
233    ///
234    /// # Errors
235    ///
236    /// Returns `ChannelError::Disconnected` if all senders have been dropped.
237    pub fn recv(&self) -> Result<GraphUpdate, ChannelError> {
238        let update = self.receiver.recv()?;
239        self.updates_received
240            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
241        Ok(update)
242    }
243
244    /// Tries to receive the next update without blocking.
245    ///
246    /// Returns `None` if no update is currently available.
247    ///
248    /// # Errors
249    ///
250    /// Returns `ChannelError::Disconnected` if all senders have been dropped.
251    pub fn try_recv(&self) -> Result<Option<GraphUpdate>, ChannelError> {
252        match self.receiver.try_recv() {
253            Ok(update) => {
254                self.updates_received
255                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
256                Ok(Some(update))
257            }
258            Err(TryRecvError::Empty) => Ok(None),
259            Err(TryRecvError::Disconnected) => Err(ChannelError::Disconnected),
260        }
261    }
262
263    /// Returns an iterator over available updates.
264    ///
265    /// The iterator will block waiting for updates and complete
266    /// when all senders are dropped.
267    pub fn iter(&self) -> impl Iterator<Item = GraphUpdate> + '_ {
268        std::iter::from_fn(|| self.recv().ok())
269    }
270
271    /// Returns the number of updates received through this channel.
272    #[must_use]
273    pub fn updates_received(&self) -> u64 {
274        self.updates_received
275            .load(std::sync::atomic::Ordering::Relaxed)
276    }
277}
278
279/// Stats for monitoring channel throughput.
280#[derive(Debug, Clone, Copy, PartialEq, Eq)]
281pub struct ChannelStats {
282    /// Number of updates sent.
283    pub sent: u64,
284    /// Number of updates received.
285    pub received: u64,
286}
287
288impl ChannelStats {
289    /// Returns the number of updates in flight (sent but not yet received).
290    #[must_use]
291    pub fn in_flight(&self) -> u64 {
292        self.sent.saturating_sub(self.received)
293    }
294}
295
296#[cfg(test)]
297mod tests {
298    use super::*;
299    use std::thread;
300    use std::time::Duration;
301
302    #[test]
303    fn test_channel_new() {
304        let (sender, _receiver) = UpdateChannel::new();
305        assert_eq!(sender.updates_sent(), 0);
306    }
307
308    #[test]
309    fn test_channel_default() {
310        let sender: UpdateChannel = UpdateChannel::default();
311        assert_eq!(sender.updates_sent(), 0);
312    }
313
314    #[test]
315    fn test_send_receive() {
316        let (sender, receiver) = UpdateChannel::new();
317
318        let file = FileId::new(1);
319        sender
320            .send(GraphUpdate::ClearFile { file })
321            .expect("send failed");
322
323        let update = receiver.recv().expect("recv failed");
324        match update {
325            GraphUpdate::ClearFile { file: f } => assert_eq!(f, file),
326            _ => panic!("wrong update type"),
327        }
328    }
329
330    #[test]
331    fn test_updates_serialized() {
332        let (sender, receiver) = UpdateChannel::new();
333        let sender_clone = sender.clone();
334
335        // Send from multiple threads
336        let handle1 = thread::spawn(move || {
337            for i in 0..100 {
338                let file = FileId::new(i);
339                sender.send(GraphUpdate::ClearFile { file }).unwrap();
340            }
341        });
342
343        let handle2 = thread::spawn(move || {
344            for i in 100..200 {
345                let file = FileId::new(i);
346                sender_clone.send(GraphUpdate::ClearFile { file }).unwrap();
347            }
348        });
349
350        handle1.join().unwrap();
351        handle2.join().unwrap();
352
353        // All updates should be received
354        let mut received_updates = Vec::new();
355        while let Ok(Some(update)) = receiver.try_recv() {
356            received_updates.push(update);
357        }
358
359        assert_eq!(received_updates.len(), 200);
360        assert_eq!(receiver.updates_received(), 200);
361    }
362
363    #[test]
364    fn test_channel_ordering() {
365        let (sender, receiver) = UpdateChannel::new();
366
367        // Send updates in specific order
368        for i in 0..100 {
369            let file = FileId::new(i);
370            sender.send(GraphUpdate::ClearFile { file }).unwrap();
371        }
372
373        // Receive and verify FIFO order
374        for i in 0..100 {
375            let update = receiver.recv().unwrap();
376            match update {
377                GraphUpdate::ClearFile { file } => {
378                    assert_eq!(file.index(), i);
379                }
380                _ => panic!("wrong update type"),
381            }
382        }
383    }
384
385    #[test]
386    fn test_try_recv_empty() {
387        let (sender, receiver) = UpdateChannel::new();
388        // Keep sender alive so channel isn't disconnected
389        let _sender = sender;
390        assert!(receiver.try_recv().unwrap().is_none());
391    }
392
393    #[test]
394    fn test_try_recv_available() {
395        let (sender, receiver) = UpdateChannel::new();
396
397        let file = FileId::new(42);
398        sender.send(GraphUpdate::ClearFile { file }).unwrap();
399
400        let result = receiver.try_recv().unwrap();
401        assert!(result.is_some());
402    }
403
404    #[test]
405    fn test_disconnected_sender() {
406        let (sender, receiver) = UpdateChannel::new();
407        drop(sender);
408
409        let result = receiver.recv();
410        assert!(matches!(result, Err(ChannelError::Disconnected)));
411    }
412
413    #[test]
414    fn test_disconnected_receiver() {
415        let (sender, receiver) = UpdateChannel::new();
416        drop(receiver);
417
418        let file = FileId::new(1);
419        let result = sender.send(GraphUpdate::ClearFile { file });
420        assert!(matches!(result, Err(ChannelError::Disconnected)));
421    }
422
423    #[test]
424    fn test_update_kinds() {
425        let (sender, receiver) = UpdateChannel::new();
426
427        // Test all update types
428        sender
429            .send(GraphUpdate::AddNode {
430                kind: NodeKind::Function,
431                name: "test".to_string(),
432                file: FileId::new(1),
433            })
434            .unwrap();
435
436        sender
437            .send(GraphUpdate::RemoveNode {
438                node: NodeId::new(1, 0),
439            })
440            .unwrap();
441
442        sender
443            .send(GraphUpdate::AddEdge {
444                source: NodeId::new(1, 0),
445                target: NodeId::new(2, 0),
446                kind: EdgeKind::Calls {
447                    argument_count: 0,
448                    is_async: false,
449                },
450                file: FileId::new(1),
451            })
452            .unwrap();
453
454        sender
455            .send(GraphUpdate::RemoveEdge {
456                source: NodeId::new(1, 0),
457                target: NodeId::new(2, 0),
458                kind: EdgeKind::Calls {
459                    argument_count: 0,
460                    is_async: false,
461                },
462                file: FileId::new(1),
463            })
464            .unwrap();
465
466        sender
467            .send(GraphUpdate::ClearFile {
468                file: FileId::new(1),
469            })
470            .unwrap();
471
472        sender.send(GraphUpdate::TriggerCompaction).unwrap();
473        sender.send(GraphUpdate::Shutdown).unwrap();
474
475        assert_eq!(sender.updates_sent(), 7);
476
477        // Verify all received
478        let mut count = 0;
479        while receiver.try_recv().unwrap().is_some() {
480            count += 1;
481        }
482        assert_eq!(count, 7);
483    }
484
485    #[test]
486    fn test_channel_stats() {
487        let stats = ChannelStats {
488            sent: 100,
489            received: 75,
490        };
491        assert_eq!(stats.in_flight(), 25);
492    }
493
494    #[test]
495    fn test_channel_stats_saturating() {
496        // Edge case: received > sent (shouldn't happen, but handle gracefully)
497        let stats = ChannelStats {
498            sent: 50,
499            received: 75,
500        };
501        assert_eq!(stats.in_flight(), 0);
502    }
503
504    #[test]
505    fn test_iter() {
506        let (sender, receiver) = UpdateChannel::new();
507
508        for i in 0..10 {
509            sender
510                .send(GraphUpdate::ClearFile {
511                    file: FileId::new(i),
512                })
513                .unwrap();
514        }
515        drop(sender);
516
517        let updates: Vec<_> = receiver.iter().collect();
518        assert_eq!(updates.len(), 10);
519    }
520
521    #[test]
522    fn test_clone_sender() {
523        let (sender, receiver) = UpdateChannel::new();
524        let sender2 = sender.clone();
525
526        sender
527            .send(GraphUpdate::ClearFile {
528                file: FileId::new(1),
529            })
530            .unwrap();
531        sender2
532            .send(GraphUpdate::ClearFile {
533                file: FileId::new(2),
534            })
535            .unwrap();
536
537        // Both senders share the same counter
538        assert_eq!(sender.updates_sent(), 2);
539        assert_eq!(sender2.updates_sent(), 2);
540
541        let mut count = 0;
542        while receiver.try_recv().unwrap().is_some() {
543            count += 1;
544        }
545        assert_eq!(count, 2);
546    }
547
548    #[test]
549    fn test_channel_error_display() {
550        let err = ChannelError::Disconnected;
551        assert_eq!(format!("{err}"), "channel disconnected");
552
553        let err = ChannelError::Full;
554        assert_eq!(format!("{err}"), "channel full");
555    }
556
557    #[test]
558    fn test_concurrent_send_receive() {
559        let (sender, receiver) = UpdateChannel::new();
560        let sender_clone = sender.clone();
561
562        // Producer thread
563        let producer = thread::spawn(move || {
564            for i in 0..1000 {
565                sender
566                    .send(GraphUpdate::ClearFile {
567                        file: FileId::new(i),
568                    })
569                    .unwrap();
570            }
571        });
572
573        // Second producer
574        let producer2 = thread::spawn(move || {
575            for i in 1000..2000 {
576                sender_clone
577                    .send(GraphUpdate::ClearFile {
578                        file: FileId::new(i),
579                    })
580                    .unwrap();
581            }
582        });
583
584        // Consumer thread
585        let consumer = thread::spawn(move || {
586            let mut count = 0u64;
587            loop {
588                match receiver.try_recv() {
589                    Ok(Some(_)) => count += 1,
590                    Ok(None) => {
591                        if count >= 2000 {
592                            break;
593                        }
594                        thread::sleep(Duration::from_micros(10));
595                    }
596                    Err(ChannelError::Disconnected) => break,
597                    Err(_) => {}
598                }
599            }
600            count
601        });
602
603        producer.join().unwrap();
604        producer2.join().unwrap();
605        let received_count = consumer.join().unwrap();
606
607        assert_eq!(received_count, 2000);
608    }
609}