// harn_vm/stdlib/workflow_messages.rs

1use std::collections::{BTreeMap, VecDeque};
2use std::path::{Path, PathBuf};
3use std::time::Duration as StdDuration;
4
5use serde::{Deserialize, Serialize};
6
7use crate::stdlib::macros::{harn_builtin, VmBuiltinDef};
8use crate::stdlib::process::runtime_root_base;
9use crate::value::{VmError, VmValue};
10use crate::vm::Vm;
11
/// Milliseconds `workflow.update` waits for a response when the caller's options do not
/// carry an integer `timeout_ms`.
const DEFAULT_UPDATE_TIMEOUT_MS: u64 = 30 * 1_000;
/// Milliseconds between successive reads of the state file while waiting for an update
/// response.
const UPDATE_POLL_INTERVAL_MS: u64 = 25;
14
/// Builtin definitions contributed by this module.
///
/// `register_workflow_message_builtins` registers every entry with the VM, in this order.
// NOTE(review): the `*_BUILTIN_DEF` items are presumably generated by `#[harn_builtin]`
// on the functions further down — confirm against the macro.
pub(crate) const MODULE_BUILTINS: &[&VmBuiltinDef] = &[
    &WORKFLOW_SIGNAL_BUILTIN_DEF,
    &WORKFLOW_QUERY_BUILTIN_DEF,
    &WORKFLOW_PUBLISH_QUERY_BUILTIN_DEF,
    &WORKFLOW_RECEIVE_BUILTIN_DEF,
    &WORKFLOW_RESPOND_UPDATE_BUILTIN_DEF,
    &WORKFLOW_PAUSE_BUILTIN_DEF,
    &WORKFLOW_RESUME_BUILTIN_DEF,
    &WORKFLOW_STATUS_BUILTIN_DEF,
    &WORKFLOW_CONTINUE_AS_NEW_BUILTIN_DEF,
    &CONTINUE_AS_NEW_BUILTIN_DEF,
    &WORKFLOW_UPDATE_BUILTIN_DEF,
];
28
/// One message queued in a workflow mailbox, as persisted in the state file.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct WorkflowMessageRecord {
    /// Sequence number taken from the mailbox's `next_seq` counter when the message was pushed.
    pub seq: u64,
    /// Message category; this module pushes `"signal"`, `"update"`, and `"control"`.
    pub kind: String,
    /// Caller-chosen message name (for control messages: `"pause"` or `"resume"`).
    pub name: String,
    /// Correlation id for update requests; omitted from the serialized form when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub request_id: Option<String>,
    /// Arbitrary JSON payload attached by the sender.
    pub payload: serde_json::Value,
    /// Timestamp string produced by `now_rfc3339` at enqueue time.
    pub enqueued_at: String,
}
39
/// A published query value stored under its query name in the mailbox state.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct WorkflowQueryRecord {
    /// The JSON value most recently published for this query name.
    pub value: serde_json::Value,
    /// Timestamp string produced by `now_rfc3339` when the value was published.
    pub published_at: String,
}
45
/// A recorded response to an update request, keyed by request id in the mailbox state.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct WorkflowUpdateResponseRecord {
    /// Id of the update request this response answers.
    pub request_id: String,
    /// Optional update name supplied by the responder; omitted from the serialized form
    /// when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub name: Option<String>,
    /// The JSON value handed back to the waiting `workflow.update` caller.
    pub value: serde_json::Value,
    /// Timestamp string produced by `now_rfc3339` when the response was recorded.
    pub responded_at: String,
}
54
/// The complete persisted state of one workflow mailbox (the contents of `state.json`).
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct WorkflowMailboxState {
    /// Record type tag, serialized as `_type`; `"workflow_mailbox"` for states created here.
    #[serde(rename = "_type")]
    pub type_name: String,
    /// Id of the workflow that owns this mailbox.
    pub workflow_id: String,
    /// Generation counter, bumped by `continue_as_new`; defaults to 1 when absent.
    #[serde(default = "default_generation")]
    pub generation: u64,
    /// Number of times `continue_as_new` has been applied to this mailbox.
    #[serde(default)]
    pub continue_as_new_count: u64,
    /// Timestamp of the latest `continue_as_new`; omitted from the serialized form when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub last_continue_as_new_at: Option<String>,
    /// Pause flag, set by the pause operation and cleared by resume.
    #[serde(default)]
    pub paused: bool,
    /// Sequence number of the most recently pushed message (0 before the first push).
    #[serde(default)]
    pub next_seq: u64,
    /// Pending messages in FIFO order: pushed at the back, received from the front.
    #[serde(default)]
    pub mailbox: VecDeque<WorkflowMessageRecord>,
    /// Latest published value per query name.
    #[serde(default)]
    pub queries: BTreeMap<String, WorkflowQueryRecord>,
    /// Recorded update responses keyed by request id; cleared by `continue_as_new`.
    #[serde(default)]
    pub responses: BTreeMap<String, WorkflowUpdateResponseRecord>,
}
77
78impl Default for WorkflowMailboxState {
79    fn default() -> Self {
80        Self {
81            type_name: "workflow_mailbox".to_string(),
82            workflow_id: String::new(),
83            generation: default_generation(),
84            continue_as_new_count: 0,
85            last_continue_as_new_at: None,
86            paused: false,
87            next_seq: 0,
88            mailbox: VecDeque::new(),
89            queries: BTreeMap::new(),
90            responses: BTreeMap::new(),
91        }
92    }
93}
94
/// Generation assigned to a new mailbox; also the serde default for
/// `WorkflowMailboxState::generation` when the field is missing from the state file.
fn default_generation() -> u64 {
    const FIRST_GENERATION: u64 = 1;
    FIRST_GENERATION
}
98
/// Identifies one workflow mailbox on disk: which workflow, under which base directory.
#[derive(Clone, Debug, PartialEq, Eq)]
struct WorkflowTarget {
    /// Workflow id; the constructors in this file pass it through `sanitize_workflow_id`.
    workflow_id: String,
    /// Base directory handed to `crate::runtime_paths::workflow_dir` to locate the state.
    base_dir: PathBuf,
}
104
/// Converts an arbitrary workflow id into a string that is safe to use as a single path
/// component.
///
/// Surrounding whitespace is trimmed; every character other than an ASCII letter, ASCII
/// digit, `-`, `_`, or `.` becomes `_`. If the result is empty, `.`, or `..`, the literal
/// `"workflow"` is returned instead.
fn sanitize_workflow_id(raw: &str) -> String {
    let cleaned: String = raw
        .trim()
        .chars()
        .map(|c| match c {
            '-' | '_' | '.' => c,
            other if other.is_ascii_alphanumeric() => other,
            _ => '_',
        })
        .collect();

    // Empty and dot-only names would not name a distinct directory entry.
    if matches!(cleaned.as_str(), "" | "." | "..") {
        return "workflow".to_string();
    }
    cleaned
}
121
122fn workflow_base_dir_from_persisted_path(path: &Path) -> PathBuf {
123    for ancestor in path.ancestors() {
124        if ancestor.file_name().and_then(|value| value.to_str()) == Some(".harn-runs") {
125            return non_empty_path_or_dot(ancestor.parent().unwrap_or_else(|| Path::new(".")));
126        }
127    }
128    non_empty_path_or_dot(path.parent().unwrap_or_else(|| Path::new(".")))
129}
130
/// Returns an owned copy of `path`, substituting `.` when `path` is empty.
fn non_empty_path_or_dot(path: &Path) -> PathBuf {
    let chosen = if path.as_os_str().is_empty() {
        Path::new(".")
    } else {
        path
    };
    chosen.to_path_buf()
}
138
139fn workflow_target_root(target: &WorkflowTarget) -> PathBuf {
140    crate::runtime_paths::workflow_dir(&target.base_dir).join(&target.workflow_id)
141}
142
143fn workflow_state_path(target: &WorkflowTarget) -> PathBuf {
144    workflow_target_root(target).join("state.json")
145}
146
147fn now_rfc3339() -> String {
148    time::OffsetDateTime::now_utc()
149        .format(&time::format_description::well_known::Rfc3339)
150        .unwrap_or_else(|_| uuid::Uuid::now_v7().to_string())
151}
152
153fn load_state(target: &WorkflowTarget) -> Result<WorkflowMailboxState, String> {
154    let path = workflow_state_path(target);
155    if !path.exists() {
156        return Ok(WorkflowMailboxState {
157            workflow_id: target.workflow_id.clone(),
158            ..WorkflowMailboxState::default()
159        });
160    }
161    let text = std::fs::read_to_string(&path)
162        .map_err(|error| format!("workflow state read error: {error}"))?;
163    let mut state: WorkflowMailboxState = serde_json::from_str(&text)
164        .map_err(|error| format!("workflow state parse error: {error}"))?;
165    if state.type_name.is_empty() {
166        state.type_name = "workflow_mailbox".to_string();
167    }
168    if state.workflow_id.is_empty() {
169        state.workflow_id = target.workflow_id.clone();
170    }
171    if state.generation == 0 {
172        state.generation = 1;
173    }
174    Ok(state)
175}
176
177fn save_state(target: &WorkflowTarget, state: &WorkflowMailboxState) -> Result<(), String> {
178    let path = workflow_state_path(target);
179    let json = serde_json::to_string_pretty(state)
180        .map_err(|error| format!("workflow state encode error: {error}"))?;
181    crate::atomic_io::atomic_write(&path, json.as_bytes())
182        .map_err(|error| format!("workflow state write error: {error}"))
183}
184
185fn parse_target_json(
186    value: &serde_json::Value,
187    fallback_base_dir: Option<&Path>,
188) -> Option<WorkflowTarget> {
189    match value {
190        serde_json::Value::String(text) => Some(WorkflowTarget {
191            workflow_id: sanitize_workflow_id(text),
192            base_dir: fallback_base_dir
193                .map(Path::to_path_buf)
194                .unwrap_or_else(runtime_root_base),
195        }),
196        serde_json::Value::Object(map) => {
197            let workflow_id = map
198                .get("workflow_id")
199                .and_then(|value| value.as_str())
200                .or_else(|| map.get("workflow").and_then(|value| value.as_str()))
201                .or_else(|| {
202                    map.get("run")
203                        .and_then(|value| value.get("workflow_id"))
204                        .and_then(|value| value.as_str())
205                })
206                .or_else(|| {
207                    map.get("result")
208                        .and_then(|value| value.get("run"))
209                        .and_then(|value| value.get("workflow_id"))
210                        .and_then(|value| value.as_str())
211                })?;
212            let explicit_base = map
213                .get("base_dir")
214                .and_then(|value| value.as_str())
215                .filter(|value| !value.trim().is_empty())
216                .map(PathBuf::from);
217            let persisted_path = map
218                .get("persisted_path")
219                .and_then(|value| value.as_str())
220                .or_else(|| map.get("path").and_then(|value| value.as_str()))
221                .or_else(|| {
222                    map.get("run")
223                        .and_then(|value| value.get("persisted_path"))
224                        .and_then(|value| value.as_str())
225                })
226                .or_else(|| {
227                    map.get("result")
228                        .and_then(|value| value.get("run"))
229                        .and_then(|value| value.get("persisted_path"))
230                        .and_then(|value| value.as_str())
231                });
232            let base_dir = explicit_base
233                .or_else(|| {
234                    persisted_path
235                        .map(|path| workflow_base_dir_from_persisted_path(Path::new(path)))
236                })
237                .or_else(|| fallback_base_dir.map(Path::to_path_buf))
238                .unwrap_or_else(runtime_root_base);
239            Some(WorkflowTarget {
240                workflow_id: sanitize_workflow_id(workflow_id),
241                base_dir,
242            })
243        }
244        _ => None,
245    }
246}
247
248fn parse_target_vm(
249    value: Option<&VmValue>,
250    fallback_base_dir: Option<&Path>,
251    builtin: &str,
252) -> Result<WorkflowTarget, VmError> {
253    let value = value.ok_or_else(|| VmError::Runtime(format!("{builtin}: missing target")))?;
254    parse_target_json(&crate::llm::vm_value_to_json(value), fallback_base_dir).ok_or_else(|| {
255        VmError::Runtime(format!(
256            "{builtin}: target must be a workflow id string or dict with workflow_id/workflow"
257        ))
258    })
259}
260
261fn workflow_status_json(
262    target: &WorkflowTarget,
263    state: &WorkflowMailboxState,
264) -> serde_json::Value {
265    serde_json::json!({
266        "workflow_id": target.workflow_id,
267        "base_dir": target.base_dir.to_string_lossy(),
268        "generation": state.generation,
269        "paused": state.paused,
270        "pending_count": state.mailbox.len(),
271        "query_count": state.queries.len(),
272        "response_count": state.responses.len(),
273        "continue_as_new_count": state.continue_as_new_count,
274        "last_continue_as_new_at": state.last_continue_as_new_at,
275    })
276}
277
278fn target_for_base(base_dir: &Path, workflow_id: &str) -> WorkflowTarget {
279    WorkflowTarget {
280        workflow_id: sanitize_workflow_id(workflow_id),
281        base_dir: base_dir.to_path_buf(),
282    }
283}
284
285fn enqueue_message(
286    target: &WorkflowTarget,
287    kind: &str,
288    name: &str,
289    payload: serde_json::Value,
290    request_id: Option<String>,
291) -> Result<serde_json::Value, String> {
292    let mut state = load_state(target)?;
293    let message = push_message(&mut state, kind, name, payload, request_id);
294    save_state(target, &state)?;
295    Ok(serde_json::json!({
296        "workflow_id": target.workflow_id,
297        "message": message,
298        "status": workflow_status_json(target, &state),
299    }))
300}
301
302fn push_message(
303    state: &mut WorkflowMailboxState,
304    kind: &str,
305    name: &str,
306    payload: serde_json::Value,
307    request_id: Option<String>,
308) -> WorkflowMessageRecord {
309    state.next_seq += 1;
310    let message = WorkflowMessageRecord {
311        seq: state.next_seq,
312        kind: kind.to_string(),
313        name: name.to_string(),
314        request_id,
315        payload,
316        enqueued_at: now_rfc3339(),
317    };
318    state.mailbox.push_back(message.clone());
319    message
320}
321
322fn receive_message(target: &WorkflowTarget) -> Result<Option<WorkflowMessageRecord>, String> {
323    let mut state = load_state(target)?;
324    let message = state.mailbox.pop_front();
325    if message.is_some() {
326        save_state(target, &state)?;
327    }
328    Ok(message)
329}
330
331pub fn workflow_signal_for_base(
332    base_dir: &Path,
333    workflow_id: &str,
334    name: &str,
335    payload: serde_json::Value,
336) -> Result<serde_json::Value, String> {
337    let target = target_for_base(base_dir, workflow_id);
338    enqueue_message(&target, "signal", name, payload, None)
339}
340
341pub fn workflow_query_for_base(
342    base_dir: &Path,
343    workflow_id: &str,
344    name: &str,
345) -> Result<serde_json::Value, String> {
346    let target = target_for_base(base_dir, workflow_id);
347    let state = load_state(&target)?;
348    Ok(state
349        .queries
350        .get(name)
351        .map(|record| record.value.clone())
352        .unwrap_or(serde_json::Value::Null))
353}
354
355pub fn workflow_publish_query_for_base(
356    base_dir: &Path,
357    workflow_id: &str,
358    name: &str,
359    value: serde_json::Value,
360) -> Result<serde_json::Value, String> {
361    let target = target_for_base(base_dir, workflow_id);
362    let mut state = load_state(&target)?;
363    state.queries.insert(
364        name.to_string(),
365        WorkflowQueryRecord {
366            value,
367            published_at: now_rfc3339(),
368        },
369    );
370    save_state(&target, &state)?;
371    Ok(workflow_status_json(&target, &state))
372}
373
374pub fn workflow_pause_for_base(
375    base_dir: &Path,
376    workflow_id: &str,
377) -> Result<serde_json::Value, String> {
378    let target = target_for_base(base_dir, workflow_id);
379    let mut state = load_state(&target)?;
380    state.paused = true;
381    push_message(&mut state, "control", "pause", serde_json::json!({}), None);
382    save_state(&target, &state)?;
383    Ok(workflow_status_json(&target, &state))
384}
385
386pub fn workflow_resume_for_base(
387    base_dir: &Path,
388    workflow_id: &str,
389) -> Result<serde_json::Value, String> {
390    let target = target_for_base(base_dir, workflow_id);
391    let mut state = load_state(&target)?;
392    state.paused = false;
393    push_message(&mut state, "control", "resume", serde_json::json!({}), None);
394    save_state(&target, &state)?;
395    Ok(workflow_status_json(&target, &state))
396}
397
398pub async fn workflow_update_for_base(
399    base_dir: &Path,
400    workflow_id: &str,
401    name: &str,
402    payload: serde_json::Value,
403    timeout: StdDuration,
404) -> Result<serde_json::Value, String> {
405    let target = target_for_base(base_dir, workflow_id);
406    let request_id = enqueue_update_request(&target, name, payload)?;
407    wait_for_update_response(&target, name, &request_id, timeout).await
408}
409
410fn enqueue_update_request(
411    target: &WorkflowTarget,
412    name: &str,
413    payload: serde_json::Value,
414) -> Result<String, String> {
415    let request_id = uuid::Uuid::now_v7().to_string();
416    enqueue_message(target, "update", name, payload, Some(request_id.clone()))?;
417    Ok(request_id)
418}
419
420async fn wait_for_update_response(
421    target: &WorkflowTarget,
422    name: &str,
423    request_id: &str,
424    timeout: StdDuration,
425) -> Result<serde_json::Value, String> {
426    let deadline = tokio::time::Instant::now() + timeout;
427    loop {
428        match update_response_value(target, request_id) {
429            Ok(Some(value)) => return Ok(value),
430            Ok(None) => {}
431            Err(error) => return Err(error),
432        }
433
434        let now = tokio::time::Instant::now();
435        if now >= deadline {
436            break;
437        }
438        let next_poll = now + StdDuration::from_millis(UPDATE_POLL_INTERVAL_MS);
439        tokio::time::sleep_until(next_poll.min(deadline)).await;
440    }
441    Err(format!(
442        "workflow update '{name}' timed out for '{}'",
443        target.workflow_id
444    ))
445}
446
447fn update_response_value(
448    target: &WorkflowTarget,
449    request_id: &str,
450) -> Result<Option<serde_json::Value>, String> {
451    Ok(load_state(target)?
452        .responses
453        .get(request_id)
454        .map(|response| response.value.clone()))
455}
456
457pub fn workflow_respond_update_for_base(
458    base_dir: &Path,
459    workflow_id: &str,
460    request_id: &str,
461    name: Option<&str>,
462    value: serde_json::Value,
463) -> Result<serde_json::Value, String> {
464    let target = target_for_base(base_dir, workflow_id);
465    record_update_response(&target, request_id, name, value)
466}
467
468fn record_update_response(
469    target: &WorkflowTarget,
470    request_id: &str,
471    name: Option<&str>,
472    value: serde_json::Value,
473) -> Result<serde_json::Value, String> {
474    let mut state = load_state(target)?;
475    state.responses.insert(
476        request_id.to_string(),
477        WorkflowUpdateResponseRecord {
478            request_id: request_id.to_string(),
479            name: name.map(ToString::to_string),
480            value,
481            responded_at: now_rfc3339(),
482        },
483    );
484    save_state(target, &state)?;
485    Ok(workflow_status_json(target, &state))
486}
487
488pub(crate) fn register_workflow_message_builtins(vm: &mut Vm) {
489    vm.set_global(
490        "workflow",
491        VmValue::Dict(std::sync::Arc::new(BTreeMap::from([
492            (
493                "signal".to_string(),
494                VmValue::BuiltinRef(std::sync::Arc::from("workflow.signal")),
495            ),
496            (
497                "query".to_string(),
498                VmValue::BuiltinRef(std::sync::Arc::from("workflow.query")),
499            ),
500            (
501                "update".to_string(),
502                VmValue::BuiltinRef(std::sync::Arc::from("workflow.update")),
503            ),
504            (
505                "publish_query".to_string(),
506                VmValue::BuiltinRef(std::sync::Arc::from("workflow.publish_query")),
507            ),
508            (
509                "receive".to_string(),
510                VmValue::BuiltinRef(std::sync::Arc::from("workflow.receive")),
511            ),
512            (
513                "respond_update".to_string(),
514                VmValue::BuiltinRef(std::sync::Arc::from("workflow.respond_update")),
515            ),
516            (
517                "pause".to_string(),
518                VmValue::BuiltinRef(std::sync::Arc::from("workflow.pause")),
519            ),
520            (
521                "resume".to_string(),
522                VmValue::BuiltinRef(std::sync::Arc::from("workflow.resume")),
523            ),
524            (
525                "status".to_string(),
526                VmValue::BuiltinRef(std::sync::Arc::from("workflow.status")),
527            ),
528            (
529                "continue_as_new".to_string(),
530                VmValue::BuiltinRef(std::sync::Arc::from("workflow.continue_as_new")),
531            ),
532        ]))),
533    );
534
535    for def in MODULE_BUILTINS {
536        vm.register_builtin_def(def);
537    }
538}
539
540/// Enqueue a workflow signal message.
541#[harn_builtin(
542    sig = "workflow.signal(target: string|dict, name: string, payload?: any) -> dict",
543    category = "workflow.messages"
544)]
545fn workflow_signal_builtin(args: &[VmValue], _out: &mut String) -> Result<VmValue, VmError> {
546    let target = parse_target_vm(args.first(), None, "workflow.signal")?;
547    let name = args
548        .get(1)
549        .map(|value| value.display())
550        .filter(|value| !value.is_empty())
551        .ok_or_else(|| VmError::Runtime("workflow.signal: missing name".to_string()))?;
552    let payload = args
553        .get(2)
554        .map(crate::llm::vm_value_to_json)
555        .unwrap_or(serde_json::Value::Null);
556    let result =
557        enqueue_message(&target, "signal", &name, payload, None).map_err(VmError::Runtime)?;
558    Ok(crate::stdlib::json_to_vm_value(&result))
559}
560
561/// Read the latest published workflow query value.
562#[harn_builtin(
563    sig = "workflow.query(target: string|dict, name: string) -> any",
564    category = "workflow.messages"
565)]
566fn workflow_query_builtin(args: &[VmValue], _out: &mut String) -> Result<VmValue, VmError> {
567    let target = parse_target_vm(args.first(), None, "workflow.query")?;
568    let name = args
569        .get(1)
570        .map(|value| value.display())
571        .filter(|value| !value.is_empty())
572        .ok_or_else(|| VmError::Runtime("workflow.query: missing name".to_string()))?;
573    let state = load_state(&target).map_err(VmError::Runtime)?;
574    Ok(crate::stdlib::json_to_vm_value(
575        &state
576            .queries
577            .get(&name)
578            .map(|record| record.value.clone())
579            .unwrap_or(serde_json::Value::Null),
580    ))
581}
582
583/// Enqueue a workflow update and wait for a response.
584#[harn_builtin(
585    sig = "workflow.update(target: string|dict, name: string, payload?: any, options?: dict|nil) -> any",
586    kind = "async",
587    category = "workflow.messages"
588)]
589async fn workflow_update_builtin(
590    _ctx: crate::vm::AsyncBuiltinCtx,
591    args: Vec<VmValue>,
592) -> Result<VmValue, VmError> {
593    let target = parse_target_vm(args.first(), None, "workflow.update")?;
594    let name = args
595        .get(1)
596        .map(|value| value.display())
597        .filter(|value| !value.is_empty())
598        .ok_or_else(|| VmError::Runtime("workflow.update: missing name".to_string()))?;
599    let payload = args
600        .get(2)
601        .map(crate::llm::vm_value_to_json)
602        .unwrap_or(serde_json::Value::Null);
603    let timeout_ms = args
604        .get(3)
605        .and_then(|value| value.as_dict())
606        .and_then(|dict| dict.get("timeout_ms"))
607        .and_then(VmValue::as_int)
608        .unwrap_or(DEFAULT_UPDATE_TIMEOUT_MS as i64)
609        .max(1) as u64;
610    let result = workflow_update_for_base(
611        &target.base_dir,
612        &target.workflow_id,
613        &name,
614        payload,
615        StdDuration::from_millis(timeout_ms),
616    )
617    .await
618    .map_err(VmError::Runtime)?;
619    Ok(crate::stdlib::json_to_vm_value(&result))
620}
621
622/// Publish a workflow query value.
623#[harn_builtin(
624    sig = "workflow.publish_query(target: string|dict, name: string, value?: any) -> dict",
625    category = "workflow.messages"
626)]
627fn workflow_publish_query_builtin(args: &[VmValue], _out: &mut String) -> Result<VmValue, VmError> {
628    let target = parse_target_vm(args.first(), None, "workflow.publish_query")?;
629    let name = args
630        .get(1)
631        .map(|value| value.display())
632        .filter(|value| !value.is_empty())
633        .ok_or_else(|| VmError::Runtime("workflow.publish_query: missing name".to_string()))?;
634    let value = args
635        .get(2)
636        .map(crate::llm::vm_value_to_json)
637        .unwrap_or(serde_json::Value::Null);
638    let result =
639        workflow_publish_query_for_base(&target.base_dir, &target.workflow_id, &name, value)
640            .map_err(VmError::Runtime)?;
641    Ok(crate::stdlib::json_to_vm_value(&result))
642}
643
644/// Receive the next workflow mailbox message.
645#[harn_builtin(
646    sig = "workflow.receive(target: string|dict) -> dict|nil",
647    category = "workflow.messages"
648)]
649fn workflow_receive_builtin(args: &[VmValue], _out: &mut String) -> Result<VmValue, VmError> {
650    let target = parse_target_vm(args.first(), None, "workflow.receive")?;
651    let Some(message) = receive_message(&target).map_err(VmError::Runtime)? else {
652        return Ok(VmValue::Nil);
653    };
654    Ok(crate::stdlib::json_to_vm_value(&serde_json::json!({
655        "workflow_id": target.workflow_id,
656        "seq": message.seq,
657        "kind": message.kind,
658        "name": message.name,
659        "request_id": message.request_id,
660        "payload": message.payload,
661        "enqueued_at": message.enqueued_at,
662    })))
663}
664
665/// Respond to a pending workflow update request.
666#[harn_builtin(
667    sig = "workflow.respond_update(target: string|dict, request_id: string, value?: any, name?: string|nil) -> dict",
668    category = "workflow.messages"
669)]
670fn workflow_respond_update_builtin(
671    args: &[VmValue],
672    _out: &mut String,
673) -> Result<VmValue, VmError> {
674    let target = parse_target_vm(args.first(), None, "workflow.respond_update")?;
675    let request_id = args
676        .get(1)
677        .map(|value| value.display())
678        .filter(|value| !value.is_empty())
679        .ok_or_else(|| {
680            VmError::Runtime("workflow.respond_update: missing request id".to_string())
681        })?;
682    let value = args
683        .get(2)
684        .map(crate::llm::vm_value_to_json)
685        .unwrap_or(serde_json::Value::Null);
686    let name = args
687        .get(3)
688        .map(|value| value.display())
689        .filter(|value| !value.is_empty());
690    let result = workflow_respond_update_for_base(
691        &target.base_dir,
692        &target.workflow_id,
693        &request_id,
694        name.as_deref(),
695        value,
696    )
697    .map_err(VmError::Runtime)?;
698    Ok(crate::stdlib::json_to_vm_value(&result))
699}
700
701/// Pause a workflow mailbox.
702#[harn_builtin(
703    sig = "workflow.pause(target: string|dict) -> dict",
704    category = "workflow.messages"
705)]
706fn workflow_pause_builtin(args: &[VmValue], _out: &mut String) -> Result<VmValue, VmError> {
707    let target = parse_target_vm(args.first(), None, "workflow.pause")?;
708    let result =
709        workflow_pause_for_base(&target.base_dir, &target.workflow_id).map_err(VmError::Runtime)?;
710    Ok(crate::stdlib::json_to_vm_value(&result))
711}
712
713/// Resume a workflow mailbox.
714#[harn_builtin(
715    sig = "workflow.resume(target: string|dict) -> dict",
716    category = "workflow.messages"
717)]
718fn workflow_resume_builtin(args: &[VmValue], _out: &mut String) -> Result<VmValue, VmError> {
719    let target = parse_target_vm(args.first(), None, "workflow.resume")?;
720    let result = workflow_resume_for_base(&target.base_dir, &target.workflow_id)
721        .map_err(VmError::Runtime)?;
722    Ok(crate::stdlib::json_to_vm_value(&result))
723}
724
725/// Return workflow mailbox status.
726#[harn_builtin(
727    sig = "workflow.status(target: string|dict) -> dict",
728    category = "workflow.messages"
729)]
730fn workflow_status_builtin(args: &[VmValue], _out: &mut String) -> Result<VmValue, VmError> {
731    let target = parse_target_vm(args.first(), None, "workflow.status")?;
732    let state = load_state(&target).map_err(VmError::Runtime)?;
733    Ok(crate::stdlib::json_to_vm_value(&workflow_status_json(
734        &target, &state,
735    )))
736}
737
738/// Advance a workflow mailbox generation.
739#[harn_builtin(
740    sig = "workflow.continue_as_new(target: string|dict) -> dict",
741    category = "workflow.messages"
742)]
743fn workflow_continue_as_new_builtin(
744    args: &[VmValue],
745    _out: &mut String,
746) -> Result<VmValue, VmError> {
747    continue_as_new_for_label(args, "workflow.continue_as_new")
748}
749
750/// Advance a workflow mailbox generation (top-level alias).
751#[harn_builtin(
752    sig = "continue_as_new(target: string|dict) -> dict",
753    category = "workflow.messages"
754)]
755fn continue_as_new_builtin(args: &[VmValue], _out: &mut String) -> Result<VmValue, VmError> {
756    continue_as_new_for_label(args, "continue_as_new")
757}
758
759fn continue_as_new_for_label(args: &[VmValue], label: &str) -> Result<VmValue, VmError> {
760    let target = parse_target_vm(args.first(), None, label)?;
761    let mut state = load_state(&target).map_err(VmError::Runtime)?;
762    state.generation += 1;
763    state.continue_as_new_count += 1;
764    state.last_continue_as_new_at = Some(now_rfc3339());
765    state.responses.clear();
766    save_state(&target, &state).map_err(VmError::Runtime)?;
767    Ok(crate::stdlib::json_to_vm_value(&workflow_status_json(
768        &target, &state,
769    )))
770}
771
// Unit tests for the mailbox helpers: the update request/response round trip, timeout and
// error handling of the waiter, workflow id sanitization, and base-dir resolution.
#[cfg(test)]
mod tests {
    use super::*;
    use std::task::Poll;

