1use std::collections::{BTreeMap, VecDeque};
2use std::path::{Path, PathBuf};
3use std::rc::Rc;
4use std::time::Duration as StdDuration;
5
6use serde::{Deserialize, Serialize};
7
8use crate::stdlib::process::runtime_root_base;
9use crate::stdlib::registration::{
10 async_builtin, register_builtin_group, AsyncBuiltin, BuiltinGroup, SyncBuiltin,
11};
12use crate::value::{VmError, VmValue};
13use crate::vm::{Vm, VmBuiltinArity};
14
/// Timeout, in milliseconds, applied by `workflow.update` when the caller's options do not
/// provide a `timeout_ms` integer.
const DEFAULT_UPDATE_TIMEOUT_MS: u64 = 30_000;
/// Maximum delay, in milliseconds, between two consecutive reads of the persisted state
/// while waiting for an update response.
const UPDATE_POLL_INTERVAL_MS: u64 = 25;
17
/// Registration table for the synchronous workflow-message builtins.
///
/// Each entry pairs a builtin name with its handler function, a human-readable signature,
/// the accepted argument count, and a one-line description. `continue_as_new` is registered
/// both under the `workflow.` prefix and as a bare top-level name.
const WORKFLOW_MESSAGE_SYNC_PRIMITIVES: &[SyncBuiltin] = &[
    SyncBuiltin::new("workflow.signal", workflow_signal_builtin)
        .signature("workflow.signal(target, name, payload?)")
        .arity(VmBuiltinArity::Range { min: 2, max: 3 })
        .doc("Enqueue a workflow signal message."),
    SyncBuiltin::new("workflow.query", workflow_query_builtin)
        .signature("workflow.query(target, name)")
        .arity(VmBuiltinArity::Exact(2))
        .doc("Read the latest published workflow query value."),
    SyncBuiltin::new("workflow.publish_query", workflow_publish_query_builtin)
        .signature("workflow.publish_query(target, name, value?)")
        .arity(VmBuiltinArity::Range { min: 2, max: 3 })
        .doc("Publish a workflow query value."),
    SyncBuiltin::new("workflow.receive", workflow_receive_builtin)
        .signature("workflow.receive(target)")
        .arity(VmBuiltinArity::Exact(1))
        .doc("Receive the next workflow mailbox message."),
    SyncBuiltin::new("workflow.respond_update", workflow_respond_update_builtin)
        .signature("workflow.respond_update(target, request_id, value?, name?)")
        .arity(VmBuiltinArity::Range { min: 2, max: 4 })
        .doc("Respond to a pending workflow update request."),
    SyncBuiltin::new("workflow.pause", workflow_pause_builtin)
        .signature("workflow.pause(target)")
        .arity(VmBuiltinArity::Exact(1))
        .doc("Pause a workflow mailbox."),
    SyncBuiltin::new("workflow.resume", workflow_resume_builtin)
        .signature("workflow.resume(target)")
        .arity(VmBuiltinArity::Exact(1))
        .doc("Resume a workflow mailbox."),
    SyncBuiltin::new("workflow.status", workflow_status_builtin)
        .signature("workflow.status(target)")
        .arity(VmBuiltinArity::Exact(1))
        .doc("Return workflow mailbox status."),
    SyncBuiltin::new("workflow.continue_as_new", workflow_continue_as_new_builtin)
        .signature("workflow.continue_as_new(target)")
        .arity(VmBuiltinArity::Exact(1))
        .doc("Advance a workflow mailbox generation."),
    SyncBuiltin::new("continue_as_new", continue_as_new_builtin)
        .signature("continue_as_new(target)")
        .arity(VmBuiltinArity::Exact(1))
        .doc("Advance a workflow mailbox generation."),
];
60
/// Registration table for the asynchronous workflow-message builtins.
///
/// `workflow.update` is the only async entry: its handler awaits a response to the update
/// it enqueues.
const WORKFLOW_MESSAGE_ASYNC_PRIMITIVES: &[AsyncBuiltin] =
    &[async_builtin!("workflow.update", workflow_update_builtin)
        .signature("workflow.update(target, name, payload?, options?)")
        .arity(VmBuiltinArity::Range { min: 2, max: 4 })
        .doc("Enqueue a workflow update and wait for a response.")];
66
/// Builtin group combining the sync and async workflow-message tables under the
/// `workflow.messages` category; passed to `register_builtin_group` at VM setup.
const WORKFLOW_MESSAGE_PRIMITIVES: BuiltinGroup<'static> = BuiltinGroup::new()
    .category("workflow.messages")
    .sync(WORKFLOW_MESSAGE_SYNC_PRIMITIVES)
    .async_(WORKFLOW_MESSAGE_ASYNC_PRIMITIVES);
