Skip to main content

harn_vm/stdlib/
workflow_messages.rs

1use std::collections::{BTreeMap, VecDeque};
2use std::path::{Path, PathBuf};
3use std::rc::Rc;
4use std::time::Duration as StdDuration;
5
6use serde::{Deserialize, Serialize};
7
8use crate::stdlib::process::runtime_root_base;
9use crate::value::{VmError, VmValue};
10use crate::vm::Vm;
11
/// Timeout, in milliseconds, that the `workflow.update` builtin applies when
/// its options dict carries no `timeout_ms` entry.
const DEFAULT_UPDATE_TIMEOUT_MS: u64 = 30 * 1_000;
/// Delay, in milliseconds, between successive state polls while
/// `workflow_update_for_base` waits for an update response.
const UPDATE_POLL_INTERVAL_MS: u64 = 25;
14
15#[derive(Clone, Debug, Serialize, Deserialize)]
16pub struct WorkflowMessageRecord {
17    pub seq: u64,
18    pub kind: String,
19    pub name: String,
20    #[serde(skip_serializing_if = "Option::is_none")]
21    pub request_id: Option<String>,
22    pub payload: serde_json::Value,
23    pub enqueued_at: String,
24}
25
26#[derive(Clone, Debug, Serialize, Deserialize)]
27pub struct WorkflowQueryRecord {
28    pub value: serde_json::Value,
29    pub published_at: String,
30}
31
32#[derive(Clone, Debug, Serialize, Deserialize)]
33pub struct WorkflowUpdateResponseRecord {
34    pub request_id: String,
35    #[serde(skip_serializing_if = "Option::is_none")]
36    pub name: Option<String>,
37    pub value: serde_json::Value,
38    pub responded_at: String,
39}
40
41#[derive(Clone, Debug, Serialize, Deserialize)]
42pub struct WorkflowMailboxState {
43    #[serde(rename = "_type")]
44    pub type_name: String,
45    pub workflow_id: String,
46    #[serde(default = "default_generation")]
47    pub generation: u64,
48    #[serde(default)]
49    pub continue_as_new_count: u64,
50    #[serde(skip_serializing_if = "Option::is_none")]
51    pub last_continue_as_new_at: Option<String>,
52    #[serde(default)]
53    pub paused: bool,
54    #[serde(default)]
55    pub next_seq: u64,
56    #[serde(default)]
57    pub mailbox: VecDeque<WorkflowMessageRecord>,
58    #[serde(default)]
59    pub queries: BTreeMap<String, WorkflowQueryRecord>,
60    #[serde(default)]
61    pub responses: BTreeMap<String, WorkflowUpdateResponseRecord>,
62}
63
64impl Default for WorkflowMailboxState {
65    fn default() -> Self {
66        Self {
67            type_name: "workflow_mailbox".to_string(),
68            workflow_id: String::new(),
69            generation: default_generation(),
70            continue_as_new_count: 0,
71            last_continue_as_new_at: None,
72            paused: false,
73            next_seq: 0,
74            mailbox: VecDeque::new(),
75            queries: BTreeMap::new(),
76            responses: BTreeMap::new(),
77        }
78    }
79}
80
/// Generation assigned to a mailbox whose stored state carries no `generation`.
fn default_generation() -> u64 {
    1_u64
}
84
/// Identifies one workflow mailbox on disk: a sanitized workflow id plus the
/// base directory its workflow directory is resolved against.
#[derive(Debug, Clone, PartialEq, Eq)]
struct WorkflowTarget {
    /// Workflow id, used as a single directory name under the workflow dir.
    workflow_id: String,
    /// Base directory passed to `crate::runtime_paths::workflow_dir`.
    base_dir: PathBuf,
}
90
/// Reduces a caller-supplied workflow id to a single safe path component.
///
/// The input is trimmed and only its final path component is kept, so the
/// result can be joined onto the workflow directory without introducing extra
/// path segments. Returns `"workflow"` when no usable component exists: empty
/// input, `.`, `..`, a bare root such as `/`, or any path ending in `..`.
fn sanitize_workflow_id(raw: &str) -> String {
    const FALLBACK: &str = "workflow";
    // `file_name` yields `None` for paths that end in `..` or consist only of
    // a root. Falling back to the raw text in that case would let inputs like
    // "foo/.." or "/" escape the workflow directory, so use the fixed
    // placeholder instead.
    let component = Path::new(raw.trim())
        .file_name()
        .and_then(|name| name.to_str());
    match component {
        Some(name) if !name.is_empty() && name != "." && name != ".." => name.to_string(),
        _ => FALLBACK.to_string(),
    }
}
103
/// Derives a workflow base directory from the path of a persisted run file.
///
/// Takes the file's parent directory; when that directory is itself named
/// `.harn-runs`, steps up one more level. A path with no parent resolves
/// to `.`.
fn workflow_base_dir_from_persisted_path(path: &Path) -> PathBuf {
    let run_dir = match path.parent() {
        Some(dir) => dir,
        None => Path::new("."),
    };
    match run_dir.file_name().and_then(|name| name.to_str()) {
        // The run file sits inside the runs directory: the base is one level up.
        Some(".harn-runs") => run_dir.parent().unwrap_or(run_dir).to_path_buf(),
        _ => run_dir.to_path_buf(),
    }
}
112
113fn workflow_target_root(target: &WorkflowTarget) -> PathBuf {
114    crate::runtime_paths::workflow_dir(&target.base_dir).join(&target.workflow_id)
115}
116
117fn workflow_state_path(target: &WorkflowTarget) -> PathBuf {
118    workflow_target_root(target).join("state.json")
119}
120
121fn now_rfc3339() -> String {
122    time::OffsetDateTime::now_utc()
123        .format(&time::format_description::well_known::Rfc3339)
124        .unwrap_or_else(|_| uuid::Uuid::now_v7().to_string())
125}
126
127fn load_state(target: &WorkflowTarget) -> Result<WorkflowMailboxState, String> {
128    let path = workflow_state_path(target);
129    if !path.exists() {
130        return Ok(WorkflowMailboxState {
131            workflow_id: target.workflow_id.clone(),
132            ..WorkflowMailboxState::default()
133        });
134    }
135    let text = std::fs::read_to_string(&path)
136        .map_err(|error| format!("workflow state read error: {error}"))?;
137    let mut state: WorkflowMailboxState = serde_json::from_str(&text)
138        .map_err(|error| format!("workflow state parse error: {error}"))?;
139    if state.type_name.is_empty() {
140        state.type_name = "workflow_mailbox".to_string();
141    }
142    if state.workflow_id.is_empty() {
143        state.workflow_id = target.workflow_id.clone();
144    }
145    if state.generation == 0 {
146        state.generation = 1;
147    }
148    Ok(state)
149}
150
151fn save_state(target: &WorkflowTarget, state: &WorkflowMailboxState) -> Result<(), String> {
152    let path = workflow_state_path(target);
153    let parent = path
154        .parent()
155        .ok_or_else(|| "workflow state path has no parent".to_string())?;
156    std::fs::create_dir_all(parent)
157        .map_err(|error| format!("workflow state mkdir error: {error}"))?;
158    let json = serde_json::to_string_pretty(state)
159        .map_err(|error| format!("workflow state encode error: {error}"))?;
160    let tmp_path = parent.join(format!(".state.json.{}.tmp", uuid::Uuid::now_v7()));
161    std::fs::write(&tmp_path, &json)
162        .map_err(|error| format!("workflow state write error: {error}"))?;
163    if let Err(error) = std::fs::rename(&tmp_path, &path) {
164        let _ = std::fs::remove_file(&tmp_path);
165        return Err(format!("workflow state rename error: {error}"));
166    }
167    Ok(())
168}
169
170fn parse_target_json(
171    value: &serde_json::Value,
172    fallback_base_dir: Option<&Path>,
173) -> Option<WorkflowTarget> {
174    match value {
175        serde_json::Value::String(text) => Some(WorkflowTarget {
176            workflow_id: sanitize_workflow_id(text),
177            base_dir: fallback_base_dir
178                .map(Path::to_path_buf)
179                .unwrap_or_else(runtime_root_base),
180        }),
181        serde_json::Value::Object(map) => {
182            let workflow_id = map
183                .get("workflow_id")
184                .and_then(|value| value.as_str())
185                .or_else(|| map.get("workflow").and_then(|value| value.as_str()))
186                .or_else(|| {
187                    map.get("run")
188                        .and_then(|value| value.get("workflow_id"))
189                        .and_then(|value| value.as_str())
190                })
191                .or_else(|| {
192                    map.get("result")
193                        .and_then(|value| value.get("run"))
194                        .and_then(|value| value.get("workflow_id"))
195                        .and_then(|value| value.as_str())
196                })?;
197            let explicit_base = map
198                .get("base_dir")
199                .and_then(|value| value.as_str())
200                .filter(|value| !value.trim().is_empty())
201                .map(PathBuf::from);
202            let persisted_path = map
203                .get("persisted_path")
204                .and_then(|value| value.as_str())
205                .or_else(|| map.get("path").and_then(|value| value.as_str()))
206                .or_else(|| {
207                    map.get("run")
208                        .and_then(|value| value.get("persisted_path"))
209                        .and_then(|value| value.as_str())
210                })
211                .or_else(|| {
212                    map.get("result")
213                        .and_then(|value| value.get("run"))
214                        .and_then(|value| value.get("persisted_path"))
215                        .and_then(|value| value.as_str())
216                });
217            let base_dir = explicit_base
218                .or_else(|| {
219                    persisted_path
220                        .map(|path| workflow_base_dir_from_persisted_path(Path::new(path)))
221                })
222                .or_else(|| fallback_base_dir.map(Path::to_path_buf))
223                .unwrap_or_else(runtime_root_base);
224            Some(WorkflowTarget {
225                workflow_id: sanitize_workflow_id(workflow_id),
226                base_dir,
227            })
228        }
229        _ => None,
230    }
231}
232
233fn parse_target_vm(
234    value: Option<&VmValue>,
235    fallback_base_dir: Option<&Path>,
236    builtin: &str,
237) -> Result<WorkflowTarget, VmError> {
238    let value = value.ok_or_else(|| VmError::Runtime(format!("{builtin}: missing target")))?;
239    parse_target_json(&crate::llm::vm_value_to_json(value), fallback_base_dir).ok_or_else(|| {
240        VmError::Runtime(format!(
241            "{builtin}: target must be a workflow id string or dict with workflow_id/workflow"
242        ))
243    })
244}
245
246fn workflow_status_json(
247    target: &WorkflowTarget,
248    state: &WorkflowMailboxState,
249) -> serde_json::Value {
250    serde_json::json!({
251        "workflow_id": target.workflow_id,
252        "base_dir": target.base_dir.to_string_lossy(),
253        "generation": state.generation,
254        "paused": state.paused,
255        "pending_count": state.mailbox.len(),
256        "query_count": state.queries.len(),
257        "response_count": state.responses.len(),
258        "continue_as_new_count": state.continue_as_new_count,
259        "last_continue_as_new_at": state.last_continue_as_new_at,
260    })
261}
262
263fn enqueue_message(
264    target: &WorkflowTarget,
265    kind: &str,
266    name: &str,
267    payload: serde_json::Value,
268    request_id: Option<String>,
269) -> Result<serde_json::Value, String> {
270    let mut state = load_state(target)?;
271    state.next_seq += 1;
272    let message = WorkflowMessageRecord {
273        seq: state.next_seq,
274        kind: kind.to_string(),
275        name: name.to_string(),
276        request_id,
277        payload,
278        enqueued_at: now_rfc3339(),
279    };
280    state.mailbox.push_back(message.clone());
281    save_state(target, &state)?;
282    Ok(serde_json::json!({
283        "workflow_id": target.workflow_id,
284        "message": message,
285        "status": workflow_status_json(target, &state),
286    }))
287}
288
289pub fn workflow_signal_for_base(
290    base_dir: &Path,
291    workflow_id: &str,
292    name: &str,
293    payload: serde_json::Value,
294) -> Result<serde_json::Value, String> {
295    let target = WorkflowTarget {
296        workflow_id: sanitize_workflow_id(workflow_id),
297        base_dir: base_dir.to_path_buf(),
298    };
299    enqueue_message(&target, "signal", name, payload, None)
300}
301
302pub fn workflow_query_for_base(
303    base_dir: &Path,
304    workflow_id: &str,
305    name: &str,
306) -> Result<serde_json::Value, String> {
307    let target = WorkflowTarget {
308        workflow_id: sanitize_workflow_id(workflow_id),
309        base_dir: base_dir.to_path_buf(),
310    };
311    let state = load_state(&target)?;
312    Ok(state
313        .queries
314        .get(name)
315        .map(|record| record.value.clone())
316        .unwrap_or(serde_json::Value::Null))
317}
318
319pub fn workflow_publish_query_for_base(
320    base_dir: &Path,
321    workflow_id: &str,
322    name: &str,
323    value: serde_json::Value,
324) -> Result<serde_json::Value, String> {
325    let target = WorkflowTarget {
326        workflow_id: sanitize_workflow_id(workflow_id),
327        base_dir: base_dir.to_path_buf(),
328    };
329    let mut state = load_state(&target)?;
330    state.queries.insert(
331        name.to_string(),
332        WorkflowQueryRecord {
333            value,
334            published_at: now_rfc3339(),
335        },
336    );
337    save_state(&target, &state)?;
338    Ok(workflow_status_json(&target, &state))
339}
340
341pub fn workflow_pause_for_base(
342    base_dir: &Path,
343    workflow_id: &str,
344) -> Result<serde_json::Value, String> {
345    let target = WorkflowTarget {
346        workflow_id: sanitize_workflow_id(workflow_id),
347        base_dir: base_dir.to_path_buf(),
348    };
349    let mut state = load_state(&target)?;
350    state.paused = true;
351    state.next_seq += 1;
352    state.mailbox.push_back(WorkflowMessageRecord {
353        seq: state.next_seq,
354        kind: "control".to_string(),
355        name: "pause".to_string(),
356        request_id: None,
357        payload: serde_json::json!({}),
358        enqueued_at: now_rfc3339(),
359    });
360    save_state(&target, &state)?;
361    Ok(workflow_status_json(&target, &state))
362}
363
364pub fn workflow_resume_for_base(
365    base_dir: &Path,
366    workflow_id: &str,
367) -> Result<serde_json::Value, String> {
368    let target = WorkflowTarget {
369        workflow_id: sanitize_workflow_id(workflow_id),
370        base_dir: base_dir.to_path_buf(),
371    };
372    let mut state = load_state(&target)?;
373    state.paused = false;
374    state.next_seq += 1;
375    state.mailbox.push_back(WorkflowMessageRecord {
376        seq: state.next_seq,
377        kind: "control".to_string(),
378        name: "resume".to_string(),
379        request_id: None,
380        payload: serde_json::json!({}),
381        enqueued_at: now_rfc3339(),
382    });
383    save_state(&target, &state)?;
384    Ok(workflow_status_json(&target, &state))
385}
386
387pub async fn workflow_update_for_base(
388    base_dir: &Path,
389    workflow_id: &str,
390    name: &str,
391    payload: serde_json::Value,
392    timeout: StdDuration,
393) -> Result<serde_json::Value, String> {
394    let target = WorkflowTarget {
395        workflow_id: sanitize_workflow_id(workflow_id),
396        base_dir: base_dir.to_path_buf(),
397    };
398    let request_id = uuid::Uuid::now_v7().to_string();
399    enqueue_message(&target, "update", name, payload, Some(request_id.clone()))?;
400    let started = std::time::Instant::now();
401    while started.elapsed() <= timeout {
402        if let Ok(state) = load_state(&target) {
403            if let Some(response) = state.responses.get(&request_id) {
404                return Ok(response.value.clone());
405            }
406        }
407        tokio::time::sleep(StdDuration::from_millis(UPDATE_POLL_INTERVAL_MS)).await;
408    }
409    Err(format!(
410        "workflow update '{name}' timed out for '{}'",
411        target.workflow_id
412    ))
413}
414
415pub fn workflow_respond_update_for_base(
416    base_dir: &Path,
417    workflow_id: &str,
418    request_id: &str,
419    name: Option<&str>,
420    value: serde_json::Value,
421) -> Result<serde_json::Value, String> {
422    let target = WorkflowTarget {
423        workflow_id: sanitize_workflow_id(workflow_id),
424        base_dir: base_dir.to_path_buf(),
425    };
426    let mut state = load_state(&target)?;
427    state.responses.insert(
428        request_id.to_string(),
429        WorkflowUpdateResponseRecord {
430            request_id: request_id.to_string(),
431            name: name.map(ToString::to_string),
432            value,
433            responded_at: now_rfc3339(),
434        },
435    );
436    save_state(&target, &state)?;
437    Ok(workflow_status_json(&target, &state))
438}
439
440pub(crate) fn register_workflow_message_builtins(vm: &mut Vm) {
441    vm.set_global(
442        "workflow",
443        VmValue::Dict(Rc::new(BTreeMap::from([
444            (
445                "signal".to_string(),
446                VmValue::BuiltinRef(Rc::from("workflow.signal")),
447            ),
448            (
449                "query".to_string(),
450                VmValue::BuiltinRef(Rc::from("workflow.query")),
451            ),
452            (
453                "update".to_string(),
454                VmValue::BuiltinRef(Rc::from("workflow.update")),
455            ),
456            (
457                "publish_query".to_string(),
458                VmValue::BuiltinRef(Rc::from("workflow.publish_query")),
459            ),
460            (
461                "receive".to_string(),
462                VmValue::BuiltinRef(Rc::from("workflow.receive")),
463            ),
464            (
465                "respond_update".to_string(),
466                VmValue::BuiltinRef(Rc::from("workflow.respond_update")),
467            ),
468            (
469                "pause".to_string(),
470                VmValue::BuiltinRef(Rc::from("workflow.pause")),
471            ),
472            (
473                "resume".to_string(),
474                VmValue::BuiltinRef(Rc::from("workflow.resume")),
475            ),
476            (
477                "status".to_string(),
478                VmValue::BuiltinRef(Rc::from("workflow.status")),
479            ),
480            (
481                "continue_as_new".to_string(),
482                VmValue::BuiltinRef(Rc::from("workflow.continue_as_new")),
483            ),
484        ]))),
485    );
486
487    vm.register_builtin("workflow.signal", |args, _out| {
488        let target = parse_target_vm(args.first(), None, "workflow.signal")?;
489        let name = args
490            .get(1)
491            .map(|value| value.display())
492            .filter(|value| !value.is_empty())
493            .ok_or_else(|| VmError::Runtime("workflow.signal: missing name".to_string()))?;
494        let payload = args
495            .get(2)
496            .map(crate::llm::vm_value_to_json)
497            .unwrap_or(serde_json::Value::Null);
498        let result =
499            enqueue_message(&target, "signal", &name, payload, None).map_err(VmError::Runtime)?;
500        Ok(crate::stdlib::json_to_vm_value(&result))
501    });
502
503    vm.register_builtin("workflow.query", |args, _out| {
504        let target = parse_target_vm(args.first(), None, "workflow.query")?;
505        let name = args
506            .get(1)
507            .map(|value| value.display())
508            .filter(|value| !value.is_empty())
509            .ok_or_else(|| VmError::Runtime("workflow.query: missing name".to_string()))?;
510        let state = load_state(&target).map_err(VmError::Runtime)?;
511        Ok(crate::stdlib::json_to_vm_value(
512            &state
513                .queries
514                .get(&name)
515                .map(|record| record.value.clone())
516                .unwrap_or(serde_json::Value::Null),
517        ))
518    });
519
520    vm.register_async_builtin("workflow.update", |args| async move {
521        let target = parse_target_vm(args.first(), None, "workflow.update")?;
522        let name = args
523            .get(1)
524            .map(|value| value.display())
525            .filter(|value| !value.is_empty())
526            .ok_or_else(|| VmError::Runtime("workflow.update: missing name".to_string()))?;
527        let payload = args
528            .get(2)
529            .map(crate::llm::vm_value_to_json)
530            .unwrap_or(serde_json::Value::Null);
531        let timeout_ms = args
532            .get(3)
533            .and_then(|value| value.as_dict())
534            .and_then(|dict| dict.get("timeout_ms"))
535            .and_then(VmValue::as_int)
536            .unwrap_or(DEFAULT_UPDATE_TIMEOUT_MS as i64)
537            .max(1) as u64;
538        let result = workflow_update_for_base(
539            &target.base_dir,
540            &target.workflow_id,
541            &name,
542            payload,
543            StdDuration::from_millis(timeout_ms),
544        )
545        .await
546        .map_err(VmError::Runtime)?;
547        Ok(crate::stdlib::json_to_vm_value(&result))
548    });
549
550    vm.register_builtin("workflow.publish_query", |args, _out| {
551        let target = parse_target_vm(args.first(), None, "workflow.publish_query")?;
552        let name = args
553            .get(1)
554            .map(|value| value.display())
555            .filter(|value| !value.is_empty())
556            .ok_or_else(|| VmError::Runtime("workflow.publish_query: missing name".to_string()))?;
557        let value = args
558            .get(2)
559            .map(crate::llm::vm_value_to_json)
560            .unwrap_or(serde_json::Value::Null);
561        let result =
562            workflow_publish_query_for_base(&target.base_dir, &target.workflow_id, &name, value)
563                .map_err(VmError::Runtime)?;
564        Ok(crate::stdlib::json_to_vm_value(&result))
565    });
566
567    vm.register_builtin("workflow.receive", |args, _out| {
568        let target = parse_target_vm(args.first(), None, "workflow.receive")?;
569        let mut state = load_state(&target).map_err(VmError::Runtime)?;
570        let Some(message) = state.mailbox.pop_front() else {
571            return Ok(VmValue::Nil);
572        };
573        save_state(&target, &state).map_err(VmError::Runtime)?;
574        Ok(crate::stdlib::json_to_vm_value(&serde_json::json!({
575            "workflow_id": target.workflow_id,
576            "seq": message.seq,
577            "kind": message.kind,
578            "name": message.name,
579            "request_id": message.request_id,
580            "payload": message.payload,
581            "enqueued_at": message.enqueued_at,
582        })))
583    });
584
585    vm.register_builtin("workflow.respond_update", |args, _out| {
586        let target = parse_target_vm(args.first(), None, "workflow.respond_update")?;
587        let request_id = args
588            .get(1)
589            .map(|value| value.display())
590            .filter(|value| !value.is_empty())
591            .ok_or_else(|| {
592                VmError::Runtime("workflow.respond_update: missing request id".to_string())
593            })?;
594        let value = args
595            .get(2)
596            .map(crate::llm::vm_value_to_json)
597            .unwrap_or(serde_json::Value::Null);
598        let name = args
599            .get(3)
600            .map(|value| value.display())
601            .filter(|value| !value.is_empty());
602        let result = workflow_respond_update_for_base(
603            &target.base_dir,
604            &target.workflow_id,
605            &request_id,
606            name.as_deref(),
607            value,
608        )
609        .map_err(VmError::Runtime)?;
610        Ok(crate::stdlib::json_to_vm_value(&result))
611    });
612
613    vm.register_builtin("workflow.pause", |args, _out| {
614        let target = parse_target_vm(args.first(), None, "workflow.pause")?;
615        let result = workflow_pause_for_base(&target.base_dir, &target.workflow_id)
616            .map_err(VmError::Runtime)?;
617        Ok(crate::stdlib::json_to_vm_value(&result))
618    });
619
620    vm.register_builtin("workflow.resume", |args, _out| {
621        let target = parse_target_vm(args.first(), None, "workflow.resume")?;
622        let result = workflow_resume_for_base(&target.base_dir, &target.workflow_id)
623            .map_err(VmError::Runtime)?;
624        Ok(crate::stdlib::json_to_vm_value(&result))
625    });
626
627    vm.register_builtin("workflow.status", |args, _out| {
628        let target = parse_target_vm(args.first(), None, "workflow.status")?;
629        let state = load_state(&target).map_err(VmError::Runtime)?;
630        Ok(crate::stdlib::json_to_vm_value(&workflow_status_json(
631            &target, &state,
632        )))
633    });
634
635    vm.register_builtin("workflow.continue_as_new", |args, _out| {
636        let target = parse_target_vm(args.first(), None, "workflow.continue_as_new")?;
637        let mut state = load_state(&target).map_err(VmError::Runtime)?;
638        state.generation += 1;
639        state.continue_as_new_count += 1;
640        state.last_continue_as_new_at = Some(now_rfc3339());
641        state.responses.clear();
642        save_state(&target, &state).map_err(VmError::Runtime)?;
643        Ok(crate::stdlib::json_to_vm_value(&workflow_status_json(
644            &target, &state,
645        )))
646    });
647
648    vm.register_builtin("continue_as_new", |args, _out| {
649        let target = parse_target_vm(args.first(), None, "continue_as_new")?;
650        let mut state = load_state(&target).map_err(VmError::Runtime)?;
651        state.generation += 1;
652        state.continue_as_new_count += 1;
653        state.last_continue_as_new_at = Some(now_rfc3339());
654        state.responses.clear();
655        save_state(&target, &state).map_err(VmError::Runtime)?;
656        Ok(crate::stdlib::json_to_vm_value(&workflow_status_json(
657            &target, &state,
658        )))
659    });
660}
661
#[cfg(test)]
mod tests {
    use super::*;

