Skip to main content

harn_vm/
agent_sessions.rs

1//! First-class session storage.
2//!
3//! A session owns three things:
4//!
5//! 1. A transcript dict (messages, events, summary, metadata, …).
6//! 2. Closure subscribers that fire on agent-loop events for this session.
7//! 3. Its own lifecycle (open, reset, fork, trim, compact, close).
8//!
9//! Storage is thread-local because `VmValue` contains `Rc`, which is
10//! neither `Send` nor `Sync`. The agent loop runs on a tokio
11//! current-thread worker, so all session reads and writes happen on the
12//! same thread. The closure-subscribers register, fire, and unregister
13//! on that same thread.
14//!
15//! Lifecycle is explicit. Builtins (`agent_session_open`,
16//! `_reset`, `_fork`, `_fork_at`, `_close`, `_trim`, `_compact`,
17//! `_inject`, `_exists`, `_length`, `_snapshot`, `_ancestry`) drive
18//! the store directly — there is no "policy" config dict that
19//! performs lifecycle as a side effect.
20
21use std::cell::{Cell, RefCell};
22use std::collections::{BTreeMap, HashMap, HashSet};
23use std::rc::Rc;
24use std::time::Instant;
25
26use crate::value::VmValue;
27
/// Default cap on concurrent sessions per VM thread.
///
/// This is the initial value of the thread-local cap; `open_or_create`
/// evicts the least-recently-accessed session before inserting a new one
/// once the store already holds this many entries.
pub const DEFAULT_SESSION_CAP: usize = 128;
31
/// Everything the store keeps for one session.
pub struct SessionState {
    /// Session id; `open_or_create` uses the same string as the store key.
    pub id: String,
    /// Transcript value. The helpers in this module treat it as a dict
    /// with `messages`, `events`, `summary` and `metadata` entries.
    pub transcript: VmValue,
    /// Callback values registered through `append_subscriber`.
    pub subscribers: Vec<VmValue>,
    /// When this state was constructed.
    pub created_at: Instant,
    /// Last time a mutating or opening operation touched this session;
    /// the eviction in `open_or_create` picks the smallest value.
    pub last_accessed: Instant,
    /// Id of the session this one was linked under, if any.
    pub parent_id: Option<String>,
    /// Ids of sessions linked under this one.
    pub child_ids: Vec<String>,
    /// Event index recorded when this session was linked as a branch
    /// (set by `fork_at`); `None` for plain links and forks.
    pub branched_at_event_index: Option<usize>,
    /// Names of skills that were active at the end of the most recent
    /// `agent_loop` run on this session. Empty when no skills were
    /// matched, when the skill system wasn't used, or when the
    /// deactivation phase cleared them. Re-entering the session
    /// restores these as the initial active set before matching runs.
    pub active_skills: Vec<String>,
    /// Tool-calling protocol claimed by the first agent loop that uses
    /// this session. A transcript is only replayable under the same
    /// contract that produced its prompt/history.
    pub tool_format: Option<String>,
}
52
53impl SessionState {
54    fn new(id: String) -> Self {
55        let now = Instant::now();
56        let transcript = empty_transcript(&id);
57        Self {
58            id,
59            transcript,
60            subscribers: Vec::new(),
61            created_at: now,
62            last_accessed: now,
63            parent_id: None,
64            child_ids: Vec::new(),
65            branched_at_event_index: None,
66            active_skills: Vec::new(),
67            tool_format: None,
68        }
69    }
70}
71
/// Lineage view of one session, as returned by `ancestry`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct SessionAncestry {
    /// Direct parent of the session, if it has one.
    pub parent_id: Option<String>,
    /// Direct children of the session.
    pub child_ids: Vec<String>,
    /// Id reached by following `parent_id` links upward as far as
    /// possible; equals the session's own id when it has no parent.
    pub root_id: String,
}
78
thread_local! {
    // Session store for this thread, keyed by session id.
    static SESSIONS: RefCell<HashMap<String, SessionState>> = RefCell::new(HashMap::new());
    // Maximum number of sessions kept before `open_or_create` evicts one.
    static SESSION_CAP: Cell<usize> = const { Cell::new(DEFAULT_SESSION_CAP) };
    // Stack of "current" session ids; the top entry is what
    // `current_session_id` reports.
    static CURRENT_SESSION_STACK: RefCell<Vec<String>> = const { RefCell::new(Vec::new()) };
}
84
85pub struct CurrentSessionGuard {
86    active: bool,
87}
88
89impl Drop for CurrentSessionGuard {
90    fn drop(&mut self) {
91        if self.active {
92            pop_current_session();
93        }
94    }
95}
96
97/// Set the per-thread session cap. Primarily for tests; production VMs
98/// inherit the default.
99pub fn set_session_cap(cap: usize) {
100    SESSION_CAP.with(|c| c.set(cap.max(1)));
101}
102
103pub fn session_cap() -> usize {
104    SESSION_CAP.with(|c| c.get())
105}
106
107/// Clear the session store. Wired into `reset_llm_state` for test isolation.
108pub fn reset_session_store() {
109    SESSIONS.with(|s| s.borrow_mut().clear());
110    CURRENT_SESSION_STACK.with(|stack| stack.borrow_mut().clear());
111}
112
113pub(crate) fn push_current_session(id: String) {
114    if id.is_empty() {
115        return;
116    }
117    CURRENT_SESSION_STACK.with(|stack| stack.borrow_mut().push(id));
118}
119
120pub(crate) fn pop_current_session() {
121    CURRENT_SESSION_STACK.with(|stack| {
122        let _ = stack.borrow_mut().pop();
123    });
124}
125
126pub fn current_session_id() -> Option<String> {
127    CURRENT_SESSION_STACK.with(|stack| stack.borrow().last().cloned())
128}
129
130pub fn enter_current_session(id: impl Into<String>) -> CurrentSessionGuard {
131    let id = id.into();
132    if id.trim().is_empty() {
133        return CurrentSessionGuard { active: false };
134    }
135    push_current_session(id);
136    CurrentSessionGuard { active: true }
137}
138
139pub fn exists(id: &str) -> bool {
140    SESSIONS.with(|s| s.borrow().contains_key(id))
141}
142
143pub fn length(id: &str) -> Option<usize> {
144    SESSIONS.with(|s| {
145        s.borrow().get(id).map(|state| {
146            state
147                .transcript
148                .as_dict()
149                .and_then(|d| d.get("messages"))
150                .and_then(|v| match v {
151                    VmValue::List(list) => Some(list.len()),
152                    _ => None,
153                })
154                .unwrap_or(0)
155        })
156    })
157}
158
159pub fn snapshot(id: &str) -> Option<VmValue> {
160    SESSIONS.with(|s| s.borrow().get(id).map(session_snapshot))
161}
162
163/// Open a session, or create it if missing. Returns the resolved id.
164///
165/// Newly-created sessions auto-register an event-log-backed sink when a
166/// generalized [`crate::event_log::EventLog`] has been installed for the
167/// current VM thread. For legacy env-driven workflows that still point
168/// `HARN_EVENT_LOG_DIR` at a directory, we preserve the older JSONL sink
169/// as a compatibility fallback. Re-opening an existing session does not
170/// re-register — sinks are per-session, owned by the first opener.
171pub fn open_or_create(id: Option<String>) -> String {
172    let resolved = id.unwrap_or_else(|| uuid::Uuid::now_v7().to_string());
173    let parent_session = current_session_id();
174    let mut was_new = false;
175    SESSIONS.with(|s| {
176        let mut map = s.borrow_mut();
177        if let Some(state) = map.get_mut(&resolved) {
178            state.last_accessed = Instant::now();
179            return;
180        }
181        was_new = true;
182        let cap = SESSION_CAP.with(|c| c.get());
183        if map.len() >= cap {
184            if let Some(victim) = map
185                .iter()
186                .min_by_key(|(_, state)| state.last_accessed)
187                .map(|(id, _)| id.clone())
188            {
189                map.remove(&victim);
190            }
191        }
192        map.insert(resolved.clone(), SessionState::new(resolved.clone()));
193    });
194    if was_new {
195        if let Some(parent) = parent_session.as_deref() {
196            crate::agent_events::mirror_session_sinks(parent, &resolved);
197        }
198        try_register_event_log(&resolved);
199    }
200    resolved
201}
202
203pub fn open_child_session(parent_id: &str, id: Option<String>) -> String {
204    let resolved = open_or_create(id);
205    link_child_session(parent_id, &resolved);
206    resolved
207}
208
209pub fn link_child_session(parent_id: &str, child_id: &str) {
210    link_child_session_with_branch(parent_id, child_id, None);
211}
212
213pub fn link_child_session_with_branch(
214    parent_id: &str,
215    child_id: &str,
216    branched_at_event_index: Option<usize>,
217) {
218    if parent_id == child_id {
219        return;
220    }
221    open_or_create(Some(parent_id.to_string()));
222    open_or_create(Some(child_id.to_string()));
223    SESSIONS.with(|s| {
224        let mut map = s.borrow_mut();
225        update_lineage(&mut map, parent_id, child_id, branched_at_event_index);
226    });
227}
228
229pub fn parent_id(id: &str) -> Option<String> {
230    SESSIONS.with(|s| s.borrow().get(id).and_then(|state| state.parent_id.clone()))
231}
232
233pub fn child_ids(id: &str) -> Vec<String> {
234    SESSIONS.with(|s| {
235        s.borrow()
236            .get(id)
237            .map(|state| state.child_ids.clone())
238            .unwrap_or_default()
239    })
240}
241
242pub fn ancestry(id: &str) -> Option<SessionAncestry> {
243    SESSIONS.with(|s| {
244        let map = s.borrow();
245        let state = map.get(id)?;
246        let mut root_id = state.id.clone();
247        let mut cursor = state.parent_id.clone();
248        let mut seen = HashSet::from([state.id.clone()]);
249        while let Some(parent_id) = cursor {
250            if !seen.insert(parent_id.clone()) {
251                break;
252            }
253            root_id = parent_id.clone();
254            cursor = map
255                .get(&parent_id)
256                .and_then(|parent| parent.parent_id.clone());
257        }
258        Some(SessionAncestry {
259            parent_id: state.parent_id.clone(),
260            child_ids: state.child_ids.clone(),
261            root_id,
262        })
263    })
264}
265
266/// Auto-register a persistent sink for a newly-created session.
267/// Silent no-op on failure — a broken observability sink must never
268/// prevent a session from starting.
269fn try_register_event_log(session_id: &str) {
270    if let Some(log) = crate::event_log::active_event_log() {
271        crate::agent_events::register_sink(
272            session_id,
273            crate::agent_events::EventLogSink::new(log, session_id),
274        );
275        return;
276    }
277    let Ok(dir) = std::env::var("HARN_EVENT_LOG_DIR") else {
278        return;
279    };
280    if dir.is_empty() {
281        return;
282    }
283    let path = std::path::PathBuf::from(dir).join(format!("event_log-{session_id}.jsonl"));
284    if let Ok(sink) = crate::agent_events::JsonlEventSink::open(&path) {
285        crate::agent_events::register_sink(session_id, sink);
286    }
287}
288
289pub fn register_event_log_sink(session_id: &str) {
290    try_register_event_log(session_id);
291}
292
293pub fn close(id: &str) {
294    SESSIONS.with(|s| {
295        s.borrow_mut().remove(id);
296    });
297}
298
299pub fn reset_transcript(id: &str) -> bool {
300    SESSIONS.with(|s| {
301        let mut map = s.borrow_mut();
302        let Some(state) = map.get_mut(id) else {
303            return false;
304        };
305        state.transcript = empty_transcript(id);
306        state.tool_format = None;
307        state.last_accessed = Instant::now();
308        true
309    })
310}
311
312/// Copy `src`'s transcript into a new session id. Subscribers are NOT
313/// copied — a fork is a conversation branch, not an event fanout.
314///
315/// Touches `src`'s `last_accessed` before evicting, so the fork
316/// operation itself can't make `src` look stale and kick it out of
317/// the LRU just to make room for the new fork.
318pub fn fork(src_id: &str, dst_id: Option<String>) -> Option<String> {
319    let (src_transcript, src_tool_format, dst) = SESSIONS.with(|s| {
320        let mut map = s.borrow_mut();
321        let src = map.get_mut(src_id)?;
322        src.last_accessed = Instant::now();
323        let dst = dst_id.unwrap_or_else(|| uuid::Uuid::now_v7().to_string());
324        let forked_transcript = clone_transcript_with_id(&src.transcript, &dst);
325        Some((forked_transcript, src.tool_format.clone(), dst))
326    })?;
327    // Ensure cap is respected when inserting the fork.
328    open_or_create(Some(dst.clone()));
329    SESSIONS.with(|s| {
330        let mut map = s.borrow_mut();
331        if let Some(state) = map.get_mut(&dst) {
332            state.transcript = src_transcript;
333            state.tool_format = src_tool_format;
334            state.last_accessed = Instant::now();
335        }
336        update_lineage(&mut map, src_id, &dst, None);
337    });
338    // open_or_create evicts BEFORE inserting, so the dst slot is
339    // guaranteed once we get here. The existence check is cheap
340    // insurance against a future refactor that breaks that invariant.
341    if exists(&dst) {
342        Some(dst)
343    } else {
344        None
345    }
346}
347
348/// Fork `src_id` and truncate the destination transcript to the
349/// first `keep_first` messages (#105 — branch-replay). Pairs with the
350/// scrubber: the host picks an event index, rebuilds a message count,
351/// and calls this to spawn a live sibling session that resumes from
352/// the rebuilt state. Subscribers are not carried over (same as
353/// `fork`), so sibling events don't double-fan into the parent's
354/// consumers.
355///
356/// Returns the new session id on success, `None` if `src_id` doesn't
357/// exist.
358pub fn fork_at(src_id: &str, keep_first: usize, dst_id: Option<String>) -> Option<String> {
359    let branched_at_event_index = SESSIONS.with(|s| {
360        let map = s.borrow();
361        let src = map.get(src_id)?;
362        Some(branch_event_index(&src.transcript, keep_first))
363    })?;
364    let new_id = fork(src_id, dst_id)?;
365    link_child_session_with_branch(src_id, &new_id, Some(branched_at_event_index));
366    retain_first(&new_id, keep_first);
367    Some(new_id)
368}
369
370/// Truncate the session transcript to the first `keep_first`
371/// messages (opposite of `trim`, which keeps the last N). Used by
372/// `fork_at` to cut a branch at a scrubber position.
373fn retain_first(id: &str, keep_first: usize) {
374    SESSIONS.with(|s| {
375        let mut map = s.borrow_mut();
376        let Some(state) = map.get_mut(id) else {
377            return;
378        };
379        let Some(dict) = state.transcript.as_dict() else {
380            return;
381        };
382        let dict = dict.clone();
383        let messages: Vec<VmValue> = match dict.get("messages") {
384            Some(VmValue::List(list)) => list.iter().cloned().collect(),
385            _ => Vec::new(),
386        };
387        let retained: Vec<VmValue> = messages.into_iter().take(keep_first).collect();
388        let mut next = dict;
389        next.insert(
390            "events".to_string(),
391            VmValue::List(Rc::new(
392                crate::llm::helpers::transcript_events_from_messages(&retained),
393            )),
394        );
395        next.insert("messages".to_string(), VmValue::List(Rc::new(retained)));
396        state.transcript = VmValue::Dict(Rc::new(next));
397        state.last_accessed = Instant::now();
398    });
399}
400
401/// Retain only the last `keep_last` messages in the session transcript.
402/// Returns the kept count (<= keep_last).
403pub fn trim(id: &str, keep_last: usize) -> Option<usize> {
404    SESSIONS.with(|s| {
405        let mut map = s.borrow_mut();
406        let state = map.get_mut(id)?;
407        let dict = state.transcript.as_dict()?.clone();
408        let messages: Vec<VmValue> = match dict.get("messages") {
409            Some(VmValue::List(list)) => list.iter().cloned().collect(),
410            _ => Vec::new(),
411        };
412        let start = messages.len().saturating_sub(keep_last);
413        let retained: Vec<VmValue> = messages.into_iter().skip(start).collect();
414        let kept = retained.len();
415        let mut next = dict;
416        next.insert(
417            "events".to_string(),
418            VmValue::List(Rc::new(
419                crate::llm::helpers::transcript_events_from_messages(&retained),
420            )),
421        );
422        next.insert("messages".to_string(), VmValue::List(Rc::new(retained)));
423        state.transcript = VmValue::Dict(Rc::new(next));
424        state.last_accessed = Instant::now();
425        Some(kept)
426    })
427}
428
429/// Append a message dict to the session transcript. The message must
430/// have at least a string `role`; anything else is merged verbatim.
431pub fn inject_message(id: &str, message: VmValue) -> Result<(), String> {
432    let Some(msg_dict) = message.as_dict().cloned() else {
433        return Err("agent_session_inject: message must be a dict".into());
434    };
435    let role_ok = matches!(msg_dict.get("role"), Some(VmValue::String(_)));
436    if !role_ok {
437        return Err(
438            "agent_session_inject: message must have a string `role` (user|assistant|tool_result|system)"
439                .into(),
440        );
441    }
442    SESSIONS.with(|s| {
443        let mut map = s.borrow_mut();
444        let Some(state) = map.get_mut(id) else {
445            return Err(format!("agent_session_inject: unknown session id '{id}'"));
446        };
447        let dict = state
448            .transcript
449            .as_dict()
450            .cloned()
451            .unwrap_or_else(BTreeMap::new);
452        let mut messages: Vec<VmValue> = match dict.get("messages") {
453            Some(VmValue::List(list)) => list.iter().cloned().collect(),
454            _ => Vec::new(),
455        };
456        let mut events: Vec<VmValue> = match dict.get("events") {
457            Some(VmValue::List(list)) => list.iter().cloned().collect(),
458            _ => crate::llm::helpers::transcript_events_from_messages(&messages),
459        };
460        let new_message = VmValue::Dict(Rc::new(msg_dict));
461        events.push(crate::llm::helpers::transcript_event_from_message(
462            &new_message,
463        ));
464        messages.push(new_message);
465        let mut next = dict;
466        next.insert("events".to_string(), VmValue::List(Rc::new(events)));
467        next.insert("messages".to_string(), VmValue::List(Rc::new(messages)));
468        state.transcript = VmValue::Dict(Rc::new(next));
469        state.last_accessed = Instant::now();
470        Ok(())
471    })
472}
473
474/// Load the messages vec (as JSON) for this session, for use as prefix
475/// to an agent_loop run. Returns an empty vec if the session doesn't
476/// exist or has no messages.
477pub fn messages_json(id: &str) -> Vec<serde_json::Value> {
478    SESSIONS.with(|s| {
479        let map = s.borrow();
480        let Some(state) = map.get(id) else {
481            return Vec::new();
482        };
483        let Some(dict) = state.transcript.as_dict() else {
484            return Vec::new();
485        };
486        match dict.get("messages") {
487            Some(VmValue::List(list)) => list
488                .iter()
489                .map(crate::llm::helpers::vm_value_to_json)
490                .collect(),
491            _ => Vec::new(),
492        }
493    })
494}
495
/// Result of `prompt_state_json`: what a resumed loop should see.
#[derive(Clone, Debug, Default)]
pub struct SessionPromptState {
    /// Messages as JSON, with a summary message prepended when the
    /// transcript has a summary that the list does not already start with.
    pub messages: Vec<serde_json::Value>,
    /// The transcript's `summary` string, when present and non-blank.
    pub summary: Option<String>,
}
501
502fn summary_message_json(summary: &str) -> serde_json::Value {
503    serde_json::json!({
504        "role": "user",
505        "content": summary,
506    })
507}
508
509fn messages_begin_with_summary(messages: &[serde_json::Value], summary: &str) -> bool {
510    messages.first().is_some_and(|message| {
511        message.get("role").and_then(|value| value.as_str()) == Some("user")
512            && message.get("content").and_then(|value| value.as_str()) == Some(summary)
513    })
514}
515
516/// Prompt-surface resume state for a persisted session.
517///
518/// Returns the compacted/rehydratable message list plus the transcript's
519/// summary field. When the transcript carries a summary field but its
520/// message list does not already begin with the compacted summary
521/// message, this helper prepends one so session re-entry preserves the
522/// same prompt surface the previous loop was actually using.
523pub fn prompt_state_json(id: &str) -> SessionPromptState {
524    SESSIONS.with(|s| {
525        let map = s.borrow();
526        let Some(state) = map.get(id) else {
527            return SessionPromptState::default();
528        };
529        let Some(dict) = state.transcript.as_dict() else {
530            return SessionPromptState::default();
531        };
532        let mut messages = match dict.get("messages") {
533            Some(VmValue::List(list)) => list
534                .iter()
535                .map(crate::llm::helpers::vm_value_to_json)
536                .collect::<Vec<_>>(),
537            _ => Vec::new(),
538        };
539        let summary = dict.get("summary").and_then(|value| match value {
540            VmValue::String(text) if !text.trim().is_empty() => Some(text.to_string()),
541            _ => None,
542        });
543        if let Some(summary_text) = summary.as_deref() {
544            if !messages_begin_with_summary(&messages, summary_text) {
545                messages.insert(0, summary_message_json(summary_text));
546            }
547        }
548        SessionPromptState { messages, summary }
549    })
550}
551
552/// Overwrite the transcript for this session. Used by `agent_loop` on
553/// exit to persist the synthesized transcript.
554pub fn store_transcript(id: &str, transcript: VmValue) {
555    SESSIONS.with(|s| {
556        if let Some(state) = s.borrow_mut().get_mut(id) {
557            state.transcript = transcript_with_session_metadata(transcript, state);
558            state.last_accessed = Instant::now();
559        }
560    });
561}
562
563/// Append a transcript event to the session without mutating its
564/// message list. Used for orchestration-side lineage events (sub-agent
565/// spawn/completion, workflow hooks, etc.) that should survive
566/// persistence/replay without being replayed back into the model as
567/// conversational messages.
568pub fn append_event(id: &str, event: VmValue) -> Result<(), String> {
569    let Some(event_dict) = event.as_dict() else {
570        return Err("agent_session_append_event: event must be a dict".into());
571    };
572    let kind_ok = matches!(event_dict.get("kind"), Some(VmValue::String(_)));
573    if !kind_ok {
574        return Err("agent_session_append_event: event must have a string `kind`".into());
575    }
576    SESSIONS.with(|s| {
577        let mut map = s.borrow_mut();
578        let Some(state) = map.get_mut(id) else {
579            return Err(format!(
580                "agent_session_append_event: unknown session id '{id}'"
581            ));
582        };
583        let dict = state
584            .transcript
585            .as_dict()
586            .cloned()
587            .unwrap_or_else(BTreeMap::new);
588        let mut events: Vec<VmValue> = match dict.get("events") {
589            Some(VmValue::List(list)) => list.iter().cloned().collect(),
590            _ => dict
591                .get("messages")
592                .and_then(|value| match value {
593                    VmValue::List(list) => Some(list.iter().cloned().collect::<Vec<_>>()),
594                    _ => None,
595                })
596                .map(|messages| crate::llm::helpers::transcript_events_from_messages(&messages))
597                .unwrap_or_default(),
598        };
599        events.push(event);
600        let mut next = dict;
601        next.insert("events".to_string(), VmValue::List(Rc::new(events)));
602        state.transcript = VmValue::Dict(Rc::new(next));
603        state.last_accessed = Instant::now();
604        Ok(())
605    })
606}
607
608/// Replace the transcript's message list wholesale. Used by the
609/// in-loop compaction path, which operates on JSON messages.
610pub fn replace_messages(id: &str, messages: &[serde_json::Value]) {
611    replace_messages_with_summary(id, messages, None);
612}
613
614/// Replace the transcript's message list and optionally update the
615/// `summary` field on the persisted transcript. The compaction path
616/// uses this to publish the human-readable rollup line that
617/// `transcript_summary(transcript)` exposes to host code.
618pub fn replace_messages_with_summary(
619    id: &str,
620    messages: &[serde_json::Value],
621    summary: Option<&str>,
622) {
623    SESSIONS.with(|s| {
624        let mut map = s.borrow_mut();
625        let Some(state) = map.get_mut(id) else {
626            return;
627        };
628        let dict = state
629            .transcript
630            .as_dict()
631            .cloned()
632            .unwrap_or_else(BTreeMap::new);
633        let vm_messages: Vec<VmValue> = messages
634            .iter()
635            .map(crate::stdlib::json_to_vm_value)
636            .collect();
637        let mut next = dict;
638        next.insert(
639            "events".to_string(),
640            VmValue::List(Rc::new(
641                crate::llm::helpers::transcript_events_from_messages(&vm_messages),
642            )),
643        );
644        next.insert("messages".to_string(), VmValue::List(Rc::new(vm_messages)));
645        if let Some(summary) = summary {
646            next.insert(
647                "summary".to_string(),
648                VmValue::String(Rc::from(summary.to_string())),
649            );
650        }
651        state.transcript = VmValue::Dict(Rc::new(next));
652        state.last_accessed = Instant::now();
653    });
654}
655
656pub fn append_subscriber(id: &str, callback: VmValue) {
657    open_or_create(Some(id.to_string()));
658    SESSIONS.with(|s| {
659        if let Some(state) = s.borrow_mut().get_mut(id) {
660            state.subscribers.push(callback);
661            state.last_accessed = Instant::now();
662        }
663    });
664}
665
666pub fn subscribers_for(id: &str) -> Vec<VmValue> {
667    SESSIONS.with(|s| {
668        s.borrow()
669            .get(id)
670            .map(|state| state.subscribers.clone())
671            .unwrap_or_default()
672    })
673}
674
675pub fn subscriber_count(id: &str) -> usize {
676    SESSIONS.with(|s| {
677        s.borrow()
678            .get(id)
679            .map(|state| state.subscribers.len())
680            .unwrap_or(0)
681    })
682}
683
684/// Persist the set of active skill names for session resume. Called at
685/// the end of an agent_loop run; the next `open_or_create` for this id
686/// reads them back via [`active_skills`].
687pub fn set_active_skills(id: &str, skills: Vec<String>) {
688    SESSIONS.with(|s| {
689        if let Some(state) = s.borrow_mut().get_mut(id) {
690            state.active_skills = skills;
691            state.last_accessed = Instant::now();
692        }
693    });
694}
695
696/// Skills that were active at the end of the previous agent_loop run
697/// against this session. Returns an empty vec when the session doesn't
698/// exist or nothing was persisted.
699pub fn active_skills(id: &str) -> Vec<String> {
700    SESSIONS.with(|s| {
701        s.borrow()
702            .get(id)
703            .map(|state| state.active_skills.clone())
704            .unwrap_or_default()
705    })
706}
707
708/// Claim the tool-calling contract for a session.
709///
710/// The first loop against a named session records its `tool_format`.
711/// Later re-entry must use the same format so prompt/history generated
712/// under a text contract is never replayed as native, or vice versa.
713pub fn claim_tool_format(id: &str, tool_format: &str) -> Result<(), String> {
714    let tool_format = tool_format.trim();
715    if tool_format.is_empty() {
716        return Ok(());
717    }
718    SESSIONS.with(|s| {
719        let mut map = s.borrow_mut();
720        let Some(state) = map.get_mut(id) else {
721            return Err(format!("agent session '{id}' does not exist"));
722        };
723        match state.tool_format.as_deref() {
724            Some(existing) if existing != tool_format => Err(format!(
725                "agent session '{id}' is locked to tool_format='{existing}', but this run requested tool_format='{tool_format}'. Start a new session or fork/reset the transcript before changing tool mode."
726            )),
727            Some(_) => {
728                state.last_accessed = Instant::now();
729                Ok(())
730            }
731            None => {
732                state.tool_format = Some(tool_format.to_string());
733                state.last_accessed = Instant::now();
734                Ok(())
735            }
736        }
737    })
738}
739
740pub fn tool_format(id: &str) -> Option<String> {
741    SESSIONS.with(|s| {
742        s.borrow()
743            .get(id)
744            .and_then(|state| state.tool_format.clone())
745    })
746}
747
748fn empty_transcript(id: &str) -> VmValue {
749    use crate::llm::helpers::new_transcript_with;
750    new_transcript_with(Some(id.to_string()), Vec::new(), None, None)
751}
752
753fn clone_transcript_with_id(transcript: &VmValue, new_id: &str) -> VmValue {
754    let Some(dict) = transcript.as_dict() else {
755        return empty_transcript(new_id);
756    };
757    let mut next = dict.clone();
758    next.insert(
759        "id".to_string(),
760        VmValue::String(Rc::from(new_id.to_string())),
761    );
762    VmValue::Dict(Rc::new(next))
763}
764
765fn clone_transcript_with_parent(transcript: &VmValue, parent_id: &str) -> VmValue {
766    let Some(dict) = transcript.as_dict() else {
767        return transcript.clone();
768    };
769    let mut next = dict.clone();
770    let metadata = match next.get("metadata") {
771        Some(VmValue::Dict(metadata)) => {
772            let mut metadata = metadata.as_ref().clone();
773            metadata.insert(
774                "parent_session_id".to_string(),
775                VmValue::String(Rc::from(parent_id.to_string())),
776            );
777            VmValue::Dict(Rc::new(metadata))
778        }
779        _ => VmValue::Dict(Rc::new(BTreeMap::from([(
780            "parent_session_id".to_string(),
781            VmValue::String(Rc::from(parent_id.to_string())),
782        )]))),
783    };
784    next.insert("metadata".to_string(), metadata);
785    VmValue::Dict(Rc::new(next))
786}
787
788fn transcript_with_session_metadata(transcript: VmValue, state: &SessionState) -> VmValue {
789    let Some(dict) = transcript.as_dict() else {
790        return transcript;
791    };
792    let mut next = dict.clone();
793    let mut metadata = match next.get("metadata") {
794        Some(VmValue::Dict(metadata)) => metadata.as_ref().clone(),
795        _ => BTreeMap::new(),
796    };
797    if let Some(tool_format) = state.tool_format.as_ref() {
798        metadata.insert(
799            "tool_format".to_string(),
800            VmValue::String(Rc::from(tool_format.clone())),
801        );
802        metadata.insert("tool_mode_locked".to_string(), VmValue::Bool(true));
803    }
804    if !metadata.is_empty() {
805        next.insert("metadata".to_string(), VmValue::Dict(Rc::new(metadata)));
806    }
807    VmValue::Dict(Rc::new(next))
808}
809
810fn session_snapshot(state: &SessionState) -> VmValue {
811    let Some(dict) = state.transcript.as_dict() else {
812        return state.transcript.clone();
813    };
814    let mut next = dict.clone();
815    next.insert(
816        "parent_id".to_string(),
817        state
818            .parent_id
819            .as_ref()
820            .map(|id| VmValue::String(Rc::from(id.clone())))
821            .unwrap_or(VmValue::Nil),
822    );
823    next.insert(
824        "child_ids".to_string(),
825        VmValue::List(Rc::new(
826            state
827                .child_ids
828                .iter()
829                .cloned()
830                .map(|id| VmValue::String(Rc::from(id)))
831                .collect(),
832        )),
833    );
834    next.insert(
835        "branched_at_event_index".to_string(),
836        state
837            .branched_at_event_index
838            .map(|index| VmValue::Int(index as i64))
839            .unwrap_or(VmValue::Nil),
840    );
841    VmValue::Dict(Rc::new(next))
842}
843
844fn update_lineage(
845    map: &mut HashMap<String, SessionState>,
846    parent_id: &str,
847    child_id: &str,
848    branched_at_event_index: Option<usize>,
849) {
850    let old_parent_id = map.get(child_id).and_then(|child| child.parent_id.clone());
851    if let Some(old_parent_id) = old_parent_id.filter(|old_parent_id| old_parent_id != parent_id) {
852        if let Some(old_parent) = map.get_mut(&old_parent_id) {
853            old_parent.child_ids.retain(|id| id != child_id);
854            old_parent.last_accessed = Instant::now();
855        }
856    }
857    if let Some(parent) = map.get_mut(parent_id) {
858        parent.last_accessed = Instant::now();
859        if !parent.child_ids.iter().any(|id| id == child_id) {
860            parent.child_ids.push(child_id.to_string());
861        }
862    }
863    if let Some(child) = map.get_mut(child_id) {
864        child.last_accessed = Instant::now();
865        child.parent_id = Some(parent_id.to_string());
866        child.branched_at_event_index = branched_at_event_index;
867        child.transcript = clone_transcript_with_parent(&child.transcript, parent_id);
868    }
869}
870
871fn branch_event_index(transcript: &VmValue, keep_first: usize) -> usize {
872    if keep_first == 0 {
873        return 0;
874    }
875    let Some(dict) = transcript.as_dict() else {
876        return keep_first;
877    };
878    let Some(VmValue::List(events)) = dict.get("events") else {
879        return keep_first;
880    };
881    let mut retained_messages = 0usize;
882    for (index, event) in events.iter().enumerate() {
883        let kind = event
884            .as_dict()
885            .and_then(|dict| dict.get("kind"))
886            .map(VmValue::display);
887        if matches!(kind.as_deref(), Some("message" | "tool_result")) {
888            retained_messages += 1;
889            if retained_messages == keep_first {
890                return index + 1;
891            }
892        }
893    }
894    events.len()
895}
896
#[cfg(test)]
mod tests {
    use super::*;
    use crate::agent_events::{
        emit_event, reset_all_sinks, session_external_sink_count, AgentEvent,
    };
    use crate::event_log::{active_event_log, EventLog, Topic};
    use std::collections::BTreeMap;

