a3s_flow/node.rs
1//! Node trait and execution context.
2//!
3//! Every workflow step implements [`Node`]. The engine calls [`Node::execute`]
4//! with an [`ExecContext`] containing the node's configuration, upstream outputs,
5//! global variables, and a reference to the active [`NodeRegistry`] — which
6//! lets nodes like `"iteration"` spin up sub-flow runners without holding their
7//! own registry reference.
8
9use std::collections::HashMap;
10use std::sync::{Arc, RwLock};
11
12use async_trait::async_trait;
13use serde::Deserialize;
14use serde_json::Value;
15
16use crate::error::Result;
17use crate::flow_store::FlowStore;
18use crate::registry::NodeRegistry;
19
20/// Per-node retry configuration, parsed from `data["retry"]`.
21///
22/// When present, the runner will re-attempt a failed node up to `max_attempts`
23/// times (including the first attempt) with an optional exponential backoff.
24///
25/// # Example (in flow definition)
26/// ```json
27/// {
28/// "id": "fetch",
29/// "type": "http-request",
30/// "data": {
31/// "url": "https://api.example.com/items",
32/// "retry": { "max_attempts": 3, "backoff_ms": 500 }
33/// }
34/// }
35/// ```
36#[derive(Debug, Clone, Deserialize)]
37pub struct RetryPolicy {
38 /// Maximum number of attempts (including the first). Minimum effective value: 1.
39 pub max_attempts: u32,
40 /// Base delay in milliseconds between attempts. Each subsequent retry waits
41 /// `backoff_ms * 2^(attempt-1)` milliseconds (capped at 64× the base).
42 /// Defaults to 0 (no delay).
43 #[serde(default)]
44 pub backoff_ms: u64,
45}
46
47/// Runtime context passed to every node during execution.
48///
49/// - `data` — the static configuration declared in the flow definition's `data` field.
50/// - `inputs` — outputs of all upstream nodes, keyed by node ID.
51/// - `variables` — global flow-level variables (env, secrets, user inputs).
52/// - `context` — shared mutable context for cross-node state (similar to Dify's global context).
53/// Nodes can read and write to this context using `context.read()` and `context.write()`.
54/// - `registry` — the active node registry; available to nodes that need to
55/// execute sub-flows (e.g. `"iteration"`, `"sub-flow"`).
56/// - `flow_store` — optional named flow definition store; required by the
57/// `"sub-flow"` node to load its target definition by name.
58pub struct ExecContext {
59 /// Node configuration from the flow definition's `data` field.
60 pub data: Value,
61 /// Outputs of upstream nodes, keyed by node ID.
62 pub inputs: HashMap<String, Value>,
63 /// Global flow variables (env, secrets, user inputs).
64 pub variables: HashMap<String, Value>,
65 /// Shared mutable context for cross-node state (Dify-style global context).
66 /// Use `context.read()` to read and `context.write()` to modify.
67 pub context: Arc<RwLock<HashMap<String, Value>>>,
68 /// Active node registry — allows nodes to run sub-flows.
69 pub registry: Arc<NodeRegistry>,
70 /// Named flow definition store — available when the engine has one configured.
71 pub flow_store: Option<Arc<dyn FlowStore>>,
72}
73
74impl Clone for ExecContext {
75 fn clone(&self) -> Self {
76 Self {
77 data: self.data.clone(),
78 inputs: self.inputs.clone(),
79 variables: self.variables.clone(),
80 context: Arc::clone(&self.context),
81 registry: Arc::clone(&self.registry),
82 flow_store: self.flow_store.clone(),
83 }
84 }
85}
86
87impl std::fmt::Debug for ExecContext {
88 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
89 f.debug_struct("ExecContext")
90 .field("data", &self.data)
91 .field("inputs", &self.inputs)
92 .field("variables", &self.variables)
93 .field("context", &"<RwLock>")
94 .finish_non_exhaustive()
95 }
96}
97
98impl Default for ExecContext {
99 fn default() -> Self {
100 Self {
101 data: Value::Null,
102 inputs: HashMap::new(),
103 variables: HashMap::new(),
104 context: Arc::new(RwLock::new(HashMap::new())),
105 registry: Arc::new(NodeRegistry::with_defaults()),
106 flow_store: None,
107 }
108 }
109}
110
111/// The extension point for workflow nodes.
112///
113/// Implement this trait to add custom node types (HTTP call, LLM prompt,
114/// script, condition branch, sub-flow, etc.). Every implementation must be
115/// `Send + Sync` so the runner can execute nodes concurrently across threads.
116#[async_trait]
117pub trait Node: Send + Sync {
118 /// The node type identifier matched against the `"type"` field in the
119 /// flow definition and looked up in [`NodeRegistry`].
120 fn node_type(&self) -> &str;
121
122 /// Execute the node and return a JSON output value.
123 ///
124 /// The output is stored under this node's ID and passed as `inputs` to
125 /// all downstream nodes.
126 async fn execute(&self, ctx: ExecContext) -> Result<Value>;
127}