Skip to main content

harn_vm/
agent_sessions.rs

1//! First-class session storage.
2//!
3//! A session owns three things:
4//!
5//! 1. A transcript dict (messages, events, summary, metadata, …).
6//! 2. Closure subscribers that fire on agent-loop events for this session.
7//! 3. Its own lifecycle (open, reset, fork, trim, compact, close).
8//!
9//! Storage is thread-local because `VmValue` contains `Rc`, which is
10//! neither `Send` nor `Sync`. The agent loop runs on a tokio
11//! current-thread worker, so all session reads and writes happen on the
12//! same thread. The closure-subscribers register, fire, and unregister
13//! on that same thread.
14//!
15//! Lifecycle is explicit. Builtins (`agent_session_open`,
16//! `_reset`, `_fork`, `_fork_at`, `_close`, `_trim`, `_compact`,
17//! `_inject`, `_exists`, `_length`, `_snapshot`, `_ancestry`) drive
18//! the store directly — there is no "policy" config dict that
19//! performs lifecycle as a side effect.
20
21use std::cell::{Cell, RefCell};
22use std::collections::{BTreeMap, HashMap, HashSet};
23use std::rc::Rc;
24use std::time::Instant;
25
26use crate::value::VmValue;
27
/// How many sessions a VM thread keeps alive unless `set_session_cap`
/// overrides it. When the store is already at the cap, the next `open`
/// of an unknown id evicts the least-recently-accessed session first.
pub const DEFAULT_SESSION_CAP: usize = 128;
31
32pub struct SessionState {
33    pub id: String,
34    pub transcript: VmValue,
35    pub subscribers: Vec<VmValue>,
36    pub created_at: Instant,
37    pub last_accessed: Instant,
38    pub parent_id: Option<String>,
39    pub child_ids: Vec<String>,
40    pub branched_at_event_index: Option<usize>,
41    /// Names of skills that were active at the end of the most recent
42    /// `agent_loop` run on this session. Empty when no skills were
43    /// matched, when the skill system wasn't used, or when the
44    /// deactivation phase cleared them. Re-entering the session
45    /// restores these as the initial active set before matching runs.
46    pub active_skills: Vec<String>,
47    /// Tool-calling protocol claimed by the first agent loop that uses
48    /// this session. A transcript is only replayable under the same
49    /// contract that produced its prompt/history.
50    pub tool_format: Option<String>,
51    /// Stable session-level system prompt material. This is transcript
52    /// metadata, not a replay message: providers receive it through
53    /// their system/developer instruction channel on each call.
54    pub system_prompt: Option<String>,
55}
56
57impl SessionState {
58    fn new(id: String) -> Self {
59        let now = Instant::now();
60        let transcript = empty_transcript(&id);
61        Self {
62            id,
63            transcript,
64            subscribers: Vec::new(),
65            created_at: now,
66            last_accessed: now,
67            parent_id: None,
68            child_ids: Vec::new(),
69            branched_at_event_index: None,
70            active_skills: Vec::new(),
71            tool_format: None,
72            system_prompt: None,
73        }
74    }
75}
76
/// Lineage view of a single session, as returned by `ancestry`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SessionAncestry {
    /// Direct parent of the session, if it has one.
    pub parent_id: Option<String>,
    /// Sessions linked directly under this one.
    pub child_ids: Vec<String>,
    /// Topmost ancestor reached by following parent links; the
    /// session's own id when it has no parent.
    pub root_id: String,
}
83
84thread_local! {
85    static SESSIONS: RefCell<HashMap<String, SessionState>> = RefCell::new(HashMap::new());
86    static SESSION_CAP: Cell<usize> = const { Cell::new(DEFAULT_SESSION_CAP) };
87    static CURRENT_SESSION_STACK: RefCell<Vec<String>> = const { RefCell::new(Vec::new()) };
88}
89
/// Scope guard returned by `enter_current_session`.
///
/// While an active guard is alive, the session it was created for sits
/// on the thread's current-session stack; dropping the guard pops it.
/// The guard must therefore be bound to a variable for as long as the
/// session should stay current — discarding the return value would pop
/// the session immediately.
#[must_use = "dropping the guard immediately pops the session it just entered"]
pub struct CurrentSessionGuard {
    /// `true` when this guard pushed an entry that it must pop on drop;
    /// `false` for the inert guard handed out for blank ids.
    active: bool,
}
93
94impl Drop for CurrentSessionGuard {
95    fn drop(&mut self) {
96        if self.active {
97            pop_current_session();
98        }
99    }
100}
101
102/// Set the per-thread session cap. Primarily for tests; production VMs
103/// inherit the default.
104pub fn set_session_cap(cap: usize) {
105    SESSION_CAP.with(|c| c.set(cap.max(1)));
106}
107
108pub fn session_cap() -> usize {
109    SESSION_CAP.with(|c| c.get())
110}
111
112/// Clear the session store. Wired into `reset_llm_state` for test isolation.
113pub fn reset_session_store() {
114    SESSIONS.with(|s| s.borrow_mut().clear());
115    CURRENT_SESSION_STACK.with(|stack| stack.borrow_mut().clear());
116}
117
118pub(crate) fn push_current_session(id: String) {
119    if id.is_empty() {
120        return;
121    }
122    CURRENT_SESSION_STACK.with(|stack| stack.borrow_mut().push(id));
123}
124
125pub(crate) fn pop_current_session() {
126    CURRENT_SESSION_STACK.with(|stack| {
127        let _ = stack.borrow_mut().pop();
128    });
129}
130
131pub fn current_session_id() -> Option<String> {
132    CURRENT_SESSION_STACK.with(|stack| stack.borrow().last().cloned())
133}
134
135pub fn enter_current_session(id: impl Into<String>) -> CurrentSessionGuard {
136    let id = id.into();
137    if id.trim().is_empty() {
138        return CurrentSessionGuard { active: false };
139    }
140    push_current_session(id);
141    CurrentSessionGuard { active: true }
142}
143
144pub fn exists(id: &str) -> bool {
145    SESSIONS.with(|s| s.borrow().contains_key(id))
146}
147
148pub fn length(id: &str) -> Option<usize> {
149    SESSIONS.with(|s| {
150        s.borrow().get(id).map(|state| {
151            state
152                .transcript
153                .as_dict()
154                .and_then(|d| d.get("messages"))
155                .and_then(|v| match v {
156                    VmValue::List(list) => Some(list.len()),
157                    _ => None,
158                })
159                .unwrap_or(0)
160        })
161    })
162}
163
164pub fn snapshot(id: &str) -> Option<VmValue> {
165    SESSIONS.with(|s| s.borrow().get(id).map(session_snapshot))
166}
167
168/// Open a session, or create it if missing. Returns the resolved id.
169///
170/// Newly-created sessions auto-register an event-log-backed sink when a
171/// generalized [`crate::event_log::EventLog`] has been installed for the
172/// current VM thread. For legacy env-driven workflows that still point
173/// `HARN_EVENT_LOG_DIR` at a directory, we preserve the older JSONL sink
174/// as a compatibility fallback. Re-opening an existing session does not
175/// re-register — sinks are per-session, owned by the first opener.
176pub fn open_or_create(id: Option<String>) -> String {
177    let resolved = id.unwrap_or_else(|| uuid::Uuid::now_v7().to_string());
178    let parent_session = current_session_id();
179    let mut was_new = false;
180    SESSIONS.with(|s| {
181        let mut map = s.borrow_mut();
182        if let Some(state) = map.get_mut(&resolved) {
183            state.last_accessed = Instant::now();
184            return;
185        }
186        was_new = true;
187        let cap = SESSION_CAP.with(|c| c.get());
188        if map.len() >= cap {
189            if let Some(victim) = map
190                .iter()
191                .min_by_key(|(_, state)| state.last_accessed)
192                .map(|(id, _)| id.clone())
193            {
194                map.remove(&victim);
195            }
196        }
197        map.insert(resolved.clone(), SessionState::new(resolved.clone()));
198    });
199    if was_new {
200        if let Some(parent) = parent_session.as_deref() {
201            crate::agent_events::mirror_session_sinks(parent, &resolved);
202        }
203        try_register_event_log(&resolved);
204    }
205    resolved
206}
207
208pub fn open_child_session(parent_id: &str, id: Option<String>) -> String {
209    let resolved = open_or_create(id);
210    link_child_session(parent_id, &resolved);
211    resolved
212}
213
214pub fn link_child_session(parent_id: &str, child_id: &str) {
215    link_child_session_with_branch(parent_id, child_id, None);
216}
217
218pub fn link_child_session_with_branch(
219    parent_id: &str,
220    child_id: &str,
221    branched_at_event_index: Option<usize>,
222) {
223    if parent_id == child_id {
224        return;
225    }
226    open_or_create(Some(parent_id.to_string()));
227    open_or_create(Some(child_id.to_string()));
228    SESSIONS.with(|s| {
229        let mut map = s.borrow_mut();
230        update_lineage(&mut map, parent_id, child_id, branched_at_event_index);
231    });
232}
233
234pub fn parent_id(id: &str) -> Option<String> {
235    SESSIONS.with(|s| s.borrow().get(id).and_then(|state| state.parent_id.clone()))
236}
237
238pub fn child_ids(id: &str) -> Vec<String> {
239    SESSIONS.with(|s| {
240        s.borrow()
241            .get(id)
242            .map(|state| state.child_ids.clone())
243            .unwrap_or_default()
244    })
245}
246
247pub fn ancestry(id: &str) -> Option<SessionAncestry> {
248    SESSIONS.with(|s| {
249        let map = s.borrow();
250        let state = map.get(id)?;
251        let mut root_id = state.id.clone();
252        let mut cursor = state.parent_id.clone();
253        let mut seen = HashSet::from([state.id.clone()]);
254        while let Some(parent_id) = cursor {
255            if !seen.insert(parent_id.clone()) {
256                break;
257            }
258            root_id = parent_id.clone();
259            cursor = map
260                .get(&parent_id)
261                .and_then(|parent| parent.parent_id.clone());
262        }
263        Some(SessionAncestry {
264            parent_id: state.parent_id.clone(),
265            child_ids: state.child_ids.clone(),
266            root_id,
267        })
268    })
269}
270
271/// Auto-register a persistent sink for a newly-created session.
272/// Silent no-op on failure — a broken observability sink must never
273/// prevent a session from starting.
274fn try_register_event_log(session_id: &str) {
275    if let Some(log) = crate::event_log::active_event_log() {
276        crate::agent_events::register_sink(
277            session_id,
278            crate::agent_events::EventLogSink::new(log, session_id),
279        );
280        return;
281    }
282    let Ok(dir) = std::env::var("HARN_EVENT_LOG_DIR") else {
283        return;
284    };
285    if dir.is_empty() {
286        return;
287    }
288    let path = std::path::PathBuf::from(dir).join(format!("event_log-{session_id}.jsonl"));
289    if let Ok(sink) = crate::agent_events::JsonlEventSink::open(&path) {
290        crate::agent_events::register_sink(session_id, sink);
291    }
292}
293
294pub fn register_event_log_sink(session_id: &str) {
295    try_register_event_log(session_id);
296}
297
298pub fn close(id: &str) {
299    SESSIONS.with(|s| {
300        s.borrow_mut().remove(id);
301    });
302}
303
304pub fn reset_transcript(id: &str) -> bool {
305    SESSIONS.with(|s| {
306        let mut map = s.borrow_mut();
307        let Some(state) = map.get_mut(id) else {
308            return false;
309        };
310        state.transcript = empty_transcript(id);
311        state.tool_format = None;
312        state.system_prompt = None;
313        state.last_accessed = Instant::now();
314        true
315    })
316}
317
318/// Copy `src`'s transcript into a new session id. Subscribers are NOT
319/// copied — a fork is a conversation branch, not an event fanout.
320///
321/// Touches `src`'s `last_accessed` before evicting, so the fork
322/// operation itself can't make `src` look stale and kick it out of
323/// the LRU just to make room for the new fork.
324pub fn fork(src_id: &str, dst_id: Option<String>) -> Option<String> {
325    let (src_transcript, src_tool_format, src_system_prompt, dst) = SESSIONS.with(|s| {
326        let mut map = s.borrow_mut();
327        let src = map.get_mut(src_id)?;
328        src.last_accessed = Instant::now();
329        let dst = dst_id.unwrap_or_else(|| uuid::Uuid::now_v7().to_string());
330        let forked_transcript = clone_transcript_with_id(&src.transcript, &dst);
331        Some((
332            forked_transcript,
333            src.tool_format.clone(),
334            src.system_prompt.clone(),
335            dst,
336        ))
337    })?;
338    // Ensure cap is respected when inserting the fork.
339    open_or_create(Some(dst.clone()));
340    SESSIONS.with(|s| {
341        let mut map = s.borrow_mut();
342        if let Some(state) = map.get_mut(&dst) {
343            state.transcript = src_transcript;
344            state.tool_format = src_tool_format;
345            state.system_prompt = src_system_prompt;
346            state.last_accessed = Instant::now();
347        }
348        update_lineage(&mut map, src_id, &dst, None);
349    });
350    // open_or_create evicts BEFORE inserting, so the dst slot is
351    // guaranteed once we get here. The existence check is cheap
352    // insurance against a future refactor that breaks that invariant.
353    if exists(&dst) {
354        Some(dst)
355    } else {
356        None
357    }
358}
359
360/// Fork `src_id` and truncate the destination transcript to the
361/// first `keep_first` messages (#105 — branch-replay). Pairs with the
362/// scrubber: the host picks an event index, rebuilds a message count,
363/// and calls this to spawn a live sibling session that resumes from
364/// the rebuilt state. Subscribers are not carried over (same as
365/// `fork`), so sibling events don't double-fan into the parent's
366/// consumers.
367///
368/// Returns the new session id on success, `None` if `src_id` doesn't
369/// exist.
370pub fn fork_at(src_id: &str, keep_first: usize, dst_id: Option<String>) -> Option<String> {
371    let branched_at_event_index = SESSIONS.with(|s| {
372        let map = s.borrow();
373        let src = map.get(src_id)?;
374        Some(branch_event_index(&src.transcript, keep_first))
375    })?;
376    let new_id = fork(src_id, dst_id)?;
377    link_child_session_with_branch(src_id, &new_id, Some(branched_at_event_index));
378    retain_first(&new_id, keep_first);
379    Some(new_id)
380}
381
382/// Truncate the session transcript to the first `keep_first`
383/// messages (opposite of `trim`, which keeps the last N). Used by
384/// `fork_at` to cut a branch at a scrubber position.
385fn retain_first(id: &str, keep_first: usize) {
386    SESSIONS.with(|s| {
387        let mut map = s.borrow_mut();
388        let Some(state) = map.get_mut(id) else {
389            return;
390        };
391        let Some(dict) = state.transcript.as_dict() else {
392            return;
393        };
394        let dict = dict.clone();
395        let messages: Vec<VmValue> = match dict.get("messages") {
396            Some(VmValue::List(list)) => list.iter().cloned().collect(),
397            _ => Vec::new(),
398        };
399        let retained: Vec<VmValue> = messages.into_iter().take(keep_first).collect();
400        let mut next = dict;
401        next.insert(
402            "events".to_string(),
403            VmValue::List(Rc::new(
404                crate::llm::helpers::transcript_events_from_messages(&retained),
405            )),
406        );
407        next.insert("messages".to_string(), VmValue::List(Rc::new(retained)));
408        state.transcript = VmValue::Dict(Rc::new(next));
409        state.last_accessed = Instant::now();
410    });
411}
412
413/// Retain only the last `keep_last` messages in the session transcript.
414/// Returns the kept count (<= keep_last).
415pub fn trim(id: &str, keep_last: usize) -> Option<usize> {
416    SESSIONS.with(|s| {
417        let mut map = s.borrow_mut();
418        let state = map.get_mut(id)?;
419        let dict = state.transcript.as_dict()?.clone();
420        let messages: Vec<VmValue> = match dict.get("messages") {
421            Some(VmValue::List(list)) => list.iter().cloned().collect(),
422            _ => Vec::new(),
423        };
424        let start = messages.len().saturating_sub(keep_last);
425        let retained: Vec<VmValue> = messages.into_iter().skip(start).collect();
426        let kept = retained.len();
427        let mut next = dict;
428        next.insert(
429            "events".to_string(),
430            VmValue::List(Rc::new(
431                crate::llm::helpers::transcript_events_from_messages(&retained),
432            )),
433        );
434        next.insert("messages".to_string(), VmValue::List(Rc::new(retained)));
435        state.transcript = VmValue::Dict(Rc::new(next));
436        state.last_accessed = Instant::now();
437        Some(kept)
438    })
439}
440
441/// Append a message dict to the session transcript. The message must
442/// have at least a string `role`; anything else is merged verbatim.
443pub fn inject_message(id: &str, message: VmValue) -> Result<(), String> {
444    let Some(msg_dict) = message.as_dict().cloned() else {
445        return Err("agent_session_inject: message must be a dict".into());
446    };
447    let role_ok = matches!(msg_dict.get("role"), Some(VmValue::String(_)));
448    if !role_ok {
449        return Err(
450            "agent_session_inject: message must have a string `role` (user|assistant|tool_result|system)"
451                .into(),
452        );
453    }
454    SESSIONS.with(|s| {
455        let mut map = s.borrow_mut();
456        let Some(state) = map.get_mut(id) else {
457            return Err(format!("agent_session_inject: unknown session id '{id}'"));
458        };
459        let dict = state
460            .transcript
461            .as_dict()
462            .cloned()
463            .unwrap_or_else(BTreeMap::new);
464        let mut messages: Vec<VmValue> = match dict.get("messages") {
465            Some(VmValue::List(list)) => list.iter().cloned().collect(),
466            _ => Vec::new(),
467        };
468        let mut events: Vec<VmValue> = match dict.get("events") {
469            Some(VmValue::List(list)) => list.iter().cloned().collect(),
470            _ => crate::llm::helpers::transcript_events_from_messages(&messages),
471        };
472        let new_message = VmValue::Dict(Rc::new(msg_dict));
473        events.push(crate::llm::helpers::transcript_event_from_message(
474            &new_message,
475        ));
476        messages.push(new_message);
477        let mut next = dict;
478        next.insert("events".to_string(), VmValue::List(Rc::new(events)));
479        next.insert("messages".to_string(), VmValue::List(Rc::new(messages)));
480        state.transcript = VmValue::Dict(Rc::new(next));
481        state.last_accessed = Instant::now();
482        Ok(())
483    })
484}
485
486/// Load the messages vec (as JSON) for this session, for use as prefix
487/// to an agent_loop run. Returns an empty vec if the session doesn't
488/// exist or has no messages.
489pub fn messages_json(id: &str) -> Vec<serde_json::Value> {
490    SESSIONS.with(|s| {
491        let map = s.borrow();
492        let Some(state) = map.get(id) else {
493            return Vec::new();
494        };
495        let Some(dict) = state.transcript.as_dict() else {
496            return Vec::new();
497        };
498        match dict.get("messages") {
499            Some(VmValue::List(list)) => list
500                .iter()
501                .map(crate::llm::helpers::vm_value_to_json)
502                .collect(),
503            _ => Vec::new(),
504        }
505    })
506}
507
508#[derive(Clone, Debug, Default)]
509pub struct SessionPromptState {
510    pub messages: Vec<serde_json::Value>,
511    pub summary: Option<String>,
512}
513
514fn summary_message_json(summary: &str) -> serde_json::Value {
515    serde_json::json!({
516        "role": "user",
517        "content": summary,
518    })
519}
520
521fn messages_begin_with_summary(messages: &[serde_json::Value], summary: &str) -> bool {
522    messages.first().is_some_and(|message| {
523        message.get("role").and_then(|value| value.as_str()) == Some("user")
524            && message.get("content").and_then(|value| value.as_str()) == Some(summary)
525    })
526}
527
528/// Prompt-surface resume state for a persisted session.
529///
530/// Returns the compacted/rehydratable message list plus the transcript's
531/// summary field. When the transcript carries a summary field but its
532/// message list does not already begin with the compacted summary
533/// message, this helper prepends one so session re-entry preserves the
534/// same prompt surface the previous loop was actually using.
535pub fn prompt_state_json(id: &str) -> SessionPromptState {
536    SESSIONS.with(|s| {
537        let map = s.borrow();
538        let Some(state) = map.get(id) else {
539            return SessionPromptState::default();
540        };
541        let Some(dict) = state.transcript.as_dict() else {
542            return SessionPromptState::default();
543        };
544        let mut messages = match dict.get("messages") {
545            Some(VmValue::List(list)) => list
546                .iter()
547                .map(crate::llm::helpers::vm_value_to_json)
548                .collect::<Vec<_>>(),
549            _ => Vec::new(),
550        };
551        let summary = dict.get("summary").and_then(|value| match value {
552            VmValue::String(text) if !text.trim().is_empty() => Some(text.to_string()),
553            _ => None,
554        });
555        if let Some(summary_text) = summary.as_deref() {
556            if !messages_begin_with_summary(&messages, summary_text) {
557                messages.insert(0, summary_message_json(summary_text));
558            }
559        }
560        SessionPromptState { messages, summary }
561    })
562}
563
564/// Overwrite the transcript for this session. Used by `agent_loop` on
565/// exit to persist the synthesized transcript.
566pub fn store_transcript(id: &str, transcript: VmValue) {
567    SESSIONS.with(|s| {
568        if let Some(state) = s.borrow_mut().get_mut(id) {
569            state.transcript = transcript_with_session_metadata(transcript, state);
570            state.last_accessed = Instant::now();
571        }
572    });
573}
574
575/// Append a transcript event to the session without mutating its
576/// message list. Used for orchestration-side lineage events (sub-agent
577/// spawn/completion, workflow hooks, etc.) that should survive
578/// persistence/replay without being replayed back into the model as
579/// conversational messages.
580pub fn append_event(id: &str, event: VmValue) -> Result<(), String> {
581    let Some(event_dict) = event.as_dict() else {
582        return Err("agent_session_append_event: event must be a dict".into());
583    };
584    let kind_ok = matches!(event_dict.get("kind"), Some(VmValue::String(_)));
585    if !kind_ok {
586        return Err("agent_session_append_event: event must have a string `kind`".into());
587    }
588    SESSIONS.with(|s| {
589        let mut map = s.borrow_mut();
590        let Some(state) = map.get_mut(id) else {
591            return Err(format!(
592                "agent_session_append_event: unknown session id '{id}'"
593            ));
594        };
595        let dict = state
596            .transcript
597            .as_dict()
598            .cloned()
599            .unwrap_or_else(BTreeMap::new);
600        let mut events: Vec<VmValue> = match dict.get("events") {
601            Some(VmValue::List(list)) => list.iter().cloned().collect(),
602            _ => dict
603                .get("messages")
604                .and_then(|value| match value {
605                    VmValue::List(list) => Some(list.iter().cloned().collect::<Vec<_>>()),
606                    _ => None,
607                })
608                .map(|messages| crate::llm::helpers::transcript_events_from_messages(&messages))
609                .unwrap_or_default(),
610        };
611        events.push(event);
612        let mut next = dict;
613        next.insert("events".to_string(), VmValue::List(Rc::new(events)));
614        state.transcript = VmValue::Dict(Rc::new(next));
615        state.last_accessed = Instant::now();
616        Ok(())
617    })
618}
619
620/// Replace the transcript's message list wholesale. Used by the
621/// in-loop compaction path, which operates on JSON messages.
622pub fn replace_messages(id: &str, messages: &[serde_json::Value]) {
623    replace_messages_with_summary(id, messages, None);
624}
625
626/// Replace the transcript's message list and optionally update the
627/// `summary` field on the persisted transcript. The compaction path
628/// uses this to publish the human-readable rollup line that
629/// `transcript_summary(transcript)` exposes to host code.
630pub fn replace_messages_with_summary(
631    id: &str,
632    messages: &[serde_json::Value],
633    summary: Option<&str>,
634) {
635    SESSIONS.with(|s| {
636        let mut map = s.borrow_mut();
637        let Some(state) = map.get_mut(id) else {
638            return;
639        };
640        let dict = state
641            .transcript
642            .as_dict()
643            .cloned()
644            .unwrap_or_else(BTreeMap::new);
645        let vm_messages: Vec<VmValue> = messages
646            .iter()
647            .map(crate::stdlib::json_to_vm_value)
648            .collect();
649        let mut next = dict;
650        next.insert(
651            "events".to_string(),
652            VmValue::List(Rc::new(
653                crate::llm::helpers::transcript_events_from_messages(&vm_messages),
654            )),
655        );
656        next.insert("messages".to_string(), VmValue::List(Rc::new(vm_messages)));
657        if let Some(summary) = summary {
658            next.insert(
659                "summary".to_string(),
660                VmValue::String(Rc::from(summary.to_string())),
661            );
662        }
663        state.transcript = VmValue::Dict(Rc::new(next));
664        state.last_accessed = Instant::now();
665    });
666}
667
668pub fn append_subscriber(id: &str, callback: VmValue) {
669    open_or_create(Some(id.to_string()));
670    SESSIONS.with(|s| {
671        if let Some(state) = s.borrow_mut().get_mut(id) {
672            state.subscribers.push(callback);
673            state.last_accessed = Instant::now();
674        }
675    });
676}
677
678pub fn subscribers_for(id: &str) -> Vec<VmValue> {
679    SESSIONS.with(|s| {
680        s.borrow()
681            .get(id)
682            .map(|state| state.subscribers.clone())
683            .unwrap_or_default()
684    })
685}
686
687pub fn subscriber_count(id: &str) -> usize {
688    SESSIONS.with(|s| {
689        s.borrow()
690            .get(id)
691            .map(|state| state.subscribers.len())
692            .unwrap_or(0)
693    })
694}
695
696/// Persist the set of active skill names for session resume. Called at
697/// the end of an agent_loop run; the next `open_or_create` for this id
698/// reads them back via [`active_skills`].
699pub fn set_active_skills(id: &str, skills: Vec<String>) {
700    SESSIONS.with(|s| {
701        if let Some(state) = s.borrow_mut().get_mut(id) {
702            state.active_skills = skills;
703            state.last_accessed = Instant::now();
704        }
705    });
706}
707
708/// Skills that were active at the end of the previous agent_loop run
709/// against this session. Returns an empty vec when the session doesn't
710/// exist or nothing was persisted.
711pub fn active_skills(id: &str) -> Vec<String> {
712    SESSIONS.with(|s| {
713        s.borrow()
714            .get(id)
715            .map(|state| state.active_skills.clone())
716            .unwrap_or_default()
717    })
718}
719
720/// Claim the tool-calling contract for a session.
721///
722/// The first loop against a named session records its `tool_format`.
723/// Later re-entry must use the same format so prompt/history generated
724/// under a text contract is never replayed as native, or vice versa.
725pub fn claim_tool_format(id: &str, tool_format: &str) -> Result<(), String> {
726    let tool_format = tool_format.trim();
727    if tool_format.is_empty() {
728        return Ok(());
729    }
730    SESSIONS.with(|s| {
731        let mut map = s.borrow_mut();
732        let Some(state) = map.get_mut(id) else {
733            return Err(format!("agent session '{id}' does not exist"));
734        };
735        match state.tool_format.as_deref() {
736            Some(existing) if existing != tool_format => Err(format!(
737                "agent session '{id}' is locked to tool_format='{existing}', but this run requested tool_format='{tool_format}'. Start a new session or fork/reset the transcript before changing tool mode."
738            )),
739            Some(_) => {
740                state.last_accessed = Instant::now();
741                Ok(())
742            }
743            None => {
744                state.tool_format = Some(tool_format.to_string());
745                state.last_accessed = Instant::now();
746                Ok(())
747            }
748        }
749    })
750}
751
752pub fn tool_format(id: &str) -> Option<String> {
753    SESSIONS.with(|s| {
754        s.borrow()
755            .get(id)
756            .and_then(|state| state.tool_format.clone())
757    })
758}
759
760pub fn record_system_prompt(id: &str, system_prompt: &str) -> Result<(), String> {
761    let system_prompt = system_prompt.trim();
762    if system_prompt.is_empty() {
763        return Ok(());
764    }
765    SESSIONS.with(|s| {
766        let mut map = s.borrow_mut();
767        let Some(state) = map.get_mut(id) else {
768            return Err(format!("agent session '{id}' does not exist"));
769        };
770        let changed = state.system_prompt.as_deref() != Some(system_prompt);
771        state.system_prompt = Some(system_prompt.to_string());
772        let dict = state
773            .transcript
774            .as_dict()
775            .cloned()
776            .unwrap_or_else(BTreeMap::new);
777        let mut next = dict;
778        apply_system_prompt_metadata(&mut next, system_prompt);
779        if changed {
780            let mut events: Vec<VmValue> = match next.get("events") {
781                Some(VmValue::List(list)) => list.iter().cloned().collect(),
782                _ => Vec::new(),
783            };
784            events.push(crate::llm::helpers::transcript_event(
785                "system_prompt",
786                "system",
787                "internal",
788                "",
789                Some(crate::llm::helpers::system_prompt_event_metadata(
790                    system_prompt,
791                )),
792            ));
793            next.insert("events".to_string(), VmValue::List(Rc::new(events)));
794        }
795        state.transcript = VmValue::Dict(Rc::new(next));
796        state.last_accessed = Instant::now();
797        Ok(())
798    })
799}
800
801pub fn system_prompt(id: &str) -> Option<String> {
802    SESSIONS.with(|s| {
803        s.borrow()
804            .get(id)
805            .and_then(|state| state.system_prompt.clone())
806    })
807}
808
809fn empty_transcript(id: &str) -> VmValue {
810    use crate::llm::helpers::new_transcript_with;
811    new_transcript_with(Some(id.to_string()), Vec::new(), None, None)
812}
813
814fn clone_transcript_with_id(transcript: &VmValue, new_id: &str) -> VmValue {
815    let Some(dict) = transcript.as_dict() else {
816        return empty_transcript(new_id);
817    };
818    let mut next = dict.clone();
819    next.insert(
820        "id".to_string(),
821        VmValue::String(Rc::from(new_id.to_string())),
822    );
823    VmValue::Dict(Rc::new(next))
824}
825
826fn clone_transcript_with_parent(transcript: &VmValue, parent_id: &str) -> VmValue {
827    let Some(dict) = transcript.as_dict() else {
828        return transcript.clone();
829    };
830    let mut next = dict.clone();
831    let metadata = match next.get("metadata") {
832        Some(VmValue::Dict(metadata)) => {
833            let mut metadata = metadata.as_ref().clone();
834            metadata.insert(
835                "parent_session_id".to_string(),
836                VmValue::String(Rc::from(parent_id.to_string())),
837            );
838            VmValue::Dict(Rc::new(metadata))
839        }
840        _ => VmValue::Dict(Rc::new(BTreeMap::from([(
841            "parent_session_id".to_string(),
842            VmValue::String(Rc::from(parent_id.to_string())),
843        )]))),
844    };
845    next.insert("metadata".to_string(), metadata);
846    VmValue::Dict(Rc::new(next))
847}
848
849fn apply_system_prompt_metadata(next: &mut BTreeMap<String, VmValue>, system_prompt: &str) {
850    let mut metadata = match next.get("metadata") {
851        Some(VmValue::Dict(metadata)) => metadata.as_ref().clone(),
852        _ => BTreeMap::new(),
853    };
854    metadata.insert(
855        "system_prompt".to_string(),
856        crate::stdlib::json_to_vm_value(&crate::llm::helpers::system_prompt_metadata(
857            system_prompt,
858        )),
859    );
860    next.insert("metadata".to_string(), VmValue::Dict(Rc::new(metadata)));
861}
862
863fn transcript_with_session_metadata(transcript: VmValue, state: &SessionState) -> VmValue {
864    let Some(dict) = transcript.as_dict() else {
865        return transcript;
866    };
867    let mut next = dict.clone();
868    let mut metadata = match next.get("metadata") {
869        Some(VmValue::Dict(metadata)) => metadata.as_ref().clone(),
870        _ => BTreeMap::new(),
871    };
872    if let Some(tool_format) = state.tool_format.as_ref() {
873        metadata.insert(
874            "tool_format".to_string(),
875            VmValue::String(Rc::from(tool_format.clone())),
876        );
877        metadata.insert("tool_mode_locked".to_string(), VmValue::Bool(true));
878    }
879    if let Some(system_prompt) = state.system_prompt.as_ref() {
880        metadata.insert(
881            "system_prompt".to_string(),
882            crate::stdlib::json_to_vm_value(&crate::llm::helpers::system_prompt_metadata(
883                system_prompt,
884            )),
885        );
886    }
887    if !metadata.is_empty() {
888        next.insert("metadata".to_string(), VmValue::Dict(Rc::new(metadata)));
889    }
890    VmValue::Dict(Rc::new(next))
891}
892
893fn session_snapshot(state: &SessionState) -> VmValue {
894    let transcript = transcript_with_session_metadata(state.transcript.clone(), state);
895    let Some(dict) = transcript.as_dict() else {
896        return state.transcript.clone();
897    };
898    let mut next = dict.clone();
899    next.insert(
900        "parent_id".to_string(),
901        state
902            .parent_id
903            .as_ref()
904            .map(|id| VmValue::String(Rc::from(id.clone())))
905            .unwrap_or(VmValue::Nil),
906    );
907    next.insert(
908        "child_ids".to_string(),
909        VmValue::List(Rc::new(
910            state
911                .child_ids
912                .iter()
913                .cloned()
914                .map(|id| VmValue::String(Rc::from(id)))
915                .collect(),
916        )),
917    );
918    next.insert(
919        "branched_at_event_index".to_string(),
920        state
921            .branched_at_event_index
922            .map(|index| VmValue::Int(index as i64))
923            .unwrap_or(VmValue::Nil),
924    );
925    VmValue::Dict(Rc::new(next))
926}
927
928fn update_lineage(
929    map: &mut HashMap<String, SessionState>,
930    parent_id: &str,
931    child_id: &str,
932    branched_at_event_index: Option<usize>,
933) {
934    let old_parent_id = map.get(child_id).and_then(|child| child.parent_id.clone());
935    if let Some(old_parent_id) = old_parent_id.filter(|old_parent_id| old_parent_id != parent_id) {
936        if let Some(old_parent) = map.get_mut(&old_parent_id) {
937            old_parent.child_ids.retain(|id| id != child_id);
938            old_parent.last_accessed = Instant::now();
939        }
940    }
941    if let Some(parent) = map.get_mut(parent_id) {
942        parent.last_accessed = Instant::now();
943        if !parent.child_ids.iter().any(|id| id == child_id) {
944            parent.child_ids.push(child_id.to_string());
945        }
946    }
947    if let Some(child) = map.get_mut(child_id) {
948        child.last_accessed = Instant::now();
949        child.parent_id = Some(parent_id.to_string());
950        child.branched_at_event_index = branched_at_event_index;
951        child.transcript = clone_transcript_with_parent(&child.transcript, parent_id);
952    }
953}
954
955fn branch_event_index(transcript: &VmValue, keep_first: usize) -> usize {
956    if keep_first == 0 {
957        return 0;
958    }
959    let Some(dict) = transcript.as_dict() else {
960        return keep_first;
961    };
962    let Some(VmValue::List(events)) = dict.get("events") else {
963        return keep_first;
964    };
965    let mut retained_messages = 0usize;
966    for (index, event) in events.iter().enumerate() {
967        let kind = event
968            .as_dict()
969            .and_then(|dict| dict.get("kind"))
970            .map(VmValue::display);
971        if matches!(kind.as_deref(), Some("message" | "tool_result")) {
972            retained_messages += 1;
973            if retained_messages == keep_first {
974                return index + 1;
975            }
976        }
977    }
978    events.len()
979}
980
#[cfg(test)]
mod tests {
    use super::*;
    use crate::agent_events::{
        emit_event, reset_all_sinks, session_external_sink_count, AgentEvent,
    };
    use crate::event_log::{active_event_log, EventLog, Topic};
    use std::collections::BTreeMap;