71
/// A single message stored in a workflow mailbox.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct WorkflowMessageRecord {
    /// Sequence number assigned from the mailbox's `next_seq` counter when enqueued.
    pub seq: u64,
    /// Message category; this file enqueues `"signal"`, `"update"`, and `"control"`.
    pub kind: String,
    /// Caller-supplied message name (or `"pause"` / `"resume"` for control messages).
    pub name: String,
    /// Correlation id for update messages; omitted from the serialized form when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub request_id: Option<String>,
    /// Arbitrary JSON payload attached to the message.
    pub payload: serde_json::Value,
    /// Timestamp string recorded when the message was enqueued.
    pub enqueued_at: String,
}
82
/// The most recently published value for one named workflow query.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct WorkflowQueryRecord {
    /// Published JSON value returned to query callers.
    pub value: serde_json::Value,
    /// Timestamp string recorded when the value was published.
    pub published_at: String,
}
88
/// A stored response to a workflow update request, keyed by request id in the mailbox state.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct WorkflowUpdateResponseRecord {
    /// Id of the update request this response answers.
    pub request_id: String,
    /// Optional update name supplied by the responder; omitted when serialized if `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub name: Option<String>,
    /// JSON value handed back to the waiting update caller.
    pub value: serde_json::Value,
    /// Timestamp string recorded when the response was stored.
    pub responded_at: String,
}
97
/// Persisted state of one workflow mailbox, serialized as JSON in the workflow's
/// `state.json` file.
///
/// NOTE(review): `type_name` and `workflow_id` carry no `#[serde(default)]`, so a state file
/// that omits them presumably fails to parse even though `load_state` back-fills empty
/// values — confirm whether that strictness is intended.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct WorkflowMailboxState {
    /// Record discriminator, serialized under the `_type` key.
    #[serde(rename = "_type")]
    pub type_name: String,
    /// Identifier of the workflow that owns this mailbox.
    pub workflow_id: String,
    /// Mailbox generation; defaults to 1 and is incremented by `continue_as_new`.
    #[serde(default = "default_generation")]
    pub generation: u64,
    /// Number of times `continue_as_new` has been applied.
    #[serde(default)]
    pub continue_as_new_count: u64,
    /// Timestamp of the most recent `continue_as_new`, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub last_continue_as_new_at: Option<String>,
    /// Whether the mailbox is currently flagged as paused.
    #[serde(default)]
    pub paused: bool,
    /// Last sequence number handed out; incremented before each new message is stored.
    #[serde(default)]
    pub next_seq: u64,
    /// Pending messages in arrival order (pushed at the back, received from the front).
    #[serde(default)]
    pub mailbox: VecDeque<WorkflowMessageRecord>,
    /// Latest published query values keyed by query name.
    #[serde(default)]
    pub queries: BTreeMap<String, WorkflowQueryRecord>,
    /// Stored update responses keyed by request id.
    #[serde(default)]
    pub responses: BTreeMap<String, WorkflowUpdateResponseRecord>,
}
120
121impl Default for WorkflowMailboxState {
122 fn default() -> Self {
123 Self {
124 type_name: "workflow_mailbox".to_string(),
125 workflow_id: String::new(),
126 generation: default_generation(),
127 continue_as_new_count: 0,
128 last_continue_as_new_at: None,
129 paused: false,
130 next_seq: 0,
131 mailbox: VecDeque::new(),
132 queries: BTreeMap::new(),
133 responses: BTreeMap::new(),
134 }
135 }
136}
137
/// Generation number given to a mailbox that has never continued-as-new.
fn default_generation() -> u64 {
    const INITIAL_GENERATION: u64 = 1;
    INITIAL_GENERATION
}
141
/// Resolved address of a workflow mailbox on disk.
#[derive(Clone, Debug, PartialEq, Eq)]
struct WorkflowTarget {
    /// Sanitized workflow identifier, used as a directory name under the workflow dir.
    workflow_id: String,
    /// Base directory from which the workflow storage directory is derived.
    base_dir: PathBuf,
}
147
/// Converts an arbitrary workflow id into a string usable as a single directory name.
///
/// Surrounding whitespace is trimmed; ASCII letters, digits, `-`, `_`, and `.` are kept and
/// every other character becomes `_`. If the result is empty, `.` or `..`, the literal
/// `workflow` is returned instead.
fn sanitize_workflow_id(raw: &str) -> String {
    let cleaned: String = raw
        .trim()
        .chars()
        .map(|c| match c {
            '-' | '_' | '.' => c,
            c if c.is_ascii_alphanumeric() => c,
            _ => '_',
        })
        .collect();
    match cleaned.as_str() {
        "" | "." | ".." => String::from("workflow"),
        _ => cleaned,
    }
}
164
165fn workflow_base_dir_from_persisted_path(path: &Path) -> PathBuf {
166 for ancestor in path.ancestors() {
167 if ancestor.file_name().and_then(|value| value.to_str()) == Some(".harn-runs") {
168 return non_empty_path_or_dot(ancestor.parent().unwrap_or_else(|| Path::new(".")));
169 }
170 }
171 non_empty_path_or_dot(path.parent().unwrap_or_else(|| Path::new(".")))
172}
173
/// Returns an owned copy of `path`, substituting `.` when `path` is empty.
fn non_empty_path_or_dot(path: &Path) -> PathBuf {
    let effective = if path.as_os_str().is_empty() {
        Path::new(".")
    } else {
        path
    };
    effective.to_path_buf()
}
181
182fn workflow_target_root(target: &WorkflowTarget) -> PathBuf {
183 crate::runtime_paths::workflow_dir(&target.base_dir).join(&target.workflow_id)
184}
185
186fn workflow_state_path(target: &WorkflowTarget) -> PathBuf {
187 workflow_target_root(target).join("state.json")
188}
189
190fn now_rfc3339() -> String {
191 time::OffsetDateTime::now_utc()
192 .format(&time::format_description::well_known::Rfc3339)
193 .unwrap_or_else(|_| uuid::Uuid::now_v7().to_string())
194}
195
196fn load_state(target: &WorkflowTarget) -> Result<WorkflowMailboxState, String> {
197 let path = workflow_state_path(target);
198 if !path.exists() {
199 return Ok(WorkflowMailboxState {
200 workflow_id: target.workflow_id.clone(),
201 ..WorkflowMailboxState::default()
202 });
203 }
204 let text = std::fs::read_to_string(&path)
205 .map_err(|error| format!("workflow state read error: {error}"))?;
206 let mut state: WorkflowMailboxState = serde_json::from_str(&text)
207 .map_err(|error| format!("workflow state parse error: {error}"))?;
208 if state.type_name.is_empty() {
209 state.type_name = "workflow_mailbox".to_string();
210 }
211 if state.workflow_id.is_empty() {
212 state.workflow_id = target.workflow_id.clone();
213 }
214 if state.generation == 0 {
215 state.generation = 1;
216 }
217 Ok(state)
218}
219
220fn save_state(target: &WorkflowTarget, state: &WorkflowMailboxState) -> Result<(), String> {
221 let path = workflow_state_path(target);
222 let json = serde_json::to_string_pretty(state)
223 .map_err(|error| format!("workflow state encode error: {error}"))?;
224 crate::atomic_io::atomic_write(&path, json.as_bytes())
225 .map_err(|error| format!("workflow state write error: {error}"))
226}
227
228fn parse_target_json(
229 value: &serde_json::Value,
230 fallback_base_dir: Option<&Path>,
231) -> Option<WorkflowTarget> {
232 match value {
233 serde_json::Value::String(text) => Some(WorkflowTarget {
234 workflow_id: sanitize_workflow_id(text),
235 base_dir: fallback_base_dir
236 .map(Path::to_path_buf)
237 .unwrap_or_else(runtime_root_base),
238 }),
239 serde_json::Value::Object(map) => {
240 let workflow_id = map
241 .get("workflow_id")
242 .and_then(|value| value.as_str())
243 .or_else(|| map.get("workflow").and_then(|value| value.as_str()))
244 .or_else(|| {
245 map.get("run")
246 .and_then(|value| value.get("workflow_id"))
247 .and_then(|value| value.as_str())
248 })
249 .or_else(|| {
250 map.get("result")
251 .and_then(|value| value.get("run"))
252 .and_then(|value| value.get("workflow_id"))
253 .and_then(|value| value.as_str())
254 })?;
255 let explicit_base = map
256 .get("base_dir")
257 .and_then(|value| value.as_str())
258 .filter(|value| !value.trim().is_empty())
259 .map(PathBuf::from);
260 let persisted_path = map
261 .get("persisted_path")
262 .and_then(|value| value.as_str())
263 .or_else(|| map.get("path").and_then(|value| value.as_str()))
264 .or_else(|| {
265 map.get("run")
266 .and_then(|value| value.get("persisted_path"))
267 .and_then(|value| value.as_str())
268 })
269 .or_else(|| {
270 map.get("result")
271 .and_then(|value| value.get("run"))
272 .and_then(|value| value.get("persisted_path"))
273 .and_then(|value| value.as_str())
274 });
275 let base_dir = explicit_base
276 .or_else(|| {
277 persisted_path
278 .map(|path| workflow_base_dir_from_persisted_path(Path::new(path)))
279 })
280 .or_else(|| fallback_base_dir.map(Path::to_path_buf))
281 .unwrap_or_else(runtime_root_base);
282 Some(WorkflowTarget {
283 workflow_id: sanitize_workflow_id(workflow_id),
284 base_dir,
285 })
286 }
287 _ => None,
288 }
289}
290
291fn parse_target_vm(
292 value: Option<&VmValue>,
293 fallback_base_dir: Option<&Path>,
294 builtin: &str,
295) -> Result<WorkflowTarget, VmError> {
296 let value = value.ok_or_else(|| VmError::Runtime(format!("{builtin}: missing target")))?;
297 parse_target_json(&crate::llm::vm_value_to_json(value), fallback_base_dir).ok_or_else(|| {
298 VmError::Runtime(format!(
299 "{builtin}: target must be a workflow id string or dict with workflow_id/workflow"
300 ))
301 })
302}
303
304fn workflow_status_json(
305 target: &WorkflowTarget,
306 state: &WorkflowMailboxState,
307) -> serde_json::Value {
308 serde_json::json!({
309 "workflow_id": target.workflow_id,
310 "base_dir": target.base_dir.to_string_lossy(),
311 "generation": state.generation,
312 "paused": state.paused,
313 "pending_count": state.mailbox.len(),
314 "query_count": state.queries.len(),
315 "response_count": state.responses.len(),
316 "continue_as_new_count": state.continue_as_new_count,
317 "last_continue_as_new_at": state.last_continue_as_new_at,
318 })
319}
320
321fn target_for_base(base_dir: &Path, workflow_id: &str) -> WorkflowTarget {
322 WorkflowTarget {
323 workflow_id: sanitize_workflow_id(workflow_id),
324 base_dir: base_dir.to_path_buf(),
325 }
326}
327
328fn enqueue_message(
329 target: &WorkflowTarget,
330 kind: &str,
331 name: &str,
332 payload: serde_json::Value,
333 request_id: Option<String>,
334) -> Result<serde_json::Value, String> {
335 let mut state = load_state(target)?;
336 let message = push_message(&mut state, kind, name, payload, request_id);
337 save_state(target, &state)?;
338 Ok(serde_json::json!({
339 "workflow_id": target.workflow_id,
340 "message": message,
341 "status": workflow_status_json(target, &state),
342 }))
343}
344
345fn push_message(
346 state: &mut WorkflowMailboxState,
347 kind: &str,
348 name: &str,
349 payload: serde_json::Value,
350 request_id: Option<String>,
351) -> WorkflowMessageRecord {
352 state.next_seq += 1;
353 let message = WorkflowMessageRecord {
354 seq: state.next_seq,
355 kind: kind.to_string(),
356 name: name.to_string(),
357 request_id,
358 payload,
359 enqueued_at: now_rfc3339(),
360 };
361 state.mailbox.push_back(message.clone());
362 message
363}
364
365fn receive_message(target: &WorkflowTarget) -> Result<Option<WorkflowMessageRecord>, String> {
366 let mut state = load_state(target)?;
367 let message = state.mailbox.pop_front();
368 if message.is_some() {
369 save_state(target, &state)?;
370 }
371 Ok(message)
372}
373
374pub fn workflow_signal_for_base(
375 base_dir: &Path,
376 workflow_id: &str,
377 name: &str,
378 payload: serde_json::Value,
379) -> Result<serde_json::Value, String> {
380 let target = target_for_base(base_dir, workflow_id);
381 enqueue_message(&target, "signal", name, payload, None)
382}
383
384pub fn workflow_query_for_base(
385 base_dir: &Path,
386 workflow_id: &str,
387 name: &str,
388) -> Result<serde_json::Value, String> {
389 let target = target_for_base(base_dir, workflow_id);
390 let state = load_state(&target)?;
391 Ok(state
392 .queries
393 .get(name)
394 .map(|record| record.value.clone())
395 .unwrap_or(serde_json::Value::Null))
396}
397
398pub fn workflow_publish_query_for_base(
399 base_dir: &Path,
400 workflow_id: &str,
401 name: &str,
402 value: serde_json::Value,
403) -> Result<serde_json::Value, String> {
404 let target = target_for_base(base_dir, workflow_id);
405 let mut state = load_state(&target)?;
406 state.queries.insert(
407 name.to_string(),
408 WorkflowQueryRecord {
409 value,
410 published_at: now_rfc3339(),
411 },
412 );
413 save_state(&target, &state)?;
414 Ok(workflow_status_json(&target, &state))
415}
416
417pub fn workflow_pause_for_base(
418 base_dir: &Path,
419 workflow_id: &str,
420) -> Result<serde_json::Value, String> {
421 let target = target_for_base(base_dir, workflow_id);
422 let mut state = load_state(&target)?;
423 state.paused = true;
424 push_message(&mut state, "control", "pause", serde_json::json!({}), None);
425 save_state(&target, &state)?;
426 Ok(workflow_status_json(&target, &state))
427}
428
429pub fn workflow_resume_for_base(
430 base_dir: &Path,
431 workflow_id: &str,
432) -> Result<serde_json::Value, String> {
433 let target = target_for_base(base_dir, workflow_id);
434 let mut state = load_state(&target)?;
435 state.paused = false;
436 push_message(&mut state, "control", "resume", serde_json::json!({}), None);
437 save_state(&target, &state)?;
438 Ok(workflow_status_json(&target, &state))
439}
440
441pub async fn workflow_update_for_base(
442 base_dir: &Path,
443 workflow_id: &str,
444 name: &str,
445 payload: serde_json::Value,
446 timeout: StdDuration,
447) -> Result<serde_json::Value, String> {
448 let target = target_for_base(base_dir, workflow_id);
449 let request_id = enqueue_update_request(&target, name, payload)?;
450 wait_for_update_response(&target, name, &request_id, timeout).await
451}
452
453fn enqueue_update_request(
454 target: &WorkflowTarget,
455 name: &str,
456 payload: serde_json::Value,
457) -> Result<String, String> {
458 let request_id = uuid::Uuid::now_v7().to_string();
459 enqueue_message(target, "update", name, payload, Some(request_id.clone()))?;
460 Ok(request_id)
461}
462
463async fn wait_for_update_response(
464 target: &WorkflowTarget,
465 name: &str,
466 request_id: &str,
467 timeout: StdDuration,
468) -> Result<serde_json::Value, String> {
469 let deadline = tokio::time::Instant::now() + timeout;
470 loop {
471 match update_response_value(target, request_id) {
472 Ok(Some(value)) => return Ok(value),
473 Ok(None) => {}
474 Err(error) => return Err(error),
475 }
476
477 let now = tokio::time::Instant::now();
478 if now >= deadline {
479 break;
480 }
481 let next_poll = now + StdDuration::from_millis(UPDATE_POLL_INTERVAL_MS);
482 tokio::time::sleep_until(next_poll.min(deadline)).await;
483 }
484 Err(format!(
485 "workflow update '{name}' timed out for '{}'",
486 target.workflow_id
487 ))
488}
489
490fn update_response_value(
491 target: &WorkflowTarget,
492 request_id: &str,
493) -> Result<Option<serde_json::Value>, String> {
494 Ok(load_state(target)?
495 .responses
496 .get(request_id)
497 .map(|response| response.value.clone()))
498}
499
500pub fn workflow_respond_update_for_base(
501 base_dir: &Path,
502 workflow_id: &str,
503 request_id: &str,
504 name: Option<&str>,
505 value: serde_json::Value,
506) -> Result<serde_json::Value, String> {
507 let target = target_for_base(base_dir, workflow_id);
508 record_update_response(&target, request_id, name, value)
509}
510
511fn record_update_response(
512 target: &WorkflowTarget,
513 request_id: &str,
514 name: Option<&str>,
515 value: serde_json::Value,
516) -> Result<serde_json::Value, String> {
517 let mut state = load_state(target)?;
518 state.responses.insert(
519 request_id.to_string(),
520 WorkflowUpdateResponseRecord {
521 request_id: request_id.to_string(),
522 name: name.map(ToString::to_string),
523 value,
524 responded_at: now_rfc3339(),
525 },
526 );
527 save_state(target, &state)?;
528 Ok(workflow_status_json(target, &state))
529}
530
531pub(crate) fn register_workflow_message_builtins(vm: &mut Vm) {
532 vm.set_global(
533 "workflow",
534 VmValue::Dict(Rc::new(BTreeMap::from([
535 (
536 "signal".to_string(),
537 VmValue::BuiltinRef(Rc::from("workflow.signal")),
538 ),
539 (
540 "query".to_string(),
541 VmValue::BuiltinRef(Rc::from("workflow.query")),
542 ),
543 (
544 "update".to_string(),
545 VmValue::BuiltinRef(Rc::from("workflow.update")),
546 ),
547 (
548 "publish_query".to_string(),
549 VmValue::BuiltinRef(Rc::from("workflow.publish_query")),
550 ),
551 (
552 "receive".to_string(),
553 VmValue::BuiltinRef(Rc::from("workflow.receive")),
554 ),
555 (
556 "respond_update".to_string(),
557 VmValue::BuiltinRef(Rc::from("workflow.respond_update")),
558 ),
559 (
560 "pause".to_string(),
561 VmValue::BuiltinRef(Rc::from("workflow.pause")),
562 ),
563 (
564 "resume".to_string(),
565 VmValue::BuiltinRef(Rc::from("workflow.resume")),
566 ),
567 (
568 "status".to_string(),
569 VmValue::BuiltinRef(Rc::from("workflow.status")),
570 ),
571 (
572 "continue_as_new".to_string(),
573 VmValue::BuiltinRef(Rc::from("workflow.continue_as_new")),
574 ),
575 ]))),
576 );
577
578 register_builtin_group(vm, WORKFLOW_MESSAGE_PRIMITIVES);
579}
580
581fn workflow_signal_builtin(args: &[VmValue], _out: &mut String) -> Result<VmValue, VmError> {
582 let target = parse_target_vm(args.first(), None, "workflow.signal")?;
583 let name = args
584 .get(1)
585 .map(|value| value.display())
586 .filter(|value| !value.is_empty())
587 .ok_or_else(|| VmError::Runtime("workflow.signal: missing name".to_string()))?;
588 let payload = args
589 .get(2)
590 .map(crate::llm::vm_value_to_json)
591 .unwrap_or(serde_json::Value::Null);
592 let result =
593 enqueue_message(&target, "signal", &name, payload, None).map_err(VmError::Runtime)?;
594 Ok(crate::stdlib::json_to_vm_value(&result))
595}
596
597fn workflow_query_builtin(args: &[VmValue], _out: &mut String) -> Result<VmValue, VmError> {
598 let target = parse_target_vm(args.first(), None, "workflow.query")?;
599 let name = args
600 .get(1)
601 .map(|value| value.display())
602 .filter(|value| !value.is_empty())
603 .ok_or_else(|| VmError::Runtime("workflow.query: missing name".to_string()))?;
604 let state = load_state(&target).map_err(VmError::Runtime)?;
605 Ok(crate::stdlib::json_to_vm_value(
606 &state
607 .queries
608 .get(&name)
609 .map(|record| record.value.clone())
610 .unwrap_or(serde_json::Value::Null),
611 ))
612}
613
614async fn workflow_update_builtin(args: Vec<VmValue>) -> Result<VmValue, VmError> {
615 let target = parse_target_vm(args.first(), None, "workflow.update")?;
616 let name = args
617 .get(1)
618 .map(|value| value.display())
619 .filter(|value| !value.is_empty())
620 .ok_or_else(|| VmError::Runtime("workflow.update: missing name".to_string()))?;
621 let payload = args
622 .get(2)
623 .map(crate::llm::vm_value_to_json)
624 .unwrap_or(serde_json::Value::Null);
625 let timeout_ms = args
626 .get(3)
627 .and_then(|value| value.as_dict())
628 .and_then(|dict| dict.get("timeout_ms"))
629 .and_then(VmValue::as_int)
630 .unwrap_or(DEFAULT_UPDATE_TIMEOUT_MS as i64)
631 .max(1) as u64;
632 let result = workflow_update_for_base(
633 &target.base_dir,
634 &target.workflow_id,
635 &name,
636 payload,
637 StdDuration::from_millis(timeout_ms),
638 )
639 .await
640 .map_err(VmError::Runtime)?;
641 Ok(crate::stdlib::json_to_vm_value(&result))
642}
643
644fn workflow_publish_query_builtin(args: &[VmValue], _out: &mut String) -> Result<VmValue, VmError> {
645 let target = parse_target_vm(args.first(), None, "workflow.publish_query")?;
646 let name = args
647 .get(1)
648 .map(|value| value.display())
649 .filter(|value| !value.is_empty())
650 .ok_or_else(|| VmError::Runtime("workflow.publish_query: missing name".to_string()))?;
651 let value = args
652 .get(2)
653 .map(crate::llm::vm_value_to_json)
654 .unwrap_or(serde_json::Value::Null);
655 let result =
656 workflow_publish_query_for_base(&target.base_dir, &target.workflow_id, &name, value)
657 .map_err(VmError::Runtime)?;
658 Ok(crate::stdlib::json_to_vm_value(&result))
659}
660
661fn workflow_receive_builtin(args: &[VmValue], _out: &mut String) -> Result<VmValue, VmError> {
662 let target = parse_target_vm(args.first(), None, "workflow.receive")?;
663 let Some(message) = receive_message(&target).map_err(VmError::Runtime)? else {
664 return Ok(VmValue::Nil);
665 };
666 Ok(crate::stdlib::json_to_vm_value(&serde_json::json!({
667 "workflow_id": target.workflow_id,
668 "seq": message.seq,
669 "kind": message.kind,
670 "name": message.name,
671 "request_id": message.request_id,
672 "payload": message.payload,
673 "enqueued_at": message.enqueued_at,
674 })))
675}
676
677fn workflow_respond_update_builtin(
678 args: &[VmValue],
679 _out: &mut String,
680) -> Result<VmValue, VmError> {
681 let target = parse_target_vm(args.first(), None, "workflow.respond_update")?;
682 let request_id = args
683 .get(1)
684 .map(|value| value.display())
685 .filter(|value| !value.is_empty())
686 .ok_or_else(|| {
687 VmError::Runtime("workflow.respond_update: missing request id".to_string())
688 })?;
689 let value = args
690 .get(2)
691 .map(crate::llm::vm_value_to_json)
692 .unwrap_or(serde_json::Value::Null);
693 let name = args
694 .get(3)
695 .map(|value| value.display())
696 .filter(|value| !value.is_empty());
697 let result = workflow_respond_update_for_base(
698 &target.base_dir,
699 &target.workflow_id,
700 &request_id,
701 name.as_deref(),
702 value,
703 )
704 .map_err(VmError::Runtime)?;
705 Ok(crate::stdlib::json_to_vm_value(&result))
706}
707
708fn workflow_pause_builtin(args: &[VmValue], _out: &mut String) -> Result<VmValue, VmError> {
709 let target = parse_target_vm(args.first(), None, "workflow.pause")?;
710 let result =
711 workflow_pause_for_base(&target.base_dir, &target.workflow_id).map_err(VmError::Runtime)?;
712 Ok(crate::stdlib::json_to_vm_value(&result))
713}
714
715fn workflow_resume_builtin(args: &[VmValue], _out: &mut String) -> Result<VmValue, VmError> {
716 let target = parse_target_vm(args.first(), None, "workflow.resume")?;
717 let result = workflow_resume_for_base(&target.base_dir, &target.workflow_id)
718 .map_err(VmError::Runtime)?;
719 Ok(crate::stdlib::json_to_vm_value(&result))
720}
721
722fn workflow_status_builtin(args: &[VmValue], _out: &mut String) -> Result<VmValue, VmError> {
723 let target = parse_target_vm(args.first(), None, "workflow.status")?;
724 let state = load_state(&target).map_err(VmError::Runtime)?;
725 Ok(crate::stdlib::json_to_vm_value(&workflow_status_json(
726 &target, &state,
727 )))
728}
729
730fn workflow_continue_as_new_builtin(
731 args: &[VmValue],
732 _out: &mut String,
733) -> Result<VmValue, VmError> {
734 continue_as_new_for_label(args, "workflow.continue_as_new")
735}
736
737fn continue_as_new_builtin(args: &[VmValue], _out: &mut String) -> Result<VmValue, VmError> {
738 continue_as_new_for_label(args, "continue_as_new")
739}
740
741fn continue_as_new_for_label(args: &[VmValue], label: &str) -> Result<VmValue, VmError> {
742 let target = parse_target_vm(args.first(), None, label)?;
743 let mut state = load_state(&target).map_err(VmError::Runtime)?;
744 state.generation += 1;
745 state.continue_as_new_count += 1;
746 state.last_continue_as_new_at = Some(now_rfc3339());
747 state.responses.clear();
748 save_state(&target, &state).map_err(VmError::Runtime)?;
749 Ok(crate::stdlib::json_to_vm_value(&workflow_status_json(
750 &target, &state,
751 )))
752}
753
/// Unit tests for the update round trip, timeout handling, id sanitization, and target
/// base-directory resolution.
#[cfg(test)]
mod tests {
    use super::*;
    use std::task::Poll;

