Skip to main content

harn_vm/
agent_sessions.rs

1//! First-class session storage.
2//!
3//! A session owns three things:
4//!
5//! 1. A transcript dict (messages, events, summary, metadata, …).
6//! 2. Closure subscribers that fire on agent-loop events for this session.
7//! 3. Its own lifecycle (open, reset, fork, trim, compact, close).
8//!
9//! Storage is thread-local because session lifecycle and subscriber dispatch
10//! are owned by the current-thread agent-loop worker. The subscribers register,
11//! fire, and unregister on that same thread, keeping ordering deterministic.
12//!
13//! Lifecycle is explicit. Builtins (`agent_session_open`,
14//! `_reset`, `_fork`, `_fork_at`, `_close`, `_trim`, `_compact`,
15//! `_inject`, `_exists`, `_length`, `_snapshot`, `_ancestry`) drive
16//! the store directly — there is no "policy" config dict that
17//! performs lifecycle as a side effect.
18
19use std::cell::{Cell, RefCell};
20use std::collections::{BTreeMap, HashMap, HashSet};
21use std::future::Future;
22use std::path::{Path, PathBuf};
23use std::time::Instant;
24
25use crate::runtime_limits::RuntimeLimits;
26use crate::value::VmValue;
27use crate::workspace_anchor::{
28    MountMode, MountedRoot, WorkspaceAnchor, WorkspacePolicy, WORKSPACE_ANCHOR_METADATA_KEY,
29};
30
/// Default cap on concurrent sessions per VM thread. Beyond this the
/// least-recently-accessed session is evicted on the next `open`.
pub const DEFAULT_SESSION_CAP: usize = RuntimeLimits::DEFAULT.max_agent_sessions;

/// Default cap on retained prompt-visible messages per session. The
/// limit is intentionally high enough for normal long-running agents
/// while still bounding accidental unbounded growth.
pub const DEFAULT_TRANSCRIPT_MESSAGE_CAP: usize = 4096;

/// Default cap on retained transcript audit events per session. Events
/// include message-derived entries plus orchestration lifecycle records.
pub const DEFAULT_TRANSCRIPT_EVENT_CAP: usize = 32768;
/// Upper bound (16 KiB) on the JSON-serialized size of a session
/// scratchpad; `set_scratchpad` rejects anything larger.
pub const MAX_SCRATCHPAD_BYTES: usize = 16 * 1024;
// Diagnostic code that only exists in debug builds.
// NOTE(review): presumably tags a system-prompt cache-stability diagnostic;
// its use site is outside this section — confirm.
#[cfg(debug_assertions)]
const CACHE_STABLE_SYSTEM_PROMPT_DIAGNOSTIC: &str = "HARN-CACHE-001";
46
/// Recovery strategy attached to a transcript budget policy.
///
/// The enforcement logic that interprets these variants lives outside this
/// definition; `keep_last` is stored exactly as supplied.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum TranscriptBudgetRecovery {
    Reject,
    Trim { keep_last: usize },
    Compact { keep_last: usize },
}

/// Per-session transcript limits plus the recovery strategy to use with them.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct SessionTranscriptBudgetPolicy {
    /// Message limit; constructors and `normalized` keep it at 1 or more.
    pub max_messages: usize,
    /// Event limit; constructors and `normalized` keep it at 1 or more.
    pub max_events: usize,
    /// Optional byte limit; when present it is kept at 1 or more.
    pub max_approx_bytes: Option<usize>,
    /// Strategy paired with the limits above.
    pub recovery: TranscriptBudgetRecovery,
}

impl SessionTranscriptBudgetPolicy {
    /// Raise a zero limit to the smallest usable value.
    fn at_least_one(limit: usize) -> usize {
        limit.max(1)
    }

    /// Shared constructor: clamp both counts and start without a byte limit.
    fn with_recovery(
        max_messages: usize,
        max_events: usize,
        recovery: TranscriptBudgetRecovery,
    ) -> Self {
        Self {
            max_messages: Self::at_least_one(max_messages),
            max_events: Self::at_least_one(max_events),
            max_approx_bytes: None,
            recovery,
        }
    }

    /// Policy using [`TranscriptBudgetRecovery::Reject`].
    pub fn reject(max_messages: usize, max_events: usize) -> Self {
        Self::with_recovery(max_messages, max_events, TranscriptBudgetRecovery::Reject)
    }

    /// Policy using [`TranscriptBudgetRecovery::Trim`] with the given `keep_last`.
    pub fn trim(max_messages: usize, max_events: usize, keep_last: usize) -> Self {
        let recovery = TranscriptBudgetRecovery::Trim { keep_last };
        Self::with_recovery(max_messages, max_events, recovery)
    }

    /// Policy using [`TranscriptBudgetRecovery::Compact`] with the given `keep_last`.
    pub fn compact(max_messages: usize, max_events: usize, keep_last: usize) -> Self {
        let recovery = TranscriptBudgetRecovery::Compact { keep_last };
        Self::with_recovery(max_messages, max_events, recovery)
    }

    /// Replace the byte limit (`None` removes it); a limit of 0 becomes 1.
    pub fn with_max_approx_bytes(self, max_approx_bytes: Option<usize>) -> Self {
        Self {
            max_approx_bytes: max_approx_bytes.map(Self::at_least_one),
            ..self
        }
    }

