1use std::collections::{BTreeMap, VecDeque};
2use std::path::{Path, PathBuf};
3use std::rc::Rc;
4use std::time::Duration as StdDuration;
5
6use serde::{Deserialize, Serialize};
7
8use crate::stdlib::process::runtime_root_base;
9use crate::value::{VmError, VmValue};
10use crate::vm::Vm;
11
/// Timeout, in milliseconds, that the `workflow.update` builtin applies when
/// its options argument does not supply an integer `timeout_ms`.
const DEFAULT_UPDATE_TIMEOUT_MS: u64 = 30_000;
/// Delay, in milliseconds, between successive reads of the persisted state
/// while an update request waits for its response.
const UPDATE_POLL_INTERVAL_MS: u64 = 25;
14
/// One queued mailbox entry, persisted as part of [`WorkflowMailboxState`].
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct WorkflowMessageRecord {
    /// Sequence number taken from the state's `next_seq` counter at enqueue time.
    pub seq: u64,
    /// Message category; this file writes `"signal"`, `"update"` and `"control"`.
    pub kind: String,
    /// Signal/update name, or `"pause"` / `"resume"` for control messages.
    pub name: String,
    /// Correlation id set for update messages; omitted from the JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub request_id: Option<String>,
    /// Caller-supplied JSON payload.
    pub payload: serde_json::Value,
    /// Timestamp string produced by `now_rfc3339` when the message was queued.
    pub enqueued_at: String,
}
25
/// A published query value, stored in the state's `queries` map under the
/// query name.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct WorkflowQueryRecord {
    /// The JSON value returned to callers that query this name.
    pub value: serde_json::Value,
    /// Timestamp string produced by `now_rfc3339` when the value was published.
    pub published_at: String,
}
31
/// The recorded answer to an update request, stored in the state's
/// `responses` map under the request id.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct WorkflowUpdateResponseRecord {
    /// Id of the update request this response answers.
    pub request_id: String,
    /// Optional update name supplied by the responder; omitted from the JSON
    /// when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub name: Option<String>,
    /// JSON value handed back to the caller waiting on the update.
    pub value: serde_json::Value,
    /// Timestamp string produced by `now_rfc3339` when the response was stored.
    pub responded_at: String,
}
40
/// Persisted per-workflow state: the message queue, published query values,
/// update responses and bookkeeping counters. Stored as JSON in `state.json`.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct WorkflowMailboxState {
    /// Document type tag, serialized as `_type` (`"workflow_mailbox"`).
    #[serde(rename = "_type")]
    pub type_name: String,
    /// Identifier of the workflow this state belongs to.
    pub workflow_id: String,
    /// Generation counter; starts at 1 and is incremented by `continue_as_new`.
    #[serde(default = "default_generation")]
    pub generation: u64,
    /// Number of times `continue_as_new` has been applied.
    #[serde(default)]
    pub continue_as_new_count: u64,
    /// Timestamp of the most recent `continue_as_new`; omitted from the JSON
    /// when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub last_continue_as_new_at: Option<String>,
    /// Paused flag, set by the pause operation and cleared by resume.
    #[serde(default)]
    pub paused: bool,
    /// Sequence counter. It is incremented before each enqueue, so it holds
    /// the most recently assigned `seq`.
    #[serde(default)]
    pub next_seq: u64,
    /// Pending messages; new entries are pushed to the back and `receive`
    /// pops from the front.
    #[serde(default)]
    pub mailbox: VecDeque<WorkflowMessageRecord>,
    /// Published query values keyed by query name.
    #[serde(default)]
    pub queries: BTreeMap<String, WorkflowQueryRecord>,
    /// Update responses keyed by request id; cleared by `continue_as_new`.
    #[serde(default)]
    pub responses: BTreeMap<String, WorkflowUpdateResponseRecord>,
}
63
64impl Default for WorkflowMailboxState {
65 fn default() -> Self {
66 Self {
67 type_name: "workflow_mailbox".to_string(),
68 workflow_id: String::new(),
69 generation: default_generation(),
70 continue_as_new_count: 0,
71 last_continue_as_new_at: None,
72 paused: false,
73 next_seq: 0,
74 mailbox: VecDeque::new(),
75 queries: BTreeMap::new(),
76 responses: BTreeMap::new(),
77 }
78 }
79}
80
/// Generation assigned to a workflow that has never continued-as-new; also
/// used as the serde default when `generation` is missing from stored JSON.
fn default_generation() -> u64 {
    const FIRST_GENERATION: u64 = 1;
    FIRST_GENERATION
}
84
/// Resolved address of a workflow mailbox: which workflow, and under which
/// base directory its state is stored.
#[derive(Clone, Debug, PartialEq, Eq)]
struct WorkflowTarget {
    /// Workflow id after passing through `sanitize_workflow_id`.
    workflow_id: String,
    /// Base directory from which the workflow storage directory is derived.
    base_dir: PathBuf,
}
90
/// Reduces a caller-supplied workflow id to a single path component that is
/// safe to join onto the workflow storage directory.
///
/// The input is trimmed and only its final path component is kept, so
/// `"a/b/wf"` becomes `"wf"`. When the trimmed input has no final normal
/// component (it is empty, `"."`, `".."`, a bare root such as `"/"`, or ends
/// in `".."` as in `"foo/.."`), the placeholder id `"workflow"` is returned.
///
/// Falling back to the placeholder (rather than to the raw input) matters:
/// returning something like `"foo/.."` or `"/"` would let the id escape the
/// workflow directory once it is joined onto a base path.
fn sanitize_workflow_id(raw: &str) -> String {
    Path::new(raw.trim())
        .file_name()
        .and_then(|component| component.to_str())
        // `file_name` never yields "", "." or "..", but keep the emptiness
        // guard so the contract does not depend on that detail.
        .filter(|component| !component.is_empty())
        .map_or_else(|| "workflow".to_string(), |component| component.to_string())
}
103
/// Derives a workflow base directory from the path of a persisted run file.
///
/// The file's containing directory is used, except when that directory is
/// named `.harn-runs`, in which case its parent is used instead. A path with
/// no parent resolves to `"."`.
fn workflow_base_dir_from_persisted_path(path: &Path) -> PathBuf {
    let run_dir = match path.parent() {
        Some(dir) => dir,
        None => Path::new("."),
    };
    let inside_runs_dir = matches!(
        run_dir.file_name().and_then(|name| name.to_str()),
        Some(".harn-runs")
    );
    if !inside_runs_dir {
        return run_dir.to_path_buf();
    }
    match run_dir.parent() {
        Some(project_root) => project_root.to_path_buf(),
        None => run_dir.to_path_buf(),
    }
}
112
113fn workflow_target_root(target: &WorkflowTarget) -> PathBuf {
114 crate::runtime_paths::workflow_dir(&target.base_dir).join(&target.workflow_id)
115}
116
117fn workflow_state_path(target: &WorkflowTarget) -> PathBuf {
118 workflow_target_root(target).join("state.json")
119}
120
121fn now_rfc3339() -> String {
122 time::OffsetDateTime::now_utc()
123 .format(&time::format_description::well_known::Rfc3339)
124 .unwrap_or_else(|_| uuid::Uuid::now_v7().to_string())
125}
126
127fn load_state(target: &WorkflowTarget) -> Result<WorkflowMailboxState, String> {
128 let path = workflow_state_path(target);
129 if !path.exists() {
130 return Ok(WorkflowMailboxState {
131 workflow_id: target.workflow_id.clone(),
132 ..WorkflowMailboxState::default()
133 });
134 }
135 let text = std::fs::read_to_string(&path)
136 .map_err(|error| format!("workflow state read error: {error}"))?;
137 let mut state: WorkflowMailboxState = serde_json::from_str(&text)
138 .map_err(|error| format!("workflow state parse error: {error}"))?;
139 if state.type_name.is_empty() {
140 state.type_name = "workflow_mailbox".to_string();
141 }
142 if state.workflow_id.is_empty() {
143 state.workflow_id = target.workflow_id.clone();
144 }
145 if state.generation == 0 {
146 state.generation = 1;
147 }
148 Ok(state)
149}
150
151fn save_state(target: &WorkflowTarget, state: &WorkflowMailboxState) -> Result<(), String> {
152 let path = workflow_state_path(target);
153 if let Some(parent) = path.parent() {
154 std::fs::create_dir_all(parent)
155 .map_err(|error| format!("workflow state mkdir error: {error}"))?;
156 }
157 let json = serde_json::to_string_pretty(state)
158 .map_err(|error| format!("workflow state encode error: {error}"))?;
159 std::fs::write(&path, json).map_err(|error| format!("workflow state write error: {error}"))
160}
161
162fn parse_target_json(
163 value: &serde_json::Value,
164 fallback_base_dir: Option<&Path>,
165) -> Option<WorkflowTarget> {
166 match value {
167 serde_json::Value::String(text) => Some(WorkflowTarget {
168 workflow_id: sanitize_workflow_id(text),
169 base_dir: fallback_base_dir
170 .map(Path::to_path_buf)
171 .unwrap_or_else(runtime_root_base),
172 }),
173 serde_json::Value::Object(map) => {
174 let workflow_id = map
175 .get("workflow_id")
176 .and_then(|value| value.as_str())
177 .or_else(|| map.get("workflow").and_then(|value| value.as_str()))
178 .or_else(|| {
179 map.get("run")
180 .and_then(|value| value.get("workflow_id"))
181 .and_then(|value| value.as_str())
182 })
183 .or_else(|| {
184 map.get("result")
185 .and_then(|value| value.get("run"))
186 .and_then(|value| value.get("workflow_id"))
187 .and_then(|value| value.as_str())
188 })?;
189 let explicit_base = map
190 .get("base_dir")
191 .and_then(|value| value.as_str())
192 .filter(|value| !value.trim().is_empty())
193 .map(PathBuf::from);
194 let persisted_path = map
195 .get("persisted_path")
196 .and_then(|value| value.as_str())
197 .or_else(|| map.get("path").and_then(|value| value.as_str()))
198 .or_else(|| {
199 map.get("run")
200 .and_then(|value| value.get("persisted_path"))
201 .and_then(|value| value.as_str())
202 })
203 .or_else(|| {
204 map.get("result")
205 .and_then(|value| value.get("run"))
206 .and_then(|value| value.get("persisted_path"))
207 .and_then(|value| value.as_str())
208 });
209 let base_dir = explicit_base
210 .or_else(|| {
211 persisted_path
212 .map(|path| workflow_base_dir_from_persisted_path(Path::new(path)))
213 })
214 .or_else(|| fallback_base_dir.map(Path::to_path_buf))
215 .unwrap_or_else(runtime_root_base);
216 Some(WorkflowTarget {
217 workflow_id: sanitize_workflow_id(workflow_id),
218 base_dir,
219 })
220 }
221 _ => None,
222 }
223}
224
225fn parse_target_vm(
226 value: Option<&VmValue>,
227 fallback_base_dir: Option<&Path>,
228 builtin: &str,
229) -> Result<WorkflowTarget, VmError> {
230 let value = value.ok_or_else(|| VmError::Runtime(format!("{builtin}: missing target")))?;
231 parse_target_json(&crate::llm::vm_value_to_json(value), fallback_base_dir).ok_or_else(|| {
232 VmError::Runtime(format!(
233 "{builtin}: target must be a workflow id string or dict with workflow_id/workflow"
234 ))
235 })
236}
237
238fn workflow_status_json(
239 target: &WorkflowTarget,
240 state: &WorkflowMailboxState,
241) -> serde_json::Value {
242 serde_json::json!({
243 "workflow_id": target.workflow_id,
244 "base_dir": target.base_dir.to_string_lossy(),
245 "generation": state.generation,
246 "paused": state.paused,
247 "pending_count": state.mailbox.len(),
248 "query_count": state.queries.len(),
249 "response_count": state.responses.len(),
250 "continue_as_new_count": state.continue_as_new_count,
251 "last_continue_as_new_at": state.last_continue_as_new_at,
252 })
253}
254
255fn enqueue_message(
256 target: &WorkflowTarget,
257 kind: &str,
258 name: &str,
259 payload: serde_json::Value,
260 request_id: Option<String>,
261) -> Result<serde_json::Value, String> {
262 let mut state = load_state(target)?;
263 state.next_seq += 1;
264 let message = WorkflowMessageRecord {
265 seq: state.next_seq,
266 kind: kind.to_string(),
267 name: name.to_string(),
268 request_id,
269 payload,
270 enqueued_at: now_rfc3339(),
271 };
272 state.mailbox.push_back(message.clone());
273 save_state(target, &state)?;
274 Ok(serde_json::json!({
275 "workflow_id": target.workflow_id,
276 "message": message,
277 "status": workflow_status_json(target, &state),
278 }))
279}
280
281pub fn workflow_signal_for_base(
282 base_dir: &Path,
283 workflow_id: &str,
284 name: &str,
285 payload: serde_json::Value,
286) -> Result<serde_json::Value, String> {
287 let target = WorkflowTarget {
288 workflow_id: sanitize_workflow_id(workflow_id),
289 base_dir: base_dir.to_path_buf(),
290 };
291 enqueue_message(&target, "signal", name, payload, None)
292}
293
294pub fn workflow_query_for_base(
295 base_dir: &Path,
296 workflow_id: &str,
297 name: &str,
298) -> Result<serde_json::Value, String> {
299 let target = WorkflowTarget {
300 workflow_id: sanitize_workflow_id(workflow_id),
301 base_dir: base_dir.to_path_buf(),
302 };
303 let state = load_state(&target)?;
304 Ok(state
305 .queries
306 .get(name)
307 .map(|record| record.value.clone())
308 .unwrap_or(serde_json::Value::Null))
309}
310
311pub fn workflow_publish_query_for_base(
312 base_dir: &Path,
313 workflow_id: &str,
314 name: &str,
315 value: serde_json::Value,
316) -> Result<serde_json::Value, String> {
317 let target = WorkflowTarget {
318 workflow_id: sanitize_workflow_id(workflow_id),
319 base_dir: base_dir.to_path_buf(),
320 };
321 let mut state = load_state(&target)?;
322 state.queries.insert(
323 name.to_string(),
324 WorkflowQueryRecord {
325 value,
326 published_at: now_rfc3339(),
327 },
328 );
329 save_state(&target, &state)?;
330 Ok(workflow_status_json(&target, &state))
331}
332
333pub fn workflow_pause_for_base(
334 base_dir: &Path,
335 workflow_id: &str,
336) -> Result<serde_json::Value, String> {
337 let target = WorkflowTarget {
338 workflow_id: sanitize_workflow_id(workflow_id),
339 base_dir: base_dir.to_path_buf(),
340 };
341 let mut state = load_state(&target)?;
342 state.paused = true;
343 state.next_seq += 1;
344 state.mailbox.push_back(WorkflowMessageRecord {
345 seq: state.next_seq,
346 kind: "control".to_string(),
347 name: "pause".to_string(),
348 request_id: None,
349 payload: serde_json::json!({}),
350 enqueued_at: now_rfc3339(),
351 });
352 save_state(&target, &state)?;
353 Ok(workflow_status_json(&target, &state))
354}
355
356pub fn workflow_resume_for_base(
357 base_dir: &Path,
358 workflow_id: &str,
359) -> Result<serde_json::Value, String> {
360 let target = WorkflowTarget {
361 workflow_id: sanitize_workflow_id(workflow_id),
362 base_dir: base_dir.to_path_buf(),
363 };
364 let mut state = load_state(&target)?;
365 state.paused = false;
366 state.next_seq += 1;
367 state.mailbox.push_back(WorkflowMessageRecord {
368 seq: state.next_seq,
369 kind: "control".to_string(),
370 name: "resume".to_string(),
371 request_id: None,
372 payload: serde_json::json!({}),
373 enqueued_at: now_rfc3339(),
374 });
375 save_state(&target, &state)?;
376 Ok(workflow_status_json(&target, &state))
377}
378
379pub async fn workflow_update_for_base(
380 base_dir: &Path,
381 workflow_id: &str,
382 name: &str,
383 payload: serde_json::Value,
384 timeout: StdDuration,
385) -> Result<serde_json::Value, String> {
386 let target = WorkflowTarget {
387 workflow_id: sanitize_workflow_id(workflow_id),
388 base_dir: base_dir.to_path_buf(),
389 };
390 let request_id = uuid::Uuid::now_v7().to_string();
391 enqueue_message(&target, "update", name, payload, Some(request_id.clone()))?;
392 let started = std::time::Instant::now();
393 while started.elapsed() <= timeout {
394 let state = load_state(&target)?;
395 if let Some(response) = state.responses.get(&request_id) {
396 return Ok(response.value.clone());
397 }
398 tokio::time::sleep(StdDuration::from_millis(UPDATE_POLL_INTERVAL_MS)).await;
399 }
400 Err(format!(
401 "workflow update '{name}' timed out for '{}'",
402 target.workflow_id
403 ))
404}
405
406pub fn workflow_respond_update_for_base(
407 base_dir: &Path,
408 workflow_id: &str,
409 request_id: &str,
410 name: Option<&str>,
411 value: serde_json::Value,
412) -> Result<serde_json::Value, String> {
413 let target = WorkflowTarget {
414 workflow_id: sanitize_workflow_id(workflow_id),
415 base_dir: base_dir.to_path_buf(),
416 };
417 let mut state = load_state(&target)?;
418 state.responses.insert(
419 request_id.to_string(),
420 WorkflowUpdateResponseRecord {
421 request_id: request_id.to_string(),
422 name: name.map(ToString::to_string),
423 value,
424 responded_at: now_rfc3339(),
425 },
426 );
427 save_state(&target, &state)?;
428 Ok(workflow_status_json(&target, &state))
429}
430
431pub(crate) fn register_workflow_message_builtins(vm: &mut Vm) {
432 vm.set_global(
433 "workflow",
434 VmValue::Dict(Rc::new(BTreeMap::from([
435 (
436 "signal".to_string(),
437 VmValue::BuiltinRef(Rc::from("workflow.signal")),
438 ),
439 (
440 "query".to_string(),
441 VmValue::BuiltinRef(Rc::from("workflow.query")),
442 ),
443 (
444 "update".to_string(),
445 VmValue::BuiltinRef(Rc::from("workflow.update")),
446 ),
447 (
448 "publish_query".to_string(),
449 VmValue::BuiltinRef(Rc::from("workflow.publish_query")),
450 ),
451 (
452 "receive".to_string(),
453 VmValue::BuiltinRef(Rc::from("workflow.receive")),
454 ),
455 (
456 "respond_update".to_string(),
457 VmValue::BuiltinRef(Rc::from("workflow.respond_update")),
458 ),
459 (
460 "pause".to_string(),
461 VmValue::BuiltinRef(Rc::from("workflow.pause")),
462 ),
463 (
464 "resume".to_string(),
465 VmValue::BuiltinRef(Rc::from("workflow.resume")),
466 ),
467 (
468 "status".to_string(),
469 VmValue::BuiltinRef(Rc::from("workflow.status")),
470 ),
471 (
472 "continue_as_new".to_string(),
473 VmValue::BuiltinRef(Rc::from("workflow.continue_as_new")),
474 ),
475 ]))),
476 );
477
478 vm.register_builtin("workflow.signal", |args, _out| {
479 let target = parse_target_vm(args.first(), None, "workflow.signal")?;
480 let name = args
481 .get(1)
482 .map(|value| value.display())
483 .filter(|value| !value.is_empty())
484 .ok_or_else(|| VmError::Runtime("workflow.signal: missing name".to_string()))?;
485 let payload = args
486 .get(2)
487 .map(crate::llm::vm_value_to_json)
488 .unwrap_or(serde_json::Value::Null);
489 let result =
490 enqueue_message(&target, "signal", &name, payload, None).map_err(VmError::Runtime)?;
491 Ok(crate::stdlib::json_to_vm_value(&result))
492 });
493
494 vm.register_builtin("workflow.query", |args, _out| {
495 let target = parse_target_vm(args.first(), None, "workflow.query")?;
496 let name = args
497 .get(1)
498 .map(|value| value.display())
499 .filter(|value| !value.is_empty())
500 .ok_or_else(|| VmError::Runtime("workflow.query: missing name".to_string()))?;
501 let state = load_state(&target).map_err(VmError::Runtime)?;
502 Ok(crate::stdlib::json_to_vm_value(
503 &state
504 .queries
505 .get(&name)
506 .map(|record| record.value.clone())
507 .unwrap_or(serde_json::Value::Null),
508 ))
509 });
510
511 vm.register_async_builtin("workflow.update", |args| async move {
512 let target = parse_target_vm(args.first(), None, "workflow.update")?;
513 let name = args
514 .get(1)
515 .map(|value| value.display())
516 .filter(|value| !value.is_empty())
517 .ok_or_else(|| VmError::Runtime("workflow.update: missing name".to_string()))?;
518 let payload = args
519 .get(2)
520 .map(crate::llm::vm_value_to_json)
521 .unwrap_or(serde_json::Value::Null);
522 let timeout_ms = args
523 .get(3)
524 .and_then(|value| value.as_dict())
525 .and_then(|dict| dict.get("timeout_ms"))
526 .and_then(VmValue::as_int)
527 .unwrap_or(DEFAULT_UPDATE_TIMEOUT_MS as i64)
528 .max(1) as u64;
529 let result = workflow_update_for_base(
530 &target.base_dir,
531 &target.workflow_id,
532 &name,
533 payload,
534 StdDuration::from_millis(timeout_ms),
535 )
536 .await
537 .map_err(VmError::Runtime)?;
538 Ok(crate::stdlib::json_to_vm_value(&result))
539 });
540
541 vm.register_builtin("workflow.publish_query", |args, _out| {
542 let target = parse_target_vm(args.first(), None, "workflow.publish_query")?;
543 let name = args
544 .get(1)
545 .map(|value| value.display())
546 .filter(|value| !value.is_empty())
547 .ok_or_else(|| VmError::Runtime("workflow.publish_query: missing name".to_string()))?;
548 let value = args
549 .get(2)
550 .map(crate::llm::vm_value_to_json)
551 .unwrap_or(serde_json::Value::Null);
552 let result =
553 workflow_publish_query_for_base(&target.base_dir, &target.workflow_id, &name, value)
554 .map_err(VmError::Runtime)?;
555 Ok(crate::stdlib::json_to_vm_value(&result))
556 });
557
558 vm.register_builtin("workflow.receive", |args, _out| {
559 let target = parse_target_vm(args.first(), None, "workflow.receive")?;
560 let mut state = load_state(&target).map_err(VmError::Runtime)?;
561 let Some(message) = state.mailbox.pop_front() else {
562 return Ok(VmValue::Nil);
563 };
564 save_state(&target, &state).map_err(VmError::Runtime)?;
565 Ok(crate::stdlib::json_to_vm_value(&serde_json::json!({
566 "workflow_id": target.workflow_id,
567 "seq": message.seq,
568 "kind": message.kind,
569 "name": message.name,
570 "request_id": message.request_id,
571 "payload": message.payload,
572 "enqueued_at": message.enqueued_at,
573 })))
574 });
575
576 vm.register_builtin("workflow.respond_update", |args, _out| {
577 let target = parse_target_vm(args.first(), None, "workflow.respond_update")?;
578 let request_id = args
579 .get(1)
580 .map(|value| value.display())
581 .filter(|value| !value.is_empty())
582 .ok_or_else(|| {
583 VmError::Runtime("workflow.respond_update: missing request id".to_string())
584 })?;
585 let value = args
586 .get(2)
587 .map(crate::llm::vm_value_to_json)
588 .unwrap_or(serde_json::Value::Null);
589 let name = args
590 .get(3)
591 .map(|value| value.display())
592 .filter(|value| !value.is_empty());
593 let result = workflow_respond_update_for_base(
594 &target.base_dir,
595 &target.workflow_id,
596 &request_id,
597 name.as_deref(),
598 value,
599 )
600 .map_err(VmError::Runtime)?;
601 Ok(crate::stdlib::json_to_vm_value(&result))
602 });
603
604 vm.register_builtin("workflow.pause", |args, _out| {
605 let target = parse_target_vm(args.first(), None, "workflow.pause")?;
606 let result = workflow_pause_for_base(&target.base_dir, &target.workflow_id)
607 .map_err(VmError::Runtime)?;
608 Ok(crate::stdlib::json_to_vm_value(&result))
609 });
610
611 vm.register_builtin("workflow.resume", |args, _out| {
612 let target = parse_target_vm(args.first(), None, "workflow.resume")?;
613 let result = workflow_resume_for_base(&target.base_dir, &target.workflow_id)
614 .map_err(VmError::Runtime)?;
615 Ok(crate::stdlib::json_to_vm_value(&result))
616 });
617
618 vm.register_builtin("workflow.status", |args, _out| {
619 let target = parse_target_vm(args.first(), None, "workflow.status")?;
620 let state = load_state(&target).map_err(VmError::Runtime)?;
621 Ok(crate::stdlib::json_to_vm_value(&workflow_status_json(
622 &target, &state,
623 )))
624 });
625
626 vm.register_builtin("workflow.continue_as_new", |args, _out| {
627 let target = parse_target_vm(args.first(), None, "workflow.continue_as_new")?;
628 let mut state = load_state(&target).map_err(VmError::Runtime)?;
629 state.generation += 1;
630 state.continue_as_new_count += 1;
631 state.last_continue_as_new_at = Some(now_rfc3339());
632 state.responses.clear();
633 save_state(&target, &state).map_err(VmError::Runtime)?;
634 Ok(crate::stdlib::json_to_vm_value(&workflow_status_json(
635 &target, &state,
636 )))
637 });
638
639 vm.register_builtin("continue_as_new", |args, _out| {
640 let target = parse_target_vm(args.first(), None, "continue_as_new")?;
641 let mut state = load_state(&target).map_err(VmError::Runtime)?;
642 state.generation += 1;
643 state.continue_as_new_count += 1;
644 state.last_continue_as_new_at = Some(now_rfc3339());
645 state.responses.clear();
646 save_state(&target, &state).map_err(VmError::Runtime)?;
647 Ok(crate::stdlib::json_to_vm_value(&workflow_status_json(
648 &target, &state,
649 )))
650 });
651}
652
#[cfg(test)]
mod tests {
    use super::*;

