1use std::collections::{BTreeMap, VecDeque};
2use std::path::{Path, PathBuf};
3use std::rc::Rc;
4use std::time::Duration as StdDuration;
5
6use serde::{Deserialize, Serialize};
7
8use crate::stdlib::process::runtime_root_base;
9use crate::stdlib::registration::{
10 async_builtin, register_builtin_group, AsyncBuiltin, BuiltinGroup, SyncBuiltin,
11};
12use crate::value::{VmError, VmValue};
13use crate::vm::{Vm, VmBuiltinArity};
14
/// Timeout, in milliseconds, used by `workflow.update` when the caller's
/// options do not provide an integer `timeout_ms`.
const DEFAULT_UPDATE_TIMEOUT_MS: u64 = 30_000;
/// Delay, in milliseconds, between successive checks for an update response.
const UPDATE_POLL_INTERVAL_MS: u64 = 25;
17
/// Synchronous workflow mailbox builtins: name, handler, signature, arity and
/// one-line documentation for each. Registered through
/// `WORKFLOW_MESSAGE_PRIMITIVES`.
const WORKFLOW_MESSAGE_SYNC_PRIMITIVES: &[SyncBuiltin] = &[
    SyncBuiltin::new("workflow.signal", workflow_signal_builtin)
        .signature("workflow.signal(target, name, payload?)")
        .arity(VmBuiltinArity::Range { min: 2, max: 3 })
        .doc("Enqueue a workflow signal message."),
    SyncBuiltin::new("workflow.query", workflow_query_builtin)
        .signature("workflow.query(target, name)")
        .arity(VmBuiltinArity::Exact(2))
        .doc("Read the latest published workflow query value."),
    SyncBuiltin::new("workflow.publish_query", workflow_publish_query_builtin)
        .signature("workflow.publish_query(target, name, value?)")
        .arity(VmBuiltinArity::Range { min: 2, max: 3 })
        .doc("Publish a workflow query value."),
    SyncBuiltin::new("workflow.receive", workflow_receive_builtin)
        .signature("workflow.receive(target)")
        .arity(VmBuiltinArity::Exact(1))
        .doc("Receive the next workflow mailbox message."),
    SyncBuiltin::new("workflow.respond_update", workflow_respond_update_builtin)
        .signature("workflow.respond_update(target, request_id, value?, name?)")
        .arity(VmBuiltinArity::Range { min: 2, max: 4 })
        .doc("Respond to a pending workflow update request."),
    SyncBuiltin::new("workflow.pause", workflow_pause_builtin)
        .signature("workflow.pause(target)")
        .arity(VmBuiltinArity::Exact(1))
        .doc("Pause a workflow mailbox."),
    SyncBuiltin::new("workflow.resume", workflow_resume_builtin)
        .signature("workflow.resume(target)")
        .arity(VmBuiltinArity::Exact(1))
        .doc("Resume a workflow mailbox."),
    SyncBuiltin::new("workflow.status", workflow_status_builtin)
        .signature("workflow.status(target)")
        .arity(VmBuiltinArity::Exact(1))
        .doc("Return workflow mailbox status."),
    SyncBuiltin::new("workflow.continue_as_new", workflow_continue_as_new_builtin)
        .signature("workflow.continue_as_new(target)")
        .arity(VmBuiltinArity::Exact(1))
        .doc("Advance a workflow mailbox generation."),
    // Un-namespaced alias: its handler runs the same logic as
    // `workflow.continue_as_new`, differing only in the label used in errors.
    SyncBuiltin::new("continue_as_new", continue_as_new_builtin)
        .signature("continue_as_new(target)")
        .arity(VmBuiltinArity::Exact(1))
        .doc("Advance a workflow mailbox generation."),
];
60
/// Asynchronous workflow mailbox builtins. `workflow.update` is async because
/// its handler awaits a response to the update it enqueues.
const WORKFLOW_MESSAGE_ASYNC_PRIMITIVES: &[AsyncBuiltin] =
    &[async_builtin!("workflow.update", workflow_update_builtin)
        .signature("workflow.update(target, name, payload?, options?)")
        .arity(VmBuiltinArity::Range { min: 2, max: 4 })
        .doc("Enqueue a workflow update and wait for a response.")];
