hydro2_async_scheduler/
task_item.rs

1// ---------------- [ File: src/task_item.rs ]
2crate::ix!();
3
4/// Each node execution is one `TaskItem`.
5#[derive(Debug,Builder,MutGetters,Getters)]
6#[builder(setter(into),pattern="owned")]
7#[getset(get = "pub", get_mut="pub")]
8pub struct TaskItem<'threads,T>
9where
10    T: Debug + Send + Sync + 'threads,
11{
12    /// Which node index to run
13    node_idx:           usize,
14    /// The concurrency permit
15    permit:             Option<OwnedSemaphorePermit>,
16    /// The network
17    network:            Arc<AsyncMutex<Network<T>>>,
18    /// Shared in-degrees
19    shared_in_degs:     Arc<AsyncMutex<Vec<usize>>>,
20    /// If streaming is enabled
21    output_tx:          Option<StreamingOutputSender<T>>,
22    /// Checkpoint callback
23    checkpoint_cb:      Option<Arc<dyn CheckpointCallback>>,
24    /// Freed children => push to this channel
25    child_nodes_tx:     tokio::sync::mpsc::Sender<usize>,
26    /// Completed
27    completed_nodes:    SharedCompletedNodes,
28
29    /// Freed children => we used to put them in child_nodes_tx, 
30    /// but we actually want them to go into ready_nodes_tx
31    ready_nodes_tx:     tokio::sync::mpsc::Sender<usize>,
32
33    #[builder(default)]
34    threads_lifetime:   std::marker::PhantomData<&'threads ()>,
35}
36
37#[macro_export]
38macro_rules! task_item {
39    (
40        node_idx:         $node_idx:expr, 
41        permit:           $permit:expr, 
42        network:          $network:expr, 
43        shared_in_degs:   $shared_in_degs:expr, 
44        output_tx:        $output_tx:expr, 
45        checkpoint_cb:    $checkpoint_cb:expr, 
46        child_nodes_tx:   $child_nodes_tx:expr, 
47        ready_nodes_tx:   $ready_nodes_tx:expr, 
48        completed_nodes:  $completed_nodes:expr
49    ) => {{
50        TaskItemBuilder::default()
51            .node_idx($node_idx)
52            .permit($permit)
53            .network(Arc::clone(&$network))
54            .shared_in_degs(Arc::clone(&$shared_in_degs))
55            .output_tx($output_tx.clone())
56            .checkpoint_cb($checkpoint_cb.clone())
57            .child_nodes_tx($child_nodes_tx.clone())
58            .ready_nodes_tx($ready_nodes_tx.clone())
59            .completed_nodes($completed_nodes.clone())
60            .build()
61            .expect("Failed to build TaskItem")
62    }};
63}