    /// Builds a message dict with `role` and `content` string fields.
    fn make_msg(role: &str, content: &str) -> VmValue {
        VmValue::Dict(Rc::new(BTreeMap::from([
            ("role".to_string(), VmValue::String(Rc::from(role))),
            ("content".to_string(), VmValue::String(Rc::from(content))),
        ])))
    }

    /// Number of entries in the stored transcript's `messages` list, or 0
    /// when the session, the dict, or the list is missing.
    fn message_count(id: &str) -> usize {
        SESSIONS.with(|sessions| {
            let store = sessions.borrow();
            let count = store
                .get(id)
                .and_then(|state| state.transcript.as_dict())
                .and_then(|dict| dict.get("messages"))
                .map_or(0, |messages| match messages {
                    VmValue::List(list) => list.len(),
                    _ => 0,
                });
            count
        })
    }

    /// Counts snapshot events whose `kind` displays as `expected_kind`.
    fn event_count_by_kind(id: &str, expected_kind: &str) -> usize {
        let Some(snap) = snapshot(id) else {
            return 0;
        };
        let Some(VmValue::List(events)) = snap.as_dict().and_then(|dict| dict.get("events"))
        else {
            return 0;
        };
        events
            .iter()
            .filter(|event| {
                let kind = event
                    .as_dict()
                    .and_then(|dict| dict.get("kind"))
                    .map(VmValue::display);
                kind.as_deref() == Some(expected_kind)
            })
            .count()
    }

