Skip to main content

harn_vm/
agent_sessions.rs

1//! First-class session storage.
2//!
3//! A session owns three things:
4//!
5//! 1. A transcript dict (messages, events, summary, metadata, …).
6//! 2. Closure subscribers that fire on agent-loop events for this session.
7//! 3. Its own lifecycle (open, reset, fork, trim, compact, close).
8//!
9//! Storage is thread-local because session lifecycle and subscriber dispatch
10//! are owned by the current-thread agent-loop worker. The subscribers register,
11//! fire, and unregister on that same thread, keeping ordering deterministic.
12//!
13//! Lifecycle is explicit. Builtins (`agent_session_open`,
14//! `_reset`, `_fork`, `_fork_at`, `_close`, `_trim`, `_compact`,
15//! `_inject`, `_exists`, `_length`, `_snapshot`, `_ancestry`) drive
16//! the store directly — there is no "policy" config dict that
17//! performs lifecycle as a side effect.
18
19use crate::value::VmDictExt;
20use std::cell::{Cell, RefCell};
21use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
22use std::future::Future;
23use std::path::{Path, PathBuf};
24use std::sync::{Mutex, OnceLock};
25use std::time::Instant;
26
27use crate::actor_chain::ActorChain;
28use crate::runtime_limits::RuntimeLimits;
29use crate::value::VmValue;
30use crate::workspace_anchor::{
31    MountMode, MountedRoot, WorkspaceAnchor, WorkspacePolicy, WORKSPACE_ANCHOR_METADATA_KEY,
32};
33
/// Event-kind label for live session client records.
/// NOTE(review): the consumer is outside this view — presumably tags
/// transcript events emitted on client attach/detach; confirm.
const LIVE_CLIENT_EVENT_KIND: &str = "live_session_client";
/// Event-kind label for live session permission routing records.
/// NOTE(review): the consumer is outside this view; confirm usage.
const LIVE_CLIENT_PERMISSION_EVENT_KIND: &str = "live_session_permission_route";

/// Default cap on concurrent sessions per VM thread. Beyond this the
/// least-recently-accessed session is evicted on the next `open`.
pub const DEFAULT_SESSION_CAP: usize = RuntimeLimits::DEFAULT.max_agent_sessions;

/// Default cap on retained prompt-visible messages per session. The
/// limit is intentionally high enough for normal long-running agents
/// while still bounding accidental unbounded growth.
pub const DEFAULT_TRANSCRIPT_MESSAGE_CAP: usize = 4096;

/// Default cap on retained transcript audit events per session. Events
/// include message-derived entries plus orchestration lifecycle records.
pub const DEFAULT_TRANSCRIPT_EVENT_CAP: usize = 32768;
/// Upper bound, in bytes, on the JSON-serialized size of a session
/// scratchpad. `validate_scratchpad_value` rejects anything larger.
pub const MAX_SCRATCHPAD_BYTES: usize = 16 * 1024;
/// Diagnostic code string, compiled only in debug builds.
/// NOTE(review): the emitting site is outside this view; confirm usage.
#[cfg(debug_assertions)]
const CACHE_STABLE_SYSTEM_PROMPT_DIAGNOSTIC: &str = "HARN-CACHE-001";
52
/// What to do when a transcript no longer fits its budget.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum TranscriptBudgetRecovery {
    /// Refuse the update.
    Reject,
    /// Trim the transcript, keeping the last `keep_last` entries.
    Trim { keep_last: usize },
    /// Compact the transcript, keeping the last `keep_last` entries.
    Compact { keep_last: usize },
}

/// Size limits for one session transcript plus the recovery strategy to
/// apply when a limit is exceeded.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct SessionTranscriptBudgetPolicy {
    /// Maximum retained messages; constructors clamp this to at least 1.
    pub max_messages: usize,
    /// Maximum retained events; constructors clamp this to at least 1.
    pub max_events: usize,
    /// Optional approximate byte ceiling; `None` means no byte limit.
    pub max_approx_bytes: Option<usize>,
    /// Strategy applied when a limit is exceeded.
    pub recovery: TranscriptBudgetRecovery,
}

impl SessionTranscriptBudgetPolicy {
    /// Policy that rejects over-budget updates. Both caps are raised to 1
    /// if given as 0, and no byte ceiling is set.
    pub fn reject(max_messages: usize, max_events: usize) -> Self {
        let at_least_one = |limit: usize| std::cmp::max(limit, 1);
        Self {
            max_messages: at_least_one(max_messages),
            max_events: at_least_one(max_events),
            max_approx_bytes: None,
            recovery: TranscriptBudgetRecovery::Reject,
        }
    }

    /// Policy that trims down to the last `keep_last` entries. Caps are
    /// clamped exactly as in [`Self::reject`]; `keep_last` is stored as-is.
    pub fn trim(max_messages: usize, max_events: usize, keep_last: usize) -> Self {
        Self {
            recovery: TranscriptBudgetRecovery::Trim { keep_last },
            ..Self::reject(max_messages, max_events)
        }
    }

    /// Policy that compacts down to the last `keep_last` entries. Caps are
    /// clamped exactly as in [`Self::reject`]; `keep_last` is stored as-is.
    pub fn compact(max_messages: usize, max_events: usize, keep_last: usize) -> Self {
        Self {
            recovery: TranscriptBudgetRecovery::Compact { keep_last },
            ..Self::reject(max_messages, max_events)
        }
    }

    /// Builder-style setter for the byte ceiling. `Some(0)` becomes
    /// `Some(1)`; `None` removes the ceiling.
    pub fn with_max_approx_bytes(self, max_approx_bytes: Option<usize>) -> Self {
        Self {
            max_approx_bytes: max_approx_bytes.map(|limit| limit.max(1)),
            ..self
        }
    }

    /// Copy of this policy with every limit clamped to at least 1. Needed
    /// because the fields are public and can be set without a constructor.
    fn normalized(&self) -> Self {
        let clamped = Self {
            recovery: self.recovery.clone(),
            ..Self::reject(self.max_messages, self.max_events)
        };
        clamped.with_max_approx_bytes(self.max_approx_bytes)
    }
}
110
111impl Default for SessionTranscriptBudgetPolicy {
112    fn default() -> Self {
113        Self::reject(DEFAULT_TRANSCRIPT_MESSAGE_CAP, DEFAULT_TRANSCRIPT_EVENT_CAP)
114    }
115}
116
/// All state held for one agent session in the per-thread store.
pub struct SessionState {
    /// Session identifier; also the key this state is stored under.
    pub id: String,
    /// Transcript value. `length` reads its `"messages"` list when the
    /// value is a dict.
    pub transcript: VmValue,
    /// Subscriber values registered on this session.
    /// NOTE(review): presumably closures fired on agent-loop events, per
    /// the module docs — dispatch is outside this view.
    pub subscribers: Vec<VmValue>,
    /// Creation timestamp string, taken from `now_rfc3339()` at construction.
    pub created_at: String,
    /// Refreshed by `touch`; the session with the smallest value is the
    /// eviction victim when the store is at capacity.
    pub last_accessed: Instant,
    /// Id of the parent session, if any.
    pub parent_id: Option<String>,
    /// Ids of child sessions.
    pub child_ids: Vec<String>,
    /// Event index at which this session branched, if recorded.
    /// NOTE(review): populated by lineage code outside this view.
    pub branched_at_event_index: Option<usize>,
    /// Actor chain for this session. New sessions take the requested
    /// chain, or otherwise the current session's chain.
    pub actor_chain: Option<ActorChain>,
    /// Names of skills that were active at the end of the most recent
    /// `agent_loop` run on this session. Empty when no skills were
    /// matched, when the skill system wasn't used, or when the
    /// deactivation phase cleared them. Re-entering the session
    /// restores these as the initial active set before matching runs.
    pub active_skills: Vec<String>,
    /// Tool-calling protocol claimed by the first agent loop that uses
    /// this session. A transcript is only replayable under the same
    /// contract that produced its prompt/history.
    pub tool_format: Option<String>,
    /// Stable session-level system prompt material. This is transcript
    /// metadata, not a replay message: providers receive it through
    /// their system/developer instruction channel on each call.
    pub system_prompt: Option<String>,
    /// Session-pinned model selector. When set, `llm_call` invocations
    /// that do not pass an explicit `model:` option resolve to this
    /// selector instead of `HARN_LLM_MODEL` / provider defaults. Mid-
    /// session swap is exposed over ACP via `session/set_config_option`
    /// (configId="model").
    pub pinned_model: Option<String>,
    /// Session-pinned high-level reasoning policy. When set, `llm_call`
    /// invocations that do not pass explicit `thinking` or
    /// `reasoning_effort` options resolve this provider-aware policy into
    /// the route's native thinking shape. Exposed over ACP as
    /// `session/set_config_option(configId="thought_level")`.
    pub pinned_reasoning_policy: Option<String>,
    /// Session-local workspace defaults. Persona and host policy layers
    /// can update this without rewriting the current anchor.
    pub workspace_policy: WorkspacePolicy,
    /// Typed workspace anchor for the session. Primary path plus any
    /// additional mounted roots; consumed by permission matchers, the
    /// bundle exporter, and host-side cross-project handoff flows
    /// (epic #2208). `None` until a host opens the session with one or
    /// the ACP `reanchor` / `add_root` primitives populate it.
    pub workspace_anchor: Option<WorkspaceAnchor>,
    /// Small session-local working memory rendered into agent prompts by the
    /// Harn stdlib agent loop. This is live state, not a replayed message.
    pub scratchpad: Option<VmValue>,
    /// Incremented (saturating) on every successful scratchpad set or clear.
    pub scratchpad_version: u64,
    /// Budget policy for this session's transcript. Initialized from the
    /// thread's default policy at construction.
    pub transcript_budget_policy: SessionTranscriptBudgetPolicy,
    /// Record of the most recent budget action, if any.
    /// NOTE(review): written by budget code outside this view.
    pub last_transcript_budget_action: Option<serde_json::Value>,
    /// Attached live clients.
    /// NOTE(review): presumably keyed by client id; confirm against
    /// `attach_live_client`.
    pub live_clients: BTreeMap<String, LiveSessionClient>,
    /// Id of the client currently holding the controller role, if any.
    pub live_controller_id: Option<String>,
    /// Checkpoints for completed turns.
    /// NOTE(review): maintained by checkpoint code outside this view.
    pub completed_turn_checkpoints: Vec<SessionTurnCheckpoint>,
    /// Entries available for redo. Cleared by `replace_transcript` when
    /// the new transcript differs from the current one.
    pub redo_stack: Vec<SessionRedoEntry>,
}
173
174impl SessionState {
175    fn new(id: String) -> Self {
176        let now = Instant::now();
177        let transcript = empty_transcript(&id);
178        Self {
179            id,
180            transcript,
181            subscribers: Vec::new(),
182            created_at: crate::orchestration::now_rfc3339(),
183            last_accessed: now,
184            parent_id: None,
185            child_ids: Vec::new(),
186            branched_at_event_index: None,
187            actor_chain: None,
188            active_skills: Vec::new(),
189            tool_format: None,
190            system_prompt: None,
191            pinned_model: None,
192            pinned_reasoning_policy: None,
193            workspace_policy: WorkspacePolicy::default(),
194            workspace_anchor: None,
195            scratchpad: None,
196            scratchpad_version: 0,
197            transcript_budget_policy: default_transcript_budget_policy(),
198            last_transcript_budget_action: None,
199            live_clients: BTreeMap::new(),
200            live_controller_id: None,
201            completed_turn_checkpoints: Vec::new(),
202            redo_stack: Vec::new(),
203        }
204    }
205
206    fn touch(&mut self) {
207        self.last_accessed = Instant::now();
208    }
209
210    fn replace_transcript(&mut self, transcript: VmValue) {
211        if !crate::values_equal(&self.transcript, &transcript) {
212            self.redo_stack.clear();
213        }
214        self.transcript = transcript;
215        self.touch();
216    }
217}
218
/// Role a live client holds on a session.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum LiveClientMode {
    Observer,
    Controller,
}

impl LiveClientMode {
    /// Lower-case label for this mode: `"observer"` or `"controller"`.
    fn as_str(&self) -> &'static str {
        match *self {
            LiveClientMode::Controller => "controller",
            LiveClientMode::Observer => "observer",
        }
    }
}
233
/// One client attached to a live session, as stored in
/// `SessionState::live_clients`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct LiveSessionClient {
    /// Identifier of the client.
    pub client_id: String,
    /// Role the client holds (observer or controller).
    pub mode: LiveClientMode,
    /// Attach timestamp string.
    /// NOTE(review): presumably RFC 3339; confirm against `attach_live_client`.
    pub attached_at: String,
    /// Last-seen timestamp string.
    /// NOTE(review): presumably RFC 3339; confirm.
    pub last_seen_at: String,
    /// Prompt-injection flag for this client.
    /// NOTE(review): exact semantics are enforced outside this view.
    pub prompt_injection: bool,
    /// Permission-routing flag for this client.
    /// NOTE(review): exact semantics are enforced outside this view.
    pub permission_routing: bool,
    /// Free-form JSON metadata supplied with the client.
    pub metadata: serde_json::Value,
}
244
/// Request payload accepted by `attach_live_client`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct AttachLiveClient {
    /// Requested client id; validated by `attach_live_client` before use.
    pub client_id: String,
    /// Role requested for the client.
    pub mode: LiveClientMode,
    /// Consulted when a different controller is already attached and this
    /// request asks for the controller role.
    pub takeover: bool,
    /// Requested prompt-injection flag.
    pub prompt_injection: bool,
    /// Requested permission-routing flag.
    pub permission_routing: bool,
    /// Free-form JSON metadata to store with the client.
    pub metadata: serde_json::Value,
}
254
/// Result of a live-client mutation such as `attach_live_client`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct LiveClientChange {
    /// The client the change concerned, when there is one.
    pub client: Option<LiveSessionClient>,
    /// Controller id before the change, if any.
    pub previous_controller_id: Option<String>,
    /// Controller id after the change, if any.
    pub active_controller_id: Option<String>,
    /// Attached clients.
    /// NOTE(review): presumably the full post-change list; confirm.
    pub clients: Vec<LiveSessionClient>,
}
262
/// Lineage view of one session, as returned by `ancestry`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct SessionAncestry {
    /// Direct parent id, if the session has one.
    pub parent_id: Option<String>,
    /// Ids of the session's direct children.
    pub child_ids: Vec<String>,
    /// Last id reached by following parent links upward; the session's own
    /// id when it has no parent.
    pub root_id: String,
}
269
/// Outcome of truncating a session's turns.
/// NOTE(review): the producer is outside this view; field meanings below
/// are read from the names and should be confirmed.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct SessionTruncateResult {
    /// Number of turns kept.
    pub kept_turn_count: usize,
    /// Number of turns removed.
    pub removed_turn_count: usize,
    /// Id of the turn at the new tip, if any.
    pub new_tip_turn_id: Option<String>,
}
276
/// Summary of a turn checkpoint. Carries the identifying and counting
/// fields of `SessionTurnCheckpoint`, without the transcripts.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct SessionCheckpointSummary {
    /// Identifier of the checkpoint.
    pub checkpoint_id: String,
    /// Message count before the turn.
    /// NOTE(review): presumably the prompt-visible message count; confirm.
    pub before_message_count: usize,
    /// Message count after the turn.
    pub after_message_count: usize,
    /// Filesystem snapshot ids associated with the checkpoint.
    pub fs_snapshot_ids: Vec<String>,
}
284
/// Full record of one completed turn, stored in
/// `SessionState::completed_turn_checkpoints`.
#[derive(Clone, Debug)]
pub struct SessionTurnCheckpoint {
    /// Identifier of the checkpoint.
    pub checkpoint_id: String,
    /// Completion timestamp string.
    /// NOTE(review): presumably RFC 3339; confirm against the producer.
    pub completed_at: String,
    /// Transcript value before the turn.
    pub before_transcript: VmValue,
    /// Transcript value after the turn.
    pub after_transcript: VmValue,
    /// Message count before the turn.
    pub before_message_count: usize,
    /// Message count after the turn.
    pub after_message_count: usize,
    /// Filesystem snapshot ids associated with the turn.
    pub fs_snapshot_ids: Vec<String>,
}
295
/// One entry on `SessionState::redo_stack`.
#[derive(Clone, Debug)]
pub struct SessionRedoEntry {
    /// The checkpoint this entry can re-apply.
    pub checkpoint: SessionTurnCheckpoint,
    /// Filesystem snapshot ids used for the redo.
    /// NOTE(review): captured by undo code outside this view; confirm.
    pub redo_fs_snapshot_ids: Vec<String>,
}
301
/// Failure modes of checkpoint operations.
/// NOTE(review): the raising sites are outside this view; variant
/// descriptions are read from the names.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum SessionCheckpointError {
    /// The session id is not in the store.
    UnknownSession,
    /// There is no checkpoint to act on.
    NoCheckpoint,
    /// There is no redo entry to act on.
    NoRedo,
}
308
/// Successful result of a checkpoint operation.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct SessionCheckpointOutcome {
    /// Static status label.
    /// NOTE(review): the set of values is defined by the producer, which is
    /// outside this view.
    pub status: &'static str,
    /// Summary of the checkpoint the operation acted on.
    pub checkpoint: SessionCheckpointSummary,
    /// Filesystem snapshot ids relevant to a later redo.
    pub redo_fs_snapshot_ids: Vec<String>,
}
315
/// Report returned after injecting a reminder into a session.
/// NOTE(review): the producer is outside this view.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ReminderInjectionReport {
    /// Identifier of the injected reminder.
    pub reminder_id: String,
    /// Deduplication count.
    /// NOTE(review): presumably how many earlier reminders were removed as
    /// duplicates; confirm.
    pub deduped_count: usize,
}
321
thread_local! {
    /// Per-thread session store, keyed by session id.
    static SESSIONS: RefCell<HashMap<String, SessionState>> = RefCell::new(HashMap::new());
    /// Per-thread session cap; see `set_session_cap` / `session_cap`.
    static SESSION_CAP: Cell<usize> = const { Cell::new(DEFAULT_SESSION_CAP) };
    /// Budget policy copied into every newly constructed session on this thread.
    static DEFAULT_TRANSCRIPT_BUDGET_POLICY: RefCell<SessionTranscriptBudgetPolicy> =
        RefCell::new(SessionTranscriptBudgetPolicy::default());
    /// Stack of entered session ids; the top entry is the current session.
    static CURRENT_SESSION_STACK: RefCell<Vec<String>> = const { RefCell::new(Vec::new()) };
    /// Stack of entered tool-call ids. `current_tool_call_id` falls back to
    /// its top entry when no task-scoped id is set.
    static CURRENT_TOOL_CALL_STACK: RefCell<Vec<String>> = const { RefCell::new(Vec::new()) };
}
330
tokio::task_local! {
    // Tool-call id scoped to a single async task by `scope_current_tool_call`.
    // `current_tool_call_id` checks this before the thread-local stack.
    static CURRENT_TOOL_CALL_TASK: String;
}
334
/// Per-session set of filesystem paths a session has mutated (written or
/// deleted) via the deterministic hostlib write surface. Keyed by session id
/// and process-global (not thread-local like [`SESSIONS`]) so a background
/// fan-out child's writes are attributed correctly regardless of which runtime
/// thread serviced them. Fed by the single hostlib write chokepoint
/// (`harn-hostlib`'s `fs_snapshot::auto_capture_for_write`), which is reached
/// only AFTER a write is policy-approved and about to touch disk — so this
/// reflects ACTUAL committed mutations, not denied/aborted attempts. The
/// authoritative source for a sub-agent's `files_written` receipt field.
static SESSION_CHANGED_PATHS: OnceLock<Mutex<BTreeMap<String, BTreeSet<String>>>> = OnceLock::new();

