Skip to main content

harn_vm/
agent_sessions.rs

1//! First-class session storage.
2//!
3//! A session owns three things:
4//!
5//! 1. A transcript dict (messages, events, summary, metadata, …).
6//! 2. Closure subscribers that fire on agent-loop events for this session.
7//! 3. Its own lifecycle (open, reset, fork, trim, compact, close).
8//!
9//! Storage is thread-local because `VmValue` contains `Rc`, which is
10//! neither `Send` nor `Sync`. The agent loop runs on a tokio
11//! current-thread worker, so all session reads and writes happen on the
12//! same thread. The closure-subscribers register, fire, and unregister
13//! on that same thread.
14//!
15//! Lifecycle is explicit. Builtins (`agent_session_open`,
16//! `_reset`, `_fork`, `_fork_at`, `_close`, `_trim`, `_compact`,
17//! `_inject`, `_exists`, `_length`, `_snapshot`, `_ancestry`) drive
18//! the store directly — there is no "policy" config dict that
19//! performs lifecycle as a side effect.
20
21use std::cell::{Cell, RefCell};
22use std::collections::{BTreeMap, HashMap, HashSet};
23use std::rc::Rc;
24use std::time::Instant;
25
26use crate::value::VmValue;
27
/// Default cap on concurrent sessions per VM thread. Beyond this the
/// least-recently-accessed session is evicted on the next `open`.
///
/// This is the initial value of the thread-local cap; `set_session_cap`
/// overrides it for the calling thread.
pub const DEFAULT_SESSION_CAP: usize = 128;
31
/// In-memory state for a single agent session: transcript, subscribers,
/// lineage links, and the metadata needed to resume the session.
pub struct SessionState {
    /// Session id. Sessions are stored in the thread-local map under
    /// this same string.
    pub id: String,
    /// Transcript value for the session. The accessors in this module
    /// treat it as a dict and read its `messages`, `events`, and
    /// `summary` entries.
    pub transcript: VmValue,
    /// Subscriber callbacks registered via `append_subscriber`.
    pub subscribers: Vec<VmValue>,
    /// Creation timestamp string produced by
    /// `crate::orchestration::now_rfc3339()`.
    pub created_at: String,
    /// Refreshed whenever the session is opened or mutated; the session
    /// with the smallest value is the eviction victim in `open_or_create`.
    pub last_accessed: Instant,
    /// Id of the parent session, if this session has been linked under one.
    pub parent_id: Option<String>,
    /// Ids of sessions linked as children of this one.
    pub child_ids: Vec<String>,
    /// Event index at which this session branched from its parent.
    /// `None` for fresh sessions; `fork_at` supplies a value when linking
    /// (NOTE(review): the write happens in `update_lineage`, not shown here).
    pub branched_at_event_index: Option<usize>,
    /// Names of skills that were active at the end of the most recent
    /// `agent_loop` run on this session. Empty when no skills were
    /// matched, when the skill system wasn't used, or when the
    /// deactivation phase cleared them. Re-entering the session
    /// restores these as the initial active set before matching runs.
    pub active_skills: Vec<String>,
    /// Tool-calling protocol claimed by the first agent loop that uses
    /// this session. A transcript is only replayable under the same
    /// contract that produced its prompt/history.
    pub tool_format: Option<String>,
    /// Stable session-level system prompt material. This is transcript
    /// metadata, not a replay message: providers receive it through
    /// their system/developer instruction channel on each call.
    pub system_prompt: Option<String>,
}
56
57impl SessionState {
58    fn new(id: String) -> Self {
59        let now = Instant::now();
60        let transcript = empty_transcript(&id);
61        Self {
62            id,
63            transcript,
64            subscribers: Vec::new(),
65            created_at: crate::orchestration::now_rfc3339(),
66            last_accessed: now,
67            parent_id: None,
68            child_ids: Vec::new(),
69            branched_at_event_index: None,
70            active_skills: Vec::new(),
71            tool_format: None,
72            system_prompt: None,
73        }
74    }
75}
76
/// Lineage summary for one session, as returned by `ancestry`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct SessionAncestry {
    /// Direct parent recorded on the session, if any.
    pub parent_id: Option<String>,
    /// Direct children recorded on the session.
    pub child_ids: Vec<String>,
    /// Last id reached by following `parent_id` links upward; the
    /// session's own id when it has no parent.
    pub root_id: String,
}
83
thread_local! {
    // Session id -> state for every session open on this thread.
    static SESSIONS: RefCell<HashMap<String, SessionState>> = RefCell::new(HashMap::new());
    // Maximum number of sessions kept in `SESSIONS`; starts at the default cap.
    static SESSION_CAP: Cell<usize> = const { Cell::new(DEFAULT_SESSION_CAP) };
    // Stack of session ids; the top entry is what `current_session_id` returns.
    static CURRENT_SESSION_STACK: RefCell<Vec<String>> = const { RefCell::new(Vec::new()) };
}
89
90pub struct CurrentSessionGuard {
91    active: bool,
92}
93
94impl Drop for CurrentSessionGuard {
95    fn drop(&mut self) {
96        if self.active {
97            pop_current_session();
98        }
99    }
100}
101
102/// Set the per-thread session cap. Primarily for tests; production VMs
103/// inherit the default.
104pub fn set_session_cap(cap: usize) {
105    SESSION_CAP.with(|c| c.set(cap.max(1)));
106}
107
108pub fn session_cap() -> usize {
109    SESSION_CAP.with(|c| c.get())
110}
111
112/// Clear the session store. Wired into `reset_llm_state` for test isolation.
113pub fn reset_session_store() {
114    SESSIONS.with(|s| s.borrow_mut().clear());
115    CURRENT_SESSION_STACK.with(|stack| stack.borrow_mut().clear());
116}
117
118pub(crate) fn push_current_session(id: String) {
119    if id.is_empty() {
120        return;
121    }
122    CURRENT_SESSION_STACK.with(|stack| stack.borrow_mut().push(id));
123}
124
125pub(crate) fn pop_current_session() {
126    CURRENT_SESSION_STACK.with(|stack| {
127        let _ = stack.borrow_mut().pop();
128    });
129}
130
131pub fn current_session_id() -> Option<String> {
132    CURRENT_SESSION_STACK.with(|stack| stack.borrow().last().cloned())
133}
134
135pub fn enter_current_session(id: impl Into<String>) -> CurrentSessionGuard {
136    let id = id.into();
137    if id.trim().is_empty() {
138        return CurrentSessionGuard { active: false };
139    }
140    push_current_session(id);
141    CurrentSessionGuard { active: true }
142}
143
144pub fn exists(id: &str) -> bool {
145    SESSIONS.with(|s| s.borrow().contains_key(id))
146}
147
148pub fn length(id: &str) -> Option<usize> {
149    SESSIONS.with(|s| {
150        s.borrow().get(id).map(|state| {
151            state
152                .transcript
153                .as_dict()
154                .and_then(|d| d.get("messages"))
155                .and_then(|v| match v {
156                    VmValue::List(list) => Some(list.len()),
157                    _ => None,
158                })
159                .unwrap_or(0)
160        })
161    })
162}
163
164pub fn snapshot(id: &str) -> Option<VmValue> {
165    SESSIONS.with(|s| s.borrow().get(id).map(session_snapshot))
166}
167
168/// Return the canonical transcript surface for APIs whose result field is
169/// named `transcript`. Session-only fields stay on `agent_session_snapshot`.
170pub fn transcript(id: &str) -> Option<VmValue> {
171    SESSIONS.with(|s| {
172        s.borrow()
173            .get(id)
174            .map(|state| transcript_with_session_metadata(state.transcript.clone(), state))
175    })
176}
177
178/// Open a session, or create it if missing. Returns the resolved id.
179///
180/// Newly-created sessions auto-register an event-log-backed sink when a
181/// generalized [`crate::event_log::EventLog`] has been installed for the
182/// current VM thread. For legacy env-driven workflows that still point
183/// `HARN_EVENT_LOG_DIR` at a directory, we preserve the older JSONL sink
184/// as a compatibility fallback. Re-opening an existing session does not
185/// re-register — sinks are per-session, owned by the first opener.
186pub fn open_or_create(id: Option<String>) -> String {
187    let resolved = id.unwrap_or_else(|| uuid::Uuid::now_v7().to_string());
188    let parent_session = current_session_id();
189    let mut was_new = false;
190    SESSIONS.with(|s| {
191        let mut map = s.borrow_mut();
192        if let Some(state) = map.get_mut(&resolved) {
193            state.last_accessed = Instant::now();
194            return;
195        }
196        was_new = true;
197        let cap = SESSION_CAP.with(|c| c.get());
198        if map.len() >= cap {
199            if let Some(victim) = map
200                .iter()
201                .min_by_key(|(_, state)| state.last_accessed)
202                .map(|(id, _)| id.clone())
203            {
204                map.remove(&victim);
205            }
206        }
207        map.insert(resolved.clone(), SessionState::new(resolved.clone()));
208    });
209    if was_new {
210        if let Some(parent) = parent_session.as_deref() {
211            crate::agent_events::mirror_session_sinks(parent, &resolved);
212        }
213        try_register_event_log(&resolved);
214    }
215    resolved
216}
217
218pub fn open_child_session(parent_id: &str, id: Option<String>) -> String {
219    let resolved = open_or_create(id);
220    link_child_session(parent_id, &resolved);
221    resolved
222}
223
224pub fn link_child_session(parent_id: &str, child_id: &str) {
225    link_child_session_with_branch(parent_id, child_id, None);
226}
227
228pub fn link_child_session_with_branch(
229    parent_id: &str,
230    child_id: &str,
231    branched_at_event_index: Option<usize>,
232) {
233    if parent_id == child_id {
234        return;
235    }
236    open_or_create(Some(parent_id.to_string()));
237    open_or_create(Some(child_id.to_string()));
238    SESSIONS.with(|s| {
239        let mut map = s.borrow_mut();
240        update_lineage(&mut map, parent_id, child_id, branched_at_event_index);
241    });
242}
243
244pub fn parent_id(id: &str) -> Option<String> {
245    SESSIONS.with(|s| s.borrow().get(id).and_then(|state| state.parent_id.clone()))
246}
247
248pub fn child_ids(id: &str) -> Vec<String> {
249    SESSIONS.with(|s| {
250        s.borrow()
251            .get(id)
252            .map(|state| state.child_ids.clone())
253            .unwrap_or_default()
254    })
255}
256
257pub fn ancestry(id: &str) -> Option<SessionAncestry> {
258    SESSIONS.with(|s| {
259        let map = s.borrow();
260        let state = map.get(id)?;
261        let mut root_id = state.id.clone();
262        let mut cursor = state.parent_id.clone();
263        let mut seen = HashSet::from([state.id.clone()]);
264        while let Some(parent_id) = cursor {
265            if !seen.insert(parent_id.clone()) {
266                break;
267            }
268            root_id = parent_id.clone();
269            cursor = map
270                .get(&parent_id)
271                .and_then(|parent| parent.parent_id.clone());
272        }
273        Some(SessionAncestry {
274            parent_id: state.parent_id.clone(),
275            child_ids: state.child_ids.clone(),
276            root_id,
277        })
278    })
279}
280
281/// Auto-register a persistent sink for a newly-created session.
282/// Silent no-op on failure — a broken observability sink must never
283/// prevent a session from starting.
284fn try_register_event_log(session_id: &str) {
285    if let Some(log) = crate::event_log::active_event_log() {
286        crate::agent_events::register_sink(
287            session_id,
288            crate::agent_events::EventLogSink::new(log, session_id),
289        );
290        return;
291    }
292    let Ok(dir) = std::env::var("HARN_EVENT_LOG_DIR") else {
293        return;
294    };
295    if dir.is_empty() {
296        return;
297    }
298    let path = std::path::PathBuf::from(dir).join(format!("event_log-{session_id}.jsonl"));
299    if let Ok(sink) = crate::agent_events::JsonlEventSink::open(&path) {
300        crate::agent_events::register_sink(session_id, sink);
301    }
302}
303
304pub fn register_event_log_sink(session_id: &str) {
305    try_register_event_log(session_id);
306}
307
308pub fn close(id: &str) {
309    SESSIONS.with(|s| {
310        s.borrow_mut().remove(id);
311    });
312}
313
314pub fn close_with_status(
315    id: &str,
316    reason: impl Into<String>,
317    status: impl Into<String>,
318    metadata: serde_json::Value,
319) -> bool {
320    if !exists(id) {
321        return false;
322    }
323    let reason = reason.into();
324    let status = status.into();
325    let event_metadata = serde_json::json!({
326        "reason": reason,
327        "status": status,
328        "metadata": metadata,
329    });
330    let transcript_event = crate::llm::helpers::transcript_event(
331        "agent_session_closed",
332        "system",
333        "internal",
334        "Agent session closed",
335        Some(event_metadata.clone()),
336    );
337    let _ = append_event(id, transcript_event);
338    crate::llm::emit_live_agent_event_sync(&crate::agent_events::AgentEvent::SessionClosed {
339        session_id: id.to_string(),
340        reason,
341        status,
342        metadata,
343    });
344    close(id);
345    crate::agent_events::clear_session_sinks(id);
346    true
347}
348
349pub fn reset_transcript(id: &str) -> bool {
350    SESSIONS.with(|s| {
351        let mut map = s.borrow_mut();
352        let Some(state) = map.get_mut(id) else {
353            return false;
354        };
355        state.transcript = empty_transcript(id);
356        state.tool_format = None;
357        state.system_prompt = None;
358        state.last_accessed = Instant::now();
359        true
360    })
361}
362
363/// Copy `src`'s transcript into a new session id. Subscribers are NOT
364/// copied — a fork is a conversation branch, not an event fanout.
365///
366/// Touches `src`'s `last_accessed` before evicting, so the fork
367/// operation itself can't make `src` look stale and kick it out of
368/// the LRU just to make room for the new fork.
369pub fn fork(src_id: &str, dst_id: Option<String>) -> Option<String> {
370    let (src_transcript, src_tool_format, src_system_prompt, dst) = SESSIONS.with(|s| {
371        let mut map = s.borrow_mut();
372        let src = map.get_mut(src_id)?;
373        src.last_accessed = Instant::now();
374        let dst = dst_id.unwrap_or_else(|| uuid::Uuid::now_v7().to_string());
375        let forked_transcript = clone_transcript_with_id(&src.transcript, &dst);
376        Some((
377            forked_transcript,
378            src.tool_format.clone(),
379            src.system_prompt.clone(),
380            dst,
381        ))
382    })?;
383    // Ensure cap is respected when inserting the fork.
384    open_or_create(Some(dst.clone()));
385    SESSIONS.with(|s| {
386        let mut map = s.borrow_mut();
387        if let Some(state) = map.get_mut(&dst) {
388            state.transcript = src_transcript;
389            state.tool_format = src_tool_format;
390            state.system_prompt = src_system_prompt;
391            state.last_accessed = Instant::now();
392        }
393        update_lineage(&mut map, src_id, &dst, None);
394    });
395    // open_or_create evicts BEFORE inserting, so the dst slot is
396    // guaranteed once we get here. The existence check is cheap
397    // insurance against a future refactor that breaks that invariant.
398    if exists(&dst) {
399        Some(dst)
400    } else {
401        None
402    }
403}
404
405/// Fork `src_id` and truncate the destination transcript to the
406/// first `keep_first` messages (#105 — branch-replay). Pairs with the
407/// scrubber: the host picks an event index, rebuilds a message count,
408/// and calls this to spawn a live sibling session that resumes from
409/// the rebuilt state. Subscribers are not carried over (same as
410/// `fork`), so sibling events don't double-fan into the parent's
411/// consumers.
412///
413/// Returns the new session id on success, `None` if `src_id` doesn't
414/// exist.
415pub fn fork_at(src_id: &str, keep_first: usize, dst_id: Option<String>) -> Option<String> {
416    let branched_at_event_index = SESSIONS.with(|s| {
417        let map = s.borrow();
418        let src = map.get(src_id)?;
419        Some(branch_event_index(&src.transcript, keep_first))
420    })?;
421    let new_id = fork(src_id, dst_id)?;
422    link_child_session_with_branch(src_id, &new_id, Some(branched_at_event_index));
423    retain_first(&new_id, keep_first);
424    Some(new_id)
425}
426
427/// Truncate the session transcript to the first `keep_first`
428/// messages (opposite of `trim`, which keeps the last N). Used by
429/// `fork_at` to cut a branch at a scrubber position.
430fn retain_first(id: &str, keep_first: usize) {
431    SESSIONS.with(|s| {
432        let mut map = s.borrow_mut();
433        let Some(state) = map.get_mut(id) else {
434            return;
435        };
436        let Some(dict) = state.transcript.as_dict() else {
437            return;
438        };
439        let dict = dict.clone();
440        let messages: Vec<VmValue> = match dict.get("messages") {
441            Some(VmValue::List(list)) => list.iter().cloned().collect(),
442            _ => Vec::new(),
443        };
444        let retained: Vec<VmValue> = messages.into_iter().take(keep_first).collect();
445        let mut next = dict;
446        next.insert(
447            "events".to_string(),
448            VmValue::List(Rc::new(
449                crate::llm::helpers::transcript_events_from_messages(&retained),
450            )),
451        );
452        next.insert("messages".to_string(), VmValue::List(Rc::new(retained)));
453        state.transcript = VmValue::Dict(Rc::new(next));
454        state.last_accessed = Instant::now();
455    });
456}
457
458/// Retain only the last `keep_last` messages in the session transcript.
459/// Returns the kept count (<= keep_last).
460pub fn trim(id: &str, keep_last: usize) -> Option<usize> {
461    SESSIONS.with(|s| {
462        let mut map = s.borrow_mut();
463        let state = map.get_mut(id)?;
464        let dict = state.transcript.as_dict()?.clone();
465        let messages: Vec<VmValue> = match dict.get("messages") {
466            Some(VmValue::List(list)) => list.iter().cloned().collect(),
467            _ => Vec::new(),
468        };
469        let start = messages.len().saturating_sub(keep_last);
470        let retained: Vec<VmValue> = messages.into_iter().skip(start).collect();
471        let kept = retained.len();
472        let mut next = dict;
473        next.insert(
474            "events".to_string(),
475            VmValue::List(Rc::new(
476                crate::llm::helpers::transcript_events_from_messages(&retained),
477            )),
478        );
479        next.insert("messages".to_string(), VmValue::List(Rc::new(retained)));
480        state.transcript = VmValue::Dict(Rc::new(next));
481        state.last_accessed = Instant::now();
482        Some(kept)
483    })
484}
485
486/// Append a message dict to the session transcript. The message must
487/// have at least a string `role`; anything else is merged verbatim.
488pub fn inject_message(id: &str, message: VmValue) -> Result<(), String> {
489    let Some(msg_dict) = message.as_dict().cloned() else {
490        return Err("agent_session_inject: message must be a dict".into());
491    };
492    let role_ok = matches!(msg_dict.get("role"), Some(VmValue::String(_)));
493    if !role_ok {
494        return Err(
495            "agent_session_inject: message must have a string `role` (user|assistant|tool_result|system)"
496                .into(),
497        );
498    }
499    SESSIONS.with(|s| {
500        let mut map = s.borrow_mut();
501        let Some(state) = map.get_mut(id) else {
502            return Err(format!("agent_session_inject: unknown session id '{id}'"));
503        };
504        let dict = state
505            .transcript
506            .as_dict()
507            .cloned()
508            .unwrap_or_else(BTreeMap::new);
509        let mut messages: Vec<VmValue> = match dict.get("messages") {
510            Some(VmValue::List(list)) => list.iter().cloned().collect(),
511            _ => Vec::new(),
512        };
513        let mut events: Vec<VmValue> = match dict.get("events") {
514            Some(VmValue::List(list)) => list.iter().cloned().collect(),
515            _ => crate::llm::helpers::transcript_events_from_messages(&messages),
516        };
517        let new_message = VmValue::Dict(Rc::new(msg_dict));
518        emit_llm_message_event(id, messages.len(), &new_message);
519        events.push(crate::llm::helpers::transcript_event_from_message(
520            &new_message,
521        ));
522        messages.push(new_message);
523        let mut next = dict;
524        next.insert("events".to_string(), VmValue::List(Rc::new(events)));
525        next.insert("messages".to_string(), VmValue::List(Rc::new(messages)));
526        state.transcript = VmValue::Dict(Rc::new(next));
527        state.last_accessed = Instant::now();
528        Ok(())
529    })
530}
531
532fn emit_llm_message_event(session_id: &str, message_index: usize, message: &VmValue) {
533    let mut fields = serde_json::Map::new();
534    fields.insert(
535        "session_id".to_string(),
536        serde_json::Value::String(session_id.to_string()),
537    );
538    fields.insert(
539        "message_index".to_string(),
540        serde_json::json!(message_index),
541    );
542    let message_json = crate::llm::helpers::vm_value_to_json(message);
543    if let Some(role) = message_json.get("role").and_then(|value| value.as_str()) {
544        fields.insert(
545            "role".to_string(),
546            serde_json::Value::String(role.to_string()),
547        );
548    }
549    if let Some(content) = message_json.get("content") {
550        fields.insert("content".to_string(), content.clone());
551    }
552    fields.insert("message".to_string(), message_json);
553    crate::llm::append_observability_sidecar_entry("message", fields);
554}
555
556/// Create a new session from a reconstructed message list.
557///
558/// This is intentionally an all-at-once write instead of repeated
559/// `inject_message` calls: importing a transcript should not re-emit
560/// each historic turn into the active observability sidecar.
561pub fn seed_from_messages(
562    id: Option<String>,
563    messages: &[serde_json::Value],
564    metadata: serde_json::Value,
565    system_prompt: Option<String>,
566    tool_format: Option<String>,
567) -> Result<String, String> {
568    let resolved = id.unwrap_or_else(|| uuid::Uuid::now_v7().to_string());
569    if exists(&resolved) {
570        return Err(format!("agent session '{resolved}' already exists"));
571    }
572    open_or_create(Some(resolved.clone()));
573    SESSIONS.with(|s| {
574        let mut map = s.borrow_mut();
575        let Some(state) = map.get_mut(&resolved) else {
576            return Err(format!("failed to create agent session '{resolved}'"));
577        };
578        state.tool_format = tool_format.filter(|value| !value.trim().is_empty());
579        state.system_prompt = system_prompt.filter(|value| !value.trim().is_empty());
580
581        let mut metadata = metadata
582            .as_object()
583            .cloned()
584            .unwrap_or_else(serde_json::Map::new);
585        if let Some(tool_format) = state.tool_format.as_ref() {
586            metadata.insert(
587                "tool_format".to_string(),
588                serde_json::Value::String(tool_format.clone()),
589            );
590            metadata.insert(
591                "tool_mode_locked".to_string(),
592                serde_json::Value::Bool(true),
593            );
594        }
595        if let Some(system_prompt) = state.system_prompt.as_ref() {
596            metadata.insert(
597                "system_prompt".to_string(),
598                crate::llm::helpers::system_prompt_metadata(system_prompt),
599            );
600        }
601        let vm_messages = crate::llm::helpers::json_messages_to_vm(messages);
602        state.transcript = crate::llm::helpers::new_transcript_with(
603            Some(resolved.clone()),
604            vm_messages,
605            None,
606            Some(crate::stdlib::json_to_vm_value(&serde_json::Value::Object(
607                metadata,
608            ))),
609        );
610        state.last_accessed = Instant::now();
611        Ok(resolved)
612    })
613}
614
615/// Load the messages vec (as JSON) for this session, for use as prefix
616/// to an agent_loop run. Returns an empty vec if the session doesn't
617/// exist or has no messages.
618pub fn messages_json(id: &str) -> Vec<serde_json::Value> {
619    SESSIONS.with(|s| {
620        let map = s.borrow();
621        let Some(state) = map.get(id) else {
622            return Vec::new();
623        };
624        let Some(dict) = state.transcript.as_dict() else {
625            return Vec::new();
626        };
627        match dict.get("messages") {
628            Some(VmValue::List(list)) => list
629                .iter()
630                .map(crate::llm::helpers::vm_value_to_json)
631                .collect(),
632            _ => Vec::new(),
633        }
634    })
635}
636
/// Prompt-surface state returned by `prompt_state_json`.
#[derive(Clone, Debug, Default)]
pub struct SessionPromptState {
    /// Session messages as JSON, with the summary message prepended
    /// when the transcript has a summary the list does not start with.
    pub messages: Vec<serde_json::Value>,
    /// The transcript's `summary` field, when it is a non-blank string.
    pub summary: Option<String>,
}
642
643fn summary_message_json(summary: &str) -> serde_json::Value {
644    serde_json::json!({
645        "role": "user",
646        "content": summary,
647    })
648}
649
650fn messages_begin_with_summary(messages: &[serde_json::Value], summary: &str) -> bool {
651    messages.first().is_some_and(|message| {
652        message.get("role").and_then(|value| value.as_str()) == Some("user")
653            && message.get("content").and_then(|value| value.as_str()) == Some(summary)
654    })
655}
656
657/// Prompt-surface resume state for a persisted session.
658///
659/// Returns the compacted/rehydratable message list plus the transcript's
660/// summary field. When the transcript carries a summary field but its
661/// message list does not already begin with the compacted summary
662/// message, this helper prepends one so session re-entry preserves the
663/// same prompt surface the previous loop was actually using.
664pub fn prompt_state_json(id: &str) -> SessionPromptState {
665    SESSIONS.with(|s| {
666        let map = s.borrow();
667        let Some(state) = map.get(id) else {
668            return SessionPromptState::default();
669        };
670        let Some(dict) = state.transcript.as_dict() else {
671            return SessionPromptState::default();
672        };
673        let mut messages = match dict.get("messages") {
674            Some(VmValue::List(list)) => list
675                .iter()
676                .map(crate::llm::helpers::vm_value_to_json)
677                .collect::<Vec<_>>(),
678            _ => Vec::new(),
679        };
680        let summary = dict.get("summary").and_then(|value| match value {
681            VmValue::String(text) if !text.trim().is_empty() => Some(text.to_string()),
682            _ => None,
683        });
684        if let Some(summary_text) = summary.as_deref() {
685            if !messages_begin_with_summary(&messages, summary_text) {
686                messages.insert(0, summary_message_json(summary_text));
687            }
688        }
689        SessionPromptState { messages, summary }
690    })
691}
692
693/// Overwrite the transcript for this session. Used by `agent_loop` on
694/// exit to persist the synthesized transcript.
695pub fn store_transcript(id: &str, transcript: VmValue) {
696    SESSIONS.with(|s| {
697        if let Some(state) = s.borrow_mut().get_mut(id) {
698            state.transcript = transcript_with_session_metadata(transcript, state);
699            state.last_accessed = Instant::now();
700        }
701    });
702}
703
704/// Append a transcript event to the session without mutating its
705/// message list. Used for orchestration-side lineage events (sub-agent
706/// spawn/completion, workflow hooks, etc.) that should survive
707/// persistence/replay without being replayed back into the model as
708/// conversational messages.
709pub fn append_event(id: &str, event: VmValue) -> Result<(), String> {
710    let Some(event_dict) = event.as_dict() else {
711        return Err("agent_session_append_event: event must be a dict".into());
712    };
713    let kind_ok = matches!(event_dict.get("kind"), Some(VmValue::String(_)));
714    if !kind_ok {
715        return Err("agent_session_append_event: event must have a string `kind`".into());
716    }
717    SESSIONS.with(|s| {
718        let mut map = s.borrow_mut();
719        let Some(state) = map.get_mut(id) else {
720            return Err(format!(
721                "agent_session_append_event: unknown session id '{id}'"
722            ));
723        };
724        let dict = state
725            .transcript
726            .as_dict()
727            .cloned()
728            .unwrap_or_else(BTreeMap::new);
729        let mut events: Vec<VmValue> = match dict.get("events") {
730            Some(VmValue::List(list)) => list.iter().cloned().collect(),
731            _ => dict
732                .get("messages")
733                .and_then(|value| match value {
734                    VmValue::List(list) => Some(list.iter().cloned().collect::<Vec<_>>()),
735                    _ => None,
736                })
737                .map(|messages| crate::llm::helpers::transcript_events_from_messages(&messages))
738                .unwrap_or_default(),
739        };
740        events.push(event);
741        let mut next = dict;
742        next.insert("events".to_string(), VmValue::List(Rc::new(events)));
743        state.transcript = VmValue::Dict(Rc::new(next));
744        state.last_accessed = Instant::now();
745        Ok(())
746    })
747}
748
749/// Replace the transcript's message list wholesale. Used by the
750/// in-loop compaction path, which operates on JSON messages.
751pub fn replace_messages(id: &str, messages: &[serde_json::Value]) {
752    replace_messages_with_summary(id, messages, None);
753}
754
755/// Replace the transcript's message list and optionally update the
756/// `summary` field on the persisted transcript. The compaction path
757/// uses this to publish the human-readable rollup line that
758/// `transcript_summary(transcript)` exposes to host code.
759pub fn replace_messages_with_summary(
760    id: &str,
761    messages: &[serde_json::Value],
762    summary: Option<&str>,
763) {
764    SESSIONS.with(|s| {
765        let mut map = s.borrow_mut();
766        let Some(state) = map.get_mut(id) else {
767            return;
768        };
769        let dict = state
770            .transcript
771            .as_dict()
772            .cloned()
773            .unwrap_or_else(BTreeMap::new);
774        let vm_messages: Vec<VmValue> = messages
775            .iter()
776            .map(crate::stdlib::json_to_vm_value)
777            .collect();
778        let mut next = dict;
779        next.insert(
780            "events".to_string(),
781            VmValue::List(Rc::new(
782                crate::llm::helpers::transcript_events_from_messages(&vm_messages),
783            )),
784        );
785        next.insert("messages".to_string(), VmValue::List(Rc::new(vm_messages)));
786        if let Some(summary) = summary {
787            next.insert(
788                "summary".to_string(),
789                VmValue::String(Rc::from(summary.to_string())),
790            );
791        }
792        state.transcript = VmValue::Dict(Rc::new(next));
793        state.last_accessed = Instant::now();
794    });
795}
796
797pub fn append_subscriber(id: &str, callback: VmValue) {
798    open_or_create(Some(id.to_string()));
799    SESSIONS.with(|s| {
800        if let Some(state) = s.borrow_mut().get_mut(id) {
801            state.subscribers.push(callback);
802            state.last_accessed = Instant::now();
803        }
804    });
805}
806
807pub fn subscribers_for(id: &str) -> Vec<VmValue> {
808    SESSIONS.with(|s| {
809        s.borrow()
810            .get(id)
811            .map(|state| state.subscribers.clone())
812            .unwrap_or_default()
813    })
814}
815
816pub fn subscriber_count(id: &str) -> usize {
817    SESSIONS.with(|s| {
818        s.borrow()
819            .get(id)
820            .map(|state| state.subscribers.len())
821            .unwrap_or(0)
822    })
823}
824
825/// Persist the set of active skill names for session resume. Called at
826/// the end of an agent_loop run; the next `open_or_create` for this id
827/// reads them back via [`active_skills`].
828pub fn set_active_skills(id: &str, skills: Vec<String>) {
829    SESSIONS.with(|s| {
830        if let Some(state) = s.borrow_mut().get_mut(id) {
831            state.active_skills = skills;
832            state.last_accessed = Instant::now();
833        }
834    });
835}
836
837/// Skills that were active at the end of the previous agent_loop run
838/// against this session. Returns an empty vec when the session doesn't
839/// exist or nothing was persisted.
840pub fn active_skills(id: &str) -> Vec<String> {
841    SESSIONS.with(|s| {
842        s.borrow()
843            .get(id)
844            .map(|state| state.active_skills.clone())
845            .unwrap_or_default()
846    })
847}
848
849/// Claim the tool-calling contract for a session.
850///
851/// The first loop against a named session records its `tool_format`.
852/// Later re-entry must use the same format so prompt/history generated
853/// under a text contract is never replayed as native, or vice versa.
854pub fn claim_tool_format(id: &str, tool_format: &str) -> Result<(), String> {
855    let tool_format = tool_format.trim();
856    if tool_format.is_empty() {
857        return Ok(());
858    }
859    SESSIONS.with(|s| {
860        let mut map = s.borrow_mut();
861        let Some(state) = map.get_mut(id) else {
862            return Err(format!("agent session '{id}' does not exist"));
863        };
864        match state.tool_format.as_deref() {
865            Some(existing) if existing != tool_format => Err(format!(
866                "agent session '{id}' is locked to tool_format='{existing}', but this run requested tool_format='{tool_format}'. Start a new session or fork/reset the transcript before changing tool mode."
867            )),
868            Some(_) => {
869                state.last_accessed = Instant::now();
870                Ok(())
871            }
872            None => {
873                state.tool_format = Some(tool_format.to_string());
874                state.last_accessed = Instant::now();
875                Ok(())
876            }
877        }
878    })
879}
880
881pub fn tool_format(id: &str) -> Option<String> {
882    SESSIONS.with(|s| {
883        s.borrow()
884            .get(id)
885            .and_then(|state| state.tool_format.clone())
886    })
887}
888
889pub fn record_system_prompt(id: &str, system_prompt: &str) -> Result<(), String> {
890    let system_prompt = system_prompt.trim();
891    if system_prompt.is_empty() {
892        return Ok(());
893    }
894    SESSIONS.with(|s| {
895        let mut map = s.borrow_mut();
896        let Some(state) = map.get_mut(id) else {
897            return Err(format!("agent session '{id}' does not exist"));
898        };
899        let changed = state.system_prompt.as_deref() != Some(system_prompt);
900        state.system_prompt = Some(system_prompt.to_string());
901        let dict = state
902            .transcript
903            .as_dict()
904            .cloned()
905            .unwrap_or_else(BTreeMap::new);
906        let mut next = dict;
907        apply_system_prompt_metadata(&mut next, system_prompt);
908        if changed {
909            let mut events: Vec<VmValue> = match next.get("events") {
910                Some(VmValue::List(list)) => list.iter().cloned().collect(),
911                _ => Vec::new(),
912            };
913            events.push(crate::llm::helpers::transcript_event(
914                "system_prompt",
915                "system",
916                "internal",
917                "",
918                Some(crate::llm::helpers::system_prompt_event_metadata(
919                    system_prompt,
920                )),
921            ));
922            next.insert("events".to_string(), VmValue::List(Rc::new(events)));
923        }
924        state.transcript = VmValue::Dict(Rc::new(next));
925        state.last_accessed = Instant::now();
926        Ok(())
927    })
928}
929
930pub fn system_prompt(id: &str) -> Option<String> {
931    SESSIONS.with(|s| {
932        s.borrow()
933            .get(id)
934            .and_then(|state| state.system_prompt.clone())
935    })
936}
937
938fn empty_transcript(id: &str) -> VmValue {
939    use crate::llm::helpers::new_transcript_with;
940    new_transcript_with(Some(id.to_string()), Vec::new(), None, None)
941}
942
943fn clone_transcript_with_id(transcript: &VmValue, new_id: &str) -> VmValue {
944    let Some(dict) = transcript.as_dict() else {
945        return empty_transcript(new_id);
946    };
947    let mut next = dict.clone();
948    next.insert(
949        "id".to_string(),
950        VmValue::String(Rc::from(new_id.to_string())),
951    );
952    VmValue::Dict(Rc::new(next))
953}
954
955fn clone_transcript_with_parent(transcript: &VmValue, parent_id: &str) -> VmValue {
956    let Some(dict) = transcript.as_dict() else {
957        return transcript.clone();
958    };
959    let mut next = dict.clone();
960    let metadata = match next.get("metadata") {
961        Some(VmValue::Dict(metadata)) => {
962            let mut metadata = metadata.as_ref().clone();
963            metadata.insert(
964                "parent_session_id".to_string(),
965                VmValue::String(Rc::from(parent_id.to_string())),
966            );
967            VmValue::Dict(Rc::new(metadata))
968        }
969        _ => VmValue::Dict(Rc::new(BTreeMap::from([(
970            "parent_session_id".to_string(),
971            VmValue::String(Rc::from(parent_id.to_string())),
972        )]))),
973    };
974    next.insert("metadata".to_string(), metadata);
975    VmValue::Dict(Rc::new(next))
976}
977
978fn apply_system_prompt_metadata(next: &mut BTreeMap<String, VmValue>, system_prompt: &str) {
979    let mut metadata = match next.get("metadata") {
980        Some(VmValue::Dict(metadata)) => metadata.as_ref().clone(),
981        _ => BTreeMap::new(),
982    };
983    metadata.insert(
984        "system_prompt".to_string(),
985        crate::stdlib::json_to_vm_value(&crate::llm::helpers::system_prompt_metadata(
986            system_prompt,
987        )),
988    );
989    next.insert("metadata".to_string(), VmValue::Dict(Rc::new(metadata)));
990}
991
992fn transcript_with_session_metadata(transcript: VmValue, state: &SessionState) -> VmValue {
993    let Some(dict) = transcript.as_dict() else {
994        return transcript;
995    };
996    let mut next = dict.clone();
997    let mut metadata = match next.get("metadata") {
998        Some(VmValue::Dict(metadata)) => metadata.as_ref().clone(),
999        _ => BTreeMap::new(),
1000    };
1001    if let Some(tool_format) = state.tool_format.as_ref() {
1002        metadata.insert(
1003            "tool_format".to_string(),
1004            VmValue::String(Rc::from(tool_format.clone())),
1005        );
1006        metadata.insert("tool_mode_locked".to_string(), VmValue::Bool(true));
1007    }
1008    if let Some(system_prompt) = state.system_prompt.as_ref() {
1009        metadata.insert(
1010            "system_prompt".to_string(),
1011            crate::stdlib::json_to_vm_value(&crate::llm::helpers::system_prompt_metadata(
1012                system_prompt,
1013            )),
1014        );
1015    }
1016    if !metadata.is_empty() {
1017        next.insert("metadata".to_string(), VmValue::Dict(Rc::new(metadata)));
1018    }
1019    VmValue::Dict(Rc::new(next))
1020}
1021
1022fn session_snapshot(state: &SessionState) -> VmValue {
1023    let transcript = transcript_with_session_metadata(state.transcript.clone(), state);
1024    let Some(dict) = transcript.as_dict() else {
1025        return state.transcript.clone();
1026    };
1027    let mut next = dict.clone();
1028    let length = next
1029        .get("messages")
1030        .and_then(|value| match value {
1031            VmValue::List(list) => Some(list.len() as i64),
1032            _ => None,
1033        })
1034        .unwrap_or(0);
1035    next.insert("length".to_string(), VmValue::Int(length));
1036    next.insert(
1037        "created_at".to_string(),
1038        VmValue::String(Rc::from(state.created_at.clone())),
1039    );
1040    next.insert(
1041        "parent_id".to_string(),
1042        state
1043            .parent_id
1044            .as_ref()
1045            .map(|id| VmValue::String(Rc::from(id.clone())))
1046            .unwrap_or(VmValue::Nil),
1047    );
1048    next.insert(
1049        "child_ids".to_string(),
1050        VmValue::List(Rc::new(
1051            state
1052                .child_ids
1053                .iter()
1054                .cloned()
1055                .map(|id| VmValue::String(Rc::from(id)))
1056                .collect(),
1057        )),
1058    );
1059    next.insert(
1060        "branched_at_event_index".to_string(),
1061        state
1062            .branched_at_event_index
1063            .map(|index| VmValue::Int(index as i64))
1064            .unwrap_or(VmValue::Nil),
1065    );
1066    next.insert(
1067        "system_prompt".to_string(),
1068        state
1069            .system_prompt
1070            .as_ref()
1071            .map(|prompt| VmValue::String(Rc::from(prompt.clone())))
1072            .unwrap_or(VmValue::Nil),
1073    );
1074    next.insert(
1075        "tool_format".to_string(),
1076        state
1077            .tool_format
1078            .as_ref()
1079            .map(|format| VmValue::String(Rc::from(format.clone())))
1080            .unwrap_or(VmValue::Nil),
1081    );
1082    VmValue::Dict(Rc::new(next))
1083}
1084
1085fn update_lineage(
1086    map: &mut HashMap<String, SessionState>,
1087    parent_id: &str,
1088    child_id: &str,
1089    branched_at_event_index: Option<usize>,
1090) {
1091    let old_parent_id = map.get(child_id).and_then(|child| child.parent_id.clone());
1092    if let Some(old_parent_id) = old_parent_id.filter(|old_parent_id| old_parent_id != parent_id) {
1093        if let Some(old_parent) = map.get_mut(&old_parent_id) {
1094            old_parent.child_ids.retain(|id| id != child_id);
1095            old_parent.last_accessed = Instant::now();
1096        }
1097    }
1098    if let Some(parent) = map.get_mut(parent_id) {
1099        parent.last_accessed = Instant::now();
1100        if !parent.child_ids.iter().any(|id| id == child_id) {
1101            parent.child_ids.push(child_id.to_string());
1102        }
1103    }
1104    if let Some(child) = map.get_mut(child_id) {
1105        child.last_accessed = Instant::now();
1106        child.parent_id = Some(parent_id.to_string());
1107        child.branched_at_event_index = branched_at_event_index;
1108        child.transcript = clone_transcript_with_parent(&child.transcript, parent_id);
1109    }
1110}
1111
1112fn branch_event_index(transcript: &VmValue, keep_first: usize) -> usize {
1113    if keep_first == 0 {
1114        return 0;
1115    }
1116    let Some(dict) = transcript.as_dict() else {
1117        return keep_first;
1118    };
1119    let Some(VmValue::List(events)) = dict.get("events") else {
1120        return keep_first;
1121    };
1122    let mut retained_messages = 0usize;
1123    for (index, event) in events.iter().enumerate() {
1124        let kind = event
1125            .as_dict()
1126            .and_then(|dict| dict.get("kind"))
1127            .map(VmValue::display);
1128        if matches!(kind.as_deref(), Some("message" | "tool_result")) {
1129            retained_messages += 1;
1130            if retained_messages == keep_first {
1131                return index + 1;
1132            }
1133        }
1134    }
1135    events.len()
1136}
1137
1138#[cfg(test)]
1139mod tests {
1140    use super::*;
1141    use crate::agent_events::{
1142        emit_event, register_sink, reset_all_sinks, session_external_sink_count, AgentEvent,
1143        AgentEventSink,
1144    };
1145    use crate::event_log::{active_event_log, EventLog, Topic};
1146    use std::collections::BTreeMap;
1147    use std::sync::{Arc, Mutex};
1148
1149    fn make_msg(role: &str, content: &str) -> VmValue {
1150        let mut m: BTreeMap<String, VmValue> = BTreeMap::new();
1151        m.insert("role".to_string(), VmValue::String(Rc::from(role)));
1152        m.insert("content".to_string(), VmValue::String(Rc::from(content)));
1153        VmValue::Dict(Rc::new(m))
1154    }
1155
1156    fn message_count(id: &str) -> usize {
1157        SESSIONS.with(|s| {
1158            let map = s.borrow();
1159            let Some(state) = map.get(id) else { return 0 };
1160            let Some(dict) = state.transcript.as_dict() else {
1161                return 0;
1162            };
1163            match dict.get("messages") {
1164                Some(VmValue::List(list)) => list.len(),
1165                _ => 0,
1166            }
1167        })
1168    }
1169
1170    fn event_count_by_kind(id: &str, expected_kind: &str) -> usize {
1171        snapshot(id)
1172            .and_then(|snapshot| snapshot.as_dict().cloned())
1173            .and_then(|dict| dict.get("events").cloned())
1174            .and_then(|events| match events {
1175                VmValue::List(events) => Some(
1176                    events
1177                        .iter()
1178                        .filter(|event| {
1179                            event
1180                                .as_dict()
1181                                .and_then(|dict| dict.get("kind"))
1182                                .map(VmValue::display)
1183                                .as_deref()
1184                                == Some(expected_kind)
1185                        })
1186                        .count(),
1187                ),
1188                _ => None,
1189            })
1190            .unwrap_or(0)
1191    }
1192
1193    struct CapturingSink(Arc<Mutex<Vec<AgentEvent>>>);
1194
1195    impl AgentEventSink for CapturingSink {
1196        fn handle_event(&self, event: &AgentEvent) {
1197            self.0
1198                .lock()
1199                .expect("capture sink poisoned")
1200                .push(event.clone());
1201        }
1202    }
1203
1204    #[test]
1205    fn records_system_prompt_as_metadata_event_without_message() {
1206        reset_session_store();
1207        let id = open_or_create(Some("system-prompt-session".into()));
1208        record_system_prompt(&id, "Follow the workflow.").unwrap();
1209        record_system_prompt(&id, "Follow the workflow.").unwrap();
1210        inject_message(&id, make_msg("user", "hello")).unwrap();
1211
1212        let snapshot = snapshot(&id).expect("session snapshot");
1213        let snapshot_dict = snapshot.as_dict().expect("session snapshot dict");
1214        let metadata = snapshot_dict
1215            .get("metadata")
1216            .and_then(VmValue::as_dict)
1217            .expect("metadata");
1218        let system_prompt = metadata
1219            .get("system_prompt")
1220            .and_then(VmValue::as_dict)
1221            .expect("system prompt metadata");
1222        assert_eq!(
1223            system_prompt
1224                .get("content")
1225                .map(VmValue::display)
1226                .as_deref(),
1227            Some("Follow the workflow.")
1228        );
1229        assert!(
1230            matches!(snapshot_dict.get("system_prompt"), Some(VmValue::String(value)) if value.as_ref() == "Follow the workflow.")
1231        );
1232        assert!(matches!(snapshot_dict.get("length"), Some(VmValue::Int(1))));
1233
1234        let transcript = transcript(&id).expect("canonical transcript");
1235        let transcript_dict = transcript.as_dict().expect("canonical transcript dict");
1236        assert!(!transcript_dict.contains_key("system_prompt"));
1237        assert!(transcript_dict
1238            .get("metadata")
1239            .and_then(VmValue::as_dict)
1240            .and_then(|metadata| metadata.get("system_prompt"))
1241            .is_some());
1242        assert_eq!(message_count(&id), 1);
1243        assert_eq!(event_count_by_kind(&id, "system_prompt"), 1);
1244    }
1245
1246    #[test]
1247    fn close_with_status_emits_terminal_event_and_clears_sinks() {
1248        reset_all_sinks();
1249        let id = open_or_create(Some("close-reason-session".into()));
1250        inject_message(&id, make_msg("user", "hello")).unwrap();
1251        let captured = Arc::new(Mutex::new(Vec::new()));
1252        register_sink(&id, Arc::new(CapturingSink(captured.clone())));
1253        assert_eq!(session_external_sink_count(&id), 1);
1254
1255        assert!(close_with_status(
1256            &id,
1257            "timeout",
1258            "timeout",
1259            serde_json::json!({"idle_ms": 5000}),
1260        ));
1261
1262        assert!(!exists(&id));
1263        assert_eq!(session_external_sink_count(&id), 0);
1264        let events = captured.lock().expect("capture sink poisoned");
1265        assert_eq!(events.len(), 1);
1266        match &events[0] {
1267            AgentEvent::SessionClosed {
1268                session_id,
1269                reason,
1270                status,
1271                metadata,
1272            } => {
1273                assert_eq!(session_id, "close-reason-session");
1274                assert_eq!(reason, "timeout");
1275                assert_eq!(status, "timeout");
1276                assert_eq!(metadata["idle_ms"], 5000);
1277            }
1278            other => panic!("expected SessionClosed, got {other:?}"),
1279        }
1280        reset_all_sinks();
1281    }
1282
1283    #[test]
1284    fn fork_at_truncates_destination_to_keep_first() {
1285        reset_session_store();
1286        let src = open_or_create(Some("src-fork-at".into()));
1287        inject_message(&src, make_msg("user", "a")).unwrap();
1288        inject_message(&src, make_msg("assistant", "b")).unwrap();
1289        inject_message(&src, make_msg("user", "c")).unwrap();
1290        inject_message(&src, make_msg("assistant", "d")).unwrap();
1291        assert_eq!(message_count(&src), 4);
1292
1293        let dst = fork_at(&src, 2, Some("dst-fork-at".into())).expect("fork_at");
1294        assert_ne!(dst, src);
1295        assert_eq!(message_count(&dst), 2, "branched at message index 2");
1296        assert_eq!(
1297            snapshot(&dst)
1298                .and_then(|value| value.as_dict().cloned())
1299                .and_then(|dict| dict
1300                    .get("branched_at_event_index")
1301                    .and_then(VmValue::as_int)),
1302            Some(2)
1303        );
1304        // Source untouched.
1305        assert_eq!(message_count(&src), 4);
1306        // Subscribers not carried — forks start with a clean fanout list.
1307        assert_eq!(subscriber_count(&dst), 0);
1308        reset_session_store();
1309    }
1310
1311    #[test]
1312    fn fork_at_on_unknown_source_returns_none() {
1313        reset_session_store();
1314        assert!(fork_at("does-not-exist", 3, None).is_none());
1315    }
1316
1317    #[test]
1318    fn child_sessions_record_parent_lineage() {
1319        reset_session_store();
1320        let parent = open_or_create(Some("parent-session".into()));
1321        let child = open_child_session(&parent, Some("child-session".into()));
1322        assert_eq!(parent_id(&child).as_deref(), Some("parent-session"));
1323        assert_eq!(child_ids(&parent), vec!["child-session".to_string()]);
1324        assert_eq!(
1325            ancestry(&child),
1326            Some(SessionAncestry {
1327                parent_id: Some("parent-session".to_string()),
1328                child_ids: Vec::new(),
1329                root_id: "parent-session".to_string(),
1330            })
1331        );
1332
1333        let transcript = snapshot(&child).expect("child transcript");
1334        let transcript = transcript.as_dict().expect("child snapshot");
1335        let metadata = transcript
1336            .get("metadata")
1337            .and_then(|value| value.as_dict())
1338            .expect("child metadata");
1339        assert!(
1340            matches!(transcript.get("parent_id"), Some(VmValue::String(value)) if value.as_ref() == "parent-session")
1341        );
1342        assert!(
1343            matches!(transcript.get("child_ids"), Some(VmValue::List(children)) if children.is_empty())
1344        );
1345        assert!(matches!(transcript.get("length"), Some(VmValue::Int(0))));
1346        assert!(
1347            matches!(transcript.get("created_at"), Some(VmValue::String(value)) if !value.is_empty())
1348        );
1349        assert!(matches!(
1350            transcript.get("system_prompt"),
1351            Some(VmValue::Nil)
1352        ));
1353        assert!(matches!(transcript.get("tool_format"), Some(VmValue::Nil)));
1354        assert!(matches!(
1355            transcript.get("branched_at_event_index"),
1356            Some(VmValue::Nil)
1357        ));
1358        assert!(matches!(
1359            metadata.get("parent_session_id"),
1360            Some(VmValue::String(value)) if value.as_ref() == "parent-session"
1361        ));
1362    }
1363
1364    #[test]
1365    fn branch_event_index_counts_non_message_events() {
1366        reset_session_store();
1367        let src = open_or_create(Some("branch-event-index".into()));
1368        let transcript = VmValue::Dict(Rc::new(BTreeMap::from([
1369            ("id".to_string(), VmValue::String(Rc::from(src.clone()))),
1370            (
1371                "messages".to_string(),
1372                VmValue::List(Rc::new(vec![
1373                    make_msg("user", "a"),
1374                    make_msg("assistant", "b"),
1375                ])),
1376            ),
1377            (
1378                "events".to_string(),
1379                VmValue::List(Rc::new(vec![
1380                    VmValue::Dict(Rc::new(BTreeMap::from([(
1381                        "kind".to_string(),
1382                        VmValue::String(Rc::from("message")),
1383                    )]))),
1384                    VmValue::Dict(Rc::new(BTreeMap::from([(
1385                        "kind".to_string(),
1386                        VmValue::String(Rc::from("sub_agent_start")),
1387                    )]))),
1388                    VmValue::Dict(Rc::new(BTreeMap::from([(
1389                        "kind".to_string(),
1390                        VmValue::String(Rc::from("message")),
1391                    )]))),
1392                ])),
1393            ),
1394        ])));
1395        store_transcript(&src, transcript);
1396
1397        let dst = fork_at(&src, 2, Some("branch-event-index-child".into())).expect("fork_at");
1398        assert_eq!(
1399            snapshot(&dst)
1400                .and_then(|value| value.as_dict().cloned())
1401                .and_then(|dict| dict
1402                    .get("branched_at_event_index")
1403                    .and_then(VmValue::as_int)),
1404            Some(3)
1405        );
1406    }
1407
1408    #[test]
1409    fn child_session_records_lineage_without_reusing_parent_transcript() {
1410        reset_session_store();
1411        let parent = open_or_create(Some("parent-fork-parent".into()));
1412        inject_message(&parent, make_msg("user", "parent context")).unwrap();
1413        claim_tool_format(&parent, "native").unwrap();
1414
1415        let child = open_child_session(&parent, Some("parent-fork-child".into()));
1416        assert_eq!(message_count(&parent), 1);
1417        assert_eq!(message_count(&child), 0);
1418        assert_eq!(tool_format(&child), None);
1419        assert_eq!(parent_id(&child).as_deref(), Some(parent.as_str()));
1420    }
1421
1422    #[test]
1423    fn prompt_state_prepends_summary_message_when_missing_from_messages() {
1424        reset_session_store();
1425        let session = open_or_create(Some("prompt-state-summary".into()));
1426        let transcript = crate::llm::helpers::new_transcript_with_events(
1427            Some(session.clone()),
1428            vec![make_msg("assistant", "latest answer")],
1429            Some("[auto-compacted 2 older messages]\nsummary".to_string()),
1430            None,
1431            Vec::new(),
1432            Vec::new(),
1433            Some("active"),
1434        );
1435        store_transcript(&session, transcript);
1436
1437        let prompt = prompt_state_json(&session);
1438        assert_eq!(
1439            prompt.summary.as_deref(),
1440            Some("[auto-compacted 2 older messages]\nsummary")
1441        );
1442        assert_eq!(prompt.messages.len(), 2);
1443        assert_eq!(prompt.messages[0]["role"].as_str(), Some("user"));
1444        assert_eq!(
1445            prompt.messages[0]["content"].as_str(),
1446            Some("[auto-compacted 2 older messages]\nsummary"),
1447        );
1448        assert_eq!(prompt.messages[1]["role"].as_str(), Some("assistant"));
1449    }
1450
1451    #[tokio::test(flavor = "current_thread", start_paused = true)]
1452    async fn open_or_create_registers_event_log_sink_when_active_log_is_installed() {
1453        reset_all_sinks();
1454        crate::event_log::reset_active_event_log();
1455        let dir = tempfile::tempdir().expect("tempdir");
1456        crate::event_log::install_default_for_base_dir(dir.path()).expect("install event log");
1457
1458        let session = open_or_create(Some("event-log-session".into()));
1459        assert_eq!(session_external_sink_count(&session), 1);
1460
1461        emit_event(&AgentEvent::TurnStart {
1462            session_id: session.clone(),
1463            iteration: 0,
1464        });
1465        tokio::time::sleep(std::time::Duration::from_millis(25)).await;
1466
1467        let topic = Topic::new("observability.agent_events.event-log-session").unwrap();
1468        let log = active_event_log().expect("active event log");
1469        let events = log.read_range(&topic, None, usize::MAX).await.unwrap();
1470        assert_eq!(events.len(), 1);
1471        assert_eq!(events[0].1.kind, "turn_start");
1472
1473        crate::event_log::reset_active_event_log();
1474        reset_all_sinks();
1475    }
1476}