    /// An update waiter stays pending until a response is recorded, then resolves with the
    /// recorded value on the next poll interval. Runs under paused tokio time.
    #[tokio::test(start_paused = true)]
    async fn update_round_trip_waits_for_response() {
        let dir = tempfile::tempdir().expect("tempdir");
        let workflow_id = "wf-update";
        let base_dir = dir.path().to_path_buf();
        let target = target_for_base(&base_dir, workflow_id);
        let request_id =
            enqueue_update_request(&target, "adjust_budget", serde_json::json!({"max_usd": 10}))
                .expect("enqueue update");

        // The queued message carries the update kind, name, and generated request id.
        let message = receive_message(&target)
            .expect("receive queued update")
            .expect("queued update");
        assert_eq!(message.kind, "update");
        assert_eq!(message.name, "adjust_budget");
        assert_eq!(message.request_id.as_deref(), Some(request_id.as_str()));
        assert_eq!(
            update_response_value(&target, &request_id).expect("read response"),
            None
        );

        // With no response stored yet, the first poll must not complete.
        let waiter = wait_for_update_response(
            &target,
            "adjust_budget",
            &request_id,
            StdDuration::from_millis(500),
        );
        tokio::pin!(waiter);
        assert!(matches!(futures::poll!(&mut waiter), Poll::Pending));

        workflow_respond_update_for_base(
            &base_dir,
            workflow_id,
            &request_id,
            Some("adjust_budget"),
            serde_json::json!({"ok": true}),
        )
        .expect("save response");
        assert_eq!(
            update_response_value(&target, &request_id).expect("read response"),
            Some(serde_json::json!({"ok": true}))
        );
        // Advance paused time by one poll interval so the waiter re-reads the state.
        tokio::time::advance(StdDuration::from_millis(UPDATE_POLL_INTERVAL_MS)).await;

        let result = waiter.await.expect("update result");
        assert_eq!(result, serde_json::json!({"ok": true}));
    }

