sqry_core/graph/unified/concurrent/
channel.rs1use std::sync::Arc;
33use std::sync::mpsc::{self, Receiver, RecvError, SendError, Sender, TryRecvError};
34
35use super::super::edge::kind::EdgeKind;
36use super::super::file::FileId;
37use super::super::node::{NodeId, NodeKind};
38
39#[derive(Debug, Clone)]
41pub enum GraphUpdate {
42 AddNode {
44 kind: NodeKind,
46 name: String,
48 file: FileId,
50 },
51
52 RemoveNode {
54 node: NodeId,
56 },
57
58 AddEdge {
60 source: NodeId,
62 target: NodeId,
64 kind: EdgeKind,
66 file: FileId,
68 },
69
70 RemoveEdge {
72 source: NodeId,
74 target: NodeId,
76 kind: EdgeKind,
78 file: FileId,
80 },
81
82 ClearFile {
84 file: FileId,
86 },
87
88 TriggerCompaction,
90
91 Shutdown,
93}
94
/// Errors reported by the update channel API.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum ChannelError {
    /// The other half of the channel has been dropped.
    Disconnected,
    /// The channel is full.
    Full,
}
103
104impl std::fmt::Display for ChannelError {
105 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
106 match self {
107 Self::Disconnected => write!(f, "channel disconnected"),
108 Self::Full => write!(f, "channel full"),
109 }
110 }
111}
112
113impl std::error::Error for ChannelError {}
114
115impl<T> From<SendError<T>> for ChannelError {
116 fn from(_: SendError<T>) -> Self {
117 Self::Disconnected
118 }
119}
120
121impl From<RecvError> for ChannelError {
122 fn from(_: RecvError) -> Self {
123 Self::Disconnected
124 }
125}
126
127#[derive(Debug, Clone)]
132pub struct UpdateChannel {
133 sender: Sender<GraphUpdate>,
134 updates_sent: Arc<std::sync::atomic::AtomicU64>,
136}
137
138impl UpdateChannel {
139 #[must_use]
144 pub fn new() -> (Self, UpdateReceiver) {
145 let (sender, receiver) = mpsc::channel();
146 let updates_sent = Arc::new(std::sync::atomic::AtomicU64::new(0));
147 let updates_received = Arc::new(std::sync::atomic::AtomicU64::new(0));
148
149 (
150 Self {
151 sender,
152 updates_sent: Arc::clone(&updates_sent),
153 },
154 UpdateReceiver {
155 receiver,
156 updates_received,
157 },
158 )
159 }
160
161 #[must_use]
166 pub fn bounded(capacity: usize) -> (Self, UpdateReceiver) {
167 let (sender, receiver) = mpsc::sync_channel(capacity);
168 let updates_sent = Arc::new(std::sync::atomic::AtomicU64::new(0));
169 let updates_received = Arc::new(std::sync::atomic::AtomicU64::new(0));
170
171 (
172 Self {
173 sender: {
174 let (tx, rx) = mpsc::channel();
177 std::thread::spawn(move || {
178 while let Ok(update) = rx.recv() {
179 if sender.send(update).is_err() {
180 break;
181 }
182 }
183 });
184 tx
185 },
186 updates_sent: Arc::clone(&updates_sent),
187 },
188 UpdateReceiver {
189 receiver,
190 updates_received,
191 },
192 )
193 }
194
195 pub fn send(&self, update: GraphUpdate) -> Result<(), ChannelError> {
201 self.sender.send(update)?;
202 self.updates_sent
203 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
204 Ok(())
205 }
206
207 #[must_use]
209 pub fn updates_sent(&self) -> u64 {
210 self.updates_sent.load(std::sync::atomic::Ordering::Relaxed)
211 }
212}
213
214impl Default for UpdateChannel {
215 fn default() -> Self {
216 Self::new().0
217 }
218}
219
220#[derive(Debug)]
225pub struct UpdateReceiver {
226 receiver: Receiver<GraphUpdate>,
227 updates_received: Arc<std::sync::atomic::AtomicU64>,
229}
230
231impl UpdateReceiver {
232 pub fn recv(&self) -> Result<GraphUpdate, ChannelError> {
238 let update = self.receiver.recv()?;
239 self.updates_received
240 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
241 Ok(update)
242 }
243
244 pub fn try_recv(&self) -> Result<Option<GraphUpdate>, ChannelError> {
252 match self.receiver.try_recv() {
253 Ok(update) => {
254 self.updates_received
255 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
256 Ok(Some(update))
257 }
258 Err(TryRecvError::Empty) => Ok(None),
259 Err(TryRecvError::Disconnected) => Err(ChannelError::Disconnected),
260 }
261 }
262
263 pub fn iter(&self) -> impl Iterator<Item = GraphUpdate> + '_ {
268 std::iter::from_fn(|| self.recv().ok())
269 }
270
271 #[must_use]
273 pub fn updates_received(&self) -> u64 {
274 self.updates_received
275 .load(std::sync::atomic::Ordering::Relaxed)
276 }
277}
278
/// Snapshot of the send and receive counters of a channel.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct ChannelStats {
    /// Number of updates sent.
    pub sent: u64,
    /// Number of updates received.
    pub received: u64,
}

