Skip to main content

harn_vm/stdlib/
workflow_messages.rs

1use std::collections::{BTreeMap, VecDeque};
2use std::path::{Path, PathBuf};
3use std::rc::Rc;
4use std::time::Duration as StdDuration;
5
6use serde::{Deserialize, Serialize};
7
8use crate::stdlib::process::runtime_root_base;
9use crate::stdlib::registration::{
10    async_builtin, register_builtin_group, AsyncBuiltin, BuiltinGroup, SyncBuiltin,
11};
12use crate::value::{VmError, VmValue};
13use crate::vm::{Vm, VmBuiltinArity};
14
15const DEFAULT_UPDATE_TIMEOUT_MS: u64 = 30_000;
16const UPDATE_POLL_INTERVAL_MS: u64 = 25;
17
18const WORKFLOW_MESSAGE_SYNC_PRIMITIVES: &[SyncBuiltin] = &[
19    SyncBuiltin::new("workflow.signal", workflow_signal_builtin)
20        .signature("workflow.signal(target, name, payload?)")
21        .arity(VmBuiltinArity::Range { min: 2, max: 3 })
22        .doc("Enqueue a workflow signal message."),
23    SyncBuiltin::new("workflow.query", workflow_query_builtin)
24        .signature("workflow.query(target, name)")
25        .arity(VmBuiltinArity::Exact(2))
26        .doc("Read the latest published workflow query value."),
27    SyncBuiltin::new("workflow.publish_query", workflow_publish_query_builtin)
28        .signature("workflow.publish_query(target, name, value?)")
29        .arity(VmBuiltinArity::Range { min: 2, max: 3 })
30        .doc("Publish a workflow query value."),
31    SyncBuiltin::new("workflow.receive", workflow_receive_builtin)
32        .signature("workflow.receive(target)")
33        .arity(VmBuiltinArity::Exact(1))
34        .doc("Receive the next workflow mailbox message."),
35    SyncBuiltin::new("workflow.respond_update", workflow_respond_update_builtin)
36        .signature("workflow.respond_update(target, request_id, value?, name?)")
37        .arity(VmBuiltinArity::Range { min: 2, max: 4 })
38        .doc("Respond to a pending workflow update request."),
39    SyncBuiltin::new("workflow.pause", workflow_pause_builtin)
40        .signature("workflow.pause(target)")
41        .arity(VmBuiltinArity::Exact(1))
42        .doc("Pause a workflow mailbox."),
43    SyncBuiltin::new("workflow.resume", workflow_resume_builtin)
44        .signature("workflow.resume(target)")
45        .arity(VmBuiltinArity::Exact(1))
46        .doc("Resume a workflow mailbox."),
47    SyncBuiltin::new("workflow.status", workflow_status_builtin)
48        .signature("workflow.status(target)")
49        .arity(VmBuiltinArity::Exact(1))
50        .doc("Return workflow mailbox status."),
51    SyncBuiltin::new("workflow.continue_as_new", workflow_continue_as_new_builtin)
52        .signature("workflow.continue_as_new(target)")
53        .arity(VmBuiltinArity::Exact(1))
54        .doc("Advance a workflow mailbox generation."),
55    SyncBuiltin::new("continue_as_new", continue_as_new_builtin)
56        .signature("continue_as_new(target)")
57        .arity(VmBuiltinArity::Exact(1))
58        .doc("Advance a workflow mailbox generation."),
59];
60
61const WORKFLOW_MESSAGE_ASYNC_PRIMITIVES: &[AsyncBuiltin] =
62    &[async_builtin!("workflow.update", workflow_update_builtin)
63        .signature("workflow.update(target, name, payload?, options?)")
64        .arity(VmBuiltinArity::Range { min: 2, max: 4 })
65        .doc("Enqueue a workflow update and wait for a response.")];
66
67const WORKFLOW_MESSAGE_PRIMITIVES: BuiltinGroup<'static> = BuiltinGroup::new()
68    .category("workflow.messages")
69    .sync(WORKFLOW_MESSAGE_SYNC_PRIMITIVES)
70    .async_(WORKFLOW_MESSAGE_ASYNC_PRIMITIVES);
71
72#[derive(Clone, Debug, Serialize, Deserialize)]
73pub struct WorkflowMessageRecord {
74    pub seq: u64,
75    pub kind: String,
76    pub name: String,
77    #[serde(skip_serializing_if = "Option::is_none")]
78    pub request_id: Option<String>,
79    pub payload: serde_json::Value,
80    pub enqueued_at: String,
81}
82
83#[derive(Clone, Debug, Serialize, Deserialize)]
84pub struct WorkflowQueryRecord {
85    pub value: serde_json::Value,
86    pub published_at: String,
87}
88
89#[derive(Clone, Debug, Serialize, Deserialize)]
90pub struct WorkflowUpdateResponseRecord {
91    pub request_id: String,
92    #[serde(skip_serializing_if = "Option::is_none")]
93    pub name: Option<String>,
94    pub value: serde_json::Value,
95    pub responded_at: String,
96}
97
98#[derive(Clone, Debug, Serialize, Deserialize)]
99pub struct WorkflowMailboxState {
100    #[serde(rename = "_type")]
101    pub type_name: String,
102    pub workflow_id: String,
103    #[serde(default = "default_generation")]
104    pub generation: u64,
105    #[serde(default)]
106    pub continue_as_new_count: u64,
107    #[serde(skip_serializing_if = "Option::is_none")]
108    pub last_continue_as_new_at: Option<String>,
109    #[serde(default)]
110    pub paused: bool,
111    #[serde(default)]
112    pub next_seq: u64,
113    #[serde(default)]
114    pub mailbox: VecDeque<WorkflowMessageRecord>,
115    #[serde(default)]
116    pub queries: BTreeMap<String, WorkflowQueryRecord>,
117    #[serde(default)]
118    pub responses: BTreeMap<String, WorkflowUpdateResponseRecord>,
119}
120
121impl Default for WorkflowMailboxState {
122    fn default() -> Self {
123        Self {
124            type_name: "workflow_mailbox".to_string(),
125            workflow_id: String::new(),
126            generation: default_generation(),
127            continue_as_new_count: 0,
128            last_continue_as_new_at: None,
129            paused: false,
130            next_seq: 0,
131            mailbox: VecDeque::new(),
132            queries: BTreeMap::new(),
133            responses: BTreeMap::new(),
134        }
135    }
136}
137
138fn default_generation() -> u64 {
139    1
140}
141
142#[derive(Clone, Debug, PartialEq, Eq)]
143struct WorkflowTarget {
144    workflow_id: String,
145    base_dir: PathBuf,
146}
147
148fn sanitize_workflow_id(raw: &str) -> String {
149    let trimmed = raw.trim();
150    let base = Path::new(trimmed)
151        .file_name()
152        .and_then(|value| value.to_str())
153        .unwrap_or(trimmed);
154    if base.is_empty() || base == "." || base == ".." {
155        "workflow".to_string()
156    } else {
157        base.to_string()
158    }
159}
160
161fn workflow_base_dir_from_persisted_path(path: &Path) -> PathBuf {
162    let parent = path.parent().unwrap_or_else(|| Path::new("."));
163    if parent.file_name().and_then(|value| value.to_str()) == Some(".harn-runs") {
164        parent.parent().unwrap_or(parent).to_path_buf()
165    } else {
166        parent.to_path_buf()
167    }
168}
169
170fn workflow_target_root(target: &WorkflowTarget) -> PathBuf {
171    crate::runtime_paths::workflow_dir(&target.base_dir).join(&target.workflow_id)
172}
173
174fn workflow_state_path(target: &WorkflowTarget) -> PathBuf {
175    workflow_target_root(target).join("state.json")
176}
177
178fn now_rfc3339() -> String {
179    time::OffsetDateTime::now_utc()
180        .format(&time::format_description::well_known::Rfc3339)
181        .unwrap_or_else(|_| uuid::Uuid::now_v7().to_string())
182}
183
184fn load_state(target: &WorkflowTarget) -> Result<WorkflowMailboxState, String> {
185    let path = workflow_state_path(target);
186    if !path.exists() {
187        return Ok(WorkflowMailboxState {
188            workflow_id: target.workflow_id.clone(),
189            ..WorkflowMailboxState::default()
190        });
191    }
192    let text = std::fs::read_to_string(&path)
193        .map_err(|error| format!("workflow state read error: {error}"))?;
194    let mut state: WorkflowMailboxState = serde_json::from_str(&text)
195        .map_err(|error| format!("workflow state parse error: {error}"))?;
196    if state.type_name.is_empty() {
197        state.type_name = "workflow_mailbox".to_string();
198    }
199    if state.workflow_id.is_empty() {
200        state.workflow_id = target.workflow_id.clone();
201    }
202    if state.generation == 0 {
203        state.generation = 1;
204    }
205    Ok(state)
206}
207
208fn save_state(target: &WorkflowTarget, state: &WorkflowMailboxState) -> Result<(), String> {
209    let path = workflow_state_path(target);
210    let json = serde_json::to_string_pretty(state)
211        .map_err(|error| format!("workflow state encode error: {error}"))?;
212    crate::atomic_io::atomic_write(&path, json.as_bytes())
213        .map_err(|error| format!("workflow state write error: {error}"))
214}
215
216fn parse_target_json(
217    value: &serde_json::Value,
218    fallback_base_dir: Option<&Path>,
219) -> Option<WorkflowTarget> {
220    match value {
221        serde_json::Value::String(text) => Some(WorkflowTarget {
222            workflow_id: sanitize_workflow_id(text),
223            base_dir: fallback_base_dir
224                .map(Path::to_path_buf)
225                .unwrap_or_else(runtime_root_base),
226        }),
227        serde_json::Value::Object(map) => {
228            let workflow_id = map
229                .get("workflow_id")
230                .and_then(|value| value.as_str())
231                .or_else(|| map.get("workflow").and_then(|value| value.as_str()))
232                .or_else(|| {
233                    map.get("run")
234                        .and_then(|value| value.get("workflow_id"))
235                        .and_then(|value| value.as_str())
236                })
237                .or_else(|| {
238                    map.get("result")
239                        .and_then(|value| value.get("run"))
240                        .and_then(|value| value.get("workflow_id"))
241                        .and_then(|value| value.as_str())
242                })?;
243            let explicit_base = map
244                .get("base_dir")
245                .and_then(|value| value.as_str())
246                .filter(|value| !value.trim().is_empty())
247                .map(PathBuf::from);
248            let persisted_path = map
249                .get("persisted_path")
250                .and_then(|value| value.as_str())
251                .or_else(|| map.get("path").and_then(|value| value.as_str()))
252                .or_else(|| {
253                    map.get("run")
254                        .and_then(|value| value.get("persisted_path"))
255                        .and_then(|value| value.as_str())
256                })
257                .or_else(|| {
258                    map.get("result")
259                        .and_then(|value| value.get("run"))
260                        .and_then(|value| value.get("persisted_path"))
261                        .and_then(|value| value.as_str())
262                });
263            let base_dir = explicit_base
264                .or_else(|| {
265                    persisted_path
266                        .map(|path| workflow_base_dir_from_persisted_path(Path::new(path)))
267                })
268                .or_else(|| fallback_base_dir.map(Path::to_path_buf))
269                .unwrap_or_else(runtime_root_base);
270            Some(WorkflowTarget {
271                workflow_id: sanitize_workflow_id(workflow_id),
272                base_dir,
273            })
274        }
275        _ => None,
276    }
277}
278
279fn parse_target_vm(
280    value: Option<&VmValue>,
281    fallback_base_dir: Option<&Path>,
282    builtin: &str,
283) -> Result<WorkflowTarget, VmError> {
284    let value = value.ok_or_else(|| VmError::Runtime(format!("{builtin}: missing target")))?;
285    parse_target_json(&crate::llm::vm_value_to_json(value), fallback_base_dir).ok_or_else(|| {
286        VmError::Runtime(format!(
287            "{builtin}: target must be a workflow id string or dict with workflow_id/workflow"
288        ))
289    })
290}
291
292fn workflow_status_json(
293    target: &WorkflowTarget,
294    state: &WorkflowMailboxState,
295) -> serde_json::Value {
296    serde_json::json!({
297        "workflow_id": target.workflow_id,
298        "base_dir": target.base_dir.to_string_lossy(),
299        "generation": state.generation,
300        "paused": state.paused,
301        "pending_count": state.mailbox.len(),
302        "query_count": state.queries.len(),
303        "response_count": state.responses.len(),
304        "continue_as_new_count": state.continue_as_new_count,
305        "last_continue_as_new_at": state.last_continue_as_new_at,
306    })
307}
308
309fn target_for_base(base_dir: &Path, workflow_id: &str) -> WorkflowTarget {
310    WorkflowTarget {
311        workflow_id: sanitize_workflow_id(workflow_id),
312        base_dir: base_dir.to_path_buf(),
313    }
314}
315
316fn enqueue_message(
317    target: &WorkflowTarget,
318    kind: &str,
319    name: &str,
320    payload: serde_json::Value,
321    request_id: Option<String>,
322) -> Result<serde_json::Value, String> {
323    let mut state = load_state(target)?;
324    let message = push_message(&mut state, kind, name, payload, request_id);
325    save_state(target, &state)?;
326    Ok(serde_json::json!({
327        "workflow_id": target.workflow_id,
328        "message": message,
329        "status": workflow_status_json(target, &state),
330    }))
331}
332
333fn push_message(
334    state: &mut WorkflowMailboxState,
335    kind: &str,
336    name: &str,
337    payload: serde_json::Value,
338    request_id: Option<String>,
339) -> WorkflowMessageRecord {
340    state.next_seq += 1;
341    let message = WorkflowMessageRecord {
342        seq: state.next_seq,
343        kind: kind.to_string(),
344        name: name.to_string(),
345        request_id,
346        payload,
347        enqueued_at: now_rfc3339(),
348    };
349    state.mailbox.push_back(message.clone());
350    message
351}
352
353fn receive_message(target: &WorkflowTarget) -> Result<Option<WorkflowMessageRecord>, String> {
354    let mut state = load_state(target)?;
355    let message = state.mailbox.pop_front();
356    if message.is_some() {
357        save_state(target, &state)?;
358    }
359    Ok(message)
360}
361
362pub fn workflow_signal_for_base(
363    base_dir: &Path,
364    workflow_id: &str,
365    name: &str,
366    payload: serde_json::Value,
367) -> Result<serde_json::Value, String> {
368    let target = target_for_base(base_dir, workflow_id);
369    enqueue_message(&target, "signal", name, payload, None)
370}
371
372pub fn workflow_query_for_base(
373    base_dir: &Path,
374    workflow_id: &str,
375    name: &str,
376) -> Result<serde_json::Value, String> {
377    let target = target_for_base(base_dir, workflow_id);
378    let state = load_state(&target)?;
379    Ok(state
380        .queries
381        .get(name)
382        .map(|record| record.value.clone())
383        .unwrap_or(serde_json::Value::Null))
384}
385
386pub fn workflow_publish_query_for_base(
387    base_dir: &Path,
388    workflow_id: &str,
389    name: &str,
390    value: serde_json::Value,
391) -> Result<serde_json::Value, String> {
392    let target = target_for_base(base_dir, workflow_id);
393    let mut state = load_state(&target)?;
394    state.queries.insert(
395        name.to_string(),
396        WorkflowQueryRecord {
397            value,
398            published_at: now_rfc3339(),
399        },
400    );
401    save_state(&target, &state)?;
402    Ok(workflow_status_json(&target, &state))
403}
404
405pub fn workflow_pause_for_base(
406    base_dir: &Path,
407    workflow_id: &str,
408) -> Result<serde_json::Value, String> {
409    let target = target_for_base(base_dir, workflow_id);
410    let mut state = load_state(&target)?;
411    state.paused = true;
412    push_message(&mut state, "control", "pause", serde_json::json!({}), None);
413    save_state(&target, &state)?;
414    Ok(workflow_status_json(&target, &state))
415}
416
417pub fn workflow_resume_for_base(
418    base_dir: &Path,
419    workflow_id: &str,
420) -> Result<serde_json::Value, String> {
421    let target = target_for_base(base_dir, workflow_id);
422    let mut state = load_state(&target)?;
423    state.paused = false;
424    push_message(&mut state, "control", "resume", serde_json::json!({}), None);
425    save_state(&target, &state)?;
426    Ok(workflow_status_json(&target, &state))
427}
428
429pub async fn workflow_update_for_base(
430    base_dir: &Path,
431    workflow_id: &str,
432    name: &str,
433    payload: serde_json::Value,
434    timeout: StdDuration,
435) -> Result<serde_json::Value, String> {
436    let target = target_for_base(base_dir, workflow_id);
437    let request_id = enqueue_update_request(&target, name, payload)?;
438    wait_for_update_response(&target, name, &request_id, timeout).await
439}
440
441fn enqueue_update_request(
442    target: &WorkflowTarget,
443    name: &str,
444    payload: serde_json::Value,
445) -> Result<String, String> {
446    let request_id = uuid::Uuid::now_v7().to_string();
447    enqueue_message(target, "update", name, payload, Some(request_id.clone()))?;
448    Ok(request_id)
449}
450
451async fn wait_for_update_response(
452    target: &WorkflowTarget,
453    name: &str,
454    request_id: &str,
455    timeout: StdDuration,
456) -> Result<serde_json::Value, String> {
457    let deadline = tokio::time::Instant::now() + timeout;
458    while tokio::time::Instant::now() <= deadline {
459        if let Ok(Some(value)) = update_response_value(target, request_id) {
460            return Ok(value);
461        }
462        tokio::time::sleep(StdDuration::from_millis(UPDATE_POLL_INTERVAL_MS)).await;
463    }
464    Err(format!(
465        "workflow update '{name}' timed out for '{}'",
466        target.workflow_id
467    ))
468}
469
470fn update_response_value(
471    target: &WorkflowTarget,
472    request_id: &str,
473) -> Result<Option<serde_json::Value>, String> {
474    Ok(load_state(target)?
475        .responses
476        .get(request_id)
477        .map(|response| response.value.clone()))
478}
479
480pub fn workflow_respond_update_for_base(
481    base_dir: &Path,
482    workflow_id: &str,
483    request_id: &str,
484    name: Option<&str>,
485    value: serde_json::Value,
486) -> Result<serde_json::Value, String> {
487    let target = target_for_base(base_dir, workflow_id);
488    record_update_response(&target, request_id, name, value)
489}
490
491fn record_update_response(
492    target: &WorkflowTarget,
493    request_id: &str,
494    name: Option<&str>,
495    value: serde_json::Value,
496) -> Result<serde_json::Value, String> {
497    let mut state = load_state(target)?;
498    state.responses.insert(
499        request_id.to_string(),
500        WorkflowUpdateResponseRecord {
501            request_id: request_id.to_string(),
502            name: name.map(ToString::to_string),
503            value,
504            responded_at: now_rfc3339(),
505        },
506    );
507    save_state(target, &state)?;
508    Ok(workflow_status_json(target, &state))
509}
510
511pub(crate) fn register_workflow_message_builtins(vm: &mut Vm) {
512    vm.set_global(
513        "workflow",
514        VmValue::Dict(Rc::new(BTreeMap::from([
515            (
516                "signal".to_string(),
517                VmValue::BuiltinRef(Rc::from("workflow.signal")),
518            ),
519            (
520                "query".to_string(),
521                VmValue::BuiltinRef(Rc::from("workflow.query")),
522            ),
523            (
524                "update".to_string(),
525                VmValue::BuiltinRef(Rc::from("workflow.update")),
526            ),
527            (
528                "publish_query".to_string(),
529                VmValue::BuiltinRef(Rc::from("workflow.publish_query")),
530            ),
531            (
532                "receive".to_string(),
533                VmValue::BuiltinRef(Rc::from("workflow.receive")),
534            ),
535            (
536                "respond_update".to_string(),
537                VmValue::BuiltinRef(Rc::from("workflow.respond_update")),
538            ),
539            (
540                "pause".to_string(),
541                VmValue::BuiltinRef(Rc::from("workflow.pause")),
542            ),
543            (
544                "resume".to_string(),
545                VmValue::BuiltinRef(Rc::from("workflow.resume")),
546            ),
547            (
548                "status".to_string(),
549                VmValue::BuiltinRef(Rc::from("workflow.status")),
550            ),
551            (
552                "continue_as_new".to_string(),
553                VmValue::BuiltinRef(Rc::from("workflow.continue_as_new")),
554            ),
555        ]))),
556    );
557
558    register_builtin_group(vm, WORKFLOW_MESSAGE_PRIMITIVES);
559}
560
561fn workflow_signal_builtin(args: &[VmValue], _out: &mut String) -> Result<VmValue, VmError> {
562    let target = parse_target_vm(args.first(), None, "workflow.signal")?;
563    let name = args
564        .get(1)
565        .map(|value| value.display())
566        .filter(|value| !value.is_empty())
567        .ok_or_else(|| VmError::Runtime("workflow.signal: missing name".to_string()))?;
568    let payload = args
569        .get(2)
570        .map(crate::llm::vm_value_to_json)
571        .unwrap_or(serde_json::Value::Null);
572    let result =
573        enqueue_message(&target, "signal", &name, payload, None).map_err(VmError::Runtime)?;
574    Ok(crate::stdlib::json_to_vm_value(&result))
575}
576
577fn workflow_query_builtin(args: &[VmValue], _out: &mut String) -> Result<VmValue, VmError> {
578    let target = parse_target_vm(args.first(), None, "workflow.query")?;
579    let name = args
580        .get(1)
581        .map(|value| value.display())
582        .filter(|value| !value.is_empty())
583        .ok_or_else(|| VmError::Runtime("workflow.query: missing name".to_string()))?;
584    let state = load_state(&target).map_err(VmError::Runtime)?;
585    Ok(crate::stdlib::json_to_vm_value(
586        &state
587            .queries
588            .get(&name)
589            .map(|record| record.value.clone())
590            .unwrap_or(serde_json::Value::Null),
591    ))
592}
593
594async fn workflow_update_builtin(args: Vec<VmValue>) -> Result<VmValue, VmError> {
595    let target = parse_target_vm(args.first(), None, "workflow.update")?;
596    let name = args
597        .get(1)
598        .map(|value| value.display())
599        .filter(|value| !value.is_empty())
600        .ok_or_else(|| VmError::Runtime("workflow.update: missing name".to_string()))?;
601    let payload = args
602        .get(2)
603        .map(crate::llm::vm_value_to_json)
604        .unwrap_or(serde_json::Value::Null);
605    let timeout_ms = args
606        .get(3)
607        .and_then(|value| value.as_dict())
608        .and_then(|dict| dict.get("timeout_ms"))
609        .and_then(VmValue::as_int)
610        .unwrap_or(DEFAULT_UPDATE_TIMEOUT_MS as i64)
611        .max(1) as u64;
612    let result = workflow_update_for_base(
613        &target.base_dir,
614        &target.workflow_id,
615        &name,
616        payload,
617        StdDuration::from_millis(timeout_ms),
618    )
619    .await
620    .map_err(VmError::Runtime)?;
621    Ok(crate::stdlib::json_to_vm_value(&result))
622}
623
624fn workflow_publish_query_builtin(args: &[VmValue], _out: &mut String) -> Result<VmValue, VmError> {
625    let target = parse_target_vm(args.first(), None, "workflow.publish_query")?;
626    let name = args
627        .get(1)
628        .map(|value| value.display())
629        .filter(|value| !value.is_empty())
630        .ok_or_else(|| VmError::Runtime("workflow.publish_query: missing name".to_string()))?;
631    let value = args
632        .get(2)
633        .map(crate::llm::vm_value_to_json)
634        .unwrap_or(serde_json::Value::Null);
635    let result =
636        workflow_publish_query_for_base(&target.base_dir, &target.workflow_id, &name, value)
637            .map_err(VmError::Runtime)?;
638    Ok(crate::stdlib::json_to_vm_value(&result))
639}
640
641fn workflow_receive_builtin(args: &[VmValue], _out: &mut String) -> Result<VmValue, VmError> {
642    let target = parse_target_vm(args.first(), None, "workflow.receive")?;
643    let Some(message) = receive_message(&target).map_err(VmError::Runtime)? else {
644        return Ok(VmValue::Nil);
645    };
646    Ok(crate::stdlib::json_to_vm_value(&serde_json::json!({
647        "workflow_id": target.workflow_id,
648        "seq": message.seq,
649        "kind": message.kind,
650        "name": message.name,
651        "request_id": message.request_id,
652        "payload": message.payload,
653        "enqueued_at": message.enqueued_at,
654    })))
655}
656
657fn workflow_respond_update_builtin(
658    args: &[VmValue],
659    _out: &mut String,
660) -> Result<VmValue, VmError> {
661    let target = parse_target_vm(args.first(), None, "workflow.respond_update")?;
662    let request_id = args
663        .get(1)
664        .map(|value| value.display())
665        .filter(|value| !value.is_empty())
666        .ok_or_else(|| {
667            VmError::Runtime("workflow.respond_update: missing request id".to_string())
668        })?;
669    let value = args
670        .get(2)
671        .map(crate::llm::vm_value_to_json)
672        .unwrap_or(serde_json::Value::Null);
673    let name = args
674        .get(3)
675        .map(|value| value.display())
676        .filter(|value| !value.is_empty());
677    let result = workflow_respond_update_for_base(
678        &target.base_dir,
679        &target.workflow_id,
680        &request_id,
681        name.as_deref(),
682        value,
683    )
684    .map_err(VmError::Runtime)?;
685    Ok(crate::stdlib::json_to_vm_value(&result))
686}
687
688fn workflow_pause_builtin(args: &[VmValue], _out: &mut String) -> Result<VmValue, VmError> {
689    let target = parse_target_vm(args.first(), None, "workflow.pause")?;
690    let result =
691        workflow_pause_for_base(&target.base_dir, &target.workflow_id).map_err(VmError::Runtime)?;
692    Ok(crate::stdlib::json_to_vm_value(&result))
693}
694
695fn workflow_resume_builtin(args: &[VmValue], _out: &mut String) -> Result<VmValue, VmError> {
696    let target = parse_target_vm(args.first(), None, "workflow.resume")?;
697    let result = workflow_resume_for_base(&target.base_dir, &target.workflow_id)
698        .map_err(VmError::Runtime)?;
699    Ok(crate::stdlib::json_to_vm_value(&result))
700}
701
702fn workflow_status_builtin(args: &[VmValue], _out: &mut String) -> Result<VmValue, VmError> {
703    let target = parse_target_vm(args.first(), None, "workflow.status")?;
704    let state = load_state(&target).map_err(VmError::Runtime)?;
705    Ok(crate::stdlib::json_to_vm_value(&workflow_status_json(
706        &target, &state,
707    )))
708}
709
710fn workflow_continue_as_new_builtin(
711    args: &[VmValue],
712    _out: &mut String,
713) -> Result<VmValue, VmError> {
714    continue_as_new_for_label(args, "workflow.continue_as_new")
715}
716
717fn continue_as_new_builtin(args: &[VmValue], _out: &mut String) -> Result<VmValue, VmError> {
718    continue_as_new_for_label(args, "continue_as_new")
719}
720
721fn continue_as_new_for_label(args: &[VmValue], label: &str) -> Result<VmValue, VmError> {
722    let target = parse_target_vm(args.first(), None, label)?;
723    let mut state = load_state(&target).map_err(VmError::Runtime)?;
724    state.generation += 1;
725    state.continue_as_new_count += 1;
726    state.last_continue_as_new_at = Some(now_rfc3339());
727    state.responses.clear();
728    save_state(&target, &state).map_err(VmError::Runtime)?;
729    Ok(crate::stdlib::json_to_vm_value(&workflow_status_json(
730        &target, &state,
731    )))
732}
733
734#[cfg(test)]
735mod tests {
736    use super::*;
737    use std::task::Poll;
738
739    #[tokio::test(start_paused = true)]
740    async fn update_round_trip_waits_for_response() {
741        let dir = tempfile::tempdir().expect("tempdir");
742        let workflow_id = "wf-update";
743        let base_dir = dir.path().to_path_buf();
744        let target = target_for_base(&base_dir, workflow_id);
745        let request_id =
746            enqueue_update_request(&target, "adjust_budget", serde_json::json!({"max_usd": 10}))
747                .expect("enqueue update");
748
749        let message = receive_message(&target)
750            .expect("receive queued update")
751            .expect("queued update");
752        assert_eq!(message.kind, "update");
753        assert_eq!(message.name, "adjust_budget");
754        assert_eq!(message.request_id.as_deref(), Some(request_id.as_str()));
755        assert_eq!(
756            update_response_value(&target, &request_id).expect("read response"),
757            None
758        );
759
760        let waiter = wait_for_update_response(
761            &target,
762            "adjust_budget",
763            &request_id,
764            StdDuration::from_millis(500),
765        );
766        tokio::pin!(waiter);
767        assert!(matches!(futures::poll!(&mut waiter), Poll::Pending));
768
769        workflow_respond_update_for_base(
770            &base_dir,
771            workflow_id,
772            &request_id,
773            Some("adjust_budget"),
774            serde_json::json!({"ok": true}),
775        )
776        .expect("save response");
777        assert_eq!(
778            update_response_value(&target, &request_id).expect("read response"),
779            Some(serde_json::json!({"ok": true}))
780        );
781        tokio::time::advance(StdDuration::from_millis(UPDATE_POLL_INTERVAL_MS)).await;
782
783        let result = waiter.await.expect("update result");
784        assert_eq!(result, serde_json::json!({"ok": true}));
785    }
786
787    #[test]
788    fn persisted_path_drives_target_base_dir() {
789        let base = parse_target_json(
790            &serde_json::json!({
791                "workflow_id": "wf",
792                "persisted_path": "/tmp/demo/.harn-runs/run.json"
793            }),
794            None,
795        )
796        .expect("target");
797        assert_eq!(base.workflow_id, "wf");
798        assert_eq!(base.base_dir, PathBuf::from("/tmp/demo"));
799    }
800}