xerv_nodes/flow/
wait.rs

1//! Wait node (human-in-the-loop).
2//!
3//! Pauses execution until an external signal is received.
4//! Enables approval workflows and manual intervention patterns.
5
6use std::collections::HashMap;
7use xerv_core::traits::{Context, Node, NodeFuture, NodeInfo, NodeOutput, Port, PortDirection};
8use xerv_core::types::RelPtr;
9use xerv_core::value::Value;
10
/// Where the state of a paused wait is stored until the wait is resumed.
///
/// The default is [`WaitPersistence::Memory`]. The type is comparable with
/// `==` so configuration can be checked directly in callers and tests.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub enum WaitPersistence {
    /// Store wait state in memory (development only).
    #[default]
    Memory,
    /// Store wait state in Redis.
    Redis {
        /// Key prefix for Redis storage.
        key_prefix: String,
        /// TTL for the wait state (seconds).
        ttl_seconds: u64,
    },
    /// Store wait state in a database.
    Database {
        /// Table name for wait states.
        table: String,
    },
}
30
/// How a paused wait can be resumed.
///
/// The default is [`ResumeMethod::Webhook`]. The type is comparable with
/// `==` so configuration can be checked directly in callers and tests.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub enum ResumeMethod {
    /// Resume via webhook callback.
    #[default]
    Webhook,
    /// Resume via API call with approval token.
    ApiApproval {
        /// Token required for approval.
        token_field: String,
    },
    /// Resume after a timeout (auto-approve or auto-reject).
    Timeout {
        /// Timeout duration in seconds.
        seconds: u64,
        /// What happens on timeout.
        on_timeout: TimeoutAction,
    },
}

