sqry_core/graph/unified/concurrent/
channel.rs1use std::sync::Arc;
33use std::sync::mpsc::{self, Receiver, RecvError, SendError, Sender, TryRecvError};
34
35use super::super::edge::kind::EdgeKind;
36#[cfg(test)]
37use super::super::edge::kind::ResolvedVia;
38use super::super::file::FileId;
39use super::super::node::{NodeId, NodeKind};
40
41#[derive(Debug, Clone)]
43pub enum GraphUpdate {
44 AddNode {
46 kind: NodeKind,
48 name: String,
50 file: FileId,
52 },
53
54 RemoveNode {
56 node: NodeId,
58 },
59
60 AddEdge {
62 source: NodeId,
64 target: NodeId,
66 kind: EdgeKind,
68 file: FileId,
70 },
71
72 RemoveEdge {
74 source: NodeId,
76 target: NodeId,
78 kind: EdgeKind,
80 file: FileId,
82 },
83
84 ClearFile {
86 file: FileId,
88 },
89
90 TriggerCompaction,
92
93 Shutdown,
95}
96
/// Errors reported by the update channel.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ChannelError {
    /// The other end of the channel has been dropped.
    Disconnected,
    /// The channel has no free capacity.
    Full,
}

impl std::fmt::Display for ChannelError {
    /// Writes a short, lowercase description of the error.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let message = match self {
            Self::Disconnected => "channel disconnected",
            Self::Full => "channel full",
        };
        f.write_str(message)
    }
}

impl std::error::Error for ChannelError {}

impl<T> From<SendError<T>> for ChannelError {
    /// A failed blocking send always means the receiver is gone; the unsent
    /// value carried inside the `SendError` is discarded.
    fn from(_: SendError<T>) -> Self {
        Self::Disconnected
    }
}

impl From<RecvError> for ChannelError {
    /// A failed blocking receive always means every sender is gone.
    fn from(_: RecvError) -> Self {
        Self::Disconnected
    }
}
128
129#[derive(Debug, Clone)]
134pub struct UpdateChannel {
135 sender: Sender<GraphUpdate>,
136 updates_sent: Arc<std::sync::atomic::AtomicU64>,
138}
139
140impl UpdateChannel {
141 #[must_use]
146 pub fn new() -> (Self, UpdateReceiver) {
147 let (sender, receiver) = mpsc::channel();
148 let updates_sent = Arc::new(std::sync::atomic::AtomicU64::new(0));
149 let updates_received = Arc::new(std::sync::atomic::AtomicU64::new(0));
150
151 (
152 Self {
153 sender,
154 updates_sent: Arc::clone(&updates_sent),
155 },
156 UpdateReceiver {
157 receiver,
158 updates_received,
159 },
160 )
161 }
162
163 #[must_use]
168 pub fn bounded(capacity: usize) -> (Self, UpdateReceiver) {
169 let (sender, receiver) = mpsc::sync_channel(capacity);
170 let updates_sent = Arc::new(std::sync::atomic::AtomicU64::new(0));
171 let updates_received = Arc::new(std::sync::atomic::AtomicU64::new(0));
172
173 (
174 Self {
175 sender: {
176 let (tx, rx) = mpsc::channel();
179 std::thread::spawn(move || {
180 while let Ok(update) = rx.recv() {
181 if sender.send(update).is_err() {
182 break;
183 }
184 }
185 });
186 tx
187 },
188 updates_sent: Arc::clone(&updates_sent),
189 },
190 UpdateReceiver {
191 receiver,
192 updates_received,
193 },
194 )
195 }
196
197 pub fn send(&self, update: GraphUpdate) -> Result<(), ChannelError> {
203 self.sender.send(update)?;
204 self.updates_sent
205 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
206 Ok(())
207 }
208
209 #[must_use]
211 pub fn updates_sent(&self) -> u64 {
212 self.updates_sent.load(std::sync::atomic::Ordering::Relaxed)
213 }
214}
215
216impl Default for UpdateChannel {
217 fn default() -> Self {
218 Self::new().0
219 }
220}
221
222#[derive(Debug)]
227pub struct UpdateReceiver {
228 receiver: Receiver<GraphUpdate>,
229 updates_received: Arc<std::sync::atomic::AtomicU64>,
231}
232
233impl UpdateReceiver {
234 pub fn recv(&self) -> Result<GraphUpdate, ChannelError> {
240 let update = self.receiver.recv()?;
241 self.updates_received
242 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
243 Ok(update)
244 }
245
246 pub fn try_recv(&self) -> Result<Option<GraphUpdate>, ChannelError> {
254 match self.receiver.try_recv() {
255 Ok(update) => {
256 self.updates_received
257 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
258 Ok(Some(update))
259 }
260 Err(TryRecvError::Empty) => Ok(None),
261 Err(TryRecvError::Disconnected) => Err(ChannelError::Disconnected),
262 }
263 }
264
265 pub fn iter(&self) -> impl Iterator<Item = GraphUpdate> + '_ {
270 std::iter::from_fn(|| self.recv().ok())
271 }
272
273 #[must_use]
275 pub fn updates_received(&self) -> u64 {
276 self.updates_received
277 .load(std::sync::atomic::Ordering::Relaxed)
278 }
279}
280
/// Snapshot of the channel's sent and received counters.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ChannelStats {
    /// Number of updates sent.
    pub sent: u64,
    /// Number of updates received.
    pub received: u64,
}

