Skip to main content

harn_vm/
agent_sessions.rs

1//! First-class session storage.
2//!
3//! A session owns three things:
4//!
5//! 1. A transcript dict (messages, events, summary, metadata, …).
6//! 2. Closure subscribers that fire on agent-loop events for this session.
7//! 3. Its own lifecycle (open, reset, fork, trim, compact, close).
8//!
9//! Storage is thread-local because `VmValue` contains `Rc`, which is
10//! neither `Send` nor `Sync`. The agent loop runs on a tokio
11//! current-thread worker, so all session reads and writes happen on the
12//! same thread. The closure-subscribers register, fire, and unregister
13//! on that same thread.
14//!
15//! Lifecycle is explicit. Builtins (`agent_session_open`,
16//! `_reset`, `_fork`, `_fork_at`, `_close`, `_trim`, `_compact`,
17//! `_inject`, `_exists`, `_length`, `_snapshot`, `_ancestry`) drive
18//! the store directly — there is no "policy" config dict that
19//! performs lifecycle as a side effect.
20
21use std::cell::{Cell, RefCell};
22use std::collections::{BTreeMap, HashMap, HashSet};
23use std::rc::Rc;
24use std::time::Instant;
25
26use crate::value::VmValue;
27
/// Default cap on concurrent sessions per VM thread.
///
/// This is the initial value of the thread-local cap; it can be changed
/// with [`set_session_cap`]. When the store already holds this many
/// sessions, creating another one through [`open_or_create`] first
/// evicts the session with the oldest `last_accessed` timestamp.
pub const DEFAULT_SESSION_CAP: usize = 128;
31
/// In-memory state of one agent session held in the thread-local store.
pub struct SessionState {
    /// Session id; also the key this state is stored under.
    pub id: String,
    /// Transcript value. The helpers in this module treat it as a dict
    /// and read/write its `messages`, `events`, `summary`, `metadata`
    /// and `id` keys.
    pub transcript: VmValue,
    /// Subscriber callbacks registered through `append_subscriber`.
    pub subscribers: Vec<VmValue>,
    /// When this state was constructed.
    pub created_at: Instant,
    /// Refreshed when the session is opened or mutated; the session with
    /// the oldest value is the eviction victim when the cap is reached.
    pub last_accessed: Instant,
    /// Id of the parent session, once lineage has been recorded.
    pub parent_id: Option<String>,
    /// Ids of the sessions recorded as children of this one.
    pub child_ids: Vec<String>,
    /// Event index at which this session branched off its parent, when
    /// lineage was recorded with one (e.g. by `fork_at`).
    pub branched_at_event_index: Option<usize>,
    /// Names of skills that were active at the end of the most recent
    /// `agent_loop` run on this session. Empty when no skills were
    /// matched, when the skill system wasn't used, or when the
    /// deactivation phase cleared them. Re-entering the session
    /// restores these as the initial active set before matching runs.
    pub active_skills: Vec<String>,
}
48
49impl SessionState {
50    fn new(id: String) -> Self {
51        let now = Instant::now();
52        let transcript = empty_transcript(&id);
53        Self {
54            id,
55            transcript,
56            subscribers: Vec::new(),
57            created_at: now,
58            last_accessed: now,
59            parent_id: None,
60            child_ids: Vec::new(),
61            branched_at_event_index: None,
62            active_skills: Vec::new(),
63        }
64    }
65}
66
/// Lineage summary for one session, as returned by [`ancestry`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct SessionAncestry {
    /// Direct parent of the session, if one was recorded.
    pub parent_id: Option<String>,
    /// Direct children recorded on the session.
    pub child_ids: Vec<String>,
    /// Last id reached by following `parent_id` links upward; equals the
    /// session's own id when it has no parent.
    pub root_id: String,
}
73
thread_local! {
    // Per-thread session store, keyed by session id.
    static SESSIONS: RefCell<HashMap<String, SessionState>> = RefCell::new(HashMap::new());
    // Per-thread session cap, consulted when a new session is created.
    static SESSION_CAP: Cell<usize> = const { Cell::new(DEFAULT_SESSION_CAP) };
    // Stack of session ids pushed by `push_current_session`; the last
    // entry is what `current_session_id` reports.
    static CURRENT_SESSION_STACK: RefCell<Vec<String>> = const { RefCell::new(Vec::new()) };
}
79
/// Scope guard returned by `enter_current_session`.
///
/// While an active guard is alive, the session it entered stays on the
/// thread's current-session stack; dropping the guard pops it again.
/// Bind the guard to a named variable — discarding it immediately ends
/// the scope at once, which is why the type is `#[must_use]`.
#[must_use = "dropping the guard immediately pops the session it just entered"]
#[derive(Debug)]
pub struct CurrentSessionGuard {
    /// `true` when entering actually pushed an id, so the drop handler
    /// must pop exactly once; `false` for a no-op guard.
    active: bool,
}
83
84impl Drop for CurrentSessionGuard {
85    fn drop(&mut self) {
86        if self.active {
87            pop_current_session();
88        }
89    }
90}
91
92/// Set the per-thread session cap. Primarily for tests; production VMs
93/// inherit the default.
94pub fn set_session_cap(cap: usize) {
95    SESSION_CAP.with(|c| c.set(cap.max(1)));
96}
97
98pub fn session_cap() -> usize {
99    SESSION_CAP.with(|c| c.get())
100}
101
102/// Clear the session store. Wired into `reset_llm_state` for test isolation.
103pub fn reset_session_store() {
104    SESSIONS.with(|s| s.borrow_mut().clear());
105    CURRENT_SESSION_STACK.with(|stack| stack.borrow_mut().clear());
106}
107
108pub(crate) fn push_current_session(id: String) {
109    if id.is_empty() {
110        return;
111    }
112    CURRENT_SESSION_STACK.with(|stack| stack.borrow_mut().push(id));
113}
114
115pub(crate) fn pop_current_session() {
116    CURRENT_SESSION_STACK.with(|stack| {
117        let _ = stack.borrow_mut().pop();
118    });
119}
120
121pub fn current_session_id() -> Option<String> {
122    CURRENT_SESSION_STACK.with(|stack| stack.borrow().last().cloned())
123}
124
125pub fn enter_current_session(id: impl Into<String>) -> CurrentSessionGuard {
126    let id = id.into();
127    if id.trim().is_empty() {
128        return CurrentSessionGuard { active: false };
129    }
130    push_current_session(id);
131    CurrentSessionGuard { active: true }
132}
133
134pub fn exists(id: &str) -> bool {
135    SESSIONS.with(|s| s.borrow().contains_key(id))
136}
137
138pub fn length(id: &str) -> Option<usize> {
139    SESSIONS.with(|s| {
140        s.borrow().get(id).map(|state| {
141            state
142                .transcript
143                .as_dict()
144                .and_then(|d| d.get("messages"))
145                .and_then(|v| match v {
146                    VmValue::List(list) => Some(list.len()),
147                    _ => None,
148                })
149                .unwrap_or(0)
150        })
151    })
152}
153
154pub fn snapshot(id: &str) -> Option<VmValue> {
155    SESSIONS.with(|s| s.borrow().get(id).map(session_snapshot))
156}
157
158/// Open a session, or create it if missing. Returns the resolved id.
159///
160/// Newly-created sessions auto-register an event-log-backed sink when a
161/// generalized [`crate::event_log::EventLog`] has been installed for the
162/// current VM thread. For legacy env-driven workflows that still point
163/// `HARN_EVENT_LOG_DIR` at a directory, we preserve the older JSONL sink
164/// as a compatibility fallback. Re-opening an existing session does not
165/// re-register — sinks are per-session, owned by the first opener.
166pub fn open_or_create(id: Option<String>) -> String {
167    let resolved = id.unwrap_or_else(|| uuid::Uuid::now_v7().to_string());
168    let mut was_new = false;
169    SESSIONS.with(|s| {
170        let mut map = s.borrow_mut();
171        if let Some(state) = map.get_mut(&resolved) {
172            state.last_accessed = Instant::now();
173            return;
174        }
175        was_new = true;
176        let cap = SESSION_CAP.with(|c| c.get());
177        if map.len() >= cap {
178            if let Some(victim) = map
179                .iter()
180                .min_by_key(|(_, state)| state.last_accessed)
181                .map(|(id, _)| id.clone())
182            {
183                map.remove(&victim);
184            }
185        }
186        map.insert(resolved.clone(), SessionState::new(resolved.clone()));
187    });
188    if was_new {
189        try_register_event_log(&resolved);
190    }
191    resolved
192}
193
194pub fn open_child_session(parent_id: &str, id: Option<String>) -> String {
195    let resolved = fork(parent_id, id.clone()).unwrap_or_else(|| open_or_create(id));
196    link_child_session(parent_id, &resolved);
197    resolved
198}
199
200pub fn link_child_session(parent_id: &str, child_id: &str) {
201    link_child_session_with_branch(parent_id, child_id, None);
202}
203
204pub fn link_child_session_with_branch(
205    parent_id: &str,
206    child_id: &str,
207    branched_at_event_index: Option<usize>,
208) {
209    if parent_id == child_id {
210        return;
211    }
212    open_or_create(Some(parent_id.to_string()));
213    open_or_create(Some(child_id.to_string()));
214    SESSIONS.with(|s| {
215        let mut map = s.borrow_mut();
216        update_lineage(&mut map, parent_id, child_id, branched_at_event_index);
217    });
218}
219
220pub fn parent_id(id: &str) -> Option<String> {
221    SESSIONS.with(|s| s.borrow().get(id).and_then(|state| state.parent_id.clone()))
222}
223
224pub fn child_ids(id: &str) -> Vec<String> {
225    SESSIONS.with(|s| {
226        s.borrow()
227            .get(id)
228            .map(|state| state.child_ids.clone())
229            .unwrap_or_default()
230    })
231}
232
233pub fn ancestry(id: &str) -> Option<SessionAncestry> {
234    SESSIONS.with(|s| {
235        let map = s.borrow();
236        let state = map.get(id)?;
237        let mut root_id = state.id.clone();
238        let mut cursor = state.parent_id.clone();
239        let mut seen = HashSet::from([state.id.clone()]);
240        while let Some(parent_id) = cursor {
241            if !seen.insert(parent_id.clone()) {
242                break;
243            }
244            root_id = parent_id.clone();
245            cursor = map
246                .get(&parent_id)
247                .and_then(|parent| parent.parent_id.clone());
248        }
249        Some(SessionAncestry {
250            parent_id: state.parent_id.clone(),
251            child_ids: state.child_ids.clone(),
252            root_id,
253        })
254    })
255}
256
257/// Auto-register a persistent sink for a newly-created session.
258/// Silent no-op on failure — a broken observability sink must never
259/// prevent a session from starting.
260fn try_register_event_log(session_id: &str) {
261    if let Some(log) = crate::event_log::active_event_log() {
262        crate::agent_events::register_sink(
263            session_id,
264            crate::agent_events::EventLogSink::new(log, session_id),
265        );
266        return;
267    }
268    let Ok(dir) = std::env::var("HARN_EVENT_LOG_DIR") else {
269        return;
270    };
271    if dir.is_empty() {
272        return;
273    }
274    let path = std::path::PathBuf::from(dir).join(format!("event_log-{session_id}.jsonl"));
275    if let Ok(sink) = crate::agent_events::JsonlEventSink::open(&path) {
276        crate::agent_events::register_sink(session_id, sink);
277    }
278}
279
280pub fn close(id: &str) {
281    SESSIONS.with(|s| {
282        s.borrow_mut().remove(id);
283    });
284}
285
286pub fn reset_transcript(id: &str) -> bool {
287    SESSIONS.with(|s| {
288        let mut map = s.borrow_mut();
289        let Some(state) = map.get_mut(id) else {
290            return false;
291        };
292        state.transcript = empty_transcript(id);
293        state.last_accessed = Instant::now();
294        true
295    })
296}
297
298/// Copy `src`'s transcript into a new session id. Subscribers are NOT
299/// copied — a fork is a conversation branch, not an event fanout.
300///
301/// Touches `src`'s `last_accessed` before evicting, so the fork
302/// operation itself can't make `src` look stale and kick it out of
303/// the LRU just to make room for the new fork.
304pub fn fork(src_id: &str, dst_id: Option<String>) -> Option<String> {
305    let (src_transcript, dst) = SESSIONS.with(|s| {
306        let mut map = s.borrow_mut();
307        let src = map.get_mut(src_id)?;
308        src.last_accessed = Instant::now();
309        let dst = dst_id.unwrap_or_else(|| uuid::Uuid::now_v7().to_string());
310        let forked_transcript = clone_transcript_with_id(&src.transcript, &dst);
311        Some((forked_transcript, dst))
312    })?;
313    // Ensure cap is respected when inserting the fork.
314    open_or_create(Some(dst.clone()));
315    SESSIONS.with(|s| {
316        let mut map = s.borrow_mut();
317        if let Some(state) = map.get_mut(&dst) {
318            state.transcript = src_transcript;
319            state.last_accessed = Instant::now();
320        }
321        update_lineage(&mut map, src_id, &dst, None);
322    });
323    // open_or_create evicts BEFORE inserting, so the dst slot is
324    // guaranteed once we get here. The existence check is cheap
325    // insurance against a future refactor that breaks that invariant.
326    if exists(&dst) {
327        Some(dst)
328    } else {
329        None
330    }
331}
332
333/// Fork `src_id` and truncate the destination transcript to the
334/// first `keep_first` messages (#105 — branch-replay). Pairs with the
335/// scrubber: the IDE picks an event index, rebuilds a message count,
336/// and calls this to spawn a live sibling session that resumes from
337/// the rebuilt state. Subscribers are not carried over (same as
338/// `fork`), so sibling events don't double-fan into the parent's
339/// consumers.
340///
341/// Returns the new session id on success, `None` if `src_id` doesn't
342/// exist.
343pub fn fork_at(src_id: &str, keep_first: usize, dst_id: Option<String>) -> Option<String> {
344    let branched_at_event_index = SESSIONS.with(|s| {
345        let map = s.borrow();
346        let src = map.get(src_id)?;
347        Some(branch_event_index(&src.transcript, keep_first))
348    })?;
349    let new_id = fork(src_id, dst_id)?;
350    link_child_session_with_branch(src_id, &new_id, Some(branched_at_event_index));
351    retain_first(&new_id, keep_first);
352    Some(new_id)
353}
354
355/// Truncate the session transcript to the first `keep_first`
356/// messages (opposite of `trim`, which keeps the last N). Used by
357/// `fork_at` to cut a branch at a scrubber position.
358fn retain_first(id: &str, keep_first: usize) {
359    SESSIONS.with(|s| {
360        let mut map = s.borrow_mut();
361        let Some(state) = map.get_mut(id) else {
362            return;
363        };
364        let Some(dict) = state.transcript.as_dict() else {
365            return;
366        };
367        let dict = dict.clone();
368        let messages: Vec<VmValue> = match dict.get("messages") {
369            Some(VmValue::List(list)) => list.iter().cloned().collect(),
370            _ => Vec::new(),
371        };
372        let retained: Vec<VmValue> = messages.into_iter().take(keep_first).collect();
373        let mut next = dict;
374        next.insert(
375            "events".to_string(),
376            VmValue::List(Rc::new(
377                crate::llm::helpers::transcript_events_from_messages(&retained),
378            )),
379        );
380        next.insert("messages".to_string(), VmValue::List(Rc::new(retained)));
381        state.transcript = VmValue::Dict(Rc::new(next));
382        state.last_accessed = Instant::now();
383    });
384}
385
386/// Retain only the last `keep_last` messages in the session transcript.
387/// Returns the kept count (<= keep_last).
388pub fn trim(id: &str, keep_last: usize) -> Option<usize> {
389    SESSIONS.with(|s| {
390        let mut map = s.borrow_mut();
391        let state = map.get_mut(id)?;
392        let dict = state.transcript.as_dict()?.clone();
393        let messages: Vec<VmValue> = match dict.get("messages") {
394            Some(VmValue::List(list)) => list.iter().cloned().collect(),
395            _ => Vec::new(),
396        };
397        let start = messages.len().saturating_sub(keep_last);
398        let retained: Vec<VmValue> = messages.into_iter().skip(start).collect();
399        let kept = retained.len();
400        let mut next = dict;
401        next.insert(
402            "events".to_string(),
403            VmValue::List(Rc::new(
404                crate::llm::helpers::transcript_events_from_messages(&retained),
405            )),
406        );
407        next.insert("messages".to_string(), VmValue::List(Rc::new(retained)));
408        state.transcript = VmValue::Dict(Rc::new(next));
409        state.last_accessed = Instant::now();
410        Some(kept)
411    })
412}
413
414/// Append a message dict to the session transcript. The message must
415/// have at least a string `role`; anything else is merged verbatim.
416pub fn inject_message(id: &str, message: VmValue) -> Result<(), String> {
417    let Some(msg_dict) = message.as_dict().cloned() else {
418        return Err("agent_session_inject: message must be a dict".into());
419    };
420    let role_ok = matches!(msg_dict.get("role"), Some(VmValue::String(_)));
421    if !role_ok {
422        return Err(
423            "agent_session_inject: message must have a string `role` (user|assistant|tool_result|system)"
424                .into(),
425        );
426    }
427    SESSIONS.with(|s| {
428        let mut map = s.borrow_mut();
429        let Some(state) = map.get_mut(id) else {
430            return Err(format!("agent_session_inject: unknown session id '{id}'"));
431        };
432        let dict = state
433            .transcript
434            .as_dict()
435            .cloned()
436            .unwrap_or_else(BTreeMap::new);
437        let mut messages: Vec<VmValue> = match dict.get("messages") {
438            Some(VmValue::List(list)) => list.iter().cloned().collect(),
439            _ => Vec::new(),
440        };
441        messages.push(VmValue::Dict(Rc::new(msg_dict)));
442        let mut next = dict;
443        next.insert(
444            "events".to_string(),
445            VmValue::List(Rc::new(
446                crate::llm::helpers::transcript_events_from_messages(&messages),
447            )),
448        );
449        next.insert("messages".to_string(), VmValue::List(Rc::new(messages)));
450        state.transcript = VmValue::Dict(Rc::new(next));
451        state.last_accessed = Instant::now();
452        Ok(())
453    })
454}
455
456/// Load the messages vec (as JSON) for this session, for use as prefix
457/// to an agent_loop run. Returns an empty vec if the session doesn't
458/// exist or has no messages.
459pub fn messages_json(id: &str) -> Vec<serde_json::Value> {
460    SESSIONS.with(|s| {
461        let map = s.borrow();
462        let Some(state) = map.get(id) else {
463            return Vec::new();
464        };
465        let Some(dict) = state.transcript.as_dict() else {
466            return Vec::new();
467        };
468        match dict.get("messages") {
469            Some(VmValue::List(list)) => list
470                .iter()
471                .map(crate::llm::helpers::vm_value_to_json)
472                .collect(),
473            _ => Vec::new(),
474        }
475    })
476}
477
/// Prompt-surface state of a session, as produced by
/// [`prompt_state_json`].
#[derive(Clone, Debug, Default)]
pub struct SessionPromptState {
    /// Transcript messages as JSON, with the summary message prepended
    /// when the list did not already begin with it.
    pub messages: Vec<serde_json::Value>,
    /// The transcript's `summary` field, when it is a string with
    /// non-whitespace content.
    pub summary: Option<String>,
}
483
484fn summary_message_json(summary: &str) -> serde_json::Value {
485    serde_json::json!({
486        "role": "user",
487        "content": summary,
488    })
489}
490
491fn messages_begin_with_summary(messages: &[serde_json::Value], summary: &str) -> bool {
492    messages.first().is_some_and(|message| {
493        message.get("role").and_then(|value| value.as_str()) == Some("user")
494            && message.get("content").and_then(|value| value.as_str()) == Some(summary)
495    })
496}
497
498/// Prompt-surface resume state for a persisted session.
499///
500/// Returns the compacted/rehydratable message list plus the transcript's
501/// summary field. When the transcript carries a summary field but its
502/// message list does not already begin with the compacted summary
503/// message, this helper prepends one so session re-entry preserves the
504/// same prompt surface the previous loop was actually using.
505pub fn prompt_state_json(id: &str) -> SessionPromptState {
506    SESSIONS.with(|s| {
507        let map = s.borrow();
508        let Some(state) = map.get(id) else {
509            return SessionPromptState::default();
510        };
511        let Some(dict) = state.transcript.as_dict() else {
512            return SessionPromptState::default();
513        };
514        let mut messages = match dict.get("messages") {
515            Some(VmValue::List(list)) => list
516                .iter()
517                .map(crate::llm::helpers::vm_value_to_json)
518                .collect::<Vec<_>>(),
519            _ => Vec::new(),
520        };
521        let summary = dict.get("summary").and_then(|value| match value {
522            VmValue::String(text) if !text.trim().is_empty() => Some(text.to_string()),
523            _ => None,
524        });
525        if let Some(summary_text) = summary.as_deref() {
526            if !messages_begin_with_summary(&messages, summary_text) {
527                messages.insert(0, summary_message_json(summary_text));
528            }
529        }
530        SessionPromptState { messages, summary }
531    })
532}
533
534/// Overwrite the transcript for this session. Used by `agent_loop` on
535/// exit to persist the synthesized transcript.
536pub fn store_transcript(id: &str, transcript: VmValue) {
537    SESSIONS.with(|s| {
538        if let Some(state) = s.borrow_mut().get_mut(id) {
539            state.transcript = transcript;
540            state.last_accessed = Instant::now();
541        }
542    });
543}
544
545/// Append a transcript event to the session without mutating its
546/// message list. Used for orchestration-side lineage events (sub-agent
547/// spawn/completion, workflow hooks, etc.) that should survive
548/// persistence/replay without being replayed back into the model as
549/// conversational messages.
550pub fn append_event(id: &str, event: VmValue) -> Result<(), String> {
551    let Some(event_dict) = event.as_dict() else {
552        return Err("agent_session_append_event: event must be a dict".into());
553    };
554    let kind_ok = matches!(event_dict.get("kind"), Some(VmValue::String(_)));
555    if !kind_ok {
556        return Err("agent_session_append_event: event must have a string `kind`".into());
557    }
558    SESSIONS.with(|s| {
559        let mut map = s.borrow_mut();
560        let Some(state) = map.get_mut(id) else {
561            return Err(format!(
562                "agent_session_append_event: unknown session id '{id}'"
563            ));
564        };
565        let dict = state
566            .transcript
567            .as_dict()
568            .cloned()
569            .unwrap_or_else(BTreeMap::new);
570        let mut events: Vec<VmValue> = match dict.get("events") {
571            Some(VmValue::List(list)) => list.iter().cloned().collect(),
572            _ => dict
573                .get("messages")
574                .and_then(|value| match value {
575                    VmValue::List(list) => Some(list.iter().cloned().collect::<Vec<_>>()),
576                    _ => None,
577                })
578                .map(|messages| crate::llm::helpers::transcript_events_from_messages(&messages))
579                .unwrap_or_default(),
580        };
581        events.push(event);
582        let mut next = dict;
583        next.insert("events".to_string(), VmValue::List(Rc::new(events)));
584        state.transcript = VmValue::Dict(Rc::new(next));
585        state.last_accessed = Instant::now();
586        Ok(())
587    })
588}
589
590/// Replace the transcript's message list wholesale. Used by the
591/// in-loop compaction path, which operates on JSON messages.
592pub fn replace_messages(id: &str, messages: &[serde_json::Value]) {
593    SESSIONS.with(|s| {
594        let mut map = s.borrow_mut();
595        let Some(state) = map.get_mut(id) else {
596            return;
597        };
598        let dict = state
599            .transcript
600            .as_dict()
601            .cloned()
602            .unwrap_or_else(BTreeMap::new);
603        let vm_messages: Vec<VmValue> = messages
604            .iter()
605            .map(crate::stdlib::json_to_vm_value)
606            .collect();
607        let mut next = dict;
608        next.insert(
609            "events".to_string(),
610            VmValue::List(Rc::new(
611                crate::llm::helpers::transcript_events_from_messages(&vm_messages),
612            )),
613        );
614        next.insert("messages".to_string(), VmValue::List(Rc::new(vm_messages)));
615        state.transcript = VmValue::Dict(Rc::new(next));
616        state.last_accessed = Instant::now();
617    });
618}
619
620pub fn append_subscriber(id: &str, callback: VmValue) {
621    open_or_create(Some(id.to_string()));
622    SESSIONS.with(|s| {
623        if let Some(state) = s.borrow_mut().get_mut(id) {
624            state.subscribers.push(callback);
625            state.last_accessed = Instant::now();
626        }
627    });
628}
629
630pub fn subscribers_for(id: &str) -> Vec<VmValue> {
631    SESSIONS.with(|s| {
632        s.borrow()
633            .get(id)
634            .map(|state| state.subscribers.clone())
635            .unwrap_or_default()
636    })
637}
638
639pub fn subscriber_count(id: &str) -> usize {
640    SESSIONS.with(|s| {
641        s.borrow()
642            .get(id)
643            .map(|state| state.subscribers.len())
644            .unwrap_or(0)
645    })
646}
647
648/// Persist the set of active skill names for session resume. Called at
649/// the end of an agent_loop run; the next `open_or_create` for this id
650/// reads them back via [`active_skills`].
651pub fn set_active_skills(id: &str, skills: Vec<String>) {
652    SESSIONS.with(|s| {
653        if let Some(state) = s.borrow_mut().get_mut(id) {
654            state.active_skills = skills;
655            state.last_accessed = Instant::now();
656        }
657    });
658}
659
660/// Skills that were active at the end of the previous agent_loop run
661/// against this session. Returns an empty vec when the session doesn't
662/// exist or nothing was persisted.
663pub fn active_skills(id: &str) -> Vec<String> {
664    SESSIONS.with(|s| {
665        s.borrow()
666            .get(id)
667            .map(|state| state.active_skills.clone())
668            .unwrap_or_default()
669    })
670}
671
672fn empty_transcript(id: &str) -> VmValue {
673    use crate::llm::helpers::new_transcript_with;
674    new_transcript_with(Some(id.to_string()), Vec::new(), None, None)
675}
676
677fn clone_transcript_with_id(transcript: &VmValue, new_id: &str) -> VmValue {
678    let Some(dict) = transcript.as_dict() else {
679        return empty_transcript(new_id);
680    };
681    let mut next = dict.clone();
682    next.insert(
683        "id".to_string(),
684        VmValue::String(Rc::from(new_id.to_string())),
685    );
686    VmValue::Dict(Rc::new(next))
687}
688
689fn clone_transcript_with_parent(transcript: &VmValue, parent_id: &str) -> VmValue {
690    let Some(dict) = transcript.as_dict() else {
691        return transcript.clone();
692    };
693    let mut next = dict.clone();
694    let metadata = match next.get("metadata") {
695        Some(VmValue::Dict(metadata)) => {
696            let mut metadata = metadata.as_ref().clone();
697            metadata.insert(
698                "parent_session_id".to_string(),
699                VmValue::String(Rc::from(parent_id.to_string())),
700            );
701            VmValue::Dict(Rc::new(metadata))
702        }
703        _ => VmValue::Dict(Rc::new(BTreeMap::from([(
704            "parent_session_id".to_string(),
705            VmValue::String(Rc::from(parent_id.to_string())),
706        )]))),
707    };
708    next.insert("metadata".to_string(), metadata);
709    VmValue::Dict(Rc::new(next))
710}
711
712fn session_snapshot(state: &SessionState) -> VmValue {
713    let Some(dict) = state.transcript.as_dict() else {
714        return state.transcript.clone();
715    };
716    let mut next = dict.clone();
717    next.insert(
718        "parent_id".to_string(),
719        state
720            .parent_id
721            .as_ref()
722            .map(|id| VmValue::String(Rc::from(id.clone())))
723            .unwrap_or(VmValue::Nil),
724    );
725    next.insert(
726        "child_ids".to_string(),
727        VmValue::List(Rc::new(
728            state
729                .child_ids
730                .iter()
731                .cloned()
732                .map(|id| VmValue::String(Rc::from(id)))
733                .collect(),
734        )),
735    );
736    next.insert(
737        "branched_at_event_index".to_string(),
738        state
739            .branched_at_event_index
740            .map(|index| VmValue::Int(index as i64))
741            .unwrap_or(VmValue::Nil),
742    );
743    VmValue::Dict(Rc::new(next))
744}
745
746fn update_lineage(
747    map: &mut HashMap<String, SessionState>,
748    parent_id: &str,
749    child_id: &str,
750    branched_at_event_index: Option<usize>,
751) {
752    let old_parent_id = map.get(child_id).and_then(|child| child.parent_id.clone());
753    if let Some(old_parent_id) = old_parent_id.filter(|old_parent_id| old_parent_id != parent_id) {
754        if let Some(old_parent) = map.get_mut(&old_parent_id) {
755            old_parent.child_ids.retain(|id| id != child_id);
756            old_parent.last_accessed = Instant::now();
757        }
758    }
759    if let Some(parent) = map.get_mut(parent_id) {
760        parent.last_accessed = Instant::now();
761        if !parent.child_ids.iter().any(|id| id == child_id) {
762            parent.child_ids.push(child_id.to_string());
763        }
764    }
765    if let Some(child) = map.get_mut(child_id) {
766        child.last_accessed = Instant::now();
767        child.parent_id = Some(parent_id.to_string());
768        child.branched_at_event_index = branched_at_event_index;
769        child.transcript = clone_transcript_with_parent(&child.transcript, parent_id);
770    }
771}
772
773fn branch_event_index(transcript: &VmValue, keep_first: usize) -> usize {
774    if keep_first == 0 {
775        return 0;
776    }
777    let Some(dict) = transcript.as_dict() else {
778        return keep_first;
779    };
780    let Some(VmValue::List(events)) = dict.get("events") else {
781        return keep_first;
782    };
783    let mut retained_messages = 0usize;
784    for (index, event) in events.iter().enumerate() {
785        let kind = event
786            .as_dict()
787            .and_then(|dict| dict.get("kind"))
788            .map(VmValue::display);
789        if matches!(kind.as_deref(), Some("message" | "tool_result")) {
790            retained_messages += 1;
791            if retained_messages == keep_first {
792                return index + 1;
793            }
794        }
795    }
796    events.len()
797}
798
799#[cfg(test)]
800mod tests {
801    use super::*;
802    use crate::agent_events::{
803        emit_event, reset_all_sinks, session_external_sink_count, AgentEvent,
804    };
805    use crate::event_log::{active_event_log, EventLog, Topic};
806    use std::collections::BTreeMap;
807
808    fn make_msg(role: &str, content: &str) -> VmValue {
809        let mut m: BTreeMap<String, VmValue> = BTreeMap::new();
810        m.insert("role".to_string(), VmValue::String(Rc::from(role)));
811        m.insert("content".to_string(), VmValue::String(Rc::from(content)));
812        VmValue::Dict(Rc::new(m))
813    }
814
815    fn message_count(id: &str) -> usize {
816        SESSIONS.with(|s| {
817            let map = s.borrow();
818            let Some(state) = map.get(id) else { return 0 };
819            let Some(dict) = state.transcript.as_dict() else {
820                return 0;
821            };
822            match dict.get("messages") {
823                Some(VmValue::List(list)) => list.len(),
824                _ => 0,
825            }
826        })
827    }
828
829    #[test]
830    fn fork_at_truncates_destination_to_keep_first() {
831        reset_session_store();
832        let src = open_or_create(Some("src-fork-at".into()));
833        inject_message(&src, make_msg("user", "a")).unwrap();
834        inject_message(&src, make_msg("assistant", "b")).unwrap();
835        inject_message(&src, make_msg("user", "c")).unwrap();
836        inject_message(&src, make_msg("assistant", "d")).unwrap();
837        assert_eq!(message_count(&src), 4);
838
839        let dst = fork_at(&src, 2, Some("dst-fork-at".into())).expect("fork_at");
840        assert_ne!(dst, src);
841        assert_eq!(message_count(&dst), 2, "branched at message index 2");
842        assert_eq!(
843            snapshot(&dst)
844                .and_then(|value| value.as_dict().cloned())
845                .and_then(|dict| dict
846                    .get("branched_at_event_index")
847                    .and_then(VmValue::as_int)),
848            Some(2)
849        );
850        // Source untouched.
851        assert_eq!(message_count(&src), 4);
852        // Subscribers not carried — forks start with a clean fanout list.
853        assert_eq!(subscriber_count(&dst), 0);
854        reset_session_store();
855    }
856
857    #[test]
858    fn fork_at_on_unknown_source_returns_none() {
859        reset_session_store();
860        assert!(fork_at("does-not-exist", 3, None).is_none());
861    }
862
863    #[test]
864    fn child_sessions_record_parent_lineage() {
865        reset_session_store();
866        let parent = open_or_create(Some("parent-session".into()));
867        let child = open_child_session(&parent, Some("child-session".into()));
868        assert_eq!(parent_id(&child).as_deref(), Some("parent-session"));
869        assert_eq!(child_ids(&parent), vec!["child-session".to_string()]);
870        assert_eq!(
871            ancestry(&child),
872            Some(SessionAncestry {
873                parent_id: Some("parent-session".to_string()),
874                child_ids: Vec::new(),
875                root_id: "parent-session".to_string(),
876            })
877        );
878
879        let transcript = snapshot(&child).expect("child transcript");
880        let transcript = transcript.as_dict().expect("child snapshot");
881        let metadata = transcript
882            .get("metadata")
883            .and_then(|value| value.as_dict())
884            .expect("child metadata");
885        assert!(
886            matches!(transcript.get("parent_id"), Some(VmValue::String(value)) if value.as_ref() == "parent-session")
887        );
888        assert!(
889            matches!(transcript.get("child_ids"), Some(VmValue::List(children)) if children.is_empty())
890        );
891        assert!(matches!(
892            transcript.get("branched_at_event_index"),
893            Some(VmValue::Nil)
894        ));
895        assert!(matches!(
896            metadata.get("parent_session_id"),
897            Some(VmValue::String(value)) if value.as_ref() == "parent-session"
898        ));
899    }
900
901    #[test]
902    fn branch_event_index_counts_non_message_events() {
903        reset_session_store();
904        let src = open_or_create(Some("branch-event-index".into()));
905        let transcript = VmValue::Dict(Rc::new(BTreeMap::from([
906            ("id".to_string(), VmValue::String(Rc::from(src.clone()))),
907            (
908                "messages".to_string(),
909                VmValue::List(Rc::new(vec![
910                    make_msg("user", "a"),
911                    make_msg("assistant", "b"),
912                ])),
913            ),
914            (
915                "events".to_string(),
916                VmValue::List(Rc::new(vec![
917                    VmValue::Dict(Rc::new(BTreeMap::from([(
918                        "kind".to_string(),
919                        VmValue::String(Rc::from("message")),
920                    )]))),
921                    VmValue::Dict(Rc::new(BTreeMap::from([(
922                        "kind".to_string(),
923                        VmValue::String(Rc::from("sub_agent_start")),
924                    )]))),
925                    VmValue::Dict(Rc::new(BTreeMap::from([(
926                        "kind".to_string(),
927                        VmValue::String(Rc::from("message")),
928                    )]))),
929                ])),
930            ),
931        ])));
932        store_transcript(&src, transcript);
933
934        let dst = fork_at(&src, 2, Some("branch-event-index-child".into())).expect("fork_at");
935        assert_eq!(
936            snapshot(&dst)
937                .and_then(|value| value.as_dict().cloned())
938                .and_then(|dict| dict
939                    .get("branched_at_event_index")
940                    .and_then(VmValue::as_int)),
941            Some(3)
942        );
943    }
944
945    #[test]
946    fn child_session_forks_parent_transcript() {
947        reset_session_store();
948        let parent = open_or_create(Some("parent-fork-parent".into()));
949        inject_message(&parent, make_msg("user", "parent context")).unwrap();
950
951        let child = open_child_session(&parent, Some("parent-fork-child".into()));
952        assert_eq!(message_count(&parent), 1);
953        assert_eq!(message_count(&child), 1);
954
955        let child_messages = messages_json(&child);
956        assert_eq!(
957            child_messages[0]["content"].as_str(),
958            Some("parent context"),
959        );
960    }
961
962    #[test]
963    fn prompt_state_prepends_summary_message_when_missing_from_messages() {
964        reset_session_store();
965        let session = open_or_create(Some("prompt-state-summary".into()));
966        let transcript = crate::llm::helpers::new_transcript_with_events(
967            Some(session.clone()),
968            vec![make_msg("assistant", "latest answer")],
969            Some("[auto-compacted 2 older messages]\nsummary".to_string()),
970            None,
971            Vec::new(),
972            Vec::new(),
973            Some("active"),
974        );
975        store_transcript(&session, transcript);
976
977        let prompt = prompt_state_json(&session);
978        assert_eq!(
979            prompt.summary.as_deref(),
980            Some("[auto-compacted 2 older messages]\nsummary")
981        );
982        assert_eq!(prompt.messages.len(), 2);
983        assert_eq!(prompt.messages[0]["role"].as_str(), Some("user"));
984        assert_eq!(
985            prompt.messages[0]["content"].as_str(),
986            Some("[auto-compacted 2 older messages]\nsummary"),
987        );
988        assert_eq!(prompt.messages[1]["role"].as_str(), Some("assistant"));
989    }
990
991    #[tokio::test(flavor = "current_thread")]
992    async fn open_or_create_registers_event_log_sink_when_active_log_is_installed() {
993        reset_all_sinks();
994        crate::event_log::reset_active_event_log();
995        let dir = tempfile::tempdir().expect("tempdir");
996        crate::event_log::install_default_for_base_dir(dir.path()).expect("install event log");
997
998        let session = open_or_create(Some("event-log-session".into()));
999        assert_eq!(session_external_sink_count(&session), 1);
1000
1001        emit_event(&AgentEvent::TurnStart {
1002            session_id: session.clone(),
1003            iteration: 0,
1004        });
1005        tokio::time::sleep(std::time::Duration::from_millis(25)).await;
1006
1007        let topic = Topic::new("observability.agent_events.event-log-session").unwrap();
1008        let log = active_event_log().expect("active event log");
1009        let events = log.read_range(&topic, None, usize::MAX).await.unwrap();
1010        assert_eq!(events.len(), 1);
1011        assert_eq!(events[0].1.kind, "turn_start");
1012
1013        crate::event_log::reset_active_event_log();
1014        reset_all_sinks();
1015    }
1016}