Skip to main content

harn_vm/
agent_sessions.rs

1//! First-class session storage.
2//!
3//! A session owns three things:
4//!
5//! 1. A transcript dict (messages, events, summary, metadata, …).
6//! 2. Closure subscribers that fire on agent-loop events for this session.
7//! 3. Its own lifecycle (open, reset, fork, trim, compact, close).
8//!
9//! Storage is thread-local because session lifecycle and subscriber dispatch
10//! are owned by the current-thread agent-loop worker. The subscribers register,
11//! fire, and unregister on that same thread, keeping ordering deterministic.
12//!
13//! Lifecycle is explicit. Builtins (`agent_session_open`,
14//! `_reset`, `_fork`, `_fork_at`, `_close`, `_trim`, `_compact`,
15//! `_inject`, `_exists`, `_length`, `_snapshot`, `_ancestry`) drive
16//! the store directly — there is no "policy" config dict that
17//! performs lifecycle as a side effect.
18
19use crate::value::VmDictExt;
20use std::cell::{Cell, RefCell};
21use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
22use std::future::Future;
23use std::path::{Path, PathBuf};
24use std::sync::{Mutex, OnceLock};
25use std::time::Instant;
26
27use crate::actor_chain::ActorChain;
28use crate::agent_transcript_budget::{
29    apply_transcript_with_budget, transcript_budget_policy_json, transcript_budget_usage_json,
30    transcript_message_count, transcript_usage,
31};
32use crate::runtime_limits::RuntimeLimits;
33use crate::value::VmValue;
34use crate::workspace_anchor::{
35    MountMode, MountedRoot, WorkspaceAnchor, WorkspacePolicy, WORKSPACE_ANCHOR_METADATA_KEY,
36};
37
38const LIVE_CLIENT_EVENT_KIND: &str = "live_session_client";
39const LIVE_CLIENT_PERMISSION_EVENT_KIND: &str = "live_session_permission_route";
40
41/// Default cap on concurrent sessions per VM thread. Beyond this the
42/// least-recently-accessed session is evicted on the next `open`.
43pub const DEFAULT_SESSION_CAP: usize = RuntimeLimits::DEFAULT.max_agent_sessions;
44
45/// Default cap on retained prompt-visible messages per session. The
46/// limit is intentionally high enough for normal long-running agents
47/// while still bounding accidental unbounded growth.
48pub const DEFAULT_TRANSCRIPT_MESSAGE_CAP: usize = 4096;
49
50/// Default cap on retained transcript audit events per session. Events
51/// include message-derived entries plus orchestration lifecycle records.
52pub const DEFAULT_TRANSCRIPT_EVENT_CAP: usize = 32768;
53pub const MAX_SCRATCHPAD_BYTES: usize = 16 * 1024;
54#[cfg(debug_assertions)]
55const CACHE_STABLE_SYSTEM_PROMPT_DIAGNOSTIC: &str = "HARN-CACHE-001";
56
/// Strategy selected when a transcript no longer fits its budget.
// NOTE(review): the variants are interpreted by the transcript-budget module,
// which is outside this view; `keep_last` is presumably a message count.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TranscriptBudgetRecovery {
    Reject,
    Trim { keep_last: usize },
    Compact { keep_last: usize },
}

/// Per-session transcript limits plus the recovery strategy to apply when
/// they are exceeded.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SessionTranscriptBudgetPolicy {
    /// Message limit; constructors clamp it to at least 1.
    pub max_messages: usize,
    /// Event limit; constructors clamp it to at least 1.
    pub max_events: usize,
    /// Optional approximate byte limit; `None` disables the byte check.
    pub max_approx_bytes: Option<usize>,
    /// What to do once a limit is hit.
    pub recovery: TranscriptBudgetRecovery,
}

impl SessionTranscriptBudgetPolicy {
    /// Policy with the `Reject` recovery and no byte limit. Both counts are
    /// raised to 1 when given as 0.
    pub fn reject(max_messages: usize, max_events: usize) -> Self {
        let recovery = TranscriptBudgetRecovery::Reject;
        Self {
            max_messages: usize::max(max_messages, 1),
            max_events: usize::max(max_events, 1),
            max_approx_bytes: None,
            recovery,
        }
    }

    /// Same limits as [`Self::reject`], but with the `Trim` recovery.
    /// `keep_last` is stored as given (not clamped).
    pub fn trim(max_messages: usize, max_events: usize, keep_last: usize) -> Self {
        Self {
            recovery: TranscriptBudgetRecovery::Trim { keep_last },
            ..Self::reject(max_messages, max_events)
        }
    }

    /// Same limits as [`Self::reject`], but with the `Compact` recovery.
    /// `keep_last` is stored as given (not clamped).
    pub fn compact(max_messages: usize, max_events: usize, keep_last: usize) -> Self {
        Self {
            recovery: TranscriptBudgetRecovery::Compact { keep_last },
            ..Self::reject(max_messages, max_events)
        }
    }

    /// Builder-style setter for the byte limit; a `Some(0)` becomes `Some(1)`.
    pub fn with_max_approx_bytes(mut self, max_approx_bytes: Option<usize>) -> Self {
        self.max_approx_bytes = max_approx_bytes.map(|limit| usize::max(limit, 1));
        self
    }

    /// Copy of this policy with every limit re-clamped to at least 1, for
    /// values that were built through the public fields rather than the
    /// constructors.
    pub(crate) fn normalized(&self) -> Self {
        Self {
            recovery: self.recovery.clone(),
            ..Self::reject(self.max_messages, self.max_events)
        }
        .with_max_approx_bytes(self.max_approx_bytes)
    }
}
114
115impl Default for SessionTranscriptBudgetPolicy {
116    fn default() -> Self {
117        Self::reject(DEFAULT_TRANSCRIPT_MESSAGE_CAP, DEFAULT_TRANSCRIPT_EVENT_CAP)
118    }
119}
120
121pub struct SessionState {
122    pub id: String,
123    pub transcript: VmValue,
124    pub subscribers: Vec<VmValue>,
125    pub created_at: String,
126    pub last_accessed: Instant,
127    pub parent_id: Option<String>,
128    pub child_ids: Vec<String>,
129    pub branched_at_event_index: Option<usize>,
130    pub actor_chain: Option<ActorChain>,
131    /// Names of skills that were active at the end of the most recent
132    /// `agent_loop` run on this session. Empty when no skills were
133    /// matched, when the skill system wasn't used, or when the
134    /// deactivation phase cleared them. Re-entering the session
135    /// restores these as the initial active set before matching runs.
136    pub active_skills: Vec<String>,
137    /// Tool-calling protocol claimed by the first agent loop that uses
138    /// this session. A transcript is only replayable under the same
139    /// contract that produced its prompt/history.
140    pub tool_format: Option<String>,
141    /// Stable session-level system prompt material. This is transcript
142    /// metadata, not a replay message: providers receive it through
143    /// their system/developer instruction channel on each call.
144    pub system_prompt: Option<String>,
145    /// Session-pinned model selector. When set, `llm_call` invocations
146    /// that do not pass an explicit `model:` option resolve to this
147    /// selector instead of `HARN_LLM_MODEL` / provider defaults. Mid-
148    /// session swap is exposed over ACP via `session/set_config_option`
149    /// (configId="model").
150    pub pinned_model: Option<String>,
151    /// Session-pinned high-level reasoning policy. When set, `llm_call`
152    /// invocations that do not pass explicit `thinking` or
153    /// `reasoning_effort` options resolve this provider-aware policy into
154    /// the route's native thinking shape. Exposed over ACP as
155    /// `session/set_config_option(configId="thought_level")`.
156    pub pinned_reasoning_policy: Option<String>,
157    /// Session-local workspace defaults. Persona and host policy layers
158    /// can update this without rewriting the current anchor.
159    pub workspace_policy: WorkspacePolicy,
160    /// Typed workspace anchor for the session. Primary path plus any
161    /// additional mounted roots; consumed by permission matchers, the
162    /// bundle exporter, and host-side cross-project handoff flows
163    /// (epic #2208). `None` until a host opens the session with one or
164    /// the ACP `reanchor` / `add_root` primitives populate it.
165    pub workspace_anchor: Option<WorkspaceAnchor>,
166    /// Small session-local working memory rendered into agent prompts by the
167    /// Harn stdlib agent loop. This is live state, not a replayed message.
168    pub scratchpad: Option<VmValue>,
169    pub scratchpad_version: u64,
170    pub transcript_budget_policy: SessionTranscriptBudgetPolicy,
171    pub last_transcript_budget_action: Option<serde_json::Value>,
172    pub live_clients: BTreeMap<String, LiveSessionClient>,
173    pub live_controller_id: Option<String>,
174    pub completed_turn_checkpoints: Vec<SessionTurnCheckpoint>,
175    pub redo_stack: Vec<SessionRedoEntry>,
176}
177
178impl SessionState {
179    fn new(id: String) -> Self {
180        let now = Instant::now();
181        let transcript = empty_transcript(&id);
182        Self {
183            id,
184            transcript,
185            subscribers: Vec::new(),
186            created_at: crate::orchestration::now_rfc3339(),
187            last_accessed: now,
188            parent_id: None,
189            child_ids: Vec::new(),
190            branched_at_event_index: None,
191            actor_chain: None,
192            active_skills: Vec::new(),
193            tool_format: None,
194            system_prompt: None,
195            pinned_model: None,
196            pinned_reasoning_policy: None,
197            workspace_policy: WorkspacePolicy::default(),
198            workspace_anchor: None,
199            scratchpad: None,
200            scratchpad_version: 0,
201            transcript_budget_policy: default_transcript_budget_policy(),
202            last_transcript_budget_action: None,
203            live_clients: BTreeMap::new(),
204            live_controller_id: None,
205            completed_turn_checkpoints: Vec::new(),
206            redo_stack: Vec::new(),
207        }
208    }
209
210    fn touch(&mut self) {
211        self.last_accessed = Instant::now();
212    }
213
214    pub(crate) fn replace_transcript(&mut self, transcript: VmValue) {
215        if !crate::values_equal(&self.transcript, &transcript) {
216            self.redo_stack.clear();
217        }
218        self.transcript = transcript;
219        self.touch();
220    }
221}
222
/// Role a live client holds on a session.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum LiveClientMode {
    Observer,
    Controller,
}

impl LiveClientMode {
    /// Lower-case wire name of the mode.
    fn as_str(&self) -> &'static str {
        if matches!(self, Self::Controller) {
            "controller"
        } else {
            "observer"
        }
    }
}
237
238#[derive(Clone, Debug, PartialEq, Eq)]
239pub struct LiveSessionClient {
240    pub client_id: String,
241    pub mode: LiveClientMode,
242    pub attached_at: String,
243    pub last_seen_at: String,
244    pub prompt_injection: bool,
245    pub permission_routing: bool,
246    pub metadata: serde_json::Value,
247}
248
249#[derive(Clone, Debug, PartialEq, Eq)]
250pub struct AttachLiveClient {
251    pub client_id: String,
252    pub mode: LiveClientMode,
253    pub takeover: bool,
254    pub prompt_injection: bool,
255    pub permission_routing: bool,
256    pub metadata: serde_json::Value,
257}
258
259#[derive(Clone, Debug, PartialEq, Eq)]
260pub struct LiveClientChange {
261    pub client: Option<LiveSessionClient>,
262    pub previous_controller_id: Option<String>,
263    pub active_controller_id: Option<String>,
264    pub clients: Vec<LiveSessionClient>,
265}
266
/// Lineage view of a session as returned by `ancestry`: its direct parent,
/// its direct children, and the id reached by following parent links to the
/// top.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SessionAncestry {
    pub parent_id: Option<String>,
    pub child_ids: Vec<String>,
    pub root_id: String,
}
273
/// Counts describing a turn-level truncation and the turn id left at the tip.
// NOTE(review): produced by a truncate operation that is outside this view.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SessionTruncateResult {
    pub kept_turn_count: usize,
    pub removed_turn_count: usize,
    pub new_tip_turn_id: Option<String>,
}
280
/// Lightweight description of a turn checkpoint: the same identifying
/// fields as `SessionTurnCheckpoint`, without the transcript values.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SessionCheckpointSummary {
    pub checkpoint_id: String,
    pub before_message_count: usize,
    pub after_message_count: usize,
    pub fs_snapshot_ids: Vec<String>,
}
288
289#[derive(Clone, Debug)]
290pub struct SessionTurnCheckpoint {
291    pub checkpoint_id: String,
292    pub completed_at: String,
293    pub before_transcript: VmValue,
294    pub after_transcript: VmValue,
295    pub before_message_count: usize,
296    pub after_message_count: usize,
297    pub fs_snapshot_ids: Vec<String>,
298}
299
300#[derive(Clone, Debug)]
301pub struct SessionRedoEntry {
302    pub checkpoint: SessionTurnCheckpoint,
303    pub redo_fs_snapshot_ids: Vec<String>,
304}
305
/// Failure modes for checkpoint operations.
// NOTE(review): presumably returned by undo/redo entry points that are
// outside this view.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SessionCheckpointError {
    UnknownSession,
    NoCheckpoint,
    NoRedo,
}
312
313#[derive(Clone, Debug, PartialEq, Eq)]
314pub struct SessionCheckpointOutcome {
315    pub status: &'static str,
316    pub checkpoint: SessionCheckpointSummary,
317    pub redo_fs_snapshot_ids: Vec<String>,
318}
319
/// Report for a reminder injection: the reminder's id and a dedupe count.
// NOTE(review): produced by reminder-injection code outside this view;
// `deduped_count` presumably counts earlier reminders that were replaced.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ReminderInjectionReport {
    pub reminder_id: String,
    pub deduped_count: usize,
}
325
326thread_local! {
327    static SESSIONS: RefCell<HashMap<String, SessionState>> = RefCell::new(HashMap::new());
328    static SESSION_CAP: Cell<usize> = const { Cell::new(DEFAULT_SESSION_CAP) };
329    static DEFAULT_TRANSCRIPT_BUDGET_POLICY: RefCell<SessionTranscriptBudgetPolicy> =
330        RefCell::new(SessionTranscriptBudgetPolicy::default());
331    static CURRENT_SESSION_STACK: RefCell<Vec<String>> = const { RefCell::new(Vec::new()) };
332    static CURRENT_TOOL_CALL_STACK: RefCell<Vec<String>> = const { RefCell::new(Vec::new()) };
333}
334
335tokio::task_local! {
336    static CURRENT_TOOL_CALL_TASK: String;
337}
338
/// Per-session set of filesystem paths a session has mutated (written or
/// deleted) via the deterministic hostlib write surface. Keyed by session id
/// and process-global (not thread-local like [`SESSIONS`]) so a background
/// fan-out child's writes are attributed correctly regardless of which runtime
/// thread serviced them. Fed by the single hostlib write chokepoint
/// (`harn-hostlib`'s `fs_snapshot::auto_capture_for_write`), which is reached
/// only AFTER a write is policy-approved and about to touch disk — so this
/// reflects ACTUAL committed mutations, not denied/aborted attempts. The
/// authoritative source for a sub-agent's `files_written` receipt field.
static SESSION_CHANGED_PATHS: OnceLock<Mutex<BTreeMap<String, BTreeSet<String>>>> = OnceLock::new();

