Skip to main content

harn_vm/
agent_sessions.rs

1//! First-class session storage.
2//!
3//! A session owns three things:
4//!
5//! 1. A transcript dict (messages, events, summary, metadata, …).
6//! 2. Closure subscribers that fire on agent-loop events for this session.
7//! 3. Its own lifecycle (open, reset, fork, trim, compact, close).
8//!
9//! Storage is thread-local because `VmValue` contains `Rc`, which is
10//! neither `Send` nor `Sync`. The agent loop runs on a tokio
11//! current-thread worker, so all session reads and writes happen on the
12//! same thread. The closure-subscribers register, fire, and unregister
13//! on that same thread.
14//!
15//! Lifecycle is explicit. Builtins (`agent_session_open`,
16//! `_reset`, `_fork`, `_fork_at`, `_close`, `_trim`, `_compact`,
17//! `_inject`, `_exists`, `_length`, `_snapshot`, `_ancestry`) drive
18//! the store directly — there is no "policy" config dict that
19//! performs lifecycle as a side effect.
20
21use std::cell::{Cell, RefCell};
22use std::collections::{BTreeMap, HashMap, HashSet};
23use std::future::Future;
24use std::path::{Path, PathBuf};
25use std::rc::Rc;
26use std::time::Instant;
27
28use crate::runtime_limits::RuntimeLimits;
29use crate::value::VmValue;
30use crate::workspace_anchor::{
31    MountMode, MountedRoot, WorkspaceAnchor, WorkspacePolicy, WORKSPACE_ANCHOR_METADATA_KEY,
32};
33
/// Default cap on concurrent sessions per VM thread, taken from
/// `RuntimeLimits::DEFAULT.max_agent_sessions`. Once this many sessions are
/// live, the next `open_or_create` of a *new* id evicts the
/// least-recently-accessed session before inserting the new one.
pub const DEFAULT_SESSION_CAP: usize = RuntimeLimits::DEFAULT.max_agent_sessions;
37
/// Default ceiling on prompt-visible messages retained per session. It is
/// deliberately generous so normal long-running agents fit, while still
/// stopping accidental unbounded growth.
pub const DEFAULT_TRANSCRIPT_MESSAGE_CAP: usize = 4_096;

/// Default ceiling on transcript audit events retained per session. Events
/// cover message-derived entries as well as orchestration lifecycle records.
pub const DEFAULT_TRANSCRIPT_EVENT_CAP: usize = 32_768;
#[cfg(debug_assertions)]
const CACHE_STABLE_SYSTEM_PROMPT_DIAGNOSTIC: &str = "HARN-CACHE-001";

/// Recovery strategy attached to a transcript budget. The strategies
/// themselves are carried out by `apply_transcript_with_budget`, which is
/// not part of this block.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TranscriptBudgetRecovery {
    /// Refuse an over-budget transcript.
    Reject,
    /// Trim on overflow; `keep_last` presumably counts trailing messages kept.
    Trim { keep_last: usize },
    /// Compact on overflow; `keep_last` presumably counts trailing messages kept.
    Compact { keep_last: usize },
}

/// Per-session transcript limits plus the recovery strategy to use when a
/// limit is exceeded. The named constructors and `normalized` clamp every
/// limit to at least 1.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SessionTranscriptBudgetPolicy {
    /// Maximum retained messages.
    pub max_messages: usize,
    /// Maximum retained transcript events.
    pub max_events: usize,
    /// Optional approximate size ceiling in bytes; `None` means no byte limit
    /// is configured.
    pub max_approx_bytes: Option<usize>,
    /// Strategy to apply when a limit is exceeded.
    pub recovery: TranscriptBudgetRecovery,
}

impl SessionTranscriptBudgetPolicy {
    /// Policy that rejects over-budget transcripts.
    pub fn reject(max_messages: usize, max_events: usize) -> Self {
        Self::clamped(max_messages, max_events, TranscriptBudgetRecovery::Reject)
    }

    /// Policy that trims over-budget transcripts, keeping `keep_last`.
    pub fn trim(max_messages: usize, max_events: usize, keep_last: usize) -> Self {
        Self::clamped(
            max_messages,
            max_events,
            TranscriptBudgetRecovery::Trim { keep_last },
        )
    }

    /// Policy that compacts over-budget transcripts, keeping `keep_last`.
    pub fn compact(max_messages: usize, max_events: usize, keep_last: usize) -> Self {
        Self::clamped(
            max_messages,
            max_events,
            TranscriptBudgetRecovery::Compact { keep_last },
        )
    }

    /// Builder-style setter for the approximate byte ceiling. A `Some(0)`
    /// limit is raised to 1; `None` clears the ceiling.
    pub fn with_max_approx_bytes(self, max_approx_bytes: Option<usize>) -> Self {
        Self {
            max_approx_bytes: max_approx_bytes.map(|limit| usize::max(limit, 1)),
            ..self
        }
    }

    /// Copy of `self` with every limit clamped to at least 1.
    fn normalized(&self) -> Self {
        let Self {
            max_messages,
            max_events,
            max_approx_bytes,
            recovery,
        } = self;
        Self {
            max_messages: usize::max(*max_messages, 1),
            max_events: usize::max(*max_events, 1),
            max_approx_bytes: (*max_approx_bytes).map(|limit| usize::max(limit, 1)),
            recovery: recovery.clone(),
        }
    }

    /// Shared constructor: no byte ceiling, with limits clamped via `normalized`.
    fn clamped(
        max_messages: usize,
        max_events: usize,
        recovery: TranscriptBudgetRecovery,
    ) -> Self {
        Self {
            max_messages,
            max_events,
            max_approx_bytes: None,
            recovery,
        }
        .normalized()
    }
}

impl Default for SessionTranscriptBudgetPolicy {
    /// Reject-on-overflow policy sized by the default message and event caps.
    fn default() -> Self {
        Self::reject(DEFAULT_TRANSCRIPT_MESSAGE_CAP, DEFAULT_TRANSCRIPT_EVENT_CAP)
    }
}
112
/// Everything this thread's session store keeps for one agent session.
pub struct SessionState {
    /// Session id; `open_or_create` also uses it as this state's key in the
    /// session map.
    pub id: String,
    /// Transcript dict (`messages`, `events`, `summary`, metadata, …).
    pub transcript: VmValue,
    /// Closure subscribers for this session's agent-loop events. `fork`
    /// does not copy them.
    pub subscribers: Vec<VmValue>,
    /// Creation timestamp produced by `crate::orchestration::now_rfc3339`.
    pub created_at: String,
    /// Last time the session was touched. When the per-thread cap is hit,
    /// `open_or_create` evicts the session with the smallest value.
    pub last_accessed: Instant,
    /// Id of the session this one was linked under (fork or
    /// `link_child_session*`), if any.
    pub parent_id: Option<String>,
    /// Ids of sessions linked as children of this one.
    pub child_ids: Vec<String>,
    /// Branch point in the parent's transcript, as passed to
    /// `link_child_session_with_branch` (e.g. by `fork_at`). Presumably
    /// written by `update_lineage`, which is not visible here.
    pub branched_at_event_index: Option<usize>,
    /// Names of skills that were active at the end of the most recent
    /// `agent_loop` run on this session. Empty when no skills were
    /// matched, when the skill system wasn't used, or when the
    /// deactivation phase cleared them. Re-entering the session
    /// restores these as the initial active set before matching runs.
    pub active_skills: Vec<String>,
    /// Tool-calling protocol claimed by the first agent loop that uses
    /// this session. A transcript is only replayable under the same
    /// contract that produced its prompt/history.
    pub tool_format: Option<String>,
    /// Stable session-level system prompt material. This is transcript
    /// metadata, not a replay message: providers receive it through
    /// their system/developer instruction channel on each call.
    pub system_prompt: Option<String>,
    /// Session-pinned model selector. When set, `llm_call` invocations
    /// that do not pass an explicit `model:` option resolve to this
    /// selector instead of `HARN_LLM_MODEL` / provider defaults. Mid-
    /// session swap is exposed over ACP via `session/set_config_option`
    /// (configId="model").
    pub pinned_model: Option<String>,
    /// Session-pinned high-level reasoning policy. When set, `llm_call`
    /// invocations that do not pass explicit `thinking` or
    /// `reasoning_effort` options resolve this provider-aware policy into
    /// the route's native thinking shape. Exposed over ACP as
    /// `session/set_config_option(configId="thought_level")`.
    pub pinned_reasoning_policy: Option<String>,
    /// Session-local workspace defaults. Persona and host policy layers
    /// can update this without rewriting the current anchor.
    pub workspace_policy: WorkspacePolicy,
    /// Typed workspace anchor for the session. Primary path plus any
    /// additional mounted roots; consumed by permission matchers, the
    /// bundle exporter, and host-side cross-project handoff flows
    /// (epic #2208). `None` until a host opens the session with one or
    /// the ACP `reanchor` / `add_root` primitives populate it.
    pub workspace_anchor: Option<WorkspaceAnchor>,
    /// Budget checked by `apply_transcript_with_budget` whenever this
    /// module replaces the transcript (trim, truncate, fork, policy update).
    pub transcript_budget_policy: SessionTranscriptBudgetPolicy,
    /// JSON record of the most recent budget action, if any. Cleared by
    /// `reset_transcript` and restored when a policy update is rolled back.
    /// NOTE(review): presumably written by `apply_transcript_with_budget`.
    pub last_transcript_budget_action: Option<serde_json::Value>,
}
160
161impl SessionState {
162    fn new(id: String) -> Self {
163        let now = Instant::now();
164        let transcript = empty_transcript(&id);
165        Self {
166            id,
167            transcript,
168            subscribers: Vec::new(),
169            created_at: crate::orchestration::now_rfc3339(),
170            last_accessed: now,
171            parent_id: None,
172            child_ids: Vec::new(),
173            branched_at_event_index: None,
174            active_skills: Vec::new(),
175            tool_format: None,
176            system_prompt: None,
177            pinned_model: None,
178            pinned_reasoning_policy: None,
179            workspace_policy: WorkspacePolicy::default(),
180            workspace_anchor: None,
181            transcript_budget_policy: default_transcript_budget_policy(),
182            last_transcript_budget_action: None,
183        }
184    }
185}
186
/// Lineage view of one session: its direct parent, its direct children, and
/// the root reached by following parent links upward.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SessionAncestry {
    pub parent_id: Option<String>,
    pub child_ids: Vec<String>,
    pub root_id: String,
}

/// Outcome of truncating a transcript to its first N messages: how many
/// messages were kept and removed, and the tip turn-event id afterwards.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SessionTruncateResult {
    pub kept_turn_count: usize,
    pub removed_turn_count: usize,
    pub new_tip_turn_id: Option<String>,
}