    /// A timeout shorter than the poll interval is honoured: the waiter is still pending
    /// at 9 ms and fails with a timeout error at 10 ms.
    #[tokio::test(start_paused = true)]
    async fn update_wait_respects_short_timeout() {
        let dir = tempfile::tempdir().expect("tempdir");
        let target = target_for_base(dir.path(), "wf-timeout");
        let request_id =
            enqueue_update_request(&target, "adjust_budget", serde_json::json!({"max_usd": 10}))
                .expect("enqueue update");

        let waiter = wait_for_update_response(
            &target,
            "adjust_budget",
            &request_id,
            StdDuration::from_millis(10),
        );
        tokio::pin!(waiter);
        assert!(matches!(futures::poll!(&mut waiter), Poll::Pending));

        tokio::time::advance(StdDuration::from_millis(9)).await;
        assert!(matches!(futures::poll!(&mut waiter), Poll::Pending));

        tokio::time::advance(StdDuration::from_millis(1)).await;
        let err = waiter.await.expect_err("update should time out");
        assert!(err.contains("timed out"));
    }

    /// A corrupt state file makes the waiter fail with the parse error rather than a
    /// timeout.
    #[tokio::test]
    async fn update_wait_propagates_state_errors() {
        let dir = tempfile::tempdir().expect("tempdir");
        let target = target_for_base(dir.path(), "wf-corrupt");
        std::fs::create_dir_all(workflow_target_root(&target)).expect("state dir");
        std::fs::write(workflow_state_path(&target), "{not json").expect("state write");

        let err = wait_for_update_response(
            &target,
            "adjust_budget",
            "request-1",
            StdDuration::from_millis(10),
        )
        .await
        .expect_err("corrupt state should fail immediately");
        assert!(err.contains("workflow state parse error"));
    }

