Skip to main content

harn_vm/stdlib/
workflow_messages.rs

1use std::collections::{BTreeMap, VecDeque};
2use std::path::{Path, PathBuf};
3use std::rc::Rc;
4use std::time::Duration as StdDuration;
5
6use serde::{Deserialize, Serialize};
7
8use crate::stdlib::process::runtime_root_base;
9use crate::stdlib::registration::{
10    async_builtin, register_builtin_group, AsyncBuiltin, BuiltinGroup, SyncBuiltin,
11};
12use crate::value::{VmError, VmValue};
13use crate::vm::{Vm, VmBuiltinArity};
14
/// Milliseconds `workflow.update` waits for a response when the caller's
/// options do not provide `timeout_ms`.
const DEFAULT_UPDATE_TIMEOUT_MS: u64 = 30 * 1_000;
/// Milliseconds between successive checks for a recorded update response.
const UPDATE_POLL_INTERVAL_MS: u64 = 25;
17
18const WORKFLOW_MESSAGE_SYNC_PRIMITIVES: &[SyncBuiltin] = &[
19    SyncBuiltin::new("workflow.signal", workflow_signal_builtin)
20        .signature("workflow.signal(target, name, payload?)")
21        .arity(VmBuiltinArity::Range { min: 2, max: 3 })
22        .doc("Enqueue a workflow signal message."),
23    SyncBuiltin::new("workflow.query", workflow_query_builtin)
24        .signature("workflow.query(target, name)")
25        .arity(VmBuiltinArity::Exact(2))
26        .doc("Read the latest published workflow query value."),
27    SyncBuiltin::new("workflow.publish_query", workflow_publish_query_builtin)
28        .signature("workflow.publish_query(target, name, value?)")
29        .arity(VmBuiltinArity::Range { min: 2, max: 3 })
30        .doc("Publish a workflow query value."),
31    SyncBuiltin::new("workflow.receive", workflow_receive_builtin)
32        .signature("workflow.receive(target)")
33        .arity(VmBuiltinArity::Exact(1))
34        .doc("Receive the next workflow mailbox message."),
35    SyncBuiltin::new("workflow.respond_update", workflow_respond_update_builtin)
36        .signature("workflow.respond_update(target, request_id, value?, name?)")
37        .arity(VmBuiltinArity::Range { min: 2, max: 4 })
38        .doc("Respond to a pending workflow update request."),
39    SyncBuiltin::new("workflow.pause", workflow_pause_builtin)
40        .signature("workflow.pause(target)")
41        .arity(VmBuiltinArity::Exact(1))
42        .doc("Pause a workflow mailbox."),
43    SyncBuiltin::new("workflow.resume", workflow_resume_builtin)
44        .signature("workflow.resume(target)")
45        .arity(VmBuiltinArity::Exact(1))
46        .doc("Resume a workflow mailbox."),
47    SyncBuiltin::new("workflow.status", workflow_status_builtin)
48        .signature("workflow.status(target)")
49        .arity(VmBuiltinArity::Exact(1))
50        .doc("Return workflow mailbox status."),
51    SyncBuiltin::new("workflow.continue_as_new", workflow_continue_as_new_builtin)
52        .signature("workflow.continue_as_new(target)")
53        .arity(VmBuiltinArity::Exact(1))
54        .doc("Advance a workflow mailbox generation."),
55    SyncBuiltin::new("continue_as_new", continue_as_new_builtin)
56        .signature("continue_as_new(target)")
57        .arity(VmBuiltinArity::Exact(1))
58        .doc("Advance a workflow mailbox generation."),
59];
60
61const WORKFLOW_MESSAGE_ASYNC_PRIMITIVES: &[AsyncBuiltin] =
62    &[async_builtin!("workflow.update", workflow_update_builtin)
63        .signature("workflow.update(target, name, payload?, options?)")
64        .arity(VmBuiltinArity::Range { min: 2, max: 4 })
65        .doc("Enqueue a workflow update and wait for a response.")];
66
67const WORKFLOW_MESSAGE_PRIMITIVES: BuiltinGroup<'static> = BuiltinGroup::new()
68    .category("workflow.messages")
69    .sync(WORKFLOW_MESSAGE_SYNC_PRIMITIVES)
70    .async_(WORKFLOW_MESSAGE_ASYNC_PRIMITIVES);
71
72#[derive(Clone, Debug, Serialize, Deserialize)]
73pub struct WorkflowMessageRecord {
74    pub seq: u64,
75    pub kind: String,
76    pub name: String,
77    #[serde(skip_serializing_if = "Option::is_none")]
78    pub request_id: Option<String>,
79    pub payload: serde_json::Value,
80    pub enqueued_at: String,
81}
82
83#[derive(Clone, Debug, Serialize, Deserialize)]
84pub struct WorkflowQueryRecord {
85    pub value: serde_json::Value,
86    pub published_at: String,
87}
88
89#[derive(Clone, Debug, Serialize, Deserialize)]
90pub struct WorkflowUpdateResponseRecord {
91    pub request_id: String,
92    #[serde(skip_serializing_if = "Option::is_none")]
93    pub name: Option<String>,
94    pub value: serde_json::Value,
95    pub responded_at: String,
96}
97
98#[derive(Clone, Debug, Serialize, Deserialize)]
99pub struct WorkflowMailboxState {
100    #[serde(rename = "_type")]
101    pub type_name: String,
102    pub workflow_id: String,
103    #[serde(default = "default_generation")]
104    pub generation: u64,
105    #[serde(default)]
106    pub continue_as_new_count: u64,
107    #[serde(skip_serializing_if = "Option::is_none")]
108    pub last_continue_as_new_at: Option<String>,
109    #[serde(default)]
110    pub paused: bool,
111    #[serde(default)]
112    pub next_seq: u64,
113    #[serde(default)]
114    pub mailbox: VecDeque<WorkflowMessageRecord>,
115    #[serde(default)]
116    pub queries: BTreeMap<String, WorkflowQueryRecord>,
117    #[serde(default)]
118    pub responses: BTreeMap<String, WorkflowUpdateResponseRecord>,
119}
120
121impl Default for WorkflowMailboxState {
122    fn default() -> Self {
123        Self {
124            type_name: "workflow_mailbox".to_string(),
125            workflow_id: String::new(),
126            generation: default_generation(),
127            continue_as_new_count: 0,
128            last_continue_as_new_at: None,
129            paused: false,
130            next_seq: 0,
131            mailbox: VecDeque::new(),
132            queries: BTreeMap::new(),
133            responses: BTreeMap::new(),
134        }
135    }
136}
137
/// Generation number assigned to a mailbox whose generation is not recorded.
fn default_generation() -> u64 {
    const FIRST_GENERATION: u64 = 1;
    FIRST_GENERATION
}
141
/// Resolved address of a workflow mailbox: which workflow, and under which
/// base directory its state is stored.
#[derive(Debug, Clone, Eq, PartialEq)]
struct WorkflowTarget {
    /// Sanitized workflow identifier, used as a directory name.
    workflow_id: String,
    /// Base directory from which the workflow state directory is derived.
    base_dir: PathBuf,
}
147
/// Turns an arbitrary workflow id into a single safe path segment.
///
/// Surrounding whitespace is trimmed; ASCII letters, digits, `-`, `_`, and `.`
/// are kept; every other character becomes `_`. A result that is empty, `.`,
/// or `..` is replaced with `workflow`.
fn sanitize_workflow_id(raw: &str) -> String {
    let cleaned: String = raw
        .trim()
        .chars()
        .map(|symbol| match symbol {
            '-' | '_' | '.' => symbol,
            other if other.is_ascii_alphanumeric() => other,
            _ => '_',
        })
        .collect();
    match cleaned.as_str() {
        "" | "." | ".." => String::from("workflow"),
        _ => cleaned,
    }
}
164
165fn workflow_base_dir_from_persisted_path(path: &Path) -> PathBuf {
166    for ancestor in path.ancestors() {
167        if ancestor.file_name().and_then(|value| value.to_str()) == Some(".harn-runs") {
168            return non_empty_path_or_dot(ancestor.parent().unwrap_or_else(|| Path::new(".")));
169        }
170    }
171    non_empty_path_or_dot(path.parent().unwrap_or_else(|| Path::new(".")))
172}
173
/// Returns an owned copy of `path`, substituting `.` when `path` is empty.
fn non_empty_path_or_dot(path: &Path) -> PathBuf {
    let is_blank = path.as_os_str().is_empty();
    let chosen = if is_blank { Path::new(".") } else { path };
    chosen.to_path_buf()
}
181
182fn workflow_target_root(target: &WorkflowTarget) -> PathBuf {
183    crate::runtime_paths::workflow_dir(&target.base_dir).join(&target.workflow_id)
184}
185
186fn workflow_state_path(target: &WorkflowTarget) -> PathBuf {
187    workflow_target_root(target).join("state.json")
188}
189
190fn now_rfc3339() -> String {
191    time::OffsetDateTime::now_utc()
192        .format(&time::format_description::well_known::Rfc3339)
193        .unwrap_or_else(|_| uuid::Uuid::now_v7().to_string())
194}
195
196fn load_state(target: &WorkflowTarget) -> Result<WorkflowMailboxState, String> {
197    let path = workflow_state_path(target);
198    if !path.exists() {
199        return Ok(WorkflowMailboxState {
200            workflow_id: target.workflow_id.clone(),
201            ..WorkflowMailboxState::default()
202        });
203    }
204    let text = std::fs::read_to_string(&path)
205        .map_err(|error| format!("workflow state read error: {error}"))?;
206    let mut state: WorkflowMailboxState = serde_json::from_str(&text)
207        .map_err(|error| format!("workflow state parse error: {error}"))?;
208    if state.type_name.is_empty() {
209        state.type_name = "workflow_mailbox".to_string();
210    }
211    if state.workflow_id.is_empty() {
212        state.workflow_id = target.workflow_id.clone();
213    }
214    if state.generation == 0 {
215        state.generation = 1;
216    }
217    Ok(state)
218}
219
220fn save_state(target: &WorkflowTarget, state: &WorkflowMailboxState) -> Result<(), String> {
221    let path = workflow_state_path(target);
222    let json = serde_json::to_string_pretty(state)
223        .map_err(|error| format!("workflow state encode error: {error}"))?;
224    crate::atomic_io::atomic_write(&path, json.as_bytes())
225        .map_err(|error| format!("workflow state write error: {error}"))
226}
227
228fn parse_target_json(
229    value: &serde_json::Value,
230    fallback_base_dir: Option<&Path>,
231) -> Option<WorkflowTarget> {
232    match value {
233        serde_json::Value::String(text) => Some(WorkflowTarget {
234            workflow_id: sanitize_workflow_id(text),
235            base_dir: fallback_base_dir
236                .map(Path::to_path_buf)
237                .unwrap_or_else(runtime_root_base),
238        }),
239        serde_json::Value::Object(map) => {
240            let workflow_id = map
241                .get("workflow_id")
242                .and_then(|value| value.as_str())
243                .or_else(|| map.get("workflow").and_then(|value| value.as_str()))
244                .or_else(|| {
245                    map.get("run")
246                        .and_then(|value| value.get("workflow_id"))
247                        .and_then(|value| value.as_str())
248                })
249                .or_else(|| {
250                    map.get("result")
251                        .and_then(|value| value.get("run"))
252                        .and_then(|value| value.get("workflow_id"))
253                        .and_then(|value| value.as_str())
254                })?;
255            let explicit_base = map
256                .get("base_dir")
257                .and_then(|value| value.as_str())
258                .filter(|value| !value.trim().is_empty())
259                .map(PathBuf::from);
260            let persisted_path = map
261                .get("persisted_path")
262                .and_then(|value| value.as_str())
263                .or_else(|| map.get("path").and_then(|value| value.as_str()))
264                .or_else(|| {
265                    map.get("run")
266                        .and_then(|value| value.get("persisted_path"))
267                        .and_then(|value| value.as_str())
268                })
269                .or_else(|| {
270                    map.get("result")
271                        .and_then(|value| value.get("run"))
272                        .and_then(|value| value.get("persisted_path"))
273                        .and_then(|value| value.as_str())
274                });
275            let base_dir = explicit_base
276                .or_else(|| {
277                    persisted_path
278                        .map(|path| workflow_base_dir_from_persisted_path(Path::new(path)))
279                })
280                .or_else(|| fallback_base_dir.map(Path::to_path_buf))
281                .unwrap_or_else(runtime_root_base);
282            Some(WorkflowTarget {
283                workflow_id: sanitize_workflow_id(workflow_id),
284                base_dir,
285            })
286        }
287        _ => None,
288    }
289}
290
291fn parse_target_vm(
292    value: Option<&VmValue>,
293    fallback_base_dir: Option<&Path>,
294    builtin: &str,
295) -> Result<WorkflowTarget, VmError> {
296    let value = value.ok_or_else(|| VmError::Runtime(format!("{builtin}: missing target")))?;
297    parse_target_json(&crate::llm::vm_value_to_json(value), fallback_base_dir).ok_or_else(|| {
298        VmError::Runtime(format!(
299            "{builtin}: target must be a workflow id string or dict with workflow_id/workflow"
300        ))
301    })
302}
303
304fn workflow_status_json(
305    target: &WorkflowTarget,
306    state: &WorkflowMailboxState,
307) -> serde_json::Value {
308    serde_json::json!({
309        "workflow_id": target.workflow_id,
310        "base_dir": target.base_dir.to_string_lossy(),
311        "generation": state.generation,
312        "paused": state.paused,
313        "pending_count": state.mailbox.len(),
314        "query_count": state.queries.len(),
315        "response_count": state.responses.len(),
316        "continue_as_new_count": state.continue_as_new_count,
317        "last_continue_as_new_at": state.last_continue_as_new_at,
318    })
319}
320
321fn target_for_base(base_dir: &Path, workflow_id: &str) -> WorkflowTarget {
322    WorkflowTarget {
323        workflow_id: sanitize_workflow_id(workflow_id),
324        base_dir: base_dir.to_path_buf(),
325    }
326}
327
328fn enqueue_message(
329    target: &WorkflowTarget,
330    kind: &str,
331    name: &str,
332    payload: serde_json::Value,
333    request_id: Option<String>,
334) -> Result<serde_json::Value, String> {
335    let mut state = load_state(target)?;
336    let message = push_message(&mut state, kind, name, payload, request_id);
337    save_state(target, &state)?;
338    Ok(serde_json::json!({
339        "workflow_id": target.workflow_id,
340        "message": message,
341        "status": workflow_status_json(target, &state),
342    }))
343}
344
345fn push_message(
346    state: &mut WorkflowMailboxState,
347    kind: &str,
348    name: &str,
349    payload: serde_json::Value,
350    request_id: Option<String>,
351) -> WorkflowMessageRecord {
352    state.next_seq += 1;
353    let message = WorkflowMessageRecord {
354        seq: state.next_seq,
355        kind: kind.to_string(),
356        name: name.to_string(),
357        request_id,
358        payload,
359        enqueued_at: now_rfc3339(),
360    };
361    state.mailbox.push_back(message.clone());
362    message
363}
364
365fn receive_message(target: &WorkflowTarget) -> Result<Option<WorkflowMessageRecord>, String> {
366    let mut state = load_state(target)?;
367    let message = state.mailbox.pop_front();
368    if message.is_some() {
369        save_state(target, &state)?;
370    }
371    Ok(message)
372}
373
374pub fn workflow_signal_for_base(
375    base_dir: &Path,
376    workflow_id: &str,
377    name: &str,
378    payload: serde_json::Value,
379) -> Result<serde_json::Value, String> {
380    let target = target_for_base(base_dir, workflow_id);
381    enqueue_message(&target, "signal", name, payload, None)
382}
383
384pub fn workflow_query_for_base(
385    base_dir: &Path,
386    workflow_id: &str,
387    name: &str,
388) -> Result<serde_json::Value, String> {
389    let target = target_for_base(base_dir, workflow_id);
390    let state = load_state(&target)?;
391    Ok(state
392        .queries
393        .get(name)
394        .map(|record| record.value.clone())
395        .unwrap_or(serde_json::Value::Null))
396}
397
398pub fn workflow_publish_query_for_base(
399    base_dir: &Path,
400    workflow_id: &str,
401    name: &str,
402    value: serde_json::Value,
403) -> Result<serde_json::Value, String> {
404    let target = target_for_base(base_dir, workflow_id);
405    let mut state = load_state(&target)?;
406    state.queries.insert(
407        name.to_string(),
408        WorkflowQueryRecord {
409            value,
410            published_at: now_rfc3339(),
411        },
412    );
413    save_state(&target, &state)?;
414    Ok(workflow_status_json(&target, &state))
415}
416
417pub fn workflow_pause_for_base(
418    base_dir: &Path,
419    workflow_id: &str,
420) -> Result<serde_json::Value, String> {
421    let target = target_for_base(base_dir, workflow_id);
422    let mut state = load_state(&target)?;
423    state.paused = true;
424    push_message(&mut state, "control", "pause", serde_json::json!({}), None);
425    save_state(&target, &state)?;
426    Ok(workflow_status_json(&target, &state))
427}
428
429pub fn workflow_resume_for_base(
430    base_dir: &Path,
431    workflow_id: &str,
432) -> Result<serde_json::Value, String> {
433    let target = target_for_base(base_dir, workflow_id);
434    let mut state = load_state(&target)?;
435    state.paused = false;
436    push_message(&mut state, "control", "resume", serde_json::json!({}), None);
437    save_state(&target, &state)?;
438    Ok(workflow_status_json(&target, &state))
439}
440
441pub async fn workflow_update_for_base(
442    base_dir: &Path,
443    workflow_id: &str,
444    name: &str,
445    payload: serde_json::Value,
446    timeout: StdDuration,
447) -> Result<serde_json::Value, String> {
448    let target = target_for_base(base_dir, workflow_id);
449    let request_id = enqueue_update_request(&target, name, payload)?;
450    wait_for_update_response(&target, name, &request_id, timeout).await
451}
452
453fn enqueue_update_request(
454    target: &WorkflowTarget,
455    name: &str,
456    payload: serde_json::Value,
457) -> Result<String, String> {
458    let request_id = uuid::Uuid::now_v7().to_string();
459    enqueue_message(target, "update", name, payload, Some(request_id.clone()))?;
460    Ok(request_id)
461}
462
463async fn wait_for_update_response(
464    target: &WorkflowTarget,
465    name: &str,
466    request_id: &str,
467    timeout: StdDuration,
468) -> Result<serde_json::Value, String> {
469    let deadline = tokio::time::Instant::now() + timeout;
470    loop {
471        match update_response_value(target, request_id) {
472            Ok(Some(value)) => return Ok(value),
473            Ok(None) => {}
474            Err(error) => return Err(error),
475        }
476
477        let now = tokio::time::Instant::now();
478        if now >= deadline {
479            break;
480        }
481        let next_poll = now + StdDuration::from_millis(UPDATE_POLL_INTERVAL_MS);
482        tokio::time::sleep_until(next_poll.min(deadline)).await;
483    }
484    Err(format!(
485        "workflow update '{name}' timed out for '{}'",
486        target.workflow_id
487    ))
488}
489
490fn update_response_value(
491    target: &WorkflowTarget,
492    request_id: &str,
493) -> Result<Option<serde_json::Value>, String> {
494    Ok(load_state(target)?
495        .responses
496        .get(request_id)
497        .map(|response| response.value.clone()))
498}
499
500pub fn workflow_respond_update_for_base(
501    base_dir: &Path,
502    workflow_id: &str,
503    request_id: &str,
504    name: Option<&str>,
505    value: serde_json::Value,
506) -> Result<serde_json::Value, String> {
507    let target = target_for_base(base_dir, workflow_id);
508    record_update_response(&target, request_id, name, value)
509}
510
511fn record_update_response(
512    target: &WorkflowTarget,
513    request_id: &str,
514    name: Option<&str>,
515    value: serde_json::Value,
516) -> Result<serde_json::Value, String> {
517    let mut state = load_state(target)?;
518    state.responses.insert(
519        request_id.to_string(),
520        WorkflowUpdateResponseRecord {
521            request_id: request_id.to_string(),
522            name: name.map(ToString::to_string),
523            value,
524            responded_at: now_rfc3339(),
525        },
526    );
527    save_state(target, &state)?;
528    Ok(workflow_status_json(target, &state))
529}
530
531pub(crate) fn register_workflow_message_builtins(vm: &mut Vm) {
532    vm.set_global(
533        "workflow",
534        VmValue::Dict(Rc::new(BTreeMap::from([
535            (
536                "signal".to_string(),
537                VmValue::BuiltinRef(Rc::from("workflow.signal")),
538            ),
539            (
540                "query".to_string(),
541                VmValue::BuiltinRef(Rc::from("workflow.query")),
542            ),
543            (
544                "update".to_string(),
545                VmValue::BuiltinRef(Rc::from("workflow.update")),
546            ),
547            (
548                "publish_query".to_string(),
549                VmValue::BuiltinRef(Rc::from("workflow.publish_query")),
550            ),
551            (
552                "receive".to_string(),
553                VmValue::BuiltinRef(Rc::from("workflow.receive")),
554            ),
555            (
556                "respond_update".to_string(),
557                VmValue::BuiltinRef(Rc::from("workflow.respond_update")),
558            ),
559            (
560                "pause".to_string(),
561                VmValue::BuiltinRef(Rc::from("workflow.pause")),
562            ),
563            (
564                "resume".to_string(),
565                VmValue::BuiltinRef(Rc::from("workflow.resume")),
566            ),
567            (
568                "status".to_string(),
569                VmValue::BuiltinRef(Rc::from("workflow.status")),
570            ),
571            (
572                "continue_as_new".to_string(),
573                VmValue::BuiltinRef(Rc::from("workflow.continue_as_new")),
574            ),
575        ]))),
576    );
577
578    register_builtin_group(vm, WORKFLOW_MESSAGE_PRIMITIVES);
579}
580
581fn workflow_signal_builtin(args: &[VmValue], _out: &mut String) -> Result<VmValue, VmError> {
582    let target = parse_target_vm(args.first(), None, "workflow.signal")?;
583    let name = args
584        .get(1)
585        .map(|value| value.display())
586        .filter(|value| !value.is_empty())
587        .ok_or_else(|| VmError::Runtime("workflow.signal: missing name".to_string()))?;
588    let payload = args
589        .get(2)
590        .map(crate::llm::vm_value_to_json)
591        .unwrap_or(serde_json::Value::Null);
592    let result =
593        enqueue_message(&target, "signal", &name, payload, None).map_err(VmError::Runtime)?;
594    Ok(crate::stdlib::json_to_vm_value(&result))
595}
596
597fn workflow_query_builtin(args: &[VmValue], _out: &mut String) -> Result<VmValue, VmError> {
598    let target = parse_target_vm(args.first(), None, "workflow.query")?;
599    let name = args
600        .get(1)
601        .map(|value| value.display())
602        .filter(|value| !value.is_empty())
603        .ok_or_else(|| VmError::Runtime("workflow.query: missing name".to_string()))?;
604    let state = load_state(&target).map_err(VmError::Runtime)?;
605    Ok(crate::stdlib::json_to_vm_value(
606        &state
607            .queries
608            .get(&name)
609            .map(|record| record.value.clone())
610            .unwrap_or(serde_json::Value::Null),
611    ))
612}
613
614async fn workflow_update_builtin(args: Vec<VmValue>) -> Result<VmValue, VmError> {
615    let target = parse_target_vm(args.first(), None, "workflow.update")?;
616    let name = args
617        .get(1)
618        .map(|value| value.display())
619        .filter(|value| !value.is_empty())
620        .ok_or_else(|| VmError::Runtime("workflow.update: missing name".to_string()))?;
621    let payload = args
622        .get(2)
623        .map(crate::llm::vm_value_to_json)
624        .unwrap_or(serde_json::Value::Null);
625    let timeout_ms = args
626        .get(3)
627        .and_then(|value| value.as_dict())
628        .and_then(|dict| dict.get("timeout_ms"))
629        .and_then(VmValue::as_int)
630        .unwrap_or(DEFAULT_UPDATE_TIMEOUT_MS as i64)
631        .max(1) as u64;
632    let result = workflow_update_for_base(
633        &target.base_dir,
634        &target.workflow_id,
635        &name,
636        payload,
637        StdDuration::from_millis(timeout_ms),
638    )
639    .await
640    .map_err(VmError::Runtime)?;
641    Ok(crate::stdlib::json_to_vm_value(&result))
642}
643
644fn workflow_publish_query_builtin(args: &[VmValue], _out: &mut String) -> Result<VmValue, VmError> {
645    let target = parse_target_vm(args.first(), None, "workflow.publish_query")?;
646    let name = args
647        .get(1)
648        .map(|value| value.display())
649        .filter(|value| !value.is_empty())
650        .ok_or_else(|| VmError::Runtime("workflow.publish_query: missing name".to_string()))?;
651    let value = args
652        .get(2)
653        .map(crate::llm::vm_value_to_json)
654        .unwrap_or(serde_json::Value::Null);
655    let result =
656        workflow_publish_query_for_base(&target.base_dir, &target.workflow_id, &name, value)
657            .map_err(VmError::Runtime)?;
658    Ok(crate::stdlib::json_to_vm_value(&result))
659}
660
661fn workflow_receive_builtin(args: &[VmValue], _out: &mut String) -> Result<VmValue, VmError> {
662    let target = parse_target_vm(args.first(), None, "workflow.receive")?;
663    let Some(message) = receive_message(&target).map_err(VmError::Runtime)? else {
664        return Ok(VmValue::Nil);
665    };
666    Ok(crate::stdlib::json_to_vm_value(&serde_json::json!({
667        "workflow_id": target.workflow_id,
668        "seq": message.seq,
669        "kind": message.kind,
670        "name": message.name,
671        "request_id": message.request_id,
672        "payload": message.payload,
673        "enqueued_at": message.enqueued_at,
674    })))
675}
676
677fn workflow_respond_update_builtin(
678    args: &[VmValue],
679    _out: &mut String,
680) -> Result<VmValue, VmError> {
681    let target = parse_target_vm(args.first(), None, "workflow.respond_update")?;
682    let request_id = args
683        .get(1)
684        .map(|value| value.display())
685        .filter(|value| !value.is_empty())
686        .ok_or_else(|| {
687            VmError::Runtime("workflow.respond_update: missing request id".to_string())
688        })?;
689    let value = args
690        .get(2)
691        .map(crate::llm::vm_value_to_json)
692        .unwrap_or(serde_json::Value::Null);
693    let name = args
694        .get(3)
695        .map(|value| value.display())
696        .filter(|value| !value.is_empty());
697    let result = workflow_respond_update_for_base(
698        &target.base_dir,
699        &target.workflow_id,
700        &request_id,
701        name.as_deref(),
702        value,
703    )
704    .map_err(VmError::Runtime)?;
705    Ok(crate::stdlib::json_to_vm_value(&result))
706}
707
708fn workflow_pause_builtin(args: &[VmValue], _out: &mut String) -> Result<VmValue, VmError> {
709    let target = parse_target_vm(args.first(), None, "workflow.pause")?;
710    let result =
711        workflow_pause_for_base(&target.base_dir, &target.workflow_id).map_err(VmError::Runtime)?;
712    Ok(crate::stdlib::json_to_vm_value(&result))
713}
714
715fn workflow_resume_builtin(args: &[VmValue], _out: &mut String) -> Result<VmValue, VmError> {
716    let target = parse_target_vm(args.first(), None, "workflow.resume")?;
717    let result = workflow_resume_for_base(&target.base_dir, &target.workflow_id)
718        .map_err(VmError::Runtime)?;
719    Ok(crate::stdlib::json_to_vm_value(&result))
720}
721
722fn workflow_status_builtin(args: &[VmValue], _out: &mut String) -> Result<VmValue, VmError> {
723    let target = parse_target_vm(args.first(), None, "workflow.status")?;
724    let state = load_state(&target).map_err(VmError::Runtime)?;
725    Ok(crate::stdlib::json_to_vm_value(&workflow_status_json(
726        &target, &state,
727    )))
728}
729
730fn workflow_continue_as_new_builtin(
731    args: &[VmValue],
732    _out: &mut String,
733) -> Result<VmValue, VmError> {
734    continue_as_new_for_label(args, "workflow.continue_as_new")
735}
736
737fn continue_as_new_builtin(args: &[VmValue], _out: &mut String) -> Result<VmValue, VmError> {
738    continue_as_new_for_label(args, "continue_as_new")
739}
740
741fn continue_as_new_for_label(args: &[VmValue], label: &str) -> Result<VmValue, VmError> {
742    let target = parse_target_vm(args.first(), None, label)?;
743    let mut state = load_state(&target).map_err(VmError::Runtime)?;
744    state.generation += 1;
745    state.continue_as_new_count += 1;
746    state.last_continue_as_new_at = Some(now_rfc3339());
747    state.responses.clear();
748    save_state(&target, &state).map_err(VmError::Runtime)?;
749    Ok(crate::stdlib::json_to_vm_value(&workflow_status_json(
750        &target, &state,
751    )))
752}
753
#[cfg(test)]
mod tests {
    use super::*;
    use std::task::Poll;