    // Runs on tokio's paused clock, so time only moves via `tokio::time::advance`.
    #[tokio::test(start_paused = true)]
    async fn update_round_trip_waits_for_response() {
        let dir = tempfile::tempdir().expect("tempdir");
        let workflow_id = "wf-update";
        let base_dir = dir.path().to_path_buf();
        let target = target_for_base(&base_dir, workflow_id);
        let request_id =
            enqueue_update_request(&target, "adjust_budget", serde_json::json!({"max_usd": 10}))
                .expect("enqueue update");

        // The queued message carries the request id, and no response exists yet.
        let message = receive_message(&target)
            .expect("receive queued update")
            .expect("queued update");
        assert_eq!(message.kind, "update");
        assert_eq!(message.name, "adjust_budget");
        assert_eq!(message.request_id.as_deref(), Some(request_id.as_str()));
        assert_eq!(
            update_response_value(&target, &request_id).expect("read response"),
            None
        );

        // The first poll finds nothing and parks on the poll-interval sleep.
        let waiter = wait_for_update_response(
            &target,
            "adjust_budget",
            &request_id,
            StdDuration::from_millis(500),
        );
        tokio::pin!(waiter);
        assert!(matches!(futures::poll!(&mut waiter), Poll::Pending));

        workflow_respond_update_for_base(
            &base_dir,
            workflow_id,
            &request_id,
            Some("adjust_budget"),
            serde_json::json!({"ok": true}),
        )
        .expect("save response");
        assert_eq!(
            update_response_value(&target, &request_id).expect("read response"),
            Some(serde_json::json!({"ok": true}))
        );
        // Advance one poll interval so the waiter wakes and re-reads the state.
        tokio::time::advance(StdDuration::from_millis(UPDATE_POLL_INTERVAL_MS)).await;

        let result = waiter.await.expect("update result");
        assert_eq!(result, serde_json::json!({"ok": true}));
    }