    /// Builds a transcript message dict holding just `role` and `content`.
    fn make_msg(role: &str, content: &str) -> VmValue {
        VmValue::Dict(Rc::new(BTreeMap::from([
            ("role".to_string(), VmValue::String(Rc::from(role))),
            ("content".to_string(), VmValue::String(Rc::from(content))),
        ])))
    }

    /// Length of the `messages` list stored for session `id`; `0` when the
    /// session, its transcript dict, or the list is missing.
    fn message_count(id: &str) -> usize {
        SESSIONS.with(|store| {
            let sessions = store.borrow();
            let messages = sessions
                .get(id)
                .and_then(|state| state.transcript.as_dict())
                .and_then(|fields| fields.get("messages"));
            match messages {
                Some(VmValue::List(list)) => list.len(),
                _ => 0,
            }
        })
    }

    /// Forking at message 2 of 4 leaves the source alone and gives the
    /// destination two messages, a recorded branch index, and no subscribers.
    #[test]
    fn fork_at_truncates_destination_to_keep_first() {
        reset_session_store();
        let src = open_or_create(Some("src-fork-at".into()));
        let conversation = [
            ("user", "a"),
            ("assistant", "b"),
            ("user", "c"),
            ("assistant", "d"),
        ];
        for (role, content) in conversation {
            inject_message(&src, make_msg(role, content)).unwrap();
        }
        assert_eq!(message_count(&src), 4);

        let dst = fork_at(&src, 2, Some("dst-fork-at".into())).expect("fork_at");
        assert_ne!(dst, src);
        assert_eq!(message_count(&dst), 2, "branched at message index 2");
        let branched_at = snapshot(&dst).and_then(|snap| {
            snap.as_dict()
                .and_then(|fields| fields.get("branched_at_event_index"))
                .and_then(VmValue::as_int)
        });
        assert_eq!(branched_at, Some(2));
        // The source session keeps its full history.
        assert_eq!(message_count(&src), 4);
        // Forks begin with an empty subscriber list.
        assert_eq!(subscriber_count(&dst), 0);
        reset_session_store();
    }

