Skip to main content

a3s_flow/nodes/
context_set.rs

1//! `"context-set"` node — write key-value pairs into the shared execution context.
2//!
3//! Works like the `"assign"` node but writes to [`ExecContext::context`] (the
4//! shared `Arc<RwLock<HashMap>>`) instead of the flow-level variable scope.
5//! Any downstream node — or an external caller via [`FlowEngine::get_context`] —
6//! can read the values immediately after this node completes.
7//!
8//! String values are rendered as Jinja2 templates using the same context as the
9//! `"llm"` node (variables + upstream inputs). Non-string JSON values are stored
10//! as-is.
11//!
12//! # Config schema
13//!
14//! ```json
15//! {
16//!   "assigns": {
17//!     "user_id":  "{{ start.user_id }}",
18//!     "attempt":  1,
19//!     "metadata": { "source": "webhook" }
20//!   }
21//! }
22//! ```
23//!
24//! | Field | Type | Required | Description |
25//! |-------|------|:--------:|-------------|
26//! | `assigns` | object | ✅ | Map of context keys to Jinja2 templates (strings) or literal JSON values |
27//!
28//! # Output schema
29//!
30//! Returns the resolved key-value map that was written to the context:
31//!
32//! ```json
33//! { "user_id": "u_123", "attempt": 1, "metadata": { "source": "webhook" } }
34//! ```
35
36use async_trait::async_trait;
37use serde_json::Value;
38
39use crate::error::{FlowError, Result};
40use crate::node::{ExecContext, Node};
41
42use super::llm::{build_jinja_context, render};
43
/// Context-set node — writes key-value pairs into the shared execution context.
///
/// The node is a stateless unit struct: everything it needs at run time arrives
/// through the `ExecContext` passed to `execute`, so it is trivially copyable
/// and default-constructible.
#[derive(Debug, Clone, Copy, Default)]
pub struct ContextSetNode;
46
47#[async_trait]
48impl Node for ContextSetNode {
49    fn node_type(&self) -> &str {
50        "context-set"
51    }
52
53    async fn execute(&self, ctx: ExecContext) -> Result<Value> {
54        let assigns = ctx.data["assigns"].as_object().ok_or_else(|| {
55            FlowError::InvalidDefinition("context-set: missing or non-object data.assigns".into())
56        })?;
57
58        let jinja_ctx = build_jinja_context(&ctx);
59        let mut out = serde_json::Map::new();
60
61        for (key, template_val) in assigns {
62            let resolved = if let Some(s) = template_val.as_str() {
63                Value::String(render(s, &jinja_ctx)?)
64            } else {
65                template_val.clone()
66            };
67            out.insert(key.clone(), resolved.clone());
68
69            // Write directly into the shared context.
70            ctx.context.write().unwrap().insert(key.clone(), resolved);
71        }
72
73        Ok(Value::Object(out))
74    }
75}
76
77// ── Tests ──────────────────────────────────────────────────────────────────
78
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;
    use std::collections::HashMap;
    use std::sync::{Arc, RwLock};

    /// Shape of the shared context store handed to the node under test.
    type Shared = Arc<RwLock<HashMap<String, Value>>>;

    /// Builds an `ExecContext` whose node config is `data` and whose shared
    /// context is a new handle to `store`; every other field is defaulted.
    fn exec_ctx(data: Value, store: &Shared) -> ExecContext {
        ExecContext {
            data,
            context: Arc::clone(store),
            ..Default::default()
        }
    }

    /// Non-string values must appear unchanged both in the node output and in
    /// the shared context.
    #[tokio::test]
    async fn writes_literal_values_to_context() {
        let store = Shared::default();
        let config = json!({ "assigns": { "count": 42, "flag": true } });

        let returned = ContextSetNode
            .execute(exec_ctx(config, &store))
            .await
            .unwrap();

        let written = store.read().unwrap();
        for (key, expected) in [("count", json!(42)), ("flag", json!(true))] {
            assert_eq!(returned[key], expected);
            assert_eq!(written[key], expected);
        }
    }

    /// String values are rendered as templates against the flow variables.
    #[tokio::test]
    async fn renders_string_template_into_context() {
        let store = Shared::default();
        let request = ExecContext {
            data: json!({ "assigns": { "greeting": "Hello, {{ name }}!" } }),
            variables: HashMap::from([("name".into(), json!("Alice"))]),
            context: Arc::clone(&store),
            ..Default::default()
        };

        ContextSetNode.execute(request).await.unwrap();

        let written = store.read().unwrap();
        assert_eq!(written["greeting"], json!("Hello, Alice!"));
    }

    /// A config without `assigns` is rejected as an invalid definition.
    #[tokio::test]
    async fn missing_assigns_returns_error() {
        let request = ExecContext {
            data: json!({}),
            ..Default::default()
        };

        let outcome = ContextSetNode.execute(request).await;

        assert!(matches!(outcome, Err(FlowError::InvalidDefinition(_))));
    }

    /// Writing a key that already exists replaces the previous value.
    #[tokio::test]
    async fn overwrites_existing_context_entry() {
        let store: Shared = Arc::new(RwLock::new(HashMap::from([(
            "key".into(),
            json!("old"),
        )])));
        let config = json!({ "assigns": { "key": "new" } });

        ContextSetNode
            .execute(exec_ctx(config, &store))
            .await
            .unwrap();

        assert_eq!(store.read().unwrap()["key"], json!("new"));
    }
}