Skip to main content

harn_vm/stdlib/
workflow_messages.rs

1use std::collections::{BTreeMap, VecDeque};
2use std::path::{Path, PathBuf};
3use std::rc::Rc;
4use std::time::Duration as StdDuration;
5
6use serde::{Deserialize, Serialize};
7
8use crate::stdlib::process::runtime_root_base;
9use crate::stdlib::registration::{
10    async_builtin, register_builtin_group, AsyncBuiltin, BuiltinGroup, SyncBuiltin,
11};
12use crate::value::{VmError, VmValue};
13use crate::vm::{Vm, VmBuiltinArity};
14
/// Timeout, in milliseconds, applied by `workflow.update` when the caller's
/// options do not carry a `timeout_ms` entry.
const DEFAULT_UPDATE_TIMEOUT_MS: u64 = 30 * 1_000;
/// Delay, in milliseconds, between two consecutive polls for an update response.
const UPDATE_POLL_INTERVAL_MS: u64 = 25;
17
18const WORKFLOW_MESSAGE_SYNC_PRIMITIVES: &[SyncBuiltin] = &[
19    SyncBuiltin::new("workflow.signal", workflow_signal_builtin)
20        .signature("workflow.signal(target, name, payload?)")
21        .arity(VmBuiltinArity::Range { min: 2, max: 3 })
22        .doc("Enqueue a workflow signal message."),
23    SyncBuiltin::new("workflow.query", workflow_query_builtin)
24        .signature("workflow.query(target, name)")
25        .arity(VmBuiltinArity::Exact(2))
26        .doc("Read the latest published workflow query value."),
27    SyncBuiltin::new("workflow.publish_query", workflow_publish_query_builtin)
28        .signature("workflow.publish_query(target, name, value?)")
29        .arity(VmBuiltinArity::Range { min: 2, max: 3 })
30        .doc("Publish a workflow query value."),
31    SyncBuiltin::new("workflow.receive", workflow_receive_builtin)
32        .signature("workflow.receive(target)")
33        .arity(VmBuiltinArity::Exact(1))
34        .doc("Receive the next workflow mailbox message."),
35    SyncBuiltin::new("workflow.respond_update", workflow_respond_update_builtin)
36        .signature("workflow.respond_update(target, request_id, value?, name?)")
37        .arity(VmBuiltinArity::Range { min: 2, max: 4 })
38        .doc("Respond to a pending workflow update request."),
39    SyncBuiltin::new("workflow.pause", workflow_pause_builtin)
40        .signature("workflow.pause(target)")
41        .arity(VmBuiltinArity::Exact(1))
42        .doc("Pause a workflow mailbox."),
43    SyncBuiltin::new("workflow.resume", workflow_resume_builtin)
44        .signature("workflow.resume(target)")
45        .arity(VmBuiltinArity::Exact(1))
46        .doc("Resume a workflow mailbox."),
47    SyncBuiltin::new("workflow.status", workflow_status_builtin)
48        .signature("workflow.status(target)")
49        .arity(VmBuiltinArity::Exact(1))
50        .doc("Return workflow mailbox status."),
51    SyncBuiltin::new("workflow.continue_as_new", workflow_continue_as_new_builtin)
52        .signature("workflow.continue_as_new(target)")
53        .arity(VmBuiltinArity::Exact(1))
54        .doc("Advance a workflow mailbox generation."),
55    SyncBuiltin::new("continue_as_new", continue_as_new_builtin)
56        .signature("continue_as_new(target)")
57        .arity(VmBuiltinArity::Exact(1))
58        .doc("Advance a workflow mailbox generation."),
59];
60
61const WORKFLOW_MESSAGE_ASYNC_PRIMITIVES: &[AsyncBuiltin] =
62    &[async_builtin!("workflow.update", workflow_update_builtin)
63        .signature("workflow.update(target, name, payload?, options?)")
64        .arity(VmBuiltinArity::Range { min: 2, max: 4 })
65        .doc("Enqueue a workflow update and wait for a response.")];
66
67const WORKFLOW_MESSAGE_PRIMITIVES: BuiltinGroup<'static> = BuiltinGroup::new()
68    .category("workflow.messages")
69    .sync(WORKFLOW_MESSAGE_SYNC_PRIMITIVES)
70    .async_(WORKFLOW_MESSAGE_ASYNC_PRIMITIVES);
71
72#[derive(Clone, Debug, Serialize, Deserialize)]
73pub struct WorkflowMessageRecord {
74    pub seq: u64,
75    pub kind: String,
76    pub name: String,
77    #[serde(skip_serializing_if = "Option::is_none")]
78    pub request_id: Option<String>,
79    pub payload: serde_json::Value,
80    pub enqueued_at: String,
81}
82
83#[derive(Clone, Debug, Serialize, Deserialize)]
84pub struct WorkflowQueryRecord {
85    pub value: serde_json::Value,
86    pub published_at: String,
87}
88
89#[derive(Clone, Debug, Serialize, Deserialize)]
90pub struct WorkflowUpdateResponseRecord {
91    pub request_id: String,
92    #[serde(skip_serializing_if = "Option::is_none")]
93    pub name: Option<String>,
94    pub value: serde_json::Value,
95    pub responded_at: String,
96}
97
98#[derive(Clone, Debug, Serialize, Deserialize)]
99pub struct WorkflowMailboxState {
100    #[serde(rename = "_type")]
101    pub type_name: String,
102    pub workflow_id: String,
103    #[serde(default = "default_generation")]
104    pub generation: u64,
105    #[serde(default)]
106    pub continue_as_new_count: u64,
107    #[serde(skip_serializing_if = "Option::is_none")]
108    pub last_continue_as_new_at: Option<String>,
109    #[serde(default)]
110    pub paused: bool,
111    #[serde(default)]
112    pub next_seq: u64,
113    #[serde(default)]
114    pub mailbox: VecDeque<WorkflowMessageRecord>,
115    #[serde(default)]
116    pub queries: BTreeMap<String, WorkflowQueryRecord>,
117    #[serde(default)]
118    pub responses: BTreeMap<String, WorkflowUpdateResponseRecord>,
119}
120
121impl Default for WorkflowMailboxState {
122    fn default() -> Self {
123        Self {
124            type_name: "workflow_mailbox".to_string(),
125            workflow_id: String::new(),
126            generation: default_generation(),
127            continue_as_new_count: 0,
128            last_continue_as_new_at: None,
129            paused: false,
130            next_seq: 0,
131            mailbox: VecDeque::new(),
132            queries: BTreeMap::new(),
133            responses: BTreeMap::new(),
134        }
135    }
136}
137
/// Generation assigned to a mailbox that has never been continued; also used
/// as the serde default for `WorkflowMailboxState::generation`.
fn default_generation() -> u64 {
    const FIRST_GENERATION: u64 = 1;
    FIRST_GENERATION
}
141
/// A resolved mailbox address: which workflow, and under which base directory
/// its state is stored.
#[derive(Debug, Clone, PartialEq, Eq)]
struct WorkflowTarget {
    /// Workflow id, used as the directory name of the mailbox.
    workflow_id: String,
    /// Base directory handed to `runtime_paths::workflow_dir` to locate the mailbox.
    base_dir: PathBuf,
}
147
/// Reduces a caller-supplied workflow id to a single, safe path component.
///
/// The id is trimmed, reduced to its final path component, and trimmed again.
/// Whenever that leaves nothing usable the fixed id `"workflow"` is returned.
/// This covers ids that are empty, `.` or `..`, as well as ids for which
/// `Path::file_name` yields nothing (paths ending in `..`, or a bare root such
/// as `/`). Falling back to the raw text in those cases would let an id like
/// `"../.."` or `"/"` escape the workflow directory once it is joined onto it.
///
/// The result is stable under re-application: callers in this file sanitize
/// some ids once and others twice, and both must address the same mailbox.
fn sanitize_workflow_id(raw: &str) -> String {
    const FALLBACK_ID: &str = "workflow";
    let candidate = Path::new(raw.trim())
        .file_name()
        .and_then(|name| name.to_str())
        // Trim again: the last component may itself carry padding ("a/ b").
        .map(str::trim)
        .unwrap_or("");
    if candidate.is_empty() || candidate == "." || candidate == ".." {
        FALLBACK_ID.to_string()
    } else {
        candidate.to_string()
    }
}
160
/// Derives a workflow base directory from the path of a persisted run file.
///
/// The base is the file's parent directory, unless that directory is named
/// `.harn-runs`, in which case the base is one level further up. A path with
/// no parent resolves to `.`.
fn workflow_base_dir_from_persisted_path(path: &Path) -> PathBuf {
    let run_dir = match path.parent() {
        Some(dir) => dir,
        None => Path::new("."),
    };
    let inside_runs_dir = run_dir
        .file_name()
        .map_or(false, |name| name == ".harn-runs");
    let base = if inside_runs_dir {
        // Step out of `.harn-runs`; stay put if it has no parent of its own.
        run_dir.parent().unwrap_or(run_dir)
    } else {
        run_dir
    };
    base.to_path_buf()
}
169
170fn workflow_target_root(target: &WorkflowTarget) -> PathBuf {
171    crate::runtime_paths::workflow_dir(&target.base_dir).join(&target.workflow_id)
172}
173
174fn workflow_state_path(target: &WorkflowTarget) -> PathBuf {
175    workflow_target_root(target).join("state.json")
176}
177
178fn now_rfc3339() -> String {
179    time::OffsetDateTime::now_utc()
180        .format(&time::format_description::well_known::Rfc3339)
181        .unwrap_or_else(|_| uuid::Uuid::now_v7().to_string())
182}
183
184fn load_state(target: &WorkflowTarget) -> Result<WorkflowMailboxState, String> {
185    let path = workflow_state_path(target);
186    if !path.exists() {
187        return Ok(WorkflowMailboxState {
188            workflow_id: target.workflow_id.clone(),
189            ..WorkflowMailboxState::default()
190        });
191    }
192    let text = std::fs::read_to_string(&path)
193        .map_err(|error| format!("workflow state read error: {error}"))?;
194    let mut state: WorkflowMailboxState = serde_json::from_str(&text)
195        .map_err(|error| format!("workflow state parse error: {error}"))?;
196    if state.type_name.is_empty() {
197        state.type_name = "workflow_mailbox".to_string();
198    }
199    if state.workflow_id.is_empty() {
200        state.workflow_id = target.workflow_id.clone();
201    }
202    if state.generation == 0 {
203        state.generation = 1;
204    }
205    Ok(state)
206}
207
208fn save_state(target: &WorkflowTarget, state: &WorkflowMailboxState) -> Result<(), String> {
209    let path = workflow_state_path(target);
210    let json = serde_json::to_string_pretty(state)
211        .map_err(|error| format!("workflow state encode error: {error}"))?;
212    crate::atomic_io::atomic_write(&path, json.as_bytes())
213        .map_err(|error| format!("workflow state write error: {error}"))
214}
215
216fn parse_target_json(
217    value: &serde_json::Value,
218    fallback_base_dir: Option<&Path>,
219) -> Option<WorkflowTarget> {
220    match value {
221        serde_json::Value::String(text) => Some(WorkflowTarget {
222            workflow_id: sanitize_workflow_id(text),
223            base_dir: fallback_base_dir
224                .map(Path::to_path_buf)
225                .unwrap_or_else(runtime_root_base),
226        }),
227        serde_json::Value::Object(map) => {
228            let workflow_id = map
229                .get("workflow_id")
230                .and_then(|value| value.as_str())
231                .or_else(|| map.get("workflow").and_then(|value| value.as_str()))
232                .or_else(|| {
233                    map.get("run")
234                        .and_then(|value| value.get("workflow_id"))
235                        .and_then(|value| value.as_str())
236                })
237                .or_else(|| {
238                    map.get("result")
239                        .and_then(|value| value.get("run"))
240                        .and_then(|value| value.get("workflow_id"))
241                        .and_then(|value| value.as_str())
242                })?;
243            let explicit_base = map
244                .get("base_dir")
245                .and_then(|value| value.as_str())
246                .filter(|value| !value.trim().is_empty())
247                .map(PathBuf::from);
248            let persisted_path = map
249                .get("persisted_path")
250                .and_then(|value| value.as_str())
251                .or_else(|| map.get("path").and_then(|value| value.as_str()))
252                .or_else(|| {
253                    map.get("run")
254                        .and_then(|value| value.get("persisted_path"))
255                        .and_then(|value| value.as_str())
256                })
257                .or_else(|| {
258                    map.get("result")
259                        .and_then(|value| value.get("run"))
260                        .and_then(|value| value.get("persisted_path"))
261                        .and_then(|value| value.as_str())
262                });
263            let base_dir = explicit_base
264                .or_else(|| {
265                    persisted_path
266                        .map(|path| workflow_base_dir_from_persisted_path(Path::new(path)))
267                })
268                .or_else(|| fallback_base_dir.map(Path::to_path_buf))
269                .unwrap_or_else(runtime_root_base);
270            Some(WorkflowTarget {
271                workflow_id: sanitize_workflow_id(workflow_id),
272                base_dir,
273            })
274        }
275        _ => None,
276    }
277}
278
279fn parse_target_vm(
280    value: Option<&VmValue>,
281    fallback_base_dir: Option<&Path>,
282    builtin: &str,
283) -> Result<WorkflowTarget, VmError> {
284    let value = value.ok_or_else(|| VmError::Runtime(format!("{builtin}: missing target")))?;
285    parse_target_json(&crate::llm::vm_value_to_json(value), fallback_base_dir).ok_or_else(|| {
286        VmError::Runtime(format!(
287            "{builtin}: target must be a workflow id string or dict with workflow_id/workflow"
288        ))
289    })
290}
291
292fn workflow_status_json(
293    target: &WorkflowTarget,
294    state: &WorkflowMailboxState,
295) -> serde_json::Value {
296    serde_json::json!({
297        "workflow_id": target.workflow_id,
298        "base_dir": target.base_dir.to_string_lossy(),
299        "generation": state.generation,
300        "paused": state.paused,
301        "pending_count": state.mailbox.len(),
302        "query_count": state.queries.len(),
303        "response_count": state.responses.len(),
304        "continue_as_new_count": state.continue_as_new_count,
305        "last_continue_as_new_at": state.last_continue_as_new_at,
306    })
307}
308
309fn enqueue_message(
310    target: &WorkflowTarget,
311    kind: &str,
312    name: &str,
313    payload: serde_json::Value,
314    request_id: Option<String>,
315) -> Result<serde_json::Value, String> {
316    let mut state = load_state(target)?;
317    state.next_seq += 1;
318    let message = WorkflowMessageRecord {
319        seq: state.next_seq,
320        kind: kind.to_string(),
321        name: name.to_string(),
322        request_id,
323        payload,
324        enqueued_at: now_rfc3339(),
325    };
326    state.mailbox.push_back(message.clone());
327    save_state(target, &state)?;
328    Ok(serde_json::json!({
329        "workflow_id": target.workflow_id,
330        "message": message,
331        "status": workflow_status_json(target, &state),
332    }))
333}
334
335pub fn workflow_signal_for_base(
336    base_dir: &Path,
337    workflow_id: &str,
338    name: &str,
339    payload: serde_json::Value,
340) -> Result<serde_json::Value, String> {
341    let target = WorkflowTarget {
342        workflow_id: sanitize_workflow_id(workflow_id),
343        base_dir: base_dir.to_path_buf(),
344    };
345    enqueue_message(&target, "signal", name, payload, None)
346}
347
348pub fn workflow_query_for_base(
349    base_dir: &Path,
350    workflow_id: &str,
351    name: &str,
352) -> Result<serde_json::Value, String> {
353    let target = WorkflowTarget {
354        workflow_id: sanitize_workflow_id(workflow_id),
355        base_dir: base_dir.to_path_buf(),
356    };
357    let state = load_state(&target)?;
358    Ok(state
359        .queries
360        .get(name)
361        .map(|record| record.value.clone())
362        .unwrap_or(serde_json::Value::Null))
363}
364
365pub fn workflow_publish_query_for_base(
366    base_dir: &Path,
367    workflow_id: &str,
368    name: &str,
369    value: serde_json::Value,
370) -> Result<serde_json::Value, String> {
371    let target = WorkflowTarget {
372        workflow_id: sanitize_workflow_id(workflow_id),
373        base_dir: base_dir.to_path_buf(),
374    };
375    let mut state = load_state(&target)?;
376    state.queries.insert(
377        name.to_string(),
378        WorkflowQueryRecord {
379            value,
380            published_at: now_rfc3339(),
381        },
382    );
383    save_state(&target, &state)?;
384    Ok(workflow_status_json(&target, &state))
385}
386
387pub fn workflow_pause_for_base(
388    base_dir: &Path,
389    workflow_id: &str,
390) -> Result<serde_json::Value, String> {
391    let target = WorkflowTarget {
392        workflow_id: sanitize_workflow_id(workflow_id),
393        base_dir: base_dir.to_path_buf(),
394    };
395    let mut state = load_state(&target)?;
396    state.paused = true;
397    state.next_seq += 1;
398    state.mailbox.push_back(WorkflowMessageRecord {
399        seq: state.next_seq,
400        kind: "control".to_string(),
401        name: "pause".to_string(),
402        request_id: None,
403        payload: serde_json::json!({}),
404        enqueued_at: now_rfc3339(),
405    });
406    save_state(&target, &state)?;
407    Ok(workflow_status_json(&target, &state))
408}
409
410pub fn workflow_resume_for_base(
411    base_dir: &Path,
412    workflow_id: &str,
413) -> Result<serde_json::Value, String> {
414    let target = WorkflowTarget {
415        workflow_id: sanitize_workflow_id(workflow_id),
416        base_dir: base_dir.to_path_buf(),
417    };
418    let mut state = load_state(&target)?;
419    state.paused = false;
420    state.next_seq += 1;
421    state.mailbox.push_back(WorkflowMessageRecord {
422        seq: state.next_seq,
423        kind: "control".to_string(),
424        name: "resume".to_string(),
425        request_id: None,
426        payload: serde_json::json!({}),
427        enqueued_at: now_rfc3339(),
428    });
429    save_state(&target, &state)?;
430    Ok(workflow_status_json(&target, &state))
431}
432
433pub async fn workflow_update_for_base(
434    base_dir: &Path,
435    workflow_id: &str,
436    name: &str,
437    payload: serde_json::Value,
438    timeout: StdDuration,
439) -> Result<serde_json::Value, String> {
440    let target = WorkflowTarget {
441        workflow_id: sanitize_workflow_id(workflow_id),
442        base_dir: base_dir.to_path_buf(),
443    };
444    let request_id = uuid::Uuid::now_v7().to_string();
445    enqueue_message(&target, "update", name, payload, Some(request_id.clone()))?;
446    let started = std::time::Instant::now();
447    while started.elapsed() <= timeout {
448        if let Ok(state) = load_state(&target) {
449            if let Some(response) = state.responses.get(&request_id) {
450                return Ok(response.value.clone());
451            }
452        }
453        tokio::time::sleep(StdDuration::from_millis(UPDATE_POLL_INTERVAL_MS)).await;
454    }
455    Err(format!(
456        "workflow update '{name}' timed out for '{}'",
457        target.workflow_id
458    ))
459}
460
461pub fn workflow_respond_update_for_base(
462    base_dir: &Path,
463    workflow_id: &str,
464    request_id: &str,
465    name: Option<&str>,
466    value: serde_json::Value,
467) -> Result<serde_json::Value, String> {
468    let target = WorkflowTarget {
469        workflow_id: sanitize_workflow_id(workflow_id),
470        base_dir: base_dir.to_path_buf(),
471    };
472    let mut state = load_state(&target)?;
473    state.responses.insert(
474        request_id.to_string(),
475        WorkflowUpdateResponseRecord {
476            request_id: request_id.to_string(),
477            name: name.map(ToString::to_string),
478            value,
479            responded_at: now_rfc3339(),
480        },
481    );
482    save_state(&target, &state)?;
483    Ok(workflow_status_json(&target, &state))
484}
485
486pub(crate) fn register_workflow_message_builtins(vm: &mut Vm) {
487    vm.set_global(
488        "workflow",
489        VmValue::Dict(Rc::new(BTreeMap::from([
490            (
491                "signal".to_string(),
492                VmValue::BuiltinRef(Rc::from("workflow.signal")),
493            ),
494            (
495                "query".to_string(),
496                VmValue::BuiltinRef(Rc::from("workflow.query")),
497            ),
498            (
499                "update".to_string(),
500                VmValue::BuiltinRef(Rc::from("workflow.update")),
501            ),
502            (
503                "publish_query".to_string(),
504                VmValue::BuiltinRef(Rc::from("workflow.publish_query")),
505            ),
506            (
507                "receive".to_string(),
508                VmValue::BuiltinRef(Rc::from("workflow.receive")),
509            ),
510            (
511                "respond_update".to_string(),
512                VmValue::BuiltinRef(Rc::from("workflow.respond_update")),
513            ),
514            (
515                "pause".to_string(),
516                VmValue::BuiltinRef(Rc::from("workflow.pause")),
517            ),
518            (
519                "resume".to_string(),
520                VmValue::BuiltinRef(Rc::from("workflow.resume")),
521            ),
522            (
523                "status".to_string(),
524                VmValue::BuiltinRef(Rc::from("workflow.status")),
525            ),
526            (
527                "continue_as_new".to_string(),
528                VmValue::BuiltinRef(Rc::from("workflow.continue_as_new")),
529            ),
530        ]))),
531    );
532
533    register_builtin_group(vm, WORKFLOW_MESSAGE_PRIMITIVES);
534}
535
536fn workflow_signal_builtin(args: &[VmValue], _out: &mut String) -> Result<VmValue, VmError> {
537    let target = parse_target_vm(args.first(), None, "workflow.signal")?;
538    let name = args
539        .get(1)
540        .map(|value| value.display())
541        .filter(|value| !value.is_empty())
542        .ok_or_else(|| VmError::Runtime("workflow.signal: missing name".to_string()))?;
543    let payload = args
544        .get(2)
545        .map(crate::llm::vm_value_to_json)
546        .unwrap_or(serde_json::Value::Null);
547    let result =
548        enqueue_message(&target, "signal", &name, payload, None).map_err(VmError::Runtime)?;
549    Ok(crate::stdlib::json_to_vm_value(&result))
550}
551
552fn workflow_query_builtin(args: &[VmValue], _out: &mut String) -> Result<VmValue, VmError> {
553    let target = parse_target_vm(args.first(), None, "workflow.query")?;
554    let name = args
555        .get(1)
556        .map(|value| value.display())
557        .filter(|value| !value.is_empty())
558        .ok_or_else(|| VmError::Runtime("workflow.query: missing name".to_string()))?;
559    let state = load_state(&target).map_err(VmError::Runtime)?;
560    Ok(crate::stdlib::json_to_vm_value(
561        &state
562            .queries
563            .get(&name)
564            .map(|record| record.value.clone())
565            .unwrap_or(serde_json::Value::Null),
566    ))
567}
568
569async fn workflow_update_builtin(args: Vec<VmValue>) -> Result<VmValue, VmError> {
570    let target = parse_target_vm(args.first(), None, "workflow.update")?;
571    let name = args
572        .get(1)
573        .map(|value| value.display())
574        .filter(|value| !value.is_empty())
575        .ok_or_else(|| VmError::Runtime("workflow.update: missing name".to_string()))?;
576    let payload = args
577        .get(2)
578        .map(crate::llm::vm_value_to_json)
579        .unwrap_or(serde_json::Value::Null);
580    let timeout_ms = args
581        .get(3)
582        .and_then(|value| value.as_dict())
583        .and_then(|dict| dict.get("timeout_ms"))
584        .and_then(VmValue::as_int)
585        .unwrap_or(DEFAULT_UPDATE_TIMEOUT_MS as i64)
586        .max(1) as u64;
587    let result = workflow_update_for_base(
588        &target.base_dir,
589        &target.workflow_id,
590        &name,
591        payload,
592        StdDuration::from_millis(timeout_ms),
593    )
594    .await
595    .map_err(VmError::Runtime)?;
596    Ok(crate::stdlib::json_to_vm_value(&result))
597}
598
599fn workflow_publish_query_builtin(args: &[VmValue], _out: &mut String) -> Result<VmValue, VmError> {
600    let target = parse_target_vm(args.first(), None, "workflow.publish_query")?;
601    let name = args
602        .get(1)
603        .map(|value| value.display())
604        .filter(|value| !value.is_empty())
605        .ok_or_else(|| VmError::Runtime("workflow.publish_query: missing name".to_string()))?;
606    let value = args
607        .get(2)
608        .map(crate::llm::vm_value_to_json)
609        .unwrap_or(serde_json::Value::Null);
610    let result =
611        workflow_publish_query_for_base(&target.base_dir, &target.workflow_id, &name, value)
612            .map_err(VmError::Runtime)?;
613    Ok(crate::stdlib::json_to_vm_value(&result))
614}
615
616fn workflow_receive_builtin(args: &[VmValue], _out: &mut String) -> Result<VmValue, VmError> {
617    let target = parse_target_vm(args.first(), None, "workflow.receive")?;
618    let mut state = load_state(&target).map_err(VmError::Runtime)?;
619    let Some(message) = state.mailbox.pop_front() else {
620        return Ok(VmValue::Nil);
621    };
622    save_state(&target, &state).map_err(VmError::Runtime)?;
623    Ok(crate::stdlib::json_to_vm_value(&serde_json::json!({
624        "workflow_id": target.workflow_id,
625        "seq": message.seq,
626        "kind": message.kind,
627        "name": message.name,
628        "request_id": message.request_id,
629        "payload": message.payload,
630        "enqueued_at": message.enqueued_at,
631    })))
632}
633
634fn workflow_respond_update_builtin(
635    args: &[VmValue],
636    _out: &mut String,
637) -> Result<VmValue, VmError> {
638    let target = parse_target_vm(args.first(), None, "workflow.respond_update")?;
639    let request_id = args
640        .get(1)
641        .map(|value| value.display())
642        .filter(|value| !value.is_empty())
643        .ok_or_else(|| {
644            VmError::Runtime("workflow.respond_update: missing request id".to_string())
645        })?;
646    let value = args
647        .get(2)
648        .map(crate::llm::vm_value_to_json)
649        .unwrap_or(serde_json::Value::Null);
650    let name = args
651        .get(3)
652        .map(|value| value.display())
653        .filter(|value| !value.is_empty());
654    let result = workflow_respond_update_for_base(
655        &target.base_dir,
656        &target.workflow_id,
657        &request_id,
658        name.as_deref(),
659        value,
660    )
661    .map_err(VmError::Runtime)?;
662    Ok(crate::stdlib::json_to_vm_value(&result))
663}
664
665fn workflow_pause_builtin(args: &[VmValue], _out: &mut String) -> Result<VmValue, VmError> {
666    let target = parse_target_vm(args.first(), None, "workflow.pause")?;
667    let result =
668        workflow_pause_for_base(&target.base_dir, &target.workflow_id).map_err(VmError::Runtime)?;
669    Ok(crate::stdlib::json_to_vm_value(&result))
670}
671
672fn workflow_resume_builtin(args: &[VmValue], _out: &mut String) -> Result<VmValue, VmError> {
673    let target = parse_target_vm(args.first(), None, "workflow.resume")?;
674    let result = workflow_resume_for_base(&target.base_dir, &target.workflow_id)
675        .map_err(VmError::Runtime)?;
676    Ok(crate::stdlib::json_to_vm_value(&result))
677}
678
679fn workflow_status_builtin(args: &[VmValue], _out: &mut String) -> Result<VmValue, VmError> {
680    let target = parse_target_vm(args.first(), None, "workflow.status")?;
681    let state = load_state(&target).map_err(VmError::Runtime)?;
682    Ok(crate::stdlib::json_to_vm_value(&workflow_status_json(
683        &target, &state,
684    )))
685}
686
687fn workflow_continue_as_new_builtin(
688    args: &[VmValue],
689    _out: &mut String,
690) -> Result<VmValue, VmError> {
691    continue_as_new_for_label(args, "workflow.continue_as_new")
692}
693
694fn continue_as_new_builtin(args: &[VmValue], _out: &mut String) -> Result<VmValue, VmError> {
695    continue_as_new_for_label(args, "continue_as_new")
696}
697
698fn continue_as_new_for_label(args: &[VmValue], label: &str) -> Result<VmValue, VmError> {
699    let target = parse_target_vm(args.first(), None, label)?;
700    let mut state = load_state(&target).map_err(VmError::Runtime)?;
701    state.generation += 1;
702    state.continue_as_new_count += 1;
703    state.last_continue_as_new_at = Some(now_rfc3339());
704    state.responses.clear();
705    save_state(&target, &state).map_err(VmError::Runtime)?;
706    Ok(crate::stdlib::json_to_vm_value(&workflow_status_json(
707        &target, &state,
708    )))
709}
710
#[cfg(test)]
mod tests {
    use super::*;