    /// Forking from an id that was never opened yields `None`.
    #[test]
    fn fork_at_on_unknown_source_returns_none() {
        reset_session_store();
        let forked = fork_at("does-not-exist", 3, None);
        assert!(forked.is_none());
    }

    /// A child session exposes its parent through the accessors, the
    /// ancestry record, the snapshot keys, and the transcript metadata.
    #[test]
    fn child_sessions_record_parent_lineage() {
        reset_session_store();
        let parent = open_or_create(Some("parent-session".into()));
        let child = open_child_session(&parent, Some("child-session".into()));

        assert_eq!(parent_id(&child).as_deref(), Some("parent-session"));
        assert_eq!(child_ids(&parent), vec!["child-session".to_string()]);
        let expected_ancestry = SessionAncestry {
            parent_id: Some("parent-session".to_string()),
            child_ids: Vec::new(),
            root_id: "parent-session".to_string(),
        };
        assert_eq!(ancestry(&child), Some(expected_ancestry));

        let snap = snapshot(&child).expect("child transcript");
        let fields = snap.as_dict().expect("child snapshot");
        let meta = fields
            .get("metadata")
            .and_then(|value| value.as_dict())
            .expect("child metadata");

        assert!(matches!(
            fields.get("parent_id"),
            Some(VmValue::String(value)) if value.as_ref() == "parent-session"
        ));
        assert!(matches!(
            fields.get("child_ids"),
            Some(VmValue::List(children)) if children.is_empty()
        ));
        assert!(matches!(
            fields.get("branched_at_event_index"),
            Some(VmValue::Nil)
        ));
        assert!(matches!(
            meta.get("parent_session_id"),
            Some(VmValue::String(value)) if value.as_ref() == "parent-session"
        ));
    }