    /// An update request is queued, delivered, and its waiter resolves once a
    /// response has been recorded and the poll interval has elapsed.
    #[tokio::test(start_paused = true)]
    async fn update_round_trip_waits_for_response() {
        let scratch = tempfile::tempdir().expect("tempdir");
        let workflow_id = "wf-update";
        let base_dir = scratch.path().to_path_buf();
        let target = target_for_base(&base_dir, workflow_id);
        let payload = serde_json::json!({"max_usd": 10});
        let request_id =
            enqueue_update_request(&target, "adjust_budget", payload).expect("enqueue update");

        let queued = receive_message(&target)
            .expect("receive queued update")
            .expect("queued update");
        assert_eq!(queued.kind, "update");
        assert_eq!(queued.name, "adjust_budget");
        assert_eq!(queued.request_id.as_deref(), Some(request_id.as_str()));
        let before_response = update_response_value(&target, &request_id).expect("read response");
        assert_eq!(before_response, None);

        let waiter = wait_for_update_response(
            &target,
            "adjust_budget",
            &request_id,
            StdDuration::from_millis(500),
        );
        tokio::pin!(waiter);
        assert!(matches!(futures::poll!(&mut waiter), Poll::Pending));

        let answer = serde_json::json!({"ok": true});
        workflow_respond_update_for_base(
            &base_dir,
            workflow_id,
            &request_id,
            Some("adjust_budget"),
            answer.clone(),
        )
        .expect("save response");
        let after_response = update_response_value(&target, &request_id).expect("read response");
        assert_eq!(after_response, Some(answer.clone()));
        tokio::time::advance(StdDuration::from_millis(UPDATE_POLL_INTERVAL_MS)).await;

        let result = waiter.await.expect("update result");
        assert_eq!(result, answer);
    }