/// Report for a reminder injection. NOTE(review): the producer is outside
/// this view; presumably it carries the reminder id and how many earlier
/// copies were deduplicated.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ReminderInjectionReport {
    pub reminder_id: String,
    pub deduped_count: usize,
}
206
// Per-thread session storage. `VmValue` holds `Rc`, so none of this can be
// shared across threads (see the module docs).
thread_local! {
    // Live sessions keyed by session id.
    static SESSIONS: RefCell<HashMap<String, SessionState>> = RefCell::new(HashMap::new());
    // Maximum live sessions before `open_or_create` evicts the
    // least-recently-accessed one; changed via `set_session_cap`.
    static SESSION_CAP: Cell<usize> = const { Cell::new(DEFAULT_SESSION_CAP) };
    // Budget policy that `SessionState::new` copies into each new session.
    static DEFAULT_TRANSCRIPT_BUDGET_POLICY: RefCell<SessionTranscriptBudgetPolicy> =
        RefCell::new(SessionTranscriptBudgetPolicy::default());
    // Active session ids; the top entry is what `current_session_id` returns.
    static CURRENT_SESSION_STACK: RefCell<Vec<String>> = const { RefCell::new(Vec::new()) };
    // Active tool-call ids; `current_tool_call_id` falls back to the top
    // entry when no task-local id is scoped.
    static CURRENT_TOOL_CALL_STACK: RefCell<Vec<String>> = const { RefCell::new(Vec::new()) };
}
215
// Task-scoped tool-call id installed by `scope_current_tool_call`.
// `current_tool_call_id` checks it before the thread-local stack, so sibling
// tool futures running on the same thread keep their own ids across `.await`.
tokio::task_local! {
    static CURRENT_TOOL_CALL_TASK: String;
}
219
220pub struct CurrentSessionGuard {
221    active: bool,
222}
223
224impl Drop for CurrentSessionGuard {
225    fn drop(&mut self) {
226        if self.active {
227            pop_current_session();
228        }
229    }
230}
231
232/// RAII guard that scopes the active tool-call id for the running thread.
233///
234/// Set on entry to a tool dispatch and dropped on exit, so any hostlib
235/// builtin invoked under it (e.g. `tools/write_file`) can resolve the
236/// owning tool call without threading the id through every parameter.
237pub struct CurrentToolCallGuard {
238    active: bool,
239}
240
241impl Drop for CurrentToolCallGuard {
242    fn drop(&mut self) {
243        if self.active {
244            pop_current_tool_call();
245        }
246    }
247}
248
249/// Set the per-thread session cap. Primarily for tests; production VMs
250/// inherit the default.
251pub fn set_session_cap(cap: usize) {
252    SESSION_CAP.with(|c| c.set(cap.max(1)));
253}
254
255pub fn session_cap() -> usize {
256    SESSION_CAP.with(|c| c.get())
257}
258
259pub fn set_default_transcript_budget_policy(policy: SessionTranscriptBudgetPolicy) {
260    DEFAULT_TRANSCRIPT_BUDGET_POLICY.with(|cell| {
261        *cell.borrow_mut() = policy.normalized();
262    });
263}
264
265pub fn reset_default_transcript_budget_policy() {
266    set_default_transcript_budget_policy(SessionTranscriptBudgetPolicy::default());
267}
268
269pub fn default_transcript_budget_policy() -> SessionTranscriptBudgetPolicy {
270    DEFAULT_TRANSCRIPT_BUDGET_POLICY.with(|cell| cell.borrow().clone())
271}
272
273pub fn transcript_budget_policy(id: &str) -> Option<SessionTranscriptBudgetPolicy> {
274    SESSIONS.with(|s| {
275        s.borrow()
276            .get(id)
277            .map(|state| state.transcript_budget_policy.clone())
278    })
279}
280
281pub fn set_transcript_budget_policy(
282    id: &str,
283    policy: SessionTranscriptBudgetPolicy,
284) -> Result<(), String> {
285    SESSIONS.with(|s| {
286        let mut map = s.borrow_mut();
287        let Some(state) = map.get_mut(id) else {
288            return Err(format!("agent session '{id}' does not exist"));
289        };
290        let previous = state.transcript_budget_policy.clone();
291        let previous_action = state.last_transcript_budget_action.clone();
292        state.transcript_budget_policy = policy.normalized();
293        let candidate = state.transcript.clone();
294        if let Err(error) = apply_transcript_with_budget(state, candidate, "policy_update") {
295            state.transcript_budget_policy = previous;
296            state.last_transcript_budget_action = previous_action;
297            return Err(error);
298        }
299        state.last_accessed = Instant::now();
300        Ok(())
301    })
302}
303
304/// Clear the session store. Wired into `reset_llm_state` for test isolation.
305pub fn reset_session_store() {
306    SESSIONS.with(|s| s.borrow_mut().clear());
307    CURRENT_SESSION_STACK.with(|stack| stack.borrow_mut().clear());
308    CURRENT_TOOL_CALL_STACK.with(|stack| stack.borrow_mut().clear());
309    reset_default_transcript_budget_policy();
310}
311
312pub(crate) fn push_current_session(id: String) {
313    if id.is_empty() {
314        return;
315    }
316    CURRENT_SESSION_STACK.with(|stack| stack.borrow_mut().push(id));
317}
318
319pub(crate) fn pop_current_session() {
320    CURRENT_SESSION_STACK.with(|stack| {
321        let _ = stack.borrow_mut().pop();
322    });
323}
324
325pub fn current_session_id() -> Option<String> {
326    CURRENT_SESSION_STACK.with(|stack| stack.borrow().last().cloned())
327}
328
329pub fn enter_current_session(id: impl Into<String>) -> CurrentSessionGuard {
330    let id = id.into();
331    if id.trim().is_empty() {
332        return CurrentSessionGuard { active: false };
333    }
334    push_current_session(id);
335    CurrentSessionGuard { active: true }
336}
337
338fn push_current_tool_call(id: String) {
339    if id.is_empty() {
340        return;
341    }
342    CURRENT_TOOL_CALL_STACK.with(|stack| stack.borrow_mut().push(id));
343}
344
345fn pop_current_tool_call() {
346    CURRENT_TOOL_CALL_STACK.with(|stack| {
347        let _ = stack.borrow_mut().pop();
348    });
349}
350
351/// Return the active tool-call id for the current thread, if any.
352///
353/// Hostlib builtins consult this to attribute side-effect snapshots to
354/// the owning ACP `toolCallId` without callers passing it explicitly.
355pub fn current_tool_call_id() -> Option<String> {
356    if let Ok(id) = CURRENT_TOOL_CALL_TASK.try_with(Clone::clone) {
357        if !id.trim().is_empty() {
358            return Some(id);
359        }
360    }
361    CURRENT_TOOL_CALL_STACK.with(|stack| stack.borrow().last().cloned())
362}
363
364/// Scope the active tool-call id to one async task.
365///
366/// Parallel tool dispatch runs sibling calls on the same OS thread, so
367/// thread-local guards alone cannot preserve attribution across `.await`
368/// points. Tokio task-local scoping follows the future instead.
369pub async fn scope_current_tool_call<F, T>(id: impl Into<String>, future: F) -> T
370where
371    F: Future<Output = T>,
372{
373    let id = id.into();
374    if id.trim().is_empty() {
375        future.await
376    } else {
377        CURRENT_TOOL_CALL_TASK.scope(id, future).await
378    }
379}
380
381/// Scope the active tool-call id for the duration of the returned guard.
382pub fn enter_current_tool_call(id: impl Into<String>) -> CurrentToolCallGuard {
383    let id = id.into();
384    if id.trim().is_empty() {
385        return CurrentToolCallGuard { active: false };
386    }
387    push_current_tool_call(id);
388    CurrentToolCallGuard { active: true }
389}
390
391pub fn exists(id: &str) -> bool {
392    SESSIONS.with(|s| s.borrow().contains_key(id))
393}
394
395pub fn length(id: &str) -> Option<usize> {
396    SESSIONS.with(|s| {
397        s.borrow().get(id).map(|state| {
398            state
399                .transcript
400                .as_dict()
401                .and_then(|d| d.get("messages"))
402                .and_then(|v| match v {
403                    VmValue::List(list) => Some(list.len()),
404                    _ => None,
405                })
406                .unwrap_or(0)
407        })
408    })
409}
410
411pub fn snapshot(id: &str) -> Option<VmValue> {
412    SESSIONS.with(|s| s.borrow().get(id).map(session_snapshot))
413}
414
415/// Session-only fields stay on `agent_session_snapshot`.
416pub fn transcript(id: &str) -> Option<VmValue> {
417    SESSIONS.with(|s| {
418        s.borrow()
419            .get(id)
420            .map(|state| transcript_with_session_metadata(state.transcript.clone(), state))
421    })
422}
423
424/// Open a session, or create it if missing. Returns the resolved id.
425///
426/// Newly-created sessions auto-register an event-log-backed sink when a
427/// generalized [`crate::event_log::EventLog`] has been installed for the
428/// current VM thread. For legacy env-driven workflows that still point
429/// `HARN_EVENT_LOG_DIR` at a directory, we preserve the older JSONL sink
430/// as a compatibility fallback. Re-opening an existing session does not
431/// re-register — sinks are per-session, owned by the first opener.
432pub fn open_or_create(id: Option<String>) -> String {
433    let resolved = id.unwrap_or_else(|| uuid::Uuid::now_v7().to_string());
434    let parent_session = current_session_id();
435    let mut was_new = false;
436    SESSIONS.with(|s| {
437        let mut map = s.borrow_mut();
438        if let Some(state) = map.get_mut(&resolved) {
439            state.last_accessed = Instant::now();
440            return;
441        }
442        was_new = true;
443        let cap = SESSION_CAP.with(|c| c.get());
444        if map.len() >= cap {
445            if let Some(victim) = map
446                .iter()
447                .min_by_key(|(_, state)| state.last_accessed)
448                .map(|(id, _)| id.clone())
449            {
450                map.remove(&victim);
451            }
452        }
453        map.insert(resolved.clone(), SessionState::new(resolved.clone()));
454    });
455    if was_new {
456        if let Some(parent) = parent_session.as_deref() {
457            crate::agent_events::mirror_session_sinks(parent, &resolved);
458        }
459        try_register_event_log(&resolved);
460    }
461    resolved
462}
463
464pub fn open_child_session(parent_id: &str, id: Option<String>) -> String {
465    let resolved = open_or_create(id);
466    link_child_session(parent_id, &resolved);
467    resolved
468}
469
470pub fn link_child_session(parent_id: &str, child_id: &str) {
471    link_child_session_with_branch(parent_id, child_id, None);
472}
473
474pub fn link_child_session_with_branch(
475    parent_id: &str,
476    child_id: &str,
477    branched_at_event_index: Option<usize>,
478) {
479    if parent_id == child_id {
480        return;
481    }
482    open_or_create(Some(parent_id.to_string()));
483    open_or_create(Some(child_id.to_string()));
484    SESSIONS.with(|s| {
485        let mut map = s.borrow_mut();
486        update_lineage(&mut map, parent_id, child_id, branched_at_event_index);
487    });
488}
489
490pub fn parent_id(id: &str) -> Option<String> {
491    SESSIONS.with(|s| s.borrow().get(id).and_then(|state| state.parent_id.clone()))
492}
493
494pub fn child_ids(id: &str) -> Vec<String> {
495    SESSIONS.with(|s| {
496        s.borrow()
497            .get(id)
498            .map(|state| state.child_ids.clone())
499            .unwrap_or_default()
500    })
501}
502
503pub fn ancestry(id: &str) -> Option<SessionAncestry> {
504    SESSIONS.with(|s| {
505        let map = s.borrow();
506        let state = map.get(id)?;
507        let mut root_id = state.id.clone();
508        let mut cursor = state.parent_id.clone();
509        let mut seen = HashSet::from([state.id.clone()]);
510        while let Some(parent_id) = cursor {
511            if !seen.insert(parent_id.clone()) {
512                break;
513            }
514            root_id = parent_id.clone();
515            cursor = map
516                .get(&parent_id)
517                .and_then(|parent| parent.parent_id.clone());
518        }
519        Some(SessionAncestry {
520            parent_id: state.parent_id.clone(),
521            child_ids: state.child_ids.clone(),
522            root_id,
523        })
524    })
525}
526
527/// Auto-register a persistent sink for a newly-created session.
528/// Silent no-op on failure — a broken observability sink must never
529/// prevent a session from starting.
530fn try_register_event_log(session_id: &str) {
531    if let Some(log) = crate::event_log::active_event_log() {
532        crate::agent_events::register_sink(
533            session_id,
534            crate::agent_events::EventLogSink::new(log, session_id),
535        );
536        return;
537    }
538    let Ok(dir) = std::env::var("HARN_EVENT_LOG_DIR") else {
539        return;
540    };
541    if dir.is_empty() {
542        return;
543    }
544    let path = std::path::PathBuf::from(dir).join(format!("event_log-{session_id}.jsonl"));
545    if let Ok(sink) = crate::agent_events::JsonlEventSink::open(&path) {
546        crate::agent_events::register_sink(session_id, sink);
547    }
548}
549
550pub fn register_event_log_sink(session_id: &str) {
551    try_register_event_log(session_id);
552}
553
554pub fn close(id: &str) {
555    SESSIONS.with(|s| {
556        s.borrow_mut().remove(id);
557    });
558    // Cross-thread per-session state must be released too, otherwise
559    // pending inbox entries can be delivered to a future session that
560    // happens to reuse the same id.
561    crate::orchestration::agent_inbox::clear_session(id);
562    crate::agent_events::clear_session_sinks(id);
563}
564
565pub fn close_with_status(
566    id: &str,
567    reason: impl Into<String>,
568    status: impl Into<String>,
569    metadata: serde_json::Value,
570) -> bool {
571    if !exists(id) {
572        return false;
573    }
574    let reason = reason.into();
575    let status = status.into();
576    let event_metadata = serde_json::json!({
577        "reason": reason,
578        "status": status,
579        "metadata": metadata,
580    });
581    let transcript_event = crate::llm::helpers::transcript_event(
582        "agent_session_closed",
583        "system",
584        "internal",
585        "Agent session closed",
586        Some(event_metadata),
587    );
588    let _ = append_event(id, transcript_event);
589    crate::llm::emit_live_agent_event_sync(&crate::agent_events::AgentEvent::SessionClosed {
590        session_id: id.to_string(),
591        reason,
592        status,
593        metadata,
594    });
595    close(id);
596    true
597}
598
599pub fn reset_transcript(id: &str) -> bool {
600    SESSIONS.with(|s| {
601        let mut map = s.borrow_mut();
602        let Some(state) = map.get_mut(id) else {
603            return false;
604        };
605        state.transcript = empty_transcript(id);
606        state.tool_format = None;
607        state.system_prompt = None;
608        state.last_transcript_budget_action = None;
609        state.last_accessed = Instant::now();
610        true
611    })
612}
613
614/// Copy `src`'s transcript into a new session id. Subscribers are NOT
615/// copied — a fork is a conversation branch, not an event fanout.
616///
617/// Touches `src`'s `last_accessed` before evicting, so the fork
618/// operation itself can't make `src` look stale and kick it out of
619/// the LRU just to make room for the new fork.
620pub fn fork(src_id: &str, dst_id: Option<String>) -> Option<String> {
621    let (
622        src_transcript,
623        src_tool_format,
624        src_system_prompt,
625        src_pinned_model,
626        src_pinned_reasoning_policy,
627        src_workspace_anchor,
628        src_workspace_policy,
629        src_transcript_budget_policy,
630        src_last_transcript_budget_action,
631        dst,
632    ) = SESSIONS.with(|s| {
633        let mut map = s.borrow_mut();
634        let src = map.get_mut(src_id)?;
635        src.last_accessed = Instant::now();
636        let dst = dst_id.unwrap_or_else(|| uuid::Uuid::now_v7().to_string());
637        let forked_transcript = clone_transcript_with_id(&src.transcript, &dst);
638        Some((
639            forked_transcript,
640            src.tool_format.clone(),
641            src.system_prompt.clone(),
642            src.pinned_model.clone(),
643            src.pinned_reasoning_policy.clone(),
644            src.workspace_anchor.clone(),
645            src.workspace_policy.clone(),
646            src.transcript_budget_policy.clone(),
647            src.last_transcript_budget_action.clone(),
648            dst,
649        ))
650    })?;
651    // Ensure cap is respected when inserting the fork.
652    open_or_create(Some(dst.clone()));
653    SESSIONS.with(|s| {
654        let mut map = s.borrow_mut();
655        if let Some(state) = map.get_mut(&dst) {
656            state.transcript = src_transcript;
657            state.tool_format = src_tool_format;
658            state.system_prompt = src_system_prompt;
659            state.pinned_model = src_pinned_model;
660            state.pinned_reasoning_policy = src_pinned_reasoning_policy;
661            state.workspace_anchor = src_workspace_anchor;
662            state.workspace_policy = src_workspace_policy;
663            state.transcript_budget_policy = src_transcript_budget_policy;
664            state.last_transcript_budget_action = src_last_transcript_budget_action;
665            state.last_accessed = Instant::now();
666        }
667        update_lineage(&mut map, src_id, &dst, None);
668    });
669    let budget_ok = SESSIONS.with(|s| {
670        let mut map = s.borrow_mut();
671        let Some(state) = map.get_mut(&dst) else {
672            return false;
673        };
674        let candidate = state.transcript.clone();
675        apply_transcript_with_budget(state, candidate, "fork").is_ok()
676    });
677    if !budget_ok {
678        close(&dst);
679        return None;
680    }
681    // open_or_create evicts BEFORE inserting, so the dst slot is
682    // guaranteed once we get here. The existence check is cheap
683    // insurance against a future refactor that breaks that invariant.
684    if exists(&dst) {
685        Some(dst)
686    } else {
687        None
688    }
689}
690
691/// Fork `src_id` and truncate the destination transcript to the
692/// first `keep_first` messages (#105 — branch-replay). Pairs with the
693/// scrubber: the host picks an event index, rebuilds a message count,
694/// and calls this to spawn a live sibling session that resumes from
695/// the rebuilt state. Subscribers are not carried over (same as
696/// `fork`), so sibling events don't double-fan into the parent's
697/// consumers.
698///
699/// Returns the new session id on success, `None` if `src_id` doesn't
700/// exist.
701pub fn fork_at(src_id: &str, keep_first: usize, dst_id: Option<String>) -> Option<String> {
702    let branched_at_event_index = SESSIONS.with(|s| {
703        let map = s.borrow();
704        let src = map.get(src_id)?;
705        Some(branch_event_index(&src.transcript, keep_first))
706    })?;
707    let new_id = fork(src_id, dst_id)?;
708    link_child_session_with_branch(src_id, &new_id, Some(branched_at_event_index));
709    let _ = truncate(&new_id, keep_first);
710    Some(new_id)
711}
712
713/// Truncate the session transcript to the first `keep_first`
714/// messages (opposite of `trim`, which keeps the last N). Returns
715/// counts and the retained tip event id when the session exists.
716pub fn truncate(id: &str, keep_first: usize) -> Option<SessionTruncateResult> {
717    SESSIONS.with(|s| {
718        let mut map = s.borrow_mut();
719        let state = map.get_mut(id)?;
720        let result = truncate_state(state, keep_first)?;
721        Some(result)
722    })
723}
724
725fn truncate_state(state: &mut SessionState, keep_first: usize) -> Option<SessionTruncateResult> {
726    let dict = state
727        .transcript
728        .as_dict()
729        .cloned()
730        .unwrap_or_else(BTreeMap::new);
731    let messages: Vec<VmValue> = match dict.get("messages") {
732        Some(VmValue::List(list)) => list.iter().cloned().collect(),
733        _ => Vec::new(),
734    };
735    let existing_events = match dict.get("events") {
736        Some(VmValue::List(list)) => Some(list.iter().cloned().collect::<Vec<_>>()),
737        _ => None,
738    };
739    let kept_turn_count = keep_first.min(messages.len());
740    let removed_turn_count = messages.len().saturating_sub(kept_turn_count);
741    let mut new_tip_turn_id = existing_events
742        .as_ref()
743        .map(|events| turn_event_id_for_count(events, kept_turn_count))
744        .unwrap_or_else(|| {
745            let events = crate::llm::helpers::transcript_events_from_messages(&messages);
746            turn_event_id_for_count(&events, kept_turn_count)
747        });
748
749    if removed_turn_count > 0 {
750        let retained: Vec<VmValue> = messages.into_iter().take(kept_turn_count).collect();
751        let retained_events = match existing_events {
752            Some(events) => {
753                let keep_event_count = event_prefix_len_for_messages(&events, kept_turn_count);
754                events.into_iter().take(keep_event_count).collect()
755            }
756            None => crate::llm::helpers::transcript_events_from_messages(&retained),
757        };
758        new_tip_turn_id = turn_event_id_for_count(&retained_events, kept_turn_count);
759        let mut next = dict;
760        next.insert(
761            "events".to_string(),
762            VmValue::List(Rc::new(retained_events)),
763        );
764        next.insert("messages".to_string(), VmValue::List(Rc::new(retained)));
765        next.remove("summary");
766        apply_transcript_with_budget(state, VmValue::Dict(Rc::new(next)), "truncate").ok()?;
767    }
768    state.last_accessed = Instant::now();
769    Some(SessionTruncateResult {
770        kept_turn_count,
771        removed_turn_count,
772        new_tip_turn_id,
773    })
774}
775
776/// Pop the trailing message iff it is an assistant message. Used by
777/// `agent_step_judge` to remove a vetoed assistant turn before
778/// regeneration (the "replace" on_veto path). Returns `true` if a
779/// message was popped, `false` if the transcript was empty, and an
780/// error if the trailing message was not an assistant turn —
781/// signalling a call-site discipline bug rather than a runtime error.
782pub fn pop_last_if_assistant(id: &str) -> Result<bool, String> {
783    SESSIONS.with(|s| {
784        let mut map = s.borrow_mut();
785        let Some(state) = map.get_mut(id) else {
786            return Err(format!(
787                "pop_last_if_assistant: unknown session id '{id}'"
788            ));
789        };
790        let messages: Vec<VmValue> = match state.transcript.as_dict() {
791            Some(dict) => match dict.get("messages") {
792                Some(VmValue::List(list)) => list.iter().cloned().collect(),
793                _ => Vec::new(),
794            },
795            None => Vec::new(),
796        };
797        if messages.is_empty() {
798            return Ok(false);
799        }
800        let trailing_role = messages
801            .last()
802            .and_then(|m| m.as_dict())
803            .and_then(|d| d.get("role"))
804            .map(|v| v.display())
805            .unwrap_or_default();
806        if trailing_role != "assistant" {
807            return Err(format!(
808                "pop_last_if_assistant: trailing message role is '{trailing_role}', expected 'assistant'"
809            ));
810        }
811        let keep = messages.len() - 1;
812        truncate_state(state, keep);
813        Ok(true)
814    })
815}
816
817pub fn trim(id: &str, keep_last: usize) -> Option<usize> {
818    SESSIONS.with(|s| {
819        let mut map = s.borrow_mut();
820        let state = map.get_mut(id)?;
821        let dict = state.transcript.as_dict()?.clone();
822        let messages: Vec<VmValue> = match dict.get("messages") {
823            Some(VmValue::List(list)) => list.iter().cloned().collect(),
824            _ => Vec::new(),
825        };
826        let start = messages.len().saturating_sub(keep_last);
827        let retained: Vec<VmValue> = messages.into_iter().skip(start).collect();
828        let kept = retained.len();
829        let mut next = dict;
830        next.insert(
831            "events".to_string(),
832            VmValue::List(Rc::new(
833                crate::llm::helpers::transcript_events_from_messages(&retained),
834            )),
835        );
836        next.insert("messages".to_string(), VmValue::List(Rc::new(retained)));
837        apply_transcript_with_budget(state, VmValue::Dict(Rc::new(next)), "trim").ok()?;
838        state.last_accessed = Instant::now();
839        Some(kept)
840    })
841}
842
843/// Append a message dict to the session transcript. The message must
844/// have at least a string `role`; anything else is merged verbatim.
845pub fn inject_message(id: &str, message: VmValue) -> Result<(), String> {
846    let Some(msg_dict) = message.as_dict().cloned() else {
847        return Err("agent_session_inject: message must be a dict".into());
848    };
849    let role_ok = matches!(msg_dict.get("role"), Some(VmValue::String(_)));
850    if !role_ok {
851        return Err(
852            "agent_session_inject: message must have a string `role` (user|assistant|tool_result|system)"
853                .into(),
854        );
855    }
856    SESSIONS.with(|s| {
857        let mut map = s.borrow_mut();
858        let Some(state) = map.get_mut(id) else {
859            return Err(format!("agent_session_inject: unknown session id '{id}'"));
860        };
861        let dict = state
862            .transcript
863            .as_dict()
864            .cloned()
865            .unwrap_or_else(BTreeMap::new);
866        let mut messages: Vec<VmValue> = match dict.get("messages") {
867            Some(VmValue::List(list)) => list.iter().cloned().collect(),
868            _ => Vec::new(),
869        };
870        let mut events: Vec<VmValue> = match dict.get("events") {
871            Some(VmValue::List(list)) => list.iter().cloned().collect(),
872            _ => crate::llm::helpers::transcript_events_from_messages(&messages),
873        };
874        let new_message = VmValue::Dict(Rc::new(msg_dict));
875        let message_index = messages.len();
876        events.push(crate::llm::helpers::transcript_event_from_message(
877            &new_message,
878        ));
879        messages.push(new_message);
880        let mut next = dict;
881        next.insert("events".to_string(), VmValue::List(Rc::new(events)));
882        next.insert("messages".to_string(), VmValue::List(Rc::new(messages)));
883        let persisted_message = next
884            .get("messages")
885            .and_then(|value| match value {
886                VmValue::List(list) => list.get(message_index).cloned(),
887                _ => None,
888            })
889            .unwrap_or(VmValue::Nil);
890        apply_transcript_with_budget(state, VmValue::Dict(Rc::new(next)), "inject_message")?;
891        emit_identified_user_message_event(id, &persisted_message);
892        emit_llm_message_event(id, message_index, &persisted_message);
893        state.last_accessed = Instant::now();
894        Ok(())
895    })
896}
897
898fn emit_identified_user_message_event(session_id: &str, message: &VmValue) {
899    let message_json = crate::llm::helpers::vm_value_to_json(message);
900    let role = message_json.get("role").and_then(|value| value.as_str());
901    if role != Some("user") {
902        return;
903    }
904    let Some(message_id) = message_json
905        .get("messageId")
906        .or_else(|| message_json.get("message_id"))
907        .and_then(|value| value.as_str())
908        .filter(|value| !value.trim().is_empty())
909    else {
910        return;
911    };
912    let content = message_json
913        .get("content")
914        .map(user_message_content_blocks)
915        .unwrap_or_default();
916    crate::agent_events::emit_event(&crate::agent_events::AgentEvent::UserMessage {
917        session_id: session_id.to_string(),
918        message_id: message_id.to_string(),
919        content,
920    });
921}
922
923fn user_message_content_blocks(content: &serde_json::Value) -> Vec<serde_json::Value> {
924    match content {
925        serde_json::Value::Array(items) => items.clone(),
926        serde_json::Value::String(text) => vec![serde_json::json!({
927            "type": "text",
928            "text": text,
929        })],
930        other => vec![serde_json::json!({
931            "type": "text",
932            "text": other.to_string(),
933        })],
934    }
935}
936
/// Size of a transcript, measured for comparison against a
/// `SessionTranscriptBudgetPolicy`.
#[derive(Clone, Debug, PartialEq, Eq)]
struct TranscriptBudgetUsage {
    /// Number of entries in the transcript's `messages` list.
    message_count: usize,
    /// Number of entries in the `events` list. When the transcript has no
    /// `events` list, this counts events derived from the messages.
    event_count: usize,
    /// Serialized JSON size in bytes. `None` when byte accounting was not
    /// requested; `usize::MAX` when serialization failed.
    approx_bytes: Option<usize>,
}
943
944fn transcript_messages_from_dict(dict: &BTreeMap<String, VmValue>) -> Vec<VmValue> {
945    match dict.get("messages") {
946        Some(VmValue::List(list)) => list.iter().cloned().collect(),
947        _ => Vec::new(),
948    }
949}
950
951fn transcript_events_from_dict(dict: &BTreeMap<String, VmValue>) -> Vec<VmValue> {
952    match dict.get("events") {
953        Some(VmValue::List(list)) => list.iter().cloned().collect(),
954        _ => {
955            let messages = transcript_messages_from_dict(dict);
956            crate::llm::helpers::transcript_events_from_messages(&messages)
957        }
958    }
959}
960
961fn transcript_usage(transcript: &VmValue, include_bytes: bool) -> TranscriptBudgetUsage {
962    let Some(dict) = transcript.as_dict() else {
963        return TranscriptBudgetUsage {
964            message_count: 0,
965            event_count: 0,
966            approx_bytes: include_bytes.then_some(0),
967        };
968    };
969    let approx_bytes = if include_bytes {
970        serde_json::to_vec(&crate::llm::helpers::vm_value_to_json(transcript))
971            .map(|bytes| bytes.len())
972            .ok()
973            .or(Some(usize::MAX))
974    } else {
975        None
976    };
977    TranscriptBudgetUsage {
978        message_count: transcript_messages_from_dict(dict).len(),
979        event_count: transcript_events_from_dict(dict).len(),
980        approx_bytes,
981    }
982}
983
984fn transcript_budget_exceeded_reason(
985    usage: &TranscriptBudgetUsage,
986    policy: &SessionTranscriptBudgetPolicy,
987) -> Option<&'static str> {
988    if usage.message_count > policy.max_messages {
989        return Some("message_count");
990    }
991    if usage.event_count > policy.max_events {
992        return Some("event_count");
993    }
994    if let (Some(bytes), Some(limit)) = (usage.approx_bytes, policy.max_approx_bytes) {
995        if bytes > limit {
996            return Some("approx_bytes");
997        }
998    }
999    None
1000}
1001
1002fn transcript_budget_usage_json(usage: &TranscriptBudgetUsage) -> serde_json::Value {
1003    serde_json::json!({
1004        "messages": usage.message_count,
1005        "events": usage.event_count,
1006        "approx_bytes": usage.approx_bytes,
1007    })
1008}
1009
1010fn transcript_budget_policy_json(policy: &SessionTranscriptBudgetPolicy) -> serde_json::Value {
1011    let recovery = match &policy.recovery {
1012        TranscriptBudgetRecovery::Reject => serde_json::json!({"action": "reject"}),
1013        TranscriptBudgetRecovery::Trim { keep_last } => {
1014            serde_json::json!({"action": "trim", "keep_last": keep_last})
1015        }
1016        TranscriptBudgetRecovery::Compact { keep_last } => {
1017            serde_json::json!({"action": "compact", "keep_last": keep_last})
1018        }
1019    };
1020    serde_json::json!({
1021        "max_messages": policy.max_messages,
1022        "max_events": policy.max_events,
1023        "max_approx_bytes": policy.max_approx_bytes,
1024        "recovery": recovery,
1025    })
1026}
1027
1028fn transcript_budget_recovery_name(recovery: &TranscriptBudgetRecovery) -> &'static str {
1029    match recovery {
1030        TranscriptBudgetRecovery::Reject => "reject",
1031        TranscriptBudgetRecovery::Trim { .. } => "trim",
1032        TranscriptBudgetRecovery::Compact { .. } => "compact",
1033    }
1034}
1035
1036fn transcript_budget_error(
1037    state: &SessionState,
1038    policy: &SessionTranscriptBudgetPolicy,
1039    usage: &TranscriptBudgetUsage,
1040    reason: &str,
1041) -> String {
1042    let byte_suffix = match (usage.approx_bytes, policy.max_approx_bytes) {
1043        (Some(bytes), Some(limit)) => format!(", approx_bytes {bytes}/{limit}"),
1044        _ => String::new(),
1045    };
1046    format!(
1047        "transcript budget exceeded for session '{}': {reason} (messages {}/{}, events {}/{}{}; recovery={})",
1048        state.id,
1049        usage.message_count,
1050        policy.max_messages,
1051        usage.event_count,
1052        policy.max_events,
1053        byte_suffix,
1054        transcript_budget_recovery_name(&policy.recovery),
1055    )
1056}
1057
1058fn transcript_budget_audit_json(
1059    action: &str,
1060    source: &str,
1061    reason: &str,
1062    policy: &SessionTranscriptBudgetPolicy,
1063    usage_before: &TranscriptBudgetUsage,
1064    usage_attempted: &TranscriptBudgetUsage,
1065    usage_after: &TranscriptBudgetUsage,
1066) -> serde_json::Value {
1067    serde_json::json!({
1068        "action": action,
1069        "source": source,
1070        "reason": reason,
1071        "policy": transcript_budget_policy_json(policy),
1072        "usage_before": transcript_budget_usage_json(usage_before),
1073        "usage_attempted": transcript_budget_usage_json(usage_attempted),
1074        "usage_after": transcript_budget_usage_json(usage_after),
1075        "removed_messages": usage_attempted.message_count.saturating_sub(usage_after.message_count),
1076        "removed_events": usage_attempted.event_count.saturating_sub(usage_after.event_count),
1077    })
1078}
1079
1080fn transcript_budget_event(audit: &serde_json::Value) -> VmValue {
1081    let action = audit
1082        .get("action")
1083        .and_then(serde_json::Value::as_str)
1084        .unwrap_or("enforced");
1085    crate::llm::helpers::transcript_event(
1086        "transcript_budget",
1087        "system",
1088        "internal",
1089        &format!("Transcript budget {action}."),
1090        Some(audit.clone()),
1091    )
1092}
1093
1094fn append_event_to_transcript(transcript: VmValue, event: VmValue) -> VmValue {
1095    let Some(dict) = transcript.as_dict() else {
1096        return transcript;
1097    };
1098    let mut next = dict.clone();
1099    let mut events = transcript_events_from_dict(&next);
1100    events.push(event);
1101    next.insert("events".to_string(), VmValue::List(Rc::new(events)));
1102    VmValue::Dict(Rc::new(next))
1103}
1104
1105fn tail_message_capacity(
1106    policy: &SessionTranscriptBudgetPolicy,
1107    reserve_audit_event: bool,
1108) -> usize {
1109    let event_capacity = tail_event_capacity(policy, usize::from(reserve_audit_event));
1110    policy.max_messages.min(event_capacity)
1111}
1112
1113fn tail_event_capacity(policy: &SessionTranscriptBudgetPolicy, reserved_events: usize) -> usize {
1114    policy.max_events.saturating_sub(reserved_events)
1115}
1116
1117fn trim_transcript_for_budget(
1118    transcript: &VmValue,
1119    policy: &SessionTranscriptBudgetPolicy,
1120    keep_last: usize,
1121) -> VmValue {
1122    let dict = transcript.as_dict().cloned().unwrap_or_else(BTreeMap::new);
1123    let messages = transcript_messages_from_dict(&dict);
1124    let keep = keep_last.min(tail_message_capacity(policy, true));
1125    let start = messages.len().saturating_sub(keep);
1126    let retained: Vec<VmValue> = messages.into_iter().skip(start).collect();
1127    let mut next = dict;
1128    next.insert(
1129        "events".to_string(),
1130        VmValue::List(Rc::new(
1131            crate::llm::helpers::transcript_events_from_messages(&retained),
1132        )),
1133    );
1134    next.insert("messages".to_string(), VmValue::List(Rc::new(retained)));
1135    next.remove("summary");
1136    VmValue::Dict(Rc::new(next))
1137}
1138
/// Data for the live "transcript compacted" event. `apply_transcript_with_budget`
/// emits it only after the compacted transcript has passed the budget re-check
/// and been stored.
struct BudgetCompactionLiveEvent {
    /// Compaction policy taken from the lifecycle's `AutoCompactConfig`.
    policy: crate::orchestration::CompactionPolicy,
    /// Strategy name reported by the compaction outcome.
    policy_strategy: String,
    /// Archived-message count, token estimates and snapshot id from the outcome.
    metrics: crate::orchestration::TranscriptCompactedEventMetrics,
}
1144
/// Output of `compact_transcript_for_budget`.
struct BudgetCompactionResult {
    /// The compacted transcript. It has not yet been audited or re-checked
    /// against the budget.
    transcript: VmValue,
    /// Present only when the compaction lifecycle produced an outcome.
    live_event: Option<BudgetCompactionLiveEvent>,
}
1149
/// Budget recovery by compaction.
///
/// Older messages are folded into a summary, and at most the last `keep_last`
/// messages are retained, subject to the message and event caps.
///
/// Runs the shared compaction lifecycle in `Auto` mode with the
/// `BudgetPressure` trigger. The `Llm` strategy is tried first, with `Truncate`
/// as both the hard-limit and the fallback strategy. Hook dispatch and provider
/// evaluation are disabled. In the returned transcript:
///
/// - `events` is rebuilt from the retained messages, plus a `compaction` event
///   when the lifecycle produced an outcome.
/// - `summary` is set from that outcome, or removed when there is none.
///
/// Lifecycle errors are discarded: an `Err` and an `Ok(None)` both leave
/// `outcome` as `None`.
///
/// NOTE(review): the lifecycle future is driven with
/// `futures::executor::block_on`. If that future awaits tokio-backed I/O (for
/// example an LLM request) while this thread is the tokio current-thread
/// worker, it may never make progress. Confirm that blocking here is safe.
fn compact_transcript_for_budget(
    transcript: &VmValue,
    policy: &SessionTranscriptBudgetPolicy,
    keep_last: usize,
    session_id: &str,
) -> BudgetCompactionResult {
    let dict = transcript.as_dict().cloned().unwrap_or_else(BTreeMap::new);
    let messages = transcript_messages_from_dict(&dict);
    // Two event slots are held back, presumably one for the `compaction` event
    // pushed below and one for the budget audit event the caller appends.
    let message_capacity = policy.max_messages.min(tail_event_capacity(policy, 2));
    // Auto-compaction may widen a suffix to start on a clean user-turn boundary,
    // so reserve one extra slot beyond the summary when sizing for hard caps.
    let tail_keep = keep_last.min(message_capacity.saturating_sub(2));
    // NOTE(review): `token_threshold: 0` presumably makes the lifecycle compact
    // regardless of token count. Confirm against `AutoCompactConfig`.
    let mut config = crate::orchestration::AutoCompactConfig {
        token_threshold: 0,
        keep_last: tail_keep,
        compact_strategy: crate::orchestration::CompactStrategy::Llm,
        hard_limit_strategy: crate::orchestration::CompactStrategy::Truncate,
        fallback_strategy: Some(crate::orchestration::CompactStrategy::Truncate),
        policy_strategy: crate::orchestration::compact_strategy_name(
            &crate::orchestration::CompactStrategy::Llm,
        )
        .to_string(),
        ..Default::default()
    };

    // The lifecycle receives these JSON messages mutably. The retained message
    // list is read back from them after it runs.
    let mut json_messages = messages
        .iter()
        .map(crate::llm::helpers::vm_value_to_json)
        .collect::<Vec<_>>();
    let lifecycle =
        crate::orchestration::CompactLifecycle::new(crate::orchestration::CompactMode::Auto)
            .with_session_id(Some(session_id))
            .with_trigger(crate::orchestration::CompactionTrigger::BudgetPressure)
            .with_hook_dispatch(false)
            .with_evaluate_providers(false);
    // LLM options are extracted from an empty prompt with nil arguments. If
    // extraction fails, `None` is passed to the lifecycle.
    let llm_opts = crate::llm::extract_llm_options(&[
        VmValue::String(Rc::from("")),
        VmValue::Nil,
        VmValue::Nil,
    ])
    .ok();
    let outcome = futures::executor::block_on(crate::orchestration::run_compaction_lifecycle(
        &mut json_messages,
        &mut config,
        llm_opts.as_ref(),
        lifecycle,
    ))
    .ok()
    .flatten();

    let retained = json_messages
        .iter()
        .map(crate::stdlib::json_to_vm_value)
        .collect::<Vec<_>>();
    let mut events = crate::llm::helpers::transcript_events_from_messages(&retained);
    let summary = outcome.as_ref().map(|outcome| outcome.summary.clone());
    let mut live_event = None;
    if let Some(outcome) = outcome {
        events.push(crate::llm::helpers::transcript_event(
            "compaction",
            "system",
            "internal",
            "",
            Some(outcome.event_metadata.clone()),
        ));
        // Captured now, emitted by the caller only once the compacted
        // transcript has been accepted.
        live_event = Some(BudgetCompactionLiveEvent {
            policy: config.policy.clone(),
            policy_strategy: outcome.policy_strategy,
            metrics: crate::orchestration::TranscriptCompactedEventMetrics {
                archived_messages: outcome.archived_messages,
                estimated_tokens_before: outcome.estimated_tokens_before,
                estimated_tokens_after: outcome.estimated_tokens_after,
                snapshot_asset_id: outcome.snapshot_asset_id,
            },
        });
    }

    let mut next = dict;
    next.insert("events".to_string(), VmValue::List(Rc::new(events)));
    next.insert("messages".to_string(), VmValue::List(Rc::new(retained)));
    // Without an outcome, any earlier summary is dropped rather than kept.
    if let Some(summary) = summary {
        next.insert("summary".to_string(), VmValue::String(Rc::from(summary)));
    } else {
        next.remove("summary");
    }
    BudgetCompactionResult {
        transcript: VmValue::Dict(Rc::new(next)),
        live_event,
    }
}
1240
1241fn recovered_transcript_with_audit(
1242    recovered: VmValue,
1243    action: &str,
1244    source: &str,
1245    reason: &str,
1246    policy: &SessionTranscriptBudgetPolicy,
1247    usage_before: &TranscriptBudgetUsage,
1248    usage_attempted: &TranscriptBudgetUsage,
1249    include_bytes: bool,
1250) -> (VmValue, serde_json::Value, TranscriptBudgetUsage) {
1251    let usage_after_without_audit = transcript_usage(&recovered, include_bytes);
1252    let initial_audit = transcript_budget_audit_json(
1253        action,
1254        source,
1255        reason,
1256        policy,
1257        usage_before,
1258        usage_attempted,
1259        &usage_after_without_audit,
1260    );
1261    let with_initial_audit =
1262        append_event_to_transcript(recovered.clone(), transcript_budget_event(&initial_audit));
1263    let usage_after = transcript_usage(&with_initial_audit, include_bytes);
1264    let audit = transcript_budget_audit_json(
1265        action,
1266        source,
1267        reason,
1268        policy,
1269        usage_before,
1270        usage_attempted,
1271        &usage_after,
1272    );
1273    let with_audit = append_event_to_transcript(recovered, transcript_budget_event(&audit));
1274    let usage_after = transcript_usage(&with_audit, include_bytes);
1275    (with_audit, audit, usage_after)
1276}
1277
1278fn apply_transcript_with_budget(
1279    state: &mut SessionState,
1280    candidate: VmValue,
1281    source: &str,
1282) -> Result<(), String> {
1283    let policy = state.transcript_budget_policy.normalized();
1284    let include_bytes = policy.max_approx_bytes.is_some();
1285    let usage_before = transcript_usage(&state.transcript, include_bytes);
1286    let usage_attempted = transcript_usage(&candidate, include_bytes);
1287    let Some(reason) = transcript_budget_exceeded_reason(&usage_attempted, &policy) else {
1288        state.transcript = candidate;
1289        return Ok(());
1290    };
1291
1292    match policy.recovery.clone() {
1293        TranscriptBudgetRecovery::Reject => {
1294            let audit = transcript_budget_audit_json(
1295                "rejected",
1296                source,
1297                reason,
1298                &policy,
1299                &usage_before,
1300                &usage_attempted,
1301                &usage_before,
1302            );
1303            state.last_transcript_budget_action = Some(audit);
1304            Err(transcript_budget_error(
1305                state,
1306                &policy,
1307                &usage_attempted,
1308                reason,
1309            ))
1310        }
1311        TranscriptBudgetRecovery::Trim { keep_last } => {
1312            let recovered = trim_transcript_for_budget(&candidate, &policy, keep_last);
1313            let (with_audit, audit, usage_after) = recovered_transcript_with_audit(
1314                recovered,
1315                "trimmed",
1316                source,
1317                reason,
1318                &policy,
1319                &usage_before,
1320                &usage_attempted,
1321                include_bytes,
1322            );
1323            if transcript_budget_exceeded_reason(&usage_after, &policy).is_some() {
1324                let rejected = transcript_budget_audit_json(
1325                    "rejected",
1326                    source,
1327                    reason,
1328                    &policy,
1329                    &usage_before,
1330                    &usage_attempted,
1331                    &usage_after,
1332                );
1333                state.last_transcript_budget_action = Some(rejected);
1334                return Err(transcript_budget_error(
1335                    state,
1336                    &policy,
1337                    &usage_after,
1338                    reason,
1339                ));
1340            }
1341            state.last_transcript_budget_action = Some(audit);
1342            state.transcript = with_audit;
1343            Ok(())
1344        }
1345        TranscriptBudgetRecovery::Compact { keep_last } => {
1346            let compacted =
1347                compact_transcript_for_budget(&candidate, &policy, keep_last, &state.id);
1348            let (with_audit, audit, usage_after) = recovered_transcript_with_audit(
1349                compacted.transcript,
1350                "compacted",
1351                source,
1352                reason,
1353                &policy,
1354                &usage_before,
1355                &usage_attempted,
1356                include_bytes,
1357            );
1358            if transcript_budget_exceeded_reason(&usage_after, &policy).is_some() {
1359                let rejected = transcript_budget_audit_json(
1360                    "rejected",
1361                    source,
1362                    reason,
1363                    &policy,
1364                    &usage_before,
1365                    &usage_attempted,
1366                    &usage_after,
1367                );
1368                state.last_transcript_budget_action = Some(rejected);
1369                return Err(transcript_budget_error(
1370                    state,
1371                    &policy,
1372                    &usage_after,
1373                    reason,
1374                ));
1375            }
1376            state.last_transcript_budget_action = Some(audit);
1377            state.transcript = with_audit;
1378            if let Some(event) = compacted.live_event {
1379                crate::orchestration::emit_transcript_compacted_event_sync(
1380                    &state.id,
1381                    crate::orchestration::CompactMode::Auto,
1382                    crate::orchestration::CompactionTrigger::BudgetPressure
1383                        .as_str()
1384                        .to_string(),
1385                    &event.policy,
1386                    event.policy_strategy,
1387                    event.metrics,
1388                );
1389            }
1390            Ok(())
1391        }
1392    }
1393}
1394
1395fn emit_llm_message_event(session_id: &str, message_index: usize, message: &VmValue) {
1396    let mut fields = serde_json::Map::new();
1397    fields.insert(
1398        "session_id".to_string(),
1399        serde_json::Value::String(session_id.to_string()),
1400    );
1401    fields.insert(
1402        "message_index".to_string(),
1403        serde_json::json!(message_index),
1404    );
1405    let message_json = crate::llm::helpers::vm_value_to_json(message);
1406    if let Some(role) = message_json.get("role").and_then(|value| value.as_str()) {
1407        fields.insert(
1408            "role".to_string(),
1409            serde_json::Value::String(role.to_string()),
1410        );
1411    }
1412    if let Some(content) = message_json.get("content") {
1413        fields.insert("content".to_string(), content.clone());
1414    }
1415    fields.insert("message".to_string(), message_json);
1416    crate::llm::append_observability_sidecar_entry("message", fields);
1417}
1418
1419/// Create a new session from a reconstructed message list.
1420///
1421/// This is intentionally an all-at-once write instead of repeated
1422/// `inject_message` calls: importing a transcript should not re-emit
1423/// each historic turn into the active observability sidecar.
1424pub fn seed_from_messages(
1425    id: Option<String>,
1426    messages: &[serde_json::Value],
1427    metadata: serde_json::Value,
1428    system_prompt: Option<String>,
1429    tool_format: Option<String>,
1430) -> Result<String, String> {
1431    let resolved = id.unwrap_or_else(|| uuid::Uuid::now_v7().to_string());
1432    if exists(&resolved) {
1433        return Err(format!("agent session '{resolved}' already exists"));
1434    }
1435    open_or_create(Some(resolved.clone()));
1436    SESSIONS.with(|s| {
1437        let mut map = s.borrow_mut();
1438        let Some(state) = map.get_mut(&resolved) else {
1439            return Err(format!("failed to create agent session '{resolved}'"));
1440        };
1441        state.tool_format = tool_format.filter(|value| !value.trim().is_empty());
1442        state.system_prompt = system_prompt.filter(|value| !value.trim().is_empty());
1443
1444        let mut metadata = metadata
1445            .as_object()
1446            .cloned()
1447            .unwrap_or_else(serde_json::Map::new);
1448        if let Some(tool_format) = state.tool_format.as_ref() {
1449            metadata.insert(
1450                "tool_format".to_string(),
1451                serde_json::Value::String(tool_format.clone()),
1452            );
1453            metadata.insert(
1454                "tool_mode_locked".to_string(),
1455                serde_json::Value::Bool(true),
1456            );
1457        }
1458        if let Some(system_prompt) = state.system_prompt.as_ref() {
1459            metadata.insert(
1460                "system_prompt".to_string(),
1461                crate::llm::helpers::system_prompt_metadata(system_prompt),
1462            );
1463        }
1464        let vm_messages = crate::llm::helpers::json_messages_to_vm(messages);
1465        let candidate = crate::llm::helpers::new_transcript_with(
1466            Some(resolved.clone()),
1467            vm_messages,
1468            None,
1469            Some(crate::stdlib::json_to_vm_value(&serde_json::Value::Object(
1470                metadata,
1471            ))),
1472        );
1473        apply_transcript_with_budget(state, candidate, "seed_from_messages")?;
1474        state.last_accessed = Instant::now();
1475        Ok(resolved)
1476    })
1477}
1478
1479/// Load the messages vec (as JSON) for this session, for use as prefix
1480/// to an agent_loop run. Returns an empty vec if the session doesn't
1481/// exist or has no messages.
1482pub fn messages_json(id: &str) -> Vec<serde_json::Value> {
1483    SESSIONS.with(|s| {
1484        let map = s.borrow();
1485        let Some(state) = map.get(id) else {
1486            return Vec::new();
1487        };
1488        let Some(dict) = state.transcript.as_dict() else {
1489            return Vec::new();
1490        };
1491        match dict.get("messages") {
1492            Some(VmValue::List(list)) => list
1493                .iter()
1494                .map(crate::llm::helpers::vm_value_to_json)
1495                .collect(),
1496            _ => Vec::new(),
1497        }
1498    })
1499}
1500
/// Prompt-surface state for resuming a session, as produced by
/// `prompt_state_json`.
#[derive(Clone, Debug, Default)]
pub struct SessionPromptState {
    /// Messages as JSON. These begin with a summary message whenever
    /// `summary` is set.
    pub messages: Vec<serde_json::Value>,
    /// The transcript's non-blank `summary` field, if any.
    pub summary: Option<String>,
}
1506
1507fn summary_message_json(summary: &str) -> serde_json::Value {
1508    serde_json::json!({
1509        "role": "user",
1510        "content": summary,
1511    })
1512}
1513
1514fn messages_begin_with_summary(messages: &[serde_json::Value], summary: &str) -> bool {
1515    messages.first().is_some_and(|message| {
1516        message.get("role").and_then(|value| value.as_str()) == Some("user")
1517            && message.get("content").and_then(|value| value.as_str()) == Some(summary)
1518    })
1519}
1520
1521/// Prompt-surface resume state for a persisted session.
1522///
1523/// Returns the compacted/rehydratable message list plus the transcript's
1524/// summary field. When the transcript carries a summary field but its
1525/// message list does not already begin with the compacted summary
1526/// message, this helper prepends one so session re-entry preserves the
1527/// same prompt surface the previous loop was actually using.
1528pub fn prompt_state_json(id: &str) -> SessionPromptState {
1529    SESSIONS.with(|s| {
1530        let map = s.borrow();
1531        let Some(state) = map.get(id) else {
1532            return SessionPromptState::default();
1533        };
1534        let Some(dict) = state.transcript.as_dict() else {
1535            return SessionPromptState::default();
1536        };
1537        let mut messages = match dict.get("messages") {
1538            Some(VmValue::List(list)) => list
1539                .iter()
1540                .map(crate::llm::helpers::vm_value_to_json)
1541                .collect::<Vec<_>>(),
1542            _ => Vec::new(),
1543        };
1544        let summary = dict.get("summary").and_then(|value| match value {
1545            VmValue::String(text) if !text.trim().is_empty() => Some(text.to_string()),
1546            _ => None,
1547        });
1548        if let Some(summary_text) = summary.as_deref() {
1549            if !messages_begin_with_summary(&messages, summary_text) {
1550                messages.insert(0, summary_message_json(summary_text));
1551            }
1552        }
1553        SessionPromptState { messages, summary }
1554    })
1555}
1556
1557/// Overwrite the transcript for this session. Used by `agent_loop` on
1558/// exit to persist the synthesized transcript.
1559pub fn store_transcript(id: &str, transcript: VmValue) -> Result<(), String> {
1560    SESSIONS.with(|s| {
1561        let mut map = s.borrow_mut();
1562        let Some(state) = map.get_mut(id) else {
1563            return Err(format!(
1564                "agent_session_store_transcript: unknown session id '{id}'"
1565            ));
1566        };
1567        let transcript = transcript_with_session_metadata(transcript, state);
1568        apply_transcript_with_budget(state, transcript, "store_transcript")?;
1569        state.last_accessed = Instant::now();
1570        Ok(())
1571    })
1572}
1573
1574/// Remove malformed reminder events after their drop audit has been emitted.
1575/// Pending-reminder rendering scans the transcript on every LLM call; pruning
1576/// invalid entries makes the drop event one-shot instead of noisy per turn.
1577pub fn prune_invalid_reminder_events(id: &str) -> usize {
1578    SESSIONS.with(|s| {
1579        let mut map = s.borrow_mut();
1580        let Some(state) = map.get_mut(id) else {
1581            return 0;
1582        };
1583        let Some(dict) = state.transcript.as_dict().cloned() else {
1584            return 0;
1585        };
1586        let Some(VmValue::List(events)) = dict.get("events") else {
1587            return 0;
1588        };
1589        let mut pruned = 0_usize;
1590        let mut kept = Vec::with_capacity(events.len());
1591        for event in events.iter().cloned() {
1592            let is_reminder = event
1593                .as_dict()
1594                .and_then(|event| event.get("kind"))
1595                .map(VmValue::display)
1596                .as_deref()
1597                == Some(crate::llm::helpers::SYSTEM_REMINDER_EVENT_KIND);
1598            if !is_reminder {
1599                kept.push(event);
1600                continue;
1601            }
1602            let valid = crate::llm::helpers::reminder_from_event(&event)
1603                .is_some_and(|reminder| !reminder.body.trim().is_empty());
1604            if valid {
1605                kept.push(event);
1606            } else {
1607                pruned += 1;
1608            }
1609        }
1610        if pruned > 0 {
1611            let mut next = dict;
1612            next.insert("events".to_string(), VmValue::List(Rc::new(kept)));
1613            let _ = apply_transcript_with_budget(
1614                state,
1615                VmValue::Dict(Rc::new(next)),
1616                "prune_invalid_reminder_events",
1617            );
1618            state.last_accessed = Instant::now();
1619        }
1620        pruned
1621    })
1622}
1623
1624/// Apply the reminder TTL lifecycle that runs once per completed agent
1625/// turn. Reminders with `ttl_turns = 1` expire and are removed; larger
1626/// finite TTLs are decremented in place. Expiry audit events are emitted
1627/// to the active EventLog when one is installed.
1628pub fn apply_reminder_post_turn(id: &str, turn: i64) -> Result<serde_json::Value, String> {
1629    let report = SESSIONS.with(|s| {
1630        let mut map = s.borrow_mut();
1631        let Some(state) = map.get_mut(id) else {
1632            return Err(format!(
1633                "agent_session_apply_reminder_post_turn: unknown session id '{id}'"
1634            ));
1635        };
1636        let report = crate::llm::helpers::apply_reminder_post_turn(&state.transcript, turn);
1637        if report.decremented_count > 0 || !report.expired.is_empty() {
1638            if let Some(next) = report.transcript.clone() {
1639                apply_transcript_with_budget(state, next, "apply_reminder_post_turn")?;
1640            }
1641            state.last_accessed = Instant::now();
1642        }
1643        Ok(report)
1644    })?;
1645
1646    for reminder in &report.expired {
1647        let mut payload = crate::llm::helpers::reminder_lifecycle_payload(Some(id), reminder);
1648        if let Some(obj) = payload.as_object_mut() {
1649            obj.insert(
1650                "transcript_id".to_string(),
1651                serde_json::Value::String(id.to_string()),
1652            );
1653            obj.insert(
1654                "reason".to_string(),
1655                serde_json::Value::String("ttl".to_string()),
1656            );
1657            obj.insert(
1658                "ttl_turns_before".to_string(),
1659                serde_json::json!(&reminder.ttl_turns),
1660            );
1661            obj.insert("expired_at_turn".to_string(), serde_json::json!(turn));
1662        }
1663        crate::llm::helpers::emit_reminder_lifecycle_event(
1664            crate::llm::helpers::REMINDER_EXPIRED_EVENT_KIND,
1665            payload,
1666        );
1667    }
1668
1669    Ok(serde_json::json!({
1670        "expired_count": report.expired.len(),
1671        "decremented_count": report.decremented_count,
1672        "remaining_count": report.remaining_count,
1673    }))
1674}
1675
1676/// Inject a typed system reminder into the session transcript's event
1677/// stream. This mirrors `transcript.inject_reminder` for live sessions:
1678/// reminders with the same `dedupe_key` are replaced before the new
1679/// reminder event is appended.
1680pub fn inject_reminder(
1681    id: &str,
1682    reminder: crate::llm::helpers::SystemReminder,
1683) -> Result<ReminderInjectionReport, String> {
1684    let reminder_id = reminder.id.clone();
1685    let dedupe_key = reminder.dedupe_key.clone();
1686    let mut deduped_reminder_ids = Vec::new();
1687    SESSIONS.with(|s| {
1688        let mut map = s.borrow_mut();
1689        let Some(state) = map.get_mut(id) else {
1690            return Err(format!(
1691                "agent_session_inject_reminder: unknown session id '{id}'"
1692            ));
1693        };
1694        let dict = state
1695            .transcript
1696            .as_dict()
1697            .cloned()
1698            .unwrap_or_else(BTreeMap::new);
1699        let mut events: Vec<VmValue> = match dict.get("events") {
1700            Some(VmValue::List(list)) => list.iter().cloned().collect(),
1701            _ => dict
1702                .get("messages")
1703                .and_then(|value| match value {
1704                    VmValue::List(list) => Some(list.iter().cloned().collect::<Vec<_>>()),
1705                    _ => None,
1706                })
1707                .map(|messages| crate::llm::helpers::transcript_events_from_messages(&messages))
1708                .unwrap_or_default(),
1709        };
1710        if let Some(expected_key) = dedupe_key.as_deref() {
1711            events.retain(|event| {
1712                let Some(existing) = crate::llm::helpers::reminder_from_event(event) else {
1713                    return true;
1714                };
1715                if existing.dedupe_key.as_deref() == Some(expected_key) {
1716                    deduped_reminder_ids.push(existing.id);
1717                    false
1718                } else {
1719                    true
1720                }
1721            });
1722        }
1723        events.push(crate::llm::helpers::transcript_reminder_event(&reminder));
1724        let mut next = dict;
1725        next.insert("events".to_string(), VmValue::List(Rc::new(events)));
1726        apply_transcript_with_budget(state, VmValue::Dict(Rc::new(next)), "inject_reminder")?;
1727        state.last_accessed = Instant::now();
1728        Ok(())
1729    })?;
1730
1731    if !deduped_reminder_ids.is_empty() {
1732        let dropped_count = deduped_reminder_ids.len();
1733        crate::llm::helpers::emit_reminder_lifecycle_event(
1734            crate::llm::helpers::REMINDER_DEDUPED_EVENT_KIND,
1735            serde_json::json!({
1736                "session_id": id,
1737                "transcript_id": id,
1738                "reminder_id": &reminder_id,
1739                "replacing_id": &reminder_id,
1740                "replaced_id": deduped_reminder_ids.first(),
1741                "replaced_ids": &deduped_reminder_ids,
1742                "dedupe_key": &dedupe_key,
1743                "dropped_reminder_ids": &deduped_reminder_ids,
1744                "dropped_count": dropped_count,
1745            }),
1746        );
1747    }
1748
1749    crate::llm::helpers::emit_reminder_lifecycle_event(
1750        crate::llm::helpers::REMINDER_INJECTED_EVENT_KIND,
1751        crate::llm::helpers::reminder_lifecycle_payload(Some(id), &reminder),
1752    );
1753
1754    Ok(ReminderInjectionReport {
1755        reminder_id,
1756        deduped_count: deduped_reminder_ids.len(),
1757    })
1758}
1759
1760/// Append a transcript event to the session without mutating its
1761/// message list. Used for orchestration-side lineage events (sub-agent
1762/// spawn/completion, workflow hooks, etc.) that should survive
1763/// persistence/replay without being replayed back into the model as
1764/// conversational messages.
1765pub fn append_event(id: &str, event: VmValue) -> Result<(), String> {
1766    let Some(event_dict) = event.as_dict() else {
1767        return Err("agent_session_append_event: event must be a dict".into());
1768    };
1769    let kind_ok = matches!(event_dict.get("kind"), Some(VmValue::String(_)));
1770    if !kind_ok {
1771        return Err("agent_session_append_event: event must have a string `kind`".into());
1772    }
1773    SESSIONS.with(|s| {
1774        let mut map = s.borrow_mut();
1775        let Some(state) = map.get_mut(id) else {
1776            return Err(format!(
1777                "agent_session_append_event: unknown session id '{id}'"
1778            ));
1779        };
1780        append_event_to_state(state, event, "append_event")?;
1781        state.last_accessed = Instant::now();
1782        Ok(())
1783    })
1784}
1785
1786fn append_event_to_state(
1787    state: &mut SessionState,
1788    event: VmValue,
1789    action: &str,
1790) -> Result<(), String> {
1791    let dict = state
1792        .transcript
1793        .as_dict()
1794        .cloned()
1795        .unwrap_or_else(BTreeMap::new);
1796    let mut events: Vec<VmValue> = match dict.get("events") {
1797        Some(VmValue::List(list)) => list.iter().cloned().collect(),
1798        _ => dict
1799            .get("messages")
1800            .and_then(|value| match value {
1801                VmValue::List(list) => Some(list.iter().cloned().collect::<Vec<_>>()),
1802                _ => None,
1803            })
1804            .map(|messages| crate::llm::helpers::transcript_events_from_messages(&messages))
1805            .unwrap_or_default(),
1806    };
1807    events.push(event);
1808    let mut next = dict;
1809    next.insert("events".to_string(), VmValue::List(Rc::new(events)));
1810    apply_transcript_with_budget(state, VmValue::Dict(Rc::new(next)), action)
1811}
1812
1813/// Replace the transcript's message list wholesale. Used by the
1814/// in-loop compaction path, which operates on JSON messages.
1815pub fn replace_messages(id: &str, messages: &[serde_json::Value]) -> Result<(), String> {
1816    replace_messages_with_summary(id, messages, None)
1817}
1818
1819/// Replace the transcript's message list and optionally update the
1820/// `summary` field on the persisted transcript. The compaction path
1821/// uses this to publish the human-readable rollup line that
1822/// `transcript_summary(transcript)` exposes to host code.
1823pub fn replace_messages_with_summary(
1824    id: &str,
1825    messages: &[serde_json::Value],
1826    summary: Option<&str>,
1827) -> Result<(), String> {
1828    SESSIONS.with(|s| {
1829        let mut map = s.borrow_mut();
1830        let Some(state) = map.get_mut(id) else {
1831            return Err(format!(
1832                "agent_session_replace_messages: unknown session id '{id}'"
1833            ));
1834        };
1835        let dict = state
1836            .transcript
1837            .as_dict()
1838            .cloned()
1839            .unwrap_or_else(BTreeMap::new);
1840        let vm_messages: Vec<VmValue> = messages
1841            .iter()
1842            .map(crate::stdlib::json_to_vm_value)
1843            .collect();
1844        let mut next = dict;
1845        next.insert(
1846            "events".to_string(),
1847            VmValue::List(Rc::new(
1848                crate::llm::helpers::transcript_events_from_messages(&vm_messages),
1849            )),
1850        );
1851        next.insert("messages".to_string(), VmValue::List(Rc::new(vm_messages)));
1852        if let Some(summary) = summary {
1853            next.insert(
1854                "summary".to_string(),
1855                VmValue::String(Rc::from(summary.to_string())),
1856            );
1857        } else {
1858            next.remove("summary");
1859        }
1860        apply_transcript_with_budget(state, VmValue::Dict(Rc::new(next)), "replace_messages")?;
1861        state.last_accessed = Instant::now();
1862        Ok(())
1863    })
1864}
1865
1866pub fn append_subscriber(id: &str, callback: VmValue) {
1867    open_or_create(Some(id.to_string()));
1868    SESSIONS.with(|s| {
1869        if let Some(state) = s.borrow_mut().get_mut(id) {
1870            state.subscribers.push(callback);
1871            state.last_accessed = Instant::now();
1872        }
1873    });
1874}
1875
1876pub fn subscribers_for(id: &str) -> Vec<VmValue> {
1877    SESSIONS.with(|s| {
1878        s.borrow()
1879            .get(id)
1880            .map(|state| state.subscribers.clone())
1881            .unwrap_or_default()
1882    })
1883}
1884
1885pub fn subscriber_count(id: &str) -> usize {
1886    SESSIONS.with(|s| {
1887        s.borrow()
1888            .get(id)
1889            .map(|state| state.subscribers.len())
1890            .unwrap_or(0)
1891    })
1892}
1893
1894/// Persist the set of active skill names for session resume. Called at
1895/// the end of an agent_loop run; the next `open_or_create` for this id
1896/// reads them back via [`active_skills`].
1897pub fn set_active_skills(id: &str, skills: Vec<String>) {
1898    SESSIONS.with(|s| {
1899        if let Some(state) = s.borrow_mut().get_mut(id) {
1900            state.active_skills = skills;
1901            state.last_accessed = Instant::now();
1902        }
1903    });
1904}
1905
1906/// Skills that were active at the end of the previous agent_loop run
1907/// against this session. Returns an empty vec when the session doesn't
1908/// exist or nothing was persisted.
1909pub fn active_skills(id: &str) -> Vec<String> {
1910    SESSIONS.with(|s| {
1911        s.borrow()
1912            .get(id)
1913            .map(|state| state.active_skills.clone())
1914            .unwrap_or_default()
1915    })
1916}
1917
1918/// Claim the tool-calling contract for a session.
1919///
1920/// The first loop against a named session records its `tool_format`.
1921/// Later re-entry must use the same format so prompt/history generated
1922/// under a text contract is never replayed as native, or vice versa.
1923pub fn claim_tool_format(id: &str, tool_format: &str) -> Result<(), String> {
1924    let tool_format = tool_format.trim();
1925    if tool_format.is_empty() {
1926        return Ok(());
1927    }
1928    SESSIONS.with(|s| {
1929        let mut map = s.borrow_mut();
1930        let Some(state) = map.get_mut(id) else {
1931            return Err(format!("agent session '{id}' does not exist"));
1932        };
1933        match state.tool_format.as_deref() {
1934            Some(existing) if existing != tool_format => Err(format!(
1935                "agent session '{id}' is locked to tool_format='{existing}', but this run requested tool_format='{tool_format}'. Start a new session or fork/reset the transcript before changing tool mode."
1936            )),
1937            Some(_) => {
1938                state.last_accessed = Instant::now();
1939                Ok(())
1940            }
1941            None => {
1942                state.tool_format = Some(tool_format.to_string());
1943                state.last_accessed = Instant::now();
1944                Ok(())
1945            }
1946        }
1947    })
1948}
1949
1950pub fn tool_format(id: &str) -> Option<String> {
1951    SESSIONS.with(|s| {
1952        s.borrow()
1953            .get(id)
1954            .and_then(|state| state.tool_format.clone())
1955    })
1956}
1957
1958pub fn record_system_prompt(id: &str, system_prompt: &str) -> Result<(), String> {
1959    let system_prompt = system_prompt.trim();
1960    if system_prompt.is_empty() {
1961        return Ok(());
1962    }
1963    assert_cache_stable_system_prompt(system_prompt);
1964    SESSIONS.with(|s| {
1965        let mut map = s.borrow_mut();
1966        let Some(state) = map.get_mut(id) else {
1967            return Err(format!("agent session '{id}' does not exist"));
1968        };
1969        let changed = state.system_prompt.as_deref() != Some(system_prompt);
1970        state.system_prompt = Some(system_prompt.to_string());
1971        let dict = state
1972            .transcript
1973            .as_dict()
1974            .cloned()
1975            .unwrap_or_else(BTreeMap::new);
1976        let mut next = dict;
1977        apply_system_prompt_metadata(&mut next, system_prompt);
1978        if changed {
1979            let mut events: Vec<VmValue> = match next.get("events") {
1980                Some(VmValue::List(list)) => list.iter().cloned().collect(),
1981                _ => Vec::new(),
1982            };
1983            events.push(crate::llm::helpers::transcript_event(
1984                "system_prompt",
1985                "system",
1986                "internal",
1987                "",
1988                Some(crate::llm::helpers::system_prompt_event_metadata(
1989                    system_prompt,
1990                )),
1991            ));
1992            next.insert("events".to_string(), VmValue::List(Rc::new(events)));
1993        }
1994        apply_transcript_with_budget(state, VmValue::Dict(Rc::new(next)), "record_system_prompt")?;
1995        state.last_accessed = Instant::now();
1996        Ok(())
1997    })
1998}
1999
2000pub fn system_prompt(id: &str) -> Option<String> {
2001    SESSIONS.with(|s| {
2002        s.borrow()
2003            .get(id)
2004            .and_then(|state| state.system_prompt.clone())
2005    })
2006}
2007
#[cfg(debug_assertions)]
/// Find the first `{{ … }}` template token in `system_prompt` whose name
/// starts with a workspace- or project-specific prefix.
///
/// Whitespace after the opening `{{` is ignored. At each token,
/// `"workspace_"` is checked before `"project_"`. Returns the matching
/// prefix, or `None` when no such token exists.
fn forbidden_workspace_prompt_token(system_prompt: &str) -> Option<&'static str> {
    const FORBIDDEN_PREFIXES: [&str; 2] = ["workspace_", "project_"];
    // Every piece after the first one immediately follows an opening `{{`.
    system_prompt
        .split("{{")
        .skip(1)
        .map(str::trim_start)
        .find_map(|token| {
            FORBIDDEN_PREFIXES
                .iter()
                .copied()
                .find(|prefix| token.starts_with(*prefix))
        })
}
2023
2024#[cfg(debug_assertions)]
2025fn assert_cache_stable_system_prompt(system_prompt: &str) {
2026    if let Some(prefix) = forbidden_workspace_prompt_token(system_prompt) {
2027        panic!(
2028            "{CACHE_STABLE_SYSTEM_PROMPT_DIAGNOSTIC}: session system prompts must not interpolate `{{{{{prefix}...` tokens; move workspace/project context into the workspace-anchor reminder"
2029        );
2030    }
2031}
2032
#[cfg(not(debug_assertions))]
/// Release-build variant of the system-prompt guard: performs no check.
fn assert_cache_stable_system_prompt(_prompt: &str) {}
2035
2036/// Pin (or clear, with `None`) a model selector on a session. Returns
2037/// `Ok(true)` when the value actually changed so callers can decide
2038/// whether to broadcast a notification. The selector is stored verbatim
2039/// — alias / catalog resolution is the call-site's job.
2040pub fn set_pinned_model(id: &str, model: Option<String>) -> Result<bool, String> {
2041    let normalized = model
2042        .map(|value| value.trim().to_string())
2043        .filter(|value| !value.is_empty());
2044    SESSIONS.with(|s| {
2045        let mut map = s.borrow_mut();
2046        let Some(state) = map.get_mut(id) else {
2047            return Err(format!("agent session '{id}' does not exist"));
2048        };
2049        let changed = state.pinned_model != normalized;
2050        state.pinned_model = normalized;
2051        state.last_accessed = Instant::now();
2052        Ok(changed)
2053    })
2054}
2055
2056/// Read the session's pinned model selector, if any. Consumed by
2057/// `vm_resolve_model` as the per-session default when a script-level
2058/// `llm_call` does not pass `model:` explicitly.
2059pub fn pinned_model(id: &str) -> Option<String> {
2060    SESSIONS.with(|s| {
2061        s.borrow()
2062            .get(id)
2063            .and_then(|state| state.pinned_model.clone())
2064    })
2065}
2066
2067/// Pin (or clear) the session-level provider-aware reasoning policy.
2068pub fn set_pinned_reasoning_policy(id: &str, policy: Option<String>) -> Result<bool, String> {
2069    let normalized = match policy {
2070        Some(value) => crate::llm::reasoning_policy::normalize_policy_selector(&value)?,
2071        None => None,
2072    };
2073    SESSIONS.with(|s| {
2074        let mut map = s.borrow_mut();
2075        let Some(state) = map.get_mut(id) else {
2076            return Err(format!("agent session '{id}' does not exist"));
2077        };
2078        let changed = state.pinned_reasoning_policy != normalized;
2079        state.pinned_reasoning_policy = normalized;
2080        state.last_accessed = Instant::now();
2081        Ok(changed)
2082    })
2083}
2084
2085/// Read the session's pinned reasoning policy, if any.
2086pub fn pinned_reasoning_policy(id: &str) -> Option<String> {
2087    SESSIONS.with(|s| {
2088        s.borrow()
2089            .get(id)
2090            .and_then(|state| state.pinned_reasoning_policy.clone())
2091    })
2092}
2093
2094/// Set (or clear, with `None`) the typed workspace anchor on a session.
2095/// Returns `Ok(true)` when the value actually changed so callers can
2096/// decide whether to broadcast `AnchorChanged` notifications.
2097pub fn set_workspace_anchor(id: &str, anchor: Option<WorkspaceAnchor>) -> Result<bool, String> {
2098    SESSIONS.with(|s| {
2099        let mut map = s.borrow_mut();
2100        let Some(state) = map.get_mut(id) else {
2101            return Err(format!("agent session '{id}' does not exist"));
2102        };
2103        let changed = state.workspace_anchor != anchor;
2104        state.workspace_anchor = anchor;
2105        if changed {
2106            crate::llm::permissions::clear_session_grants(id);
2107        }
2108        state.last_accessed = Instant::now();
2109        Ok(changed)
2110    })
2111}
2112
2113/// Read the session's typed workspace anchor, if any.
2114pub fn workspace_anchor(id: &str) -> Option<WorkspaceAnchor> {
2115    SESSIONS.with(|s| {
2116        s.borrow()
2117            .get(id)
2118            .and_then(|state| state.workspace_anchor.clone())
2119    })
2120}
2121
2122/// Outcome of `reanchor_session`: previous + new anchor and whether the
2123/// swap actually moved anything. Callers use `changed` to suppress
2124/// no-op transcript / live events.
2125#[derive(Clone, Debug, PartialEq, Eq)]
2126pub struct ReanchorOutcome {
2127    pub previous: Option<WorkspaceAnchor>,
2128    pub current: WorkspaceAnchor,
2129    pub changed: bool,
2130}
2131
2132/// Atomically swap the session's primary anchor + emit the canonical
2133/// `AnchorChanged` transcript event and live `AgentEvent::AnchorChanged`
2134/// notification (#2218). Clears session-scoped permission grants so
2135/// stale anchor-based decisions don't leak into the next turn.
2136pub fn reanchor_session(
2137    id: &str,
2138    new_anchor: WorkspaceAnchor,
2139    carry_transcript: bool,
2140    compacted: bool,
2141    reason: Option<String>,
2142) -> Result<ReanchorOutcome, String> {
2143    let outcome = SESSIONS.with(|s| {
2144        let mut map = s.borrow_mut();
2145        let Some(state) = map.get_mut(id) else {
2146            return Err(format!("agent session '{id}' does not exist"));
2147        };
2148        let previous = state.workspace_anchor.clone();
2149        let changed = previous.as_ref() != Some(&new_anchor);
2150        state.workspace_anchor = Some(new_anchor.clone());
2151        if changed {
2152            crate::llm::permissions::clear_session_grants(id);
2153        }
2154        state.last_accessed = Instant::now();
2155        Ok(ReanchorOutcome {
2156            previous,
2157            current: new_anchor,
2158            changed,
2159        })
2160    })?;
2161    if !outcome.changed {
2162        return Ok(outcome);
2163    }
2164    let previous_json = outcome.previous.as_ref().map(WorkspaceAnchor::to_json);
2165    let current_json = outcome.current.to_json();
2166    let event_metadata = serde_json::json!({
2167        "previous": previous_json,
2168        "current": current_json,
2169        "carry_transcript": carry_transcript,
2170        "compacted": compacted,
2171        "reason": reason,
2172    });
2173    let event = crate::llm::helpers::transcript_event(
2174        "AnchorChanged",
2175        "system",
2176        "internal",
2177        "",
2178        Some(event_metadata),
2179    );
2180    let _ = append_event(id, event);
2181    crate::llm::emit_live_agent_event_sync(&crate::agent_events::AgentEvent::AnchorChanged {
2182        session_id: id.to_string(),
2183        previous: previous_json,
2184        current: current_json,
2185        carry_transcript,
2186        compacted,
2187        reason,
2188    });
2189    Ok(outcome)
2190}
2191
2192/// Set session-local workspace defaults. Returns `Ok(true)` when the
2193/// policy changed.
2194pub fn set_workspace_policy(id: &str, policy: WorkspacePolicy) -> Result<bool, String> {
2195    SESSIONS.with(|s| {
2196        let mut map = s.borrow_mut();
2197        let Some(state) = map.get_mut(id) else {
2198            return Err(format!("agent session '{id}' does not exist"));
2199        };
2200        let changed = state.workspace_policy != policy;
2201        state.workspace_policy = policy;
2202        state.last_accessed = Instant::now();
2203        Ok(changed)
2204    })
2205}
2206
2207/// Read the session's workspace policy, if the session exists.
2208pub fn workspace_policy(id: &str) -> Option<WorkspacePolicy> {
2209    SESSIONS.with(|s| {
2210        s.borrow()
2211            .get(id)
2212            .map(|state| state.workspace_policy.clone())
2213    })
2214}
2215
2216/// Validate and mount an additional workspace root on an anchored
2217/// session. When the path is already mounted, updates its mount mode
2218/// in place and refreshes its `mounted_at` timestamp.
2219pub fn add_workspace_root(
2220    id: &str,
2221    root: &str,
2222    mount_mode: Option<MountMode>,
2223    reason: Option<String>,
2224) -> Result<String, String> {
2225    let normalized_root = validate_workspace_root_path(root)?;
2226    let mounted_at = crate::orchestration::now_rfc3339();
2227    SESSIONS.with(|s| {
2228        let mut map = s.borrow_mut();
2229        let Some(state) = map.get_mut(id) else {
2230            return Err(format!("agent session '{id}' does not exist"));
2231        };
2232        let default_mount_mode = state.workspace_policy.default_mount_mode;
2233        let Some(anchor) = state.workspace_anchor.as_mut() else {
2234            return Err(format!("agent session '{id}' has no workspace anchor"));
2235        };
2236        let resolved_mount_mode = mount_mode.unwrap_or(default_mount_mode);
2237        if let Some(existing) = anchor
2238            .additional_roots
2239            .iter_mut()
2240            .find(|entry| entry.path == normalized_root)
2241        {
2242            existing.mount_mode = resolved_mount_mode;
2243            existing.mounted_at = mounted_at.clone();
2244        } else {
2245            anchor.additional_roots.push(MountedRoot {
2246                path: normalized_root.clone(),
2247                mount_mode: resolved_mount_mode,
2248                mounted_at: mounted_at.clone(),
2249            });
2250        }
2251        let event = crate::llm::helpers::transcript_event(
2252            "RootMounted",
2253            "system",
2254            "internal",
2255            "",
2256            Some(serde_json::json!({
2257                "path": normalized_root.to_string_lossy(),
2258                "mount_mode": resolved_mount_mode.as_str(),
2259                "mounted_at": mounted_at.clone(),
2260                "reason": reason,
2261            })),
2262        );
2263        append_event_to_state(state, event, "add_workspace_root")?;
2264        crate::llm::permissions::clear_session_grants(id);
2265        state.last_accessed = Instant::now();
2266        Ok(mounted_at.clone())
2267    })
2268}
2269
2270/// Remove one mounted root from an anchored session. Returns whether an
2271/// existing mount entry was deleted. Removing an absent root is a no-op.
2272pub fn remove_workspace_root(id: &str, root: &str) -> Result<bool, String> {
2273    let normalized_root = normalize_workspace_root_path(root);
2274    SESSIONS.with(|s| {
2275        let mut map = s.borrow_mut();
2276        let Some(state) = map.get_mut(id) else {
2277            return Err(format!("agent session '{id}' does not exist"));
2278        };
2279        let Some(anchor) = state.workspace_anchor.as_mut() else {
2280            return Err(format!("agent session '{id}' has no workspace anchor"));
2281        };
2282        let before = anchor.additional_roots.len();
2283        anchor
2284            .additional_roots
2285            .retain(|entry| entry.path != normalized_root);
2286        let removed = anchor.additional_roots.len() != before;
2287        if removed {
2288            crate::llm::permissions::clear_session_grants(id);
2289        }
2290        state.last_accessed = Instant::now();
2291        Ok(removed)
2292    })
2293}
2294
2295pub fn list_workspace_roots(id: &str) -> Result<(PathBuf, Vec<MountedRoot>), String> {
2296    SESSIONS.with(|s| {
2297        let map = s.borrow();
2298        let Some(state) = map.get(id) else {
2299            return Err(format!("agent session '{id}' does not exist"));
2300        };
2301        let Some(anchor) = state.workspace_anchor.as_ref() else {
2302            return Err(format!("agent session '{id}' has no workspace anchor"));
2303        };
2304        Ok((anchor.primary.clone(), anchor.additional_roots.clone()))
2305    })
2306}
2307
2308fn validate_workspace_root_path(root: &str) -> Result<PathBuf, String> {
2309    let normalized = normalize_workspace_root_path(root);
2310    let canonical = std::fs::canonicalize(&normalized)
2311        .map_err(|error| format!("workspace root '{root}' must exist and be readable: {error}"))?;
2312    let metadata = std::fs::metadata(&canonical)
2313        .map_err(|error| format!("workspace root '{root}' must exist and be readable: {error}"))?;
2314    if !metadata.is_dir() {
2315        return Err(format!("workspace root '{root}' must be a directory"));
2316    }
2317    std::fs::read_dir(&canonical)
2318        .map_err(|error| format!("workspace root '{root}' must be readable: {error}"))?;
2319    Ok(canonical)
2320}
2321
2322fn normalize_workspace_root_path(root: &str) -> PathBuf {
2323    let absolute = crate::stdlib::process::normalize_context_path(Path::new(root));
2324    std::fs::canonicalize(&absolute).unwrap_or(absolute)
2325}
2326
2327fn empty_transcript(id: &str) -> VmValue {
2328    use crate::llm::helpers::new_transcript_with;
2329    new_transcript_with(Some(id.to_string()), Vec::new(), None, None)
2330}
2331
2332fn clone_transcript_with_id(transcript: &VmValue, new_id: &str) -> VmValue {
2333    let Some(dict) = transcript.as_dict() else {
2334        return empty_transcript(new_id);
2335    };
2336    let mut next = dict.clone();
2337    next.insert(
2338        "id".to_string(),
2339        VmValue::String(Rc::from(new_id.to_string())),
2340    );
2341    VmValue::Dict(Rc::new(next))
2342}
2343
2344fn clone_transcript_with_parent(transcript: &VmValue, parent_id: &str) -> VmValue {
2345    let Some(dict) = transcript.as_dict() else {
2346        return transcript.clone();
2347    };
2348    let mut next = dict.clone();
2349    let metadata = match next.get("metadata") {
2350        Some(VmValue::Dict(metadata)) => {
2351            let mut metadata = metadata.as_ref().clone();
2352            metadata.insert(
2353                "parent_session_id".to_string(),
2354                VmValue::String(Rc::from(parent_id.to_string())),
2355            );
2356            VmValue::Dict(Rc::new(metadata))
2357        }
2358        _ => VmValue::Dict(Rc::new(BTreeMap::from([(
2359            "parent_session_id".to_string(),
2360            VmValue::String(Rc::from(parent_id.to_string())),
2361        )]))),
2362    };
2363    next.insert("metadata".to_string(), metadata);
2364    VmValue::Dict(Rc::new(next))
2365}
2366
2367fn apply_system_prompt_metadata(next: &mut BTreeMap<String, VmValue>, system_prompt: &str) {
2368    let mut metadata = match next.get("metadata") {
2369        Some(VmValue::Dict(metadata)) => metadata.as_ref().clone(),
2370        _ => BTreeMap::new(),
2371    };
2372    metadata.insert(
2373        "system_prompt".to_string(),
2374        crate::stdlib::json_to_vm_value(&crate::llm::helpers::system_prompt_metadata(
2375            system_prompt,
2376        )),
2377    );
2378    next.insert("metadata".to_string(), VmValue::Dict(Rc::new(metadata)));
2379}
2380
2381fn transcript_with_session_metadata(transcript: VmValue, state: &SessionState) -> VmValue {
2382    let Some(dict) = transcript.as_dict() else {
2383        return transcript;
2384    };
2385    let mut next = dict.clone();
2386    let mut metadata = match next.get("metadata") {
2387        Some(VmValue::Dict(metadata)) => metadata.as_ref().clone(),
2388        _ => BTreeMap::new(),
2389    };
2390    if let Some(tool_format) = state.tool_format.as_ref() {
2391        metadata.insert(
2392            "tool_format".to_string(),
2393            VmValue::String(Rc::from(tool_format.clone())),
2394        );
2395        metadata.insert("tool_mode_locked".to_string(), VmValue::Bool(true));
2396    }
2397    if let Some(system_prompt) = state.system_prompt.as_ref() {
2398        metadata.insert(
2399            "system_prompt".to_string(),
2400            crate::stdlib::json_to_vm_value(&crate::llm::helpers::system_prompt_metadata(
2401                system_prompt,
2402            )),
2403        );
2404    }
2405    if let Some(anchor) = state.workspace_anchor.as_ref() {
2406        metadata.insert(
2407            WORKSPACE_ANCHOR_METADATA_KEY.to_string(),
2408            anchor.to_vm_value(),
2409        );
2410    } else {
2411        metadata.remove(WORKSPACE_ANCHOR_METADATA_KEY);
2412    }
2413    if let Some(last_action) = state.last_transcript_budget_action.as_ref() {
2414        let usage = transcript_usage(
2415            &VmValue::Dict(Rc::new(next.clone())),
2416            state.transcript_budget_policy.max_approx_bytes.is_some(),
2417        );
2418        metadata.insert(
2419            "transcript_budget".to_string(),
2420            crate::stdlib::json_to_vm_value(&serde_json::json!({
2421                "policy": transcript_budget_policy_json(&state.transcript_budget_policy.normalized()),
2422                "usage": transcript_budget_usage_json(&usage),
2423                "last_action": last_action,
2424            })),
2425        );
2426    }
2427    if !metadata.is_empty() {
2428        next.insert("metadata".to_string(), VmValue::Dict(Rc::new(metadata)));
2429    }
2430    VmValue::Dict(Rc::new(next))
2431}
2432
2433fn session_snapshot(state: &SessionState) -> VmValue {
2434    let transcript = transcript_with_session_metadata(state.transcript.clone(), state);
2435    let Some(dict) = transcript.as_dict() else {
2436        return state.transcript.clone();
2437    };
2438    let mut next = dict.clone();
2439    let length = next
2440        .get("messages")
2441        .and_then(|value| match value {
2442            VmValue::List(list) => Some(list.len() as i64),
2443            _ => None,
2444        })
2445        .unwrap_or(0);
2446    next.insert("length".to_string(), VmValue::Int(length));
2447    next.insert(
2448        "created_at".to_string(),
2449        VmValue::String(Rc::from(state.created_at.clone())),
2450    );
2451    next.insert(
2452        "parent_id".to_string(),
2453        state
2454            .parent_id
2455            .as_ref()
2456            .map(|id| VmValue::String(Rc::from(id.clone())))
2457            .unwrap_or(VmValue::Nil),
2458    );
2459    next.insert(
2460        "child_ids".to_string(),
2461        VmValue::List(Rc::new(
2462            state
2463                .child_ids
2464                .iter()
2465                .cloned()
2466                .map(|id| VmValue::String(Rc::from(id)))
2467                .collect(),
2468        )),
2469    );
2470    next.insert(
2471        "branched_at_event_index".to_string(),
2472        state
2473            .branched_at_event_index
2474            .map(|index| VmValue::Int(index as i64))
2475            .unwrap_or(VmValue::Nil),
2476    );
2477    next.insert(
2478        "system_prompt".to_string(),
2479        state
2480            .system_prompt
2481            .as_ref()
2482            .map(|prompt| VmValue::String(Rc::from(prompt.clone())))
2483            .unwrap_or(VmValue::Nil),
2484    );
2485    next.insert(
2486        "tool_format".to_string(),
2487        state
2488            .tool_format
2489            .as_ref()
2490            .map(|format| VmValue::String(Rc::from(format.clone())))
2491            .unwrap_or(VmValue::Nil),
2492    );
2493    next.insert(
2494        "pinned_model".to_string(),
2495        state
2496            .pinned_model
2497            .as_ref()
2498            .map(|model| VmValue::String(Rc::from(model.clone())))
2499            .unwrap_or(VmValue::Nil),
2500    );
2501    next.insert(
2502        "pinned_reasoning_policy".to_string(),
2503        state
2504            .pinned_reasoning_policy
2505            .as_ref()
2506            .map(|policy| VmValue::String(Rc::from(policy.clone())))
2507            .unwrap_or(VmValue::Nil),
2508    );
2509    next.insert(
2510        "workspace_anchor".to_string(),
2511        state
2512            .workspace_anchor
2513            .as_ref()
2514            .map(WorkspaceAnchor::to_vm_value)
2515            .unwrap_or(VmValue::Nil),
2516    );
2517    next.insert(
2518        "workspace_policy".to_string(),
2519        state.workspace_policy.to_vm_value(),
2520    );
2521    VmValue::Dict(Rc::new(next))
2522}
2523
2524fn update_lineage(
2525    map: &mut HashMap<String, SessionState>,
2526    parent_id: &str,
2527    child_id: &str,
2528    branched_at_event_index: Option<usize>,
2529) {
2530    let old_parent_id = map.get(child_id).and_then(|child| child.parent_id.clone());
2531    if let Some(old_parent_id) = old_parent_id.filter(|old_parent_id| old_parent_id != parent_id) {
2532        if let Some(old_parent) = map.get_mut(&old_parent_id) {
2533            old_parent.child_ids.retain(|id| id != child_id);
2534            old_parent.last_accessed = Instant::now();
2535        }
2536    }
2537    if let Some(parent) = map.get_mut(parent_id) {
2538        parent.last_accessed = Instant::now();
2539        if !parent.child_ids.iter().any(|id| id == child_id) {
2540            parent.child_ids.push(child_id.to_string());
2541        }
2542    }
2543    if let Some(child) = map.get_mut(child_id) {
2544        child.last_accessed = Instant::now();
2545        child.parent_id = Some(parent_id.to_string());
2546        child.branched_at_event_index = branched_at_event_index;
2547        child.transcript = clone_transcript_with_parent(&child.transcript, parent_id);
2548    }
2549}
2550
2551fn branch_event_index(transcript: &VmValue, keep_first: usize) -> usize {
2552    if keep_first == 0 {
2553        return 0;
2554    }
2555    let Some(dict) = transcript.as_dict() else {
2556        return keep_first;
2557    };
2558    let Some(VmValue::List(events)) = dict.get("events") else {
2559        return keep_first;
2560    };
2561    event_prefix_len_for_messages(events, keep_first)
2562}
2563
2564fn event_kind(event: &VmValue) -> Option<String> {
2565    event
2566        .as_dict()
2567        .and_then(|dict| dict.get("kind"))
2568        .map(VmValue::display)
2569}
2570
2571fn event_id(event: &VmValue) -> Option<String> {
2572    event
2573        .as_dict()
2574        .and_then(|dict| dict.get("id"))
2575        .map(VmValue::display)
2576}
2577
2578fn is_turn_event(event: &VmValue) -> bool {
2579    matches!(
2580        event_kind(event).as_deref(),
2581        Some("message" | "tool_result")
2582    )
2583}
2584
2585fn event_prefix_len_for_messages(events: &[VmValue], keep_first: usize) -> usize {
2586    if keep_first == 0 {
2587        return 0;
2588    }
2589    let mut retained_messages = 0usize;
2590    for (index, event) in events.iter().enumerate() {
2591        if is_turn_event(event) {
2592            retained_messages += 1;
2593            if retained_messages == keep_first {
2594                return index + 1;
2595            }
2596        }
2597    }
2598    events.len()
2599}
2600
2601fn turn_event_id_for_count(events: &[VmValue], keep_first: usize) -> Option<String> {
2602    if keep_first == 0 {
2603        return None;
2604    }
2605    let mut retained_messages = 0usize;
2606    for event in events {
2607        if is_turn_event(event) {
2608            retained_messages += 1;
2609            if retained_messages == keep_first {
2610                return event_id(event);
2611            }
2612        }
2613    }
2614    None
2615}
2616
// Unit tests for this module are loaded from the sibling file
// `agent_sessions_tests.rs` and compiled only under `cfg(test)`.
#[cfg(test)]
#[path = "agent_sessions_tests.rs"]
mod tests;