1use std::collections::{BTreeMap, VecDeque};
2use std::path::{Path, PathBuf};
3use std::time::Duration as StdDuration;
4
5use serde::{Deserialize, Serialize};
6
7use crate::stdlib::macros::{harn_builtin, VmBuiltinDef};
8use crate::stdlib::process::runtime_root_base;
9use crate::value::{VmError, VmValue};
10use crate::vm::Vm;
11
12const DEFAULT_UPDATE_TIMEOUT_MS: u64 = 30_000;
13const UPDATE_POLL_INTERVAL_MS: u64 = 25;
14
15pub(crate) const MODULE_BUILTINS: &[&VmBuiltinDef] = &[
16 &WORKFLOW_SIGNAL_BUILTIN_DEF,
17 &WORKFLOW_QUERY_BUILTIN_DEF,
18 &WORKFLOW_PUBLISH_QUERY_BUILTIN_DEF,
19 &WORKFLOW_RECEIVE_BUILTIN_DEF,
20 &WORKFLOW_RESPOND_UPDATE_BUILTIN_DEF,
21 &WORKFLOW_PAUSE_BUILTIN_DEF,
22 &WORKFLOW_RESUME_BUILTIN_DEF,
23 &WORKFLOW_STATUS_BUILTIN_DEF,
24 &WORKFLOW_CONTINUE_AS_NEW_BUILTIN_DEF,
25 &CONTINUE_AS_NEW_BUILTIN_DEF,
26 &WORKFLOW_UPDATE_BUILTIN_DEF,
27];
28
29#[derive(Clone, Debug, Serialize, Deserialize)]
30pub struct WorkflowMessageRecord {
31 pub seq: u64,
32 pub kind: String,
33 pub name: String,
34 #[serde(skip_serializing_if = "Option::is_none")]
35 pub request_id: Option<String>,
36 pub payload: serde_json::Value,
37 pub enqueued_at: String,
38}
39
40#[derive(Clone, Debug, Serialize, Deserialize)]
41pub struct WorkflowQueryRecord {
42 pub value: serde_json::Value,
43 pub published_at: String,
44}
45
46#[derive(Clone, Debug, Serialize, Deserialize)]
47pub struct WorkflowUpdateResponseRecord {
48 pub request_id: String,
49 #[serde(skip_serializing_if = "Option::is_none")]
50 pub name: Option<String>,
51 pub value: serde_json::Value,
52 pub responded_at: String,
53}
54
55#[derive(Clone, Debug, Serialize, Deserialize)]
56pub struct WorkflowMailboxState {
57 #[serde(rename = "_type")]
58 pub type_name: String,
59 pub workflow_id: String,
60 #[serde(default = "default_generation")]
61 pub generation: u64,
62 #[serde(default)]
63 pub continue_as_new_count: u64,
64 #[serde(skip_serializing_if = "Option::is_none")]
65 pub last_continue_as_new_at: Option<String>,
66 #[serde(default)]
67 pub paused: bool,
68 #[serde(default)]
69 pub next_seq: u64,
70 #[serde(default)]
71 pub mailbox: VecDeque<WorkflowMessageRecord>,
72 #[serde(default)]
73 pub queries: BTreeMap<String, WorkflowQueryRecord>,
74 #[serde(default)]
75 pub responses: BTreeMap<String, WorkflowUpdateResponseRecord>,
76}
77
78impl Default for WorkflowMailboxState {
79 fn default() -> Self {
80 Self {
81 type_name: "workflow_mailbox".to_string(),
82 workflow_id: String::new(),
83 generation: default_generation(),
84 continue_as_new_count: 0,
85 last_continue_as_new_at: None,
86 paused: false,
87 next_seq: 0,
88 mailbox: VecDeque::new(),
89 queries: BTreeMap::new(),
90 responses: BTreeMap::new(),
91 }
92 }
93}
94
95fn default_generation() -> u64 {
96 1
97}
98
99#[derive(Clone, Debug, PartialEq, Eq)]
100struct WorkflowTarget {
101 workflow_id: String,
102 base_dir: PathBuf,
103}
104
105fn sanitize_workflow_id(raw: &str) -> String {
106 let trimmed = raw.trim();
107 let mut sanitized = String::with_capacity(trimmed.len());
108 for ch in trimmed.chars() {
109 if ch.is_ascii_alphanumeric() || matches!(ch, '-' | '_' | '.') {
110 sanitized.push(ch);
111 } else {
112 sanitized.push('_');
113 }
114 }
115 if sanitized.is_empty() || sanitized == "." || sanitized == ".." {
116 "workflow".to_string()
117 } else {
118 sanitized
119 }
120}
121
122fn workflow_base_dir_from_persisted_path(path: &Path) -> PathBuf {
123 for ancestor in path.ancestors() {
124 if ancestor.file_name().and_then(|value| value.to_str()) == Some(".harn-runs") {
125 return non_empty_path_or_dot(ancestor.parent().unwrap_or_else(|| Path::new(".")));
126 }
127 }
128 non_empty_path_or_dot(path.parent().unwrap_or_else(|| Path::new(".")))
129}
130
131fn non_empty_path_or_dot(path: &Path) -> PathBuf {
132 if path.as_os_str().is_empty() {
133 PathBuf::from(".")
134 } else {
135 path.to_path_buf()
136 }
137}
138
139fn workflow_target_root(target: &WorkflowTarget) -> PathBuf {
140 crate::runtime_paths::workflow_dir(&target.base_dir).join(&target.workflow_id)
141}
142
143fn workflow_state_path(target: &WorkflowTarget) -> PathBuf {
144 workflow_target_root(target).join("state.json")
145}
146
147fn now_rfc3339() -> String {
148 time::OffsetDateTime::now_utc()
149 .format(&time::format_description::well_known::Rfc3339)
150 .unwrap_or_else(|_| uuid::Uuid::now_v7().to_string())
151}
152
153fn load_state(target: &WorkflowTarget) -> Result<WorkflowMailboxState, String> {
154 let path = workflow_state_path(target);
155 if !path.exists() {
156 return Ok(WorkflowMailboxState {
157 workflow_id: target.workflow_id.clone(),
158 ..WorkflowMailboxState::default()
159 });
160 }
161 let text = std::fs::read_to_string(&path)
162 .map_err(|error| format!("workflow state read error: {error}"))?;
163 let mut state: WorkflowMailboxState = serde_json::from_str(&text)
164 .map_err(|error| format!("workflow state parse error: {error}"))?;
165 if state.type_name.is_empty() {
166 state.type_name = "workflow_mailbox".to_string();
167 }
168 if state.workflow_id.is_empty() {
169 state.workflow_id = target.workflow_id.clone();
170 }
171 if state.generation == 0 {
172 state.generation = 1;
173 }
174 Ok(state)
175}
176
177fn save_state(target: &WorkflowTarget, state: &WorkflowMailboxState) -> Result<(), String> {
178 let path = workflow_state_path(target);
179 let json = serde_json::to_string_pretty(state)
180 .map_err(|error| format!("workflow state encode error: {error}"))?;
181 crate::atomic_io::atomic_write(&path, json.as_bytes())
182 .map_err(|error| format!("workflow state write error: {error}"))
183}
184
185fn parse_target_json(
186 value: &serde_json::Value,
187 fallback_base_dir: Option<&Path>,
188) -> Option<WorkflowTarget> {
189 match value {
190 serde_json::Value::String(text) => Some(WorkflowTarget {
191 workflow_id: sanitize_workflow_id(text),
192 base_dir: fallback_base_dir
193 .map(Path::to_path_buf)
194 .unwrap_or_else(runtime_root_base),
195 }),
196 serde_json::Value::Object(map) => {
197 let workflow_id = map
198 .get("workflow_id")
199 .and_then(|value| value.as_str())
200 .or_else(|| map.get("workflow").and_then(|value| value.as_str()))
201 .or_else(|| {
202 map.get("run")
203 .and_then(|value| value.get("workflow_id"))
204 .and_then(|value| value.as_str())
205 })
206 .or_else(|| {
207 map.get("result")
208 .and_then(|value| value.get("run"))
209 .and_then(|value| value.get("workflow_id"))
210 .and_then(|value| value.as_str())
211 })?;
212 let explicit_base = map
213 .get("base_dir")
214 .and_then(|value| value.as_str())
215 .filter(|value| !value.trim().is_empty())
216 .map(PathBuf::from);
217 let persisted_path = map
218 .get("persisted_path")
219 .and_then(|value| value.as_str())
220 .or_else(|| map.get("path").and_then(|value| value.as_str()))
221 .or_else(|| {
222 map.get("run")
223 .and_then(|value| value.get("persisted_path"))
224 .and_then(|value| value.as_str())
225 })
226 .or_else(|| {
227 map.get("result")
228 .and_then(|value| value.get("run"))
229 .and_then(|value| value.get("persisted_path"))
230 .and_then(|value| value.as_str())
231 });
232 let base_dir = explicit_base
233 .or_else(|| {
234 persisted_path
235 .map(|path| workflow_base_dir_from_persisted_path(Path::new(path)))
236 })
237 .or_else(|| fallback_base_dir.map(Path::to_path_buf))
238 .unwrap_or_else(runtime_root_base);
239 Some(WorkflowTarget {
240 workflow_id: sanitize_workflow_id(workflow_id),
241 base_dir,
242 })
243 }
244 _ => None,
245 }
246}
247
248fn parse_target_vm(
249 value: Option<&VmValue>,
250 fallback_base_dir: Option<&Path>,
251 builtin: &str,
252) -> Result<WorkflowTarget, VmError> {
253 let value = value.ok_or_else(|| VmError::Runtime(format!("{builtin}: missing target")))?;
254 parse_target_json(&crate::llm::vm_value_to_json(value), fallback_base_dir).ok_or_else(|| {
255 VmError::Runtime(format!(
256 "{builtin}: target must be a workflow id string or dict with workflow_id/workflow"
257 ))
258 })
259}
260
261fn workflow_status_json(
262 target: &WorkflowTarget,
263 state: &WorkflowMailboxState,
264) -> serde_json::Value {
265 serde_json::json!({
266 "workflow_id": target.workflow_id,
267 "base_dir": target.base_dir.to_string_lossy(),
268 "generation": state.generation,
269 "paused": state.paused,
270 "pending_count": state.mailbox.len(),
271 "query_count": state.queries.len(),
272 "response_count": state.responses.len(),
273 "continue_as_new_count": state.continue_as_new_count,
274 "last_continue_as_new_at": state.last_continue_as_new_at,
275 })
276}
277
278fn target_for_base(base_dir: &Path, workflow_id: &str) -> WorkflowTarget {
279 WorkflowTarget {
280 workflow_id: sanitize_workflow_id(workflow_id),
281 base_dir: base_dir.to_path_buf(),
282 }
283}
284
285fn enqueue_message(
286 target: &WorkflowTarget,
287 kind: &str,
288 name: &str,
289 payload: serde_json::Value,
290 request_id: Option<String>,
291) -> Result<serde_json::Value, String> {
292 let mut state = load_state(target)?;
293 let message = push_message(&mut state, kind, name, payload, request_id);
294 save_state(target, &state)?;
295 Ok(serde_json::json!({
296 "workflow_id": target.workflow_id,
297 "message": message,
298 "status": workflow_status_json(target, &state),
299 }))
300}
301
302fn push_message(
303 state: &mut WorkflowMailboxState,
304 kind: &str,
305 name: &str,
306 payload: serde_json::Value,
307 request_id: Option<String>,
308) -> WorkflowMessageRecord {
309 state.next_seq += 1;
310 let message = WorkflowMessageRecord {
311 seq: state.next_seq,
312 kind: kind.to_string(),
313 name: name.to_string(),
314 request_id,
315 payload,
316 enqueued_at: now_rfc3339(),
317 };
318 state.mailbox.push_back(message.clone());
319 message
320}
321
322fn receive_message(target: &WorkflowTarget) -> Result<Option<WorkflowMessageRecord>, String> {
323 let mut state = load_state(target)?;
324 let message = state.mailbox.pop_front();
325 if message.is_some() {
326 save_state(target, &state)?;
327 }
328 Ok(message)
329}
330
331pub fn workflow_signal_for_base(
332 base_dir: &Path,
333 workflow_id: &str,
334 name: &str,
335 payload: serde_json::Value,
336) -> Result<serde_json::Value, String> {
337 let target = target_for_base(base_dir, workflow_id);
338 enqueue_message(&target, "signal", name, payload, None)
339}
340
341pub fn workflow_query_for_base(
342 base_dir: &Path,
343 workflow_id: &str,
344 name: &str,
345) -> Result<serde_json::Value, String> {
346 let target = target_for_base(base_dir, workflow_id);
347 let state = load_state(&target)?;
348 Ok(state
349 .queries
350 .get(name)
351 .map(|record| record.value.clone())
352 .unwrap_or(serde_json::Value::Null))
353}
354
355pub fn workflow_publish_query_for_base(
356 base_dir: &Path,
357 workflow_id: &str,
358 name: &str,
359 value: serde_json::Value,
360) -> Result<serde_json::Value, String> {
361 let target = target_for_base(base_dir, workflow_id);
362 let mut state = load_state(&target)?;
363 state.queries.insert(
364 name.to_string(),
365 WorkflowQueryRecord {
366 value,
367 published_at: now_rfc3339(),
368 },
369 );
370 save_state(&target, &state)?;
371 Ok(workflow_status_json(&target, &state))
372}
373
374pub fn workflow_pause_for_base(
375 base_dir: &Path,
376 workflow_id: &str,
377) -> Result<serde_json::Value, String> {
378 let target = target_for_base(base_dir, workflow_id);
379 let mut state = load_state(&target)?;
380 state.paused = true;
381 push_message(&mut state, "control", "pause", serde_json::json!({}), None);
382 save_state(&target, &state)?;
383 Ok(workflow_status_json(&target, &state))
384}
385
386pub fn workflow_resume_for_base(
387 base_dir: &Path,
388 workflow_id: &str,
389) -> Result<serde_json::Value, String> {
390 let target = target_for_base(base_dir, workflow_id);
391 let mut state = load_state(&target)?;
392 state.paused = false;
393 push_message(&mut state, "control", "resume", serde_json::json!({}), None);
394 save_state(&target, &state)?;
395 Ok(workflow_status_json(&target, &state))
396}
397
398pub async fn workflow_update_for_base(
399 base_dir: &Path,
400 workflow_id: &str,
401 name: &str,
402 payload: serde_json::Value,
403 timeout: StdDuration,
404) -> Result<serde_json::Value, String> {
405 let target = target_for_base(base_dir, workflow_id);
406 let request_id = enqueue_update_request(&target, name, payload)?;
407 wait_for_update_response(&target, name, &request_id, timeout).await
408}
409
410fn enqueue_update_request(
411 target: &WorkflowTarget,
412 name: &str,
413 payload: serde_json::Value,
414) -> Result<String, String> {
415 let request_id = uuid::Uuid::now_v7().to_string();
416 enqueue_message(target, "update", name, payload, Some(request_id.clone()))?;
417 Ok(request_id)
418}
419
420async fn wait_for_update_response(
421 target: &WorkflowTarget,
422 name: &str,
423 request_id: &str,
424 timeout: StdDuration,
425) -> Result<serde_json::Value, String> {
426 let deadline = tokio::time::Instant::now() + timeout;
427 loop {
428 match update_response_value(target, request_id) {
429 Ok(Some(value)) => return Ok(value),
430 Ok(None) => {}
431 Err(error) => return Err(error),
432 }
433
434 let now = tokio::time::Instant::now();
435 if now >= deadline {
436 break;
437 }
438 let next_poll = now + StdDuration::from_millis(UPDATE_POLL_INTERVAL_MS);
439 tokio::time::sleep_until(next_poll.min(deadline)).await;
440 }
441 Err(format!(
442 "workflow update '{name}' timed out for '{}'",
443 target.workflow_id
444 ))
445}
446
447fn update_response_value(
448 target: &WorkflowTarget,
449 request_id: &str,
450) -> Result<Option<serde_json::Value>, String> {
451 Ok(load_state(target)?
452 .responses
453 .get(request_id)
454 .map(|response| response.value.clone()))
455}
456
457pub fn workflow_respond_update_for_base(
458 base_dir: &Path,
459 workflow_id: &str,
460 request_id: &str,
461 name: Option<&str>,
462 value: serde_json::Value,
463) -> Result<serde_json::Value, String> {
464 let target = target_for_base(base_dir, workflow_id);
465 record_update_response(&target, request_id, name, value)
466}
467
468fn record_update_response(
469 target: &WorkflowTarget,
470 request_id: &str,
471 name: Option<&str>,
472 value: serde_json::Value,
473) -> Result<serde_json::Value, String> {
474 let mut state = load_state(target)?;
475 state.responses.insert(
476 request_id.to_string(),
477 WorkflowUpdateResponseRecord {
478 request_id: request_id.to_string(),
479 name: name.map(ToString::to_string),
480 value,
481 responded_at: now_rfc3339(),
482 },
483 );
484 save_state(target, &state)?;
485 Ok(workflow_status_json(target, &state))
486}
487
488pub(crate) fn register_workflow_message_builtins(vm: &mut Vm) {
489 vm.set_global(
490 "workflow",
491 VmValue::Dict(std::sync::Arc::new(BTreeMap::from([
492 (
493 "signal".to_string(),
494 VmValue::BuiltinRef(std::sync::Arc::from("workflow.signal")),
495 ),
496 (
497 "query".to_string(),
498 VmValue::BuiltinRef(std::sync::Arc::from("workflow.query")),
499 ),
500 (
501 "update".to_string(),
502 VmValue::BuiltinRef(std::sync::Arc::from("workflow.update")),
503 ),
504 (
505 "publish_query".to_string(),
506 VmValue::BuiltinRef(std::sync::Arc::from("workflow.publish_query")),
507 ),
508 (
509 "receive".to_string(),
510 VmValue::BuiltinRef(std::sync::Arc::from("workflow.receive")),
511 ),
512 (
513 "respond_update".to_string(),
514 VmValue::BuiltinRef(std::sync::Arc::from("workflow.respond_update")),
515 ),
516 (
517 "pause".to_string(),
518 VmValue::BuiltinRef(std::sync::Arc::from("workflow.pause")),
519 ),
520 (
521 "resume".to_string(),
522 VmValue::BuiltinRef(std::sync::Arc::from("workflow.resume")),
523 ),
524 (
525 "status".to_string(),
526 VmValue::BuiltinRef(std::sync::Arc::from("workflow.status")),
527 ),
528 (
529 "continue_as_new".to_string(),
530 VmValue::BuiltinRef(std::sync::Arc::from("workflow.continue_as_new")),
531 ),
532 ]))),
533 );
534
535 for def in MODULE_BUILTINS {
536 vm.register_builtin_def(def);
537 }
538}
539
540#[harn_builtin(
542 sig = "workflow.signal(target: string|dict, name: string, payload?: any) -> dict",
543 category = "workflow.messages"
544)]
545fn workflow_signal_builtin(args: &[VmValue], _out: &mut String) -> Result<VmValue, VmError> {
546 let target = parse_target_vm(args.first(), None, "workflow.signal")?;
547 let name = args
548 .get(1)
549 .map(|value| value.display())
550 .filter(|value| !value.is_empty())
551 .ok_or_else(|| VmError::Runtime("workflow.signal: missing name".to_string()))?;
552 let payload = args
553 .get(2)
554 .map(crate::llm::vm_value_to_json)
555 .unwrap_or(serde_json::Value::Null);
556 let result =
557 enqueue_message(&target, "signal", &name, payload, None).map_err(VmError::Runtime)?;
558 Ok(crate::stdlib::json_to_vm_value(&result))
559}
560
561#[harn_builtin(
563 sig = "workflow.query(target: string|dict, name: string) -> any",
564 category = "workflow.messages"
565)]
566fn workflow_query_builtin(args: &[VmValue], _out: &mut String) -> Result<VmValue, VmError> {
567 let target = parse_target_vm(args.first(), None, "workflow.query")?;
568 let name = args
569 .get(1)
570 .map(|value| value.display())
571 .filter(|value| !value.is_empty())
572 .ok_or_else(|| VmError::Runtime("workflow.query: missing name".to_string()))?;
573 let state = load_state(&target).map_err(VmError::Runtime)?;
574 Ok(crate::stdlib::json_to_vm_value(
575 &state
576 .queries
577 .get(&name)
578 .map(|record| record.value.clone())
579 .unwrap_or(serde_json::Value::Null),
580 ))
581}
582
583#[harn_builtin(
585 sig = "workflow.update(target: string|dict, name: string, payload?: any, options?: dict|nil) -> any",
586 kind = "async",
587 category = "workflow.messages"
588)]
589async fn workflow_update_builtin(
590 _ctx: crate::vm::AsyncBuiltinCtx,
591 args: Vec<VmValue>,
592) -> Result<VmValue, VmError> {
593 let target = parse_target_vm(args.first(), None, "workflow.update")?;
594 let name = args
595 .get(1)
596 .map(|value| value.display())
597 .filter(|value| !value.is_empty())
598 .ok_or_else(|| VmError::Runtime("workflow.update: missing name".to_string()))?;
599 let payload = args
600 .get(2)
601 .map(crate::llm::vm_value_to_json)
602 .unwrap_or(serde_json::Value::Null);
603 let timeout_ms = args
604 .get(3)
605 .and_then(|value| value.as_dict())
606 .and_then(|dict| dict.get("timeout_ms"))
607 .and_then(VmValue::as_int)
608 .unwrap_or(DEFAULT_UPDATE_TIMEOUT_MS as i64)
609 .max(1) as u64;
610 let result = workflow_update_for_base(
611 &target.base_dir,
612 &target.workflow_id,
613 &name,
614 payload,
615 StdDuration::from_millis(timeout_ms),
616 )
617 .await
618 .map_err(VmError::Runtime)?;
619 Ok(crate::stdlib::json_to_vm_value(&result))
620}
621
622#[harn_builtin(
624 sig = "workflow.publish_query(target: string|dict, name: string, value?: any) -> dict",
625 category = "workflow.messages"
626)]
627fn workflow_publish_query_builtin(args: &[VmValue], _out: &mut String) -> Result<VmValue, VmError> {
628 let target = parse_target_vm(args.first(), None, "workflow.publish_query")?;
629 let name = args
630 .get(1)
631 .map(|value| value.display())
632 .filter(|value| !value.is_empty())
633 .ok_or_else(|| VmError::Runtime("workflow.publish_query: missing name".to_string()))?;
634 let value = args
635 .get(2)
636 .map(crate::llm::vm_value_to_json)
637 .unwrap_or(serde_json::Value::Null);
638 let result =
639 workflow_publish_query_for_base(&target.base_dir, &target.workflow_id, &name, value)
640 .map_err(VmError::Runtime)?;
641 Ok(crate::stdlib::json_to_vm_value(&result))
642}
643
644#[harn_builtin(
646 sig = "workflow.receive(target: string|dict) -> dict|nil",
647 category = "workflow.messages"
648)]
649fn workflow_receive_builtin(args: &[VmValue], _out: &mut String) -> Result<VmValue, VmError> {
650 let target = parse_target_vm(args.first(), None, "workflow.receive")?;
651 let Some(message) = receive_message(&target).map_err(VmError::Runtime)? else {
652 return Ok(VmValue::Nil);
653 };
654 Ok(crate::stdlib::json_to_vm_value(&serde_json::json!({
655 "workflow_id": target.workflow_id,
656 "seq": message.seq,
657 "kind": message.kind,
658 "name": message.name,
659 "request_id": message.request_id,
660 "payload": message.payload,
661 "enqueued_at": message.enqueued_at,
662 })))
663}
664
665#[harn_builtin(
667 sig = "workflow.respond_update(target: string|dict, request_id: string, value?: any, name?: string|nil) -> dict",
668 category = "workflow.messages"
669)]
670fn workflow_respond_update_builtin(
671 args: &[VmValue],
672 _out: &mut String,
673) -> Result<VmValue, VmError> {
674 let target = parse_target_vm(args.first(), None, "workflow.respond_update")?;
675 let request_id = args
676 .get(1)
677 .map(|value| value.display())
678 .filter(|value| !value.is_empty())
679 .ok_or_else(|| {
680 VmError::Runtime("workflow.respond_update: missing request id".to_string())
681 })?;
682 let value = args
683 .get(2)
684 .map(crate::llm::vm_value_to_json)
685 .unwrap_or(serde_json::Value::Null);
686 let name = args
687 .get(3)
688 .map(|value| value.display())
689 .filter(|value| !value.is_empty());
690 let result = workflow_respond_update_for_base(
691 &target.base_dir,
692 &target.workflow_id,
693 &request_id,
694 name.as_deref(),
695 value,
696 )
697 .map_err(VmError::Runtime)?;
698 Ok(crate::stdlib::json_to_vm_value(&result))
699}
700
701#[harn_builtin(
703 sig = "workflow.pause(target: string|dict) -> dict",
704 category = "workflow.messages"
705)]
706fn workflow_pause_builtin(args: &[VmValue], _out: &mut String) -> Result<VmValue, VmError> {
707 let target = parse_target_vm(args.first(), None, "workflow.pause")?;
708 let result =
709 workflow_pause_for_base(&target.base_dir, &target.workflow_id).map_err(VmError::Runtime)?;
710 Ok(crate::stdlib::json_to_vm_value(&result))
711}
712
713#[harn_builtin(
715 sig = "workflow.resume(target: string|dict) -> dict",
716 category = "workflow.messages"
717)]
718fn workflow_resume_builtin(args: &[VmValue], _out: &mut String) -> Result<VmValue, VmError> {
719 let target = parse_target_vm(args.first(), None, "workflow.resume")?;
720 let result = workflow_resume_for_base(&target.base_dir, &target.workflow_id)
721 .map_err(VmError::Runtime)?;
722 Ok(crate::stdlib::json_to_vm_value(&result))
723}
724
725#[harn_builtin(
727 sig = "workflow.status(target: string|dict) -> dict",
728 category = "workflow.messages"
729)]
730fn workflow_status_builtin(args: &[VmValue], _out: &mut String) -> Result<VmValue, VmError> {
731 let target = parse_target_vm(args.first(), None, "workflow.status")?;
732 let state = load_state(&target).map_err(VmError::Runtime)?;
733 Ok(crate::stdlib::json_to_vm_value(&workflow_status_json(
734 &target, &state,
735 )))
736}
737
738#[harn_builtin(
740 sig = "workflow.continue_as_new(target: string|dict) -> dict",
741 category = "workflow.messages"
742)]
743fn workflow_continue_as_new_builtin(
744 args: &[VmValue],
745 _out: &mut String,
746) -> Result<VmValue, VmError> {
747 continue_as_new_for_label(args, "workflow.continue_as_new")
748}
749
750#[harn_builtin(
752 sig = "continue_as_new(target: string|dict) -> dict",
753 category = "workflow.messages"
754)]
755fn continue_as_new_builtin(args: &[VmValue], _out: &mut String) -> Result<VmValue, VmError> {
756 continue_as_new_for_label(args, "continue_as_new")
757}
758
759fn continue_as_new_for_label(args: &[VmValue], label: &str) -> Result<VmValue, VmError> {
760 let target = parse_target_vm(args.first(), None, label)?;
761 let mut state = load_state(&target).map_err(VmError::Runtime)?;
762 state.generation += 1;
763 state.continue_as_new_count += 1;
764 state.last_continue_as_new_at = Some(now_rfc3339());
765 state.responses.clear();
766 save_state(&target, &state).map_err(VmError::Runtime)?;
767 Ok(crate::stdlib::json_to_vm_value(&workflow_status_json(
768 &target, &state,
769 )))
770}
771
772#[cfg(test)]
773mod tests {
774 use super::*;
775 use std::task::Poll;
776
777 #[tokio::test(start_paused = true)]
778 async fn update_round_trip_waits_for_response() {
779 let dir = tempfile::tempdir().expect("tempdir");
780 let workflow_id = "wf-update";
781 let base_dir = dir.path().to_path_buf();
782 let target = target_for_base(&base_dir, workflow_id);
783 let request_id =
784 enqueue_update_request(&target, "adjust_budget", serde_json::json!({"max_usd": 10}))
785 .expect("enqueue update");
786
787 let message = receive_message(&target)
788 .expect("receive queued update")
789 .expect("queued update");
790 assert_eq!(message.kind, "update");
791 assert_eq!(message.name, "adjust_budget");
792 assert_eq!(message.request_id.as_deref(), Some(request_id.as_str()));
793 assert_eq!(
794 update_response_value(&target, &request_id).expect("read response"),
795 None
796 );
797
798 let waiter = wait_for_update_response(
799 &target,
800 "adjust_budget",
801 &request_id,
802 StdDuration::from_millis(500),
803 );
804 tokio::pin!(waiter);
805 assert!(matches!(futures::poll!(&mut waiter), Poll::Pending));
806
807 workflow_respond_update_for_base(
808 &base_dir,
809 workflow_id,
810 &request_id,
811 Some("adjust_budget"),
812 serde_json::json!({"ok": true}),
813 )
814 .expect("save response");
815 assert_eq!(
816 update_response_value(&target, &request_id).expect("read response"),
817 Some(serde_json::json!({"ok": true}))
818 );
819 tokio::time::advance(StdDuration::from_millis(UPDATE_POLL_INTERVAL_MS)).await;
820
821 let result = waiter.await.expect("update result");
822 assert_eq!(result, serde_json::json!({"ok": true}));
823 }
824
825 #[tokio::test(start_paused = true)]
826 async fn update_wait_respects_short_timeout() {
827 let dir = tempfile::tempdir().expect("tempdir");
828 let target = target_for_base(dir.path(), "wf-timeout");
829 let request_id =
830 enqueue_update_request(&target, "adjust_budget", serde_json::json!({"max_usd": 10}))
831 .expect("enqueue update");
832
833 let waiter = wait_for_update_response(
834 &target,
835 "adjust_budget",
836 &request_id,
837 StdDuration::from_millis(10),
838 );
839 tokio::pin!(waiter);
840 assert!(matches!(futures::poll!(&mut waiter), Poll::Pending));
841
842 tokio::time::advance(StdDuration::from_millis(9)).await;
843 assert!(matches!(futures::poll!(&mut waiter), Poll::Pending));
844
845 tokio::time::advance(StdDuration::from_millis(1)).await;
846 let err = waiter.await.expect_err("update should time out");
847 assert!(err.contains("timed out"));
848 }
849
850 #[tokio::test]
851 async fn update_wait_propagates_state_errors() {
852 let dir = tempfile::tempdir().expect("tempdir");
853 let target = target_for_base(dir.path(), "wf-corrupt");
854 std::fs::create_dir_all(workflow_target_root(&target)).expect("state dir");
855 std::fs::write(workflow_state_path(&target), "{not json").expect("state write");
856
857 let err = wait_for_update_response(
858 &target,
859 "adjust_budget",
860 "request-1",
861 StdDuration::from_millis(10),
862 )
863 .await
864 .expect_err("corrupt state should fail immediately");
865 assert!(err.contains("workflow state parse error"));
866 }
867
868 #[test]
869 fn workflow_ids_preserve_namespace_without_path_segments() {
870 assert_eq!(
871 sanitize_workflow_id("workflow://local/start-my-day"),
872 "workflow___local_start-my-day"
873 );
874 assert_eq!(sanitize_workflow_id("../start-my-day"), ".._start-my-day");
875 assert_eq!(sanitize_workflow_id(".."), "workflow");
876 }
877
878 #[test]
879 fn persisted_path_drives_target_base_dir() {
880 let base = parse_target_json(
881 &serde_json::json!({
882 "workflow_id": "wf",
883 "persisted_path": "/tmp/demo/.harn-runs/run.json"
884 }),
885 None,
886 )
887 .expect("target");
888 assert_eq!(base.workflow_id, "wf");
889 assert_eq!(base.base_dir, PathBuf::from("/tmp/demo"));
890 }
891
892 #[test]
893 fn nested_persisted_path_drives_target_base_dir() {
894 let base = parse_target_json(
895 &serde_json::json!({
896 "workflow_id": "wf",
897 "persisted_path": "/tmp/demo/.harn-runs/session/run.json"
898 }),
899 None,
900 )
901 .expect("target");
902 assert_eq!(base.base_dir, PathBuf::from("/tmp/demo"));
903
904 let relative = parse_target_json(
905 &serde_json::json!({
906 "workflow_id": "wf",
907 "persisted_path": ".harn-runs/session/run.json"
908 }),
909 None,
910 )
911 .expect("target");
912 assert_eq!(relative.base_dir, PathBuf::from("."));
913 }
914}