Skip to main content

harn_vm/stdlib/
workflow_messages.rs

1use std::collections::{BTreeMap, VecDeque};
2use std::path::{Path, PathBuf};
3use std::rc::Rc;
4use std::time::Duration as StdDuration;
5
6use serde::{Deserialize, Serialize};
7
8use crate::stdlib::macros::{harn_builtin, VmBuiltinDef};
9use crate::stdlib::process::runtime_root_base;
10use crate::value::{VmError, VmValue};
11use crate::vm::Vm;
12
13const DEFAULT_UPDATE_TIMEOUT_MS: u64 = 30_000;
14const UPDATE_POLL_INTERVAL_MS: u64 = 25;
15
16pub(crate) const MODULE_BUILTINS: &[&VmBuiltinDef] = &[
17    &WORKFLOW_SIGNAL_BUILTIN_DEF,
18    &WORKFLOW_QUERY_BUILTIN_DEF,
19    &WORKFLOW_PUBLISH_QUERY_BUILTIN_DEF,
20    &WORKFLOW_RECEIVE_BUILTIN_DEF,
21    &WORKFLOW_RESPOND_UPDATE_BUILTIN_DEF,
22    &WORKFLOW_PAUSE_BUILTIN_DEF,
23    &WORKFLOW_RESUME_BUILTIN_DEF,
24    &WORKFLOW_STATUS_BUILTIN_DEF,
25    &WORKFLOW_CONTINUE_AS_NEW_BUILTIN_DEF,
26    &CONTINUE_AS_NEW_BUILTIN_DEF,
27    &WORKFLOW_UPDATE_BUILTIN_DEF,
28];
29
30#[derive(Clone, Debug, Serialize, Deserialize)]
31pub struct WorkflowMessageRecord {
32    pub seq: u64,
33    pub kind: String,
34    pub name: String,
35    #[serde(skip_serializing_if = "Option::is_none")]
36    pub request_id: Option<String>,
37    pub payload: serde_json::Value,
38    pub enqueued_at: String,
39}
40
41#[derive(Clone, Debug, Serialize, Deserialize)]
42pub struct WorkflowQueryRecord {
43    pub value: serde_json::Value,
44    pub published_at: String,
45}
46
47#[derive(Clone, Debug, Serialize, Deserialize)]
48pub struct WorkflowUpdateResponseRecord {
49    pub request_id: String,
50    #[serde(skip_serializing_if = "Option::is_none")]
51    pub name: Option<String>,
52    pub value: serde_json::Value,
53    pub responded_at: String,
54}
55
56#[derive(Clone, Debug, Serialize, Deserialize)]
57pub struct WorkflowMailboxState {
58    #[serde(rename = "_type")]
59    pub type_name: String,
60    pub workflow_id: String,
61    #[serde(default = "default_generation")]
62    pub generation: u64,
63    #[serde(default)]
64    pub continue_as_new_count: u64,
65    #[serde(skip_serializing_if = "Option::is_none")]
66    pub last_continue_as_new_at: Option<String>,
67    #[serde(default)]
68    pub paused: bool,
69    #[serde(default)]
70    pub next_seq: u64,
71    #[serde(default)]
72    pub mailbox: VecDeque<WorkflowMessageRecord>,
73    #[serde(default)]
74    pub queries: BTreeMap<String, WorkflowQueryRecord>,
75    #[serde(default)]
76    pub responses: BTreeMap<String, WorkflowUpdateResponseRecord>,
77}
78
79impl Default for WorkflowMailboxState {
80    fn default() -> Self {
81        Self {
82            type_name: "workflow_mailbox".to_string(),
83            workflow_id: String::new(),
84            generation: default_generation(),
85            continue_as_new_count: 0,
86            last_continue_as_new_at: None,
87            paused: false,
88            next_seq: 0,
89            mailbox: VecDeque::new(),
90            queries: BTreeMap::new(),
91            responses: BTreeMap::new(),
92        }
93    }
94}
95
96fn default_generation() -> u64 {
97    1
98}
99
100#[derive(Clone, Debug, PartialEq, Eq)]
101struct WorkflowTarget {
102    workflow_id: String,
103    base_dir: PathBuf,
104}
105
106fn sanitize_workflow_id(raw: &str) -> String {
107    let trimmed = raw.trim();
108    let mut sanitized = String::with_capacity(trimmed.len());
109    for ch in trimmed.chars() {
110        if ch.is_ascii_alphanumeric() || matches!(ch, '-' | '_' | '.') {
111            sanitized.push(ch);
112        } else {
113            sanitized.push('_');
114        }
115    }
116    if sanitized.is_empty() || sanitized == "." || sanitized == ".." {
117        "workflow".to_string()
118    } else {
119        sanitized
120    }
121}
122
123fn workflow_base_dir_from_persisted_path(path: &Path) -> PathBuf {
124    for ancestor in path.ancestors() {
125        if ancestor.file_name().and_then(|value| value.to_str()) == Some(".harn-runs") {
126            return non_empty_path_or_dot(ancestor.parent().unwrap_or_else(|| Path::new(".")));
127        }
128    }
129    non_empty_path_or_dot(path.parent().unwrap_or_else(|| Path::new(".")))
130}
131
132fn non_empty_path_or_dot(path: &Path) -> PathBuf {
133    if path.as_os_str().is_empty() {
134        PathBuf::from(".")
135    } else {
136        path.to_path_buf()
137    }
138}
139
140fn workflow_target_root(target: &WorkflowTarget) -> PathBuf {
141    crate::runtime_paths::workflow_dir(&target.base_dir).join(&target.workflow_id)
142}
143
144fn workflow_state_path(target: &WorkflowTarget) -> PathBuf {
145    workflow_target_root(target).join("state.json")
146}
147
148fn now_rfc3339() -> String {
149    time::OffsetDateTime::now_utc()
150        .format(&time::format_description::well_known::Rfc3339)
151        .unwrap_or_else(|_| uuid::Uuid::now_v7().to_string())
152}
153
154fn load_state(target: &WorkflowTarget) -> Result<WorkflowMailboxState, String> {
155    let path = workflow_state_path(target);
156    if !path.exists() {
157        return Ok(WorkflowMailboxState {
158            workflow_id: target.workflow_id.clone(),
159            ..WorkflowMailboxState::default()
160        });
161    }
162    let text = std::fs::read_to_string(&path)
163        .map_err(|error| format!("workflow state read error: {error}"))?;
164    let mut state: WorkflowMailboxState = serde_json::from_str(&text)
165        .map_err(|error| format!("workflow state parse error: {error}"))?;
166    if state.type_name.is_empty() {
167        state.type_name = "workflow_mailbox".to_string();
168    }
169    if state.workflow_id.is_empty() {
170        state.workflow_id = target.workflow_id.clone();
171    }
172    if state.generation == 0 {
173        state.generation = 1;
174    }
175    Ok(state)
176}
177
178fn save_state(target: &WorkflowTarget, state: &WorkflowMailboxState) -> Result<(), String> {
179    let path = workflow_state_path(target);
180    let json = serde_json::to_string_pretty(state)
181        .map_err(|error| format!("workflow state encode error: {error}"))?;
182    crate::atomic_io::atomic_write(&path, json.as_bytes())
183        .map_err(|error| format!("workflow state write error: {error}"))
184}
185
186fn parse_target_json(
187    value: &serde_json::Value,
188    fallback_base_dir: Option<&Path>,
189) -> Option<WorkflowTarget> {
190    match value {
191        serde_json::Value::String(text) => Some(WorkflowTarget {
192            workflow_id: sanitize_workflow_id(text),
193            base_dir: fallback_base_dir
194                .map(Path::to_path_buf)
195                .unwrap_or_else(runtime_root_base),
196        }),
197        serde_json::Value::Object(map) => {
198            let workflow_id = map
199                .get("workflow_id")
200                .and_then(|value| value.as_str())
201                .or_else(|| map.get("workflow").and_then(|value| value.as_str()))
202                .or_else(|| {
203                    map.get("run")
204                        .and_then(|value| value.get("workflow_id"))
205                        .and_then(|value| value.as_str())
206                })
207                .or_else(|| {
208                    map.get("result")
209                        .and_then(|value| value.get("run"))
210                        .and_then(|value| value.get("workflow_id"))
211                        .and_then(|value| value.as_str())
212                })?;
213            let explicit_base = map
214                .get("base_dir")
215                .and_then(|value| value.as_str())
216                .filter(|value| !value.trim().is_empty())
217                .map(PathBuf::from);
218            let persisted_path = map
219                .get("persisted_path")
220                .and_then(|value| value.as_str())
221                .or_else(|| map.get("path").and_then(|value| value.as_str()))
222                .or_else(|| {
223                    map.get("run")
224                        .and_then(|value| value.get("persisted_path"))
225                        .and_then(|value| value.as_str())
226                })
227                .or_else(|| {
228                    map.get("result")
229                        .and_then(|value| value.get("run"))
230                        .and_then(|value| value.get("persisted_path"))
231                        .and_then(|value| value.as_str())
232                });
233            let base_dir = explicit_base
234                .or_else(|| {
235                    persisted_path
236                        .map(|path| workflow_base_dir_from_persisted_path(Path::new(path)))
237                })
238                .or_else(|| fallback_base_dir.map(Path::to_path_buf))
239                .unwrap_or_else(runtime_root_base);
240            Some(WorkflowTarget {
241                workflow_id: sanitize_workflow_id(workflow_id),
242                base_dir,
243            })
244        }
245        _ => None,
246    }
247}
248
249fn parse_target_vm(
250    value: Option<&VmValue>,
251    fallback_base_dir: Option<&Path>,
252    builtin: &str,
253) -> Result<WorkflowTarget, VmError> {
254    let value = value.ok_or_else(|| VmError::Runtime(format!("{builtin}: missing target")))?;
255    parse_target_json(&crate::llm::vm_value_to_json(value), fallback_base_dir).ok_or_else(|| {
256        VmError::Runtime(format!(
257            "{builtin}: target must be a workflow id string or dict with workflow_id/workflow"
258        ))
259    })
260}
261
262fn workflow_status_json(
263    target: &WorkflowTarget,
264    state: &WorkflowMailboxState,
265) -> serde_json::Value {
266    serde_json::json!({
267        "workflow_id": target.workflow_id,
268        "base_dir": target.base_dir.to_string_lossy(),
269        "generation": state.generation,
270        "paused": state.paused,
271        "pending_count": state.mailbox.len(),
272        "query_count": state.queries.len(),
273        "response_count": state.responses.len(),
274        "continue_as_new_count": state.continue_as_new_count,
275        "last_continue_as_new_at": state.last_continue_as_new_at,
276    })
277}
278
279fn target_for_base(base_dir: &Path, workflow_id: &str) -> WorkflowTarget {
280    WorkflowTarget {
281        workflow_id: sanitize_workflow_id(workflow_id),
282        base_dir: base_dir.to_path_buf(),
283    }
284}
285
286fn enqueue_message(
287    target: &WorkflowTarget,
288    kind: &str,
289    name: &str,
290    payload: serde_json::Value,
291    request_id: Option<String>,
292) -> Result<serde_json::Value, String> {
293    let mut state = load_state(target)?;
294    let message = push_message(&mut state, kind, name, payload, request_id);
295    save_state(target, &state)?;
296    Ok(serde_json::json!({
297        "workflow_id": target.workflow_id,
298        "message": message,
299        "status": workflow_status_json(target, &state),
300    }))
301}
302
303fn push_message(
304    state: &mut WorkflowMailboxState,
305    kind: &str,
306    name: &str,
307    payload: serde_json::Value,
308    request_id: Option<String>,
309) -> WorkflowMessageRecord {
310    state.next_seq += 1;
311    let message = WorkflowMessageRecord {
312        seq: state.next_seq,
313        kind: kind.to_string(),
314        name: name.to_string(),
315        request_id,
316        payload,
317        enqueued_at: now_rfc3339(),
318    };
319    state.mailbox.push_back(message.clone());
320    message
321}
322
323fn receive_message(target: &WorkflowTarget) -> Result<Option<WorkflowMessageRecord>, String> {
324    let mut state = load_state(target)?;
325    let message = state.mailbox.pop_front();
326    if message.is_some() {
327        save_state(target, &state)?;
328    }
329    Ok(message)
330}
331
332pub fn workflow_signal_for_base(
333    base_dir: &Path,
334    workflow_id: &str,
335    name: &str,
336    payload: serde_json::Value,
337) -> Result<serde_json::Value, String> {
338    let target = target_for_base(base_dir, workflow_id);
339    enqueue_message(&target, "signal", name, payload, None)
340}
341
342pub fn workflow_query_for_base(
343    base_dir: &Path,
344    workflow_id: &str,
345    name: &str,
346) -> Result<serde_json::Value, String> {
347    let target = target_for_base(base_dir, workflow_id);
348    let state = load_state(&target)?;
349    Ok(state
350        .queries
351        .get(name)
352        .map(|record| record.value.clone())
353        .unwrap_or(serde_json::Value::Null))
354}
355
356pub fn workflow_publish_query_for_base(
357    base_dir: &Path,
358    workflow_id: &str,
359    name: &str,
360    value: serde_json::Value,
361) -> Result<serde_json::Value, String> {
362    let target = target_for_base(base_dir, workflow_id);
363    let mut state = load_state(&target)?;
364    state.queries.insert(
365        name.to_string(),
366        WorkflowQueryRecord {
367            value,
368            published_at: now_rfc3339(),
369        },
370    );
371    save_state(&target, &state)?;
372    Ok(workflow_status_json(&target, &state))
373}
374
375pub fn workflow_pause_for_base(
376    base_dir: &Path,
377    workflow_id: &str,
378) -> Result<serde_json::Value, String> {
379    let target = target_for_base(base_dir, workflow_id);
380    let mut state = load_state(&target)?;
381    state.paused = true;
382    push_message(&mut state, "control", "pause", serde_json::json!({}), None);
383    save_state(&target, &state)?;
384    Ok(workflow_status_json(&target, &state))
385}
386
387pub fn workflow_resume_for_base(
388    base_dir: &Path,
389    workflow_id: &str,
390) -> Result<serde_json::Value, String> {
391    let target = target_for_base(base_dir, workflow_id);
392    let mut state = load_state(&target)?;
393    state.paused = false;
394    push_message(&mut state, "control", "resume", serde_json::json!({}), None);
395    save_state(&target, &state)?;
396    Ok(workflow_status_json(&target, &state))
397}
398
399pub async fn workflow_update_for_base(
400    base_dir: &Path,
401    workflow_id: &str,
402    name: &str,
403    payload: serde_json::Value,
404    timeout: StdDuration,
405) -> Result<serde_json::Value, String> {
406    let target = target_for_base(base_dir, workflow_id);
407    let request_id = enqueue_update_request(&target, name, payload)?;
408    wait_for_update_response(&target, name, &request_id, timeout).await
409}
410
411fn enqueue_update_request(
412    target: &WorkflowTarget,
413    name: &str,
414    payload: serde_json::Value,
415) -> Result<String, String> {
416    let request_id = uuid::Uuid::now_v7().to_string();
417    enqueue_message(target, "update", name, payload, Some(request_id.clone()))?;
418    Ok(request_id)
419}
420
421async fn wait_for_update_response(
422    target: &WorkflowTarget,
423    name: &str,
424    request_id: &str,
425    timeout: StdDuration,
426) -> Result<serde_json::Value, String> {
427    let deadline = tokio::time::Instant::now() + timeout;
428    loop {
429        match update_response_value(target, request_id) {
430            Ok(Some(value)) => return Ok(value),
431            Ok(None) => {}
432            Err(error) => return Err(error),
433        }
434
435        let now = tokio::time::Instant::now();
436        if now >= deadline {
437            break;
438        }
439        let next_poll = now + StdDuration::from_millis(UPDATE_POLL_INTERVAL_MS);
440        tokio::time::sleep_until(next_poll.min(deadline)).await;
441    }
442    Err(format!(
443        "workflow update '{name}' timed out for '{}'",
444        target.workflow_id
445    ))
446}
447
448fn update_response_value(
449    target: &WorkflowTarget,
450    request_id: &str,
451) -> Result<Option<serde_json::Value>, String> {
452    Ok(load_state(target)?
453        .responses
454        .get(request_id)
455        .map(|response| response.value.clone()))
456}
457
458pub fn workflow_respond_update_for_base(
459    base_dir: &Path,
460    workflow_id: &str,
461    request_id: &str,
462    name: Option<&str>,
463    value: serde_json::Value,
464) -> Result<serde_json::Value, String> {
465    let target = target_for_base(base_dir, workflow_id);
466    record_update_response(&target, request_id, name, value)
467}
468
469fn record_update_response(
470    target: &WorkflowTarget,
471    request_id: &str,
472    name: Option<&str>,
473    value: serde_json::Value,
474) -> Result<serde_json::Value, String> {
475    let mut state = load_state(target)?;
476    state.responses.insert(
477        request_id.to_string(),
478        WorkflowUpdateResponseRecord {
479            request_id: request_id.to_string(),
480            name: name.map(ToString::to_string),
481            value,
482            responded_at: now_rfc3339(),
483        },
484    );
485    save_state(target, &state)?;
486    Ok(workflow_status_json(target, &state))
487}
488
489pub(crate) fn register_workflow_message_builtins(vm: &mut Vm) {
490    vm.set_global(
491        "workflow",
492        VmValue::Dict(Rc::new(BTreeMap::from([
493            (
494                "signal".to_string(),
495                VmValue::BuiltinRef(Rc::from("workflow.signal")),
496            ),
497            (
498                "query".to_string(),
499                VmValue::BuiltinRef(Rc::from("workflow.query")),
500            ),
501            (
502                "update".to_string(),
503                VmValue::BuiltinRef(Rc::from("workflow.update")),
504            ),
505            (
506                "publish_query".to_string(),
507                VmValue::BuiltinRef(Rc::from("workflow.publish_query")),
508            ),
509            (
510                "receive".to_string(),
511                VmValue::BuiltinRef(Rc::from("workflow.receive")),
512            ),
513            (
514                "respond_update".to_string(),
515                VmValue::BuiltinRef(Rc::from("workflow.respond_update")),
516            ),
517            (
518                "pause".to_string(),
519                VmValue::BuiltinRef(Rc::from("workflow.pause")),
520            ),
521            (
522                "resume".to_string(),
523                VmValue::BuiltinRef(Rc::from("workflow.resume")),
524            ),
525            (
526                "status".to_string(),
527                VmValue::BuiltinRef(Rc::from("workflow.status")),
528            ),
529            (
530                "continue_as_new".to_string(),
531                VmValue::BuiltinRef(Rc::from("workflow.continue_as_new")),
532            ),
533        ]))),
534    );
535
536    for def in MODULE_BUILTINS {
537        vm.register_builtin_def(def);
538    }
539}
540
541/// Enqueue a workflow signal message.
542#[harn_builtin(
543    sig = "workflow.signal(target: string|dict, name: string, payload?: any) -> dict",
544    category = "workflow.messages"
545)]
546fn workflow_signal_builtin(args: &[VmValue], _out: &mut String) -> Result<VmValue, VmError> {
547    let target = parse_target_vm(args.first(), None, "workflow.signal")?;
548    let name = args
549        .get(1)
550        .map(|value| value.display())
551        .filter(|value| !value.is_empty())
552        .ok_or_else(|| VmError::Runtime("workflow.signal: missing name".to_string()))?;
553    let payload = args
554        .get(2)
555        .map(crate::llm::vm_value_to_json)
556        .unwrap_or(serde_json::Value::Null);
557    let result =
558        enqueue_message(&target, "signal", &name, payload, None).map_err(VmError::Runtime)?;
559    Ok(crate::stdlib::json_to_vm_value(&result))
560}
561
562/// Read the latest published workflow query value.
563#[harn_builtin(
564    sig = "workflow.query(target: string|dict, name: string) -> any",
565    category = "workflow.messages"
566)]
567fn workflow_query_builtin(args: &[VmValue], _out: &mut String) -> Result<VmValue, VmError> {
568    let target = parse_target_vm(args.first(), None, "workflow.query")?;
569    let name = args
570        .get(1)
571        .map(|value| value.display())
572        .filter(|value| !value.is_empty())
573        .ok_or_else(|| VmError::Runtime("workflow.query: missing name".to_string()))?;
574    let state = load_state(&target).map_err(VmError::Runtime)?;
575    Ok(crate::stdlib::json_to_vm_value(
576        &state
577            .queries
578            .get(&name)
579            .map(|record| record.value.clone())
580            .unwrap_or(serde_json::Value::Null),
581    ))
582}
583
584/// Enqueue a workflow update and wait for a response.
585#[harn_builtin(
586    sig = "workflow.update(target: string|dict, name: string, payload?: any, options?: dict|nil) -> any",
587    kind = "async",
588    category = "workflow.messages"
589)]
590async fn workflow_update_builtin(args: Vec<VmValue>) -> Result<VmValue, VmError> {
591    let target = parse_target_vm(args.first(), None, "workflow.update")?;
592    let name = args
593        .get(1)
594        .map(|value| value.display())
595        .filter(|value| !value.is_empty())
596        .ok_or_else(|| VmError::Runtime("workflow.update: missing name".to_string()))?;
597    let payload = args
598        .get(2)
599        .map(crate::llm::vm_value_to_json)
600        .unwrap_or(serde_json::Value::Null);
601    let timeout_ms = args
602        .get(3)
603        .and_then(|value| value.as_dict())
604        .and_then(|dict| dict.get("timeout_ms"))
605        .and_then(VmValue::as_int)
606        .unwrap_or(DEFAULT_UPDATE_TIMEOUT_MS as i64)
607        .max(1) as u64;
608    let result = workflow_update_for_base(
609        &target.base_dir,
610        &target.workflow_id,
611        &name,
612        payload,
613        StdDuration::from_millis(timeout_ms),
614    )
615    .await
616    .map_err(VmError::Runtime)?;
617    Ok(crate::stdlib::json_to_vm_value(&result))
618}
619
620/// Publish a workflow query value.
621#[harn_builtin(
622    sig = "workflow.publish_query(target: string|dict, name: string, value?: any) -> dict",
623    category = "workflow.messages"
624)]
625fn workflow_publish_query_builtin(args: &[VmValue], _out: &mut String) -> Result<VmValue, VmError> {
626    let target = parse_target_vm(args.first(), None, "workflow.publish_query")?;
627    let name = args
628        .get(1)
629        .map(|value| value.display())
630        .filter(|value| !value.is_empty())
631        .ok_or_else(|| VmError::Runtime("workflow.publish_query: missing name".to_string()))?;
632    let value = args
633        .get(2)
634        .map(crate::llm::vm_value_to_json)
635        .unwrap_or(serde_json::Value::Null);
636    let result =
637        workflow_publish_query_for_base(&target.base_dir, &target.workflow_id, &name, value)
638            .map_err(VmError::Runtime)?;
639    Ok(crate::stdlib::json_to_vm_value(&result))
640}
641
642/// Receive the next workflow mailbox message.
643#[harn_builtin(
644    sig = "workflow.receive(target: string|dict) -> dict|nil",
645    category = "workflow.messages"
646)]
647fn workflow_receive_builtin(args: &[VmValue], _out: &mut String) -> Result<VmValue, VmError> {
648    let target = parse_target_vm(args.first(), None, "workflow.receive")?;
649    let Some(message) = receive_message(&target).map_err(VmError::Runtime)? else {
650        return Ok(VmValue::Nil);
651    };
652    Ok(crate::stdlib::json_to_vm_value(&serde_json::json!({
653        "workflow_id": target.workflow_id,
654        "seq": message.seq,
655        "kind": message.kind,
656        "name": message.name,
657        "request_id": message.request_id,
658        "payload": message.payload,
659        "enqueued_at": message.enqueued_at,
660    })))
661}
662
663/// Respond to a pending workflow update request.
664#[harn_builtin(
665    sig = "workflow.respond_update(target: string|dict, request_id: string, value?: any, name?: string|nil) -> dict",
666    category = "workflow.messages"
667)]
668fn workflow_respond_update_builtin(
669    args: &[VmValue],
670    _out: &mut String,
671) -> Result<VmValue, VmError> {
672    let target = parse_target_vm(args.first(), None, "workflow.respond_update")?;
673    let request_id = args
674        .get(1)
675        .map(|value| value.display())
676        .filter(|value| !value.is_empty())
677        .ok_or_else(|| {
678            VmError::Runtime("workflow.respond_update: missing request id".to_string())
679        })?;
680    let value = args
681        .get(2)
682        .map(crate::llm::vm_value_to_json)
683        .unwrap_or(serde_json::Value::Null);
684    let name = args
685        .get(3)
686        .map(|value| value.display())
687        .filter(|value| !value.is_empty());
688    let result = workflow_respond_update_for_base(
689        &target.base_dir,
690        &target.workflow_id,
691        &request_id,
692        name.as_deref(),
693        value,
694    )
695    .map_err(VmError::Runtime)?;
696    Ok(crate::stdlib::json_to_vm_value(&result))
697}
698
699/// Pause a workflow mailbox.
700#[harn_builtin(
701    sig = "workflow.pause(target: string|dict) -> dict",
702    category = "workflow.messages"
703)]
704fn workflow_pause_builtin(args: &[VmValue], _out: &mut String) -> Result<VmValue, VmError> {
705    let target = parse_target_vm(args.first(), None, "workflow.pause")?;
706    let result =
707        workflow_pause_for_base(&target.base_dir, &target.workflow_id).map_err(VmError::Runtime)?;
708    Ok(crate::stdlib::json_to_vm_value(&result))
709}
710
711/// Resume a workflow mailbox.
712#[harn_builtin(
713    sig = "workflow.resume(target: string|dict) -> dict",
714    category = "workflow.messages"
715)]
716fn workflow_resume_builtin(args: &[VmValue], _out: &mut String) -> Result<VmValue, VmError> {
717    let target = parse_target_vm(args.first(), None, "workflow.resume")?;
718    let result = workflow_resume_for_base(&target.base_dir, &target.workflow_id)
719        .map_err(VmError::Runtime)?;
720    Ok(crate::stdlib::json_to_vm_value(&result))
721}
722
723/// Return workflow mailbox status.
724#[harn_builtin(
725    sig = "workflow.status(target: string|dict) -> dict",
726    category = "workflow.messages"
727)]
728fn workflow_status_builtin(args: &[VmValue], _out: &mut String) -> Result<VmValue, VmError> {
729    let target = parse_target_vm(args.first(), None, "workflow.status")?;
730    let state = load_state(&target).map_err(VmError::Runtime)?;
731    Ok(crate::stdlib::json_to_vm_value(&workflow_status_json(
732        &target, &state,
733    )))
734}
735
736/// Advance a workflow mailbox generation.
737#[harn_builtin(
738    sig = "workflow.continue_as_new(target: string|dict) -> dict",
739    category = "workflow.messages"
740)]
741fn workflow_continue_as_new_builtin(
742    args: &[VmValue],
743    _out: &mut String,
744) -> Result<VmValue, VmError> {
745    continue_as_new_for_label(args, "workflow.continue_as_new")
746}
747
748/// Advance a workflow mailbox generation (top-level alias).
749#[harn_builtin(
750    sig = "continue_as_new(target: string|dict) -> dict",
751    category = "workflow.messages"
752)]
753fn continue_as_new_builtin(args: &[VmValue], _out: &mut String) -> Result<VmValue, VmError> {
754    continue_as_new_for_label(args, "continue_as_new")
755}
756
757fn continue_as_new_for_label(args: &[VmValue], label: &str) -> Result<VmValue, VmError> {
758    let target = parse_target_vm(args.first(), None, label)?;
759    let mut state = load_state(&target).map_err(VmError::Runtime)?;
760    state.generation += 1;
761    state.continue_as_new_count += 1;
762    state.last_continue_as_new_at = Some(now_rfc3339());
763    state.responses.clear();
764    save_state(&target, &state).map_err(VmError::Runtime)?;
765    Ok(crate::stdlib::json_to_vm_value(&workflow_status_json(
766        &target, &state,
767    )))
768}
769
770#[cfg(test)]
771mod tests {
772    use super::*;
773    use std::task::Poll;
774
775    #[tokio::test(start_paused = true)]
776    async fn update_round_trip_waits_for_response() {
777        let dir = tempfile::tempdir().expect("tempdir");
778        let workflow_id = "wf-update";
779        let base_dir = dir.path().to_path_buf();
780        let target = target_for_base(&base_dir, workflow_id);
781        let request_id =
782            enqueue_update_request(&target, "adjust_budget", serde_json::json!({"max_usd": 10}))
783                .expect("enqueue update");
784
785        let message = receive_message(&target)
786            .expect("receive queued update")
787            .expect("queued update");
788        assert_eq!(message.kind, "update");
789        assert_eq!(message.name, "adjust_budget");
790        assert_eq!(message.request_id.as_deref(), Some(request_id.as_str()));
791        assert_eq!(
792            update_response_value(&target, &request_id).expect("read response"),
793            None
794        );
795
796        let waiter = wait_for_update_response(
797            &target,
798            "adjust_budget",
799            &request_id,
800            StdDuration::from_millis(500),
801        );
802        tokio::pin!(waiter);
803        assert!(matches!(futures::poll!(&mut waiter), Poll::Pending));
804
805        workflow_respond_update_for_base(
806            &base_dir,
807            workflow_id,
808            &request_id,
809            Some("adjust_budget"),
810            serde_json::json!({"ok": true}),
811        )
812        .expect("save response");
813        assert_eq!(
814            update_response_value(&target, &request_id).expect("read response"),
815            Some(serde_json::json!({"ok": true}))
816        );
817        tokio::time::advance(StdDuration::from_millis(UPDATE_POLL_INTERVAL_MS)).await;
818
819        let result = waiter.await.expect("update result");
820        assert_eq!(result, serde_json::json!({"ok": true}));
821    }
822
823    #[tokio::test(start_paused = true)]
824    async fn update_wait_respects_short_timeout() {
825        let dir = tempfile::tempdir().expect("tempdir");
826        let target = target_for_base(dir.path(), "wf-timeout");
827        let request_id =
828            enqueue_update_request(&target, "adjust_budget", serde_json::json!({"max_usd": 10}))
829                .expect("enqueue update");
830
831        let waiter = wait_for_update_response(
832            &target,
833            "adjust_budget",
834            &request_id,
835            StdDuration::from_millis(10),
836        );
837        tokio::pin!(waiter);
838        assert!(matches!(futures::poll!(&mut waiter), Poll::Pending));
839
840        tokio::time::advance(StdDuration::from_millis(9)).await;
841        assert!(matches!(futures::poll!(&mut waiter), Poll::Pending));
842
843        tokio::time::advance(StdDuration::from_millis(1)).await;
844        let err = waiter.await.expect_err("update should time out");
845        assert!(err.contains("timed out"));
846    }
847
848    #[tokio::test]
849    async fn update_wait_propagates_state_errors() {
850        let dir = tempfile::tempdir().expect("tempdir");
851        let target = target_for_base(dir.path(), "wf-corrupt");
852        std::fs::create_dir_all(workflow_target_root(&target)).expect("state dir");
853        std::fs::write(workflow_state_path(&target), "{not json").expect("state write");
854
855        let err = wait_for_update_response(
856            &target,
857            "adjust_budget",
858            "request-1",
859            StdDuration::from_millis(10),
860        )
861        .await
862        .expect_err("corrupt state should fail immediately");
863        assert!(err.contains("workflow state parse error"));
864    }
865
866    #[test]
867    fn workflow_ids_preserve_namespace_without_path_segments() {
868        assert_eq!(
869            sanitize_workflow_id("workflow://local/start-my-day"),
870            "workflow___local_start-my-day"
871        );
872        assert_eq!(sanitize_workflow_id("../start-my-day"), ".._start-my-day");
873        assert_eq!(sanitize_workflow_id(".."), "workflow");
874    }
875
876    #[test]
877    fn persisted_path_drives_target_base_dir() {
878        let base = parse_target_json(
879            &serde_json::json!({
880                "workflow_id": "wf",
881                "persisted_path": "/tmp/demo/.harn-runs/run.json"
882            }),
883            None,
884        )
885        .expect("target");
886        assert_eq!(base.workflow_id, "wf");
887        assert_eq!(base.base_dir, PathBuf::from("/tmp/demo"));
888    }
889
890    #[test]
891    fn nested_persisted_path_drives_target_base_dir() {
892        let base = parse_target_json(
893            &serde_json::json!({
894                "workflow_id": "wf",
895                "persisted_path": "/tmp/demo/.harn-runs/session/run.json"
896            }),
897            None,
898        )
899        .expect("target");
900        assert_eq!(base.base_dir, PathBuf::from("/tmp/demo"));
901
902        let relative = parse_target_json(
903            &serde_json::json!({
904                "workflow_id": "wf",
905                "persisted_path": ".harn-runs/session/run.json"
906            }),
907            None,
908        )
909        .expect("target");
910        assert_eq!(relative.base_dir, PathBuf::from("."));
911    }
912}