    /// An update call blocks until a response for its request id is written
    /// to the state file, then returns that response's value.
    #[tokio::test]
    async fn update_round_trip_waits_for_response() {
        let scratch = tempfile::tempdir().expect("tempdir");
        let workflow_id = "wf-update";
        let base_dir = scratch.path().to_path_buf();
        let caller_base = base_dir.clone();
        let pending_update = tokio::spawn(async move {
            workflow_update_for_base(
                &caller_base,
                workflow_id,
                "adjust_budget",
                serde_json::json!({"max_usd": 10}),
                StdDuration::from_millis(500),
            )
            .await
        });

        // Give the spawned caller time to enqueue its update message.
        tokio::time::sleep(StdDuration::from_millis(50)).await;
        let target = WorkflowTarget {
            workflow_id: workflow_id.to_string(),
            base_dir: base_dir.clone(),
        };
        let mut state = load_state(&target).expect("load state");
        let queued = state.mailbox.pop_front().expect("queued update");
        assert_eq!(queued.kind, "update");

        // Play the workflow side: answer the request by its id.
        let response_key = queued.request_id.clone().expect("request id");
        let response = WorkflowUpdateResponseRecord {
            request_id: queued.request_id.expect("request id"),
            name: Some(queued.name.clone()),
            value: serde_json::json!({"ok": true}),
            responded_at: now_rfc3339(),
        };
        state.responses.insert(response_key, response);
        save_state(&target, &state).expect("save response");

        let outcome = pending_update.await.expect("join").expect("update result");
        assert_eq!(outcome, serde_json::json!({"ok": true}));
    }

    /// A `persisted_path` inside `.harn-runs` resolves the base dir to the
    /// directory that contains `.harn-runs`.
    #[test]
    fn persisted_path_drives_target_base_dir() {
        let descriptor = serde_json::json!({
            "workflow_id": "wf",
            "persisted_path": "/tmp/demo/.harn-runs/run.json"
        });
        let target = parse_target_json(&descriptor, None).expect("target");
        assert_eq!(target.workflow_id, "wf");
        assert_eq!(target.base_dir, PathBuf::from("/tmp/demo"));
    }
}