Skip to main content

harn_vm/orchestration/
mod.rs

1use std::path::PathBuf;
2use std::{cell::RefCell, thread_local};
3
4use serde::{Deserialize, Serialize};
5
6use crate::llm::vm_value_to_json;
7use crate::value::{VmError, VmValue};
8
9pub(crate) fn now_rfc3339() -> String {
10    use std::time::{SystemTime, UNIX_EPOCH};
11    let ts = SystemTime::now()
12        .duration_since(UNIX_EPOCH)
13        .unwrap_or_default()
14        .as_secs();
15    format!("{ts}")
16}
17
18pub(crate) fn new_id(prefix: &str) -> String {
19    format!("{prefix}_{}", uuid::Uuid::now_v7())
20}
21
22pub(crate) fn default_run_dir() -> PathBuf {
23    let base = std::env::current_dir().unwrap_or_else(|_| PathBuf::from("."));
24    crate::runtime_paths::run_root(&base)
25}
26
27mod hooks;
28pub use hooks::*;
29
30mod compaction;
31pub use compaction::*;
32
33mod artifacts;
34pub use artifacts::*;
35
36mod assemble;
37pub use assemble::*;
38
39mod handoffs;
40pub use handoffs::*;
41
42mod policy;
43pub use policy::*;
44
45mod workflow;
46pub use workflow::*;
47
48mod records;
49pub use records::*;
50
51thread_local! {
52    static CURRENT_MUTATION_SESSION: RefCell<Option<MutationSessionRecord>> = const { RefCell::new(None) };
53    /// Workflow-level skill context, installed by `workflow_execute` so
54    /// every per-node agent loop constructed inside `execute_stage_node`
55    /// can pick up the same `skills:` / `skill_match:` registry without
56    /// threading a new parameter through every helper. Cleared on
57    /// workflow exit (success or error) by `WorkflowSkillContextGuard`.
58    static CURRENT_WORKFLOW_SKILL_CONTEXT: RefCell<Option<WorkflowSkillContext>> = const { RefCell::new(None) };
59}
60
61/// Skill wiring threaded from `workflow_execute` into the per-stage
62/// agent loops via thread-local context. `VmValue` wraps `Rc` and is
63/// not `Send`, so we store it in a thread-local rather than a mutex —
64/// the workflow runner pins itself to one task via `LocalSet`, so
65/// every stage observes the same context.
66#[derive(Clone, Default)]
67pub struct WorkflowSkillContext {
68    pub registry: Option<VmValue>,
69    pub match_config: Option<VmValue>,
70}
71
72pub fn install_workflow_skill_context(context: Option<WorkflowSkillContext>) {
73    CURRENT_WORKFLOW_SKILL_CONTEXT.with(|slot| {
74        *slot.borrow_mut() = context;
75    });
76}
77
78pub fn current_workflow_skill_context() -> Option<WorkflowSkillContext> {
79    CURRENT_WORKFLOW_SKILL_CONTEXT.with(|slot| slot.borrow().clone())
80}
81
82/// RAII guard that clears the workflow skill context on drop. Paired
83/// with `install_workflow_skill_context` at the top of `execute_workflow`
84/// so the context never leaks past a workflow's scope.
85pub struct WorkflowSkillContextGuard;
86
87impl Drop for WorkflowSkillContextGuard {
88    fn drop(&mut self) {
89        install_workflow_skill_context(None);
90    }
91}
92
93#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
94#[serde(default)]
95pub struct MutationSessionRecord {
96    pub session_id: String,
97    pub parent_session_id: Option<String>,
98    pub run_id: Option<String>,
99    pub worker_id: Option<String>,
100    pub execution_kind: Option<String>,
101    pub mutation_scope: String,
102    /// Declarative per-tool approval policy for this session. When `None`,
103    /// no policy-driven approval is requested; the session update stream
104    /// remains the only host-observable surface for tool dispatch.
105    pub approval_policy: Option<ToolApprovalPolicy>,
106}
107
108impl MutationSessionRecord {
109    pub fn normalize(mut self) -> Self {
110        if self.session_id.is_empty() {
111            self.session_id = new_id("session");
112        }
113        if self.mutation_scope.is_empty() {
114            self.mutation_scope = "read_only".to_string();
115        }
116        self
117    }
118}
119
120pub fn install_current_mutation_session(session: Option<MutationSessionRecord>) {
121    CURRENT_MUTATION_SESSION.with(|slot| {
122        *slot.borrow_mut() = session.map(MutationSessionRecord::normalize);
123    });
124}
125
126pub fn current_mutation_session() -> Option<MutationSessionRecord> {
127    CURRENT_MUTATION_SESSION.with(|slot| slot.borrow().clone())
128}
129pub(crate) fn parse_json_payload<T: for<'de> Deserialize<'de>>(
130    json: serde_json::Value,
131    label: &str,
132) -> Result<T, VmError> {
133    let payload = json.to_string();
134    let mut deserializer = serde_json::Deserializer::from_str(&payload);
135    let mut tracker = serde_path_to_error::Track::new();
136    let path_deserializer = serde_path_to_error::Deserializer::new(&mut deserializer, &mut tracker);
137    T::deserialize(path_deserializer).map_err(|error| {
138        let snippet = if payload.len() > 600 {
139            format!("{}...", &payload[..600])
140        } else {
141            payload.clone()
142        };
143        VmError::Runtime(format!(
144            "{label} parse error at {}: {} | payload={}",
145            tracker.path(),
146            error,
147            snippet
148        ))
149    })
150}
151
152pub(crate) fn parse_json_value<T: for<'de> Deserialize<'de>>(
153    value: &VmValue,
154) -> Result<T, VmError> {
155    parse_json_payload(vm_value_to_json(value), "orchestration")
156}
157
158#[cfg(test)]
159mod tests;