// a3s_flow/node.rs
//! Node trait and execution context.
//!
//! Every workflow step implements [`Node`]. The engine calls [`Node::execute`]
//! with an [`ExecContext`] containing the node's configuration, upstream outputs,
//! global variables, and a reference to the active [`NodeRegistry`] — which
//! lets nodes like `"iteration"` spin up sub-flow runners without holding their
//! own registry reference.

use std::collections::HashMap;
use std::sync::{Arc, RwLock};

use async_trait::async_trait;
use serde::Deserialize;
use serde_json::Value;

use crate::error::Result;
use crate::flow_store::FlowStore;
use crate::registry::NodeRegistry;

20/// Per-node retry configuration, parsed from `data["retry"]`.
21///
22/// When present, the runner will re-attempt a failed node up to `max_attempts`
23/// times (including the first attempt) with an optional exponential backoff.
24///
25/// # Example (in flow definition)
26/// ```json
27/// {
28///   "id": "fetch",
29///   "type": "http-request",
30///   "data": {
31///     "url": "https://api.example.com/items",
32///     "retry": { "max_attempts": 3, "backoff_ms": 500 }
33///   }
34/// }
35/// ```
36#[derive(Debug, Clone, Deserialize)]
37pub struct RetryPolicy {
38    /// Maximum number of attempts (including the first). Minimum effective value: 1.
39    pub max_attempts: u32,
40    /// Base delay in milliseconds between attempts. Each subsequent retry waits
41    /// `backoff_ms * 2^(attempt-1)` milliseconds (capped at 64× the base).
42    /// Defaults to 0 (no delay).
43    #[serde(default)]
44    pub backoff_ms: u64,
45}
46
/// Runtime context passed to every node during execution.
///
/// One `ExecContext` is handed to each node invocation. The type is `Clone`:
/// the per-node payload (`data`, `inputs`, `variables`) is copied, while the
/// `Arc`-backed fields are shared between clones.
///
/// - `data` — the static configuration declared in the flow definition's `data` field.
/// - `inputs` — outputs of all upstream nodes, keyed by node ID.
/// - `variables` — global flow-level variables (env, secrets, user inputs).
/// - `context` — shared mutable context for cross-node state (similar to Dify's global context).
///   Nodes can read and write to this context using `context.read()` and `context.write()`.
/// - `registry` — the active node registry; available to nodes that need to
///   execute sub-flows (e.g. `"iteration"`, `"sub-flow"`).
/// - `flow_store` — optional named flow definition store; required by the
///   `"sub-flow"` node to load its target definition by name.
pub struct ExecContext {
    /// Node configuration from the flow definition's `data` field.
    pub data: Value,
    /// Outputs of upstream nodes, keyed by node ID.
    pub inputs: HashMap<String, Value>,
    /// Global flow variables (env, secrets, user inputs).
    pub variables: HashMap<String, Value>,
    /// Shared mutable context for cross-node state (Dify-style global context).
    /// Use `context.read()` to read and `context.write()` to modify.
    pub context: Arc<RwLock<HashMap<String, Value>>>,
    /// Active node registry — allows nodes to run sub-flows.
    pub registry: Arc<NodeRegistry>,
    /// Named flow definition store — available when the engine has one configured.
    pub flow_store: Option<Arc<dyn FlowStore>>,
}

74impl Clone for ExecContext {
75    fn clone(&self) -> Self {
76        Self {
77            data: self.data.clone(),
78            inputs: self.inputs.clone(),
79            variables: self.variables.clone(),
80            context: Arc::clone(&self.context),
81            registry: Arc::clone(&self.registry),
82            flow_store: self.flow_store.clone(),
83        }
84    }
85}
86
87impl std::fmt::Debug for ExecContext {
88    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
89        f.debug_struct("ExecContext")
90            .field("data", &self.data)
91            .field("inputs", &self.inputs)
92            .field("variables", &self.variables)
93            .field("context", &"<RwLock>")
94            .finish_non_exhaustive()
95    }
96}
97
98impl Default for ExecContext {
99    fn default() -> Self {
100        Self {
101            data: Value::Null,
102            inputs: HashMap::new(),
103            variables: HashMap::new(),
104            context: Arc::new(RwLock::new(HashMap::new())),
105            registry: Arc::new(NodeRegistry::with_defaults()),
106            flow_store: None,
107        }
108    }
109}
110
/// The extension point for workflow nodes.
///
/// Implement this trait to add custom node types (HTTP call, LLM prompt,
/// script, condition branch, sub-flow, etc.). Every implementation must be
/// `Send + Sync` so the runner can execute nodes concurrently across threads.
#[async_trait]
pub trait Node: Send + Sync {
    /// The node type identifier matched against the `"type"` field in the
    /// flow definition and looked up in [`NodeRegistry`].
    fn node_type(&self) -> &str;

    /// Execute the node and return a JSON output value.
    ///
    /// Takes `ctx` by value; `ExecContext` is `Clone`, so the runner can give
    /// each node its own copy while the `Arc`-backed fields stay shared.
    ///
    /// The output is stored under this node's ID and passed as `inputs` to
    /// all downstream nodes.
    async fn execute(&self, ctx: ExecContext) -> Result<Value>;
}