impl ChannelStats {
    /// Returns how many updates were sent but not yet received.
    ///
    /// Yields 0 rather than wrapping around when `received` exceeds `sent`.
    #[must_use]
    pub fn in_flight(&self) -> u64 {
        let Self { sent, received } = *self;
        sent.saturating_sub(received)
    }
}
295
#[cfg(test)]
mod tests {
    use super::*;
    use std::thread;
    use std::time::Duration;

    /// Takes everything that is currently queued, without blocking, and
    /// returns how many updates were drained. Panics if the channel reports
    /// an error.
    fn drain_pending(receiver: &UpdateReceiver) -> usize {
        let mut drained = 0;
        while receiver.try_recv().unwrap().is_some() {
            drained += 1;
        }
        drained
    }

    /// A fresh channel has sent nothing.
    #[test]
    fn test_channel_new() {
        let (channel, _receiver) = UpdateChannel::new();
        assert_eq!(channel.updates_sent(), 0);
    }

    /// The `Default` channel also starts with a zero counter.
    #[test]
    fn test_channel_default() {
        let channel = UpdateChannel::default();
        assert_eq!(channel.updates_sent(), 0);
    }

    /// A single update arrives unchanged.
    #[test]
    fn test_send_receive() {
        let (tx, rx) = UpdateChannel::new();
        let expected = FileId::new(1);

        tx.send(GraphUpdate::ClearFile { file: expected })
            .expect("send failed");

        match rx.recv().expect("recv failed") {
            GraphUpdate::ClearFile { file } => assert_eq!(file, expected),
            _ => panic!("wrong update type"),
        }
    }

    /// Two producer threads deliver all of their updates to one receiver.
    #[test]
    fn test_updates_serialized() {
        let (tx_a, rx) = UpdateChannel::new();
        let tx_b = tx_a.clone();

        let first = thread::spawn(move || {
            for id in 0..100 {
                tx_a.send(GraphUpdate::ClearFile {
                    file: FileId::new(id),
                })
                .unwrap();
            }
        });
        let second = thread::spawn(move || {
            for id in 100..200 {
                tx_b.send(GraphUpdate::ClearFile {
                    file: FileId::new(id),
                })
                .unwrap();
            }
        });

        first.join().unwrap();
        second.join().unwrap();

        // Stop at the first result that is not `Ok(Some(_))`.
        let drained: Vec<GraphUpdate> =
            std::iter::from_fn(|| rx.try_recv().ok().flatten()).collect();

        assert_eq!(drained.len(), 200);
        assert_eq!(rx.updates_received(), 200);
    }

    /// Updates from one sender come out in the order they went in.
    #[test]
    fn test_channel_ordering() {
        let (tx, rx) = UpdateChannel::new();

        for id in 0..100 {
            tx.send(GraphUpdate::ClearFile {
                file: FileId::new(id),
            })
            .unwrap();
        }

        for expected in 0..100 {
            if let GraphUpdate::ClearFile { file } = rx.recv().unwrap() {
                assert_eq!(file.index(), expected);
            } else {
                panic!("wrong update type");
            }
        }
    }

    /// An empty channel with a live sender yields `Ok(None)`.
    #[test]
    fn test_try_recv_empty() {
        // Keep the sender alive so the channel is empty, not disconnected.
        let (_sender, receiver) = UpdateChannel::new();
        assert!(matches!(receiver.try_recv(), Ok(None)));
    }

    /// A queued update is returned by `try_recv`.
    #[test]
    fn test_try_recv_available() {
        let (tx, rx) = UpdateChannel::new();
        tx.send(GraphUpdate::ClearFile {
            file: FileId::new(42),
        })
        .unwrap();

        assert!(matches!(rx.try_recv(), Ok(Some(_))));
    }

    /// Receiving after the sender is gone reports a disconnect.
    #[test]
    fn test_disconnected_sender() {
        let (tx, rx) = UpdateChannel::new();
        drop(tx);

        assert!(matches!(rx.recv(), Err(ChannelError::Disconnected)));
    }

