Skip to main content

harn_vm/
agent_sessions.rs

1//! First-class session storage.
2//!
3//! A session owns three things:
4//!
5//! 1. A transcript dict (messages, events, summary, metadata, …).
6//! 2. Closure subscribers that fire on agent-loop events for this session.
7//! 3. Its own lifecycle (open, reset, fork, trim, compact, close).
8//!
9//! Storage is thread-local because `VmValue` contains `Rc`, which is
10//! neither `Send` nor `Sync`. The agent loop runs on a tokio
11//! current-thread worker, so all session reads and writes happen on the
12//! same thread. The closure-subscribers register, fire, and unregister
13//! on that same thread.
14//!
15//! Lifecycle is explicit. Builtins (`agent_session_open`,
16//! `_reset`, `_fork`, `_fork_at`, `_close`, `_trim`, `_compact`,
17//! `_inject`, `_exists`, `_length`, `_snapshot`, `_ancestry`) drive
18//! the store directly — there is no "policy" config dict that
19//! performs lifecycle as a side effect.
20
21use std::cell::{Cell, RefCell};
22use std::collections::{BTreeMap, HashMap, HashSet};
23use std::future::Future;
24use std::path::{Path, PathBuf};
25use std::rc::Rc;
26use std::time::Instant;
27
28use crate::runtime_limits::RuntimeLimits;
29use crate::value::VmValue;
30use crate::workspace_anchor::{
31    MountMode, MountedRoot, WorkspaceAnchor, WorkspacePolicy, WORKSPACE_ANCHOR_METADATA_KEY,
32};
33
34/// Default cap on concurrent sessions per VM thread. Beyond this the
35/// least-recently-accessed session is evicted on the next `open`.
36pub const DEFAULT_SESSION_CAP: usize = RuntimeLimits::DEFAULT.max_agent_sessions;
37
38/// Default cap on retained prompt-visible messages per session. The
39/// limit is intentionally high enough for normal long-running agents
40/// while still bounding accidental unbounded growth.
41pub const DEFAULT_TRANSCRIPT_MESSAGE_CAP: usize = 4096;
42
43/// Default cap on retained transcript audit events per session. Events
44/// include message-derived entries plus orchestration lifecycle records.
45pub const DEFAULT_TRANSCRIPT_EVENT_CAP: usize = 32768;
46#[cfg(debug_assertions)]
47const CACHE_STABLE_SYSTEM_PROMPT_DIAGNOSTIC: &str = "HARN-CACHE-001";
48
49#[derive(Clone, Debug, PartialEq, Eq)]
50pub enum TranscriptBudgetRecovery {
51    Reject,
52    Trim { keep_last: usize },
53    Compact { keep_last: usize },
54}
55
56#[derive(Clone, Debug, PartialEq, Eq)]
57pub struct SessionTranscriptBudgetPolicy {
58    pub max_messages: usize,
59    pub max_events: usize,
60    pub max_approx_bytes: Option<usize>,
61    pub recovery: TranscriptBudgetRecovery,
62}
63
64impl SessionTranscriptBudgetPolicy {
65    pub fn reject(max_messages: usize, max_events: usize) -> Self {
66        Self {
67            max_messages: max_messages.max(1),
68            max_events: max_events.max(1),
69            max_approx_bytes: None,
70            recovery: TranscriptBudgetRecovery::Reject,
71        }
72    }
73
74    pub fn trim(max_messages: usize, max_events: usize, keep_last: usize) -> Self {
75        Self {
76            max_messages: max_messages.max(1),
77            max_events: max_events.max(1),
78            max_approx_bytes: None,
79            recovery: TranscriptBudgetRecovery::Trim { keep_last },
80        }
81    }
82
83    pub fn compact(max_messages: usize, max_events: usize, keep_last: usize) -> Self {
84        Self {
85            max_messages: max_messages.max(1),
86            max_events: max_events.max(1),
87            max_approx_bytes: None,
88            recovery: TranscriptBudgetRecovery::Compact { keep_last },
89        }
90    }
91
92    pub fn with_max_approx_bytes(mut self, max_approx_bytes: Option<usize>) -> Self {
93        self.max_approx_bytes = max_approx_bytes.map(|limit| limit.max(1));
94        self
95    }
96
97    fn normalized(&self) -> Self {
98        Self {
99            max_messages: self.max_messages.max(1),
100            max_events: self.max_events.max(1),
101            max_approx_bytes: self.max_approx_bytes.map(|limit| limit.max(1)),
102            recovery: self.recovery.clone(),
103        }
104    }
105}
106
107impl Default for SessionTranscriptBudgetPolicy {
108    fn default() -> Self {
109        Self::reject(DEFAULT_TRANSCRIPT_MESSAGE_CAP, DEFAULT_TRANSCRIPT_EVENT_CAP)
110    }
111}
112
113pub struct SessionState {
114    pub id: String,
115    pub transcript: VmValue,
116    pub subscribers: Vec<VmValue>,
117    pub created_at: String,
118    pub last_accessed: Instant,
119    pub parent_id: Option<String>,
120    pub child_ids: Vec<String>,
121    pub branched_at_event_index: Option<usize>,
122    /// Names of skills that were active at the end of the most recent
123    /// `agent_loop` run on this session. Empty when no skills were
124    /// matched, when the skill system wasn't used, or when the
125    /// deactivation phase cleared them. Re-entering the session
126    /// restores these as the initial active set before matching runs.
127    pub active_skills: Vec<String>,
128    /// Tool-calling protocol claimed by the first agent loop that uses
129    /// this session. A transcript is only replayable under the same
130    /// contract that produced its prompt/history.
131    pub tool_format: Option<String>,
132    /// Stable session-level system prompt material. This is transcript
133    /// metadata, not a replay message: providers receive it through
134    /// their system/developer instruction channel on each call.
135    pub system_prompt: Option<String>,
136    /// Session-pinned model selector. When set, `llm_call` invocations
137    /// that do not pass an explicit `model:` option resolve to this
138    /// selector instead of `HARN_LLM_MODEL` / provider defaults. Mid-
139    /// session swap is exposed over ACP via `session/set_config_option`
140    /// (configId="model").
141    pub pinned_model: Option<String>,
142    /// Session-pinned high-level reasoning policy. When set, `llm_call`
143    /// invocations that do not pass explicit `thinking` or
144    /// `reasoning_effort` options resolve this provider-aware policy into
145    /// the route's native thinking shape. Exposed over ACP as
146    /// `session/set_config_option(configId="thought_level")`.
147    pub pinned_reasoning_policy: Option<String>,
148    /// Session-local workspace defaults. Persona and host policy layers
149    /// can update this without rewriting the current anchor.
150    pub workspace_policy: WorkspacePolicy,
151    /// Typed workspace anchor for the session. Primary path plus any
152    /// additional mounted roots; consumed by permission matchers, the
153    /// bundle exporter, and host-side cross-project handoff flows
154    /// (epic #2208). `None` until a host opens the session with one or
155    /// the ACP `reanchor` / `add_root` primitives populate it.
156    pub workspace_anchor: Option<WorkspaceAnchor>,
157    pub transcript_budget_policy: SessionTranscriptBudgetPolicy,
158    pub last_transcript_budget_action: Option<serde_json::Value>,
159}
160
161impl SessionState {
162    fn new(id: String) -> Self {
163        let now = Instant::now();
164        let transcript = empty_transcript(&id);
165        Self {
166            id,
167            transcript,
168            subscribers: Vec::new(),
169            created_at: crate::orchestration::now_rfc3339(),
170            last_accessed: now,
171            parent_id: None,
172            child_ids: Vec::new(),
173            branched_at_event_index: None,
174            active_skills: Vec::new(),
175            tool_format: None,
176            system_prompt: None,
177            pinned_model: None,
178            pinned_reasoning_policy: None,
179            workspace_policy: WorkspacePolicy::default(),
180            workspace_anchor: None,
181            transcript_budget_policy: default_transcript_budget_policy(),
182            last_transcript_budget_action: None,
183        }
184    }
185}
186
187#[derive(Clone, Debug, PartialEq, Eq)]
188pub struct SessionAncestry {
189    pub parent_id: Option<String>,
190    pub child_ids: Vec<String>,
191    pub root_id: String,
192}
193
194#[derive(Clone, Debug, PartialEq, Eq)]
195pub struct SessionTruncateResult {
196    pub kept_turn_count: usize,
197    pub removed_turn_count: usize,
198    pub new_tip_turn_id: Option<String>,
199}
200
201#[derive(Clone, Debug, PartialEq, Eq)]
202pub struct ReminderInjectionReport {
203    pub reminder_id: String,
204    pub deduped_count: usize,
205}
206
207thread_local! {
208    static SESSIONS: RefCell<HashMap<String, SessionState>> = RefCell::new(HashMap::new());
209    static SESSION_CAP: Cell<usize> = const { Cell::new(DEFAULT_SESSION_CAP) };
210    static DEFAULT_TRANSCRIPT_BUDGET_POLICY: RefCell<SessionTranscriptBudgetPolicy> =
211        RefCell::new(SessionTranscriptBudgetPolicy::default());
212    static CURRENT_SESSION_STACK: RefCell<Vec<String>> = const { RefCell::new(Vec::new()) };
213    static CURRENT_TOOL_CALL_STACK: RefCell<Vec<String>> = const { RefCell::new(Vec::new()) };
214}
215
216tokio::task_local! {
217    static CURRENT_TOOL_CALL_TASK: String;
218}
219
220pub struct CurrentSessionGuard {
221    active: bool,
222}
223
224impl Drop for CurrentSessionGuard {
225    fn drop(&mut self) {
226        if self.active {
227            pop_current_session();
228        }
229    }
230}
231
232/// RAII guard that scopes the active tool-call id for the running thread.
233///
234/// Set on entry to a tool dispatch and dropped on exit, so any hostlib
235/// builtin invoked under it (e.g. `tools/write_file`) can resolve the
236/// owning tool call without threading the id through every parameter.
237pub struct CurrentToolCallGuard {
238    active: bool,
239}
240
241impl Drop for CurrentToolCallGuard {
242    fn drop(&mut self) {
243        if self.active {
244            pop_current_tool_call();
245        }
246    }
247}
248
249/// Set the per-thread session cap. Primarily for tests; production VMs
250/// inherit the default.
251pub fn set_session_cap(cap: usize) {
252    SESSION_CAP.with(|c| c.set(cap.max(1)));
253}
254
255pub fn session_cap() -> usize {
256    SESSION_CAP.with(|c| c.get())
257}
258
259pub fn set_default_transcript_budget_policy(policy: SessionTranscriptBudgetPolicy) {
260    DEFAULT_TRANSCRIPT_BUDGET_POLICY.with(|cell| {
261        *cell.borrow_mut() = policy.normalized();
262    });
263}
264
265pub fn reset_default_transcript_budget_policy() {
266    set_default_transcript_budget_policy(SessionTranscriptBudgetPolicy::default());
267}
268
269pub fn default_transcript_budget_policy() -> SessionTranscriptBudgetPolicy {
270    DEFAULT_TRANSCRIPT_BUDGET_POLICY.with(|cell| cell.borrow().clone())
271}
272
273pub fn transcript_budget_policy(id: &str) -> Option<SessionTranscriptBudgetPolicy> {
274    SESSIONS.with(|s| {
275        s.borrow()
276            .get(id)
277            .map(|state| state.transcript_budget_policy.clone())
278    })
279}
280
281pub fn set_transcript_budget_policy(
282    id: &str,
283    policy: SessionTranscriptBudgetPolicy,
284) -> Result<(), String> {
285    SESSIONS.with(|s| {
286        let mut map = s.borrow_mut();
287        let Some(state) = map.get_mut(id) else {
288            return Err(format!("agent session '{id}' does not exist"));
289        };
290        let previous = state.transcript_budget_policy.clone();
291        let previous_action = state.last_transcript_budget_action.clone();
292        state.transcript_budget_policy = policy.normalized();
293        let candidate = state.transcript.clone();
294        if let Err(error) = apply_transcript_with_budget(state, candidate, "policy_update") {
295            state.transcript_budget_policy = previous;
296            state.last_transcript_budget_action = previous_action;
297            return Err(error);
298        }
299        state.last_accessed = Instant::now();
300        Ok(())
301    })
302}
303
304/// Clear the session store. Wired into `reset_llm_state` for test isolation.
305pub fn reset_session_store() {
306    SESSIONS.with(|s| s.borrow_mut().clear());
307    CURRENT_SESSION_STACK.with(|stack| stack.borrow_mut().clear());
308    CURRENT_TOOL_CALL_STACK.with(|stack| stack.borrow_mut().clear());
309    reset_default_transcript_budget_policy();
310}
311
312pub(crate) fn push_current_session(id: String) {
313    if id.is_empty() {
314        return;
315    }
316    CURRENT_SESSION_STACK.with(|stack| stack.borrow_mut().push(id));
317}
318
319pub(crate) fn pop_current_session() {
320    CURRENT_SESSION_STACK.with(|stack| {
321        let _ = stack.borrow_mut().pop();
322    });
323}
324
325pub fn current_session_id() -> Option<String> {
326    CURRENT_SESSION_STACK.with(|stack| stack.borrow().last().cloned())
327}
328
329pub fn enter_current_session(id: impl Into<String>) -> CurrentSessionGuard {
330    let id = id.into();
331    if id.trim().is_empty() {
332        return CurrentSessionGuard { active: false };
333    }
334    push_current_session(id);
335    CurrentSessionGuard { active: true }
336}
337
338fn push_current_tool_call(id: String) {
339    if id.is_empty() {
340        return;
341    }
342    CURRENT_TOOL_CALL_STACK.with(|stack| stack.borrow_mut().push(id));
343}
344
345fn pop_current_tool_call() {
346    CURRENT_TOOL_CALL_STACK.with(|stack| {
347        let _ = stack.borrow_mut().pop();
348    });
349}
350
351/// Return the active tool-call id for the current thread, if any.
352///
353/// Hostlib builtins consult this to attribute side-effect snapshots to
354/// the owning ACP `toolCallId` without callers passing it explicitly.
355pub fn current_tool_call_id() -> Option<String> {
356    if let Ok(id) = CURRENT_TOOL_CALL_TASK.try_with(Clone::clone) {
357        if !id.trim().is_empty() {
358            return Some(id);
359        }
360    }
361    CURRENT_TOOL_CALL_STACK.with(|stack| stack.borrow().last().cloned())
362}
363
364/// Scope the active tool-call id to one async task.
365///
366/// Parallel tool dispatch runs sibling calls on the same OS thread, so
367/// thread-local guards alone cannot preserve attribution across `.await`
368/// points. Tokio task-local scoping follows the future instead.
369pub async fn scope_current_tool_call<F, T>(id: impl Into<String>, future: F) -> T
370where
371    F: Future<Output = T>,
372{
373    let id = id.into();
374    if id.trim().is_empty() {
375        future.await
376    } else {
377        CURRENT_TOOL_CALL_TASK.scope(id, future).await
378    }
379}
380
381/// Scope the active tool-call id for the duration of the returned guard.
382pub fn enter_current_tool_call(id: impl Into<String>) -> CurrentToolCallGuard {
383    let id = id.into();
384    if id.trim().is_empty() {
385        return CurrentToolCallGuard { active: false };
386    }
387    push_current_tool_call(id);
388    CurrentToolCallGuard { active: true }
389}
390
391pub fn exists(id: &str) -> bool {
392    SESSIONS.with(|s| s.borrow().contains_key(id))
393}
394
395pub fn length(id: &str) -> Option<usize> {
396    SESSIONS.with(|s| {
397        s.borrow().get(id).map(|state| {
398            state
399                .transcript
400                .as_dict()
401                .and_then(|d| d.get("messages"))
402                .and_then(|v| match v {
403                    VmValue::List(list) => Some(list.len()),
404                    _ => None,
405                })
406                .unwrap_or(0)
407        })
408    })
409}
410
411pub fn snapshot(id: &str) -> Option<VmValue> {
412    SESSIONS.with(|s| s.borrow().get(id).map(session_snapshot))
413}
414
415/// Session-only fields stay on `agent_session_snapshot`.
416pub fn transcript(id: &str) -> Option<VmValue> {
417    SESSIONS.with(|s| {
418        s.borrow()
419            .get(id)
420            .map(|state| transcript_with_session_metadata(state.transcript.clone(), state))
421    })
422}
423
424/// Open a session, or create it if missing. Returns the resolved id.
425///
426/// Newly-created sessions auto-register an event-log-backed sink when a
427/// generalized [`crate::event_log::EventLog`] has been installed for the
428/// current VM thread. For legacy env-driven workflows that still point
429/// `HARN_EVENT_LOG_DIR` at a directory, we preserve the older JSONL sink
430/// as a compatibility fallback. Re-opening an existing session does not
431/// re-register — sinks are per-session, owned by the first opener.
432pub fn open_or_create(id: Option<String>) -> String {
433    let resolved = id.unwrap_or_else(|| uuid::Uuid::now_v7().to_string());
434    let parent_session = current_session_id();
435    let mut was_new = false;
436    SESSIONS.with(|s| {
437        let mut map = s.borrow_mut();
438        if let Some(state) = map.get_mut(&resolved) {
439            state.last_accessed = Instant::now();
440            return;
441        }
442        was_new = true;
443        let cap = SESSION_CAP.with(|c| c.get());
444        if map.len() >= cap {
445            if let Some(victim) = map
446                .iter()
447                .min_by_key(|(_, state)| state.last_accessed)
448                .map(|(id, _)| id.clone())
449            {
450                map.remove(&victim);
451            }
452        }
453        map.insert(resolved.clone(), SessionState::new(resolved.clone()));
454    });
455    if was_new {
456        if let Some(parent) = parent_session.as_deref() {
457            crate::agent_events::mirror_session_sinks(parent, &resolved);
458        }
459        try_register_event_log(&resolved);
460    }
461    resolved
462}
463
464pub fn open_child_session(parent_id: &str, id: Option<String>) -> String {
465    let resolved = open_or_create(id);
466    link_child_session(parent_id, &resolved);
467    resolved
468}
469
470pub fn link_child_session(parent_id: &str, child_id: &str) {
471    link_child_session_with_branch(parent_id, child_id, None);
472}
473
474pub fn link_child_session_with_branch(
475    parent_id: &str,
476    child_id: &str,
477    branched_at_event_index: Option<usize>,
478) {
479    if parent_id == child_id {
480        return;
481    }
482    open_or_create(Some(parent_id.to_string()));
483    open_or_create(Some(child_id.to_string()));
484    SESSIONS.with(|s| {
485        let mut map = s.borrow_mut();
486        update_lineage(&mut map, parent_id, child_id, branched_at_event_index);
487    });
488}
489
490pub fn parent_id(id: &str) -> Option<String> {
491    SESSIONS.with(|s| s.borrow().get(id).and_then(|state| state.parent_id.clone()))
492}
493
494pub fn child_ids(id: &str) -> Vec<String> {
495    SESSIONS.with(|s| {
496        s.borrow()
497            .get(id)
498            .map(|state| state.child_ids.clone())
499            .unwrap_or_default()
500    })
501}
502
503pub fn ancestry(id: &str) -> Option<SessionAncestry> {
504    SESSIONS.with(|s| {
505        let map = s.borrow();
506        let state = map.get(id)?;
507        let mut root_id = state.id.clone();
508        let mut cursor = state.parent_id.clone();
509        let mut seen = HashSet::from([state.id.clone()]);
510        while let Some(parent_id) = cursor {
511            if !seen.insert(parent_id.clone()) {
512                break;
513            }
514            root_id = parent_id.clone();
515            cursor = map
516                .get(&parent_id)
517                .and_then(|parent| parent.parent_id.clone());
518        }
519        Some(SessionAncestry {
520            parent_id: state.parent_id.clone(),
521            child_ids: state.child_ids.clone(),
522            root_id,
523        })
524    })
525}
526
527/// Auto-register a persistent sink for a newly-created session.
528/// Silent no-op on failure — a broken observability sink must never
529/// prevent a session from starting.
530fn try_register_event_log(session_id: &str) {
531    if let Some(log) = crate::event_log::active_event_log() {
532        crate::agent_events::register_sink(
533            session_id,
534            crate::agent_events::EventLogSink::new(log, session_id),
535        );
536        return;
537    }
538    let Ok(dir) = std::env::var("HARN_EVENT_LOG_DIR") else {
539        return;
540    };
541    if dir.is_empty() {
542        return;
543    }
544    let path = std::path::PathBuf::from(dir).join(format!("event_log-{session_id}.jsonl"));
545    if let Ok(sink) = crate::agent_events::JsonlEventSink::open(&path) {
546        crate::agent_events::register_sink(session_id, sink);
547    }
548}
549
550pub fn register_event_log_sink(session_id: &str) {
551    try_register_event_log(session_id);
552}
553
554pub fn close(id: &str) {
555    SESSIONS.with(|s| {
556        s.borrow_mut().remove(id);
557    });
558    // Cross-thread per-session state must be released too, otherwise
559    // pending inbox entries can be delivered to a future session that
560    // happens to reuse the same id.
561    crate::orchestration::agent_inbox::clear_session(id);
562    crate::agent_events::clear_session_sinks(id);
563}
564
565pub fn close_with_status(
566    id: &str,
567    reason: impl Into<String>,
568    status: impl Into<String>,
569    metadata: serde_json::Value,
570) -> bool {
571    if !exists(id) {
572        return false;
573    }
574    let reason = reason.into();
575    let status = status.into();
576    let event_metadata = serde_json::json!({
577        "reason": reason,
578        "status": status,
579        "metadata": metadata,
580    });
581    let transcript_event = crate::llm::helpers::transcript_event(
582        "agent_session_closed",
583        "system",
584        "internal",
585        "Agent session closed",
586        Some(event_metadata),
587    );
588    let _ = append_event(id, transcript_event);
589    crate::llm::emit_live_agent_event_sync(&crate::agent_events::AgentEvent::SessionClosed {
590        session_id: id.to_string(),
591        reason,
592        status,
593        metadata,
594    });
595    close(id);
596    true
597}
598
599pub fn reset_transcript(id: &str) -> bool {
600    SESSIONS.with(|s| {
601        let mut map = s.borrow_mut();
602        let Some(state) = map.get_mut(id) else {
603            return false;
604        };
605        state.transcript = empty_transcript(id);
606        state.tool_format = None;
607        state.system_prompt = None;
608        state.last_transcript_budget_action = None;
609        state.last_accessed = Instant::now();
610        true
611    })
612}
613
614/// Copy `src`'s transcript into a new session id. Subscribers are NOT
615/// copied — a fork is a conversation branch, not an event fanout.
616///
617/// Touches `src`'s `last_accessed` before evicting, so the fork
618/// operation itself can't make `src` look stale and kick it out of
619/// the LRU just to make room for the new fork.
620pub fn fork(src_id: &str, dst_id: Option<String>) -> Option<String> {
621    let (
622        src_transcript,
623        src_tool_format,
624        src_system_prompt,
625        src_pinned_model,
626        src_pinned_reasoning_policy,
627        src_workspace_anchor,
628        src_workspace_policy,
629        src_transcript_budget_policy,
630        src_last_transcript_budget_action,
631        dst,
632    ) = SESSIONS.with(|s| {
633        let mut map = s.borrow_mut();
634        let src = map.get_mut(src_id)?;
635        src.last_accessed = Instant::now();
636        let dst = dst_id.unwrap_or_else(|| uuid::Uuid::now_v7().to_string());
637        let forked_transcript = clone_transcript_with_id(&src.transcript, &dst);
638        Some((
639            forked_transcript,
640            src.tool_format.clone(),
641            src.system_prompt.clone(),
642            src.pinned_model.clone(),
643            src.pinned_reasoning_policy.clone(),
644            src.workspace_anchor.clone(),
645            src.workspace_policy.clone(),
646            src.transcript_budget_policy.clone(),
647            src.last_transcript_budget_action.clone(),
648            dst,
649        ))
650    })?;
651    // Ensure cap is respected when inserting the fork.
652    open_or_create(Some(dst.clone()));
653    SESSIONS.with(|s| {
654        let mut map = s.borrow_mut();
655        if let Some(state) = map.get_mut(&dst) {
656            state.transcript = src_transcript;
657            state.tool_format = src_tool_format;
658            state.system_prompt = src_system_prompt;
659            state.pinned_model = src_pinned_model;
660            state.pinned_reasoning_policy = src_pinned_reasoning_policy;
661            state.workspace_anchor = src_workspace_anchor;
662            state.workspace_policy = src_workspace_policy;
663            state.transcript_budget_policy = src_transcript_budget_policy;
664            state.last_transcript_budget_action = src_last_transcript_budget_action;
665            state.last_accessed = Instant::now();
666        }
667        update_lineage(&mut map, src_id, &dst, None);
668    });
669    let budget_ok = SESSIONS.with(|s| {
670        let mut map = s.borrow_mut();
671        let Some(state) = map.get_mut(&dst) else {
672            return false;
673        };
674        let candidate = state.transcript.clone();
675        apply_transcript_with_budget(state, candidate, "fork").is_ok()
676    });
677    if !budget_ok {
678        close(&dst);
679        return None;
680    }
681    // open_or_create evicts BEFORE inserting, so the dst slot is
682    // guaranteed once we get here. The existence check is cheap
683    // insurance against a future refactor that breaks that invariant.
684    if exists(&dst) {
685        Some(dst)
686    } else {
687        None
688    }
689}
690
691/// Fork `src_id` and truncate the destination transcript to the
692/// first `keep_first` messages (#105 — branch-replay). Pairs with the
693/// scrubber: the host picks an event index, rebuilds a message count,
694/// and calls this to spawn a live sibling session that resumes from
695/// the rebuilt state. Subscribers are not carried over (same as
696/// `fork`), so sibling events don't double-fan into the parent's
697/// consumers.
698///
699/// Returns the new session id on success, `None` if `src_id` doesn't
700/// exist.
701pub fn fork_at(src_id: &str, keep_first: usize, dst_id: Option<String>) -> Option<String> {
702    let branched_at_event_index = SESSIONS.with(|s| {
703        let map = s.borrow();
704        let src = map.get(src_id)?;
705        Some(branch_event_index(&src.transcript, keep_first))
706    })?;
707    let new_id = fork(src_id, dst_id)?;
708    link_child_session_with_branch(src_id, &new_id, Some(branched_at_event_index));
709    let _ = truncate(&new_id, keep_first);
710    Some(new_id)
711}
712
713/// Truncate the session transcript to the first `keep_first`
714/// messages (opposite of `trim`, which keeps the last N). Returns
715/// counts and the retained tip event id when the session exists.
716pub fn truncate(id: &str, keep_first: usize) -> Option<SessionTruncateResult> {
717    SESSIONS.with(|s| {
718        let mut map = s.borrow_mut();
719        let state = map.get_mut(id)?;
720        let result = truncate_state(state, keep_first)?;
721        Some(result)
722    })
723}
724
725fn truncate_state(state: &mut SessionState, keep_first: usize) -> Option<SessionTruncateResult> {
726    let dict = state
727        .transcript
728        .as_dict()
729        .cloned()
730        .unwrap_or_else(BTreeMap::new);
731    let messages: Vec<VmValue> = match dict.get("messages") {
732        Some(VmValue::List(list)) => list.iter().cloned().collect(),
733        _ => Vec::new(),
734    };
735    let existing_events = match dict.get("events") {
736        Some(VmValue::List(list)) => Some(list.iter().cloned().collect::<Vec<_>>()),
737        _ => None,
738    };
739    let kept_turn_count = keep_first.min(messages.len());
740    let removed_turn_count = messages.len().saturating_sub(kept_turn_count);
741    let mut new_tip_turn_id = existing_events
742        .as_ref()
743        .map(|events| turn_event_id_for_count(events, kept_turn_count))
744        .unwrap_or_else(|| {
745            let events = crate::llm::helpers::transcript_events_from_messages(&messages);
746            turn_event_id_for_count(&events, kept_turn_count)
747        });
748
749    if removed_turn_count > 0 {
750        let retained: Vec<VmValue> = messages.into_iter().take(kept_turn_count).collect();
751        let retained_events = match existing_events {
752            Some(events) => {
753                let keep_event_count = event_prefix_len_for_messages(&events, kept_turn_count);
754                events.into_iter().take(keep_event_count).collect()
755            }
756            None => crate::llm::helpers::transcript_events_from_messages(&retained),
757        };
758        new_tip_turn_id = turn_event_id_for_count(&retained_events, kept_turn_count);
759        let mut next = dict;
760        next.insert(
761            "events".to_string(),
762            VmValue::List(Rc::new(retained_events)),
763        );
764        next.insert("messages".to_string(), VmValue::List(Rc::new(retained)));
765        next.remove("summary");
766        apply_transcript_with_budget(state, VmValue::Dict(Rc::new(next)), "truncate").ok()?;
767    }
768    state.last_accessed = Instant::now();
769    Some(SessionTruncateResult {
770        kept_turn_count,
771        removed_turn_count,
772        new_tip_turn_id,
773    })
774}
775
776/// Pop the trailing message iff it is an assistant message. Used by
777/// `agent_step_judge` to remove a vetoed assistant turn before
778/// regeneration (the "replace" on_veto path). Returns `true` if a
779/// message was popped, `false` if the transcript was empty, and an
780/// error if the trailing message was not an assistant turn —
781/// signalling a call-site discipline bug rather than a runtime error.
782pub fn pop_last_if_assistant(id: &str) -> Result<bool, String> {
783    SESSIONS.with(|s| {
784        let mut map = s.borrow_mut();
785        let Some(state) = map.get_mut(id) else {
786            return Err(format!(
787                "pop_last_if_assistant: unknown session id '{id}'"
788            ));
789        };
790        let messages: Vec<VmValue> = match state.transcript.as_dict() {
791            Some(dict) => match dict.get("messages") {
792                Some(VmValue::List(list)) => list.iter().cloned().collect(),
793                _ => Vec::new(),
794            },
795            None => Vec::new(),
796        };
797        if messages.is_empty() {
798            return Ok(false);
799        }
800        let trailing_role = messages
801            .last()
802            .and_then(|m| m.as_dict())
803            .and_then(|d| d.get("role"))
804            .map(|v| v.display())
805            .unwrap_or_default();
806        if trailing_role != "assistant" {
807            return Err(format!(
808                "pop_last_if_assistant: trailing message role is '{trailing_role}', expected 'assistant'"
809            ));
810        }
811        let keep = messages.len() - 1;
812        truncate_state(state, keep);
813        Ok(true)
814    })
815}
816
817pub fn trim(id: &str, keep_last: usize) -> Option<usize> {
818    SESSIONS.with(|s| {
819        let mut map = s.borrow_mut();
820        let state = map.get_mut(id)?;
821        let dict = state.transcript.as_dict()?.clone();
822        let messages: Vec<VmValue> = match dict.get("messages") {
823            Some(VmValue::List(list)) => list.iter().cloned().collect(),
824            _ => Vec::new(),
825        };
826        let start = messages.len().saturating_sub(keep_last);
827        let retained: Vec<VmValue> = messages.into_iter().skip(start).collect();
828        let kept = retained.len();
829        let mut next = dict;
830        next.insert(
831            "events".to_string(),
832            VmValue::List(Rc::new(
833                crate::llm::helpers::transcript_events_from_messages(&retained),
834            )),
835        );
836        next.insert("messages".to_string(), VmValue::List(Rc::new(retained)));
837        apply_transcript_with_budget(state, VmValue::Dict(Rc::new(next)), "trim").ok()?;
838        state.last_accessed = Instant::now();
839        Some(kept)
840    })
841}
842
843/// Append a message dict to the session transcript. The message must
844/// have at least a string `role`; anything else is merged verbatim.
845pub fn inject_message(id: &str, message: VmValue) -> Result<(), String> {
846    let Some(msg_dict) = message.as_dict().cloned() else {
847        return Err("agent_session_inject: message must be a dict".into());
848    };
849    let role_ok = matches!(msg_dict.get("role"), Some(VmValue::String(_)));
850    if !role_ok {
851        return Err(
852            "agent_session_inject: message must have a string `role` (user|assistant|tool_result|system)"
853                .into(),
854        );
855    }
856    SESSIONS.with(|s| {
857        let mut map = s.borrow_mut();
858        let Some(state) = map.get_mut(id) else {
859            return Err(format!("agent_session_inject: unknown session id '{id}'"));
860        };
861        let dict = state
862            .transcript
863            .as_dict()
864            .cloned()
865            .unwrap_or_else(BTreeMap::new);
866        let mut messages: Vec<VmValue> = match dict.get("messages") {
867            Some(VmValue::List(list)) => list.iter().cloned().collect(),
868            _ => Vec::new(),
869        };
870        let mut events: Vec<VmValue> = match dict.get("events") {
871            Some(VmValue::List(list)) => list.iter().cloned().collect(),
872            _ => crate::llm::helpers::transcript_events_from_messages(&messages),
873        };
874        let new_message = VmValue::Dict(Rc::new(msg_dict));
875        let message_index = messages.len();
876        events.push(crate::llm::helpers::transcript_event_from_message(
877            &new_message,
878        ));
879        messages.push(new_message);
880        let mut next = dict;
881        next.insert("events".to_string(), VmValue::List(Rc::new(events)));
882        next.insert("messages".to_string(), VmValue::List(Rc::new(messages)));
883        let persisted_message = next
884            .get("messages")
885            .and_then(|value| match value {
886                VmValue::List(list) => list.get(message_index).cloned(),
887                _ => None,
888            })
889            .unwrap_or(VmValue::Nil);
890        apply_transcript_with_budget(state, VmValue::Dict(Rc::new(next)), "inject_message")?;
891        emit_identified_user_message_event(id, &persisted_message);
892        emit_llm_message_event(id, message_index, &persisted_message);
893        state.last_accessed = Instant::now();
894        Ok(())
895    })
896}
897
898fn emit_identified_user_message_event(session_id: &str, message: &VmValue) {
899    let message_json = crate::llm::helpers::vm_value_to_json(message);
900    let role = message_json.get("role").and_then(|value| value.as_str());
901    if role != Some("user") {
902        return;
903    }
904    let Some(message_id) = message_json
905        .get("messageId")
906        .or_else(|| message_json.get("message_id"))
907        .and_then(|value| value.as_str())
908        .filter(|value| !value.trim().is_empty())
909    else {
910        return;
911    };
912    let content = message_json
913        .get("content")
914        .map(user_message_content_blocks)
915        .unwrap_or_default();
916    crate::agent_events::emit_event(&crate::agent_events::AgentEvent::UserMessage {
917        session_id: session_id.to_string(),
918        message_id: message_id.to_string(),
919        content,
920    });
921}
922
923fn user_message_content_blocks(content: &serde_json::Value) -> Vec<serde_json::Value> {
924    match content {
925        serde_json::Value::Array(items) => items.clone(),
926        serde_json::Value::String(text) => vec![serde_json::json!({
927            "type": "text",
928            "text": text,
929        })],
930        other => vec![serde_json::json!({
931            "type": "text",
932            "text": other.to_string(),
933        })],
934    }
935}
936
937#[derive(Clone, Debug, PartialEq, Eq)]
938struct TranscriptBudgetUsage {
939    message_count: usize,
940    event_count: usize,
941    approx_bytes: Option<usize>,
942}
943
944fn transcript_messages_from_dict(dict: &BTreeMap<String, VmValue>) -> Vec<VmValue> {
945    match dict.get("messages") {
946        Some(VmValue::List(list)) => list.iter().cloned().collect(),
947        _ => Vec::new(),
948    }
949}
950
951fn transcript_events_from_dict(dict: &BTreeMap<String, VmValue>) -> Vec<VmValue> {
952    match dict.get("events") {
953        Some(VmValue::List(list)) => list.iter().cloned().collect(),
954        _ => {
955            let messages = transcript_messages_from_dict(dict);
956            crate::llm::helpers::transcript_events_from_messages(&messages)
957        }
958    }
959}
960
961fn transcript_usage(transcript: &VmValue, include_bytes: bool) -> TranscriptBudgetUsage {
962    let Some(dict) = transcript.as_dict() else {
963        return TranscriptBudgetUsage {
964            message_count: 0,
965            event_count: 0,
966            approx_bytes: include_bytes.then_some(0),
967        };
968    };
969    let approx_bytes = if include_bytes {
970        serde_json::to_vec(&crate::llm::helpers::vm_value_to_json(transcript))
971            .map(|bytes| bytes.len())
972            .ok()
973            .or(Some(usize::MAX))
974    } else {
975        None
976    };
977    TranscriptBudgetUsage {
978        message_count: transcript_messages_from_dict(dict).len(),
979        event_count: transcript_events_from_dict(dict).len(),
980        approx_bytes,
981    }
982}
983
984fn transcript_budget_exceeded_reason(
985    usage: &TranscriptBudgetUsage,
986    policy: &SessionTranscriptBudgetPolicy,
987) -> Option<&'static str> {
988    if usage.message_count > policy.max_messages {
989        return Some("message_count");
990    }
991    if usage.event_count > policy.max_events {
992        return Some("event_count");
993    }
994    if let (Some(bytes), Some(limit)) = (usage.approx_bytes, policy.max_approx_bytes) {
995        if bytes > limit {
996            return Some("approx_bytes");
997        }
998    }
999    None
1000}
1001
1002fn transcript_budget_usage_json(usage: &TranscriptBudgetUsage) -> serde_json::Value {
1003    serde_json::json!({
1004        "messages": usage.message_count,
1005        "events": usage.event_count,
1006        "approx_bytes": usage.approx_bytes,
1007    })
1008}
1009
1010fn transcript_budget_policy_json(policy: &SessionTranscriptBudgetPolicy) -> serde_json::Value {
1011    let recovery = match &policy.recovery {
1012        TranscriptBudgetRecovery::Reject => serde_json::json!({"action": "reject"}),
1013        TranscriptBudgetRecovery::Trim { keep_last } => {
1014            serde_json::json!({"action": "trim", "keep_last": keep_last})
1015        }
1016        TranscriptBudgetRecovery::Compact { keep_last } => {
1017            serde_json::json!({"action": "compact", "keep_last": keep_last})
1018        }
1019    };
1020    serde_json::json!({
1021        "max_messages": policy.max_messages,
1022        "max_events": policy.max_events,
1023        "max_approx_bytes": policy.max_approx_bytes,
1024        "recovery": recovery,
1025    })
1026}
1027
1028fn transcript_budget_recovery_name(recovery: &TranscriptBudgetRecovery) -> &'static str {
1029    match recovery {
1030        TranscriptBudgetRecovery::Reject => "reject",
1031        TranscriptBudgetRecovery::Trim { .. } => "trim",
1032        TranscriptBudgetRecovery::Compact { .. } => "compact",
1033    }
1034}
1035
1036fn transcript_budget_error(
1037    state: &SessionState,
1038    policy: &SessionTranscriptBudgetPolicy,
1039    usage: &TranscriptBudgetUsage,
1040    reason: &str,
1041) -> String {
1042    let byte_suffix = match (usage.approx_bytes, policy.max_approx_bytes) {
1043        (Some(bytes), Some(limit)) => format!(", approx_bytes {bytes}/{limit}"),
1044        _ => String::new(),
1045    };
1046    format!(
1047        "transcript budget exceeded for session '{}': {reason} (messages {}/{}, events {}/{}{}; recovery={})",
1048        state.id,
1049        usage.message_count,
1050        policy.max_messages,
1051        usage.event_count,
1052        policy.max_events,
1053        byte_suffix,
1054        transcript_budget_recovery_name(&policy.recovery),
1055    )
1056}
1057
1058fn transcript_budget_audit_json(
1059    action: &str,
1060    source: &str,
1061    reason: &str,
1062    policy: &SessionTranscriptBudgetPolicy,
1063    usage_before: &TranscriptBudgetUsage,
1064    usage_attempted: &TranscriptBudgetUsage,
1065    usage_after: &TranscriptBudgetUsage,
1066) -> serde_json::Value {
1067    serde_json::json!({
1068        "action": action,
1069        "source": source,
1070        "reason": reason,
1071        "policy": transcript_budget_policy_json(policy),
1072        "usage_before": transcript_budget_usage_json(usage_before),
1073        "usage_attempted": transcript_budget_usage_json(usage_attempted),
1074        "usage_after": transcript_budget_usage_json(usage_after),
1075        "removed_messages": usage_attempted.message_count.saturating_sub(usage_after.message_count),
1076        "removed_events": usage_attempted.event_count.saturating_sub(usage_after.event_count),
1077    })
1078}
1079
1080fn transcript_budget_event(audit: &serde_json::Value) -> VmValue {
1081    let action = audit
1082        .get("action")
1083        .and_then(serde_json::Value::as_str)
1084        .unwrap_or("enforced");
1085    crate::llm::helpers::transcript_event(
1086        "transcript_budget",
1087        "system",
1088        "internal",
1089        &format!("Transcript budget {action}."),
1090        Some(audit.clone()),
1091    )
1092}
1093
1094fn append_event_to_transcript(transcript: VmValue, event: VmValue) -> VmValue {
1095    let Some(dict) = transcript.as_dict() else {
1096        return transcript;
1097    };
1098    let mut next = dict.clone();
1099    let mut events = transcript_events_from_dict(&next);
1100    events.push(event);
1101    next.insert("events".to_string(), VmValue::List(Rc::new(events)));
1102    VmValue::Dict(Rc::new(next))
1103}
1104
1105fn summary_message_vm(summary: &str) -> VmValue {
1106    crate::stdlib::json_to_vm_value(&summary_message_json(summary))
1107}
1108
1109fn tail_message_capacity(
1110    policy: &SessionTranscriptBudgetPolicy,
1111    reserve_audit_event: bool,
1112) -> usize {
1113    let event_capacity = if reserve_audit_event {
1114        policy.max_events.saturating_sub(1)
1115    } else {
1116        policy.max_events
1117    };
1118    policy.max_messages.min(event_capacity)
1119}
1120
1121fn trim_transcript_for_budget(
1122    transcript: &VmValue,
1123    policy: &SessionTranscriptBudgetPolicy,
1124    keep_last: usize,
1125) -> VmValue {
1126    let dict = transcript.as_dict().cloned().unwrap_or_else(BTreeMap::new);
1127    let messages = transcript_messages_from_dict(&dict);
1128    let keep = keep_last.min(tail_message_capacity(policy, true));
1129    let start = messages.len().saturating_sub(keep);
1130    let retained: Vec<VmValue> = messages.into_iter().skip(start).collect();
1131    let mut next = dict;
1132    next.insert(
1133        "events".to_string(),
1134        VmValue::List(Rc::new(
1135            crate::llm::helpers::transcript_events_from_messages(&retained),
1136        )),
1137    );
1138    next.insert("messages".to_string(), VmValue::List(Rc::new(retained)));
1139    next.remove("summary");
1140    VmValue::Dict(Rc::new(next))
1141}
1142
1143fn compact_transcript_for_budget(
1144    transcript: &VmValue,
1145    policy: &SessionTranscriptBudgetPolicy,
1146    keep_last: usize,
1147    reason: &str,
1148) -> VmValue {
1149    let dict = transcript.as_dict().cloned().unwrap_or_else(BTreeMap::new);
1150    let messages = transcript_messages_from_dict(&dict);
1151    let message_capacity = tail_message_capacity(policy, true);
1152    let mut retained = Vec::new();
1153    let mut summary = None;
1154
1155    if messages.len() > message_capacity {
1156        if message_capacity > 0 {
1157            let tail_keep = keep_last.min(message_capacity.saturating_sub(1));
1158            let archived = messages.len().saturating_sub(tail_keep);
1159            let summary_text = format!(
1160                "[auto-compacted {archived} older message(s) under transcript budget]\nSession transcript exceeded the {reason} budget; retained the most recent {tail_keep} message(s)."
1161            );
1162            retained.push(summary_message_vm(&summary_text));
1163            retained.extend(messages.into_iter().skip(archived).take(tail_keep));
1164            summary = Some(summary_text);
1165        }
1166    } else {
1167        retained = messages;
1168    }
1169
1170    let mut next = dict;
1171    next.insert(
1172        "events".to_string(),
1173        VmValue::List(Rc::new(
1174            crate::llm::helpers::transcript_events_from_messages(&retained),
1175        )),
1176    );
1177    next.insert("messages".to_string(), VmValue::List(Rc::new(retained)));
1178    if let Some(summary) = summary {
1179        next.insert("summary".to_string(), VmValue::String(Rc::from(summary)));
1180    } else {
1181        next.remove("summary");
1182    }
1183    VmValue::Dict(Rc::new(next))
1184}
1185
1186fn recovered_transcript_with_audit(
1187    recovered: VmValue,
1188    action: &str,
1189    source: &str,
1190    reason: &str,
1191    policy: &SessionTranscriptBudgetPolicy,
1192    usage_before: &TranscriptBudgetUsage,
1193    usage_attempted: &TranscriptBudgetUsage,
1194    include_bytes: bool,
1195) -> (VmValue, serde_json::Value, TranscriptBudgetUsage) {
1196    let usage_after_without_audit = transcript_usage(&recovered, include_bytes);
1197    let initial_audit = transcript_budget_audit_json(
1198        action,
1199        source,
1200        reason,
1201        policy,
1202        usage_before,
1203        usage_attempted,
1204        &usage_after_without_audit,
1205    );
1206    let with_initial_audit =
1207        append_event_to_transcript(recovered.clone(), transcript_budget_event(&initial_audit));
1208    let usage_after = transcript_usage(&with_initial_audit, include_bytes);
1209    let audit = transcript_budget_audit_json(
1210        action,
1211        source,
1212        reason,
1213        policy,
1214        usage_before,
1215        usage_attempted,
1216        &usage_after,
1217    );
1218    let with_audit = append_event_to_transcript(recovered, transcript_budget_event(&audit));
1219    let usage_after = transcript_usage(&with_audit, include_bytes);
1220    (with_audit, audit, usage_after)
1221}
1222
1223fn apply_transcript_with_budget(
1224    state: &mut SessionState,
1225    candidate: VmValue,
1226    source: &str,
1227) -> Result<(), String> {
1228    let policy = state.transcript_budget_policy.normalized();
1229    let include_bytes = policy.max_approx_bytes.is_some();
1230    let usage_before = transcript_usage(&state.transcript, include_bytes);
1231    let usage_attempted = transcript_usage(&candidate, include_bytes);
1232    let Some(reason) = transcript_budget_exceeded_reason(&usage_attempted, &policy) else {
1233        state.transcript = candidate;
1234        return Ok(());
1235    };
1236
1237    match policy.recovery.clone() {
1238        TranscriptBudgetRecovery::Reject => {
1239            let audit = transcript_budget_audit_json(
1240                "rejected",
1241                source,
1242                reason,
1243                &policy,
1244                &usage_before,
1245                &usage_attempted,
1246                &usage_before,
1247            );
1248            state.last_transcript_budget_action = Some(audit);
1249            Err(transcript_budget_error(
1250                state,
1251                &policy,
1252                &usage_attempted,
1253                reason,
1254            ))
1255        }
1256        TranscriptBudgetRecovery::Trim { keep_last } => {
1257            let recovered = trim_transcript_for_budget(&candidate, &policy, keep_last);
1258            let (with_audit, audit, usage_after) = recovered_transcript_with_audit(
1259                recovered,
1260                "trimmed",
1261                source,
1262                reason,
1263                &policy,
1264                &usage_before,
1265                &usage_attempted,
1266                include_bytes,
1267            );
1268            if transcript_budget_exceeded_reason(&usage_after, &policy).is_some() {
1269                let rejected = transcript_budget_audit_json(
1270                    "rejected",
1271                    source,
1272                    reason,
1273                    &policy,
1274                    &usage_before,
1275                    &usage_attempted,
1276                    &usage_after,
1277                );
1278                state.last_transcript_budget_action = Some(rejected);
1279                return Err(transcript_budget_error(
1280                    state,
1281                    &policy,
1282                    &usage_after,
1283                    reason,
1284                ));
1285            }
1286            state.last_transcript_budget_action = Some(audit);
1287            state.transcript = with_audit;
1288            Ok(())
1289        }
1290        TranscriptBudgetRecovery::Compact { keep_last } => {
1291            let recovered = compact_transcript_for_budget(&candidate, &policy, keep_last, reason);
1292            let (with_audit, audit, usage_after) = recovered_transcript_with_audit(
1293                recovered,
1294                "compacted",
1295                source,
1296                reason,
1297                &policy,
1298                &usage_before,
1299                &usage_attempted,
1300                include_bytes,
1301            );
1302            if transcript_budget_exceeded_reason(&usage_after, &policy).is_some() {
1303                let rejected = transcript_budget_audit_json(
1304                    "rejected",
1305                    source,
1306                    reason,
1307                    &policy,
1308                    &usage_before,
1309                    &usage_attempted,
1310                    &usage_after,
1311                );
1312                state.last_transcript_budget_action = Some(rejected);
1313                return Err(transcript_budget_error(
1314                    state,
1315                    &policy,
1316                    &usage_after,
1317                    reason,
1318                ));
1319            }
1320            state.last_transcript_budget_action = Some(audit);
1321            state.transcript = with_audit;
1322            Ok(())
1323        }
1324    }
1325}
1326
1327fn emit_llm_message_event(session_id: &str, message_index: usize, message: &VmValue) {
1328    let mut fields = serde_json::Map::new();
1329    fields.insert(
1330        "session_id".to_string(),
1331        serde_json::Value::String(session_id.to_string()),
1332    );
1333    fields.insert(
1334        "message_index".to_string(),
1335        serde_json::json!(message_index),
1336    );
1337    let message_json = crate::llm::helpers::vm_value_to_json(message);
1338    if let Some(role) = message_json.get("role").and_then(|value| value.as_str()) {
1339        fields.insert(
1340            "role".to_string(),
1341            serde_json::Value::String(role.to_string()),
1342        );
1343    }
1344    if let Some(content) = message_json.get("content") {
1345        fields.insert("content".to_string(), content.clone());
1346    }
1347    fields.insert("message".to_string(), message_json);
1348    crate::llm::append_observability_sidecar_entry("message", fields);
1349}
1350
1351/// Create a new session from a reconstructed message list.
1352///
1353/// This is intentionally an all-at-once write instead of repeated
1354/// `inject_message` calls: importing a transcript should not re-emit
1355/// each historic turn into the active observability sidecar.
1356pub fn seed_from_messages(
1357    id: Option<String>,
1358    messages: &[serde_json::Value],
1359    metadata: serde_json::Value,
1360    system_prompt: Option<String>,
1361    tool_format: Option<String>,
1362) -> Result<String, String> {
1363    let resolved = id.unwrap_or_else(|| uuid::Uuid::now_v7().to_string());
1364    if exists(&resolved) {
1365        return Err(format!("agent session '{resolved}' already exists"));
1366    }
1367    open_or_create(Some(resolved.clone()));
1368    SESSIONS.with(|s| {
1369        let mut map = s.borrow_mut();
1370        let Some(state) = map.get_mut(&resolved) else {
1371            return Err(format!("failed to create agent session '{resolved}'"));
1372        };
1373        state.tool_format = tool_format.filter(|value| !value.trim().is_empty());
1374        state.system_prompt = system_prompt.filter(|value| !value.trim().is_empty());
1375
1376        let mut metadata = metadata
1377            .as_object()
1378            .cloned()
1379            .unwrap_or_else(serde_json::Map::new);
1380        if let Some(tool_format) = state.tool_format.as_ref() {
1381            metadata.insert(
1382                "tool_format".to_string(),
1383                serde_json::Value::String(tool_format.clone()),
1384            );
1385            metadata.insert(
1386                "tool_mode_locked".to_string(),
1387                serde_json::Value::Bool(true),
1388            );
1389        }
1390        if let Some(system_prompt) = state.system_prompt.as_ref() {
1391            metadata.insert(
1392                "system_prompt".to_string(),
1393                crate::llm::helpers::system_prompt_metadata(system_prompt),
1394            );
1395        }
1396        let vm_messages = crate::llm::helpers::json_messages_to_vm(messages);
1397        let candidate = crate::llm::helpers::new_transcript_with(
1398            Some(resolved.clone()),
1399            vm_messages,
1400            None,
1401            Some(crate::stdlib::json_to_vm_value(&serde_json::Value::Object(
1402                metadata,
1403            ))),
1404        );
1405        apply_transcript_with_budget(state, candidate, "seed_from_messages")?;
1406        state.last_accessed = Instant::now();
1407        Ok(resolved)
1408    })
1409}
1410
1411/// Load the messages vec (as JSON) for this session, for use as prefix
1412/// to an agent_loop run. Returns an empty vec if the session doesn't
1413/// exist or has no messages.
1414pub fn messages_json(id: &str) -> Vec<serde_json::Value> {
1415    SESSIONS.with(|s| {
1416        let map = s.borrow();
1417        let Some(state) = map.get(id) else {
1418            return Vec::new();
1419        };
1420        let Some(dict) = state.transcript.as_dict() else {
1421            return Vec::new();
1422        };
1423        match dict.get("messages") {
1424            Some(VmValue::List(list)) => list
1425                .iter()
1426                .map(crate::llm::helpers::vm_value_to_json)
1427                .collect(),
1428            _ => Vec::new(),
1429        }
1430    })
1431}
1432
1433#[derive(Clone, Debug, Default)]
1434pub struct SessionPromptState {
1435    pub messages: Vec<serde_json::Value>,
1436    pub summary: Option<String>,
1437}
1438
1439fn summary_message_json(summary: &str) -> serde_json::Value {
1440    serde_json::json!({
1441        "role": "user",
1442        "content": summary,
1443    })
1444}
1445
1446fn messages_begin_with_summary(messages: &[serde_json::Value], summary: &str) -> bool {
1447    messages.first().is_some_and(|message| {
1448        message.get("role").and_then(|value| value.as_str()) == Some("user")
1449            && message.get("content").and_then(|value| value.as_str()) == Some(summary)
1450    })
1451}
1452
1453/// Prompt-surface resume state for a persisted session.
1454///
1455/// Returns the compacted/rehydratable message list plus the transcript's
1456/// summary field. When the transcript carries a summary field but its
1457/// message list does not already begin with the compacted summary
1458/// message, this helper prepends one so session re-entry preserves the
1459/// same prompt surface the previous loop was actually using.
1460pub fn prompt_state_json(id: &str) -> SessionPromptState {
1461    SESSIONS.with(|s| {
1462        let map = s.borrow();
1463        let Some(state) = map.get(id) else {
1464            return SessionPromptState::default();
1465        };
1466        let Some(dict) = state.transcript.as_dict() else {
1467            return SessionPromptState::default();
1468        };
1469        let mut messages = match dict.get("messages") {
1470            Some(VmValue::List(list)) => list
1471                .iter()
1472                .map(crate::llm::helpers::vm_value_to_json)
1473                .collect::<Vec<_>>(),
1474            _ => Vec::new(),
1475        };
1476        let summary = dict.get("summary").and_then(|value| match value {
1477            VmValue::String(text) if !text.trim().is_empty() => Some(text.to_string()),
1478            _ => None,
1479        });
1480        if let Some(summary_text) = summary.as_deref() {
1481            if !messages_begin_with_summary(&messages, summary_text) {
1482                messages.insert(0, summary_message_json(summary_text));
1483            }
1484        }
1485        SessionPromptState { messages, summary }
1486    })
1487}
1488
1489/// Overwrite the transcript for this session. Used by `agent_loop` on
1490/// exit to persist the synthesized transcript.
1491pub fn store_transcript(id: &str, transcript: VmValue) -> Result<(), String> {
1492    SESSIONS.with(|s| {
1493        let mut map = s.borrow_mut();
1494        let Some(state) = map.get_mut(id) else {
1495            return Err(format!(
1496                "agent_session_store_transcript: unknown session id '{id}'"
1497            ));
1498        };
1499        let transcript = transcript_with_session_metadata(transcript, state);
1500        apply_transcript_with_budget(state, transcript, "store_transcript")?;
1501        state.last_accessed = Instant::now();
1502        Ok(())
1503    })
1504}
1505
1506/// Remove malformed reminder events after their drop audit has been emitted.
1507/// Pending-reminder rendering scans the transcript on every LLM call; pruning
1508/// invalid entries makes the drop event one-shot instead of noisy per turn.
1509pub fn prune_invalid_reminder_events(id: &str) -> usize {
1510    SESSIONS.with(|s| {
1511        let mut map = s.borrow_mut();
1512        let Some(state) = map.get_mut(id) else {
1513            return 0;
1514        };
1515        let Some(dict) = state.transcript.as_dict().cloned() else {
1516            return 0;
1517        };
1518        let Some(VmValue::List(events)) = dict.get("events") else {
1519            return 0;
1520        };
1521        let mut pruned = 0_usize;
1522        let mut kept = Vec::with_capacity(events.len());
1523        for event in events.iter().cloned() {
1524            let is_reminder = event
1525                .as_dict()
1526                .and_then(|event| event.get("kind"))
1527                .map(VmValue::display)
1528                .as_deref()
1529                == Some(crate::llm::helpers::SYSTEM_REMINDER_EVENT_KIND);
1530            if !is_reminder {
1531                kept.push(event);
1532                continue;
1533            }
1534            let valid = crate::llm::helpers::reminder_from_event(&event)
1535                .is_some_and(|reminder| !reminder.body.trim().is_empty());
1536            if valid {
1537                kept.push(event);
1538            } else {
1539                pruned += 1;
1540            }
1541        }
1542        if pruned > 0 {
1543            let mut next = dict;
1544            next.insert("events".to_string(), VmValue::List(Rc::new(kept)));
1545            let _ = apply_transcript_with_budget(
1546                state,
1547                VmValue::Dict(Rc::new(next)),
1548                "prune_invalid_reminder_events",
1549            );
1550            state.last_accessed = Instant::now();
1551        }
1552        pruned
1553    })
1554}
1555
1556/// Apply the reminder TTL lifecycle that runs once per completed agent
1557/// turn. Reminders with `ttl_turns = 1` expire and are removed; larger
1558/// finite TTLs are decremented in place. Expiry audit events are emitted
1559/// to the active EventLog when one is installed.
1560pub fn apply_reminder_post_turn(id: &str, turn: i64) -> Result<serde_json::Value, String> {
1561    let report = SESSIONS.with(|s| {
1562        let mut map = s.borrow_mut();
1563        let Some(state) = map.get_mut(id) else {
1564            return Err(format!(
1565                "agent_session_apply_reminder_post_turn: unknown session id '{id}'"
1566            ));
1567        };
1568        let report = crate::llm::helpers::apply_reminder_post_turn(&state.transcript, turn);
1569        if report.decremented_count > 0 || !report.expired.is_empty() {
1570            if let Some(next) = report.transcript.clone() {
1571                apply_transcript_with_budget(state, next, "apply_reminder_post_turn")?;
1572            }
1573            state.last_accessed = Instant::now();
1574        }
1575        Ok(report)
1576    })?;
1577
1578    for reminder in &report.expired {
1579        let mut payload = crate::llm::helpers::reminder_lifecycle_payload(Some(id), reminder);
1580        if let Some(obj) = payload.as_object_mut() {
1581            obj.insert(
1582                "transcript_id".to_string(),
1583                serde_json::Value::String(id.to_string()),
1584            );
1585            obj.insert(
1586                "reason".to_string(),
1587                serde_json::Value::String("ttl".to_string()),
1588            );
1589            obj.insert(
1590                "ttl_turns_before".to_string(),
1591                serde_json::json!(&reminder.ttl_turns),
1592            );
1593            obj.insert("expired_at_turn".to_string(), serde_json::json!(turn));
1594        }
1595        crate::llm::helpers::emit_reminder_lifecycle_event(
1596            crate::llm::helpers::REMINDER_EXPIRED_EVENT_KIND,
1597            payload,
1598        );
1599    }
1600
1601    Ok(serde_json::json!({
1602        "expired_count": report.expired.len(),
1603        "decremented_count": report.decremented_count,
1604        "remaining_count": report.remaining_count,
1605    }))
1606}
1607
1608/// Inject a typed system reminder into the session transcript's event
1609/// stream. This mirrors `transcript.inject_reminder` for live sessions:
1610/// reminders with the same `dedupe_key` are replaced before the new
1611/// reminder event is appended.
1612pub fn inject_reminder(
1613    id: &str,
1614    reminder: crate::llm::helpers::SystemReminder,
1615) -> Result<ReminderInjectionReport, String> {
1616    let reminder_id = reminder.id.clone();
1617    let dedupe_key = reminder.dedupe_key.clone();
1618    let mut deduped_reminder_ids = Vec::new();
1619    SESSIONS.with(|s| {
1620        let mut map = s.borrow_mut();
1621        let Some(state) = map.get_mut(id) else {
1622            return Err(format!(
1623                "agent_session_inject_reminder: unknown session id '{id}'"
1624            ));
1625        };
1626        let dict = state
1627            .transcript
1628            .as_dict()
1629            .cloned()
1630            .unwrap_or_else(BTreeMap::new);
1631        let mut events: Vec<VmValue> = match dict.get("events") {
1632            Some(VmValue::List(list)) => list.iter().cloned().collect(),
1633            _ => dict
1634                .get("messages")
1635                .and_then(|value| match value {
1636                    VmValue::List(list) => Some(list.iter().cloned().collect::<Vec<_>>()),
1637                    _ => None,
1638                })
1639                .map(|messages| crate::llm::helpers::transcript_events_from_messages(&messages))
1640                .unwrap_or_default(),
1641        };
1642        if let Some(expected_key) = dedupe_key.as_deref() {
1643            events.retain(|event| {
1644                let Some(existing) = crate::llm::helpers::reminder_from_event(event) else {
1645                    return true;
1646                };
1647                if existing.dedupe_key.as_deref() == Some(expected_key) {
1648                    deduped_reminder_ids.push(existing.id);
1649                    false
1650                } else {
1651                    true
1652                }
1653            });
1654        }
1655        events.push(crate::llm::helpers::transcript_reminder_event(&reminder));
1656        let mut next = dict;
1657        next.insert("events".to_string(), VmValue::List(Rc::new(events)));
1658        apply_transcript_with_budget(state, VmValue::Dict(Rc::new(next)), "inject_reminder")?;
1659        state.last_accessed = Instant::now();
1660        Ok(())
1661    })?;
1662
1663    if !deduped_reminder_ids.is_empty() {
1664        let dropped_count = deduped_reminder_ids.len();
1665        crate::llm::helpers::emit_reminder_lifecycle_event(
1666            crate::llm::helpers::REMINDER_DEDUPED_EVENT_KIND,
1667            serde_json::json!({
1668                "session_id": id,
1669                "transcript_id": id,
1670                "reminder_id": &reminder_id,
1671                "replacing_id": &reminder_id,
1672                "replaced_id": deduped_reminder_ids.first(),
1673                "replaced_ids": &deduped_reminder_ids,
1674                "dedupe_key": &dedupe_key,
1675                "dropped_reminder_ids": &deduped_reminder_ids,
1676                "dropped_count": dropped_count,
1677            }),
1678        );
1679    }
1680
1681    crate::llm::helpers::emit_reminder_lifecycle_event(
1682        crate::llm::helpers::REMINDER_INJECTED_EVENT_KIND,
1683        crate::llm::helpers::reminder_lifecycle_payload(Some(id), &reminder),
1684    );
1685
1686    Ok(ReminderInjectionReport {
1687        reminder_id,
1688        deduped_count: deduped_reminder_ids.len(),
1689    })
1690}
1691
1692/// Append a transcript event to the session without mutating its
1693/// message list. Used for orchestration-side lineage events (sub-agent
1694/// spawn/completion, workflow hooks, etc.) that should survive
1695/// persistence/replay without being replayed back into the model as
1696/// conversational messages.
1697pub fn append_event(id: &str, event: VmValue) -> Result<(), String> {
1698    let Some(event_dict) = event.as_dict() else {
1699        return Err("agent_session_append_event: event must be a dict".into());
1700    };
1701    let kind_ok = matches!(event_dict.get("kind"), Some(VmValue::String(_)));
1702    if !kind_ok {
1703        return Err("agent_session_append_event: event must have a string `kind`".into());
1704    }
1705    SESSIONS.with(|s| {
1706        let mut map = s.borrow_mut();
1707        let Some(state) = map.get_mut(id) else {
1708            return Err(format!(
1709                "agent_session_append_event: unknown session id '{id}'"
1710            ));
1711        };
1712        append_event_to_state(state, event, "append_event")?;
1713        state.last_accessed = Instant::now();
1714        Ok(())
1715    })
1716}
1717
1718fn append_event_to_state(
1719    state: &mut SessionState,
1720    event: VmValue,
1721    action: &str,
1722) -> Result<(), String> {
1723    let dict = state
1724        .transcript
1725        .as_dict()
1726        .cloned()
1727        .unwrap_or_else(BTreeMap::new);
1728    let mut events: Vec<VmValue> = match dict.get("events") {
1729        Some(VmValue::List(list)) => list.iter().cloned().collect(),
1730        _ => dict
1731            .get("messages")
1732            .and_then(|value| match value {
1733                VmValue::List(list) => Some(list.iter().cloned().collect::<Vec<_>>()),
1734                _ => None,
1735            })
1736            .map(|messages| crate::llm::helpers::transcript_events_from_messages(&messages))
1737            .unwrap_or_default(),
1738    };
1739    events.push(event);
1740    let mut next = dict;
1741    next.insert("events".to_string(), VmValue::List(Rc::new(events)));
1742    apply_transcript_with_budget(state, VmValue::Dict(Rc::new(next)), action)
1743}
1744
1745/// Replace the transcript's message list wholesale. Used by the
1746/// in-loop compaction path, which operates on JSON messages.
1747pub fn replace_messages(id: &str, messages: &[serde_json::Value]) -> Result<(), String> {
1748    replace_messages_with_summary(id, messages, None)
1749}
1750
1751/// Replace the transcript's message list and optionally update the
1752/// `summary` field on the persisted transcript. The compaction path
1753/// uses this to publish the human-readable rollup line that
1754/// `transcript_summary(transcript)` exposes to host code.
1755pub fn replace_messages_with_summary(
1756    id: &str,
1757    messages: &[serde_json::Value],
1758    summary: Option<&str>,
1759) -> Result<(), String> {
1760    SESSIONS.with(|s| {
1761        let mut map = s.borrow_mut();
1762        let Some(state) = map.get_mut(id) else {
1763            return Err(format!(
1764                "agent_session_replace_messages: unknown session id '{id}'"
1765            ));
1766        };
1767        let dict = state
1768            .transcript
1769            .as_dict()
1770            .cloned()
1771            .unwrap_or_else(BTreeMap::new);
1772        let vm_messages: Vec<VmValue> = messages
1773            .iter()
1774            .map(crate::stdlib::json_to_vm_value)
1775            .collect();
1776        let mut next = dict;
1777        next.insert(
1778            "events".to_string(),
1779            VmValue::List(Rc::new(
1780                crate::llm::helpers::transcript_events_from_messages(&vm_messages),
1781            )),
1782        );
1783        next.insert("messages".to_string(), VmValue::List(Rc::new(vm_messages)));
1784        if let Some(summary) = summary {
1785            next.insert(
1786                "summary".to_string(),
1787                VmValue::String(Rc::from(summary.to_string())),
1788            );
1789        } else {
1790            next.remove("summary");
1791        }
1792        apply_transcript_with_budget(state, VmValue::Dict(Rc::new(next)), "replace_messages")?;
1793        state.last_accessed = Instant::now();
1794        Ok(())
1795    })
1796}
1797
1798pub fn append_subscriber(id: &str, callback: VmValue) {
1799    open_or_create(Some(id.to_string()));
1800    SESSIONS.with(|s| {
1801        if let Some(state) = s.borrow_mut().get_mut(id) {
1802            state.subscribers.push(callback);
1803            state.last_accessed = Instant::now();
1804        }
1805    });
1806}
1807
1808pub fn subscribers_for(id: &str) -> Vec<VmValue> {
1809    SESSIONS.with(|s| {
1810        s.borrow()
1811            .get(id)
1812            .map(|state| state.subscribers.clone())
1813            .unwrap_or_default()
1814    })
1815}
1816
1817pub fn subscriber_count(id: &str) -> usize {
1818    SESSIONS.with(|s| {
1819        s.borrow()
1820            .get(id)
1821            .map(|state| state.subscribers.len())
1822            .unwrap_or(0)
1823    })
1824}
1825
1826/// Persist the set of active skill names for session resume. Called at
1827/// the end of an agent_loop run; the next `open_or_create` for this id
1828/// reads them back via [`active_skills`].
1829pub fn set_active_skills(id: &str, skills: Vec<String>) {
1830    SESSIONS.with(|s| {
1831        if let Some(state) = s.borrow_mut().get_mut(id) {
1832            state.active_skills = skills;
1833            state.last_accessed = Instant::now();
1834        }
1835    });
1836}
1837
1838/// Skills that were active at the end of the previous agent_loop run
1839/// against this session. Returns an empty vec when the session doesn't
1840/// exist or nothing was persisted.
1841pub fn active_skills(id: &str) -> Vec<String> {
1842    SESSIONS.with(|s| {
1843        s.borrow()
1844            .get(id)
1845            .map(|state| state.active_skills.clone())
1846            .unwrap_or_default()
1847    })
1848}
1849
1850/// Claim the tool-calling contract for a session.
1851///
1852/// The first loop against a named session records its `tool_format`.
1853/// Later re-entry must use the same format so prompt/history generated
1854/// under a text contract is never replayed as native, or vice versa.
1855pub fn claim_tool_format(id: &str, tool_format: &str) -> Result<(), String> {
1856    let tool_format = tool_format.trim();
1857    if tool_format.is_empty() {
1858        return Ok(());
1859    }
1860    SESSIONS.with(|s| {
1861        let mut map = s.borrow_mut();
1862        let Some(state) = map.get_mut(id) else {
1863            return Err(format!("agent session '{id}' does not exist"));
1864        };
1865        match state.tool_format.as_deref() {
1866            Some(existing) if existing != tool_format => Err(format!(
1867                "agent session '{id}' is locked to tool_format='{existing}', but this run requested tool_format='{tool_format}'. Start a new session or fork/reset the transcript before changing tool mode."
1868            )),
1869            Some(_) => {
1870                state.last_accessed = Instant::now();
1871                Ok(())
1872            }
1873            None => {
1874                state.tool_format = Some(tool_format.to_string());
1875                state.last_accessed = Instant::now();
1876                Ok(())
1877            }
1878        }
1879    })
1880}
1881
1882pub fn tool_format(id: &str) -> Option<String> {
1883    SESSIONS.with(|s| {
1884        s.borrow()
1885            .get(id)
1886            .and_then(|state| state.tool_format.clone())
1887    })
1888}
1889
1890pub fn record_system_prompt(id: &str, system_prompt: &str) -> Result<(), String> {
1891    let system_prompt = system_prompt.trim();
1892    if system_prompt.is_empty() {
1893        return Ok(());
1894    }
1895    assert_cache_stable_system_prompt(system_prompt);
1896    SESSIONS.with(|s| {
1897        let mut map = s.borrow_mut();
1898        let Some(state) = map.get_mut(id) else {
1899            return Err(format!("agent session '{id}' does not exist"));
1900        };
1901        let changed = state.system_prompt.as_deref() != Some(system_prompt);
1902        state.system_prompt = Some(system_prompt.to_string());
1903        let dict = state
1904            .transcript
1905            .as_dict()
1906            .cloned()
1907            .unwrap_or_else(BTreeMap::new);
1908        let mut next = dict;
1909        apply_system_prompt_metadata(&mut next, system_prompt);
1910        if changed {
1911            let mut events: Vec<VmValue> = match next.get("events") {
1912                Some(VmValue::List(list)) => list.iter().cloned().collect(),
1913                _ => Vec::new(),
1914            };
1915            events.push(crate::llm::helpers::transcript_event(
1916                "system_prompt",
1917                "system",
1918                "internal",
1919                "",
1920                Some(crate::llm::helpers::system_prompt_event_metadata(
1921                    system_prompt,
1922                )),
1923            ));
1924            next.insert("events".to_string(), VmValue::List(Rc::new(events)));
1925        }
1926        apply_transcript_with_budget(state, VmValue::Dict(Rc::new(next)), "record_system_prompt")?;
1927        state.last_accessed = Instant::now();
1928        Ok(())
1929    })
1930}
1931
1932pub fn system_prompt(id: &str) -> Option<String> {
1933    SESSIONS.with(|s| {
1934        s.borrow()
1935            .get(id)
1936            .and_then(|state| state.system_prompt.clone())
1937    })
1938}
1939
1940#[cfg(debug_assertions)]
1941fn forbidden_workspace_prompt_token(system_prompt: &str) -> Option<&'static str> {
1942    let mut remaining = system_prompt;
1943    while let Some(index) = remaining.find("{{") {
1944        let candidate = remaining[index + 2..].trim_start();
1945        if candidate.starts_with("workspace_") {
1946            return Some("workspace_");
1947        }
1948        if candidate.starts_with("project_") {
1949            return Some("project_");
1950        }
1951        remaining = candidate;
1952    }
1953    None
1954}
1955
1956#[cfg(debug_assertions)]
1957fn assert_cache_stable_system_prompt(system_prompt: &str) {
1958    if let Some(prefix) = forbidden_workspace_prompt_token(system_prompt) {
1959        panic!(
1960            "{CACHE_STABLE_SYSTEM_PROMPT_DIAGNOSTIC}: session system prompts must not interpolate `{{{{{prefix}...` tokens; move workspace/project context into the workspace-anchor reminder"
1961        );
1962    }
1963}
1964
1965#[cfg(not(debug_assertions))]
1966fn assert_cache_stable_system_prompt(_system_prompt: &str) {}
1967
1968/// Pin (or clear, with `None`) a model selector on a session. Returns
1969/// `Ok(true)` when the value actually changed so callers can decide
1970/// whether to broadcast a notification. The selector is stored verbatim
1971/// — alias / catalog resolution is the call-site's job.
1972pub fn set_pinned_model(id: &str, model: Option<String>) -> Result<bool, String> {
1973    let normalized = model
1974        .map(|value| value.trim().to_string())
1975        .filter(|value| !value.is_empty());
1976    SESSIONS.with(|s| {
1977        let mut map = s.borrow_mut();
1978        let Some(state) = map.get_mut(id) else {
1979            return Err(format!("agent session '{id}' does not exist"));
1980        };
1981        let changed = state.pinned_model != normalized;
1982        state.pinned_model = normalized;
1983        state.last_accessed = Instant::now();
1984        Ok(changed)
1985    })
1986}
1987
1988/// Read the session's pinned model selector, if any. Consumed by
1989/// `vm_resolve_model` as the per-session default when a script-level
1990/// `llm_call` does not pass `model:` explicitly.
1991pub fn pinned_model(id: &str) -> Option<String> {
1992    SESSIONS.with(|s| {
1993        s.borrow()
1994            .get(id)
1995            .and_then(|state| state.pinned_model.clone())
1996    })
1997}
1998
1999/// Pin (or clear) the session-level provider-aware reasoning policy.
2000pub fn set_pinned_reasoning_policy(id: &str, policy: Option<String>) -> Result<bool, String> {
2001    let normalized = match policy {
2002        Some(value) => crate::llm::reasoning_policy::normalize_policy_selector(&value)?,
2003        None => None,
2004    };
2005    SESSIONS.with(|s| {
2006        let mut map = s.borrow_mut();
2007        let Some(state) = map.get_mut(id) else {
2008            return Err(format!("agent session '{id}' does not exist"));
2009        };
2010        let changed = state.pinned_reasoning_policy != normalized;
2011        state.pinned_reasoning_policy = normalized;
2012        state.last_accessed = Instant::now();
2013        Ok(changed)
2014    })
2015}
2016
2017/// Read the session's pinned reasoning policy, if any.
2018pub fn pinned_reasoning_policy(id: &str) -> Option<String> {
2019    SESSIONS.with(|s| {
2020        s.borrow()
2021            .get(id)
2022            .and_then(|state| state.pinned_reasoning_policy.clone())
2023    })
2024}
2025
2026/// Set (or clear, with `None`) the typed workspace anchor on a session.
2027/// Returns `Ok(true)` when the value actually changed so callers can
2028/// decide whether to broadcast `AnchorChanged` notifications.
2029pub fn set_workspace_anchor(id: &str, anchor: Option<WorkspaceAnchor>) -> Result<bool, String> {
2030    SESSIONS.with(|s| {
2031        let mut map = s.borrow_mut();
2032        let Some(state) = map.get_mut(id) else {
2033            return Err(format!("agent session '{id}' does not exist"));
2034        };
2035        let changed = state.workspace_anchor != anchor;
2036        state.workspace_anchor = anchor;
2037        if changed {
2038            crate::llm::permissions::clear_session_grants(id);
2039        }
2040        state.last_accessed = Instant::now();
2041        Ok(changed)
2042    })
2043}
2044
2045/// Read the session's typed workspace anchor, if any.
2046pub fn workspace_anchor(id: &str) -> Option<WorkspaceAnchor> {
2047    SESSIONS.with(|s| {
2048        s.borrow()
2049            .get(id)
2050            .and_then(|state| state.workspace_anchor.clone())
2051    })
2052}
2053
2054/// Outcome of `reanchor_session`: previous + new anchor and whether the
2055/// swap actually moved anything. Callers use `changed` to suppress
2056/// no-op transcript / live events.
2057#[derive(Clone, Debug, PartialEq, Eq)]
2058pub struct ReanchorOutcome {
2059    pub previous: Option<WorkspaceAnchor>,
2060    pub current: WorkspaceAnchor,
2061    pub changed: bool,
2062}
2063
2064/// Atomically swap the session's primary anchor + emit the canonical
2065/// `AnchorChanged` transcript event and live `AgentEvent::AnchorChanged`
2066/// notification (#2218). Clears session-scoped permission grants so
2067/// stale anchor-based decisions don't leak into the next turn.
2068pub fn reanchor_session(
2069    id: &str,
2070    new_anchor: WorkspaceAnchor,
2071    carry_transcript: bool,
2072    compacted: bool,
2073    reason: Option<String>,
2074) -> Result<ReanchorOutcome, String> {
2075    let outcome = SESSIONS.with(|s| {
2076        let mut map = s.borrow_mut();
2077        let Some(state) = map.get_mut(id) else {
2078            return Err(format!("agent session '{id}' does not exist"));
2079        };
2080        let previous = state.workspace_anchor.clone();
2081        let changed = previous.as_ref() != Some(&new_anchor);
2082        state.workspace_anchor = Some(new_anchor.clone());
2083        if changed {
2084            crate::llm::permissions::clear_session_grants(id);
2085        }
2086        state.last_accessed = Instant::now();
2087        Ok(ReanchorOutcome {
2088            previous,
2089            current: new_anchor,
2090            changed,
2091        })
2092    })?;
2093    if !outcome.changed {
2094        return Ok(outcome);
2095    }
2096    let previous_json = outcome.previous.as_ref().map(WorkspaceAnchor::to_json);
2097    let current_json = outcome.current.to_json();
2098    let event_metadata = serde_json::json!({
2099        "previous": previous_json,
2100        "current": current_json,
2101        "carry_transcript": carry_transcript,
2102        "compacted": compacted,
2103        "reason": reason,
2104    });
2105    let event = crate::llm::helpers::transcript_event(
2106        "AnchorChanged",
2107        "system",
2108        "internal",
2109        "",
2110        Some(event_metadata),
2111    );
2112    let _ = append_event(id, event);
2113    crate::llm::emit_live_agent_event_sync(&crate::agent_events::AgentEvent::AnchorChanged {
2114        session_id: id.to_string(),
2115        previous: previous_json,
2116        current: current_json,
2117        carry_transcript,
2118        compacted,
2119        reason,
2120    });
2121    Ok(outcome)
2122}
2123
2124/// Set session-local workspace defaults. Returns `Ok(true)` when the
2125/// policy changed.
2126pub fn set_workspace_policy(id: &str, policy: WorkspacePolicy) -> Result<bool, String> {
2127    SESSIONS.with(|s| {
2128        let mut map = s.borrow_mut();
2129        let Some(state) = map.get_mut(id) else {
2130            return Err(format!("agent session '{id}' does not exist"));
2131        };
2132        let changed = state.workspace_policy != policy;
2133        state.workspace_policy = policy;
2134        state.last_accessed = Instant::now();
2135        Ok(changed)
2136    })
2137}
2138
2139/// Read the session's workspace policy, if the session exists.
2140pub fn workspace_policy(id: &str) -> Option<WorkspacePolicy> {
2141    SESSIONS.with(|s| {
2142        s.borrow()
2143            .get(id)
2144            .map(|state| state.workspace_policy.clone())
2145    })
2146}
2147
2148/// Validate and mount an additional workspace root on an anchored
2149/// session. When the path is already mounted, updates its mount mode
2150/// in place and refreshes its `mounted_at` timestamp.
2151pub fn add_workspace_root(
2152    id: &str,
2153    root: &str,
2154    mount_mode: Option<MountMode>,
2155    reason: Option<String>,
2156) -> Result<String, String> {
2157    let normalized_root = validate_workspace_root_path(root)?;
2158    let mounted_at = crate::orchestration::now_rfc3339();
2159    SESSIONS.with(|s| {
2160        let mut map = s.borrow_mut();
2161        let Some(state) = map.get_mut(id) else {
2162            return Err(format!("agent session '{id}' does not exist"));
2163        };
2164        let default_mount_mode = state.workspace_policy.default_mount_mode;
2165        let Some(anchor) = state.workspace_anchor.as_mut() else {
2166            return Err(format!("agent session '{id}' has no workspace anchor"));
2167        };
2168        let resolved_mount_mode = mount_mode.unwrap_or(default_mount_mode);
2169        if let Some(existing) = anchor
2170            .additional_roots
2171            .iter_mut()
2172            .find(|entry| entry.path == normalized_root)
2173        {
2174            existing.mount_mode = resolved_mount_mode;
2175            existing.mounted_at = mounted_at.clone();
2176        } else {
2177            anchor.additional_roots.push(MountedRoot {
2178                path: normalized_root.clone(),
2179                mount_mode: resolved_mount_mode,
2180                mounted_at: mounted_at.clone(),
2181            });
2182        }
2183        let event = crate::llm::helpers::transcript_event(
2184            "RootMounted",
2185            "system",
2186            "internal",
2187            "",
2188            Some(serde_json::json!({
2189                "path": normalized_root.to_string_lossy(),
2190                "mount_mode": resolved_mount_mode.as_str(),
2191                "mounted_at": mounted_at.clone(),
2192                "reason": reason,
2193            })),
2194        );
2195        append_event_to_state(state, event, "add_workspace_root")?;
2196        crate::llm::permissions::clear_session_grants(id);
2197        state.last_accessed = Instant::now();
2198        Ok(mounted_at.clone())
2199    })
2200}
2201
2202/// Remove one mounted root from an anchored session. Returns whether an
2203/// existing mount entry was deleted. Removing an absent root is a no-op.
2204pub fn remove_workspace_root(id: &str, root: &str) -> Result<bool, String> {
2205    let normalized_root = normalize_workspace_root_path(root);
2206    SESSIONS.with(|s| {
2207        let mut map = s.borrow_mut();
2208        let Some(state) = map.get_mut(id) else {
2209            return Err(format!("agent session '{id}' does not exist"));
2210        };
2211        let Some(anchor) = state.workspace_anchor.as_mut() else {
2212            return Err(format!("agent session '{id}' has no workspace anchor"));
2213        };
2214        let before = anchor.additional_roots.len();
2215        anchor
2216            .additional_roots
2217            .retain(|entry| entry.path != normalized_root);
2218        let removed = anchor.additional_roots.len() != before;
2219        if removed {
2220            crate::llm::permissions::clear_session_grants(id);
2221        }
2222        state.last_accessed = Instant::now();
2223        Ok(removed)
2224    })
2225}
2226
2227pub fn list_workspace_roots(id: &str) -> Result<(PathBuf, Vec<MountedRoot>), String> {
2228    SESSIONS.with(|s| {
2229        let map = s.borrow();
2230        let Some(state) = map.get(id) else {
2231            return Err(format!("agent session '{id}' does not exist"));
2232        };
2233        let Some(anchor) = state.workspace_anchor.as_ref() else {
2234            return Err(format!("agent session '{id}' has no workspace anchor"));
2235        };
2236        Ok((anchor.primary.clone(), anchor.additional_roots.clone()))
2237    })
2238}
2239
2240fn validate_workspace_root_path(root: &str) -> Result<PathBuf, String> {
2241    let normalized = normalize_workspace_root_path(root);
2242    let canonical = std::fs::canonicalize(&normalized)
2243        .map_err(|error| format!("workspace root '{root}' must exist and be readable: {error}"))?;
2244    let metadata = std::fs::metadata(&canonical)
2245        .map_err(|error| format!("workspace root '{root}' must exist and be readable: {error}"))?;
2246    if !metadata.is_dir() {
2247        return Err(format!("workspace root '{root}' must be a directory"));
2248    }
2249    std::fs::read_dir(&canonical)
2250        .map_err(|error| format!("workspace root '{root}' must be readable: {error}"))?;
2251    Ok(canonical)
2252}
2253
2254fn normalize_workspace_root_path(root: &str) -> PathBuf {
2255    let absolute = crate::stdlib::process::normalize_context_path(Path::new(root));
2256    std::fs::canonicalize(&absolute).unwrap_or(absolute)
2257}
2258
2259fn empty_transcript(id: &str) -> VmValue {
2260    use crate::llm::helpers::new_transcript_with;
2261    new_transcript_with(Some(id.to_string()), Vec::new(), None, None)
2262}
2263
2264fn clone_transcript_with_id(transcript: &VmValue, new_id: &str) -> VmValue {
2265    let Some(dict) = transcript.as_dict() else {
2266        return empty_transcript(new_id);
2267    };
2268    let mut next = dict.clone();
2269    next.insert(
2270        "id".to_string(),
2271        VmValue::String(Rc::from(new_id.to_string())),
2272    );
2273    VmValue::Dict(Rc::new(next))
2274}
2275
2276fn clone_transcript_with_parent(transcript: &VmValue, parent_id: &str) -> VmValue {
2277    let Some(dict) = transcript.as_dict() else {
2278        return transcript.clone();
2279    };
2280    let mut next = dict.clone();
2281    let metadata = match next.get("metadata") {
2282        Some(VmValue::Dict(metadata)) => {
2283            let mut metadata = metadata.as_ref().clone();
2284            metadata.insert(
2285                "parent_session_id".to_string(),
2286                VmValue::String(Rc::from(parent_id.to_string())),
2287            );
2288            VmValue::Dict(Rc::new(metadata))
2289        }
2290        _ => VmValue::Dict(Rc::new(BTreeMap::from([(
2291            "parent_session_id".to_string(),
2292            VmValue::String(Rc::from(parent_id.to_string())),
2293        )]))),
2294    };
2295    next.insert("metadata".to_string(), metadata);
2296    VmValue::Dict(Rc::new(next))
2297}
2298
2299fn apply_system_prompt_metadata(next: &mut BTreeMap<String, VmValue>, system_prompt: &str) {
2300    let mut metadata = match next.get("metadata") {
2301        Some(VmValue::Dict(metadata)) => metadata.as_ref().clone(),
2302        _ => BTreeMap::new(),
2303    };
2304    metadata.insert(
2305        "system_prompt".to_string(),
2306        crate::stdlib::json_to_vm_value(&crate::llm::helpers::system_prompt_metadata(
2307            system_prompt,
2308        )),
2309    );
2310    next.insert("metadata".to_string(), VmValue::Dict(Rc::new(metadata)));
2311}
2312
2313fn transcript_with_session_metadata(transcript: VmValue, state: &SessionState) -> VmValue {
2314    let Some(dict) = transcript.as_dict() else {
2315        return transcript;
2316    };
2317    let mut next = dict.clone();
2318    let mut metadata = match next.get("metadata") {
2319        Some(VmValue::Dict(metadata)) => metadata.as_ref().clone(),
2320        _ => BTreeMap::new(),
2321    };
2322    if let Some(tool_format) = state.tool_format.as_ref() {
2323        metadata.insert(
2324            "tool_format".to_string(),
2325            VmValue::String(Rc::from(tool_format.clone())),
2326        );
2327        metadata.insert("tool_mode_locked".to_string(), VmValue::Bool(true));
2328    }
2329    if let Some(system_prompt) = state.system_prompt.as_ref() {
2330        metadata.insert(
2331            "system_prompt".to_string(),
2332            crate::stdlib::json_to_vm_value(&crate::llm::helpers::system_prompt_metadata(
2333                system_prompt,
2334            )),
2335        );
2336    }
2337    if let Some(anchor) = state.workspace_anchor.as_ref() {
2338        metadata.insert(
2339            WORKSPACE_ANCHOR_METADATA_KEY.to_string(),
2340            anchor.to_vm_value(),
2341        );
2342    } else {
2343        metadata.remove(WORKSPACE_ANCHOR_METADATA_KEY);
2344    }
2345    if let Some(last_action) = state.last_transcript_budget_action.as_ref() {
2346        let usage = transcript_usage(
2347            &VmValue::Dict(Rc::new(next.clone())),
2348            state.transcript_budget_policy.max_approx_bytes.is_some(),
2349        );
2350        metadata.insert(
2351            "transcript_budget".to_string(),
2352            crate::stdlib::json_to_vm_value(&serde_json::json!({
2353                "policy": transcript_budget_policy_json(&state.transcript_budget_policy.normalized()),
2354                "usage": transcript_budget_usage_json(&usage),
2355                "last_action": last_action,
2356            })),
2357        );
2358    }
2359    if !metadata.is_empty() {
2360        next.insert("metadata".to_string(), VmValue::Dict(Rc::new(metadata)));
2361    }
2362    VmValue::Dict(Rc::new(next))
2363}
2364
2365fn session_snapshot(state: &SessionState) -> VmValue {
2366    let transcript = transcript_with_session_metadata(state.transcript.clone(), state);
2367    let Some(dict) = transcript.as_dict() else {
2368        return state.transcript.clone();
2369    };
2370    let mut next = dict.clone();
2371    let length = next
2372        .get("messages")
2373        .and_then(|value| match value {
2374            VmValue::List(list) => Some(list.len() as i64),
2375            _ => None,
2376        })
2377        .unwrap_or(0);
2378    next.insert("length".to_string(), VmValue::Int(length));
2379    next.insert(
2380        "created_at".to_string(),
2381        VmValue::String(Rc::from(state.created_at.clone())),
2382    );
2383    next.insert(
2384        "parent_id".to_string(),
2385        state
2386            .parent_id
2387            .as_ref()
2388            .map(|id| VmValue::String(Rc::from(id.clone())))
2389            .unwrap_or(VmValue::Nil),
2390    );
2391    next.insert(
2392        "child_ids".to_string(),
2393        VmValue::List(Rc::new(
2394            state
2395                .child_ids
2396                .iter()
2397                .cloned()
2398                .map(|id| VmValue::String(Rc::from(id)))
2399                .collect(),
2400        )),
2401    );
2402    next.insert(
2403        "branched_at_event_index".to_string(),
2404        state
2405            .branched_at_event_index
2406            .map(|index| VmValue::Int(index as i64))
2407            .unwrap_or(VmValue::Nil),
2408    );
2409    next.insert(
2410        "system_prompt".to_string(),
2411        state
2412            .system_prompt
2413            .as_ref()
2414            .map(|prompt| VmValue::String(Rc::from(prompt.clone())))
2415            .unwrap_or(VmValue::Nil),
2416    );
2417    next.insert(
2418        "tool_format".to_string(),
2419        state
2420            .tool_format
2421            .as_ref()
2422            .map(|format| VmValue::String(Rc::from(format.clone())))
2423            .unwrap_or(VmValue::Nil),
2424    );
2425    next.insert(
2426        "pinned_model".to_string(),
2427        state
2428            .pinned_model
2429            .as_ref()
2430            .map(|model| VmValue::String(Rc::from(model.clone())))
2431            .unwrap_or(VmValue::Nil),
2432    );
2433    next.insert(
2434        "pinned_reasoning_policy".to_string(),
2435        state
2436            .pinned_reasoning_policy
2437            .as_ref()
2438            .map(|policy| VmValue::String(Rc::from(policy.clone())))
2439            .unwrap_or(VmValue::Nil),
2440    );
2441    next.insert(
2442        "workspace_anchor".to_string(),
2443        state
2444            .workspace_anchor
2445            .as_ref()
2446            .map(WorkspaceAnchor::to_vm_value)
2447            .unwrap_or(VmValue::Nil),
2448    );
2449    next.insert(
2450        "workspace_policy".to_string(),
2451        state.workspace_policy.to_vm_value(),
2452    );
2453    VmValue::Dict(Rc::new(next))
2454}
2455
2456fn update_lineage(
2457    map: &mut HashMap<String, SessionState>,
2458    parent_id: &str,
2459    child_id: &str,
2460    branched_at_event_index: Option<usize>,
2461) {
2462    let old_parent_id = map.get(child_id).and_then(|child| child.parent_id.clone());
2463    if let Some(old_parent_id) = old_parent_id.filter(|old_parent_id| old_parent_id != parent_id) {
2464        if let Some(old_parent) = map.get_mut(&old_parent_id) {
2465            old_parent.child_ids.retain(|id| id != child_id);
2466            old_parent.last_accessed = Instant::now();
2467        }
2468    }
2469    if let Some(parent) = map.get_mut(parent_id) {
2470        parent.last_accessed = Instant::now();
2471        if !parent.child_ids.iter().any(|id| id == child_id) {
2472            parent.child_ids.push(child_id.to_string());
2473        }
2474    }
2475    if let Some(child) = map.get_mut(child_id) {
2476        child.last_accessed = Instant::now();
2477        child.parent_id = Some(parent_id.to_string());
2478        child.branched_at_event_index = branched_at_event_index;
2479        child.transcript = clone_transcript_with_parent(&child.transcript, parent_id);
2480    }
2481}
2482
2483fn branch_event_index(transcript: &VmValue, keep_first: usize) -> usize {
2484    if keep_first == 0 {
2485        return 0;
2486    }
2487    let Some(dict) = transcript.as_dict() else {
2488        return keep_first;
2489    };
2490    let Some(VmValue::List(events)) = dict.get("events") else {
2491        return keep_first;
2492    };
2493    event_prefix_len_for_messages(events, keep_first)
2494}
2495
2496fn event_kind(event: &VmValue) -> Option<String> {
2497    event
2498        .as_dict()
2499        .and_then(|dict| dict.get("kind"))
2500        .map(VmValue::display)
2501}
2502
2503fn event_id(event: &VmValue) -> Option<String> {
2504    event
2505        .as_dict()
2506        .and_then(|dict| dict.get("id"))
2507        .map(VmValue::display)
2508}
2509
2510fn is_turn_event(event: &VmValue) -> bool {
2511    matches!(
2512        event_kind(event).as_deref(),
2513        Some("message" | "tool_result")
2514    )
2515}
2516
2517fn event_prefix_len_for_messages(events: &[VmValue], keep_first: usize) -> usize {
2518    if keep_first == 0 {
2519        return 0;
2520    }
2521    let mut retained_messages = 0usize;
2522    for (index, event) in events.iter().enumerate() {
2523        if is_turn_event(event) {
2524            retained_messages += 1;
2525            if retained_messages == keep_first {
2526                return index + 1;
2527            }
2528        }
2529    }
2530    events.len()
2531}
2532
2533fn turn_event_id_for_count(events: &[VmValue], keep_first: usize) -> Option<String> {
2534    if keep_first == 0 {
2535        return None;
2536    }
2537    let mut retained_messages = 0usize;
2538    for event in events {
2539        if is_turn_event(event) {
2540            retained_messages += 1;
2541            if retained_messages == keep_first {
2542                return event_id(event);
2543            }
2544        }
2545    }
2546    None
2547}
2548
2549#[cfg(test)]
2550#[path = "agent_sessions_tests.rs"]
2551mod tests;