1use std::collections::{BTreeMap, VecDeque};
2use std::path::{Path, PathBuf};
3use std::rc::Rc;
4use std::time::Duration as StdDuration;
5
6use serde::{Deserialize, Serialize};
7
8use crate::stdlib::process::runtime_root_base;
9use crate::value::{VmError, VmValue};
10use crate::vm::Vm;
11
/// Timeout, in milliseconds, used by the `workflow.update` builtin when its
/// options dict does not provide an integer `timeout_ms`.
const DEFAULT_UPDATE_TIMEOUT_MS: u64 = 30_000;
/// Delay, in milliseconds, between successive state polls while
/// `workflow_update_for_base` waits for an update response.
const UPDATE_POLL_INTERVAL_MS: u64 = 25;
14
15#[derive(Clone, Debug, Serialize, Deserialize)]
16pub struct WorkflowMessageRecord {
17 pub seq: u64,
18 pub kind: String,
19 pub name: String,
20 #[serde(skip_serializing_if = "Option::is_none")]
21 pub request_id: Option<String>,
22 pub payload: serde_json::Value,
23 pub enqueued_at: String,
24}
25
26#[derive(Clone, Debug, Serialize, Deserialize)]
27pub struct WorkflowQueryRecord {
28 pub value: serde_json::Value,
29 pub published_at: String,
30}
31
32#[derive(Clone, Debug, Serialize, Deserialize)]
33pub struct WorkflowUpdateResponseRecord {
34 pub request_id: String,
35 #[serde(skip_serializing_if = "Option::is_none")]
36 pub name: Option<String>,
37 pub value: serde_json::Value,
38 pub responded_at: String,
39}
40
41#[derive(Clone, Debug, Serialize, Deserialize)]
42pub struct WorkflowMailboxState {
43 #[serde(rename = "_type")]
44 pub type_name: String,
45 pub workflow_id: String,
46 #[serde(default = "default_generation")]
47 pub generation: u64,
48 #[serde(default)]
49 pub continue_as_new_count: u64,
50 #[serde(skip_serializing_if = "Option::is_none")]
51 pub last_continue_as_new_at: Option<String>,
52 #[serde(default)]
53 pub paused: bool,
54 #[serde(default)]
55 pub next_seq: u64,
56 #[serde(default)]
57 pub mailbox: VecDeque<WorkflowMessageRecord>,
58 #[serde(default)]
59 pub queries: BTreeMap<String, WorkflowQueryRecord>,
60 #[serde(default)]
61 pub responses: BTreeMap<String, WorkflowUpdateResponseRecord>,
62}
63
64impl Default for WorkflowMailboxState {
65 fn default() -> Self {
66 Self {
67 type_name: "workflow_mailbox".to_string(),
68 workflow_id: String::new(),
69 generation: default_generation(),
70 continue_as_new_count: 0,
71 last_continue_as_new_at: None,
72 paused: false,
73 next_seq: 0,
74 mailbox: VecDeque::new(),
75 queries: BTreeMap::new(),
76 responses: BTreeMap::new(),
77 }
78 }
79}
80
/// Generation assigned to a mailbox whose state does not specify one.
fn default_generation() -> u64 {
    const FIRST_GENERATION: u64 = 1;
    FIRST_GENERATION
}
84
/// Identifies which workflow's mailbox to operate on: the workflow id plus the
/// base directory under which its state directory is resolved.
#[derive(Debug, Clone, Eq, PartialEq)]
struct WorkflowTarget {
    /// Workflow id, used as the final path component of the state directory.
    workflow_id: String,
    /// Base directory handed to the runtime path helper.
    base_dir: PathBuf,
}
90
/// Reduces a caller-supplied workflow id to a single safe path component.
///
/// The input is trimmed, only its final path component is kept, and that
/// component is trimmed again so the function is idempotent (sanitizing an
/// already-sanitized id returns it unchanged).
///
/// Returns the fallback id `"workflow"` when no usable component remains:
/// empty/blank input, `"."`, `".."`, or any path whose final component is not a
/// normal name (for example `"foo/.."` or `"/"`). Falling back in those cases
/// prevents the raw input from being joined onto the workflow directory, where
/// it could escape it.
fn sanitize_workflow_id(raw: &str) -> String {
    // `file_name()` is `None` for paths ending in `..` and for the root; never
    // substitute the raw text in that case.
    let component = Path::new(raw.trim())
        .file_name()
        .and_then(|value| value.to_str())
        .map(str::trim)
        .unwrap_or("");
    if component.is_empty() || component == "." || component == ".." {
        "workflow".to_string()
    } else {
        component.to_string()
    }
}
103
/// Derives a workflow base directory from the path of a persisted run file.
///
/// The file's containing directory is used, except when that directory is
/// named `.harn-runs`, in which case its own parent is used instead. A path
/// with no parent at all resolves to `"."`.
fn workflow_base_dir_from_persisted_path(path: &Path) -> PathBuf {
    let containing_dir = match path.parent() {
        Some(dir) => dir,
        None => Path::new("."),
    };
    let inside_runs_dir = matches!(
        containing_dir.file_name().and_then(|name| name.to_str()),
        Some(".harn-runs")
    );
    if !inside_runs_dir {
        return containing_dir.to_path_buf();
    }
    match containing_dir.parent() {
        Some(project_dir) => project_dir.to_path_buf(),
        None => containing_dir.to_path_buf(),
    }
}
112
113fn workflow_target_root(target: &WorkflowTarget) -> PathBuf {
114 crate::runtime_paths::workflow_dir(&target.base_dir).join(&target.workflow_id)
115}
116
117fn workflow_state_path(target: &WorkflowTarget) -> PathBuf {
118 workflow_target_root(target).join("state.json")
119}
120
121fn now_rfc3339() -> String {
122 time::OffsetDateTime::now_utc()
123 .format(&time::format_description::well_known::Rfc3339)
124 .unwrap_or_else(|_| uuid::Uuid::now_v7().to_string())
125}
126
127fn load_state(target: &WorkflowTarget) -> Result<WorkflowMailboxState, String> {
128 let path = workflow_state_path(target);
129 if !path.exists() {
130 return Ok(WorkflowMailboxState {
131 workflow_id: target.workflow_id.clone(),
132 ..WorkflowMailboxState::default()
133 });
134 }
135 let text = std::fs::read_to_string(&path)
136 .map_err(|error| format!("workflow state read error: {error}"))?;
137 let mut state: WorkflowMailboxState = serde_json::from_str(&text)
138 .map_err(|error| format!("workflow state parse error: {error}"))?;
139 if state.type_name.is_empty() {
140 state.type_name = "workflow_mailbox".to_string();
141 }
142 if state.workflow_id.is_empty() {
143 state.workflow_id = target.workflow_id.clone();
144 }
145 if state.generation == 0 {
146 state.generation = 1;
147 }
148 Ok(state)
149}
150
151fn save_state(target: &WorkflowTarget, state: &WorkflowMailboxState) -> Result<(), String> {
152 let path = workflow_state_path(target);
153 let json = serde_json::to_string_pretty(state)
154 .map_err(|error| format!("workflow state encode error: {error}"))?;
155 crate::atomic_io::atomic_write(&path, json.as_bytes())
156 .map_err(|error| format!("workflow state write error: {error}"))
157}
158
159fn parse_target_json(
160 value: &serde_json::Value,
161 fallback_base_dir: Option<&Path>,
162) -> Option<WorkflowTarget> {
163 match value {
164 serde_json::Value::String(text) => Some(WorkflowTarget {
165 workflow_id: sanitize_workflow_id(text),
166 base_dir: fallback_base_dir
167 .map(Path::to_path_buf)
168 .unwrap_or_else(runtime_root_base),
169 }),
170 serde_json::Value::Object(map) => {
171 let workflow_id = map
172 .get("workflow_id")
173 .and_then(|value| value.as_str())
174 .or_else(|| map.get("workflow").and_then(|value| value.as_str()))
175 .or_else(|| {
176 map.get("run")
177 .and_then(|value| value.get("workflow_id"))
178 .and_then(|value| value.as_str())
179 })
180 .or_else(|| {
181 map.get("result")
182 .and_then(|value| value.get("run"))
183 .and_then(|value| value.get("workflow_id"))
184 .and_then(|value| value.as_str())
185 })?;
186 let explicit_base = map
187 .get("base_dir")
188 .and_then(|value| value.as_str())
189 .filter(|value| !value.trim().is_empty())
190 .map(PathBuf::from);
191 let persisted_path = map
192 .get("persisted_path")
193 .and_then(|value| value.as_str())
194 .or_else(|| map.get("path").and_then(|value| value.as_str()))
195 .or_else(|| {
196 map.get("run")
197 .and_then(|value| value.get("persisted_path"))
198 .and_then(|value| value.as_str())
199 })
200 .or_else(|| {
201 map.get("result")
202 .and_then(|value| value.get("run"))
203 .and_then(|value| value.get("persisted_path"))
204 .and_then(|value| value.as_str())
205 });
206 let base_dir = explicit_base
207 .or_else(|| {
208 persisted_path
209 .map(|path| workflow_base_dir_from_persisted_path(Path::new(path)))
210 })
211 .or_else(|| fallback_base_dir.map(Path::to_path_buf))
212 .unwrap_or_else(runtime_root_base);
213 Some(WorkflowTarget {
214 workflow_id: sanitize_workflow_id(workflow_id),
215 base_dir,
216 })
217 }
218 _ => None,
219 }
220}
221
222fn parse_target_vm(
223 value: Option<&VmValue>,
224 fallback_base_dir: Option<&Path>,
225 builtin: &str,
226) -> Result<WorkflowTarget, VmError> {
227 let value = value.ok_or_else(|| VmError::Runtime(format!("{builtin}: missing target")))?;
228 parse_target_json(&crate::llm::vm_value_to_json(value), fallback_base_dir).ok_or_else(|| {
229 VmError::Runtime(format!(
230 "{builtin}: target must be a workflow id string or dict with workflow_id/workflow"
231 ))
232 })
233}
234
235fn workflow_status_json(
236 target: &WorkflowTarget,
237 state: &WorkflowMailboxState,
238) -> serde_json::Value {
239 serde_json::json!({
240 "workflow_id": target.workflow_id,
241 "base_dir": target.base_dir.to_string_lossy(),
242 "generation": state.generation,
243 "paused": state.paused,
244 "pending_count": state.mailbox.len(),
245 "query_count": state.queries.len(),
246 "response_count": state.responses.len(),
247 "continue_as_new_count": state.continue_as_new_count,
248 "last_continue_as_new_at": state.last_continue_as_new_at,
249 })
250}
251
252fn enqueue_message(
253 target: &WorkflowTarget,
254 kind: &str,
255 name: &str,
256 payload: serde_json::Value,
257 request_id: Option<String>,
258) -> Result<serde_json::Value, String> {
259 let mut state = load_state(target)?;
260 state.next_seq += 1;
261 let message = WorkflowMessageRecord {
262 seq: state.next_seq,
263 kind: kind.to_string(),
264 name: name.to_string(),
265 request_id,
266 payload,
267 enqueued_at: now_rfc3339(),
268 };
269 state.mailbox.push_back(message.clone());
270 save_state(target, &state)?;
271 Ok(serde_json::json!({
272 "workflow_id": target.workflow_id,
273 "message": message,
274 "status": workflow_status_json(target, &state),
275 }))
276}
277
278pub fn workflow_signal_for_base(
279 base_dir: &Path,
280 workflow_id: &str,
281 name: &str,
282 payload: serde_json::Value,
283) -> Result<serde_json::Value, String> {
284 let target = WorkflowTarget {
285 workflow_id: sanitize_workflow_id(workflow_id),
286 base_dir: base_dir.to_path_buf(),
287 };
288 enqueue_message(&target, "signal", name, payload, None)
289}
290
291pub fn workflow_query_for_base(
292 base_dir: &Path,
293 workflow_id: &str,
294 name: &str,
295) -> Result<serde_json::Value, String> {
296 let target = WorkflowTarget {
297 workflow_id: sanitize_workflow_id(workflow_id),
298 base_dir: base_dir.to_path_buf(),
299 };
300 let state = load_state(&target)?;
301 Ok(state
302 .queries
303 .get(name)
304 .map(|record| record.value.clone())
305 .unwrap_or(serde_json::Value::Null))
306}
307
308pub fn workflow_publish_query_for_base(
309 base_dir: &Path,
310 workflow_id: &str,
311 name: &str,
312 value: serde_json::Value,
313) -> Result<serde_json::Value, String> {
314 let target = WorkflowTarget {
315 workflow_id: sanitize_workflow_id(workflow_id),
316 base_dir: base_dir.to_path_buf(),
317 };
318 let mut state = load_state(&target)?;
319 state.queries.insert(
320 name.to_string(),
321 WorkflowQueryRecord {
322 value,
323 published_at: now_rfc3339(),
324 },
325 );
326 save_state(&target, &state)?;
327 Ok(workflow_status_json(&target, &state))
328}
329
330pub fn workflow_pause_for_base(
331 base_dir: &Path,
332 workflow_id: &str,
333) -> Result<serde_json::Value, String> {
334 let target = WorkflowTarget {
335 workflow_id: sanitize_workflow_id(workflow_id),
336 base_dir: base_dir.to_path_buf(),
337 };
338 let mut state = load_state(&target)?;
339 state.paused = true;
340 state.next_seq += 1;
341 state.mailbox.push_back(WorkflowMessageRecord {
342 seq: state.next_seq,
343 kind: "control".to_string(),
344 name: "pause".to_string(),
345 request_id: None,
346 payload: serde_json::json!({}),
347 enqueued_at: now_rfc3339(),
348 });
349 save_state(&target, &state)?;
350 Ok(workflow_status_json(&target, &state))
351}
352
353pub fn workflow_resume_for_base(
354 base_dir: &Path,
355 workflow_id: &str,
356) -> Result<serde_json::Value, String> {
357 let target = WorkflowTarget {
358 workflow_id: sanitize_workflow_id(workflow_id),
359 base_dir: base_dir.to_path_buf(),
360 };
361 let mut state = load_state(&target)?;
362 state.paused = false;
363 state.next_seq += 1;
364 state.mailbox.push_back(WorkflowMessageRecord {
365 seq: state.next_seq,
366 kind: "control".to_string(),
367 name: "resume".to_string(),
368 request_id: None,
369 payload: serde_json::json!({}),
370 enqueued_at: now_rfc3339(),
371 });
372 save_state(&target, &state)?;
373 Ok(workflow_status_json(&target, &state))
374}
375
376pub async fn workflow_update_for_base(
377 base_dir: &Path,
378 workflow_id: &str,
379 name: &str,
380 payload: serde_json::Value,
381 timeout: StdDuration,
382) -> Result<serde_json::Value, String> {
383 let target = WorkflowTarget {
384 workflow_id: sanitize_workflow_id(workflow_id),
385 base_dir: base_dir.to_path_buf(),
386 };
387 let request_id = uuid::Uuid::now_v7().to_string();
388 enqueue_message(&target, "update", name, payload, Some(request_id.clone()))?;
389 let started = std::time::Instant::now();
390 while started.elapsed() <= timeout {
391 if let Ok(state) = load_state(&target) {
392 if let Some(response) = state.responses.get(&request_id) {
393 return Ok(response.value.clone());
394 }
395 }
396 tokio::time::sleep(StdDuration::from_millis(UPDATE_POLL_INTERVAL_MS)).await;
397 }
398 Err(format!(
399 "workflow update '{name}' timed out for '{}'",
400 target.workflow_id
401 ))
402}
403
404pub fn workflow_respond_update_for_base(
405 base_dir: &Path,
406 workflow_id: &str,
407 request_id: &str,
408 name: Option<&str>,
409 value: serde_json::Value,
410) -> Result<serde_json::Value, String> {
411 let target = WorkflowTarget {
412 workflow_id: sanitize_workflow_id(workflow_id),
413 base_dir: base_dir.to_path_buf(),
414 };
415 let mut state = load_state(&target)?;
416 state.responses.insert(
417 request_id.to_string(),
418 WorkflowUpdateResponseRecord {
419 request_id: request_id.to_string(),
420 name: name.map(ToString::to_string),
421 value,
422 responded_at: now_rfc3339(),
423 },
424 );
425 save_state(&target, &state)?;
426 Ok(workflow_status_json(&target, &state))
427}
428
429pub(crate) fn register_workflow_message_builtins(vm: &mut Vm) {
430 vm.set_global(
431 "workflow",
432 VmValue::Dict(Rc::new(BTreeMap::from([
433 (
434 "signal".to_string(),
435 VmValue::BuiltinRef(Rc::from("workflow.signal")),
436 ),
437 (
438 "query".to_string(),
439 VmValue::BuiltinRef(Rc::from("workflow.query")),
440 ),
441 (
442 "update".to_string(),
443 VmValue::BuiltinRef(Rc::from("workflow.update")),
444 ),
445 (
446 "publish_query".to_string(),
447 VmValue::BuiltinRef(Rc::from("workflow.publish_query")),
448 ),
449 (
450 "receive".to_string(),
451 VmValue::BuiltinRef(Rc::from("workflow.receive")),
452 ),
453 (
454 "respond_update".to_string(),
455 VmValue::BuiltinRef(Rc::from("workflow.respond_update")),
456 ),
457 (
458 "pause".to_string(),
459 VmValue::BuiltinRef(Rc::from("workflow.pause")),
460 ),
461 (
462 "resume".to_string(),
463 VmValue::BuiltinRef(Rc::from("workflow.resume")),
464 ),
465 (
466 "status".to_string(),
467 VmValue::BuiltinRef(Rc::from("workflow.status")),
468 ),
469 (
470 "continue_as_new".to_string(),
471 VmValue::BuiltinRef(Rc::from("workflow.continue_as_new")),
472 ),
473 ]))),
474 );
475
476 vm.register_builtin("workflow.signal", |args, _out| {
477 let target = parse_target_vm(args.first(), None, "workflow.signal")?;
478 let name = args
479 .get(1)
480 .map(|value| value.display())
481 .filter(|value| !value.is_empty())
482 .ok_or_else(|| VmError::Runtime("workflow.signal: missing name".to_string()))?;
483 let payload = args
484 .get(2)
485 .map(crate::llm::vm_value_to_json)
486 .unwrap_or(serde_json::Value::Null);
487 let result =
488 enqueue_message(&target, "signal", &name, payload, None).map_err(VmError::Runtime)?;
489 Ok(crate::stdlib::json_to_vm_value(&result))
490 });
491
492 vm.register_builtin("workflow.query", |args, _out| {
493 let target = parse_target_vm(args.first(), None, "workflow.query")?;
494 let name = args
495 .get(1)
496 .map(|value| value.display())
497 .filter(|value| !value.is_empty())
498 .ok_or_else(|| VmError::Runtime("workflow.query: missing name".to_string()))?;
499 let state = load_state(&target).map_err(VmError::Runtime)?;
500 Ok(crate::stdlib::json_to_vm_value(
501 &state
502 .queries
503 .get(&name)
504 .map(|record| record.value.clone())
505 .unwrap_or(serde_json::Value::Null),
506 ))
507 });
508
509 vm.register_async_builtin("workflow.update", |args| async move {
510 let target = parse_target_vm(args.first(), None, "workflow.update")?;
511 let name = args
512 .get(1)
513 .map(|value| value.display())
514 .filter(|value| !value.is_empty())
515 .ok_or_else(|| VmError::Runtime("workflow.update: missing name".to_string()))?;
516 let payload = args
517 .get(2)
518 .map(crate::llm::vm_value_to_json)
519 .unwrap_or(serde_json::Value::Null);
520 let timeout_ms = args
521 .get(3)
522 .and_then(|value| value.as_dict())
523 .and_then(|dict| dict.get("timeout_ms"))
524 .and_then(VmValue::as_int)
525 .unwrap_or(DEFAULT_UPDATE_TIMEOUT_MS as i64)
526 .max(1) as u64;
527 let result = workflow_update_for_base(
528 &target.base_dir,
529 &target.workflow_id,
530 &name,
531 payload,
532 StdDuration::from_millis(timeout_ms),
533 )
534 .await
535 .map_err(VmError::Runtime)?;
536 Ok(crate::stdlib::json_to_vm_value(&result))
537 });
538
539 vm.register_builtin("workflow.publish_query", |args, _out| {
540 let target = parse_target_vm(args.first(), None, "workflow.publish_query")?;
541 let name = args
542 .get(1)
543 .map(|value| value.display())
544 .filter(|value| !value.is_empty())
545 .ok_or_else(|| VmError::Runtime("workflow.publish_query: missing name".to_string()))?;
546 let value = args
547 .get(2)
548 .map(crate::llm::vm_value_to_json)
549 .unwrap_or(serde_json::Value::Null);
550 let result =
551 workflow_publish_query_for_base(&target.base_dir, &target.workflow_id, &name, value)
552 .map_err(VmError::Runtime)?;
553 Ok(crate::stdlib::json_to_vm_value(&result))
554 });
555
556 vm.register_builtin("workflow.receive", |args, _out| {
557 let target = parse_target_vm(args.first(), None, "workflow.receive")?;
558 let mut state = load_state(&target).map_err(VmError::Runtime)?;
559 let Some(message) = state.mailbox.pop_front() else {
560 return Ok(VmValue::Nil);
561 };
562 save_state(&target, &state).map_err(VmError::Runtime)?;
563 Ok(crate::stdlib::json_to_vm_value(&serde_json::json!({
564 "workflow_id": target.workflow_id,
565 "seq": message.seq,
566 "kind": message.kind,
567 "name": message.name,
568 "request_id": message.request_id,
569 "payload": message.payload,
570 "enqueued_at": message.enqueued_at,
571 })))
572 });
573
574 vm.register_builtin("workflow.respond_update", |args, _out| {
575 let target = parse_target_vm(args.first(), None, "workflow.respond_update")?;
576 let request_id = args
577 .get(1)
578 .map(|value| value.display())
579 .filter(|value| !value.is_empty())
580 .ok_or_else(|| {
581 VmError::Runtime("workflow.respond_update: missing request id".to_string())
582 })?;
583 let value = args
584 .get(2)
585 .map(crate::llm::vm_value_to_json)
586 .unwrap_or(serde_json::Value::Null);
587 let name = args
588 .get(3)
589 .map(|value| value.display())
590 .filter(|value| !value.is_empty());
591 let result = workflow_respond_update_for_base(
592 &target.base_dir,
593 &target.workflow_id,
594 &request_id,
595 name.as_deref(),
596 value,
597 )
598 .map_err(VmError::Runtime)?;
599 Ok(crate::stdlib::json_to_vm_value(&result))
600 });
601
602 vm.register_builtin("workflow.pause", |args, _out| {
603 let target = parse_target_vm(args.first(), None, "workflow.pause")?;
604 let result = workflow_pause_for_base(&target.base_dir, &target.workflow_id)
605 .map_err(VmError::Runtime)?;
606 Ok(crate::stdlib::json_to_vm_value(&result))
607 });
608
609 vm.register_builtin("workflow.resume", |args, _out| {
610 let target = parse_target_vm(args.first(), None, "workflow.resume")?;
611 let result = workflow_resume_for_base(&target.base_dir, &target.workflow_id)
612 .map_err(VmError::Runtime)?;
613 Ok(crate::stdlib::json_to_vm_value(&result))
614 });
615
616 vm.register_builtin("workflow.status", |args, _out| {
617 let target = parse_target_vm(args.first(), None, "workflow.status")?;
618 let state = load_state(&target).map_err(VmError::Runtime)?;
619 Ok(crate::stdlib::json_to_vm_value(&workflow_status_json(
620 &target, &state,
621 )))
622 });
623
624 vm.register_builtin("workflow.continue_as_new", |args, _out| {
625 let target = parse_target_vm(args.first(), None, "workflow.continue_as_new")?;
626 let mut state = load_state(&target).map_err(VmError::Runtime)?;
627 state.generation += 1;
628 state.continue_as_new_count += 1;
629 state.last_continue_as_new_at = Some(now_rfc3339());
630 state.responses.clear();
631 save_state(&target, &state).map_err(VmError::Runtime)?;
632 Ok(crate::stdlib::json_to_vm_value(&workflow_status_json(
633 &target, &state,
634 )))
635 });
636
637 vm.register_builtin("continue_as_new", |args, _out| {
638 let target = parse_target_vm(args.first(), None, "continue_as_new")?;
639 let mut state = load_state(&target).map_err(VmError::Runtime)?;
640 state.generation += 1;
641 state.continue_as_new_count += 1;
642 state.last_continue_as_new_at = Some(now_rfc3339());
643 state.responses.clear();
644 save_state(&target, &state).map_err(VmError::Runtime)?;
645 Ok(crate::stdlib::json_to_vm_value(&workflow_status_json(
646 &target, &state,
647 )))
648 });
649}
650
#[cfg(test)]
mod tests {
    use super::*;

