hydro2_async_scheduler/
create_worker_channels.rs1crate::ix!();
3
4pub fn create_worker_channels<'threads, T>(num_workers: usize, buffer_size: usize)
5-> (Vec<mpsc::Sender<TaskItem<'threads, T>>>, Vec<mpsc::Receiver<TaskItem<'threads, T>>>)
6where
7 T: Debug + Send + Sync + 'threads,
8{
9 let mut worker_senders = Vec::with_capacity(num_workers);
10 let mut worker_receivers = Vec::with_capacity(num_workers);
11
12 for w in 0..num_workers {
13 let (tx, rx) = mpsc::channel::<TaskItem<'threads, T>>(buffer_size);
14 worker_senders.push(tx);
15 worker_receivers.push(rx);
16 eprintln!("created worker channel for worker #{}", w);
17 }
18
19 (worker_senders, worker_receivers)
20}
21
#[cfg(test)]
mod create_worker_channels_tests {
    use super::*;
    use crate::mpsc::error::TryRecvError;

    /// Capacity requested for every channel built by these tests.
    const BUFFER_SIZE: usize = 10;

    /// The function hands back exactly one sender and one receiver per worker.
    #[traced_test]
    async fn test_create_worker_channels_initialization() {
        const WORKERS: usize = 4;

        let (senders, receivers) = create_worker_channels::<()>(WORKERS, BUFFER_SIZE);

        assert_eq!(senders.len(), WORKERS);
        assert_eq!(receivers.len(), WORKERS);
    }

    /// A task pushed into worker 0's sender comes out of worker 0's receiver.
    #[traced_test]
    async fn test_worker_channels_can_send_and_receive() {
        const WORKERS: usize = 2;

        let (senders, mut receivers) = create_worker_channels::<()>(WORKERS, BUFFER_SIZE);

        // Collaborators required to build a task; the receiving halves of the
        // two auxiliary channels are not needed here and are dropped right away.
        let (child_nodes_tx, _) = mpsc::channel(10);
        let (ready_nodes_tx, _) = mpsc::channel(10);
        let completed_nodes = SharedCompletedNodes::new();
        let shared_in_degs = Arc::new(AsyncMutex::new(vec![]));
        let network = Arc::new(AsyncMutex::new(Network::default()));

        let task = task_item!(
            node_idx: 42_usize,
            permit: None,
            network: network,
            shared_in_degs: shared_in_degs,
            output_tx: None,
            checkpoint_cb: None,
            child_nodes_tx: child_nodes_tx,
            ready_nodes_tx: ready_nodes_tx,
            completed_nodes: completed_nodes
        );

        senders[0]
            .send(task)
            .await
            .expect("Failed to send task");

        let delivered = receivers[0]
            .recv()
            .await
            .expect("Failed to receive task");

        assert_eq!(*delivered.node_idx(), 42);
    }

    /// A freshly created channel reports `Empty` on a non-blocking receive.
    #[traced_test]
    async fn test_no_message_in_empty_channel() {
        const WORKERS: usize = 2;

        let (senders, mut receivers) = create_worker_channels::<()>(WORKERS, BUFFER_SIZE);

        let outcome = receivers[0].try_recv();
        match outcome {
            Err(TryRecvError::Disconnected) => panic!("The sender was disconnected unexpectedly."),
            Err(TryRecvError::Empty) => {}
            _ => panic!("Channel should be empty but was not"),
        }

        // The senders are released explicitly only after the check above.
        drop(senders);
    }
}