Skip to main content

harn_vm/
agent_sessions.rs

1//! First-class session storage.
2//!
3//! A session owns three things:
4//!
5//! 1. A transcript dict (messages, events, summary, metadata, …).
6//! 2. Closure subscribers that fire on agent-loop events for this session.
7//! 3. Its own lifecycle (open, reset, fork, trim, compact, close).
8//!
9//! Storage is thread-local because `VmValue` contains `Rc`, which is
10//! neither `Send` nor `Sync`. The agent loop runs on a tokio
11//! current-thread worker, so all session reads and writes happen on the
12//! same thread. The closure-subscribers register, fire, and unregister
13//! on that same thread.
14//!
15//! Lifecycle is explicit. Builtins (`agent_session_open`,
16//! `_reset`, `_fork`, `_fork_at`, `_close`, `_trim`, `_compact`,
17//! `_inject`, `_exists`, `_length`, `_snapshot`, `_ancestry`) drive
18//! the store directly — there is no "policy" config dict that
19//! performs lifecycle as a side effect.
20
21use std::cell::{Cell, RefCell};
22use std::collections::{BTreeMap, HashMap, HashSet};
23use std::future::Future;
24use std::path::{Path, PathBuf};
25use std::rc::Rc;
26use std::time::Instant;
27
28use crate::runtime_limits::RuntimeLimits;
29use crate::value::VmValue;
30use crate::workspace_anchor::{
31    MountMode, MountedRoot, WorkspaceAnchor, WorkspacePolicy, WORKSPACE_ANCHOR_METADATA_KEY,
32};
33
34/// Default cap on concurrent sessions per VM thread. Beyond this the
35/// least-recently-accessed session is evicted on the next `open`.
36pub const DEFAULT_SESSION_CAP: usize = RuntimeLimits::DEFAULT.max_agent_sessions;
37
38/// Default cap on retained prompt-visible messages per session. The
39/// limit is intentionally high enough for normal long-running agents
40/// while still bounding accidental unbounded growth.
41pub const DEFAULT_TRANSCRIPT_MESSAGE_CAP: usize = 4096;
42
43/// Default cap on retained transcript audit events per session. Events
44/// include message-derived entries plus orchestration lifecycle records.
45pub const DEFAULT_TRANSCRIPT_EVENT_CAP: usize = 32768;
46#[cfg(debug_assertions)]
47const CACHE_STABLE_SYSTEM_PROMPT_DIAGNOSTIC: &str = "HARN-CACHE-001";
48
49#[derive(Clone, Debug, PartialEq, Eq)]
50pub enum TranscriptBudgetRecovery {
51    Reject,
52    Trim { keep_last: usize },
53    Compact { keep_last: usize },
54}
55
56#[derive(Clone, Debug, PartialEq, Eq)]
57pub struct SessionTranscriptBudgetPolicy {
58    pub max_messages: usize,
59    pub max_events: usize,
60    pub max_approx_bytes: Option<usize>,
61    pub recovery: TranscriptBudgetRecovery,
62}
63
64impl SessionTranscriptBudgetPolicy {
65    pub fn reject(max_messages: usize, max_events: usize) -> Self {
66        Self {
67            max_messages: max_messages.max(1),
68            max_events: max_events.max(1),
69            max_approx_bytes: None,
70            recovery: TranscriptBudgetRecovery::Reject,
71        }
72    }
73
74    pub fn trim(max_messages: usize, max_events: usize, keep_last: usize) -> Self {
75        Self {
76            max_messages: max_messages.max(1),
77            max_events: max_events.max(1),
78            max_approx_bytes: None,
79            recovery: TranscriptBudgetRecovery::Trim { keep_last },
80        }
81    }
82
83    pub fn compact(max_messages: usize, max_events: usize, keep_last: usize) -> Self {
84        Self {
85            max_messages: max_messages.max(1),
86            max_events: max_events.max(1),
87            max_approx_bytes: None,
88            recovery: TranscriptBudgetRecovery::Compact { keep_last },
89        }
90    }
91
92    pub fn with_max_approx_bytes(mut self, max_approx_bytes: Option<usize>) -> Self {
93        self.max_approx_bytes = max_approx_bytes.map(|limit| limit.max(1));
94        self
95    }
96
97    fn normalized(&self) -> Self {
98        Self {
99            max_messages: self.max_messages.max(1),
100            max_events: self.max_events.max(1),
101            max_approx_bytes: self.max_approx_bytes.map(|limit| limit.max(1)),
102            recovery: self.recovery.clone(),
103        }
104    }
105}
106
107impl Default for SessionTranscriptBudgetPolicy {
108    fn default() -> Self {
109        Self::reject(DEFAULT_TRANSCRIPT_MESSAGE_CAP, DEFAULT_TRANSCRIPT_EVENT_CAP)
110    }
111}
112
113pub struct SessionState {
114    pub id: String,
115    pub transcript: VmValue,
116    pub subscribers: Vec<VmValue>,
117    pub created_at: String,
118    pub last_accessed: Instant,
119    pub parent_id: Option<String>,
120    pub child_ids: Vec<String>,
121    pub branched_at_event_index: Option<usize>,
122    /// Names of skills that were active at the end of the most recent
123    /// `agent_loop` run on this session. Empty when no skills were
124    /// matched, when the skill system wasn't used, or when the
125    /// deactivation phase cleared them. Re-entering the session
126    /// restores these as the initial active set before matching runs.
127    pub active_skills: Vec<String>,
128    /// Tool-calling protocol claimed by the first agent loop that uses
129    /// this session. A transcript is only replayable under the same
130    /// contract that produced its prompt/history.
131    pub tool_format: Option<String>,
132    /// Stable session-level system prompt material. This is transcript
133    /// metadata, not a replay message: providers receive it through
134    /// their system/developer instruction channel on each call.
135    pub system_prompt: Option<String>,
136    /// Session-pinned model selector. When set, `llm_call` invocations
137    /// that do not pass an explicit `model:` option resolve to this
138    /// selector instead of `HARN_LLM_MODEL` / provider defaults. Mid-
139    /// session swap is exposed over ACP via `session/set_config_option`
140    /// (configId="model").
141    pub pinned_model: Option<String>,
142    /// Session-pinned high-level reasoning policy. When set, `llm_call`
143    /// invocations that do not pass explicit `thinking` or
144    /// `reasoning_effort` options resolve this provider-aware policy into
145    /// the route's native thinking shape. Exposed over ACP as
146    /// `session/set_config_option(configId="thought_level")`.
147    pub pinned_reasoning_policy: Option<String>,
148    /// Session-local workspace defaults. Persona and host policy layers
149    /// can update this without rewriting the current anchor.
150    pub workspace_policy: WorkspacePolicy,
151    /// Typed workspace anchor for the session. Primary path plus any
152    /// additional mounted roots; consumed by permission matchers, the
153    /// bundle exporter, and host-side cross-project handoff flows
154    /// (epic #2208). `None` until a host opens the session with one or
155    /// the ACP `reanchor` / `add_root` primitives populate it.
156    pub workspace_anchor: Option<WorkspaceAnchor>,
157    pub transcript_budget_policy: SessionTranscriptBudgetPolicy,
158    pub last_transcript_budget_action: Option<serde_json::Value>,
159}
160
161impl SessionState {
162    fn new(id: String) -> Self {
163        let now = Instant::now();
164        let transcript = empty_transcript(&id);
165        Self {
166            id,
167            transcript,
168            subscribers: Vec::new(),
169            created_at: crate::orchestration::now_rfc3339(),
170            last_accessed: now,
171            parent_id: None,
172            child_ids: Vec::new(),
173            branched_at_event_index: None,
174            active_skills: Vec::new(),
175            tool_format: None,
176            system_prompt: None,
177            pinned_model: None,
178            pinned_reasoning_policy: None,
179            workspace_policy: WorkspacePolicy::default(),
180            workspace_anchor: None,
181            transcript_budget_policy: default_transcript_budget_policy(),
182            last_transcript_budget_action: None,
183        }
184    }
185}
186
/// Lineage view of one session: its direct parent, its direct children,
/// and the topmost ancestor reached by following parent links.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SessionAncestry {
    /// Direct parent id, if the session has one.
    pub parent_id: Option<String>,
    /// Ids of the session's direct children.
    pub child_ids: Vec<String>,
    /// Id at the top of the parent chain; the session's own id when it
    /// has no parent.
    pub root_id: String,
}
193
/// Outcome of truncating a transcript to its first N messages.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SessionTruncateResult {
    /// Number of messages still in the transcript after truncation.
    pub kept_turn_count: usize,
    /// Number of messages that were dropped from the tail.
    pub removed_turn_count: usize,
    /// Event id looked up for the retained tip, when one was found.
    pub new_tip_turn_id: Option<String>,
}
200
/// Report describing a reminder injection.
///
/// NOTE(review): the producer of this type is outside this part of the
/// file; field meanings below are inferred from the names — confirm.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ReminderInjectionReport {
    /// Identifier of the reminder.
    pub reminder_id: String,
    /// Count of entries that were de-duplicated (presumably earlier
    /// copies of the same reminder).
    pub deduped_count: usize,
}
206
207thread_local! {
208    static SESSIONS: RefCell<HashMap<String, SessionState>> = RefCell::new(HashMap::new());
209    static SESSION_CAP: Cell<usize> = const { Cell::new(DEFAULT_SESSION_CAP) };
210    static DEFAULT_TRANSCRIPT_BUDGET_POLICY: RefCell<SessionTranscriptBudgetPolicy> =
211        RefCell::new(SessionTranscriptBudgetPolicy::default());
212    static CURRENT_SESSION_STACK: RefCell<Vec<String>> = const { RefCell::new(Vec::new()) };
213    static CURRENT_TOOL_CALL_STACK: RefCell<Vec<String>> = const { RefCell::new(Vec::new()) };
214}
215
216tokio::task_local! {
217    static CURRENT_TOOL_CALL_TASK: String;
218}
219
220pub struct CurrentSessionGuard {
221    active: bool,
222}
223
224impl Drop for CurrentSessionGuard {
225    fn drop(&mut self) {
226        if self.active {
227            pop_current_session();
228        }
229    }
230}
231
232/// RAII guard that scopes the active tool-call id for the running thread.
233///
234/// Set on entry to a tool dispatch and dropped on exit, so any hostlib
235/// builtin invoked under it (e.g. `tools/write_file`) can resolve the
236/// owning tool call without threading the id through every parameter.
237pub struct CurrentToolCallGuard {
238    active: bool,
239}
240
241impl Drop for CurrentToolCallGuard {
242    fn drop(&mut self) {
243        if self.active {
244            pop_current_tool_call();
245        }
246    }
247}
248
249/// Set the per-thread session cap. Primarily for tests; production VMs
250/// inherit the default.
251pub fn set_session_cap(cap: usize) {
252    SESSION_CAP.with(|c| c.set(cap.max(1)));
253}
254
255pub fn session_cap() -> usize {
256    SESSION_CAP.with(|c| c.get())
257}
258
259pub fn set_default_transcript_budget_policy(policy: SessionTranscriptBudgetPolicy) {
260    DEFAULT_TRANSCRIPT_BUDGET_POLICY.with(|cell| {
261        *cell.borrow_mut() = policy.normalized();
262    });
263}
264
265pub fn reset_default_transcript_budget_policy() {
266    set_default_transcript_budget_policy(SessionTranscriptBudgetPolicy::default());
267}
268
269pub fn default_transcript_budget_policy() -> SessionTranscriptBudgetPolicy {
270    DEFAULT_TRANSCRIPT_BUDGET_POLICY.with(|cell| cell.borrow().clone())
271}
272
273pub fn transcript_budget_policy(id: &str) -> Option<SessionTranscriptBudgetPolicy> {
274    SESSIONS.with(|s| {
275        s.borrow()
276            .get(id)
277            .map(|state| state.transcript_budget_policy.clone())
278    })
279}
280
281pub fn set_transcript_budget_policy(
282    id: &str,
283    policy: SessionTranscriptBudgetPolicy,
284) -> Result<(), String> {
285    SESSIONS.with(|s| {
286        let mut map = s.borrow_mut();
287        let Some(state) = map.get_mut(id) else {
288            return Err(format!("agent session '{id}' does not exist"));
289        };
290        let previous = state.transcript_budget_policy.clone();
291        let previous_action = state.last_transcript_budget_action.clone();
292        state.transcript_budget_policy = policy.normalized();
293        let candidate = state.transcript.clone();
294        if let Err(error) = apply_transcript_with_budget(state, candidate, "policy_update") {
295            state.transcript_budget_policy = previous;
296            state.last_transcript_budget_action = previous_action;
297            return Err(error);
298        }
299        state.last_accessed = Instant::now();
300        Ok(())
301    })
302}
303
304/// Clear the session store. Wired into `reset_llm_state` for test isolation.
305pub fn reset_session_store() {
306    SESSIONS.with(|s| s.borrow_mut().clear());
307    CURRENT_SESSION_STACK.with(|stack| stack.borrow_mut().clear());
308    CURRENT_TOOL_CALL_STACK.with(|stack| stack.borrow_mut().clear());
309    reset_default_transcript_budget_policy();
310}
311
312pub(crate) fn push_current_session(id: String) {
313    if id.is_empty() {
314        return;
315    }
316    CURRENT_SESSION_STACK.with(|stack| stack.borrow_mut().push(id));
317}
318
319pub(crate) fn pop_current_session() {
320    CURRENT_SESSION_STACK.with(|stack| {
321        let _ = stack.borrow_mut().pop();
322    });
323}
324
325pub fn current_session_id() -> Option<String> {
326    CURRENT_SESSION_STACK.with(|stack| stack.borrow().last().cloned())
327}
328
329pub fn enter_current_session(id: impl Into<String>) -> CurrentSessionGuard {
330    let id = id.into();
331    if id.trim().is_empty() {
332        return CurrentSessionGuard { active: false };
333    }
334    push_current_session(id);
335    CurrentSessionGuard { active: true }
336}
337
338fn push_current_tool_call(id: String) {
339    if id.is_empty() {
340        return;
341    }
342    CURRENT_TOOL_CALL_STACK.with(|stack| stack.borrow_mut().push(id));
343}
344
345fn pop_current_tool_call() {
346    CURRENT_TOOL_CALL_STACK.with(|stack| {
347        let _ = stack.borrow_mut().pop();
348    });
349}
350
351/// Return the active tool-call id for the current thread, if any.
352///
353/// Hostlib builtins consult this to attribute side-effect snapshots to
354/// the owning ACP `toolCallId` without callers passing it explicitly.
355pub fn current_tool_call_id() -> Option<String> {
356    if let Ok(id) = CURRENT_TOOL_CALL_TASK.try_with(Clone::clone) {
357        if !id.trim().is_empty() {
358            return Some(id);
359        }
360    }
361    CURRENT_TOOL_CALL_STACK.with(|stack| stack.borrow().last().cloned())
362}
363
364/// Scope the active tool-call id to one async task.
365///
366/// Parallel tool dispatch runs sibling calls on the same OS thread, so
367/// thread-local guards alone cannot preserve attribution across `.await`
368/// points. Tokio task-local scoping follows the future instead.
369pub async fn scope_current_tool_call<F, T>(id: impl Into<String>, future: F) -> T
370where
371    F: Future<Output = T>,
372{
373    let id = id.into();
374    if id.trim().is_empty() {
375        future.await
376    } else {
377        CURRENT_TOOL_CALL_TASK.scope(id, future).await
378    }
379}
380
381/// Scope the active tool-call id for the duration of the returned guard.
382pub fn enter_current_tool_call(id: impl Into<String>) -> CurrentToolCallGuard {
383    let id = id.into();
384    if id.trim().is_empty() {
385        return CurrentToolCallGuard { active: false };
386    }
387    push_current_tool_call(id);
388    CurrentToolCallGuard { active: true }
389}
390
391pub fn exists(id: &str) -> bool {
392    SESSIONS.with(|s| s.borrow().contains_key(id))
393}
394
395pub fn length(id: &str) -> Option<usize> {
396    SESSIONS.with(|s| {
397        s.borrow().get(id).map(|state| {
398            state
399                .transcript
400                .as_dict()
401                .and_then(|d| d.get("messages"))
402                .and_then(|v| match v {
403                    VmValue::List(list) => Some(list.len()),
404                    _ => None,
405                })
406                .unwrap_or(0)
407        })
408    })
409}
410
411pub fn snapshot(id: &str) -> Option<VmValue> {
412    SESSIONS.with(|s| s.borrow().get(id).map(session_snapshot))
413}
414
415/// Return the canonical transcript surface for APIs whose result field is
416/// named `transcript`. Session-only fields stay on `agent_session_snapshot`.
417pub fn transcript(id: &str) -> Option<VmValue> {
418    SESSIONS.with(|s| {
419        s.borrow()
420            .get(id)
421            .map(|state| transcript_with_session_metadata(state.transcript.clone(), state))
422    })
423}
424
425/// Open a session, or create it if missing. Returns the resolved id.
426///
427/// Newly-created sessions auto-register an event-log-backed sink when a
428/// generalized [`crate::event_log::EventLog`] has been installed for the
429/// current VM thread. For legacy env-driven workflows that still point
430/// `HARN_EVENT_LOG_DIR` at a directory, we preserve the older JSONL sink
431/// as a compatibility fallback. Re-opening an existing session does not
432/// re-register — sinks are per-session, owned by the first opener.
433pub fn open_or_create(id: Option<String>) -> String {
434    let resolved = id.unwrap_or_else(|| uuid::Uuid::now_v7().to_string());
435    let parent_session = current_session_id();
436    let mut was_new = false;
437    SESSIONS.with(|s| {
438        let mut map = s.borrow_mut();
439        if let Some(state) = map.get_mut(&resolved) {
440            state.last_accessed = Instant::now();
441            return;
442        }
443        was_new = true;
444        let cap = SESSION_CAP.with(|c| c.get());
445        if map.len() >= cap {
446            if let Some(victim) = map
447                .iter()
448                .min_by_key(|(_, state)| state.last_accessed)
449                .map(|(id, _)| id.clone())
450            {
451                map.remove(&victim);
452            }
453        }
454        map.insert(resolved.clone(), SessionState::new(resolved.clone()));
455    });
456    if was_new {
457        if let Some(parent) = parent_session.as_deref() {
458            crate::agent_events::mirror_session_sinks(parent, &resolved);
459        }
460        try_register_event_log(&resolved);
461    }
462    resolved
463}
464
465pub fn open_child_session(parent_id: &str, id: Option<String>) -> String {
466    let resolved = open_or_create(id);
467    link_child_session(parent_id, &resolved);
468    resolved
469}
470
471pub fn link_child_session(parent_id: &str, child_id: &str) {
472    link_child_session_with_branch(parent_id, child_id, None);
473}
474
475pub fn link_child_session_with_branch(
476    parent_id: &str,
477    child_id: &str,
478    branched_at_event_index: Option<usize>,
479) {
480    if parent_id == child_id {
481        return;
482    }
483    open_or_create(Some(parent_id.to_string()));
484    open_or_create(Some(child_id.to_string()));
485    SESSIONS.with(|s| {
486        let mut map = s.borrow_mut();
487        update_lineage(&mut map, parent_id, child_id, branched_at_event_index);
488    });
489}
490
491pub fn parent_id(id: &str) -> Option<String> {
492    SESSIONS.with(|s| s.borrow().get(id).and_then(|state| state.parent_id.clone()))
493}
494
495pub fn child_ids(id: &str) -> Vec<String> {
496    SESSIONS.with(|s| {
497        s.borrow()
498            .get(id)
499            .map(|state| state.child_ids.clone())
500            .unwrap_or_default()
501    })
502}
503
504pub fn ancestry(id: &str) -> Option<SessionAncestry> {
505    SESSIONS.with(|s| {
506        let map = s.borrow();
507        let state = map.get(id)?;
508        let mut root_id = state.id.clone();
509        let mut cursor = state.parent_id.clone();
510        let mut seen = HashSet::from([state.id.clone()]);
511        while let Some(parent_id) = cursor {
512            if !seen.insert(parent_id.clone()) {
513                break;
514            }
515            root_id = parent_id.clone();
516            cursor = map
517                .get(&parent_id)
518                .and_then(|parent| parent.parent_id.clone());
519        }
520        Some(SessionAncestry {
521            parent_id: state.parent_id.clone(),
522            child_ids: state.child_ids.clone(),
523            root_id,
524        })
525    })
526}
527
528/// Auto-register a persistent sink for a newly-created session.
529/// Silent no-op on failure — a broken observability sink must never
530/// prevent a session from starting.
531fn try_register_event_log(session_id: &str) {
532    if let Some(log) = crate::event_log::active_event_log() {
533        crate::agent_events::register_sink(
534            session_id,
535            crate::agent_events::EventLogSink::new(log, session_id),
536        );
537        return;
538    }
539    let Ok(dir) = std::env::var("HARN_EVENT_LOG_DIR") else {
540        return;
541    };
542    if dir.is_empty() {
543        return;
544    }
545    let path = std::path::PathBuf::from(dir).join(format!("event_log-{session_id}.jsonl"));
546    if let Ok(sink) = crate::agent_events::JsonlEventSink::open(&path) {
547        crate::agent_events::register_sink(session_id, sink);
548    }
549}
550
551pub fn register_event_log_sink(session_id: &str) {
552    try_register_event_log(session_id);
553}
554
555pub fn close(id: &str) {
556    SESSIONS.with(|s| {
557        s.borrow_mut().remove(id);
558    });
559    // Cross-thread per-session state must be released too, otherwise
560    // pending inbox entries can be delivered to a future session that
561    // happens to reuse the same id.
562    crate::orchestration::agent_inbox::clear_session(id);
563    crate::agent_events::clear_session_sinks(id);
564}
565
566pub fn close_with_status(
567    id: &str,
568    reason: impl Into<String>,
569    status: impl Into<String>,
570    metadata: serde_json::Value,
571) -> bool {
572    if !exists(id) {
573        return false;
574    }
575    let reason = reason.into();
576    let status = status.into();
577    let event_metadata = serde_json::json!({
578        "reason": reason,
579        "status": status,
580        "metadata": metadata,
581    });
582    let transcript_event = crate::llm::helpers::transcript_event(
583        "agent_session_closed",
584        "system",
585        "internal",
586        "Agent session closed",
587        Some(event_metadata.clone()),
588    );
589    let _ = append_event(id, transcript_event);
590    crate::llm::emit_live_agent_event_sync(&crate::agent_events::AgentEvent::SessionClosed {
591        session_id: id.to_string(),
592        reason,
593        status,
594        metadata,
595    });
596    close(id);
597    true
598}
599
600pub fn reset_transcript(id: &str) -> bool {
601    SESSIONS.with(|s| {
602        let mut map = s.borrow_mut();
603        let Some(state) = map.get_mut(id) else {
604            return false;
605        };
606        state.transcript = empty_transcript(id);
607        state.tool_format = None;
608        state.system_prompt = None;
609        state.last_transcript_budget_action = None;
610        state.last_accessed = Instant::now();
611        true
612    })
613}
614
615/// Copy `src`'s transcript into a new session id. Subscribers are NOT
616/// copied — a fork is a conversation branch, not an event fanout.
617///
618/// Touches `src`'s `last_accessed` before evicting, so the fork
619/// operation itself can't make `src` look stale and kick it out of
620/// the LRU just to make room for the new fork.
621pub fn fork(src_id: &str, dst_id: Option<String>) -> Option<String> {
622    let (
623        src_transcript,
624        src_tool_format,
625        src_system_prompt,
626        src_pinned_model,
627        src_pinned_reasoning_policy,
628        src_workspace_anchor,
629        src_workspace_policy,
630        src_transcript_budget_policy,
631        src_last_transcript_budget_action,
632        dst,
633    ) = SESSIONS.with(|s| {
634        let mut map = s.borrow_mut();
635        let src = map.get_mut(src_id)?;
636        src.last_accessed = Instant::now();
637        let dst = dst_id.unwrap_or_else(|| uuid::Uuid::now_v7().to_string());
638        let forked_transcript = clone_transcript_with_id(&src.transcript, &dst);
639        Some((
640            forked_transcript,
641            src.tool_format.clone(),
642            src.system_prompt.clone(),
643            src.pinned_model.clone(),
644            src.pinned_reasoning_policy.clone(),
645            src.workspace_anchor.clone(),
646            src.workspace_policy.clone(),
647            src.transcript_budget_policy.clone(),
648            src.last_transcript_budget_action.clone(),
649            dst,
650        ))
651    })?;
652    // Ensure cap is respected when inserting the fork.
653    open_or_create(Some(dst.clone()));
654    SESSIONS.with(|s| {
655        let mut map = s.borrow_mut();
656        if let Some(state) = map.get_mut(&dst) {
657            state.transcript = src_transcript;
658            state.tool_format = src_tool_format;
659            state.system_prompt = src_system_prompt;
660            state.pinned_model = src_pinned_model;
661            state.pinned_reasoning_policy = src_pinned_reasoning_policy;
662            state.workspace_anchor = src_workspace_anchor;
663            state.workspace_policy = src_workspace_policy;
664            state.transcript_budget_policy = src_transcript_budget_policy;
665            state.last_transcript_budget_action = src_last_transcript_budget_action;
666            state.last_accessed = Instant::now();
667        }
668        update_lineage(&mut map, src_id, &dst, None);
669    });
670    let budget_ok = SESSIONS.with(|s| {
671        let mut map = s.borrow_mut();
672        let Some(state) = map.get_mut(&dst) else {
673            return false;
674        };
675        let candidate = state.transcript.clone();
676        apply_transcript_with_budget(state, candidate, "fork").is_ok()
677    });
678    if !budget_ok {
679        close(&dst);
680        return None;
681    }
682    // open_or_create evicts BEFORE inserting, so the dst slot is
683    // guaranteed once we get here. The existence check is cheap
684    // insurance against a future refactor that breaks that invariant.
685    if exists(&dst) {
686        Some(dst)
687    } else {
688        None
689    }
690}
691
692/// Fork `src_id` and truncate the destination transcript to the
693/// first `keep_first` messages (#105 — branch-replay). Pairs with the
694/// scrubber: the host picks an event index, rebuilds a message count,
695/// and calls this to spawn a live sibling session that resumes from
696/// the rebuilt state. Subscribers are not carried over (same as
697/// `fork`), so sibling events don't double-fan into the parent's
698/// consumers.
699///
700/// Returns the new session id on success, `None` if `src_id` doesn't
701/// exist.
702pub fn fork_at(src_id: &str, keep_first: usize, dst_id: Option<String>) -> Option<String> {
703    let branched_at_event_index = SESSIONS.with(|s| {
704        let map = s.borrow();
705        let src = map.get(src_id)?;
706        Some(branch_event_index(&src.transcript, keep_first))
707    })?;
708    let new_id = fork(src_id, dst_id)?;
709    link_child_session_with_branch(src_id, &new_id, Some(branched_at_event_index));
710    let _ = truncate(&new_id, keep_first);
711    Some(new_id)
712}
713
714/// Truncate the session transcript to the first `keep_first`
715/// messages (opposite of `trim`, which keeps the last N). Returns
716/// counts and the retained tip event id when the session exists.
717pub fn truncate(id: &str, keep_first: usize) -> Option<SessionTruncateResult> {
718    SESSIONS.with(|s| {
719        let mut map = s.borrow_mut();
720        let state = map.get_mut(id)?;
721        let result = truncate_state(state, keep_first)?;
722        Some(result)
723    })
724}
725
726fn truncate_state(state: &mut SessionState, keep_first: usize) -> Option<SessionTruncateResult> {
727    let dict = state
728        .transcript
729        .as_dict()
730        .cloned()
731        .unwrap_or_else(BTreeMap::new);
732    let messages: Vec<VmValue> = match dict.get("messages") {
733        Some(VmValue::List(list)) => list.iter().cloned().collect(),
734        _ => Vec::new(),
735    };
736    let existing_events = match dict.get("events") {
737        Some(VmValue::List(list)) => Some(list.iter().cloned().collect::<Vec<_>>()),
738        _ => None,
739    };
740    let kept_turn_count = keep_first.min(messages.len());
741    let removed_turn_count = messages.len().saturating_sub(kept_turn_count);
742    let mut new_tip_turn_id = existing_events
743        .as_ref()
744        .map(|events| turn_event_id_for_count(events, kept_turn_count))
745        .unwrap_or_else(|| {
746            let events = crate::llm::helpers::transcript_events_from_messages(&messages);
747            turn_event_id_for_count(&events, kept_turn_count)
748        });
749
750    if removed_turn_count > 0 {
751        let retained: Vec<VmValue> = messages.into_iter().take(kept_turn_count).collect();
752        let retained_events = match existing_events {
753            Some(events) => {
754                let keep_event_count = event_prefix_len_for_messages(&events, kept_turn_count);
755                events.into_iter().take(keep_event_count).collect()
756            }
757            None => crate::llm::helpers::transcript_events_from_messages(&retained),
758        };
759        new_tip_turn_id = turn_event_id_for_count(&retained_events, kept_turn_count);
760        let mut next = dict;
761        next.insert(
762            "events".to_string(),
763            VmValue::List(Rc::new(retained_events)),
764        );
765        next.insert("messages".to_string(), VmValue::List(Rc::new(retained)));
766        next.remove("summary");
767        apply_transcript_with_budget(state, VmValue::Dict(Rc::new(next)), "truncate").ok()?;
768    }
769    state.last_accessed = Instant::now();
770    Some(SessionTruncateResult {
771        kept_turn_count,
772        removed_turn_count,
773        new_tip_turn_id,
774    })
775}
776
777/// Pop the trailing message iff it is an assistant message. Used by
778/// `agent_step_judge` to remove a vetoed assistant turn before
779/// regeneration (the "replace" on_veto path). Returns `true` if a
780/// message was popped, `false` if the transcript was empty, and an
781/// error if the trailing message was not an assistant turn —
782/// signalling a call-site discipline bug rather than a runtime error.
783pub fn pop_last_if_assistant(id: &str) -> Result<bool, String> {
784    SESSIONS.with(|s| {
785        let mut map = s.borrow_mut();
786        let Some(state) = map.get_mut(id) else {
787            return Err(format!(
788                "pop_last_if_assistant: unknown session id '{id}'"
789            ));
790        };
791        let messages: Vec<VmValue> = match state.transcript.as_dict() {
792            Some(dict) => match dict.get("messages") {
793                Some(VmValue::List(list)) => list.iter().cloned().collect(),
794                _ => Vec::new(),
795            },
796            None => Vec::new(),
797        };
798        if messages.is_empty() {
799            return Ok(false);
800        }
801        let trailing_role = messages
802            .last()
803            .and_then(|m| m.as_dict())
804            .and_then(|d| d.get("role"))
805            .map(|v| v.display())
806            .unwrap_or_default();
807        if trailing_role != "assistant" {
808            return Err(format!(
809                "pop_last_if_assistant: trailing message role is '{trailing_role}', expected 'assistant'"
810            ));
811        }
812        let keep = messages.len() - 1;
813        truncate_state(state, keep);
814        Ok(true)
815    })
816}
817
818/// Retain only the last `keep_last` messages in the session transcript.
819/// Returns the kept count (<= keep_last).
820pub fn trim(id: &str, keep_last: usize) -> Option<usize> {
821    SESSIONS.with(|s| {
822        let mut map = s.borrow_mut();
823        let state = map.get_mut(id)?;
824        let dict = state.transcript.as_dict()?.clone();
825        let messages: Vec<VmValue> = match dict.get("messages") {
826            Some(VmValue::List(list)) => list.iter().cloned().collect(),
827            _ => Vec::new(),
828        };
829        let start = messages.len().saturating_sub(keep_last);
830        let retained: Vec<VmValue> = messages.into_iter().skip(start).collect();
831        let kept = retained.len();
832        let mut next = dict;
833        next.insert(
834            "events".to_string(),
835            VmValue::List(Rc::new(
836                crate::llm::helpers::transcript_events_from_messages(&retained),
837            )),
838        );
839        next.insert("messages".to_string(), VmValue::List(Rc::new(retained)));
840        apply_transcript_with_budget(state, VmValue::Dict(Rc::new(next)), "trim").ok()?;
841        state.last_accessed = Instant::now();
842        Some(kept)
843    })
844}
845
846/// Append a message dict to the session transcript. The message must
847/// have at least a string `role`; anything else is merged verbatim.
848pub fn inject_message(id: &str, message: VmValue) -> Result<(), String> {
849    let Some(msg_dict) = message.as_dict().cloned() else {
850        return Err("agent_session_inject: message must be a dict".into());
851    };
852    let role_ok = matches!(msg_dict.get("role"), Some(VmValue::String(_)));
853    if !role_ok {
854        return Err(
855            "agent_session_inject: message must have a string `role` (user|assistant|tool_result|system)"
856                .into(),
857        );
858    }
859    SESSIONS.with(|s| {
860        let mut map = s.borrow_mut();
861        let Some(state) = map.get_mut(id) else {
862            return Err(format!("agent_session_inject: unknown session id '{id}'"));
863        };
864        let dict = state
865            .transcript
866            .as_dict()
867            .cloned()
868            .unwrap_or_else(BTreeMap::new);
869        let mut messages: Vec<VmValue> = match dict.get("messages") {
870            Some(VmValue::List(list)) => list.iter().cloned().collect(),
871            _ => Vec::new(),
872        };
873        let mut events: Vec<VmValue> = match dict.get("events") {
874            Some(VmValue::List(list)) => list.iter().cloned().collect(),
875            _ => crate::llm::helpers::transcript_events_from_messages(&messages),
876        };
877        let new_message = VmValue::Dict(Rc::new(msg_dict));
878        let message_index = messages.len();
879        events.push(crate::llm::helpers::transcript_event_from_message(
880            &new_message,
881        ));
882        messages.push(new_message);
883        let mut next = dict;
884        next.insert("events".to_string(), VmValue::List(Rc::new(events)));
885        next.insert("messages".to_string(), VmValue::List(Rc::new(messages)));
886        let persisted_message = next
887            .get("messages")
888            .and_then(|value| match value {
889                VmValue::List(list) => list.get(message_index).cloned(),
890                _ => None,
891            })
892            .unwrap_or(VmValue::Nil);
893        apply_transcript_with_budget(state, VmValue::Dict(Rc::new(next)), "inject_message")?;
894        emit_identified_user_message_event(id, &persisted_message);
895        emit_llm_message_event(id, message_index, &persisted_message);
896        state.last_accessed = Instant::now();
897        Ok(())
898    })
899}
900
901fn emit_identified_user_message_event(session_id: &str, message: &VmValue) {
902    let message_json = crate::llm::helpers::vm_value_to_json(message);
903    let role = message_json.get("role").and_then(|value| value.as_str());
904    if role != Some("user") {
905        return;
906    }
907    let Some(message_id) = message_json
908        .get("messageId")
909        .or_else(|| message_json.get("message_id"))
910        .and_then(|value| value.as_str())
911        .filter(|value| !value.trim().is_empty())
912    else {
913        return;
914    };
915    let content = message_json
916        .get("content")
917        .map(user_message_content_blocks)
918        .unwrap_or_default();
919    crate::agent_events::emit_event(&crate::agent_events::AgentEvent::UserMessage {
920        session_id: session_id.to_string(),
921        message_id: message_id.to_string(),
922        content,
923    });
924}
925
926fn user_message_content_blocks(content: &serde_json::Value) -> Vec<serde_json::Value> {
927    match content {
928        serde_json::Value::Array(items) => items.clone(),
929        serde_json::Value::String(text) => vec![serde_json::json!({
930            "type": "text",
931            "text": text,
932        })],
933        other => vec![serde_json::json!({
934            "type": "text",
935            "text": other.to_string(),
936        })],
937    }
938}
939
/// Size of a transcript in the units the transcript budget limits.
#[derive(Debug, Clone, PartialEq, Eq)]
struct TranscriptBudgetUsage {
    /// Number of entries in the transcript's `messages` list.
    message_count: usize,
    /// Number of entries in the transcript's `events` list.
    event_count: usize,
    /// Serialized size in bytes; `None` when byte accounting was not requested.
    approx_bytes: Option<usize>,
}
946
947fn transcript_messages_from_dict(dict: &BTreeMap<String, VmValue>) -> Vec<VmValue> {
948    match dict.get("messages") {
949        Some(VmValue::List(list)) => list.iter().cloned().collect(),
950        _ => Vec::new(),
951    }
952}
953
954fn transcript_events_from_dict(dict: &BTreeMap<String, VmValue>) -> Vec<VmValue> {
955    match dict.get("events") {
956        Some(VmValue::List(list)) => list.iter().cloned().collect(),
957        _ => {
958            let messages = transcript_messages_from_dict(dict);
959            crate::llm::helpers::transcript_events_from_messages(&messages)
960        }
961    }
962}
963
964fn transcript_usage(transcript: &VmValue, include_bytes: bool) -> TranscriptBudgetUsage {
965    let Some(dict) = transcript.as_dict() else {
966        return TranscriptBudgetUsage {
967            message_count: 0,
968            event_count: 0,
969            approx_bytes: include_bytes.then_some(0),
970        };
971    };
972    let approx_bytes = if include_bytes {
973        serde_json::to_vec(&crate::llm::helpers::vm_value_to_json(transcript))
974            .map(|bytes| bytes.len())
975            .ok()
976            .or(Some(usize::MAX))
977    } else {
978        None
979    };
980    TranscriptBudgetUsage {
981        message_count: transcript_messages_from_dict(dict).len(),
982        event_count: transcript_events_from_dict(dict).len(),
983        approx_bytes,
984    }
985}
986
987fn transcript_budget_exceeded_reason(
988    usage: &TranscriptBudgetUsage,
989    policy: &SessionTranscriptBudgetPolicy,
990) -> Option<&'static str> {
991    if usage.message_count > policy.max_messages {
992        return Some("message_count");
993    }
994    if usage.event_count > policy.max_events {
995        return Some("event_count");
996    }
997    if let (Some(bytes), Some(limit)) = (usage.approx_bytes, policy.max_approx_bytes) {
998        if bytes > limit {
999            return Some("approx_bytes");
1000        }
1001    }
1002    None
1003}
1004
1005fn transcript_budget_usage_json(usage: &TranscriptBudgetUsage) -> serde_json::Value {
1006    serde_json::json!({
1007        "messages": usage.message_count,
1008        "events": usage.event_count,
1009        "approx_bytes": usage.approx_bytes,
1010    })
1011}
1012
1013fn transcript_budget_policy_json(policy: &SessionTranscriptBudgetPolicy) -> serde_json::Value {
1014    let recovery = match &policy.recovery {
1015        TranscriptBudgetRecovery::Reject => serde_json::json!({"action": "reject"}),
1016        TranscriptBudgetRecovery::Trim { keep_last } => {
1017            serde_json::json!({"action": "trim", "keep_last": keep_last})
1018        }
1019        TranscriptBudgetRecovery::Compact { keep_last } => {
1020            serde_json::json!({"action": "compact", "keep_last": keep_last})
1021        }
1022    };
1023    serde_json::json!({
1024        "max_messages": policy.max_messages,
1025        "max_events": policy.max_events,
1026        "max_approx_bytes": policy.max_approx_bytes,
1027        "recovery": recovery,
1028    })
1029}
1030
1031fn transcript_budget_recovery_name(recovery: &TranscriptBudgetRecovery) -> &'static str {
1032    match recovery {
1033        TranscriptBudgetRecovery::Reject => "reject",
1034        TranscriptBudgetRecovery::Trim { .. } => "trim",
1035        TranscriptBudgetRecovery::Compact { .. } => "compact",
1036    }
1037}
1038
1039fn transcript_budget_error(
1040    state: &SessionState,
1041    policy: &SessionTranscriptBudgetPolicy,
1042    usage: &TranscriptBudgetUsage,
1043    reason: &str,
1044) -> String {
1045    let byte_suffix = match (usage.approx_bytes, policy.max_approx_bytes) {
1046        (Some(bytes), Some(limit)) => format!(", approx_bytes {bytes}/{limit}"),
1047        _ => String::new(),
1048    };
1049    format!(
1050        "transcript budget exceeded for session '{}': {reason} (messages {}/{}, events {}/{}{}; recovery={})",
1051        state.id,
1052        usage.message_count,
1053        policy.max_messages,
1054        usage.event_count,
1055        policy.max_events,
1056        byte_suffix,
1057        transcript_budget_recovery_name(&policy.recovery),
1058    )
1059}
1060
1061fn transcript_budget_audit_json(
1062    action: &str,
1063    source: &str,
1064    reason: &str,
1065    policy: &SessionTranscriptBudgetPolicy,
1066    usage_before: &TranscriptBudgetUsage,
1067    usage_attempted: &TranscriptBudgetUsage,
1068    usage_after: &TranscriptBudgetUsage,
1069) -> serde_json::Value {
1070    serde_json::json!({
1071        "action": action,
1072        "source": source,
1073        "reason": reason,
1074        "policy": transcript_budget_policy_json(policy),
1075        "usage_before": transcript_budget_usage_json(usage_before),
1076        "usage_attempted": transcript_budget_usage_json(usage_attempted),
1077        "usage_after": transcript_budget_usage_json(usage_after),
1078        "removed_messages": usage_attempted.message_count.saturating_sub(usage_after.message_count),
1079        "removed_events": usage_attempted.event_count.saturating_sub(usage_after.event_count),
1080    })
1081}
1082
1083fn transcript_budget_event(audit: &serde_json::Value) -> VmValue {
1084    let action = audit
1085        .get("action")
1086        .and_then(serde_json::Value::as_str)
1087        .unwrap_or("enforced");
1088    crate::llm::helpers::transcript_event(
1089        "transcript_budget",
1090        "system",
1091        "internal",
1092        &format!("Transcript budget {action}."),
1093        Some(audit.clone()),
1094    )
1095}
1096
1097fn append_event_to_transcript(transcript: VmValue, event: VmValue) -> VmValue {
1098    let Some(dict) = transcript.as_dict() else {
1099        return transcript;
1100    };
1101    let mut next = dict.clone();
1102    let mut events = transcript_events_from_dict(&next);
1103    events.push(event);
1104    next.insert("events".to_string(), VmValue::List(Rc::new(events)));
1105    VmValue::Dict(Rc::new(next))
1106}
1107
1108fn summary_message_vm(summary: &str) -> VmValue {
1109    crate::stdlib::json_to_vm_value(&summary_message_json(summary))
1110}
1111
1112fn tail_message_capacity(
1113    policy: &SessionTranscriptBudgetPolicy,
1114    reserve_audit_event: bool,
1115) -> usize {
1116    let event_capacity = if reserve_audit_event {
1117        policy.max_events.saturating_sub(1)
1118    } else {
1119        policy.max_events
1120    };
1121    policy.max_messages.min(event_capacity)
1122}
1123
1124fn trim_transcript_for_budget(
1125    transcript: &VmValue,
1126    policy: &SessionTranscriptBudgetPolicy,
1127    keep_last: usize,
1128) -> VmValue {
1129    let dict = transcript.as_dict().cloned().unwrap_or_else(BTreeMap::new);
1130    let messages = transcript_messages_from_dict(&dict);
1131    let keep = keep_last.min(tail_message_capacity(policy, true));
1132    let start = messages.len().saturating_sub(keep);
1133    let retained: Vec<VmValue> = messages.into_iter().skip(start).collect();
1134    let mut next = dict;
1135    next.insert(
1136        "events".to_string(),
1137        VmValue::List(Rc::new(
1138            crate::llm::helpers::transcript_events_from_messages(&retained),
1139        )),
1140    );
1141    next.insert("messages".to_string(), VmValue::List(Rc::new(retained)));
1142    next.remove("summary");
1143    VmValue::Dict(Rc::new(next))
1144}
1145
1146fn compact_transcript_for_budget(
1147    transcript: &VmValue,
1148    policy: &SessionTranscriptBudgetPolicy,
1149    keep_last: usize,
1150    reason: &str,
1151) -> VmValue {
1152    let dict = transcript.as_dict().cloned().unwrap_or_else(BTreeMap::new);
1153    let messages = transcript_messages_from_dict(&dict);
1154    let message_capacity = tail_message_capacity(policy, true);
1155    let mut retained = Vec::new();
1156    let mut summary = None;
1157
1158    if messages.len() > message_capacity {
1159        if message_capacity > 0 {
1160            let tail_keep = keep_last.min(message_capacity.saturating_sub(1));
1161            let archived = messages.len().saturating_sub(tail_keep);
1162            let summary_text = format!(
1163                "[auto-compacted {archived} older message(s) under transcript budget]\nSession transcript exceeded the {reason} budget; retained the most recent {tail_keep} message(s)."
1164            );
1165            retained.push(summary_message_vm(&summary_text));
1166            retained.extend(messages.into_iter().skip(archived).take(tail_keep));
1167            summary = Some(summary_text);
1168        }
1169    } else {
1170        retained = messages;
1171    }
1172
1173    let mut next = dict;
1174    next.insert(
1175        "events".to_string(),
1176        VmValue::List(Rc::new(
1177            crate::llm::helpers::transcript_events_from_messages(&retained),
1178        )),
1179    );
1180    next.insert("messages".to_string(), VmValue::List(Rc::new(retained)));
1181    if let Some(summary) = summary {
1182        next.insert("summary".to_string(), VmValue::String(Rc::from(summary)));
1183    } else {
1184        next.remove("summary");
1185    }
1186    VmValue::Dict(Rc::new(next))
1187}
1188
1189fn recovered_transcript_with_audit(
1190    recovered: VmValue,
1191    action: &str,
1192    source: &str,
1193    reason: &str,
1194    policy: &SessionTranscriptBudgetPolicy,
1195    usage_before: &TranscriptBudgetUsage,
1196    usage_attempted: &TranscriptBudgetUsage,
1197    include_bytes: bool,
1198) -> (VmValue, serde_json::Value, TranscriptBudgetUsage) {
1199    let usage_after_without_audit = transcript_usage(&recovered, include_bytes);
1200    let initial_audit = transcript_budget_audit_json(
1201        action,
1202        source,
1203        reason,
1204        policy,
1205        usage_before,
1206        usage_attempted,
1207        &usage_after_without_audit,
1208    );
1209    let with_initial_audit =
1210        append_event_to_transcript(recovered.clone(), transcript_budget_event(&initial_audit));
1211    let usage_after = transcript_usage(&with_initial_audit, include_bytes);
1212    let audit = transcript_budget_audit_json(
1213        action,
1214        source,
1215        reason,
1216        policy,
1217        usage_before,
1218        usage_attempted,
1219        &usage_after,
1220    );
1221    let with_audit = append_event_to_transcript(recovered, transcript_budget_event(&audit));
1222    let usage_after = transcript_usage(&with_audit, include_bytes);
1223    (with_audit, audit, usage_after)
1224}
1225
1226fn apply_transcript_with_budget(
1227    state: &mut SessionState,
1228    candidate: VmValue,
1229    source: &str,
1230) -> Result<(), String> {
1231    let policy = state.transcript_budget_policy.normalized();
1232    let include_bytes = policy.max_approx_bytes.is_some();
1233    let usage_before = transcript_usage(&state.transcript, include_bytes);
1234    let usage_attempted = transcript_usage(&candidate, include_bytes);
1235    let Some(reason) = transcript_budget_exceeded_reason(&usage_attempted, &policy) else {
1236        state.transcript = candidate;
1237        return Ok(());
1238    };
1239
1240    match policy.recovery.clone() {
1241        TranscriptBudgetRecovery::Reject => {
1242            let audit = transcript_budget_audit_json(
1243                "rejected",
1244                source,
1245                reason,
1246                &policy,
1247                &usage_before,
1248                &usage_attempted,
1249                &usage_before,
1250            );
1251            state.last_transcript_budget_action = Some(audit);
1252            Err(transcript_budget_error(
1253                state,
1254                &policy,
1255                &usage_attempted,
1256                reason,
1257            ))
1258        }
1259        TranscriptBudgetRecovery::Trim { keep_last } => {
1260            let recovered = trim_transcript_for_budget(&candidate, &policy, keep_last);
1261            let (with_audit, audit, usage_after) = recovered_transcript_with_audit(
1262                recovered,
1263                "trimmed",
1264                source,
1265                reason,
1266                &policy,
1267                &usage_before,
1268                &usage_attempted,
1269                include_bytes,
1270            );
1271            if transcript_budget_exceeded_reason(&usage_after, &policy).is_some() {
1272                let rejected = transcript_budget_audit_json(
1273                    "rejected",
1274                    source,
1275                    reason,
1276                    &policy,
1277                    &usage_before,
1278                    &usage_attempted,
1279                    &usage_after,
1280                );
1281                state.last_transcript_budget_action = Some(rejected);
1282                return Err(transcript_budget_error(
1283                    state,
1284                    &policy,
1285                    &usage_after,
1286                    reason,
1287                ));
1288            }
1289            state.last_transcript_budget_action = Some(audit);
1290            state.transcript = with_audit;
1291            Ok(())
1292        }
1293        TranscriptBudgetRecovery::Compact { keep_last } => {
1294            let recovered = compact_transcript_for_budget(&candidate, &policy, keep_last, reason);
1295            let (with_audit, audit, usage_after) = recovered_transcript_with_audit(
1296                recovered,
1297                "compacted",
1298                source,
1299                reason,
1300                &policy,
1301                &usage_before,
1302                &usage_attempted,
1303                include_bytes,
1304            );
1305            if transcript_budget_exceeded_reason(&usage_after, &policy).is_some() {
1306                let rejected = transcript_budget_audit_json(
1307                    "rejected",
1308                    source,
1309                    reason,
1310                    &policy,
1311                    &usage_before,
1312                    &usage_attempted,
1313                    &usage_after,
1314                );
1315                state.last_transcript_budget_action = Some(rejected);
1316                return Err(transcript_budget_error(
1317                    state,
1318                    &policy,
1319                    &usage_after,
1320                    reason,
1321                ));
1322            }
1323            state.last_transcript_budget_action = Some(audit);
1324            state.transcript = with_audit;
1325            Ok(())
1326        }
1327    }
1328}
1329
1330fn emit_llm_message_event(session_id: &str, message_index: usize, message: &VmValue) {
1331    let mut fields = serde_json::Map::new();
1332    fields.insert(
1333        "session_id".to_string(),
1334        serde_json::Value::String(session_id.to_string()),
1335    );
1336    fields.insert(
1337        "message_index".to_string(),
1338        serde_json::json!(message_index),
1339    );
1340    let message_json = crate::llm::helpers::vm_value_to_json(message);
1341    if let Some(role) = message_json.get("role").and_then(|value| value.as_str()) {
1342        fields.insert(
1343            "role".to_string(),
1344            serde_json::Value::String(role.to_string()),
1345        );
1346    }
1347    if let Some(content) = message_json.get("content") {
1348        fields.insert("content".to_string(), content.clone());
1349    }
1350    fields.insert("message".to_string(), message_json);
1351    crate::llm::append_observability_sidecar_entry("message", fields);
1352}
1353
1354/// Create a new session from a reconstructed message list.
1355///
1356/// This is intentionally an all-at-once write instead of repeated
1357/// `inject_message` calls: importing a transcript should not re-emit
1358/// each historic turn into the active observability sidecar.
1359pub fn seed_from_messages(
1360    id: Option<String>,
1361    messages: &[serde_json::Value],
1362    metadata: serde_json::Value,
1363    system_prompt: Option<String>,
1364    tool_format: Option<String>,
1365) -> Result<String, String> {
1366    let resolved = id.unwrap_or_else(|| uuid::Uuid::now_v7().to_string());
1367    if exists(&resolved) {
1368        return Err(format!("agent session '{resolved}' already exists"));
1369    }
1370    open_or_create(Some(resolved.clone()));
1371    SESSIONS.with(|s| {
1372        let mut map = s.borrow_mut();
1373        let Some(state) = map.get_mut(&resolved) else {
1374            return Err(format!("failed to create agent session '{resolved}'"));
1375        };
1376        state.tool_format = tool_format.filter(|value| !value.trim().is_empty());
1377        state.system_prompt = system_prompt.filter(|value| !value.trim().is_empty());
1378
1379        let mut metadata = metadata
1380            .as_object()
1381            .cloned()
1382            .unwrap_or_else(serde_json::Map::new);
1383        if let Some(tool_format) = state.tool_format.as_ref() {
1384            metadata.insert(
1385                "tool_format".to_string(),
1386                serde_json::Value::String(tool_format.clone()),
1387            );
1388            metadata.insert(
1389                "tool_mode_locked".to_string(),
1390                serde_json::Value::Bool(true),
1391            );
1392        }
1393        if let Some(system_prompt) = state.system_prompt.as_ref() {
1394            metadata.insert(
1395                "system_prompt".to_string(),
1396                crate::llm::helpers::system_prompt_metadata(system_prompt),
1397            );
1398        }
1399        let vm_messages = crate::llm::helpers::json_messages_to_vm(messages);
1400        let candidate = crate::llm::helpers::new_transcript_with(
1401            Some(resolved.clone()),
1402            vm_messages,
1403            None,
1404            Some(crate::stdlib::json_to_vm_value(&serde_json::Value::Object(
1405                metadata,
1406            ))),
1407        );
1408        apply_transcript_with_budget(state, candidate, "seed_from_messages")?;
1409        state.last_accessed = Instant::now();
1410        Ok(resolved)
1411    })
1412}
1413
1414/// Load the messages vec (as JSON) for this session, for use as prefix
1415/// to an agent_loop run. Returns an empty vec if the session doesn't
1416/// exist or has no messages.
1417pub fn messages_json(id: &str) -> Vec<serde_json::Value> {
1418    SESSIONS.with(|s| {
1419        let map = s.borrow();
1420        let Some(state) = map.get(id) else {
1421            return Vec::new();
1422        };
1423        let Some(dict) = state.transcript.as_dict() else {
1424            return Vec::new();
1425        };
1426        match dict.get("messages") {
1427            Some(VmValue::List(list)) => list
1428                .iter()
1429                .map(crate::llm::helpers::vm_value_to_json)
1430                .collect(),
1431            _ => Vec::new(),
1432        }
1433    })
1434}
1435
1436#[derive(Clone, Debug, Default)]
1437pub struct SessionPromptState {
1438    pub messages: Vec<serde_json::Value>,
1439    pub summary: Option<String>,
1440}
1441
1442fn summary_message_json(summary: &str) -> serde_json::Value {
1443    serde_json::json!({
1444        "role": "user",
1445        "content": summary,
1446    })
1447}
1448
1449fn messages_begin_with_summary(messages: &[serde_json::Value], summary: &str) -> bool {
1450    messages.first().is_some_and(|message| {
1451        message.get("role").and_then(|value| value.as_str()) == Some("user")
1452            && message.get("content").and_then(|value| value.as_str()) == Some(summary)
1453    })
1454}
1455
1456/// Prompt-surface resume state for a persisted session.
1457///
1458/// Returns the compacted/rehydratable message list plus the transcript's
1459/// summary field. When the transcript carries a summary field but its
1460/// message list does not already begin with the compacted summary
1461/// message, this helper prepends one so session re-entry preserves the
1462/// same prompt surface the previous loop was actually using.
1463pub fn prompt_state_json(id: &str) -> SessionPromptState {
1464    SESSIONS.with(|s| {
1465        let map = s.borrow();
1466        let Some(state) = map.get(id) else {
1467            return SessionPromptState::default();
1468        };
1469        let Some(dict) = state.transcript.as_dict() else {
1470            return SessionPromptState::default();
1471        };
1472        let mut messages = match dict.get("messages") {
1473            Some(VmValue::List(list)) => list
1474                .iter()
1475                .map(crate::llm::helpers::vm_value_to_json)
1476                .collect::<Vec<_>>(),
1477            _ => Vec::new(),
1478        };
1479        let summary = dict.get("summary").and_then(|value| match value {
1480            VmValue::String(text) if !text.trim().is_empty() => Some(text.to_string()),
1481            _ => None,
1482        });
1483        if let Some(summary_text) = summary.as_deref() {
1484            if !messages_begin_with_summary(&messages, summary_text) {
1485                messages.insert(0, summary_message_json(summary_text));
1486            }
1487        }
1488        SessionPromptState { messages, summary }
1489    })
1490}
1491
1492/// Overwrite the transcript for this session. Used by `agent_loop` on
1493/// exit to persist the synthesized transcript.
1494pub fn store_transcript(id: &str, transcript: VmValue) -> Result<(), String> {
1495    SESSIONS.with(|s| {
1496        let mut map = s.borrow_mut();
1497        let Some(state) = map.get_mut(id) else {
1498            return Err(format!(
1499                "agent_session_store_transcript: unknown session id '{id}'"
1500            ));
1501        };
1502        let transcript = transcript_with_session_metadata(transcript, state);
1503        apply_transcript_with_budget(state, transcript, "store_transcript")?;
1504        state.last_accessed = Instant::now();
1505        Ok(())
1506    })
1507}
1508
1509/// Remove malformed reminder events after their drop audit has been emitted.
1510/// Pending-reminder rendering scans the transcript on every LLM call; pruning
1511/// invalid entries makes the drop event one-shot instead of noisy per turn.
1512pub fn prune_invalid_reminder_events(id: &str) -> usize {
1513    SESSIONS.with(|s| {
1514        let mut map = s.borrow_mut();
1515        let Some(state) = map.get_mut(id) else {
1516            return 0;
1517        };
1518        let Some(dict) = state.transcript.as_dict().cloned() else {
1519            return 0;
1520        };
1521        let Some(VmValue::List(events)) = dict.get("events") else {
1522            return 0;
1523        };
1524        let mut pruned = 0_usize;
1525        let mut kept = Vec::with_capacity(events.len());
1526        for event in events.iter().cloned() {
1527            let is_reminder = event
1528                .as_dict()
1529                .and_then(|event| event.get("kind"))
1530                .map(VmValue::display)
1531                .as_deref()
1532                == Some(crate::llm::helpers::SYSTEM_REMINDER_EVENT_KIND);
1533            if !is_reminder {
1534                kept.push(event);
1535                continue;
1536            }
1537            let valid = crate::llm::helpers::reminder_from_event(&event)
1538                .is_some_and(|reminder| !reminder.body.trim().is_empty());
1539            if valid {
1540                kept.push(event);
1541            } else {
1542                pruned += 1;
1543            }
1544        }
1545        if pruned > 0 {
1546            let mut next = dict;
1547            next.insert("events".to_string(), VmValue::List(Rc::new(kept)));
1548            let _ = apply_transcript_with_budget(
1549                state,
1550                VmValue::Dict(Rc::new(next)),
1551                "prune_invalid_reminder_events",
1552            );
1553            state.last_accessed = Instant::now();
1554        }
1555        pruned
1556    })
1557}
1558
1559/// Apply the reminder TTL lifecycle that runs once per completed agent
1560/// turn. Reminders with `ttl_turns = 1` expire and are removed; larger
1561/// finite TTLs are decremented in place. Expiry audit events are emitted
1562/// to the active EventLog when one is installed.
1563pub fn apply_reminder_post_turn(id: &str, turn: i64) -> Result<serde_json::Value, String> {
1564    let report = SESSIONS.with(|s| {
1565        let mut map = s.borrow_mut();
1566        let Some(state) = map.get_mut(id) else {
1567            return Err(format!(
1568                "agent_session_apply_reminder_post_turn: unknown session id '{id}'"
1569            ));
1570        };
1571        let report = crate::llm::helpers::apply_reminder_post_turn(&state.transcript, turn);
1572        if report.decremented_count > 0 || !report.expired.is_empty() {
1573            if let Some(next) = report.transcript.clone() {
1574                apply_transcript_with_budget(state, next, "apply_reminder_post_turn")?;
1575            }
1576            state.last_accessed = Instant::now();
1577        }
1578        Ok(report)
1579    })?;
1580
1581    for reminder in &report.expired {
1582        let mut payload = crate::llm::helpers::reminder_lifecycle_payload(Some(id), reminder);
1583        if let Some(obj) = payload.as_object_mut() {
1584            obj.insert(
1585                "transcript_id".to_string(),
1586                serde_json::Value::String(id.to_string()),
1587            );
1588            obj.insert(
1589                "reason".to_string(),
1590                serde_json::Value::String("ttl".to_string()),
1591            );
1592            obj.insert(
1593                "ttl_turns_before".to_string(),
1594                serde_json::json!(&reminder.ttl_turns),
1595            );
1596            obj.insert("expired_at_turn".to_string(), serde_json::json!(turn));
1597        }
1598        crate::llm::helpers::emit_reminder_lifecycle_event(
1599            crate::llm::helpers::REMINDER_EXPIRED_EVENT_KIND,
1600            payload,
1601        );
1602    }
1603
1604    Ok(serde_json::json!({
1605        "expired_count": report.expired.len(),
1606        "decremented_count": report.decremented_count,
1607        "remaining_count": report.remaining_count,
1608    }))
1609}
1610
1611/// Inject a typed system reminder into the session transcript's event
1612/// stream. This mirrors `transcript.inject_reminder` for live sessions:
1613/// reminders with the same `dedupe_key` are replaced before the new
1614/// reminder event is appended.
1615pub fn inject_reminder(
1616    id: &str,
1617    reminder: crate::llm::helpers::SystemReminder,
1618) -> Result<ReminderInjectionReport, String> {
1619    let reminder_id = reminder.id.clone();
1620    let dedupe_key = reminder.dedupe_key.clone();
1621    let mut deduped_reminder_ids = Vec::new();
1622    SESSIONS.with(|s| {
1623        let mut map = s.borrow_mut();
1624        let Some(state) = map.get_mut(id) else {
1625            return Err(format!(
1626                "agent_session_inject_reminder: unknown session id '{id}'"
1627            ));
1628        };
1629        let dict = state
1630            .transcript
1631            .as_dict()
1632            .cloned()
1633            .unwrap_or_else(BTreeMap::new);
1634        let mut events: Vec<VmValue> = match dict.get("events") {
1635            Some(VmValue::List(list)) => list.iter().cloned().collect(),
1636            _ => dict
1637                .get("messages")
1638                .and_then(|value| match value {
1639                    VmValue::List(list) => Some(list.iter().cloned().collect::<Vec<_>>()),
1640                    _ => None,
1641                })
1642                .map(|messages| crate::llm::helpers::transcript_events_from_messages(&messages))
1643                .unwrap_or_default(),
1644        };
1645        if let Some(expected_key) = dedupe_key.as_deref() {
1646            events.retain(|event| {
1647                let Some(existing) = crate::llm::helpers::reminder_from_event(event) else {
1648                    return true;
1649                };
1650                if existing.dedupe_key.as_deref() == Some(expected_key) {
1651                    deduped_reminder_ids.push(existing.id);
1652                    false
1653                } else {
1654                    true
1655                }
1656            });
1657        }
1658        events.push(crate::llm::helpers::transcript_reminder_event(&reminder));
1659        let mut next = dict;
1660        next.insert("events".to_string(), VmValue::List(Rc::new(events)));
1661        apply_transcript_with_budget(state, VmValue::Dict(Rc::new(next)), "inject_reminder")?;
1662        state.last_accessed = Instant::now();
1663        Ok(())
1664    })?;
1665
1666    if !deduped_reminder_ids.is_empty() {
1667        let dropped_count = deduped_reminder_ids.len();
1668        crate::llm::helpers::emit_reminder_lifecycle_event(
1669            crate::llm::helpers::REMINDER_DEDUPED_EVENT_KIND,
1670            serde_json::json!({
1671                "session_id": id,
1672                "transcript_id": id,
1673                "reminder_id": &reminder_id,
1674                "replacing_id": &reminder_id,
1675                "replaced_id": deduped_reminder_ids.first(),
1676                "replaced_ids": &deduped_reminder_ids,
1677                "dedupe_key": &dedupe_key,
1678                "dropped_reminder_ids": &deduped_reminder_ids,
1679                "dropped_count": dropped_count,
1680            }),
1681        );
1682    }
1683
1684    crate::llm::helpers::emit_reminder_lifecycle_event(
1685        crate::llm::helpers::REMINDER_INJECTED_EVENT_KIND,
1686        crate::llm::helpers::reminder_lifecycle_payload(Some(id), &reminder),
1687    );
1688
1689    Ok(ReminderInjectionReport {
1690        reminder_id,
1691        deduped_count: deduped_reminder_ids.len(),
1692    })
1693}
1694
1695/// Append a transcript event to the session without mutating its
1696/// message list. Used for orchestration-side lineage events (sub-agent
1697/// spawn/completion, workflow hooks, etc.) that should survive
1698/// persistence/replay without being replayed back into the model as
1699/// conversational messages.
1700pub fn append_event(id: &str, event: VmValue) -> Result<(), String> {
1701    let Some(event_dict) = event.as_dict() else {
1702        return Err("agent_session_append_event: event must be a dict".into());
1703    };
1704    let kind_ok = matches!(event_dict.get("kind"), Some(VmValue::String(_)));
1705    if !kind_ok {
1706        return Err("agent_session_append_event: event must have a string `kind`".into());
1707    }
1708    SESSIONS.with(|s| {
1709        let mut map = s.borrow_mut();
1710        let Some(state) = map.get_mut(id) else {
1711            return Err(format!(
1712                "agent_session_append_event: unknown session id '{id}'"
1713            ));
1714        };
1715        append_event_to_state(state, event, "append_event")?;
1716        state.last_accessed = Instant::now();
1717        Ok(())
1718    })
1719}
1720
1721fn append_event_to_state(
1722    state: &mut SessionState,
1723    event: VmValue,
1724    action: &str,
1725) -> Result<(), String> {
1726    let dict = state
1727        .transcript
1728        .as_dict()
1729        .cloned()
1730        .unwrap_or_else(BTreeMap::new);
1731    let mut events: Vec<VmValue> = match dict.get("events") {
1732        Some(VmValue::List(list)) => list.iter().cloned().collect(),
1733        _ => dict
1734            .get("messages")
1735            .and_then(|value| match value {
1736                VmValue::List(list) => Some(list.iter().cloned().collect::<Vec<_>>()),
1737                _ => None,
1738            })
1739            .map(|messages| crate::llm::helpers::transcript_events_from_messages(&messages))
1740            .unwrap_or_default(),
1741    };
1742    events.push(event);
1743    let mut next = dict;
1744    next.insert("events".to_string(), VmValue::List(Rc::new(events)));
1745    apply_transcript_with_budget(state, VmValue::Dict(Rc::new(next)), action)
1746}
1747
1748/// Replace the transcript's message list wholesale. Used by the
1749/// in-loop compaction path, which operates on JSON messages.
1750pub fn replace_messages(id: &str, messages: &[serde_json::Value]) -> Result<(), String> {
1751    replace_messages_with_summary(id, messages, None)
1752}
1753
1754/// Replace the transcript's message list and optionally update the
1755/// `summary` field on the persisted transcript. The compaction path
1756/// uses this to publish the human-readable rollup line that
1757/// `transcript_summary(transcript)` exposes to host code.
1758pub fn replace_messages_with_summary(
1759    id: &str,
1760    messages: &[serde_json::Value],
1761    summary: Option<&str>,
1762) -> Result<(), String> {
1763    SESSIONS.with(|s| {
1764        let mut map = s.borrow_mut();
1765        let Some(state) = map.get_mut(id) else {
1766            return Err(format!(
1767                "agent_session_replace_messages: unknown session id '{id}'"
1768            ));
1769        };
1770        let dict = state
1771            .transcript
1772            .as_dict()
1773            .cloned()
1774            .unwrap_or_else(BTreeMap::new);
1775        let vm_messages: Vec<VmValue> = messages
1776            .iter()
1777            .map(crate::stdlib::json_to_vm_value)
1778            .collect();
1779        let mut next = dict;
1780        next.insert(
1781            "events".to_string(),
1782            VmValue::List(Rc::new(
1783                crate::llm::helpers::transcript_events_from_messages(&vm_messages),
1784            )),
1785        );
1786        next.insert("messages".to_string(), VmValue::List(Rc::new(vm_messages)));
1787        if let Some(summary) = summary {
1788            next.insert(
1789                "summary".to_string(),
1790                VmValue::String(Rc::from(summary.to_string())),
1791            );
1792        } else {
1793            next.remove("summary");
1794        }
1795        apply_transcript_with_budget(state, VmValue::Dict(Rc::new(next)), "replace_messages")?;
1796        state.last_accessed = Instant::now();
1797        Ok(())
1798    })
1799}
1800
1801pub fn append_subscriber(id: &str, callback: VmValue) {
1802    open_or_create(Some(id.to_string()));
1803    SESSIONS.with(|s| {
1804        if let Some(state) = s.borrow_mut().get_mut(id) {
1805            state.subscribers.push(callback);
1806            state.last_accessed = Instant::now();
1807        }
1808    });
1809}
1810
1811pub fn subscribers_for(id: &str) -> Vec<VmValue> {
1812    SESSIONS.with(|s| {
1813        s.borrow()
1814            .get(id)
1815            .map(|state| state.subscribers.clone())
1816            .unwrap_or_default()
1817    })
1818}
1819
1820pub fn subscriber_count(id: &str) -> usize {
1821    SESSIONS.with(|s| {
1822        s.borrow()
1823            .get(id)
1824            .map(|state| state.subscribers.len())
1825            .unwrap_or(0)
1826    })
1827}
1828
1829/// Persist the set of active skill names for session resume. Called at
1830/// the end of an agent_loop run; the next `open_or_create` for this id
1831/// reads them back via [`active_skills`].
1832pub fn set_active_skills(id: &str, skills: Vec<String>) {
1833    SESSIONS.with(|s| {
1834        if let Some(state) = s.borrow_mut().get_mut(id) {
1835            state.active_skills = skills;
1836            state.last_accessed = Instant::now();
1837        }
1838    });
1839}
1840
1841/// Skills that were active at the end of the previous agent_loop run
1842/// against this session. Returns an empty vec when the session doesn't
1843/// exist or nothing was persisted.
1844pub fn active_skills(id: &str) -> Vec<String> {
1845    SESSIONS.with(|s| {
1846        s.borrow()
1847            .get(id)
1848            .map(|state| state.active_skills.clone())
1849            .unwrap_or_default()
1850    })
1851}
1852
1853/// Claim the tool-calling contract for a session.
1854///
1855/// The first loop against a named session records its `tool_format`.
1856/// Later re-entry must use the same format so prompt/history generated
1857/// under a text contract is never replayed as native, or vice versa.
1858pub fn claim_tool_format(id: &str, tool_format: &str) -> Result<(), String> {
1859    let tool_format = tool_format.trim();
1860    if tool_format.is_empty() {
1861        return Ok(());
1862    }
1863    SESSIONS.with(|s| {
1864        let mut map = s.borrow_mut();
1865        let Some(state) = map.get_mut(id) else {
1866            return Err(format!("agent session '{id}' does not exist"));
1867        };
1868        match state.tool_format.as_deref() {
1869            Some(existing) if existing != tool_format => Err(format!(
1870                "agent session '{id}' is locked to tool_format='{existing}', but this run requested tool_format='{tool_format}'. Start a new session or fork/reset the transcript before changing tool mode."
1871            )),
1872            Some(_) => {
1873                state.last_accessed = Instant::now();
1874                Ok(())
1875            }
1876            None => {
1877                state.tool_format = Some(tool_format.to_string());
1878                state.last_accessed = Instant::now();
1879                Ok(())
1880            }
1881        }
1882    })
1883}
1884
1885pub fn tool_format(id: &str) -> Option<String> {
1886    SESSIONS.with(|s| {
1887        s.borrow()
1888            .get(id)
1889            .and_then(|state| state.tool_format.clone())
1890    })
1891}
1892
1893pub fn record_system_prompt(id: &str, system_prompt: &str) -> Result<(), String> {
1894    let system_prompt = system_prompt.trim();
1895    if system_prompt.is_empty() {
1896        return Ok(());
1897    }
1898    assert_cache_stable_system_prompt(system_prompt);
1899    SESSIONS.with(|s| {
1900        let mut map = s.borrow_mut();
1901        let Some(state) = map.get_mut(id) else {
1902            return Err(format!("agent session '{id}' does not exist"));
1903        };
1904        let changed = state.system_prompt.as_deref() != Some(system_prompt);
1905        state.system_prompt = Some(system_prompt.to_string());
1906        let dict = state
1907            .transcript
1908            .as_dict()
1909            .cloned()
1910            .unwrap_or_else(BTreeMap::new);
1911        let mut next = dict;
1912        apply_system_prompt_metadata(&mut next, system_prompt);
1913        if changed {
1914            let mut events: Vec<VmValue> = match next.get("events") {
1915                Some(VmValue::List(list)) => list.iter().cloned().collect(),
1916                _ => Vec::new(),
1917            };
1918            events.push(crate::llm::helpers::transcript_event(
1919                "system_prompt",
1920                "system",
1921                "internal",
1922                "",
1923                Some(crate::llm::helpers::system_prompt_event_metadata(
1924                    system_prompt,
1925                )),
1926            ));
1927            next.insert("events".to_string(), VmValue::List(Rc::new(events)));
1928        }
1929        apply_transcript_with_budget(state, VmValue::Dict(Rc::new(next)), "record_system_prompt")?;
1930        state.last_accessed = Instant::now();
1931        Ok(())
1932    })
1933}
1934
1935pub fn system_prompt(id: &str) -> Option<String> {
1936    SESSIONS.with(|s| {
1937        s.borrow()
1938            .get(id)
1939            .and_then(|state| state.system_prompt.clone())
1940    })
1941}
1942
/// Look for a `{{` template opener that is followed (after optional
/// whitespace) by an identifier starting with `workspace_` or `project_`.
///
/// Returns the matching prefix for the first such opener, scanning left to
/// right, or `None` when the prompt contains none.
#[cfg(debug_assertions)]
fn forbidden_workspace_prompt_token(system_prompt: &str) -> Option<&'static str> {
    const FORBIDDEN_PREFIXES: [&str; 2] = ["workspace_", "project_"];
    // Every piece after the first begins right behind a `{{` opener.
    system_prompt
        .split("{{")
        .skip(1)
        .map(str::trim_start)
        .find_map(|after_opener| {
            FORBIDDEN_PREFIXES
                .iter()
                .copied()
                .find(|prefix| after_opener.starts_with(*prefix))
        })
}
1958
1959#[cfg(debug_assertions)]
1960fn assert_cache_stable_system_prompt(system_prompt: &str) {
1961    if let Some(prefix) = forbidden_workspace_prompt_token(system_prompt) {
1962        panic!(
1963            "{CACHE_STABLE_SYSTEM_PROMPT_DIAGNOSTIC}: session system prompts must not interpolate `{{{{{prefix}...` tokens; move workspace/project context into the workspace-anchor reminder"
1964        );
1965    }
1966}
1967
1968#[cfg(not(debug_assertions))]
1969fn assert_cache_stable_system_prompt(_system_prompt: &str) {}
1970
1971/// Pin (or clear, with `None`) a model selector on a session. Returns
1972/// `Ok(true)` when the value actually changed so callers can decide
1973/// whether to broadcast a notification. The selector is stored verbatim
1974/// — alias / catalog resolution is the call-site's job.
1975pub fn set_pinned_model(id: &str, model: Option<String>) -> Result<bool, String> {
1976    let normalized = model
1977        .map(|value| value.trim().to_string())
1978        .filter(|value| !value.is_empty());
1979    SESSIONS.with(|s| {
1980        let mut map = s.borrow_mut();
1981        let Some(state) = map.get_mut(id) else {
1982            return Err(format!("agent session '{id}' does not exist"));
1983        };
1984        let changed = state.pinned_model != normalized;
1985        state.pinned_model = normalized;
1986        state.last_accessed = Instant::now();
1987        Ok(changed)
1988    })
1989}
1990
1991/// Read the session's pinned model selector, if any. Consumed by
1992/// `vm_resolve_model` as the per-session default when a script-level
1993/// `llm_call` does not pass `model:` explicitly.
1994pub fn pinned_model(id: &str) -> Option<String> {
1995    SESSIONS.with(|s| {
1996        s.borrow()
1997            .get(id)
1998            .and_then(|state| state.pinned_model.clone())
1999    })
2000}
2001
2002/// Pin (or clear) the session-level provider-aware reasoning policy.
2003pub fn set_pinned_reasoning_policy(id: &str, policy: Option<String>) -> Result<bool, String> {
2004    let normalized = match policy {
2005        Some(value) => crate::llm::reasoning_policy::normalize_policy_selector(&value)?,
2006        None => None,
2007    };
2008    SESSIONS.with(|s| {
2009        let mut map = s.borrow_mut();
2010        let Some(state) = map.get_mut(id) else {
2011            return Err(format!("agent session '{id}' does not exist"));
2012        };
2013        let changed = state.pinned_reasoning_policy != normalized;
2014        state.pinned_reasoning_policy = normalized;
2015        state.last_accessed = Instant::now();
2016        Ok(changed)
2017    })
2018}
2019
2020/// Read the session's pinned reasoning policy, if any.
2021pub fn pinned_reasoning_policy(id: &str) -> Option<String> {
2022    SESSIONS.with(|s| {
2023        s.borrow()
2024            .get(id)
2025            .and_then(|state| state.pinned_reasoning_policy.clone())
2026    })
2027}
2028
2029/// Set (or clear, with `None`) the typed workspace anchor on a session.
2030/// Returns `Ok(true)` when the value actually changed so callers can
2031/// decide whether to broadcast `AnchorChanged` notifications.
2032pub fn set_workspace_anchor(id: &str, anchor: Option<WorkspaceAnchor>) -> Result<bool, String> {
2033    SESSIONS.with(|s| {
2034        let mut map = s.borrow_mut();
2035        let Some(state) = map.get_mut(id) else {
2036            return Err(format!("agent session '{id}' does not exist"));
2037        };
2038        let changed = state.workspace_anchor != anchor;
2039        state.workspace_anchor = anchor;
2040        if changed {
2041            crate::llm::permissions::clear_session_grants(id);
2042        }
2043        state.last_accessed = Instant::now();
2044        Ok(changed)
2045    })
2046}
2047
2048/// Read the session's typed workspace anchor, if any.
2049pub fn workspace_anchor(id: &str) -> Option<WorkspaceAnchor> {
2050    SESSIONS.with(|s| {
2051        s.borrow()
2052            .get(id)
2053            .and_then(|state| state.workspace_anchor.clone())
2054    })
2055}
2056
2057/// Outcome of `reanchor_session`: previous + new anchor and whether the
2058/// swap actually moved anything. Callers use `changed` to suppress
2059/// no-op transcript / live events.
2060#[derive(Clone, Debug, PartialEq, Eq)]
2061pub struct ReanchorOutcome {
2062    pub previous: Option<WorkspaceAnchor>,
2063    pub current: WorkspaceAnchor,
2064    pub changed: bool,
2065}
2066
2067/// Atomically swap the session's primary anchor + emit the canonical
2068/// `AnchorChanged` transcript event and live `AgentEvent::AnchorChanged`
2069/// notification (#2218). Clears session-scoped permission grants so
2070/// stale anchor-based decisions don't leak into the next turn.
2071pub fn reanchor_session(
2072    id: &str,
2073    new_anchor: WorkspaceAnchor,
2074    carry_transcript: bool,
2075    compacted: bool,
2076    reason: Option<String>,
2077) -> Result<ReanchorOutcome, String> {
2078    let outcome = SESSIONS.with(|s| {
2079        let mut map = s.borrow_mut();
2080        let Some(state) = map.get_mut(id) else {
2081            return Err(format!("agent session '{id}' does not exist"));
2082        };
2083        let previous = state.workspace_anchor.clone();
2084        let changed = previous.as_ref() != Some(&new_anchor);
2085        state.workspace_anchor = Some(new_anchor.clone());
2086        if changed {
2087            crate::llm::permissions::clear_session_grants(id);
2088        }
2089        state.last_accessed = Instant::now();
2090        Ok(ReanchorOutcome {
2091            previous,
2092            current: new_anchor,
2093            changed,
2094        })
2095    })?;
2096    if !outcome.changed {
2097        return Ok(outcome);
2098    }
2099    let previous_json = outcome.previous.as_ref().map(WorkspaceAnchor::to_json);
2100    let current_json = outcome.current.to_json();
2101    let event_metadata = serde_json::json!({
2102        "previous": previous_json,
2103        "current": current_json,
2104        "carry_transcript": carry_transcript,
2105        "compacted": compacted,
2106        "reason": reason,
2107    });
2108    let event = crate::llm::helpers::transcript_event(
2109        "AnchorChanged",
2110        "system",
2111        "internal",
2112        "",
2113        Some(event_metadata),
2114    );
2115    let _ = append_event(id, event);
2116    crate::llm::emit_live_agent_event_sync(&crate::agent_events::AgentEvent::AnchorChanged {
2117        session_id: id.to_string(),
2118        previous: previous_json,
2119        current: current_json,
2120        carry_transcript,
2121        compacted,
2122        reason,
2123    });
2124    Ok(outcome)
2125}
2126
2127/// Set session-local workspace defaults. Returns `Ok(true)` when the
2128/// policy changed.
2129pub fn set_workspace_policy(id: &str, policy: WorkspacePolicy) -> Result<bool, String> {
2130    SESSIONS.with(|s| {
2131        let mut map = s.borrow_mut();
2132        let Some(state) = map.get_mut(id) else {
2133            return Err(format!("agent session '{id}' does not exist"));
2134        };
2135        let changed = state.workspace_policy != policy;
2136        state.workspace_policy = policy;
2137        state.last_accessed = Instant::now();
2138        Ok(changed)
2139    })
2140}
2141
2142/// Read the session's workspace policy, if the session exists.
2143pub fn workspace_policy(id: &str) -> Option<WorkspacePolicy> {
2144    SESSIONS.with(|s| {
2145        s.borrow()
2146            .get(id)
2147            .map(|state| state.workspace_policy.clone())
2148    })
2149}
2150
2151/// Validate and mount an additional workspace root on an anchored
2152/// session. When the path is already mounted, updates its mount mode
2153/// in place and refreshes its `mounted_at` timestamp.
2154pub fn add_workspace_root(
2155    id: &str,
2156    root: &str,
2157    mount_mode: Option<MountMode>,
2158    reason: Option<String>,
2159) -> Result<String, String> {
2160    let normalized_root = validate_workspace_root_path(root)?;
2161    let mounted_at = crate::orchestration::now_rfc3339();
2162    SESSIONS.with(|s| {
2163        let mut map = s.borrow_mut();
2164        let Some(state) = map.get_mut(id) else {
2165            return Err(format!("agent session '{id}' does not exist"));
2166        };
2167        let default_mount_mode = state.workspace_policy.default_mount_mode;
2168        let Some(anchor) = state.workspace_anchor.as_mut() else {
2169            return Err(format!("agent session '{id}' has no workspace anchor"));
2170        };
2171        let resolved_mount_mode = mount_mode.unwrap_or(default_mount_mode);
2172        if let Some(existing) = anchor
2173            .additional_roots
2174            .iter_mut()
2175            .find(|entry| entry.path == normalized_root)
2176        {
2177            existing.mount_mode = resolved_mount_mode;
2178            existing.mounted_at = mounted_at.clone();
2179        } else {
2180            anchor.additional_roots.push(MountedRoot {
2181                path: normalized_root.clone(),
2182                mount_mode: resolved_mount_mode,
2183                mounted_at: mounted_at.clone(),
2184            });
2185        }
2186        let event = crate::llm::helpers::transcript_event(
2187            "RootMounted",
2188            "system",
2189            "internal",
2190            "",
2191            Some(serde_json::json!({
2192                "path": normalized_root.to_string_lossy(),
2193                "mount_mode": resolved_mount_mode.as_str(),
2194                "mounted_at": mounted_at.clone(),
2195                "reason": reason,
2196            })),
2197        );
2198        append_event_to_state(state, event, "add_workspace_root")?;
2199        crate::llm::permissions::clear_session_grants(id);
2200        state.last_accessed = Instant::now();
2201        Ok(mounted_at.clone())
2202    })
2203}
2204
2205/// Remove one mounted root from an anchored session. Returns whether an
2206/// existing mount entry was deleted. Removing an absent root is a no-op.
2207pub fn remove_workspace_root(id: &str, root: &str) -> Result<bool, String> {
2208    let normalized_root = normalize_workspace_root_path(root);
2209    SESSIONS.with(|s| {
2210        let mut map = s.borrow_mut();
2211        let Some(state) = map.get_mut(id) else {
2212            return Err(format!("agent session '{id}' does not exist"));
2213        };
2214        let Some(anchor) = state.workspace_anchor.as_mut() else {
2215            return Err(format!("agent session '{id}' has no workspace anchor"));
2216        };
2217        let before = anchor.additional_roots.len();
2218        anchor
2219            .additional_roots
2220            .retain(|entry| entry.path != normalized_root);
2221        let removed = anchor.additional_roots.len() != before;
2222        if removed {
2223            crate::llm::permissions::clear_session_grants(id);
2224        }
2225        state.last_accessed = Instant::now();
2226        Ok(removed)
2227    })
2228}
2229
2230/// Return the anchored primary path plus the mounted additional roots.
2231pub fn list_workspace_roots(id: &str) -> Result<(PathBuf, Vec<MountedRoot>), String> {
2232    SESSIONS.with(|s| {
2233        let map = s.borrow();
2234        let Some(state) = map.get(id) else {
2235            return Err(format!("agent session '{id}' does not exist"));
2236        };
2237        let Some(anchor) = state.workspace_anchor.as_ref() else {
2238            return Err(format!("agent session '{id}' has no workspace anchor"));
2239        };
2240        Ok((anchor.primary.clone(), anchor.additional_roots.clone()))
2241    })
2242}
2243
2244fn validate_workspace_root_path(root: &str) -> Result<PathBuf, String> {
2245    let normalized = normalize_workspace_root_path(root);
2246    let canonical = std::fs::canonicalize(&normalized)
2247        .map_err(|error| format!("workspace root '{root}' must exist and be readable: {error}"))?;
2248    let metadata = std::fs::metadata(&canonical)
2249        .map_err(|error| format!("workspace root '{root}' must exist and be readable: {error}"))?;
2250    if !metadata.is_dir() {
2251        return Err(format!("workspace root '{root}' must be a directory"));
2252    }
2253    std::fs::read_dir(&canonical)
2254        .map_err(|error| format!("workspace root '{root}' must be readable: {error}"))?;
2255    Ok(canonical)
2256}
2257
2258fn normalize_workspace_root_path(root: &str) -> PathBuf {
2259    let absolute = crate::stdlib::process::normalize_context_path(Path::new(root));
2260    std::fs::canonicalize(&absolute).unwrap_or(absolute)
2261}
2262
2263fn empty_transcript(id: &str) -> VmValue {
2264    use crate::llm::helpers::new_transcript_with;
2265    new_transcript_with(Some(id.to_string()), Vec::new(), None, None)
2266}
2267
2268fn clone_transcript_with_id(transcript: &VmValue, new_id: &str) -> VmValue {
2269    let Some(dict) = transcript.as_dict() else {
2270        return empty_transcript(new_id);
2271    };
2272    let mut next = dict.clone();
2273    next.insert(
2274        "id".to_string(),
2275        VmValue::String(Rc::from(new_id.to_string())),
2276    );
2277    VmValue::Dict(Rc::new(next))
2278}
2279
2280fn clone_transcript_with_parent(transcript: &VmValue, parent_id: &str) -> VmValue {
2281    let Some(dict) = transcript.as_dict() else {
2282        return transcript.clone();
2283    };
2284    let mut next = dict.clone();
2285    let metadata = match next.get("metadata") {
2286        Some(VmValue::Dict(metadata)) => {
2287            let mut metadata = metadata.as_ref().clone();
2288            metadata.insert(
2289                "parent_session_id".to_string(),
2290                VmValue::String(Rc::from(parent_id.to_string())),
2291            );
2292            VmValue::Dict(Rc::new(metadata))
2293        }
2294        _ => VmValue::Dict(Rc::new(BTreeMap::from([(
2295            "parent_session_id".to_string(),
2296            VmValue::String(Rc::from(parent_id.to_string())),
2297        )]))),
2298    };
2299    next.insert("metadata".to_string(), metadata);
2300    VmValue::Dict(Rc::new(next))
2301}
2302
2303fn apply_system_prompt_metadata(next: &mut BTreeMap<String, VmValue>, system_prompt: &str) {
2304    let mut metadata = match next.get("metadata") {
2305        Some(VmValue::Dict(metadata)) => metadata.as_ref().clone(),
2306        _ => BTreeMap::new(),
2307    };
2308    metadata.insert(
2309        "system_prompt".to_string(),
2310        crate::stdlib::json_to_vm_value(&crate::llm::helpers::system_prompt_metadata(
2311            system_prompt,
2312        )),
2313    );
2314    next.insert("metadata".to_string(), VmValue::Dict(Rc::new(metadata)));
2315}
2316
2317fn transcript_with_session_metadata(transcript: VmValue, state: &SessionState) -> VmValue {
2318    let Some(dict) = transcript.as_dict() else {
2319        return transcript;
2320    };
2321    let mut next = dict.clone();
2322    let mut metadata = match next.get("metadata") {
2323        Some(VmValue::Dict(metadata)) => metadata.as_ref().clone(),
2324        _ => BTreeMap::new(),
2325    };
2326    if let Some(tool_format) = state.tool_format.as_ref() {
2327        metadata.insert(
2328            "tool_format".to_string(),
2329            VmValue::String(Rc::from(tool_format.clone())),
2330        );
2331        metadata.insert("tool_mode_locked".to_string(), VmValue::Bool(true));
2332    }
2333    if let Some(system_prompt) = state.system_prompt.as_ref() {
2334        metadata.insert(
2335            "system_prompt".to_string(),
2336            crate::stdlib::json_to_vm_value(&crate::llm::helpers::system_prompt_metadata(
2337                system_prompt,
2338            )),
2339        );
2340    }
2341    if let Some(anchor) = state.workspace_anchor.as_ref() {
2342        metadata.insert(
2343            WORKSPACE_ANCHOR_METADATA_KEY.to_string(),
2344            anchor.to_vm_value(),
2345        );
2346    } else {
2347        metadata.remove(WORKSPACE_ANCHOR_METADATA_KEY);
2348    }
2349    if let Some(last_action) = state.last_transcript_budget_action.as_ref() {
2350        let usage = transcript_usage(
2351            &VmValue::Dict(Rc::new(next.clone())),
2352            state.transcript_budget_policy.max_approx_bytes.is_some(),
2353        );
2354        metadata.insert(
2355            "transcript_budget".to_string(),
2356            crate::stdlib::json_to_vm_value(&serde_json::json!({
2357                "policy": transcript_budget_policy_json(&state.transcript_budget_policy.normalized()),
2358                "usage": transcript_budget_usage_json(&usage),
2359                "last_action": last_action,
2360            })),
2361        );
2362    }
2363    if !metadata.is_empty() {
2364        next.insert("metadata".to_string(), VmValue::Dict(Rc::new(metadata)));
2365    }
2366    VmValue::Dict(Rc::new(next))
2367}
2368
2369fn session_snapshot(state: &SessionState) -> VmValue {
2370    let transcript = transcript_with_session_metadata(state.transcript.clone(), state);
2371    let Some(dict) = transcript.as_dict() else {
2372        return state.transcript.clone();
2373    };
2374    let mut next = dict.clone();
2375    let length = next
2376        .get("messages")
2377        .and_then(|value| match value {
2378            VmValue::List(list) => Some(list.len() as i64),
2379            _ => None,
2380        })
2381        .unwrap_or(0);
2382    next.insert("length".to_string(), VmValue::Int(length));
2383    next.insert(
2384        "created_at".to_string(),
2385        VmValue::String(Rc::from(state.created_at.clone())),
2386    );
2387    next.insert(
2388        "parent_id".to_string(),
2389        state
2390            .parent_id
2391            .as_ref()
2392            .map(|id| VmValue::String(Rc::from(id.clone())))
2393            .unwrap_or(VmValue::Nil),
2394    );
2395    next.insert(
2396        "child_ids".to_string(),
2397        VmValue::List(Rc::new(
2398            state
2399                .child_ids
2400                .iter()
2401                .cloned()
2402                .map(|id| VmValue::String(Rc::from(id)))
2403                .collect(),
2404        )),
2405    );
2406    next.insert(
2407        "branched_at_event_index".to_string(),
2408        state
2409            .branched_at_event_index
2410            .map(|index| VmValue::Int(index as i64))
2411            .unwrap_or(VmValue::Nil),
2412    );
2413    next.insert(
2414        "system_prompt".to_string(),
2415        state
2416            .system_prompt
2417            .as_ref()
2418            .map(|prompt| VmValue::String(Rc::from(prompt.clone())))
2419            .unwrap_or(VmValue::Nil),
2420    );
2421    next.insert(
2422        "tool_format".to_string(),
2423        state
2424            .tool_format
2425            .as_ref()
2426            .map(|format| VmValue::String(Rc::from(format.clone())))
2427            .unwrap_or(VmValue::Nil),
2428    );
2429    next.insert(
2430        "pinned_model".to_string(),
2431        state
2432            .pinned_model
2433            .as_ref()
2434            .map(|model| VmValue::String(Rc::from(model.clone())))
2435            .unwrap_or(VmValue::Nil),
2436    );
2437    next.insert(
2438        "pinned_reasoning_policy".to_string(),
2439        state
2440            .pinned_reasoning_policy
2441            .as_ref()
2442            .map(|policy| VmValue::String(Rc::from(policy.clone())))
2443            .unwrap_or(VmValue::Nil),
2444    );
2445    next.insert(
2446        "workspace_anchor".to_string(),
2447        state
2448            .workspace_anchor
2449            .as_ref()
2450            .map(WorkspaceAnchor::to_vm_value)
2451            .unwrap_or(VmValue::Nil),
2452    );
2453    next.insert(
2454        "workspace_policy".to_string(),
2455        state.workspace_policy.to_vm_value(),
2456    );
2457    VmValue::Dict(Rc::new(next))
2458}
2459
2460fn update_lineage(
2461    map: &mut HashMap<String, SessionState>,
2462    parent_id: &str,
2463    child_id: &str,
2464    branched_at_event_index: Option<usize>,
2465) {
2466    let old_parent_id = map.get(child_id).and_then(|child| child.parent_id.clone());
2467    if let Some(old_parent_id) = old_parent_id.filter(|old_parent_id| old_parent_id != parent_id) {
2468        if let Some(old_parent) = map.get_mut(&old_parent_id) {
2469            old_parent.child_ids.retain(|id| id != child_id);
2470            old_parent.last_accessed = Instant::now();
2471        }
2472    }
2473    if let Some(parent) = map.get_mut(parent_id) {
2474        parent.last_accessed = Instant::now();
2475        if !parent.child_ids.iter().any(|id| id == child_id) {
2476            parent.child_ids.push(child_id.to_string());
2477        }
2478    }
2479    if let Some(child) = map.get_mut(child_id) {
2480        child.last_accessed = Instant::now();
2481        child.parent_id = Some(parent_id.to_string());
2482        child.branched_at_event_index = branched_at_event_index;
2483        child.transcript = clone_transcript_with_parent(&child.transcript, parent_id);
2484    }
2485}
2486
2487fn branch_event_index(transcript: &VmValue, keep_first: usize) -> usize {
2488    if keep_first == 0 {
2489        return 0;
2490    }
2491    let Some(dict) = transcript.as_dict() else {
2492        return keep_first;
2493    };
2494    let Some(VmValue::List(events)) = dict.get("events") else {
2495        return keep_first;
2496    };
2497    event_prefix_len_for_messages(events, keep_first)
2498}
2499
2500fn event_kind(event: &VmValue) -> Option<String> {
2501    event
2502        .as_dict()
2503        .and_then(|dict| dict.get("kind"))
2504        .map(VmValue::display)
2505}
2506
2507fn event_id(event: &VmValue) -> Option<String> {
2508    event
2509        .as_dict()
2510        .and_then(|dict| dict.get("id"))
2511        .map(VmValue::display)
2512}
2513
2514fn is_turn_event(event: &VmValue) -> bool {
2515    matches!(
2516        event_kind(event).as_deref(),
2517        Some("message" | "tool_result")
2518    )
2519}
2520
2521fn event_prefix_len_for_messages(events: &[VmValue], keep_first: usize) -> usize {
2522    if keep_first == 0 {
2523        return 0;
2524    }
2525    let mut retained_messages = 0usize;
2526    for (index, event) in events.iter().enumerate() {
2527        if is_turn_event(event) {
2528            retained_messages += 1;
2529            if retained_messages == keep_first {
2530                return index + 1;
2531            }
2532        }
2533    }
2534    events.len()
2535}
2536
2537fn turn_event_id_for_count(events: &[VmValue], keep_first: usize) -> Option<String> {
2538    if keep_first == 0 {
2539        return None;
2540    }
2541    let mut retained_messages = 0usize;
2542    for event in events {
2543        if is_turn_event(event) {
2544            retained_messages += 1;
2545            if retained_messages == keep_first {
2546                return event_id(event);
2547            }
2548        }
2549    }
2550    None
2551}
2552
2553#[cfg(test)]
2554#[path = "agent_sessions_tests.rs"]
2555mod tests;