Skip to main content

harn_vm/stdlib/
workflow_messages.rs

1use std::collections::{BTreeMap, VecDeque};
2use std::path::{Path, PathBuf};
3use std::rc::Rc;
4use std::time::Duration as StdDuration;
5
6use serde::{Deserialize, Serialize};
7
8use crate::stdlib::process::runtime_root_base;
9use crate::value::{VmError, VmValue};
10use crate::vm::Vm;
11
12const DEFAULT_UPDATE_TIMEOUT_MS: u64 = 30_000;
13const UPDATE_POLL_INTERVAL_MS: u64 = 25;
14
15#[derive(Clone, Debug, Serialize, Deserialize)]
16pub struct WorkflowMessageRecord {
17    pub seq: u64,
18    pub kind: String,
19    pub name: String,
20    #[serde(skip_serializing_if = "Option::is_none")]
21    pub request_id: Option<String>,
22    pub payload: serde_json::Value,
23    pub enqueued_at: String,
24}
25
26#[derive(Clone, Debug, Serialize, Deserialize)]
27pub struct WorkflowQueryRecord {
28    pub value: serde_json::Value,
29    pub published_at: String,
30}
31
32#[derive(Clone, Debug, Serialize, Deserialize)]
33pub struct WorkflowUpdateResponseRecord {
34    pub request_id: String,
35    #[serde(skip_serializing_if = "Option::is_none")]
36    pub name: Option<String>,
37    pub value: serde_json::Value,
38    pub responded_at: String,
39}
40
41#[derive(Clone, Debug, Serialize, Deserialize)]
42pub struct WorkflowMailboxState {
43    #[serde(rename = "_type")]
44    pub type_name: String,
45    pub workflow_id: String,
46    #[serde(default = "default_generation")]
47    pub generation: u64,
48    #[serde(default)]
49    pub continue_as_new_count: u64,
50    #[serde(skip_serializing_if = "Option::is_none")]
51    pub last_continue_as_new_at: Option<String>,
52    #[serde(default)]
53    pub paused: bool,
54    #[serde(default)]
55    pub next_seq: u64,
56    #[serde(default)]
57    pub mailbox: VecDeque<WorkflowMessageRecord>,
58    #[serde(default)]
59    pub queries: BTreeMap<String, WorkflowQueryRecord>,
60    #[serde(default)]
61    pub responses: BTreeMap<String, WorkflowUpdateResponseRecord>,
62}
63
64impl Default for WorkflowMailboxState {
65    fn default() -> Self {
66        Self {
67            type_name: "workflow_mailbox".to_string(),
68            workflow_id: String::new(),
69            generation: default_generation(),
70            continue_as_new_count: 0,
71            last_continue_as_new_at: None,
72            paused: false,
73            next_seq: 0,
74            mailbox: VecDeque::new(),
75            queries: BTreeMap::new(),
76            responses: BTreeMap::new(),
77        }
78    }
79}
80
/// Initial generation number for a mailbox; also the serde default for `generation`.
fn default_generation() -> u64 {
    const FIRST_GENERATION: u64 = 1;
    FIRST_GENERATION
}
84
/// Identifies which workflow's state to operate on and where it lives on disk.
#[derive(Debug, Clone, Eq, PartialEq)]
struct WorkflowTarget {
    /// Workflow id, used as the directory name under the workflow directory.
    workflow_id: String,
    /// Base directory from which the workflow directory is derived.
    base_dir: PathBuf,
}
90
/// Reduces a caller-supplied workflow id to a single path component.
///
/// Surrounding whitespace is trimmed and only the final path component is kept, so the
/// result can be joined onto the workflow directory without adding or escaping directory
/// levels. Inputs with no usable final component — an empty string, `.`, `..`, a bare
/// root such as `/`, or a path ending in `..` such as `a/..` — map to the placeholder id
/// `"workflow"`.
///
/// The function is idempotent: sanitizing an already sanitized id returns it unchanged.
fn sanitize_workflow_id(raw: &str) -> String {
    const FALLBACK_ID: &str = "workflow";

    // `file_name` is `None` for a bare root or a path ending in `..`. Falling back to the
    // raw text in that case would let separators and `..` through, so treat it as empty.
    // The component is trimmed again so that ids like "a/ b" sanitize to the same value
    // no matter how many times they pass through this function.
    let name = Path::new(raw.trim())
        .file_name()
        .and_then(|value| value.to_str())
        .map(str::trim)
        .unwrap_or("");

    if name.is_empty() || name == "." || name == ".." {
        FALLBACK_ID.to_string()
    } else {
        name.to_string()
    }
}
103
/// Derives the workflow base directory from the path of a persisted run file.
///
/// The run file's containing directory is used, except when that directory is named
/// `.harn-runs`, in which case its parent is used instead (falling back to the
/// `.harn-runs` directory itself when it has no parent). A path without a parent
/// resolves to `.`.
fn workflow_base_dir_from_persisted_path(path: &Path) -> PathBuf {
    let run_dir = match path.parent() {
        Some(dir) => dir,
        None => Path::new("."),
    };
    let in_runs_dir = matches!(
        run_dir.file_name().and_then(|name| name.to_str()),
        Some(".harn-runs")
    );
    let base = if in_runs_dir {
        run_dir.parent().unwrap_or(run_dir)
    } else {
        run_dir
    };
    base.to_path_buf()
}
112
113fn workflow_target_root(target: &WorkflowTarget) -> PathBuf {
114    crate::runtime_paths::workflow_dir(&target.base_dir).join(&target.workflow_id)
115}
116
117fn workflow_state_path(target: &WorkflowTarget) -> PathBuf {
118    workflow_target_root(target).join("state.json")
119}
120
121fn now_rfc3339() -> String {
122    time::OffsetDateTime::now_utc()
123        .format(&time::format_description::well_known::Rfc3339)
124        .unwrap_or_else(|_| uuid::Uuid::now_v7().to_string())
125}
126
127fn load_state(target: &WorkflowTarget) -> Result<WorkflowMailboxState, String> {
128    let path = workflow_state_path(target);
129    if !path.exists() {
130        return Ok(WorkflowMailboxState {
131            workflow_id: target.workflow_id.clone(),
132            ..WorkflowMailboxState::default()
133        });
134    }
135    let text = std::fs::read_to_string(&path)
136        .map_err(|error| format!("workflow state read error: {error}"))?;
137    let mut state: WorkflowMailboxState = serde_json::from_str(&text)
138        .map_err(|error| format!("workflow state parse error: {error}"))?;
139    if state.type_name.is_empty() {
140        state.type_name = "workflow_mailbox".to_string();
141    }
142    if state.workflow_id.is_empty() {
143        state.workflow_id = target.workflow_id.clone();
144    }
145    if state.generation == 0 {
146        state.generation = 1;
147    }
148    Ok(state)
149}
150
151fn save_state(target: &WorkflowTarget, state: &WorkflowMailboxState) -> Result<(), String> {
152    let path = workflow_state_path(target);
153    if let Some(parent) = path.parent() {
154        std::fs::create_dir_all(parent)
155            .map_err(|error| format!("workflow state mkdir error: {error}"))?;
156    }
157    let json = serde_json::to_string_pretty(state)
158        .map_err(|error| format!("workflow state encode error: {error}"))?;
159    std::fs::write(&path, json).map_err(|error| format!("workflow state write error: {error}"))
160}
161
162fn parse_target_json(
163    value: &serde_json::Value,
164    fallback_base_dir: Option<&Path>,
165) -> Option<WorkflowTarget> {
166    match value {
167        serde_json::Value::String(text) => Some(WorkflowTarget {
168            workflow_id: sanitize_workflow_id(text),
169            base_dir: fallback_base_dir
170                .map(Path::to_path_buf)
171                .unwrap_or_else(runtime_root_base),
172        }),
173        serde_json::Value::Object(map) => {
174            let workflow_id = map
175                .get("workflow_id")
176                .and_then(|value| value.as_str())
177                .or_else(|| map.get("workflow").and_then(|value| value.as_str()))
178                .or_else(|| {
179                    map.get("run")
180                        .and_then(|value| value.get("workflow_id"))
181                        .and_then(|value| value.as_str())
182                })
183                .or_else(|| {
184                    map.get("result")
185                        .and_then(|value| value.get("run"))
186                        .and_then(|value| value.get("workflow_id"))
187                        .and_then(|value| value.as_str())
188                })?;
189            let explicit_base = map
190                .get("base_dir")
191                .and_then(|value| value.as_str())
192                .filter(|value| !value.trim().is_empty())
193                .map(PathBuf::from);
194            let persisted_path = map
195                .get("persisted_path")
196                .and_then(|value| value.as_str())
197                .or_else(|| map.get("path").and_then(|value| value.as_str()))
198                .or_else(|| {
199                    map.get("run")
200                        .and_then(|value| value.get("persisted_path"))
201                        .and_then(|value| value.as_str())
202                })
203                .or_else(|| {
204                    map.get("result")
205                        .and_then(|value| value.get("run"))
206                        .and_then(|value| value.get("persisted_path"))
207                        .and_then(|value| value.as_str())
208                });
209            let base_dir = explicit_base
210                .or_else(|| {
211                    persisted_path
212                        .map(|path| workflow_base_dir_from_persisted_path(Path::new(path)))
213                })
214                .or_else(|| fallback_base_dir.map(Path::to_path_buf))
215                .unwrap_or_else(runtime_root_base);
216            Some(WorkflowTarget {
217                workflow_id: sanitize_workflow_id(workflow_id),
218                base_dir,
219            })
220        }
221        _ => None,
222    }
223}
224
225fn parse_target_vm(
226    value: Option<&VmValue>,
227    fallback_base_dir: Option<&Path>,
228    builtin: &str,
229) -> Result<WorkflowTarget, VmError> {
230    let value = value.ok_or_else(|| VmError::Runtime(format!("{builtin}: missing target")))?;
231    parse_target_json(&crate::llm::vm_value_to_json(value), fallback_base_dir).ok_or_else(|| {
232        VmError::Runtime(format!(
233            "{builtin}: target must be a workflow id string or dict with workflow_id/workflow"
234        ))
235    })
236}
237
238fn workflow_status_json(
239    target: &WorkflowTarget,
240    state: &WorkflowMailboxState,
241) -> serde_json::Value {
242    serde_json::json!({
243        "workflow_id": target.workflow_id,
244        "base_dir": target.base_dir.to_string_lossy(),
245        "generation": state.generation,
246        "paused": state.paused,
247        "pending_count": state.mailbox.len(),
248        "query_count": state.queries.len(),
249        "response_count": state.responses.len(),
250        "continue_as_new_count": state.continue_as_new_count,
251        "last_continue_as_new_at": state.last_continue_as_new_at,
252    })
253}
254
255fn enqueue_message(
256    target: &WorkflowTarget,
257    kind: &str,
258    name: &str,
259    payload: serde_json::Value,
260    request_id: Option<String>,
261) -> Result<serde_json::Value, String> {
262    let mut state = load_state(target)?;
263    state.next_seq += 1;
264    let message = WorkflowMessageRecord {
265        seq: state.next_seq,
266        kind: kind.to_string(),
267        name: name.to_string(),
268        request_id,
269        payload,
270        enqueued_at: now_rfc3339(),
271    };
272    state.mailbox.push_back(message.clone());
273    save_state(target, &state)?;
274    Ok(serde_json::json!({
275        "workflow_id": target.workflow_id,
276        "message": message,
277        "status": workflow_status_json(target, &state),
278    }))
279}
280
281pub fn workflow_signal_for_base(
282    base_dir: &Path,
283    workflow_id: &str,
284    name: &str,
285    payload: serde_json::Value,
286) -> Result<serde_json::Value, String> {
287    let target = WorkflowTarget {
288        workflow_id: sanitize_workflow_id(workflow_id),
289        base_dir: base_dir.to_path_buf(),
290    };
291    enqueue_message(&target, "signal", name, payload, None)
292}
293
294pub fn workflow_query_for_base(
295    base_dir: &Path,
296    workflow_id: &str,
297    name: &str,
298) -> Result<serde_json::Value, String> {
299    let target = WorkflowTarget {
300        workflow_id: sanitize_workflow_id(workflow_id),
301        base_dir: base_dir.to_path_buf(),
302    };
303    let state = load_state(&target)?;
304    Ok(state
305        .queries
306        .get(name)
307        .map(|record| record.value.clone())
308        .unwrap_or(serde_json::Value::Null))
309}
310
311pub fn workflow_publish_query_for_base(
312    base_dir: &Path,
313    workflow_id: &str,
314    name: &str,
315    value: serde_json::Value,
316) -> Result<serde_json::Value, String> {
317    let target = WorkflowTarget {
318        workflow_id: sanitize_workflow_id(workflow_id),
319        base_dir: base_dir.to_path_buf(),
320    };
321    let mut state = load_state(&target)?;
322    state.queries.insert(
323        name.to_string(),
324        WorkflowQueryRecord {
325            value,
326            published_at: now_rfc3339(),
327        },
328    );
329    save_state(&target, &state)?;
330    Ok(workflow_status_json(&target, &state))
331}
332
333pub fn workflow_pause_for_base(
334    base_dir: &Path,
335    workflow_id: &str,
336) -> Result<serde_json::Value, String> {
337    let target = WorkflowTarget {
338        workflow_id: sanitize_workflow_id(workflow_id),
339        base_dir: base_dir.to_path_buf(),
340    };
341    let mut state = load_state(&target)?;
342    state.paused = true;
343    state.next_seq += 1;
344    state.mailbox.push_back(WorkflowMessageRecord {
345        seq: state.next_seq,
346        kind: "control".to_string(),
347        name: "pause".to_string(),
348        request_id: None,
349        payload: serde_json::json!({}),
350        enqueued_at: now_rfc3339(),
351    });
352    save_state(&target, &state)?;
353    Ok(workflow_status_json(&target, &state))
354}
355
356pub fn workflow_resume_for_base(
357    base_dir: &Path,
358    workflow_id: &str,
359) -> Result<serde_json::Value, String> {
360    let target = WorkflowTarget {
361        workflow_id: sanitize_workflow_id(workflow_id),
362        base_dir: base_dir.to_path_buf(),
363    };
364    let mut state = load_state(&target)?;
365    state.paused = false;
366    state.next_seq += 1;
367    state.mailbox.push_back(WorkflowMessageRecord {
368        seq: state.next_seq,
369        kind: "control".to_string(),
370        name: "resume".to_string(),
371        request_id: None,
372        payload: serde_json::json!({}),
373        enqueued_at: now_rfc3339(),
374    });
375    save_state(&target, &state)?;
376    Ok(workflow_status_json(&target, &state))
377}
378
379pub async fn workflow_update_for_base(
380    base_dir: &Path,
381    workflow_id: &str,
382    name: &str,
383    payload: serde_json::Value,
384    timeout: StdDuration,
385) -> Result<serde_json::Value, String> {
386    let target = WorkflowTarget {
387        workflow_id: sanitize_workflow_id(workflow_id),
388        base_dir: base_dir.to_path_buf(),
389    };
390    let request_id = uuid::Uuid::now_v7().to_string();
391    enqueue_message(&target, "update", name, payload, Some(request_id.clone()))?;
392    let started = std::time::Instant::now();
393    while started.elapsed() <= timeout {
394        let state = load_state(&target)?;
395        if let Some(response) = state.responses.get(&request_id) {
396            return Ok(response.value.clone());
397        }
398        tokio::time::sleep(StdDuration::from_millis(UPDATE_POLL_INTERVAL_MS)).await;
399    }
400    Err(format!(
401        "workflow update '{name}' timed out for '{}'",
402        target.workflow_id
403    ))
404}
405
406pub fn workflow_respond_update_for_base(
407    base_dir: &Path,
408    workflow_id: &str,
409    request_id: &str,
410    name: Option<&str>,
411    value: serde_json::Value,
412) -> Result<serde_json::Value, String> {
413    let target = WorkflowTarget {
414        workflow_id: sanitize_workflow_id(workflow_id),
415        base_dir: base_dir.to_path_buf(),
416    };
417    let mut state = load_state(&target)?;
418    state.responses.insert(
419        request_id.to_string(),
420        WorkflowUpdateResponseRecord {
421            request_id: request_id.to_string(),
422            name: name.map(ToString::to_string),
423            value,
424            responded_at: now_rfc3339(),
425        },
426    );
427    save_state(&target, &state)?;
428    Ok(workflow_status_json(&target, &state))
429}
430
431pub(crate) fn register_workflow_message_builtins(vm: &mut Vm) {
432    vm.set_global(
433        "workflow",
434        VmValue::Dict(Rc::new(BTreeMap::from([
435            (
436                "signal".to_string(),
437                VmValue::BuiltinRef(Rc::from("workflow.signal")),
438            ),
439            (
440                "query".to_string(),
441                VmValue::BuiltinRef(Rc::from("workflow.query")),
442            ),
443            (
444                "update".to_string(),
445                VmValue::BuiltinRef(Rc::from("workflow.update")),
446            ),
447            (
448                "publish_query".to_string(),
449                VmValue::BuiltinRef(Rc::from("workflow.publish_query")),
450            ),
451            (
452                "receive".to_string(),
453                VmValue::BuiltinRef(Rc::from("workflow.receive")),
454            ),
455            (
456                "respond_update".to_string(),
457                VmValue::BuiltinRef(Rc::from("workflow.respond_update")),
458            ),
459            (
460                "pause".to_string(),
461                VmValue::BuiltinRef(Rc::from("workflow.pause")),
462            ),
463            (
464                "resume".to_string(),
465                VmValue::BuiltinRef(Rc::from("workflow.resume")),
466            ),
467            (
468                "status".to_string(),
469                VmValue::BuiltinRef(Rc::from("workflow.status")),
470            ),
471            (
472                "continue_as_new".to_string(),
473                VmValue::BuiltinRef(Rc::from("workflow.continue_as_new")),
474            ),
475        ]))),
476    );
477
478    vm.register_builtin("workflow.signal", |args, _out| {
479        let target = parse_target_vm(args.first(), None, "workflow.signal")?;
480        let name = args
481            .get(1)
482            .map(|value| value.display())
483            .filter(|value| !value.is_empty())
484            .ok_or_else(|| VmError::Runtime("workflow.signal: missing name".to_string()))?;
485        let payload = args
486            .get(2)
487            .map(crate::llm::vm_value_to_json)
488            .unwrap_or(serde_json::Value::Null);
489        let result =
490            enqueue_message(&target, "signal", &name, payload, None).map_err(VmError::Runtime)?;
491        Ok(crate::stdlib::json_to_vm_value(&result))
492    });
493
494    vm.register_builtin("workflow.query", |args, _out| {
495        let target = parse_target_vm(args.first(), None, "workflow.query")?;
496        let name = args
497            .get(1)
498            .map(|value| value.display())
499            .filter(|value| !value.is_empty())
500            .ok_or_else(|| VmError::Runtime("workflow.query: missing name".to_string()))?;
501        let state = load_state(&target).map_err(VmError::Runtime)?;
502        Ok(crate::stdlib::json_to_vm_value(
503            &state
504                .queries
505                .get(&name)
506                .map(|record| record.value.clone())
507                .unwrap_or(serde_json::Value::Null),
508        ))
509    });
510
511    vm.register_async_builtin("workflow.update", |args| async move {
512        let target = parse_target_vm(args.first(), None, "workflow.update")?;
513        let name = args
514            .get(1)
515            .map(|value| value.display())
516            .filter(|value| !value.is_empty())
517            .ok_or_else(|| VmError::Runtime("workflow.update: missing name".to_string()))?;
518        let payload = args
519            .get(2)
520            .map(crate::llm::vm_value_to_json)
521            .unwrap_or(serde_json::Value::Null);
522        let timeout_ms = args
523            .get(3)
524            .and_then(|value| value.as_dict())
525            .and_then(|dict| dict.get("timeout_ms"))
526            .and_then(VmValue::as_int)
527            .unwrap_or(DEFAULT_UPDATE_TIMEOUT_MS as i64)
528            .max(1) as u64;
529        let result = workflow_update_for_base(
530            &target.base_dir,
531            &target.workflow_id,
532            &name,
533            payload,
534            StdDuration::from_millis(timeout_ms),
535        )
536        .await
537        .map_err(VmError::Runtime)?;
538        Ok(crate::stdlib::json_to_vm_value(&result))
539    });
540
541    vm.register_builtin("workflow.publish_query", |args, _out| {
542        let target = parse_target_vm(args.first(), None, "workflow.publish_query")?;
543        let name = args
544            .get(1)
545            .map(|value| value.display())
546            .filter(|value| !value.is_empty())
547            .ok_or_else(|| VmError::Runtime("workflow.publish_query: missing name".to_string()))?;
548        let value = args
549            .get(2)
550            .map(crate::llm::vm_value_to_json)
551            .unwrap_or(serde_json::Value::Null);
552        let result =
553            workflow_publish_query_for_base(&target.base_dir, &target.workflow_id, &name, value)
554                .map_err(VmError::Runtime)?;
555        Ok(crate::stdlib::json_to_vm_value(&result))
556    });
557
558    vm.register_builtin("workflow.receive", |args, _out| {
559        let target = parse_target_vm(args.first(), None, "workflow.receive")?;
560        let mut state = load_state(&target).map_err(VmError::Runtime)?;
561        let Some(message) = state.mailbox.pop_front() else {
562            return Ok(VmValue::Nil);
563        };
564        save_state(&target, &state).map_err(VmError::Runtime)?;
565        Ok(crate::stdlib::json_to_vm_value(&serde_json::json!({
566            "workflow_id": target.workflow_id,
567            "seq": message.seq,
568            "kind": message.kind,
569            "name": message.name,
570            "request_id": message.request_id,
571            "payload": message.payload,
572            "enqueued_at": message.enqueued_at,
573        })))
574    });
575
576    vm.register_builtin("workflow.respond_update", |args, _out| {
577        let target = parse_target_vm(args.first(), None, "workflow.respond_update")?;
578        let request_id = args
579            .get(1)
580            .map(|value| value.display())
581            .filter(|value| !value.is_empty())
582            .ok_or_else(|| {
583                VmError::Runtime("workflow.respond_update: missing request id".to_string())
584            })?;
585        let value = args
586            .get(2)
587            .map(crate::llm::vm_value_to_json)
588            .unwrap_or(serde_json::Value::Null);
589        let name = args
590            .get(3)
591            .map(|value| value.display())
592            .filter(|value| !value.is_empty());
593        let result = workflow_respond_update_for_base(
594            &target.base_dir,
595            &target.workflow_id,
596            &request_id,
597            name.as_deref(),
598            value,
599        )
600        .map_err(VmError::Runtime)?;
601        Ok(crate::stdlib::json_to_vm_value(&result))
602    });
603
604    vm.register_builtin("workflow.pause", |args, _out| {
605        let target = parse_target_vm(args.first(), None, "workflow.pause")?;
606        let result = workflow_pause_for_base(&target.base_dir, &target.workflow_id)
607            .map_err(VmError::Runtime)?;
608        Ok(crate::stdlib::json_to_vm_value(&result))
609    });
610
611    vm.register_builtin("workflow.resume", |args, _out| {
612        let target = parse_target_vm(args.first(), None, "workflow.resume")?;
613        let result = workflow_resume_for_base(&target.base_dir, &target.workflow_id)
614            .map_err(VmError::Runtime)?;
615        Ok(crate::stdlib::json_to_vm_value(&result))
616    });
617
618    vm.register_builtin("workflow.status", |args, _out| {
619        let target = parse_target_vm(args.first(), None, "workflow.status")?;
620        let state = load_state(&target).map_err(VmError::Runtime)?;
621        Ok(crate::stdlib::json_to_vm_value(&workflow_status_json(
622            &target, &state,
623        )))
624    });
625
626    vm.register_builtin("workflow.continue_as_new", |args, _out| {
627        let target = parse_target_vm(args.first(), None, "workflow.continue_as_new")?;
628        let mut state = load_state(&target).map_err(VmError::Runtime)?;
629        state.generation += 1;
630        state.continue_as_new_count += 1;
631        state.last_continue_as_new_at = Some(now_rfc3339());
632        state.responses.clear();
633        save_state(&target, &state).map_err(VmError::Runtime)?;
634        Ok(crate::stdlib::json_to_vm_value(&workflow_status_json(
635            &target, &state,
636        )))
637    });
638
639    vm.register_builtin("continue_as_new", |args, _out| {
640        let target = parse_target_vm(args.first(), None, "continue_as_new")?;
641        let mut state = load_state(&target).map_err(VmError::Runtime)?;
642        state.generation += 1;
643        state.continue_as_new_count += 1;
644        state.last_continue_as_new_at = Some(now_rfc3339());
645        state.responses.clear();
646        save_state(&target, &state).map_err(VmError::Runtime)?;
647        Ok(crate::stdlib::json_to_vm_value(&workflow_status_json(
648            &target, &state,
649        )))
650    });
651}
652
#[cfg(test)]
mod tests {
    use super::*;

