pub struct TaskChain { /* private fields */ }
Structure representing a task-chain that manages the flow of tasks based on signals and expectations.
A TaskChain is associated with a shared CondvarPair to synchronize the tasks, but each task holds its own
TaskChain instance. The taskchain manages task execution flow by defining the expected kind of task (Kinds)
and the signal (Signal) to be used for notifying other tasks.
The TaskChain controls two main components:
- kind: The expected task kind (Kinds) that this specific task is waiting to execute.
- next: The signal (Signal) that is sent when the task completes, which can be sent either explicitly within the task logic or automatically when the TaskChain instance is dropped.
Tasks interact with a shared CondvarPair to ensure proper synchronization, allowing tasks to wait for signals
and proceed in the correct order.
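The mechanism described above can be approximated with the standard library alone. The following is a minimal sketch, not the crate's implementation: Kind, Sig, Pair, is_match and run_in_order are hypothetical stand-ins, and the matching rule (Any never blocks, otherwise the kinds must be equal) is an assumption made for illustration.

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

// Stand-in types for this sketch only; they are not the crate's `Kinds`/`Signal`.
#[derive(Clone, Copy, Debug, PartialEq)]
enum Kind {
    Any,
    Specified(u32),
}

#[derive(Clone, Copy, Debug, PartialEq)]
enum Sig {
    Inactive,
    Trigger(Kind),
}

// A mutex-protected signal slot plus a condition variable.
struct Pair {
    slot: Mutex<Sig>,
    cv: Condvar,
}

impl Pair {
    fn new(initial: Sig) -> Self {
        Pair { slot: Mutex::new(initial), cv: Condvar::new() }
    }

    // Publish a signal and wake every waiter so each can re-check its own kind.
    fn send(&self, sig: Sig) {
        *self.slot.lock().unwrap() = sig;
        self.cv.notify_all();
    }

    // Block until the slot holds a signal matching `expected`.
    fn wait_for(&self, expected: Kind) {
        let guard = self.slot.lock().unwrap();
        let _guard = self
            .cv
            .wait_while(guard, |sig| !is_match(*sig, expected))
            .unwrap();
    }
}

// Assumed matching rule: `Any` never blocks; otherwise the kinds must be equal.
fn is_match(sig: Sig, expected: Kind) -> bool {
    match (sig, expected) {
        (_, Kind::Any) => true,
        (Sig::Trigger(k), e) => k == e,
        (Sig::Inactive, _) => false,
    }
}

// Spawn three tasks in reverse order and record the order they actually run in.
fn run_in_order() -> Vec<u32> {
    let pair = Arc::new(Pair::new(Sig::Inactive));
    let log = Arc::new(Mutex::new(Vec::new()));
    // (task id, kind the task waits for, signal it sends when done)
    let plan = vec![
        (2, Kind::Specified(1), Sig::Inactive),
        (1, Kind::Specified(0), Sig::Trigger(Kind::Specified(1))),
        (0, Kind::Any, Sig::Trigger(Kind::Specified(0))),
    ];
    let handles: Vec<_> = plan
        .into_iter()
        .map(|(id, kind, next)| {
            let (pair, log) = (Arc::clone(&pair), Arc::clone(&log));
            thread::spawn(move || {
                pair.wait_for(kind);
                log.lock().unwrap().push(id);
                pair.send(next);
            })
        })
        .collect();
    for h in handles {
        h.join().unwrap();
    }
    let order = log.lock().unwrap().clone();
    order
}
```

Because the slot stores the latest signal rather than a one-shot event, a task that starts late still observes the signal it is waiting for. Calling run_in_order() yields the task ids in chain order even though the threads are spawned in reverse.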
§Example Usage:
use std::sync::{Arc, atomic::{AtomicU32, Ordering}};
use std::thread;
use std::time::Duration;
use log::info;
use taskchain::{CondvarPair, Kinds, Signal, TaskChain};
// Create a shared CondvarPair and an atomic counter.
let cvp = Arc::new(CondvarPair::new(Signal::INACTIVE));
let count = AtomicU32::new(0);
// Start the thread scope to manage multiple threads.
thread::scope(|s| {
    // First thread that doesn't wait for any signal but sends `TRIGGER(Kinds::SPECIFIED(0))`.
    thread::Builder::new()
        .name("first".to_string())
        .spawn_scoped(s, || {
            info!(
                "hello from the {:?} scoped thread, no waiting",
                thread::current().name().unwrap()
            );
            let mut pl = TaskChain::new(
                Arc::clone(&cvp),
                Kinds::ANY,
                Signal::TRIGGER(Kinds::SPECIFIED(0)),
            );
            assert_eq!(count.load(Ordering::SeqCst), 0);
            count.fetch_add(1, Ordering::SeqCst); // Increment the counter.
            thread::sleep(Duration::from_millis(100));
            info!(
                "The {:?} scoped thread sent a signal {:?}",
                thread::current().name().unwrap(),
                Signal::TRIGGER(Kinds::SPECIFIED(0))
            );
            pl.notify(); // Notify the second thread.
        })
        .unwrap();
    // Second thread that waits for the `TRIGGER(Kinds::SPECIFIED(0))` signal.
    thread::Builder::new()
        .name("second".to_string())
        .spawn_scoped(s, || {
            info!(
                "hello from the {:?} scoped thread, waiting for signal {:?}",
                thread::current().name().unwrap(),
                Signal::TRIGGER(Kinds::SPECIFIED(0))
            );
            let mut pl = TaskChain::new(
                Arc::clone(&cvp),
                Kinds::SPECIFIED(0),
                Signal::TRIGGER(Kinds::SPECIFIED(1)),
            );
            pl.wait(Duration::ZERO); // Wait for the first thread's signal.
            assert_eq!(count.load(Ordering::SeqCst), 1); // Ensure first thread incremented.
            count.fetch_add(1, Ordering::SeqCst); // Increment the counter.
            info!(
                "The {:?} scoped thread sent a signal {:?}",
                thread::current().name().unwrap(),
                Signal::TRIGGER(Kinds::SPECIFIED(1))
            );
            thread::sleep(Duration::from_millis(100));
            // No explicit notify(): dropping `pl` at the end of the closure sends the next signal.
        })
        .unwrap();
    // Third thread that waits for the `TRIGGER(Kinds::SPECIFIED(1))` signal.
    thread::Builder::new()
        .name("third".to_string())
        .spawn_scoped(s, || {
            info!(
                "hello from the {:?} scoped thread, waiting for signal {:?}",
                thread::current().name().unwrap(),
                Signal::TRIGGER(Kinds::SPECIFIED(1))
            );
            let mut pl = TaskChain::new(Arc::clone(&cvp), Kinds::SPECIFIED(1), Signal::INACTIVE);
            pl.wait(Duration::ZERO); // Wait for the second thread's signal.
            assert_eq!(count.load(Ordering::SeqCst), 2); // Ensure second thread incremented.
            count.fetch_add(1, Ordering::SeqCst); // Increment the counter.
            info!(
                "The {:?} scoped thread sent a signal {:?}",
                thread::current().name().unwrap(),
                Signal::INACTIVE
            );
            thread::sleep(Duration::from_millis(100));
        })
        .unwrap();
});
// Ensure all threads completed and incremented the counter.
assert_eq!(count.load(Ordering::SeqCst), 3);
info!("all tasks completed");
Each task holds its own TaskChain instance, sharing the same CondvarPair to ensure tasks proceed sequentially
based on the signals. When the TaskChain instance is dropped, the next signal is automatically sent, ensuring
that tasks are not left waiting indefinitely.
Implementations§
impl TaskChain
pub fn new(cvp: Arc<CondvarPair>, kind: Kinds, next: Signal) -> Self
Creates a new TaskChain instance.
§Parameters
- cvp: An Arc reference to the shared CondvarPair that will synchronize the tasks.
- kind: The expected Kinds value representing the type of task this taskchain will wait for.
- next: The Signal that will be sent when the current task completes, either explicitly via notify() or implicitly when the TaskChain instance is dropped.
§Returns
A new TaskChain instance that holds a reference to the shared CondvarPair.
pub fn wait(&mut self, dur: Duration) -> bool
Waits for the taskchain to receive the expected signal.
The task will block and wait until the taskchain receives a signal that matches the expected Kinds.
This is how the taskchain enforces sequential task execution based on signals.
§Behavior
- If the expected signal is not received, the task remains blocked.
- Once the expected signal is received, the task will proceed.
- dur is the duration for which the task will wait for the expected signal; Duration::ZERO means wait indefinitely.
- Returns true if the wait was successful, false if the wait was interrupted by a timeout or for another reason.
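The timeout convention described for wait can be sketched with the standard library's Condvar. This is an illustration under assumptions, not the crate's code: wait_ready, signalled_demo and timeout_demo are hypothetical names, and a plain bool stands in for the expected signal.

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;

// Hypothetical helper: block until the flag becomes true.
// `Duration::ZERO` is treated as "no timeout", mirroring the convention above.
fn wait_ready(state: &Mutex<bool>, cv: &Condvar, dur: Duration) -> bool {
    let guard = state.lock().unwrap();
    if dur == Duration::ZERO {
        // Wait indefinitely; spurious wakeups are handled by re-checking the flag.
        let _guard = cv.wait_while(guard, |ready| !*ready).unwrap();
        true
    } else {
        // Wait at most `dur`; report false if the flag was still unset at the deadline.
        let (_guard, res) = cv.wait_timeout_while(guard, dur, |ready| !*ready).unwrap();
        !res.timed_out()
    }
}

// A producer sets the flag, so an unbounded wait returns true.
fn signalled_demo() -> bool {
    let shared = Arc::new((Mutex::new(false), Condvar::new()));
    let producer = Arc::clone(&shared);
    let handle = thread::spawn(move || {
        *producer.0.lock().unwrap() = true;
        producer.1.notify_all();
    });
    let ok = wait_ready(&shared.0, &shared.1, Duration::ZERO);
    handle.join().unwrap();
    ok
}

// Nobody sets the flag, so a bounded wait gives up and returns false.
fn timeout_demo() -> bool {
    let state = Mutex::new(false);
    let cv = Condvar::new();
    wait_ready(&state, &cv, Duration::from_millis(20))
}
```

Checking the return value lets a caller distinguish "the signal arrived" from "the wait gave up", which matters whenever a non-zero duration is passed.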
Trait Implementations§
impl Drop for TaskChain
fn drop(&mut self)
Automatically notifies the taskchain when the TaskChain instance is dropped.
This method ensures that even if a task ends without explicitly calling notify(), the next signal
in the taskchain will still be sent when the TaskChain instance goes out of scope.
This behavior is particularly useful for avoiding deadlocks or tasks getting stuck waiting for signals.
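The notify-on-drop pattern can be sketched as a small guard type. This is a hedged illustration, not the crate's Drop implementation: NotifyOnDrop and early_exit_still_notifies are hypothetical names, and the sent flag that suppresses a second notification is an assumption about how a duplicate signal might be avoided.

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

// Hypothetical guard: sets the shared flag and wakes waiters at most once.
struct NotifyOnDrop {
    shared: Arc<(Mutex<bool>, Condvar)>,
    sent: bool,
}

impl NotifyOnDrop {
    fn new(shared: Arc<(Mutex<bool>, Condvar)>) -> Self {
        NotifyOnDrop { shared, sent: false }
    }

    // Explicit notification; a later drop becomes a no-op.
    fn notify(&mut self) {
        if !self.sent {
            self.sent = true;
            *self.shared.0.lock().unwrap() = true;
            self.shared.1.notify_all();
        }
    }
}

impl Drop for NotifyOnDrop {
    // Fallback: notify when the guard goes out of scope without an explicit call.
    fn drop(&mut self) {
        self.notify();
    }
}

// The worker never calls notify(), yet the waiter is still released.
fn early_exit_still_notifies() -> bool {
    let shared = Arc::new((Mutex::new(false), Condvar::new()));
    let worker_shared = Arc::clone(&shared);
    let worker = thread::spawn(move || {
        let _guard = NotifyOnDrop::new(worker_shared);
        // Return without notifying; Drop runs when `_guard` leaves scope.
    });
    let guard = shared.0.lock().unwrap();
    let done = shared.1.wait_while(guard, |flag| !*flag).unwrap();
    let result = *done;
    drop(done);
    worker.join().unwrap();
    result
}
```

Tying the notification to Drop means it also runs on early returns and during unwinding, so a downstream waiter is released on every exit path of the task.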