    // The 10 ms timeout is shorter than the poll interval, so the sleep is capped at the
    // deadline.
    #[tokio::test(start_paused = true)]
    async fn update_wait_respects_short_timeout() {
        let dir = tempfile::tempdir().expect("tempdir");
        let target = target_for_base(dir.path(), "wf-timeout");
        let request_id =
            enqueue_update_request(&target, "adjust_budget", serde_json::json!({"max_usd": 10}))
                .expect("enqueue update");

        let waiter = wait_for_update_response(
            &target,
            "adjust_budget",
            &request_id,
            StdDuration::from_millis(10),
        );
        tokio::pin!(waiter);
        assert!(matches!(futures::poll!(&mut waiter), Poll::Pending));

        // Still pending 1 ms before the deadline.
        tokio::time::advance(StdDuration::from_millis(9)).await;
        assert!(matches!(futures::poll!(&mut waiter), Poll::Pending));

        // Reaching the deadline produces the timeout error.
        tokio::time::advance(StdDuration::from_millis(1)).await;
        let err = waiter.await.expect_err("update should time out");
        assert!(err.contains("timed out"));
    }

    // A corrupt state file must surface as an error on the first poll, not as a timeout.
    #[tokio::test]
    async fn update_wait_propagates_state_errors() {
        let dir = tempfile::tempdir().expect("tempdir");
        let target = target_for_base(dir.path(), "wf-corrupt");
        std::fs::create_dir_all(workflow_target_root(&target)).expect("state dir");
        std::fs::write(workflow_state_path(&target), "{not json").expect("state write");

        let err = wait_for_update_response(
            &target,
            "adjust_budget",
            "request-1",
            StdDuration::from_millis(10),
        )
        .await
        .expect_err("corrupt state should fail immediately");
        assert!(err.contains("workflow state parse error"));
    }

