Trait Node 

pub trait Node: Send + Sync {
    // Required method
    fn run<'life0, 'async_trait>(
        &'life0 self,
        snapshot: StateSnapshot,
        ctx: NodeContext,
    ) -> Pin<Box<dyn Future<Output = Result<NodePartial, NodeError>> + Send + 'async_trait>>
       where Self: 'async_trait,
             'life0: 'async_trait;
}
Core trait defining executable workflow nodes.

The Node trait represents a single unit of computation within a workflow. Nodes receive the current state snapshot and execution context, perform their work, and return partial state updates.

§Design Principles

  • Stateless: Nodes should be stateless and deterministic
  • Focused: Each node should have a single, well-defined responsibility
  • Composable: Nodes should be easily combined into larger workflows
  • Observable: Use the context to emit events for monitoring and debugging
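As a rough illustration of the stateless, focused, and composable principles, the sketch below chains two small steps. It deliberately does not use weavegraph's types: `State`, `Step`, `Trim`, `Measure`, and `run_all` are hypothetical stand-ins, and merging updates with `extend` is an assumption about how a runner might apply partial updates, not a description of the library.

```rust
use std::collections::BTreeMap;

/// Hypothetical stand-in for workflow state: a flat string map.
type State = BTreeMap<String, String>;

/// A stateless step: reads a snapshot and returns only the keys it changes.
trait Step {
    fn run(&self, snapshot: &State) -> State;
}

/// Focused: only trims the "input" value.
struct Trim;

impl Step for Trim {
    fn run(&self, snapshot: &State) -> State {
        let mut update = State::new();
        if let Some(raw) = snapshot.get("input") {
            update.insert("trimmed".to_string(), raw.trim().to_string());
        }
        update
    }
}

/// Focused: only measures the "trimmed" value.
struct Measure;

impl Step for Measure {
    fn run(&self, snapshot: &State) -> State {
        let mut update = State::new();
        if let Some(text) = snapshot.get("trimmed") {
            update.insert("length".to_string(), text.len().to_string());
        }
        update
    }
}

/// Composable: run the steps in order, merging each partial update
/// into the state before the next step sees it.
fn run_all(steps: &[&dyn Step], mut state: State) -> State {
    for step in steps {
        let update = step.run(&state);
        state.extend(update);
    }
    state
}
```

Because neither step holds mutable fields, the same input always produces the same update, so the steps can be reordered, reused, or tested in isolation.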

§Error Handling

Nodes can handle errors in two ways:

  1. Fatal errors: Return Err(NodeError) to stop workflow execution
  2. Recoverable errors: Add an ErrorEvent to NodePartial.errors and return Ok so the workflow continues
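The two paths can be sketched without the library. In this minimal sketch, `Partial`, `Fatal`, and `validate` are hypothetical stand-ins for NodePartial, NodeError, and a node's run method; they are not weavegraph's actual definitions.

```rust
/// Hypothetical stand-in for a node's partial state update.
#[derive(Debug, Default, PartialEq)]
struct Partial {
    /// Recoverable problems collected while the node ran.
    errors: Vec<String>,
}

/// Hypothetical stand-in for a fatal node error.
#[derive(Debug, PartialEq)]
enum Fatal {
    MissingField(String),
}

/// Returns Err for a fatal problem, or Ok with warnings attached
/// for a recoverable one.
fn validate(fields: &[&str], required: &[&str], messages: &[&str]) -> Result<Partial, Fatal> {
    // Fatal path: a missing required field aborts immediately.
    for name in required {
        if !fields.contains(name) {
            return Err(Fatal::MissingField((*name).to_string()));
        }
    }

    // Recoverable path: record the problem but still return Ok.
    let mut partial = Partial::default();
    if messages.is_empty() {
        partial.errors.push("no messages to validate".to_string());
    }
    Ok(partial)
}
```

A caller sees the difference in the return value: Err ends the run, while Ok with a non-empty `errors` list lets the run proceed and leaves the warnings available for inspection.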

§Examples

use weavegraph::node::{Node, NodeContext, NodePartial, NodeError};
use weavegraph::state::StateSnapshot;
use weavegraph::channels::errors::{ErrorEvent, WeaveError};
use async_trait::async_trait;

struct ValidationNode {
    required_fields: Vec<String>,
}

#[async_trait]
impl Node for ValidationNode {
    async fn run(&self, snapshot: StateSnapshot, ctx: NodeContext) -> Result<NodePartial, NodeError> {
        // Emit an event so the run is observable.
        ctx.emit("validation", "Starting validation")?;

        // Fatal error: a missing required field stops the workflow.
        for field in &self.required_fields {
            if !snapshot.extra.contains_key(field) {
                return Err(NodeError::ValidationFailed(format!("Missing field: {}", field)));
            }
        }

        // Recoverable error: attach a warning via the fluent API and return Ok.
        if snapshot.messages.is_empty() {
            let warning = ErrorEvent {
                error: WeaveError {
                    message: "No messages to validate, but continuing".to_string(),
                    ..Default::default()
                },
                ..Default::default()
            };
            return Ok(NodePartial::new().with_errors(vec![warning]));
        }

        Ok(NodePartial::default())
    }
}

Required Methods§

fn run<'life0, 'async_trait>( &'life0 self, snapshot: StateSnapshot, ctx: NodeContext, ) -> Pin<Box<dyn Future<Output = Result<NodePartial, NodeError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Execute this node with the given state snapshot and context.
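The signature above is what an async method looks like after the async_trait macro expands it: a plain method that returns a boxed, pinned, Send future. The sketch below writes that shape by hand using only the standard library. `Snapshot`, `Partial`, `MiniNode`, `CountMessages`, and `block_on` are hypothetical stand-ins, the single lifetime `'a` simplifies the `'life0` / `'async_trait` pair, and the busy-polling `block_on` is only suitable for futures that complete immediately.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical stand-in for the state snapshot.
struct Snapshot {
    messages: Vec<String>,
}

/// Hypothetical stand-in for the partial update.
#[derive(Debug, PartialEq)]
struct Partial {
    message_count: usize,
}

/// The boxed-future shape that appears in the expanded signature.
type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;

/// Simplified analogue of the trait: one method returning a boxed future.
trait MiniNode: Send + Sync {
    fn run<'a>(&'a self, snapshot: Snapshot) -> BoxFuture<'a, Result<Partial, String>>;
}

struct CountMessages;

impl MiniNode for CountMessages {
    fn run<'a>(&'a self, snapshot: Snapshot) -> BoxFuture<'a, Result<Partial, String>> {
        // Hand-written equivalent of an `async fn` body: box and pin the block.
        Box::pin(async move {
            let partial = Partial { message_count: snapshot.messages.len() };
            Ok::<Partial, String>(partial)
        })
    }
}

/// Waker that does nothing; enough for futures that never suspend.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Minimal executor for this sketch: polls in a loop until the future is ready.
fn block_on<F: Future>(future: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut future = Box::pin(future);
    loop {
        if let Poll::Ready(output) = future.as_mut().poll(&mut cx) {
            return output;
        }
    }
}
```

In practice an implementor writes `async fn run` under `#[async_trait]` and drives it with a real async runtime; the boxing is what makes the trait usable as `dyn Node`.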

Implementors§