    /// An update call must block until a response keyed by its request id is
    /// written to the persisted state, then return that response's value.
    #[tokio::test(start_paused = true)]
    async fn update_round_trip_waits_for_response() {
        let workflow_id = "wf-update";
        let scratch = tempfile::tempdir().expect("tempdir");
        let base_dir = scratch.path().to_path_buf();

        let caller_base = base_dir.clone();
        let pending_update = tokio::spawn(async move {
            let payload = serde_json::json!({"max_usd": 10});
            workflow_update_for_base(
                &caller_base,
                workflow_id,
                "adjust_budget",
                payload,
                StdDuration::from_millis(500),
            )
            .await
        });

        // Give the spawned caller time to enqueue its update message.
        tokio::time::sleep(StdDuration::from_millis(50)).await;

        // Act as the workflow: take the update off the mailbox and answer it.
        let target = WorkflowTarget {
            workflow_id: workflow_id.to_string(),
            base_dir,
        };
        let mut state = load_state(&target).expect("load state");
        let message = state.mailbox.pop_front().expect("queued update");
        assert_eq!(message.kind, "update");
        let request_id = message.request_id.expect("request id");
        let reply = WorkflowUpdateResponseRecord {
            request_id: request_id.clone(),
            name: Some(message.name),
            value: serde_json::json!({"ok": true}),
            responded_at: now_rfc3339(),
        };
        state.responses.insert(request_id, reply);
        save_state(&target, &state).expect("save response");

        let outcome = pending_update.await.expect("join").expect("update result");
        assert_eq!(outcome, serde_json::json!({"ok": true}));
    }

    /// A `persisted_path` inside `.harn-runs` resolves the base dir to the
    /// directory that contains `.harn-runs`.
    #[test]
    fn persisted_path_drives_target_base_dir() {
        let descriptor = serde_json::json!({
            "workflow_id": "wf",
            "persisted_path": "/tmp/demo/.harn-runs/run.json"
        });
        let target = parse_target_json(&descriptor, None).expect("target");
        assert_eq!(target.workflow_id, "wf");
        assert_eq!(target.base_dir, PathBuf::from("/tmp/demo"));
    }
}