    /// Clones one top-level field out of a session snapshot.
    fn snapshot_field(id: &str, key: &str) -> Option<VmValue> {
        snapshot(id)?.as_dict()?.get(key).cloned()
    }

    #[test]
    fn records_system_prompt_as_metadata_event_without_message() {
        reset_session_store();
        let id = open_or_create(Some("system-prompt-session".into()));
        // Recording the same prompt twice must still yield a single event.
        for _ in 0..2 {
            record_system_prompt(&id, "Follow the workflow.").unwrap();
        }
        inject_message(&id, make_msg("user", "hello")).unwrap();

        let snapshot = snapshot(&id).expect("session snapshot");
        let recorded_content = snapshot
            .as_dict()
            .and_then(|dict| dict.get("metadata"))
            .and_then(VmValue::as_dict)
            .expect("metadata")
            .get("system_prompt")
            .and_then(VmValue::as_dict)
            .expect("system prompt metadata")
            .get("content")
            .map(VmValue::display);
        assert_eq!(recorded_content.as_deref(), Some("Follow the workflow."));
        assert_eq!(message_count(&id), 1);
        assert_eq!(event_count_by_kind(&id, "system_prompt"), 1);
    }

    #[test]
    fn fork_at_truncates_destination_to_keep_first() {
        reset_session_store();
        let src = open_or_create(Some("src-fork-at".into()));
        let turns = [
            ("user", "a"),
            ("assistant", "b"),
            ("user", "c"),
            ("assistant", "d"),
        ];
        for (role, content) in turns {
            inject_message(&src, make_msg(role, content)).unwrap();
        }
        assert_eq!(message_count(&src), 4);

        let dst = fork_at(&src, 2, Some("dst-fork-at".into())).expect("fork_at");
        assert_ne!(dst, src);
        assert_eq!(message_count(&dst), 2, "branched at message index 2");
        let branched_at = snapshot_field(&dst, "branched_at_event_index");
        assert_eq!(branched_at.as_ref().and_then(VmValue::as_int), Some(2));
        // The source session keeps all of its messages.
        assert_eq!(message_count(&src), 4);
        // Subscribers are not carried over — forks start with a clean fanout list.
        assert_eq!(subscriber_count(&dst), 0);
        reset_session_store();
    }

