Struct TaskChain

Source
pub struct TaskChain { /* private fields */ }
Expand description

Structure representing a task-chain that manages the flow of tasks based on signals and expectations.

A TaskChain is associated with a shared CondvarPair to synchronize the tasks, but each task holds its own TaskChain instance. The taskchain manages task execution flow by defining the expected kind of task (Kinds) and the signal (Signal) to be used for notifying other tasks.

The TaskChain controls two main components:

  • kind: The expected task kind (Kinds) that this specific task is waiting to execute.
  • next: The signal (Signal) that is sent when the task completes, which can be sent either explicitly within the task logic or automatically when the TaskChain instance is dropped.

Tasks interact with a shared CondvarPair to ensure proper synchronization, allowing tasks to wait for signals and proceed in the correct order.

§Example Usage:

use std::sync::{Arc, atomic::{AtomicU32, Ordering}};
use std::thread;
use std::time::Duration;
use log::info;
use taskchain::{Kinds, Signal, CondvarPair,TaskChain};

// Create a shared CondvarPair and an atomic counter.
let cvp = Arc::new(CondvarPair::new(Signal::INACTIVE));
let count = AtomicU32::new(0);

// Start the thread scope to manage multiple threads.
thread::scope(|s| {
    // First thread that doesn't wait for any signal but sends `TRIGGER(Kinds::SPECIFIED(0))`.
    thread::Builder::new()
        .name("first".to_string())
        .spawn_scoped(s, || {
            info!(
                "hello from the {:?} scoped thread, no waiting",
                thread::current().name().unwrap()
            );
            let mut pl = TaskChain::new(
                Arc::clone(&cvp),
                Kinds::ANY,
                Signal::TRIGGER(Kinds::SPECIFIED(0)),
            );

            assert_eq!(count.load(Ordering::SeqCst), 0);
            count.fetch_add(1, Ordering::SeqCst); // Increment the counter.
            thread::sleep(Duration::from_millis(100));
            info!(
                "The {:?} scoped thread sent a signal {:?}",
                thread::current().name().unwrap(),
                Signal::TRIGGER(Kinds::SPECIFIED(0))
            );
            pl.notify(); // Notify the second thread.
        })
        .unwrap();

    // Second thread that waits for the `TRIGGER(Kinds::SPECIFIED(0))` signal.
    thread::Builder::new()
        .name("second".to_string())
        .spawn_scoped(s, || {
            info!(
                "hello from the {:?} scoped thread, waiting for signal {:?}",
                thread::current().name().unwrap(),
                Signal::TRIGGER(Kinds::SPECIFIED(0))
            );
            let mut pl = TaskChain::new(
                Arc::clone(&cvp),
                Kinds::SPECIFIED(0),
                Signal::TRIGGER(Kinds::SPECIFIED(1)),
            );
            pl.wait(Duration::ZERO); // Wait for the first thread's signal.
            assert_eq!(count.load(Ordering::SeqCst), 1); // Ensure first thread incremented.
            count.fetch_add(1, Ordering::SeqCst); // Increment the counter.
            info!(
                "The {:?} scoped thread sent a signal {:?}",
                thread::current().name().unwrap(),
                Signal::TRIGGER(Kinds::SPECIFIED(1))
            );
            thread::sleep(Duration::from_millis(100));
        })
        .unwrap();

    // Third thread that waits for the `TRIGGER(Kinds::SPECIFIED(1))` signal.
    thread::Builder::new()
        .name("third".to_string())
        .spawn_scoped(s, || {
            info!(
                "hello from the {:?} scoped thread, waiting for signal {:?}",
                thread::current().name().unwrap(),
                Signal::TRIGGER(Kinds::SPECIFIED(1))
            );
            let mut pl = TaskChain::new(Arc::clone(&cvp), Kinds::SPECIFIED(1), Signal::INACTIVE);
            pl.wait(Duration::ZERO); // Wait for the second thread's signal.
            assert_eq!(count.load(Ordering::SeqCst), 2); // Ensure second thread incremented.
            count.fetch_add(1, Ordering::SeqCst); // Increment the counter.
            info!(
                "The {:?} scoped thread sent a signal {:?}",
                thread::current().name().unwrap(),
                Signal::INACTIVE
            );
            thread::sleep(Duration::from_millis(100));
        })
        .unwrap();
});

// Ensure all threads completed and incremented the counter.
assert_eq!(count.load(Ordering::SeqCst), 3);
info!("all tasks completed");

Each task holds its own TaskChain instance, sharing the same CondvarPair to ensure tasks proceed sequentially based on the signals. When the TaskChain instance is dropped, the next signal is automatically sent, ensuring that tasks are not left waiting indefinitely.

Implementations§

Source§

impl TaskChain

Source

pub fn new(cvp: Arc<CondvarPair>, kind: Kinds, next: Signal) -> Self

Creates a new TaskChain instance.

§Parameters
  • cvp: An Arc reference to the shared CondvarPair that will synchronize the tasks.
  • kind: The expected Kinds value representing the type of task this taskchain will wait for.
  • next: The Signal that will be sent when the current task completes, either explicitly via notify() or implicitly when the TaskChain instance is dropped.
§Returns

A new TaskChain instance that holds a reference to the shared CondvarPair.

Source

pub fn wait(&mut self, dur: Duration) -> bool

Waits for the taskchain to receive the expected signal.

The task will block and wait until the taskchain receives a signal that matches the expected Kinds. This is how the taskchain enforces sequential task execution based on signals.

§Behavior
  • If the expected signal is not received, the task remains blocked.
  • Once the expected signal is received, the task will proceed.
  • dur is the duration for which the task will wait for the expected signal, Duration::ZERO means wait indefinitely.
  • return true if the wait was successful, false if the wait was interrupted by a timeout or other reason.
Source

pub fn notify(&mut self)

Notifies the taskchain, signaling that the current task has completed.

This method explicitly sends the next signal, which allows the next task in the taskchain to proceed. It is usually called at the end of a task to notify subsequent tasks that they can now execute.

Trait Implementations§

Source§

impl Drop for TaskChain

Source§

fn drop(&mut self)

Automatically notifies the taskchain when the TaskChain instance is dropped.

This method ensures that even if a task ends without explicitly calling notify(), the next signal in the taskchain will still be sent when the TaskChain instance goes out of scope.

This behavior is particularly useful for avoiding deadlocks or tasks getting stuck waiting for signals. `

Auto Trait Implementations§

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.