    // Path separators and other disallowed characters become `_`; a bare `..` is replaced
    // by the fallback name.
    #[test]
    fn workflow_ids_preserve_namespace_without_path_segments() {
        assert_eq!(
            sanitize_workflow_id("workflow://local/start-my-day"),
            "workflow___local_start-my-day"
        );
        assert_eq!(sanitize_workflow_id("../start-my-day"), ".._start-my-day");
        assert_eq!(sanitize_workflow_id(".."), "workflow");
    }

    // The base dir is the parent of the `.harn-runs` directory found in the persisted path.
    #[test]
    fn persisted_path_drives_target_base_dir() {
        let base = parse_target_json(
            &serde_json::json!({
                "workflow_id": "wf",
                "persisted_path": "/tmp/demo/.harn-runs/run.json"
            }),
            None,
        )
        .expect("target");
        assert_eq!(base.workflow_id, "wf");
        assert_eq!(base.base_dir, PathBuf::from("/tmp/demo"));
    }

    // `.harn-runs` is located even when the run file sits in a subdirectory; a relative
    // path whose `.harn-runs` has no parent resolves to `.`.
    #[test]
    fn nested_persisted_path_drives_target_base_dir() {
        let base = parse_target_json(
            &serde_json::json!({
                "workflow_id": "wf",
                "persisted_path": "/tmp/demo/.harn-runs/session/run.json"
            }),
            None,
        )
        .expect("target");
        assert_eq!(base.base_dir, PathBuf::from("/tmp/demo"));

        let relative = parse_target_json(
            &serde_json::json!({
                "workflow_id": "wf",
                "persisted_path": ".harn-runs/session/run.json"
            }),
            None,
        )
        .expect("target");
        assert_eq!(relative.base_dir, PathBuf::from("."));
    }
}
910        )
911        .expect("target");
912        assert_eq!(relative.base_dir, PathBuf::from("."));
913    }
914}