1use std::collections::{BTreeMap, VecDeque};
2use std::path::{Path, PathBuf};
3use std::rc::Rc;
4use std::time::Duration as StdDuration;
5
6use serde::{Deserialize, Serialize};
7
8use crate::stdlib::process::runtime_root_base;
9use crate::value::{VmError, VmValue};
10use crate::vm::Vm;
11
/// Timeout, in milliseconds, that the `workflow.update` builtin applies when the caller
/// does not supply a `timeout_ms` option.
const DEFAULT_UPDATE_TIMEOUT_MS: u64 = 30_000;
/// Pause, in milliseconds, between successive state reloads while
/// `workflow_update_for_base` waits for a response to appear.
const UPDATE_POLL_INTERVAL_MS: u64 = 25;
14
/// One entry in a workflow's persisted mailbox queue.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct WorkflowMessageRecord {
    /// Sequence number taken from the mailbox's `next_seq` counter at enqueue time.
    pub seq: u64,
    /// Message category; this file produces `"signal"`, `"update"` and `"control"`.
    pub kind: String,
    /// Caller-chosen message name (`"pause"` / `"resume"` for control messages).
    pub name: String,
    /// Correlation id for updates; `None` for signals and control messages. Omitted from
    /// the serialized form when absent.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub request_id: Option<String>,
    /// Arbitrary JSON payload supplied by the sender.
    pub payload: serde_json::Value,
    /// Timestamp string produced by `now_rfc3339` when the message was queued.
    pub enqueued_at: String,
}
25
/// A value published under a query name, together with when it was published.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct WorkflowQueryRecord {
    /// The published JSON value returned to query callers.
    pub value: serde_json::Value,
    /// Timestamp string produced by `now_rfc3339` at publish time.
    pub published_at: String,
}
31
/// The stored reply to an update request, keyed in the mailbox state by `request_id`.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct WorkflowUpdateResponseRecord {
    /// Id of the update request this record answers.
    pub request_id: String,
    /// Optional update name supplied by the responder; omitted from the serialized form
    /// when absent.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub name: Option<String>,
    /// JSON value handed back to the caller waiting on the update.
    pub value: serde_json::Value,
    /// Timestamp string produced by `now_rfc3339` when the response was recorded.
    pub responded_at: String,
}
40
/// Complete persisted state for one workflow's mailbox, stored as `state.json`.
///
/// NOTE(review): `_type` and `workflow_id` carry no `#[serde(default)]`, so a state file
/// that omits either key fails to deserialize — confirm that strictness is intended.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct WorkflowMailboxState {
    /// Record discriminator, serialized under the `_type` key.
    #[serde(rename = "_type")]
    pub type_name: String,
    /// Identifier of the workflow that owns this mailbox.
    pub workflow_id: String,
    /// Generation counter; defaults to 1 when missing from the file.
    #[serde(default = "default_generation")]
    pub generation: u64,
    /// Number of times a continue-as-new builtin has been applied to this workflow.
    #[serde(default)]
    pub continue_as_new_count: u64,
    /// Timestamp of the most recent continue-as-new, if any; omitted when absent.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub last_continue_as_new_at: Option<String>,
    /// Pause flag set by the pause operation and cleared by resume.
    #[serde(default)]
    pub paused: bool,
    /// Last sequence number handed out; incremented before each enqueue.
    #[serde(default)]
    pub next_seq: u64,
    /// Pending messages in arrival order (pushed at the back, popped from the front).
    #[serde(default)]
    pub mailbox: VecDeque<WorkflowMessageRecord>,
    /// Published query values keyed by query name.
    #[serde(default)]
    pub queries: BTreeMap<String, WorkflowQueryRecord>,
    /// Update responses keyed by request id.
    #[serde(default)]
    pub responses: BTreeMap<String, WorkflowUpdateResponseRecord>,
}
63
64impl Default for WorkflowMailboxState {
65 fn default() -> Self {
66 Self {
67 type_name: "workflow_mailbox".to_string(),
68 workflow_id: String::new(),
69 generation: default_generation(),
70 continue_as_new_count: 0,
71 last_continue_as_new_at: None,
72 paused: false,
73 next_seq: 0,
74 mailbox: VecDeque::new(),
75 queries: BTreeMap::new(),
76 responses: BTreeMap::new(),
77 }
78 }
79}
80
/// Generation assigned to a mailbox that has never continued as new; also used as the
/// serde default for the `generation` field.
fn default_generation() -> u64 {
    const FIRST_GENERATION: u64 = 1;
    FIRST_GENERATION
}
84
/// Resolved address of a workflow mailbox: which workflow, and under which base
/// directory its state lives.
#[derive(Clone, Debug, PartialEq, Eq)]
struct WorkflowTarget {
    /// Workflow identifier, used as a directory name beneath the workflow directory.
    workflow_id: String,
    /// Base directory passed to `crate::runtime_paths::workflow_dir`.
    base_dir: PathBuf,
}
90
/// Reduces a caller-supplied workflow id to a single path component that is safe to use
/// as a directory name.
///
/// Surrounding whitespace is trimmed and only the final path component is kept, so
/// `"a/b/wf"` becomes `"wf"`. Inputs that have no usable final component — the empty
/// string, `"."`, `"/"`, or anything ending in `".."` — yield the fallback id
/// `"workflow"`.
fn sanitize_workflow_id(raw: &str) -> String {
    const FALLBACK_ID: &str = "workflow";
    // `Path::file_name` returns `None` for "", "/", "." and any path terminating in "..".
    // Those cases must map to the fallback instead of the raw text: returning the raw
    // text would let separators and `..` segments flow into the joined state path.
    Path::new(raw.trim())
        .file_name()
        .and_then(|component| component.to_str())
        .filter(|component| !component.is_empty() && *component != "." && *component != "..")
        .unwrap_or(FALLBACK_ID)
        .to_string()
}
103
/// Derives a workflow base directory from the path of a persisted run file.
///
/// The file's containing directory is used, except when that directory is named
/// `.harn-runs`, in which case its parent is used instead (falling back to the
/// `.harn-runs` directory itself when it has no parent). A path without any parent
/// resolves to `"."`.
fn workflow_base_dir_from_persisted_path(path: &Path) -> PathBuf {
    let containing_dir = match path.parent() {
        Some(dir) => dir,
        None => Path::new("."),
    };
    let dir_name = containing_dir.file_name().and_then(|name| name.to_str());
    let resolved = match dir_name {
        Some(".harn-runs") => containing_dir.parent().unwrap_or(containing_dir),
        _ => containing_dir,
    };
    resolved.to_path_buf()
}
112
113fn workflow_target_root(target: &WorkflowTarget) -> PathBuf {
114 crate::runtime_paths::workflow_dir(&target.base_dir).join(&target.workflow_id)
115}
116
117fn workflow_state_path(target: &WorkflowTarget) -> PathBuf {
118 workflow_target_root(target).join("state.json")
119}
120
121fn now_rfc3339() -> String {
122 time::OffsetDateTime::now_utc()
123 .format(&time::format_description::well_known::Rfc3339)
124 .unwrap_or_else(|_| uuid::Uuid::now_v7().to_string())
125}
126
127fn load_state(target: &WorkflowTarget) -> Result<WorkflowMailboxState, String> {
128 let path = workflow_state_path(target);
129 if !path.exists() {
130 return Ok(WorkflowMailboxState {
131 workflow_id: target.workflow_id.clone(),
132 ..WorkflowMailboxState::default()
133 });
134 }
135 let text = std::fs::read_to_string(&path)
136 .map_err(|error| format!("workflow state read error: {error}"))?;
137 let mut state: WorkflowMailboxState = serde_json::from_str(&text)
138 .map_err(|error| format!("workflow state parse error: {error}"))?;
139 if state.type_name.is_empty() {
140 state.type_name = "workflow_mailbox".to_string();
141 }
142 if state.workflow_id.is_empty() {
143 state.workflow_id = target.workflow_id.clone();
144 }
145 if state.generation == 0 {
146 state.generation = 1;
147 }
148 Ok(state)
149}
150
151fn save_state(target: &WorkflowTarget, state: &WorkflowMailboxState) -> Result<(), String> {
152 let path = workflow_state_path(target);
153 let parent = path
154 .parent()
155 .ok_or_else(|| "workflow state path has no parent".to_string())?;
156 std::fs::create_dir_all(parent)
157 .map_err(|error| format!("workflow state mkdir error: {error}"))?;
158 let json = serde_json::to_string_pretty(state)
159 .map_err(|error| format!("workflow state encode error: {error}"))?;
160 let tmp_path = parent.join(format!(".state.json.{}.tmp", uuid::Uuid::now_v7()));
161 std::fs::write(&tmp_path, &json)
162 .map_err(|error| format!("workflow state write error: {error}"))?;
163 if let Err(error) = std::fs::rename(&tmp_path, &path) {
164 let _ = std::fs::remove_file(&tmp_path);
165 return Err(format!("workflow state rename error: {error}"));
166 }
167 Ok(())
168}
169
170fn parse_target_json(
171 value: &serde_json::Value,
172 fallback_base_dir: Option<&Path>,
173) -> Option<WorkflowTarget> {
174 match value {
175 serde_json::Value::String(text) => Some(WorkflowTarget {
176 workflow_id: sanitize_workflow_id(text),
177 base_dir: fallback_base_dir
178 .map(Path::to_path_buf)
179 .unwrap_or_else(runtime_root_base),
180 }),
181 serde_json::Value::Object(map) => {
182 let workflow_id = map
183 .get("workflow_id")
184 .and_then(|value| value.as_str())
185 .or_else(|| map.get("workflow").and_then(|value| value.as_str()))
186 .or_else(|| {
187 map.get("run")
188 .and_then(|value| value.get("workflow_id"))
189 .and_then(|value| value.as_str())
190 })
191 .or_else(|| {
192 map.get("result")
193 .and_then(|value| value.get("run"))
194 .and_then(|value| value.get("workflow_id"))
195 .and_then(|value| value.as_str())
196 })?;
197 let explicit_base = map
198 .get("base_dir")
199 .and_then(|value| value.as_str())
200 .filter(|value| !value.trim().is_empty())
201 .map(PathBuf::from);
202 let persisted_path = map
203 .get("persisted_path")
204 .and_then(|value| value.as_str())
205 .or_else(|| map.get("path").and_then(|value| value.as_str()))
206 .or_else(|| {
207 map.get("run")
208 .and_then(|value| value.get("persisted_path"))
209 .and_then(|value| value.as_str())
210 })
211 .or_else(|| {
212 map.get("result")
213 .and_then(|value| value.get("run"))
214 .and_then(|value| value.get("persisted_path"))
215 .and_then(|value| value.as_str())
216 });
217 let base_dir = explicit_base
218 .or_else(|| {
219 persisted_path
220 .map(|path| workflow_base_dir_from_persisted_path(Path::new(path)))
221 })
222 .or_else(|| fallback_base_dir.map(Path::to_path_buf))
223 .unwrap_or_else(runtime_root_base);
224 Some(WorkflowTarget {
225 workflow_id: sanitize_workflow_id(workflow_id),
226 base_dir,
227 })
228 }
229 _ => None,
230 }
231}
232
233fn parse_target_vm(
234 value: Option<&VmValue>,
235 fallback_base_dir: Option<&Path>,
236 builtin: &str,
237) -> Result<WorkflowTarget, VmError> {
238 let value = value.ok_or_else(|| VmError::Runtime(format!("{builtin}: missing target")))?;
239 parse_target_json(&crate::llm::vm_value_to_json(value), fallback_base_dir).ok_or_else(|| {
240 VmError::Runtime(format!(
241 "{builtin}: target must be a workflow id string or dict with workflow_id/workflow"
242 ))
243 })
244}
245
246fn workflow_status_json(
247 target: &WorkflowTarget,
248 state: &WorkflowMailboxState,
249) -> serde_json::Value {
250 serde_json::json!({
251 "workflow_id": target.workflow_id,
252 "base_dir": target.base_dir.to_string_lossy(),
253 "generation": state.generation,
254 "paused": state.paused,
255 "pending_count": state.mailbox.len(),
256 "query_count": state.queries.len(),
257 "response_count": state.responses.len(),
258 "continue_as_new_count": state.continue_as_new_count,
259 "last_continue_as_new_at": state.last_continue_as_new_at,
260 })
261}
262
263fn enqueue_message(
264 target: &WorkflowTarget,
265 kind: &str,
266 name: &str,
267 payload: serde_json::Value,
268 request_id: Option<String>,
269) -> Result<serde_json::Value, String> {
270 let mut state = load_state(target)?;
271 state.next_seq += 1;
272 let message = WorkflowMessageRecord {
273 seq: state.next_seq,
274 kind: kind.to_string(),
275 name: name.to_string(),
276 request_id,
277 payload,
278 enqueued_at: now_rfc3339(),
279 };
280 state.mailbox.push_back(message.clone());
281 save_state(target, &state)?;
282 Ok(serde_json::json!({
283 "workflow_id": target.workflow_id,
284 "message": message,
285 "status": workflow_status_json(target, &state),
286 }))
287}
288
289pub fn workflow_signal_for_base(
290 base_dir: &Path,
291 workflow_id: &str,
292 name: &str,
293 payload: serde_json::Value,
294) -> Result<serde_json::Value, String> {
295 let target = WorkflowTarget {
296 workflow_id: sanitize_workflow_id(workflow_id),
297 base_dir: base_dir.to_path_buf(),
298 };
299 enqueue_message(&target, "signal", name, payload, None)
300}
301
302pub fn workflow_query_for_base(
303 base_dir: &Path,
304 workflow_id: &str,
305 name: &str,
306) -> Result<serde_json::Value, String> {
307 let target = WorkflowTarget {
308 workflow_id: sanitize_workflow_id(workflow_id),
309 base_dir: base_dir.to_path_buf(),
310 };
311 let state = load_state(&target)?;
312 Ok(state
313 .queries
314 .get(name)
315 .map(|record| record.value.clone())
316 .unwrap_or(serde_json::Value::Null))
317}
318
319pub fn workflow_publish_query_for_base(
320 base_dir: &Path,
321 workflow_id: &str,
322 name: &str,
323 value: serde_json::Value,
324) -> Result<serde_json::Value, String> {
325 let target = WorkflowTarget {
326 workflow_id: sanitize_workflow_id(workflow_id),
327 base_dir: base_dir.to_path_buf(),
328 };
329 let mut state = load_state(&target)?;
330 state.queries.insert(
331 name.to_string(),
332 WorkflowQueryRecord {
333 value,
334 published_at: now_rfc3339(),
335 },
336 );
337 save_state(&target, &state)?;
338 Ok(workflow_status_json(&target, &state))
339}
340
341pub fn workflow_pause_for_base(
342 base_dir: &Path,
343 workflow_id: &str,
344) -> Result<serde_json::Value, String> {
345 let target = WorkflowTarget {
346 workflow_id: sanitize_workflow_id(workflow_id),
347 base_dir: base_dir.to_path_buf(),
348 };
349 let mut state = load_state(&target)?;
350 state.paused = true;
351 state.next_seq += 1;
352 state.mailbox.push_back(WorkflowMessageRecord {
353 seq: state.next_seq,
354 kind: "control".to_string(),
355 name: "pause".to_string(),
356 request_id: None,
357 payload: serde_json::json!({}),
358 enqueued_at: now_rfc3339(),
359 });
360 save_state(&target, &state)?;
361 Ok(workflow_status_json(&target, &state))
362}
363
364pub fn workflow_resume_for_base(
365 base_dir: &Path,
366 workflow_id: &str,
367) -> Result<serde_json::Value, String> {
368 let target = WorkflowTarget {
369 workflow_id: sanitize_workflow_id(workflow_id),
370 base_dir: base_dir.to_path_buf(),
371 };
372 let mut state = load_state(&target)?;
373 state.paused = false;
374 state.next_seq += 1;
375 state.mailbox.push_back(WorkflowMessageRecord {
376 seq: state.next_seq,
377 kind: "control".to_string(),
378 name: "resume".to_string(),
379 request_id: None,
380 payload: serde_json::json!({}),
381 enqueued_at: now_rfc3339(),
382 });
383 save_state(&target, &state)?;
384 Ok(workflow_status_json(&target, &state))
385}
386
387pub async fn workflow_update_for_base(
388 base_dir: &Path,
389 workflow_id: &str,
390 name: &str,
391 payload: serde_json::Value,
392 timeout: StdDuration,
393) -> Result<serde_json::Value, String> {
394 let target = WorkflowTarget {
395 workflow_id: sanitize_workflow_id(workflow_id),
396 base_dir: base_dir.to_path_buf(),
397 };
398 let request_id = uuid::Uuid::now_v7().to_string();
399 enqueue_message(&target, "update", name, payload, Some(request_id.clone()))?;
400 let started = std::time::Instant::now();
401 while started.elapsed() <= timeout {
402 if let Ok(state) = load_state(&target) {
403 if let Some(response) = state.responses.get(&request_id) {
404 return Ok(response.value.clone());
405 }
406 }
407 tokio::time::sleep(StdDuration::from_millis(UPDATE_POLL_INTERVAL_MS)).await;
408 }
409 Err(format!(
410 "workflow update '{name}' timed out for '{}'",
411 target.workflow_id
412 ))
413}
414
415pub fn workflow_respond_update_for_base(
416 base_dir: &Path,
417 workflow_id: &str,
418 request_id: &str,
419 name: Option<&str>,
420 value: serde_json::Value,
421) -> Result<serde_json::Value, String> {
422 let target = WorkflowTarget {
423 workflow_id: sanitize_workflow_id(workflow_id),
424 base_dir: base_dir.to_path_buf(),
425 };
426 let mut state = load_state(&target)?;
427 state.responses.insert(
428 request_id.to_string(),
429 WorkflowUpdateResponseRecord {
430 request_id: request_id.to_string(),
431 name: name.map(ToString::to_string),
432 value,
433 responded_at: now_rfc3339(),
434 },
435 );
436 save_state(&target, &state)?;
437 Ok(workflow_status_json(&target, &state))
438}
439
/// Installs the workflow messaging surface on `vm`.
///
/// A global dict named `workflow` is set whose entries are builtin references
/// (`workflow.signal`, `workflow.query`, …), and each referenced builtin is registered.
/// A bare `continue_as_new` builtin is registered as well. Every builtin takes the
/// workflow target as its first argument, resolved by `parse_target_vm` with no
/// fallback base directory.
pub(crate) fn register_workflow_message_builtins(vm: &mut Vm) {
    // Namespace dict: maps each method name to the builtin registered below.
    vm.set_global(
        "workflow",
        VmValue::Dict(Rc::new(BTreeMap::from([
            (
                "signal".to_string(),
                VmValue::BuiltinRef(Rc::from("workflow.signal")),
            ),
            (
                "query".to_string(),
                VmValue::BuiltinRef(Rc::from("workflow.query")),
            ),
            (
                "update".to_string(),
                VmValue::BuiltinRef(Rc::from("workflow.update")),
            ),
            (
                "publish_query".to_string(),
                VmValue::BuiltinRef(Rc::from("workflow.publish_query")),
            ),
            (
                "receive".to_string(),
                VmValue::BuiltinRef(Rc::from("workflow.receive")),
            ),
            (
                "respond_update".to_string(),
                VmValue::BuiltinRef(Rc::from("workflow.respond_update")),
            ),
            (
                "pause".to_string(),
                VmValue::BuiltinRef(Rc::from("workflow.pause")),
            ),
            (
                "resume".to_string(),
                VmValue::BuiltinRef(Rc::from("workflow.resume")),
            ),
            (
                "status".to_string(),
                VmValue::BuiltinRef(Rc::from("workflow.status")),
            ),
            (
                "continue_as_new".to_string(),
                VmValue::BuiltinRef(Rc::from("workflow.continue_as_new")),
            ),
        ]))),
    );

    // workflow.signal(target, name, payload?): queue a signal. The name is the second
    // argument rendered with `display()` and must be non-empty; a missing payload is
    // sent as JSON null.
    vm.register_builtin("workflow.signal", |args, _out| {
        let target = parse_target_vm(args.first(), None, "workflow.signal")?;
        let name = args
            .get(1)
            .map(|value| value.display())
            .filter(|value| !value.is_empty())
            .ok_or_else(|| VmError::Runtime("workflow.signal: missing name".to_string()))?;
        let payload = args
            .get(2)
            .map(crate::llm::vm_value_to_json)
            .unwrap_or(serde_json::Value::Null);
        let result =
            enqueue_message(&target, "signal", &name, payload, None).map_err(VmError::Runtime)?;
        Ok(crate::stdlib::json_to_vm_value(&result))
    });

    // workflow.query(target, name): read the value published under `name`, or JSON null
    // when nothing is published under that name.
    vm.register_builtin("workflow.query", |args, _out| {
        let target = parse_target_vm(args.first(), None, "workflow.query")?;
        let name = args
            .get(1)
            .map(|value| value.display())
            .filter(|value| !value.is_empty())
            .ok_or_else(|| VmError::Runtime("workflow.query: missing name".to_string()))?;
        let state = load_state(&target).map_err(VmError::Runtime)?;
        Ok(crate::stdlib::json_to_vm_value(
            &state
                .queries
                .get(&name)
                .map(|record| record.value.clone())
                .unwrap_or(serde_json::Value::Null),
        ))
    });

    // workflow.update(target, name, payload?, options?): queue an update and wait for
    // its response. `options.timeout_ms` overrides DEFAULT_UPDATE_TIMEOUT_MS.
    vm.register_async_builtin("workflow.update", |args| async move {
        let target = parse_target_vm(args.first(), None, "workflow.update")?;
        let name = args
            .get(1)
            .map(|value| value.display())
            .filter(|value| !value.is_empty())
            .ok_or_else(|| VmError::Runtime("workflow.update: missing name".to_string()))?;
        let payload = args
            .get(2)
            .map(crate::llm::vm_value_to_json)
            .unwrap_or(serde_json::Value::Null);
        // Clamp to at least 1 before the cast so zero or negative values cannot wrap
        // into a huge unsigned timeout.
        let timeout_ms = args
            .get(3)
            .and_then(|value| value.as_dict())
            .and_then(|dict| dict.get("timeout_ms"))
            .and_then(VmValue::as_int)
            .unwrap_or(DEFAULT_UPDATE_TIMEOUT_MS as i64)
            .max(1) as u64;
        let result = workflow_update_for_base(
            &target.base_dir,
            &target.workflow_id,
            &name,
            payload,
            StdDuration::from_millis(timeout_ms),
        )
        .await
        .map_err(VmError::Runtime)?;
        Ok(crate::stdlib::json_to_vm_value(&result))
    });

    // workflow.publish_query(target, name, value?): publish a query value; a missing
    // value is stored as JSON null.
    vm.register_builtin("workflow.publish_query", |args, _out| {
        let target = parse_target_vm(args.first(), None, "workflow.publish_query")?;
        let name = args
            .get(1)
            .map(|value| value.display())
            .filter(|value| !value.is_empty())
            .ok_or_else(|| VmError::Runtime("workflow.publish_query: missing name".to_string()))?;
        let value = args
            .get(2)
            .map(crate::llm::vm_value_to_json)
            .unwrap_or(serde_json::Value::Null);
        let result =
            workflow_publish_query_for_base(&target.base_dir, &target.workflow_id, &name, value)
                .map_err(VmError::Runtime)?;
        Ok(crate::stdlib::json_to_vm_value(&result))
    });

    // workflow.receive(target): pop the oldest pending message. Returns nil, without
    // writing state, when the mailbox is empty; otherwise the shortened mailbox is saved
    // before the message is returned.
    vm.register_builtin("workflow.receive", |args, _out| {
        let target = parse_target_vm(args.first(), None, "workflow.receive")?;
        let mut state = load_state(&target).map_err(VmError::Runtime)?;
        let Some(message) = state.mailbox.pop_front() else {
            return Ok(VmValue::Nil);
        };
        save_state(&target, &state).map_err(VmError::Runtime)?;
        Ok(crate::stdlib::json_to_vm_value(&serde_json::json!({
            "workflow_id": target.workflow_id,
            "seq": message.seq,
            "kind": message.kind,
            "name": message.name,
            "request_id": message.request_id,
            "payload": message.payload,
            "enqueued_at": message.enqueued_at,
        })))
    });

    // workflow.respond_update(target, request_id, value?, name?): store the response
    // for an update request. Note the optional name comes after the value.
    vm.register_builtin("workflow.respond_update", |args, _out| {
        let target = parse_target_vm(args.first(), None, "workflow.respond_update")?;
        let request_id = args
            .get(1)
            .map(|value| value.display())
            .filter(|value| !value.is_empty())
            .ok_or_else(|| {
                VmError::Runtime("workflow.respond_update: missing request id".to_string())
            })?;
        let value = args
            .get(2)
            .map(crate::llm::vm_value_to_json)
            .unwrap_or(serde_json::Value::Null);
        let name = args
            .get(3)
            .map(|value| value.display())
            .filter(|value| !value.is_empty());
        let result = workflow_respond_update_for_base(
            &target.base_dir,
            &target.workflow_id,
            &request_id,
            name.as_deref(),
            value,
        )
        .map_err(VmError::Runtime)?;
        Ok(crate::stdlib::json_to_vm_value(&result))
    });

    // workflow.pause(target): set the paused flag and queue a pause control message.
    vm.register_builtin("workflow.pause", |args, _out| {
        let target = parse_target_vm(args.first(), None, "workflow.pause")?;
        let result = workflow_pause_for_base(&target.base_dir, &target.workflow_id)
            .map_err(VmError::Runtime)?;
        Ok(crate::stdlib::json_to_vm_value(&result))
    });

    // workflow.resume(target): clear the paused flag and queue a resume control message.
    vm.register_builtin("workflow.resume", |args, _out| {
        let target = parse_target_vm(args.first(), None, "workflow.resume")?;
        let result = workflow_resume_for_base(&target.base_dir, &target.workflow_id)
            .map_err(VmError::Runtime)?;
        Ok(crate::stdlib::json_to_vm_value(&result))
    });

    // workflow.status(target): read-only status summary of the mailbox.
    vm.register_builtin("workflow.status", |args, _out| {
        let target = parse_target_vm(args.first(), None, "workflow.status")?;
        let state = load_state(&target).map_err(VmError::Runtime)?;
        Ok(crate::stdlib::json_to_vm_value(&workflow_status_json(
            &target, &state,
        )))
    });

    // workflow.continue_as_new(target): bump the generation and the continue-as-new
    // counter, stamp the time, and drop all stored update responses. The mailbox and the
    // published queries are left as they are.
    vm.register_builtin("workflow.continue_as_new", |args, _out| {
        let target = parse_target_vm(args.first(), None, "workflow.continue_as_new")?;
        let mut state = load_state(&target).map_err(VmError::Runtime)?;
        state.generation += 1;
        state.continue_as_new_count += 1;
        state.last_continue_as_new_at = Some(now_rfc3339());
        state.responses.clear();
        save_state(&target, &state).map_err(VmError::Runtime)?;
        Ok(crate::stdlib::json_to_vm_value(&workflow_status_json(
            &target, &state,
        )))
    });

    // Bare `continue_as_new(target)`: same behavior as `workflow.continue_as_new`; only
    // the builtin name used in error messages differs.
    vm.register_builtin("continue_as_new", |args, _out| {
        let target = parse_target_vm(args.first(), None, "continue_as_new")?;
        let mut state = load_state(&target).map_err(VmError::Runtime)?;
        state.generation += 1;
        state.continue_as_new_count += 1;
        state.last_continue_as_new_at = Some(now_rfc3339());
        state.responses.clear();
        save_state(&target, &state).map_err(VmError::Runtime)?;
        Ok(crate::stdlib::json_to_vm_value(&workflow_status_json(
            &target, &state,
        )))
    });
}
661
#[cfg(test)]
mod tests {
    use super::*;

