1use std::collections::{BTreeMap, VecDeque};
2use std::path::{Path, PathBuf};
3use std::rc::Rc;
4use std::time::Duration as StdDuration;
5
6use serde::{Deserialize, Serialize};
7
8use crate::stdlib::macros::{harn_builtin, VmBuiltinDef};
9use crate::stdlib::process::runtime_root_base;
10use crate::value::{VmError, VmValue};
11use crate::vm::Vm;
12
13const DEFAULT_UPDATE_TIMEOUT_MS: u64 = 30_000;
14const UPDATE_POLL_INTERVAL_MS: u64 = 25;
15
16pub(crate) const MODULE_BUILTINS: &[&VmBuiltinDef] = &[
17 &WORKFLOW_SIGNAL_BUILTIN_DEF,
18 &WORKFLOW_QUERY_BUILTIN_DEF,
19 &WORKFLOW_PUBLISH_QUERY_BUILTIN_DEF,
20 &WORKFLOW_RECEIVE_BUILTIN_DEF,
21 &WORKFLOW_RESPOND_UPDATE_BUILTIN_DEF,
22 &WORKFLOW_PAUSE_BUILTIN_DEF,
23 &WORKFLOW_RESUME_BUILTIN_DEF,
24 &WORKFLOW_STATUS_BUILTIN_DEF,
25 &WORKFLOW_CONTINUE_AS_NEW_BUILTIN_DEF,
26 &CONTINUE_AS_NEW_BUILTIN_DEF,
27 &WORKFLOW_UPDATE_BUILTIN_DEF,
28];
29
30#[derive(Clone, Debug, Serialize, Deserialize)]
31pub struct WorkflowMessageRecord {
32 pub seq: u64,
33 pub kind: String,
34 pub name: String,
35 #[serde(skip_serializing_if = "Option::is_none")]
36 pub request_id: Option<String>,
37 pub payload: serde_json::Value,
38 pub enqueued_at: String,
39}
40
41#[derive(Clone, Debug, Serialize, Deserialize)]
42pub struct WorkflowQueryRecord {
43 pub value: serde_json::Value,
44 pub published_at: String,
45}
46
47#[derive(Clone, Debug, Serialize, Deserialize)]
48pub struct WorkflowUpdateResponseRecord {
49 pub request_id: String,
50 #[serde(skip_serializing_if = "Option::is_none")]
51 pub name: Option<String>,
52 pub value: serde_json::Value,
53 pub responded_at: String,
54}
55
56#[derive(Clone, Debug, Serialize, Deserialize)]
57pub struct WorkflowMailboxState {
58 #[serde(rename = "_type")]
59 pub type_name: String,
60 pub workflow_id: String,
61 #[serde(default = "default_generation")]
62 pub generation: u64,
63 #[serde(default)]
64 pub continue_as_new_count: u64,
65 #[serde(skip_serializing_if = "Option::is_none")]
66 pub last_continue_as_new_at: Option<String>,
67 #[serde(default)]
68 pub paused: bool,
69 #[serde(default)]
70 pub next_seq: u64,
71 #[serde(default)]
72 pub mailbox: VecDeque<WorkflowMessageRecord>,
73 #[serde(default)]
74 pub queries: BTreeMap<String, WorkflowQueryRecord>,
75 #[serde(default)]
76 pub responses: BTreeMap<String, WorkflowUpdateResponseRecord>,
77}
78
79impl Default for WorkflowMailboxState {
80 fn default() -> Self {
81 Self {
82 type_name: "workflow_mailbox".to_string(),
83 workflow_id: String::new(),
84 generation: default_generation(),
85 continue_as_new_count: 0,
86 last_continue_as_new_at: None,
87 paused: false,
88 next_seq: 0,
89 mailbox: VecDeque::new(),
90 queries: BTreeMap::new(),
91 responses: BTreeMap::new(),
92 }
93 }
94}
95
96fn default_generation() -> u64 {
97 1
98}
99
100#[derive(Clone, Debug, PartialEq, Eq)]
101struct WorkflowTarget {
102 workflow_id: String,
103 base_dir: PathBuf,
104}
105
106fn sanitize_workflow_id(raw: &str) -> String {
107 let trimmed = raw.trim();
108 let mut sanitized = String::with_capacity(trimmed.len());
109 for ch in trimmed.chars() {
110 if ch.is_ascii_alphanumeric() || matches!(ch, '-' | '_' | '.') {
111 sanitized.push(ch);
112 } else {
113 sanitized.push('_');
114 }
115 }
116 if sanitized.is_empty() || sanitized == "." || sanitized == ".." {
117 "workflow".to_string()
118 } else {
119 sanitized
120 }
121}
122
123fn workflow_base_dir_from_persisted_path(path: &Path) -> PathBuf {
124 for ancestor in path.ancestors() {
125 if ancestor.file_name().and_then(|value| value.to_str()) == Some(".harn-runs") {
126 return non_empty_path_or_dot(ancestor.parent().unwrap_or_else(|| Path::new(".")));
127 }
128 }
129 non_empty_path_or_dot(path.parent().unwrap_or_else(|| Path::new(".")))
130}
131
132fn non_empty_path_or_dot(path: &Path) -> PathBuf {
133 if path.as_os_str().is_empty() {
134 PathBuf::from(".")
135 } else {
136 path.to_path_buf()
137 }
138}
139
140fn workflow_target_root(target: &WorkflowTarget) -> PathBuf {
141 crate::runtime_paths::workflow_dir(&target.base_dir).join(&target.workflow_id)
142}
143
144fn workflow_state_path(target: &WorkflowTarget) -> PathBuf {
145 workflow_target_root(target).join("state.json")
146}
147
148fn now_rfc3339() -> String {
149 time::OffsetDateTime::now_utc()
150 .format(&time::format_description::well_known::Rfc3339)
151 .unwrap_or_else(|_| uuid::Uuid::now_v7().to_string())
152}
153
154fn load_state(target: &WorkflowTarget) -> Result<WorkflowMailboxState, String> {
155 let path = workflow_state_path(target);
156 if !path.exists() {
157 return Ok(WorkflowMailboxState {
158 workflow_id: target.workflow_id.clone(),
159 ..WorkflowMailboxState::default()
160 });
161 }
162 let text = std::fs::read_to_string(&path)
163 .map_err(|error| format!("workflow state read error: {error}"))?;
164 let mut state: WorkflowMailboxState = serde_json::from_str(&text)
165 .map_err(|error| format!("workflow state parse error: {error}"))?;
166 if state.type_name.is_empty() {
167 state.type_name = "workflow_mailbox".to_string();
168 }
169 if state.workflow_id.is_empty() {
170 state.workflow_id = target.workflow_id.clone();
171 }
172 if state.generation == 0 {
173 state.generation = 1;
174 }
175 Ok(state)
176}
177
178fn save_state(target: &WorkflowTarget, state: &WorkflowMailboxState) -> Result<(), String> {
179 let path = workflow_state_path(target);
180 let json = serde_json::to_string_pretty(state)
181 .map_err(|error| format!("workflow state encode error: {error}"))?;
182 crate::atomic_io::atomic_write(&path, json.as_bytes())
183 .map_err(|error| format!("workflow state write error: {error}"))
184}
185
186fn parse_target_json(
187 value: &serde_json::Value,
188 fallback_base_dir: Option<&Path>,
189) -> Option<WorkflowTarget> {
190 match value {
191 serde_json::Value::String(text) => Some(WorkflowTarget {
192 workflow_id: sanitize_workflow_id(text),
193 base_dir: fallback_base_dir
194 .map(Path::to_path_buf)
195 .unwrap_or_else(runtime_root_base),
196 }),
197 serde_json::Value::Object(map) => {
198 let workflow_id = map
199 .get("workflow_id")
200 .and_then(|value| value.as_str())
201 .or_else(|| map.get("workflow").and_then(|value| value.as_str()))
202 .or_else(|| {
203 map.get("run")
204 .and_then(|value| value.get("workflow_id"))
205 .and_then(|value| value.as_str())
206 })
207 .or_else(|| {
208 map.get("result")
209 .and_then(|value| value.get("run"))
210 .and_then(|value| value.get("workflow_id"))
211 .and_then(|value| value.as_str())
212 })?;
213 let explicit_base = map
214 .get("base_dir")
215 .and_then(|value| value.as_str())
216 .filter(|value| !value.trim().is_empty())
217 .map(PathBuf::from);
218 let persisted_path = map
219 .get("persisted_path")
220 .and_then(|value| value.as_str())
221 .or_else(|| map.get("path").and_then(|value| value.as_str()))
222 .or_else(|| {
223 map.get("run")
224 .and_then(|value| value.get("persisted_path"))
225 .and_then(|value| value.as_str())
226 })
227 .or_else(|| {
228 map.get("result")
229 .and_then(|value| value.get("run"))
230 .and_then(|value| value.get("persisted_path"))
231 .and_then(|value| value.as_str())
232 });
233 let base_dir = explicit_base
234 .or_else(|| {
235 persisted_path
236 .map(|path| workflow_base_dir_from_persisted_path(Path::new(path)))
237 })
238 .or_else(|| fallback_base_dir.map(Path::to_path_buf))
239 .unwrap_or_else(runtime_root_base);
240 Some(WorkflowTarget {
241 workflow_id: sanitize_workflow_id(workflow_id),
242 base_dir,
243 })
244 }
245 _ => None,
246 }
247}
248
249fn parse_target_vm(
250 value: Option<&VmValue>,
251 fallback_base_dir: Option<&Path>,
252 builtin: &str,
253) -> Result<WorkflowTarget, VmError> {
254 let value = value.ok_or_else(|| VmError::Runtime(format!("{builtin}: missing target")))?;
255 parse_target_json(&crate::llm::vm_value_to_json(value), fallback_base_dir).ok_or_else(|| {
256 VmError::Runtime(format!(
257 "{builtin}: target must be a workflow id string or dict with workflow_id/workflow"
258 ))
259 })
260}
261
262fn workflow_status_json(
263 target: &WorkflowTarget,
264 state: &WorkflowMailboxState,
265) -> serde_json::Value {
266 serde_json::json!({
267 "workflow_id": target.workflow_id,
268 "base_dir": target.base_dir.to_string_lossy(),
269 "generation": state.generation,
270 "paused": state.paused,
271 "pending_count": state.mailbox.len(),
272 "query_count": state.queries.len(),
273 "response_count": state.responses.len(),
274 "continue_as_new_count": state.continue_as_new_count,
275 "last_continue_as_new_at": state.last_continue_as_new_at,
276 })
277}
278
279fn target_for_base(base_dir: &Path, workflow_id: &str) -> WorkflowTarget {
280 WorkflowTarget {
281 workflow_id: sanitize_workflow_id(workflow_id),
282 base_dir: base_dir.to_path_buf(),
283 }
284}
285
286fn enqueue_message(
287 target: &WorkflowTarget,
288 kind: &str,
289 name: &str,
290 payload: serde_json::Value,
291 request_id: Option<String>,
292) -> Result<serde_json::Value, String> {
293 let mut state = load_state(target)?;
294 let message = push_message(&mut state, kind, name, payload, request_id);
295 save_state(target, &state)?;
296 Ok(serde_json::json!({
297 "workflow_id": target.workflow_id,
298 "message": message,
299 "status": workflow_status_json(target, &state),
300 }))
301}
302
303fn push_message(
304 state: &mut WorkflowMailboxState,
305 kind: &str,
306 name: &str,
307 payload: serde_json::Value,
308 request_id: Option<String>,
309) -> WorkflowMessageRecord {
310 state.next_seq += 1;
311 let message = WorkflowMessageRecord {
312 seq: state.next_seq,
313 kind: kind.to_string(),
314 name: name.to_string(),
315 request_id,
316 payload,
317 enqueued_at: now_rfc3339(),
318 };
319 state.mailbox.push_back(message.clone());
320 message
321}
322
323fn receive_message(target: &WorkflowTarget) -> Result<Option<WorkflowMessageRecord>, String> {
324 let mut state = load_state(target)?;
325 let message = state.mailbox.pop_front();
326 if message.is_some() {
327 save_state(target, &state)?;
328 }
329 Ok(message)
330}
331
332pub fn workflow_signal_for_base(
333 base_dir: &Path,
334 workflow_id: &str,
335 name: &str,
336 payload: serde_json::Value,
337) -> Result<serde_json::Value, String> {
338 let target = target_for_base(base_dir, workflow_id);
339 enqueue_message(&target, "signal", name, payload, None)
340}
341
342pub fn workflow_query_for_base(
343 base_dir: &Path,
344 workflow_id: &str,
345 name: &str,
346) -> Result<serde_json::Value, String> {
347 let target = target_for_base(base_dir, workflow_id);
348 let state = load_state(&target)?;
349 Ok(state
350 .queries
351 .get(name)
352 .map(|record| record.value.clone())
353 .unwrap_or(serde_json::Value::Null))
354}
355
356pub fn workflow_publish_query_for_base(
357 base_dir: &Path,
358 workflow_id: &str,
359 name: &str,
360 value: serde_json::Value,
361) -> Result<serde_json::Value, String> {
362 let target = target_for_base(base_dir, workflow_id);
363 let mut state = load_state(&target)?;
364 state.queries.insert(
365 name.to_string(),
366 WorkflowQueryRecord {
367 value,
368 published_at: now_rfc3339(),
369 },
370 );
371 save_state(&target, &state)?;
372 Ok(workflow_status_json(&target, &state))
373}
374
375pub fn workflow_pause_for_base(
376 base_dir: &Path,
377 workflow_id: &str,
378) -> Result<serde_json::Value, String> {
379 let target = target_for_base(base_dir, workflow_id);
380 let mut state = load_state(&target)?;
381 state.paused = true;
382 push_message(&mut state, "control", "pause", serde_json::json!({}), None);
383 save_state(&target, &state)?;
384 Ok(workflow_status_json(&target, &state))
385}
386
387pub fn workflow_resume_for_base(
388 base_dir: &Path,
389 workflow_id: &str,
390) -> Result<serde_json::Value, String> {
391 let target = target_for_base(base_dir, workflow_id);
392 let mut state = load_state(&target)?;
393 state.paused = false;
394 push_message(&mut state, "control", "resume", serde_json::json!({}), None);
395 save_state(&target, &state)?;
396 Ok(workflow_status_json(&target, &state))
397}
398
399pub async fn workflow_update_for_base(
400 base_dir: &Path,
401 workflow_id: &str,
402 name: &str,
403 payload: serde_json::Value,
404 timeout: StdDuration,
405) -> Result<serde_json::Value, String> {
406 let target = target_for_base(base_dir, workflow_id);
407 let request_id = enqueue_update_request(&target, name, payload)?;
408 wait_for_update_response(&target, name, &request_id, timeout).await
409}
410
411fn enqueue_update_request(
412 target: &WorkflowTarget,
413 name: &str,
414 payload: serde_json::Value,
415) -> Result<String, String> {
416 let request_id = uuid::Uuid::now_v7().to_string();
417 enqueue_message(target, "update", name, payload, Some(request_id.clone()))?;
418 Ok(request_id)
419}
420
421async fn wait_for_update_response(
422 target: &WorkflowTarget,
423 name: &str,
424 request_id: &str,
425 timeout: StdDuration,
426) -> Result<serde_json::Value, String> {
427 let deadline = tokio::time::Instant::now() + timeout;
428 loop {
429 match update_response_value(target, request_id) {
430 Ok(Some(value)) => return Ok(value),
431 Ok(None) => {}
432 Err(error) => return Err(error),
433 }
434
435 let now = tokio::time::Instant::now();
436 if now >= deadline {
437 break;
438 }
439 let next_poll = now + StdDuration::from_millis(UPDATE_POLL_INTERVAL_MS);
440 tokio::time::sleep_until(next_poll.min(deadline)).await;
441 }
442 Err(format!(
443 "workflow update '{name}' timed out for '{}'",
444 target.workflow_id
445 ))
446}
447
448fn update_response_value(
449 target: &WorkflowTarget,
450 request_id: &str,
451) -> Result<Option<serde_json::Value>, String> {
452 Ok(load_state(target)?
453 .responses
454 .get(request_id)
455 .map(|response| response.value.clone()))
456}
457
458pub fn workflow_respond_update_for_base(
459 base_dir: &Path,
460 workflow_id: &str,
461 request_id: &str,
462 name: Option<&str>,
463 value: serde_json::Value,
464) -> Result<serde_json::Value, String> {
465 let target = target_for_base(base_dir, workflow_id);
466 record_update_response(&target, request_id, name, value)
467}
468
469fn record_update_response(
470 target: &WorkflowTarget,
471 request_id: &str,
472 name: Option<&str>,
473 value: serde_json::Value,
474) -> Result<serde_json::Value, String> {
475 let mut state = load_state(target)?;
476 state.responses.insert(
477 request_id.to_string(),
478 WorkflowUpdateResponseRecord {
479 request_id: request_id.to_string(),
480 name: name.map(ToString::to_string),
481 value,
482 responded_at: now_rfc3339(),
483 },
484 );
485 save_state(target, &state)?;
486 Ok(workflow_status_json(target, &state))
487}
488
489pub(crate) fn register_workflow_message_builtins(vm: &mut Vm) {
490 vm.set_global(
491 "workflow",
492 VmValue::Dict(Rc::new(BTreeMap::from([
493 (
494 "signal".to_string(),
495 VmValue::BuiltinRef(Rc::from("workflow.signal")),
496 ),
497 (
498 "query".to_string(),
499 VmValue::BuiltinRef(Rc::from("workflow.query")),
500 ),
501 (
502 "update".to_string(),
503 VmValue::BuiltinRef(Rc::from("workflow.update")),
504 ),
505 (
506 "publish_query".to_string(),
507 VmValue::BuiltinRef(Rc::from("workflow.publish_query")),
508 ),
509 (
510 "receive".to_string(),
511 VmValue::BuiltinRef(Rc::from("workflow.receive")),
512 ),
513 (
514 "respond_update".to_string(),
515 VmValue::BuiltinRef(Rc::from("workflow.respond_update")),
516 ),
517 (
518 "pause".to_string(),
519 VmValue::BuiltinRef(Rc::from("workflow.pause")),
520 ),
521 (
522 "resume".to_string(),
523 VmValue::BuiltinRef(Rc::from("workflow.resume")),
524 ),
525 (
526 "status".to_string(),
527 VmValue::BuiltinRef(Rc::from("workflow.status")),
528 ),
529 (
530 "continue_as_new".to_string(),
531 VmValue::BuiltinRef(Rc::from("workflow.continue_as_new")),
532 ),
533 ]))),
534 );
535
536 for def in MODULE_BUILTINS {
537 vm.register_builtin_def(def);
538 }
539}
540
541#[harn_builtin(
543 sig = "workflow.signal(target: string|dict, name: string, payload?: any) -> dict",
544 category = "workflow.messages"
545)]
546fn workflow_signal_builtin(args: &[VmValue], _out: &mut String) -> Result<VmValue, VmError> {
547 let target = parse_target_vm(args.first(), None, "workflow.signal")?;
548 let name = args
549 .get(1)
550 .map(|value| value.display())
551 .filter(|value| !value.is_empty())
552 .ok_or_else(|| VmError::Runtime("workflow.signal: missing name".to_string()))?;
553 let payload = args
554 .get(2)
555 .map(crate::llm::vm_value_to_json)
556 .unwrap_or(serde_json::Value::Null);
557 let result =
558 enqueue_message(&target, "signal", &name, payload, None).map_err(VmError::Runtime)?;
559 Ok(crate::stdlib::json_to_vm_value(&result))
560}
561
562#[harn_builtin(
564 sig = "workflow.query(target: string|dict, name: string) -> any",
565 category = "workflow.messages"
566)]
567fn workflow_query_builtin(args: &[VmValue], _out: &mut String) -> Result<VmValue, VmError> {
568 let target = parse_target_vm(args.first(), None, "workflow.query")?;
569 let name = args
570 .get(1)
571 .map(|value| value.display())
572 .filter(|value| !value.is_empty())
573 .ok_or_else(|| VmError::Runtime("workflow.query: missing name".to_string()))?;
574 let state = load_state(&target).map_err(VmError::Runtime)?;
575 Ok(crate::stdlib::json_to_vm_value(
576 &state
577 .queries
578 .get(&name)
579 .map(|record| record.value.clone())
580 .unwrap_or(serde_json::Value::Null),
581 ))
582}
583
584#[harn_builtin(
586 sig = "workflow.update(target: string|dict, name: string, payload?: any, options?: dict|nil) -> any",
587 kind = "async",
588 category = "workflow.messages"
589)]
590async fn workflow_update_builtin(args: Vec<VmValue>) -> Result<VmValue, VmError> {
591 let target = parse_target_vm(args.first(), None, "workflow.update")?;
592 let name = args
593 .get(1)
594 .map(|value| value.display())
595 .filter(|value| !value.is_empty())
596 .ok_or_else(|| VmError::Runtime("workflow.update: missing name".to_string()))?;
597 let payload = args
598 .get(2)
599 .map(crate::llm::vm_value_to_json)
600 .unwrap_or(serde_json::Value::Null);
601 let timeout_ms = args
602 .get(3)
603 .and_then(|value| value.as_dict())
604 .and_then(|dict| dict.get("timeout_ms"))
605 .and_then(VmValue::as_int)
606 .unwrap_or(DEFAULT_UPDATE_TIMEOUT_MS as i64)
607 .max(1) as u64;
608 let result = workflow_update_for_base(
609 &target.base_dir,
610 &target.workflow_id,
611 &name,
612 payload,
613 StdDuration::from_millis(timeout_ms),
614 )
615 .await
616 .map_err(VmError::Runtime)?;
617 Ok(crate::stdlib::json_to_vm_value(&result))
618}
619
620#[harn_builtin(
622 sig = "workflow.publish_query(target: string|dict, name: string, value?: any) -> dict",
623 category = "workflow.messages"
624)]
625fn workflow_publish_query_builtin(args: &[VmValue], _out: &mut String) -> Result<VmValue, VmError> {
626 let target = parse_target_vm(args.first(), None, "workflow.publish_query")?;
627 let name = args
628 .get(1)
629 .map(|value| value.display())
630 .filter(|value| !value.is_empty())
631 .ok_or_else(|| VmError::Runtime("workflow.publish_query: missing name".to_string()))?;
632 let value = args
633 .get(2)
634 .map(crate::llm::vm_value_to_json)
635 .unwrap_or(serde_json::Value::Null);
636 let result =
637 workflow_publish_query_for_base(&target.base_dir, &target.workflow_id, &name, value)
638 .map_err(VmError::Runtime)?;
639 Ok(crate::stdlib::json_to_vm_value(&result))
640}
641
642#[harn_builtin(
644 sig = "workflow.receive(target: string|dict) -> dict|nil",
645 category = "workflow.messages"
646)]
647fn workflow_receive_builtin(args: &[VmValue], _out: &mut String) -> Result<VmValue, VmError> {
648 let target = parse_target_vm(args.first(), None, "workflow.receive")?;
649 let Some(message) = receive_message(&target).map_err(VmError::Runtime)? else {
650 return Ok(VmValue::Nil);
651 };
652 Ok(crate::stdlib::json_to_vm_value(&serde_json::json!({
653 "workflow_id": target.workflow_id,
654 "seq": message.seq,
655 "kind": message.kind,
656 "name": message.name,
657 "request_id": message.request_id,
658 "payload": message.payload,
659 "enqueued_at": message.enqueued_at,
660 })))
661}
662
663#[harn_builtin(
665 sig = "workflow.respond_update(target: string|dict, request_id: string, value?: any, name?: string|nil) -> dict",
666 category = "workflow.messages"
667)]
668fn workflow_respond_update_builtin(
669 args: &[VmValue],
670 _out: &mut String,
671) -> Result<VmValue, VmError> {
672 let target = parse_target_vm(args.first(), None, "workflow.respond_update")?;
673 let request_id = args
674 .get(1)
675 .map(|value| value.display())
676 .filter(|value| !value.is_empty())
677 .ok_or_else(|| {
678 VmError::Runtime("workflow.respond_update: missing request id".to_string())
679 })?;
680 let value = args
681 .get(2)
682 .map(crate::llm::vm_value_to_json)
683 .unwrap_or(serde_json::Value::Null);
684 let name = args
685 .get(3)
686 .map(|value| value.display())
687 .filter(|value| !value.is_empty());
688 let result = workflow_respond_update_for_base(
689 &target.base_dir,
690 &target.workflow_id,
691 &request_id,
692 name.as_deref(),
693 value,
694 )
695 .map_err(VmError::Runtime)?;
696 Ok(crate::stdlib::json_to_vm_value(&result))
697}
698
699#[harn_builtin(
701 sig = "workflow.pause(target: string|dict) -> dict",
702 category = "workflow.messages"
703)]
704fn workflow_pause_builtin(args: &[VmValue], _out: &mut String) -> Result<VmValue, VmError> {
705 let target = parse_target_vm(args.first(), None, "workflow.pause")?;
706 let result =
707 workflow_pause_for_base(&target.base_dir, &target.workflow_id).map_err(VmError::Runtime)?;
708 Ok(crate::stdlib::json_to_vm_value(&result))
709}
710
711#[harn_builtin(
713 sig = "workflow.resume(target: string|dict) -> dict",
714 category = "workflow.messages"
715)]
716fn workflow_resume_builtin(args: &[VmValue], _out: &mut String) -> Result<VmValue, VmError> {
717 let target = parse_target_vm(args.first(), None, "workflow.resume")?;
718 let result = workflow_resume_for_base(&target.base_dir, &target.workflow_id)
719 .map_err(VmError::Runtime)?;
720 Ok(crate::stdlib::json_to_vm_value(&result))
721}
722
723#[harn_builtin(
725 sig = "workflow.status(target: string|dict) -> dict",
726 category = "workflow.messages"
727)]
728fn workflow_status_builtin(args: &[VmValue], _out: &mut String) -> Result<VmValue, VmError> {
729 let target = parse_target_vm(args.first(), None, "workflow.status")?;
730 let state = load_state(&target).map_err(VmError::Runtime)?;
731 Ok(crate::stdlib::json_to_vm_value(&workflow_status_json(
732 &target, &state,
733 )))
734}
735
736#[harn_builtin(
738 sig = "workflow.continue_as_new(target: string|dict) -> dict",
739 category = "workflow.messages"
740)]
741fn workflow_continue_as_new_builtin(
742 args: &[VmValue],
743 _out: &mut String,
744) -> Result<VmValue, VmError> {
745 continue_as_new_for_label(args, "workflow.continue_as_new")
746}
747
748#[harn_builtin(
750 sig = "continue_as_new(target: string|dict) -> dict",
751 category = "workflow.messages"
752)]
753fn continue_as_new_builtin(args: &[VmValue], _out: &mut String) -> Result<VmValue, VmError> {
754 continue_as_new_for_label(args, "continue_as_new")
755}
756
757fn continue_as_new_for_label(args: &[VmValue], label: &str) -> Result<VmValue, VmError> {
758 let target = parse_target_vm(args.first(), None, label)?;
759 let mut state = load_state(&target).map_err(VmError::Runtime)?;
760 state.generation += 1;
761 state.continue_as_new_count += 1;
762 state.last_continue_as_new_at = Some(now_rfc3339());
763 state.responses.clear();
764 save_state(&target, &state).map_err(VmError::Runtime)?;
765 Ok(crate::stdlib::json_to_vm_value(&workflow_status_json(
766 &target, &state,
767 )))
768}
769
770#[cfg(test)]
771mod tests {
772 use super::*;
773 use std::task::Poll;
774
775 #[tokio::test(start_paused = true)]
776 async fn update_round_trip_waits_for_response() {
777 let dir = tempfile::tempdir().expect("tempdir");
778 let workflow_id = "wf-update";
779 let base_dir = dir.path().to_path_buf();
780 let target = target_for_base(&base_dir, workflow_id);
781 let request_id =
782 enqueue_update_request(&target, "adjust_budget", serde_json::json!({"max_usd": 10}))
783 .expect("enqueue update");
784
785 let message = receive_message(&target)
786 .expect("receive queued update")
787 .expect("queued update");
788 assert_eq!(message.kind, "update");
789 assert_eq!(message.name, "adjust_budget");
790 assert_eq!(message.request_id.as_deref(), Some(request_id.as_str()));
791 assert_eq!(
792 update_response_value(&target, &request_id).expect("read response"),
793 None
794 );
795
796 let waiter = wait_for_update_response(
797 &target,
798 "adjust_budget",
799 &request_id,
800 StdDuration::from_millis(500),
801 );
802 tokio::pin!(waiter);
803 assert!(matches!(futures::poll!(&mut waiter), Poll::Pending));
804
805 workflow_respond_update_for_base(
806 &base_dir,
807 workflow_id,
808 &request_id,
809 Some("adjust_budget"),
810 serde_json::json!({"ok": true}),
811 )
812 .expect("save response");
813 assert_eq!(
814 update_response_value(&target, &request_id).expect("read response"),
815 Some(serde_json::json!({"ok": true}))
816 );
817 tokio::time::advance(StdDuration::from_millis(UPDATE_POLL_INTERVAL_MS)).await;
818
819 let result = waiter.await.expect("update result");
820 assert_eq!(result, serde_json::json!({"ok": true}));
821 }
822
823 #[tokio::test(start_paused = true)]
824 async fn update_wait_respects_short_timeout() {
825 let dir = tempfile::tempdir().expect("tempdir");
826 let target = target_for_base(dir.path(), "wf-timeout");
827 let request_id =
828 enqueue_update_request(&target, "adjust_budget", serde_json::json!({"max_usd": 10}))
829 .expect("enqueue update");
830
831 let waiter = wait_for_update_response(
832 &target,
833 "adjust_budget",
834 &request_id,
835 StdDuration::from_millis(10),
836 );
837 tokio::pin!(waiter);
838 assert!(matches!(futures::poll!(&mut waiter), Poll::Pending));
839
840 tokio::time::advance(StdDuration::from_millis(9)).await;
841 assert!(matches!(futures::poll!(&mut waiter), Poll::Pending));
842
843 tokio::time::advance(StdDuration::from_millis(1)).await;
844 let err = waiter.await.expect_err("update should time out");
845 assert!(err.contains("timed out"));
846 }
847
848 #[tokio::test]
849 async fn update_wait_propagates_state_errors() {
850 let dir = tempfile::tempdir().expect("tempdir");
851 let target = target_for_base(dir.path(), "wf-corrupt");
852 std::fs::create_dir_all(workflow_target_root(&target)).expect("state dir");
853 std::fs::write(workflow_state_path(&target), "{not json").expect("state write");
854
855 let err = wait_for_update_response(
856 &target,
857 "adjust_budget",
858 "request-1",
859 StdDuration::from_millis(10),
860 )
861 .await
862 .expect_err("corrupt state should fail immediately");
863 assert!(err.contains("workflow state parse error"));
864 }
865
866 #[test]
867 fn workflow_ids_preserve_namespace_without_path_segments() {
868 assert_eq!(
869 sanitize_workflow_id("workflow://local/start-my-day"),
870 "workflow___local_start-my-day"
871 );
872 assert_eq!(sanitize_workflow_id("../start-my-day"), ".._start-my-day");
873 assert_eq!(sanitize_workflow_id(".."), "workflow");
874 }
875
876 #[test]
877 fn persisted_path_drives_target_base_dir() {
878 let base = parse_target_json(
879 &serde_json::json!({
880 "workflow_id": "wf",
881 "persisted_path": "/tmp/demo/.harn-runs/run.json"
882 }),
883 None,
884 )
885 .expect("target");
886 assert_eq!(base.workflow_id, "wf");
887 assert_eq!(base.base_dir, PathBuf::from("/tmp/demo"));
888 }
889
890 #[test]
891 fn nested_persisted_path_drives_target_base_dir() {
892 let base = parse_target_json(
893 &serde_json::json!({
894 "workflow_id": "wf",
895 "persisted_path": "/tmp/demo/.harn-runs/session/run.json"
896 }),
897 None,
898 )
899 .expect("target");
900 assert_eq!(base.base_dir, PathBuf::from("/tmp/demo"));
901
902 let relative = parse_target_json(
903 &serde_json::json!({
904 "workflow_id": "wf",
905 "persisted_path": ".harn-runs/session/run.json"
906 }),
907 None,
908 )
909 .expect("target");
910 assert_eq!(relative.base_dir, PathBuf::from("."));
911 }
912}