/// Lazily initialize and return the process-global changed-path map.
fn session_changed_paths_store() -> &'static Mutex<BTreeMap<String, BTreeSet<String>>> {
    SESSION_CHANGED_PATHS.get_or_init(|| Mutex::new(BTreeMap::new()))
}

/// Lock the changed-path map, recovering the guard if the mutex is poisoned.
///
/// Every operation on the map is a single insert or remove, so a panic on
/// another thread cannot leave it half-updated. Recovering keeps change
/// tracking alive; otherwise one panic would make every later record a
/// silent no-op and every later read return empty.
fn lock_session_changed_paths(
) -> std::sync::MutexGuard<'static, BTreeMap<String, BTreeSet<String>>> {
    session_changed_paths_store()
        .lock()
        .unwrap_or_else(std::sync::PoisonError::into_inner)
}

/// Record that `path` was mutated (written/deleted) by `session_id`. Called from
/// the hostlib write chokepoint; a no-op when either argument is empty.
pub fn record_session_changed_path(session_id: &str, path: &str) {
    if session_id.is_empty() || path.is_empty() {
        return;
    }
    lock_session_changed_paths()
        .entry(session_id.to_string())
        .or_default()
        .insert(path.to_string());
}

/// The sorted set of paths `session_id` has mutated so far (empty when none).
/// Non-draining: safe to call for introspection without disturbing the record.
pub fn session_changed_paths(session_id: &str) -> Vec<String> {
    lock_session_changed_paths()
        .get(session_id)
        .map(|set| set.iter().cloned().collect())
        .unwrap_or_default()
}

/// Read AND remove a session's mutated-path set. Used at sub-agent teardown so
/// the receipt captures the child's writes exactly once and the global map does
/// not grow unbounded across a long-lived daemon's many fan-outs.
pub fn take_session_changed_paths(session_id: &str) -> Vec<String> {
    lock_session_changed_paths()
        .remove(session_id)
        .map(|set| set.into_iter().collect())
        .unwrap_or_default()
}