/// Lazily initialised handle to the process-global changed-path map.
fn session_changed_paths_store() -> &'static Mutex<BTreeMap<String, BTreeSet<String>>> {
    SESSION_CHANGED_PATHS.get_or_init(|| Mutex::new(BTreeMap::new()))
}

/// Lock the changed-path map, recovering the guard when the mutex is poisoned.
///
/// Poisoning only records that some thread panicked while holding the lock.
/// Every critical section here is a single map insert/remove/lookup, so the
/// map is still usable. Recovering keeps later writes from being silently
/// dropped (and receipts from coming back empty) for the rest of the process.
fn lock_session_changed_paths(
) -> std::sync::MutexGuard<'static, BTreeMap<String, BTreeSet<String>>> {
    session_changed_paths_store()
        .lock()
        .unwrap_or_else(std::sync::PoisonError::into_inner)
}

/// Record that `path` was mutated (written/deleted) by `session_id`. Called from
/// the hostlib write chokepoint; a no-op when either argument is empty.
pub fn record_session_changed_path(session_id: &str, path: &str) {
    if session_id.is_empty() || path.is_empty() {
        return;
    }
    lock_session_changed_paths()
        .entry(session_id.to_string())
        .or_default()
        .insert(path.to_string());
}

/// The sorted set of paths `session_id` has mutated so far (empty when none).
/// Non-draining: safe to call for introspection without disturbing the record.
pub fn session_changed_paths(session_id: &str) -> Vec<String> {
    let store = lock_session_changed_paths();
    store
        .get(session_id)
        .map(|paths| paths.iter().cloned().collect())
        .unwrap_or_default()
}

/// Read AND remove a session's mutated-path set. Used at sub-agent teardown so
/// the receipt captures the child's writes exactly once and the global map does
/// not grow unbounded across a long-lived daemon's many fan-outs.
pub fn take_session_changed_paths(session_id: &str) -> Vec<String> {
    let removed = lock_session_changed_paths().remove(session_id);
    removed
        .map(|paths| paths.into_iter().collect())
        .unwrap_or_default()
}

