Skip to main content

harn_vm/
agent_sessions.rs

1//! First-class session storage.
2//!
3//! A session owns three things:
4//!
5//! 1. A transcript dict (messages, events, summary, metadata, …).
6//! 2. Closure subscribers that fire on agent-loop events for this session.
7//! 3. Its own lifecycle (open, reset, fork, trim, compact, close).
8//!
9//! Storage is thread-local because `VmValue` contains `Rc`, which is
10//! neither `Send` nor `Sync`. The agent loop runs on a tokio
11//! current-thread worker, so all session reads and writes happen on the
12//! same thread. The closure-subscribers register, fire, and unregister
13//! on that same thread.
14//!
15//! Lifecycle is explicit. Builtins (`agent_session_open`,
16//! `_reset`, `_fork`, `_fork_at`, `_close`, `_trim`, `_compact`,
17//! `_inject`, `_exists`, `_length`, `_snapshot`, `_ancestry`) drive
18//! the store directly — there is no "policy" config dict that
19//! performs lifecycle as a side effect.
20
21use std::cell::{Cell, RefCell};
22use std::collections::{BTreeMap, HashMap, HashSet};
23use std::rc::Rc;
24use std::time::Instant;
25
26use crate::value::VmValue;
27
28/// Default cap on concurrent sessions per VM thread. Beyond this the
29/// least-recently-accessed session is evicted on the next `open`.
30pub const DEFAULT_SESSION_CAP: usize = 128;
31
32pub struct SessionState {
33    pub id: String,
34    pub transcript: VmValue,
35    pub subscribers: Vec<VmValue>,
36    pub created_at: Instant,
37    pub last_accessed: Instant,
38    pub parent_id: Option<String>,
39    pub child_ids: Vec<String>,
40    pub branched_at_event_index: Option<usize>,
41    /// Names of skills that were active at the end of the most recent
42    /// `agent_loop` run on this session. Empty when no skills were
43    /// matched, when the skill system wasn't used, or when the
44    /// deactivation phase cleared them. Re-entering the session
45    /// restores these as the initial active set before matching runs.
46    pub active_skills: Vec<String>,
47}
48
49impl SessionState {
50    fn new(id: String) -> Self {
51        let now = Instant::now();
52        let transcript = empty_transcript(&id);
53        Self {
54            id,
55            transcript,
56            subscribers: Vec::new(),
57            created_at: now,
58            last_accessed: now,
59            parent_id: None,
60            child_ids: Vec::new(),
61            branched_at_event_index: None,
62            active_skills: Vec::new(),
63        }
64    }
65}
66
67#[derive(Clone, Debug, PartialEq, Eq)]
68pub struct SessionAncestry {
69    pub parent_id: Option<String>,
70    pub child_ids: Vec<String>,
71    pub root_id: String,
72}
73
74thread_local! {
75    static SESSIONS: RefCell<HashMap<String, SessionState>> = RefCell::new(HashMap::new());
76    static SESSION_CAP: Cell<usize> = const { Cell::new(DEFAULT_SESSION_CAP) };
77    static CURRENT_SESSION_STACK: RefCell<Vec<String>> = const { RefCell::new(Vec::new()) };
78}
79
80/// Set the per-thread session cap. Primarily for tests; production VMs
81/// inherit the default.
82pub fn set_session_cap(cap: usize) {
83    SESSION_CAP.with(|c| c.set(cap.max(1)));
84}
85
86pub fn session_cap() -> usize {
87    SESSION_CAP.with(|c| c.get())
88}
89
90/// Clear the session store. Wired into `reset_llm_state` for test isolation.
91pub fn reset_session_store() {
92    SESSIONS.with(|s| s.borrow_mut().clear());
93    CURRENT_SESSION_STACK.with(|stack| stack.borrow_mut().clear());
94}
95
96pub(crate) fn push_current_session(id: String) {
97    if id.is_empty() {
98        return;
99    }
100    CURRENT_SESSION_STACK.with(|stack| stack.borrow_mut().push(id));
101}
102
103pub(crate) fn pop_current_session() {
104    CURRENT_SESSION_STACK.with(|stack| {
105        let _ = stack.borrow_mut().pop();
106    });
107}
108
109pub fn current_session_id() -> Option<String> {
110    CURRENT_SESSION_STACK.with(|stack| stack.borrow().last().cloned())
111}
112
113pub fn exists(id: &str) -> bool {
114    SESSIONS.with(|s| s.borrow().contains_key(id))
115}
116
117pub fn length(id: &str) -> Option<usize> {
118    SESSIONS.with(|s| {
119        s.borrow().get(id).map(|state| {
120            state
121                .transcript
122                .as_dict()
123                .and_then(|d| d.get("messages"))
124                .and_then(|v| match v {
125                    VmValue::List(list) => Some(list.len()),
126                    _ => None,
127                })
128                .unwrap_or(0)
129        })
130    })
131}
132
133pub fn snapshot(id: &str) -> Option<VmValue> {
134    SESSIONS.with(|s| s.borrow().get(id).map(session_snapshot))
135}
136
137/// Open a session, or create it if missing. Returns the resolved id.
138///
139/// Newly-created sessions auto-register an event-log-backed sink when a
140/// generalized [`crate::event_log::EventLog`] has been installed for the
141/// current VM thread. For legacy env-driven workflows that still point
142/// `HARN_EVENT_LOG_DIR` at a directory, we preserve the older JSONL sink
143/// as a compatibility fallback. Re-opening an existing session does not
144/// re-register — sinks are per-session, owned by the first opener.
145pub fn open_or_create(id: Option<String>) -> String {
146    let resolved = id.unwrap_or_else(|| uuid::Uuid::now_v7().to_string());
147    let mut was_new = false;
148    SESSIONS.with(|s| {
149        let mut map = s.borrow_mut();
150        if let Some(state) = map.get_mut(&resolved) {
151            state.last_accessed = Instant::now();
152            return;
153        }
154        was_new = true;
155        let cap = SESSION_CAP.with(|c| c.get());
156        if map.len() >= cap {
157            if let Some(victim) = map
158                .iter()
159                .min_by_key(|(_, state)| state.last_accessed)
160                .map(|(id, _)| id.clone())
161            {
162                map.remove(&victim);
163            }
164        }
165        map.insert(resolved.clone(), SessionState::new(resolved.clone()));
166    });
167    if was_new {
168        try_register_event_log(&resolved);
169    }
170    resolved
171}
172
173pub fn open_child_session(parent_id: &str, id: Option<String>) -> String {
174    let resolved = fork(parent_id, id.clone()).unwrap_or_else(|| open_or_create(id));
175    link_child_session(parent_id, &resolved);
176    resolved
177}
178
179pub fn link_child_session(parent_id: &str, child_id: &str) {
180    link_child_session_with_branch(parent_id, child_id, None);
181}
182
183pub fn link_child_session_with_branch(
184    parent_id: &str,
185    child_id: &str,
186    branched_at_event_index: Option<usize>,
187) {
188    if parent_id == child_id {
189        return;
190    }
191    open_or_create(Some(parent_id.to_string()));
192    open_or_create(Some(child_id.to_string()));
193    SESSIONS.with(|s| {
194        let mut map = s.borrow_mut();
195        update_lineage(&mut map, parent_id, child_id, branched_at_event_index);
196    });
197}
198
199pub fn parent_id(id: &str) -> Option<String> {
200    SESSIONS.with(|s| s.borrow().get(id).and_then(|state| state.parent_id.clone()))
201}
202
203pub fn child_ids(id: &str) -> Vec<String> {
204    SESSIONS.with(|s| {
205        s.borrow()
206            .get(id)
207            .map(|state| state.child_ids.clone())
208            .unwrap_or_default()
209    })
210}
211
212pub fn ancestry(id: &str) -> Option<SessionAncestry> {
213    SESSIONS.with(|s| {
214        let map = s.borrow();
215        let state = map.get(id)?;
216        let mut root_id = state.id.clone();
217        let mut cursor = state.parent_id.clone();
218        let mut seen = HashSet::from([state.id.clone()]);
219        while let Some(parent_id) = cursor {
220            if !seen.insert(parent_id.clone()) {
221                break;
222            }
223            root_id = parent_id.clone();
224            cursor = map
225                .get(&parent_id)
226                .and_then(|parent| parent.parent_id.clone());
227        }
228        Some(SessionAncestry {
229            parent_id: state.parent_id.clone(),
230            child_ids: state.child_ids.clone(),
231            root_id,
232        })
233    })
234}
235
236/// Auto-register a persistent sink for a newly-created session.
237/// Silent no-op on failure — a broken observability sink must never
238/// prevent a session from starting.
239fn try_register_event_log(session_id: &str) {
240    if let Some(log) = crate::event_log::active_event_log() {
241        crate::agent_events::register_sink(
242            session_id,
243            crate::agent_events::EventLogSink::new(log, session_id),
244        );
245        return;
246    }
247    let Ok(dir) = std::env::var("HARN_EVENT_LOG_DIR") else {
248        return;
249    };
250    if dir.is_empty() {
251        return;
252    }
253    let path = std::path::PathBuf::from(dir).join(format!("event_log-{session_id}.jsonl"));
254    if let Ok(sink) = crate::agent_events::JsonlEventSink::open(&path) {
255        crate::agent_events::register_sink(session_id, sink);
256    }
257}
258
259pub fn close(id: &str) {
260    SESSIONS.with(|s| {
261        s.borrow_mut().remove(id);
262    });
263}
264
265pub fn reset_transcript(id: &str) -> bool {
266    SESSIONS.with(|s| {
267        let mut map = s.borrow_mut();
268        let Some(state) = map.get_mut(id) else {
269            return false;
270        };
271        state.transcript = empty_transcript(id);
272        state.last_accessed = Instant::now();
273        true
274    })
275}
276
277/// Copy `src`'s transcript into a new session id. Subscribers are NOT
278/// copied — a fork is a conversation branch, not an event fanout.
279///
280/// Touches `src`'s `last_accessed` before evicting, so the fork
281/// operation itself can't make `src` look stale and kick it out of
282/// the LRU just to make room for the new fork.
283pub fn fork(src_id: &str, dst_id: Option<String>) -> Option<String> {
284    let (src_transcript, dst) = SESSIONS.with(|s| {
285        let mut map = s.borrow_mut();
286        let src = map.get_mut(src_id)?;
287        src.last_accessed = Instant::now();
288        let dst = dst_id.unwrap_or_else(|| uuid::Uuid::now_v7().to_string());
289        let forked_transcript = clone_transcript_with_id(&src.transcript, &dst);
290        Some((forked_transcript, dst))
291    })?;
292    // Ensure cap is respected when inserting the fork.
293    open_or_create(Some(dst.clone()));
294    SESSIONS.with(|s| {
295        let mut map = s.borrow_mut();
296        if let Some(state) = map.get_mut(&dst) {
297            state.transcript = src_transcript;
298            state.last_accessed = Instant::now();
299        }
300        update_lineage(&mut map, src_id, &dst, None);
301    });
302    // open_or_create evicts BEFORE inserting, so the dst slot is
303    // guaranteed once we get here. The existence check is cheap
304    // insurance against a future refactor that breaks that invariant.
305    if exists(&dst) {
306        Some(dst)
307    } else {
308        None
309    }
310}
311
312/// Fork `src_id` and truncate the destination transcript to the
313/// first `keep_first` messages (#105 — branch-replay). Pairs with the
314/// scrubber: the IDE picks an event index, rebuilds a message count,
315/// and calls this to spawn a live sibling session that resumes from
316/// the rebuilt state. Subscribers are not carried over (same as
317/// `fork`), so sibling events don't double-fan into the parent's
318/// consumers.
319///
320/// Returns the new session id on success, `None` if `src_id` doesn't
321/// exist.
322pub fn fork_at(src_id: &str, keep_first: usize, dst_id: Option<String>) -> Option<String> {
323    let branched_at_event_index = SESSIONS.with(|s| {
324        let map = s.borrow();
325        let src = map.get(src_id)?;
326        Some(branch_event_index(&src.transcript, keep_first))
327    })?;
328    let new_id = fork(src_id, dst_id)?;
329    link_child_session_with_branch(src_id, &new_id, Some(branched_at_event_index));
330    retain_first(&new_id, keep_first);
331    Some(new_id)
332}
333
334/// Truncate the session transcript to the first `keep_first`
335/// messages (opposite of `trim`, which keeps the last N). Used by
336/// `fork_at` to cut a branch at a scrubber position.
337fn retain_first(id: &str, keep_first: usize) {
338    SESSIONS.with(|s| {
339        let mut map = s.borrow_mut();
340        let Some(state) = map.get_mut(id) else {
341            return;
342        };
343        let Some(dict) = state.transcript.as_dict() else {
344            return;
345        };
346        let dict = dict.clone();
347        let messages: Vec<VmValue> = match dict.get("messages") {
348            Some(VmValue::List(list)) => list.iter().cloned().collect(),
349            _ => Vec::new(),
350        };
351        let retained: Vec<VmValue> = messages.into_iter().take(keep_first).collect();
352        let mut next = dict;
353        next.insert(
354            "events".to_string(),
355            VmValue::List(Rc::new(
356                crate::llm::helpers::transcript_events_from_messages(&retained),
357            )),
358        );
359        next.insert("messages".to_string(), VmValue::List(Rc::new(retained)));
360        state.transcript = VmValue::Dict(Rc::new(next));
361        state.last_accessed = Instant::now();
362    });
363}
364
365/// Retain only the last `keep_last` messages in the session transcript.
366/// Returns the kept count (<= keep_last).
367pub fn trim(id: &str, keep_last: usize) -> Option<usize> {
368    SESSIONS.with(|s| {
369        let mut map = s.borrow_mut();
370        let state = map.get_mut(id)?;
371        let dict = state.transcript.as_dict()?.clone();
372        let messages: Vec<VmValue> = match dict.get("messages") {
373            Some(VmValue::List(list)) => list.iter().cloned().collect(),
374            _ => Vec::new(),
375        };
376        let start = messages.len().saturating_sub(keep_last);
377        let retained: Vec<VmValue> = messages.into_iter().skip(start).collect();
378        let kept = retained.len();
379        let mut next = dict;
380        next.insert(
381            "events".to_string(),
382            VmValue::List(Rc::new(
383                crate::llm::helpers::transcript_events_from_messages(&retained),
384            )),
385        );
386        next.insert("messages".to_string(), VmValue::List(Rc::new(retained)));
387        state.transcript = VmValue::Dict(Rc::new(next));
388        state.last_accessed = Instant::now();
389        Some(kept)
390    })
391}
392
393/// Append a message dict to the session transcript. The message must
394/// have at least a string `role`; anything else is merged verbatim.
395pub fn inject_message(id: &str, message: VmValue) -> Result<(), String> {
396    let Some(msg_dict) = message.as_dict().cloned() else {
397        return Err("agent_session_inject: message must be a dict".into());
398    };
399    let role_ok = matches!(msg_dict.get("role"), Some(VmValue::String(_)));
400    if !role_ok {
401        return Err(
402            "agent_session_inject: message must have a string `role` (user|assistant|tool_result|system)"
403                .into(),
404        );
405    }
406    SESSIONS.with(|s| {
407        let mut map = s.borrow_mut();
408        let Some(state) = map.get_mut(id) else {
409            return Err(format!("agent_session_inject: unknown session id '{id}'"));
410        };
411        let dict = state
412            .transcript
413            .as_dict()
414            .cloned()
415            .unwrap_or_else(BTreeMap::new);
416        let mut messages: Vec<VmValue> = match dict.get("messages") {
417            Some(VmValue::List(list)) => list.iter().cloned().collect(),
418            _ => Vec::new(),
419        };
420        messages.push(VmValue::Dict(Rc::new(msg_dict)));
421        let mut next = dict;
422        next.insert(
423            "events".to_string(),
424            VmValue::List(Rc::new(
425                crate::llm::helpers::transcript_events_from_messages(&messages),
426            )),
427        );
428        next.insert("messages".to_string(), VmValue::List(Rc::new(messages)));
429        state.transcript = VmValue::Dict(Rc::new(next));
430        state.last_accessed = Instant::now();
431        Ok(())
432    })
433}
434
435/// Load the messages vec (as JSON) for this session, for use as prefix
436/// to an agent_loop run. Returns an empty vec if the session doesn't
437/// exist or has no messages.
438pub fn messages_json(id: &str) -> Vec<serde_json::Value> {
439    SESSIONS.with(|s| {
440        let map = s.borrow();
441        let Some(state) = map.get(id) else {
442            return Vec::new();
443        };
444        let Some(dict) = state.transcript.as_dict() else {
445            return Vec::new();
446        };
447        match dict.get("messages") {
448            Some(VmValue::List(list)) => list
449                .iter()
450                .map(crate::llm::helpers::vm_value_to_json)
451                .collect(),
452            _ => Vec::new(),
453        }
454    })
455}
456
457#[derive(Clone, Debug, Default)]
458pub struct SessionPromptState {
459    pub messages: Vec<serde_json::Value>,
460    pub summary: Option<String>,
461}
462
463fn summary_message_json(summary: &str) -> serde_json::Value {
464    serde_json::json!({
465        "role": "user",
466        "content": summary,
467    })
468}
469
470fn messages_begin_with_summary(messages: &[serde_json::Value], summary: &str) -> bool {
471    messages.first().is_some_and(|message| {
472        message.get("role").and_then(|value| value.as_str()) == Some("user")
473            && message.get("content").and_then(|value| value.as_str()) == Some(summary)
474    })
475}
476
477/// Prompt-surface resume state for a persisted session.
478///
479/// Returns the compacted/rehydratable message list plus the transcript's
480/// summary field. When the transcript carries a summary field but its
481/// message list does not already begin with the compacted summary
482/// message, this helper prepends one so session re-entry preserves the
483/// same prompt surface the previous loop was actually using.
484pub fn prompt_state_json(id: &str) -> SessionPromptState {
485    SESSIONS.with(|s| {
486        let map = s.borrow();
487        let Some(state) = map.get(id) else {
488            return SessionPromptState::default();
489        };
490        let Some(dict) = state.transcript.as_dict() else {
491            return SessionPromptState::default();
492        };
493        let mut messages = match dict.get("messages") {
494            Some(VmValue::List(list)) => list
495                .iter()
496                .map(crate::llm::helpers::vm_value_to_json)
497                .collect::<Vec<_>>(),
498            _ => Vec::new(),
499        };
500        let summary = dict.get("summary").and_then(|value| match value {
501            VmValue::String(text) if !text.trim().is_empty() => Some(text.to_string()),
502            _ => None,
503        });
504        if let Some(summary_text) = summary.as_deref() {
505            if !messages_begin_with_summary(&messages, summary_text) {
506                messages.insert(0, summary_message_json(summary_text));
507            }
508        }
509        SessionPromptState { messages, summary }
510    })
511}
512
513/// Overwrite the transcript for this session. Used by `agent_loop` on
514/// exit to persist the synthesized transcript.
515pub fn store_transcript(id: &str, transcript: VmValue) {
516    SESSIONS.with(|s| {
517        if let Some(state) = s.borrow_mut().get_mut(id) {
518            state.transcript = transcript;
519            state.last_accessed = Instant::now();
520        }
521    });
522}
523
524/// Append a transcript event to the session without mutating its
525/// message list. Used for orchestration-side lineage events (sub-agent
526/// spawn/completion, workflow hooks, etc.) that should survive
527/// persistence/replay without being replayed back into the model as
528/// conversational messages.
529pub fn append_event(id: &str, event: VmValue) -> Result<(), String> {
530    let Some(event_dict) = event.as_dict() else {
531        return Err("agent_session_append_event: event must be a dict".into());
532    };
533    let kind_ok = matches!(event_dict.get("kind"), Some(VmValue::String(_)));
534    if !kind_ok {
535        return Err("agent_session_append_event: event must have a string `kind`".into());
536    }
537    SESSIONS.with(|s| {
538        let mut map = s.borrow_mut();
539        let Some(state) = map.get_mut(id) else {
540            return Err(format!(
541                "agent_session_append_event: unknown session id '{id}'"
542            ));
543        };
544        let dict = state
545            .transcript
546            .as_dict()
547            .cloned()
548            .unwrap_or_else(BTreeMap::new);
549        let mut events: Vec<VmValue> = match dict.get("events") {
550            Some(VmValue::List(list)) => list.iter().cloned().collect(),
551            _ => dict
552                .get("messages")
553                .and_then(|value| match value {
554                    VmValue::List(list) => Some(list.iter().cloned().collect::<Vec<_>>()),
555                    _ => None,
556                })
557                .map(|messages| crate::llm::helpers::transcript_events_from_messages(&messages))
558                .unwrap_or_default(),
559        };
560        events.push(event);
561        let mut next = dict;
562        next.insert("events".to_string(), VmValue::List(Rc::new(events)));
563        state.transcript = VmValue::Dict(Rc::new(next));
564        state.last_accessed = Instant::now();
565        Ok(())
566    })
567}
568
569/// Replace the transcript's message list wholesale. Used by the
570/// in-loop compaction path, which operates on JSON messages.
571pub fn replace_messages(id: &str, messages: &[serde_json::Value]) {
572    SESSIONS.with(|s| {
573        let mut map = s.borrow_mut();
574        let Some(state) = map.get_mut(id) else {
575            return;
576        };
577        let dict = state
578            .transcript
579            .as_dict()
580            .cloned()
581            .unwrap_or_else(BTreeMap::new);
582        let vm_messages: Vec<VmValue> = messages
583            .iter()
584            .map(crate::stdlib::json_to_vm_value)
585            .collect();
586        let mut next = dict;
587        next.insert(
588            "events".to_string(),
589            VmValue::List(Rc::new(
590                crate::llm::helpers::transcript_events_from_messages(&vm_messages),
591            )),
592        );
593        next.insert("messages".to_string(), VmValue::List(Rc::new(vm_messages)));
594        state.transcript = VmValue::Dict(Rc::new(next));
595        state.last_accessed = Instant::now();
596    });
597}
598
599pub fn append_subscriber(id: &str, callback: VmValue) {
600    open_or_create(Some(id.to_string()));
601    SESSIONS.with(|s| {
602        if let Some(state) = s.borrow_mut().get_mut(id) {
603            state.subscribers.push(callback);
604            state.last_accessed = Instant::now();
605        }
606    });
607}
608
609pub fn subscribers_for(id: &str) -> Vec<VmValue> {
610    SESSIONS.with(|s| {
611        s.borrow()
612            .get(id)
613            .map(|state| state.subscribers.clone())
614            .unwrap_or_default()
615    })
616}
617
618pub fn subscriber_count(id: &str) -> usize {
619    SESSIONS.with(|s| {
620        s.borrow()
621            .get(id)
622            .map(|state| state.subscribers.len())
623            .unwrap_or(0)
624    })
625}
626
627/// Persist the set of active skill names for session resume. Called at
628/// the end of an agent_loop run; the next `open_or_create` for this id
629/// reads them back via [`active_skills`].
630pub fn set_active_skills(id: &str, skills: Vec<String>) {
631    SESSIONS.with(|s| {
632        if let Some(state) = s.borrow_mut().get_mut(id) {
633            state.active_skills = skills;
634            state.last_accessed = Instant::now();
635        }
636    });
637}
638
639/// Skills that were active at the end of the previous agent_loop run
640/// against this session. Returns an empty vec when the session doesn't
641/// exist or nothing was persisted.
642pub fn active_skills(id: &str) -> Vec<String> {
643    SESSIONS.with(|s| {
644        s.borrow()
645            .get(id)
646            .map(|state| state.active_skills.clone())
647            .unwrap_or_default()
648    })
649}
650
651fn empty_transcript(id: &str) -> VmValue {
652    use crate::llm::helpers::new_transcript_with;
653    new_transcript_with(Some(id.to_string()), Vec::new(), None, None)
654}
655
656fn clone_transcript_with_id(transcript: &VmValue, new_id: &str) -> VmValue {
657    let Some(dict) = transcript.as_dict() else {
658        return empty_transcript(new_id);
659    };
660    let mut next = dict.clone();
661    next.insert(
662        "id".to_string(),
663        VmValue::String(Rc::from(new_id.to_string())),
664    );
665    VmValue::Dict(Rc::new(next))
666}
667
668fn clone_transcript_with_parent(transcript: &VmValue, parent_id: &str) -> VmValue {
669    let Some(dict) = transcript.as_dict() else {
670        return transcript.clone();
671    };
672    let mut next = dict.clone();
673    let metadata = match next.get("metadata") {
674        Some(VmValue::Dict(metadata)) => {
675            let mut metadata = metadata.as_ref().clone();
676            metadata.insert(
677                "parent_session_id".to_string(),
678                VmValue::String(Rc::from(parent_id.to_string())),
679            );
680            VmValue::Dict(Rc::new(metadata))
681        }
682        _ => VmValue::Dict(Rc::new(BTreeMap::from([(
683            "parent_session_id".to_string(),
684            VmValue::String(Rc::from(parent_id.to_string())),
685        )]))),
686    };
687    next.insert("metadata".to_string(), metadata);
688    VmValue::Dict(Rc::new(next))
689}
690
691fn session_snapshot(state: &SessionState) -> VmValue {
692    let Some(dict) = state.transcript.as_dict() else {
693        return state.transcript.clone();
694    };
695    let mut next = dict.clone();
696    next.insert(
697        "parent_id".to_string(),
698        state
699            .parent_id
700            .as_ref()
701            .map(|id| VmValue::String(Rc::from(id.clone())))
702            .unwrap_or(VmValue::Nil),
703    );
704    next.insert(
705        "child_ids".to_string(),
706        VmValue::List(Rc::new(
707            state
708                .child_ids
709                .iter()
710                .cloned()
711                .map(|id| VmValue::String(Rc::from(id)))
712                .collect(),
713        )),
714    );
715    next.insert(
716        "branched_at_event_index".to_string(),
717        state
718            .branched_at_event_index
719            .map(|index| VmValue::Int(index as i64))
720            .unwrap_or(VmValue::Nil),
721    );
722    VmValue::Dict(Rc::new(next))
723}
724
725fn update_lineage(
726    map: &mut HashMap<String, SessionState>,
727    parent_id: &str,
728    child_id: &str,
729    branched_at_event_index: Option<usize>,
730) {
731    let old_parent_id = map.get(child_id).and_then(|child| child.parent_id.clone());
732    if let Some(old_parent_id) = old_parent_id.filter(|old_parent_id| old_parent_id != parent_id) {
733        if let Some(old_parent) = map.get_mut(&old_parent_id) {
734            old_parent.child_ids.retain(|id| id != child_id);
735            old_parent.last_accessed = Instant::now();
736        }
737    }
738    if let Some(parent) = map.get_mut(parent_id) {
739        parent.last_accessed = Instant::now();
740        if !parent.child_ids.iter().any(|id| id == child_id) {
741            parent.child_ids.push(child_id.to_string());
742        }
743    }
744    if let Some(child) = map.get_mut(child_id) {
745        child.last_accessed = Instant::now();
746        child.parent_id = Some(parent_id.to_string());
747        child.branched_at_event_index = branched_at_event_index;
748        child.transcript = clone_transcript_with_parent(&child.transcript, parent_id);
749    }
750}
751
752fn branch_event_index(transcript: &VmValue, keep_first: usize) -> usize {
753    if keep_first == 0 {
754        return 0;
755    }
756    let Some(dict) = transcript.as_dict() else {
757        return keep_first;
758    };
759    let Some(VmValue::List(events)) = dict.get("events") else {
760        return keep_first;
761    };
762    let mut retained_messages = 0usize;
763    for (index, event) in events.iter().enumerate() {
764        let kind = event
765            .as_dict()
766            .and_then(|dict| dict.get("kind"))
767            .map(VmValue::display);
768        if matches!(kind.as_deref(), Some("message" | "tool_result")) {
769            retained_messages += 1;
770            if retained_messages == keep_first {
771                return index + 1;
772            }
773        }
774    }
775    events.len()
776}
777
778#[cfg(test)]
779mod tests {
780    use super::*;
781    use crate::agent_events::{
782        emit_event, reset_all_sinks, session_external_sink_count, AgentEvent,
783    };
784    use crate::event_log::{active_event_log, EventLog, Topic};
785    use std::collections::BTreeMap;
786
787    fn make_msg(role: &str, content: &str) -> VmValue {
788        let mut m: BTreeMap<String, VmValue> = BTreeMap::new();
789        m.insert("role".to_string(), VmValue::String(Rc::from(role)));
790        m.insert("content".to_string(), VmValue::String(Rc::from(content)));
791        VmValue::Dict(Rc::new(m))
792    }
793
794    fn message_count(id: &str) -> usize {
795        SESSIONS.with(|s| {
796            let map = s.borrow();
797            let Some(state) = map.get(id) else { return 0 };
798            let Some(dict) = state.transcript.as_dict() else {
799                return 0;
800            };
801            match dict.get("messages") {
802                Some(VmValue::List(list)) => list.len(),
803                _ => 0,
804            }
805        })
806    }
807
808    #[test]
809    fn fork_at_truncates_destination_to_keep_first() {
810        reset_session_store();
811        let src = open_or_create(Some("src-fork-at".into()));
812        inject_message(&src, make_msg("user", "a")).unwrap();
813        inject_message(&src, make_msg("assistant", "b")).unwrap();
814        inject_message(&src, make_msg("user", "c")).unwrap();
815        inject_message(&src, make_msg("assistant", "d")).unwrap();
816        assert_eq!(message_count(&src), 4);
817
818        let dst = fork_at(&src, 2, Some("dst-fork-at".into())).expect("fork_at");
819        assert_ne!(dst, src);
820        assert_eq!(message_count(&dst), 2, "branched at message index 2");
821        assert_eq!(
822            snapshot(&dst)
823                .and_then(|value| value.as_dict().cloned())
824                .and_then(|dict| dict
825                    .get("branched_at_event_index")
826                    .and_then(VmValue::as_int)),
827            Some(2)
828        );
829        // Source untouched.
830        assert_eq!(message_count(&src), 4);
831        // Subscribers not carried — forks start with a clean fanout list.
832        assert_eq!(subscriber_count(&dst), 0);
833        reset_session_store();
834    }
835
836    #[test]
837    fn fork_at_on_unknown_source_returns_none() {
838        reset_session_store();
839        assert!(fork_at("does-not-exist", 3, None).is_none());
840    }
841
842    #[test]
843    fn child_sessions_record_parent_lineage() {
844        reset_session_store();
845        let parent = open_or_create(Some("parent-session".into()));
846        let child = open_child_session(&parent, Some("child-session".into()));
847        assert_eq!(parent_id(&child).as_deref(), Some("parent-session"));
848        assert_eq!(child_ids(&parent), vec!["child-session".to_string()]);
849        assert_eq!(
850            ancestry(&child),
851            Some(SessionAncestry {
852                parent_id: Some("parent-session".to_string()),
853                child_ids: Vec::new(),
854                root_id: "parent-session".to_string(),
855            })
856        );
857
858        let transcript = snapshot(&child).expect("child transcript");
859        let transcript = transcript.as_dict().expect("child snapshot");
860        let metadata = transcript
861            .get("metadata")
862            .and_then(|value| value.as_dict())
863            .expect("child metadata");
864        assert!(
865            matches!(transcript.get("parent_id"), Some(VmValue::String(value)) if value.as_ref() == "parent-session")
866        );
867        assert!(
868            matches!(transcript.get("child_ids"), Some(VmValue::List(children)) if children.is_empty())
869        );
870        assert!(matches!(
871            transcript.get("branched_at_event_index"),
872            Some(VmValue::Nil)
873        ));
874        assert!(matches!(
875            metadata.get("parent_session_id"),
876            Some(VmValue::String(value)) if value.as_ref() == "parent-session"
877        ));
878    }
879
880    #[test]
881    fn branch_event_index_counts_non_message_events() {
882        reset_session_store();
883        let src = open_or_create(Some("branch-event-index".into()));
884        let transcript = VmValue::Dict(Rc::new(BTreeMap::from([
885            ("id".to_string(), VmValue::String(Rc::from(src.clone()))),
886            (
887                "messages".to_string(),
888                VmValue::List(Rc::new(vec![
889                    make_msg("user", "a"),
890                    make_msg("assistant", "b"),
891                ])),
892            ),
893            (
894                "events".to_string(),
895                VmValue::List(Rc::new(vec![
896                    VmValue::Dict(Rc::new(BTreeMap::from([(
897                        "kind".to_string(),
898                        VmValue::String(Rc::from("message")),
899                    )]))),
900                    VmValue::Dict(Rc::new(BTreeMap::from([(
901                        "kind".to_string(),
902                        VmValue::String(Rc::from("sub_agent_start")),
903                    )]))),
904                    VmValue::Dict(Rc::new(BTreeMap::from([(
905                        "kind".to_string(),
906                        VmValue::String(Rc::from("message")),
907                    )]))),
908                ])),
909            ),
910        ])));
911        store_transcript(&src, transcript);
912
913        let dst = fork_at(&src, 2, Some("branch-event-index-child".into())).expect("fork_at");
914        assert_eq!(
915            snapshot(&dst)
916                .and_then(|value| value.as_dict().cloned())
917                .and_then(|dict| dict
918                    .get("branched_at_event_index")
919                    .and_then(VmValue::as_int)),
920            Some(3)
921        );
922    }
923
924    #[test]
925    fn child_session_forks_parent_transcript() {
926        reset_session_store();
927        let parent = open_or_create(Some("parent-fork-parent".into()));
928        inject_message(&parent, make_msg("user", "parent context")).unwrap();
929
930        let child = open_child_session(&parent, Some("parent-fork-child".into()));
931        assert_eq!(message_count(&parent), 1);
932        assert_eq!(message_count(&child), 1);
933
934        let child_messages = messages_json(&child);
935        assert_eq!(
936            child_messages[0]["content"].as_str(),
937            Some("parent context"),
938        );
939    }
940
941    #[test]
942    fn prompt_state_prepends_summary_message_when_missing_from_messages() {
943        reset_session_store();
944        let session = open_or_create(Some("prompt-state-summary".into()));
945        let transcript = crate::llm::helpers::new_transcript_with_events(
946            Some(session.clone()),
947            vec![make_msg("assistant", "latest answer")],
948            Some("[auto-compacted 2 older messages]\nsummary".to_string()),
949            None,
950            Vec::new(),
951            Vec::new(),
952            Some("active"),
953        );
954        store_transcript(&session, transcript);
955
956        let prompt = prompt_state_json(&session);
957        assert_eq!(
958            prompt.summary.as_deref(),
959            Some("[auto-compacted 2 older messages]\nsummary")
960        );
961        assert_eq!(prompt.messages.len(), 2);
962        assert_eq!(prompt.messages[0]["role"].as_str(), Some("user"));
963        assert_eq!(
964            prompt.messages[0]["content"].as_str(),
965            Some("[auto-compacted 2 older messages]\nsummary"),
966        );
967        assert_eq!(prompt.messages[1]["role"].as_str(), Some("assistant"));
968    }
969
970    #[tokio::test(flavor = "current_thread")]
971    async fn open_or_create_registers_event_log_sink_when_active_log_is_installed() {
972        reset_all_sinks();
973        crate::event_log::reset_active_event_log();
974        let dir = tempfile::tempdir().expect("tempdir");
975        crate::event_log::install_default_for_base_dir(dir.path()).expect("install event log");
976
977        let session = open_or_create(Some("event-log-session".into()));
978        assert_eq!(session_external_sink_count(&session), 1);
979
980        emit_event(&AgentEvent::TurnStart {
981            session_id: session.clone(),
982            iteration: 0,
983        });
984        tokio::time::sleep(std::time::Duration::from_millis(25)).await;
985
986        let topic = Topic::new("observability.agent_events.event-log-session").unwrap();
987        let log = active_event_log().expect("active event log");
988        let events = log.read_range(&topic, None, usize::MAX).await.unwrap();
989        assert_eq!(events.len(), 1);
990        assert_eq!(events[0].1.kind, "turn_start");
991
992        crate::event_log::reset_active_event_log();
993        reset_all_sinks();
994    }
995}