Skip to main content

harn_vm/stdlib/
workflow_messages.rs

1use std::collections::{BTreeMap, VecDeque};
2use std::path::{Path, PathBuf};
3use std::rc::Rc;
4use std::time::Duration as StdDuration;
5
6use serde::{Deserialize, Serialize};
7
8use crate::stdlib::process::runtime_root_base;
9use crate::value::{VmError, VmValue};
10use crate::vm::Vm;
11
/// Timeout, in milliseconds, that the `workflow.update` builtin uses when the
/// caller does not supply a `timeout_ms` option.
const DEFAULT_UPDATE_TIMEOUT_MS: u64 = 30_000;
/// Delay, in milliseconds, between successive checks for an update response.
const UPDATE_POLL_INTERVAL_MS: u64 = 25;
14
/// A single message queued in a workflow's mailbox.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct WorkflowMessageRecord {
    /// Sequence number taken from the mailbox's `next_seq` counter at enqueue time.
    pub seq: u64,
    /// Message category; this module enqueues `"signal"`, `"update"` and `"control"`.
    pub kind: String,
    /// Message name (`"pause"` / `"resume"` for the control messages enqueued here).
    pub name: String,
    /// Correlation id attached to update requests; omitted from the JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub request_id: Option<String>,
    /// Arbitrary JSON payload carried by the message.
    pub payload: serde_json::Value,
    /// Timestamp string produced by `now_rfc3339` when the message was enqueued.
    pub enqueued_at: String,
}
25
/// The most recently published value for one named workflow query.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct WorkflowQueryRecord {
    /// JSON value returned to callers that query this name.
    pub value: serde_json::Value,
    /// Timestamp string produced by `now_rfc3339` when the value was published.
    pub published_at: String,
}
31
/// The stored answer to one update request, keyed elsewhere by `request_id`.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct WorkflowUpdateResponseRecord {
    /// Id of the update request this record answers.
    pub request_id: String,
    /// Optional update name supplied by the responder; omitted from the JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub name: Option<String>,
    /// JSON value handed back to the caller waiting on the update.
    pub value: serde_json::Value,
    /// Timestamp string produced by `now_rfc3339` when the response was recorded.
    pub responded_at: String,
}
40
/// Persisted per-workflow state: the pending message queue plus published
/// query values and update responses. Serialized as JSON by `save_state`.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct WorkflowMailboxState {
    /// Record discriminator, stored under the JSON key `_type`.
    #[serde(rename = "_type")]
    pub type_name: String,
    /// Id of the workflow this state belongs to.
    pub workflow_id: String,
    /// Generation counter; deserializes to `default_generation()` when absent.
    #[serde(default = "default_generation")]
    pub generation: u64,
    /// Number of continue-as-new transitions recorded so far.
    #[serde(default)]
    pub continue_as_new_count: u64,
    /// Timestamp of the latest continue-as-new; omitted from the JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub last_continue_as_new_at: Option<String>,
    /// Pause flag toggled by the pause/resume operations.
    #[serde(default)]
    pub paused: bool,
    /// Last sequence number handed out; incremented before each new message is stamped.
    #[serde(default)]
    pub next_seq: u64,
    /// Pending messages; producers push to the back, `workflow.receive` pops the front.
    #[serde(default)]
    pub mailbox: VecDeque<WorkflowMessageRecord>,
    /// Published query values keyed by query name.
    #[serde(default)]
    pub queries: BTreeMap<String, WorkflowQueryRecord>,
    /// Update responses keyed by request id.
    #[serde(default)]
    pub responses: BTreeMap<String, WorkflowUpdateResponseRecord>,
}
63
64impl Default for WorkflowMailboxState {
65    fn default() -> Self {
66        Self {
67            type_name: "workflow_mailbox".to_string(),
68            workflow_id: String::new(),
69            generation: default_generation(),
70            continue_as_new_count: 0,
71            last_continue_as_new_at: None,
72            paused: false,
73            next_seq: 0,
74            mailbox: VecDeque::new(),
75            queries: BTreeMap::new(),
76            responses: BTreeMap::new(),
77        }
78    }
79}
80
/// Generation number used for a mailbox whose persisted state has no
/// `generation` field, and for freshly constructed defaults.
fn default_generation() -> u64 {
    const FIRST_GENERATION: u64 = 1;
    FIRST_GENERATION
}
84
/// Identifies one workflow's on-disk state: which workflow, and under which
/// base directory its files are resolved.
#[derive(Clone, Debug, PartialEq, Eq)]
struct WorkflowTarget {
    /// Workflow id used as the final directory component of the state path.
    workflow_id: String,
    /// Base directory passed to `runtime_paths::workflow_dir` to locate workflow state.
    base_dir: PathBuf,
}
90
/// Reduces a caller-supplied workflow id to a single safe path component.
///
/// The input is trimmed and only its final normal path component is kept, so
/// `"a/b/wf"` becomes `"wf"`. Inputs that have no such component (empty or
/// blank strings, `"."`, `".."`, a bare root, or anything ending in `..` such
/// as `"foo/.."`) yield the placeholder `"workflow"`.
///
/// The result never contains a separator or a `..` segment, which matters
/// because it is joined onto the workflow directory to build the state path.
fn sanitize_workflow_id(raw: &str) -> String {
    // `file_name` is `None` whenever the path ends in `..`, `.`, or a root.
    // Falling back to the raw text in that case would hand a traversal
    // sequence straight to `Path::join`, so use the placeholder instead.
    Path::new(raw.trim())
        .file_name()
        .and_then(|component| component.to_str())
        .filter(|component| !component.is_empty())
        .unwrap_or("workflow")
        .to_string()
}
103
/// Derives a workflow base directory from the path of a persisted run file.
///
/// The file's parent directory is used, except when that directory is named
/// `.harn-runs`, in which case the directory above it is returned (if there
/// is one). A path with no parent resolves to `"."`.
fn workflow_base_dir_from_persisted_path(path: &Path) -> PathBuf {
    let run_dir = match path.parent() {
        Some(dir) => dir,
        None => Path::new("."),
    };
    let is_runs_dir = matches!(
        run_dir.file_name().and_then(|name| name.to_str()),
        Some(".harn-runs")
    );
    if is_runs_dir {
        if let Some(project_root) = run_dir.parent() {
            return project_root.to_path_buf();
        }
    }
    run_dir.to_path_buf()
}
112
113fn workflow_target_root(target: &WorkflowTarget) -> PathBuf {
114    crate::runtime_paths::workflow_dir(&target.base_dir).join(&target.workflow_id)
115}
116
117fn workflow_state_path(target: &WorkflowTarget) -> PathBuf {
118    workflow_target_root(target).join("state.json")
119}
120
121fn now_rfc3339() -> String {
122    time::OffsetDateTime::now_utc()
123        .format(&time::format_description::well_known::Rfc3339)
124        .unwrap_or_else(|_| uuid::Uuid::now_v7().to_string())
125}
126
127fn load_state(target: &WorkflowTarget) -> Result<WorkflowMailboxState, String> {
128    let path = workflow_state_path(target);
129    if !path.exists() {
130        return Ok(WorkflowMailboxState {
131            workflow_id: target.workflow_id.clone(),
132            ..WorkflowMailboxState::default()
133        });
134    }
135    let text = std::fs::read_to_string(&path)
136        .map_err(|error| format!("workflow state read error: {error}"))?;
137    let mut state: WorkflowMailboxState = serde_json::from_str(&text)
138        .map_err(|error| format!("workflow state parse error: {error}"))?;
139    if state.type_name.is_empty() {
140        state.type_name = "workflow_mailbox".to_string();
141    }
142    if state.workflow_id.is_empty() {
143        state.workflow_id = target.workflow_id.clone();
144    }
145    if state.generation == 0 {
146        state.generation = 1;
147    }
148    Ok(state)
149}
150
151fn save_state(target: &WorkflowTarget, state: &WorkflowMailboxState) -> Result<(), String> {
152    let path = workflow_state_path(target);
153    let json = serde_json::to_string_pretty(state)
154        .map_err(|error| format!("workflow state encode error: {error}"))?;
155    crate::atomic_io::atomic_write(&path, json.as_bytes())
156        .map_err(|error| format!("workflow state write error: {error}"))
157}
158
159fn parse_target_json(
160    value: &serde_json::Value,
161    fallback_base_dir: Option<&Path>,
162) -> Option<WorkflowTarget> {
163    match value {
164        serde_json::Value::String(text) => Some(WorkflowTarget {
165            workflow_id: sanitize_workflow_id(text),
166            base_dir: fallback_base_dir
167                .map(Path::to_path_buf)
168                .unwrap_or_else(runtime_root_base),
169        }),
170        serde_json::Value::Object(map) => {
171            let workflow_id = map
172                .get("workflow_id")
173                .and_then(|value| value.as_str())
174                .or_else(|| map.get("workflow").and_then(|value| value.as_str()))
175                .or_else(|| {
176                    map.get("run")
177                        .and_then(|value| value.get("workflow_id"))
178                        .and_then(|value| value.as_str())
179                })
180                .or_else(|| {
181                    map.get("result")
182                        .and_then(|value| value.get("run"))
183                        .and_then(|value| value.get("workflow_id"))
184                        .and_then(|value| value.as_str())
185                })?;
186            let explicit_base = map
187                .get("base_dir")
188                .and_then(|value| value.as_str())
189                .filter(|value| !value.trim().is_empty())
190                .map(PathBuf::from);
191            let persisted_path = map
192                .get("persisted_path")
193                .and_then(|value| value.as_str())
194                .or_else(|| map.get("path").and_then(|value| value.as_str()))
195                .or_else(|| {
196                    map.get("run")
197                        .and_then(|value| value.get("persisted_path"))
198                        .and_then(|value| value.as_str())
199                })
200                .or_else(|| {
201                    map.get("result")
202                        .and_then(|value| value.get("run"))
203                        .and_then(|value| value.get("persisted_path"))
204                        .and_then(|value| value.as_str())
205                });
206            let base_dir = explicit_base
207                .or_else(|| {
208                    persisted_path
209                        .map(|path| workflow_base_dir_from_persisted_path(Path::new(path)))
210                })
211                .or_else(|| fallback_base_dir.map(Path::to_path_buf))
212                .unwrap_or_else(runtime_root_base);
213            Some(WorkflowTarget {
214                workflow_id: sanitize_workflow_id(workflow_id),
215                base_dir,
216            })
217        }
218        _ => None,
219    }
220}
221
222fn parse_target_vm(
223    value: Option<&VmValue>,
224    fallback_base_dir: Option<&Path>,
225    builtin: &str,
226) -> Result<WorkflowTarget, VmError> {
227    let value = value.ok_or_else(|| VmError::Runtime(format!("{builtin}: missing target")))?;
228    parse_target_json(&crate::llm::vm_value_to_json(value), fallback_base_dir).ok_or_else(|| {
229        VmError::Runtime(format!(
230            "{builtin}: target must be a workflow id string or dict with workflow_id/workflow"
231        ))
232    })
233}
234
235fn workflow_status_json(
236    target: &WorkflowTarget,
237    state: &WorkflowMailboxState,
238) -> serde_json::Value {
239    serde_json::json!({
240        "workflow_id": target.workflow_id,
241        "base_dir": target.base_dir.to_string_lossy(),
242        "generation": state.generation,
243        "paused": state.paused,
244        "pending_count": state.mailbox.len(),
245        "query_count": state.queries.len(),
246        "response_count": state.responses.len(),
247        "continue_as_new_count": state.continue_as_new_count,
248        "last_continue_as_new_at": state.last_continue_as_new_at,
249    })
250}
251
252fn enqueue_message(
253    target: &WorkflowTarget,
254    kind: &str,
255    name: &str,
256    payload: serde_json::Value,
257    request_id: Option<String>,
258) -> Result<serde_json::Value, String> {
259    let mut state = load_state(target)?;
260    state.next_seq += 1;
261    let message = WorkflowMessageRecord {
262        seq: state.next_seq,
263        kind: kind.to_string(),
264        name: name.to_string(),
265        request_id,
266        payload,
267        enqueued_at: now_rfc3339(),
268    };
269    state.mailbox.push_back(message.clone());
270    save_state(target, &state)?;
271    Ok(serde_json::json!({
272        "workflow_id": target.workflow_id,
273        "message": message,
274        "status": workflow_status_json(target, &state),
275    }))
276}
277
278pub fn workflow_signal_for_base(
279    base_dir: &Path,
280    workflow_id: &str,
281    name: &str,
282    payload: serde_json::Value,
283) -> Result<serde_json::Value, String> {
284    let target = WorkflowTarget {
285        workflow_id: sanitize_workflow_id(workflow_id),
286        base_dir: base_dir.to_path_buf(),
287    };
288    enqueue_message(&target, "signal", name, payload, None)
289}
290
291pub fn workflow_query_for_base(
292    base_dir: &Path,
293    workflow_id: &str,
294    name: &str,
295) -> Result<serde_json::Value, String> {
296    let target = WorkflowTarget {
297        workflow_id: sanitize_workflow_id(workflow_id),
298        base_dir: base_dir.to_path_buf(),
299    };
300    let state = load_state(&target)?;
301    Ok(state
302        .queries
303        .get(name)
304        .map(|record| record.value.clone())
305        .unwrap_or(serde_json::Value::Null))
306}
307
308pub fn workflow_publish_query_for_base(
309    base_dir: &Path,
310    workflow_id: &str,
311    name: &str,
312    value: serde_json::Value,
313) -> Result<serde_json::Value, String> {
314    let target = WorkflowTarget {
315        workflow_id: sanitize_workflow_id(workflow_id),
316        base_dir: base_dir.to_path_buf(),
317    };
318    let mut state = load_state(&target)?;
319    state.queries.insert(
320        name.to_string(),
321        WorkflowQueryRecord {
322            value,
323            published_at: now_rfc3339(),
324        },
325    );
326    save_state(&target, &state)?;
327    Ok(workflow_status_json(&target, &state))
328}
329
330pub fn workflow_pause_for_base(
331    base_dir: &Path,
332    workflow_id: &str,
333) -> Result<serde_json::Value, String> {
334    let target = WorkflowTarget {
335        workflow_id: sanitize_workflow_id(workflow_id),
336        base_dir: base_dir.to_path_buf(),
337    };
338    let mut state = load_state(&target)?;
339    state.paused = true;
340    state.next_seq += 1;
341    state.mailbox.push_back(WorkflowMessageRecord {
342        seq: state.next_seq,
343        kind: "control".to_string(),
344        name: "pause".to_string(),
345        request_id: None,
346        payload: serde_json::json!({}),
347        enqueued_at: now_rfc3339(),
348    });
349    save_state(&target, &state)?;
350    Ok(workflow_status_json(&target, &state))
351}
352
353pub fn workflow_resume_for_base(
354    base_dir: &Path,
355    workflow_id: &str,
356) -> Result<serde_json::Value, String> {
357    let target = WorkflowTarget {
358        workflow_id: sanitize_workflow_id(workflow_id),
359        base_dir: base_dir.to_path_buf(),
360    };
361    let mut state = load_state(&target)?;
362    state.paused = false;
363    state.next_seq += 1;
364    state.mailbox.push_back(WorkflowMessageRecord {
365        seq: state.next_seq,
366        kind: "control".to_string(),
367        name: "resume".to_string(),
368        request_id: None,
369        payload: serde_json::json!({}),
370        enqueued_at: now_rfc3339(),
371    });
372    save_state(&target, &state)?;
373    Ok(workflow_status_json(&target, &state))
374}
375
376pub async fn workflow_update_for_base(
377    base_dir: &Path,
378    workflow_id: &str,
379    name: &str,
380    payload: serde_json::Value,
381    timeout: StdDuration,
382) -> Result<serde_json::Value, String> {
383    let target = WorkflowTarget {
384        workflow_id: sanitize_workflow_id(workflow_id),
385        base_dir: base_dir.to_path_buf(),
386    };
387    let request_id = uuid::Uuid::now_v7().to_string();
388    enqueue_message(&target, "update", name, payload, Some(request_id.clone()))?;
389    let started = std::time::Instant::now();
390    while started.elapsed() <= timeout {
391        if let Ok(state) = load_state(&target) {
392            if let Some(response) = state.responses.get(&request_id) {
393                return Ok(response.value.clone());
394            }
395        }
396        tokio::time::sleep(StdDuration::from_millis(UPDATE_POLL_INTERVAL_MS)).await;
397    }
398    Err(format!(
399        "workflow update '{name}' timed out for '{}'",
400        target.workflow_id
401    ))
402}
403
404pub fn workflow_respond_update_for_base(
405    base_dir: &Path,
406    workflow_id: &str,
407    request_id: &str,
408    name: Option<&str>,
409    value: serde_json::Value,
410) -> Result<serde_json::Value, String> {
411    let target = WorkflowTarget {
412        workflow_id: sanitize_workflow_id(workflow_id),
413        base_dir: base_dir.to_path_buf(),
414    };
415    let mut state = load_state(&target)?;
416    state.responses.insert(
417        request_id.to_string(),
418        WorkflowUpdateResponseRecord {
419            request_id: request_id.to_string(),
420            name: name.map(ToString::to_string),
421            value,
422            responded_at: now_rfc3339(),
423        },
424    );
425    save_state(&target, &state)?;
426    Ok(workflow_status_json(&target, &state))
427}
428
429pub(crate) fn register_workflow_message_builtins(vm: &mut Vm) {
430    vm.set_global(
431        "workflow",
432        VmValue::Dict(Rc::new(BTreeMap::from([
433            (
434                "signal".to_string(),
435                VmValue::BuiltinRef(Rc::from("workflow.signal")),
436            ),
437            (
438                "query".to_string(),
439                VmValue::BuiltinRef(Rc::from("workflow.query")),
440            ),
441            (
442                "update".to_string(),
443                VmValue::BuiltinRef(Rc::from("workflow.update")),
444            ),
445            (
446                "publish_query".to_string(),
447                VmValue::BuiltinRef(Rc::from("workflow.publish_query")),
448            ),
449            (
450                "receive".to_string(),
451                VmValue::BuiltinRef(Rc::from("workflow.receive")),
452            ),
453            (
454                "respond_update".to_string(),
455                VmValue::BuiltinRef(Rc::from("workflow.respond_update")),
456            ),
457            (
458                "pause".to_string(),
459                VmValue::BuiltinRef(Rc::from("workflow.pause")),
460            ),
461            (
462                "resume".to_string(),
463                VmValue::BuiltinRef(Rc::from("workflow.resume")),
464            ),
465            (
466                "status".to_string(),
467                VmValue::BuiltinRef(Rc::from("workflow.status")),
468            ),
469            (
470                "continue_as_new".to_string(),
471                VmValue::BuiltinRef(Rc::from("workflow.continue_as_new")),
472            ),
473        ]))),
474    );
475
476    vm.register_builtin("workflow.signal", |args, _out| {
477        let target = parse_target_vm(args.first(), None, "workflow.signal")?;
478        let name = args
479            .get(1)
480            .map(|value| value.display())
481            .filter(|value| !value.is_empty())
482            .ok_or_else(|| VmError::Runtime("workflow.signal: missing name".to_string()))?;
483        let payload = args
484            .get(2)
485            .map(crate::llm::vm_value_to_json)
486            .unwrap_or(serde_json::Value::Null);
487        let result =
488            enqueue_message(&target, "signal", &name, payload, None).map_err(VmError::Runtime)?;
489        Ok(crate::stdlib::json_to_vm_value(&result))
490    });
491
492    vm.register_builtin("workflow.query", |args, _out| {
493        let target = parse_target_vm(args.first(), None, "workflow.query")?;
494        let name = args
495            .get(1)
496            .map(|value| value.display())
497            .filter(|value| !value.is_empty())
498            .ok_or_else(|| VmError::Runtime("workflow.query: missing name".to_string()))?;
499        let state = load_state(&target).map_err(VmError::Runtime)?;
500        Ok(crate::stdlib::json_to_vm_value(
501            &state
502                .queries
503                .get(&name)
504                .map(|record| record.value.clone())
505                .unwrap_or(serde_json::Value::Null),
506        ))
507    });
508
509    vm.register_async_builtin("workflow.update", |args| async move {
510        let target = parse_target_vm(args.first(), None, "workflow.update")?;
511        let name = args
512            .get(1)
513            .map(|value| value.display())
514            .filter(|value| !value.is_empty())
515            .ok_or_else(|| VmError::Runtime("workflow.update: missing name".to_string()))?;
516        let payload = args
517            .get(2)
518            .map(crate::llm::vm_value_to_json)
519            .unwrap_or(serde_json::Value::Null);
520        let timeout_ms = args
521            .get(3)
522            .and_then(|value| value.as_dict())
523            .and_then(|dict| dict.get("timeout_ms"))
524            .and_then(VmValue::as_int)
525            .unwrap_or(DEFAULT_UPDATE_TIMEOUT_MS as i64)
526            .max(1) as u64;
527        let result = workflow_update_for_base(
528            &target.base_dir,
529            &target.workflow_id,
530            &name,
531            payload,
532            StdDuration::from_millis(timeout_ms),
533        )
534        .await
535        .map_err(VmError::Runtime)?;
536        Ok(crate::stdlib::json_to_vm_value(&result))
537    });
538
539    vm.register_builtin("workflow.publish_query", |args, _out| {
540        let target = parse_target_vm(args.first(), None, "workflow.publish_query")?;
541        let name = args
542            .get(1)
543            .map(|value| value.display())
544            .filter(|value| !value.is_empty())
545            .ok_or_else(|| VmError::Runtime("workflow.publish_query: missing name".to_string()))?;
546        let value = args
547            .get(2)
548            .map(crate::llm::vm_value_to_json)
549            .unwrap_or(serde_json::Value::Null);
550        let result =
551            workflow_publish_query_for_base(&target.base_dir, &target.workflow_id, &name, value)
552                .map_err(VmError::Runtime)?;
553        Ok(crate::stdlib::json_to_vm_value(&result))
554    });
555
556    vm.register_builtin("workflow.receive", |args, _out| {
557        let target = parse_target_vm(args.first(), None, "workflow.receive")?;
558        let mut state = load_state(&target).map_err(VmError::Runtime)?;
559        let Some(message) = state.mailbox.pop_front() else {
560            return Ok(VmValue::Nil);
561        };
562        save_state(&target, &state).map_err(VmError::Runtime)?;
563        Ok(crate::stdlib::json_to_vm_value(&serde_json::json!({
564            "workflow_id": target.workflow_id,
565            "seq": message.seq,
566            "kind": message.kind,
567            "name": message.name,
568            "request_id": message.request_id,
569            "payload": message.payload,
570            "enqueued_at": message.enqueued_at,
571        })))
572    });
573
574    vm.register_builtin("workflow.respond_update", |args, _out| {
575        let target = parse_target_vm(args.first(), None, "workflow.respond_update")?;
576        let request_id = args
577            .get(1)
578            .map(|value| value.display())
579            .filter(|value| !value.is_empty())
580            .ok_or_else(|| {
581                VmError::Runtime("workflow.respond_update: missing request id".to_string())
582            })?;
583        let value = args
584            .get(2)
585            .map(crate::llm::vm_value_to_json)
586            .unwrap_or(serde_json::Value::Null);
587        let name = args
588            .get(3)
589            .map(|value| value.display())
590            .filter(|value| !value.is_empty());
591        let result = workflow_respond_update_for_base(
592            &target.base_dir,
593            &target.workflow_id,
594            &request_id,
595            name.as_deref(),
596            value,
597        )
598        .map_err(VmError::Runtime)?;
599        Ok(crate::stdlib::json_to_vm_value(&result))
600    });
601
602    vm.register_builtin("workflow.pause", |args, _out| {
603        let target = parse_target_vm(args.first(), None, "workflow.pause")?;
604        let result = workflow_pause_for_base(&target.base_dir, &target.workflow_id)
605            .map_err(VmError::Runtime)?;
606        Ok(crate::stdlib::json_to_vm_value(&result))
607    });
608
609    vm.register_builtin("workflow.resume", |args, _out| {
610        let target = parse_target_vm(args.first(), None, "workflow.resume")?;
611        let result = workflow_resume_for_base(&target.base_dir, &target.workflow_id)
612            .map_err(VmError::Runtime)?;
613        Ok(crate::stdlib::json_to_vm_value(&result))
614    });
615
616    vm.register_builtin("workflow.status", |args, _out| {
617        let target = parse_target_vm(args.first(), None, "workflow.status")?;
618        let state = load_state(&target).map_err(VmError::Runtime)?;
619        Ok(crate::stdlib::json_to_vm_value(&workflow_status_json(
620            &target, &state,
621        )))
622    });
623
624    vm.register_builtin("workflow.continue_as_new", |args, _out| {
625        let target = parse_target_vm(args.first(), None, "workflow.continue_as_new")?;
626        let mut state = load_state(&target).map_err(VmError::Runtime)?;
627        state.generation += 1;
628        state.continue_as_new_count += 1;
629        state.last_continue_as_new_at = Some(now_rfc3339());
630        state.responses.clear();
631        save_state(&target, &state).map_err(VmError::Runtime)?;
632        Ok(crate::stdlib::json_to_vm_value(&workflow_status_json(
633            &target, &state,
634        )))
635    });
636
637    vm.register_builtin("continue_as_new", |args, _out| {
638        let target = parse_target_vm(args.first(), None, "continue_as_new")?;
639        let mut state = load_state(&target).map_err(VmError::Runtime)?;
640        state.generation += 1;
641        state.continue_as_new_count += 1;
642        state.last_continue_as_new_at = Some(now_rfc3339());
643        state.responses.clear();
644        save_state(&target, &state).map_err(VmError::Runtime)?;
645        Ok(crate::stdlib::json_to_vm_value(&workflow_status_json(
646            &target, &state,
647        )))
648    });
649}
650
#[cfg(test)]
mod tests {
    use super::*;

