pub trait Node<TState, TResourceKey = Cow<'static, str>>: Send + Sync
where
    TState: Clone + Debug + Send + Sync + 'static,
    TResourceKey: Hash + Eq + Send + Sync + 'static,
{
    type PrepResult: Send + Sync;
    type ExecResult: Send + Sync;

    // Required methods
    fn prep<'life0, 'life1, 'async_trait>(
        &'life0 self,
        res: &'life1 Resources<TResourceKey>,
    ) -> Pin<Box<dyn Future<Output = Result<Self::PrepResult, CanoError>> + Send + 'async_trait>>
    where
        Self: Sync + 'async_trait,
        'life0: 'async_trait,
        'life1: 'async_trait;
    fn exec<'life0, 'async_trait>(
        &'life0 self,
        prep_res: Self::PrepResult,
    ) -> Pin<Box<dyn Future<Output = Self::ExecResult> + Send + 'async_trait>>
    where
        Self: Sync + 'async_trait,
        'life0: 'async_trait;
    fn post<'life0, 'life1, 'async_trait>(
        &'life0 self,
        res: &'life1 Resources<TResourceKey>,
        exec_res: Self::ExecResult,
    ) -> Pin<Box<dyn Future<Output = Result<TState, CanoError>> + Send + 'async_trait>>
    where
        Self: Sync + 'async_trait,
        'life0: 'async_trait,
        'life1: 'async_trait;

    // Provided methods
    fn config(&self) -> TaskConfig { ... }
    fn run<'life0, 'life1, 'async_trait>(
        &'life0 self,
        res: &'life1 Resources<TResourceKey>,
    ) -> Pin<Box<dyn Future<Output = Result<TState, CanoError>> + Send + 'async_trait>>
    where
        Self: Sync + 'async_trait,
        'life0: 'async_trait,
        'life1: 'async_trait { ... }
    fn run_with_retries<'life0, 'life1, 'life2, 'async_trait>(
        &'life0 self,
        res: &'life1 Resources<TResourceKey>,
        config: &'life2 TaskConfig,
    ) -> Pin<Box<dyn Future<Output = Result<TState, CanoError>> + Send + 'async_trait>>
    where
        Self: Sync + 'async_trait,
        'life0: 'async_trait,
        'life1: 'async_trait,
        'life2: 'async_trait { ... }
}
Node trait for workflow processing
This trait defines the core interface that all workflow nodes must implement. It provides type flexibility while maintaining performance and type safety.
§Generic Types
- TState: The return type from the post method (typically an enum for workflow control).
- TResourceKey: The key type used to look up resources (defaults to Cow<'static, str>, which accepts &'static str literals without allocating, plus owned String keys for runtime-built names).
- PrepResult: The result type from the prep phase, passed to exec.
- ExecResult: The result type from the exec phase, passed to post.
§Node Lifecycle
Each node follows a three-phase execution lifecycle:
- prep: Preparation phase - setup and data loading
- exec: Execution phase - main processing logic
- post: Post-processing phase - cleanup and result handling
The run method orchestrates these phases automatically.
§Example
use cano::prelude::*;

// A params struct that carries node configuration as a resource.
#[derive(Resource)]
struct NodeParams {
    batch_size: usize,
}

struct MyNode;

#[node]
impl Node<String> for MyNode {
    type PrepResult = Vec<u32>;
    type ExecResult = u32;

    fn config(&self) -> TaskConfig {
        TaskConfig::minimal() // Use minimal retries for fast execution
    }

    async fn prep(&self, res: &Resources) -> Result<Self::PrepResult, CanoError> {
        // Read params and previously stored data from resources.
        let params = res.get::<NodeParams, _>("params")?;
        let store = res.get::<MemoryStore, _>("store")?;
        let data: Vec<u32> = store.get("input")?;
        // Take only up to batch_size items.
        Ok(data.into_iter().take(params.batch_size).collect())
    }

    async fn exec(&self, prep_res: Self::PrepResult) -> Self::ExecResult {
        // Pure computation: no resource access, easy to test in isolation.
        prep_res.iter().sum()
    }

    async fn post(&self, res: &Resources, exec_res: Self::ExecResult)
        -> Result<String, CanoError>
    {
        // Write result back to the store so downstream nodes can read it.
        let store = res.get::<MemoryStore, _>("store")?;
        store.put("sum", exec_res)?;
        Ok("done".to_string())
    }
}
§Required Associated Types
type PrepResult: Send + Sync
Result type from the prep phase
type ExecResult: Send + Sync
Result type from the exec phase
§Required Methods
fn prep<'life0, 'life1, 'async_trait>(
    &'life0 self,
    res: &'life1 Resources<TResourceKey>,
) -> Pin<Box<dyn Future<Output = Result<Self::PrepResult, CanoError>> + Send + 'async_trait>>
where
    Self: Sync + 'async_trait,
    'life0: 'async_trait,
    'life1: 'async_trait,
Preparation phase - load data and setup resources
This is the first phase of node execution. Use it to:
- Load data from resources that was left by previous nodes
- Validate inputs and parameters
- Setup resources needed for execution
- Prepare any data structures
The result of this phase is passed to the exec method.
fn exec<'life0, 'async_trait>(
    &'life0 self,
    prep_res: Self::PrepResult,
) -> Pin<Box<dyn Future<Output = Self::ExecResult> + Send + 'async_trait>>
where
    Self: Sync + 'async_trait,
    'life0: 'async_trait,
Execution phase - main processing logic
This is the core processing phase where the main business logic runs.
This phase doesn’t have access to resources - it only receives the result
from the prep phase and produces a result for the post phase.
Benefits of this design:
- Clear separation of concerns
- Easier testing (pure function)
- Better performance (no resource access during processing)
§Retry Note
On any phase failure, the entire prep → exec → post pipeline restarts.
This method must be idempotent: if it has side effects (e.g. sending a network
request or writing to an external system), those side effects will be repeated
on every retry attempt.
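To make the consequence concrete, here is a standalone sketch of full-pipeline retry using only std types (the function names mirror the trait's phases but this is not the cano API): a side effect in exec fires once per attempt, because every attempt restarts from prep.

```rust
fn prep() -> Result<Vec<u32>, String> {
    Ok(vec![1, 2, 3])
}

fn exec(data: &[u32], sends: &mut usize) -> u32 {
    // Imagine this is a network send: it fires once per attempt.
    *sends += 1;
    data.iter().sum()
}

fn post(sum: u32, attempt: usize) -> Result<u32, String> {
    // Fail the first two attempts to force retries of the whole pipeline.
    if attempt < 2 { Err("transient".into()) } else { Ok(sum) }
}

/// Returns the final result and how many times the exec side effect ran.
fn run_with_retries(max_attempts: usize) -> (Result<u32, String>, usize) {
    let mut sends = 0;
    let mut last_err = String::from("no attempts");
    for attempt in 0..max_attempts {
        // Restart from prep on every attempt, mirroring the semantics above.
        match prep() {
            Ok(data) => match post(exec(&data, &mut sends), attempt) {
                Ok(v) => return (Ok(v), sends),
                Err(e) => last_err = e,
            },
            Err(e) => last_err = e,
        }
    }
    (Err(last_err), sends)
}

fn main() {
    let (result, sends) = run_with_retries(3);
    // post failed twice, so exec (and its side effect) ran three times.
    println!("result={result:?}, sends={sends}");
}
```

If the simulated send were not idempotent, the downstream system would have received it three times, which is exactly the hazard the note above warns about.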
fn post<'life0, 'life1, 'async_trait>(
    &'life0 self,
    res: &'life1 Resources<TResourceKey>,
    exec_res: Self::ExecResult,
) -> Pin<Box<dyn Future<Output = Result<TState, CanoError>> + Send + 'async_trait>>
where
    Self: Sync + 'async_trait,
    'life0: 'async_trait,
    'life1: 'async_trait,
Post-processing phase - cleanup and result handling
This is the final phase of node execution. Use it to:
- Store results for the next node to use
- Clean up resources
- Determine the next action/node to run
- Handle errors from the exec phase
This method returns a typed value that determines what happens next in the workflow.
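The example earlier on this page uses String states; in practice TState is often an enum. The following is a hypothetical, self-contained illustration (plain Rust, not tied to cano's types) of the kind of branching decision a post implementation makes:

```rust
// Hypothetical workflow-state enum; in cano this would be your TState.
#[derive(Debug, PartialEq)]
enum MyState {
    Summarize,
    Alert,
    Done,
}

// A post-style decision: inspect the exec result and pick the next state.
fn decide_next(sum: u32) -> MyState {
    if sum == 0 {
        MyState::Done // nothing to process; end the workflow branch
    } else if sum > 1000 {
        MyState::Alert // route to an alerting node
    } else {
        MyState::Summarize // normal path
    }
}

fn main() {
    println!("{:?}", decide_next(42));
}
```

The workflow engine then uses the returned variant to select the next registered node.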
§Provided Methods
fn config(&self) -> TaskConfig
Get the node configuration that controls execution behavior
Returns the TaskConfig that determines how this node should be executed.
The default implementation returns TaskConfig::default() which configures
the node with standard retry logic.
Override this method to customize execution behavior:
- Use TaskConfig::minimal() for fast-failing nodes with minimal retries
- Use TaskConfig::new().with_fixed_retry(n, duration) for custom retry behavior
- Return a custom configuration with specific retry/parameter settings
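For example, an override using the builders named above might look like this (a sketch only: the argument meanings of with_fixed_retry are assumed from this page, so check TaskConfig for the exact semantics):

```rust
use std::time::Duration;

// Inside your `impl Node<...> for MyNode` block:
fn config(&self) -> TaskConfig {
    // Assumed semantics: up to 3 retry attempts, 500 ms apart.
    TaskConfig::new().with_fixed_retry(3, Duration::from_millis(500))
}
```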
fn run<'life0, 'life1, 'async_trait>(
    &'life0 self,
    res: &'life1 Resources<TResourceKey>,
) -> Pin<Box<dyn Future<Output = Result<TState, CanoError>> + Send + 'async_trait>>
where
    Self: Sync + 'async_trait,
    'life0: 'async_trait,
    'life1: 'async_trait,
Run the complete node lifecycle with configuration-driven execution.
Orchestrates prep → exec → post with the retry policy from Node::config.
Only prep and post failures are retried; exec is infallible by design (returns
Self::ExecResult directly). You can override this method for completely custom
orchestration.
§Workflow integration
When a Node is registered with a crate::workflow::Workflow, the workflow engine
uses the blanket crate::task::Task impl rather than calling this method directly.
That blanket impl runs a single prep → exec → post pass per attempt and
delegates retries to the outer run_with_retries call in the workflow dispatcher.
If you call Node::run directly (outside a workflow), retries run here, which is
correct for standalone use. Do not call Node::run inside a custom Task::run
implementation — that would double-retry the node.
§Errors
- When retries are disabled (max_attempts <= 1), the original error from prep or post is propagated unchanged (typically CanoError::Preparation or CanoError::NodeExecution).
- When retries are enabled and the attempt limit is reached, the failure is wrapped in CanoError::RetryExhausted with the underlying message inlined.
fn run_with_retries<'life0, 'life1, 'life2, 'async_trait>(
    &'life0 self,
    res: &'life1 Resources<TResourceKey>,
    config: &'life2 TaskConfig,
) -> Pin<Box<dyn Future<Output = Result<TState, CanoError>> + Send + 'async_trait>>
where
    Self: Sync + 'async_trait,
    'life0: 'async_trait,
    'life1: 'async_trait,
    'life2: 'async_trait,
Internal method to run the node lifecycle with retry logic
Executes the three phases (prep → exec → post) in sequence, retrying the
entire pipeline from prep whenever any phase returns an error.
§Full-Pipeline Retry Semantics
Unlike retry strategies that only re-run the failing step, this method restarts from the very beginning on each attempt:
- If prep fails → the whole pipeline retries from prep.
- If post fails → prep and exec both re-run before post is tried again.
This means all three phases must be idempotent when retries are enabled.
Any side effects (network calls, writes to external systems, etc.) in prep or
exec will be repeated on every retry attempt.
The number of attempts and delay between them are controlled by the
TaskConfig returned from Node::config.