Skip to main content

sqry_core/graph/unified/concurrent/
channel.rs

1//! `UpdateChannel`: Single-writer serialization for graph updates.
2//!
3//! This module implements a channel-based write serialization pattern
4//! that ensures all graph mutations are processed by a single writer thread.
5//!
6//! # Design
7//!
8//! Multiple producers can send update commands through the channel, but a
9//! single consumer thread processes them sequentially. This pattern:
10//!
11//! - Eliminates write contention on the graph
12//! - Guarantees FIFO ordering of updates
13//! - Enables batching of updates for efficiency
14//! - Simplifies reasoning about concurrent modifications
15//!
16//! # Usage
17//!
18//! ```rust,ignore
19//! use sqry_core::graph::unified::concurrent::channel::{UpdateChannel, GraphUpdate};
20//!
21//! let (channel, receiver) = UpdateChannel::new();
22//!
23//! // Send updates from multiple threads
24//! channel.send(GraphUpdate::AddNode { ... })?;
25//!
26//! // Process updates in single writer thread
27//! while let Ok(update) = receiver.recv() {
28//!     graph.apply(update);
29//! }
30//! ```
31
32use std::sync::Arc;
33use std::sync::mpsc::{self, Receiver, RecvError, SendError, Sender, TryRecvError};
34
35use super::super::edge::kind::EdgeKind;
36#[cfg(test)]
37use super::super::edge::kind::ResolvedVia;
38use super::super::file::FileId;
39use super::super::node::{NodeId, NodeKind};
40
41/// Graph update operations that can be sent through the channel.
42#[derive(Debug, Clone)]
43pub enum GraphUpdate {
44    /// Add a new node to the graph.
45    AddNode {
46        /// The kind of node to add.
47        kind: NodeKind,
48        /// Name of the node (will be interned).
49        name: String,
50        /// File the node belongs to.
51        file: FileId,
52    },
53
54    /// Remove a node from the graph.
55    RemoveNode {
56        /// ID of the node to remove.
57        node: NodeId,
58    },
59
60    /// Add an edge between nodes.
61    AddEdge {
62        /// Source node of the edge.
63        source: NodeId,
64        /// Target node of the edge.
65        target: NodeId,
66        /// Kind of relationship.
67        kind: EdgeKind,
68        /// File where the edge is defined.
69        file: FileId,
70    },
71
72    /// Remove an edge between nodes.
73    RemoveEdge {
74        /// Source node of the edge.
75        source: NodeId,
76        /// Target node of the edge.
77        target: NodeId,
78        /// Kind of relationship.
79        kind: EdgeKind,
80        /// File where the edge was defined.
81        file: FileId,
82    },
83
84    /// Clear all data for a specific file.
85    ClearFile {
86        /// File to clear.
87        file: FileId,
88    },
89
90    /// Trigger compaction of the delta buffer.
91    TriggerCompaction,
92
93    /// Shutdown the update processor.
94    Shutdown,
95}
96
97/// Error type for update channel operations.
98#[derive(Debug, Clone, PartialEq, Eq)]
99pub enum ChannelError {
100    /// The channel has been disconnected.
101    Disconnected,
102    /// The channel is full (for bounded channels).
103    Full,
104}
105
106impl std::fmt::Display for ChannelError {
107    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
108        match self {
109            Self::Disconnected => write!(f, "channel disconnected"),
110            Self::Full => write!(f, "channel full"),
111        }
112    }
113}
114
115impl std::error::Error for ChannelError {}
116
117impl<T> From<SendError<T>> for ChannelError {
118    fn from(_: SendError<T>) -> Self {
119        Self::Disconnected
120    }
121}
122
123impl From<RecvError> for ChannelError {
124    fn from(_: RecvError) -> Self {
125        Self::Disconnected
126    }
127}
128
129/// Handle for sending graph updates through the channel.
130///
131/// This handle is cheaply cloneable and can be shared across threads.
132/// All updates sent through any clone will be serialized by the receiver.
133#[derive(Debug, Clone)]
134pub struct UpdateChannel {
135    sender: Sender<GraphUpdate>,
136    /// Counter for tracking sent updates (for testing/debugging).
137    updates_sent: Arc<std::sync::atomic::AtomicU64>,
138}
139
140impl UpdateChannel {
141    /// Creates a new update channel pair.
142    ///
143    /// Returns the sender handle and receiver. The receiver should be
144    /// processed by a single thread to ensure serialization.
145    #[must_use]
146    pub fn new() -> (Self, UpdateReceiver) {
147        let (sender, receiver) = mpsc::channel();
148        let updates_sent = Arc::new(std::sync::atomic::AtomicU64::new(0));
149        let updates_received = Arc::new(std::sync::atomic::AtomicU64::new(0));
150
151        (
152            Self {
153                sender,
154                updates_sent: Arc::clone(&updates_sent),
155            },
156            UpdateReceiver {
157                receiver,
158                updates_received,
159            },
160        )
161    }
162
163    /// Creates a bounded update channel pair.
164    ///
165    /// A bounded channel will block senders when the buffer is full,
166    /// providing natural back-pressure.
167    #[must_use]
168    pub fn bounded(capacity: usize) -> (Self, UpdateReceiver) {
169        let (sender, receiver) = mpsc::sync_channel(capacity);
170        let updates_sent = Arc::new(std::sync::atomic::AtomicU64::new(0));
171        let updates_received = Arc::new(std::sync::atomic::AtomicU64::new(0));
172
173        (
174            Self {
175                sender: {
176                    // Convert sync_channel sender to regular sender interface
177                    // by wrapping in a new unbounded channel that forwards
178                    let (tx, rx) = mpsc::channel();
179                    std::thread::spawn(move || {
180                        while let Ok(update) = rx.recv() {
181                            if sender.send(update).is_err() {
182                                break;
183                            }
184                        }
185                    });
186                    tx
187                },
188                updates_sent: Arc::clone(&updates_sent),
189            },
190            UpdateReceiver {
191                receiver,
192                updates_received,
193            },
194        )
195    }
196
197    /// Sends an update through the channel.
198    ///
199    /// # Errors
200    ///
201    /// Returns `ChannelError::Disconnected` if the receiver has been dropped.
202    pub fn send(&self, update: GraphUpdate) -> Result<(), ChannelError> {
203        self.sender.send(update)?;
204        self.updates_sent
205            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
206        Ok(())
207    }
208
209    /// Returns the number of updates sent through this channel.
210    #[must_use]
211    pub fn updates_sent(&self) -> u64 {
212        self.updates_sent.load(std::sync::atomic::Ordering::Relaxed)
213    }
214}
215
216impl Default for UpdateChannel {
217    fn default() -> Self {
218        Self::new().0
219    }
220}
221
222/// Receiver for processing graph updates.
223///
224/// This should be owned by a single thread that processes updates
225/// sequentially to ensure serialization.
226#[derive(Debug)]
227pub struct UpdateReceiver {
228    receiver: Receiver<GraphUpdate>,
229    /// Counter for tracking received updates.
230    updates_received: Arc<std::sync::atomic::AtomicU64>,
231}
232
233impl UpdateReceiver {
234    /// Receives the next update, blocking until one is available.
235    ///
236    /// # Errors
237    ///
238    /// Returns `ChannelError::Disconnected` if all senders have been dropped.
239    pub fn recv(&self) -> Result<GraphUpdate, ChannelError> {
240        let update = self.receiver.recv()?;
241        self.updates_received
242            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
243        Ok(update)
244    }
245
246    /// Tries to receive the next update without blocking.
247    ///
248    /// Returns `None` if no update is currently available.
249    ///
250    /// # Errors
251    ///
252    /// Returns `ChannelError::Disconnected` if all senders have been dropped.
253    pub fn try_recv(&self) -> Result<Option<GraphUpdate>, ChannelError> {
254        match self.receiver.try_recv() {
255            Ok(update) => {
256                self.updates_received
257                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
258                Ok(Some(update))
259            }
260            Err(TryRecvError::Empty) => Ok(None),
261            Err(TryRecvError::Disconnected) => Err(ChannelError::Disconnected),
262        }
263    }
264
265    /// Returns an iterator over available updates.
266    ///
267    /// The iterator will block waiting for updates and complete
268    /// when all senders are dropped.
269    pub fn iter(&self) -> impl Iterator<Item = GraphUpdate> + '_ {
270        std::iter::from_fn(|| self.recv().ok())
271    }
272
273    /// Returns the number of updates received through this channel.
274    #[must_use]
275    pub fn updates_received(&self) -> u64 {
276        self.updates_received
277            .load(std::sync::atomic::Ordering::Relaxed)
278    }
279}
280
281/// Stats for monitoring channel throughput.
282#[derive(Debug, Clone, Copy, PartialEq, Eq)]
283pub struct ChannelStats {
284    /// Number of updates sent.
285    pub sent: u64,
286    /// Number of updates received.
287    pub received: u64,
288}
289
290impl ChannelStats {
291    /// Returns the number of updates in flight (sent but not yet received).
292    #[must_use]
293    pub fn in_flight(&self) -> u64 {
294        self.sent.saturating_sub(self.received)
295    }
296}
297
298#[cfg(test)]
299mod tests {
300    use super::*;
301    use std::thread;
302    use std::time::Duration;
303
304    #[test]
305    fn test_channel_new() {
306        let (sender, _receiver) = UpdateChannel::new();
307        assert_eq!(sender.updates_sent(), 0);
308    }
309
310    #[test]
311    fn test_channel_default() {
312        let sender: UpdateChannel = UpdateChannel::default();
313        assert_eq!(sender.updates_sent(), 0);
314    }
315
316    #[test]
317    fn test_send_receive() {
318        let (sender, receiver) = UpdateChannel::new();
319
320        let file = FileId::new(1);
321        sender
322            .send(GraphUpdate::ClearFile { file })
323            .expect("send failed");
324
325        let update = receiver.recv().expect("recv failed");
326        match update {
327            GraphUpdate::ClearFile { file: f } => assert_eq!(f, file),
328            _ => panic!("wrong update type"),
329        }
330    }
331
332    #[test]
333    fn test_updates_serialized() {
334        let (sender, receiver) = UpdateChannel::new();
335        let sender_clone = sender.clone();
336
337        // Send from multiple threads
338        let handle1 = thread::spawn(move || {
339            for i in 0..100 {
340                let file = FileId::new(i);
341                sender.send(GraphUpdate::ClearFile { file }).unwrap();
342            }
343        });
344
345        let handle2 = thread::spawn(move || {
346            for i in 100..200 {
347                let file = FileId::new(i);
348                sender_clone.send(GraphUpdate::ClearFile { file }).unwrap();
349            }
350        });
351
352        handle1.join().unwrap();
353        handle2.join().unwrap();
354
355        // All updates should be received
356        let mut received_updates = Vec::new();
357        while let Ok(Some(update)) = receiver.try_recv() {
358            received_updates.push(update);
359        }
360
361        assert_eq!(received_updates.len(), 200);
362        assert_eq!(receiver.updates_received(), 200);
363    }
364
365    #[test]
366    fn test_channel_ordering() {
367        let (sender, receiver) = UpdateChannel::new();
368
369        // Send updates in specific order
370        for i in 0..100 {
371            let file = FileId::new(i);
372            sender.send(GraphUpdate::ClearFile { file }).unwrap();
373        }
374
375        // Receive and verify FIFO order
376        for i in 0..100 {
377            let update = receiver.recv().unwrap();
378            match update {
379                GraphUpdate::ClearFile { file } => {
380                    assert_eq!(file.index(), i);
381                }
382                _ => panic!("wrong update type"),
383            }
384        }
385    }
386
387    #[test]
388    fn test_try_recv_empty() {
389        let (sender, receiver) = UpdateChannel::new();
390        // Keep sender alive so channel isn't disconnected
391        let _sender = sender;
392        assert!(receiver.try_recv().unwrap().is_none());
393    }
394
395    #[test]
396    fn test_try_recv_available() {
397        let (sender, receiver) = UpdateChannel::new();
398
399        let file = FileId::new(42);
400        sender.send(GraphUpdate::ClearFile { file }).unwrap();
401
402        let result = receiver.try_recv().unwrap();
403        assert!(result.is_some());
404    }
405
406    #[test]
407    fn test_disconnected_sender() {
408        let (sender, receiver) = UpdateChannel::new();
409        drop(sender);
410
411        let result = receiver.recv();
412        assert!(matches!(result, Err(ChannelError::Disconnected)));
413    }
414
415    #[test]
416    fn test_disconnected_receiver() {
417        let (sender, receiver) = UpdateChannel::new();
418        drop(receiver);
419
420        let file = FileId::new(1);
421        let result = sender.send(GraphUpdate::ClearFile { file });
422        assert!(matches!(result, Err(ChannelError::Disconnected)));
423    }
424
425    #[test]
426    fn test_update_kinds() {
427        let (sender, receiver) = UpdateChannel::new();
428
429        // Test all update types
430        sender
431            .send(GraphUpdate::AddNode {
432                kind: NodeKind::Function,
433                name: "test".to_string(),
434                file: FileId::new(1),
435            })
436            .unwrap();
437
438        sender
439            .send(GraphUpdate::RemoveNode {
440                node: NodeId::new(1, 0),
441            })
442            .unwrap();
443
444        sender
445            .send(GraphUpdate::AddEdge {
446                source: NodeId::new(1, 0),
447                target: NodeId::new(2, 0),
448                kind: EdgeKind::Calls {
449                    argument_count: 0,
450                    is_async: false,
451                    resolved_via: ResolvedVia::Direct,
452                },
453                file: FileId::new(1),
454            })
455            .unwrap();
456
457        sender
458            .send(GraphUpdate::RemoveEdge {
459                source: NodeId::new(1, 0),
460                target: NodeId::new(2, 0),
461                kind: EdgeKind::Calls {
462                    argument_count: 0,
463                    is_async: false,
464                    resolved_via: ResolvedVia::Direct,
465                },
466                file: FileId::new(1),
467            })
468            .unwrap();
469
470        sender
471            .send(GraphUpdate::ClearFile {
472                file: FileId::new(1),
473            })
474            .unwrap();
475
476        sender.send(GraphUpdate::TriggerCompaction).unwrap();
477        sender.send(GraphUpdate::Shutdown).unwrap();
478
479        assert_eq!(sender.updates_sent(), 7);
480
481        // Verify all received
482        let mut count = 0;
483        while receiver.try_recv().unwrap().is_some() {
484            count += 1;
485        }
486        assert_eq!(count, 7);
487    }
488
489    #[test]
490    fn test_channel_stats() {
491        let stats = ChannelStats {
492            sent: 100,
493            received: 75,
494        };
495        assert_eq!(stats.in_flight(), 25);
496    }
497
498    #[test]
499    fn test_channel_stats_saturating() {
500        // Edge case: received > sent (shouldn't happen, but handle gracefully)
501        let stats = ChannelStats {
502            sent: 50,
503            received: 75,
504        };
505        assert_eq!(stats.in_flight(), 0);
506    }
507
508    #[test]
509    fn test_iter() {
510        let (sender, receiver) = UpdateChannel::new();
511
512        for i in 0..10 {
513            sender
514                .send(GraphUpdate::ClearFile {
515                    file: FileId::new(i),
516                })
517                .unwrap();
518        }
519        drop(sender);
520
521        let updates: Vec<_> = receiver.iter().collect();
522        assert_eq!(updates.len(), 10);
523    }
524
525    #[test]
526    fn test_clone_sender() {
527        let (sender, receiver) = UpdateChannel::new();
528        let sender2 = sender.clone();
529
530        sender
531            .send(GraphUpdate::ClearFile {
532                file: FileId::new(1),
533            })
534            .unwrap();
535        sender2
536            .send(GraphUpdate::ClearFile {
537                file: FileId::new(2),
538            })
539            .unwrap();
540
541        // Both senders share the same counter
542        assert_eq!(sender.updates_sent(), 2);
543        assert_eq!(sender2.updates_sent(), 2);
544
545        let mut count = 0;
546        while receiver.try_recv().unwrap().is_some() {
547            count += 1;
548        }
549        assert_eq!(count, 2);
550    }
551
552    #[test]
553    fn test_channel_error_display() {
554        let err = ChannelError::Disconnected;
555        assert_eq!(format!("{err}"), "channel disconnected");
556
557        let err = ChannelError::Full;
558        assert_eq!(format!("{err}"), "channel full");
559    }
560
561    #[test]
562    fn test_concurrent_send_receive() {
563        let (sender, receiver) = UpdateChannel::new();
564        let sender_clone = sender.clone();
565
566        // Producer thread
567        let producer = thread::spawn(move || {
568            for i in 0..1000 {
569                sender
570                    .send(GraphUpdate::ClearFile {
571                        file: FileId::new(i),
572                    })
573                    .unwrap();
574            }
575        });
576
577        // Second producer
578        let producer2 = thread::spawn(move || {
579            for i in 1000..2000 {
580                sender_clone
581                    .send(GraphUpdate::ClearFile {
582                        file: FileId::new(i),
583                    })
584                    .unwrap();
585            }
586        });
587
588        // Consumer thread
589        let consumer = thread::spawn(move || {
590            let mut count = 0u64;
591            loop {
592                match receiver.try_recv() {
593                    Ok(Some(_)) => count += 1,
594                    Ok(None) => {
595                        if count >= 2000 {
596                            break;
597                        }
598                        thread::sleep(Duration::from_micros(10));
599                    }
600                    Err(ChannelError::Disconnected) => break,
601                    Err(_) => {}
602                }
603            }
604            count
605        });
606
607        producer.join().unwrap();
608        producer2.join().unwrap();
609        let received_count = consumer.join().unwrap();
610
611        assert_eq!(received_count, 2000);
612    }
613}