    /// With a non-message event sitting between two messages, keeping two
    /// messages must report a branch index of three events.
    #[test]
    fn branch_event_index_counts_non_message_events() {
        reset_session_store();
        let src = open_or_create(Some("branch-event-index".into()));

        // Minimal event dict carrying only a `kind` field.
        let event = |kind: &str| {
            VmValue::Dict(Rc::new(BTreeMap::from([(
                "kind".to_string(),
                VmValue::String(Rc::from(kind)),
            )])))
        };
        let messages = vec![make_msg("user", "a"), make_msg("assistant", "b")];
        let events = vec![
            event("message"),
            event("sub_agent_start"),
            event("message"),
        ];
        let transcript = VmValue::Dict(Rc::new(BTreeMap::from([
            ("id".to_string(), VmValue::String(Rc::from(src.clone()))),
            ("messages".to_string(), VmValue::List(Rc::new(messages))),
            ("events".to_string(), VmValue::List(Rc::new(events))),
        ])));
        store_transcript(&src, transcript);

        let dst = fork_at(&src, 2, Some("branch-event-index-child".into())).expect("fork_at");
        let branched_at = snapshot(&dst).and_then(|snap| {
            snap.as_dict()
                .and_then(|fields| fields.get("branched_at_event_index"))
                .and_then(VmValue::as_int)
        });
        assert_eq!(branched_at, Some(3));
    }