66
/// The complete `workflow.messages` builtin group, combining the sync and
/// async tables; passed to `register_builtin_group` during registration.
const WORKFLOW_MESSAGE_PRIMITIVES: BuiltinGroup<'static> = BuiltinGroup::new()
    .category("workflow.messages")
    .sync(WORKFLOW_MESSAGE_SYNC_PRIMITIVES)
    .async_(WORKFLOW_MESSAGE_ASYNC_PRIMITIVES);
71
/// A single message queued in a workflow mailbox.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct WorkflowMessageRecord {
    /// Sequence number taken from the mailbox's `next_seq` counter at enqueue time.
    pub seq: u64,
    /// Message category; this module enqueues `"signal"`, `"update"` and `"control"`.
    pub kind: String,
    /// Message name (for control messages this module uses `"pause"` / `"resume"`).
    pub name: String,
    /// Correlation id, set for update requests; omitted from JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub request_id: Option<String>,
    /// JSON payload supplied by the sender.
    pub payload: serde_json::Value,
    /// Enqueue timestamp as produced by `now_rfc3339`.
    pub enqueued_at: String,
}
82
/// The most recently published value for one named workflow query.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct WorkflowQueryRecord {
    /// Published JSON value returned to query readers.
    pub value: serde_json::Value,
    /// Publication timestamp as produced by `now_rfc3339`.
    pub published_at: String,
}
88
/// A recorded response to a workflow update request.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct WorkflowUpdateResponseRecord {
    /// Id of the update request being answered; also the key in `responses`.
    pub request_id: String,
    /// Optional update name supplied by the responder; omitted from JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub name: Option<String>,
    /// JSON value handed back to the caller waiting on the update.
    pub value: serde_json::Value,
    /// Response timestamp as produced by `now_rfc3339`.
    pub responded_at: String,
}
97
/// Persisted state of one workflow mailbox, stored as JSON in the workflow's
/// `state.json` file (see `workflow_state_path`).
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct WorkflowMailboxState {
    /// Record type tag, serialized as `_type`; `"workflow_mailbox"` by default.
    #[serde(rename = "_type")]
    pub type_name: String,
    /// Id of the workflow that owns this mailbox.
    pub workflow_id: String,
    /// Mailbox generation; starts at 1 and is incremented by continue-as-new.
    #[serde(default = "default_generation")]
    pub generation: u64,
    /// Number of continue-as-new operations applied to this mailbox.
    #[serde(default)]
    pub continue_as_new_count: u64,
    /// Timestamp of the latest continue-as-new; omitted from JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub last_continue_as_new_at: Option<String>,
    /// Pause flag toggled by the pause / resume operations.
    #[serde(default)]
    pub paused: bool,
    /// Sequence number of the most recently enqueued message (0 when none yet).
    #[serde(default)]
    pub next_seq: u64,
    /// Pending messages in FIFO order (pushed at the back, received from the front).
    #[serde(default)]
    pub mailbox: VecDeque<WorkflowMessageRecord>,
    /// Latest published query values, keyed by query name.
    #[serde(default)]
    pub queries: BTreeMap<String, WorkflowQueryRecord>,
    /// Update responses keyed by request id; cleared by continue-as-new.
    #[serde(default)]
    pub responses: BTreeMap<String, WorkflowUpdateResponseRecord>,
}
120
121impl Default for WorkflowMailboxState {
122 fn default() -> Self {
123 Self {
124 type_name: "workflow_mailbox".to_string(),
125 workflow_id: String::new(),
126 generation: default_generation(),
127 continue_as_new_count: 0,
128 last_continue_as_new_at: None,
129 paused: false,
130 next_seq: 0,
131 mailbox: VecDeque::new(),
132 queries: BTreeMap::new(),
133 responses: BTreeMap::new(),
134 }
135 }
136}
137
/// Serde default for `WorkflowMailboxState::generation`: mailboxes start at
/// generation 1.
fn default_generation() -> u64 {
    const FIRST_GENERATION: u64 = 1;
    FIRST_GENERATION
}
141
/// Resolved address of a workflow mailbox: which workflow, under which base
/// directory.
#[derive(Clone, Debug, PartialEq, Eq)]
struct WorkflowTarget {
    /// Sanitized workflow id; used as a directory name by `workflow_target_root`.
    workflow_id: String,
    /// Base directory handed to `crate::runtime_paths::workflow_dir`.
    base_dir: PathBuf,
}
147
/// Reduces a caller-supplied identifier to a single safe path component that
/// names the workflow's state directory.
///
/// Surrounding whitespace is trimmed, only the final path component is kept,
/// and that component is trimmed again so the result is idempotent
/// (`sanitize(sanitize(x)) == sanitize(x)`); several callers sanitize an
/// already-sanitized id a second time and must land on the same directory.
///
/// Returns `"workflow"` whenever no usable component remains: empty input,
/// `"."`, `".."`, a bare root such as `"/"`, or a path ending in `".."`.
fn sanitize_workflow_id(raw: &str) -> String {
    const FALLBACK: &str = "workflow";
    // `file_name` yields `None` for "", ".", "..", "/" and anything ending in
    // "..". Falling back to the raw text in those cases would let inputs such
    // as "a/.." or "/" escape (or replace) the workflow directory once joined
    // onto it, so they map to the fallback name instead.
    let component = Path::new(raw.trim())
        .file_name()
        .and_then(|value| value.to_str())
        .map(str::trim)
        .unwrap_or("");
    match component {
        "" | "." | ".." => FALLBACK.to_string(),
        name => name.to_string(),
    }
}
160
/// Derives the workflow base directory from the path of a persisted run file.
///
/// The base is the file's parent directory, unless that parent is named
/// `.harn-runs`, in which case the base is the directory above it (or the
/// `.harn-runs` directory itself when it has no parent). A path with no parent
/// resolves to `"."`.
fn workflow_base_dir_from_persisted_path(path: &Path) -> PathBuf {
    let run_dir = match path.parent() {
        Some(dir) => dir,
        None => Path::new("."),
    };
    let inside_runs_dir =
        run_dir.file_name().and_then(|name| name.to_str()) == Some(".harn-runs");
    if !inside_runs_dir {
        return run_dir.to_path_buf();
    }
    run_dir.parent().unwrap_or(run_dir).to_path_buf()
}
169
170fn workflow_target_root(target: &WorkflowTarget) -> PathBuf {
171 crate::runtime_paths::workflow_dir(&target.base_dir).join(&target.workflow_id)
172}
173
174fn workflow_state_path(target: &WorkflowTarget) -> PathBuf {
175 workflow_target_root(target).join("state.json")
176}
177
178fn now_rfc3339() -> String {
179 time::OffsetDateTime::now_utc()
180 .format(&time::format_description::well_known::Rfc3339)
181 .unwrap_or_else(|_| uuid::Uuid::now_v7().to_string())
182}
183
184fn load_state(target: &WorkflowTarget) -> Result<WorkflowMailboxState, String> {
185 let path = workflow_state_path(target);
186 if !path.exists() {
187 return Ok(WorkflowMailboxState {
188 workflow_id: target.workflow_id.clone(),
189 ..WorkflowMailboxState::default()
190 });
191 }
192 let text = std::fs::read_to_string(&path)
193 .map_err(|error| format!("workflow state read error: {error}"))?;
194 let mut state: WorkflowMailboxState = serde_json::from_str(&text)
195 .map_err(|error| format!("workflow state parse error: {error}"))?;
196 if state.type_name.is_empty() {
197 state.type_name = "workflow_mailbox".to_string();
198 }
199 if state.workflow_id.is_empty() {
200 state.workflow_id = target.workflow_id.clone();
201 }
202 if state.generation == 0 {
203 state.generation = 1;
204 }
205 Ok(state)
206}
207
208fn save_state(target: &WorkflowTarget, state: &WorkflowMailboxState) -> Result<(), String> {
209 let path = workflow_state_path(target);
210 let json = serde_json::to_string_pretty(state)
211 .map_err(|error| format!("workflow state encode error: {error}"))?;
212 crate::atomic_io::atomic_write(&path, json.as_bytes())
213 .map_err(|error| format!("workflow state write error: {error}"))
214}
215
216fn parse_target_json(
217 value: &serde_json::Value,
218 fallback_base_dir: Option<&Path>,
219) -> Option<WorkflowTarget> {
220 match value {
221 serde_json::Value::String(text) => Some(WorkflowTarget {
222 workflow_id: sanitize_workflow_id(text),
223 base_dir: fallback_base_dir
224 .map(Path::to_path_buf)
225 .unwrap_or_else(runtime_root_base),
226 }),
227 serde_json::Value::Object(map) => {
228 let workflow_id = map
229 .get("workflow_id")
230 .and_then(|value| value.as_str())
231 .or_else(|| map.get("workflow").and_then(|value| value.as_str()))
232 .or_else(|| {
233 map.get("run")
234 .and_then(|value| value.get("workflow_id"))
235 .and_then(|value| value.as_str())
236 })
237 .or_else(|| {
238 map.get("result")
239 .and_then(|value| value.get("run"))
240 .and_then(|value| value.get("workflow_id"))
241 .and_then(|value| value.as_str())
242 })?;
243 let explicit_base = map
244 .get("base_dir")
245 .and_then(|value| value.as_str())
246 .filter(|value| !value.trim().is_empty())
247 .map(PathBuf::from);
248 let persisted_path = map
249 .get("persisted_path")
250 .and_then(|value| value.as_str())
251 .or_else(|| map.get("path").and_then(|value| value.as_str()))
252 .or_else(|| {
253 map.get("run")
254 .and_then(|value| value.get("persisted_path"))
255 .and_then(|value| value.as_str())
256 })
257 .or_else(|| {
258 map.get("result")
259 .and_then(|value| value.get("run"))
260 .and_then(|value| value.get("persisted_path"))
261 .and_then(|value| value.as_str())
262 });
263 let base_dir = explicit_base
264 .or_else(|| {
265 persisted_path
266 .map(|path| workflow_base_dir_from_persisted_path(Path::new(path)))
267 })
268 .or_else(|| fallback_base_dir.map(Path::to_path_buf))
269 .unwrap_or_else(runtime_root_base);
270 Some(WorkflowTarget {
271 workflow_id: sanitize_workflow_id(workflow_id),
272 base_dir,
273 })
274 }
275 _ => None,
276 }
277}
278
279fn parse_target_vm(
280 value: Option<&VmValue>,
281 fallback_base_dir: Option<&Path>,
282 builtin: &str,
283) -> Result<WorkflowTarget, VmError> {
284 let value = value.ok_or_else(|| VmError::Runtime(format!("{builtin}: missing target")))?;
285 parse_target_json(&crate::llm::vm_value_to_json(value), fallback_base_dir).ok_or_else(|| {
286 VmError::Runtime(format!(
287 "{builtin}: target must be a workflow id string or dict with workflow_id/workflow"
288 ))
289 })
290}
291
292fn workflow_status_json(
293 target: &WorkflowTarget,
294 state: &WorkflowMailboxState,
295) -> serde_json::Value {
296 serde_json::json!({
297 "workflow_id": target.workflow_id,
298 "base_dir": target.base_dir.to_string_lossy(),
299 "generation": state.generation,
300 "paused": state.paused,
301 "pending_count": state.mailbox.len(),
302 "query_count": state.queries.len(),
303 "response_count": state.responses.len(),
304 "continue_as_new_count": state.continue_as_new_count,
305 "last_continue_as_new_at": state.last_continue_as_new_at,
306 })
307}
308
309fn target_for_base(base_dir: &Path, workflow_id: &str) -> WorkflowTarget {
310 WorkflowTarget {
311 workflow_id: sanitize_workflow_id(workflow_id),
312 base_dir: base_dir.to_path_buf(),
313 }
314}
315
316fn enqueue_message(
317 target: &WorkflowTarget,
318 kind: &str,
319 name: &str,
320 payload: serde_json::Value,
321 request_id: Option<String>,
322) -> Result<serde_json::Value, String> {
323 let mut state = load_state(target)?;
324 let message = push_message(&mut state, kind, name, payload, request_id);
325 save_state(target, &state)?;
326 Ok(serde_json::json!({
327 "workflow_id": target.workflow_id,
328 "message": message,
329 "status": workflow_status_json(target, &state),
330 }))
331}
332
333fn push_message(
334 state: &mut WorkflowMailboxState,
335 kind: &str,
336 name: &str,
337 payload: serde_json::Value,
338 request_id: Option<String>,
339) -> WorkflowMessageRecord {
340 state.next_seq += 1;
341 let message = WorkflowMessageRecord {
342 seq: state.next_seq,
343 kind: kind.to_string(),
344 name: name.to_string(),
345 request_id,
346 payload,
347 enqueued_at: now_rfc3339(),
348 };
349 state.mailbox.push_back(message.clone());
350 message
351}
352
353fn receive_message(target: &WorkflowTarget) -> Result<Option<WorkflowMessageRecord>, String> {
354 let mut state = load_state(target)?;
355 let message = state.mailbox.pop_front();
356 if message.is_some() {
357 save_state(target, &state)?;
358 }
359 Ok(message)
360}
361
362pub fn workflow_signal_for_base(
363 base_dir: &Path,
364 workflow_id: &str,
365 name: &str,
366 payload: serde_json::Value,
367) -> Result<serde_json::Value, String> {
368 let target = target_for_base(base_dir, workflow_id);
369 enqueue_message(&target, "signal", name, payload, None)
370}
371
372pub fn workflow_query_for_base(
373 base_dir: &Path,
374 workflow_id: &str,
375 name: &str,
376) -> Result<serde_json::Value, String> {
377 let target = target_for_base(base_dir, workflow_id);
378 let state = load_state(&target)?;
379 Ok(state
380 .queries
381 .get(name)
382 .map(|record| record.value.clone())
383 .unwrap_or(serde_json::Value::Null))
384}
385
386pub fn workflow_publish_query_for_base(
387 base_dir: &Path,
388 workflow_id: &str,
389 name: &str,
390 value: serde_json::Value,
391) -> Result<serde_json::Value, String> {
392 let target = target_for_base(base_dir, workflow_id);
393 let mut state = load_state(&target)?;
394 state.queries.insert(
395 name.to_string(),
396 WorkflowQueryRecord {
397 value,
398 published_at: now_rfc3339(),
399 },
400 );
401 save_state(&target, &state)?;
402 Ok(workflow_status_json(&target, &state))
403}
404
405pub fn workflow_pause_for_base(
406 base_dir: &Path,
407 workflow_id: &str,
408) -> Result<serde_json::Value, String> {
409 let target = target_for_base(base_dir, workflow_id);
410 let mut state = load_state(&target)?;
411 state.paused = true;
412 push_message(&mut state, "control", "pause", serde_json::json!({}), None);
413 save_state(&target, &state)?;
414 Ok(workflow_status_json(&target, &state))
415}
416
417pub fn workflow_resume_for_base(
418 base_dir: &Path,
419 workflow_id: &str,
420) -> Result<serde_json::Value, String> {
421 let target = target_for_base(base_dir, workflow_id);
422 let mut state = load_state(&target)?;
423 state.paused = false;
424 push_message(&mut state, "control", "resume", serde_json::json!({}), None);
425 save_state(&target, &state)?;
426 Ok(workflow_status_json(&target, &state))
427}
428
429pub async fn workflow_update_for_base(
430 base_dir: &Path,
431 workflow_id: &str,
432 name: &str,
433 payload: serde_json::Value,
434 timeout: StdDuration,
435) -> Result<serde_json::Value, String> {
436 let target = target_for_base(base_dir, workflow_id);
437 let request_id = enqueue_update_request(&target, name, payload)?;
438 wait_for_update_response(&target, name, &request_id, timeout).await
439}
440
441fn enqueue_update_request(
442 target: &WorkflowTarget,
443 name: &str,
444 payload: serde_json::Value,
445) -> Result<String, String> {
446 let request_id = uuid::Uuid::now_v7().to_string();
447 enqueue_message(target, "update", name, payload, Some(request_id.clone()))?;
448 Ok(request_id)
449}
450
451async fn wait_for_update_response(
452 target: &WorkflowTarget,
453 name: &str,
454 request_id: &str,
455 timeout: StdDuration,
456) -> Result<serde_json::Value, String> {
457 let deadline = tokio::time::Instant::now() + timeout;
458 while tokio::time::Instant::now() <= deadline {
459 if let Ok(Some(value)) = update_response_value(target, request_id) {
460 return Ok(value);
461 }
462 tokio::time::sleep(StdDuration::from_millis(UPDATE_POLL_INTERVAL_MS)).await;
463 }
464 Err(format!(
465 "workflow update '{name}' timed out for '{}'",
466 target.workflow_id
467 ))
468}
469
470fn update_response_value(
471 target: &WorkflowTarget,
472 request_id: &str,
473) -> Result<Option<serde_json::Value>, String> {
474 Ok(load_state(target)?
475 .responses
476 .get(request_id)
477 .map(|response| response.value.clone()))
478}
479
480pub fn workflow_respond_update_for_base(
481 base_dir: &Path,
482 workflow_id: &str,
483 request_id: &str,
484 name: Option<&str>,
485 value: serde_json::Value,
486) -> Result<serde_json::Value, String> {
487 let target = target_for_base(base_dir, workflow_id);
488 record_update_response(&target, request_id, name, value)
489}
490
491fn record_update_response(
492 target: &WorkflowTarget,
493 request_id: &str,
494 name: Option<&str>,
495 value: serde_json::Value,
496) -> Result<serde_json::Value, String> {
497 let mut state = load_state(target)?;
498 state.responses.insert(
499 request_id.to_string(),
500 WorkflowUpdateResponseRecord {
501 request_id: request_id.to_string(),
502 name: name.map(ToString::to_string),
503 value,
504 responded_at: now_rfc3339(),
505 },
506 );
507 save_state(target, &state)?;
508 Ok(workflow_status_json(target, &state))
509}
510
511pub(crate) fn register_workflow_message_builtins(vm: &mut Vm) {
512 vm.set_global(
513 "workflow",
514 VmValue::Dict(Rc::new(BTreeMap::from([
515 (
516 "signal".to_string(),
517 VmValue::BuiltinRef(Rc::from("workflow.signal")),
518 ),
519 (
520 "query".to_string(),
521 VmValue::BuiltinRef(Rc::from("workflow.query")),
522 ),
523 (
524 "update".to_string(),
525 VmValue::BuiltinRef(Rc::from("workflow.update")),
526 ),
527 (
528 "publish_query".to_string(),
529 VmValue::BuiltinRef(Rc::from("workflow.publish_query")),
530 ),
531 (
532 "receive".to_string(),
533 VmValue::BuiltinRef(Rc::from("workflow.receive")),
534 ),
535 (
536 "respond_update".to_string(),
537 VmValue::BuiltinRef(Rc::from("workflow.respond_update")),
538 ),
539 (
540 "pause".to_string(),
541 VmValue::BuiltinRef(Rc::from("workflow.pause")),
542 ),
543 (
544 "resume".to_string(),
545 VmValue::BuiltinRef(Rc::from("workflow.resume")),
546 ),
547 (
548 "status".to_string(),
549 VmValue::BuiltinRef(Rc::from("workflow.status")),
550 ),
551 (
552 "continue_as_new".to_string(),
553 VmValue::BuiltinRef(Rc::from("workflow.continue_as_new")),
554 ),
555 ]))),
556 );
557
558 register_builtin_group(vm, WORKFLOW_MESSAGE_PRIMITIVES);
559}
560
561fn workflow_signal_builtin(args: &[VmValue], _out: &mut String) -> Result<VmValue, VmError> {
562 let target = parse_target_vm(args.first(), None, "workflow.signal")?;
563 let name = args
564 .get(1)
565 .map(|value| value.display())
566 .filter(|value| !value.is_empty())
567 .ok_or_else(|| VmError::Runtime("workflow.signal: missing name".to_string()))?;
568 let payload = args
569 .get(2)
570 .map(crate::llm::vm_value_to_json)
571 .unwrap_or(serde_json::Value::Null);
572 let result =
573 enqueue_message(&target, "signal", &name, payload, None).map_err(VmError::Runtime)?;
574 Ok(crate::stdlib::json_to_vm_value(&result))
575}
576
577fn workflow_query_builtin(args: &[VmValue], _out: &mut String) -> Result<VmValue, VmError> {
578 let target = parse_target_vm(args.first(), None, "workflow.query")?;
579 let name = args
580 .get(1)
581 .map(|value| value.display())
582 .filter(|value| !value.is_empty())
583 .ok_or_else(|| VmError::Runtime("workflow.query: missing name".to_string()))?;
584 let state = load_state(&target).map_err(VmError::Runtime)?;
585 Ok(crate::stdlib::json_to_vm_value(
586 &state
587 .queries
588 .get(&name)
589 .map(|record| record.value.clone())
590 .unwrap_or(serde_json::Value::Null),
591 ))
592}
593
594async fn workflow_update_builtin(args: Vec<VmValue>) -> Result<VmValue, VmError> {
595 let target = parse_target_vm(args.first(), None, "workflow.update")?;
596 let name = args
597 .get(1)
598 .map(|value| value.display())
599 .filter(|value| !value.is_empty())
600 .ok_or_else(|| VmError::Runtime("workflow.update: missing name".to_string()))?;
601 let payload = args
602 .get(2)
603 .map(crate::llm::vm_value_to_json)
604 .unwrap_or(serde_json::Value::Null);
605 let timeout_ms = args
606 .get(3)
607 .and_then(|value| value.as_dict())
608 .and_then(|dict| dict.get("timeout_ms"))
609 .and_then(VmValue::as_int)
610 .unwrap_or(DEFAULT_UPDATE_TIMEOUT_MS as i64)
611 .max(1) as u64;
612 let result = workflow_update_for_base(
613 &target.base_dir,
614 &target.workflow_id,
615 &name,
616 payload,
617 StdDuration::from_millis(timeout_ms),
618 )
619 .await
620 .map_err(VmError::Runtime)?;
621 Ok(crate::stdlib::json_to_vm_value(&result))
622}
623
624fn workflow_publish_query_builtin(args: &[VmValue], _out: &mut String) -> Result<VmValue, VmError> {
625 let target = parse_target_vm(args.first(), None, "workflow.publish_query")?;
626 let name = args
627 .get(1)
628 .map(|value| value.display())
629 .filter(|value| !value.is_empty())
630 .ok_or_else(|| VmError::Runtime("workflow.publish_query: missing name".to_string()))?;
631 let value = args
632 .get(2)
633 .map(crate::llm::vm_value_to_json)
634 .unwrap_or(serde_json::Value::Null);
635 let result =
636 workflow_publish_query_for_base(&target.base_dir, &target.workflow_id, &name, value)
637 .map_err(VmError::Runtime)?;
638 Ok(crate::stdlib::json_to_vm_value(&result))
639}
640
641fn workflow_receive_builtin(args: &[VmValue], _out: &mut String) -> Result<VmValue, VmError> {
642 let target = parse_target_vm(args.first(), None, "workflow.receive")?;
643 let Some(message) = receive_message(&target).map_err(VmError::Runtime)? else {
644 return Ok(VmValue::Nil);
645 };
646 Ok(crate::stdlib::json_to_vm_value(&serde_json::json!({
647 "workflow_id": target.workflow_id,
648 "seq": message.seq,
649 "kind": message.kind,
650 "name": message.name,
651 "request_id": message.request_id,
652 "payload": message.payload,
653 "enqueued_at": message.enqueued_at,
654 })))
655}
656
657fn workflow_respond_update_builtin(
658 args: &[VmValue],
659 _out: &mut String,
660) -> Result<VmValue, VmError> {
661 let target = parse_target_vm(args.first(), None, "workflow.respond_update")?;
662 let request_id = args
663 .get(1)
664 .map(|value| value.display())
665 .filter(|value| !value.is_empty())
666 .ok_or_else(|| {
667 VmError::Runtime("workflow.respond_update: missing request id".to_string())
668 })?;
669 let value = args
670 .get(2)
671 .map(crate::llm::vm_value_to_json)
672 .unwrap_or(serde_json::Value::Null);
673 let name = args
674 .get(3)
675 .map(|value| value.display())
676 .filter(|value| !value.is_empty());
677 let result = workflow_respond_update_for_base(
678 &target.base_dir,
679 &target.workflow_id,
680 &request_id,
681 name.as_deref(),
682 value,
683 )
684 .map_err(VmError::Runtime)?;
685 Ok(crate::stdlib::json_to_vm_value(&result))
686}
687
688fn workflow_pause_builtin(args: &[VmValue], _out: &mut String) -> Result<VmValue, VmError> {
689 let target = parse_target_vm(args.first(), None, "workflow.pause")?;
690 let result =
691 workflow_pause_for_base(&target.base_dir, &target.workflow_id).map_err(VmError::Runtime)?;
692 Ok(crate::stdlib::json_to_vm_value(&result))
693}
694
695fn workflow_resume_builtin(args: &[VmValue], _out: &mut String) -> Result<VmValue, VmError> {
696 let target = parse_target_vm(args.first(), None, "workflow.resume")?;
697 let result = workflow_resume_for_base(&target.base_dir, &target.workflow_id)
698 .map_err(VmError::Runtime)?;
699 Ok(crate::stdlib::json_to_vm_value(&result))
700}
701
702fn workflow_status_builtin(args: &[VmValue], _out: &mut String) -> Result<VmValue, VmError> {
703 let target = parse_target_vm(args.first(), None, "workflow.status")?;
704 let state = load_state(&target).map_err(VmError::Runtime)?;
705 Ok(crate::stdlib::json_to_vm_value(&workflow_status_json(
706 &target, &state,
707 )))
708}
709
710fn workflow_continue_as_new_builtin(
711 args: &[VmValue],
712 _out: &mut String,
713) -> Result<VmValue, VmError> {
714 continue_as_new_for_label(args, "workflow.continue_as_new")
715}
716
717fn continue_as_new_builtin(args: &[VmValue], _out: &mut String) -> Result<VmValue, VmError> {
718 continue_as_new_for_label(args, "continue_as_new")
719}
720
721fn continue_as_new_for_label(args: &[VmValue], label: &str) -> Result<VmValue, VmError> {
722 let target = parse_target_vm(args.first(), None, label)?;
723 let mut state = load_state(&target).map_err(VmError::Runtime)?;
724 state.generation += 1;
725 state.continue_as_new_count += 1;
726 state.last_continue_as_new_at = Some(now_rfc3339());
727 state.responses.clear();
728 save_state(&target, &state).map_err(VmError::Runtime)?;
729 Ok(crate::stdlib::json_to_vm_value(&workflow_status_json(
730 &target, &state,
731 )))
732}
733
#[cfg(test)]
mod tests {
    use super::*;
    use std::task::Poll;