    #[test]
    fn fork_at_on_unknown_source_returns_none() {
        reset_session_store();
        let forked = fork_at("does-not-exist", 3, None);
        assert!(forked.is_none());
    }

    #[test]
    fn child_sessions_record_parent_lineage() {
        reset_session_store();
        let parent = open_or_create(Some("parent-session".into()));
        let child = open_child_session(&parent, Some("child-session".into()));
        assert_eq!(parent_id(&child).as_deref(), Some("parent-session"));
        assert_eq!(child_ids(&parent), vec!["child-session".to_string()]);

        let expected_ancestry = SessionAncestry {
            parent_id: Some("parent-session".to_string()),
            child_ids: Vec::new(),
            root_id: "parent-session".to_string(),
        };
        assert_eq!(ancestry(&child), Some(expected_ancestry));

        let child_snapshot = snapshot(&child).expect("child transcript");
        let fields = child_snapshot.as_dict().expect("child snapshot");
        let metadata = fields
            .get("metadata")
            .and_then(|value| value.as_dict())
            .expect("child metadata");

        // Top-level lineage fields on the snapshot.
        assert!(matches!(
            fields.get("parent_id"),
            Some(VmValue::String(value)) if value.as_ref() == "parent-session"
        ));
        assert!(matches!(
            fields.get("child_ids"),
            Some(VmValue::List(children)) if children.is_empty()
        ));
        assert!(matches!(
            fields.get("branched_at_event_index"),
            Some(VmValue::Nil)
        ));
        // The parent id is also mirrored into the transcript metadata.
        assert!(matches!(
            metadata.get("parent_session_id"),
            Some(VmValue::String(value)) if value.as_ref() == "parent-session"
        ));
    }