    /// Opening a child links it to the parent but does not copy the
    /// parent's messages or its claimed tool format.
    #[test]
    fn child_session_records_lineage_without_reusing_parent_transcript() {
        reset_session_store();
        let parent = open_or_create(Some("parent-fork-parent".into()));
        inject_message(&parent, make_msg("user", "parent context")).unwrap();
        claim_tool_format(&parent, "native").unwrap();

        let child = open_child_session(&parent, Some("parent-fork-child".into()));

        assert_eq!(message_count(&parent), 1);
        assert_eq!(message_count(&child), 0);
        assert_eq!(tool_format(&child), None);
        assert_eq!(parent_id(&child).as_deref(), Some(parent.as_str()));
    }

    /// When the stored summary is not among the messages, the prompt state
    /// carries it as a leading `user` message ahead of the real ones.
    #[test]
    fn prompt_state_prepends_summary_message_when_missing_from_messages() {
        const SUMMARY: &str = "[auto-compacted 2 older messages]\nsummary";

        reset_session_store();
        let session = open_or_create(Some("prompt-state-summary".into()));
        let transcript = crate::llm::helpers::new_transcript_with_events(
            Some(session.clone()),
            vec![make_msg("assistant", "latest answer")],
            Some(SUMMARY.to_string()),
            None,
            Vec::new(),
            Vec::new(),
            Some("active"),
        );
        store_transcript(&session, transcript);

        let prompt = prompt_state_json(&session);
        assert_eq!(prompt.summary.as_deref(), Some(SUMMARY));
        assert_eq!(prompt.messages.len(), 2);

        let first = &prompt.messages[0];
        assert_eq!(first["role"].as_str(), Some("user"));
        assert_eq!(first["content"].as_str(), Some(SUMMARY));
        assert_eq!(prompt.messages[1]["role"].as_str(), Some("assistant"));
    }

    /// With an active event log installed, opening a session registers one
    /// external sink and an emitted event lands on the session's topic.
    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn open_or_create_registers_event_log_sink_when_active_log_is_installed() {
        reset_all_sinks();
        crate::event_log::reset_active_event_log();
        let base_dir = tempfile::tempdir().expect("tempdir");
        crate::event_log::install_default_for_base_dir(base_dir.path())
            .expect("install event log");

        let session = open_or_create(Some("event-log-session".into()));
        assert_eq!(session_external_sink_count(&session), 1);

        let turn_start = AgentEvent::TurnStart {
            session_id: session.clone(),
            iteration: 0,
        };
        emit_event(&turn_start);
        // Yield to the runtime before reading the log back.
        tokio::time::sleep(std::time::Duration::from_millis(25)).await;

        let topic = Topic::new("observability.agent_events.event-log-session").unwrap();
        let log = active_event_log().expect("active event log");
        let recorded = log.read_range(&topic, None, usize::MAX).await.unwrap();
        assert_eq!(recorded.len(), 1);
        assert_eq!(recorded[0].1.kind, "turn_start");

        crate::event_log::reset_active_event_log();
        reset_all_sinks();
    }
}