Skip to main content

harn_vm/
agent_sessions.rs

//! First-class session storage.
//!
//! A session owns three things:
//!
//! 1. A transcript dict (messages, events, summary, metadata, …).
//! 2. Closure subscribers that fire on agent-loop events for this session.
//! 3. Its own lifecycle (open, reset, fork, trim, compact, close).
//!
//! Storage is thread-local because session lifecycle and subscriber dispatch
//! are owned by the current-thread agent-loop worker. The subscribers register,
//! fire, and unregister on that same thread, keeping ordering deterministic.
//!
//! Lifecycle is explicit. Builtins (`agent_session_open`,
//! `_reset`, `_fork`, `_fork_at`, `_close`, `_trim`, `_compact`,
//! `_inject`, `_exists`, `_length`, `_snapshot`, `_ancestry`) drive
//! the store directly — there is no "policy" config dict that
//! performs lifecycle as a side effect.
18
19use std::cell::{Cell, RefCell};
20use std::collections::{BTreeMap, HashMap, HashSet};
21use std::future::Future;
22use std::path::{Path, PathBuf};
23use std::time::Instant;
24
25use crate::runtime_limits::RuntimeLimits;
26use crate::value::VmValue;
27use crate::workspace_anchor::{
28    MountMode, MountedRoot, WorkspaceAnchor, WorkspacePolicy, WORKSPACE_ANCHOR_METADATA_KEY,
29};
30
/// Event-kind tag for live-client records.
// NOTE(review): presumably consumed by the live-client event helper later in this file — confirm.
const LIVE_CLIENT_EVENT_KIND: &str = "live_session_client";
/// Event-kind tag for live-client permission-routing records.
// NOTE(review): the consumer is not visible in this part of the file — confirm.
const LIVE_CLIENT_PERMISSION_EVENT_KIND: &str = "live_session_permission_route";
33
34/// Default cap on concurrent sessions per VM thread. Beyond this the
35/// least-recently-accessed session is evicted on the next `open`.
36pub const DEFAULT_SESSION_CAP: usize = RuntimeLimits::DEFAULT.max_agent_sessions;
37
/// Default per-session limit on retained prompt-visible messages. Set
/// generously so ordinary long-running agents never hit it, while still
/// putting a ceiling on accidental unbounded growth.
pub const DEFAULT_TRANSCRIPT_MESSAGE_CAP: usize = 4_096;

/// Default per-session limit on retained transcript audit events. The
/// event stream holds message-derived entries as well as orchestration
/// lifecycle records.
pub const DEFAULT_TRANSCRIPT_EVENT_CAP: usize = 32_768;
/// Largest serialized scratchpad accepted by `validate_scratchpad_value`, in bytes (16 KiB).
pub const MAX_SCRATCHPAD_BYTES: usize = 16 * 1024;
/// Diagnostic code string; only compiled into builds with debug assertions.
#[cfg(debug_assertions)]
const CACHE_STABLE_SYSTEM_PROMPT_DIAGNOSTIC: &str = "HARN-CACHE-001";
49
/// Recovery strategy carried by a [`SessionTranscriptBudgetPolicy`].
///
/// The enforcement code is outside this block; the variant descriptions
/// are inferred from their names and should be confirmed against it.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum TranscriptBudgetRecovery {
    /// Presumably refuses the over-budget write.
    Reject,
    /// Presumably trims the transcript down to the last `keep_last` entries.
    Trim { keep_last: usize },
    /// Presumably compacts the transcript, retaining the last `keep_last` entries.
    Compact { keep_last: usize },
}

/// Size limits for one session transcript plus the recovery to apply
/// when a limit is exceeded.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct SessionTranscriptBudgetPolicy {
    /// Message limit; the constructors clamp it to at least 1.
    pub max_messages: usize,
    /// Event limit; the constructors clamp it to at least 1.
    pub max_events: usize,
    /// Optional approximate byte limit; `None` means no byte limit is set.
    pub max_approx_bytes: Option<usize>,
    /// Strategy to use when a limit is exceeded.
    pub recovery: TranscriptBudgetRecovery,
}

impl SessionTranscriptBudgetPolicy {
    /// Policy with the `Reject` recovery and no byte limit.
    pub fn reject(max_messages: usize, max_events: usize) -> Self {
        Self::bounded(max_messages, max_events, TranscriptBudgetRecovery::Reject)
    }

    /// Policy with the `Trim` recovery and no byte limit.
    pub fn trim(max_messages: usize, max_events: usize, keep_last: usize) -> Self {
        Self::bounded(
            max_messages,
            max_events,
            TranscriptBudgetRecovery::Trim { keep_last },
        )
    }

    /// Policy with the `Compact` recovery and no byte limit.
    pub fn compact(max_messages: usize, max_events: usize, keep_last: usize) -> Self {
        Self::bounded(
            max_messages,
            max_events,
            TranscriptBudgetRecovery::Compact { keep_last },
        )
    }

    /// Replace the byte limit; a `Some` value is clamped to at least 1.
    pub fn with_max_approx_bytes(mut self, max_approx_bytes: Option<usize>) -> Self {
        self.max_approx_bytes = max_approx_bytes.map(|bytes| bytes.max(1));
        self
    }

    /// Copy of this policy with every numeric limit clamped to at least 1.
    /// Needed because the fields are public and can be set directly.
    fn normalized(&self) -> Self {
        Self::bounded(self.max_messages, self.max_events, self.recovery.clone())
            .with_max_approx_bytes(self.max_approx_bytes)
    }

    /// Shared constructor: clamps both count limits and leaves the byte limit unset.
    fn bounded(max_messages: usize, max_events: usize, recovery: TranscriptBudgetRecovery) -> Self {
        Self {
            max_messages: max_messages.max(1),
            max_events: max_events.max(1),
            max_approx_bytes: None,
            recovery,
        }
    }
}
107
108impl Default for SessionTranscriptBudgetPolicy {
109    fn default() -> Self {
110        Self::reject(DEFAULT_TRANSCRIPT_MESSAGE_CAP, DEFAULT_TRANSCRIPT_EVENT_CAP)
111    }
112}
113
114pub struct SessionState {
115    pub id: String,
116    pub transcript: VmValue,
117    pub subscribers: Vec<VmValue>,
118    pub created_at: String,
119    pub last_accessed: Instant,
120    pub parent_id: Option<String>,
121    pub child_ids: Vec<String>,
122    pub branched_at_event_index: Option<usize>,
123    /// Names of skills that were active at the end of the most recent
124    /// `agent_loop` run on this session. Empty when no skills were
125    /// matched, when the skill system wasn't used, or when the
126    /// deactivation phase cleared them. Re-entering the session
127    /// restores these as the initial active set before matching runs.
128    pub active_skills: Vec<String>,
129    /// Tool-calling protocol claimed by the first agent loop that uses
130    /// this session. A transcript is only replayable under the same
131    /// contract that produced its prompt/history.
132    pub tool_format: Option<String>,
133    /// Stable session-level system prompt material. This is transcript
134    /// metadata, not a replay message: providers receive it through
135    /// their system/developer instruction channel on each call.
136    pub system_prompt: Option<String>,
137    /// Session-pinned model selector. When set, `llm_call` invocations
138    /// that do not pass an explicit `model:` option resolve to this
139    /// selector instead of `HARN_LLM_MODEL` / provider defaults. Mid-
140    /// session swap is exposed over ACP via `session/set_config_option`
141    /// (configId="model").
142    pub pinned_model: Option<String>,
143    /// Session-pinned high-level reasoning policy. When set, `llm_call`
144    /// invocations that do not pass explicit `thinking` or
145    /// `reasoning_effort` options resolve this provider-aware policy into
146    /// the route's native thinking shape. Exposed over ACP as
147    /// `session/set_config_option(configId="thought_level")`.
148    pub pinned_reasoning_policy: Option<String>,
149    /// Session-local workspace defaults. Persona and host policy layers
150    /// can update this without rewriting the current anchor.
151    pub workspace_policy: WorkspacePolicy,
152    /// Typed workspace anchor for the session. Primary path plus any
153    /// additional mounted roots; consumed by permission matchers, the
154    /// bundle exporter, and host-side cross-project handoff flows
155    /// (epic #2208). `None` until a host opens the session with one or
156    /// the ACP `reanchor` / `add_root` primitives populate it.
157    pub workspace_anchor: Option<WorkspaceAnchor>,
158    /// Small session-local working memory rendered into agent prompts by the
159    /// Harn stdlib agent loop. This is live state, not a replayed message.
160    pub scratchpad: Option<VmValue>,
161    pub scratchpad_version: u64,
162    pub transcript_budget_policy: SessionTranscriptBudgetPolicy,
163    pub last_transcript_budget_action: Option<serde_json::Value>,
164    pub live_clients: BTreeMap<String, LiveSessionClient>,
165    pub live_controller_id: Option<String>,
166    pub completed_turn_checkpoints: Vec<SessionTurnCheckpoint>,
167    pub redo_stack: Vec<SessionRedoEntry>,
168}
169
170impl SessionState {
171    fn new(id: String) -> Self {
172        let now = Instant::now();
173        let transcript = empty_transcript(&id);
174        Self {
175            id,
176            transcript,
177            subscribers: Vec::new(),
178            created_at: crate::orchestration::now_rfc3339(),
179            last_accessed: now,
180            parent_id: None,
181            child_ids: Vec::new(),
182            branched_at_event_index: None,
183            active_skills: Vec::new(),
184            tool_format: None,
185            system_prompt: None,
186            pinned_model: None,
187            pinned_reasoning_policy: None,
188            workspace_policy: WorkspacePolicy::default(),
189            workspace_anchor: None,
190            scratchpad: None,
191            scratchpad_version: 0,
192            transcript_budget_policy: default_transcript_budget_policy(),
193            last_transcript_budget_action: None,
194            live_clients: BTreeMap::new(),
195            live_controller_id: None,
196            completed_turn_checkpoints: Vec::new(),
197            redo_stack: Vec::new(),
198        }
199    }
200}
201
/// Role a live client holds on a session.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum LiveClientMode {
    /// Attached without being the session's controller.
    Observer,
    /// The client recorded as the session's live controller.
    Controller,
}