    /// An update call blocks until a response for its request id is written to the state
    /// file, then returns that response's value.
    #[tokio::test]
    async fn update_round_trip_waits_for_response() {
        let dir = tempfile::tempdir().expect("tempdir");
        let workflow_id = "wf-update";
        let base_dir = dir.path().to_path_buf();

        let caller_base = base_dir.clone();
        let pending_update = tokio::spawn(async move {
            workflow_update_for_base(
                &caller_base,
                workflow_id,
                "adjust_budget",
                serde_json::json!({"max_usd": 10}),
                StdDuration::from_millis(500),
            )
            .await
        });

        // Give the spawned caller time to enqueue its update before playing the workflow side.
        tokio::time::sleep(StdDuration::from_millis(50)).await;
        let target = WorkflowTarget {
            workflow_id: workflow_id.to_string(),
            base_dir: base_dir.clone(),
        };
        let mut state = load_state(&target).expect("load state");
        let message = state.mailbox.pop_front().expect("queued update");
        assert_eq!(message.kind, "update");

        let request_id = message.request_id.expect("request id");
        let response = WorkflowUpdateResponseRecord {
            request_id: request_id.clone(),
            name: Some(message.name.clone()),
            value: serde_json::json!({"ok": true}),
            responded_at: now_rfc3339(),
        };
        state.responses.insert(request_id, response);
        save_state(&target, &state).expect("save response");

        let result = pending_update.await.expect("join").expect("update result");
        assert_eq!(result, serde_json::json!({"ok": true}));
    }

    /// A `persisted_path` inside `.harn-runs` resolves the base dir to that directory's parent.
    #[test]
    fn persisted_path_drives_target_base_dir() {
        let descriptor = serde_json::json!({
            "workflow_id": "wf",
            "persisted_path": "/tmp/demo/.harn-runs/run.json"
        });
        let target = parse_target_json(&descriptor, None).expect("target");
        assert_eq!(target.workflow_id, "wf");
        assert_eq!(target.base_dir, PathBuf::from("/tmp/demo"));
    }
}