impl ChannelStats {
    /// Number of updates sent but not yet received.
    ///
    /// Saturates at zero, so a snapshot in which `received` exceeds `sent`
    /// yields `0` instead of underflowing.
    #[must_use]
    pub fn in_flight(&self) -> u64 {
        self.sent.saturating_sub(self.received)
    }
}
297
#[cfg(test)]
mod tests {
    use super::*;
    use std::thread;
    use std::time::Duration;

    #[test]
    fn test_channel_new() {
        let (sender, _receiver) = UpdateChannel::new();
        assert_eq!(sender.updates_sent(), 0);
    }

    #[test]
    fn test_channel_default() {
        let sender: UpdateChannel = UpdateChannel::default();
        assert_eq!(sender.updates_sent(), 0);
    }

    #[test]
    fn test_send_receive() {
        let (sender, receiver) = UpdateChannel::new();

        let file = FileId::new(1);
        sender
            .send(GraphUpdate::ClearFile { file })
            .expect("send failed");

        let update = receiver.recv().expect("recv failed");
        match update {
            GraphUpdate::ClearFile { file: f } => assert_eq!(f, file),
            _ => panic!("wrong update type"),
        }
    }

    #[test]
    fn test_updates_serialized() {
        let (sender, receiver) = UpdateChannel::new();
        let sender_clone = sender.clone();

        let handle1 = thread::spawn(move || {
            // First producer: files 0..100.
            for i in 0..100 {
                let file = FileId::new(i);
                sender.send(GraphUpdate::ClearFile { file }).unwrap();
            }
        });

        let handle2 = thread::spawn(move || {
            // Second producer: files 100..200.
            for i in 100..200 {
                let file = FileId::new(i);
                sender_clone.send(GraphUpdate::ClearFile { file }).unwrap();
            }
        });

        handle1.join().unwrap();
        handle2.join().unwrap();

        let mut received_updates = Vec::new();
        // Both producers have finished, so everything is already queued.
        while let Ok(Some(update)) = receiver.try_recv() {
            received_updates.push(update);
        }

        assert_eq!(received_updates.len(), 200);
        assert_eq!(receiver.updates_received(), 200);
    }

    #[test]
    fn test_channel_ordering() {
        let (sender, receiver) = UpdateChannel::new();

        for i in 0..100 {
            let file = FileId::new(i);
            sender.send(GraphUpdate::ClearFile { file }).unwrap();
        }

        // A single producer must be observed in FIFO order.
        for i in 0..100 {
            let update = receiver.recv().unwrap();
            match update {
                GraphUpdate::ClearFile { file } => {
                    assert_eq!(file.index(), i);
                }
                _ => panic!("wrong update type"),
            }
        }
    }

    #[test]
    fn test_try_recv_empty() {
        let (sender, receiver) = UpdateChannel::new();
        // Keep the sender alive so the channel is empty, not disconnected.
        let _sender = sender;
        assert!(receiver.try_recv().unwrap().is_none());
    }

    #[test]
    fn test_try_recv_available() {
        let (sender, receiver) = UpdateChannel::new();

        let file = FileId::new(42);
        sender.send(GraphUpdate::ClearFile { file }).unwrap();

        let result = receiver.try_recv().unwrap();
        assert!(result.is_some());
    }

    #[test]
    fn test_disconnected_sender() {
        let (sender, receiver) = UpdateChannel::new();
        drop(sender);

        let result = receiver.recv();
        assert!(matches!(result, Err(ChannelError::Disconnected)));
    }

    #[test]
    fn test_disconnected_receiver() {
        let (sender, receiver) = UpdateChannel::new();
        drop(receiver);

        let file = FileId::new(1);
        let result = sender.send(GraphUpdate::ClearFile { file });
        assert!(matches!(result, Err(ChannelError::Disconnected)));
    }

    #[test]
    fn test_bounded_send_receive() {
        // Capacity exceeds the number of sends, so no send can block.
        let (sender, receiver) = UpdateChannel::bounded(4);

        for i in 0..3 {
            let file = FileId::new(i);
            sender.send(GraphUpdate::ClearFile { file }).unwrap();
        }
        assert_eq!(sender.updates_sent(), 3);

        // Blocking `recv` keeps the test independent of delivery timing.
        for i in 0..3 {
            match receiver.recv().unwrap() {
                GraphUpdate::ClearFile { file } => assert_eq!(file.index(), i),
                _ => panic!("wrong update type"),
            }
        }
        assert_eq!(receiver.updates_received(), 3);

        drop(sender);
        assert!(matches!(receiver.recv(), Err(ChannelError::Disconnected)));
    }