impl LiveClientMode {
    /// Lowercase wire name of the mode.
    fn as_str(&self) -> &'static str {
        match *self {
            LiveClientMode::Controller => "controller",
            LiveClientMode::Observer => "observer",
        }
    }
}
216
217#[derive(Clone, Debug, PartialEq, Eq)]
218pub struct LiveSessionClient {
219    pub client_id: String,
220    pub mode: LiveClientMode,
221    pub attached_at: String,
222    pub last_seen_at: String,
223    pub prompt_injection: bool,
224    pub permission_routing: bool,
225    pub metadata: serde_json::Value,
226}
227
228#[derive(Clone, Debug, PartialEq, Eq)]
229pub struct AttachLiveClient {
230    pub client_id: String,
231    pub mode: LiveClientMode,
232    pub takeover: bool,
233    pub prompt_injection: bool,
234    pub permission_routing: bool,
235    pub metadata: serde_json::Value,
236}
237
238#[derive(Clone, Debug, PartialEq, Eq)]
239pub struct LiveClientChange {
240    pub client: Option<LiveSessionClient>,
241    pub previous_controller_id: Option<String>,
242    pub active_controller_id: Option<String>,
243    pub clients: Vec<LiveSessionClient>,
244}
245
/// Lineage view of a session, as produced by `ancestry`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct SessionAncestry {
    /// Direct parent, if any.
    pub parent_id: Option<String>,
    /// Direct children.
    pub child_ids: Vec<String>,
    /// Topmost ancestor reached by following parent links; the session's
    /// own id when it has no parent.
    pub root_id: String,
}
252
/// Counts reported by a truncate operation.
// NOTE(review): the producer is outside this part of the file; field
// meanings follow the names.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct SessionTruncateResult {
    /// Number of turns kept.
    pub kept_turn_count: usize,
    /// Number of turns removed.
    pub removed_turn_count: usize,
    /// Id of the turn at the new tip, if one remains.
    pub new_tip_turn_id: Option<String>,
}
259
/// Compact description of a turn checkpoint; it has the same id, count,
/// and snapshot-id fields as `SessionTurnCheckpoint` but no transcripts.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct SessionCheckpointSummary {
    /// Checkpoint identifier.
    pub checkpoint_id: String,
    /// Message count before the turn.
    pub before_message_count: usize,
    /// Message count after the turn.
    pub after_message_count: usize,
    /// Filesystem snapshot ids associated with the checkpoint.
    pub fs_snapshot_ids: Vec<String>,
}
267
268#[derive(Clone, Debug)]
269pub struct SessionTurnCheckpoint {
270    pub checkpoint_id: String,
271    pub completed_at: String,
272    pub before_transcript: VmValue,
273    pub after_transcript: VmValue,
274    pub before_message_count: usize,
275    pub after_message_count: usize,
276    pub fs_snapshot_ids: Vec<String>,
277}
278
279#[derive(Clone, Debug)]
280pub struct SessionRedoEntry {
281    pub checkpoint: SessionTurnCheckpoint,
282    pub redo_fs_snapshot_ids: Vec<String>,
283}
284
/// Failure cases for checkpoint operations (names suggest undo/redo).
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum SessionCheckpointError {
    /// The session id is not in the store.
    UnknownSession,
    /// No checkpoint is available.
    NoCheckpoint,
    /// No redo entry is available.
    NoRedo,
}
291
292#[derive(Clone, Debug, PartialEq, Eq)]
293pub struct SessionCheckpointOutcome {
294    pub status: &'static str,
295    pub checkpoint: SessionCheckpointSummary,
296    pub redo_fs_snapshot_ids: Vec<String>,
297}
298
/// Report describing a reminder injection.
// NOTE(review): the producer is outside this part of the file.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ReminderInjectionReport {
    /// Id of the reminder.
    pub reminder_id: String,
    /// Number of entries counted as deduplicated.
    pub deduped_count: usize,
}
304
305thread_local! {
306    static SESSIONS: RefCell<HashMap<String, SessionState>> = RefCell::new(HashMap::new());
307    static SESSION_CAP: Cell<usize> = const { Cell::new(DEFAULT_SESSION_CAP) };
308    static DEFAULT_TRANSCRIPT_BUDGET_POLICY: RefCell<SessionTranscriptBudgetPolicy> =
309        RefCell::new(SessionTranscriptBudgetPolicy::default());
310    static CURRENT_SESSION_STACK: RefCell<Vec<String>> = const { RefCell::new(Vec::new()) };
311    static CURRENT_TOOL_CALL_STACK: RefCell<Vec<String>> = const { RefCell::new(Vec::new()) };
312}
313
314tokio::task_local! {
315    static CURRENT_TOOL_CALL_TASK: String;
316}
317
/// Guard returned by `enter_current_session`.
///
/// While an active guard is alive the entered session id sits on the
/// thread's current-session stack; dropping the guard pops it. A guard
/// created for a blank id is inactive and does nothing on drop.
#[derive(Debug)]
#[must_use = "dropping the guard immediately pops the session that was just entered"]
pub struct CurrentSessionGuard {
    /// `true` when this guard pushed an id and must pop it on drop.
    active: bool,
}
321
322impl Drop for CurrentSessionGuard {
323    fn drop(&mut self) {
324        if self.active {
325            pop_current_session();
326        }
327    }
328}
329
/// RAII guard that scopes the active tool-call id for the running thread.
///
/// It is created on entry to a tool dispatch and dropped on exit, so a
/// hostlib builtin invoked underneath it (e.g. `tools/write_file`) can
/// look up the owning tool call without the id being threaded through
/// every parameter. A guard created for a blank id is inactive and does
/// nothing on drop.
#[derive(Debug)]
#[must_use = "dropping the guard immediately pops the tool call that was just entered"]
pub struct CurrentToolCallGuard {
    /// `true` when this guard pushed an id and must pop it on drop.
    active: bool,
}
338
339impl Drop for CurrentToolCallGuard {
340    fn drop(&mut self) {
341        if self.active {
342            pop_current_tool_call();
343        }
344    }
345}
346
347/// Set the per-thread session cap. Primarily for tests; production VMs
348/// inherit the default.
349pub fn set_session_cap(cap: usize) {
350    SESSION_CAP.with(|c| c.set(cap.max(1)));
351}
352
353pub fn session_cap() -> usize {
354    SESSION_CAP.with(|c| c.get())
355}
356
357pub fn set_default_transcript_budget_policy(policy: SessionTranscriptBudgetPolicy) {
358    DEFAULT_TRANSCRIPT_BUDGET_POLICY.with(|cell| {
359        *cell.borrow_mut() = policy.normalized();
360    });
361}
362
363pub fn reset_default_transcript_budget_policy() {
364    set_default_transcript_budget_policy(SessionTranscriptBudgetPolicy::default());
365}
366
367pub fn default_transcript_budget_policy() -> SessionTranscriptBudgetPolicy {
368    DEFAULT_TRANSCRIPT_BUDGET_POLICY.with(|cell| cell.borrow().clone())
369}
370
371pub fn transcript_budget_policy(id: &str) -> Option<SessionTranscriptBudgetPolicy> {
372    SESSIONS.with(|s| {
373        s.borrow()
374            .get(id)
375            .map(|state| state.transcript_budget_policy.clone())
376    })
377}
378
379pub fn set_transcript_budget_policy(
380    id: &str,
381    policy: SessionTranscriptBudgetPolicy,
382) -> Result<(), String> {
383    SESSIONS.with(|s| {
384        let mut map = s.borrow_mut();
385        let Some(state) = map.get_mut(id) else {
386            return Err(format!("agent session '{id}' does not exist"));
387        };
388        let previous = state.transcript_budget_policy.clone();
389        let previous_action = state.last_transcript_budget_action.clone();
390        state.transcript_budget_policy = policy.normalized();
391        let candidate = state.transcript.clone();
392        if let Err(error) = apply_transcript_with_budget(state, candidate, "policy_update") {
393            state.transcript_budget_policy = previous;
394            state.last_transcript_budget_action = previous_action;
395            return Err(error);
396        }
397        state.last_accessed = Instant::now();
398        Ok(())
399    })
400}
401
402/// Clear the session store. Wired into `reset_llm_state` for test isolation.
403pub fn reset_session_store() {
404    SESSIONS.with(|s| s.borrow_mut().clear());
405    CURRENT_SESSION_STACK.with(|stack| stack.borrow_mut().clear());
406    CURRENT_TOOL_CALL_STACK.with(|stack| stack.borrow_mut().clear());
407    reset_default_transcript_budget_policy();
408}
409
410pub(crate) fn push_current_session(id: String) {
411    if id.is_empty() {
412        return;
413    }
414    CURRENT_SESSION_STACK.with(|stack| stack.borrow_mut().push(id));
415}
416
417pub(crate) fn pop_current_session() {
418    CURRENT_SESSION_STACK.with(|stack| {
419        let _ = stack.borrow_mut().pop();
420    });
421}
422
423pub fn current_session_id() -> Option<String> {
424    CURRENT_SESSION_STACK.with(|stack| stack.borrow().last().cloned())
425}
426
427pub fn enter_current_session(id: impl Into<String>) -> CurrentSessionGuard {
428    let id = id.into();
429    if id.trim().is_empty() {
430        return CurrentSessionGuard { active: false };
431    }
432    push_current_session(id);
433    CurrentSessionGuard { active: true }
434}
435
436fn push_current_tool_call(id: String) {
437    if id.is_empty() {
438        return;
439    }
440    CURRENT_TOOL_CALL_STACK.with(|stack| stack.borrow_mut().push(id));
441}
442
443fn pop_current_tool_call() {
444    CURRENT_TOOL_CALL_STACK.with(|stack| {
445        let _ = stack.borrow_mut().pop();
446    });
447}
448
449/// Return the active tool-call id for the current thread, if any.
450///
451/// Hostlib builtins consult this to attribute side-effect snapshots to
452/// the owning ACP `toolCallId` without callers passing it explicitly.
453pub fn current_tool_call_id() -> Option<String> {
454    if let Ok(id) = CURRENT_TOOL_CALL_TASK.try_with(Clone::clone) {
455        if !id.trim().is_empty() {
456            return Some(id);
457        }
458    }
459    CURRENT_TOOL_CALL_STACK.with(|stack| stack.borrow().last().cloned())
460}
461
462/// Scope the active tool-call id to one async task.
463///
464/// Parallel tool dispatch runs sibling calls on the same OS thread, so
465/// thread-local guards alone cannot preserve attribution across `.await`
466/// points. Tokio task-local scoping follows the future instead.
467pub async fn scope_current_tool_call<F, T>(id: impl Into<String>, future: F) -> T
468where
469    F: Future<Output = T>,
470{
471    let id = id.into();
472    if id.trim().is_empty() {
473        future.await
474    } else {
475        CURRENT_TOOL_CALL_TASK.scope(id, future).await
476    }
477}
478
479/// Scope the active tool-call id for the duration of the returned guard.
480pub fn enter_current_tool_call(id: impl Into<String>) -> CurrentToolCallGuard {
481    let id = id.into();
482    if id.trim().is_empty() {
483        return CurrentToolCallGuard { active: false };
484    }
485    push_current_tool_call(id);
486    CurrentToolCallGuard { active: true }
487}
488
489pub fn exists(id: &str) -> bool {
490    SESSIONS.with(|s| s.borrow().contains_key(id))
491}
492
493pub fn length(id: &str) -> Option<usize> {
494    SESSIONS.with(|s| {
495        s.borrow().get(id).map(|state| {
496            state
497                .transcript
498                .as_dict()
499                .and_then(|d| d.get("messages"))
500                .and_then(|v| match v {
501                    VmValue::List(list) => Some(list.len()),
502                    _ => None,
503                })
504                .unwrap_or(0)
505        })
506    })
507}
508
509pub fn scratchpad(id: &str) -> Option<VmValue> {
510    SESSIONS.with(|s| {
511        s.borrow()
512            .get(id)
513            .and_then(|state| state.scratchpad.clone())
514    })
515}
516
517pub fn scratchpad_version(id: &str) -> Option<u64> {
518    SESSIONS.with(|s| s.borrow().get(id).map(|state| state.scratchpad_version))
519}
520
521pub fn set_scratchpad(
522    id: &str,
523    scratchpad: VmValue,
524    source: impl Into<String>,
525    reason: Option<String>,
526    metadata: serde_json::Value,
527) -> Result<u64, String> {
528    validate_scratchpad_value(&scratchpad)?;
529    SESSIONS.with(|s| {
530        let mut map = s.borrow_mut();
531        let Some(state) = map.get_mut(id) else {
532            return Err(format!("agent session '{id}' does not exist"));
533        };
534        let version = state.scratchpad_version.saturating_add(1);
535        let event = scratchpad_transcript_event(
536            "set",
537            version,
538            Some(&scratchpad),
539            source.into(),
540            reason,
541            metadata,
542        );
543        append_event_to_state(state, event, "set_scratchpad")?;
544        state.scratchpad = Some(scratchpad);
545        state.scratchpad_version = version;
546        state.last_accessed = Instant::now();
547        Ok(version)
548    })
549}
550
551pub fn clear_scratchpad(
552    id: &str,
553    source: impl Into<String>,
554    reason: Option<String>,
555    metadata: serde_json::Value,
556) -> Result<u64, String> {
557    SESSIONS.with(|s| {
558        let mut map = s.borrow_mut();
559        let Some(state) = map.get_mut(id) else {
560            return Err(format!("agent session '{id}' does not exist"));
561        };
562        let version = state.scratchpad_version.saturating_add(1);
563        let event =
564            scratchpad_transcript_event("clear", version, None, source.into(), reason, metadata);
565        append_event_to_state(state, event, "clear_scratchpad")?;
566        state.scratchpad = None;
567        state.scratchpad_version = version;
568        state.last_accessed = Instant::now();
569        Ok(version)
570    })
571}
572
573fn validate_scratchpad_value(value: &VmValue) -> Result<(), String> {
574    if !matches!(value, VmValue::Dict(_)) {
575        return Err("agent session scratchpad must be a dict".to_string());
576    }
577    let json = crate::llm::helpers::vm_value_to_json(value);
578    let approx_bytes = serde_json::to_vec(&json)
579        .map(|bytes| bytes.len())
580        .unwrap_or(usize::MAX);
581    if approx_bytes > MAX_SCRATCHPAD_BYTES {
582        return Err(format!(
583            "agent session scratchpad is {approx_bytes} bytes; max is {MAX_SCRATCHPAD_BYTES}"
584        ));
585    }
586    Ok(())
587}
588
589fn scratchpad_transcript_event(
590    action: &str,
591    version: u64,
592    scratchpad: Option<&VmValue>,
593    source: String,
594    reason: Option<String>,
595    metadata: serde_json::Value,
596) -> VmValue {
597    let scratchpad_json = scratchpad.map(crate::llm::helpers::vm_value_to_json);
598    let approx_bytes = scratchpad_json
599        .as_ref()
600        .and_then(|value| serde_json::to_vec(value).ok().map(|bytes| bytes.len()))
601        .unwrap_or(0);
602    let event_metadata = serde_json::json!({
603        "action": action,
604        "version": version,
605        "source": normalize_scratchpad_source(source),
606        "reason": reason.unwrap_or_default(),
607        "approx_bytes": approx_bytes,
608        "counts": scratchpad_json
609            .as_ref()
610            .map(scratchpad_counts_json)
611            .unwrap_or_else(|| serde_json::json!({})),
612        "metadata": metadata,
613    });
614    let content = format!("Agent scratchpad {action}");
615    crate::llm::helpers::transcript_event(
616        "agent_scratchpad",
617        "system",
618        "internal",
619        &content,
620        Some(event_metadata),
621    )
622}
623
/// Trim `source`, falling back to `harn.agent_scratchpad` when nothing
/// but whitespace remains.
fn normalize_scratchpad_source(source: String) -> String {
    match source.trim() {
        "" => "harn.agent_scratchpad".to_string(),
        trimmed => trimmed.to_string(),
    }
}
632
633fn scratchpad_counts_json(value: &serde_json::Value) -> serde_json::Value {
634    serde_json::json!({
635        "goals": scratchpad_array_len(value, "goals"),
636        "open_items": scratchpad_array_len(value, "open_items"),
637        "facts": scratchpad_array_len(value, "facts"),
638        "refs": scratchpad_array_len(value, "refs"),
639    })
640}
641
642fn scratchpad_array_len(value: &serde_json::Value, key: &str) -> usize {
643    value
644        .get(key)
645        .and_then(serde_json::Value::as_array)
646        .map(Vec::len)
647        .unwrap_or(0)
648}
649
650pub fn snapshot(id: &str) -> Option<VmValue> {
651    SESSIONS.with(|s| s.borrow().get(id).map(session_snapshot))
652}
653
654/// Session-only fields stay on `agent_session_snapshot`.
655pub fn transcript(id: &str) -> Option<VmValue> {
656    SESSIONS.with(|s| {
657        s.borrow()
658            .get(id)
659            .map(|state| transcript_with_session_metadata(state.transcript.clone(), state))
660    })
661}
662
663/// Open a session, or create it if missing. Returns the resolved id.
664///
665/// Newly-created sessions auto-register an event-log-backed sink when a
666/// generalized [`crate::event_log::EventLog`] has been installed for the
667/// current VM thread. For legacy env-driven workflows that still point
668/// `HARN_EVENT_LOG_DIR` at a directory, we preserve the older JSONL sink
669/// as a compatibility fallback. Re-opening an existing session does not
670/// re-register — sinks are per-session, owned by the first opener.
671pub fn open_or_create(id: Option<String>) -> String {
672    let resolved = id.unwrap_or_else(|| uuid::Uuid::now_v7().to_string());
673    let parent_session = current_session_id();
674    let mut was_new = false;
675    SESSIONS.with(|s| {
676        let mut map = s.borrow_mut();
677        if let Some(state) = map.get_mut(&resolved) {
678            state.last_accessed = Instant::now();
679            return;
680        }
681        was_new = true;
682        let cap = SESSION_CAP.with(|c| c.get());
683        if map.len() >= cap {
684            if let Some(victim) = map
685                .iter()
686                .min_by_key(|(_, state)| state.last_accessed)
687                .map(|(id, _)| id.clone())
688            {
689                map.remove(&victim);
690            }
691        }
692        map.insert(resolved.clone(), SessionState::new(resolved.clone()));
693    });
694    if was_new {
695        if let Some(parent) = parent_session.as_deref() {
696            crate::agent_events::mirror_session_sinks(parent, &resolved);
697        }
698        try_register_event_log(&resolved);
699    }
700    resolved
701}
702
703pub fn open_child_session(parent_id: &str, id: Option<String>) -> String {
704    let resolved = open_or_create(id);
705    link_child_session(parent_id, &resolved);
706    resolved
707}
708
709pub fn link_child_session(parent_id: &str, child_id: &str) {
710    link_child_session_with_branch(parent_id, child_id, None);
711}
712
713pub fn link_child_session_with_branch(
714    parent_id: &str,
715    child_id: &str,
716    branched_at_event_index: Option<usize>,
717) {
718    if parent_id == child_id {
719        return;
720    }
721    open_or_create(Some(parent_id.to_string()));
722    open_or_create(Some(child_id.to_string()));
723    SESSIONS.with(|s| {
724        let mut map = s.borrow_mut();
725        update_lineage(&mut map, parent_id, child_id, branched_at_event_index);
726    });
727}
728
729pub fn parent_id(id: &str) -> Option<String> {
730    SESSIONS.with(|s| s.borrow().get(id).and_then(|state| state.parent_id.clone()))
731}
732
733pub fn child_ids(id: &str) -> Vec<String> {
734    SESSIONS.with(|s| {
735        s.borrow()
736            .get(id)
737            .map(|state| state.child_ids.clone())
738            .unwrap_or_default()
739    })
740}
741
742pub fn ancestry(id: &str) -> Option<SessionAncestry> {
743    SESSIONS.with(|s| {
744        let map = s.borrow();
745        let state = map.get(id)?;
746        let mut root_id = state.id.clone();
747        let mut cursor = state.parent_id.clone();
748        let mut seen = HashSet::from([state.id.clone()]);
749        while let Some(parent_id) = cursor {
750            if !seen.insert(parent_id.clone()) {
751                break;
752            }
753            root_id = parent_id.clone();
754            cursor = map
755                .get(&parent_id)
756                .and_then(|parent| parent.parent_id.clone());
757        }
758        Some(SessionAncestry {
759            parent_id: state.parent_id.clone(),
760            child_ids: state.child_ids.clone(),
761            root_id,
762        })
763    })
764}
765
766pub fn live_clients(id: &str) -> Option<Vec<LiveSessionClient>> {
767    SESSIONS.with(|s| {
768        s.borrow()
769            .get(id)
770            .map(|state| state.live_clients.values().cloned().collect())
771    })
772}
773
774pub fn attach_live_client(id: &str, request: AttachLiveClient) -> Result<LiveClientChange, String> {
775    SESSIONS.with(|s| {
776        let mut map = s.borrow_mut();
777        let Some(state) = map.get_mut(id) else {
778            return Err(format!("agent session '{id}' does not exist"));
779        };
780        let client_id = validate_live_client_id(request.client_id)?;
781        let now = crate::orchestration::now_rfc3339();
782        let previous_clients = state.live_clients.clone();
783        let previous_controller_id = state.live_controller_id.clone();
784
785        if request.mode == LiveClientMode::Controller {
786            let conflicting_controller = previous_controller_id
787                .as_ref()
788                .filter(|controller_id| *controller_id != &client_id)
789                .filter(|controller_id| state.live_clients.contains_key(*controller_id));
790            if let Some(previous) = conflicting_controller {
791                if !request.takeover {
792                    return Err(format!("live session already has controller '{previous}'"));
793                }
794                if let Some(previous_client) = state.live_clients.get_mut(previous) {
795                    previous_client.mode = LiveClientMode::Observer;
796                    previous_client.prompt_injection = false;
797                    previous_client.permission_routing = false;
798                    previous_client.last_seen_at = now.clone();
799                }
800            }
801            state.live_controller_id = Some(client_id.clone());
802        } else if state.live_controller_id.as_deref() == Some(client_id.as_str()) {
803            state.live_controller_id = None;
804        }
805
806        let attached_at = state
807            .live_clients
808            .get(&client_id)
809            .map(|client| client.attached_at.clone())
810            .unwrap_or_else(|| now.clone());
811        let client = LiveSessionClient {
812            client_id: client_id.clone(),
813            mode: request.mode,
814            attached_at,
815            last_seen_at: now,
816            prompt_injection: request.prompt_injection,
817            permission_routing: request.permission_routing,
818            metadata: request.metadata,
819        };
820        state.live_clients.insert(client_id, client.clone());
821        state.last_accessed = Instant::now();
822        let active_controller_id = state.live_controller_id.clone();
823        append_live_client_event(
824            state,
825            "attached",
826            Some(&client),
827            previous_controller_id.as_deref(),
828            active_controller_id.as_deref(),
829            serde_json::Value::Null,
830        )
831        .inspect_err(|_error| {
832            state.live_clients = previous_clients;
833            state.live_controller_id = previous_controller_id.clone();
834        })?;
835        Ok(live_client_change(
836            Some(client),
837            previous_controller_id,
838            state,
839        ))
840    })
841}
842
843pub fn takeover_live_client(
844    id: &str,
845    client_id: impl Into<String>,
846    metadata: serde_json::Value,
847) -> Result<LiveClientChange, String> {
848    attach_live_client(
849        id,
850        AttachLiveClient {
851            client_id: client_id.into(),
852            mode: LiveClientMode::Controller,
853            takeover: true,
854            prompt_injection: true,
855            permission_routing: true,
856            metadata,
857        },
858    )
859}
860
861pub fn detach_live_client(
862    id: &str,
863    client_id: impl Into<String>,
864    reason: Option<String>,
865    metadata: serde_json::Value,
866) -> Result<LiveClientChange, String> {
867    SESSIONS.with(|s| {
868        let mut map = s.borrow_mut();
869        let Some(state) = map.get_mut(id) else {
870            return Err(format!("agent session '{id}' does not exist"));
871        };
872        let client_id = validate_live_client_id(client_id.into())?;
873        let previous_clients = state.live_clients.clone();
874        let previous_controller_id = state.live_controller_id.clone();
875        let Some(mut client) = state.live_clients.remove(&client_id) else {
876            return Err(format!("live client '{client_id}' is not attached"));
877        };
878        client.last_seen_at = crate::orchestration::now_rfc3339();
879        if state.live_controller_id.as_deref() == Some(client_id.as_str()) {
880            state.live_controller_id = None;
881        }
882        state.last_accessed = Instant::now();
883        let active_controller_id = state.live_controller_id.clone();
884        append_live_client_event(
885            state,
886            "detached",
887            Some(&client),
888            previous_controller_id.as_deref(),
889            active_controller_id.as_deref(),
890            serde_json::json!({
891                "reason": reason.unwrap_or_else(|| "client_detached".to_string()),
892                "metadata": metadata,
893            }),
894        )
895        .inspect_err(|_error| {
896            state.live_clients = previous_clients;
897            state.live_controller_id = previous_controller_id.clone();
898        })?;
899        Ok(live_client_change(None, previous_controller_id, state))
900    })
901}
902
903pub fn heartbeat_live_client(
904    id: &str,
905    client_id: impl Into<String>,
906    metadata: serde_json::Value,
907) -> Result<LiveClientChange, String> {
908    SESSIONS.with(|s| {
909        let mut map = s.borrow_mut();
910        let Some(state) = map.get_mut(id) else {
911            return Err(format!("agent session '{id}' does not exist"));
912        };
913        let client_id = validate_live_client_id(client_id.into())?;
914        let previous_clients = state.live_clients.clone();
915        let previous_controller_id = state.live_controller_id.clone();
916        let Some(client) = state.live_clients.get_mut(&client_id) else {
917            return Err(format!("live client '{client_id}' is not attached"));
918        };
919        client.last_seen_at = crate::orchestration::now_rfc3339();
920        if !metadata.is_null() {
921            client.metadata = metadata.clone();
922        }
923        let client = client.clone();
924        state.last_accessed = Instant::now();
925        let active_controller_id = state.live_controller_id.clone();
926        append_live_client_event(
927            state,
928            "heartbeat",
929            Some(&client),
930            previous_controller_id.as_deref(),
931            active_controller_id.as_deref(),
932            serde_json::json!({ "metadata": metadata }),
933        )
934        .inspect_err(|_error| {
935            state.live_clients = previous_clients;
936            state.live_controller_id = previous_controller_id.clone();
937        })?;
938        Ok(live_client_change(
939            Some(client),
940            previous_controller_id,
941            state,
942        ))
943    })
944}
945
946pub fn inject_prompt_from_live_client(
947    id: &str,
948    client_id: impl Into<String>,
949    content: VmValue,
950    metadata: serde_json::Value,
951) -> Result<(), String> {
952    let client_id = validate_live_client_id(client_id.into())?;
953    ensure_live_controller(id, &client_id, LiveControllerCapability::PromptInjection)?;
954    let mut message = BTreeMap::new();
955    message.insert(
956        "role".to_string(),
957        VmValue::String(std::sync::Arc::from("user")),
958    );
959    message.insert("content".to_string(), content);
960    message.insert(
961        "metadata".to_string(),
962        crate::stdlib::json_to_vm_value(&serde_json::json!({
963            "live_session": {
964                "client_id": client_id,
965                "mode": "controller",
966                "source": "live_session_attach",
967                "metadata": metadata,
968            }
969        })),
970    );
971    inject_message(id, VmValue::Dict(std::sync::Arc::new(message)))
972}
973
974pub fn route_live_permission_request(
975    id: &str,
976    client_id: impl Into<String>,
977    request: serde_json::Value,
978    metadata: serde_json::Value,
979) -> Result<serde_json::Value, String> {
980    let client_id = validate_live_client_id(client_id.into())?;
981    let client =
982        ensure_live_controller(id, &client_id, LiveControllerCapability::PermissionRouting)?;
983    let request_id = request
984        .get("id")
985        .or_else(|| request.get("request_id"))
986        .and_then(serde_json::Value::as_str)
987        .filter(|id| !id.trim().is_empty())
988        .unwrap_or("permission_request");
989    let event_metadata = serde_json::json!({
990        "action": "permission_routed",
991        "client": live_client_json(&client),
992        "request_id": request_id,
993        "request": request,
994        "metadata": metadata,
995    });
996    let event = crate::llm::helpers::transcript_event(
997        LIVE_CLIENT_PERMISSION_EVENT_KIND,
998        "system",
999        "internal",
1000        "Live session permission request routed",
1001        Some(event_metadata.clone()),
1002    );
1003    append_event(id, event)?;
1004    Ok(event_metadata)
1005}
1006
/// Capability a live controller must hold for an operation. Checked by
/// `ensure_live_controller` against the flags stored on the client.
enum LiveControllerCapability {
    /// Requires the client's `prompt_injection` flag to be set.
    PromptInjection,
    /// Requires the client's `permission_routing` flag to be set.
    PermissionRouting,
}
1011
1012fn ensure_live_controller(
1013    id: &str,
1014    client_id: &str,
1015    capability: LiveControllerCapability,
1016) -> Result<LiveSessionClient, String> {
1017    SESSIONS.with(|s| {
1018        let map = s.borrow();
1019        let Some(state) = map.get(id) else {
1020            return Err(format!("agent session '{id}' does not exist"));
1021        };
1022        if state.live_controller_id.as_deref() != Some(client_id) {
1023            return Err(format!(
1024                "live client '{client_id}' is not the active controller"
1025            ));
1026        }
1027        let Some(client) = state.live_clients.get(client_id) else {
1028            return Err(format!("live client '{client_id}' is not attached"));
1029        };
1030        match capability {
1031            LiveControllerCapability::PromptInjection if !client.prompt_injection => Err(format!(
1032                "live client '{client_id}' cannot inject prompts for this session"
1033            )),
1034            LiveControllerCapability::PermissionRouting if !client.permission_routing => Err(
1035                format!("live client '{client_id}' cannot route permissions for this session"),
1036            ),
1037            _ => Ok(client.clone()),
1038        }
1039    })
1040}
1041
1042fn append_live_client_event(
1043    state: &mut SessionState,
1044    action: &str,
1045    client: Option<&LiveSessionClient>,
1046    previous_controller_id: Option<&str>,
1047    active_controller_id: Option<&str>,
1048    extra: serde_json::Value,
1049) -> Result<(), String> {
1050    let metadata = serde_json::json!({
1051        "action": action,
1052        "session_id": state.id,
1053        "client": client.map(live_client_json),
1054        "previous_controller_id": previous_controller_id,
1055        "active_controller_id": active_controller_id,
1056        "clients": state
1057            .live_clients
1058            .values()
1059            .map(live_client_json)
1060            .collect::<Vec<_>>(),
1061        "extra": extra,
1062    });
1063    let event = crate::llm::helpers::transcript_event(
1064        LIVE_CLIENT_EVENT_KIND,
1065        "system",
1066        "internal",
1067        "Live session client lifecycle changed",
1068        Some(metadata),
1069    );
1070    append_event_to_state(state, event, "live_client")
1071}
1072
1073fn live_client_change(
1074    client: Option<LiveSessionClient>,
1075    previous_controller_id: Option<String>,
1076    state: &SessionState,
1077) -> LiveClientChange {
1078    LiveClientChange {
1079        client,
1080        previous_controller_id,
1081        active_controller_id: state.live_controller_id.clone(),
1082        clients: state.live_clients.values().cloned().collect(),
1083    }
1084}
1085
/// Normalise a live client id by trimming surrounding whitespace.
///
/// # Errors
/// Returns an error when the id is empty after trimming.
fn validate_live_client_id(id: impl Into<String>) -> Result<String, String> {
    let raw: String = id.into();
    match raw.trim() {
        "" => Err("live client id cannot be empty".to_string()),
        trimmed => Ok(trimmed.to_owned()),
    }
}
1094
1095pub fn live_client_json(client: &LiveSessionClient) -> serde_json::Value {
1096    serde_json::json!({
1097        "client_id": client.client_id,
1098        "mode": client.mode.as_str(),
1099        "attached_at": client.attached_at,
1100        "last_seen_at": client.last_seen_at,
1101        "prompt_injection": client.prompt_injection,
1102        "permission_routing": client.permission_routing,
1103        "metadata": client.metadata,
1104    })
1105}
1106
1107pub fn live_client_change_json(change: &LiveClientChange) -> serde_json::Value {
1108    serde_json::json!({
1109        "client": change.client.as_ref().map(live_client_json),
1110        "previous_controller_id": change.previous_controller_id,
1111        "active_controller_id": change.active_controller_id,
1112        "clients": change
1113            .clients
1114            .iter()
1115            .map(live_client_json)
1116            .collect::<Vec<_>>(),
1117    })
1118}
1119
1120/// Auto-register a persistent sink for a newly-created session.
1121/// Silent no-op on failure — a broken observability sink must never
1122/// prevent a session from starting.
1123fn try_register_event_log(session_id: &str) {
1124    if let Some(log) = crate::event_log::active_event_log() {
1125        crate::agent_events::register_sink(
1126            session_id,
1127            crate::agent_events::EventLogSink::new(log, session_id),
1128        );
1129        return;
1130    }
1131    let Ok(dir) = std::env::var("HARN_EVENT_LOG_DIR") else {
1132        return;
1133    };
1134    if dir.is_empty() {
1135        return;
1136    }
1137    let path = std::path::PathBuf::from(dir).join(format!("event_log-{session_id}.jsonl"));
1138    if let Ok(sink) = crate::agent_events::JsonlEventSink::open(&path) {
1139        crate::agent_events::register_sink(session_id, sink);
1140    }
1141}
1142
/// Public entry point for registering an event-log sink for `session_id`.
/// Delegates to `try_register_event_log`, which returns silently when no
/// sink can be set up.
pub fn register_event_log_sink(session_id: &str) {
    try_register_event_log(session_id);
}
1146
/// Remove session `id` from the session store, then clear its agent-inbox
/// entries and its registered event sinks.
pub fn close(id: &str) {
    SESSIONS.with(|s| {
        s.borrow_mut().remove(id);
    });
    // Cross-thread per-session state must be released too, otherwise
    // pending inbox entries can be delivered to a future session that
    // happens to reuse the same id.
    crate::orchestration::agent_inbox::clear_session(id);
    crate::agent_events::clear_session_sinks(id);
}
1157
1158pub fn close_with_status(
1159    id: &str,
1160    reason: impl Into<String>,
1161    status: impl Into<String>,
1162    metadata: serde_json::Value,
1163) -> bool {
1164    if !exists(id) {
1165        return false;
1166    }
1167    let reason = reason.into();
1168    let status = status.into();
1169    let event_metadata = serde_json::json!({
1170        "reason": reason,
1171        "status": status,
1172        "metadata": metadata,
1173    });
1174    let transcript_event = crate::llm::helpers::transcript_event(
1175        "agent_session_closed",
1176        "system",
1177        "internal",
1178        "Agent session closed",
1179        Some(event_metadata),
1180    );
1181    let _ = append_event(id, transcript_event);
1182    crate::llm::emit_live_agent_event_sync(&crate::agent_events::AgentEvent::SessionClosed {
1183        session_id: id.to_string(),
1184        reason,
1185        status,
1186        metadata,
1187    });
1188    close(id);
1189    true
1190}
1191
1192pub fn reset_transcript(id: &str) -> bool {
1193    SESSIONS.with(|s| {
1194        let mut map = s.borrow_mut();
1195        let Some(state) = map.get_mut(id) else {
1196            return false;
1197        };
1198        state.transcript = empty_transcript(id);
1199        state.tool_format = None;
1200        state.system_prompt = None;
1201        state.scratchpad = None;
1202        state.scratchpad_version = 0;
1203        state.last_transcript_budget_action = None;
1204        state.completed_turn_checkpoints.clear();
1205        state.redo_stack.clear();
1206        state.last_accessed = Instant::now();
1207        true
1208    })
1209}
1210
1211/// Copy `src`'s transcript into a new session id. Subscribers are NOT
1212/// copied — a fork is a conversation branch, not an event fanout.
1213///
1214/// Touches `src`'s `last_accessed` before evicting, so the fork
1215/// operation itself can't make `src` look stale and kick it out of
1216/// the LRU just to make room for the new fork.
1217pub fn fork(src_id: &str, dst_id: Option<String>) -> Option<String> {
1218    let (
1219        src_transcript,
1220        src_tool_format,
1221        src_system_prompt,
1222        src_pinned_model,
1223        src_pinned_reasoning_policy,
1224        src_workspace_anchor,
1225        src_workspace_policy,
1226        src_scratchpad,
1227        src_scratchpad_version,
1228        src_transcript_budget_policy,
1229        src_last_transcript_budget_action,
1230        dst,
1231    ) = SESSIONS.with(|s| {
1232        let mut map = s.borrow_mut();
1233        let src = map.get_mut(src_id)?;
1234        src.last_accessed = Instant::now();
1235        let dst = dst_id.unwrap_or_else(|| uuid::Uuid::now_v7().to_string());
1236        let forked_transcript = clone_transcript_with_id(&src.transcript, &dst);
1237        Some((
1238            forked_transcript,
1239            src.tool_format.clone(),
1240            src.system_prompt.clone(),
1241            src.pinned_model.clone(),
1242            src.pinned_reasoning_policy.clone(),
1243            src.workspace_anchor.clone(),
1244            src.workspace_policy.clone(),
1245            src.scratchpad.clone(),
1246            src.scratchpad_version,
1247            src.transcript_budget_policy.clone(),
1248            src.last_transcript_budget_action.clone(),
1249            dst,
1250        ))
1251    })?;
1252    // Ensure cap is respected when inserting the fork.
1253    open_or_create(Some(dst.clone()));
1254    SESSIONS.with(|s| {
1255        let mut map = s.borrow_mut();
1256        if let Some(state) = map.get_mut(&dst) {
1257            state.transcript = src_transcript;
1258            state.tool_format = src_tool_format;
1259            state.system_prompt = src_system_prompt;
1260            state.pinned_model = src_pinned_model;
1261            state.pinned_reasoning_policy = src_pinned_reasoning_policy;
1262            state.workspace_anchor = src_workspace_anchor;
1263            state.workspace_policy = src_workspace_policy;
1264            state.scratchpad = src_scratchpad;
1265            state.scratchpad_version = src_scratchpad_version;
1266            state.transcript_budget_policy = src_transcript_budget_policy;
1267            state.last_transcript_budget_action = src_last_transcript_budget_action;
1268            state.last_accessed = Instant::now();
1269        }
1270        update_lineage(&mut map, src_id, &dst, None);
1271    });
1272    let budget_ok = SESSIONS.with(|s| {
1273        let mut map = s.borrow_mut();
1274        let Some(state) = map.get_mut(&dst) else {
1275            return false;
1276        };
1277        let candidate = state.transcript.clone();
1278        apply_transcript_with_budget(state, candidate, "fork").is_ok()
1279    });
1280    if !budget_ok {
1281        close(&dst);
1282        return None;
1283    }
1284    // open_or_create evicts BEFORE inserting, so the dst slot is
1285    // guaranteed once we get here. The existence check is cheap
1286    // insurance against a future refactor that breaks that invariant.
1287    if exists(&dst) {
1288        Some(dst)
1289    } else {
1290        None
1291    }
1292}
1293
1294/// Fork `src_id` and truncate the destination transcript to the
1295/// first `keep_first` messages (#105 — branch-replay). Pairs with the
1296/// scrubber: the host picks an event index, rebuilds a message count,
1297/// and calls this to spawn a live sibling session that resumes from
1298/// the rebuilt state. Subscribers are not carried over (same as
1299/// `fork`), so sibling events don't double-fan into the parent's
1300/// consumers.
1301///
1302/// Returns the new session id on success, `None` if `src_id` doesn't
1303/// exist.
1304pub fn fork_at(src_id: &str, keep_first: usize, dst_id: Option<String>) -> Option<String> {
1305    let branched_at_event_index = SESSIONS.with(|s| {
1306        let map = s.borrow();
1307        let src = map.get(src_id)?;
1308        Some(branch_event_index(&src.transcript, keep_first))
1309    })?;
1310    let new_id = fork(src_id, dst_id)?;
1311    link_child_session_with_branch(src_id, &new_id, Some(branched_at_event_index));
1312    let _ = truncate(&new_id, keep_first);
1313    Some(new_id)
1314}
1315
1316/// Truncate the session transcript to the first `keep_first`
1317/// messages (opposite of `trim`, which keeps the last N). Returns
1318/// counts and the retained tip event id when the session exists.
1319pub fn truncate(id: &str, keep_first: usize) -> Option<SessionTruncateResult> {
1320    SESSIONS.with(|s| {
1321        let mut map = s.borrow_mut();
1322        let state = map.get_mut(id)?;
1323        let result = truncate_state(state, keep_first)?;
1324        Some(result)
1325    })
1326}
1327
1328fn truncate_state(state: &mut SessionState, keep_first: usize) -> Option<SessionTruncateResult> {
1329    let dict = state
1330        .transcript
1331        .as_dict()
1332        .cloned()
1333        .unwrap_or_else(BTreeMap::new);
1334    let messages: Vec<VmValue> = match dict.get("messages") {
1335        Some(VmValue::List(list)) => list.iter().cloned().collect(),
1336        _ => Vec::new(),
1337    };
1338    let existing_events = match dict.get("events") {
1339        Some(VmValue::List(list)) => Some(list.iter().cloned().collect::<Vec<_>>()),
1340        _ => None,
1341    };
1342    let kept_turn_count = keep_first.min(messages.len());
1343    let removed_turn_count = messages.len().saturating_sub(kept_turn_count);
1344    let mut new_tip_turn_id = existing_events
1345        .as_ref()
1346        .map(|events| turn_event_id_for_count(events, kept_turn_count))
1347        .unwrap_or_else(|| {
1348            let events = crate::llm::helpers::transcript_events_from_messages(&messages);
1349            turn_event_id_for_count(&events, kept_turn_count)
1350        });
1351
1352    if removed_turn_count > 0 {
1353        let retained: Vec<VmValue> = messages.into_iter().take(kept_turn_count).collect();
1354        let retained_events = match existing_events {
1355            Some(events) => {
1356                let keep_event_count = event_prefix_len_for_messages(&events, kept_turn_count);
1357                events.into_iter().take(keep_event_count).collect()
1358            }
1359            None => crate::llm::helpers::transcript_events_from_messages(&retained),
1360        };
1361        new_tip_turn_id = turn_event_id_for_count(&retained_events, kept_turn_count);
1362        let mut next = dict;
1363        next.insert(
1364            "events".to_string(),
1365            VmValue::List(std::sync::Arc::new(retained_events)),
1366        );
1367        next.insert(
1368            "messages".to_string(),
1369            VmValue::List(std::sync::Arc::new(retained)),
1370        );
1371        next.remove("summary");
1372        apply_transcript_with_budget(state, VmValue::Dict(std::sync::Arc::new(next)), "truncate")
1373            .ok()?;
1374    }
1375    state.last_accessed = Instant::now();
1376    Some(SessionTruncateResult {
1377        kept_turn_count,
1378        removed_turn_count,
1379        new_tip_turn_id,
1380    })
1381}
1382
1383/// Pop the trailing message iff it is an assistant message. Used by
1384/// `agent_step_judge` to remove a vetoed assistant turn before
1385/// regeneration (the "replace" on_veto path). Returns `true` if a
1386/// message was popped, `false` if the transcript was empty, and an
1387/// error if the trailing message was not an assistant turn —
1388/// signalling a call-site discipline bug rather than a runtime error.
1389pub fn pop_last_if_assistant(id: &str) -> Result<bool, String> {
1390    SESSIONS.with(|s| {
1391        let mut map = s.borrow_mut();
1392        let Some(state) = map.get_mut(id) else {
1393            return Err(format!(
1394                "pop_last_if_assistant: unknown session id '{id}'"
1395            ));
1396        };
1397        let messages: Vec<VmValue> = match state.transcript.as_dict() {
1398            Some(dict) => match dict.get("messages") {
1399                Some(VmValue::List(list)) => list.iter().cloned().collect(),
1400                _ => Vec::new(),
1401            },
1402            None => Vec::new(),
1403        };
1404        if messages.is_empty() {
1405            return Ok(false);
1406        }
1407        let trailing_role = messages
1408            .last()
1409            .and_then(|m| m.as_dict())
1410            .and_then(|d| d.get("role"))
1411            .map(|v| v.display())
1412            .unwrap_or_default();
1413        if trailing_role != "assistant" {
1414            return Err(format!(
1415                "pop_last_if_assistant: trailing message role is '{trailing_role}', expected 'assistant'"
1416            ));
1417        }
1418        let keep = messages.len() - 1;
1419        truncate_state(state, keep);
1420        Ok(true)
1421    })
1422}
1423
1424pub fn trim(id: &str, keep_last: usize) -> Option<usize> {
1425    SESSIONS.with(|s| {
1426        let mut map = s.borrow_mut();
1427        let state = map.get_mut(id)?;
1428        let dict = state.transcript.as_dict()?.clone();
1429        let messages: Vec<VmValue> = match dict.get("messages") {
1430            Some(VmValue::List(list)) => list.iter().cloned().collect(),
1431            _ => Vec::new(),
1432        };
1433        let start = messages.len().saturating_sub(keep_last);
1434        let retained: Vec<VmValue> = messages.into_iter().skip(start).collect();
1435        let kept = retained.len();
1436        let mut next = dict;
1437        next.insert(
1438            "events".to_string(),
1439            VmValue::List(std::sync::Arc::new(
1440                crate::llm::helpers::transcript_events_from_messages(&retained),
1441            )),
1442        );
1443        next.insert(
1444            "messages".to_string(),
1445            VmValue::List(std::sync::Arc::new(retained)),
1446        );
1447        apply_transcript_with_budget(state, VmValue::Dict(std::sync::Arc::new(next)), "trim")
1448            .ok()?;
1449        state.last_accessed = Instant::now();
1450        Some(kept)
1451    })
1452}
1453
1454/// Append a message dict to the session transcript. The message must
1455/// have at least a string `role`; anything else is merged verbatim.
1456pub fn inject_message(id: &str, message: VmValue) -> Result<(), String> {
1457    let Some(msg_dict) = message.as_dict().cloned() else {
1458        return Err("agent_session_inject: message must be a dict".into());
1459    };
1460    let role_ok = matches!(msg_dict.get("role"), Some(VmValue::String(_)));
1461    if !role_ok {
1462        return Err(
1463            "agent_session_inject: message must have a string `role` (user|assistant|tool_result|system)"
1464                .into(),
1465        );
1466    }
1467    SESSIONS.with(|s| {
1468        let mut map = s.borrow_mut();
1469        let Some(state) = map.get_mut(id) else {
1470            return Err(format!("agent_session_inject: unknown session id '{id}'"));
1471        };
1472        let dict = state
1473            .transcript
1474            .as_dict()
1475            .cloned()
1476            .unwrap_or_else(BTreeMap::new);
1477        let mut messages: Vec<VmValue> = match dict.get("messages") {
1478            Some(VmValue::List(list)) => list.iter().cloned().collect(),
1479            _ => Vec::new(),
1480        };
1481        let mut events: Vec<VmValue> = match dict.get("events") {
1482            Some(VmValue::List(list)) => list.iter().cloned().collect(),
1483            _ => crate::llm::helpers::transcript_events_from_messages(&messages),
1484        };
1485        let new_message = VmValue::Dict(std::sync::Arc::new(msg_dict));
1486        let message_index = messages.len();
1487        events.push(crate::llm::helpers::transcript_event_from_message(
1488            &new_message,
1489        ));
1490        messages.push(new_message);
1491        let mut next = dict;
1492        next.insert(
1493            "events".to_string(),
1494            VmValue::List(std::sync::Arc::new(events)),
1495        );
1496        next.insert(
1497            "messages".to_string(),
1498            VmValue::List(std::sync::Arc::new(messages)),
1499        );
1500        let persisted_message = next
1501            .get("messages")
1502            .and_then(|value| match value {
1503                VmValue::List(list) => list.get(message_index).cloned(),
1504                _ => None,
1505            })
1506            .unwrap_or(VmValue::Nil);
1507        apply_transcript_with_budget(
1508            state,
1509            VmValue::Dict(std::sync::Arc::new(next)),
1510            "inject_message",
1511        )?;
1512        emit_identified_user_message_event(id, &persisted_message);
1513        emit_llm_message_event(id, message_index, &persisted_message);
1514        state.last_accessed = Instant::now();
1515        Ok(())
1516    })
1517}
1518
1519fn emit_identified_user_message_event(session_id: &str, message: &VmValue) {
1520    let message_json = crate::llm::helpers::vm_value_to_json(message);
1521    let role = message_json.get("role").and_then(|value| value.as_str());
1522    if role != Some("user") {
1523        return;
1524    }
1525    let Some(message_id) = message_json
1526        .get("messageId")
1527        .or_else(|| message_json.get("message_id"))
1528        .and_then(|value| value.as_str())
1529        .filter(|value| !value.trim().is_empty())
1530    else {
1531        return;
1532    };
1533    let content = message_json
1534        .get("content")
1535        .map(user_message_content_blocks)
1536        .unwrap_or_default();
1537    crate::agent_events::emit_event(&crate::agent_events::AgentEvent::UserMessage {
1538        session_id: session_id.to_string(),
1539        message_id: message_id.to_string(),
1540        content,
1541    });
1542}
1543
1544fn user_message_content_blocks(content: &serde_json::Value) -> Vec<serde_json::Value> {
1545    match content {
1546        serde_json::Value::Array(items) => items.clone(),
1547        serde_json::Value::String(text) => vec![serde_json::json!({
1548            "type": "text",
1549            "text": text,
1550        })],
1551        other => vec![serde_json::json!({
1552            "type": "text",
1553            "text": other.to_string(),
1554        })],
1555    }
1556}
1557
/// Size measurements of a transcript, as produced by `transcript_usage` and
/// compared against a transcript budget policy.
#[derive(Clone, Debug, PartialEq, Eq)]
struct TranscriptBudgetUsage {
    /// Number of entries in the transcript's `messages` list.
    message_count: usize,
    /// Number of entries in the transcript's `events` list, or the number of
    /// events derived from the messages when no event list is stored.
    event_count: usize,
    /// Byte length of the transcript serialized as JSON; `None` when the byte
    /// measurement was not requested.
    approx_bytes: Option<usize>,
}
1564
1565fn transcript_messages_from_dict(dict: &BTreeMap<String, VmValue>) -> Vec<VmValue> {
1566    match dict.get("messages") {
1567        Some(VmValue::List(list)) => list.iter().cloned().collect(),
1568        _ => Vec::new(),
1569    }
1570}
1571
1572fn transcript_message_count(transcript: &VmValue) -> usize {
1573    transcript
1574        .as_dict()
1575        .map(transcript_messages_from_dict)
1576        .map(|messages| messages.len())
1577        .unwrap_or(0)
1578}
1579
1580fn transcript_events_from_dict(dict: &BTreeMap<String, VmValue>) -> Vec<VmValue> {
1581    match dict.get("events") {
1582        Some(VmValue::List(list)) => list.iter().cloned().collect(),
1583        _ => {
1584            let messages = transcript_messages_from_dict(dict);
1585            crate::llm::helpers::transcript_events_from_messages(&messages)
1586        }
1587    }
1588}
1589
1590fn transcript_usage(transcript: &VmValue, include_bytes: bool) -> TranscriptBudgetUsage {
1591    let Some(dict) = transcript.as_dict() else {
1592        return TranscriptBudgetUsage {
1593            message_count: 0,
1594            event_count: 0,
1595            approx_bytes: include_bytes.then_some(0),
1596        };
1597    };
1598    let approx_bytes = if include_bytes {
1599        serde_json::to_vec(&crate::llm::helpers::vm_value_to_json(transcript))
1600            .map(|bytes| bytes.len())
1601            .ok()
1602            .or(Some(usize::MAX))
1603    } else {
1604        None
1605    };
1606    TranscriptBudgetUsage {
1607        message_count: transcript_messages_from_dict(dict).len(),
1608        event_count: transcript_events_from_dict(dict).len(),
1609        approx_bytes,
1610    }
1611}
1612
1613fn transcript_budget_exceeded_reason(
1614    usage: &TranscriptBudgetUsage,
1615    policy: &SessionTranscriptBudgetPolicy,
1616) -> Option<&'static str> {
1617    if usage.message_count > policy.max_messages {
1618        return Some("message_count");
1619    }
1620    if usage.event_count > policy.max_events {
1621        return Some("event_count");
1622    }
1623    if let (Some(bytes), Some(limit)) = (usage.approx_bytes, policy.max_approx_bytes) {
1624        if bytes > limit {
1625            return Some("approx_bytes");
1626        }
1627    }
1628    None
1629}
1630
1631fn transcript_budget_usage_json(usage: &TranscriptBudgetUsage) -> serde_json::Value {
1632    serde_json::json!({
1633        "messages": usage.message_count,
1634        "events": usage.event_count,
1635        "approx_bytes": usage.approx_bytes,
1636    })
1637}
1638
1639fn transcript_budget_policy_json(policy: &SessionTranscriptBudgetPolicy) -> serde_json::Value {
1640    let recovery = match &policy.recovery {
1641        TranscriptBudgetRecovery::Reject => serde_json::json!({"action": "reject"}),
1642        TranscriptBudgetRecovery::Trim { keep_last } => {
1643            serde_json::json!({"action": "trim", "keep_last": keep_last})
1644        }
1645        TranscriptBudgetRecovery::Compact { keep_last } => {
1646            serde_json::json!({"action": "compact", "keep_last": keep_last})
1647        }
1648    };
1649    serde_json::json!({
1650        "max_messages": policy.max_messages,
1651        "max_events": policy.max_events,
1652        "max_approx_bytes": policy.max_approx_bytes,
1653        "recovery": recovery,
1654    })
1655}
1656
/// Short lowercase name of a recovery strategy (`"reject"`, `"trim"`, or
/// `"compact"`), ignoring any parameters the variant carries.
fn transcript_budget_recovery_name(recovery: &TranscriptBudgetRecovery) -> &'static str {
    match recovery {
        TranscriptBudgetRecovery::Reject => "reject",
        TranscriptBudgetRecovery::Trim { .. } => "trim",
        TranscriptBudgetRecovery::Compact { .. } => "compact",
    }
}
1664
1665fn transcript_budget_error(
1666    state: &SessionState,
1667    policy: &SessionTranscriptBudgetPolicy,
1668    usage: &TranscriptBudgetUsage,
1669    reason: &str,
1670) -> String {
1671    let byte_suffix = match (usage.approx_bytes, policy.max_approx_bytes) {
1672        (Some(bytes), Some(limit)) => format!(", approx_bytes {bytes}/{limit}"),
1673        _ => String::new(),
1674    };
1675    format!(
1676        "transcript budget exceeded for session '{}': {reason} (messages {}/{}, events {}/{}{}; recovery={})",
1677        state.id,
1678        usage.message_count,
1679        policy.max_messages,
1680        usage.event_count,
1681        policy.max_events,
1682        byte_suffix,
1683        transcript_budget_recovery_name(&policy.recovery),
1684    )
1685}
1686
1687fn transcript_budget_audit_json(
1688    action: &str,
1689    source: &str,
1690    reason: &str,
1691    policy: &SessionTranscriptBudgetPolicy,
1692    usage_before: &TranscriptBudgetUsage,
1693    usage_attempted: &TranscriptBudgetUsage,
1694    usage_after: &TranscriptBudgetUsage,
1695) -> serde_json::Value {
1696    serde_json::json!({
1697        "action": action,
1698        "source": source,
1699        "reason": reason,
1700        "policy": transcript_budget_policy_json(policy),
1701        "usage_before": transcript_budget_usage_json(usage_before),
1702        "usage_attempted": transcript_budget_usage_json(usage_attempted),
1703        "usage_after": transcript_budget_usage_json(usage_after),
1704        "removed_messages": usage_attempted.message_count.saturating_sub(usage_after.message_count),
1705        "removed_events": usage_attempted.event_count.saturating_sub(usage_after.event_count),
1706    })
1707}
1708
1709fn transcript_budget_event(audit: &serde_json::Value) -> VmValue {
1710    let action = audit
1711        .get("action")
1712        .and_then(serde_json::Value::as_str)
1713        .unwrap_or("enforced");
1714    crate::llm::helpers::transcript_event(
1715        "transcript_budget",
1716        "system",
1717        "internal",
1718        &format!("Transcript budget {action}."),
1719        Some(audit.clone()),
1720    )
1721}
1722
1723fn append_event_to_transcript(transcript: VmValue, event: VmValue) -> VmValue {
1724    let Some(dict) = transcript.as_dict() else {
1725        return transcript;
1726    };
1727    let mut next = dict.clone();
1728    let mut events = transcript_events_from_dict(&next);
1729    events.push(event);
1730    next.insert(
1731        "events".to_string(),
1732        VmValue::List(std::sync::Arc::new(events)),
1733    );
1734    VmValue::Dict(std::sync::Arc::new(next))
1735}
1736
1737fn tail_message_capacity(
1738    policy: &SessionTranscriptBudgetPolicy,
1739    reserve_audit_event: bool,
1740) -> usize {
1741    let event_capacity = tail_event_capacity(policy, usize::from(reserve_audit_event));
1742    policy.max_messages.min(event_capacity)
1743}
1744
1745fn tail_event_capacity(policy: &SessionTranscriptBudgetPolicy, reserved_events: usize) -> usize {
1746    policy.max_events.saturating_sub(reserved_events)
1747}
1748
1749fn trim_transcript_for_budget(
1750    transcript: &VmValue,
1751    policy: &SessionTranscriptBudgetPolicy,
1752    keep_last: usize,
1753) -> VmValue {
1754    let dict = transcript.as_dict().cloned().unwrap_or_else(BTreeMap::new);
1755    let messages = transcript_messages_from_dict(&dict);
1756    let keep = keep_last.min(tail_message_capacity(policy, true));
1757    let start = messages.len().saturating_sub(keep);
1758    let retained: Vec<VmValue> = messages.into_iter().skip(start).collect();
1759    let mut next = dict;
1760    next.insert(
1761        "events".to_string(),
1762        VmValue::List(std::sync::Arc::new(
1763            crate::llm::helpers::transcript_events_from_messages(&retained),
1764        )),
1765    );
1766    next.insert(
1767        "messages".to_string(),
1768        VmValue::List(std::sync::Arc::new(retained)),
1769    );
1770    next.remove("summary");
1771    VmValue::Dict(std::sync::Arc::new(next))
1772}
1773
/// Payload for the live "transcript compacted" notification that is emitted
/// after a budget-driven compaction has been stored on the session.
struct BudgetCompactionLiveEvent {
    /// Compaction policy copied from the auto-compact config used for the run.
    policy: crate::orchestration::CompactionPolicy,
    /// Strategy name as reported by the compaction outcome.
    policy_strategy: String,
    /// Archived-message, token-estimate and snapshot figures reported by the
    /// compaction outcome.
    metrics: crate::orchestration::TranscriptCompactedEventMetrics,
}
1779
/// Result of `compact_transcript_for_budget`.
struct BudgetCompactionResult {
    /// The rebuilt transcript (messages, events and, when available, summary).
    transcript: VmValue,
    /// Present only when the compaction lifecycle reported an outcome.
    live_event: Option<BudgetCompactionLiveEvent>,
}
1784
/// Compact recovery: runs the compaction lifecycle over the candidate
/// transcript's messages and rebuilds the transcript from what remains.
///
/// The lifecycle is driven to completion synchronously with `block_on`. Its
/// errors are discarded: when it fails or reports no outcome, the messages as
/// left in the working list are stored, no compaction event is added, and any
/// existing `summary` entry is removed.
///
/// Returns the rebuilt transcript together with the data needed to emit a
/// live compaction event, which is `Some` only when an outcome was produced.
/// A non-dict transcript is treated as an empty dict.
fn compact_transcript_for_budget(
    transcript: &VmValue,
    policy: &SessionTranscriptBudgetPolicy,
    keep_last: usize,
    session_id: &str,
) -> BudgetCompactionResult {
    let dict = transcript.as_dict().cloned().unwrap_or_else(BTreeMap::new);
    let messages = transcript_messages_from_dict(&dict);
    // Two event slots are held back here — presumably for the compaction event
    // pushed below and the audit event the caller appends; confirm.
    let message_capacity = policy.max_messages.min(tail_event_capacity(policy, 2));
    // Auto-compaction may widen a suffix to start on a clean user-turn boundary,
    // so reserve one extra slot beyond the summary when sizing for hard caps.
    let tail_keep = keep_last.min(message_capacity.saturating_sub(2));
    let mut config = crate::orchestration::AutoCompactConfig {
        token_threshold: 0,
        keep_last: tail_keep,
        compact_strategy: crate::orchestration::CompactStrategy::Llm,
        hard_limit_strategy: crate::orchestration::CompactStrategy::Truncate,
        fallback_strategy: Some(crate::orchestration::CompactStrategy::Truncate),
        policy_strategy: crate::orchestration::compact_strategy_name(
            &crate::orchestration::CompactStrategy::Llm,
        )
        .to_string(),
        ..Default::default()
    };

    // The lifecycle works on a JSON copy of the messages and mutates it in place.
    let mut json_messages = messages
        .iter()
        .map(crate::llm::helpers::vm_value_to_json)
        .collect::<Vec<_>>();
    let lifecycle =
        crate::orchestration::CompactLifecycle::new(crate::orchestration::CompactMode::Auto)
            .with_session_id(Some(session_id))
            .with_trigger(crate::orchestration::CompactionTrigger::BudgetPressure)
            .with_hook_dispatch(false)
            .with_evaluate_providers(false);
    // LLM options are derived from placeholder arguments (an empty string and
    // two nils); if extraction fails the lifecycle runs without options.
    let llm_opts = crate::llm::extract_llm_options(&[
        VmValue::String(std::sync::Arc::from("")),
        VmValue::Nil,
        VmValue::Nil,
    ])
    .ok();
    // Blocks the current thread until the lifecycle finishes; an `Err` and an
    // `Ok(None)` both collapse to `None`.
    let outcome = futures::executor::block_on(crate::orchestration::run_compaction_lifecycle(
        &mut json_messages,
        &mut config,
        llm_opts.as_ref(),
        lifecycle,
    ))
    .ok()
    .flatten();

    // Convert whatever the lifecycle left behind back into VM values and
    // regenerate the event list from those messages.
    let retained = json_messages
        .iter()
        .map(crate::stdlib::json_to_vm_value)
        .collect::<Vec<_>>();
    let mut events = crate::llm::helpers::transcript_events_from_messages(&retained);
    // Captured before `outcome` is consumed by the `if let` below.
    let summary = outcome.as_ref().map(|outcome| outcome.summary.clone());
    let mut live_event = None;
    if let Some(outcome) = outcome {
        // Record the compaction itself as a transcript event carrying the
        // outcome's metadata.
        events.push(crate::llm::helpers::transcript_event(
            "compaction",
            "system",
            "internal",
            "",
            Some(outcome.event_metadata.clone()),
        ));
        live_event = Some(BudgetCompactionLiveEvent {
            policy: config.policy.clone(),
            policy_strategy: outcome.policy_strategy,
            metrics: crate::orchestration::TranscriptCompactedEventMetrics {
                archived_messages: outcome.archived_messages,
                estimated_tokens_before: outcome.estimated_tokens_before,
                estimated_tokens_after: outcome.estimated_tokens_after,
                snapshot_asset_id: outcome.snapshot_asset_id,
            },
        });
    }

    let mut next = dict;
    next.insert(
        "events".to_string(),
        VmValue::List(std::sync::Arc::new(events)),
    );
    next.insert(
        "messages".to_string(),
        VmValue::List(std::sync::Arc::new(retained)),
    );
    // Without an outcome there is no fresh summary, so a stale one is dropped.
    if let Some(summary) = summary {
        next.insert(
            "summary".to_string(),
            VmValue::String(std::sync::Arc::from(summary)),
        );
    } else {
        next.remove("summary");
    }
    BudgetCompactionResult {
        transcript: VmValue::Dict(std::sync::Arc::new(next)),
        live_event,
    }
}
1884
1885fn recovered_transcript_with_audit(
1886    recovered: VmValue,
1887    action: &str,
1888    source: &str,
1889    reason: &str,
1890    policy: &SessionTranscriptBudgetPolicy,
1891    usage_before: &TranscriptBudgetUsage,
1892    usage_attempted: &TranscriptBudgetUsage,
1893    include_bytes: bool,
1894) -> (VmValue, serde_json::Value, TranscriptBudgetUsage) {
1895    let usage_after_without_audit = transcript_usage(&recovered, include_bytes);
1896    let initial_audit = transcript_budget_audit_json(
1897        action,
1898        source,
1899        reason,
1900        policy,
1901        usage_before,
1902        usage_attempted,
1903        &usage_after_without_audit,
1904    );
1905    let with_initial_audit =
1906        append_event_to_transcript(recovered.clone(), transcript_budget_event(&initial_audit));
1907    let usage_after = transcript_usage(&with_initial_audit, include_bytes);
1908    let audit = transcript_budget_audit_json(
1909        action,
1910        source,
1911        reason,
1912        policy,
1913        usage_before,
1914        usage_attempted,
1915        &usage_after,
1916    );
1917    let with_audit = append_event_to_transcript(recovered, transcript_budget_event(&audit));
1918    let usage_after = transcript_usage(&with_audit, include_bytes);
1919    (with_audit, audit, usage_after)
1920}
1921
1922fn apply_transcript_with_budget(
1923    state: &mut SessionState,
1924    candidate: VmValue,
1925    source: &str,
1926) -> Result<(), String> {
1927    state.redo_stack.clear();
1928    let policy = state.transcript_budget_policy.normalized();
1929    let include_bytes = policy.max_approx_bytes.is_some();
1930    let usage_before = transcript_usage(&state.transcript, include_bytes);
1931    let usage_attempted = transcript_usage(&candidate, include_bytes);
1932    let Some(reason) = transcript_budget_exceeded_reason(&usage_attempted, &policy) else {
1933        state.transcript = candidate;
1934        return Ok(());
1935    };
1936
1937    match policy.recovery.clone() {
1938        TranscriptBudgetRecovery::Reject => {
1939            let audit = transcript_budget_audit_json(
1940                "rejected",
1941                source,
1942                reason,
1943                &policy,
1944                &usage_before,
1945                &usage_attempted,
1946                &usage_before,
1947            );
1948            state.last_transcript_budget_action = Some(audit);
1949            Err(transcript_budget_error(
1950                state,
1951                &policy,
1952                &usage_attempted,
1953                reason,
1954            ))
1955        }
1956        TranscriptBudgetRecovery::Trim { keep_last } => {
1957            let recovered = trim_transcript_for_budget(&candidate, &policy, keep_last);
1958            let (with_audit, audit, usage_after) = recovered_transcript_with_audit(
1959                recovered,
1960                "trimmed",
1961                source,
1962                reason,
1963                &policy,
1964                &usage_before,
1965                &usage_attempted,
1966                include_bytes,
1967            );
1968            if transcript_budget_exceeded_reason(&usage_after, &policy).is_some() {
1969                let rejected = transcript_budget_audit_json(
1970                    "rejected",
1971                    source,
1972                    reason,
1973                    &policy,
1974                    &usage_before,
1975                    &usage_attempted,
1976                    &usage_after,
1977                );
1978                state.last_transcript_budget_action = Some(rejected);
1979                return Err(transcript_budget_error(
1980                    state,
1981                    &policy,
1982                    &usage_after,
1983                    reason,
1984                ));
1985            }
1986            state.last_transcript_budget_action = Some(audit);
1987            state.transcript = with_audit;
1988            Ok(())
1989        }
1990        TranscriptBudgetRecovery::Compact { keep_last } => {
1991            let compacted =
1992                compact_transcript_for_budget(&candidate, &policy, keep_last, &state.id);
1993            let (with_audit, audit, usage_after) = recovered_transcript_with_audit(
1994                compacted.transcript,
1995                "compacted",
1996                source,
1997                reason,
1998                &policy,
1999                &usage_before,
2000                &usage_attempted,
2001                include_bytes,
2002            );
2003            if transcript_budget_exceeded_reason(&usage_after, &policy).is_some() {
2004                let rejected = transcript_budget_audit_json(
2005                    "rejected",
2006                    source,
2007                    reason,
2008                    &policy,
2009                    &usage_before,
2010                    &usage_attempted,
2011                    &usage_after,
2012                );
2013                state.last_transcript_budget_action = Some(rejected);
2014                return Err(transcript_budget_error(
2015                    state,
2016                    &policy,
2017                    &usage_after,
2018                    reason,
2019                ));
2020            }
2021            state.last_transcript_budget_action = Some(audit);
2022            state.transcript = with_audit;
2023            if let Some(event) = compacted.live_event {
2024                crate::orchestration::emit_transcript_compacted_event_sync(
2025                    &state.id,
2026                    crate::orchestration::CompactMode::Auto,
2027                    crate::orchestration::CompactionTrigger::BudgetPressure
2028                        .as_str()
2029                        .to_string(),
2030                    &event.policy,
2031                    event.policy_strategy,
2032                    event.metrics,
2033                );
2034            }
2035            Ok(())
2036        }
2037    }
2038}
2039
2040fn emit_llm_message_event(session_id: &str, message_index: usize, message: &VmValue) {
2041    let mut fields = serde_json::Map::new();
2042    fields.insert(
2043        "session_id".to_string(),
2044        serde_json::Value::String(session_id.to_string()),
2045    );
2046    fields.insert(
2047        "message_index".to_string(),
2048        serde_json::json!(message_index),
2049    );
2050    let message_json = crate::llm::helpers::vm_value_to_json(message);
2051    if let Some(role) = message_json.get("role").and_then(|value| value.as_str()) {
2052        fields.insert(
2053            "role".to_string(),
2054            serde_json::Value::String(role.to_string()),
2055        );
2056    }
2057    if let Some(content) = message_json.get("content") {
2058        fields.insert("content".to_string(), content.clone());
2059    }
2060    fields.insert("message".to_string(), message_json);
2061    crate::llm::append_observability_sidecar_entry("message", fields);
2062}
2063
2064/// Create a new session from a reconstructed message list.
2065///
2066/// This is intentionally an all-at-once write instead of repeated
2067/// `inject_message` calls: importing a transcript should not re-emit
2068/// each historic turn into the active observability sidecar.
2069pub fn seed_from_messages(
2070    id: Option<String>,
2071    messages: &[serde_json::Value],
2072    metadata: serde_json::Value,
2073    system_prompt: Option<String>,
2074    tool_format: Option<String>,
2075) -> Result<String, String> {
2076    let resolved = id.unwrap_or_else(|| uuid::Uuid::now_v7().to_string());
2077    if exists(&resolved) {
2078        return Err(format!("agent session '{resolved}' already exists"));
2079    }
2080    open_or_create(Some(resolved.clone()));
2081    SESSIONS.with(|s| {
2082        let mut map = s.borrow_mut();
2083        let Some(state) = map.get_mut(&resolved) else {
2084            return Err(format!("failed to create agent session '{resolved}'"));
2085        };
2086        state.tool_format = tool_format.filter(|value| !value.trim().is_empty());
2087        state.system_prompt = system_prompt.filter(|value| !value.trim().is_empty());
2088
2089        let mut metadata = metadata
2090            .as_object()
2091            .cloned()
2092            .unwrap_or_else(serde_json::Map::new);
2093        if let Some(tool_format) = state.tool_format.as_ref() {
2094            metadata.insert(
2095                "tool_format".to_string(),
2096                serde_json::Value::String(tool_format.clone()),
2097            );
2098            metadata.insert(
2099                "tool_mode_locked".to_string(),
2100                serde_json::Value::Bool(true),
2101            );
2102        }
2103        if let Some(system_prompt) = state.system_prompt.as_ref() {
2104            metadata.insert(
2105                "system_prompt".to_string(),
2106                crate::llm::helpers::system_prompt_metadata(system_prompt),
2107            );
2108        }
2109        let vm_messages = crate::llm::helpers::json_messages_to_vm(messages);
2110        let candidate = crate::llm::helpers::new_transcript_with(
2111            Some(resolved.clone()),
2112            vm_messages,
2113            None,
2114            Some(crate::stdlib::json_to_vm_value(&serde_json::Value::Object(
2115                metadata,
2116            ))),
2117        );
2118        apply_transcript_with_budget(state, candidate, "seed_from_messages")?;
2119        state.last_accessed = Instant::now();
2120        Ok(resolved)
2121    })
2122}
2123
2124/// Load the messages vec (as JSON) for this session, for use as prefix
2125/// to an agent_loop run. Returns an empty vec if the session doesn't
2126/// exist or has no messages.
2127pub fn messages_json(id: &str) -> Vec<serde_json::Value> {
2128    SESSIONS.with(|s| {
2129        let map = s.borrow();
2130        let Some(state) = map.get(id) else {
2131            return Vec::new();
2132        };
2133        let Some(dict) = state.transcript.as_dict() else {
2134            return Vec::new();
2135        };
2136        match dict.get("messages") {
2137            Some(VmValue::List(list)) => list
2138                .iter()
2139                .map(crate::llm::helpers::vm_value_to_json)
2140                .collect(),
2141            _ => Vec::new(),
2142        }
2143    })
2144}
2145
/// Prompt-facing snapshot of a session, as produced by `prompt_state_json`.
#[derive(Clone, Debug, Default)]
pub struct SessionPromptState {
    /// Session messages as JSON; begins with a summary message when the
    /// transcript carries a non-blank summary.
    pub messages: Vec<serde_json::Value>,
    /// The transcript's `summary` text, or `None` when absent or blank.
    pub summary: Option<String>,
}
2151
2152fn summary_message_json(summary: &str) -> serde_json::Value {
2153    serde_json::json!({
2154        "role": "user",
2155        "content": summary,
2156    })
2157}
2158
2159fn messages_begin_with_summary(messages: &[serde_json::Value], summary: &str) -> bool {
2160    messages.first().is_some_and(|message| {
2161        message.get("role").and_then(|value| value.as_str()) == Some("user")
2162            && message.get("content").and_then(|value| value.as_str()) == Some(summary)
2163    })
2164}
2165
2166/// Prompt-surface resume state for a persisted session.
2167///
2168/// Returns the compacted/rehydratable message list plus the transcript's
2169/// summary field. When the transcript carries a summary field but its
2170/// message list does not already begin with the compacted summary
2171/// message, this helper prepends one so session re-entry preserves the
2172/// same prompt surface the previous loop was actually using.
2173pub fn prompt_state_json(id: &str) -> SessionPromptState {
2174    SESSIONS.with(|s| {
2175        let map = s.borrow();
2176        let Some(state) = map.get(id) else {
2177            return SessionPromptState::default();
2178        };
2179        let Some(dict) = state.transcript.as_dict() else {
2180            return SessionPromptState::default();
2181        };
2182        let mut messages = match dict.get("messages") {
2183            Some(VmValue::List(list)) => list
2184                .iter()
2185                .map(crate::llm::helpers::vm_value_to_json)
2186                .collect::<Vec<_>>(),
2187            _ => Vec::new(),
2188        };
2189        let summary = dict.get("summary").and_then(|value| match value {
2190            VmValue::String(text) if !text.trim().is_empty() => Some(text.to_string()),
2191            _ => None,
2192        });
2193        if let Some(summary_text) = summary.as_deref() {
2194            if !messages_begin_with_summary(&messages, summary_text) {
2195                messages.insert(0, summary_message_json(summary_text));
2196            }
2197        }
2198        SessionPromptState { messages, summary }
2199    })
2200}
2201
2202/// Overwrite the transcript for this session. Used by `agent_loop` on
2203/// exit to persist the synthesized transcript.
2204pub fn store_transcript(id: &str, transcript: VmValue) -> Result<(), String> {
2205    SESSIONS.with(|s| {
2206        let mut map = s.borrow_mut();
2207        let Some(state) = map.get_mut(id) else {
2208            return Err(format!(
2209                "agent_session_store_transcript: unknown session id '{id}'"
2210            ));
2211        };
2212        let transcript = transcript_with_session_metadata(transcript, state);
2213        apply_transcript_with_budget(state, transcript, "store_transcript")?;
2214        state.last_accessed = Instant::now();
2215        Ok(())
2216    })
2217}
2218
2219fn checkpoint_summary(checkpoint: &SessionTurnCheckpoint) -> SessionCheckpointSummary {
2220    SessionCheckpointSummary {
2221        checkpoint_id: checkpoint.checkpoint_id.clone(),
2222        before_message_count: checkpoint.before_message_count,
2223        after_message_count: checkpoint.after_message_count,
2224        fs_snapshot_ids: checkpoint.fs_snapshot_ids.clone(),
2225    }
2226}
2227
2228fn checkpoint_error_status(error: SessionCheckpointError) -> &'static str {
2229    match error {
2230        SessionCheckpointError::UnknownSession => "unknown_session",
2231        SessionCheckpointError::NoCheckpoint => "no_checkpoint",
2232        SessionCheckpointError::NoRedo => "no_redo",
2233    }
2234}
2235
/// Public accessor for the status string of a checkpoint error
/// (`unknown_session`, `no_checkpoint` or `no_redo`); delegates to the
/// private `checkpoint_error_status`.
pub fn checkpoint_status_name(error: SessionCheckpointError) -> &'static str {
    checkpoint_error_status(error)
}
2239
2240/// Clear redo checkpoints after host-side workspace mutations that are not part
2241/// of the redo flow. Returns whether any redo state was discarded.
2242pub fn invalidate_redo(id: &str) -> bool {
2243    SESSIONS.with(|s| {
2244        let mut map = s.borrow_mut();
2245        let Some(state) = map.get_mut(id) else {
2246            return false;
2247        };
2248        let had_redo = !state.redo_stack.is_empty();
2249        state.redo_stack.clear();
2250        state.last_accessed = Instant::now();
2251        had_redo
2252    })
2253}
2254
2255/// Record a completed prompt turn boundary.
2256///
2257/// `before_transcript` must be captured immediately before the user turn
2258/// starts. The current live transcript becomes the redo target, and optional
2259/// `fs_snapshot_ids` name host-owned filesystem snapshots captured during the
2260/// turn. Harn owns the transcript stack; hosts own concrete file restoration.
2261pub fn record_completed_turn_checkpoint(
2262    id: &str,
2263    before_transcript: VmValue,
2264    fs_snapshot_ids: Vec<String>,
2265) -> Result<Option<SessionCheckpointSummary>, SessionCheckpointError> {
2266    SESSIONS.with(|s| {
2267        let mut map = s.borrow_mut();
2268        let Some(state) = map.get_mut(id) else {
2269            return Err(SessionCheckpointError::UnknownSession);
2270        };
2271        let after_transcript = transcript_with_session_metadata(state.transcript.clone(), state);
2272        let before_message_count = transcript_message_count(&before_transcript);
2273        let after_message_count = transcript_message_count(&after_transcript);
2274        if crate::values_equal(&before_transcript, &after_transcript) && fs_snapshot_ids.is_empty()
2275        {
2276            return Ok(None);
2277        }
2278        let checkpoint = SessionTurnCheckpoint {
2279            checkpoint_id: format!("turn_{}", uuid::Uuid::now_v7().simple()),
2280            completed_at: crate::orchestration::now_rfc3339(),
2281            before_message_count,
2282            after_message_count,
2283            before_transcript,
2284            after_transcript,
2285            fs_snapshot_ids,
2286        };
2287        state.redo_stack.clear();
2288        state.completed_turn_checkpoints.push(checkpoint.clone());
2289        state.last_accessed = Instant::now();
2290        Ok(Some(checkpoint_summary(&checkpoint)))
2291    })
2292}
2293
2294pub fn rollback_plan(id: &str) -> Result<SessionCheckpointSummary, SessionCheckpointError> {
2295    SESSIONS.with(|s| {
2296        let map = s.borrow();
2297        let Some(state) = map.get(id) else {
2298            return Err(SessionCheckpointError::UnknownSession);
2299        };
2300        state
2301            .completed_turn_checkpoints
2302            .last()
2303            .map(checkpoint_summary)
2304            .ok_or(SessionCheckpointError::NoCheckpoint)
2305    })
2306}
2307
2308pub fn redo_plan(id: &str) -> Result<SessionCheckpointSummary, SessionCheckpointError> {
2309    SESSIONS.with(|s| {
2310        let map = s.borrow();
2311        let Some(state) = map.get(id) else {
2312            return Err(SessionCheckpointError::UnknownSession);
2313        };
2314        state
2315            .redo_stack
2316            .last()
2317            .map(|entry| {
2318                let mut summary = checkpoint_summary(&entry.checkpoint);
2319                summary.fs_snapshot_ids = entry.redo_fs_snapshot_ids.clone();
2320                summary
2321            })
2322            .ok_or(SessionCheckpointError::NoRedo)
2323    })
2324}
2325
2326pub fn rollback_last_completed_turn(
2327    id: &str,
2328    redo_fs_snapshot_ids: Vec<String>,
2329) -> Result<SessionCheckpointOutcome, SessionCheckpointError> {
2330    SESSIONS.with(|s| {
2331        let mut map = s.borrow_mut();
2332        let Some(state) = map.get_mut(id) else {
2333            return Err(SessionCheckpointError::UnknownSession);
2334        };
2335        let Some(checkpoint) = state.completed_turn_checkpoints.pop() else {
2336            return Err(SessionCheckpointError::NoCheckpoint);
2337        };
2338        state.transcript = checkpoint.before_transcript.clone();
2339        state.redo_stack.push(SessionRedoEntry {
2340            checkpoint: checkpoint.clone(),
2341            redo_fs_snapshot_ids: redo_fs_snapshot_ids.clone(),
2342        });
2343        state.last_accessed = Instant::now();
2344        Ok(SessionCheckpointOutcome {
2345            status: "rolled_back",
2346            checkpoint: checkpoint_summary(&checkpoint),
2347            redo_fs_snapshot_ids,
2348        })
2349    })
2350}
2351
2352pub fn redo_last_rollback(id: &str) -> Result<SessionCheckpointOutcome, SessionCheckpointError> {
2353    SESSIONS.with(|s| {
2354        let mut map = s.borrow_mut();
2355        let Some(state) = map.get_mut(id) else {
2356            return Err(SessionCheckpointError::UnknownSession);
2357        };
2358        let Some(entry) = state.redo_stack.pop() else {
2359            return Err(SessionCheckpointError::NoRedo);
2360        };
2361        let checkpoint = entry.checkpoint;
2362        state.transcript = checkpoint.after_transcript.clone();
2363        state.completed_turn_checkpoints.push(checkpoint.clone());
2364        state.last_accessed = Instant::now();
2365        Ok(SessionCheckpointOutcome {
2366            status: "redone",
2367            checkpoint: checkpoint_summary(&checkpoint),
2368            redo_fs_snapshot_ids: entry.redo_fs_snapshot_ids,
2369        })
2370    })
2371}
2372
2373/// Remove malformed reminder events after their drop audit has been emitted.
2374/// Pending-reminder rendering scans the transcript on every LLM call; pruning
2375/// invalid entries makes the drop event one-shot instead of noisy per turn.
2376pub fn prune_invalid_reminder_events(id: &str) -> usize {
2377    SESSIONS.with(|s| {
2378        let mut map = s.borrow_mut();
2379        let Some(state) = map.get_mut(id) else {
2380            return 0;
2381        };
2382        let Some(dict) = state.transcript.as_dict().cloned() else {
2383            return 0;
2384        };
2385        let Some(VmValue::List(events)) = dict.get("events") else {
2386            return 0;
2387        };
2388        let mut pruned = 0_usize;
2389        let mut kept = Vec::with_capacity(events.len());
2390        for event in events.iter().cloned() {
2391            let is_reminder = event
2392                .as_dict()
2393                .and_then(|event| event.get("kind"))
2394                .map(VmValue::display)
2395                .as_deref()
2396                == Some(crate::llm::helpers::SYSTEM_REMINDER_EVENT_KIND);
2397            if !is_reminder {
2398                kept.push(event);
2399                continue;
2400            }
2401            let valid = crate::llm::helpers::reminder_from_event(&event)
2402                .is_some_and(|reminder| !reminder.body.trim().is_empty());
2403            if valid {
2404                kept.push(event);
2405            } else {
2406                pruned += 1;
2407            }
2408        }
2409        if pruned > 0 {
2410            let mut next = dict;
2411            next.insert(
2412                "events".to_string(),
2413                VmValue::List(std::sync::Arc::new(kept)),
2414            );
2415            let _ = apply_transcript_with_budget(
2416                state,
2417                VmValue::Dict(std::sync::Arc::new(next)),
2418                "prune_invalid_reminder_events",
2419            );
2420            state.last_accessed = Instant::now();
2421        }
2422        pruned
2423    })
2424}
2425
2426/// Apply the reminder TTL lifecycle that runs once per completed agent
2427/// turn. Reminders with `ttl_turns = 1` expire and are removed; larger
2428/// finite TTLs are decremented in place. Expiry audit events are emitted
2429/// to the active EventLog when one is installed.
2430pub fn apply_reminder_post_turn(id: &str, turn: i64) -> Result<serde_json::Value, String> {
2431    let report = SESSIONS.with(|s| {
2432        let mut map = s.borrow_mut();
2433        let Some(state) = map.get_mut(id) else {
2434            return Err(format!(
2435                "agent_session_apply_reminder_post_turn: unknown session id '{id}'"
2436            ));
2437        };
2438        let report = crate::llm::helpers::apply_reminder_post_turn(&state.transcript, turn);
2439        if report.decremented_count > 0 || !report.expired.is_empty() {
2440            if let Some(next) = report.transcript.clone() {
2441                apply_transcript_with_budget(state, next, "apply_reminder_post_turn")?;
2442            }
2443            state.last_accessed = Instant::now();
2444        }
2445        Ok(report)
2446    })?;
2447
2448    for reminder in &report.expired {
2449        let mut payload = crate::llm::helpers::reminder_lifecycle_payload(Some(id), reminder);
2450        if let Some(obj) = payload.as_object_mut() {
2451            obj.insert(
2452                "transcript_id".to_string(),
2453                serde_json::Value::String(id.to_string()),
2454            );
2455            obj.insert(
2456                "reason".to_string(),
2457                serde_json::Value::String("ttl".to_string()),
2458            );
2459            obj.insert(
2460                "ttl_turns_before".to_string(),
2461                serde_json::json!(&reminder.ttl_turns),
2462            );
2463            obj.insert("expired_at_turn".to_string(), serde_json::json!(turn));
2464        }
2465        crate::llm::helpers::emit_reminder_lifecycle_event(
2466            crate::llm::helpers::REMINDER_EXPIRED_EVENT_KIND,
2467            payload,
2468        );
2469    }
2470
2471    Ok(serde_json::json!({
2472        "expired_count": report.expired.len(),
2473        "decremented_count": report.decremented_count,
2474        "remaining_count": report.remaining_count,
2475    }))
2476}
2477
2478/// Inject a typed system reminder into the session transcript's event
2479/// stream. This mirrors `transcript.inject_reminder` for live sessions:
2480/// reminders with the same `dedupe_key` are replaced before the new
2481/// reminder event is appended.
2482pub fn inject_reminder(
2483    id: &str,
2484    reminder: crate::llm::helpers::SystemReminder,
2485) -> Result<ReminderInjectionReport, String> {
2486    let reminder_id = reminder.id.clone();
2487    let dedupe_key = reminder.dedupe_key.clone();
2488    let mut deduped_reminder_ids = Vec::new();
2489    SESSIONS.with(|s| {
2490        let mut map = s.borrow_mut();
2491        let Some(state) = map.get_mut(id) else {
2492            return Err(format!(
2493                "agent_session_inject_reminder: unknown session id '{id}'"
2494            ));
2495        };
2496        let dict = state
2497            .transcript
2498            .as_dict()
2499            .cloned()
2500            .unwrap_or_else(BTreeMap::new);
2501        let mut events: Vec<VmValue> = match dict.get("events") {
2502            Some(VmValue::List(list)) => list.iter().cloned().collect(),
2503            _ => dict
2504                .get("messages")
2505                .and_then(|value| match value {
2506                    VmValue::List(list) => Some(list.iter().cloned().collect::<Vec<_>>()),
2507                    _ => None,
2508                })
2509                .map(|messages| crate::llm::helpers::transcript_events_from_messages(&messages))
2510                .unwrap_or_default(),
2511        };
2512        if let Some(expected_key) = dedupe_key.as_deref() {
2513            events.retain(|event| {
2514                let Some(existing) = crate::llm::helpers::reminder_from_event(event) else {
2515                    return true;
2516                };
2517                if existing.dedupe_key.as_deref() == Some(expected_key) {
2518                    deduped_reminder_ids.push(existing.id);
2519                    false
2520                } else {
2521                    true
2522                }
2523            });
2524        }
2525        events.push(crate::llm::helpers::transcript_reminder_event(&reminder));
2526        let mut next = dict;
2527        next.insert(
2528            "events".to_string(),
2529            VmValue::List(std::sync::Arc::new(events)),
2530        );
2531        apply_transcript_with_budget(
2532            state,
2533            VmValue::Dict(std::sync::Arc::new(next)),
2534            "inject_reminder",
2535        )?;
2536        state.last_accessed = Instant::now();
2537        Ok(())
2538    })?;
2539
2540    if !deduped_reminder_ids.is_empty() {
2541        let dropped_count = deduped_reminder_ids.len();
2542        crate::llm::helpers::emit_reminder_lifecycle_event(
2543            crate::llm::helpers::REMINDER_DEDUPED_EVENT_KIND,
2544            serde_json::json!({
2545                "session_id": id,
2546                "transcript_id": id,
2547                "reminder_id": &reminder_id,
2548                "replacing_id": &reminder_id,
2549                "replaced_id": deduped_reminder_ids.first(),
2550                "replaced_ids": &deduped_reminder_ids,
2551                "dedupe_key": &dedupe_key,
2552                "dropped_reminder_ids": &deduped_reminder_ids,
2553                "dropped_count": dropped_count,
2554            }),
2555        );
2556    }
2557
2558    crate::llm::helpers::emit_reminder_lifecycle_event(
2559        crate::llm::helpers::REMINDER_INJECTED_EVENT_KIND,
2560        crate::llm::helpers::reminder_lifecycle_payload(Some(id), &reminder),
2561    );
2562
2563    Ok(ReminderInjectionReport {
2564        reminder_id,
2565        deduped_count: deduped_reminder_ids.len(),
2566    })
2567}
2568
2569/// Append a transcript event to the session without mutating its
2570/// message list. Used for orchestration-side lineage events (sub-agent
2571/// spawn/completion, workflow hooks, etc.) that should survive
2572/// persistence/replay without being replayed back into the model as
2573/// conversational messages.
2574pub fn append_event(id: &str, event: VmValue) -> Result<(), String> {
2575    let Some(event_dict) = event.as_dict() else {
2576        return Err("agent_session_append_event: event must be a dict".into());
2577    };
2578    let kind_ok = matches!(event_dict.get("kind"), Some(VmValue::String(_)));
2579    if !kind_ok {
2580        return Err("agent_session_append_event: event must have a string `kind`".into());
2581    }
2582    SESSIONS.with(|s| {
2583        let mut map = s.borrow_mut();
2584        let Some(state) = map.get_mut(id) else {
2585            return Err(format!(
2586                "agent_session_append_event: unknown session id '{id}'"
2587            ));
2588        };
2589        append_event_to_state(state, event, "append_event")?;
2590        state.last_accessed = Instant::now();
2591        Ok(())
2592    })
2593}
2594
2595fn append_event_to_state(
2596    state: &mut SessionState,
2597    event: VmValue,
2598    action: &str,
2599) -> Result<(), String> {
2600    let dict = state
2601        .transcript
2602        .as_dict()
2603        .cloned()
2604        .unwrap_or_else(BTreeMap::new);
2605    let mut events: Vec<VmValue> = match dict.get("events") {
2606        Some(VmValue::List(list)) => list.iter().cloned().collect(),
2607        _ => dict
2608            .get("messages")
2609            .and_then(|value| match value {
2610                VmValue::List(list) => Some(list.iter().cloned().collect::<Vec<_>>()),
2611                _ => None,
2612            })
2613            .map(|messages| crate::llm::helpers::transcript_events_from_messages(&messages))
2614            .unwrap_or_default(),
2615    };
2616    events.push(event);
2617    let mut next = dict;
2618    next.insert(
2619        "events".to_string(),
2620        VmValue::List(std::sync::Arc::new(events)),
2621    );
2622    apply_transcript_with_budget(state, VmValue::Dict(std::sync::Arc::new(next)), action)
2623}
2624
2625/// Replace the transcript's message list wholesale. Used by the
2626/// in-loop compaction path, which operates on JSON messages.
2627pub fn replace_messages(id: &str, messages: &[serde_json::Value]) -> Result<(), String> {
2628    replace_messages_with_summary(id, messages, None)
2629}
2630
2631/// Replace the transcript's message list and optionally update the
2632/// `summary` field on the persisted transcript. The compaction path
2633/// uses this to publish the human-readable rollup line that
2634/// `transcript_summary(transcript)` exposes to host code.
2635pub fn replace_messages_with_summary(
2636    id: &str,
2637    messages: &[serde_json::Value],
2638    summary: Option<&str>,
2639) -> Result<(), String> {
2640    SESSIONS.with(|s| {
2641        let mut map = s.borrow_mut();
2642        let Some(state) = map.get_mut(id) else {
2643            return Err(format!(
2644                "agent_session_replace_messages: unknown session id '{id}'"
2645            ));
2646        };
2647        let dict = state
2648            .transcript
2649            .as_dict()
2650            .cloned()
2651            .unwrap_or_else(BTreeMap::new);
2652        let vm_messages: Vec<VmValue> = messages
2653            .iter()
2654            .map(crate::stdlib::json_to_vm_value)
2655            .collect();
2656        let mut next = dict;
2657        next.insert(
2658            "events".to_string(),
2659            VmValue::List(std::sync::Arc::new(
2660                crate::llm::helpers::transcript_events_from_messages(&vm_messages),
2661            )),
2662        );
2663        next.insert(
2664            "messages".to_string(),
2665            VmValue::List(std::sync::Arc::new(vm_messages)),
2666        );
2667        if let Some(summary) = summary {
2668            next.insert(
2669                "summary".to_string(),
2670                VmValue::String(std::sync::Arc::from(summary.to_string())),
2671            );
2672        } else {
2673            next.remove("summary");
2674        }
2675        apply_transcript_with_budget(
2676            state,
2677            VmValue::Dict(std::sync::Arc::new(next)),
2678            "replace_messages",
2679        )?;
2680        state.last_accessed = Instant::now();
2681        Ok(())
2682    })
2683}
2684
2685pub fn append_subscriber(id: &str, callback: VmValue) {
2686    open_or_create(Some(id.to_string()));
2687    SESSIONS.with(|s| {
2688        if let Some(state) = s.borrow_mut().get_mut(id) {
2689            state.subscribers.push(callback);
2690            state.last_accessed = Instant::now();
2691        }
2692    });
2693}
2694
2695pub fn subscribers_for(id: &str) -> Vec<VmValue> {
2696    SESSIONS.with(|s| {
2697        s.borrow()
2698            .get(id)
2699            .map(|state| state.subscribers.clone())
2700            .unwrap_or_default()
2701    })
2702}
2703
2704pub fn subscriber_count(id: &str) -> usize {
2705    SESSIONS.with(|s| {
2706        s.borrow()
2707            .get(id)
2708            .map(|state| state.subscribers.len())
2709            .unwrap_or(0)
2710    })
2711}
2712
2713/// Persist the set of active skill names for session resume. Called at
2714/// the end of an agent_loop run; the next `open_or_create` for this id
2715/// reads them back via [`active_skills`].
2716pub fn set_active_skills(id: &str, skills: Vec<String>) {
2717    SESSIONS.with(|s| {
2718        if let Some(state) = s.borrow_mut().get_mut(id) {
2719            state.active_skills = skills;
2720            state.last_accessed = Instant::now();
2721        }
2722    });
2723}
2724
2725/// Skills that were active at the end of the previous agent_loop run
2726/// against this session. Returns an empty vec when the session doesn't
2727/// exist or nothing was persisted.
2728pub fn active_skills(id: &str) -> Vec<String> {
2729    SESSIONS.with(|s| {
2730        s.borrow()
2731            .get(id)
2732            .map(|state| state.active_skills.clone())
2733            .unwrap_or_default()
2734    })
2735}
2736
2737/// Claim the tool-calling contract for a session.
2738///
2739/// The first loop against a named session records its `tool_format`.
2740/// Later re-entry must use the same format so prompt/history generated
2741/// under a text contract is never replayed as native, or vice versa.
2742pub fn claim_tool_format(id: &str, tool_format: &str) -> Result<(), String> {
2743    let tool_format = tool_format.trim();
2744    if tool_format.is_empty() {
2745        return Ok(());
2746    }
2747    SESSIONS.with(|s| {
2748        let mut map = s.borrow_mut();
2749        let Some(state) = map.get_mut(id) else {
2750            return Err(format!("agent session '{id}' does not exist"));
2751        };
2752        match state.tool_format.as_deref() {
2753            Some(existing) if existing != tool_format => Err(format!(
2754                "agent session '{id}' is locked to tool_format='{existing}', but this run requested tool_format='{tool_format}'. Start a new session or fork/reset the transcript before changing tool mode."
2755            )),
2756            Some(_) => {
2757                state.last_accessed = Instant::now();
2758                Ok(())
2759            }
2760            None => {
2761                state.tool_format = Some(tool_format.to_string());
2762                state.last_accessed = Instant::now();
2763                Ok(())
2764            }
2765        }
2766    })
2767}
2768
2769pub fn tool_format(id: &str) -> Option<String> {
2770    SESSIONS.with(|s| {
2771        s.borrow()
2772            .get(id)
2773            .and_then(|state| state.tool_format.clone())
2774    })
2775}
2776
2777pub fn record_system_prompt(id: &str, system_prompt: &str) -> Result<(), String> {
2778    let system_prompt = system_prompt.trim();
2779    if system_prompt.is_empty() {
2780        return Ok(());
2781    }
2782    assert_cache_stable_system_prompt(system_prompt);
2783    SESSIONS.with(|s| {
2784        let mut map = s.borrow_mut();
2785        let Some(state) = map.get_mut(id) else {
2786            return Err(format!("agent session '{id}' does not exist"));
2787        };
2788        let changed = state.system_prompt.as_deref() != Some(system_prompt);
2789        state.system_prompt = Some(system_prompt.to_string());
2790        let dict = state
2791            .transcript
2792            .as_dict()
2793            .cloned()
2794            .unwrap_or_else(BTreeMap::new);
2795        let mut next = dict;
2796        apply_system_prompt_metadata(&mut next, system_prompt);
2797        if changed {
2798            let mut events: Vec<VmValue> = match next.get("events") {
2799                Some(VmValue::List(list)) => list.iter().cloned().collect(),
2800                _ => Vec::new(),
2801            };
2802            events.push(crate::llm::helpers::transcript_event(
2803                "system_prompt",
2804                "system",
2805                "internal",
2806                "",
2807                Some(crate::llm::helpers::system_prompt_event_metadata(
2808                    system_prompt,
2809                )),
2810            ));
2811            next.insert(
2812                "events".to_string(),
2813                VmValue::List(std::sync::Arc::new(events)),
2814            );
2815        }
2816        apply_transcript_with_budget(
2817            state,
2818            VmValue::Dict(std::sync::Arc::new(next)),
2819            "record_system_prompt",
2820        )?;
2821        state.last_accessed = Instant::now();
2822        Ok(())
2823    })
2824}
2825
2826pub fn system_prompt(id: &str) -> Option<String> {
2827    SESSIONS.with(|s| {
2828        s.borrow()
2829            .get(id)
2830            .and_then(|state| state.system_prompt.clone())
2831    })
2832}
2833
/// Scan `system_prompt` for a `{{` opener that is followed (after optional
/// whitespace) by an identifier starting with `workspace_` or `project_`.
///
/// Returns the offending prefix for the first such opener, or `None` when
/// no opener is followed by either prefix. Openers are matched left to
/// right without overlap.
#[cfg(debug_assertions)]
fn forbidden_workspace_prompt_token(system_prompt: &str) -> Option<&'static str> {
    const OPENER: &str = "{{";
    const FORBIDDEN_PREFIXES: [&str; 2] = ["workspace_", "project_"];

    system_prompt.match_indices(OPENER).find_map(|(start, _)| {
        let after_opener = system_prompt[start + OPENER.len()..].trim_start();
        FORBIDDEN_PREFIXES
            .iter()
            .copied()
            .find(|prefix| after_opener.starts_with(*prefix))
    })
}
2849
2850#[cfg(debug_assertions)]
2851fn assert_cache_stable_system_prompt(system_prompt: &str) {
2852    if let Some(prefix) = forbidden_workspace_prompt_token(system_prompt) {
2853        panic!(
2854            "{CACHE_STABLE_SYSTEM_PROMPT_DIAGNOSTIC}: session system prompts must not interpolate `{{{{{prefix}...` tokens; move workspace/project context into the workspace-anchor reminder"
2855        );
2856    }
2857}
2858
2859#[cfg(not(debug_assertions))]
2860fn assert_cache_stable_system_prompt(_system_prompt: &str) {}
2861
2862/// Pin (or clear, with `None`) a model selector on a session. Returns
2863/// `Ok(true)` when the value actually changed so callers can decide
2864/// whether to broadcast a notification. The selector is stored verbatim
2865/// — alias / catalog resolution is the call-site's job.
2866pub fn set_pinned_model(id: &str, model: Option<String>) -> Result<bool, String> {
2867    let normalized = model
2868        .map(|value| value.trim().to_string())
2869        .filter(|value| !value.is_empty());
2870    SESSIONS.with(|s| {
2871        let mut map = s.borrow_mut();
2872        let Some(state) = map.get_mut(id) else {
2873            return Err(format!("agent session '{id}' does not exist"));
2874        };
2875        let changed = state.pinned_model != normalized;
2876        state.pinned_model = normalized;
2877        state.last_accessed = Instant::now();
2878        Ok(changed)
2879    })
2880}
2881
2882/// Read the session's pinned model selector, if any. Consumed by
2883/// `vm_resolve_model` as the per-session default when a script-level
2884/// `llm_call` does not pass `model:` explicitly.
2885pub fn pinned_model(id: &str) -> Option<String> {
2886    SESSIONS.with(|s| {
2887        s.borrow()
2888            .get(id)
2889            .and_then(|state| state.pinned_model.clone())
2890    })
2891}
2892
2893/// Pin (or clear) the session-level provider-aware reasoning policy.
2894pub fn set_pinned_reasoning_policy(id: &str, policy: Option<String>) -> Result<bool, String> {
2895    let normalized = match policy {
2896        Some(value) => crate::llm::reasoning_policy::normalize_policy_selector(&value)?,
2897        None => None,
2898    };
2899    SESSIONS.with(|s| {
2900        let mut map = s.borrow_mut();
2901        let Some(state) = map.get_mut(id) else {
2902            return Err(format!("agent session '{id}' does not exist"));
2903        };
2904        let changed = state.pinned_reasoning_policy != normalized;
2905        state.pinned_reasoning_policy = normalized;
2906        state.last_accessed = Instant::now();
2907        Ok(changed)
2908    })
2909}
2910
2911/// Read the session's pinned reasoning policy, if any.
2912pub fn pinned_reasoning_policy(id: &str) -> Option<String> {
2913    SESSIONS.with(|s| {
2914        s.borrow()
2915            .get(id)
2916            .and_then(|state| state.pinned_reasoning_policy.clone())
2917    })
2918}
2919
2920/// Set (or clear, with `None`) the typed workspace anchor on a session.
2921/// Returns `Ok(true)` when the value actually changed so callers can
2922/// decide whether to broadcast `AnchorChanged` notifications.
2923pub fn set_workspace_anchor(id: &str, anchor: Option<WorkspaceAnchor>) -> Result<bool, String> {
2924    SESSIONS.with(|s| {
2925        let mut map = s.borrow_mut();
2926        let Some(state) = map.get_mut(id) else {
2927            return Err(format!("agent session '{id}' does not exist"));
2928        };
2929        let changed = state.workspace_anchor != anchor;
2930        state.workspace_anchor = anchor;
2931        if changed {
2932            state.redo_stack.clear();
2933            crate::llm::permissions::clear_session_grants(id);
2934        }
2935        state.last_accessed = Instant::now();
2936        Ok(changed)
2937    })
2938}
2939
2940/// Read the session's typed workspace anchor, if any.
2941pub fn workspace_anchor(id: &str) -> Option<WorkspaceAnchor> {
2942    SESSIONS.with(|s| {
2943        s.borrow()
2944            .get(id)
2945            .and_then(|state| state.workspace_anchor.clone())
2946    })
2947}
2948
/// Outcome of `reanchor_session`: previous + new anchor and whether the
/// swap actually moved anything. Callers use `changed` to suppress
/// no-op transcript / live events.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ReanchorOutcome {
    /// Anchor the session held before the swap; `None` when it had none.
    pub previous: Option<WorkspaceAnchor>,
    /// Anchor installed on the session by the swap.
    pub current: WorkspaceAnchor,
    /// `true` when `current` differs from `previous`.
    pub changed: bool,
}
2958
2959/// Atomically swap the session's primary anchor + emit the canonical
2960/// `AnchorChanged` transcript event and live `AgentEvent::AnchorChanged`
2961/// notification (#2218). Clears session-scoped permission grants so
2962/// stale anchor-based decisions don't leak into the next turn.
2963pub fn reanchor_session(
2964    id: &str,
2965    new_anchor: WorkspaceAnchor,
2966    carry_transcript: bool,
2967    compacted: bool,
2968    reason: Option<String>,
2969) -> Result<ReanchorOutcome, String> {
2970    let outcome = SESSIONS.with(|s| {
2971        let mut map = s.borrow_mut();
2972        let Some(state) = map.get_mut(id) else {
2973            return Err(format!("agent session '{id}' does not exist"));
2974        };
2975        let previous = state.workspace_anchor.clone();
2976        let changed = previous.as_ref() != Some(&new_anchor);
2977        state.workspace_anchor = Some(new_anchor.clone());
2978        if changed {
2979            crate::llm::permissions::clear_session_grants(id);
2980        }
2981        state.last_accessed = Instant::now();
2982        Ok(ReanchorOutcome {
2983            previous,
2984            current: new_anchor,
2985            changed,
2986        })
2987    })?;
2988    if !outcome.changed {
2989        return Ok(outcome);
2990    }
2991    let previous_json = outcome.previous.as_ref().map(WorkspaceAnchor::to_json);
2992    let current_json = outcome.current.to_json();
2993    let event_metadata = serde_json::json!({
2994        "previous": previous_json,
2995        "current": current_json,
2996        "carry_transcript": carry_transcript,
2997        "compacted": compacted,
2998        "reason": reason,
2999    });
3000    let event = crate::llm::helpers::transcript_event(
3001        "AnchorChanged",
3002        "system",
3003        "internal",
3004        "",
3005        Some(event_metadata),
3006    );
3007    let _ = append_event(id, event);
3008    crate::llm::emit_live_agent_event_sync(&crate::agent_events::AgentEvent::AnchorChanged {
3009        session_id: id.to_string(),
3010        previous: previous_json,
3011        current: current_json,
3012        carry_transcript,
3013        compacted,
3014        reason,
3015    });
3016    Ok(outcome)
3017}
3018
3019/// Set session-local workspace defaults. Returns `Ok(true)` when the
3020/// policy changed.
3021pub fn set_workspace_policy(id: &str, policy: WorkspacePolicy) -> Result<bool, String> {
3022    SESSIONS.with(|s| {
3023        let mut map = s.borrow_mut();
3024        let Some(state) = map.get_mut(id) else {
3025            return Err(format!("agent session '{id}' does not exist"));
3026        };
3027        let changed = state.workspace_policy != policy;
3028        state.workspace_policy = policy;
3029        if changed {
3030            state.redo_stack.clear();
3031        }
3032        state.last_accessed = Instant::now();
3033        Ok(changed)
3034    })
3035}
3036
3037/// Read the session's workspace policy, if the session exists.
3038pub fn workspace_policy(id: &str) -> Option<WorkspacePolicy> {
3039    SESSIONS.with(|s| {
3040        s.borrow()
3041            .get(id)
3042            .map(|state| state.workspace_policy.clone())
3043    })
3044}
3045
3046/// Validate and mount an additional workspace root on an anchored
3047/// session. When the path is already mounted, updates its mount mode
3048/// in place and refreshes its `mounted_at` timestamp.
3049pub fn add_workspace_root(
3050    id: &str,
3051    root: &str,
3052    mount_mode: Option<MountMode>,
3053    reason: Option<String>,
3054) -> Result<String, String> {
3055    let normalized_root = validate_workspace_root_path(root)?;
3056    let mounted_at = crate::orchestration::now_rfc3339();
3057    SESSIONS.with(|s| {
3058        let mut map = s.borrow_mut();
3059        let Some(state) = map.get_mut(id) else {
3060            return Err(format!("agent session '{id}' does not exist"));
3061        };
3062        let default_mount_mode = state.workspace_policy.default_mount_mode;
3063        let Some(anchor) = state.workspace_anchor.as_mut() else {
3064            return Err(format!("agent session '{id}' has no workspace anchor"));
3065        };
3066        let resolved_mount_mode = mount_mode.unwrap_or(default_mount_mode);
3067        if let Some(existing) = anchor
3068            .additional_roots
3069            .iter_mut()
3070            .find(|entry| entry.path == normalized_root)
3071        {
3072            let changed =
3073                existing.mount_mode != resolved_mount_mode || existing.mounted_at != mounted_at;
3074            existing.mount_mode = resolved_mount_mode;
3075            existing.mounted_at = mounted_at.clone();
3076            if changed {
3077                state.redo_stack.clear();
3078            }
3079        } else {
3080            anchor.additional_roots.push(MountedRoot {
3081                path: normalized_root.clone(),
3082                mount_mode: resolved_mount_mode,
3083                mounted_at: mounted_at.clone(),
3084            });
3085            state.redo_stack.clear();
3086        }
3087        let event = crate::llm::helpers::transcript_event(
3088            "RootMounted",
3089            "system",
3090            "internal",
3091            "",
3092            Some(serde_json::json!({
3093                "path": normalized_root.to_string_lossy(),
3094                "mount_mode": resolved_mount_mode.as_str(),
3095                "mounted_at": mounted_at.clone(),
3096                "reason": reason,
3097            })),
3098        );
3099        append_event_to_state(state, event, "add_workspace_root")?;
3100        crate::llm::permissions::clear_session_grants(id);
3101        state.last_accessed = Instant::now();
3102        Ok(mounted_at.clone())
3103    })
3104}
3105
3106/// Remove one mounted root from an anchored session. Returns whether an
3107/// existing mount entry was deleted. Removing an absent root is a no-op.
3108pub fn remove_workspace_root(id: &str, root: &str) -> Result<bool, String> {
3109    let normalized_root = normalize_workspace_root_path(root);
3110    SESSIONS.with(|s| {
3111        let mut map = s.borrow_mut();
3112        let Some(state) = map.get_mut(id) else {
3113            return Err(format!("agent session '{id}' does not exist"));
3114        };
3115        let Some(anchor) = state.workspace_anchor.as_mut() else {
3116            return Err(format!("agent session '{id}' has no workspace anchor"));
3117        };
3118        let before = anchor.additional_roots.len();
3119        anchor
3120            .additional_roots
3121            .retain(|entry| entry.path != normalized_root);
3122        let removed = anchor.additional_roots.len() != before;
3123        if removed {
3124            state.redo_stack.clear();
3125            crate::llm::permissions::clear_session_grants(id);
3126        }
3127        state.last_accessed = Instant::now();
3128        Ok(removed)
3129    })
3130}
3131
3132pub fn list_workspace_roots(id: &str) -> Result<(PathBuf, Vec<MountedRoot>), String> {
3133    SESSIONS.with(|s| {
3134        let map = s.borrow();
3135        let Some(state) = map.get(id) else {
3136            return Err(format!("agent session '{id}' does not exist"));
3137        };
3138        let Some(anchor) = state.workspace_anchor.as_ref() else {
3139            return Err(format!("agent session '{id}' has no workspace anchor"));
3140        };
3141        Ok((anchor.primary.clone(), anchor.additional_roots.clone()))
3142    })
3143}
3144
3145fn validate_workspace_root_path(root: &str) -> Result<PathBuf, String> {
3146    let normalized = normalize_workspace_root_path(root);
3147    let canonical = std::fs::canonicalize(&normalized)
3148        .map_err(|error| format!("workspace root '{root}' must exist and be readable: {error}"))?;
3149    let metadata = std::fs::metadata(&canonical)
3150        .map_err(|error| format!("workspace root '{root}' must exist and be readable: {error}"))?;
3151    if !metadata.is_dir() {
3152        return Err(format!("workspace root '{root}' must be a directory"));
3153    }
3154    std::fs::read_dir(&canonical)
3155        .map_err(|error| format!("workspace root '{root}' must be readable: {error}"))?;
3156    Ok(canonical)
3157}
3158
3159fn normalize_workspace_root_path(root: &str) -> PathBuf {
3160    let absolute = crate::stdlib::process::normalize_context_path(Path::new(root));
3161    std::fs::canonicalize(&absolute).unwrap_or(absolute)
3162}
3163
3164fn empty_transcript(id: &str) -> VmValue {
3165    use crate::llm::helpers::new_transcript_with;
3166    new_transcript_with(Some(id.to_string()), Vec::new(), None, None)
3167}
3168
3169fn clone_transcript_with_id(transcript: &VmValue, new_id: &str) -> VmValue {
3170    let Some(dict) = transcript.as_dict() else {
3171        return empty_transcript(new_id);
3172    };
3173    let mut next = dict.clone();
3174    next.insert(
3175        "id".to_string(),
3176        VmValue::String(std::sync::Arc::from(new_id.to_string())),
3177    );
3178    VmValue::Dict(std::sync::Arc::new(next))
3179}
3180
3181fn clone_transcript_with_parent(transcript: &VmValue, parent_id: &str) -> VmValue {
3182    let Some(dict) = transcript.as_dict() else {
3183        return transcript.clone();
3184    };
3185    let mut next = dict.clone();
3186    let metadata = match next.get("metadata") {
3187        Some(VmValue::Dict(metadata)) => {
3188            let mut metadata = metadata.as_ref().clone();
3189            metadata.insert(
3190                "parent_session_id".to_string(),
3191                VmValue::String(std::sync::Arc::from(parent_id.to_string())),
3192            );
3193            VmValue::Dict(std::sync::Arc::new(metadata))
3194        }
3195        _ => VmValue::Dict(std::sync::Arc::new(BTreeMap::from([(
3196            "parent_session_id".to_string(),
3197            VmValue::String(std::sync::Arc::from(parent_id.to_string())),
3198        )]))),
3199    };
3200    next.insert("metadata".to_string(), metadata);
3201    VmValue::Dict(std::sync::Arc::new(next))
3202}
3203
3204fn apply_system_prompt_metadata(next: &mut BTreeMap<String, VmValue>, system_prompt: &str) {
3205    let mut metadata = match next.get("metadata") {
3206        Some(VmValue::Dict(metadata)) => metadata.as_ref().clone(),
3207        _ => BTreeMap::new(),
3208    };
3209    metadata.insert(
3210        "system_prompt".to_string(),
3211        crate::stdlib::json_to_vm_value(&crate::llm::helpers::system_prompt_metadata(
3212            system_prompt,
3213        )),
3214    );
3215    next.insert(
3216        "metadata".to_string(),
3217        VmValue::Dict(std::sync::Arc::new(metadata)),
3218    );
3219}
3220
3221fn transcript_with_session_metadata(transcript: VmValue, state: &SessionState) -> VmValue {
3222    let Some(dict) = transcript.as_dict() else {
3223        return transcript;
3224    };
3225    let mut next = dict.clone();
3226    let mut metadata = match next.get("metadata") {
3227        Some(VmValue::Dict(metadata)) => metadata.as_ref().clone(),
3228        _ => BTreeMap::new(),
3229    };
3230    if let Some(tool_format) = state.tool_format.as_ref() {
3231        metadata.insert(
3232            "tool_format".to_string(),
3233            VmValue::String(std::sync::Arc::from(tool_format.clone())),
3234        );
3235        metadata.insert("tool_mode_locked".to_string(), VmValue::Bool(true));
3236    }
3237    if let Some(system_prompt) = state.system_prompt.as_ref() {
3238        metadata.insert(
3239            "system_prompt".to_string(),
3240            crate::stdlib::json_to_vm_value(&crate::llm::helpers::system_prompt_metadata(
3241                system_prompt,
3242            )),
3243        );
3244    }
3245    if let Some(anchor) = state.workspace_anchor.as_ref() {
3246        metadata.insert(
3247            WORKSPACE_ANCHOR_METADATA_KEY.to_string(),
3248            anchor.to_vm_value(),
3249        );
3250    } else {
3251        metadata.remove(WORKSPACE_ANCHOR_METADATA_KEY);
3252    }
3253    if let Some(scratchpad) = state.scratchpad.as_ref() {
3254        metadata.insert("agent_scratchpad".to_string(), scratchpad.clone());
3255        metadata.insert(
3256            "agent_scratchpad_version".to_string(),
3257            VmValue::Int(state.scratchpad_version as i64),
3258        );
3259    } else {
3260        metadata.remove("agent_scratchpad");
3261        metadata.remove("agent_scratchpad_version");
3262    }
3263    if let Some(last_action) = state.last_transcript_budget_action.as_ref() {
3264        let usage = transcript_usage(
3265            &VmValue::Dict(std::sync::Arc::new(next.clone())),
3266            state.transcript_budget_policy.max_approx_bytes.is_some(),
3267        );
3268        metadata.insert(
3269            "transcript_budget".to_string(),
3270            crate::stdlib::json_to_vm_value(&serde_json::json!({
3271                "policy": transcript_budget_policy_json(&state.transcript_budget_policy.normalized()),
3272                "usage": transcript_budget_usage_json(&usage),
3273                "last_action": last_action,
3274            })),
3275        );
3276    }
3277    if !metadata.is_empty() {
3278        next.insert(
3279            "metadata".to_string(),
3280            VmValue::Dict(std::sync::Arc::new(metadata)),
3281        );
3282    }
3283    VmValue::Dict(std::sync::Arc::new(next))
3284}
3285
3286fn session_snapshot(state: &SessionState) -> VmValue {
3287    let transcript = transcript_with_session_metadata(state.transcript.clone(), state);
3288    let Some(dict) = transcript.as_dict() else {
3289        return state.transcript.clone();
3290    };
3291    let mut next = dict.clone();
3292    let length = next
3293        .get("messages")
3294        .and_then(|value| match value {
3295            VmValue::List(list) => Some(list.len() as i64),
3296            _ => None,
3297        })
3298        .unwrap_or(0);
3299    next.insert("length".to_string(), VmValue::Int(length));
3300    next.insert(
3301        "created_at".to_string(),
3302        VmValue::String(std::sync::Arc::from(state.created_at.clone())),
3303    );
3304    next.insert(
3305        "parent_id".to_string(),
3306        state
3307            .parent_id
3308            .as_ref()
3309            .map(|id| VmValue::String(std::sync::Arc::from(id.clone())))
3310            .unwrap_or(VmValue::Nil),
3311    );
3312    next.insert(
3313        "child_ids".to_string(),
3314        VmValue::List(std::sync::Arc::new(
3315            state
3316                .child_ids
3317                .iter()
3318                .cloned()
3319                .map(|id| VmValue::String(std::sync::Arc::from(id)))
3320                .collect(),
3321        )),
3322    );
3323    next.insert(
3324        "branched_at_event_index".to_string(),
3325        state
3326            .branched_at_event_index
3327            .map(|index| VmValue::Int(index as i64))
3328            .unwrap_or(VmValue::Nil),
3329    );
3330    next.insert(
3331        "system_prompt".to_string(),
3332        state
3333            .system_prompt
3334            .as_ref()
3335            .map(|prompt| VmValue::String(std::sync::Arc::from(prompt.clone())))
3336            .unwrap_or(VmValue::Nil),
3337    );
3338    next.insert(
3339        "tool_format".to_string(),
3340        state
3341            .tool_format
3342            .as_ref()
3343            .map(|format| VmValue::String(std::sync::Arc::from(format.clone())))
3344            .unwrap_or(VmValue::Nil),
3345    );
3346    next.insert(
3347        "pinned_model".to_string(),
3348        state
3349            .pinned_model
3350            .as_ref()
3351            .map(|model| VmValue::String(std::sync::Arc::from(model.clone())))
3352            .unwrap_or(VmValue::Nil),
3353    );
3354    next.insert(
3355        "pinned_reasoning_policy".to_string(),
3356        state
3357            .pinned_reasoning_policy
3358            .as_ref()
3359            .map(|policy| VmValue::String(std::sync::Arc::from(policy.clone())))
3360            .unwrap_or(VmValue::Nil),
3361    );
3362    next.insert(
3363        "scratchpad".to_string(),
3364        state.scratchpad.clone().unwrap_or(VmValue::Nil),
3365    );
3366    next.insert(
3367        "scratchpad_version".to_string(),
3368        VmValue::Int(state.scratchpad_version as i64),
3369    );
3370    next.insert(
3371        "workspace_anchor".to_string(),
3372        state
3373            .workspace_anchor
3374            .as_ref()
3375            .map(WorkspaceAnchor::to_vm_value)
3376            .unwrap_or(VmValue::Nil),
3377    );
3378    next.insert(
3379        "workspace_policy".to_string(),
3380        state.workspace_policy.to_vm_value(),
3381    );
3382    next.insert(
3383        "live_clients".to_string(),
3384        crate::stdlib::json_to_vm_value(&serde_json::Value::Array(
3385            state.live_clients.values().map(live_client_json).collect(),
3386        )),
3387    );
3388    next.insert(
3389        "live_controller_id".to_string(),
3390        state
3391            .live_controller_id
3392            .as_ref()
3393            .map(|id| VmValue::String(std::sync::Arc::from(id.clone())))
3394            .unwrap_or(VmValue::Nil),
3395    );
3396    next.insert(
3397        "completed_turn_checkpoint_count".to_string(),
3398        VmValue::Int(state.completed_turn_checkpoints.len() as i64),
3399    );
3400    next.insert(
3401        "redo_checkpoint_count".to_string(),
3402        VmValue::Int(state.redo_stack.len() as i64),
3403    );
3404    VmValue::Dict(std::sync::Arc::new(next))
3405}
3406
3407fn update_lineage(
3408    map: &mut HashMap<String, SessionState>,
3409    parent_id: &str,
3410    child_id: &str,
3411    branched_at_event_index: Option<usize>,
3412) {
3413    let old_parent_id = map.get(child_id).and_then(|child| child.parent_id.clone());
3414    if let Some(old_parent_id) = old_parent_id.filter(|old_parent_id| old_parent_id != parent_id) {
3415        if let Some(old_parent) = map.get_mut(&old_parent_id) {
3416            old_parent.child_ids.retain(|id| id != child_id);
3417            old_parent.last_accessed = Instant::now();
3418        }
3419    }
3420    if let Some(parent) = map.get_mut(parent_id) {
3421        parent.last_accessed = Instant::now();
3422        if !parent.child_ids.iter().any(|id| id == child_id) {
3423            parent.child_ids.push(child_id.to_string());
3424        }
3425    }
3426    if let Some(child) = map.get_mut(child_id) {
3427        child.last_accessed = Instant::now();
3428        child.parent_id = Some(parent_id.to_string());
3429        child.branched_at_event_index = branched_at_event_index;
3430        child.transcript = clone_transcript_with_parent(&child.transcript, parent_id);
3431    }
3432}
3433
3434fn branch_event_index(transcript: &VmValue, keep_first: usize) -> usize {
3435    if keep_first == 0 {
3436        return 0;
3437    }
3438    let Some(dict) = transcript.as_dict() else {
3439        return keep_first;
3440    };
3441    let Some(VmValue::List(events)) = dict.get("events") else {
3442        return keep_first;
3443    };
3444    event_prefix_len_for_messages(events, keep_first)
3445}
3446
3447fn event_kind(event: &VmValue) -> Option<String> {
3448    event
3449        .as_dict()
3450        .and_then(|dict| dict.get("kind"))
3451        .map(VmValue::display)
3452}
3453
3454fn event_id(event: &VmValue) -> Option<String> {
3455    event
3456        .as_dict()
3457        .and_then(|dict| dict.get("id"))
3458        .map(VmValue::display)
3459}
3460
3461fn is_turn_event(event: &VmValue) -> bool {
3462    matches!(
3463        event_kind(event).as_deref(),
3464        Some("message" | "tool_result")
3465    )
3466}
3467
3468fn event_prefix_len_for_messages(events: &[VmValue], keep_first: usize) -> usize {
3469    if keep_first == 0 {
3470        return 0;
3471    }
3472    let mut retained_messages = 0usize;
3473    for (index, event) in events.iter().enumerate() {
3474        if is_turn_event(event) {
3475            retained_messages += 1;
3476            if retained_messages == keep_first {
3477                return index + 1;
3478            }
3479        }
3480    }
3481    events.len()
3482}
3483
3484fn turn_event_id_for_count(events: &[VmValue], keep_first: usize) -> Option<String> {
3485    if keep_first == 0 {
3486        return None;
3487    }
3488    let mut retained_messages = 0usize;
3489    for event in events {
3490        if is_turn_event(event) {
3491            retained_messages += 1;
3492            if retained_messages == keep_first {
3493                return event_id(event);
3494            }
3495        }
3496    }
3497    None
3498}
3499
// Unit tests for this module are loaded from `agent_sessions_tests.rs` via
// `#[path]` and are only compiled when `cfg(test)` is active.
3500#[cfg(test)]
3501#[path = "agent_sessions_tests.rs"]
3502mod tests;