    /// An update call must stay pending until a response with its request id
    /// is written to the mailbox, and must then return that response's value.
    #[tokio::test(start_paused = true)]
    async fn update_round_trip_waits_for_response() {
        let dir = tempfile::tempdir().expect("tempdir");
        let workflow_id = "wf-update";
        let base_dir = dir.path().to_path_buf();
        let target = WorkflowTarget {
            workflow_id: workflow_id.to_string(),
            base_dir: base_dir.clone(),
        };

        // Caller side: issue the update in the background.
        let task = tokio::spawn(async move {
            workflow_update_for_base(
                &base_dir,
                workflow_id,
                "adjust_budget",
                serde_json::json!({"max_usd": 10}),
                StdDuration::from_millis(500),
            )
            .await
        });

        // Workflow side: yield until the queued update shows up, then take it.
        let mut pending = None;
        for _ in 0..100 {
            if let Ok(mut loaded) = load_state(&target) {
                if let Some(queued) = loaded.mailbox.pop_front() {
                    pending = Some((loaded, queued));
                    break;
                }
            }
            tokio::task::yield_now().await;
        }
        let (mut state, message) = pending.expect("load state with queued update");
        assert_eq!(message.kind, "update");

        // Answer the request and let the caller's poll timer fire.
        let request_id = message.request_id.expect("request id");
        let response = WorkflowUpdateResponseRecord {
            request_id: request_id.clone(),
            name: Some(message.name.clone()),
            value: serde_json::json!({"ok": true}),
            responded_at: now_rfc3339(),
        };
        state.responses.insert(request_id, response);
        save_state(&target, &state).expect("save response");
        tokio::time::advance(StdDuration::from_millis(UPDATE_POLL_INTERVAL_MS)).await;

        let result = task.await.expect("join").expect("update result");
        assert_eq!(result, serde_json::json!({"ok": true}));
    }

    /// A `persisted_path` inside `.harn-runs` resolves the base directory to
    /// the directory that contains `.harn-runs`.
    #[test]
    fn persisted_path_drives_target_base_dir() {
        let descriptor = serde_json::json!({
            "workflow_id": "wf",
            "persisted_path": "/tmp/demo/.harn-runs/run.json"
        });
        let target = parse_target_json(&descriptor, None).expect("target");
        assert_eq!(target.workflow_id, "wf");
        assert_eq!(target.base_dir, PathBuf::from("/tmp/demo"));
    }
}