    /// A timeout shorter than the poll interval still fires at the deadline.
    #[tokio::test(start_paused = true)]
    async fn update_wait_respects_short_timeout() {
        let scratch = tempfile::tempdir().expect("tempdir");
        let target = target_for_base(scratch.path(), "wf-timeout");
        let payload = serde_json::json!({"max_usd": 10});
        let request_id =
            enqueue_update_request(&target, "adjust_budget", payload).expect("enqueue update");

        let waiter = wait_for_update_response(
            &target,
            "adjust_budget",
            &request_id,
            StdDuration::from_millis(10),
        );
        tokio::pin!(waiter);
        assert!(matches!(futures::poll!(&mut waiter), Poll::Pending));

        // One millisecond short of the deadline: still waiting.
        tokio::time::advance(StdDuration::from_millis(9)).await;
        assert!(matches!(futures::poll!(&mut waiter), Poll::Pending));

        tokio::time::advance(StdDuration::from_millis(1)).await;
        let failure = waiter.await.expect_err("update should time out");
        assert!(failure.contains("timed out"));
    }

    /// A corrupt state file fails the wait immediately instead of timing out.
    #[tokio::test]
    async fn update_wait_propagates_state_errors() {
        let scratch = tempfile::tempdir().expect("tempdir");
        let target = target_for_base(scratch.path(), "wf-corrupt");
        std::fs::create_dir_all(workflow_target_root(&target)).expect("state dir");
        std::fs::write(workflow_state_path(&target), "{not json").expect("state write");

        let outcome = wait_for_update_response(
            &target,
            "adjust_budget",
            "request-1",
            StdDuration::from_millis(10),
        )
        .await;
        let failure = outcome.expect_err("corrupt state should fail immediately");
        assert!(failure.contains("workflow state parse error"));
    }