/// Drop a session's recorded mutated paths (explicit teardown / test reset).
pub fn clear_session_changed_paths(session_id: &str) {
    lock_session_changed_paths().remove(session_id);
}
400
401pub struct CurrentSessionGuard {
402    active: bool,
403}
404
405impl Drop for CurrentSessionGuard {
406    fn drop(&mut self) {
407        if self.active {
408            pop_current_session();
409        }
410    }
411}
412
413/// RAII guard that scopes the active tool-call id for the running thread.
414///
415/// Set on entry to a tool dispatch and dropped on exit, so any hostlib
416/// builtin invoked under it (e.g. `tools/write_file`) can resolve the
417/// owning tool call without threading the id through every parameter.
418pub struct CurrentToolCallGuard {
419    active: bool,
420}
421
422impl Drop for CurrentToolCallGuard {
423    fn drop(&mut self) {
424        if self.active {
425            pop_current_tool_call();
426        }
427    }
428}
429
430/// Set the per-thread session cap. Primarily for tests; production VMs
431/// inherit the default.
432pub fn set_session_cap(cap: usize) {
433    SESSION_CAP.with(|c| c.set(cap.max(1)));
434}
435
436pub fn session_cap() -> usize {
437    SESSION_CAP.with(|c| c.get())
438}
439
440pub fn set_default_transcript_budget_policy(policy: SessionTranscriptBudgetPolicy) {
441    DEFAULT_TRANSCRIPT_BUDGET_POLICY.with(|cell| {
442        *cell.borrow_mut() = policy.normalized();
443    });
444}
445
446pub fn reset_default_transcript_budget_policy() {
447    set_default_transcript_budget_policy(SessionTranscriptBudgetPolicy::default());
448}
449
450pub fn default_transcript_budget_policy() -> SessionTranscriptBudgetPolicy {
451    DEFAULT_TRANSCRIPT_BUDGET_POLICY.with(|cell| cell.borrow().clone())
452}
453
454pub fn transcript_budget_policy(id: &str) -> Option<SessionTranscriptBudgetPolicy> {
455    SESSIONS.with(|s| {
456        s.borrow()
457            .get(id)
458            .map(|state| state.transcript_budget_policy.clone())
459    })
460}
461
462pub fn set_transcript_budget_policy(
463    id: &str,
464    policy: SessionTranscriptBudgetPolicy,
465) -> Result<(), String> {
466    SESSIONS.with(|s| {
467        let mut map = s.borrow_mut();
468        let Some(state) = map.get_mut(id) else {
469            return Err(format!("agent session '{id}' does not exist"));
470        };
471        let previous = state.transcript_budget_policy.clone();
472        let previous_action = state.last_transcript_budget_action.clone();
473        state.transcript_budget_policy = policy.normalized();
474        let candidate = state.transcript.clone();
475        if let Err(error) = apply_transcript_with_budget(state, candidate, "policy_update") {
476            state.transcript_budget_policy = previous;
477            state.last_transcript_budget_action = previous_action;
478            return Err(error);
479        }
480        Ok(())
481    })
482}
483
484/// Clear the session store. Wired into `reset_llm_state` for test isolation.
485pub fn reset_session_store() {
486    SESSIONS.with(|s| s.borrow_mut().clear());
487    CURRENT_SESSION_STACK.with(|stack| stack.borrow_mut().clear());
488    CURRENT_TOOL_CALL_STACK.with(|stack| stack.borrow_mut().clear());
489    reset_default_transcript_budget_policy();
490}
491
492pub(crate) fn push_current_session(id: String) {
493    if id.is_empty() {
494        return;
495    }
496    CURRENT_SESSION_STACK.with(|stack| stack.borrow_mut().push(id));
497}
498
499pub(crate) fn swap_current_session_stack(replacement: Vec<String>) -> Vec<String> {
500    CURRENT_SESSION_STACK.with(|stack| std::mem::replace(&mut *stack.borrow_mut(), replacement))
501}
502
503pub(crate) fn pop_current_session() {
504    CURRENT_SESSION_STACK.with(|stack| {
505        let _ = stack.borrow_mut().pop();
506    });
507}
508
509pub fn current_session_id() -> Option<String> {
510    CURRENT_SESSION_STACK.with(|stack| stack.borrow().last().cloned())
511}
512
513pub fn current_actor_chain() -> Option<ActorChain> {
514    current_session_id().as_deref().and_then(actor_chain)
515}
516
517pub fn enter_current_session(id: impl Into<String>) -> CurrentSessionGuard {
518    let id = id.into();
519    if id.trim().is_empty() {
520        return CurrentSessionGuard { active: false };
521    }
522    push_current_session(id);
523    CurrentSessionGuard { active: true }
524}
525
526pub fn actor_chain(id: &str) -> Option<ActorChain> {
527    SESSIONS.with(|s| {
528        s.borrow()
529            .get(id)
530            .and_then(|state| state.actor_chain.clone())
531    })
532}
533
534pub fn set_actor_chain(id: &str, actor_chain: Option<ActorChain>) -> Result<bool, String> {
535    SESSIONS.with(|s| {
536        let mut map = s.borrow_mut();
537        let Some(state) = map.get_mut(id) else {
538            return Err(format!("agent session '{id}' does not exist"));
539        };
540        let changed = state.actor_chain != actor_chain;
541        state.actor_chain = actor_chain;
542        state.touch();
543        Ok(changed)
544    })
545}
546
547fn push_current_tool_call(id: String) {
548    if id.is_empty() {
549        return;
550    }
551    CURRENT_TOOL_CALL_STACK.with(|stack| stack.borrow_mut().push(id));
552}
553
554fn pop_current_tool_call() {
555    CURRENT_TOOL_CALL_STACK.with(|stack| {
556        let _ = stack.borrow_mut().pop();
557    });
558}
559
560/// Return the active tool-call id for the current thread, if any.
561///
562/// Hostlib builtins consult this to attribute side-effect snapshots to
563/// the owning ACP `toolCallId` without callers passing it explicitly.
564pub fn current_tool_call_id() -> Option<String> {
565    if let Ok(id) = CURRENT_TOOL_CALL_TASK.try_with(Clone::clone) {
566        if !id.trim().is_empty() {
567            return Some(id);
568        }
569    }
570    CURRENT_TOOL_CALL_STACK.with(|stack| stack.borrow().last().cloned())
571}
572
573/// Scope the active tool-call id to one async task.
574///
575/// Parallel tool dispatch runs sibling calls on the same OS thread, so
576/// thread-local guards alone cannot preserve attribution across `.await`
577/// points. Tokio task-local scoping follows the future instead.
578pub async fn scope_current_tool_call<F, T>(id: impl Into<String>, future: F) -> T
579where
580    F: Future<Output = T>,
581{
582    let id = id.into();
583    if id.trim().is_empty() {
584        future.await
585    } else {
586        CURRENT_TOOL_CALL_TASK.scope(id, future).await
587    }
588}
589
590/// Scope the active tool-call id for the duration of the returned guard.
591pub fn enter_current_tool_call(id: impl Into<String>) -> CurrentToolCallGuard {
592    let id = id.into();
593    if id.trim().is_empty() {
594        return CurrentToolCallGuard { active: false };
595    }
596    push_current_tool_call(id);
597    CurrentToolCallGuard { active: true }
598}
599
600pub fn exists(id: &str) -> bool {
601    SESSIONS.with(|s| s.borrow().contains_key(id))
602}
603
604pub fn length(id: &str) -> Option<usize> {
605    SESSIONS.with(|s| {
606        s.borrow().get(id).map(|state| {
607            state
608                .transcript
609                .as_dict()
610                .and_then(|d| d.get("messages"))
611                .and_then(|v| match v {
612                    VmValue::List(list) => Some(list.len()),
613                    _ => None,
614                })
615                .unwrap_or(0)
616        })
617    })
618}
619
620pub fn scratchpad(id: &str) -> Option<VmValue> {
621    SESSIONS.with(|s| {
622        s.borrow()
623            .get(id)
624            .and_then(|state| state.scratchpad.clone())
625    })
626}
627
628pub fn scratchpad_version(id: &str) -> Option<u64> {
629    SESSIONS.with(|s| s.borrow().get(id).map(|state| state.scratchpad_version))
630}
631
632pub fn set_scratchpad(
633    id: &str,
634    scratchpad: VmValue,
635    source: impl Into<String>,
636    reason: Option<String>,
637    metadata: serde_json::Value,
638) -> Result<u64, String> {
639    validate_scratchpad_value(&scratchpad)?;
640    SESSIONS.with(|s| {
641        let mut map = s.borrow_mut();
642        let Some(state) = map.get_mut(id) else {
643            return Err(format!("agent session '{id}' does not exist"));
644        };
645        let version = state.scratchpad_version.saturating_add(1);
646        let event = scratchpad_transcript_event(
647            "set",
648            version,
649            Some(&scratchpad),
650            source.into(),
651            reason,
652            metadata,
653        );
654        append_event_to_state(state, event, "set_scratchpad")?;
655        state.scratchpad = Some(scratchpad);
656        state.scratchpad_version = version;
657        state.touch();
658        Ok(version)
659    })
660}
661
662pub fn clear_scratchpad(
663    id: &str,
664    source: impl Into<String>,
665    reason: Option<String>,
666    metadata: serde_json::Value,
667) -> Result<u64, String> {
668    SESSIONS.with(|s| {
669        let mut map = s.borrow_mut();
670        let Some(state) = map.get_mut(id) else {
671            return Err(format!("agent session '{id}' does not exist"));
672        };
673        let version = state.scratchpad_version.saturating_add(1);
674        let event =
675            scratchpad_transcript_event("clear", version, None, source.into(), reason, metadata);
676        append_event_to_state(state, event, "clear_scratchpad")?;
677        state.scratchpad = None;
678        state.scratchpad_version = version;
679        state.touch();
680        Ok(version)
681    })
682}
683
684fn validate_scratchpad_value(value: &VmValue) -> Result<(), String> {
685    if !matches!(value, VmValue::Dict(_)) {
686        return Err("agent session scratchpad must be a dict".to_string());
687    }
688    let json = crate::llm::helpers::vm_value_to_json(value);
689    let approx_bytes = serde_json::to_vec(&json)
690        .map(|bytes| bytes.len())
691        .unwrap_or(usize::MAX);
692    if approx_bytes > MAX_SCRATCHPAD_BYTES {
693        return Err(format!(
694            "agent session scratchpad is {approx_bytes} bytes; max is {MAX_SCRATCHPAD_BYTES}"
695        ));
696    }
697    Ok(())
698}
699
700fn scratchpad_transcript_event(
701    action: &str,
702    version: u64,
703    scratchpad: Option<&VmValue>,
704    source: String,
705    reason: Option<String>,
706    metadata: serde_json::Value,
707) -> VmValue {
708    let scratchpad_json = scratchpad.map(crate::llm::helpers::vm_value_to_json);
709    let approx_bytes = scratchpad_json
710        .as_ref()
711        .and_then(|value| serde_json::to_vec(value).ok().map(|bytes| bytes.len()))
712        .unwrap_or(0);
713    let event_metadata = serde_json::json!({
714        "action": action,
715        "version": version,
716        "source": normalize_scratchpad_source(source),
717        "reason": reason.unwrap_or_default(),
718        "approx_bytes": approx_bytes,
719        "counts": scratchpad_json
720            .as_ref()
721            .map(scratchpad_counts_json)
722            .unwrap_or_else(|| serde_json::json!({})),
723        "metadata": metadata,
724    });
725    let content = format!("Agent scratchpad {action}");
726    crate::llm::helpers::transcript_event(
727        "agent_scratchpad",
728        "system",
729        "internal",
730        &content,
731        Some(event_metadata),
732    )
733}
734
/// Trim `source`; a blank source falls back to `"harn.agent_scratchpad"`.
fn normalize_scratchpad_source(source: String) -> String {
    match source.trim() {
        "" => String::from("harn.agent_scratchpad"),
        label => label.to_owned(),
    }
}
743
744fn scratchpad_counts_json(value: &serde_json::Value) -> serde_json::Value {
745    serde_json::json!({
746        "goals": scratchpad_array_len(value, "goals"),
747        "open_items": scratchpad_array_len(value, "open_items"),
748        "facts": scratchpad_array_len(value, "facts"),
749        "refs": scratchpad_array_len(value, "refs"),
750    })
751}
752
753fn scratchpad_array_len(value: &serde_json::Value, key: &str) -> usize {
754    value
755        .get(key)
756        .and_then(serde_json::Value::as_array)
757        .map(Vec::len)
758        .unwrap_or(0)
759}
760
761pub fn snapshot(id: &str) -> Option<VmValue> {
762    SESSIONS.with(|s| s.borrow().get(id).map(session_snapshot))
763}
764
765/// Session-only fields stay on `agent_session_snapshot`.
766pub fn transcript(id: &str) -> Option<VmValue> {
767    SESSIONS.with(|s| {
768        s.borrow()
769            .get(id)
770            .map(|state| transcript_with_session_metadata(state.transcript.clone(), state))
771    })
772}
773
774/// Open a session, or create it if missing. Returns the resolved id.
775///
776/// Newly-created sessions auto-register an event-log-backed sink when a
777/// generalized [`crate::event_log::EventLog`] has been installed for the
778/// current VM thread. For legacy env-driven workflows that still point
779/// `HARN_EVENT_LOG_DIR` at a directory, we preserve the older JSONL sink
780/// as a compatibility fallback. Re-opening an existing session does not
781/// re-register — sinks are per-session, owned by the first opener.
782pub fn open_or_create(id: Option<String>) -> String {
783    open_or_create_with_actor_chain(id, None)
784}
785
786pub fn open_or_create_with_actor_chain(
787    id: Option<String>,
788    requested_actor_chain: Option<ActorChain>,
789) -> String {
790    let resolved = id.unwrap_or_else(|| uuid::Uuid::now_v7().to_string());
791    let parent_session = current_session_id();
792    let inherited_actor_chain = requested_actor_chain
793        .clone()
794        .or_else(|| parent_session.as_deref().and_then(actor_chain));
795    let mut was_new = false;
796    SESSIONS.with(|s| {
797        let mut map = s.borrow_mut();
798        if let Some(state) = map.get_mut(&resolved) {
799            if let Some(actor_chain) = requested_actor_chain.clone() {
800                state.actor_chain = Some(actor_chain);
801            }
802            state.touch();
803            return;
804        }
805        was_new = true;
806        let cap = SESSION_CAP.with(|c| c.get());
807        if map.len() >= cap {
808            if let Some(victim) = map
809                .iter()
810                .min_by_key(|(_, state)| state.last_accessed)
811                .map(|(id, _)| id.clone())
812            {
813                map.remove(&victim);
814            }
815        }
816        let mut state = SessionState::new(resolved.clone());
817        state.actor_chain = inherited_actor_chain.clone();
818        map.insert(resolved.clone(), state);
819    });
820    if was_new {
821        if let Some(parent) = parent_session.as_deref() {
822            crate::agent_events::mirror_session_sinks(parent, &resolved);
823        }
824        try_register_event_log(&resolved);
825    }
826    resolved
827}
828
829pub fn open_child_session(parent_id: &str, id: Option<String>) -> String {
830    open_child_session_with_actor(parent_id, id, None)
831}
832
833pub fn open_child_session_with_actor(
834    parent_id: &str,
835    id: Option<String>,
836    actor: Option<&str>,
837) -> String {
838    let actor_chain = actor_chain(parent_id).map(|chain| match actor {
839        Some(actor) if !actor.trim().is_empty() => chain.pushed(actor.trim()),
840        _ => chain,
841    });
842    let resolved = open_or_create_with_actor_chain(id, actor_chain);
843    link_child_session(parent_id, &resolved);
844    resolved
845}
846
847pub fn link_child_session(parent_id: &str, child_id: &str) {
848    link_child_session_with_branch(parent_id, child_id, None);
849}
850
851pub fn link_child_session_with_branch(
852    parent_id: &str,
853    child_id: &str,
854    branched_at_event_index: Option<usize>,
855) {
856    if parent_id == child_id {
857        return;
858    }
859    open_or_create(Some(parent_id.to_string()));
860    open_or_create(Some(child_id.to_string()));
861    SESSIONS.with(|s| {
862        let mut map = s.borrow_mut();
863        update_lineage(&mut map, parent_id, child_id, branched_at_event_index);
864    });
865}
866
867pub fn parent_id(id: &str) -> Option<String> {
868    SESSIONS.with(|s| s.borrow().get(id).and_then(|state| state.parent_id.clone()))
869}
870
871pub fn child_ids(id: &str) -> Vec<String> {
872    SESSIONS.with(|s| {
873        s.borrow()
874            .get(id)
875            .map(|state| state.child_ids.clone())
876            .unwrap_or_default()
877    })
878}
879
880pub fn ancestry(id: &str) -> Option<SessionAncestry> {
881    SESSIONS.with(|s| {
882        let map = s.borrow();
883        let state = map.get(id)?;
884        let mut root_id = state.id.clone();
885        let mut cursor = state.parent_id.clone();
886        let mut seen = HashSet::from([state.id.clone()]);
887        while let Some(parent_id) = cursor {
888            if !seen.insert(parent_id.clone()) {
889                break;
890            }
891            root_id = parent_id.clone();
892            cursor = map
893                .get(&parent_id)
894                .and_then(|parent| parent.parent_id.clone());
895        }
896        Some(SessionAncestry {
897            parent_id: state.parent_id.clone(),
898            child_ids: state.child_ids.clone(),
899            root_id,
900        })
901    })
902}
903
904pub fn live_clients(id: &str) -> Option<Vec<LiveSessionClient>> {
905    SESSIONS.with(|s| {
906        s.borrow()
907            .get(id)
908            .map(|state| state.live_clients.values().cloned().collect())
909    })
910}
911
912pub fn attach_live_client(id: &str, request: AttachLiveClient) -> Result<LiveClientChange, String> {
913    SESSIONS.with(|s| {
914        let mut map = s.borrow_mut();
915        let Some(state) = map.get_mut(id) else {
916            return Err(format!("agent session '{id}' does not exist"));
917        };
918        let client_id = validate_live_client_id(request.client_id)?;
919        let now = crate::orchestration::now_rfc3339();
920        let previous_clients = state.live_clients.clone();
921        let previous_controller_id = state.live_controller_id.clone();
922
923        if request.mode == LiveClientMode::Controller {
924            let conflicting_controller = previous_controller_id
925                .as_ref()
926                .filter(|controller_id| *controller_id != &client_id)
927                .filter(|controller_id| state.live_clients.contains_key(*controller_id));
928            if let Some(previous) = conflicting_controller {
929                if !request.takeover {
930                    return Err(format!("live session already has controller '{previous}'"));
931                }
932                if let Some(previous_client) = state.live_clients.get_mut(previous) {
933                    previous_client.mode = LiveClientMode::Observer;
934                    previous_client.prompt_injection = false;
935                    previous_client.permission_routing = false;
936                    previous_client.last_seen_at = now.clone();
937                }
938            }
939            state.live_controller_id = Some(client_id.clone());
940        } else if state.live_controller_id.as_deref() == Some(client_id.as_str()) {
941            state.live_controller_id = None;
942        }
943
944        let attached_at = state
945            .live_clients
946            .get(&client_id)
947            .map(|client| client.attached_at.clone())
948            .unwrap_or_else(|| now.clone());
949        let client = LiveSessionClient {
950            client_id: client_id.clone(),
951            mode: request.mode,
952            attached_at,
953            last_seen_at: now,
954            prompt_injection: request.prompt_injection,
955            permission_routing: request.permission_routing,
956            metadata: request.metadata,
957        };
958        state.live_clients.insert(client_id, client.clone());
959        state.touch();
960        let active_controller_id = state.live_controller_id.clone();
961        append_live_client_event(
962            state,
963            "attached",
964            Some(&client),
965            previous_controller_id.as_deref(),
966            active_controller_id.as_deref(),
967            serde_json::Value::Null,
968        )
969        .inspect_err(|_error| {
970            state.live_clients = previous_clients;
971            state.live_controller_id = previous_controller_id.clone();
972        })?;
973        Ok(live_client_change(
974            Some(client),
975            previous_controller_id,
976            state,
977        ))
978    })
979}
980
981pub fn takeover_live_client(
982    id: &str,
983    client_id: impl Into<String>,
984    metadata: serde_json::Value,
985) -> Result<LiveClientChange, String> {
986    attach_live_client(
987        id,
988        AttachLiveClient {
989            client_id: client_id.into(),
990            mode: LiveClientMode::Controller,
991            takeover: true,
992            prompt_injection: true,
993            permission_routing: true,
994            metadata,
995        },
996    )
997}
998
999pub fn detach_live_client(
1000    id: &str,
1001    client_id: impl Into<String>,
1002    reason: Option<String>,
1003    metadata: serde_json::Value,
1004) -> Result<LiveClientChange, String> {
1005    SESSIONS.with(|s| {
1006        let mut map = s.borrow_mut();
1007        let Some(state) = map.get_mut(id) else {
1008            return Err(format!("agent session '{id}' does not exist"));
1009        };
1010        let client_id = validate_live_client_id(client_id.into())?;
1011        let previous_clients = state.live_clients.clone();
1012        let previous_controller_id = state.live_controller_id.clone();
1013        let Some(mut client) = state.live_clients.remove(&client_id) else {
1014            return Err(format!("live client '{client_id}' is not attached"));
1015        };
1016        client.last_seen_at = crate::orchestration::now_rfc3339();
1017        if state.live_controller_id.as_deref() == Some(client_id.as_str()) {
1018            state.live_controller_id = None;
1019        }
1020        state.touch();
1021        let active_controller_id = state.live_controller_id.clone();
1022        append_live_client_event(
1023            state,
1024            "detached",
1025            Some(&client),
1026            previous_controller_id.as_deref(),
1027            active_controller_id.as_deref(),
1028            serde_json::json!({
1029                "reason": reason.unwrap_or_else(|| "client_detached".to_string()),
1030                "metadata": metadata,
1031            }),
1032        )
1033        .inspect_err(|_error| {
1034            state.live_clients = previous_clients;
1035            state.live_controller_id = previous_controller_id.clone();
1036        })?;
1037        Ok(live_client_change(None, previous_controller_id, state))
1038    })
1039}
1040
1041pub fn heartbeat_live_client(
1042    id: &str,
1043    client_id: impl Into<String>,
1044    metadata: serde_json::Value,
1045) -> Result<LiveClientChange, String> {
1046    SESSIONS.with(|s| {
1047        let mut map = s.borrow_mut();
1048        let Some(state) = map.get_mut(id) else {
1049            return Err(format!("agent session '{id}' does not exist"));
1050        };
1051        let client_id = validate_live_client_id(client_id.into())?;
1052        let previous_clients = state.live_clients.clone();
1053        let previous_controller_id = state.live_controller_id.clone();
1054        let Some(client) = state.live_clients.get_mut(&client_id) else {
1055            return Err(format!("live client '{client_id}' is not attached"));
1056        };
1057        client.last_seen_at = crate::orchestration::now_rfc3339();
1058        if !metadata.is_null() {
1059            client.metadata = metadata.clone();
1060        }
1061        let client = client.clone();
1062        state.touch();
1063        let active_controller_id = state.live_controller_id.clone();
1064        append_live_client_event(
1065            state,
1066            "heartbeat",
1067            Some(&client),
1068            previous_controller_id.as_deref(),
1069            active_controller_id.as_deref(),
1070            serde_json::json!({ "metadata": metadata }),
1071        )
1072        .inspect_err(|_error| {
1073            state.live_clients = previous_clients;
1074            state.live_controller_id = previous_controller_id.clone();
1075        })?;
1076        Ok(live_client_change(
1077            Some(client),
1078            previous_controller_id,
1079            state,
1080        ))
1081    })
1082}
1083
1084pub fn inject_prompt_from_live_client(
1085    id: &str,
1086    client_id: impl Into<String>,
1087    content: VmValue,
1088    metadata: serde_json::Value,
1089) -> Result<(), String> {
1090    let client_id = validate_live_client_id(client_id.into())?;
1091    ensure_live_controller(id, &client_id, LiveControllerCapability::PromptInjection)?;
1092    let mut message = BTreeMap::new();
1093    message.put_str("role", "user");
1094    message.insert("content".to_string(), content);
1095    message.insert(
1096        "metadata".to_string(),
1097        crate::stdlib::json_to_vm_value(&serde_json::json!({
1098            "live_session": {
1099                "client_id": client_id,
1100                "mode": "controller",
1101                "source": "live_session_attach",
1102                "metadata": metadata,
1103            }
1104        })),
1105    );
1106    inject_message(id, VmValue::dict(message))
1107}
1108
1109pub fn route_live_permission_request(
1110    id: &str,
1111    client_id: impl Into<String>,
1112    request: serde_json::Value,
1113    metadata: serde_json::Value,
1114) -> Result<serde_json::Value, String> {
1115    let client_id = validate_live_client_id(client_id.into())?;
1116    let client =
1117        ensure_live_controller(id, &client_id, LiveControllerCapability::PermissionRouting)?;
1118    let request_id = request
1119        .get("id")
1120        .or_else(|| request.get("request_id"))
1121        .and_then(serde_json::Value::as_str)
1122        .filter(|id| !id.trim().is_empty())
1123        .unwrap_or("permission_request");
1124    let event_metadata = serde_json::json!({
1125        "action": "permission_routed",
1126        "client": live_client_json(&client),
1127        "request_id": request_id,
1128        "request": request,
1129        "metadata": metadata,
1130    });
1131    let event = crate::llm::helpers::transcript_event(
1132        LIVE_CLIENT_PERMISSION_EVENT_KIND,
1133        "system",
1134        "internal",
1135        "Live session permission request routed",
1136        Some(event_metadata.clone()),
1137    );
1138    append_event(id, event)?;
1139    Ok(event_metadata)
1140}
1141
/// Capability a live controller must hold for a privileged operation;
/// checked by `ensure_live_controller`.
enum LiveControllerCapability {
    /// Requires the client's `prompt_injection` flag.
    PromptInjection,
    /// Requires the client's `permission_routing` flag.
    PermissionRouting,
}
1146
1147fn ensure_live_controller(
1148    id: &str,
1149    client_id: &str,
1150    capability: LiveControllerCapability,
1151) -> Result<LiveSessionClient, String> {
1152    SESSIONS.with(|s| {
1153        let map = s.borrow();
1154        let Some(state) = map.get(id) else {
1155            return Err(format!("agent session '{id}' does not exist"));
1156        };
1157        if state.live_controller_id.as_deref() != Some(client_id) {
1158            return Err(format!(
1159                "live client '{client_id}' is not the active controller"
1160            ));
1161        }
1162        let Some(client) = state.live_clients.get(client_id) else {
1163            return Err(format!("live client '{client_id}' is not attached"));
1164        };
1165        match capability {
1166            LiveControllerCapability::PromptInjection if !client.prompt_injection => Err(format!(
1167                "live client '{client_id}' cannot inject prompts for this session"
1168            )),
1169            LiveControllerCapability::PermissionRouting if !client.permission_routing => Err(
1170                format!("live client '{client_id}' cannot route permissions for this session"),
1171            ),
1172            _ => Ok(client.clone()),
1173        }
1174    })
1175}
1176
1177fn append_live_client_event(
1178    state: &mut SessionState,
1179    action: &str,
1180    client: Option<&LiveSessionClient>,
1181    previous_controller_id: Option<&str>,
1182    active_controller_id: Option<&str>,
1183    extra: serde_json::Value,
1184) -> Result<(), String> {
1185    let metadata = serde_json::json!({
1186        "action": action,
1187        "session_id": state.id,
1188        "client": client.map(live_client_json),
1189        "previous_controller_id": previous_controller_id,
1190        "active_controller_id": active_controller_id,
1191        "clients": state
1192            .live_clients
1193            .values()
1194            .map(live_client_json)
1195            .collect::<Vec<_>>(),
1196        "extra": extra,
1197    });
1198    let event = crate::llm::helpers::transcript_event(
1199        LIVE_CLIENT_EVENT_KIND,
1200        "system",
1201        "internal",
1202        "Live session client lifecycle changed",
1203        Some(metadata),
1204    );
1205    append_event_to_state(state, event, "live_client")
1206}
1207
1208fn live_client_change(
1209    client: Option<LiveSessionClient>,
1210    previous_controller_id: Option<String>,
1211    state: &SessionState,
1212) -> LiveClientChange {
1213    LiveClientChange {
1214        client,
1215        previous_controller_id,
1216        active_controller_id: state.live_controller_id.clone(),
1217        clients: state.live_clients.values().cloned().collect(),
1218    }
1219}
1220
/// Normalizes a live client id by trimming surrounding whitespace.
///
/// # Errors
/// Returns an error when the id is empty after trimming.
fn validate_live_client_id(id: impl Into<String>) -> Result<String, String> {
    let raw: String = id.into();
    match raw.trim() {
        "" => Err("live client id cannot be empty".to_string()),
        trimmed => Ok(trimmed.to_owned()),
    }
}
1229
1230pub fn live_client_json(client: &LiveSessionClient) -> serde_json::Value {
1231    serde_json::json!({
1232        "client_id": client.client_id,
1233        "mode": client.mode.as_str(),
1234        "attached_at": client.attached_at,
1235        "last_seen_at": client.last_seen_at,
1236        "prompt_injection": client.prompt_injection,
1237        "permission_routing": client.permission_routing,
1238        "metadata": client.metadata,
1239    })
1240}
1241
1242pub fn live_client_change_json(change: &LiveClientChange) -> serde_json::Value {
1243    serde_json::json!({
1244        "client": change.client.as_ref().map(live_client_json),
1245        "previous_controller_id": change.previous_controller_id,
1246        "active_controller_id": change.active_controller_id,
1247        "clients": change
1248            .clients
1249            .iter()
1250            .map(live_client_json)
1251            .collect::<Vec<_>>(),
1252    })
1253}
1254
1255/// Auto-register a persistent sink for a newly-created session.
1256/// Silent no-op on failure — a broken observability sink must never
1257/// prevent a session from starting.
1258fn try_register_event_log(session_id: &str) {
1259    if let Some(log) = crate::event_log::active_event_log() {
1260        crate::agent_events::register_sink(
1261            session_id,
1262            crate::agent_events::EventLogSink::new(log, session_id),
1263        );
1264        return;
1265    }
1266    let Ok(dir) = std::env::var("HARN_EVENT_LOG_DIR") else {
1267        return;
1268    };
1269    if dir.is_empty() {
1270        return;
1271    }
1272    let path = std::path::PathBuf::from(dir).join(format!("event_log-{session_id}.jsonl"));
1273    if let Ok(sink) = crate::agent_events::JsonlEventSink::open(&path) {
1274        crate::agent_events::register_sink(session_id, sink);
1275    }
1276}
1277
1278pub fn register_event_log_sink(session_id: &str) {
1279    try_register_event_log(session_id);
1280}
1281
1282pub fn close(id: &str) {
1283    SESSIONS.with(|s| {
1284        s.borrow_mut().remove(id);
1285    });
1286    // Cross-thread per-session state must be released too, otherwise
1287    // pending inbox entries can be delivered to a future session that
1288    // happens to reuse the same id.
1289    crate::orchestration::agent_inbox::clear_session(id);
1290    crate::agent_events::clear_session_sinks(id);
1291}
1292
1293pub fn close_with_status(
1294    id: &str,
1295    reason: impl Into<String>,
1296    status: impl Into<String>,
1297    metadata: serde_json::Value,
1298) -> bool {
1299    if !exists(id) {
1300        return false;
1301    }
1302    let reason = reason.into();
1303    let status = status.into();
1304    let event_metadata = serde_json::json!({
1305        "reason": reason,
1306        "status": status,
1307        "metadata": metadata,
1308    });
1309    let transcript_event = crate::llm::helpers::transcript_event(
1310        "agent_session_closed",
1311        "system",
1312        "internal",
1313        "Agent session closed",
1314        Some(event_metadata),
1315    );
1316    let _ = append_event(id, transcript_event);
1317    crate::llm::emit_live_agent_event_sync(&crate::agent_events::AgentEvent::SessionClosed {
1318        session_id: id.to_string(),
1319        reason,
1320        status,
1321        metadata,
1322    });
1323    close(id);
1324    true
1325}
1326
1327pub fn reset_transcript(id: &str) -> bool {
1328    SESSIONS.with(|s| {
1329        let mut map = s.borrow_mut();
1330        let Some(state) = map.get_mut(id) else {
1331            return false;
1332        };
1333        state.transcript = empty_transcript(id);
1334        state.tool_format = None;
1335        state.system_prompt = None;
1336        state.scratchpad = None;
1337        state.scratchpad_version = 0;
1338        state.last_transcript_budget_action = None;
1339        state.completed_turn_checkpoints.clear();
1340        state.redo_stack.clear();
1341        state.touch();
1342        true
1343    })
1344}
1345
1346/// Copy `src`'s transcript into a new session id. Subscribers are NOT
1347/// copied — a fork is a conversation branch, not an event fanout.
1348///
1349/// Touches `src`'s `last_accessed` before evicting, so the fork
1350/// operation itself can't make `src` look stale and kick it out of
1351/// the LRU just to make room for the new fork.
1352pub fn fork(src_id: &str, dst_id: Option<String>) -> Option<String> {
1353    let (
1354        src_transcript,
1355        src_tool_format,
1356        src_system_prompt,
1357        src_pinned_model,
1358        src_pinned_reasoning_policy,
1359        src_actor_chain,
1360        src_workspace_anchor,
1361        src_workspace_policy,
1362        src_scratchpad,
1363        src_scratchpad_version,
1364        src_transcript_budget_policy,
1365        src_last_transcript_budget_action,
1366        dst,
1367    ) = SESSIONS.with(|s| {
1368        let mut map = s.borrow_mut();
1369        let src = map.get_mut(src_id)?;
1370        src.touch();
1371        let dst = dst_id.unwrap_or_else(|| uuid::Uuid::now_v7().to_string());
1372        let forked_transcript = clone_transcript_with_id(&src.transcript, &dst);
1373        Some((
1374            forked_transcript,
1375            src.tool_format.clone(),
1376            src.system_prompt.clone(),
1377            src.pinned_model.clone(),
1378            src.pinned_reasoning_policy.clone(),
1379            src.actor_chain.clone(),
1380            src.workspace_anchor.clone(),
1381            src.workspace_policy.clone(),
1382            src.scratchpad.clone(),
1383            src.scratchpad_version,
1384            src.transcript_budget_policy.clone(),
1385            src.last_transcript_budget_action.clone(),
1386            dst,
1387        ))
1388    })?;
1389    // Ensure cap is respected when inserting the fork.
1390    open_or_create(Some(dst.clone()));
1391    SESSIONS.with(|s| {
1392        let mut map = s.borrow_mut();
1393        if let Some(state) = map.get_mut(&dst) {
1394            state.transcript = src_transcript;
1395            state.tool_format = src_tool_format;
1396            state.system_prompt = src_system_prompt;
1397            state.pinned_model = src_pinned_model;
1398            state.pinned_reasoning_policy = src_pinned_reasoning_policy;
1399            state.actor_chain = src_actor_chain;
1400            state.workspace_anchor = src_workspace_anchor;
1401            state.workspace_policy = src_workspace_policy;
1402            state.scratchpad = src_scratchpad;
1403            state.scratchpad_version = src_scratchpad_version;
1404            state.transcript_budget_policy = src_transcript_budget_policy;
1405            state.last_transcript_budget_action = src_last_transcript_budget_action;
1406            state.touch();
1407        }
1408        update_lineage(&mut map, src_id, &dst, None);
1409    });
1410    let budget_ok = SESSIONS.with(|s| {
1411        let mut map = s.borrow_mut();
1412        let Some(state) = map.get_mut(&dst) else {
1413            return false;
1414        };
1415        let candidate = state.transcript.clone();
1416        apply_transcript_with_budget(state, candidate, "fork").is_ok()
1417    });
1418    if !budget_ok {
1419        close(&dst);
1420        return None;
1421    }
1422    // open_or_create evicts BEFORE inserting, so the dst slot is
1423    // guaranteed once we get here. The existence check is cheap
1424    // insurance against a future refactor that breaks that invariant.
1425    if exists(&dst) {
1426        Some(dst)
1427    } else {
1428        None
1429    }
1430}
1431
1432/// Fork `src_id` and truncate the destination transcript to the
1433/// first `keep_first` messages (#105 — branch-replay). Pairs with the
1434/// scrubber: the host picks an event index, rebuilds a message count,
1435/// and calls this to spawn a live sibling session that resumes from
1436/// the rebuilt state. Subscribers are not carried over (same as
1437/// `fork`), so sibling events don't double-fan into the parent's
1438/// consumers.
1439///
1440/// Returns the new session id on success, `None` if `src_id` doesn't
1441/// exist.
1442pub fn fork_at(src_id: &str, keep_first: usize, dst_id: Option<String>) -> Option<String> {
1443    let branched_at_event_index = SESSIONS.with(|s| {
1444        let map = s.borrow();
1445        let src = map.get(src_id)?;
1446        Some(branch_event_index(&src.transcript, keep_first))
1447    })?;
1448    let new_id = fork(src_id, dst_id)?;
1449    link_child_session_with_branch(src_id, &new_id, Some(branched_at_event_index));
1450    let _ = truncate(&new_id, keep_first);
1451    Some(new_id)
1452}
1453
1454/// Truncate the session transcript to the first `keep_first`
1455/// messages (opposite of `trim`, which keeps the last N). Returns
1456/// counts and the retained tip event id when the session exists.
1457pub fn truncate(id: &str, keep_first: usize) -> Option<SessionTruncateResult> {
1458    SESSIONS.with(|s| {
1459        let mut map = s.borrow_mut();
1460        let state = map.get_mut(id)?;
1461        let result = truncate_state(state, keep_first)?;
1462        Some(result)
1463    })
1464}
1465
1466fn truncate_state(state: &mut SessionState, keep_first: usize) -> Option<SessionTruncateResult> {
1467    let dict = state
1468        .transcript
1469        .as_dict()
1470        .cloned()
1471        .unwrap_or_else(crate::value::DictMap::new);
1472    let messages: Vec<VmValue> = match dict.get("messages") {
1473        Some(VmValue::List(list)) => list.iter().cloned().collect(),
1474        _ => Vec::new(),
1475    };
1476    let existing_events = match dict.get("events") {
1477        Some(VmValue::List(list)) => Some(list.iter().cloned().collect::<Vec<_>>()),
1478        _ => None,
1479    };
1480    let kept_turn_count = keep_first.min(messages.len());
1481    let removed_turn_count = messages.len().saturating_sub(kept_turn_count);
1482    let mut new_tip_turn_id = existing_events
1483        .as_ref()
1484        .map(|events| turn_event_id_for_count(events, kept_turn_count))
1485        .unwrap_or_else(|| {
1486            let events = crate::llm::helpers::transcript_events_from_messages(&messages);
1487            turn_event_id_for_count(&events, kept_turn_count)
1488        });
1489
1490    if removed_turn_count > 0 {
1491        let retained: Vec<VmValue> = messages.into_iter().take(kept_turn_count).collect();
1492        let retained_events = match existing_events {
1493            Some(events) => {
1494                let keep_event_count = event_prefix_len_for_messages(&events, kept_turn_count);
1495                events.into_iter().take(keep_event_count).collect()
1496            }
1497            None => crate::llm::helpers::transcript_events_from_messages(&retained),
1498        };
1499        new_tip_turn_id = turn_event_id_for_count(&retained_events, kept_turn_count);
1500        let mut next = dict;
1501        next.insert(
1502            crate::value::intern_key("events"),
1503            VmValue::List(std::sync::Arc::new(retained_events)),
1504        );
1505        next.insert(
1506            crate::value::intern_key("messages"),
1507            VmValue::List(std::sync::Arc::new(retained)),
1508        );
1509        next.remove("summary");
1510        apply_transcript_with_budget(state, VmValue::dict(next), "truncate").ok()?;
1511    }
1512    state.touch();
1513    Some(SessionTruncateResult {
1514        kept_turn_count,
1515        removed_turn_count,
1516        new_tip_turn_id,
1517    })
1518}
1519
1520/// Pop the trailing message iff it is an assistant message. Used by
1521/// `agent_step_judge` to remove a vetoed assistant turn before
1522/// regeneration (the "replace" on_veto path). Returns `true` if a
1523/// message was popped, `false` if the transcript was empty, and an
1524/// error if the trailing message was not an assistant turn —
1525/// signalling a call-site discipline bug rather than a runtime error.
1526pub fn pop_last_if_assistant(id: &str) -> Result<bool, String> {
1527    SESSIONS.with(|s| {
1528        let mut map = s.borrow_mut();
1529        let Some(state) = map.get_mut(id) else {
1530            return Err(format!(
1531                "pop_last_if_assistant: unknown session id '{id}'"
1532            ));
1533        };
1534        let messages: Vec<VmValue> = match state.transcript.as_dict() {
1535            Some(dict) => match dict.get("messages") {
1536                Some(VmValue::List(list)) => list.iter().cloned().collect(),
1537                _ => Vec::new(),
1538            },
1539            None => Vec::new(),
1540        };
1541        if messages.is_empty() {
1542            return Ok(false);
1543        }
1544        let trailing_role = messages
1545            .last()
1546            .and_then(|m| m.as_dict())
1547            .and_then(|d| d.get("role"))
1548            .map(|v| v.display())
1549            .unwrap_or_default();
1550        if trailing_role != "assistant" {
1551            return Err(format!(
1552                "pop_last_if_assistant: trailing message role is '{trailing_role}', expected 'assistant'"
1553            ));
1554        }
1555        let keep = messages.len() - 1;
1556        truncate_state(state, keep);
1557        Ok(true)
1558    })
1559}
1560
1561pub fn trim(id: &str, keep_last: usize) -> Option<usize> {
1562    SESSIONS.with(|s| {
1563        let mut map = s.borrow_mut();
1564        let state = map.get_mut(id)?;
1565        let dict = state.transcript.as_dict()?.clone();
1566        let messages: Vec<VmValue> = match dict.get("messages") {
1567            Some(VmValue::List(list)) => list.iter().cloned().collect(),
1568            _ => Vec::new(),
1569        };
1570        let start = messages.len().saturating_sub(keep_last);
1571        let retained: Vec<VmValue> = messages.into_iter().skip(start).collect();
1572        let kept = retained.len();
1573        let mut next = dict;
1574        next.insert(
1575            crate::value::intern_key("events"),
1576            VmValue::List(std::sync::Arc::new(
1577                crate::llm::helpers::transcript_events_from_messages(&retained),
1578            )),
1579        );
1580        next.insert(
1581            crate::value::intern_key("messages"),
1582            VmValue::List(std::sync::Arc::new(retained)),
1583        );
1584        apply_transcript_with_budget(state, VmValue::dict(next), "trim").ok()?;
1585        Some(kept)
1586    })
1587}
1588
1589/// Append a message dict to the session transcript. The message must
1590/// have at least a string `role`; anything else is merged verbatim.
1591pub fn inject_message(id: &str, message: VmValue) -> Result<(), String> {
1592    let Some(msg_dict) = message.as_dict().cloned() else {
1593        return Err("agent_session_inject: message must be a dict".into());
1594    };
1595    let role_ok = matches!(msg_dict.get("role"), Some(VmValue::String(_)));
1596    if !role_ok {
1597        return Err(
1598            "agent_session_inject: message must have a string `role` (user|assistant|tool_result|system)"
1599                .into(),
1600        );
1601    }
1602    SESSIONS.with(|s| {
1603        let mut map = s.borrow_mut();
1604        let Some(state) = map.get_mut(id) else {
1605            return Err(format!("agent_session_inject: unknown session id '{id}'"));
1606        };
1607        let dict = state
1608            .transcript
1609            .as_dict()
1610            .cloned()
1611            .unwrap_or_else(crate::value::DictMap::new);
1612        let mut messages: Vec<VmValue> = match dict.get("messages") {
1613            Some(VmValue::List(list)) => list.iter().cloned().collect(),
1614            _ => Vec::new(),
1615        };
1616        let mut events: Vec<VmValue> = match dict.get("events") {
1617            Some(VmValue::List(list)) => list.iter().cloned().collect(),
1618            _ => crate::llm::helpers::transcript_events_from_messages(&messages),
1619        };
1620        let new_message = VmValue::dict(msg_dict);
1621        let message_index = messages.len();
1622        events.push(crate::llm::helpers::transcript_event_from_message(
1623            &new_message,
1624        ));
1625        messages.push(new_message);
1626        let mut next = dict;
1627        next.insert(
1628            crate::value::intern_key("events"),
1629            VmValue::List(std::sync::Arc::new(events)),
1630        );
1631        next.insert(
1632            crate::value::intern_key("messages"),
1633            VmValue::List(std::sync::Arc::new(messages)),
1634        );
1635        let persisted_message = next
1636            .get("messages")
1637            .and_then(|value| match value {
1638                VmValue::List(list) => list.get(message_index).cloned(),
1639                _ => None,
1640            })
1641            .unwrap_or(VmValue::Nil);
1642        apply_transcript_with_budget(state, VmValue::dict(next), "inject_message")?;
1643        emit_identified_user_message_event(id, &persisted_message);
1644        emit_llm_message_event(id, message_index, &persisted_message);
1645        Ok(())
1646    })
1647}
1648
1649fn emit_identified_user_message_event(session_id: &str, message: &VmValue) {
1650    let message_json = crate::llm::helpers::vm_value_to_json(message);
1651    let role = message_json.get("role").and_then(|value| value.as_str());
1652    if role != Some("user") {
1653        return;
1654    }
1655    let Some(message_id) = message_json
1656        .get("messageId")
1657        .or_else(|| message_json.get("message_id"))
1658        .and_then(|value| value.as_str())
1659        .filter(|value| !value.trim().is_empty())
1660    else {
1661        return;
1662    };
1663    let content = message_json
1664        .get("content")
1665        .map(user_message_content_blocks)
1666        .unwrap_or_default();
1667    crate::agent_events::emit_event(&crate::agent_events::AgentEvent::UserMessage {
1668        session_id: session_id.to_string(),
1669        message_id: message_id.to_string(),
1670        content,
1671    });
1672}
1673
1674fn user_message_content_blocks(content: &serde_json::Value) -> Vec<serde_json::Value> {
1675    match content {
1676        serde_json::Value::Array(items) => items.clone(),
1677        serde_json::Value::String(text) => vec![serde_json::json!({
1678            "type": "text",
1679            "text": text,
1680        })],
1681        other => vec![serde_json::json!({
1682            "type": "text",
1683            "text": other.to_string(),
1684        })],
1685    }
1686}
1687
1688fn emit_llm_message_event(session_id: &str, message_index: usize, message: &VmValue) {
1689    let mut fields = serde_json::Map::new();
1690    fields.insert(
1691        "session_id".to_string(),
1692        serde_json::Value::String(session_id.to_string()),
1693    );
1694    fields.insert(
1695        "message_index".to_string(),
1696        serde_json::json!(message_index),
1697    );
1698    let message_json = crate::llm::helpers::vm_value_to_json(message);
1699    if let Some(role) = message_json.get("role").and_then(|value| value.as_str()) {
1700        fields.insert(
1701            "role".to_string(),
1702            serde_json::Value::String(role.to_string()),
1703        );
1704    }
1705    if let Some(content) = message_json.get("content") {
1706        fields.insert("content".to_string(), content.clone());
1707    }
1708    fields.insert("message".to_string(), message_json);
1709    crate::llm::append_observability_sidecar_entry("message", fields);
1710}
1711
1712/// Create a new session from a reconstructed message list.
1713///
1714/// This is intentionally an all-at-once write instead of repeated
1715/// `inject_message` calls: importing a transcript should not re-emit
1716/// each historic turn into the active observability sidecar.
1717pub fn seed_from_messages(
1718    id: Option<String>,
1719    messages: &[serde_json::Value],
1720    metadata: serde_json::Value,
1721    system_prompt: Option<String>,
1722    tool_format: Option<String>,
1723) -> Result<String, String> {
1724    let resolved = id.unwrap_or_else(|| uuid::Uuid::now_v7().to_string());
1725    if exists(&resolved) {
1726        return Err(format!("agent session '{resolved}' already exists"));
1727    }
1728    open_or_create(Some(resolved.clone()));
1729    SESSIONS.with(|s| {
1730        let mut map = s.borrow_mut();
1731        let Some(state) = map.get_mut(&resolved) else {
1732            return Err(format!("failed to create agent session '{resolved}'"));
1733        };
1734        state.tool_format = tool_format.filter(|value| !value.trim().is_empty());
1735        state.system_prompt = system_prompt.filter(|value| !value.trim().is_empty());
1736
1737        let mut metadata = metadata
1738            .as_object()
1739            .cloned()
1740            .unwrap_or_else(serde_json::Map::new);
1741        if let Some(tool_format) = state.tool_format.as_ref() {
1742            metadata.insert(
1743                "tool_format".to_string(),
1744                serde_json::Value::String(tool_format.clone()),
1745            );
1746            metadata.insert(
1747                "tool_mode_locked".to_string(),
1748                serde_json::Value::Bool(true),
1749            );
1750        }
1751        if let Some(system_prompt) = state.system_prompt.as_ref() {
1752            metadata.insert(
1753                "system_prompt".to_string(),
1754                crate::llm::helpers::system_prompt_metadata(system_prompt),
1755            );
1756        }
1757        let vm_messages = crate::llm::helpers::json_messages_to_vm(messages);
1758        let candidate = crate::llm::helpers::new_transcript_with(
1759            Some(resolved.clone()),
1760            vm_messages,
1761            None,
1762            Some(crate::stdlib::json_to_vm_value(&serde_json::Value::Object(
1763                metadata,
1764            ))),
1765        );
1766        apply_transcript_with_budget(state, candidate, "seed_from_messages")?;
1767        Ok(resolved)
1768    })
1769}
1770
1771/// Load the messages vec (as JSON) for this session, for use as prefix
1772/// to an agent_loop run. Returns an empty vec if the session doesn't
1773/// exist or has no messages.
1774pub fn messages_json(id: &str) -> Vec<serde_json::Value> {
1775    SESSIONS.with(|s| {
1776        let map = s.borrow();
1777        let Some(state) = map.get(id) else {
1778            return Vec::new();
1779        };
1780        let Some(dict) = state.transcript.as_dict() else {
1781            return Vec::new();
1782        };
1783        match dict.get("messages") {
1784            Some(VmValue::List(list)) => list
1785                .iter()
1786                .map(crate::llm::helpers::vm_value_to_json)
1787                .collect(),
1788            _ => Vec::new(),
1789        }
1790    })
1791}
1792
1793#[derive(Clone, Debug, Default)]
1794pub struct SessionPromptState {
1795    pub messages: Vec<serde_json::Value>,
1796    pub summary: Option<String>,
1797}
1798
1799fn summary_message_json(summary: &str) -> serde_json::Value {
1800    serde_json::json!({
1801        "role": "user",
1802        "content": summary,
1803    })
1804}
1805
1806fn messages_begin_with_summary(messages: &[serde_json::Value], summary: &str) -> bool {
1807    messages.first().is_some_and(|message| {
1808        message.get("role").and_then(|value| value.as_str()) == Some("user")
1809            && message.get("content").and_then(|value| value.as_str()) == Some(summary)
1810    })
1811}
1812
1813/// Prompt-surface resume state for a persisted session.
1814///
1815/// Returns the compacted/rehydratable message list plus the transcript's
1816/// summary field. When the transcript carries a summary field but its
1817/// message list does not already begin with the compacted summary
1818/// message, this helper prepends one so session re-entry preserves the
1819/// same prompt surface the previous loop was actually using.
1820pub fn prompt_state_json(id: &str) -> SessionPromptState {
1821    SESSIONS.with(|s| {
1822        let map = s.borrow();
1823        let Some(state) = map.get(id) else {
1824            return SessionPromptState::default();
1825        };
1826        let Some(dict) = state.transcript.as_dict() else {
1827            return SessionPromptState::default();
1828        };
1829        let mut messages = match dict.get("messages") {
1830            Some(VmValue::List(list)) => list
1831                .iter()
1832                .map(crate::llm::helpers::vm_value_to_json)
1833                .collect::<Vec<_>>(),
1834            _ => Vec::new(),
1835        };
1836        let summary = dict.get("summary").and_then(|value| match value {
1837            VmValue::String(text) if !text.trim().is_empty() => Some(text.to_string()),
1838            _ => None,
1839        });
1840        if let Some(summary_text) = summary.as_deref() {
1841            if !messages_begin_with_summary(&messages, summary_text) {
1842                messages.insert(0, summary_message_json(summary_text));
1843            }
1844        }
1845        SessionPromptState { messages, summary }
1846    })
1847}
1848
1849/// Overwrite the transcript for this session. Used by `agent_loop` on
1850/// exit to persist the synthesized transcript.
1851pub fn store_transcript(id: &str, transcript: VmValue) -> Result<(), String> {
1852    SESSIONS.with(|s| {
1853        let mut map = s.borrow_mut();
1854        let Some(state) = map.get_mut(id) else {
1855            return Err(format!(
1856                "agent_session_store_transcript: unknown session id '{id}'"
1857            ));
1858        };
1859        let transcript = transcript_with_session_metadata(transcript, state);
1860        apply_transcript_with_budget(state, transcript, "store_transcript")?;
1861        Ok(())
1862    })
1863}
1864
1865fn checkpoint_summary(checkpoint: &SessionTurnCheckpoint) -> SessionCheckpointSummary {
1866    SessionCheckpointSummary {
1867        checkpoint_id: checkpoint.checkpoint_id.clone(),
1868        before_message_count: checkpoint.before_message_count,
1869        after_message_count: checkpoint.after_message_count,
1870        fs_snapshot_ids: checkpoint.fs_snapshot_ids.clone(),
1871    }
1872}
1873
1874fn checkpoint_error_status(error: SessionCheckpointError) -> &'static str {
1875    match error {
1876        SessionCheckpointError::UnknownSession => "unknown_session",
1877        SessionCheckpointError::NoCheckpoint => "no_checkpoint",
1878        SessionCheckpointError::NoRedo => "no_redo",
1879    }
1880}
1881
1882pub fn checkpoint_status_name(error: SessionCheckpointError) -> &'static str {
1883    checkpoint_error_status(error)
1884}
1885
1886/// Clear redo checkpoints after host-side workspace mutations that are not part
1887/// of the redo flow. Returns whether any redo state was discarded.
1888pub fn invalidate_redo(id: &str) -> bool {
1889    SESSIONS.with(|s| {
1890        let mut map = s.borrow_mut();
1891        let Some(state) = map.get_mut(id) else {
1892            return false;
1893        };
1894        let had_redo = !state.redo_stack.is_empty();
1895        state.redo_stack.clear();
1896        state.touch();
1897        had_redo
1898    })
1899}
1900
1901/// Record a completed prompt turn boundary.
1902///
1903/// `before_transcript` must be captured immediately before the user turn
1904/// starts. The current live transcript becomes the redo target, and optional
1905/// `fs_snapshot_ids` name host-owned filesystem snapshots captured during the
1906/// turn. Harn owns the transcript stack; hosts own concrete file restoration.
1907pub fn record_completed_turn_checkpoint(
1908    id: &str,
1909    before_transcript: VmValue,
1910    fs_snapshot_ids: Vec<String>,
1911) -> Result<Option<SessionCheckpointSummary>, SessionCheckpointError> {
1912    SESSIONS.with(|s| {
1913        let mut map = s.borrow_mut();
1914        let Some(state) = map.get_mut(id) else {
1915            return Err(SessionCheckpointError::UnknownSession);
1916        };
1917        let after_transcript = transcript_with_session_metadata(state.transcript.clone(), state);
1918        let before_message_count = transcript_message_count(&before_transcript);
1919        let after_message_count = transcript_message_count(&after_transcript);
1920        if crate::values_equal(&before_transcript, &after_transcript) && fs_snapshot_ids.is_empty()
1921        {
1922            return Ok(None);
1923        }
1924        let checkpoint = SessionTurnCheckpoint {
1925            checkpoint_id: format!("turn_{}", uuid::Uuid::now_v7().simple()),
1926            completed_at: crate::orchestration::now_rfc3339(),
1927            before_message_count,
1928            after_message_count,
1929            before_transcript,
1930            after_transcript,
1931            fs_snapshot_ids,
1932        };
1933        state.redo_stack.clear();
1934        state.completed_turn_checkpoints.push(checkpoint.clone());
1935        state.touch();
1936        Ok(Some(checkpoint_summary(&checkpoint)))
1937    })
1938}
1939
1940pub fn rollback_plan(id: &str) -> Result<SessionCheckpointSummary, SessionCheckpointError> {
1941    SESSIONS.with(|s| {
1942        let map = s.borrow();
1943        let Some(state) = map.get(id) else {
1944            return Err(SessionCheckpointError::UnknownSession);
1945        };
1946        state
1947            .completed_turn_checkpoints
1948            .last()
1949            .map(checkpoint_summary)
1950            .ok_or(SessionCheckpointError::NoCheckpoint)
1951    })
1952}
1953
1954pub fn redo_plan(id: &str) -> Result<SessionCheckpointSummary, SessionCheckpointError> {
1955    SESSIONS.with(|s| {
1956        let map = s.borrow();
1957        let Some(state) = map.get(id) else {
1958            return Err(SessionCheckpointError::UnknownSession);
1959        };
1960        state
1961            .redo_stack
1962            .last()
1963            .map(|entry| {
1964                let mut summary = checkpoint_summary(&entry.checkpoint);
1965                summary.fs_snapshot_ids = entry.redo_fs_snapshot_ids.clone();
1966                summary
1967            })
1968            .ok_or(SessionCheckpointError::NoRedo)
1969    })
1970}
1971
1972pub fn rollback_last_completed_turn(
1973    id: &str,
1974    redo_fs_snapshot_ids: Vec<String>,
1975) -> Result<SessionCheckpointOutcome, SessionCheckpointError> {
1976    SESSIONS.with(|s| {
1977        let mut map = s.borrow_mut();
1978        let Some(state) = map.get_mut(id) else {
1979            return Err(SessionCheckpointError::UnknownSession);
1980        };
1981        let Some(checkpoint) = state.completed_turn_checkpoints.pop() else {
1982            return Err(SessionCheckpointError::NoCheckpoint);
1983        };
1984        state.transcript = checkpoint.before_transcript.clone();
1985        state.redo_stack.push(SessionRedoEntry {
1986            checkpoint: checkpoint.clone(),
1987            redo_fs_snapshot_ids: redo_fs_snapshot_ids.clone(),
1988        });
1989        state.touch();
1990        Ok(SessionCheckpointOutcome {
1991            status: "rolled_back",
1992            checkpoint: checkpoint_summary(&checkpoint),
1993            redo_fs_snapshot_ids,
1994        })
1995    })
1996}
1997
1998pub fn redo_last_rollback(id: &str) -> Result<SessionCheckpointOutcome, SessionCheckpointError> {
1999    SESSIONS.with(|s| {
2000        let mut map = s.borrow_mut();
2001        let Some(state) = map.get_mut(id) else {
2002            return Err(SessionCheckpointError::UnknownSession);
2003        };
2004        let Some(entry) = state.redo_stack.pop() else {
2005            return Err(SessionCheckpointError::NoRedo);
2006        };
2007        let checkpoint = entry.checkpoint;
2008        state.transcript = checkpoint.after_transcript.clone();
2009        state.completed_turn_checkpoints.push(checkpoint.clone());
2010        state.touch();
2011        Ok(SessionCheckpointOutcome {
2012            status: "redone",
2013            checkpoint: checkpoint_summary(&checkpoint),
2014            redo_fs_snapshot_ids: entry.redo_fs_snapshot_ids,
2015        })
2016    })
2017}
2018
2019/// Remove malformed reminder events after their drop audit has been emitted.
2020/// Pending-reminder rendering scans the transcript on every LLM call; pruning
2021/// invalid entries makes the drop event one-shot instead of noisy per turn.
2022pub fn prune_invalid_reminder_events(id: &str) -> usize {
2023    SESSIONS.with(|s| {
2024        let mut map = s.borrow_mut();
2025        let Some(state) = map.get_mut(id) else {
2026            return 0;
2027        };
2028        let Some(dict) = state.transcript.as_dict().cloned() else {
2029            return 0;
2030        };
2031        let Some(VmValue::List(events)) = dict.get("events") else {
2032            return 0;
2033        };
2034        let mut pruned = 0_usize;
2035        let mut kept = Vec::with_capacity(events.len());
2036        for event in events.iter().cloned() {
2037            let is_reminder = event
2038                .as_dict()
2039                .and_then(|event| event.get("kind"))
2040                .map(VmValue::display)
2041                .as_deref()
2042                == Some(crate::llm::helpers::SYSTEM_REMINDER_EVENT_KIND);
2043            if !is_reminder {
2044                kept.push(event);
2045                continue;
2046            }
2047            let valid = crate::llm::helpers::reminder_from_event(&event)
2048                .is_some_and(|reminder| !reminder.body.trim().is_empty());
2049            if valid {
2050                kept.push(event);
2051            } else {
2052                pruned += 1;
2053            }
2054        }
2055        if pruned > 0 {
2056            let mut next = dict;
2057            next.insert(
2058                crate::value::intern_key("events"),
2059                VmValue::List(std::sync::Arc::new(kept)),
2060            );
2061            let _ = apply_transcript_with_budget(
2062                state,
2063                VmValue::dict(next),
2064                "prune_invalid_reminder_events",
2065            );
2066            state.touch();
2067        }
2068        pruned
2069    })
2070}
2071
2072/// Apply the reminder TTL lifecycle that runs once per completed agent
2073/// turn. Reminders with `ttl_turns = 1` expire and are removed; larger
2074/// finite TTLs are decremented in place. Expiry audit events are emitted
2075/// to the active EventLog when one is installed.
2076pub fn apply_reminder_post_turn(id: &str, turn: i64) -> Result<serde_json::Value, String> {
2077    let report = SESSIONS.with(|s| {
2078        let mut map = s.borrow_mut();
2079        let Some(state) = map.get_mut(id) else {
2080            return Err(format!(
2081                "agent_session_apply_reminder_post_turn: unknown session id '{id}'"
2082            ));
2083        };
2084        let report = crate::llm::helpers::apply_reminder_post_turn(&state.transcript, turn);
2085        if report.decremented_count > 0 || !report.expired.is_empty() {
2086            if let Some(next) = report.transcript.clone() {
2087                apply_transcript_with_budget(state, next, "apply_reminder_post_turn")?;
2088            }
2089            state.touch();
2090        }
2091        Ok(report)
2092    })?;
2093
2094    for reminder in &report.expired {
2095        let mut payload = crate::llm::helpers::reminder_lifecycle_payload(Some(id), reminder);
2096        if let Some(obj) = payload.as_object_mut() {
2097            obj.insert(
2098                "transcript_id".to_string(),
2099                serde_json::Value::String(id.to_string()),
2100            );
2101            obj.insert(
2102                "reason".to_string(),
2103                serde_json::Value::String("ttl".to_string()),
2104            );
2105            obj.insert(
2106                "ttl_turns_before".to_string(),
2107                serde_json::json!(&reminder.ttl_turns),
2108            );
2109            obj.insert("expired_at_turn".to_string(), serde_json::json!(turn));
2110        }
2111        crate::llm::helpers::emit_reminder_lifecycle_event(
2112            crate::llm::helpers::REMINDER_EXPIRED_EVENT_KIND,
2113            payload,
2114        );
2115    }
2116
2117    Ok(serde_json::json!({
2118        "expired_count": report.expired.len(),
2119        "decremented_count": report.decremented_count,
2120        "remaining_count": report.remaining_count,
2121    }))
2122}
2123
2124/// Inject a typed system reminder into the session transcript's event
2125/// stream. This mirrors `transcript.inject_reminder` for live sessions:
2126/// reminders with the same `dedupe_key` are replaced before the new
2127/// reminder event is appended.
2128pub fn inject_reminder(
2129    id: &str,
2130    reminder: crate::llm::helpers::SystemReminder,
2131) -> Result<ReminderInjectionReport, String> {
2132    let reminder_id = reminder.id.clone();
2133    let dedupe_key = reminder.dedupe_key.clone();
2134    let mut deduped_reminder_ids = Vec::new();
2135    SESSIONS.with(|s| {
2136        let mut map = s.borrow_mut();
2137        let Some(state) = map.get_mut(id) else {
2138            return Err(format!(
2139                "agent_session_inject_reminder: unknown session id '{id}'"
2140            ));
2141        };
2142        let dict = state
2143            .transcript
2144            .as_dict()
2145            .cloned()
2146            .unwrap_or_else(crate::value::DictMap::new);
2147        let mut events: Vec<VmValue> = match dict.get("events") {
2148            Some(VmValue::List(list)) => list.iter().cloned().collect(),
2149            _ => dict
2150                .get("messages")
2151                .and_then(|value| match value {
2152                    VmValue::List(list) => Some(list.iter().cloned().collect::<Vec<_>>()),
2153                    _ => None,
2154                })
2155                .map(|messages| crate::llm::helpers::transcript_events_from_messages(&messages))
2156                .unwrap_or_default(),
2157        };
2158        if let Some(expected_key) = dedupe_key.as_deref() {
2159            events.retain(|event| {
2160                let Some(existing) = crate::llm::helpers::reminder_from_event(event) else {
2161                    return true;
2162                };
2163                if existing.dedupe_key.as_deref() == Some(expected_key) {
2164                    deduped_reminder_ids.push(existing.id);
2165                    false
2166                } else {
2167                    true
2168                }
2169            });
2170        }
2171        events.push(crate::llm::helpers::transcript_reminder_event(&reminder));
2172        let mut next = dict;
2173        next.insert(
2174            crate::value::intern_key("events"),
2175            VmValue::List(std::sync::Arc::new(events)),
2176        );
2177        apply_transcript_with_budget(state, VmValue::dict(next), "inject_reminder")?;
2178        state.touch();
2179        Ok(())
2180    })?;
2181
2182    if !deduped_reminder_ids.is_empty() {
2183        let dropped_count = deduped_reminder_ids.len();
2184        crate::llm::helpers::emit_reminder_lifecycle_event(
2185            crate::llm::helpers::REMINDER_DEDUPED_EVENT_KIND,
2186            serde_json::json!({
2187                "session_id": id,
2188                "transcript_id": id,
2189                "reminder_id": &reminder_id,
2190                "replacing_id": &reminder_id,
2191                "replaced_id": deduped_reminder_ids.first(),
2192                "replaced_ids": &deduped_reminder_ids,
2193                "dedupe_key": &dedupe_key,
2194                "dropped_reminder_ids": &deduped_reminder_ids,
2195                "dropped_count": dropped_count,
2196            }),
2197        );
2198    }
2199
2200    crate::llm::helpers::emit_reminder_lifecycle_event(
2201        crate::llm::helpers::REMINDER_INJECTED_EVENT_KIND,
2202        crate::llm::helpers::reminder_lifecycle_payload(Some(id), &reminder),
2203    );
2204
2205    Ok(ReminderInjectionReport {
2206        reminder_id,
2207        deduped_count: deduped_reminder_ids.len(),
2208    })
2209}
2210
2211/// Append a transcript event to the session without mutating its
2212/// message list. Used for orchestration-side lineage events (sub-agent
2213/// spawn/completion, workflow hooks, etc.) that should survive
2214/// persistence/replay without being replayed back into the model as
2215/// conversational messages.
2216pub fn append_event(id: &str, event: VmValue) -> Result<(), String> {
2217    let Some(event_dict) = event.as_dict() else {
2218        return Err("agent_session_append_event: event must be a dict".into());
2219    };
2220    let kind_ok = matches!(event_dict.get("kind"), Some(VmValue::String(_)));
2221    if !kind_ok {
2222        return Err("agent_session_append_event: event must have a string `kind`".into());
2223    }
2224    SESSIONS.with(|s| {
2225        let mut map = s.borrow_mut();
2226        let Some(state) = map.get_mut(id) else {
2227            return Err(format!(
2228                "agent_session_append_event: unknown session id '{id}'"
2229            ));
2230        };
2231        append_event_to_state(state, event, "append_event")?;
2232        Ok(())
2233    })
2234}
2235
2236fn append_event_to_state(
2237    state: &mut SessionState,
2238    event: VmValue,
2239    action: &str,
2240) -> Result<(), String> {
2241    let dict = state
2242        .transcript
2243        .as_dict()
2244        .cloned()
2245        .unwrap_or_else(crate::value::DictMap::new);
2246    let mut events: Vec<VmValue> = match dict.get("events") {
2247        Some(VmValue::List(list)) => list.iter().cloned().collect(),
2248        _ => dict
2249            .get("messages")
2250            .and_then(|value| match value {
2251                VmValue::List(list) => Some(list.iter().cloned().collect::<Vec<_>>()),
2252                _ => None,
2253            })
2254            .map(|messages| crate::llm::helpers::transcript_events_from_messages(&messages))
2255            .unwrap_or_default(),
2256    };
2257    events.push(event);
2258    let mut next = dict;
2259    next.insert(
2260        crate::value::intern_key("events"),
2261        VmValue::List(std::sync::Arc::new(events)),
2262    );
2263    apply_transcript_with_budget(state, VmValue::dict(next), action)
2264}
2265
2266/// Replace the transcript's message list wholesale. Used by the
2267/// in-loop compaction path, which operates on JSON messages.
2268pub fn replace_messages(id: &str, messages: &[serde_json::Value]) -> Result<(), String> {
2269    replace_messages_with_summary(id, messages, None)
2270}
2271
2272/// Replace the transcript's message list and optionally update the
2273/// `summary` field on the persisted transcript. The compaction path
2274/// uses this to publish the human-readable rollup line that
2275/// `transcript_summary(transcript)` exposes to host code.
2276pub fn replace_messages_with_summary(
2277    id: &str,
2278    messages: &[serde_json::Value],
2279    summary: Option<&str>,
2280) -> Result<(), String> {
2281    SESSIONS.with(|s| {
2282        let mut map = s.borrow_mut();
2283        let Some(state) = map.get_mut(id) else {
2284            return Err(format!(
2285                "agent_session_replace_messages: unknown session id '{id}'"
2286            ));
2287        };
2288        let dict = state
2289            .transcript
2290            .as_dict()
2291            .cloned()
2292            .unwrap_or_else(crate::value::DictMap::new);
2293        let vm_messages: Vec<VmValue> = messages
2294            .iter()
2295            .map(crate::stdlib::json_to_vm_value)
2296            .collect();
2297        let mut next = dict;
2298        next.insert(
2299            crate::value::intern_key("events"),
2300            VmValue::List(std::sync::Arc::new(
2301                crate::llm::helpers::transcript_events_from_messages(&vm_messages),
2302            )),
2303        );
2304        next.insert(
2305            crate::value::intern_key("messages"),
2306            VmValue::List(std::sync::Arc::new(vm_messages)),
2307        );
2308        if let Some(summary) = summary {
2309            next.put_str("summary", summary);
2310        } else {
2311            next.remove("summary");
2312        }
2313        apply_transcript_with_budget(state, VmValue::dict(next), "replace_messages")?;
2314        Ok(())
2315    })
2316}
2317
2318pub fn append_subscriber(id: &str, callback: VmValue) {
2319    open_or_create(Some(id.to_string()));
2320    SESSIONS.with(|s| {
2321        if let Some(state) = s.borrow_mut().get_mut(id) {
2322            state.subscribers.push(callback);
2323            state.touch();
2324        }
2325    });
2326}
2327
2328pub fn subscribers_for(id: &str) -> Vec<VmValue> {
2329    SESSIONS.with(|s| {
2330        s.borrow()
2331            .get(id)
2332            .map(|state| state.subscribers.clone())
2333            .unwrap_or_default()
2334    })
2335}
2336
2337pub fn subscriber_count(id: &str) -> usize {
2338    SESSIONS.with(|s| {
2339        s.borrow()
2340            .get(id)
2341            .map(|state| state.subscribers.len())
2342            .unwrap_or(0)
2343    })
2344}
2345
2346/// Persist the set of active skill names for session resume. Called at
2347/// the end of an agent_loop run; the next `open_or_create` for this id
2348/// reads them back via [`active_skills`].
2349pub fn set_active_skills(id: &str, skills: Vec<String>) {
2350    SESSIONS.with(|s| {
2351        if let Some(state) = s.borrow_mut().get_mut(id) {
2352            state.active_skills = skills;
2353            state.touch();
2354        }
2355    });
2356}
2357
2358/// Skills that were active at the end of the previous agent_loop run
2359/// against this session. Returns an empty vec when the session doesn't
2360/// exist or nothing was persisted.
2361pub fn active_skills(id: &str) -> Vec<String> {
2362    SESSIONS.with(|s| {
2363        s.borrow()
2364            .get(id)
2365            .map(|state| state.active_skills.clone())
2366            .unwrap_or_default()
2367    })
2368}
2369
2370/// Claim the tool-calling contract for a session.
2371///
2372/// The first loop against a named session records its `tool_format`.
2373/// Later re-entry must use the same format so prompt/history generated
2374/// under a text contract is never replayed as native, or vice versa.
2375pub fn claim_tool_format(id: &str, tool_format: &str) -> Result<(), String> {
2376    let tool_format = tool_format.trim();
2377    if tool_format.is_empty() {
2378        return Ok(());
2379    }
2380    SESSIONS.with(|s| {
2381        let mut map = s.borrow_mut();
2382        let Some(state) = map.get_mut(id) else {
2383            return Err(format!("agent session '{id}' does not exist"));
2384        };
2385        match state.tool_format.as_deref() {
2386            Some(existing) if existing != tool_format => Err(format!(
2387                "agent session '{id}' is locked to tool_format='{existing}', but this run requested tool_format='{tool_format}'. Start a new session or fork/reset the transcript before changing tool mode."
2388            )),
2389            Some(_) => {
2390                state.touch();
2391                Ok(())
2392            }
2393            None => {
2394                state.tool_format = Some(tool_format.to_string());
2395                state.touch();
2396                Ok(())
2397            }
2398        }
2399    })
2400}
2401
2402pub fn tool_format(id: &str) -> Option<String> {
2403    SESSIONS.with(|s| {
2404        s.borrow()
2405            .get(id)
2406            .and_then(|state| state.tool_format.clone())
2407    })
2408}
2409
2410pub fn record_system_prompt(id: &str, system_prompt: &str) -> Result<(), String> {
2411    let system_prompt = system_prompt.trim();
2412    if system_prompt.is_empty() {
2413        return Ok(());
2414    }
2415    assert_cache_stable_system_prompt(system_prompt);
2416    SESSIONS.with(|s| {
2417        let mut map = s.borrow_mut();
2418        let Some(state) = map.get_mut(id) else {
2419            return Err(format!("agent session '{id}' does not exist"));
2420        };
2421        let changed = state.system_prompt.as_deref() != Some(system_prompt);
2422        state.system_prompt = Some(system_prompt.to_string());
2423        let dict = state
2424            .transcript
2425            .as_dict()
2426            .cloned()
2427            .unwrap_or_else(crate::value::DictMap::new);
2428        let mut next = dict;
2429        apply_system_prompt_metadata(&mut next, system_prompt);
2430        if changed {
2431            let mut events: Vec<VmValue> = match next.get("events") {
2432                Some(VmValue::List(list)) => list.iter().cloned().collect(),
2433                _ => Vec::new(),
2434            };
2435            events.push(crate::llm::helpers::transcript_event(
2436                "system_prompt",
2437                "system",
2438                "internal",
2439                "",
2440                Some(crate::llm::helpers::system_prompt_event_metadata(
2441                    system_prompt,
2442                )),
2443            ));
2444            next.insert(
2445                crate::value::intern_key("events"),
2446                VmValue::List(std::sync::Arc::new(events)),
2447            );
2448        }
2449        apply_transcript_with_budget(state, VmValue::dict(next), "record_system_prompt")?;
2450        Ok(())
2451    })
2452}
2453
2454pub fn system_prompt(id: &str) -> Option<String> {
2455    SESSIONS.with(|s| {
2456        s.borrow()
2457            .get(id)
2458            .and_then(|state| state.system_prompt.clone())
2459    })
2460}
2461
/// Find the first forbidden template token in a system prompt.
///
/// A token is forbidden when a `{{` opener is followed (after optional
/// whitespace) by an identifier starting with `workspace_` or `project_`.
/// Returns the matching prefix, or `None` when the prompt is clean.
///
/// The scan advances one byte past each opener rather than skipping the
/// whole `{{`, so overlapping openers such as `{{{workspace_root}}}` are
/// examined too.
#[cfg(debug_assertions)]
fn forbidden_workspace_prompt_token(system_prompt: &str) -> Option<&'static str> {
    const FORBIDDEN_PREFIXES: [&str; 2] = ["workspace_", "project_"];
    let mut remaining = system_prompt;
    while let Some(index) = remaining.find("{{") {
        let candidate = remaining[index + 2..].trim_start();
        let hit = FORBIDDEN_PREFIXES
            .iter()
            .copied()
            .find(|prefix| candidate.starts_with(*prefix));
        if hit.is_some() {
            return hit;
        }
        // `{` is a single ASCII byte, so `index + 1` is a char boundary.
        remaining = &remaining[index + 1..];
    }
    None
}
2477
2478#[cfg(debug_assertions)]
2479fn assert_cache_stable_system_prompt(system_prompt: &str) {
2480    if let Some(prefix) = forbidden_workspace_prompt_token(system_prompt) {
2481        panic!(
2482            "{CACHE_STABLE_SYSTEM_PROMPT_DIAGNOSTIC}: session system prompts must not interpolate `{{{{{prefix}...` tokens; move workspace/project context into the workspace-anchor reminder"
2483        );
2484    }
2485}
2486
2487#[cfg(not(debug_assertions))]
2488fn assert_cache_stable_system_prompt(_system_prompt: &str) {}
2489
2490/// Pin (or clear, with `None`) a model selector on a session. Returns
2491/// `Ok(true)` when the value actually changed so callers can decide
2492/// whether to broadcast a notification. The selector is stored verbatim
2493/// — alias / catalog resolution is the call-site's job.
2494pub fn set_pinned_model(id: &str, model: Option<String>) -> Result<bool, String> {
2495    let normalized = model
2496        .map(|value| value.trim().to_string())
2497        .filter(|value| !value.is_empty());
2498    SESSIONS.with(|s| {
2499        let mut map = s.borrow_mut();
2500        let Some(state) = map.get_mut(id) else {
2501            return Err(format!("agent session '{id}' does not exist"));
2502        };
2503        let changed = state.pinned_model != normalized;
2504        state.pinned_model = normalized;
2505        state.touch();
2506        Ok(changed)
2507    })
2508}
2509
2510/// Read the session's pinned model selector, if any. Consumed by
2511/// `vm_resolve_model` as the per-session default when a script-level
2512/// `llm_call` does not pass `model:` explicitly.
2513pub fn pinned_model(id: &str) -> Option<String> {
2514    SESSIONS.with(|s| {
2515        s.borrow()
2516            .get(id)
2517            .and_then(|state| state.pinned_model.clone())
2518    })
2519}
2520
2521/// Pin (or clear) the session-level provider-aware reasoning policy.
2522pub fn set_pinned_reasoning_policy(id: &str, policy: Option<String>) -> Result<bool, String> {
2523    let normalized = match policy {
2524        Some(value) => crate::llm::reasoning_policy::normalize_policy_selector(&value)?,
2525        None => None,
2526    };
2527    SESSIONS.with(|s| {
2528        let mut map = s.borrow_mut();
2529        let Some(state) = map.get_mut(id) else {
2530            return Err(format!("agent session '{id}' does not exist"));
2531        };
2532        let changed = state.pinned_reasoning_policy != normalized;
2533        state.pinned_reasoning_policy = normalized;
2534        state.touch();
2535        Ok(changed)
2536    })
2537}
2538
2539/// Read the session's pinned reasoning policy, if any.
2540pub fn pinned_reasoning_policy(id: &str) -> Option<String> {
2541    SESSIONS.with(|s| {
2542        s.borrow()
2543            .get(id)
2544            .and_then(|state| state.pinned_reasoning_policy.clone())
2545    })
2546}
2547
2548/// Set (or clear, with `None`) the typed workspace anchor on a session.
2549/// Returns `Ok(true)` when the value actually changed so callers can
2550/// decide whether to broadcast `AnchorChanged` notifications.
2551pub fn set_workspace_anchor(id: &str, anchor: Option<WorkspaceAnchor>) -> Result<bool, String> {
2552    SESSIONS.with(|s| {
2553        let mut map = s.borrow_mut();
2554        let Some(state) = map.get_mut(id) else {
2555            return Err(format!("agent session '{id}' does not exist"));
2556        };
2557        let changed = state.workspace_anchor != anchor;
2558        state.workspace_anchor = anchor;
2559        if changed {
2560            state.redo_stack.clear();
2561            crate::llm::permissions::clear_session_grants(id);
2562        }
2563        state.touch();
2564        Ok(changed)
2565    })
2566}
2567
2568/// Read the session's typed workspace anchor, if any.
2569pub fn workspace_anchor(id: &str) -> Option<WorkspaceAnchor> {
2570    SESSIONS.with(|s| {
2571        s.borrow()
2572            .get(id)
2573            .and_then(|state| state.workspace_anchor.clone())
2574    })
2575}
2576
2577/// Outcome of `reanchor_session`: previous + new anchor and whether the
2578/// swap actually moved anything. Callers use `changed` to suppress
2579/// no-op transcript / live events.
2580#[derive(Clone, Debug, PartialEq, Eq)]
2581pub struct ReanchorOutcome {
2582    pub previous: Option<WorkspaceAnchor>,
2583    pub current: WorkspaceAnchor,
2584    pub changed: bool,
2585}
2586
2587/// Atomically swap the session's primary anchor + emit the canonical
2588/// `AnchorChanged` transcript event and live `AgentEvent::AnchorChanged`
2589/// notification (#2218). Clears session-scoped permission grants so
2590/// stale anchor-based decisions don't leak into the next turn.
2591pub fn reanchor_session(
2592    id: &str,
2593    new_anchor: WorkspaceAnchor,
2594    carry_transcript: bool,
2595    compacted: bool,
2596    reason: Option<String>,
2597) -> Result<ReanchorOutcome, String> {
2598    let outcome = SESSIONS.with(|s| {
2599        let mut map = s.borrow_mut();
2600        let Some(state) = map.get_mut(id) else {
2601            return Err(format!("agent session '{id}' does not exist"));
2602        };
2603        let previous = state.workspace_anchor.clone();
2604        let changed = previous.as_ref() != Some(&new_anchor);
2605        state.workspace_anchor = Some(new_anchor.clone());
2606        if changed {
2607            crate::llm::permissions::clear_session_grants(id);
2608        }
2609        state.touch();
2610        Ok(ReanchorOutcome {
2611            previous,
2612            current: new_anchor,
2613            changed,
2614        })
2615    })?;
2616    if !outcome.changed {
2617        return Ok(outcome);
2618    }
2619    let previous_json = outcome.previous.as_ref().map(WorkspaceAnchor::to_json);
2620    let current_json = outcome.current.to_json();
2621    let event_metadata = serde_json::json!({
2622        "previous": previous_json,
2623        "current": current_json,
2624        "carry_transcript": carry_transcript,
2625        "compacted": compacted,
2626        "reason": reason,
2627    });
2628    let event = crate::llm::helpers::transcript_event(
2629        "AnchorChanged",
2630        "system",
2631        "internal",
2632        "",
2633        Some(event_metadata),
2634    );
2635    let _ = append_event(id, event);
2636    crate::llm::emit_live_agent_event_sync(&crate::agent_events::AgentEvent::AnchorChanged {
2637        session_id: id.to_string(),
2638        previous: previous_json,
2639        current: current_json,
2640        carry_transcript,
2641        compacted,
2642        reason,
2643    });
2644    Ok(outcome)
2645}
2646
2647/// Set session-local workspace defaults. Returns `Ok(true)` when the
2648/// policy changed.
2649pub fn set_workspace_policy(id: &str, policy: WorkspacePolicy) -> Result<bool, String> {
2650    SESSIONS.with(|s| {
2651        let mut map = s.borrow_mut();
2652        let Some(state) = map.get_mut(id) else {
2653            return Err(format!("agent session '{id}' does not exist"));
2654        };
2655        let changed = state.workspace_policy != policy;
2656        state.workspace_policy = policy;
2657        if changed {
2658            state.redo_stack.clear();
2659        }
2660        state.touch();
2661        Ok(changed)
2662    })
2663}
2664
2665/// Read the session's workspace policy, if the session exists.
2666pub fn workspace_policy(id: &str) -> Option<WorkspacePolicy> {
2667    SESSIONS.with(|s| {
2668        s.borrow()
2669            .get(id)
2670            .map(|state| state.workspace_policy.clone())
2671    })
2672}
2673
2674/// Validate and mount an additional workspace root on an anchored
2675/// session. When the path is already mounted, updates its mount mode
2676/// in place and refreshes its `mounted_at` timestamp.
2677pub fn add_workspace_root(
2678    id: &str,
2679    root: &str,
2680    mount_mode: Option<MountMode>,
2681    reason: Option<String>,
2682) -> Result<String, String> {
2683    let normalized_root = validate_workspace_root_path(root)?;
2684    let mounted_at = crate::orchestration::now_rfc3339();
2685    SESSIONS.with(|s| {
2686        let mut map = s.borrow_mut();
2687        let Some(state) = map.get_mut(id) else {
2688            return Err(format!("agent session '{id}' does not exist"));
2689        };
2690        let default_mount_mode = state.workspace_policy.default_mount_mode;
2691        let Some(anchor) = state.workspace_anchor.as_mut() else {
2692            return Err(format!("agent session '{id}' has no workspace anchor"));
2693        };
2694        let resolved_mount_mode = mount_mode.unwrap_or(default_mount_mode);
2695        if let Some(existing) = anchor
2696            .additional_roots
2697            .iter_mut()
2698            .find(|entry| entry.path == normalized_root)
2699        {
2700            let changed =
2701                existing.mount_mode != resolved_mount_mode || existing.mounted_at != mounted_at;
2702            existing.mount_mode = resolved_mount_mode;
2703            existing.mounted_at = mounted_at.clone();
2704            if changed {
2705                state.redo_stack.clear();
2706            }
2707        } else {
2708            anchor.additional_roots.push(MountedRoot {
2709                path: normalized_root.clone(),
2710                mount_mode: resolved_mount_mode,
2711                mounted_at: mounted_at.clone(),
2712            });
2713            state.redo_stack.clear();
2714        }
2715        let event = crate::llm::helpers::transcript_event(
2716            "RootMounted",
2717            "system",
2718            "internal",
2719            "",
2720            Some(serde_json::json!({
2721                "path": normalized_root.to_string_lossy(),
2722                "mount_mode": resolved_mount_mode.as_str(),
2723                "mounted_at": mounted_at.clone(),
2724                "reason": reason,
2725            })),
2726        );
2727        append_event_to_state(state, event, "add_workspace_root")?;
2728        crate::llm::permissions::clear_session_grants(id);
2729        state.touch();
2730        Ok(mounted_at.clone())
2731    })
2732}
2733
2734/// Remove one mounted root from an anchored session. Returns whether an
2735/// existing mount entry was deleted. Removing an absent root is a no-op.
2736pub fn remove_workspace_root(id: &str, root: &str) -> Result<bool, String> {
2737    let normalized_root = normalize_workspace_root_path(root);
2738    SESSIONS.with(|s| {
2739        let mut map = s.borrow_mut();
2740        let Some(state) = map.get_mut(id) else {
2741            return Err(format!("agent session '{id}' does not exist"));
2742        };
2743        let Some(anchor) = state.workspace_anchor.as_mut() else {
2744            return Err(format!("agent session '{id}' has no workspace anchor"));
2745        };
2746        let before = anchor.additional_roots.len();
2747        anchor
2748            .additional_roots
2749            .retain(|entry| entry.path != normalized_root);
2750        let removed = anchor.additional_roots.len() != before;
2751        if removed {
2752            state.redo_stack.clear();
2753            crate::llm::permissions::clear_session_grants(id);
2754        }
2755        state.touch();
2756        Ok(removed)
2757    })
2758}
2759
2760pub fn list_workspace_roots(id: &str) -> Result<(PathBuf, Vec<MountedRoot>), String> {
2761    SESSIONS.with(|s| {
2762        let map = s.borrow();
2763        let Some(state) = map.get(id) else {
2764            return Err(format!("agent session '{id}' does not exist"));
2765        };
2766        let Some(anchor) = state.workspace_anchor.as_ref() else {
2767            return Err(format!("agent session '{id}' has no workspace anchor"));
2768        };
2769        Ok((anchor.primary.clone(), anchor.additional_roots.clone()))
2770    })
2771}
2772
2773fn validate_workspace_root_path(root: &str) -> Result<PathBuf, String> {
2774    let normalized = normalize_workspace_root_path(root);
2775    let canonical = std::fs::canonicalize(&normalized)
2776        .map_err(|error| format!("workspace root '{root}' must exist and be readable: {error}"))?;
2777    let metadata = std::fs::metadata(&canonical)
2778        .map_err(|error| format!("workspace root '{root}' must exist and be readable: {error}"))?;
2779    if !metadata.is_dir() {
2780        return Err(format!("workspace root '{root}' must be a directory"));
2781    }
2782    std::fs::read_dir(&canonical)
2783        .map_err(|error| format!("workspace root '{root}' must be readable: {error}"))?;
2784    Ok(canonical)
2785}
2786
2787fn normalize_workspace_root_path(root: &str) -> PathBuf {
2788    let absolute = crate::stdlib::process::normalize_context_path(Path::new(root));
2789    std::fs::canonicalize(&absolute).unwrap_or(absolute)
2790}
2791
2792fn empty_transcript(id: &str) -> VmValue {
2793    use crate::llm::helpers::new_transcript_with;
2794    new_transcript_with(Some(id.to_string()), Vec::new(), None, None)
2795}
2796
2797fn clone_transcript_with_id(transcript: &VmValue, new_id: &str) -> VmValue {
2798    let Some(dict) = transcript.as_dict() else {
2799        return empty_transcript(new_id);
2800    };
2801    let mut next = dict.clone();
2802    next.put_str("id", new_id);
2803    VmValue::dict(next)
2804}
2805
2806fn clone_transcript_with_parent(transcript: &VmValue, parent_id: &str) -> VmValue {
2807    let Some(dict) = transcript.as_dict() else {
2808        return transcript.clone();
2809    };
2810    let mut next = dict.clone();
2811    let metadata = match next.get("metadata") {
2812        Some(VmValue::Dict(metadata)) => {
2813            let mut metadata = metadata.as_ref().clone();
2814            metadata.put_str("parent_session_id", parent_id);
2815            VmValue::dict(metadata)
2816        }
2817        _ => VmValue::dict(BTreeMap::from([(
2818            "parent_session_id".to_string(),
2819            VmValue::String(arcstr::ArcStr::from(parent_id.to_string())),
2820        )])),
2821    };
2822    next.insert(crate::value::intern_key("metadata"), metadata);
2823    VmValue::dict(next)
2824}
2825
2826fn apply_system_prompt_metadata(next: &mut crate::value::DictMap, system_prompt: &str) {
2827    let mut metadata = match next.get("metadata") {
2828        Some(VmValue::Dict(metadata)) => metadata.as_ref().clone(),
2829        _ => crate::value::DictMap::new(),
2830    };
2831    metadata.insert(
2832        crate::value::intern_key("system_prompt"),
2833        crate::stdlib::json_to_vm_value(&crate::llm::helpers::system_prompt_metadata(
2834            system_prompt,
2835        )),
2836    );
2837    next.insert(
2838        crate::value::intern_key("metadata"),
2839        VmValue::dict(metadata),
2840    );
2841}
2842
2843fn transcript_with_session_metadata(transcript: VmValue, state: &SessionState) -> VmValue {
2844    let Some(dict) = transcript.as_dict() else {
2845        return transcript;
2846    };
2847    let mut next = dict.clone();
2848    let mut metadata = match next.get("metadata") {
2849        Some(VmValue::Dict(metadata)) => metadata.as_ref().clone(),
2850        _ => crate::value::DictMap::new(),
2851    };
2852    if let Some(tool_format) = state.tool_format.as_ref() {
2853        metadata.put_str("tool_format", tool_format.clone());
2854        metadata.insert(
2855            crate::value::intern_key("tool_mode_locked"),
2856            VmValue::Bool(true),
2857        );
2858    }
2859    if let Some(system_prompt) = state.system_prompt.as_ref() {
2860        metadata.insert(
2861            crate::value::intern_key("system_prompt"),
2862            crate::stdlib::json_to_vm_value(&crate::llm::helpers::system_prompt_metadata(
2863                system_prompt,
2864            )),
2865        );
2866    }
2867    if let Some(actor_chain) = state.actor_chain.as_ref() {
2868        metadata.insert(
2869            crate::value::intern_key("actor_chain"),
2870            actor_chain.to_vm_value(),
2871        );
2872    } else {
2873        metadata.remove("actor_chain");
2874    }
2875    if let Some(anchor) = state.workspace_anchor.as_ref() {
2876        metadata.insert(
2877            crate::value::intern_key(WORKSPACE_ANCHOR_METADATA_KEY),
2878            anchor.to_vm_value(),
2879        );
2880    } else {
2881        metadata.remove(WORKSPACE_ANCHOR_METADATA_KEY);
2882    }
2883    if let Some(scratchpad) = state.scratchpad.as_ref() {
2884        metadata.insert(
2885            crate::value::intern_key("agent_scratchpad"),
2886            scratchpad.clone(),
2887        );
2888        metadata.insert(
2889            crate::value::intern_key("agent_scratchpad_version"),
2890            VmValue::Int(state.scratchpad_version as i64),
2891        );
2892    } else {
2893        metadata.remove("agent_scratchpad");
2894        metadata.remove("agent_scratchpad_version");
2895    }
2896    if let Some(last_action) = state.last_transcript_budget_action.as_ref() {
2897        let usage = transcript_usage(
2898            &VmValue::dict(next.clone()),
2899            state.transcript_budget_policy.max_approx_bytes.is_some(),
2900        );
2901        metadata.insert(
2902            crate::value::intern_key("transcript_budget"),
2903            crate::stdlib::json_to_vm_value(&serde_json::json!({
2904                "policy": transcript_budget_policy_json(&state.transcript_budget_policy.normalized()),
2905                "usage": transcript_budget_usage_json(&usage),
2906                "last_action": last_action,
2907            })),
2908        );
2909    }
2910    if !metadata.is_empty() {
2911        next.insert(
2912            crate::value::intern_key("metadata"),
2913            VmValue::dict(metadata),
2914        );
2915    }
2916    VmValue::dict(next)
2917}
2918
2919/// Materialize the externally-visible view of a session: the transcript
2920/// plus session-level metadata (lineage, pinned model, scratchpad, live
2921/// clients, checkpoint counts, ...).
2922///
2923/// `state.subscribers` (closure callbacks registered for agent-loop
2924/// events) are intentionally omitted: they are ephemeral by design,
2925/// owned by the current-thread agent-loop worker, and are re-registered
2926/// by the host when a persisted session is reopened. Serializing them
2927/// would at best persist a useless display-string — see
2928/// `crate::llm::vm_value_to_json_strict` for the seams that do guard
2929/// against that class of silent corruption.
2930fn session_snapshot(state: &SessionState) -> VmValue {
2931    let transcript = transcript_with_session_metadata(state.transcript.clone(), state);
2932    let Some(dict) = transcript.as_dict() else {
2933        return state.transcript.clone();
2934    };
2935    let mut next = dict.clone();
2936    let length = next
2937        .get("messages")
2938        .and_then(|value| match value {
2939            VmValue::List(list) => Some(list.len() as i64),
2940            _ => None,
2941        })
2942        .unwrap_or(0);
2943    next.insert(crate::value::intern_key("length"), VmValue::Int(length));
2944    next.put_str("created_at", state.created_at.clone());
2945    next.insert(
2946        crate::value::intern_key("parent_id"),
2947        state
2948            .parent_id
2949            .as_ref()
2950            .map(|id| VmValue::String(arcstr::ArcStr::from(id.clone())))
2951            .unwrap_or(VmValue::Nil),
2952    );
2953    next.insert(
2954        crate::value::intern_key("child_ids"),
2955        VmValue::List(std::sync::Arc::new(
2956            state
2957                .child_ids
2958                .iter()
2959                .cloned()
2960                .map(|id| VmValue::String(arcstr::ArcStr::from(id)))
2961                .collect(),
2962        )),
2963    );
2964    next.insert(
2965        crate::value::intern_key("branched_at_event_index"),
2966        state
2967            .branched_at_event_index
2968            .map(|index| VmValue::Int(index as i64))
2969            .unwrap_or(VmValue::Nil),
2970    );
2971    next.insert(
2972        crate::value::intern_key("system_prompt"),
2973        state
2974            .system_prompt
2975            .as_ref()
2976            .map(|prompt| VmValue::String(arcstr::ArcStr::from(prompt.clone())))
2977            .unwrap_or(VmValue::Nil),
2978    );
2979    next.insert(
2980        crate::value::intern_key("tool_format"),
2981        state
2982            .tool_format
2983            .as_ref()
2984            .map(|format| VmValue::String(arcstr::ArcStr::from(format.clone())))
2985            .unwrap_or(VmValue::Nil),
2986    );
2987    next.insert(
2988        crate::value::intern_key("pinned_model"),
2989        state
2990            .pinned_model
2991            .as_ref()
2992            .map(|model| VmValue::String(arcstr::ArcStr::from(model.clone())))
2993            .unwrap_or(VmValue::Nil),
2994    );
2995    next.insert(
2996        crate::value::intern_key("pinned_reasoning_policy"),
2997        state
2998            .pinned_reasoning_policy
2999            .as_ref()
3000            .map(|policy| VmValue::String(arcstr::ArcStr::from(policy.clone())))
3001            .unwrap_or(VmValue::Nil),
3002    );
3003    next.insert(
3004        crate::value::intern_key("actor_chain"),
3005        state
3006            .actor_chain
3007            .as_ref()
3008            .map(ActorChain::to_vm_value)
3009            .unwrap_or(VmValue::Nil),
3010    );
3011    next.insert(
3012        crate::value::intern_key("scratchpad"),
3013        state.scratchpad.clone().unwrap_or(VmValue::Nil),
3014    );
3015    next.insert(
3016        crate::value::intern_key("scratchpad_version"),
3017        VmValue::Int(state.scratchpad_version as i64),
3018    );
3019    next.insert(
3020        crate::value::intern_key("workspace_anchor"),
3021        state
3022            .workspace_anchor
3023            .as_ref()
3024            .map(WorkspaceAnchor::to_vm_value)
3025            .unwrap_or(VmValue::Nil),
3026    );
3027    next.insert(
3028        crate::value::intern_key("workspace_policy"),
3029        state.workspace_policy.to_vm_value(),
3030    );
3031    next.insert(
3032        crate::value::intern_key("live_clients"),
3033        crate::stdlib::json_to_vm_value(&serde_json::Value::Array(
3034            state.live_clients.values().map(live_client_json).collect(),
3035        )),
3036    );
3037    next.insert(
3038        crate::value::intern_key("live_controller_id"),
3039        state
3040            .live_controller_id
3041            .as_ref()
3042            .map(|id| VmValue::String(arcstr::ArcStr::from(id.clone())))
3043            .unwrap_or(VmValue::Nil),
3044    );
3045    next.insert(
3046        crate::value::intern_key("completed_turn_checkpoint_count"),
3047        VmValue::Int(state.completed_turn_checkpoints.len() as i64),
3048    );
3049    next.insert(
3050        crate::value::intern_key("redo_checkpoint_count"),
3051        VmValue::Int(state.redo_stack.len() as i64),
3052    );
3053    VmValue::dict(next)
3054}
3055
3056fn update_lineage(
3057    map: &mut HashMap<String, SessionState>,
3058    parent_id: &str,
3059    child_id: &str,
3060    branched_at_event_index: Option<usize>,
3061) {
3062    let old_parent_id = map.get(child_id).and_then(|child| child.parent_id.clone());
3063    if let Some(old_parent_id) = old_parent_id.filter(|old_parent_id| old_parent_id != parent_id) {
3064        if let Some(old_parent) = map.get_mut(&old_parent_id) {
3065            old_parent.child_ids.retain(|id| id != child_id);
3066            old_parent.touch();
3067        }
3068    }
3069    if let Some(parent) = map.get_mut(parent_id) {
3070        parent.touch();
3071        if !parent.child_ids.iter().any(|id| id == child_id) {
3072            parent.child_ids.push(child_id.to_string());
3073        }
3074    }
3075    if let Some(child) = map.get_mut(child_id) {
3076        child.touch();
3077        child.parent_id = Some(parent_id.to_string());
3078        child.branched_at_event_index = branched_at_event_index;
3079        child.transcript = clone_transcript_with_parent(&child.transcript, parent_id);
3080    }
3081}
3082
3083fn branch_event_index(transcript: &VmValue, keep_first: usize) -> usize {
3084    if keep_first == 0 {
3085        return 0;
3086    }
3087    let Some(dict) = transcript.as_dict() else {
3088        return keep_first;
3089    };
3090    let Some(VmValue::List(events)) = dict.get("events") else {
3091        return keep_first;
3092    };
3093    event_prefix_len_for_messages(events, keep_first)
3094}
3095
3096fn event_kind(event: &VmValue) -> Option<String> {
3097    event
3098        .as_dict()
3099        .and_then(|dict| dict.get("kind"))
3100        .map(VmValue::display)
3101}
3102
3103fn event_id(event: &VmValue) -> Option<String> {
3104    event
3105        .as_dict()
3106        .and_then(|dict| dict.get("id"))
3107        .map(VmValue::display)
3108}
3109
3110fn is_turn_event(event: &VmValue) -> bool {
3111    matches!(
3112        event_kind(event).as_deref(),
3113        Some("message" | "tool_result")
3114    )
3115}
3116
3117fn event_prefix_len_for_messages(events: &[VmValue], keep_first: usize) -> usize {
3118    if keep_first == 0 {
3119        return 0;
3120    }
3121    let mut retained_messages = 0usize;
3122    for (index, event) in events.iter().enumerate() {
3123        if is_turn_event(event) {
3124            retained_messages += 1;
3125            if retained_messages == keep_first {
3126                return index + 1;
3127            }
3128        }
3129    }
3130    events.len()
3131}
3132
3133fn turn_event_id_for_count(events: &[VmValue], keep_first: usize) -> Option<String> {
3134    if keep_first == 0 {
3135        return None;
3136    }
3137    let mut retained_messages = 0usize;
3138    for event in events {
3139        if is_turn_event(event) {
3140            retained_messages += 1;
3141            if retained_messages == keep_first {
3142                return event_id(event);
3143            }
3144        }
3145    }
3146    None
3147}
3148
3149#[cfg(test)]
3150#[path = "agent_sessions_tests.rs"]
3151mod tests;