    /// Copy of this policy with every limit clamped to at least 1. Needed
    /// because the fields are public and can be set to 0 directly.
    fn normalized(&self) -> Self {
        let mut fixed = self.clone();
        fixed.max_messages = Self::at_least_one(fixed.max_messages);
        fixed.max_events = Self::at_least_one(fixed.max_events);
        fixed.max_approx_bytes = fixed.max_approx_bytes.map(Self::at_least_one);
        fixed
    }
}
104
105impl Default for SessionTranscriptBudgetPolicy {
106    fn default() -> Self {
107        Self::reject(DEFAULT_TRANSCRIPT_MESSAGE_CAP, DEFAULT_TRANSCRIPT_EVENT_CAP)
108    }
109}
110
/// Everything the store keeps for one live session: its transcript,
/// subscribers, lineage links, pinned configuration and budget state.
pub struct SessionState {
    /// Session id; also the key this state is stored under.
    pub id: String,
    /// Transcript value. Readers in this file treat it as a dict with a
    /// `messages` list (and optionally an `events` list).
    pub transcript: VmValue,
    /// Subscriber values registered for this session. Forks do not copy them.
    pub subscribers: Vec<VmValue>,
    /// Timestamp string produced by `now_rfc3339()` when the state was built.
    pub created_at: String,
    /// Last touch time; the session with the oldest value is the eviction
    /// victim when the store is at capacity.
    pub last_accessed: Instant,
    /// Id of the session this one was linked under, if any.
    pub parent_id: Option<String>,
    /// Ids of sessions linked as children of this one.
    pub child_ids: Vec<String>,
    /// Event index recorded when the session was linked with a branch point
    /// (see `fork_at`); `None` otherwise.
    pub branched_at_event_index: Option<usize>,
    /// Names of skills that were active at the end of the most recent
    /// `agent_loop` run on this session. Empty when no skills were
    /// matched, when the skill system wasn't used, or when the
    /// deactivation phase cleared them. Re-entering the session
    /// restores these as the initial active set before matching runs.
    pub active_skills: Vec<String>,
    /// Tool-calling protocol claimed by the first agent loop that uses
    /// this session. A transcript is only replayable under the same
    /// contract that produced its prompt/history.
    pub tool_format: Option<String>,
    /// Stable session-level system prompt material. This is transcript
    /// metadata, not a replay message: providers receive it through
    /// their system/developer instruction channel on each call.
    pub system_prompt: Option<String>,
    /// Session-pinned model selector. When set, `llm_call` invocations
    /// that do not pass an explicit `model:` option resolve to this
    /// selector instead of `HARN_LLM_MODEL` / provider defaults. Mid-
    /// session swap is exposed over ACP via `session/set_config_option`
    /// (configId="model").
    pub pinned_model: Option<String>,
    /// Session-pinned high-level reasoning policy. When set, `llm_call`
    /// invocations that do not pass explicit `thinking` or
    /// `reasoning_effort` options resolve this provider-aware policy into
    /// the route's native thinking shape. Exposed over ACP as
    /// `session/set_config_option(configId="thought_level")`.
    pub pinned_reasoning_policy: Option<String>,
    /// Session-local workspace defaults. Persona and host policy layers
    /// can update this without rewriting the current anchor.
    pub workspace_policy: WorkspacePolicy,
    /// Typed workspace anchor for the session. Primary path plus any
    /// additional mounted roots; consumed by permission matchers, the
    /// bundle exporter, and host-side cross-project handoff flows
    /// (epic #2208). `None` until a host opens the session with one or
    /// the ACP `reanchor` / `add_root` primitives populate it.
    pub workspace_anchor: Option<WorkspaceAnchor>,
    /// Small session-local working memory rendered into agent prompts by the
    /// Harn stdlib agent loop. This is live state, not a replayed message.
    pub scratchpad: Option<VmValue>,
    /// Incremented (saturating) by every successful scratchpad set or clear;
    /// reset to 0 by `reset_transcript`.
    pub scratchpad_version: u64,
    /// Budget policy for this session; starts as a copy of the thread default.
    pub transcript_budget_policy: SessionTranscriptBudgetPolicy,
    /// Record of the most recent budget action.
    /// NOTE(review): the JSON shape is produced outside this section — confirm.
    pub last_transcript_budget_action: Option<serde_json::Value>,
}
162
163impl SessionState {
164    fn new(id: String) -> Self {
165        let now = Instant::now();
166        let transcript = empty_transcript(&id);
167        Self {
168            id,
169            transcript,
170            subscribers: Vec::new(),
171            created_at: crate::orchestration::now_rfc3339(),
172            last_accessed: now,
173            parent_id: None,
174            child_ids: Vec::new(),
175            branched_at_event_index: None,
176            active_skills: Vec::new(),
177            tool_format: None,
178            system_prompt: None,
179            pinned_model: None,
180            pinned_reasoning_policy: None,
181            workspace_policy: WorkspacePolicy::default(),
182            workspace_anchor: None,
183            scratchpad: None,
184            scratchpad_version: 0,
185            transcript_budget_policy: default_transcript_budget_policy(),
186            last_transcript_budget_action: None,
187        }
188    }
189}
190
/// Lineage summary returned by `ancestry`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct SessionAncestry {
    /// Direct parent of the queried session, if it has one.
    pub parent_id: Option<String>,
    /// Direct children of the queried session.
    pub child_ids: Vec<String>,
    /// Last id reached by following parent links upward; equals the queried
    /// session's own id when it has no parent.
    pub root_id: String,
}
197
/// Outcome of truncating a transcript to its first N messages.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct SessionTruncateResult {
    /// Messages retained: the requested count capped at the transcript length.
    pub kept_turn_count: usize,
    /// Messages dropped from the end of the transcript.
    pub removed_turn_count: usize,
    /// Event id of the retained tip, when one could be determined.
    pub new_tip_turn_id: Option<String>,
}
204
/// Report describing a reminder injection.
///
/// NOTE(review): the producer is outside this section; the field notes
/// below are inferred from the names — confirm against the producer.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ReminderInjectionReport {
    /// Presumably the id of the injected reminder.
    pub reminder_id: String,
    /// Presumably the number of earlier reminders deduplicated by the injection.
    pub deduped_count: usize,
}
210
thread_local! {
    // Per-thread session store, keyed by session id.
    static SESSIONS: RefCell<HashMap<String, SessionState>> = RefCell::new(HashMap::new());
    // Maximum number of sessions kept in `SESSIONS`; changed via `set_session_cap`.
    static SESSION_CAP: Cell<usize> = const { Cell::new(DEFAULT_SESSION_CAP) };
    // Budget policy copied into every newly constructed `SessionState`.
    static DEFAULT_TRANSCRIPT_BUDGET_POLICY: RefCell<SessionTranscriptBudgetPolicy> =
        RefCell::new(SessionTranscriptBudgetPolicy::default());
    // Stack of entered session ids; the last entry is the current session.
    static CURRENT_SESSION_STACK: RefCell<Vec<String>> = const { RefCell::new(Vec::new()) };
    // Stack of entered tool-call ids; used when no task-local id is set.
    static CURRENT_TOOL_CALL_STACK: RefCell<Vec<String>> = const { RefCell::new(Vec::new()) };
}
219
tokio::task_local! {
    // Tool-call id bound to an async task by `scope_current_tool_call`;
    // `current_tool_call_id` checks it before the thread-local stack.
    static CURRENT_TOOL_CALL_TASK: String;
}
223
224pub struct CurrentSessionGuard {
225    active: bool,
226}
227
228impl Drop for CurrentSessionGuard {
229    fn drop(&mut self) {
230        if self.active {
231            pop_current_session();
232        }
233    }
234}
235
236/// RAII guard that scopes the active tool-call id for the running thread.
237///
238/// Set on entry to a tool dispatch and dropped on exit, so any hostlib
239/// builtin invoked under it (e.g. `tools/write_file`) can resolve the
240/// owning tool call without threading the id through every parameter.
241pub struct CurrentToolCallGuard {
242    active: bool,
243}
244
245impl Drop for CurrentToolCallGuard {
246    fn drop(&mut self) {
247        if self.active {
248            pop_current_tool_call();
249        }
250    }
251}
252
253/// Set the per-thread session cap. Primarily for tests; production VMs
254/// inherit the default.
255pub fn set_session_cap(cap: usize) {
256    SESSION_CAP.with(|c| c.set(cap.max(1)));
257}
258
259pub fn session_cap() -> usize {
260    SESSION_CAP.with(|c| c.get())
261}
262
263pub fn set_default_transcript_budget_policy(policy: SessionTranscriptBudgetPolicy) {
264    DEFAULT_TRANSCRIPT_BUDGET_POLICY.with(|cell| {
265        *cell.borrow_mut() = policy.normalized();
266    });
267}
268
269pub fn reset_default_transcript_budget_policy() {
270    set_default_transcript_budget_policy(SessionTranscriptBudgetPolicy::default());
271}
272
273pub fn default_transcript_budget_policy() -> SessionTranscriptBudgetPolicy {
274    DEFAULT_TRANSCRIPT_BUDGET_POLICY.with(|cell| cell.borrow().clone())
275}
276
277pub fn transcript_budget_policy(id: &str) -> Option<SessionTranscriptBudgetPolicy> {
278    SESSIONS.with(|s| {
279        s.borrow()
280            .get(id)
281            .map(|state| state.transcript_budget_policy.clone())
282    })
283}
284
285pub fn set_transcript_budget_policy(
286    id: &str,
287    policy: SessionTranscriptBudgetPolicy,
288) -> Result<(), String> {
289    SESSIONS.with(|s| {
290        let mut map = s.borrow_mut();
291        let Some(state) = map.get_mut(id) else {
292            return Err(format!("agent session '{id}' does not exist"));
293        };
294        let previous = state.transcript_budget_policy.clone();
295        let previous_action = state.last_transcript_budget_action.clone();
296        state.transcript_budget_policy = policy.normalized();
297        let candidate = state.transcript.clone();
298        if let Err(error) = apply_transcript_with_budget(state, candidate, "policy_update") {
299            state.transcript_budget_policy = previous;
300            state.last_transcript_budget_action = previous_action;
301            return Err(error);
302        }
303        state.last_accessed = Instant::now();
304        Ok(())
305    })
306}
307
308/// Clear the session store. Wired into `reset_llm_state` for test isolation.
309pub fn reset_session_store() {
310    SESSIONS.with(|s| s.borrow_mut().clear());
311    CURRENT_SESSION_STACK.with(|stack| stack.borrow_mut().clear());
312    CURRENT_TOOL_CALL_STACK.with(|stack| stack.borrow_mut().clear());
313    reset_default_transcript_budget_policy();
314}
315
316pub(crate) fn push_current_session(id: String) {
317    if id.is_empty() {
318        return;
319    }
320    CURRENT_SESSION_STACK.with(|stack| stack.borrow_mut().push(id));
321}
322
323pub(crate) fn pop_current_session() {
324    CURRENT_SESSION_STACK.with(|stack| {
325        let _ = stack.borrow_mut().pop();
326    });
327}
328
329pub fn current_session_id() -> Option<String> {
330    CURRENT_SESSION_STACK.with(|stack| stack.borrow().last().cloned())
331}
332
333pub fn enter_current_session(id: impl Into<String>) -> CurrentSessionGuard {
334    let id = id.into();
335    if id.trim().is_empty() {
336        return CurrentSessionGuard { active: false };
337    }
338    push_current_session(id);
339    CurrentSessionGuard { active: true }
340}
341
342fn push_current_tool_call(id: String) {
343    if id.is_empty() {
344        return;
345    }
346    CURRENT_TOOL_CALL_STACK.with(|stack| stack.borrow_mut().push(id));
347}
348
349fn pop_current_tool_call() {
350    CURRENT_TOOL_CALL_STACK.with(|stack| {
351        let _ = stack.borrow_mut().pop();
352    });
353}
354
355/// Return the active tool-call id for the current thread, if any.
356///
357/// Hostlib builtins consult this to attribute side-effect snapshots to
358/// the owning ACP `toolCallId` without callers passing it explicitly.
359pub fn current_tool_call_id() -> Option<String> {
360    if let Ok(id) = CURRENT_TOOL_CALL_TASK.try_with(Clone::clone) {
361        if !id.trim().is_empty() {
362            return Some(id);
363        }
364    }
365    CURRENT_TOOL_CALL_STACK.with(|stack| stack.borrow().last().cloned())
366}
367
368/// Scope the active tool-call id to one async task.
369///
370/// Parallel tool dispatch runs sibling calls on the same OS thread, so
371/// thread-local guards alone cannot preserve attribution across `.await`
372/// points. Tokio task-local scoping follows the future instead.
373pub async fn scope_current_tool_call<F, T>(id: impl Into<String>, future: F) -> T
374where
375    F: Future<Output = T>,
376{
377    let id = id.into();
378    if id.trim().is_empty() {
379        future.await
380    } else {
381        CURRENT_TOOL_CALL_TASK.scope(id, future).await
382    }
383}
384
385/// Scope the active tool-call id for the duration of the returned guard.
386pub fn enter_current_tool_call(id: impl Into<String>) -> CurrentToolCallGuard {
387    let id = id.into();
388    if id.trim().is_empty() {
389        return CurrentToolCallGuard { active: false };
390    }
391    push_current_tool_call(id);
392    CurrentToolCallGuard { active: true }
393}
394
395pub fn exists(id: &str) -> bool {
396    SESSIONS.with(|s| s.borrow().contains_key(id))
397}
398
399pub fn length(id: &str) -> Option<usize> {
400    SESSIONS.with(|s| {
401        s.borrow().get(id).map(|state| {
402            state
403                .transcript
404                .as_dict()
405                .and_then(|d| d.get("messages"))
406                .and_then(|v| match v {
407                    VmValue::List(list) => Some(list.len()),
408                    _ => None,
409                })
410                .unwrap_or(0)
411        })
412    })
413}
414
415pub fn scratchpad(id: &str) -> Option<VmValue> {
416    SESSIONS.with(|s| {
417        s.borrow()
418            .get(id)
419            .and_then(|state| state.scratchpad.clone())
420    })
421}
422
423pub fn scratchpad_version(id: &str) -> Option<u64> {
424    SESSIONS.with(|s| s.borrow().get(id).map(|state| state.scratchpad_version))
425}
426
427pub fn set_scratchpad(
428    id: &str,
429    scratchpad: VmValue,
430    source: impl Into<String>,
431    reason: Option<String>,
432    metadata: serde_json::Value,
433) -> Result<u64, String> {
434    validate_scratchpad_value(&scratchpad)?;
435    SESSIONS.with(|s| {
436        let mut map = s.borrow_mut();
437        let Some(state) = map.get_mut(id) else {
438            return Err(format!("agent session '{id}' does not exist"));
439        };
440        let version = state.scratchpad_version.saturating_add(1);
441        let event = scratchpad_transcript_event(
442            "set",
443            version,
444            Some(&scratchpad),
445            source.into(),
446            reason,
447            metadata,
448        );
449        append_event_to_state(state, event, "set_scratchpad")?;
450        state.scratchpad = Some(scratchpad);
451        state.scratchpad_version = version;
452        state.last_accessed = Instant::now();
453        Ok(version)
454    })
455}
456
457pub fn clear_scratchpad(
458    id: &str,
459    source: impl Into<String>,
460    reason: Option<String>,
461    metadata: serde_json::Value,
462) -> Result<u64, String> {
463    SESSIONS.with(|s| {
464        let mut map = s.borrow_mut();
465        let Some(state) = map.get_mut(id) else {
466            return Err(format!("agent session '{id}' does not exist"));
467        };
468        let version = state.scratchpad_version.saturating_add(1);
469        let event =
470            scratchpad_transcript_event("clear", version, None, source.into(), reason, metadata);
471        append_event_to_state(state, event, "clear_scratchpad")?;
472        state.scratchpad = None;
473        state.scratchpad_version = version;
474        state.last_accessed = Instant::now();
475        Ok(version)
476    })
477}
478
479fn validate_scratchpad_value(value: &VmValue) -> Result<(), String> {
480    if !matches!(value, VmValue::Dict(_)) {
481        return Err("agent session scratchpad must be a dict".to_string());
482    }
483    let json = crate::llm::helpers::vm_value_to_json(value);
484    let approx_bytes = serde_json::to_vec(&json)
485        .map(|bytes| bytes.len())
486        .unwrap_or(usize::MAX);
487    if approx_bytes > MAX_SCRATCHPAD_BYTES {
488        return Err(format!(
489            "agent session scratchpad is {approx_bytes} bytes; max is {MAX_SCRATCHPAD_BYTES}"
490        ));
491    }
492    Ok(())
493}
494
495fn scratchpad_transcript_event(
496    action: &str,
497    version: u64,
498    scratchpad: Option<&VmValue>,
499    source: String,
500    reason: Option<String>,
501    metadata: serde_json::Value,
502) -> VmValue {
503    let scratchpad_json = scratchpad.map(crate::llm::helpers::vm_value_to_json);
504    let approx_bytes = scratchpad_json
505        .as_ref()
506        .and_then(|value| serde_json::to_vec(value).ok().map(|bytes| bytes.len()))
507        .unwrap_or(0);
508    let event_metadata = serde_json::json!({
509        "action": action,
510        "version": version,
511        "source": normalize_scratchpad_source(source),
512        "reason": reason.unwrap_or_default(),
513        "approx_bytes": approx_bytes,
514        "counts": scratchpad_json
515            .as_ref()
516            .map(scratchpad_counts_json)
517            .unwrap_or_else(|| serde_json::json!({})),
518        "metadata": metadata,
519    });
520    let content = format!("Agent scratchpad {action}");
521    crate::llm::helpers::transcript_event(
522        "agent_scratchpad",
523        "system",
524        "internal",
525        &content,
526        Some(event_metadata),
527    )
528}
529
/// Trim the caller-supplied source label, substituting
/// `harn.agent_scratchpad` when nothing but whitespace remains.
fn normalize_scratchpad_source(source: String) -> String {
    match source.trim() {
        "" => "harn.agent_scratchpad".to_string(),
        label => label.to_string(),
    }
}
538
539fn scratchpad_counts_json(value: &serde_json::Value) -> serde_json::Value {
540    serde_json::json!({
541        "goals": scratchpad_array_len(value, "goals"),
542        "open_items": scratchpad_array_len(value, "open_items"),
543        "facts": scratchpad_array_len(value, "facts"),
544        "refs": scratchpad_array_len(value, "refs"),
545    })
546}
547
548fn scratchpad_array_len(value: &serde_json::Value, key: &str) -> usize {
549    value
550        .get(key)
551        .and_then(serde_json::Value::as_array)
552        .map(Vec::len)
553        .unwrap_or(0)
554}
555
556pub fn snapshot(id: &str) -> Option<VmValue> {
557    SESSIONS.with(|s| s.borrow().get(id).map(session_snapshot))
558}
559
560/// Session-only fields stay on `agent_session_snapshot`.
561pub fn transcript(id: &str) -> Option<VmValue> {
562    SESSIONS.with(|s| {
563        s.borrow()
564            .get(id)
565            .map(|state| transcript_with_session_metadata(state.transcript.clone(), state))
566    })
567}
568
569/// Open a session, or create it if missing. Returns the resolved id.
570///
571/// Newly-created sessions auto-register an event-log-backed sink when a
572/// generalized [`crate::event_log::EventLog`] has been installed for the
573/// current VM thread. For legacy env-driven workflows that still point
574/// `HARN_EVENT_LOG_DIR` at a directory, we preserve the older JSONL sink
575/// as a compatibility fallback. Re-opening an existing session does not
576/// re-register — sinks are per-session, owned by the first opener.
577pub fn open_or_create(id: Option<String>) -> String {
578    let resolved = id.unwrap_or_else(|| uuid::Uuid::now_v7().to_string());
579    let parent_session = current_session_id();
580    let mut was_new = false;
581    SESSIONS.with(|s| {
582        let mut map = s.borrow_mut();
583        if let Some(state) = map.get_mut(&resolved) {
584            state.last_accessed = Instant::now();
585            return;
586        }
587        was_new = true;
588        let cap = SESSION_CAP.with(|c| c.get());
589        if map.len() >= cap {
590            if let Some(victim) = map
591                .iter()
592                .min_by_key(|(_, state)| state.last_accessed)
593                .map(|(id, _)| id.clone())
594            {
595                map.remove(&victim);
596            }
597        }
598        map.insert(resolved.clone(), SessionState::new(resolved.clone()));
599    });
600    if was_new {
601        if let Some(parent) = parent_session.as_deref() {
602            crate::agent_events::mirror_session_sinks(parent, &resolved);
603        }
604        try_register_event_log(&resolved);
605    }
606    resolved
607}
608
609pub fn open_child_session(parent_id: &str, id: Option<String>) -> String {
610    let resolved = open_or_create(id);
611    link_child_session(parent_id, &resolved);
612    resolved
613}
614
615pub fn link_child_session(parent_id: &str, child_id: &str) {
616    link_child_session_with_branch(parent_id, child_id, None);
617}
618
619pub fn link_child_session_with_branch(
620    parent_id: &str,
621    child_id: &str,
622    branched_at_event_index: Option<usize>,
623) {
624    if parent_id == child_id {
625        return;
626    }
627    open_or_create(Some(parent_id.to_string()));
628    open_or_create(Some(child_id.to_string()));
629    SESSIONS.with(|s| {
630        let mut map = s.borrow_mut();
631        update_lineage(&mut map, parent_id, child_id, branched_at_event_index);
632    });
633}
634
635pub fn parent_id(id: &str) -> Option<String> {
636    SESSIONS.with(|s| s.borrow().get(id).and_then(|state| state.parent_id.clone()))
637}
638
639pub fn child_ids(id: &str) -> Vec<String> {
640    SESSIONS.with(|s| {
641        s.borrow()
642            .get(id)
643            .map(|state| state.child_ids.clone())
644            .unwrap_or_default()
645    })
646}
647
648pub fn ancestry(id: &str) -> Option<SessionAncestry> {
649    SESSIONS.with(|s| {
650        let map = s.borrow();
651        let state = map.get(id)?;
652        let mut root_id = state.id.clone();
653        let mut cursor = state.parent_id.clone();
654        let mut seen = HashSet::from([state.id.clone()]);
655        while let Some(parent_id) = cursor {
656            if !seen.insert(parent_id.clone()) {
657                break;
658            }
659            root_id = parent_id.clone();
660            cursor = map
661                .get(&parent_id)
662                .and_then(|parent| parent.parent_id.clone());
663        }
664        Some(SessionAncestry {
665            parent_id: state.parent_id.clone(),
666            child_ids: state.child_ids.clone(),
667            root_id,
668        })
669    })
670}
671
672/// Auto-register a persistent sink for a newly-created session.
673/// Silent no-op on failure — a broken observability sink must never
674/// prevent a session from starting.
675fn try_register_event_log(session_id: &str) {
676    if let Some(log) = crate::event_log::active_event_log() {
677        crate::agent_events::register_sink(
678            session_id,
679            crate::agent_events::EventLogSink::new(log, session_id),
680        );
681        return;
682    }
683    let Ok(dir) = std::env::var("HARN_EVENT_LOG_DIR") else {
684        return;
685    };
686    if dir.is_empty() {
687        return;
688    }
689    let path = std::path::PathBuf::from(dir).join(format!("event_log-{session_id}.jsonl"));
690    if let Ok(sink) = crate::agent_events::JsonlEventSink::open(&path) {
691        crate::agent_events::register_sink(session_id, sink);
692    }
693}
694
695pub fn register_event_log_sink(session_id: &str) {
696    try_register_event_log(session_id);
697}
698
699pub fn close(id: &str) {
700    SESSIONS.with(|s| {
701        s.borrow_mut().remove(id);
702    });
703    // Cross-thread per-session state must be released too, otherwise
704    // pending inbox entries can be delivered to a future session that
705    // happens to reuse the same id.
706    crate::orchestration::agent_inbox::clear_session(id);
707    crate::agent_events::clear_session_sinks(id);
708}
709
710pub fn close_with_status(
711    id: &str,
712    reason: impl Into<String>,
713    status: impl Into<String>,
714    metadata: serde_json::Value,
715) -> bool {
716    if !exists(id) {
717        return false;
718    }
719    let reason = reason.into();
720    let status = status.into();
721    let event_metadata = serde_json::json!({
722        "reason": reason,
723        "status": status,
724        "metadata": metadata,
725    });
726    let transcript_event = crate::llm::helpers::transcript_event(
727        "agent_session_closed",
728        "system",
729        "internal",
730        "Agent session closed",
731        Some(event_metadata),
732    );
733    let _ = append_event(id, transcript_event);
734    crate::llm::emit_live_agent_event_sync(&crate::agent_events::AgentEvent::SessionClosed {
735        session_id: id.to_string(),
736        reason,
737        status,
738        metadata,
739    });
740    close(id);
741    true
742}
743
744pub fn reset_transcript(id: &str) -> bool {
745    SESSIONS.with(|s| {
746        let mut map = s.borrow_mut();
747        let Some(state) = map.get_mut(id) else {
748            return false;
749        };
750        state.transcript = empty_transcript(id);
751        state.tool_format = None;
752        state.system_prompt = None;
753        state.scratchpad = None;
754        state.scratchpad_version = 0;
755        state.last_transcript_budget_action = None;
756        state.last_accessed = Instant::now();
757        true
758    })
759}
760
761/// Copy `src`'s transcript into a new session id. Subscribers are NOT
762/// copied — a fork is a conversation branch, not an event fanout.
763///
764/// Touches `src`'s `last_accessed` before evicting, so the fork
765/// operation itself can't make `src` look stale and kick it out of
766/// the LRU just to make room for the new fork.
767pub fn fork(src_id: &str, dst_id: Option<String>) -> Option<String> {
768    let (
769        src_transcript,
770        src_tool_format,
771        src_system_prompt,
772        src_pinned_model,
773        src_pinned_reasoning_policy,
774        src_workspace_anchor,
775        src_workspace_policy,
776        src_scratchpad,
777        src_scratchpad_version,
778        src_transcript_budget_policy,
779        src_last_transcript_budget_action,
780        dst,
781    ) = SESSIONS.with(|s| {
782        let mut map = s.borrow_mut();
783        let src = map.get_mut(src_id)?;
784        src.last_accessed = Instant::now();
785        let dst = dst_id.unwrap_or_else(|| uuid::Uuid::now_v7().to_string());
786        let forked_transcript = clone_transcript_with_id(&src.transcript, &dst);
787        Some((
788            forked_transcript,
789            src.tool_format.clone(),
790            src.system_prompt.clone(),
791            src.pinned_model.clone(),
792            src.pinned_reasoning_policy.clone(),
793            src.workspace_anchor.clone(),
794            src.workspace_policy.clone(),
795            src.scratchpad.clone(),
796            src.scratchpad_version,
797            src.transcript_budget_policy.clone(),
798            src.last_transcript_budget_action.clone(),
799            dst,
800        ))
801    })?;
802    // Ensure cap is respected when inserting the fork.
803    open_or_create(Some(dst.clone()));
804    SESSIONS.with(|s| {
805        let mut map = s.borrow_mut();
806        if let Some(state) = map.get_mut(&dst) {
807            state.transcript = src_transcript;
808            state.tool_format = src_tool_format;
809            state.system_prompt = src_system_prompt;
810            state.pinned_model = src_pinned_model;
811            state.pinned_reasoning_policy = src_pinned_reasoning_policy;
812            state.workspace_anchor = src_workspace_anchor;
813            state.workspace_policy = src_workspace_policy;
814            state.scratchpad = src_scratchpad;
815            state.scratchpad_version = src_scratchpad_version;
816            state.transcript_budget_policy = src_transcript_budget_policy;
817            state.last_transcript_budget_action = src_last_transcript_budget_action;
818            state.last_accessed = Instant::now();
819        }
820        update_lineage(&mut map, src_id, &dst, None);
821    });
822    let budget_ok = SESSIONS.with(|s| {
823        let mut map = s.borrow_mut();
824        let Some(state) = map.get_mut(&dst) else {
825            return false;
826        };
827        let candidate = state.transcript.clone();
828        apply_transcript_with_budget(state, candidate, "fork").is_ok()
829    });
830    if !budget_ok {
831        close(&dst);
832        return None;
833    }
834    // open_or_create evicts BEFORE inserting, so the dst slot is
835    // guaranteed once we get here. The existence check is cheap
836    // insurance against a future refactor that breaks that invariant.
837    if exists(&dst) {
838        Some(dst)
839    } else {
840        None
841    }
842}
843
844/// Fork `src_id` and truncate the destination transcript to the
845/// first `keep_first` messages (#105 — branch-replay). Pairs with the
846/// scrubber: the host picks an event index, rebuilds a message count,
847/// and calls this to spawn a live sibling session that resumes from
848/// the rebuilt state. Subscribers are not carried over (same as
849/// `fork`), so sibling events don't double-fan into the parent's
850/// consumers.
851///
852/// Returns the new session id on success, `None` if `src_id` doesn't
853/// exist.
854pub fn fork_at(src_id: &str, keep_first: usize, dst_id: Option<String>) -> Option<String> {
855    let branched_at_event_index = SESSIONS.with(|s| {
856        let map = s.borrow();
857        let src = map.get(src_id)?;
858        Some(branch_event_index(&src.transcript, keep_first))
859    })?;
860    let new_id = fork(src_id, dst_id)?;
861    link_child_session_with_branch(src_id, &new_id, Some(branched_at_event_index));
862    let _ = truncate(&new_id, keep_first);
863    Some(new_id)
864}
865
866/// Truncate the session transcript to the first `keep_first`
867/// messages (opposite of `trim`, which keeps the last N). Returns
868/// counts and the retained tip event id when the session exists.
869pub fn truncate(id: &str, keep_first: usize) -> Option<SessionTruncateResult> {
870    SESSIONS.with(|s| {
871        let mut map = s.borrow_mut();
872        let state = map.get_mut(id)?;
873        let result = truncate_state(state, keep_first)?;
874        Some(result)
875    })
876}
877
878fn truncate_state(state: &mut SessionState, keep_first: usize) -> Option<SessionTruncateResult> {
879    let dict = state
880        .transcript
881        .as_dict()
882        .cloned()
883        .unwrap_or_else(BTreeMap::new);
884    let messages: Vec<VmValue> = match dict.get("messages") {
885        Some(VmValue::List(list)) => list.iter().cloned().collect(),
886        _ => Vec::new(),
887    };
888    let existing_events = match dict.get("events") {
889        Some(VmValue::List(list)) => Some(list.iter().cloned().collect::<Vec<_>>()),
890        _ => None,
891    };
892    let kept_turn_count = keep_first.min(messages.len());
893    let removed_turn_count = messages.len().saturating_sub(kept_turn_count);
894    let mut new_tip_turn_id = existing_events
895        .as_ref()
896        .map(|events| turn_event_id_for_count(events, kept_turn_count))
897        .unwrap_or_else(|| {
898            let events = crate::llm::helpers::transcript_events_from_messages(&messages);
899            turn_event_id_for_count(&events, kept_turn_count)
900        });
901
902    if removed_turn_count > 0 {
903        let retained: Vec<VmValue> = messages.into_iter().take(kept_turn_count).collect();
904        let retained_events = match existing_events {
905            Some(events) => {
906                let keep_event_count = event_prefix_len_for_messages(&events, kept_turn_count);
907                events.into_iter().take(keep_event_count).collect()
908            }
909            None => crate::llm::helpers::transcript_events_from_messages(&retained),
910        };
911        new_tip_turn_id = turn_event_id_for_count(&retained_events, kept_turn_count);
912        let mut next = dict;
913        next.insert(
914            "events".to_string(),
915            VmValue::List(std::sync::Arc::new(retained_events)),
916        );
917        next.insert(
918            "messages".to_string(),
919            VmValue::List(std::sync::Arc::new(retained)),
920        );
921        next.remove("summary");
922        apply_transcript_with_budget(state, VmValue::Dict(std::sync::Arc::new(next)), "truncate")
923            .ok()?;
924    }
925    state.last_accessed = Instant::now();
926    Some(SessionTruncateResult {
927        kept_turn_count,
928        removed_turn_count,
929        new_tip_turn_id,
930    })
931}
932
933/// Pop the trailing message iff it is an assistant message. Used by
934/// `agent_step_judge` to remove a vetoed assistant turn before
935/// regeneration (the "replace" on_veto path). Returns `true` if a
936/// message was popped, `false` if the transcript was empty, and an
937/// error if the trailing message was not an assistant turn —
938/// signalling a call-site discipline bug rather than a runtime error.
939pub fn pop_last_if_assistant(id: &str) -> Result<bool, String> {
940    SESSIONS.with(|s| {
941        let mut map = s.borrow_mut();
942        let Some(state) = map.get_mut(id) else {
943            return Err(format!(
944                "pop_last_if_assistant: unknown session id '{id}'"
945            ));
946        };
947        let messages: Vec<VmValue> = match state.transcript.as_dict() {
948            Some(dict) => match dict.get("messages") {
949                Some(VmValue::List(list)) => list.iter().cloned().collect(),
950                _ => Vec::new(),
951            },
952            None => Vec::new(),
953        };
954        if messages.is_empty() {
955            return Ok(false);
956        }
957        let trailing_role = messages
958            .last()
959            .and_then(|m| m.as_dict())
960            .and_then(|d| d.get("role"))
961            .map(|v| v.display())
962            .unwrap_or_default();
963        if trailing_role != "assistant" {
964            return Err(format!(
965                "pop_last_if_assistant: trailing message role is '{trailing_role}', expected 'assistant'"
966            ));
967        }
968        let keep = messages.len() - 1;
969        truncate_state(state, keep);
970        Ok(true)
971    })
972}
973
974pub fn trim(id: &str, keep_last: usize) -> Option<usize> {
975    SESSIONS.with(|s| {
976        let mut map = s.borrow_mut();
977        let state = map.get_mut(id)?;
978        let dict = state.transcript.as_dict()?.clone();
979        let messages: Vec<VmValue> = match dict.get("messages") {
980            Some(VmValue::List(list)) => list.iter().cloned().collect(),
981            _ => Vec::new(),
982        };
983        let start = messages.len().saturating_sub(keep_last);
984        let retained: Vec<VmValue> = messages.into_iter().skip(start).collect();
985        let kept = retained.len();
986        let mut next = dict;
987        next.insert(
988            "events".to_string(),
989            VmValue::List(std::sync::Arc::new(
990                crate::llm::helpers::transcript_events_from_messages(&retained),
991            )),
992        );
993        next.insert(
994            "messages".to_string(),
995            VmValue::List(std::sync::Arc::new(retained)),
996        );
997        apply_transcript_with_budget(state, VmValue::Dict(std::sync::Arc::new(next)), "trim")
998            .ok()?;
999        state.last_accessed = Instant::now();
1000        Some(kept)
1001    })
1002}
1003
1004/// Append a message dict to the session transcript. The message must
1005/// have at least a string `role`; anything else is merged verbatim.
1006pub fn inject_message(id: &str, message: VmValue) -> Result<(), String> {
1007    let Some(msg_dict) = message.as_dict().cloned() else {
1008        return Err("agent_session_inject: message must be a dict".into());
1009    };
1010    let role_ok = matches!(msg_dict.get("role"), Some(VmValue::String(_)));
1011    if !role_ok {
1012        return Err(
1013            "agent_session_inject: message must have a string `role` (user|assistant|tool_result|system)"
1014                .into(),
1015        );
1016    }
1017    SESSIONS.with(|s| {
1018        let mut map = s.borrow_mut();
1019        let Some(state) = map.get_mut(id) else {
1020            return Err(format!("agent_session_inject: unknown session id '{id}'"));
1021        };
1022        let dict = state
1023            .transcript
1024            .as_dict()
1025            .cloned()
1026            .unwrap_or_else(BTreeMap::new);
1027        let mut messages: Vec<VmValue> = match dict.get("messages") {
1028            Some(VmValue::List(list)) => list.iter().cloned().collect(),
1029            _ => Vec::new(),
1030        };
1031        let mut events: Vec<VmValue> = match dict.get("events") {
1032            Some(VmValue::List(list)) => list.iter().cloned().collect(),
1033            _ => crate::llm::helpers::transcript_events_from_messages(&messages),
1034        };
1035        let new_message = VmValue::Dict(std::sync::Arc::new(msg_dict));
1036        let message_index = messages.len();
1037        events.push(crate::llm::helpers::transcript_event_from_message(
1038            &new_message,
1039        ));
1040        messages.push(new_message);
1041        let mut next = dict;
1042        next.insert(
1043            "events".to_string(),
1044            VmValue::List(std::sync::Arc::new(events)),
1045        );
1046        next.insert(
1047            "messages".to_string(),
1048            VmValue::List(std::sync::Arc::new(messages)),
1049        );
1050        let persisted_message = next
1051            .get("messages")
1052            .and_then(|value| match value {
1053                VmValue::List(list) => list.get(message_index).cloned(),
1054                _ => None,
1055            })
1056            .unwrap_or(VmValue::Nil);
1057        apply_transcript_with_budget(
1058            state,
1059            VmValue::Dict(std::sync::Arc::new(next)),
1060            "inject_message",
1061        )?;
1062        emit_identified_user_message_event(id, &persisted_message);
1063        emit_llm_message_event(id, message_index, &persisted_message);
1064        state.last_accessed = Instant::now();
1065        Ok(())
1066    })
1067}
1068
1069fn emit_identified_user_message_event(session_id: &str, message: &VmValue) {
1070    let message_json = crate::llm::helpers::vm_value_to_json(message);
1071    let role = message_json.get("role").and_then(|value| value.as_str());
1072    if role != Some("user") {
1073        return;
1074    }
1075    let Some(message_id) = message_json
1076        .get("messageId")
1077        .or_else(|| message_json.get("message_id"))
1078        .and_then(|value| value.as_str())
1079        .filter(|value| !value.trim().is_empty())
1080    else {
1081        return;
1082    };
1083    let content = message_json
1084        .get("content")
1085        .map(user_message_content_blocks)
1086        .unwrap_or_default();
1087    crate::agent_events::emit_event(&crate::agent_events::AgentEvent::UserMessage {
1088        session_id: session_id.to_string(),
1089        message_id: message_id.to_string(),
1090        content,
1091    });
1092}
1093
1094fn user_message_content_blocks(content: &serde_json::Value) -> Vec<serde_json::Value> {
1095    match content {
1096        serde_json::Value::Array(items) => items.clone(),
1097        serde_json::Value::String(text) => vec![serde_json::json!({
1098            "type": "text",
1099            "text": text,
1100        })],
1101        other => vec![serde_json::json!({
1102            "type": "text",
1103            "text": other.to_string(),
1104        })],
1105    }
1106}
1107
/// Size measurements of a transcript, as compared against a session's
/// transcript budget policy.
#[derive(Debug, Clone, Eq, PartialEq)]
struct TranscriptBudgetUsage {
    /// Number of entries counted for the transcript's `messages` list.
    message_count: usize,
    /// Number of entries counted for the transcript's `events` list.
    event_count: usize,
    /// Approximate serialized size in bytes; `None` when byte accounting
    /// was not requested.
    approx_bytes: Option<usize>,
}
1114
1115fn transcript_messages_from_dict(dict: &BTreeMap<String, VmValue>) -> Vec<VmValue> {
1116    match dict.get("messages") {
1117        Some(VmValue::List(list)) => list.iter().cloned().collect(),
1118        _ => Vec::new(),
1119    }
1120}
1121
1122fn transcript_events_from_dict(dict: &BTreeMap<String, VmValue>) -> Vec<VmValue> {
1123    match dict.get("events") {
1124        Some(VmValue::List(list)) => list.iter().cloned().collect(),
1125        _ => {
1126            let messages = transcript_messages_from_dict(dict);
1127            crate::llm::helpers::transcript_events_from_messages(&messages)
1128        }
1129    }
1130}
1131
1132fn transcript_usage(transcript: &VmValue, include_bytes: bool) -> TranscriptBudgetUsage {
1133    let Some(dict) = transcript.as_dict() else {
1134        return TranscriptBudgetUsage {
1135            message_count: 0,
1136            event_count: 0,
1137            approx_bytes: include_bytes.then_some(0),
1138        };
1139    };
1140    let approx_bytes = if include_bytes {
1141        serde_json::to_vec(&crate::llm::helpers::vm_value_to_json(transcript))
1142            .map(|bytes| bytes.len())
1143            .ok()
1144            .or(Some(usize::MAX))
1145    } else {
1146        None
1147    };
1148    TranscriptBudgetUsage {
1149        message_count: transcript_messages_from_dict(dict).len(),
1150        event_count: transcript_events_from_dict(dict).len(),
1151        approx_bytes,
1152    }
1153}
1154
1155fn transcript_budget_exceeded_reason(
1156    usage: &TranscriptBudgetUsage,
1157    policy: &SessionTranscriptBudgetPolicy,
1158) -> Option<&'static str> {
1159    if usage.message_count > policy.max_messages {
1160        return Some("message_count");
1161    }
1162    if usage.event_count > policy.max_events {
1163        return Some("event_count");
1164    }
1165    if let (Some(bytes), Some(limit)) = (usage.approx_bytes, policy.max_approx_bytes) {
1166        if bytes > limit {
1167            return Some("approx_bytes");
1168        }
1169    }
1170    None
1171}
1172
1173fn transcript_budget_usage_json(usage: &TranscriptBudgetUsage) -> serde_json::Value {
1174    serde_json::json!({
1175        "messages": usage.message_count,
1176        "events": usage.event_count,
1177        "approx_bytes": usage.approx_bytes,
1178    })
1179}
1180
1181fn transcript_budget_policy_json(policy: &SessionTranscriptBudgetPolicy) -> serde_json::Value {
1182    let recovery = match &policy.recovery {
1183        TranscriptBudgetRecovery::Reject => serde_json::json!({"action": "reject"}),
1184        TranscriptBudgetRecovery::Trim { keep_last } => {
1185            serde_json::json!({"action": "trim", "keep_last": keep_last})
1186        }
1187        TranscriptBudgetRecovery::Compact { keep_last } => {
1188            serde_json::json!({"action": "compact", "keep_last": keep_last})
1189        }
1190    };
1191    serde_json::json!({
1192        "max_messages": policy.max_messages,
1193        "max_events": policy.max_events,
1194        "max_approx_bytes": policy.max_approx_bytes,
1195        "recovery": recovery,
1196    })
1197}
1198
1199fn transcript_budget_recovery_name(recovery: &TranscriptBudgetRecovery) -> &'static str {
1200    match recovery {
1201        TranscriptBudgetRecovery::Reject => "reject",
1202        TranscriptBudgetRecovery::Trim { .. } => "trim",
1203        TranscriptBudgetRecovery::Compact { .. } => "compact",
1204    }
1205}
1206
1207fn transcript_budget_error(
1208    state: &SessionState,
1209    policy: &SessionTranscriptBudgetPolicy,
1210    usage: &TranscriptBudgetUsage,
1211    reason: &str,
1212) -> String {
1213    let byte_suffix = match (usage.approx_bytes, policy.max_approx_bytes) {
1214        (Some(bytes), Some(limit)) => format!(", approx_bytes {bytes}/{limit}"),
1215        _ => String::new(),
1216    };
1217    format!(
1218        "transcript budget exceeded for session '{}': {reason} (messages {}/{}, events {}/{}{}; recovery={})",
1219        state.id,
1220        usage.message_count,
1221        policy.max_messages,
1222        usage.event_count,
1223        policy.max_events,
1224        byte_suffix,
1225        transcript_budget_recovery_name(&policy.recovery),
1226    )
1227}
1228
1229fn transcript_budget_audit_json(
1230    action: &str,
1231    source: &str,
1232    reason: &str,
1233    policy: &SessionTranscriptBudgetPolicy,
1234    usage_before: &TranscriptBudgetUsage,
1235    usage_attempted: &TranscriptBudgetUsage,
1236    usage_after: &TranscriptBudgetUsage,
1237) -> serde_json::Value {
1238    serde_json::json!({
1239        "action": action,
1240        "source": source,
1241        "reason": reason,
1242        "policy": transcript_budget_policy_json(policy),
1243        "usage_before": transcript_budget_usage_json(usage_before),
1244        "usage_attempted": transcript_budget_usage_json(usage_attempted),
1245        "usage_after": transcript_budget_usage_json(usage_after),
1246        "removed_messages": usage_attempted.message_count.saturating_sub(usage_after.message_count),
1247        "removed_events": usage_attempted.event_count.saturating_sub(usage_after.event_count),
1248    })
1249}
1250
1251fn transcript_budget_event(audit: &serde_json::Value) -> VmValue {
1252    let action = audit
1253        .get("action")
1254        .and_then(serde_json::Value::as_str)
1255        .unwrap_or("enforced");
1256    crate::llm::helpers::transcript_event(
1257        "transcript_budget",
1258        "system",
1259        "internal",
1260        &format!("Transcript budget {action}."),
1261        Some(audit.clone()),
1262    )
1263}
1264
1265fn append_event_to_transcript(transcript: VmValue, event: VmValue) -> VmValue {
1266    let Some(dict) = transcript.as_dict() else {
1267        return transcript;
1268    };
1269    let mut next = dict.clone();
1270    let mut events = transcript_events_from_dict(&next);
1271    events.push(event);
1272    next.insert(
1273        "events".to_string(),
1274        VmValue::List(std::sync::Arc::new(events)),
1275    );
1276    VmValue::Dict(std::sync::Arc::new(next))
1277}
1278
1279fn tail_message_capacity(
1280    policy: &SessionTranscriptBudgetPolicy,
1281    reserve_audit_event: bool,
1282) -> usize {
1283    let event_capacity = tail_event_capacity(policy, usize::from(reserve_audit_event));
1284    policy.max_messages.min(event_capacity)
1285}
1286
1287fn tail_event_capacity(policy: &SessionTranscriptBudgetPolicy, reserved_events: usize) -> usize {
1288    policy.max_events.saturating_sub(reserved_events)
1289}
1290
1291fn trim_transcript_for_budget(
1292    transcript: &VmValue,
1293    policy: &SessionTranscriptBudgetPolicy,
1294    keep_last: usize,
1295) -> VmValue {
1296    let dict = transcript.as_dict().cloned().unwrap_or_else(BTreeMap::new);
1297    let messages = transcript_messages_from_dict(&dict);
1298    let keep = keep_last.min(tail_message_capacity(policy, true));
1299    let start = messages.len().saturating_sub(keep);
1300    let retained: Vec<VmValue> = messages.into_iter().skip(start).collect();
1301    let mut next = dict;
1302    next.insert(
1303        "events".to_string(),
1304        VmValue::List(std::sync::Arc::new(
1305            crate::llm::helpers::transcript_events_from_messages(&retained),
1306        )),
1307    );
1308    next.insert(
1309        "messages".to_string(),
1310        VmValue::List(std::sync::Arc::new(retained)),
1311    );
1312    next.remove("summary");
1313    VmValue::Dict(std::sync::Arc::new(next))
1314}
1315
/// Data needed to emit the live "transcript compacted" event after a
/// budget-driven compaction has been committed to the session.
struct BudgetCompactionLiveEvent {
    /// Compaction policy taken from the compaction config used for the run.
    policy: crate::orchestration::CompactionPolicy,
    /// Strategy name reported by the compaction outcome.
    policy_strategy: String,
    /// Metrics reported by the compaction outcome (archived message count,
    /// token estimates before/after, snapshot asset id).
    metrics: crate::orchestration::TranscriptCompactedEventMetrics,
}
1321
/// Result of `compact_transcript_for_budget`.
struct BudgetCompactionResult {
    /// The rewritten transcript (not yet applied to the session).
    transcript: VmValue,
    /// Present only when the compaction lifecycle produced an outcome; the
    /// caller emits it once the compacted transcript has been committed.
    live_event: Option<BudgetCompactionLiveEvent>,
}
1326
1327fn compact_transcript_for_budget(
1328    transcript: &VmValue,
1329    policy: &SessionTranscriptBudgetPolicy,
1330    keep_last: usize,
1331    session_id: &str,
1332) -> BudgetCompactionResult {
1333    let dict = transcript.as_dict().cloned().unwrap_or_else(BTreeMap::new);
1334    let messages = transcript_messages_from_dict(&dict);
1335    let message_capacity = policy.max_messages.min(tail_event_capacity(policy, 2));
1336    // Auto-compaction may widen a suffix to start on a clean user-turn boundary,
1337    // so reserve one extra slot beyond the summary when sizing for hard caps.
1338    let tail_keep = keep_last.min(message_capacity.saturating_sub(2));
1339    let mut config = crate::orchestration::AutoCompactConfig {
1340        token_threshold: 0,
1341        keep_last: tail_keep,
1342        compact_strategy: crate::orchestration::CompactStrategy::Llm,
1343        hard_limit_strategy: crate::orchestration::CompactStrategy::Truncate,
1344        fallback_strategy: Some(crate::orchestration::CompactStrategy::Truncate),
1345        policy_strategy: crate::orchestration::compact_strategy_name(
1346            &crate::orchestration::CompactStrategy::Llm,
1347        )
1348        .to_string(),
1349        ..Default::default()
1350    };
1351
1352    let mut json_messages = messages
1353        .iter()
1354        .map(crate::llm::helpers::vm_value_to_json)
1355        .collect::<Vec<_>>();
1356    let lifecycle =
1357        crate::orchestration::CompactLifecycle::new(crate::orchestration::CompactMode::Auto)
1358            .with_session_id(Some(session_id))
1359            .with_trigger(crate::orchestration::CompactionTrigger::BudgetPressure)
1360            .with_hook_dispatch(false)
1361            .with_evaluate_providers(false);
1362    let llm_opts = crate::llm::extract_llm_options(&[
1363        VmValue::String(std::sync::Arc::from("")),
1364        VmValue::Nil,
1365        VmValue::Nil,
1366    ])
1367    .ok();
1368    let outcome = futures::executor::block_on(crate::orchestration::run_compaction_lifecycle(
1369        &mut json_messages,
1370        &mut config,
1371        llm_opts.as_ref(),
1372        lifecycle,
1373    ))
1374    .ok()
1375    .flatten();
1376
1377    let retained = json_messages
1378        .iter()
1379        .map(crate::stdlib::json_to_vm_value)
1380        .collect::<Vec<_>>();
1381    let mut events = crate::llm::helpers::transcript_events_from_messages(&retained);
1382    let summary = outcome.as_ref().map(|outcome| outcome.summary.clone());
1383    let mut live_event = None;
1384    if let Some(outcome) = outcome {
1385        events.push(crate::llm::helpers::transcript_event(
1386            "compaction",
1387            "system",
1388            "internal",
1389            "",
1390            Some(outcome.event_metadata.clone()),
1391        ));
1392        live_event = Some(BudgetCompactionLiveEvent {
1393            policy: config.policy.clone(),
1394            policy_strategy: outcome.policy_strategy,
1395            metrics: crate::orchestration::TranscriptCompactedEventMetrics {
1396                archived_messages: outcome.archived_messages,
1397                estimated_tokens_before: outcome.estimated_tokens_before,
1398                estimated_tokens_after: outcome.estimated_tokens_after,
1399                snapshot_asset_id: outcome.snapshot_asset_id,
1400            },
1401        });
1402    }
1403
1404    let mut next = dict;
1405    next.insert(
1406        "events".to_string(),
1407        VmValue::List(std::sync::Arc::new(events)),
1408    );
1409    next.insert(
1410        "messages".to_string(),
1411        VmValue::List(std::sync::Arc::new(retained)),
1412    );
1413    if let Some(summary) = summary {
1414        next.insert(
1415            "summary".to_string(),
1416            VmValue::String(std::sync::Arc::from(summary)),
1417        );
1418    } else {
1419        next.remove("summary");
1420    }
1421    BudgetCompactionResult {
1422        transcript: VmValue::Dict(std::sync::Arc::new(next)),
1423        live_event,
1424    }
1425}
1426
1427fn recovered_transcript_with_audit(
1428    recovered: VmValue,
1429    action: &str,
1430    source: &str,
1431    reason: &str,
1432    policy: &SessionTranscriptBudgetPolicy,
1433    usage_before: &TranscriptBudgetUsage,
1434    usage_attempted: &TranscriptBudgetUsage,
1435    include_bytes: bool,
1436) -> (VmValue, serde_json::Value, TranscriptBudgetUsage) {
1437    let usage_after_without_audit = transcript_usage(&recovered, include_bytes);
1438    let initial_audit = transcript_budget_audit_json(
1439        action,
1440        source,
1441        reason,
1442        policy,
1443        usage_before,
1444        usage_attempted,
1445        &usage_after_without_audit,
1446    );
1447    let with_initial_audit =
1448        append_event_to_transcript(recovered.clone(), transcript_budget_event(&initial_audit));
1449    let usage_after = transcript_usage(&with_initial_audit, include_bytes);
1450    let audit = transcript_budget_audit_json(
1451        action,
1452        source,
1453        reason,
1454        policy,
1455        usage_before,
1456        usage_attempted,
1457        &usage_after,
1458    );
1459    let with_audit = append_event_to_transcript(recovered, transcript_budget_event(&audit));
1460    let usage_after = transcript_usage(&with_audit, include_bytes);
1461    (with_audit, audit, usage_after)
1462}
1463
1464fn apply_transcript_with_budget(
1465    state: &mut SessionState,
1466    candidate: VmValue,
1467    source: &str,
1468) -> Result<(), String> {
1469    let policy = state.transcript_budget_policy.normalized();
1470    let include_bytes = policy.max_approx_bytes.is_some();
1471    let usage_before = transcript_usage(&state.transcript, include_bytes);
1472    let usage_attempted = transcript_usage(&candidate, include_bytes);
1473    let Some(reason) = transcript_budget_exceeded_reason(&usage_attempted, &policy) else {
1474        state.transcript = candidate;
1475        return Ok(());
1476    };
1477
1478    match policy.recovery.clone() {
1479        TranscriptBudgetRecovery::Reject => {
1480            let audit = transcript_budget_audit_json(
1481                "rejected",
1482                source,
1483                reason,
1484                &policy,
1485                &usage_before,
1486                &usage_attempted,
1487                &usage_before,
1488            );
1489            state.last_transcript_budget_action = Some(audit);
1490            Err(transcript_budget_error(
1491                state,
1492                &policy,
1493                &usage_attempted,
1494                reason,
1495            ))
1496        }
1497        TranscriptBudgetRecovery::Trim { keep_last } => {
1498            let recovered = trim_transcript_for_budget(&candidate, &policy, keep_last);
1499            let (with_audit, audit, usage_after) = recovered_transcript_with_audit(
1500                recovered,
1501                "trimmed",
1502                source,
1503                reason,
1504                &policy,
1505                &usage_before,
1506                &usage_attempted,
1507                include_bytes,
1508            );
1509            if transcript_budget_exceeded_reason(&usage_after, &policy).is_some() {
1510                let rejected = transcript_budget_audit_json(
1511                    "rejected",
1512                    source,
1513                    reason,
1514                    &policy,
1515                    &usage_before,
1516                    &usage_attempted,
1517                    &usage_after,
1518                );
1519                state.last_transcript_budget_action = Some(rejected);
1520                return Err(transcript_budget_error(
1521                    state,
1522                    &policy,
1523                    &usage_after,
1524                    reason,
1525                ));
1526            }
1527            state.last_transcript_budget_action = Some(audit);
1528            state.transcript = with_audit;
1529            Ok(())
1530        }
1531        TranscriptBudgetRecovery::Compact { keep_last } => {
1532            let compacted =
1533                compact_transcript_for_budget(&candidate, &policy, keep_last, &state.id);
1534            let (with_audit, audit, usage_after) = recovered_transcript_with_audit(
1535                compacted.transcript,
1536                "compacted",
1537                source,
1538                reason,
1539                &policy,
1540                &usage_before,
1541                &usage_attempted,
1542                include_bytes,
1543            );
1544            if transcript_budget_exceeded_reason(&usage_after, &policy).is_some() {
1545                let rejected = transcript_budget_audit_json(
1546                    "rejected",
1547                    source,
1548                    reason,
1549                    &policy,
1550                    &usage_before,
1551                    &usage_attempted,
1552                    &usage_after,
1553                );
1554                state.last_transcript_budget_action = Some(rejected);
1555                return Err(transcript_budget_error(
1556                    state,
1557                    &policy,
1558                    &usage_after,
1559                    reason,
1560                ));
1561            }
1562            state.last_transcript_budget_action = Some(audit);
1563            state.transcript = with_audit;
1564            if let Some(event) = compacted.live_event {
1565                crate::orchestration::emit_transcript_compacted_event_sync(
1566                    &state.id,
1567                    crate::orchestration::CompactMode::Auto,
1568                    crate::orchestration::CompactionTrigger::BudgetPressure
1569                        .as_str()
1570                        .to_string(),
1571                    &event.policy,
1572                    event.policy_strategy,
1573                    event.metrics,
1574                );
1575            }
1576            Ok(())
1577        }
1578    }
1579}
1580
1581fn emit_llm_message_event(session_id: &str, message_index: usize, message: &VmValue) {
1582    let mut fields = serde_json::Map::new();
1583    fields.insert(
1584        "session_id".to_string(),
1585        serde_json::Value::String(session_id.to_string()),
1586    );
1587    fields.insert(
1588        "message_index".to_string(),
1589        serde_json::json!(message_index),
1590    );
1591    let message_json = crate::llm::helpers::vm_value_to_json(message);
1592    if let Some(role) = message_json.get("role").and_then(|value| value.as_str()) {
1593        fields.insert(
1594            "role".to_string(),
1595            serde_json::Value::String(role.to_string()),
1596        );
1597    }
1598    if let Some(content) = message_json.get("content") {
1599        fields.insert("content".to_string(), content.clone());
1600    }
1601    fields.insert("message".to_string(), message_json);
1602    crate::llm::append_observability_sidecar_entry("message", fields);
1603}
1604
1605/// Create a new session from a reconstructed message list.
1606///
1607/// This is intentionally an all-at-once write instead of repeated
1608/// `inject_message` calls: importing a transcript should not re-emit
1609/// each historic turn into the active observability sidecar.
1610pub fn seed_from_messages(
1611    id: Option<String>,
1612    messages: &[serde_json::Value],
1613    metadata: serde_json::Value,
1614    system_prompt: Option<String>,
1615    tool_format: Option<String>,
1616) -> Result<String, String> {
1617    let resolved = id.unwrap_or_else(|| uuid::Uuid::now_v7().to_string());
1618    if exists(&resolved) {
1619        return Err(format!("agent session '{resolved}' already exists"));
1620    }
1621    open_or_create(Some(resolved.clone()));
1622    SESSIONS.with(|s| {
1623        let mut map = s.borrow_mut();
1624        let Some(state) = map.get_mut(&resolved) else {
1625            return Err(format!("failed to create agent session '{resolved}'"));
1626        };
1627        state.tool_format = tool_format.filter(|value| !value.trim().is_empty());
1628        state.system_prompt = system_prompt.filter(|value| !value.trim().is_empty());
1629
1630        let mut metadata = metadata
1631            .as_object()
1632            .cloned()
1633            .unwrap_or_else(serde_json::Map::new);
1634        if let Some(tool_format) = state.tool_format.as_ref() {
1635            metadata.insert(
1636                "tool_format".to_string(),
1637                serde_json::Value::String(tool_format.clone()),
1638            );
1639            metadata.insert(
1640                "tool_mode_locked".to_string(),
1641                serde_json::Value::Bool(true),
1642            );
1643        }
1644        if let Some(system_prompt) = state.system_prompt.as_ref() {
1645            metadata.insert(
1646                "system_prompt".to_string(),
1647                crate::llm::helpers::system_prompt_metadata(system_prompt),
1648            );
1649        }
1650        let vm_messages = crate::llm::helpers::json_messages_to_vm(messages);
1651        let candidate = crate::llm::helpers::new_transcript_with(
1652            Some(resolved.clone()),
1653            vm_messages,
1654            None,
1655            Some(crate::stdlib::json_to_vm_value(&serde_json::Value::Object(
1656                metadata,
1657            ))),
1658        );
1659        apply_transcript_with_budget(state, candidate, "seed_from_messages")?;
1660        state.last_accessed = Instant::now();
1661        Ok(resolved)
1662    })
1663}
1664
1665/// Load the messages vec (as JSON) for this session, for use as prefix
1666/// to an agent_loop run. Returns an empty vec if the session doesn't
1667/// exist or has no messages.
1668pub fn messages_json(id: &str) -> Vec<serde_json::Value> {
1669    SESSIONS.with(|s| {
1670        let map = s.borrow();
1671        let Some(state) = map.get(id) else {
1672            return Vec::new();
1673        };
1674        let Some(dict) = state.transcript.as_dict() else {
1675            return Vec::new();
1676        };
1677        match dict.get("messages") {
1678            Some(VmValue::List(list)) => list
1679                .iter()
1680                .map(crate::llm::helpers::vm_value_to_json)
1681                .collect(),
1682            _ => Vec::new(),
1683        }
1684    })
1685}
1686
/// Message list plus summary returned by `prompt_state_json`.
///
/// The `Default` value (no messages, no summary) is what
/// `prompt_state_json` returns for an unknown session or a transcript
/// that is not a dict.
#[derive(Clone, Debug, Default)]
pub struct SessionPromptState {
    /// Transcript messages converted to JSON; `prompt_state_json` may
    /// prepend a summary message to this list.
    pub messages: Vec<serde_json::Value>,
    /// The transcript's `summary` string, when it is present and not
    /// blank.
    pub summary: Option<String>,
}
1692
1693fn summary_message_json(summary: &str) -> serde_json::Value {
1694    serde_json::json!({
1695        "role": "user",
1696        "content": summary,
1697    })
1698}
1699
1700fn messages_begin_with_summary(messages: &[serde_json::Value], summary: &str) -> bool {
1701    messages.first().is_some_and(|message| {
1702        message.get("role").and_then(|value| value.as_str()) == Some("user")
1703            && message.get("content").and_then(|value| value.as_str()) == Some(summary)
1704    })
1705}
1706
1707/// Prompt-surface resume state for a persisted session.
1708///
1709/// Returns the compacted/rehydratable message list plus the transcript's
1710/// summary field. When the transcript carries a summary field but its
1711/// message list does not already begin with the compacted summary
1712/// message, this helper prepends one so session re-entry preserves the
1713/// same prompt surface the previous loop was actually using.
1714pub fn prompt_state_json(id: &str) -> SessionPromptState {
1715    SESSIONS.with(|s| {
1716        let map = s.borrow();
1717        let Some(state) = map.get(id) else {
1718            return SessionPromptState::default();
1719        };
1720        let Some(dict) = state.transcript.as_dict() else {
1721            return SessionPromptState::default();
1722        };
1723        let mut messages = match dict.get("messages") {
1724            Some(VmValue::List(list)) => list
1725                .iter()
1726                .map(crate::llm::helpers::vm_value_to_json)
1727                .collect::<Vec<_>>(),
1728            _ => Vec::new(),
1729        };
1730        let summary = dict.get("summary").and_then(|value| match value {
1731            VmValue::String(text) if !text.trim().is_empty() => Some(text.to_string()),
1732            _ => None,
1733        });
1734        if let Some(summary_text) = summary.as_deref() {
1735            if !messages_begin_with_summary(&messages, summary_text) {
1736                messages.insert(0, summary_message_json(summary_text));
1737            }
1738        }
1739        SessionPromptState { messages, summary }
1740    })
1741}
1742
1743/// Overwrite the transcript for this session. Used by `agent_loop` on
1744/// exit to persist the synthesized transcript.
1745pub fn store_transcript(id: &str, transcript: VmValue) -> Result<(), String> {
1746    SESSIONS.with(|s| {
1747        let mut map = s.borrow_mut();
1748        let Some(state) = map.get_mut(id) else {
1749            return Err(format!(
1750                "agent_session_store_transcript: unknown session id '{id}'"
1751            ));
1752        };
1753        let transcript = transcript_with_session_metadata(transcript, state);
1754        apply_transcript_with_budget(state, transcript, "store_transcript")?;
1755        state.last_accessed = Instant::now();
1756        Ok(())
1757    })
1758}
1759
1760/// Remove malformed reminder events after their drop audit has been emitted.
1761/// Pending-reminder rendering scans the transcript on every LLM call; pruning
1762/// invalid entries makes the drop event one-shot instead of noisy per turn.
1763pub fn prune_invalid_reminder_events(id: &str) -> usize {
1764    SESSIONS.with(|s| {
1765        let mut map = s.borrow_mut();
1766        let Some(state) = map.get_mut(id) else {
1767            return 0;
1768        };
1769        let Some(dict) = state.transcript.as_dict().cloned() else {
1770            return 0;
1771        };
1772        let Some(VmValue::List(events)) = dict.get("events") else {
1773            return 0;
1774        };
1775        let mut pruned = 0_usize;
1776        let mut kept = Vec::with_capacity(events.len());
1777        for event in events.iter().cloned() {
1778            let is_reminder = event
1779                .as_dict()
1780                .and_then(|event| event.get("kind"))
1781                .map(VmValue::display)
1782                .as_deref()
1783                == Some(crate::llm::helpers::SYSTEM_REMINDER_EVENT_KIND);
1784            if !is_reminder {
1785                kept.push(event);
1786                continue;
1787            }
1788            let valid = crate::llm::helpers::reminder_from_event(&event)
1789                .is_some_and(|reminder| !reminder.body.trim().is_empty());
1790            if valid {
1791                kept.push(event);
1792            } else {
1793                pruned += 1;
1794            }
1795        }
1796        if pruned > 0 {
1797            let mut next = dict;
1798            next.insert(
1799                "events".to_string(),
1800                VmValue::List(std::sync::Arc::new(kept)),
1801            );
1802            let _ = apply_transcript_with_budget(
1803                state,
1804                VmValue::Dict(std::sync::Arc::new(next)),
1805                "prune_invalid_reminder_events",
1806            );
1807            state.last_accessed = Instant::now();
1808        }
1809        pruned
1810    })
1811}
1812
1813/// Apply the reminder TTL lifecycle that runs once per completed agent
1814/// turn. Reminders with `ttl_turns = 1` expire and are removed; larger
1815/// finite TTLs are decremented in place. Expiry audit events are emitted
1816/// to the active EventLog when one is installed.
1817pub fn apply_reminder_post_turn(id: &str, turn: i64) -> Result<serde_json::Value, String> {
1818    let report = SESSIONS.with(|s| {
1819        let mut map = s.borrow_mut();
1820        let Some(state) = map.get_mut(id) else {
1821            return Err(format!(
1822                "agent_session_apply_reminder_post_turn: unknown session id '{id}'"
1823            ));
1824        };
1825        let report = crate::llm::helpers::apply_reminder_post_turn(&state.transcript, turn);
1826        if report.decremented_count > 0 || !report.expired.is_empty() {
1827            if let Some(next) = report.transcript.clone() {
1828                apply_transcript_with_budget(state, next, "apply_reminder_post_turn")?;
1829            }
1830            state.last_accessed = Instant::now();
1831        }
1832        Ok(report)
1833    })?;
1834
1835    for reminder in &report.expired {
1836        let mut payload = crate::llm::helpers::reminder_lifecycle_payload(Some(id), reminder);
1837        if let Some(obj) = payload.as_object_mut() {
1838            obj.insert(
1839                "transcript_id".to_string(),
1840                serde_json::Value::String(id.to_string()),
1841            );
1842            obj.insert(
1843                "reason".to_string(),
1844                serde_json::Value::String("ttl".to_string()),
1845            );
1846            obj.insert(
1847                "ttl_turns_before".to_string(),
1848                serde_json::json!(&reminder.ttl_turns),
1849            );
1850            obj.insert("expired_at_turn".to_string(), serde_json::json!(turn));
1851        }
1852        crate::llm::helpers::emit_reminder_lifecycle_event(
1853            crate::llm::helpers::REMINDER_EXPIRED_EVENT_KIND,
1854            payload,
1855        );
1856    }
1857
1858    Ok(serde_json::json!({
1859        "expired_count": report.expired.len(),
1860        "decremented_count": report.decremented_count,
1861        "remaining_count": report.remaining_count,
1862    }))
1863}
1864
1865/// Inject a typed system reminder into the session transcript's event
1866/// stream. This mirrors `transcript.inject_reminder` for live sessions:
1867/// reminders with the same `dedupe_key` are replaced before the new
1868/// reminder event is appended.
1869pub fn inject_reminder(
1870    id: &str,
1871    reminder: crate::llm::helpers::SystemReminder,
1872) -> Result<ReminderInjectionReport, String> {
1873    let reminder_id = reminder.id.clone();
1874    let dedupe_key = reminder.dedupe_key.clone();
1875    let mut deduped_reminder_ids = Vec::new();
1876    SESSIONS.with(|s| {
1877        let mut map = s.borrow_mut();
1878        let Some(state) = map.get_mut(id) else {
1879            return Err(format!(
1880                "agent_session_inject_reminder: unknown session id '{id}'"
1881            ));
1882        };
1883        let dict = state
1884            .transcript
1885            .as_dict()
1886            .cloned()
1887            .unwrap_or_else(BTreeMap::new);
1888        let mut events: Vec<VmValue> = match dict.get("events") {
1889            Some(VmValue::List(list)) => list.iter().cloned().collect(),
1890            _ => dict
1891                .get("messages")
1892                .and_then(|value| match value {
1893                    VmValue::List(list) => Some(list.iter().cloned().collect::<Vec<_>>()),
1894                    _ => None,
1895                })
1896                .map(|messages| crate::llm::helpers::transcript_events_from_messages(&messages))
1897                .unwrap_or_default(),
1898        };
1899        if let Some(expected_key) = dedupe_key.as_deref() {
1900            events.retain(|event| {
1901                let Some(existing) = crate::llm::helpers::reminder_from_event(event) else {
1902                    return true;
1903                };
1904                if existing.dedupe_key.as_deref() == Some(expected_key) {
1905                    deduped_reminder_ids.push(existing.id);
1906                    false
1907                } else {
1908                    true
1909                }
1910            });
1911        }
1912        events.push(crate::llm::helpers::transcript_reminder_event(&reminder));
1913        let mut next = dict;
1914        next.insert(
1915            "events".to_string(),
1916            VmValue::List(std::sync::Arc::new(events)),
1917        );
1918        apply_transcript_with_budget(
1919            state,
1920            VmValue::Dict(std::sync::Arc::new(next)),
1921            "inject_reminder",
1922        )?;
1923        state.last_accessed = Instant::now();
1924        Ok(())
1925    })?;
1926
1927    if !deduped_reminder_ids.is_empty() {
1928        let dropped_count = deduped_reminder_ids.len();
1929        crate::llm::helpers::emit_reminder_lifecycle_event(
1930            crate::llm::helpers::REMINDER_DEDUPED_EVENT_KIND,
1931            serde_json::json!({
1932                "session_id": id,
1933                "transcript_id": id,
1934                "reminder_id": &reminder_id,
1935                "replacing_id": &reminder_id,
1936                "replaced_id": deduped_reminder_ids.first(),
1937                "replaced_ids": &deduped_reminder_ids,
1938                "dedupe_key": &dedupe_key,
1939                "dropped_reminder_ids": &deduped_reminder_ids,
1940                "dropped_count": dropped_count,
1941            }),
1942        );
1943    }
1944
1945    crate::llm::helpers::emit_reminder_lifecycle_event(
1946        crate::llm::helpers::REMINDER_INJECTED_EVENT_KIND,
1947        crate::llm::helpers::reminder_lifecycle_payload(Some(id), &reminder),
1948    );
1949
1950    Ok(ReminderInjectionReport {
1951        reminder_id,
1952        deduped_count: deduped_reminder_ids.len(),
1953    })
1954}
1955
1956/// Append a transcript event to the session without mutating its
1957/// message list. Used for orchestration-side lineage events (sub-agent
1958/// spawn/completion, workflow hooks, etc.) that should survive
1959/// persistence/replay without being replayed back into the model as
1960/// conversational messages.
1961pub fn append_event(id: &str, event: VmValue) -> Result<(), String> {
1962    let Some(event_dict) = event.as_dict() else {
1963        return Err("agent_session_append_event: event must be a dict".into());
1964    };
1965    let kind_ok = matches!(event_dict.get("kind"), Some(VmValue::String(_)));
1966    if !kind_ok {
1967        return Err("agent_session_append_event: event must have a string `kind`".into());
1968    }
1969    SESSIONS.with(|s| {
1970        let mut map = s.borrow_mut();
1971        let Some(state) = map.get_mut(id) else {
1972            return Err(format!(
1973                "agent_session_append_event: unknown session id '{id}'"
1974            ));
1975        };
1976        append_event_to_state(state, event, "append_event")?;
1977        state.last_accessed = Instant::now();
1978        Ok(())
1979    })
1980}
1981
1982fn append_event_to_state(
1983    state: &mut SessionState,
1984    event: VmValue,
1985    action: &str,
1986) -> Result<(), String> {
1987    let dict = state
1988        .transcript
1989        .as_dict()
1990        .cloned()
1991        .unwrap_or_else(BTreeMap::new);
1992    let mut events: Vec<VmValue> = match dict.get("events") {
1993        Some(VmValue::List(list)) => list.iter().cloned().collect(),
1994        _ => dict
1995            .get("messages")
1996            .and_then(|value| match value {
1997                VmValue::List(list) => Some(list.iter().cloned().collect::<Vec<_>>()),
1998                _ => None,
1999            })
2000            .map(|messages| crate::llm::helpers::transcript_events_from_messages(&messages))
2001            .unwrap_or_default(),
2002    };
2003    events.push(event);
2004    let mut next = dict;
2005    next.insert(
2006        "events".to_string(),
2007        VmValue::List(std::sync::Arc::new(events)),
2008    );
2009    apply_transcript_with_budget(state, VmValue::Dict(std::sync::Arc::new(next)), action)
2010}
2011
2012/// Replace the transcript's message list wholesale. Used by the
2013/// in-loop compaction path, which operates on JSON messages.
2014pub fn replace_messages(id: &str, messages: &[serde_json::Value]) -> Result<(), String> {
2015    replace_messages_with_summary(id, messages, None)
2016}
2017
2018/// Replace the transcript's message list and optionally update the
2019/// `summary` field on the persisted transcript. The compaction path
2020/// uses this to publish the human-readable rollup line that
2021/// `transcript_summary(transcript)` exposes to host code.
2022pub fn replace_messages_with_summary(
2023    id: &str,
2024    messages: &[serde_json::Value],
2025    summary: Option<&str>,
2026) -> Result<(), String> {
2027    SESSIONS.with(|s| {
2028        let mut map = s.borrow_mut();
2029        let Some(state) = map.get_mut(id) else {
2030            return Err(format!(
2031                "agent_session_replace_messages: unknown session id '{id}'"
2032            ));
2033        };
2034        let dict = state
2035            .transcript
2036            .as_dict()
2037            .cloned()
2038            .unwrap_or_else(BTreeMap::new);
2039        let vm_messages: Vec<VmValue> = messages
2040            .iter()
2041            .map(crate::stdlib::json_to_vm_value)
2042            .collect();
2043        let mut next = dict;
2044        next.insert(
2045            "events".to_string(),
2046            VmValue::List(std::sync::Arc::new(
2047                crate::llm::helpers::transcript_events_from_messages(&vm_messages),
2048            )),
2049        );
2050        next.insert(
2051            "messages".to_string(),
2052            VmValue::List(std::sync::Arc::new(vm_messages)),
2053        );
2054        if let Some(summary) = summary {
2055            next.insert(
2056                "summary".to_string(),
2057                VmValue::String(std::sync::Arc::from(summary.to_string())),
2058            );
2059        } else {
2060            next.remove("summary");
2061        }
2062        apply_transcript_with_budget(
2063            state,
2064            VmValue::Dict(std::sync::Arc::new(next)),
2065            "replace_messages",
2066        )?;
2067        state.last_accessed = Instant::now();
2068        Ok(())
2069    })
2070}
2071
2072pub fn append_subscriber(id: &str, callback: VmValue) {
2073    open_or_create(Some(id.to_string()));
2074    SESSIONS.with(|s| {
2075        if let Some(state) = s.borrow_mut().get_mut(id) {
2076            state.subscribers.push(callback);
2077            state.last_accessed = Instant::now();
2078        }
2079    });
2080}
2081
2082pub fn subscribers_for(id: &str) -> Vec<VmValue> {
2083    SESSIONS.with(|s| {
2084        s.borrow()
2085            .get(id)
2086            .map(|state| state.subscribers.clone())
2087            .unwrap_or_default()
2088    })
2089}
2090
2091pub fn subscriber_count(id: &str) -> usize {
2092    SESSIONS.with(|s| {
2093        s.borrow()
2094            .get(id)
2095            .map(|state| state.subscribers.len())
2096            .unwrap_or(0)
2097    })
2098}
2099
2100/// Persist the set of active skill names for session resume. Called at
2101/// the end of an agent_loop run; the next `open_or_create` for this id
2102/// reads them back via [`active_skills`].
2103pub fn set_active_skills(id: &str, skills: Vec<String>) {
2104    SESSIONS.with(|s| {
2105        if let Some(state) = s.borrow_mut().get_mut(id) {
2106            state.active_skills = skills;
2107            state.last_accessed = Instant::now();
2108        }
2109    });
2110}
2111
2112/// Skills that were active at the end of the previous agent_loop run
2113/// against this session. Returns an empty vec when the session doesn't
2114/// exist or nothing was persisted.
2115pub fn active_skills(id: &str) -> Vec<String> {
2116    SESSIONS.with(|s| {
2117        s.borrow()
2118            .get(id)
2119            .map(|state| state.active_skills.clone())
2120            .unwrap_or_default()
2121    })
2122}
2123
2124/// Claim the tool-calling contract for a session.
2125///
2126/// The first loop against a named session records its `tool_format`.
2127/// Later re-entry must use the same format so prompt/history generated
2128/// under a text contract is never replayed as native, or vice versa.
2129pub fn claim_tool_format(id: &str, tool_format: &str) -> Result<(), String> {
2130    let tool_format = tool_format.trim();
2131    if tool_format.is_empty() {
2132        return Ok(());
2133    }
2134    SESSIONS.with(|s| {
2135        let mut map = s.borrow_mut();
2136        let Some(state) = map.get_mut(id) else {
2137            return Err(format!("agent session '{id}' does not exist"));
2138        };
2139        match state.tool_format.as_deref() {
2140            Some(existing) if existing != tool_format => Err(format!(
2141                "agent session '{id}' is locked to tool_format='{existing}', but this run requested tool_format='{tool_format}'. Start a new session or fork/reset the transcript before changing tool mode."
2142            )),
2143            Some(_) => {
2144                state.last_accessed = Instant::now();
2145                Ok(())
2146            }
2147            None => {
2148                state.tool_format = Some(tool_format.to_string());
2149                state.last_accessed = Instant::now();
2150                Ok(())
2151            }
2152        }
2153    })
2154}
2155
2156pub fn tool_format(id: &str) -> Option<String> {
2157    SESSIONS.with(|s| {
2158        s.borrow()
2159            .get(id)
2160            .and_then(|state| state.tool_format.clone())
2161    })
2162}
2163
2164pub fn record_system_prompt(id: &str, system_prompt: &str) -> Result<(), String> {
2165    let system_prompt = system_prompt.trim();
2166    if system_prompt.is_empty() {
2167        return Ok(());
2168    }
2169    assert_cache_stable_system_prompt(system_prompt);
2170    SESSIONS.with(|s| {
2171        let mut map = s.borrow_mut();
2172        let Some(state) = map.get_mut(id) else {
2173            return Err(format!("agent session '{id}' does not exist"));
2174        };
2175        let changed = state.system_prompt.as_deref() != Some(system_prompt);
2176        state.system_prompt = Some(system_prompt.to_string());
2177        let dict = state
2178            .transcript
2179            .as_dict()
2180            .cloned()
2181            .unwrap_or_else(BTreeMap::new);
2182        let mut next = dict;
2183        apply_system_prompt_metadata(&mut next, system_prompt);
2184        if changed {
2185            let mut events: Vec<VmValue> = match next.get("events") {
2186                Some(VmValue::List(list)) => list.iter().cloned().collect(),
2187                _ => Vec::new(),
2188            };
2189            events.push(crate::llm::helpers::transcript_event(
2190                "system_prompt",
2191                "system",
2192                "internal",
2193                "",
2194                Some(crate::llm::helpers::system_prompt_event_metadata(
2195                    system_prompt,
2196                )),
2197            ));
2198            next.insert(
2199                "events".to_string(),
2200                VmValue::List(std::sync::Arc::new(events)),
2201            );
2202        }
2203        apply_transcript_with_budget(
2204            state,
2205            VmValue::Dict(std::sync::Arc::new(next)),
2206            "record_system_prompt",
2207        )?;
2208        state.last_accessed = Instant::now();
2209        Ok(())
2210    })
2211}
2212
2213pub fn system_prompt(id: &str) -> Option<String> {
2214    SESSIONS.with(|s| {
2215        s.borrow()
2216            .get(id)
2217            .and_then(|state| state.system_prompt.clone())
2218    })
2219}
2220
/// Scan `system_prompt` for a `{{` opener that is followed, after
/// optional whitespace, by `workspace_` or `project_`.
///
/// Returns the offending prefix for the first such opener, or `None`
/// when there is none. Scanning resumes after each opener that did not
/// match.
#[cfg(debug_assertions)]
fn forbidden_workspace_prompt_token(system_prompt: &str) -> Option<&'static str> {
    const FORBIDDEN_PREFIXES: [&str; 2] = ["workspace_", "project_"];

    let mut rest = system_prompt;
    while let Some((_, after_open)) = rest.split_once("{{") {
        let tail = after_open.trim_start();
        let hit = FORBIDDEN_PREFIXES
            .iter()
            .copied()
            .find(|prefix| tail.starts_with(*prefix));
        if hit.is_some() {
            return hit;
        }
        rest = tail;
    }
    None
}
2236
2237#[cfg(debug_assertions)]
2238fn assert_cache_stable_system_prompt(system_prompt: &str) {
2239    if let Some(prefix) = forbidden_workspace_prompt_token(system_prompt) {
2240        panic!(
2241            "{CACHE_STABLE_SYSTEM_PROMPT_DIAGNOSTIC}: session system prompts must not interpolate `{{{{{prefix}...` tokens; move workspace/project context into the workspace-anchor reminder"
2242        );
2243    }
2244}
2245
2246#[cfg(not(debug_assertions))]
2247fn assert_cache_stable_system_prompt(_system_prompt: &str) {}
2248
2249/// Pin (or clear, with `None`) a model selector on a session. Returns
2250/// `Ok(true)` when the value actually changed so callers can decide
2251/// whether to broadcast a notification. The selector is stored verbatim
2252/// — alias / catalog resolution is the call-site's job.
2253pub fn set_pinned_model(id: &str, model: Option<String>) -> Result<bool, String> {
2254    let normalized = model
2255        .map(|value| value.trim().to_string())
2256        .filter(|value| !value.is_empty());
2257    SESSIONS.with(|s| {
2258        let mut map = s.borrow_mut();
2259        let Some(state) = map.get_mut(id) else {
2260            return Err(format!("agent session '{id}' does not exist"));
2261        };
2262        let changed = state.pinned_model != normalized;
2263        state.pinned_model = normalized;
2264        state.last_accessed = Instant::now();
2265        Ok(changed)
2266    })
2267}
2268
2269/// Read the session's pinned model selector, if any. Consumed by
2270/// `vm_resolve_model` as the per-session default when a script-level
2271/// `llm_call` does not pass `model:` explicitly.
2272pub fn pinned_model(id: &str) -> Option<String> {
2273    SESSIONS.with(|s| {
2274        s.borrow()
2275            .get(id)
2276            .and_then(|state| state.pinned_model.clone())
2277    })
2278}
2279
2280/// Pin (or clear) the session-level provider-aware reasoning policy.
2281pub fn set_pinned_reasoning_policy(id: &str, policy: Option<String>) -> Result<bool, String> {
2282    let normalized = match policy {
2283        Some(value) => crate::llm::reasoning_policy::normalize_policy_selector(&value)?,
2284        None => None,
2285    };
2286    SESSIONS.with(|s| {
2287        let mut map = s.borrow_mut();
2288        let Some(state) = map.get_mut(id) else {
2289            return Err(format!("agent session '{id}' does not exist"));
2290        };
2291        let changed = state.pinned_reasoning_policy != normalized;
2292        state.pinned_reasoning_policy = normalized;
2293        state.last_accessed = Instant::now();
2294        Ok(changed)
2295    })
2296}
2297
2298/// Read the session's pinned reasoning policy, if any.
2299pub fn pinned_reasoning_policy(id: &str) -> Option<String> {
2300    SESSIONS.with(|s| {
2301        s.borrow()
2302            .get(id)
2303            .and_then(|state| state.pinned_reasoning_policy.clone())
2304    })
2305}
2306
2307/// Set (or clear, with `None`) the typed workspace anchor on a session.
2308/// Returns `Ok(true)` when the value actually changed so callers can
2309/// decide whether to broadcast `AnchorChanged` notifications.
2310pub fn set_workspace_anchor(id: &str, anchor: Option<WorkspaceAnchor>) -> Result<bool, String> {
2311    SESSIONS.with(|s| {
2312        let mut map = s.borrow_mut();
2313        let Some(state) = map.get_mut(id) else {
2314            return Err(format!("agent session '{id}' does not exist"));
2315        };
2316        let changed = state.workspace_anchor != anchor;
2317        state.workspace_anchor = anchor;
2318        if changed {
2319            crate::llm::permissions::clear_session_grants(id);
2320        }
2321        state.last_accessed = Instant::now();
2322        Ok(changed)
2323    })
2324}
2325
2326/// Read the session's typed workspace anchor, if any.
2327pub fn workspace_anchor(id: &str) -> Option<WorkspaceAnchor> {
2328    SESSIONS.with(|s| {
2329        s.borrow()
2330            .get(id)
2331            .and_then(|state| state.workspace_anchor.clone())
2332    })
2333}
2334
/// Outcome of `reanchor_session`: previous + new anchor and whether the
/// swap actually moved anything. Callers use `changed` to suppress
/// no-op transcript / live events.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ReanchorOutcome {
    /// Anchor the session held before the swap; `None` if it had none.
    pub previous: Option<WorkspaceAnchor>,
    /// Anchor now installed on the session.
    pub current: WorkspaceAnchor,
    /// `true` when `previous` differs from `current`.
    pub changed: bool,
}
2344
2345/// Atomically swap the session's primary anchor + emit the canonical
2346/// `AnchorChanged` transcript event and live `AgentEvent::AnchorChanged`
2347/// notification (#2218). Clears session-scoped permission grants so
2348/// stale anchor-based decisions don't leak into the next turn.
2349pub fn reanchor_session(
2350    id: &str,
2351    new_anchor: WorkspaceAnchor,
2352    carry_transcript: bool,
2353    compacted: bool,
2354    reason: Option<String>,
2355) -> Result<ReanchorOutcome, String> {
2356    let outcome = SESSIONS.with(|s| {
2357        let mut map = s.borrow_mut();
2358        let Some(state) = map.get_mut(id) else {
2359            return Err(format!("agent session '{id}' does not exist"));
2360        };
2361        let previous = state.workspace_anchor.clone();
2362        let changed = previous.as_ref() != Some(&new_anchor);
2363        state.workspace_anchor = Some(new_anchor.clone());
2364        if changed {
2365            crate::llm::permissions::clear_session_grants(id);
2366        }
2367        state.last_accessed = Instant::now();
2368        Ok(ReanchorOutcome {
2369            previous,
2370            current: new_anchor,
2371            changed,
2372        })
2373    })?;
2374    if !outcome.changed {
2375        return Ok(outcome);
2376    }
2377    let previous_json = outcome.previous.as_ref().map(WorkspaceAnchor::to_json);
2378    let current_json = outcome.current.to_json();
2379    let event_metadata = serde_json::json!({
2380        "previous": previous_json,
2381        "current": current_json,
2382        "carry_transcript": carry_transcript,
2383        "compacted": compacted,
2384        "reason": reason,
2385    });
2386    let event = crate::llm::helpers::transcript_event(
2387        "AnchorChanged",
2388        "system",
2389        "internal",
2390        "",
2391        Some(event_metadata),
2392    );
2393    let _ = append_event(id, event);
2394    crate::llm::emit_live_agent_event_sync(&crate::agent_events::AgentEvent::AnchorChanged {
2395        session_id: id.to_string(),
2396        previous: previous_json,
2397        current: current_json,
2398        carry_transcript,
2399        compacted,
2400        reason,
2401    });
2402    Ok(outcome)
2403}
2404
2405/// Set session-local workspace defaults. Returns `Ok(true)` when the
2406/// policy changed.
2407pub fn set_workspace_policy(id: &str, policy: WorkspacePolicy) -> Result<bool, String> {
2408    SESSIONS.with(|s| {
2409        let mut map = s.borrow_mut();
2410        let Some(state) = map.get_mut(id) else {
2411            return Err(format!("agent session '{id}' does not exist"));
2412        };
2413        let changed = state.workspace_policy != policy;
2414        state.workspace_policy = policy;
2415        state.last_accessed = Instant::now();
2416        Ok(changed)
2417    })
2418}
2419
2420/// Read the session's workspace policy, if the session exists.
2421pub fn workspace_policy(id: &str) -> Option<WorkspacePolicy> {
2422    SESSIONS.with(|s| {
2423        s.borrow()
2424            .get(id)
2425            .map(|state| state.workspace_policy.clone())
2426    })
2427}
2428
2429/// Validate and mount an additional workspace root on an anchored
2430/// session. When the path is already mounted, updates its mount mode
2431/// in place and refreshes its `mounted_at` timestamp.
2432pub fn add_workspace_root(
2433    id: &str,
2434    root: &str,
2435    mount_mode: Option<MountMode>,
2436    reason: Option<String>,
2437) -> Result<String, String> {
2438    let normalized_root = validate_workspace_root_path(root)?;
2439    let mounted_at = crate::orchestration::now_rfc3339();
2440    SESSIONS.with(|s| {
2441        let mut map = s.borrow_mut();
2442        let Some(state) = map.get_mut(id) else {
2443            return Err(format!("agent session '{id}' does not exist"));
2444        };
2445        let default_mount_mode = state.workspace_policy.default_mount_mode;
2446        let Some(anchor) = state.workspace_anchor.as_mut() else {
2447            return Err(format!("agent session '{id}' has no workspace anchor"));
2448        };
2449        let resolved_mount_mode = mount_mode.unwrap_or(default_mount_mode);
2450        if let Some(existing) = anchor
2451            .additional_roots
2452            .iter_mut()
2453            .find(|entry| entry.path == normalized_root)
2454        {
2455            existing.mount_mode = resolved_mount_mode;
2456            existing.mounted_at = mounted_at.clone();
2457        } else {
2458            anchor.additional_roots.push(MountedRoot {
2459                path: normalized_root.clone(),
2460                mount_mode: resolved_mount_mode,
2461                mounted_at: mounted_at.clone(),
2462            });
2463        }
2464        let event = crate::llm::helpers::transcript_event(
2465            "RootMounted",
2466            "system",
2467            "internal",
2468            "",
2469            Some(serde_json::json!({
2470                "path": normalized_root.to_string_lossy(),
2471                "mount_mode": resolved_mount_mode.as_str(),
2472                "mounted_at": mounted_at.clone(),
2473                "reason": reason,
2474            })),
2475        );
2476        append_event_to_state(state, event, "add_workspace_root")?;
2477        crate::llm::permissions::clear_session_grants(id);
2478        state.last_accessed = Instant::now();
2479        Ok(mounted_at.clone())
2480    })
2481}
2482
2483/// Remove one mounted root from an anchored session. Returns whether an
2484/// existing mount entry was deleted. Removing an absent root is a no-op.
2485pub fn remove_workspace_root(id: &str, root: &str) -> Result<bool, String> {
2486    let normalized_root = normalize_workspace_root_path(root);
2487    SESSIONS.with(|s| {
2488        let mut map = s.borrow_mut();
2489        let Some(state) = map.get_mut(id) else {
2490            return Err(format!("agent session '{id}' does not exist"));
2491        };
2492        let Some(anchor) = state.workspace_anchor.as_mut() else {
2493            return Err(format!("agent session '{id}' has no workspace anchor"));
2494        };
2495        let before = anchor.additional_roots.len();
2496        anchor
2497            .additional_roots
2498            .retain(|entry| entry.path != normalized_root);
2499        let removed = anchor.additional_roots.len() != before;
2500        if removed {
2501            crate::llm::permissions::clear_session_grants(id);
2502        }
2503        state.last_accessed = Instant::now();
2504        Ok(removed)
2505    })
2506}
2507
2508pub fn list_workspace_roots(id: &str) -> Result<(PathBuf, Vec<MountedRoot>), String> {
2509    SESSIONS.with(|s| {
2510        let map = s.borrow();
2511        let Some(state) = map.get(id) else {
2512            return Err(format!("agent session '{id}' does not exist"));
2513        };
2514        let Some(anchor) = state.workspace_anchor.as_ref() else {
2515            return Err(format!("agent session '{id}' has no workspace anchor"));
2516        };
2517        Ok((anchor.primary.clone(), anchor.additional_roots.clone()))
2518    })
2519}
2520
2521fn validate_workspace_root_path(root: &str) -> Result<PathBuf, String> {
2522    let normalized = normalize_workspace_root_path(root);
2523    let canonical = std::fs::canonicalize(&normalized)
2524        .map_err(|error| format!("workspace root '{root}' must exist and be readable: {error}"))?;
2525    let metadata = std::fs::metadata(&canonical)
2526        .map_err(|error| format!("workspace root '{root}' must exist and be readable: {error}"))?;
2527    if !metadata.is_dir() {
2528        return Err(format!("workspace root '{root}' must be a directory"));
2529    }
2530    std::fs::read_dir(&canonical)
2531        .map_err(|error| format!("workspace root '{root}' must be readable: {error}"))?;
2532    Ok(canonical)
2533}
2534
2535fn normalize_workspace_root_path(root: &str) -> PathBuf {
2536    let absolute = crate::stdlib::process::normalize_context_path(Path::new(root));
2537    std::fs::canonicalize(&absolute).unwrap_or(absolute)
2538}
2539
2540fn empty_transcript(id: &str) -> VmValue {
2541    use crate::llm::helpers::new_transcript_with;
2542    new_transcript_with(Some(id.to_string()), Vec::new(), None, None)
2543}
2544
2545fn clone_transcript_with_id(transcript: &VmValue, new_id: &str) -> VmValue {
2546    let Some(dict) = transcript.as_dict() else {
2547        return empty_transcript(new_id);
2548    };
2549    let mut next = dict.clone();
2550    next.insert(
2551        "id".to_string(),
2552        VmValue::String(std::sync::Arc::from(new_id.to_string())),
2553    );
2554    VmValue::Dict(std::sync::Arc::new(next))
2555}
2556
2557fn clone_transcript_with_parent(transcript: &VmValue, parent_id: &str) -> VmValue {
2558    let Some(dict) = transcript.as_dict() else {
2559        return transcript.clone();
2560    };
2561    let mut next = dict.clone();
2562    let metadata = match next.get("metadata") {
2563        Some(VmValue::Dict(metadata)) => {
2564            let mut metadata = metadata.as_ref().clone();
2565            metadata.insert(
2566                "parent_session_id".to_string(),
2567                VmValue::String(std::sync::Arc::from(parent_id.to_string())),
2568            );
2569            VmValue::Dict(std::sync::Arc::new(metadata))
2570        }
2571        _ => VmValue::Dict(std::sync::Arc::new(BTreeMap::from([(
2572            "parent_session_id".to_string(),
2573            VmValue::String(std::sync::Arc::from(parent_id.to_string())),
2574        )]))),
2575    };
2576    next.insert("metadata".to_string(), metadata);
2577    VmValue::Dict(std::sync::Arc::new(next))
2578}
2579
2580fn apply_system_prompt_metadata(next: &mut BTreeMap<String, VmValue>, system_prompt: &str) {
2581    let mut metadata = match next.get("metadata") {
2582        Some(VmValue::Dict(metadata)) => metadata.as_ref().clone(),
2583        _ => BTreeMap::new(),
2584    };
2585    metadata.insert(
2586        "system_prompt".to_string(),
2587        crate::stdlib::json_to_vm_value(&crate::llm::helpers::system_prompt_metadata(
2588            system_prompt,
2589        )),
2590    );
2591    next.insert(
2592        "metadata".to_string(),
2593        VmValue::Dict(std::sync::Arc::new(metadata)),
2594    );
2595}
2596
2597fn transcript_with_session_metadata(transcript: VmValue, state: &SessionState) -> VmValue {
2598    let Some(dict) = transcript.as_dict() else {
2599        return transcript;
2600    };
2601    let mut next = dict.clone();
2602    let mut metadata = match next.get("metadata") {
2603        Some(VmValue::Dict(metadata)) => metadata.as_ref().clone(),
2604        _ => BTreeMap::new(),
2605    };
2606    if let Some(tool_format) = state.tool_format.as_ref() {
2607        metadata.insert(
2608            "tool_format".to_string(),
2609            VmValue::String(std::sync::Arc::from(tool_format.clone())),
2610        );
2611        metadata.insert("tool_mode_locked".to_string(), VmValue::Bool(true));
2612    }
2613    if let Some(system_prompt) = state.system_prompt.as_ref() {
2614        metadata.insert(
2615            "system_prompt".to_string(),
2616            crate::stdlib::json_to_vm_value(&crate::llm::helpers::system_prompt_metadata(
2617                system_prompt,
2618            )),
2619        );
2620    }
2621    if let Some(anchor) = state.workspace_anchor.as_ref() {
2622        metadata.insert(
2623            WORKSPACE_ANCHOR_METADATA_KEY.to_string(),
2624            anchor.to_vm_value(),
2625        );
2626    } else {
2627        metadata.remove(WORKSPACE_ANCHOR_METADATA_KEY);
2628    }
2629    if let Some(scratchpad) = state.scratchpad.as_ref() {
2630        metadata.insert("agent_scratchpad".to_string(), scratchpad.clone());
2631        metadata.insert(
2632            "agent_scratchpad_version".to_string(),
2633            VmValue::Int(state.scratchpad_version as i64),
2634        );
2635    } else {
2636        metadata.remove("agent_scratchpad");
2637        metadata.remove("agent_scratchpad_version");
2638    }
2639    if let Some(last_action) = state.last_transcript_budget_action.as_ref() {
2640        let usage = transcript_usage(
2641            &VmValue::Dict(std::sync::Arc::new(next.clone())),
2642            state.transcript_budget_policy.max_approx_bytes.is_some(),
2643        );
2644        metadata.insert(
2645            "transcript_budget".to_string(),
2646            crate::stdlib::json_to_vm_value(&serde_json::json!({
2647                "policy": transcript_budget_policy_json(&state.transcript_budget_policy.normalized()),
2648                "usage": transcript_budget_usage_json(&usage),
2649                "last_action": last_action,
2650            })),
2651        );
2652    }
2653    if !metadata.is_empty() {
2654        next.insert(
2655            "metadata".to_string(),
2656            VmValue::Dict(std::sync::Arc::new(metadata)),
2657        );
2658    }
2659    VmValue::Dict(std::sync::Arc::new(next))
2660}
2661
2662fn session_snapshot(state: &SessionState) -> VmValue {
2663    let transcript = transcript_with_session_metadata(state.transcript.clone(), state);
2664    let Some(dict) = transcript.as_dict() else {
2665        return state.transcript.clone();
2666    };
2667    let mut next = dict.clone();
2668    let length = next
2669        .get("messages")
2670        .and_then(|value| match value {
2671            VmValue::List(list) => Some(list.len() as i64),
2672            _ => None,
2673        })
2674        .unwrap_or(0);
2675    next.insert("length".to_string(), VmValue::Int(length));
2676    next.insert(
2677        "created_at".to_string(),
2678        VmValue::String(std::sync::Arc::from(state.created_at.clone())),
2679    );
2680    next.insert(
2681        "parent_id".to_string(),
2682        state
2683            .parent_id
2684            .as_ref()
2685            .map(|id| VmValue::String(std::sync::Arc::from(id.clone())))
2686            .unwrap_or(VmValue::Nil),
2687    );
2688    next.insert(
2689        "child_ids".to_string(),
2690        VmValue::List(std::sync::Arc::new(
2691            state
2692                .child_ids
2693                .iter()
2694                .cloned()
2695                .map(|id| VmValue::String(std::sync::Arc::from(id)))
2696                .collect(),
2697        )),
2698    );
2699    next.insert(
2700        "branched_at_event_index".to_string(),
2701        state
2702            .branched_at_event_index
2703            .map(|index| VmValue::Int(index as i64))
2704            .unwrap_or(VmValue::Nil),
2705    );
2706    next.insert(
2707        "system_prompt".to_string(),
2708        state
2709            .system_prompt
2710            .as_ref()
2711            .map(|prompt| VmValue::String(std::sync::Arc::from(prompt.clone())))
2712            .unwrap_or(VmValue::Nil),
2713    );
2714    next.insert(
2715        "tool_format".to_string(),
2716        state
2717            .tool_format
2718            .as_ref()
2719            .map(|format| VmValue::String(std::sync::Arc::from(format.clone())))
2720            .unwrap_or(VmValue::Nil),
2721    );
2722    next.insert(
2723        "pinned_model".to_string(),
2724        state
2725            .pinned_model
2726            .as_ref()
2727            .map(|model| VmValue::String(std::sync::Arc::from(model.clone())))
2728            .unwrap_or(VmValue::Nil),
2729    );
2730    next.insert(
2731        "pinned_reasoning_policy".to_string(),
2732        state
2733            .pinned_reasoning_policy
2734            .as_ref()
2735            .map(|policy| VmValue::String(std::sync::Arc::from(policy.clone())))
2736            .unwrap_or(VmValue::Nil),
2737    );
2738    next.insert(
2739        "scratchpad".to_string(),
2740        state.scratchpad.clone().unwrap_or(VmValue::Nil),
2741    );
2742    next.insert(
2743        "scratchpad_version".to_string(),
2744        VmValue::Int(state.scratchpad_version as i64),
2745    );
2746    next.insert(
2747        "workspace_anchor".to_string(),
2748        state
2749            .workspace_anchor
2750            .as_ref()
2751            .map(WorkspaceAnchor::to_vm_value)
2752            .unwrap_or(VmValue::Nil),
2753    );
2754    next.insert(
2755        "workspace_policy".to_string(),
2756        state.workspace_policy.to_vm_value(),
2757    );
2758    VmValue::Dict(std::sync::Arc::new(next))
2759}
2760
2761fn update_lineage(
2762    map: &mut HashMap<String, SessionState>,
2763    parent_id: &str,
2764    child_id: &str,
2765    branched_at_event_index: Option<usize>,
2766) {
2767    let old_parent_id = map.get(child_id).and_then(|child| child.parent_id.clone());
2768    if let Some(old_parent_id) = old_parent_id.filter(|old_parent_id| old_parent_id != parent_id) {
2769        if let Some(old_parent) = map.get_mut(&old_parent_id) {
2770            old_parent.child_ids.retain(|id| id != child_id);
2771            old_parent.last_accessed = Instant::now();
2772        }
2773    }
2774    if let Some(parent) = map.get_mut(parent_id) {
2775        parent.last_accessed = Instant::now();
2776        if !parent.child_ids.iter().any(|id| id == child_id) {
2777            parent.child_ids.push(child_id.to_string());
2778        }
2779    }
2780    if let Some(child) = map.get_mut(child_id) {
2781        child.last_accessed = Instant::now();
2782        child.parent_id = Some(parent_id.to_string());
2783        child.branched_at_event_index = branched_at_event_index;
2784        child.transcript = clone_transcript_with_parent(&child.transcript, parent_id);
2785    }
2786}
2787
2788fn branch_event_index(transcript: &VmValue, keep_first: usize) -> usize {
2789    if keep_first == 0 {
2790        return 0;
2791    }
2792    let Some(dict) = transcript.as_dict() else {
2793        return keep_first;
2794    };
2795    let Some(VmValue::List(events)) = dict.get("events") else {
2796        return keep_first;
2797    };
2798    event_prefix_len_for_messages(events, keep_first)
2799}
2800
2801fn event_kind(event: &VmValue) -> Option<String> {
2802    event
2803        .as_dict()
2804        .and_then(|dict| dict.get("kind"))
2805        .map(VmValue::display)
2806}
2807
2808fn event_id(event: &VmValue) -> Option<String> {
2809    event
2810        .as_dict()
2811        .and_then(|dict| dict.get("id"))
2812        .map(VmValue::display)
2813}
2814
2815fn is_turn_event(event: &VmValue) -> bool {
2816    matches!(
2817        event_kind(event).as_deref(),
2818        Some("message" | "tool_result")
2819    )
2820}
2821
2822fn event_prefix_len_for_messages(events: &[VmValue], keep_first: usize) -> usize {
2823    if keep_first == 0 {
2824        return 0;
2825    }
2826    let mut retained_messages = 0usize;
2827    for (index, event) in events.iter().enumerate() {
2828        if is_turn_event(event) {
2829            retained_messages += 1;
2830            if retained_messages == keep_first {
2831                return index + 1;
2832            }
2833        }
2834    }
2835    events.len()
2836}
2837
2838fn turn_event_id_for_count(events: &[VmValue], keep_first: usize) -> Option<String> {
2839    if keep_first == 0 {
2840        return None;
2841    }
2842    let mut retained_messages = 0usize;
2843    for event in events {
2844        if is_turn_event(event) {
2845            retained_messages += 1;
2846            if retained_messages == keep_first {
2847                return event_id(event);
2848            }
2849        }
2850    }
2851    None
2852}
2853
// Unit tests for this module are kept in the sibling file
// `agent_sessions_tests.rs` and are compiled only for test builds.
#[cfg(test)]
#[path = "agent_sessions_tests.rs"]
mod tests;