    /// An update call stays pending until a response keyed by its request id
    /// is written to the state file, then returns that response's value.
    #[tokio::test(start_paused = true)]
    async fn update_round_trip_waits_for_response() {
        let dir = tempfile::tempdir().expect("tempdir");
        let workflow_id = "wf-update";
        let base_dir = dir.path().to_path_buf();
        let base_dir_clone = base_dir.clone();
        // Caller side: enqueue the update and poll for its response.
        let task = tokio::spawn(async move {
            workflow_update_for_base(
                &base_dir_clone,
                workflow_id,
                "adjust_budget",
                serde_json::json!({"max_usd": 10}),
                StdDuration::from_millis(500),
            )
            .await
        });

        // Yield so the spawned task gets to enqueue its message first.
        tokio::time::sleep(StdDuration::from_millis(50)).await;
        let target = WorkflowTarget {
            workflow_id: workflow_id.to_string(),
            base_dir: base_dir.clone(),
        };
        // Workflow side: take the queued update and store a response for it.
        let mut state = load_state(&target).expect("load state");
        let message = state.mailbox.pop_front().expect("queued update");
        assert_eq!(message.kind, "update");
        state.responses.insert(
            message.request_id.clone().expect("request id"),
            WorkflowUpdateResponseRecord {
                request_id: message.request_id.expect("request id"),
                name: Some(message.name.clone()),
                value: serde_json::json!({"ok": true}),
                responded_at: now_rfc3339(),
            },
        );
        save_state(&target, &state).expect("save response");

        let result = task.await.expect("join").expect("update result");
        assert_eq!(result, serde_json::json!({"ok": true}));
    }

    /// A `persisted_path` inside `.harn-runs` resolves the base directory to
    /// the directory that contains `.harn-runs`.
    #[test]
    fn persisted_path_drives_target_base_dir() {
        let base = parse_target_json(
            &serde_json::json!({
                "workflow_id": "wf",
                "persisted_path": "/tmp/demo/.harn-runs/run.json"
            }),
            None,
        )
        .expect("target");
        assert_eq!(base.workflow_id, "wf");
        assert_eq!(base.base_dir, PathBuf::from("/tmp/demo"));
    }
}
704        .expect("target");
705        assert_eq!(base.workflow_id, "wf");
706        assert_eq!(base.base_dir, PathBuf::from("/tmp/demo"));
707    }
708}