tasksitter/node.rs
1use crate::error::Result;
2use crate::task::Task;
3use std::sync::Arc;
4
/// Identifies which [Node]s (by index) should execute after the current node.
///
/// Each `usize` is an index into the `nodes` collection of the
/// [Workflow](crate::workflow::Workflow). These indices are known before the graph is
/// constructed, and [Workflow::add_node](crate::workflow::Workflow::add_node) returns them
/// as confirmation.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum NextNode {
    /// Execute a single next node.
    Single(usize),
    /// Execute multiple next nodes.
    Multiple(Vec<usize>),
    /// No next node to execute (signifies an end-node).
    None,
}
20
21/// Represents a node in the graph. Each node has an associated [Task], and one or more [NextNode]s.
22#[derive(Debug)]
23pub struct Node {
24 task: Task,
25 /// The successors of this node in the graph. One or more of these nodes may be executed
26 /// as the [next nodes(s)](Node::next). The actual nodes to be executed are determined by the
27 /// [decider tasks](crate::task::Task::TrivialDecider) in [Node::run], if any.
28 ///
29 /// If a node has multiple successors and it's task is not a decider, all successors will be executed.
30 ///
31 /// The successors are fixed when the graph is constructed, while the [next nodes](Node::next) may change
32 /// during execution.
33 pub successors: NextNode,
34 /// The next node(s) to execute after this one. This set of next nodes might be reduced from
35 /// [successors](Node::successors) by [decider tasks](crate::task::Task::TrivialDecider) in [Node::run].
36 pub next: NextNode,
37}
38
39impl Node {
40 /// Constructs a new node with a given task and successor node(s).
41 pub fn new(task: Task, successors: NextNode) -> Self {
42 Self {
43 task,
44 successors: successors.clone(),
45 next: successors,
46 }
47 }
48
49 /// Runs the node's [task](Task) with the given input [NodeValue], returning the output [NodeValue].
50 /// This method is idempotent, and can be called multiple times with the same input to produce the same output,
51 /// as long as the task itself is idempotent, and the node's [successors](Node::successors) are unchanged.
52 ///
53 /// If the node's task is a [decider](crate::task::Task::TrivialDecider), it may also modify the node's
54 /// [next nodes](Node::next).
55 ///
56 /// This method runs the task directly, and is only non-synchronous if the task is one of the
57 /// [async variants](crate::task::Task::Async). For that reason, the [Result] is returned directly, rather
58 /// than wrapped in a future, and corresponds to the result returned by the task itself (or the return value
59 /// wrapped in a result, for "trivial" tasks).
60 pub async fn run(self: &mut Self, prev_result: NodeValue) -> Result<NodeValue> {
61 // reset next nodes to ensure runs are idempotent
62 self.next = self.successors.clone();
63 match &mut self.task {
64 Task::Trivial(f) => Ok(f(prev_result)),
65 Task::Generic(f) => f(prev_result),
66 Task::Async(f) => f(prev_result).await,
67 Task::TrivialDecider(f) => {
68 let (result, next) = f(prev_result, self.next.clone());
69 self.next = next;
70 Ok(result)
71 }
72 Task::GenericDecider(f) => {
73 let (result, next) = f(prev_result, self.next.clone())?;
74 self.next = next;
75 Ok(result)
76 }
77 Task::AsyncDecider(f) => {
78 let (result, next) = f(prev_result, self.next.clone()).await?;
79 self.next = next;
80 Ok(result)
81 }
82 Task::Custom(runnable) => runnable.run(prev_result),
83 Task::CustomDecider(runnable_decider) => {
84 let (result, next) = runnable_decider.run(prev_result, self.next.clone())?;
85 self.next = next;
86 Ok(result)
87 }
88 Task::AsyncCustom(async_runnable) => async_runnable.run(prev_result).await,
89 Task::AsyncCustomDecider(async_runnable_decider) => {
90 let (result, next) = async_runnable_decider.run(prev_result, self.next.clone()).await?;
91 self.next = next;
92 Ok(result)
93 }
94 }
95 }
96}
97
/// Represents a dynamically-typed value which can be passed between nodes in the graph.
/// This enum supports a variety of primitive types, as well as lists and byte arrays.
///
/// For more complex types, consider using [NodeValue::Any].
///
/// Equality is structural; because of the floating-point variants only [PartialEq] is
/// implemented (a NaN payload never compares equal to itself).
#[derive(Debug, Clone, PartialEq)]
pub enum DynValue {
    /// A signed 32-bit integer.
    Int(i32),
    /// A 32-bit floating-point number.
    Float(f32),
    /// A boolean.
    Bool(bool),
    /// A single Unicode scalar value.
    Char(char),
    /// An unsigned 32-bit integer.
    UInt(u32),
    /// A 64-bit floating-point number.
    Double(f64),
    /// A pointer-sized unsigned integer.
    USize(usize),
    /// An owned UTF-8 string.
    Str(String),
    /// A list of further dynamically-typed values (may be nested and heterogeneous).
    List(Vec<DynValue>),
    /// A raw byte buffer.
    Bytes(Vec<u8>),
}
115
116/// Represents the value produced or consumed by a [Node]. This enum can represent the absence of a
117/// value ([NodeValue::Void]), a dynamically-typed value ([NodeValue::Value]), or any arbitrary type
118/// ([NodeValue::Any]).
119///
120/// Please note that the `Any` variant uses `Arc<Box<...>>`, which means that cache-friendliness is sacrificed
121/// for flexibility. Use this variant sparingly, especially in hot paths. However, it's still obviously
122/// preferrable to serializing complex types into a [DynValue] or similar.
123#[derive(Debug, Clone)]
124pub enum NodeValue {
125 Void,
126 Value(DynValue),
127 Any(Arc<Box<dyn std::any::Any + Send + Sync>>),
128}