hydro2_async_scheduler/
create_worker_channels.rs

1// ---------------- [ File: src/create_worker_channels.rs ]
2crate::ix!();
3
4pub fn create_worker_channels<'threads, T>(num_workers: usize, buffer_size: usize) 
5-> (Vec<mpsc::Sender<TaskItem<'threads, T>>>, Vec<mpsc::Receiver<TaskItem<'threads, T>>>) 
6where
7    T: Debug + Send + Sync + 'threads,
8{
9    let mut worker_senders   = Vec::with_capacity(num_workers);
10    let mut worker_receivers = Vec::with_capacity(num_workers);
11
12    for w in 0..num_workers {
13        let (tx, rx) = mpsc::channel::<TaskItem<'threads, T>>(buffer_size);
14        worker_senders.push(tx);
15        worker_receivers.push(rx);
16        eprintln!("created worker channel for worker #{}", w);
17    }
18
19    (worker_senders, worker_receivers)
20}
21
#[cfg(test)]
mod create_worker_channels_tests {
    use super::*;
    use crate::mpsc::error::TryRecvError;

    /// The function must hand back exactly one sender and one receiver per worker.
    #[traced_test]
    async fn test_create_worker_channels_initialization() {
        const WORKER_COUNT: usize = 4;
        const CAPACITY: usize = 10;

        let (senders, receivers) = create_worker_channels::<()>(WORKER_COUNT, CAPACITY);

        assert_eq!(senders.len(), WORKER_COUNT);
        assert_eq!(receivers.len(), WORKER_COUNT);
    }

    /// A task pushed through sender 0 must come out of receiver 0 intact.
    #[traced_test]
    async fn test_worker_channels_can_send_and_receive() {
        const WORKER_COUNT: usize = 2;
        const CAPACITY: usize = 10;

        let (senders, mut receivers) = create_worker_channels::<()>(WORKER_COUNT, CAPACITY);

        // Minimal collaborators needed to assemble a task item; the receiving halves
        // of the two auxiliary channels are discarded right away.
        let completed_nodes     = SharedCompletedNodes::new();
        let shared_in_degs      = Arc::new(AsyncMutex::new(vec![]));
        let network             = Arc::new(AsyncMutex::new(Network::default()));
        let (ready_nodes_tx, _) = mpsc::channel(10);
        let (child_nodes_tx, _) = mpsc::channel(10);

        let outgoing = task_item!(
            node_idx:        42_usize,
            permit:          None,
            network:         network,
            shared_in_degs:  shared_in_degs,
            output_tx:       None,
            checkpoint_cb:   None,
            child_nodes_tx:  child_nodes_tx,
            ready_nodes_tx:  ready_nodes_tx,
            completed_nodes: completed_nodes
        );

        senders[0]
            .send(outgoing)
            .await
            .expect("Failed to send task");

        let incoming = receivers[0]
            .recv()
            .await
            .expect("Failed to receive task");

        assert_eq!(*incoming.node_idx(), 42);
    }

    /// A freshly created channel must report "empty", not "disconnected".
    #[traced_test]
    async fn test_no_message_in_empty_channel() {
        const WORKER_COUNT: usize = 2;
        const CAPACITY: usize = 10;

        // The senders are bound to a name so they stay alive while the receiver is polled.
        let (senders, mut receivers) = create_worker_channels::<()>(WORKER_COUNT, CAPACITY);

        // The channel is expected to be empty while still connected.
        let poll_result = receivers[0].try_recv();

        if let Err(TryRecvError::Disconnected) = poll_result {
            panic!("The sender was disconnected unexpectedly.");
        }
        if !matches!(poll_result, Err(TryRecvError::Empty)) {
            panic!("Channel should be empty but was not");
        }

        // Release the senders only after the check so the channel is not closed early.
        drop(senders);
    }
}
86}