    #[test]
    fn test_update_kinds() {
        let (sender, receiver) = UpdateChannel::new();

        // One message per `GraphUpdate` variant.
        sender
            .send(GraphUpdate::AddNode {
                kind: NodeKind::Function,
                name: "test".to_string(),
                file: FileId::new(1),
            })
            .unwrap();

        sender
            .send(GraphUpdate::RemoveNode {
                node: NodeId::new(1, 0),
            })
            .unwrap();

        sender
            .send(GraphUpdate::AddEdge {
                source: NodeId::new(1, 0),
                target: NodeId::new(2, 0),
                kind: EdgeKind::Calls {
                    argument_count: 0,
                    is_async: false,
                    resolved_via: ResolvedVia::Direct,
                },
                file: FileId::new(1),
            })
            .unwrap();

        sender
            .send(GraphUpdate::RemoveEdge {
                source: NodeId::new(1, 0),
                target: NodeId::new(2, 0),
                kind: EdgeKind::Calls {
                    argument_count: 0,
                    is_async: false,
                    resolved_via: ResolvedVia::Direct,
                },
                file: FileId::new(1),
            })
            .unwrap();

        sender
            .send(GraphUpdate::ClearFile {
                file: FileId::new(1),
            })
            .unwrap();

        sender.send(GraphUpdate::TriggerCompaction).unwrap();
        sender.send(GraphUpdate::Shutdown).unwrap();

        assert_eq!(sender.updates_sent(), 7);

        let mut count = 0;
        // The sender is still alive, so draining ends with `Ok(None)`.
        while receiver.try_recv().unwrap().is_some() {
            count += 1;
        }
        assert_eq!(count, 7);
    }

    #[test]
    fn test_channel_stats() {
        let stats = ChannelStats {
            sent: 100,
            received: 75,
        };
        assert_eq!(stats.in_flight(), 25);
    }

    #[test]
    fn test_channel_stats_saturating() {
        let stats = ChannelStats {
            // `received` larger than `sent` must not underflow.
            sent: 50,
            received: 75,
        };
        assert_eq!(stats.in_flight(), 0);
    }

    #[test]
    fn test_iter() {
        let (sender, receiver) = UpdateChannel::new();

        for i in 0..10 {
            sender
                .send(GraphUpdate::ClearFile {
                    file: FileId::new(i),
                })
                .unwrap();
        }
        // Dropping the only sender lets the blocking iterator terminate.
        drop(sender);

        let updates: Vec<_> = receiver.iter().collect();
        assert_eq!(updates.len(), 10);
    }

    #[test]
    fn test_clone_sender() {
        let (sender, receiver) = UpdateChannel::new();
        let sender2 = sender.clone();

        sender
            .send(GraphUpdate::ClearFile {
                file: FileId::new(1),
            })
            .unwrap();
        sender2
            .send(GraphUpdate::ClearFile {
                file: FileId::new(2),
            })
            .unwrap();

        assert_eq!(sender.updates_sent(), 2);
        // Clones share the counter, so both report the combined total.
        assert_eq!(sender2.updates_sent(), 2);

        let mut count = 0;
        while receiver.try_recv().unwrap().is_some() {
            count += 1;
        }
        assert_eq!(count, 2);
    }

    #[test]
    fn test_channel_error_display() {
        let err = ChannelError::Disconnected;
        assert_eq!(format!("{err}"), "channel disconnected");

        let err = ChannelError::Full;
        assert_eq!(format!("{err}"), "channel full");
    }

    #[test]
    fn test_concurrent_send_receive() {
        let (sender, receiver) = UpdateChannel::new();
        let sender_clone = sender.clone();

        let producer = thread::spawn(move || {
            // First producer: files 0..1000.
            for i in 0..1000 {
                sender
                    .send(GraphUpdate::ClearFile {
                        file: FileId::new(i),
                    })
                    .unwrap();
            }
        });

        let producer2 = thread::spawn(move || {
            // Second producer: files 1000..2000.
            for i in 1000..2000 {
                sender_clone
                    .send(GraphUpdate::ClearFile {
                        file: FileId::new(i),
                    })
                    .unwrap();
            }
        });

        let consumer = thread::spawn(move || {
            // Poll until all 2000 updates were seen or the channel disconnects.
            let mut count = 0u64;
            loop {
                match receiver.try_recv() {
                    Ok(Some(_)) => count += 1,
                    Ok(None) => {
                        if count >= 2000 {
                            break;
                        }
                        thread::sleep(Duration::from_micros(10));
                    }
                    Err(ChannelError::Disconnected) => break,
                    Err(_) => {}
                }
            }
            count
        });

        producer.join().unwrap();
        producer2.join().unwrap();
        let received_count = consumer.join().unwrap();

        assert_eq!(received_count, 2000);
    }
}