    #[test]
    fn branch_event_index_counts_non_message_events() {
        reset_session_store();
        let src = open_or_create(Some("branch-event-index".into()));

        let event_of_kind = |kind: &str| {
            VmValue::Dict(Rc::new(BTreeMap::from([(
                "kind".to_string(),
                VmValue::String(Rc::from(kind)),
            )])))
        };
        let messages = vec![make_msg("user", "a"), make_msg("assistant", "b")];
        let events = vec![
            event_of_kind("message"),
            event_of_kind("sub_agent_start"),
            event_of_kind("message"),
        ];
        let transcript = VmValue::Dict(Rc::new(BTreeMap::from([
            ("id".to_string(), VmValue::String(Rc::from(src.clone()))),
            ("messages".to_string(), VmValue::List(Rc::new(messages))),
            ("events".to_string(), VmValue::List(Rc::new(events))),
        ])));
        store_transcript(&src, transcript);

        let dst = fork_at(&src, 2, Some("branch-event-index-child".into())).expect("fork_at");
        // Two messages with one non-message event between them => cut after event 3.
        let branched_at = snapshot_field(&dst, "branched_at_event_index");
        assert_eq!(branched_at.as_ref().and_then(VmValue::as_int), Some(3));
    }

    #[test]
    fn child_session_records_lineage_without_reusing_parent_transcript() {
        reset_session_store();
        let parent = open_or_create(Some("parent-fork-parent".into()));
        inject_message(&parent, make_msg("user", "parent context")).unwrap();
        claim_tool_format(&parent, "native").unwrap();

        let child = open_child_session(&parent, Some("parent-fork-child".into()));
        // The parent keeps its message; the child starts empty and unclaimed.
        assert_eq!(message_count(&parent), 1);
        assert_eq!(message_count(&child), 0);
        assert_eq!(tool_format(&child), None);
        assert_eq!(parent_id(&child).as_deref(), Some(parent.as_str()));
    }