    /// Sending after the receiver is gone reports a disconnect.
    #[test]
    fn test_disconnected_receiver() {
        let (tx, rx) = UpdateChannel::new();
        drop(rx);

        let outcome = tx.send(GraphUpdate::ClearFile {
            file: FileId::new(1),
        });
        assert!(matches!(outcome, Err(ChannelError::Disconnected)));
    }

    /// Every `GraphUpdate` variant can travel through the channel.
    #[test]
    fn test_update_kinds() {
        let (tx, rx) = UpdateChannel::new();
        let calls = || EdgeKind::Calls {
            argument_count: 0,
            is_async: false,
        };

        let one_of_each = vec![
            GraphUpdate::AddNode {
                kind: NodeKind::Function,
                name: "test".to_string(),
                file: FileId::new(1),
            },
            GraphUpdate::RemoveNode {
                node: NodeId::new(1, 0),
            },
            GraphUpdate::AddEdge {
                source: NodeId::new(1, 0),
                target: NodeId::new(2, 0),
                kind: calls(),
                file: FileId::new(1),
            },
            GraphUpdate::RemoveEdge {
                source: NodeId::new(1, 0),
                target: NodeId::new(2, 0),
                kind: calls(),
                file: FileId::new(1),
            },
            GraphUpdate::ClearFile {
                file: FileId::new(1),
            },
            GraphUpdate::TriggerCompaction,
            GraphUpdate::Shutdown,
        ];

        for update in one_of_each {
            tx.send(update).unwrap();
        }

        assert_eq!(tx.updates_sent(), 7);
        assert_eq!(drain_pending(&rx), 7);
    }

    /// `in_flight` is the difference between sent and received.
    #[test]
    fn test_channel_stats() {
        let snapshot = ChannelStats {
            sent: 100,
            received: 75,
        };
        assert_eq!(snapshot.in_flight(), 25);
    }

    /// `in_flight` clamps at zero when more was received than sent.
    #[test]
    fn test_channel_stats_saturating() {
        let snapshot = ChannelStats {
            sent: 50,
            received: 75,
        };
        assert_eq!(snapshot.in_flight(), 0);
    }

    /// `iter` yields every queued update and stops once the sender is gone.
    #[test]
    fn test_iter() {
        let (tx, rx) = UpdateChannel::new();

        for id in 0..10 {
            tx.send(GraphUpdate::ClearFile {
                file: FileId::new(id),
            })
            .unwrap();
        }
        drop(tx);

        assert_eq!(rx.iter().count(), 10);
    }

    /// Clones feed the same receiver and share the sent counter.
    #[test]
    fn test_clone_sender() {
        let (original, rx) = UpdateChannel::new();
        let duplicate = original.clone();

        original
            .send(GraphUpdate::ClearFile {
                file: FileId::new(1),
            })
            .unwrap();
        duplicate
            .send(GraphUpdate::ClearFile {
                file: FileId::new(2),
            })
            .unwrap();

        assert_eq!(original.updates_sent(), 2);
        assert_eq!(duplicate.updates_sent(), 2);
        assert_eq!(drain_pending(&rx), 2);
    }

    /// Error messages are stable, lowercase strings.
    #[test]
    fn test_channel_error_display() {
        assert_eq!(
            ChannelError::Disconnected.to_string(),
            "channel disconnected"
        );
        assert_eq!(ChannelError::Full.to_string(), "channel full");
    }

    /// Two producers and a polling consumer running at the same time.
    #[test]
    fn test_concurrent_send_receive() {
        let (tx_a, rx) = UpdateChannel::new();
        let tx_b = tx_a.clone();

        let low_ids = thread::spawn(move || {
            for id in 0..1000 {
                tx_a.send(GraphUpdate::ClearFile {
                    file: FileId::new(id),
                })
                .unwrap();
            }
        });
        let high_ids = thread::spawn(move || {
            for id in 1000..2000 {
                tx_b.send(GraphUpdate::ClearFile {
                    file: FileId::new(id),
                })
                .unwrap();
            }
        });

        let consumer = thread::spawn(move || {
            let mut seen = 0u64;
            loop {
                match rx.try_recv() {
                    Ok(Some(_)) => seen += 1,
                    // Queue is empty: done if everything arrived, otherwise
                    // back off briefly and poll again.
                    Ok(None) if seen >= 2000 => break,
                    Ok(None) => thread::sleep(Duration::from_micros(10)),
                    Err(ChannelError::Disconnected) => break,
                    Err(_) => {}
                }
            }
            seen
        });

        low_ids.join().unwrap();
        high_ids.join().unwrap();
        let total = consumer.join().unwrap();

        assert_eq!(total, 2000);
    }
}