    /// Sanitized ids keep their readable parts but never contain path separators, and a
    /// bare `..` is replaced entirely.
    #[test]
    fn workflow_ids_preserve_namespace_without_path_segments() {
        assert_eq!(
            sanitize_workflow_id("workflow://local/start-my-day"),
            "workflow___local_start-my-day"
        );
        assert_eq!(sanitize_workflow_id("../start-my-day"), ".._start-my-day");
        assert_eq!(sanitize_workflow_id(".."), "workflow");
    }

    /// A `persisted_path` directly inside `.harn-runs` resolves the base dir to the parent
    /// of `.harn-runs`.
    #[test]
    fn persisted_path_drives_target_base_dir() {
        let base = parse_target_json(
            &serde_json::json!({
                "workflow_id": "wf",
                "persisted_path": "/tmp/demo/.harn-runs/run.json"
            }),
            None,
        )
        .expect("target");
        assert_eq!(base.workflow_id, "wf");
        assert_eq!(base.base_dir, PathBuf::from("/tmp/demo"));
    }

    /// A `persisted_path` nested deeper under `.harn-runs` still resolves to the parent of
    /// `.harn-runs`, and a relative path rooted at `.harn-runs` resolves to `.`.
    #[test]
    fn nested_persisted_path_drives_target_base_dir() {
        let base = parse_target_json(
            &serde_json::json!({
                "workflow_id": "wf",
                "persisted_path": "/tmp/demo/.harn-runs/session/run.json"
            }),
            None,
        )
        .expect("target");
        assert_eq!(base.base_dir, PathBuf::from("/tmp/demo"));

        let relative = parse_target_json(
            &serde_json::json!({
                "workflow_id": "wf",
                "persisted_path": ".harn-runs/session/run.json"
            }),
            None,
        )
        .expect("target");
        assert_eq!(relative.base_dir, PathBuf::from("."));
    }
}