1use std::collections::HashMap;
7use xerv_core::traits::{Context, Node, NodeFuture, NodeInfo, NodeOutput, Port, PortDirection};
8use xerv_core::types::RelPtr;
9use xerv_core::value::Value;
10
11#[derive(Debug, Clone, Default)]
13pub enum WaitPersistence {
14 #[default]
16 Memory,
17 Redis {
19 key_prefix: String,
21 ttl_seconds: u64,
23 },
24 Database {
26 table: String,
28 },
29}
30
31#[derive(Debug, Clone, Default)]
33pub enum ResumeMethod {
34 #[default]
36 Webhook,
37 ApiApproval {
39 token_field: String,
41 },
42 Timeout {
44 seconds: u64,
46 on_timeout: TimeoutAction,
48 },
49}
50
51#[derive(Debug, Clone, Copy, Default)]
53pub enum TimeoutAction {
54 Approve,
56 #[default]
58 Reject,
59 Escalate,
61}
62
63#[derive(Debug)]
96pub struct WaitNode {
97 hook_id: String,
99 persistence: WaitPersistence,
101 resume_method: ResumeMethod,
103 metadata_fields: Vec<String>,
105}
106
107impl WaitNode {
108 pub fn new(hook_id: impl Into<String>) -> Self {
110 Self {
111 hook_id: hook_id.into(),
112 persistence: WaitPersistence::Memory,
113 resume_method: ResumeMethod::Webhook,
114 metadata_fields: Vec::new(),
115 }
116 }
117
118 pub fn webhook(hook_id: impl Into<String>) -> Self {
120 Self::new(hook_id)
121 }
122
123 pub fn with_redis(
125 hook_id: impl Into<String>,
126 key_prefix: impl Into<String>,
127 ttl_seconds: u64,
128 ) -> Self {
129 Self {
130 hook_id: hook_id.into(),
131 persistence: WaitPersistence::Redis {
132 key_prefix: key_prefix.into(),
133 ttl_seconds,
134 },
135 resume_method: ResumeMethod::Webhook,
136 metadata_fields: Vec::new(),
137 }
138 }
139
140 pub fn with_persistence(mut self, persistence: WaitPersistence) -> Self {
142 self.persistence = persistence;
143 self
144 }
145
146 pub fn with_resume_method(mut self, method: ResumeMethod) -> Self {
148 self.resume_method = method;
149 self
150 }
151
152 pub fn with_timeout(mut self, seconds: u64, on_timeout: TimeoutAction) -> Self {
154 self.resume_method = ResumeMethod::Timeout {
155 seconds,
156 on_timeout,
157 };
158 self
159 }
160
161 pub fn with_metadata(mut self, fields: Vec<String>) -> Self {
163 self.metadata_fields = fields;
164 self
165 }
166
167 fn extract_metadata(&self, input: &Value) -> serde_json::Map<String, serde_json::Value> {
169 let mut metadata = serde_json::Map::new();
170 for field in &self.metadata_fields {
171 if let Some(value) = input.get_field(field) {
172 let key = field.split('.').last().unwrap_or(field);
173 metadata.insert(key.to_string(), value.into_inner());
174 }
175 }
176 metadata
177 }
178}
179
180impl Node for WaitNode {
181 fn info(&self) -> NodeInfo {
182 NodeInfo::new("std", "wait")
183 .with_description("Pause execution for human-in-the-loop approval")
184 .with_inputs(vec![Port::input("Any")])
185 .with_outputs(vec![
186 Port::named("out", PortDirection::Output, "Any")
187 .with_description("Emitted when approved"),
188 Port::named("rejected", PortDirection::Output, "Any")
189 .with_description("Emitted when rejected"),
190 Port::named("escalated", PortDirection::Output, "Any")
191 .with_description("Emitted when escalated"),
192 Port::error(),
193 ])
194 }
195
196 fn execute<'a>(&'a self, ctx: Context, inputs: HashMap<String, RelPtr<()>>) -> NodeFuture<'a> {
197 Box::pin(async move {
198 let input = inputs.get("in").copied().unwrap_or_else(RelPtr::null);
199
200 let value = if input.is_null() {
202 Value::null()
203 } else {
204 match ctx.read_bytes(input) {
205 Ok(bytes) => Value::from_bytes(&bytes).unwrap_or_else(|_| Value::null()),
206 Err(_) => Value::null(),
207 }
208 };
209
210 let metadata = self.extract_metadata(&value);
212
213 tracing::info!(
227 hook_id = %self.hook_id,
228 trace_id = %ctx.trace_id(),
229 persistence = ?self.persistence,
230 resume_method = ?self.resume_method,
231 metadata = ?metadata,
232 "Wait: pausing for approval"
233 );
234
235 let wait_state = Value::from(serde_json::json!({
244 "hook_id": self.hook_id,
245 "trace_id": ctx.trace_id().to_string(),
246 "status": "waiting",
247 "metadata": metadata,
248 "resume_url": format!("/api/v1/resume/{}", self.hook_id)
249 }));
250
251 let state_bytes = match wait_state.to_bytes() {
253 Ok(bytes) => bytes,
254 Err(e) => {
255 return Ok(NodeOutput::error_with_message(format!(
256 "Failed to serialize wait state: {}",
257 e
258 )));
259 }
260 };
261
262 let state_ptr = match ctx.write_bytes(&state_bytes) {
263 Ok(ptr) => ptr,
264 Err(e) => {
265 return Ok(NodeOutput::error_with_message(format!(
266 "Failed to write wait state: {}",
267 e
268 )));
269 }
270 };
271
272 Ok(NodeOutput::out(state_ptr))
276 })
277 }
278}
279
280#[cfg(test)]
281mod tests {
282 use super::*;
283 use serde_json::json;
284
285 #[test]
286 fn wait_node_info() {
287 let node = WaitNode::new("test_hook");
288 let info = node.info();
289
290 assert_eq!(info.name, "std::wait");
291 assert_eq!(info.inputs.len(), 1);
292 assert_eq!(info.outputs.len(), 4);
293 assert_eq!(info.outputs[0].name, "out");
294 assert_eq!(info.outputs[1].name, "rejected");
295 assert_eq!(info.outputs[2].name, "escalated");
296 assert_eq!(info.outputs[3].name, "error");
297 }
298
299 #[test]
300 fn wait_persistence_default() {
301 let persistence = WaitPersistence::default();
302 assert!(matches!(persistence, WaitPersistence::Memory));
303 }
304
305 #[test]
306 fn wait_resume_method_default() {
307 let method = ResumeMethod::default();
308 assert!(matches!(method, ResumeMethod::Webhook));
309 }
310
311 #[test]
312 fn wait_timeout_action_default() {
313 let action = TimeoutAction::default();
314 assert!(matches!(action, TimeoutAction::Reject));
315 }
316
317 #[test]
318 fn wait_with_redis() {
319 let node = WaitNode::with_redis("hook", "xerv:wait", 3600);
320 assert!(matches!(
321 node.persistence,
322 WaitPersistence::Redis {
323 ttl_seconds: 3600,
324 ..
325 }
326 ));
327 }
328
329 #[test]
330 fn wait_with_timeout() {
331 let node = WaitNode::new("hook").with_timeout(300, TimeoutAction::Approve);
332 assert!(matches!(
333 node.resume_method,
334 ResumeMethod::Timeout {
335 seconds: 300,
336 on_timeout: TimeoutAction::Approve
337 }
338 ));
339 }
340
341 #[test]
342 fn wait_extract_metadata() {
343 let node = WaitNode::new("hook")
344 .with_metadata(vec!["user.name".to_string(), "order.id".to_string()]);
345
346 let input = Value::from(json!({
347 "user": {"name": "Alice"},
348 "order": {"id": 12345}
349 }));
350
351 let metadata = node.extract_metadata(&input);
352
353 assert_eq!(
354 metadata.get("name"),
355 Some(&serde_json::Value::String("Alice".to_string()))
356 );
357 assert_eq!(
358 metadata.get("id"),
359 Some(&serde_json::Value::Number(12345.into()))
360 );
361 }
362
363 #[test]
364 fn wait_builder_chain() {
365 let node = WaitNode::new("approval_hook")
366 .with_persistence(WaitPersistence::Redis {
367 key_prefix: "xerv".to_string(),
368 ttl_seconds: 86400,
369 })
370 .with_metadata(vec!["order_id".to_string()]);
371
372 assert_eq!(node.hook_id, "approval_hook");
373 assert!(matches!(node.persistence, WaitPersistence::Redis { .. }));
374 assert_eq!(node.metadata_fields.len(), 1);
375 }
376}