    /// An update call blocks until a response for its request id is written to the
    /// state file, then returns that response's value.
    #[tokio::test]
    async fn update_round_trip_waits_for_response() {
        let dir = tempfile::tempdir().expect("tempdir");
        let workflow_id = "wf-update";
        let base_dir = dir.path().to_path_buf();
        let base_dir_clone = base_dir.clone();
        // Caller side: runs concurrently and polls for the response.
        let task = tokio::spawn(async move {
            workflow_update_for_base(
                &base_dir_clone,
                workflow_id,
                "adjust_budget",
                serde_json::json!({"max_usd": 10}),
                StdDuration::from_millis(500),
            )
            .await
        });

        // Give the spawned task time to enqueue the update before the state is loaded.
        tokio::time::sleep(StdDuration::from_millis(50)).await;
        let target = WorkflowTarget {
            workflow_id: workflow_id.to_string(),
            base_dir: base_dir.clone(),
        };
        // Workflow side: take the queued update and store a response under its id.
        let mut state = load_state(&target).expect("load state");
        let message = state.mailbox.pop_front().expect("queued update");
        assert_eq!(message.kind, "update");
        state.responses.insert(
            message.request_id.clone().expect("request id"),
            WorkflowUpdateResponseRecord {
                request_id: message.request_id.expect("request id"),
                name: Some(message.name.clone()),
                value: serde_json::json!({"ok": true}),
                responded_at: now_rfc3339(),
            },
        );
        save_state(&target, &state).expect("save response");

        let result = task.await.expect("join").expect("update result");
        assert_eq!(result, serde_json::json!({"ok": true}));
    }

    /// A `persisted_path` inside a `.harn-runs` directory resolves the base directory
    /// to the parent of `.harn-runs`.
    #[test]
    fn persisted_path_drives_target_base_dir() {
        let base = parse_target_json(
            &serde_json::json!({
                "workflow_id": "wf",
                "persisted_path": "/tmp/demo/.harn-runs/run.json"
            }),
            None,
        )
        .expect("target");
        assert_eq!(base.workflow_id, "wf");
        assert_eq!(base.base_dir, PathBuf::from("/tmp/demo"));
    }
}