    /// An update call should stay pending until a response keyed by its
    /// request id is written to the state, then return that response's value.
    #[tokio::test]
    async fn update_round_trip_waits_for_response() {
        let dir = tempfile::tempdir().expect("tempdir");
        let workflow_id = "wf-update";
        let base_dir = dir.path().to_path_buf();
        let base_dir_clone = base_dir.clone();
        // Caller side: enqueue the update and poll for the response.
        let task = tokio::spawn(async move {
            workflow_update_for_base(
                &base_dir_clone,
                workflow_id,
                "adjust_budget",
                serde_json::json!({"max_usd": 10}),
                StdDuration::from_millis(500),
            )
            .await
        });

        // Give the spawned task time to enqueue its message before reading.
        tokio::time::sleep(StdDuration::from_millis(50)).await;
        let target = WorkflowTarget {
            workflow_id: workflow_id.to_string(),
            base_dir: base_dir.clone(),
        };
        // Workflow side: take the queued update and store a response for it.
        let mut state = load_state(&target).expect("load state");
        let message = state.mailbox.pop_front().expect("queued update");
        assert_eq!(message.kind, "update");
        state.responses.insert(
            message.request_id.clone().expect("request id"),
            WorkflowUpdateResponseRecord {
                request_id: message.request_id.expect("request id"),
                name: Some(message.name.clone()),
                value: serde_json::json!({"ok": true}),
                responded_at: now_rfc3339(),
            },
        );
        save_state(&target, &state).expect("save response");

        let result = task.await.expect("join").expect("update result");
        assert_eq!(result, serde_json::json!({"ok": true}));
    }

    /// A `persisted_path` inside a `.harn-runs` directory should resolve the
    /// base dir to that directory's parent.
    #[test]
    fn persisted_path_drives_target_base_dir() {
        let base = parse_target_json(
            &serde_json::json!({
                "workflow_id": "wf",
                "persisted_path": "/tmp/demo/.harn-runs/run.json"
            }),
            None,
        )
        .expect("target");
        assert_eq!(base.workflow_id, "wf");
        assert_eq!(base.base_dir, PathBuf::from("/tmp/demo"));
    }
}