/// Action to take when a timeout occurs.
///
/// The default is [`TimeoutAction::Reject`].
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Hash)]
pub enum TimeoutAction {
    /// Auto-approve and continue.
    Approve,
    /// Auto-reject and emit to error port.
    #[default]
    Reject,
    /// Escalate (emit to escalate port).
    Escalate,
}
62
63/// Wait node - human-in-the-loop.
64///
65/// Pauses flow execution until an external signal (webhook, API call, etc.)
66/// is received. The current state is checkpointed and can be resumed later.
67///
68/// # Ports
69/// - Input: "in" - Data to preserve during wait
70/// - Output: "out" - Emitted when approved/resumed
71/// - Output: "rejected" - Emitted when rejected
72/// - Output: "escalated" - Emitted when escalated
73/// - Output: "error" - Emitted on errors
74///
75/// # Example Configuration
76/// ```yaml
77/// nodes:
78///   await_approval:
79///     type: std::wait
80///     config:
81///       hook_id: order_approval_${trace_id}
82///       persistence: redis
83///       resume_method: webhook
84///       timeout_seconds: 86400  # 24 hours
85///       on_timeout: reject
86///       metadata:
87///         approver_email: ${config.approval_email}
88///         order_id: ${order.id}
89///     inputs:
90///       - from: validate_order.out -> in
91///     outputs:
92///       out: -> process_order.in
93///       rejected: -> notify_rejection.in
94/// ```
95#[derive(Debug)]
96pub struct WaitNode {
97    /// Unique identifier for this wait hook.
98    hook_id: String,
99    /// How to persist the wait state.
100    persistence: WaitPersistence,
101    /// How the wait can be resumed.
102    resume_method: ResumeMethod,
103    /// Optional metadata to include with the wait notification.
104    metadata_fields: Vec<String>,
105}
106
107impl WaitNode {
108    /// Create a wait node with the given hook ID.
109    pub fn new(hook_id: impl Into<String>) -> Self {
110        Self {
111            hook_id: hook_id.into(),
112            persistence: WaitPersistence::Memory,
113            resume_method: ResumeMethod::Webhook,
114            metadata_fields: Vec::new(),
115        }
116    }
117
118    /// Create a wait node with webhook resumption.
119    pub fn webhook(hook_id: impl Into<String>) -> Self {
120        Self::new(hook_id)
121    }
122
123    /// Create a wait node with Redis persistence.
124    pub fn with_redis(
125        hook_id: impl Into<String>,
126        key_prefix: impl Into<String>,
127        ttl_seconds: u64,
128    ) -> Self {
129        Self {
130            hook_id: hook_id.into(),
131            persistence: WaitPersistence::Redis {
132                key_prefix: key_prefix.into(),
133                ttl_seconds,
134            },
135            resume_method: ResumeMethod::Webhook,
136            metadata_fields: Vec::new(),
137        }
138    }
139
140    /// Set the persistence method.
141    pub fn with_persistence(mut self, persistence: WaitPersistence) -> Self {
142        self.persistence = persistence;
143        self
144    }
145
146    /// Set the resume method.
147    pub fn with_resume_method(mut self, method: ResumeMethod) -> Self {
148        self.resume_method = method;
149        self
150    }
151
152    /// Add a timeout with auto-action.
153    pub fn with_timeout(mut self, seconds: u64, on_timeout: TimeoutAction) -> Self {
154        self.resume_method = ResumeMethod::Timeout {
155            seconds,
156            on_timeout,
157        };
158        self
159    }
160
161    /// Add metadata fields to extract from input.
162    pub fn with_metadata(mut self, fields: Vec<String>) -> Self {
163        self.metadata_fields = fields;
164        self
165    }
166
167    /// Extract metadata from input for the wait notification.
168    fn extract_metadata(&self, input: &Value) -> serde_json::Map<String, serde_json::Value> {
169        let mut metadata = serde_json::Map::new();
170        for field in &self.metadata_fields {
171            if let Some(value) = input.get_field(field) {
172                let key = field.split('.').last().unwrap_or(field);
173                metadata.insert(key.to_string(), value.into_inner());
174            }
175        }
176        metadata
177    }
178}
179
180impl Node for WaitNode {
181    fn info(&self) -> NodeInfo {
182        NodeInfo::new("std", "wait")
183            .with_description("Pause execution for human-in-the-loop approval")
184            .with_inputs(vec![Port::input("Any")])
185            .with_outputs(vec![
186                Port::named("out", PortDirection::Output, "Any")
187                    .with_description("Emitted when approved"),
188                Port::named("rejected", PortDirection::Output, "Any")
189                    .with_description("Emitted when rejected"),
190                Port::named("escalated", PortDirection::Output, "Any")
191                    .with_description("Emitted when escalated"),
192                Port::error(),
193            ])
194    }
195
196    fn execute<'a>(&'a self, ctx: Context, inputs: HashMap<String, RelPtr<()>>) -> NodeFuture<'a> {
197        Box::pin(async move {
198            let input = inputs.get("in").copied().unwrap_or_else(RelPtr::null);
199
200            // Read input for metadata extraction
201            let value = if input.is_null() {
202                Value::null()
203            } else {
204                match ctx.read_bytes(input) {
205                    Ok(bytes) => Value::from_bytes(&bytes).unwrap_or_else(|_| Value::null()),
206                    Err(_) => Value::null(),
207                }
208            };
209
210            // Extract metadata
211            let metadata = self.extract_metadata(&value);
212
213            // In a full implementation, this would:
214            // 1. Flush the arena to disk
215            // 2. Write trace_id + hook_id to persistence store
216            // 3. Send notification (email, Slack, etc.) with approval links
217            // 4. Return a "waiting" status to the executor
218            // 5. The executor would unload the trace from memory
219            //
220            // Resumption would be handled by:
221            // 1. External webhook/API call with hook_id
222            // 2. Executor loads arena back from disk
223            // 3. Replays from WAL to get to this node
224            // 4. Continues execution based on approval decision
225
226            tracing::info!(
227                hook_id = %self.hook_id,
228                trace_id = %ctx.trace_id(),
229                persistence = ?self.persistence,
230                resume_method = ?self.resume_method,
231                metadata = ?metadata,
232                "Wait: pausing for approval"
233            );
234
235            // For now, we simulate immediate approval for testing
236            // In production, this would return a "waiting" signal
237            // that the executor handles specially.
238            //
239            // The wait node output includes:
240            // - Original input data
241            // - Wait state information
242            // - Metadata for the approval UI
243            let wait_state = Value::from(serde_json::json!({
244                "hook_id": self.hook_id,
245                "trace_id": ctx.trace_id().to_string(),
246                "status": "waiting",
247                "metadata": metadata,
248                "resume_url": format!("/api/v1/resume/{}", self.hook_id)
249            }));
250
251            // Write wait state to arena
252            let state_bytes = match wait_state.to_bytes() {
253                Ok(bytes) => bytes,
254                Err(e) => {
255                    return Ok(NodeOutput::error_with_message(format!(
256                        "Failed to serialize wait state: {}",
257                        e
258                    )));
259                }
260            };
261
262            let state_ptr = match ctx.write_bytes(&state_bytes) {
263                Ok(ptr) => ptr,
264                Err(e) => {
265                    return Ok(NodeOutput::error_with_message(format!(
266                        "Failed to write wait state: {}",
267                        e
268                    )));
269                }
270            };
271
272            // In a real implementation, we would return a special "waiting" output
273            // that tells the executor to suspend this trace.
274            // For now, we emit on "out" to simulate immediate approval.
275            Ok(NodeOutput::out(state_ptr))
276        })
277    }
278}
279
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// The node reports its qualified name, one input and four ordered outputs.
    #[test]
    fn wait_node_info() {
        let info = WaitNode::new("test_hook").info();

        assert_eq!(info.name, "std::wait");
        assert_eq!(info.inputs.len(), 1);
        assert_eq!(info.outputs.len(), 4);

        let expected_outputs = ["out", "rejected", "escalated", "error"];
        for (idx, expected) in expected_outputs.iter().copied().enumerate() {
            assert_eq!(info.outputs[idx].name, expected);
        }
    }

    /// Persistence defaults to in-memory storage.
    #[test]
    fn wait_persistence_default() {
        assert!(matches!(
            WaitPersistence::default(),
            WaitPersistence::Memory
        ));
    }

    /// Resumption defaults to the webhook method.
    #[test]
    fn wait_resume_method_default() {
        assert!(matches!(ResumeMethod::default(), ResumeMethod::Webhook));
    }

    /// Timeouts reject by default.
    #[test]
    fn wait_timeout_action_default() {
        assert!(matches!(TimeoutAction::default(), TimeoutAction::Reject));
    }

    /// `with_redis` stores the TTL in a Redis persistence config.
    #[test]
    fn wait_with_redis() {
        let node = WaitNode::with_redis("hook", "xerv:wait", 3600);

        match &node.persistence {
            WaitPersistence::Redis { ttl_seconds, .. } => assert_eq!(*ttl_seconds, 3600),
            other => panic!("expected Redis persistence, got {:?}", other),
        }
    }

    /// `with_timeout` switches the resume method to a timeout with the given action.
    #[test]
    fn wait_with_timeout() {
        let node = WaitNode::new("hook").with_timeout(300, TimeoutAction::Approve);

        match &node.resume_method {
            ResumeMethod::Timeout {
                seconds,
                on_timeout,
            } => {
                assert_eq!(*seconds, 300);
                assert!(matches!(on_timeout, TimeoutAction::Approve));
            }
            other => panic!("expected Timeout resume method, got {:?}", other),
        }
    }

    /// Nested fields are extracted and keyed by the last path segment.
    #[test]
    fn wait_extract_metadata() {
        let fields = vec!["user.name".to_string(), "order.id".to_string()];
        let node = WaitNode::new("hook").with_metadata(fields);

        let input = Value::from(json!({
            "user": {"name": "Alice"},
            "order": {"id": 12345}
        }));

        let extracted = node.extract_metadata(&input);

        let expected_name = serde_json::Value::String("Alice".to_string());
        let expected_id = serde_json::Value::Number(12345.into());
        assert_eq!(extracted.get("name"), Some(&expected_name));
        assert_eq!(extracted.get("id"), Some(&expected_id));
    }

    /// Builders compose and each one keeps what the others configured.
    #[test]
    fn wait_builder_chain() {
        let redis = WaitPersistence::Redis {
            key_prefix: "xerv".to_string(),
            ttl_seconds: 86400,
        };
        let node = WaitNode::new("approval_hook")
            .with_persistence(redis)
            .with_metadata(vec!["order_id".to_string()]);

        assert_eq!(node.hook_id, "approval_hook");
        assert!(matches!(node.persistence, WaitPersistence::Redis { .. }));
        assert_eq!(node.metadata_fields.len(), 1);
    }
}