    /// Full update round trip against a temporary directory: enqueue a
    /// request, receive it, start waiting, record a response, and confirm the
    /// waiter resolves with it. The tokio clock starts paused, so the poll
    /// interval is advanced manually.
    #[tokio::test(start_paused = true)]
    async fn update_round_trip_waits_for_response() {
        let dir = tempfile::tempdir().expect("tempdir");
        let workflow_id = "wf-update";
        let base_dir = dir.path().to_path_buf();
        let target = target_for_base(&base_dir, workflow_id);
        let request_id =
            enqueue_update_request(&target, "adjust_budget", serde_json::json!({"max_usd": 10}))
                .expect("enqueue update");

        // The queued message carries the generated request id, and no
        // response has been recorded yet.
        let message = receive_message(&target)
            .expect("receive queued update")
            .expect("queued update");
        assert_eq!(message.kind, "update");
        assert_eq!(message.name, "adjust_budget");
        assert_eq!(message.request_id.as_deref(), Some(request_id.as_str()));
        assert_eq!(
            update_response_value(&target, &request_id).expect("read response"),
            None
        );

        // With no response stored, the first poll must leave the waiter pending.
        let waiter = wait_for_update_response(
            &target,
            "adjust_budget",
            &request_id,
            StdDuration::from_millis(500),
        );
        tokio::pin!(waiter);
        assert!(matches!(futures::poll!(&mut waiter), Poll::Pending));

        workflow_respond_update_for_base(
            &base_dir,
            workflow_id,
            &request_id,
            Some("adjust_budget"),
            serde_json::json!({"ok": true}),
        )
        .expect("save response");
        assert_eq!(
            update_response_value(&target, &request_id).expect("read response"),
            Some(serde_json::json!({"ok": true}))
        );
        // Step the paused clock past one poll interval so the waiter wakes up.
        tokio::time::advance(StdDuration::from_millis(UPDATE_POLL_INTERVAL_MS)).await;

        let result = waiter.await.expect("update result");
        assert_eq!(result, serde_json::json!({"ok": true}));
    }

    /// A `persisted_path` inside a `.harn-runs` directory resolves the base
    /// directory to that directory's parent.
    #[test]
    fn persisted_path_drives_target_base_dir() {
        let base = parse_target_json(
            &serde_json::json!({
                "workflow_id": "wf",
                "persisted_path": "/tmp/demo/.harn-runs/run.json"
            }),
            None,
        )
        .expect("target");
        assert_eq!(base.workflow_id, "wf");
        assert_eq!(base.base_dir, PathBuf::from("/tmp/demo"));
    }
}