tasksitter/
node.rs

1use crate::error::Result;
2use crate::task::Task;
3use std::sync::Arc;
4
/// Identifies which [Node]s (by index) should execute after the current node.
///
/// Each `usize` is an index of a [Node] in the [Workflow](crate::workflow::Workflow)'s
/// collection of `nodes`.
/// These indices are known before the graph is constructed, and
/// [Workflow::add_node](crate::workflow::Workflow::add_node) returns them as confirmation.
///
/// Two values compare equal only when they are the same variant holding the same
/// indices in the same order, so `Single(1)` and `Multiple(vec![1])` are distinct.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum NextNode {
    /// Execute a single next node.
    Single(usize),
    /// Execute multiple next nodes.
    Multiple(Vec<usize>),
    /// No next node to execute (signifies an end-node).
    None,
}
20
21/// Represents a node in the graph. Each node has an associated [Task], and one or more [NextNode]s.
22#[derive(Debug)]
23pub struct Node {
24    task: Task,
25    /// The successors of this node in the graph. One or more of these nodes may be executed
26    /// as the [next nodes(s)](Node::next). The actual nodes to be executed are determined by the
27    /// [decider tasks](crate::task::Task::TrivialDecider) in [Node::run], if any.
28    ///
29    /// If a node has multiple successors and it's task is not a decider, all successors will be executed.
30    ///
31    /// The successors are fixed when the graph is constructed, while the [next nodes](Node::next) may change
32    /// during execution.
33    pub successors: NextNode,
34    /// The next node(s) to execute after this one. This set of next nodes might be reduced from
35    /// [successors](Node::successors) by [decider tasks](crate::task::Task::TrivialDecider) in [Node::run].
36    pub next: NextNode,
37}
38
39impl Node {
40    /// Constructs a new node with a given task and successor node(s).
41    pub fn new(task: Task, successors: NextNode) -> Self {
42        Self {
43            task,
44            successors: successors.clone(),
45            next: successors,
46        }
47    }
48
49    /// Runs the node's [task](Task) with the given input [NodeValue], returning the output [NodeValue].
50    /// This method is idempotent, and can be called multiple times with the same input to produce the same output,
51    /// as long as the task itself is idempotent, and the node's [successors](Node::successors) are unchanged.
52    ///
53    /// If the node's task is a [decider](crate::task::Task::TrivialDecider), it may also modify the node's
54    /// [next nodes](Node::next).
55    ///
56    /// This method runs the task directly, and is only non-synchronous if the task is one of the
57    /// [async variants](crate::task::Task::Async). For that reason, the [Result] is returned directly, rather
58    /// than wrapped in a future, and corresponds to the result returned by the task itself (or the return value
59    /// wrapped in a result, for "trivial" tasks).
60    pub async fn run(self: &mut Self, prev_result: NodeValue) -> Result<NodeValue> {
61        // reset next nodes to ensure runs are idempotent
62        self.next = self.successors.clone();
63        match &mut self.task {
64            Task::Trivial(f) => Ok(f(prev_result)),
65            Task::Generic(f) => f(prev_result),
66            Task::Async(f) => f(prev_result).await,
67            Task::TrivialDecider(f) => {
68                let (result, next) = f(prev_result, self.next.clone());
69                self.next = next;
70                Ok(result)
71            }
72            Task::GenericDecider(f) => {
73                let (result, next) = f(prev_result, self.next.clone())?;
74                self.next = next;
75                Ok(result)
76            }
77            Task::AsyncDecider(f) => {
78                let (result, next) = f(prev_result, self.next.clone()).await?;
79                self.next = next;
80                Ok(result)
81            }
82            Task::Custom(runnable) => runnable.run(prev_result),
83            Task::CustomDecider(runnable_decider) => {
84                let (result, next) = runnable_decider.run(prev_result, self.next.clone())?;
85                self.next = next;
86                Ok(result)
87            }
88            Task::AsyncCustom(async_runnable) => async_runnable.run(prev_result).await,
89            Task::AsyncCustomDecider(async_runnable_decider) => {
90                let (result, next) = async_runnable_decider.run(prev_result, self.next.clone()).await?;
91                self.next = next;
92                Ok(result)
93            }
94        }
95    }
96}
97
/// Represents a dynamically-typed value which can be passed between nodes in the graph.
/// This enum supports a variety of primitive types, as well as lists and byte arrays.
///
/// Values compare equal only when they are the same variant with equal contents. The
/// floating-point variants use ordinary float comparison, so a `NaN` payload is never equal
/// to itself.
///
/// For more complex types, consider using [NodeValue::Any].
#[derive(Debug, Clone, PartialEq)]
pub enum DynValue {
    /// A 32-bit signed integer.
    Int(i32),
    /// A 32-bit floating-point number.
    Float(f32),
    /// A boolean.
    Bool(bool),
    /// A single Unicode scalar value.
    Char(char),
    /// A 32-bit unsigned integer.
    UInt(u32),
    /// A 64-bit floating-point number.
    Double(f64),
    /// A pointer-sized unsigned integer.
    USize(usize),
    /// An owned UTF-8 string.
    Str(String),
    /// A list of values; elements may be of different variants, including nested lists.
    List(Vec<DynValue>),
    /// An owned buffer of raw bytes.
    Bytes(Vec<u8>),
}
115
116/// Represents the value produced or consumed by a [Node]. This enum can represent the absence of a
117/// value ([NodeValue::Void]), a dynamically-typed value ([NodeValue::Value]), or any arbitrary type
118/// ([NodeValue::Any]).
119///
120/// Please note that the `Any` variant uses `Arc<Box<...>>`, which means that cache-friendliness is sacrificed
121/// for flexibility. Use this variant sparingly, especially in hot paths. However, it's still obviously
122/// preferrable to serializing complex types into a [DynValue] or similar.
123#[derive(Debug, Clone)]
124pub enum NodeValue {
125    Void,
126    Value(DynValue),
127    Any(Arc<Box<dyn std::any::Any + Send + Sync>>),
128}