/// Drop a session's recorded mutated paths (explicit teardown / test reset).
pub fn clear_session_changed_paths(session_id: &str) {
    lock_session_changed_paths().remove(session_id);
}
396
397pub struct CurrentSessionGuard {
398    active: bool,
399}
400
401impl Drop for CurrentSessionGuard {
402    fn drop(&mut self) {
403        if self.active {
404            pop_current_session();
405        }
406    }
407}
408
409/// RAII guard that scopes the active tool-call id for the running thread.
410///
411/// Set on entry to a tool dispatch and dropped on exit, so any hostlib
412/// builtin invoked under it (e.g. `tools/write_file`) can resolve the
413/// owning tool call without threading the id through every parameter.
414pub struct CurrentToolCallGuard {
415    active: bool,
416}
417
418impl Drop for CurrentToolCallGuard {
419    fn drop(&mut self) {
420        if self.active {
421            pop_current_tool_call();
422        }
423    }
424}
425
426/// Set the per-thread session cap. Primarily for tests; production VMs
427/// inherit the default.
428pub fn set_session_cap(cap: usize) {
429    SESSION_CAP.with(|c| c.set(cap.max(1)));
430}
431
432pub fn session_cap() -> usize {
433    SESSION_CAP.with(|c| c.get())
434}
435
436pub fn set_default_transcript_budget_policy(policy: SessionTranscriptBudgetPolicy) {
437    DEFAULT_TRANSCRIPT_BUDGET_POLICY.with(|cell| {
438        *cell.borrow_mut() = policy.normalized();
439    });
440}
441
442pub fn reset_default_transcript_budget_policy() {
443    set_default_transcript_budget_policy(SessionTranscriptBudgetPolicy::default());
444}
445
446pub fn default_transcript_budget_policy() -> SessionTranscriptBudgetPolicy {
447    DEFAULT_TRANSCRIPT_BUDGET_POLICY.with(|cell| cell.borrow().clone())
448}
449
450pub fn transcript_budget_policy(id: &str) -> Option<SessionTranscriptBudgetPolicy> {
451    SESSIONS.with(|s| {
452        s.borrow()
453            .get(id)
454            .map(|state| state.transcript_budget_policy.clone())
455    })
456}
457
458pub fn set_transcript_budget_policy(
459    id: &str,
460    policy: SessionTranscriptBudgetPolicy,
461) -> Result<(), String> {
462    SESSIONS.with(|s| {
463        let mut map = s.borrow_mut();
464        let Some(state) = map.get_mut(id) else {
465            return Err(format!("agent session '{id}' does not exist"));
466        };
467        let previous = state.transcript_budget_policy.clone();
468        let previous_action = state.last_transcript_budget_action.clone();
469        state.transcript_budget_policy = policy.normalized();
470        let candidate = state.transcript.clone();
471        if let Err(error) = apply_transcript_with_budget(state, candidate, "policy_update") {
472            state.transcript_budget_policy = previous;
473            state.last_transcript_budget_action = previous_action;
474            return Err(error);
475        }
476        Ok(())
477    })
478}
479
480/// Clear the session store. Wired into `reset_llm_state` for test isolation.
481pub fn reset_session_store() {
482    SESSIONS.with(|s| s.borrow_mut().clear());
483    CURRENT_SESSION_STACK.with(|stack| stack.borrow_mut().clear());
484    CURRENT_TOOL_CALL_STACK.with(|stack| stack.borrow_mut().clear());
485    reset_default_transcript_budget_policy();
486}
487
488pub(crate) fn push_current_session(id: String) {
489    if id.is_empty() {
490        return;
491    }
492    CURRENT_SESSION_STACK.with(|stack| stack.borrow_mut().push(id));
493}
494
495pub(crate) fn swap_current_session_stack(replacement: Vec<String>) -> Vec<String> {
496    CURRENT_SESSION_STACK.with(|stack| std::mem::replace(&mut *stack.borrow_mut(), replacement))
497}
498
499pub(crate) fn pop_current_session() {
500    CURRENT_SESSION_STACK.with(|stack| {
501        let _ = stack.borrow_mut().pop();
502    });
503}
504
505pub fn current_session_id() -> Option<String> {
506    CURRENT_SESSION_STACK.with(|stack| stack.borrow().last().cloned())
507}
508
509pub fn current_actor_chain() -> Option<ActorChain> {
510    current_session_id().as_deref().and_then(actor_chain)
511}
512
513pub fn enter_current_session(id: impl Into<String>) -> CurrentSessionGuard {
514    let id = id.into();
515    if id.trim().is_empty() {
516        return CurrentSessionGuard { active: false };
517    }
518    push_current_session(id);
519    CurrentSessionGuard { active: true }
520}
521
522pub fn actor_chain(id: &str) -> Option<ActorChain> {
523    SESSIONS.with(|s| {
524        s.borrow()
525            .get(id)
526            .and_then(|state| state.actor_chain.clone())
527    })
528}
529
530pub fn set_actor_chain(id: &str, actor_chain: Option<ActorChain>) -> Result<bool, String> {
531    SESSIONS.with(|s| {
532        let mut map = s.borrow_mut();
533        let Some(state) = map.get_mut(id) else {
534            return Err(format!("agent session '{id}' does not exist"));
535        };
536        let changed = state.actor_chain != actor_chain;
537        state.actor_chain = actor_chain;
538        state.touch();
539        Ok(changed)
540    })
541}
542
543fn push_current_tool_call(id: String) {
544    if id.is_empty() {
545        return;
546    }
547    CURRENT_TOOL_CALL_STACK.with(|stack| stack.borrow_mut().push(id));
548}
549
550fn pop_current_tool_call() {
551    CURRENT_TOOL_CALL_STACK.with(|stack| {
552        let _ = stack.borrow_mut().pop();
553    });
554}
555
556/// Return the active tool-call id for the current thread, if any.
557///
558/// Hostlib builtins consult this to attribute side-effect snapshots to
559/// the owning ACP `toolCallId` without callers passing it explicitly.
560pub fn current_tool_call_id() -> Option<String> {
561    if let Ok(id) = CURRENT_TOOL_CALL_TASK.try_with(Clone::clone) {
562        if !id.trim().is_empty() {
563            return Some(id);
564        }
565    }
566    CURRENT_TOOL_CALL_STACK.with(|stack| stack.borrow().last().cloned())
567}
568
569/// Scope the active tool-call id to one async task.
570///
571/// Parallel tool dispatch runs sibling calls on the same OS thread, so
572/// thread-local guards alone cannot preserve attribution across `.await`
573/// points. Tokio task-local scoping follows the future instead.
574pub async fn scope_current_tool_call<F, T>(id: impl Into<String>, future: F) -> T
575where
576    F: Future<Output = T>,
577{
578    let id = id.into();
579    if id.trim().is_empty() {
580        future.await
581    } else {
582        CURRENT_TOOL_CALL_TASK.scope(id, future).await
583    }
584}
585
586/// Scope the active tool-call id for the duration of the returned guard.
587pub fn enter_current_tool_call(id: impl Into<String>) -> CurrentToolCallGuard {
588    let id = id.into();
589    if id.trim().is_empty() {
590        return CurrentToolCallGuard { active: false };
591    }
592    push_current_tool_call(id);
593    CurrentToolCallGuard { active: true }
594}
595
596pub fn exists(id: &str) -> bool {
597    SESSIONS.with(|s| s.borrow().contains_key(id))
598}
599
600pub fn length(id: &str) -> Option<usize> {
601    SESSIONS.with(|s| {
602        s.borrow().get(id).map(|state| {
603            state
604                .transcript
605                .as_dict()
606                .and_then(|d| d.get("messages"))
607                .and_then(|v| match v {
608                    VmValue::List(list) => Some(list.len()),
609                    _ => None,
610                })
611                .unwrap_or(0)
612        })
613    })
614}
615
616pub fn scratchpad(id: &str) -> Option<VmValue> {
617    SESSIONS.with(|s| {
618        s.borrow()
619            .get(id)
620            .and_then(|state| state.scratchpad.clone())
621    })
622}
623
624pub fn scratchpad_version(id: &str) -> Option<u64> {
625    SESSIONS.with(|s| s.borrow().get(id).map(|state| state.scratchpad_version))
626}
627
628pub fn set_scratchpad(
629    id: &str,
630    scratchpad: VmValue,
631    source: impl Into<String>,
632    reason: Option<String>,
633    metadata: serde_json::Value,
634) -> Result<u64, String> {
635    validate_scratchpad_value(&scratchpad)?;
636    SESSIONS.with(|s| {
637        let mut map = s.borrow_mut();
638        let Some(state) = map.get_mut(id) else {
639            return Err(format!("agent session '{id}' does not exist"));
640        };
641        let version = state.scratchpad_version.saturating_add(1);
642        let event = scratchpad_transcript_event(
643            "set",
644            version,
645            Some(&scratchpad),
646            source.into(),
647            reason,
648            metadata,
649        );
650        append_event_to_state(state, event, "set_scratchpad")?;
651        state.scratchpad = Some(scratchpad);
652        state.scratchpad_version = version;
653        state.touch();
654        Ok(version)
655    })
656}
657
658pub fn clear_scratchpad(
659    id: &str,
660    source: impl Into<String>,
661    reason: Option<String>,
662    metadata: serde_json::Value,
663) -> Result<u64, String> {
664    SESSIONS.with(|s| {
665        let mut map = s.borrow_mut();
666        let Some(state) = map.get_mut(id) else {
667            return Err(format!("agent session '{id}' does not exist"));
668        };
669        let version = state.scratchpad_version.saturating_add(1);
670        let event =
671            scratchpad_transcript_event("clear", version, None, source.into(), reason, metadata);
672        append_event_to_state(state, event, "clear_scratchpad")?;
673        state.scratchpad = None;
674        state.scratchpad_version = version;
675        state.touch();
676        Ok(version)
677    })
678}
679
680fn validate_scratchpad_value(value: &VmValue) -> Result<(), String> {
681    if !matches!(value, VmValue::Dict(_)) {
682        return Err("agent session scratchpad must be a dict".to_string());
683    }
684    let json = crate::llm::helpers::vm_value_to_json(value);
685    let approx_bytes = serde_json::to_vec(&json)
686        .map(|bytes| bytes.len())
687        .unwrap_or(usize::MAX);
688    if approx_bytes > MAX_SCRATCHPAD_BYTES {
689        return Err(format!(
690            "agent session scratchpad is {approx_bytes} bytes; max is {MAX_SCRATCHPAD_BYTES}"
691        ));
692    }
693    Ok(())
694}
695
696fn scratchpad_transcript_event(
697    action: &str,
698    version: u64,
699    scratchpad: Option<&VmValue>,
700    source: String,
701    reason: Option<String>,
702    metadata: serde_json::Value,
703) -> VmValue {
704    let scratchpad_json = scratchpad.map(crate::llm::helpers::vm_value_to_json);
705    let approx_bytes = scratchpad_json
706        .as_ref()
707        .and_then(|value| serde_json::to_vec(value).ok().map(|bytes| bytes.len()))
708        .unwrap_or(0);
709    let event_metadata = serde_json::json!({
710        "action": action,
711        "version": version,
712        "source": normalize_scratchpad_source(source),
713        "reason": reason.unwrap_or_default(),
714        "approx_bytes": approx_bytes,
715        "counts": scratchpad_json
716            .as_ref()
717            .map(scratchpad_counts_json)
718            .unwrap_or_else(|| serde_json::json!({})),
719        "metadata": metadata,
720    });
721    let content = format!("Agent scratchpad {action}");
722    crate::llm::helpers::transcript_event(
723        "agent_scratchpad",
724        "system",
725        "internal",
726        &content,
727        Some(event_metadata),
728    )
729}
730
/// Trim `source`, substituting `"harn.agent_scratchpad"` when nothing is
/// left after trimming.
fn normalize_scratchpad_source(source: String) -> String {
    match source.trim() {
        "" => "harn.agent_scratchpad".to_string(),
        label => label.to_string(),
    }
}
739
740fn scratchpad_counts_json(value: &serde_json::Value) -> serde_json::Value {
741    serde_json::json!({
742        "goals": scratchpad_array_len(value, "goals"),
743        "open_items": scratchpad_array_len(value, "open_items"),
744        "facts": scratchpad_array_len(value, "facts"),
745        "refs": scratchpad_array_len(value, "refs"),
746    })
747}
748
749fn scratchpad_array_len(value: &serde_json::Value, key: &str) -> usize {
750    value
751        .get(key)
752        .and_then(serde_json::Value::as_array)
753        .map(Vec::len)
754        .unwrap_or(0)
755}
756
757pub fn snapshot(id: &str) -> Option<VmValue> {
758    SESSIONS.with(|s| s.borrow().get(id).map(session_snapshot))
759}
760
761/// Session-only fields stay on `agent_session_snapshot`.
762pub fn transcript(id: &str) -> Option<VmValue> {
763    SESSIONS.with(|s| {
764        s.borrow()
765            .get(id)
766            .map(|state| transcript_with_session_metadata(state.transcript.clone(), state))
767    })
768}
769
770/// Open a session, or create it if missing. Returns the resolved id.
771///
772/// Newly-created sessions auto-register an event-log-backed sink when a
773/// generalized [`crate::event_log::EventLog`] has been installed for the
774/// current VM thread. For legacy env-driven workflows that still point
775/// `HARN_EVENT_LOG_DIR` at a directory, we preserve the older JSONL sink
776/// as a compatibility fallback. Re-opening an existing session does not
777/// re-register — sinks are per-session, owned by the first opener.
778pub fn open_or_create(id: Option<String>) -> String {
779    open_or_create_with_actor_chain(id, None)
780}
781
782pub fn open_or_create_with_actor_chain(
783    id: Option<String>,
784    requested_actor_chain: Option<ActorChain>,
785) -> String {
786    let resolved = id.unwrap_or_else(|| uuid::Uuid::now_v7().to_string());
787    let parent_session = current_session_id();
788    let inherited_actor_chain = requested_actor_chain
789        .clone()
790        .or_else(|| parent_session.as_deref().and_then(actor_chain));
791    let mut was_new = false;
792    SESSIONS.with(|s| {
793        let mut map = s.borrow_mut();
794        if let Some(state) = map.get_mut(&resolved) {
795            if let Some(actor_chain) = requested_actor_chain.clone() {
796                state.actor_chain = Some(actor_chain);
797            }
798            state.touch();
799            return;
800        }
801        was_new = true;
802        let cap = SESSION_CAP.with(|c| c.get());
803        if map.len() >= cap {
804            if let Some(victim) = map
805                .iter()
806                .min_by_key(|(_, state)| state.last_accessed)
807                .map(|(id, _)| id.clone())
808            {
809                map.remove(&victim);
810            }
811        }
812        let mut state = SessionState::new(resolved.clone());
813        state.actor_chain = inherited_actor_chain.clone();
814        map.insert(resolved.clone(), state);
815    });
816    if was_new {
817        if let Some(parent) = parent_session.as_deref() {
818            crate::agent_events::mirror_session_sinks(parent, &resolved);
819        }
820        try_register_event_log(&resolved);
821    }
822    resolved
823}
824
825pub fn open_child_session(parent_id: &str, id: Option<String>) -> String {
826    open_child_session_with_actor(parent_id, id, None)
827}
828
829pub fn open_child_session_with_actor(
830    parent_id: &str,
831    id: Option<String>,
832    actor: Option<&str>,
833) -> String {
834    let actor_chain = actor_chain(parent_id).map(|chain| match actor {
835        Some(actor) if !actor.trim().is_empty() => chain.pushed(actor.trim()),
836        _ => chain,
837    });
838    let resolved = open_or_create_with_actor_chain(id, actor_chain);
839    link_child_session(parent_id, &resolved);
840    resolved
841}
842
843pub fn link_child_session(parent_id: &str, child_id: &str) {
844    link_child_session_with_branch(parent_id, child_id, None);
845}
846
847pub fn link_child_session_with_branch(
848    parent_id: &str,
849    child_id: &str,
850    branched_at_event_index: Option<usize>,
851) {
852    if parent_id == child_id {
853        return;
854    }
855    open_or_create(Some(parent_id.to_string()));
856    open_or_create(Some(child_id.to_string()));
857    SESSIONS.with(|s| {
858        let mut map = s.borrow_mut();
859        update_lineage(&mut map, parent_id, child_id, branched_at_event_index);
860    });
861}
862
863pub fn parent_id(id: &str) -> Option<String> {
864    SESSIONS.with(|s| s.borrow().get(id).and_then(|state| state.parent_id.clone()))
865}
866
867pub fn child_ids(id: &str) -> Vec<String> {
868    SESSIONS.with(|s| {
869        s.borrow()
870            .get(id)
871            .map(|state| state.child_ids.clone())
872            .unwrap_or_default()
873    })
874}
875
876pub fn ancestry(id: &str) -> Option<SessionAncestry> {
877    SESSIONS.with(|s| {
878        let map = s.borrow();
879        let state = map.get(id)?;
880        let mut root_id = state.id.clone();
881        let mut cursor = state.parent_id.clone();
882        let mut seen = HashSet::from([state.id.clone()]);
883        while let Some(parent_id) = cursor {
884            if !seen.insert(parent_id.clone()) {
885                break;
886            }
887            root_id = parent_id.clone();
888            cursor = map
889                .get(&parent_id)
890                .and_then(|parent| parent.parent_id.clone());
891        }
892        Some(SessionAncestry {
893            parent_id: state.parent_id.clone(),
894            child_ids: state.child_ids.clone(),
895            root_id,
896        })
897    })
898}
899
900pub fn live_clients(id: &str) -> Option<Vec<LiveSessionClient>> {
901    SESSIONS.with(|s| {
902        s.borrow()
903            .get(id)
904            .map(|state| state.live_clients.values().cloned().collect())
905    })
906}
907
908pub fn attach_live_client(id: &str, request: AttachLiveClient) -> Result<LiveClientChange, String> {
909    SESSIONS.with(|s| {
910        let mut map = s.borrow_mut();
911        let Some(state) = map.get_mut(id) else {
912            return Err(format!("agent session '{id}' does not exist"));
913        };
914        let client_id = validate_live_client_id(request.client_id)?;
915        let now = crate::orchestration::now_rfc3339();
916        let previous_clients = state.live_clients.clone();
917        let previous_controller_id = state.live_controller_id.clone();
918
919        if request.mode == LiveClientMode::Controller {
920            let conflicting_controller = previous_controller_id
921                .as_ref()
922                .filter(|controller_id| *controller_id != &client_id)
923                .filter(|controller_id| state.live_clients.contains_key(*controller_id));
924            if let Some(previous) = conflicting_controller {
925                if !request.takeover {
926                    return Err(format!("live session already has controller '{previous}'"));
927                }
928                if let Some(previous_client) = state.live_clients.get_mut(previous) {
929                    previous_client.mode = LiveClientMode::Observer;
930                    previous_client.prompt_injection = false;
931                    previous_client.permission_routing = false;
932                    previous_client.last_seen_at = now.clone();
933                }
934            }
935            state.live_controller_id = Some(client_id.clone());
936        } else if state.live_controller_id.as_deref() == Some(client_id.as_str()) {
937            state.live_controller_id = None;
938        }
939
940        let attached_at = state
941            .live_clients
942            .get(&client_id)
943            .map(|client| client.attached_at.clone())
944            .unwrap_or_else(|| now.clone());
945        let client = LiveSessionClient {
946            client_id: client_id.clone(),
947            mode: request.mode,
948            attached_at,
949            last_seen_at: now,
950            prompt_injection: request.prompt_injection,
951            permission_routing: request.permission_routing,
952            metadata: request.metadata,
953        };
954        state.live_clients.insert(client_id, client.clone());
955        state.touch();
956        let active_controller_id = state.live_controller_id.clone();
957        append_live_client_event(
958            state,
959            "attached",
960            Some(&client),
961            previous_controller_id.as_deref(),
962            active_controller_id.as_deref(),
963            serde_json::Value::Null,
964        )
965        .inspect_err(|_error| {
966            state.live_clients = previous_clients;
967            state.live_controller_id = previous_controller_id.clone();
968        })?;
969        Ok(live_client_change(
970            Some(client),
971            previous_controller_id,
972            state,
973        ))
974    })
975}
976
977pub fn takeover_live_client(
978    id: &str,
979    client_id: impl Into<String>,
980    metadata: serde_json::Value,
981) -> Result<LiveClientChange, String> {
982    attach_live_client(
983        id,
984        AttachLiveClient {
985            client_id: client_id.into(),
986            mode: LiveClientMode::Controller,
987            takeover: true,
988            prompt_injection: true,
989            permission_routing: true,
990            metadata,
991        },
992    )
993}
994
995pub fn detach_live_client(
996    id: &str,
997    client_id: impl Into<String>,
998    reason: Option<String>,
999    metadata: serde_json::Value,
1000) -> Result<LiveClientChange, String> {
1001    SESSIONS.with(|s| {
1002        let mut map = s.borrow_mut();
1003        let Some(state) = map.get_mut(id) else {
1004            return Err(format!("agent session '{id}' does not exist"));
1005        };
1006        let client_id = validate_live_client_id(client_id.into())?;
1007        let previous_clients = state.live_clients.clone();
1008        let previous_controller_id = state.live_controller_id.clone();
1009        let Some(mut client) = state.live_clients.remove(&client_id) else {
1010            return Err(format!("live client '{client_id}' is not attached"));
1011        };
1012        client.last_seen_at = crate::orchestration::now_rfc3339();
1013        if state.live_controller_id.as_deref() == Some(client_id.as_str()) {
1014            state.live_controller_id = None;
1015        }
1016        state.touch();
1017        let active_controller_id = state.live_controller_id.clone();
1018        append_live_client_event(
1019            state,
1020            "detached",
1021            Some(&client),
1022            previous_controller_id.as_deref(),
1023            active_controller_id.as_deref(),
1024            serde_json::json!({
1025                "reason": reason.unwrap_or_else(|| "client_detached".to_string()),
1026                "metadata": metadata,
1027            }),
1028        )
1029        .inspect_err(|_error| {
1030            state.live_clients = previous_clients;
1031            state.live_controller_id = previous_controller_id.clone();
1032        })?;
1033        Ok(live_client_change(None, previous_controller_id, state))
1034    })
1035}
1036
1037pub fn heartbeat_live_client(
1038    id: &str,
1039    client_id: impl Into<String>,
1040    metadata: serde_json::Value,
1041) -> Result<LiveClientChange, String> {
1042    SESSIONS.with(|s| {
1043        let mut map = s.borrow_mut();
1044        let Some(state) = map.get_mut(id) else {
1045            return Err(format!("agent session '{id}' does not exist"));
1046        };
1047        let client_id = validate_live_client_id(client_id.into())?;
1048        let previous_clients = state.live_clients.clone();
1049        let previous_controller_id = state.live_controller_id.clone();
1050        let Some(client) = state.live_clients.get_mut(&client_id) else {
1051            return Err(format!("live client '{client_id}' is not attached"));
1052        };
1053        client.last_seen_at = crate::orchestration::now_rfc3339();
1054        if !metadata.is_null() {
1055            client.metadata = metadata.clone();
1056        }
1057        let client = client.clone();
1058        state.touch();
1059        let active_controller_id = state.live_controller_id.clone();
1060        append_live_client_event(
1061            state,
1062            "heartbeat",
1063            Some(&client),
1064            previous_controller_id.as_deref(),
1065            active_controller_id.as_deref(),
1066            serde_json::json!({ "metadata": metadata }),
1067        )
1068        .inspect_err(|_error| {
1069            state.live_clients = previous_clients;
1070            state.live_controller_id = previous_controller_id.clone();
1071        })?;
1072        Ok(live_client_change(
1073            Some(client),
1074            previous_controller_id,
1075            state,
1076        ))
1077    })
1078}
1079
1080pub fn inject_prompt_from_live_client(
1081    id: &str,
1082    client_id: impl Into<String>,
1083    content: VmValue,
1084    metadata: serde_json::Value,
1085) -> Result<(), String> {
1086    let client_id = validate_live_client_id(client_id.into())?;
1087    ensure_live_controller(id, &client_id, LiveControllerCapability::PromptInjection)?;
1088    let mut message = BTreeMap::new();
1089    message.put_str("role", "user");
1090    message.insert("content".to_string(), content);
1091    message.insert(
1092        "metadata".to_string(),
1093        crate::stdlib::json_to_vm_value(&serde_json::json!({
1094            "live_session": {
1095                "client_id": client_id,
1096                "mode": "controller",
1097                "source": "live_session_attach",
1098                "metadata": metadata,
1099            }
1100        })),
1101    );
1102    inject_message(id, VmValue::dict(message))
1103}
1104
1105pub fn route_live_permission_request(
1106    id: &str,
1107    client_id: impl Into<String>,
1108    request: serde_json::Value,
1109    metadata: serde_json::Value,
1110) -> Result<serde_json::Value, String> {
1111    let client_id = validate_live_client_id(client_id.into())?;
1112    let client =
1113        ensure_live_controller(id, &client_id, LiveControllerCapability::PermissionRouting)?;
1114    let request_id = request
1115        .get("id")
1116        .or_else(|| request.get("request_id"))
1117        .and_then(serde_json::Value::as_str)
1118        .filter(|id| !id.trim().is_empty())
1119        .unwrap_or("permission_request");
1120    let event_metadata = serde_json::json!({
1121        "action": "permission_routed",
1122        "client": live_client_json(&client),
1123        "request_id": request_id,
1124        "request": request,
1125        "metadata": metadata,
1126    });
1127    let event = crate::llm::helpers::transcript_event(
1128        LIVE_CLIENT_PERMISSION_EVENT_KIND,
1129        "system",
1130        "internal",
1131        "Live session permission request routed",
1132        Some(event_metadata.clone()),
1133    );
1134    append_event(id, event)?;
1135    Ok(event_metadata)
1136}
1137
/// Capability a live controller must hold before an action is allowed;
/// checked by `ensure_live_controller`.
enum LiveControllerCapability {
    /// Checked against the client's `prompt_injection` flag.
    PromptInjection,
    /// Checked against the client's `permission_routing` flag.
    PermissionRouting,
}
1142
1143fn ensure_live_controller(
1144    id: &str,
1145    client_id: &str,
1146    capability: LiveControllerCapability,
1147) -> Result<LiveSessionClient, String> {
1148    SESSIONS.with(|s| {
1149        let map = s.borrow();
1150        let Some(state) = map.get(id) else {
1151            return Err(format!("agent session '{id}' does not exist"));
1152        };
1153        if state.live_controller_id.as_deref() != Some(client_id) {
1154            return Err(format!(
1155                "live client '{client_id}' is not the active controller"
1156            ));
1157        }
1158        let Some(client) = state.live_clients.get(client_id) else {
1159            return Err(format!("live client '{client_id}' is not attached"));
1160        };
1161        match capability {
1162            LiveControllerCapability::PromptInjection if !client.prompt_injection => Err(format!(
1163                "live client '{client_id}' cannot inject prompts for this session"
1164            )),
1165            LiveControllerCapability::PermissionRouting if !client.permission_routing => Err(
1166                format!("live client '{client_id}' cannot route permissions for this session"),
1167            ),
1168            _ => Ok(client.clone()),
1169        }
1170    })
1171}
1172
1173fn append_live_client_event(
1174    state: &mut SessionState,
1175    action: &str,
1176    client: Option<&LiveSessionClient>,
1177    previous_controller_id: Option<&str>,
1178    active_controller_id: Option<&str>,
1179    extra: serde_json::Value,
1180) -> Result<(), String> {
1181    let metadata = serde_json::json!({
1182        "action": action,
1183        "session_id": state.id,
1184        "client": client.map(live_client_json),
1185        "previous_controller_id": previous_controller_id,
1186        "active_controller_id": active_controller_id,
1187        "clients": state
1188            .live_clients
1189            .values()
1190            .map(live_client_json)
1191            .collect::<Vec<_>>(),
1192        "extra": extra,
1193    });
1194    let event = crate::llm::helpers::transcript_event(
1195        LIVE_CLIENT_EVENT_KIND,
1196        "system",
1197        "internal",
1198        "Live session client lifecycle changed",
1199        Some(metadata),
1200    );
1201    append_event_to_state(state, event, "live_client")
1202}
1203
1204fn live_client_change(
1205    client: Option<LiveSessionClient>,
1206    previous_controller_id: Option<String>,
1207    state: &SessionState,
1208) -> LiveClientChange {
1209    LiveClientChange {
1210        client,
1211        previous_controller_id,
1212        active_controller_id: state.live_controller_id.clone(),
1213        clients: state.live_clients.values().cloned().collect(),
1214    }
1215}
1216
/// Normalise a live client id by trimming surrounding whitespace.
///
/// # Errors
///
/// Returns an error when the id is empty or whitespace-only.
fn validate_live_client_id(id: impl Into<String>) -> Result<String, String> {
    let raw: String = id.into();
    match raw.trim() {
        "" => Err("live client id cannot be empty".to_string()),
        trimmed => Ok(trimmed.to_string()),
    }
}
1225
1226pub fn live_client_json(client: &LiveSessionClient) -> serde_json::Value {
1227    serde_json::json!({
1228        "client_id": client.client_id,
1229        "mode": client.mode.as_str(),
1230        "attached_at": client.attached_at,
1231        "last_seen_at": client.last_seen_at,
1232        "prompt_injection": client.prompt_injection,
1233        "permission_routing": client.permission_routing,
1234        "metadata": client.metadata,
1235    })
1236}
1237
1238pub fn live_client_change_json(change: &LiveClientChange) -> serde_json::Value {
1239    serde_json::json!({
1240        "client": change.client.as_ref().map(live_client_json),
1241        "previous_controller_id": change.previous_controller_id,
1242        "active_controller_id": change.active_controller_id,
1243        "clients": change
1244            .clients
1245            .iter()
1246            .map(live_client_json)
1247            .collect::<Vec<_>>(),
1248    })
1249}
1250
1251/// Auto-register a persistent sink for a newly-created session.
1252/// Silent no-op on failure — a broken observability sink must never
1253/// prevent a session from starting.
1254fn try_register_event_log(session_id: &str) {
1255    if let Some(log) = crate::event_log::active_event_log() {
1256        crate::agent_events::register_sink(
1257            session_id,
1258            crate::agent_events::EventLogSink::new(log, session_id),
1259        );
1260        return;
1261    }
1262    let Ok(dir) = std::env::var("HARN_EVENT_LOG_DIR") else {
1263        return;
1264    };
1265    if dir.is_empty() {
1266        return;
1267    }
1268    let path = std::path::PathBuf::from(dir).join(format!("event_log-{session_id}.jsonl"));
1269    if let Ok(sink) = crate::agent_events::JsonlEventSink::open(&path) {
1270        crate::agent_events::register_sink(session_id, sink);
1271    }
1272}
1273
1274pub fn register_event_log_sink(session_id: &str) {
1275    try_register_event_log(session_id);
1276}
1277
1278pub fn close(id: &str) {
1279    SESSIONS.with(|s| {
1280        s.borrow_mut().remove(id);
1281    });
1282    // Cross-thread per-session state must be released too, otherwise
1283    // pending inbox entries can be delivered to a future session that
1284    // happens to reuse the same id.
1285    crate::orchestration::agent_inbox::clear_session(id);
1286    crate::agent_events::clear_session_sinks(id);
1287}
1288
1289pub fn close_with_status(
1290    id: &str,
1291    reason: impl Into<String>,
1292    status: impl Into<String>,
1293    metadata: serde_json::Value,
1294) -> bool {
1295    if !exists(id) {
1296        return false;
1297    }
1298    let reason = reason.into();
1299    let status = status.into();
1300    let event_metadata = serde_json::json!({
1301        "reason": reason,
1302        "status": status,
1303        "metadata": metadata,
1304    });
1305    let transcript_event = crate::llm::helpers::transcript_event(
1306        "agent_session_closed",
1307        "system",
1308        "internal",
1309        "Agent session closed",
1310        Some(event_metadata),
1311    );
1312    let _ = append_event(id, transcript_event);
1313    crate::llm::emit_live_agent_event_sync(&crate::agent_events::AgentEvent::SessionClosed {
1314        session_id: id.to_string(),
1315        reason,
1316        status,
1317        metadata,
1318    });
1319    close(id);
1320    true
1321}
1322
1323pub fn reset_transcript(id: &str) -> bool {
1324    SESSIONS.with(|s| {
1325        let mut map = s.borrow_mut();
1326        let Some(state) = map.get_mut(id) else {
1327            return false;
1328        };
1329        state.transcript = empty_transcript(id);
1330        state.tool_format = None;
1331        state.system_prompt = None;
1332        state.scratchpad = None;
1333        state.scratchpad_version = 0;
1334        state.last_transcript_budget_action = None;
1335        state.completed_turn_checkpoints.clear();
1336        state.redo_stack.clear();
1337        state.touch();
1338        true
1339    })
1340}
1341
/// Copy `src`'s transcript into a new session id. Subscribers are NOT
/// copied — a fork is a conversation branch, not an event fanout.
///
/// Alongside the transcript (re-stamped with the destination id), the fork
/// inherits the source's tool format, system prompt, pinned model and
/// reasoning policy, actor chain, workspace anchor and policy, scratchpad
/// (with version), transcript budget policy and last budget action.
///
/// Touches `src`'s `last_accessed` before evicting, so the fork
/// operation itself can't make `src` look stale and kick it out of
/// the LRU just to make room for the new fork.
///
/// When `dst_id` is `None` a fresh UUIDv7 string is used as the new id.
///
/// Returns the destination id, or `None` when `src_id` does not exist or
/// when `apply_transcript_with_budget` rejects the copied transcript (the
/// destination is closed in that case).
pub fn fork(src_id: &str, dst_id: Option<String>) -> Option<String> {
    // Phase 1: snapshot everything the fork inherits while `src` is borrowed.
    // The closure returns `None` (and so does `fork`) when `src_id` is unknown.
    let (
        src_transcript,
        src_tool_format,
        src_system_prompt,
        src_pinned_model,
        src_pinned_reasoning_policy,
        src_actor_chain,
        src_workspace_anchor,
        src_workspace_policy,
        src_scratchpad,
        src_scratchpad_version,
        src_transcript_budget_policy,
        src_last_transcript_budget_action,
        dst,
    ) = SESSIONS.with(|s| {
        let mut map = s.borrow_mut();
        let src = map.get_mut(src_id)?;
        src.touch();
        let dst = dst_id.unwrap_or_else(|| uuid::Uuid::now_v7().to_string());
        let forked_transcript = clone_transcript_with_id(&src.transcript, &dst);
        Some((
            forked_transcript,
            src.tool_format.clone(),
            src.system_prompt.clone(),
            src.pinned_model.clone(),
            src.pinned_reasoning_policy.clone(),
            src.actor_chain.clone(),
            src.workspace_anchor.clone(),
            src.workspace_policy.clone(),
            src.scratchpad.clone(),
            src.scratchpad_version,
            src.transcript_budget_policy.clone(),
            src.last_transcript_budget_action.clone(),
            dst,
        ))
    })?;
    // Ensure cap is respected when inserting the fork.
    open_or_create(Some(dst.clone()));
    // Phase 2: overwrite the destination with the snapshot and record lineage.
    SESSIONS.with(|s| {
        let mut map = s.borrow_mut();
        if let Some(state) = map.get_mut(&dst) {
            state.transcript = src_transcript;
            state.tool_format = src_tool_format;
            state.system_prompt = src_system_prompt;
            state.pinned_model = src_pinned_model;
            state.pinned_reasoning_policy = src_pinned_reasoning_policy;
            state.actor_chain = src_actor_chain;
            state.workspace_anchor = src_workspace_anchor;
            state.workspace_policy = src_workspace_policy;
            state.scratchpad = src_scratchpad;
            state.scratchpad_version = src_scratchpad_version;
            state.transcript_budget_policy = src_transcript_budget_policy;
            state.last_transcript_budget_action = src_last_transcript_budget_action;
            state.touch();
        }
        update_lineage(&mut map, src_id, &dst, None);
    });
    // Phase 3: re-apply the copied transcript through the budget path so the
    // fork is subject to the same checks as any other transcript update.
    let budget_ok = SESSIONS.with(|s| {
        let mut map = s.borrow_mut();
        let Some(state) = map.get_mut(&dst) else {
            return false;
        };
        let candidate = state.transcript.clone();
        apply_transcript_with_budget(state, candidate, "fork").is_ok()
    });
    if !budget_ok {
        // NOTE(review): lineage was already updated above and `close` only
        // removes `dst` from the store, so `src` may keep a reference to the
        // closed id — confirm against `update_lineage`.
        close(&dst);
        return None;
    }
    // open_or_create evicts BEFORE inserting, so the dst slot is
    // guaranteed once we get here. The existence check is cheap
    // insurance against a future refactor that breaks that invariant.
    if exists(&dst) {
        Some(dst)
    } else {
        None
    }
}
1427
1428/// Fork `src_id` and truncate the destination transcript to the
1429/// first `keep_first` messages (#105 — branch-replay). Pairs with the
1430/// scrubber: the host picks an event index, rebuilds a message count,
1431/// and calls this to spawn a live sibling session that resumes from
1432/// the rebuilt state. Subscribers are not carried over (same as
1433/// `fork`), so sibling events don't double-fan into the parent's
1434/// consumers.
1435///
1436/// Returns the new session id on success, `None` if `src_id` doesn't
1437/// exist.
1438pub fn fork_at(src_id: &str, keep_first: usize, dst_id: Option<String>) -> Option<String> {
1439    let branched_at_event_index = SESSIONS.with(|s| {
1440        let map = s.borrow();
1441        let src = map.get(src_id)?;
1442        Some(branch_event_index(&src.transcript, keep_first))
1443    })?;
1444    let new_id = fork(src_id, dst_id)?;
1445    link_child_session_with_branch(src_id, &new_id, Some(branched_at_event_index));
1446    let _ = truncate(&new_id, keep_first);
1447    Some(new_id)
1448}
1449
1450/// Truncate the session transcript to the first `keep_first`
1451/// messages (opposite of `trim`, which keeps the last N). Returns
1452/// counts and the retained tip event id when the session exists.
1453pub fn truncate(id: &str, keep_first: usize) -> Option<SessionTruncateResult> {
1454    SESSIONS.with(|s| {
1455        let mut map = s.borrow_mut();
1456        let state = map.get_mut(id)?;
1457        let result = truncate_state(state, keep_first)?;
1458        Some(result)
1459    })
1460}
1461
/// Truncate `state`'s transcript to its first `keep_first` messages.
///
/// When messages are actually removed, the retained messages and a matching
/// prefix of events replace the originals, the `summary` entry is dropped,
/// and the result goes through `apply_transcript_with_budget`. When nothing
/// needs removing the transcript is left as is. The session is touched on
/// every successful path.
///
/// Returns the kept and removed message counts plus the tip event id for the
/// kept count, or `None` when the budgeted transcript update is rejected.
fn truncate_state(state: &mut SessionState, keep_first: usize) -> Option<SessionTruncateResult> {
    // A transcript that is not a dict is treated as an empty one.
    let dict = state
        .transcript
        .as_dict()
        .cloned()
        .unwrap_or_else(crate::value::DictMap::new);
    let messages: Vec<VmValue> = match dict.get("messages") {
        Some(VmValue::List(list)) => list.iter().cloned().collect(),
        _ => Vec::new(),
    };
    // `None` means the transcript carries no `events` list; events are then
    // derived from the messages wherever they are needed below.
    let existing_events = match dict.get("events") {
        Some(VmValue::List(list)) => Some(list.iter().cloned().collect::<Vec<_>>()),
        _ => None,
    };
    let kept_turn_count = keep_first.min(messages.len());
    let removed_turn_count = messages.len().saturating_sub(kept_turn_count);
    // Tip id for the no-op case; recomputed from the retained events when
    // anything is removed.
    let mut new_tip_turn_id = existing_events
        .as_ref()
        .map(|events| turn_event_id_for_count(events, kept_turn_count))
        .unwrap_or_else(|| {
            let events = crate::llm::helpers::transcript_events_from_messages(&messages);
            turn_event_id_for_count(&events, kept_turn_count)
        });

    if removed_turn_count > 0 {
        let retained: Vec<VmValue> = messages.into_iter().take(kept_turn_count).collect();
        let retained_events = match existing_events {
            Some(events) => {
                // Keep the event prefix sized by `event_prefix_len_for_messages`
                // for the kept message count.
                let keep_event_count = event_prefix_len_for_messages(&events, kept_turn_count);
                events.into_iter().take(keep_event_count).collect()
            }
            None => crate::llm::helpers::transcript_events_from_messages(&retained),
        };
        new_tip_turn_id = turn_event_id_for_count(&retained_events, kept_turn_count);
        let mut next = dict;
        next.insert(
            crate::value::intern_key("events"),
            VmValue::List(std::sync::Arc::new(retained_events)),
        );
        next.insert(
            crate::value::intern_key("messages"),
            VmValue::List(std::sync::Arc::new(retained)),
        );
        // The `summary` entry is removed whenever messages are dropped.
        next.remove("summary");
        apply_transcript_with_budget(state, VmValue::dict(next), "truncate").ok()?;
    }
    state.touch();
    Some(SessionTruncateResult {
        kept_turn_count,
        removed_turn_count,
        new_tip_turn_id,
    })
}
1515
1516/// Pop the trailing message iff it is an assistant message. Used by
1517/// `agent_step_judge` to remove a vetoed assistant turn before
1518/// regeneration (the "replace" on_veto path). Returns `true` if a
1519/// message was popped, `false` if the transcript was empty, and an
1520/// error if the trailing message was not an assistant turn —
1521/// signalling a call-site discipline bug rather than a runtime error.
1522pub fn pop_last_if_assistant(id: &str) -> Result<bool, String> {
1523    SESSIONS.with(|s| {
1524        let mut map = s.borrow_mut();
1525        let Some(state) = map.get_mut(id) else {
1526            return Err(format!(
1527                "pop_last_if_assistant: unknown session id '{id}'"
1528            ));
1529        };
1530        let messages: Vec<VmValue> = match state.transcript.as_dict() {
1531            Some(dict) => match dict.get("messages") {
1532                Some(VmValue::List(list)) => list.iter().cloned().collect(),
1533                _ => Vec::new(),
1534            },
1535            None => Vec::new(),
1536        };
1537        if messages.is_empty() {
1538            return Ok(false);
1539        }
1540        let trailing_role = messages
1541            .last()
1542            .and_then(|m| m.as_dict())
1543            .and_then(|d| d.get("role"))
1544            .map(|v| v.display())
1545            .unwrap_or_default();
1546        if trailing_role != "assistant" {
1547            return Err(format!(
1548                "pop_last_if_assistant: trailing message role is '{trailing_role}', expected 'assistant'"
1549            ));
1550        }
1551        let keep = messages.len() - 1;
1552        truncate_state(state, keep);
1553        Ok(true)
1554    })
1555}
1556
1557pub fn trim(id: &str, keep_last: usize) -> Option<usize> {
1558    SESSIONS.with(|s| {
1559        let mut map = s.borrow_mut();
1560        let state = map.get_mut(id)?;
1561        let dict = state.transcript.as_dict()?.clone();
1562        let messages: Vec<VmValue> = match dict.get("messages") {
1563            Some(VmValue::List(list)) => list.iter().cloned().collect(),
1564            _ => Vec::new(),
1565        };
1566        let start = messages.len().saturating_sub(keep_last);
1567        let retained: Vec<VmValue> = messages.into_iter().skip(start).collect();
1568        let kept = retained.len();
1569        let mut next = dict;
1570        next.insert(
1571            crate::value::intern_key("events"),
1572            VmValue::List(std::sync::Arc::new(
1573                crate::llm::helpers::transcript_events_from_messages(&retained),
1574            )),
1575        );
1576        next.insert(
1577            crate::value::intern_key("messages"),
1578            VmValue::List(std::sync::Arc::new(retained)),
1579        );
1580        apply_transcript_with_budget(state, VmValue::dict(next), "trim").ok()?;
1581        Some(kept)
1582    })
1583}
1584
1585/// Append a message dict to the session transcript. The message must
1586/// have at least a string `role`; anything else is merged verbatim.
1587pub fn inject_message(id: &str, message: VmValue) -> Result<(), String> {
1588    let Some(msg_dict) = message.as_dict().cloned() else {
1589        return Err("agent_session_inject: message must be a dict".into());
1590    };
1591    let role_ok = matches!(msg_dict.get("role"), Some(VmValue::String(_)));
1592    if !role_ok {
1593        return Err(
1594            "agent_session_inject: message must have a string `role` (user|assistant|tool_result|system)"
1595                .into(),
1596        );
1597    }
1598    SESSIONS.with(|s| {
1599        let mut map = s.borrow_mut();
1600        let Some(state) = map.get_mut(id) else {
1601            return Err(format!("agent_session_inject: unknown session id '{id}'"));
1602        };
1603        let dict = state
1604            .transcript
1605            .as_dict()
1606            .cloned()
1607            .unwrap_or_else(crate::value::DictMap::new);
1608        let mut messages: Vec<VmValue> = match dict.get("messages") {
1609            Some(VmValue::List(list)) => list.iter().cloned().collect(),
1610            _ => Vec::new(),
1611        };
1612        let mut events: Vec<VmValue> = match dict.get("events") {
1613            Some(VmValue::List(list)) => list.iter().cloned().collect(),
1614            _ => crate::llm::helpers::transcript_events_from_messages(&messages),
1615        };
1616        let new_message = VmValue::dict(msg_dict);
1617        let message_index = messages.len();
1618        events.push(crate::llm::helpers::transcript_event_from_message(
1619            &new_message,
1620        ));
1621        messages.push(new_message);
1622        let mut next = dict;
1623        next.insert(
1624            crate::value::intern_key("events"),
1625            VmValue::List(std::sync::Arc::new(events)),
1626        );
1627        next.insert(
1628            crate::value::intern_key("messages"),
1629            VmValue::List(std::sync::Arc::new(messages)),
1630        );
1631        let persisted_message = next
1632            .get("messages")
1633            .and_then(|value| match value {
1634                VmValue::List(list) => list.get(message_index).cloned(),
1635                _ => None,
1636            })
1637            .unwrap_or(VmValue::Nil);
1638        apply_transcript_with_budget(state, VmValue::dict(next), "inject_message")?;
1639        emit_identified_user_message_event(id, &persisted_message);
1640        emit_llm_message_event(id, message_index, &persisted_message);
1641        Ok(())
1642    })
1643}
1644
1645fn emit_identified_user_message_event(session_id: &str, message: &VmValue) {
1646    let message_json = crate::llm::helpers::vm_value_to_json(message);
1647    let role = message_json.get("role").and_then(|value| value.as_str());
1648    if role != Some("user") {
1649        return;
1650    }
1651    let Some(message_id) = message_json
1652        .get("messageId")
1653        .or_else(|| message_json.get("message_id"))
1654        .and_then(|value| value.as_str())
1655        .filter(|value| !value.trim().is_empty())
1656    else {
1657        return;
1658    };
1659    let content = message_json
1660        .get("content")
1661        .map(user_message_content_blocks)
1662        .unwrap_or_default();
1663    crate::agent_events::emit_event(&crate::agent_events::AgentEvent::UserMessage {
1664        session_id: session_id.to_string(),
1665        message_id: message_id.to_string(),
1666        content,
1667    });
1668}
1669
1670fn user_message_content_blocks(content: &serde_json::Value) -> Vec<serde_json::Value> {
1671    match content {
1672        serde_json::Value::Array(items) => items.clone(),
1673        serde_json::Value::String(text) => vec![serde_json::json!({
1674            "type": "text",
1675            "text": text,
1676        })],
1677        other => vec![serde_json::json!({
1678            "type": "text",
1679            "text": other.to_string(),
1680        })],
1681    }
1682}
1683
/// Size measurements of a transcript, compared against the session's
/// transcript budget policy.
#[derive(Debug, Clone, PartialEq, Eq)]
struct TranscriptBudgetUsage {
    /// Number of entries in the transcript's `messages` list.
    message_count: usize,
    /// Number of transcript events.
    event_count: usize,
    /// Approximate serialized size in bytes; `None` when byte accounting was
    /// not requested.
    approx_bytes: Option<usize>,
}
1690
1691fn transcript_messages_from_dict(dict: &crate::value::DictMap) -> Vec<VmValue> {
1692    match dict.get("messages") {
1693        Some(VmValue::List(list)) => list.iter().cloned().collect(),
1694        _ => Vec::new(),
1695    }
1696}
1697
1698fn transcript_message_count(transcript: &VmValue) -> usize {
1699    transcript
1700        .as_dict()
1701        .map(transcript_messages_from_dict)
1702        .map(|messages| messages.len())
1703        .unwrap_or(0)
1704}
1705
1706fn transcript_events_from_dict(dict: &crate::value::DictMap) -> Vec<VmValue> {
1707    match dict.get("events") {
1708        Some(VmValue::List(list)) => list.iter().cloned().collect(),
1709        _ => {
1710            let messages = transcript_messages_from_dict(dict);
1711            crate::llm::helpers::transcript_events_from_messages(&messages)
1712        }
1713    }
1714}
1715
1716fn transcript_usage(transcript: &VmValue, include_bytes: bool) -> TranscriptBudgetUsage {
1717    let Some(dict) = transcript.as_dict() else {
1718        return TranscriptBudgetUsage {
1719            message_count: 0,
1720            event_count: 0,
1721            approx_bytes: include_bytes.then_some(0),
1722        };
1723    };
1724    let approx_bytes = if include_bytes {
1725        serde_json::to_vec(&crate::llm::helpers::vm_value_to_json(transcript))
1726            .map(|bytes| bytes.len())
1727            .ok()
1728            .or(Some(usize::MAX))
1729    } else {
1730        None
1731    };
1732    TranscriptBudgetUsage {
1733        message_count: transcript_messages_from_dict(dict).len(),
1734        event_count: transcript_events_from_dict(dict).len(),
1735        approx_bytes,
1736    }
1737}
1738
1739fn transcript_budget_exceeded_reason(
1740    usage: &TranscriptBudgetUsage,
1741    policy: &SessionTranscriptBudgetPolicy,
1742) -> Option<&'static str> {
1743    if usage.message_count > policy.max_messages {
1744        return Some("message_count");
1745    }
1746    if usage.event_count > policy.max_events {
1747        return Some("event_count");
1748    }
1749    if let (Some(bytes), Some(limit)) = (usage.approx_bytes, policy.max_approx_bytes) {
1750        if bytes > limit {
1751            return Some("approx_bytes");
1752        }
1753    }
1754    None
1755}
1756
1757fn transcript_budget_usage_json(usage: &TranscriptBudgetUsage) -> serde_json::Value {
1758    serde_json::json!({
1759        "messages": usage.message_count,
1760        "events": usage.event_count,
1761        "approx_bytes": usage.approx_bytes,
1762    })
1763}
1764
1765fn transcript_budget_policy_json(policy: &SessionTranscriptBudgetPolicy) -> serde_json::Value {
1766    let recovery = match &policy.recovery {
1767        TranscriptBudgetRecovery::Reject => serde_json::json!({"action": "reject"}),
1768        TranscriptBudgetRecovery::Trim { keep_last } => {
1769            serde_json::json!({"action": "trim", "keep_last": keep_last})
1770        }
1771        TranscriptBudgetRecovery::Compact { keep_last } => {
1772            serde_json::json!({"action": "compact", "keep_last": keep_last})
1773        }
1774    };
1775    serde_json::json!({
1776        "max_messages": policy.max_messages,
1777        "max_events": policy.max_events,
1778        "max_approx_bytes": policy.max_approx_bytes,
1779        "recovery": recovery,
1780    })
1781}
1782
1783fn transcript_budget_recovery_name(recovery: &TranscriptBudgetRecovery) -> &'static str {
1784    match recovery {
1785        TranscriptBudgetRecovery::Reject => "reject",
1786        TranscriptBudgetRecovery::Trim { .. } => "trim",
1787        TranscriptBudgetRecovery::Compact { .. } => "compact",
1788    }
1789}
1790
1791fn transcript_budget_error(
1792    state: &SessionState,
1793    policy: &SessionTranscriptBudgetPolicy,
1794    usage: &TranscriptBudgetUsage,
1795    reason: &str,
1796) -> String {
1797    let byte_suffix = match (usage.approx_bytes, policy.max_approx_bytes) {
1798        (Some(bytes), Some(limit)) => format!(", approx_bytes {bytes}/{limit}"),
1799        _ => String::new(),
1800    };
1801    format!(
1802        "transcript budget exceeded for session '{}': {reason} (messages {}/{}, events {}/{}{}; recovery={})",
1803        state.id,
1804        usage.message_count,
1805        policy.max_messages,
1806        usage.event_count,
1807        policy.max_events,
1808        byte_suffix,
1809        transcript_budget_recovery_name(&policy.recovery),
1810    )
1811}
1812
1813fn transcript_budget_audit_json(
1814    action: &str,
1815    source: &str,
1816    reason: &str,
1817    policy: &SessionTranscriptBudgetPolicy,
1818    usage_before: &TranscriptBudgetUsage,
1819    usage_attempted: &TranscriptBudgetUsage,
1820    usage_after: &TranscriptBudgetUsage,
1821) -> serde_json::Value {
1822    serde_json::json!({
1823        "action": action,
1824        "source": source,
1825        "reason": reason,
1826        "policy": transcript_budget_policy_json(policy),
1827        "usage_before": transcript_budget_usage_json(usage_before),
1828        "usage_attempted": transcript_budget_usage_json(usage_attempted),
1829        "usage_after": transcript_budget_usage_json(usage_after),
1830        "removed_messages": usage_attempted.message_count.saturating_sub(usage_after.message_count),
1831        "removed_events": usage_attempted.event_count.saturating_sub(usage_after.event_count),
1832    })
1833}
1834
1835fn transcript_budget_event(audit: &serde_json::Value) -> VmValue {
1836    let action = audit
1837        .get("action")
1838        .and_then(serde_json::Value::as_str)
1839        .unwrap_or("enforced");
1840    crate::llm::helpers::transcript_event(
1841        "transcript_budget",
1842        "system",
1843        "internal",
1844        &format!("Transcript budget {action}."),
1845        Some(audit.clone()),
1846    )
1847}
1848
1849fn append_event_to_transcript(transcript: VmValue, event: VmValue) -> VmValue {
1850    let Some(dict) = transcript.as_dict() else {
1851        return transcript;
1852    };
1853    let mut next = dict.clone();
1854    let mut events = transcript_events_from_dict(&next);
1855    events.push(event);
1856    next.insert(
1857        crate::value::intern_key("events"),
1858        VmValue::List(std::sync::Arc::new(events)),
1859    );
1860    VmValue::dict(next)
1861}
1862
1863fn tail_message_capacity(
1864    policy: &SessionTranscriptBudgetPolicy,
1865    reserve_audit_event: bool,
1866) -> usize {
1867    let event_capacity = tail_event_capacity(policy, usize::from(reserve_audit_event));
1868    policy.max_messages.min(event_capacity)
1869}
1870
1871fn tail_event_capacity(policy: &SessionTranscriptBudgetPolicy, reserved_events: usize) -> usize {
1872    policy.max_events.saturating_sub(reserved_events)
1873}
1874
1875fn trim_transcript_for_budget(
1876    transcript: &VmValue,
1877    policy: &SessionTranscriptBudgetPolicy,
1878    keep_last: usize,
1879) -> VmValue {
1880    let dict = transcript
1881        .as_dict()
1882        .cloned()
1883        .unwrap_or_else(crate::value::DictMap::new);
1884    let messages = transcript_messages_from_dict(&dict);
1885    let keep = keep_last.min(tail_message_capacity(policy, true));
1886    let start = messages.len().saturating_sub(keep);
1887    let retained: Vec<VmValue> = messages.into_iter().skip(start).collect();
1888    let mut next = dict;
1889    next.insert(
1890        crate::value::intern_key("events"),
1891        VmValue::List(std::sync::Arc::new(
1892            crate::llm::helpers::transcript_events_from_messages(&retained),
1893        )),
1894    );
1895    next.insert(
1896        crate::value::intern_key("messages"),
1897        VmValue::List(std::sync::Arc::new(retained)),
1898    );
1899    next.remove("summary");
1900    VmValue::dict(next)
1901}
1902
1903struct BudgetCompactionLiveEvent {
1904    policy: crate::orchestration::CompactionPolicy,
1905    policy_strategy: String,
1906    metrics: crate::orchestration::TranscriptCompactedEventMetrics,
1907}
1908
1909struct BudgetCompactionResult {
1910    transcript: VmValue,
1911    live_event: Option<BudgetCompactionLiveEvent>,
1912}
1913
1914fn compact_transcript_for_budget(
1915    transcript: &VmValue,
1916    policy: &SessionTranscriptBudgetPolicy,
1917    keep_last: usize,
1918    session_id: &str,
1919) -> BudgetCompactionResult {
1920    let dict = transcript
1921        .as_dict()
1922        .cloned()
1923        .unwrap_or_else(crate::value::DictMap::new);
1924    let messages = transcript_messages_from_dict(&dict);
1925    let message_capacity = policy.max_messages.min(tail_event_capacity(policy, 2));
1926    // Auto-compaction may widen a suffix to start on a clean user-turn boundary,
1927    // so reserve one extra slot beyond the summary when sizing for hard caps.
1928    let tail_keep = keep_last.min(message_capacity.saturating_sub(2));
1929    let mut config = crate::orchestration::AutoCompactConfig {
1930        token_threshold: 0,
1931        keep_last: tail_keep,
1932        compact_strategy: crate::orchestration::CompactStrategy::Llm,
1933        hard_limit_strategy: crate::orchestration::CompactStrategy::Truncate,
1934        fallback_strategy: Some(crate::orchestration::CompactStrategy::Truncate),
1935        policy_strategy: crate::orchestration::compact_strategy_name(
1936            &crate::orchestration::CompactStrategy::Llm,
1937        )
1938        .to_string(),
1939        ..Default::default()
1940    };
1941
1942    let mut json_messages = messages
1943        .iter()
1944        .map(crate::llm::helpers::vm_value_to_json)
1945        .collect::<Vec<_>>();
1946    let lifecycle =
1947        crate::orchestration::CompactLifecycle::new(crate::orchestration::CompactMode::Auto)
1948            .with_session_id(Some(session_id))
1949            .with_trigger(crate::orchestration::CompactionTrigger::BudgetPressure)
1950            .with_hook_dispatch(false)
1951            .with_evaluate_providers(false);
1952    let llm_opts = crate::llm::extract_llm_options(&[
1953        VmValue::String(arcstr::ArcStr::from("")),
1954        VmValue::Nil,
1955        VmValue::Nil,
1956    ])
1957    .ok();
1958    let outcome = futures::executor::block_on(crate::orchestration::run_compaction_lifecycle(
1959        &mut json_messages,
1960        &mut config,
1961        llm_opts.as_ref(),
1962        lifecycle,
1963    ))
1964    .ok()
1965    .flatten();
1966
1967    let retained = json_messages
1968        .iter()
1969        .map(crate::stdlib::json_to_vm_value)
1970        .collect::<Vec<_>>();
1971    let mut events = crate::llm::helpers::transcript_events_from_messages(&retained);
1972    let summary = outcome.as_ref().map(|outcome| outcome.summary.clone());
1973    let mut live_event = None;
1974    if let Some(outcome) = outcome {
1975        events.push(crate::llm::helpers::transcript_event(
1976            "compaction",
1977            "system",
1978            "internal",
1979            "",
1980            Some(outcome.event_metadata.clone()),
1981        ));
1982        live_event = Some(BudgetCompactionLiveEvent {
1983            policy: config.policy.clone(),
1984            policy_strategy: outcome.policy_strategy,
1985            metrics: crate::orchestration::TranscriptCompactedEventMetrics {
1986                archived_messages: outcome.archived_messages,
1987                estimated_tokens_before: outcome.estimated_tokens_before,
1988                estimated_tokens_after: outcome.estimated_tokens_after,
1989                snapshot_asset_id: outcome.snapshot_asset_id,
1990            },
1991        });
1992    }
1993
1994    let mut next = dict;
1995    next.insert(
1996        crate::value::intern_key("events"),
1997        VmValue::List(std::sync::Arc::new(events)),
1998    );
1999    next.insert(
2000        crate::value::intern_key("messages"),
2001        VmValue::List(std::sync::Arc::new(retained)),
2002    );
2003    if let Some(summary) = summary {
2004        next.put_str("summary", summary);
2005    } else {
2006        next.remove("summary");
2007    }
2008    BudgetCompactionResult {
2009        transcript: VmValue::dict(next),
2010        live_event,
2011    }
2012}
2013
2014fn recovered_transcript_with_audit(
2015    recovered: VmValue,
2016    action: &str,
2017    source: &str,
2018    reason: &str,
2019    policy: &SessionTranscriptBudgetPolicy,
2020    usage_before: &TranscriptBudgetUsage,
2021    usage_attempted: &TranscriptBudgetUsage,
2022    include_bytes: bool,
2023) -> (VmValue, serde_json::Value, TranscriptBudgetUsage) {
2024    let usage_after_without_audit = transcript_usage(&recovered, include_bytes);
2025    let initial_audit = transcript_budget_audit_json(
2026        action,
2027        source,
2028        reason,
2029        policy,
2030        usage_before,
2031        usage_attempted,
2032        &usage_after_without_audit,
2033    );
2034    let with_initial_audit =
2035        append_event_to_transcript(recovered.clone(), transcript_budget_event(&initial_audit));
2036    let usage_after = transcript_usage(&with_initial_audit, include_bytes);
2037    let audit = transcript_budget_audit_json(
2038        action,
2039        source,
2040        reason,
2041        policy,
2042        usage_before,
2043        usage_attempted,
2044        &usage_after,
2045    );
2046    let with_audit = append_event_to_transcript(recovered, transcript_budget_event(&audit));
2047    let usage_after = transcript_usage(&with_audit, include_bytes);
2048    (with_audit, audit, usage_after)
2049}
2050
2051fn apply_transcript_with_budget(
2052    state: &mut SessionState,
2053    candidate: VmValue,
2054    source: &str,
2055) -> Result<(), String> {
2056    let policy = state.transcript_budget_policy.normalized();
2057    let include_bytes = policy.max_approx_bytes.is_some();
2058    let usage_before = transcript_usage(&state.transcript, include_bytes);
2059    let usage_attempted = transcript_usage(&candidate, include_bytes);
2060    let Some(reason) = transcript_budget_exceeded_reason(&usage_attempted, &policy) else {
2061        state.replace_transcript(candidate);
2062        return Ok(());
2063    };
2064
2065    match policy.recovery.clone() {
2066        TranscriptBudgetRecovery::Reject => {
2067            let audit = transcript_budget_audit_json(
2068                "rejected",
2069                source,
2070                reason,
2071                &policy,
2072                &usage_before,
2073                &usage_attempted,
2074                &usage_before,
2075            );
2076            state.last_transcript_budget_action = Some(audit);
2077            Err(transcript_budget_error(
2078                state,
2079                &policy,
2080                &usage_attempted,
2081                reason,
2082            ))
2083        }
2084        TranscriptBudgetRecovery::Trim { keep_last } => {
2085            let recovered = trim_transcript_for_budget(&candidate, &policy, keep_last);
2086            let (with_audit, audit, usage_after) = recovered_transcript_with_audit(
2087                recovered,
2088                "trimmed",
2089                source,
2090                reason,
2091                &policy,
2092                &usage_before,
2093                &usage_attempted,
2094                include_bytes,
2095            );
2096            if transcript_budget_exceeded_reason(&usage_after, &policy).is_some() {
2097                let rejected = transcript_budget_audit_json(
2098                    "rejected",
2099                    source,
2100                    reason,
2101                    &policy,
2102                    &usage_before,
2103                    &usage_attempted,
2104                    &usage_after,
2105                );
2106                state.last_transcript_budget_action = Some(rejected);
2107                return Err(transcript_budget_error(
2108                    state,
2109                    &policy,
2110                    &usage_after,
2111                    reason,
2112                ));
2113            }
2114            state.last_transcript_budget_action = Some(audit);
2115            state.replace_transcript(with_audit);
2116            Ok(())
2117        }
2118        TranscriptBudgetRecovery::Compact { keep_last } => {
2119            let compacted =
2120                compact_transcript_for_budget(&candidate, &policy, keep_last, &state.id);
2121            let (with_audit, audit, usage_after) = recovered_transcript_with_audit(
2122                compacted.transcript,
2123                "compacted",
2124                source,
2125                reason,
2126                &policy,
2127                &usage_before,
2128                &usage_attempted,
2129                include_bytes,
2130            );
2131            if transcript_budget_exceeded_reason(&usage_after, &policy).is_some() {
2132                let rejected = transcript_budget_audit_json(
2133                    "rejected",
2134                    source,
2135                    reason,
2136                    &policy,
2137                    &usage_before,
2138                    &usage_attempted,
2139                    &usage_after,
2140                );
2141                state.last_transcript_budget_action = Some(rejected);
2142                return Err(transcript_budget_error(
2143                    state,
2144                    &policy,
2145                    &usage_after,
2146                    reason,
2147                ));
2148            }
2149            state.last_transcript_budget_action = Some(audit);
2150            state.replace_transcript(with_audit);
2151            if let Some(event) = compacted.live_event {
2152                crate::orchestration::emit_transcript_compacted_event_sync(
2153                    &state.id,
2154                    crate::orchestration::CompactMode::Auto,
2155                    crate::orchestration::CompactionTrigger::BudgetPressure
2156                        .as_str()
2157                        .to_string(),
2158                    &event.policy,
2159                    event.policy_strategy,
2160                    event.metrics,
2161                );
2162            }
2163            Ok(())
2164        }
2165    }
2166}
2167
2168fn emit_llm_message_event(session_id: &str, message_index: usize, message: &VmValue) {
2169    let mut fields = serde_json::Map::new();
2170    fields.insert(
2171        "session_id".to_string(),
2172        serde_json::Value::String(session_id.to_string()),
2173    );
2174    fields.insert(
2175        "message_index".to_string(),
2176        serde_json::json!(message_index),
2177    );
2178    let message_json = crate::llm::helpers::vm_value_to_json(message);
2179    if let Some(role) = message_json.get("role").and_then(|value| value.as_str()) {
2180        fields.insert(
2181            "role".to_string(),
2182            serde_json::Value::String(role.to_string()),
2183        );
2184    }
2185    if let Some(content) = message_json.get("content") {
2186        fields.insert("content".to_string(), content.clone());
2187    }
2188    fields.insert("message".to_string(), message_json);
2189    crate::llm::append_observability_sidecar_entry("message", fields);
2190}
2191
2192/// Create a new session from a reconstructed message list.
2193///
2194/// This is intentionally an all-at-once write instead of repeated
2195/// `inject_message` calls: importing a transcript should not re-emit
2196/// each historic turn into the active observability sidecar.
2197pub fn seed_from_messages(
2198    id: Option<String>,
2199    messages: &[serde_json::Value],
2200    metadata: serde_json::Value,
2201    system_prompt: Option<String>,
2202    tool_format: Option<String>,
2203) -> Result<String, String> {
2204    let resolved = id.unwrap_or_else(|| uuid::Uuid::now_v7().to_string());
2205    if exists(&resolved) {
2206        return Err(format!("agent session '{resolved}' already exists"));
2207    }
2208    open_or_create(Some(resolved.clone()));
2209    SESSIONS.with(|s| {
2210        let mut map = s.borrow_mut();
2211        let Some(state) = map.get_mut(&resolved) else {
2212            return Err(format!("failed to create agent session '{resolved}'"));
2213        };
2214        state.tool_format = tool_format.filter(|value| !value.trim().is_empty());
2215        state.system_prompt = system_prompt.filter(|value| !value.trim().is_empty());
2216
2217        let mut metadata = metadata
2218            .as_object()
2219            .cloned()
2220            .unwrap_or_else(serde_json::Map::new);
2221        if let Some(tool_format) = state.tool_format.as_ref() {
2222            metadata.insert(
2223                "tool_format".to_string(),
2224                serde_json::Value::String(tool_format.clone()),
2225            );
2226            metadata.insert(
2227                "tool_mode_locked".to_string(),
2228                serde_json::Value::Bool(true),
2229            );
2230        }
2231        if let Some(system_prompt) = state.system_prompt.as_ref() {
2232            metadata.insert(
2233                "system_prompt".to_string(),
2234                crate::llm::helpers::system_prompt_metadata(system_prompt),
2235            );
2236        }
2237        let vm_messages = crate::llm::helpers::json_messages_to_vm(messages);
2238        let candidate = crate::llm::helpers::new_transcript_with(
2239            Some(resolved.clone()),
2240            vm_messages,
2241            None,
2242            Some(crate::stdlib::json_to_vm_value(&serde_json::Value::Object(
2243                metadata,
2244            ))),
2245        );
2246        apply_transcript_with_budget(state, candidate, "seed_from_messages")?;
2247        Ok(resolved)
2248    })
2249}
2250
2251/// Load the messages vec (as JSON) for this session, for use as prefix
2252/// to an agent_loop run. Returns an empty vec if the session doesn't
2253/// exist or has no messages.
2254pub fn messages_json(id: &str) -> Vec<serde_json::Value> {
2255    SESSIONS.with(|s| {
2256        let map = s.borrow();
2257        let Some(state) = map.get(id) else {
2258            return Vec::new();
2259        };
2260        let Some(dict) = state.transcript.as_dict() else {
2261            return Vec::new();
2262        };
2263        match dict.get("messages") {
2264            Some(VmValue::List(list)) => list
2265                .iter()
2266                .map(crate::llm::helpers::vm_value_to_json)
2267                .collect(),
2268            _ => Vec::new(),
2269        }
2270    })
2271}
2272
2273#[derive(Clone, Debug, Default)]
2274pub struct SessionPromptState {
2275    pub messages: Vec<serde_json::Value>,
2276    pub summary: Option<String>,
2277}
2278
2279fn summary_message_json(summary: &str) -> serde_json::Value {
2280    serde_json::json!({
2281        "role": "user",
2282        "content": summary,
2283    })
2284}
2285
2286fn messages_begin_with_summary(messages: &[serde_json::Value], summary: &str) -> bool {
2287    messages.first().is_some_and(|message| {
2288        message.get("role").and_then(|value| value.as_str()) == Some("user")
2289            && message.get("content").and_then(|value| value.as_str()) == Some(summary)
2290    })
2291}
2292
2293/// Prompt-surface resume state for a persisted session.
2294///
2295/// Returns the compacted/rehydratable message list plus the transcript's
2296/// summary field. When the transcript carries a summary field but its
2297/// message list does not already begin with the compacted summary
2298/// message, this helper prepends one so session re-entry preserves the
2299/// same prompt surface the previous loop was actually using.
2300pub fn prompt_state_json(id: &str) -> SessionPromptState {
2301    SESSIONS.with(|s| {
2302        let map = s.borrow();
2303        let Some(state) = map.get(id) else {
2304            return SessionPromptState::default();
2305        };
2306        let Some(dict) = state.transcript.as_dict() else {
2307            return SessionPromptState::default();
2308        };
2309        let mut messages = match dict.get("messages") {
2310            Some(VmValue::List(list)) => list
2311                .iter()
2312                .map(crate::llm::helpers::vm_value_to_json)
2313                .collect::<Vec<_>>(),
2314            _ => Vec::new(),
2315        };
2316        let summary = dict.get("summary").and_then(|value| match value {
2317            VmValue::String(text) if !text.trim().is_empty() => Some(text.to_string()),
2318            _ => None,
2319        });
2320        if let Some(summary_text) = summary.as_deref() {
2321            if !messages_begin_with_summary(&messages, summary_text) {
2322                messages.insert(0, summary_message_json(summary_text));
2323            }
2324        }
2325        SessionPromptState { messages, summary }
2326    })
2327}
2328
2329/// Overwrite the transcript for this session. Used by `agent_loop` on
2330/// exit to persist the synthesized transcript.
2331pub fn store_transcript(id: &str, transcript: VmValue) -> Result<(), String> {
2332    SESSIONS.with(|s| {
2333        let mut map = s.borrow_mut();
2334        let Some(state) = map.get_mut(id) else {
2335            return Err(format!(
2336                "agent_session_store_transcript: unknown session id '{id}'"
2337            ));
2338        };
2339        let transcript = transcript_with_session_metadata(transcript, state);
2340        apply_transcript_with_budget(state, transcript, "store_transcript")?;
2341        Ok(())
2342    })
2343}
2344
2345fn checkpoint_summary(checkpoint: &SessionTurnCheckpoint) -> SessionCheckpointSummary {
2346    SessionCheckpointSummary {
2347        checkpoint_id: checkpoint.checkpoint_id.clone(),
2348        before_message_count: checkpoint.before_message_count,
2349        after_message_count: checkpoint.after_message_count,
2350        fs_snapshot_ids: checkpoint.fs_snapshot_ids.clone(),
2351    }
2352}
2353
2354fn checkpoint_error_status(error: SessionCheckpointError) -> &'static str {
2355    match error {
2356        SessionCheckpointError::UnknownSession => "unknown_session",
2357        SessionCheckpointError::NoCheckpoint => "no_checkpoint",
2358        SessionCheckpointError::NoRedo => "no_redo",
2359    }
2360}
2361
2362pub fn checkpoint_status_name(error: SessionCheckpointError) -> &'static str {
2363    checkpoint_error_status(error)
2364}
2365
2366/// Clear redo checkpoints after host-side workspace mutations that are not part
2367/// of the redo flow. Returns whether any redo state was discarded.
2368pub fn invalidate_redo(id: &str) -> bool {
2369    SESSIONS.with(|s| {
2370        let mut map = s.borrow_mut();
2371        let Some(state) = map.get_mut(id) else {
2372            return false;
2373        };
2374        let had_redo = !state.redo_stack.is_empty();
2375        state.redo_stack.clear();
2376        state.touch();
2377        had_redo
2378    })
2379}
2380
2381/// Record a completed prompt turn boundary.
2382///
2383/// `before_transcript` must be captured immediately before the user turn
2384/// starts. The current live transcript becomes the redo target, and optional
2385/// `fs_snapshot_ids` name host-owned filesystem snapshots captured during the
2386/// turn. Harn owns the transcript stack; hosts own concrete file restoration.
2387pub fn record_completed_turn_checkpoint(
2388    id: &str,
2389    before_transcript: VmValue,
2390    fs_snapshot_ids: Vec<String>,
2391) -> Result<Option<SessionCheckpointSummary>, SessionCheckpointError> {
2392    SESSIONS.with(|s| {
2393        let mut map = s.borrow_mut();
2394        let Some(state) = map.get_mut(id) else {
2395            return Err(SessionCheckpointError::UnknownSession);
2396        };
2397        let after_transcript = transcript_with_session_metadata(state.transcript.clone(), state);
2398        let before_message_count = transcript_message_count(&before_transcript);
2399        let after_message_count = transcript_message_count(&after_transcript);
2400        if crate::values_equal(&before_transcript, &after_transcript) && fs_snapshot_ids.is_empty()
2401        {
2402            return Ok(None);
2403        }
2404        let checkpoint = SessionTurnCheckpoint {
2405            checkpoint_id: format!("turn_{}", uuid::Uuid::now_v7().simple()),
2406            completed_at: crate::orchestration::now_rfc3339(),
2407            before_message_count,
2408            after_message_count,
2409            before_transcript,
2410            after_transcript,
2411            fs_snapshot_ids,
2412        };
2413        state.redo_stack.clear();
2414        state.completed_turn_checkpoints.push(checkpoint.clone());
2415        state.touch();
2416        Ok(Some(checkpoint_summary(&checkpoint)))
2417    })
2418}
2419
2420pub fn rollback_plan(id: &str) -> Result<SessionCheckpointSummary, SessionCheckpointError> {
2421    SESSIONS.with(|s| {
2422        let map = s.borrow();
2423        let Some(state) = map.get(id) else {
2424            return Err(SessionCheckpointError::UnknownSession);
2425        };
2426        state
2427            .completed_turn_checkpoints
2428            .last()
2429            .map(checkpoint_summary)
2430            .ok_or(SessionCheckpointError::NoCheckpoint)
2431    })
2432}
2433
2434pub fn redo_plan(id: &str) -> Result<SessionCheckpointSummary, SessionCheckpointError> {
2435    SESSIONS.with(|s| {
2436        let map = s.borrow();
2437        let Some(state) = map.get(id) else {
2438            return Err(SessionCheckpointError::UnknownSession);
2439        };
2440        state
2441            .redo_stack
2442            .last()
2443            .map(|entry| {
2444                let mut summary = checkpoint_summary(&entry.checkpoint);
2445                summary.fs_snapshot_ids = entry.redo_fs_snapshot_ids.clone();
2446                summary
2447            })
2448            .ok_or(SessionCheckpointError::NoRedo)
2449    })
2450}
2451
2452pub fn rollback_last_completed_turn(
2453    id: &str,
2454    redo_fs_snapshot_ids: Vec<String>,
2455) -> Result<SessionCheckpointOutcome, SessionCheckpointError> {
2456    SESSIONS.with(|s| {
2457        let mut map = s.borrow_mut();
2458        let Some(state) = map.get_mut(id) else {
2459            return Err(SessionCheckpointError::UnknownSession);
2460        };
2461        let Some(checkpoint) = state.completed_turn_checkpoints.pop() else {
2462            return Err(SessionCheckpointError::NoCheckpoint);
2463        };
2464        state.transcript = checkpoint.before_transcript.clone();
2465        state.redo_stack.push(SessionRedoEntry {
2466            checkpoint: checkpoint.clone(),
2467            redo_fs_snapshot_ids: redo_fs_snapshot_ids.clone(),
2468        });
2469        state.touch();
2470        Ok(SessionCheckpointOutcome {
2471            status: "rolled_back",
2472            checkpoint: checkpoint_summary(&checkpoint),
2473            redo_fs_snapshot_ids,
2474        })
2475    })
2476}
2477
2478pub fn redo_last_rollback(id: &str) -> Result<SessionCheckpointOutcome, SessionCheckpointError> {
2479    SESSIONS.with(|s| {
2480        let mut map = s.borrow_mut();
2481        let Some(state) = map.get_mut(id) else {
2482            return Err(SessionCheckpointError::UnknownSession);
2483        };
2484        let Some(entry) = state.redo_stack.pop() else {
2485            return Err(SessionCheckpointError::NoRedo);
2486        };
2487        let checkpoint = entry.checkpoint;
2488        state.transcript = checkpoint.after_transcript.clone();
2489        state.completed_turn_checkpoints.push(checkpoint.clone());
2490        state.touch();
2491        Ok(SessionCheckpointOutcome {
2492            status: "redone",
2493            checkpoint: checkpoint_summary(&checkpoint),
2494            redo_fs_snapshot_ids: entry.redo_fs_snapshot_ids,
2495        })
2496    })
2497}
2498
2499/// Remove malformed reminder events after their drop audit has been emitted.
2500/// Pending-reminder rendering scans the transcript on every LLM call; pruning
2501/// invalid entries makes the drop event one-shot instead of noisy per turn.
2502pub fn prune_invalid_reminder_events(id: &str) -> usize {
2503    SESSIONS.with(|s| {
2504        let mut map = s.borrow_mut();
2505        let Some(state) = map.get_mut(id) else {
2506            return 0;
2507        };
2508        let Some(dict) = state.transcript.as_dict().cloned() else {
2509            return 0;
2510        };
2511        let Some(VmValue::List(events)) = dict.get("events") else {
2512            return 0;
2513        };
2514        let mut pruned = 0_usize;
2515        let mut kept = Vec::with_capacity(events.len());
2516        for event in events.iter().cloned() {
2517            let is_reminder = event
2518                .as_dict()
2519                .and_then(|event| event.get("kind"))
2520                .map(VmValue::display)
2521                .as_deref()
2522                == Some(crate::llm::helpers::SYSTEM_REMINDER_EVENT_KIND);
2523            if !is_reminder {
2524                kept.push(event);
2525                continue;
2526            }
2527            let valid = crate::llm::helpers::reminder_from_event(&event)
2528                .is_some_and(|reminder| !reminder.body.trim().is_empty());
2529            if valid {
2530                kept.push(event);
2531            } else {
2532                pruned += 1;
2533            }
2534        }
2535        if pruned > 0 {
2536            let mut next = dict;
2537            next.insert(
2538                crate::value::intern_key("events"),
2539                VmValue::List(std::sync::Arc::new(kept)),
2540            );
2541            let _ = apply_transcript_with_budget(
2542                state,
2543                VmValue::dict(next),
2544                "prune_invalid_reminder_events",
2545            );
2546            state.touch();
2547        }
2548        pruned
2549    })
2550}
2551
2552/// Apply the reminder TTL lifecycle that runs once per completed agent
2553/// turn. Reminders with `ttl_turns = 1` expire and are removed; larger
2554/// finite TTLs are decremented in place. Expiry audit events are emitted
2555/// to the active EventLog when one is installed.
2556pub fn apply_reminder_post_turn(id: &str, turn: i64) -> Result<serde_json::Value, String> {
2557    let report = SESSIONS.with(|s| {
2558        let mut map = s.borrow_mut();
2559        let Some(state) = map.get_mut(id) else {
2560            return Err(format!(
2561                "agent_session_apply_reminder_post_turn: unknown session id '{id}'"
2562            ));
2563        };
2564        let report = crate::llm::helpers::apply_reminder_post_turn(&state.transcript, turn);
2565        if report.decremented_count > 0 || !report.expired.is_empty() {
2566            if let Some(next) = report.transcript.clone() {
2567                apply_transcript_with_budget(state, next, "apply_reminder_post_turn")?;
2568            }
2569            state.touch();
2570        }
2571        Ok(report)
2572    })?;
2573
2574    for reminder in &report.expired {
2575        let mut payload = crate::llm::helpers::reminder_lifecycle_payload(Some(id), reminder);
2576        if let Some(obj) = payload.as_object_mut() {
2577            obj.insert(
2578                "transcript_id".to_string(),
2579                serde_json::Value::String(id.to_string()),
2580            );
2581            obj.insert(
2582                "reason".to_string(),
2583                serde_json::Value::String("ttl".to_string()),
2584            );
2585            obj.insert(
2586                "ttl_turns_before".to_string(),
2587                serde_json::json!(&reminder.ttl_turns),
2588            );
2589            obj.insert("expired_at_turn".to_string(), serde_json::json!(turn));
2590        }
2591        crate::llm::helpers::emit_reminder_lifecycle_event(
2592            crate::llm::helpers::REMINDER_EXPIRED_EVENT_KIND,
2593            payload,
2594        );
2595    }
2596
2597    Ok(serde_json::json!({
2598        "expired_count": report.expired.len(),
2599        "decremented_count": report.decremented_count,
2600        "remaining_count": report.remaining_count,
2601    }))
2602}
2603
2604/// Inject a typed system reminder into the session transcript's event
2605/// stream. This mirrors `transcript.inject_reminder` for live sessions:
2606/// reminders with the same `dedupe_key` are replaced before the new
2607/// reminder event is appended.
2608pub fn inject_reminder(
2609    id: &str,
2610    reminder: crate::llm::helpers::SystemReminder,
2611) -> Result<ReminderInjectionReport, String> {
2612    let reminder_id = reminder.id.clone();
2613    let dedupe_key = reminder.dedupe_key.clone();
2614    let mut deduped_reminder_ids = Vec::new();
2615    SESSIONS.with(|s| {
2616        let mut map = s.borrow_mut();
2617        let Some(state) = map.get_mut(id) else {
2618            return Err(format!(
2619                "agent_session_inject_reminder: unknown session id '{id}'"
2620            ));
2621        };
2622        let dict = state
2623            .transcript
2624            .as_dict()
2625            .cloned()
2626            .unwrap_or_else(crate::value::DictMap::new);
2627        let mut events: Vec<VmValue> = match dict.get("events") {
2628            Some(VmValue::List(list)) => list.iter().cloned().collect(),
2629            _ => dict
2630                .get("messages")
2631                .and_then(|value| match value {
2632                    VmValue::List(list) => Some(list.iter().cloned().collect::<Vec<_>>()),
2633                    _ => None,
2634                })
2635                .map(|messages| crate::llm::helpers::transcript_events_from_messages(&messages))
2636                .unwrap_or_default(),
2637        };
2638        if let Some(expected_key) = dedupe_key.as_deref() {
2639            events.retain(|event| {
2640                let Some(existing) = crate::llm::helpers::reminder_from_event(event) else {
2641                    return true;
2642                };
2643                if existing.dedupe_key.as_deref() == Some(expected_key) {
2644                    deduped_reminder_ids.push(existing.id);
2645                    false
2646                } else {
2647                    true
2648                }
2649            });
2650        }
2651        events.push(crate::llm::helpers::transcript_reminder_event(&reminder));
2652        let mut next = dict;
2653        next.insert(
2654            crate::value::intern_key("events"),
2655            VmValue::List(std::sync::Arc::new(events)),
2656        );
2657        apply_transcript_with_budget(state, VmValue::dict(next), "inject_reminder")?;
2658        state.touch();
2659        Ok(())
2660    })?;
2661
2662    if !deduped_reminder_ids.is_empty() {
2663        let dropped_count = deduped_reminder_ids.len();
2664        crate::llm::helpers::emit_reminder_lifecycle_event(
2665            crate::llm::helpers::REMINDER_DEDUPED_EVENT_KIND,
2666            serde_json::json!({
2667                "session_id": id,
2668                "transcript_id": id,
2669                "reminder_id": &reminder_id,
2670                "replacing_id": &reminder_id,
2671                "replaced_id": deduped_reminder_ids.first(),
2672                "replaced_ids": &deduped_reminder_ids,
2673                "dedupe_key": &dedupe_key,
2674                "dropped_reminder_ids": &deduped_reminder_ids,
2675                "dropped_count": dropped_count,
2676            }),
2677        );
2678    }
2679
2680    crate::llm::helpers::emit_reminder_lifecycle_event(
2681        crate::llm::helpers::REMINDER_INJECTED_EVENT_KIND,
2682        crate::llm::helpers::reminder_lifecycle_payload(Some(id), &reminder),
2683    );
2684
2685    Ok(ReminderInjectionReport {
2686        reminder_id,
2687        deduped_count: deduped_reminder_ids.len(),
2688    })
2689}
2690
2691/// Append a transcript event to the session without mutating its
2692/// message list. Used for orchestration-side lineage events (sub-agent
2693/// spawn/completion, workflow hooks, etc.) that should survive
2694/// persistence/replay without being replayed back into the model as
2695/// conversational messages.
2696pub fn append_event(id: &str, event: VmValue) -> Result<(), String> {
2697    let Some(event_dict) = event.as_dict() else {
2698        return Err("agent_session_append_event: event must be a dict".into());
2699    };
2700    let kind_ok = matches!(event_dict.get("kind"), Some(VmValue::String(_)));
2701    if !kind_ok {
2702        return Err("agent_session_append_event: event must have a string `kind`".into());
2703    }
2704    SESSIONS.with(|s| {
2705        let mut map = s.borrow_mut();
2706        let Some(state) = map.get_mut(id) else {
2707            return Err(format!(
2708                "agent_session_append_event: unknown session id '{id}'"
2709            ));
2710        };
2711        append_event_to_state(state, event, "append_event")?;
2712        Ok(())
2713    })
2714}
2715
2716fn append_event_to_state(
2717    state: &mut SessionState,
2718    event: VmValue,
2719    action: &str,
2720) -> Result<(), String> {
2721    let dict = state
2722        .transcript
2723        .as_dict()
2724        .cloned()
2725        .unwrap_or_else(crate::value::DictMap::new);
2726    let mut events: Vec<VmValue> = match dict.get("events") {
2727        Some(VmValue::List(list)) => list.iter().cloned().collect(),
2728        _ => dict
2729            .get("messages")
2730            .and_then(|value| match value {
2731                VmValue::List(list) => Some(list.iter().cloned().collect::<Vec<_>>()),
2732                _ => None,
2733            })
2734            .map(|messages| crate::llm::helpers::transcript_events_from_messages(&messages))
2735            .unwrap_or_default(),
2736    };
2737    events.push(event);
2738    let mut next = dict;
2739    next.insert(
2740        crate::value::intern_key("events"),
2741        VmValue::List(std::sync::Arc::new(events)),
2742    );
2743    apply_transcript_with_budget(state, VmValue::dict(next), action)
2744}
2745
2746/// Replace the transcript's message list wholesale. Used by the
2747/// in-loop compaction path, which operates on JSON messages.
2748pub fn replace_messages(id: &str, messages: &[serde_json::Value]) -> Result<(), String> {
2749    replace_messages_with_summary(id, messages, None)
2750}
2751
2752/// Replace the transcript's message list and optionally update the
2753/// `summary` field on the persisted transcript. The compaction path
2754/// uses this to publish the human-readable rollup line that
2755/// `transcript_summary(transcript)` exposes to host code.
2756pub fn replace_messages_with_summary(
2757    id: &str,
2758    messages: &[serde_json::Value],
2759    summary: Option<&str>,
2760) -> Result<(), String> {
2761    SESSIONS.with(|s| {
2762        let mut map = s.borrow_mut();
2763        let Some(state) = map.get_mut(id) else {
2764            return Err(format!(
2765                "agent_session_replace_messages: unknown session id '{id}'"
2766            ));
2767        };
2768        let dict = state
2769            .transcript
2770            .as_dict()
2771            .cloned()
2772            .unwrap_or_else(crate::value::DictMap::new);
2773        let vm_messages: Vec<VmValue> = messages
2774            .iter()
2775            .map(crate::stdlib::json_to_vm_value)
2776            .collect();
2777        let mut next = dict;
2778        next.insert(
2779            crate::value::intern_key("events"),
2780            VmValue::List(std::sync::Arc::new(
2781                crate::llm::helpers::transcript_events_from_messages(&vm_messages),
2782            )),
2783        );
2784        next.insert(
2785            crate::value::intern_key("messages"),
2786            VmValue::List(std::sync::Arc::new(vm_messages)),
2787        );
2788        if let Some(summary) = summary {
2789            next.put_str("summary", summary);
2790        } else {
2791            next.remove("summary");
2792        }
2793        apply_transcript_with_budget(state, VmValue::dict(next), "replace_messages")?;
2794        Ok(())
2795    })
2796}
2797
2798pub fn append_subscriber(id: &str, callback: VmValue) {
2799    open_or_create(Some(id.to_string()));
2800    SESSIONS.with(|s| {
2801        if let Some(state) = s.borrow_mut().get_mut(id) {
2802            state.subscribers.push(callback);
2803            state.touch();
2804        }
2805    });
2806}
2807
2808pub fn subscribers_for(id: &str) -> Vec<VmValue> {
2809    SESSIONS.with(|s| {
2810        s.borrow()
2811            .get(id)
2812            .map(|state| state.subscribers.clone())
2813            .unwrap_or_default()
2814    })
2815}
2816
2817pub fn subscriber_count(id: &str) -> usize {
2818    SESSIONS.with(|s| {
2819        s.borrow()
2820            .get(id)
2821            .map(|state| state.subscribers.len())
2822            .unwrap_or(0)
2823    })
2824}
2825
2826/// Persist the set of active skill names for session resume. Called at
2827/// the end of an agent_loop run; the next `open_or_create` for this id
2828/// reads them back via [`active_skills`].
2829pub fn set_active_skills(id: &str, skills: Vec<String>) {
2830    SESSIONS.with(|s| {
2831        if let Some(state) = s.borrow_mut().get_mut(id) {
2832            state.active_skills = skills;
2833            state.touch();
2834        }
2835    });
2836}
2837
2838/// Skills that were active at the end of the previous agent_loop run
2839/// against this session. Returns an empty vec when the session doesn't
2840/// exist or nothing was persisted.
2841pub fn active_skills(id: &str) -> Vec<String> {
2842    SESSIONS.with(|s| {
2843        s.borrow()
2844            .get(id)
2845            .map(|state| state.active_skills.clone())
2846            .unwrap_or_default()
2847    })
2848}
2849
2850/// Claim the tool-calling contract for a session.
2851///
2852/// The first loop against a named session records its `tool_format`.
2853/// Later re-entry must use the same format so prompt/history generated
2854/// under a text contract is never replayed as native, or vice versa.
2855pub fn claim_tool_format(id: &str, tool_format: &str) -> Result<(), String> {
2856    let tool_format = tool_format.trim();
2857    if tool_format.is_empty() {
2858        return Ok(());
2859    }
2860    SESSIONS.with(|s| {
2861        let mut map = s.borrow_mut();
2862        let Some(state) = map.get_mut(id) else {
2863            return Err(format!("agent session '{id}' does not exist"));
2864        };
2865        match state.tool_format.as_deref() {
2866            Some(existing) if existing != tool_format => Err(format!(
2867                "agent session '{id}' is locked to tool_format='{existing}', but this run requested tool_format='{tool_format}'. Start a new session or fork/reset the transcript before changing tool mode."
2868            )),
2869            Some(_) => {
2870                state.touch();
2871                Ok(())
2872            }
2873            None => {
2874                state.tool_format = Some(tool_format.to_string());
2875                state.touch();
2876                Ok(())
2877            }
2878        }
2879    })
2880}
2881
2882pub fn tool_format(id: &str) -> Option<String> {
2883    SESSIONS.with(|s| {
2884        s.borrow()
2885            .get(id)
2886            .and_then(|state| state.tool_format.clone())
2887    })
2888}
2889
2890pub fn record_system_prompt(id: &str, system_prompt: &str) -> Result<(), String> {
2891    let system_prompt = system_prompt.trim();
2892    if system_prompt.is_empty() {
2893        return Ok(());
2894    }
2895    assert_cache_stable_system_prompt(system_prompt);
2896    SESSIONS.with(|s| {
2897        let mut map = s.borrow_mut();
2898        let Some(state) = map.get_mut(id) else {
2899            return Err(format!("agent session '{id}' does not exist"));
2900        };
2901        let changed = state.system_prompt.as_deref() != Some(system_prompt);
2902        state.system_prompt = Some(system_prompt.to_string());
2903        let dict = state
2904            .transcript
2905            .as_dict()
2906            .cloned()
2907            .unwrap_or_else(crate::value::DictMap::new);
2908        let mut next = dict;
2909        apply_system_prompt_metadata(&mut next, system_prompt);
2910        if changed {
2911            let mut events: Vec<VmValue> = match next.get("events") {
2912                Some(VmValue::List(list)) => list.iter().cloned().collect(),
2913                _ => Vec::new(),
2914            };
2915            events.push(crate::llm::helpers::transcript_event(
2916                "system_prompt",
2917                "system",
2918                "internal",
2919                "",
2920                Some(crate::llm::helpers::system_prompt_event_metadata(
2921                    system_prompt,
2922                )),
2923            ));
2924            next.insert(
2925                crate::value::intern_key("events"),
2926                VmValue::List(std::sync::Arc::new(events)),
2927            );
2928        }
2929        apply_transcript_with_budget(state, VmValue::dict(next), "record_system_prompt")?;
2930        Ok(())
2931    })
2932}
2933
2934pub fn system_prompt(id: &str) -> Option<String> {
2935    SESSIONS.with(|s| {
2936        s.borrow()
2937            .get(id)
2938            .and_then(|state| state.system_prompt.clone())
2939    })
2940}
2941
/// Scan `system_prompt` for a `{{` template opener that is followed
/// (ignoring leading whitespace) by a `workspace_` or `project_`
/// identifier prefix, and return the offending prefix.
///
/// Every `{{` occurrence is examined, including overlapping ones inside a
/// longer run of braces such as `{{{workspace_root}}}`. The scan therefore
/// advances by a single byte after each opener rather than skipping both
/// braces.
#[cfg(debug_assertions)]
fn forbidden_workspace_prompt_token(system_prompt: &str) -> Option<&'static str> {
    const FORBIDDEN_PREFIXES: [&str; 2] = ["workspace_", "project_"];

    let mut remaining = system_prompt;
    while let Some(index) = remaining.find("{{") {
        let candidate = remaining[index + 2..].trim_start();
        let hit = FORBIDDEN_PREFIXES
            .iter()
            .copied()
            .find(|prefix| candidate.starts_with(*prefix));
        if hit.is_some() {
            return hit;
        }
        // `{` is a single ASCII byte, so `index + 1` is always a char
        // boundary; stepping one byte keeps overlapping `{{` visible.
        remaining = &remaining[index + 1..];
    }
    None
}
2957
2958#[cfg(debug_assertions)]
2959fn assert_cache_stable_system_prompt(system_prompt: &str) {
2960    if let Some(prefix) = forbidden_workspace_prompt_token(system_prompt) {
2961        panic!(
2962            "{CACHE_STABLE_SYSTEM_PROMPT_DIAGNOSTIC}: session system prompts must not interpolate `{{{{{prefix}...` tokens; move workspace/project context into the workspace-anchor reminder"
2963        );
2964    }
2965}
2966
2967#[cfg(not(debug_assertions))]
2968fn assert_cache_stable_system_prompt(_system_prompt: &str) {}
2969
2970/// Pin (or clear, with `None`) a model selector on a session. Returns
2971/// `Ok(true)` when the value actually changed so callers can decide
2972/// whether to broadcast a notification. The selector is stored verbatim
2973/// — alias / catalog resolution is the call-site's job.
2974pub fn set_pinned_model(id: &str, model: Option<String>) -> Result<bool, String> {
2975    let normalized = model
2976        .map(|value| value.trim().to_string())
2977        .filter(|value| !value.is_empty());
2978    SESSIONS.with(|s| {
2979        let mut map = s.borrow_mut();
2980        let Some(state) = map.get_mut(id) else {
2981            return Err(format!("agent session '{id}' does not exist"));
2982        };
2983        let changed = state.pinned_model != normalized;
2984        state.pinned_model = normalized;
2985        state.touch();
2986        Ok(changed)
2987    })
2988}
2989
2990/// Read the session's pinned model selector, if any. Consumed by
2991/// `vm_resolve_model` as the per-session default when a script-level
2992/// `llm_call` does not pass `model:` explicitly.
2993pub fn pinned_model(id: &str) -> Option<String> {
2994    SESSIONS.with(|s| {
2995        s.borrow()
2996            .get(id)
2997            .and_then(|state| state.pinned_model.clone())
2998    })
2999}
3000
3001/// Pin (or clear) the session-level provider-aware reasoning policy.
3002pub fn set_pinned_reasoning_policy(id: &str, policy: Option<String>) -> Result<bool, String> {
3003    let normalized = match policy {
3004        Some(value) => crate::llm::reasoning_policy::normalize_policy_selector(&value)?,
3005        None => None,
3006    };
3007    SESSIONS.with(|s| {
3008        let mut map = s.borrow_mut();
3009        let Some(state) = map.get_mut(id) else {
3010            return Err(format!("agent session '{id}' does not exist"));
3011        };
3012        let changed = state.pinned_reasoning_policy != normalized;
3013        state.pinned_reasoning_policy = normalized;
3014        state.touch();
3015        Ok(changed)
3016    })
3017}
3018
3019/// Read the session's pinned reasoning policy, if any.
3020pub fn pinned_reasoning_policy(id: &str) -> Option<String> {
3021    SESSIONS.with(|s| {
3022        s.borrow()
3023            .get(id)
3024            .and_then(|state| state.pinned_reasoning_policy.clone())
3025    })
3026}
3027
3028/// Set (or clear, with `None`) the typed workspace anchor on a session.
3029/// Returns `Ok(true)` when the value actually changed so callers can
3030/// decide whether to broadcast `AnchorChanged` notifications.
3031pub fn set_workspace_anchor(id: &str, anchor: Option<WorkspaceAnchor>) -> Result<bool, String> {
3032    SESSIONS.with(|s| {
3033        let mut map = s.borrow_mut();
3034        let Some(state) = map.get_mut(id) else {
3035            return Err(format!("agent session '{id}' does not exist"));
3036        };
3037        let changed = state.workspace_anchor != anchor;
3038        state.workspace_anchor = anchor;
3039        if changed {
3040            state.redo_stack.clear();
3041            crate::llm::permissions::clear_session_grants(id);
3042        }
3043        state.touch();
3044        Ok(changed)
3045    })
3046}
3047
3048/// Read the session's typed workspace anchor, if any.
3049pub fn workspace_anchor(id: &str) -> Option<WorkspaceAnchor> {
3050    SESSIONS.with(|s| {
3051        s.borrow()
3052            .get(id)
3053            .and_then(|state| state.workspace_anchor.clone())
3054    })
3055}
3056
/// Outcome of `reanchor_session`: previous + new anchor and whether the
/// swap actually moved anything. Callers use `changed` to suppress
/// no-op transcript / live events.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ReanchorOutcome {
    /// Anchor the session held before the swap; `None` if it had none.
    pub previous: Option<WorkspaceAnchor>,
    /// Anchor installed by the swap.
    pub current: WorkspaceAnchor,
    /// `true` when `previous` differs from `current`.
    pub changed: bool,
}
3066
3067/// Atomically swap the session's primary anchor + emit the canonical
3068/// `AnchorChanged` transcript event and live `AgentEvent::AnchorChanged`
3069/// notification (#2218). Clears session-scoped permission grants so
3070/// stale anchor-based decisions don't leak into the next turn.
3071pub fn reanchor_session(
3072    id: &str,
3073    new_anchor: WorkspaceAnchor,
3074    carry_transcript: bool,
3075    compacted: bool,
3076    reason: Option<String>,
3077) -> Result<ReanchorOutcome, String> {
3078    let outcome = SESSIONS.with(|s| {
3079        let mut map = s.borrow_mut();
3080        let Some(state) = map.get_mut(id) else {
3081            return Err(format!("agent session '{id}' does not exist"));
3082        };
3083        let previous = state.workspace_anchor.clone();
3084        let changed = previous.as_ref() != Some(&new_anchor);
3085        state.workspace_anchor = Some(new_anchor.clone());
3086        if changed {
3087            crate::llm::permissions::clear_session_grants(id);
3088        }
3089        state.touch();
3090        Ok(ReanchorOutcome {
3091            previous,
3092            current: new_anchor,
3093            changed,
3094        })
3095    })?;
3096    if !outcome.changed {
3097        return Ok(outcome);
3098    }
3099    let previous_json = outcome.previous.as_ref().map(WorkspaceAnchor::to_json);
3100    let current_json = outcome.current.to_json();
3101    let event_metadata = serde_json::json!({
3102        "previous": previous_json,
3103        "current": current_json,
3104        "carry_transcript": carry_transcript,
3105        "compacted": compacted,
3106        "reason": reason,
3107    });
3108    let event = crate::llm::helpers::transcript_event(
3109        "AnchorChanged",
3110        "system",
3111        "internal",
3112        "",
3113        Some(event_metadata),
3114    );
3115    let _ = append_event(id, event);
3116    crate::llm::emit_live_agent_event_sync(&crate::agent_events::AgentEvent::AnchorChanged {
3117        session_id: id.to_string(),
3118        previous: previous_json,
3119        current: current_json,
3120        carry_transcript,
3121        compacted,
3122        reason,
3123    });
3124    Ok(outcome)
3125}
3126
3127/// Set session-local workspace defaults. Returns `Ok(true)` when the
3128/// policy changed.
3129pub fn set_workspace_policy(id: &str, policy: WorkspacePolicy) -> Result<bool, String> {
3130    SESSIONS.with(|s| {
3131        let mut map = s.borrow_mut();
3132        let Some(state) = map.get_mut(id) else {
3133            return Err(format!("agent session '{id}' does not exist"));
3134        };
3135        let changed = state.workspace_policy != policy;
3136        state.workspace_policy = policy;
3137        if changed {
3138            state.redo_stack.clear();
3139        }
3140        state.touch();
3141        Ok(changed)
3142    })
3143}
3144
3145/// Read the session's workspace policy, if the session exists.
3146pub fn workspace_policy(id: &str) -> Option<WorkspacePolicy> {
3147    SESSIONS.with(|s| {
3148        s.borrow()
3149            .get(id)
3150            .map(|state| state.workspace_policy.clone())
3151    })
3152}
3153
3154/// Validate and mount an additional workspace root on an anchored
3155/// session. When the path is already mounted, updates its mount mode
3156/// in place and refreshes its `mounted_at` timestamp.
3157pub fn add_workspace_root(
3158    id: &str,
3159    root: &str,
3160    mount_mode: Option<MountMode>,
3161    reason: Option<String>,
3162) -> Result<String, String> {
3163    let normalized_root = validate_workspace_root_path(root)?;
3164    let mounted_at = crate::orchestration::now_rfc3339();
3165    SESSIONS.with(|s| {
3166        let mut map = s.borrow_mut();
3167        let Some(state) = map.get_mut(id) else {
3168            return Err(format!("agent session '{id}' does not exist"));
3169        };
3170        let default_mount_mode = state.workspace_policy.default_mount_mode;
3171        let Some(anchor) = state.workspace_anchor.as_mut() else {
3172            return Err(format!("agent session '{id}' has no workspace anchor"));
3173        };
3174        let resolved_mount_mode = mount_mode.unwrap_or(default_mount_mode);
3175        if let Some(existing) = anchor
3176            .additional_roots
3177            .iter_mut()
3178            .find(|entry| entry.path == normalized_root)
3179        {
3180            let changed =
3181                existing.mount_mode != resolved_mount_mode || existing.mounted_at != mounted_at;
3182            existing.mount_mode = resolved_mount_mode;
3183            existing.mounted_at = mounted_at.clone();
3184            if changed {
3185                state.redo_stack.clear();
3186            }
3187        } else {
3188            anchor.additional_roots.push(MountedRoot {
3189                path: normalized_root.clone(),
3190                mount_mode: resolved_mount_mode,
3191                mounted_at: mounted_at.clone(),
3192            });
3193            state.redo_stack.clear();
3194        }
3195        let event = crate::llm::helpers::transcript_event(
3196            "RootMounted",
3197            "system",
3198            "internal",
3199            "",
3200            Some(serde_json::json!({
3201                "path": normalized_root.to_string_lossy(),
3202                "mount_mode": resolved_mount_mode.as_str(),
3203                "mounted_at": mounted_at.clone(),
3204                "reason": reason,
3205            })),
3206        );
3207        append_event_to_state(state, event, "add_workspace_root")?;
3208        crate::llm::permissions::clear_session_grants(id);
3209        state.touch();
3210        Ok(mounted_at.clone())
3211    })
3212}
3213
3214/// Remove one mounted root from an anchored session. Returns whether an
3215/// existing mount entry was deleted. Removing an absent root is a no-op.
3216pub fn remove_workspace_root(id: &str, root: &str) -> Result<bool, String> {
3217    let normalized_root = normalize_workspace_root_path(root);
3218    SESSIONS.with(|s| {
3219        let mut map = s.borrow_mut();
3220        let Some(state) = map.get_mut(id) else {
3221            return Err(format!("agent session '{id}' does not exist"));
3222        };
3223        let Some(anchor) = state.workspace_anchor.as_mut() else {
3224            return Err(format!("agent session '{id}' has no workspace anchor"));
3225        };
3226        let before = anchor.additional_roots.len();
3227        anchor
3228            .additional_roots
3229            .retain(|entry| entry.path != normalized_root);
3230        let removed = anchor.additional_roots.len() != before;
3231        if removed {
3232            state.redo_stack.clear();
3233            crate::llm::permissions::clear_session_grants(id);
3234        }
3235        state.touch();
3236        Ok(removed)
3237    })
3238}
3239
3240pub fn list_workspace_roots(id: &str) -> Result<(PathBuf, Vec<MountedRoot>), String> {
3241    SESSIONS.with(|s| {
3242        let map = s.borrow();
3243        let Some(state) = map.get(id) else {
3244            return Err(format!("agent session '{id}' does not exist"));
3245        };
3246        let Some(anchor) = state.workspace_anchor.as_ref() else {
3247            return Err(format!("agent session '{id}' has no workspace anchor"));
3248        };
3249        Ok((anchor.primary.clone(), anchor.additional_roots.clone()))
3250    })
3251}
3252
3253fn validate_workspace_root_path(root: &str) -> Result<PathBuf, String> {
3254    let normalized = normalize_workspace_root_path(root);
3255    let canonical = std::fs::canonicalize(&normalized)
3256        .map_err(|error| format!("workspace root '{root}' must exist and be readable: {error}"))?;
3257    let metadata = std::fs::metadata(&canonical)
3258        .map_err(|error| format!("workspace root '{root}' must exist and be readable: {error}"))?;
3259    if !metadata.is_dir() {
3260        return Err(format!("workspace root '{root}' must be a directory"));
3261    }
3262    std::fs::read_dir(&canonical)
3263        .map_err(|error| format!("workspace root '{root}' must be readable: {error}"))?;
3264    Ok(canonical)
3265}
3266
3267fn normalize_workspace_root_path(root: &str) -> PathBuf {
3268    let absolute = crate::stdlib::process::normalize_context_path(Path::new(root));
3269    std::fs::canonicalize(&absolute).unwrap_or(absolute)
3270}
3271
3272fn empty_transcript(id: &str) -> VmValue {
3273    use crate::llm::helpers::new_transcript_with;
3274    new_transcript_with(Some(id.to_string()), Vec::new(), None, None)
3275}
3276
3277fn clone_transcript_with_id(transcript: &VmValue, new_id: &str) -> VmValue {
3278    let Some(dict) = transcript.as_dict() else {
3279        return empty_transcript(new_id);
3280    };
3281    let mut next = dict.clone();
3282    next.put_str("id", new_id);
3283    VmValue::dict(next)
3284}
3285
3286fn clone_transcript_with_parent(transcript: &VmValue, parent_id: &str) -> VmValue {
3287    let Some(dict) = transcript.as_dict() else {
3288        return transcript.clone();
3289    };
3290    let mut next = dict.clone();
3291    let metadata = match next.get("metadata") {
3292        Some(VmValue::Dict(metadata)) => {
3293            let mut metadata = metadata.as_ref().clone();
3294            metadata.put_str("parent_session_id", parent_id);
3295            VmValue::dict(metadata)
3296        }
3297        _ => VmValue::dict(BTreeMap::from([(
3298            "parent_session_id".to_string(),
3299            VmValue::String(arcstr::ArcStr::from(parent_id.to_string())),
3300        )])),
3301    };
3302    next.insert(crate::value::intern_key("metadata"), metadata);
3303    VmValue::dict(next)
3304}
3305
3306fn apply_system_prompt_metadata(next: &mut crate::value::DictMap, system_prompt: &str) {
3307    let mut metadata = match next.get("metadata") {
3308        Some(VmValue::Dict(metadata)) => metadata.as_ref().clone(),
3309        _ => crate::value::DictMap::new(),
3310    };
3311    metadata.insert(
3312        crate::value::intern_key("system_prompt"),
3313        crate::stdlib::json_to_vm_value(&crate::llm::helpers::system_prompt_metadata(
3314            system_prompt,
3315        )),
3316    );
3317    next.insert(
3318        crate::value::intern_key("metadata"),
3319        VmValue::dict(metadata),
3320    );
3321}
3322
3323fn transcript_with_session_metadata(transcript: VmValue, state: &SessionState) -> VmValue {
3324    let Some(dict) = transcript.as_dict() else {
3325        return transcript;
3326    };
3327    let mut next = dict.clone();
3328    let mut metadata = match next.get("metadata") {
3329        Some(VmValue::Dict(metadata)) => metadata.as_ref().clone(),
3330        _ => crate::value::DictMap::new(),
3331    };
3332    if let Some(tool_format) = state.tool_format.as_ref() {
3333        metadata.put_str("tool_format", tool_format.clone());
3334        metadata.insert(
3335            crate::value::intern_key("tool_mode_locked"),
3336            VmValue::Bool(true),
3337        );
3338    }
3339    if let Some(system_prompt) = state.system_prompt.as_ref() {
3340        metadata.insert(
3341            crate::value::intern_key("system_prompt"),
3342            crate::stdlib::json_to_vm_value(&crate::llm::helpers::system_prompt_metadata(
3343                system_prompt,
3344            )),
3345        );
3346    }
3347    if let Some(actor_chain) = state.actor_chain.as_ref() {
3348        metadata.insert(
3349            crate::value::intern_key("actor_chain"),
3350            actor_chain.to_vm_value(),
3351        );
3352    } else {
3353        metadata.remove("actor_chain");
3354    }
3355    if let Some(anchor) = state.workspace_anchor.as_ref() {
3356        metadata.insert(
3357            crate::value::intern_key(WORKSPACE_ANCHOR_METADATA_KEY),
3358            anchor.to_vm_value(),
3359        );
3360    } else {
3361        metadata.remove(WORKSPACE_ANCHOR_METADATA_KEY);
3362    }
3363    if let Some(scratchpad) = state.scratchpad.as_ref() {
3364        metadata.insert(
3365            crate::value::intern_key("agent_scratchpad"),
3366            scratchpad.clone(),
3367        );
3368        metadata.insert(
3369            crate::value::intern_key("agent_scratchpad_version"),
3370            VmValue::Int(state.scratchpad_version as i64),
3371        );
3372    } else {
3373        metadata.remove("agent_scratchpad");
3374        metadata.remove("agent_scratchpad_version");
3375    }
3376    if let Some(last_action) = state.last_transcript_budget_action.as_ref() {
3377        let usage = transcript_usage(
3378            &VmValue::dict(next.clone()),
3379            state.transcript_budget_policy.max_approx_bytes.is_some(),
3380        );
3381        metadata.insert(
3382            crate::value::intern_key("transcript_budget"),
3383            crate::stdlib::json_to_vm_value(&serde_json::json!({
3384                "policy": transcript_budget_policy_json(&state.transcript_budget_policy.normalized()),
3385                "usage": transcript_budget_usage_json(&usage),
3386                "last_action": last_action,
3387            })),
3388        );
3389    }
3390    if !metadata.is_empty() {
3391        next.insert(
3392            crate::value::intern_key("metadata"),
3393            VmValue::dict(metadata),
3394        );
3395    }
3396    VmValue::dict(next)
3397}
3398
3399fn session_snapshot(state: &SessionState) -> VmValue {
3400    let transcript = transcript_with_session_metadata(state.transcript.clone(), state);
3401    let Some(dict) = transcript.as_dict() else {
3402        return state.transcript.clone();
3403    };
3404    let mut next = dict.clone();
3405    let length = next
3406        .get("messages")
3407        .and_then(|value| match value {
3408            VmValue::List(list) => Some(list.len() as i64),
3409            _ => None,
3410        })
3411        .unwrap_or(0);
3412    next.insert(crate::value::intern_key("length"), VmValue::Int(length));
3413    next.put_str("created_at", state.created_at.clone());
3414    next.insert(
3415        crate::value::intern_key("parent_id"),
3416        state
3417            .parent_id
3418            .as_ref()
3419            .map(|id| VmValue::String(arcstr::ArcStr::from(id.clone())))
3420            .unwrap_or(VmValue::Nil),
3421    );
3422    next.insert(
3423        crate::value::intern_key("child_ids"),
3424        VmValue::List(std::sync::Arc::new(
3425            state
3426                .child_ids
3427                .iter()
3428                .cloned()
3429                .map(|id| VmValue::String(arcstr::ArcStr::from(id)))
3430                .collect(),
3431        )),
3432    );
3433    next.insert(
3434        crate::value::intern_key("branched_at_event_index"),
3435        state
3436            .branched_at_event_index
3437            .map(|index| VmValue::Int(index as i64))
3438            .unwrap_or(VmValue::Nil),
3439    );
3440    next.insert(
3441        crate::value::intern_key("system_prompt"),
3442        state
3443            .system_prompt
3444            .as_ref()
3445            .map(|prompt| VmValue::String(arcstr::ArcStr::from(prompt.clone())))
3446            .unwrap_or(VmValue::Nil),
3447    );
3448    next.insert(
3449        crate::value::intern_key("tool_format"),
3450        state
3451            .tool_format
3452            .as_ref()
3453            .map(|format| VmValue::String(arcstr::ArcStr::from(format.clone())))
3454            .unwrap_or(VmValue::Nil),
3455    );
3456    next.insert(
3457        crate::value::intern_key("pinned_model"),
3458        state
3459            .pinned_model
3460            .as_ref()
3461            .map(|model| VmValue::String(arcstr::ArcStr::from(model.clone())))
3462            .unwrap_or(VmValue::Nil),
3463    );
3464    next.insert(
3465        crate::value::intern_key("pinned_reasoning_policy"),
3466        state
3467            .pinned_reasoning_policy
3468            .as_ref()
3469            .map(|policy| VmValue::String(arcstr::ArcStr::from(policy.clone())))
3470            .unwrap_or(VmValue::Nil),
3471    );
3472    next.insert(
3473        crate::value::intern_key("actor_chain"),
3474        state
3475            .actor_chain
3476            .as_ref()
3477            .map(ActorChain::to_vm_value)
3478            .unwrap_or(VmValue::Nil),
3479    );
3480    next.insert(
3481        crate::value::intern_key("scratchpad"),
3482        state.scratchpad.clone().unwrap_or(VmValue::Nil),
3483    );
3484    next.insert(
3485        crate::value::intern_key("scratchpad_version"),
3486        VmValue::Int(state.scratchpad_version as i64),
3487    );
3488    next.insert(
3489        crate::value::intern_key("workspace_anchor"),
3490        state
3491            .workspace_anchor
3492            .as_ref()
3493            .map(WorkspaceAnchor::to_vm_value)
3494            .unwrap_or(VmValue::Nil),
3495    );
3496    next.insert(
3497        crate::value::intern_key("workspace_policy"),
3498        state.workspace_policy.to_vm_value(),
3499    );
3500    next.insert(
3501        crate::value::intern_key("live_clients"),
3502        crate::stdlib::json_to_vm_value(&serde_json::Value::Array(
3503            state.live_clients.values().map(live_client_json).collect(),
3504        )),
3505    );
3506    next.insert(
3507        crate::value::intern_key("live_controller_id"),
3508        state
3509            .live_controller_id
3510            .as_ref()
3511            .map(|id| VmValue::String(arcstr::ArcStr::from(id.clone())))
3512            .unwrap_or(VmValue::Nil),
3513    );
3514    next.insert(
3515        crate::value::intern_key("completed_turn_checkpoint_count"),
3516        VmValue::Int(state.completed_turn_checkpoints.len() as i64),
3517    );
3518    next.insert(
3519        crate::value::intern_key("redo_checkpoint_count"),
3520        VmValue::Int(state.redo_stack.len() as i64),
3521    );
3522    VmValue::dict(next)
3523}
3524
3525fn update_lineage(
3526    map: &mut HashMap<String, SessionState>,
3527    parent_id: &str,
3528    child_id: &str,
3529    branched_at_event_index: Option<usize>,
3530) {
3531    let old_parent_id = map.get(child_id).and_then(|child| child.parent_id.clone());
3532    if let Some(old_parent_id) = old_parent_id.filter(|old_parent_id| old_parent_id != parent_id) {
3533        if let Some(old_parent) = map.get_mut(&old_parent_id) {
3534            old_parent.child_ids.retain(|id| id != child_id);
3535            old_parent.touch();
3536        }
3537    }
3538    if let Some(parent) = map.get_mut(parent_id) {
3539        parent.touch();
3540        if !parent.child_ids.iter().any(|id| id == child_id) {
3541            parent.child_ids.push(child_id.to_string());
3542        }
3543    }
3544    if let Some(child) = map.get_mut(child_id) {
3545        child.touch();
3546        child.parent_id = Some(parent_id.to_string());
3547        child.branched_at_event_index = branched_at_event_index;
3548        child.transcript = clone_transcript_with_parent(&child.transcript, parent_id);
3549    }
3550}
3551
3552fn branch_event_index(transcript: &VmValue, keep_first: usize) -> usize {
3553    if keep_first == 0 {
3554        return 0;
3555    }
3556    let Some(dict) = transcript.as_dict() else {
3557        return keep_first;
3558    };
3559    let Some(VmValue::List(events)) = dict.get("events") else {
3560        return keep_first;
3561    };
3562    event_prefix_len_for_messages(events, keep_first)
3563}
3564
3565fn event_kind(event: &VmValue) -> Option<String> {
3566    event
3567        .as_dict()
3568        .and_then(|dict| dict.get("kind"))
3569        .map(VmValue::display)
3570}
3571
3572fn event_id(event: &VmValue) -> Option<String> {
3573    event
3574        .as_dict()
3575        .and_then(|dict| dict.get("id"))
3576        .map(VmValue::display)
3577}
3578
3579fn is_turn_event(event: &VmValue) -> bool {
3580    matches!(
3581        event_kind(event).as_deref(),
3582        Some("message" | "tool_result")
3583    )
3584}
3585
3586fn event_prefix_len_for_messages(events: &[VmValue], keep_first: usize) -> usize {
3587    if keep_first == 0 {
3588        return 0;
3589    }
3590    let mut retained_messages = 0usize;
3591    for (index, event) in events.iter().enumerate() {
3592        if is_turn_event(event) {
3593            retained_messages += 1;
3594            if retained_messages == keep_first {
3595                return index + 1;
3596            }
3597        }
3598    }
3599    events.len()
3600}
3601
3602fn turn_event_id_for_count(events: &[VmValue], keep_first: usize) -> Option<String> {
3603    if keep_first == 0 {
3604        return None;
3605    }
3606    let mut retained_messages = 0usize;
3607    for event in events {
3608        if is_turn_event(event) {
3609            retained_messages += 1;
3610            if retained_messages == keep_first {
3611                return event_id(event);
3612            }
3613        }
3614    }
3615    None
3616}
3617
3618#[cfg(test)]
3619#[path = "agent_sessions_tests.rs"]
3620mod tests;