    #[test]
    fn prompt_state_prepends_summary_message_when_missing_from_messages() {
        reset_session_store();
        let session = open_or_create(Some("prompt-state-summary".into()));
        let transcript = crate::llm::helpers::new_transcript_with_events(
            Some(session.clone()),
            vec![make_msg("assistant", "latest answer")],
            Some("[auto-compacted 2 older messages]\nsummary".to_string()),
            None,
            Vec::new(),
            Vec::new(),
            Some("active"),
        );
        store_transcript(&session, transcript);

        let prompt = prompt_state_json(&session);
        assert_eq!(
            prompt.summary.as_deref(),
            Some("[auto-compacted 2 older messages]\nsummary")
        );
        assert_eq!(prompt.messages.len(), 2);

        // The summary is surfaced as a leading user message.
        let summary_message = &prompt.messages[0];
        assert_eq!(summary_message["role"].as_str(), Some("user"));
        assert_eq!(
            summary_message["content"].as_str(),
            Some("[auto-compacted 2 older messages]\nsummary"),
        );
        assert_eq!(prompt.messages[1]["role"].as_str(), Some("assistant"));
    }

    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn open_or_create_registers_event_log_sink_when_active_log_is_installed() {
        reset_all_sinks();
        crate::event_log::reset_active_event_log();
        let dir = tempfile::tempdir().expect("tempdir");
        crate::event_log::install_default_for_base_dir(dir.path()).expect("install event log");

        let session = open_or_create(Some("event-log-session".into()));
        assert_eq!(session_external_sink_count(&session), 1);

        let turn_start = AgentEvent::TurnStart {
            session_id: session.clone(),
            iteration: 0,
        };
        emit_event(&turn_start);
        // Yield to the runtime before reading the log back.
        tokio::time::sleep(std::time::Duration::from_millis(25)).await;

        let topic = Topic::new("observability.agent_events.event-log-session").unwrap();
        let log = active_event_log().expect("active event log");
        let events = log.read_range(&topic, None, usize::MAX).await.unwrap();
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].1.kind, "turn_start");

        crate::event_log::reset_active_event_log();
        reset_all_sinks();
    }
}