    /// Sanitized ids keep their namespace text but can never be path segments
    /// that escape the workflow directory.
    #[test]
    fn workflow_ids_preserve_namespace_without_path_segments() {
        let namespaced = sanitize_workflow_id("workflow://local/start-my-day");
        assert_eq!(namespaced, "workflow___local_start-my-day");
        let traversal = sanitize_workflow_id("../start-my-day");
        assert_eq!(traversal, ".._start-my-day");
        let parent_only = sanitize_workflow_id("..");
        assert_eq!(parent_only, "workflow");
    }

    /// The directory containing `.harn-runs` becomes the target base dir.
    #[test]
    fn persisted_path_drives_target_base_dir() {
        let descriptor = serde_json::json!({
            "workflow_id": "wf",
            "persisted_path": "/tmp/demo/.harn-runs/run.json"
        });
        let resolved = parse_target_json(&descriptor, None).expect("target");
        assert_eq!(resolved.workflow_id, "wf");
        assert_eq!(resolved.base_dir, PathBuf::from("/tmp/demo"));
    }

    /// Nested and relative persisted paths resolve to the `.harn-runs` parent.
    #[test]
    fn nested_persisted_path_drives_target_base_dir() {
        let absolute_descriptor = serde_json::json!({
            "workflow_id": "wf",
            "persisted_path": "/tmp/demo/.harn-runs/session/run.json"
        });
        let absolute = parse_target_json(&absolute_descriptor, None).expect("target");
        assert_eq!(absolute.base_dir, PathBuf::from("/tmp/demo"));

        let relative_descriptor = serde_json::json!({
            "workflow_id": "wf",
            "persisted_path": ".harn-runs/session/run.json"
        });
        let relative = parse_target_json(&relative_descriptor, None).expect("target");
        assert_eq!(relative.base_dir, PathBuf::from("."));
    }
}