Skip to main content

harn_vm/
agent_sessions.rs

1//! First-class session storage.
2//!
3//! A session owns three things:
4//!
5//! 1. A transcript dict (messages, events, summary, metadata, …).
6//! 2. Closure subscribers that fire on agent-loop events for this session.
7//! 3. Its own lifecycle (open, reset, fork, trim, compact, close).
8//!
9//! Storage is thread-local because `VmValue` contains `Rc`, which is
10//! neither `Send` nor `Sync`. The agent loop runs on a tokio
11//! current-thread worker, so all session reads and writes happen on the
12//! same thread. The closure-subscribers register, fire, and unregister
13//! on that same thread.
14//!
15//! Lifecycle is explicit. Builtins (`agent_session_open`,
16//! `_reset`, `_fork`, `_close`, `_trim`, `_compact`, `_inject`,
17//! `_exists`, `_length`, `_snapshot`) drive the store directly — there
18//! is no "policy" config dict that performs lifecycle as a side effect.
19
20use std::cell::{Cell, RefCell};
21use std::collections::{BTreeMap, HashMap};
22use std::rc::Rc;
23use std::time::Instant;
24
25use crate::value::VmValue;
26
/// Default cap on concurrent sessions per VM thread. When `open_or_create`
/// has to insert a new session while the store already holds this many,
/// the least-recently-accessed session is evicted first. The effective
/// cap can be overridden per thread with `set_session_cap`.
pub const DEFAULT_SESSION_CAP: usize = 128;
30
/// Everything the store tracks for one session: its transcript, its
/// subscribers, LRU timestamps, and parent/child lineage.
pub struct SessionState {
    /// Session id; also the key this state is stored under in the
    /// thread-local session map.
    pub id: String,
    /// Transcript value. The functions in this module read and write it
    /// as a dict with entries such as `messages`, `events`, `summary`
    /// and `metadata`.
    pub transcript: VmValue,
    /// Subscriber callbacks registered through `append_subscriber`.
    pub subscribers: Vec<VmValue>,
    /// When this state was constructed.
    pub created_at: Instant,
    /// Refreshed by the operations that touch the session; eviction in
    /// `open_or_create` removes the session with the oldest value.
    pub last_accessed: Instant,
    /// Parent session id, set by `link_child_session`.
    pub parent_id: Option<String>,
    /// Ids of sessions linked as children by `link_child_session`
    /// (each id is recorded at most once).
    pub child_ids: Vec<String>,
    /// Names of skills that were active at the end of the most recent
    /// `agent_loop` run on this session. Empty when no skills were
    /// matched, when the skill system wasn't used, or when the
    /// deactivation phase cleared them. Re-entering the session
    /// restores these as the initial active set before matching runs.
    pub active_skills: Vec<String>,
}
46
47impl SessionState {
48    fn new(id: String) -> Self {
49        let now = Instant::now();
50        let transcript = empty_transcript(&id);
51        Self {
52            id,
53            transcript,
54            subscribers: Vec::new(),
55            created_at: now,
56            last_accessed: now,
57            parent_id: None,
58            child_ids: Vec::new(),
59            active_skills: Vec::new(),
60        }
61    }
62}
63
thread_local! {
    // All sessions owned by this thread, keyed by session id.
    static SESSIONS: RefCell<HashMap<String, SessionState>> = RefCell::new(HashMap::new());
    // Maximum number of sessions kept on this thread; written by
    // `set_session_cap`, read by `session_cap` and `open_or_create`.
    static SESSION_CAP: Cell<usize> = const { Cell::new(DEFAULT_SESSION_CAP) };
    // Stack of session ids maintained by `push_current_session` /
    // `pop_current_session`; the top entry is what `current_session_id`
    // reports.
    static CURRENT_SESSION_STACK: RefCell<Vec<String>> = const { RefCell::new(Vec::new()) };
}
69
70/// Set the per-thread session cap. Primarily for tests; production VMs
71/// inherit the default.
72pub fn set_session_cap(cap: usize) {
73    SESSION_CAP.with(|c| c.set(cap.max(1)));
74}
75
76pub fn session_cap() -> usize {
77    SESSION_CAP.with(|c| c.get())
78}
79
80/// Clear the session store. Wired into `reset_llm_state` for test isolation.
81pub fn reset_session_store() {
82    SESSIONS.with(|s| s.borrow_mut().clear());
83    CURRENT_SESSION_STACK.with(|stack| stack.borrow_mut().clear());
84}
85
86pub(crate) fn push_current_session(id: String) {
87    if id.is_empty() {
88        return;
89    }
90    CURRENT_SESSION_STACK.with(|stack| stack.borrow_mut().push(id));
91}
92
93pub(crate) fn pop_current_session() {
94    CURRENT_SESSION_STACK.with(|stack| {
95        let _ = stack.borrow_mut().pop();
96    });
97}
98
99pub(crate) fn current_session_id() -> Option<String> {
100    CURRENT_SESSION_STACK.with(|stack| stack.borrow().last().cloned())
101}
102
103pub fn exists(id: &str) -> bool {
104    SESSIONS.with(|s| s.borrow().contains_key(id))
105}
106
107pub fn length(id: &str) -> Option<usize> {
108    SESSIONS.with(|s| {
109        s.borrow().get(id).map(|state| {
110            state
111                .transcript
112                .as_dict()
113                .and_then(|d| d.get("messages"))
114                .and_then(|v| match v {
115                    VmValue::List(list) => Some(list.len()),
116                    _ => None,
117                })
118                .unwrap_or(0)
119        })
120    })
121}
122
123pub fn snapshot(id: &str) -> Option<VmValue> {
124    SESSIONS.with(|s| s.borrow().get(id).map(|state| state.transcript.clone()))
125}
126
127/// Open a session, or create it if missing. Returns the resolved id.
128///
129/// When the `HARN_EVENT_LOG_DIR` environment variable points at an
130/// existing (or creatable) directory, a [`JsonlEventSink`] is
131/// auto-registered against the newly-created session so the agent
132/// loop's AgentEvent stream persists to `event_log-<session_id>.jsonl`.
133/// Re-opening an existing session does not re-register — sinks are
134/// per-session, owned by the first opener.
135pub fn open_or_create(id: Option<String>) -> String {
136    let resolved = id.unwrap_or_else(|| uuid::Uuid::now_v7().to_string());
137    let mut was_new = false;
138    SESSIONS.with(|s| {
139        let mut map = s.borrow_mut();
140        if let Some(state) = map.get_mut(&resolved) {
141            state.last_accessed = Instant::now();
142            return;
143        }
144        was_new = true;
145        let cap = SESSION_CAP.with(|c| c.get());
146        if map.len() >= cap {
147            if let Some(victim) = map
148                .iter()
149                .min_by_key(|(_, state)| state.last_accessed)
150                .map(|(id, _)| id.clone())
151            {
152                map.remove(&victim);
153            }
154        }
155        map.insert(resolved.clone(), SessionState::new(resolved.clone()));
156    });
157    if was_new {
158        try_register_jsonl_event_log(&resolved);
159    }
160    resolved
161}
162
163pub fn open_child_session(parent_id: &str, id: Option<String>) -> String {
164    let resolved = fork(parent_id, id.clone()).unwrap_or_else(|| open_or_create(id));
165    link_child_session(parent_id, &resolved);
166    resolved
167}
168
169pub fn link_child_session(parent_id: &str, child_id: &str) {
170    if parent_id == child_id {
171        return;
172    }
173    open_or_create(Some(parent_id.to_string()));
174    open_or_create(Some(child_id.to_string()));
175    SESSIONS.with(|s| {
176        let mut map = s.borrow_mut();
177        if let Some(parent) = map.get_mut(parent_id) {
178            parent.last_accessed = Instant::now();
179            if !parent.child_ids.iter().any(|id| id == child_id) {
180                parent.child_ids.push(child_id.to_string());
181            }
182        }
183        if let Some(child) = map.get_mut(child_id) {
184            child.last_accessed = Instant::now();
185            child.parent_id = Some(parent_id.to_string());
186            child.transcript = clone_transcript_with_parent(&child.transcript, parent_id);
187        }
188    });
189}
190
191pub fn parent_id(id: &str) -> Option<String> {
192    SESSIONS.with(|s| s.borrow().get(id).and_then(|state| state.parent_id.clone()))
193}
194
195pub fn child_ids(id: &str) -> Vec<String> {
196    SESSIONS.with(|s| {
197        s.borrow()
198            .get(id)
199            .map(|state| state.child_ids.clone())
200            .unwrap_or_default()
201    })
202}
203
204/// Auto-register a [`JsonlEventSink`] for a newly-created session
205/// when `HARN_EVENT_LOG_DIR` is set. Silent no-op when the env var
206/// is missing or the file can't be opened — a broken log sink must
207/// never prevent a session from starting.
208fn try_register_jsonl_event_log(session_id: &str) {
209    let Ok(dir) = std::env::var("HARN_EVENT_LOG_DIR") else {
210        return;
211    };
212    if dir.is_empty() {
213        return;
214    }
215    let path = std::path::PathBuf::from(dir).join(format!("event_log-{session_id}.jsonl"));
216    if let Ok(sink) = crate::agent_events::JsonlEventSink::open(&path) {
217        crate::agent_events::register_sink(session_id, sink);
218    }
219}
220
221pub fn close(id: &str) {
222    SESSIONS.with(|s| {
223        s.borrow_mut().remove(id);
224    });
225}
226
227pub fn reset_transcript(id: &str) -> bool {
228    SESSIONS.with(|s| {
229        let mut map = s.borrow_mut();
230        let Some(state) = map.get_mut(id) else {
231            return false;
232        };
233        state.transcript = empty_transcript(id);
234        state.last_accessed = Instant::now();
235        true
236    })
237}
238
239/// Copy `src`'s transcript into a new session id. Subscribers are NOT
240/// copied — a fork is a conversation branch, not an event fanout.
241///
242/// Touches `src`'s `last_accessed` before evicting, so the fork
243/// operation itself can't make `src` look stale and kick it out of
244/// the LRU just to make room for the new fork.
245pub fn fork(src_id: &str, dst_id: Option<String>) -> Option<String> {
246    let (src_transcript, dst) = SESSIONS.with(|s| {
247        let mut map = s.borrow_mut();
248        let src = map.get_mut(src_id)?;
249        src.last_accessed = Instant::now();
250        let dst = dst_id.unwrap_or_else(|| uuid::Uuid::now_v7().to_string());
251        let forked_transcript = clone_transcript_with_id(&src.transcript, &dst);
252        Some((forked_transcript, dst))
253    })?;
254    // Ensure cap is respected when inserting the fork.
255    open_or_create(Some(dst.clone()));
256    SESSIONS.with(|s| {
257        if let Some(state) = s.borrow_mut().get_mut(&dst) {
258            state.transcript = src_transcript;
259            state.last_accessed = Instant::now();
260        }
261    });
262    // open_or_create evicts BEFORE inserting, so the dst slot is
263    // guaranteed once we get here. The existence check is cheap
264    // insurance against a future refactor that breaks that invariant.
265    if exists(&dst) {
266        Some(dst)
267    } else {
268        None
269    }
270}
271
272/// Fork `src_id` and truncate the destination transcript to the
273/// first `keep_first` messages (#105 — branch-replay). Pairs with the
274/// scrubber: the IDE picks an event index, rebuilds a message count,
275/// and calls this to spawn a live sibling session that resumes from
276/// the rebuilt state. Subscribers are not carried over (same as
277/// `fork`), so sibling events don't double-fan into the parent's
278/// consumers.
279///
280/// Returns the new session id on success, `None` if `src_id` doesn't
281/// exist.
282pub fn fork_at(src_id: &str, keep_first: usize, dst_id: Option<String>) -> Option<String> {
283    let new_id = fork(src_id, dst_id)?;
284    retain_first(&new_id, keep_first);
285    Some(new_id)
286}
287
288/// Truncate the session transcript to the first `keep_first`
289/// messages (opposite of `trim`, which keeps the last N). Used by
290/// `fork_at` to cut a branch at a scrubber position.
291fn retain_first(id: &str, keep_first: usize) {
292    SESSIONS.with(|s| {
293        let mut map = s.borrow_mut();
294        let Some(state) = map.get_mut(id) else {
295            return;
296        };
297        let Some(dict) = state.transcript.as_dict() else {
298            return;
299        };
300        let dict = dict.clone();
301        let messages: Vec<VmValue> = match dict.get("messages") {
302            Some(VmValue::List(list)) => list.iter().cloned().collect(),
303            _ => Vec::new(),
304        };
305        let retained: Vec<VmValue> = messages.into_iter().take(keep_first).collect();
306        let mut next = dict;
307        next.insert(
308            "events".to_string(),
309            VmValue::List(Rc::new(
310                crate::llm::helpers::transcript_events_from_messages(&retained),
311            )),
312        );
313        next.insert("messages".to_string(), VmValue::List(Rc::new(retained)));
314        state.transcript = VmValue::Dict(Rc::new(next));
315        state.last_accessed = Instant::now();
316    });
317}
318
319/// Retain only the last `keep_last` messages in the session transcript.
320/// Returns the kept count (<= keep_last).
321pub fn trim(id: &str, keep_last: usize) -> Option<usize> {
322    SESSIONS.with(|s| {
323        let mut map = s.borrow_mut();
324        let state = map.get_mut(id)?;
325        let dict = state.transcript.as_dict()?.clone();
326        let messages: Vec<VmValue> = match dict.get("messages") {
327            Some(VmValue::List(list)) => list.iter().cloned().collect(),
328            _ => Vec::new(),
329        };
330        let start = messages.len().saturating_sub(keep_last);
331        let retained: Vec<VmValue> = messages.into_iter().skip(start).collect();
332        let kept = retained.len();
333        let mut next = dict;
334        next.insert(
335            "events".to_string(),
336            VmValue::List(Rc::new(
337                crate::llm::helpers::transcript_events_from_messages(&retained),
338            )),
339        );
340        next.insert("messages".to_string(), VmValue::List(Rc::new(retained)));
341        state.transcript = VmValue::Dict(Rc::new(next));
342        state.last_accessed = Instant::now();
343        Some(kept)
344    })
345}
346
347/// Append a message dict to the session transcript. The message must
348/// have at least a string `role`; anything else is merged verbatim.
349pub fn inject_message(id: &str, message: VmValue) -> Result<(), String> {
350    let Some(msg_dict) = message.as_dict().cloned() else {
351        return Err("agent_session_inject: message must be a dict".into());
352    };
353    let role_ok = matches!(msg_dict.get("role"), Some(VmValue::String(_)));
354    if !role_ok {
355        return Err(
356            "agent_session_inject: message must have a string `role` (user|assistant|tool_result|system)"
357                .into(),
358        );
359    }
360    SESSIONS.with(|s| {
361        let mut map = s.borrow_mut();
362        let Some(state) = map.get_mut(id) else {
363            return Err(format!("agent_session_inject: unknown session id '{id}'"));
364        };
365        let dict = state
366            .transcript
367            .as_dict()
368            .cloned()
369            .unwrap_or_else(BTreeMap::new);
370        let mut messages: Vec<VmValue> = match dict.get("messages") {
371            Some(VmValue::List(list)) => list.iter().cloned().collect(),
372            _ => Vec::new(),
373        };
374        messages.push(VmValue::Dict(Rc::new(msg_dict)));
375        let mut next = dict;
376        next.insert(
377            "events".to_string(),
378            VmValue::List(Rc::new(
379                crate::llm::helpers::transcript_events_from_messages(&messages),
380            )),
381        );
382        next.insert("messages".to_string(), VmValue::List(Rc::new(messages)));
383        state.transcript = VmValue::Dict(Rc::new(next));
384        state.last_accessed = Instant::now();
385        Ok(())
386    })
387}
388
389/// Load the messages vec (as JSON) for this session, for use as prefix
390/// to an agent_loop run. Returns an empty vec if the session doesn't
391/// exist or has no messages.
392pub fn messages_json(id: &str) -> Vec<serde_json::Value> {
393    SESSIONS.with(|s| {
394        let map = s.borrow();
395        let Some(state) = map.get(id) else {
396            return Vec::new();
397        };
398        let Some(dict) = state.transcript.as_dict() else {
399            return Vec::new();
400        };
401        match dict.get("messages") {
402            Some(VmValue::List(list)) => list
403                .iter()
404                .map(crate::llm::helpers::vm_value_to_json)
405                .collect(),
406            _ => Vec::new(),
407        }
408    })
409}
410
/// Result of `prompt_state_json`: the JSON message list to resume a
/// session with, plus the transcript's summary when it has one.
#[derive(Clone, Debug, Default)]
pub struct SessionPromptState {
    /// Messages as JSON; starts with the summary message whenever
    /// `summary` is set.
    pub messages: Vec<serde_json::Value>,
    /// The transcript's `summary` string, when present and not blank.
    pub summary: Option<String>,
}
416
417fn summary_message_json(summary: &str) -> serde_json::Value {
418    serde_json::json!({
419        "role": "user",
420        "content": summary,
421    })
422}
423
424fn messages_begin_with_summary(messages: &[serde_json::Value], summary: &str) -> bool {
425    messages.first().is_some_and(|message| {
426        message.get("role").and_then(|value| value.as_str()) == Some("user")
427            && message.get("content").and_then(|value| value.as_str()) == Some(summary)
428    })
429}
430
431/// Prompt-surface resume state for a persisted session.
432///
433/// Returns the compacted/rehydratable message list plus the transcript's
434/// summary field. When the transcript carries a summary field but its
435/// message list does not already begin with the compacted summary
436/// message, this helper prepends one so session re-entry preserves the
437/// same prompt surface the previous loop was actually using.
438pub fn prompt_state_json(id: &str) -> SessionPromptState {
439    SESSIONS.with(|s| {
440        let map = s.borrow();
441        let Some(state) = map.get(id) else {
442            return SessionPromptState::default();
443        };
444        let Some(dict) = state.transcript.as_dict() else {
445            return SessionPromptState::default();
446        };
447        let mut messages = match dict.get("messages") {
448            Some(VmValue::List(list)) => list
449                .iter()
450                .map(crate::llm::helpers::vm_value_to_json)
451                .collect::<Vec<_>>(),
452            _ => Vec::new(),
453        };
454        let summary = dict.get("summary").and_then(|value| match value {
455            VmValue::String(text) if !text.trim().is_empty() => Some(text.to_string()),
456            _ => None,
457        });
458        if let Some(summary_text) = summary.as_deref() {
459            if !messages_begin_with_summary(&messages, summary_text) {
460                messages.insert(0, summary_message_json(summary_text));
461            }
462        }
463        SessionPromptState { messages, summary }
464    })
465}
466
467/// Overwrite the transcript for this session. Used by `agent_loop` on
468/// exit to persist the synthesized transcript.
469pub fn store_transcript(id: &str, transcript: VmValue) {
470    SESSIONS.with(|s| {
471        if let Some(state) = s.borrow_mut().get_mut(id) {
472            state.transcript = transcript;
473            state.last_accessed = Instant::now();
474        }
475    });
476}
477
478/// Append a transcript event to the session without mutating its
479/// message list. Used for orchestration-side lineage events (sub-agent
480/// spawn/completion, workflow hooks, etc.) that should survive
481/// persistence/replay without being replayed back into the model as
482/// conversational messages.
483pub fn append_event(id: &str, event: VmValue) -> Result<(), String> {
484    let Some(event_dict) = event.as_dict() else {
485        return Err("agent_session_append_event: event must be a dict".into());
486    };
487    let kind_ok = matches!(event_dict.get("kind"), Some(VmValue::String(_)));
488    if !kind_ok {
489        return Err("agent_session_append_event: event must have a string `kind`".into());
490    }
491    SESSIONS.with(|s| {
492        let mut map = s.borrow_mut();
493        let Some(state) = map.get_mut(id) else {
494            return Err(format!(
495                "agent_session_append_event: unknown session id '{id}'"
496            ));
497        };
498        let dict = state
499            .transcript
500            .as_dict()
501            .cloned()
502            .unwrap_or_else(BTreeMap::new);
503        let mut events: Vec<VmValue> = match dict.get("events") {
504            Some(VmValue::List(list)) => list.iter().cloned().collect(),
505            _ => dict
506                .get("messages")
507                .and_then(|value| match value {
508                    VmValue::List(list) => Some(list.iter().cloned().collect::<Vec<_>>()),
509                    _ => None,
510                })
511                .map(|messages| crate::llm::helpers::transcript_events_from_messages(&messages))
512                .unwrap_or_default(),
513        };
514        events.push(event);
515        let mut next = dict;
516        next.insert("events".to_string(), VmValue::List(Rc::new(events)));
517        state.transcript = VmValue::Dict(Rc::new(next));
518        state.last_accessed = Instant::now();
519        Ok(())
520    })
521}
522
523/// Replace the transcript's message list wholesale. Used by the
524/// in-loop compaction path, which operates on JSON messages.
525pub fn replace_messages(id: &str, messages: &[serde_json::Value]) {
526    SESSIONS.with(|s| {
527        let mut map = s.borrow_mut();
528        let Some(state) = map.get_mut(id) else {
529            return;
530        };
531        let dict = state
532            .transcript
533            .as_dict()
534            .cloned()
535            .unwrap_or_else(BTreeMap::new);
536        let vm_messages: Vec<VmValue> = messages
537            .iter()
538            .map(crate::stdlib::json_to_vm_value)
539            .collect();
540        let mut next = dict;
541        next.insert(
542            "events".to_string(),
543            VmValue::List(Rc::new(
544                crate::llm::helpers::transcript_events_from_messages(&vm_messages),
545            )),
546        );
547        next.insert("messages".to_string(), VmValue::List(Rc::new(vm_messages)));
548        state.transcript = VmValue::Dict(Rc::new(next));
549        state.last_accessed = Instant::now();
550    });
551}
552
553pub fn append_subscriber(id: &str, callback: VmValue) {
554    open_or_create(Some(id.to_string()));
555    SESSIONS.with(|s| {
556        if let Some(state) = s.borrow_mut().get_mut(id) {
557            state.subscribers.push(callback);
558            state.last_accessed = Instant::now();
559        }
560    });
561}
562
563pub fn subscribers_for(id: &str) -> Vec<VmValue> {
564    SESSIONS.with(|s| {
565        s.borrow()
566            .get(id)
567            .map(|state| state.subscribers.clone())
568            .unwrap_or_default()
569    })
570}
571
572pub fn subscriber_count(id: &str) -> usize {
573    SESSIONS.with(|s| {
574        s.borrow()
575            .get(id)
576            .map(|state| state.subscribers.len())
577            .unwrap_or(0)
578    })
579}
580
581/// Persist the set of active skill names for session resume. Called at
582/// the end of an agent_loop run; the next `open_or_create` for this id
583/// reads them back via [`active_skills`].
584pub fn set_active_skills(id: &str, skills: Vec<String>) {
585    SESSIONS.with(|s| {
586        if let Some(state) = s.borrow_mut().get_mut(id) {
587            state.active_skills = skills;
588            state.last_accessed = Instant::now();
589        }
590    });
591}
592
593/// Skills that were active at the end of the previous agent_loop run
594/// against this session. Returns an empty vec when the session doesn't
595/// exist or nothing was persisted.
596pub fn active_skills(id: &str) -> Vec<String> {
597    SESSIONS.with(|s| {
598        s.borrow()
599            .get(id)
600            .map(|state| state.active_skills.clone())
601            .unwrap_or_default()
602    })
603}
604
605fn empty_transcript(id: &str) -> VmValue {
606    use crate::llm::helpers::new_transcript_with;
607    new_transcript_with(Some(id.to_string()), Vec::new(), None, None)
608}
609
610fn clone_transcript_with_id(transcript: &VmValue, new_id: &str) -> VmValue {
611    let Some(dict) = transcript.as_dict() else {
612        return empty_transcript(new_id);
613    };
614    let mut next = dict.clone();
615    next.insert(
616        "id".to_string(),
617        VmValue::String(Rc::from(new_id.to_string())),
618    );
619    VmValue::Dict(Rc::new(next))
620}
621
622fn clone_transcript_with_parent(transcript: &VmValue, parent_id: &str) -> VmValue {
623    let Some(dict) = transcript.as_dict() else {
624        return transcript.clone();
625    };
626    let mut next = dict.clone();
627    let metadata = match next.get("metadata") {
628        Some(VmValue::Dict(metadata)) => {
629            let mut metadata = metadata.as_ref().clone();
630            metadata.insert(
631                "parent_session_id".to_string(),
632                VmValue::String(Rc::from(parent_id.to_string())),
633            );
634            VmValue::Dict(Rc::new(metadata))
635        }
636        _ => VmValue::Dict(Rc::new(BTreeMap::from([(
637            "parent_session_id".to_string(),
638            VmValue::String(Rc::from(parent_id.to_string())),
639        )]))),
640    };
641    next.insert("metadata".to_string(), metadata);
642    VmValue::Dict(Rc::new(next))
643}
644
// Unit tests for the session store. Each test starts by clearing the
// thread-local store so it does not see sessions left by another test
// on the same thread.
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::BTreeMap;

    // Build a `{role, content}` message dict.
    fn make_msg(role: &str, content: &str) -> VmValue {
        let mut m: BTreeMap<String, VmValue> = BTreeMap::new();
        m.insert("role".to_string(), VmValue::String(Rc::from(role)));
        m.insert("content".to_string(), VmValue::String(Rc::from(content)));
        VmValue::Dict(Rc::new(m))
    }

    // Number of entries in the session's `messages` list; 0 when the
    // session, the dict, or the list is missing.
    fn message_count(id: &str) -> usize {
        SESSIONS.with(|s| {
            let map = s.borrow();
            let Some(state) = map.get(id) else { return 0 };
            let Some(dict) = state.transcript.as_dict() else {
                return 0;
            };
            match dict.get("messages") {
                Some(VmValue::List(list)) => list.len(),
                _ => 0,
            }
        })
    }

    // `fork_at` keeps only the first N messages in the destination and
    // leaves the source transcript alone.
    #[test]
    fn fork_at_truncates_destination_to_keep_first() {
        reset_session_store();
        let src = open_or_create(Some("src-fork-at".into()));
        inject_message(&src, make_msg("user", "a")).unwrap();
        inject_message(&src, make_msg("assistant", "b")).unwrap();
        inject_message(&src, make_msg("user", "c")).unwrap();
        inject_message(&src, make_msg("assistant", "d")).unwrap();
        assert_eq!(message_count(&src), 4);

        let dst = fork_at(&src, 2, Some("dst-fork-at".into())).expect("fork_at");
        assert_ne!(dst, src);
        assert_eq!(message_count(&dst), 2, "branched at message index 2");
        // Source untouched.
        assert_eq!(message_count(&src), 4);
        // Subscribers not carried — forks start with a clean fanout list.
        assert_eq!(subscriber_count(&dst), 0);
        reset_session_store();
    }

    // Forking from a session that does not exist yields `None`.
    #[test]
    fn fork_at_on_unknown_source_returns_none() {
        reset_session_store();
        assert!(fork_at("does-not-exist", 3, None).is_none());
    }

    // Opening a child records the lineage on the parent, on the child,
    // and in the child's transcript metadata.
    #[test]
    fn child_sessions_record_parent_lineage() {
        reset_session_store();
        let parent = open_or_create(Some("parent-session".into()));
        let child = open_child_session(&parent, Some("child-session".into()));
        assert_eq!(parent_id(&child).as_deref(), Some("parent-session"));
        assert_eq!(child_ids(&parent), vec!["child-session".to_string()]);

        let transcript = snapshot(&child).expect("child transcript");
        let metadata = transcript
            .as_dict()
            .and_then(|dict| dict.get("metadata"))
            .and_then(|value| value.as_dict())
            .expect("child metadata");
        assert!(matches!(
            metadata.get("parent_session_id"),
            Some(VmValue::String(value)) if value.as_ref() == "parent-session"
        ));
    }

    // A child opened from an existing parent starts with the parent's
    // messages.
    #[test]
    fn child_session_forks_parent_transcript() {
        reset_session_store();
        let parent = open_or_create(Some("parent-fork-parent".into()));
        inject_message(&parent, make_msg("user", "parent context")).unwrap();

        let child = open_child_session(&parent, Some("parent-fork-child".into()));
        assert_eq!(message_count(&parent), 1);
        assert_eq!(message_count(&child), 1);

        let child_messages = messages_json(&child);
        assert_eq!(
            child_messages[0]["content"].as_str(),
            Some("parent context"),
        );
    }

    // When the transcript has a summary that is not already the first
    // message, `prompt_state_json` prepends it as a `user` message.
    #[test]
    fn prompt_state_prepends_summary_message_when_missing_from_messages() {
        reset_session_store();
        let session = open_or_create(Some("prompt-state-summary".into()));
        let transcript = crate::llm::helpers::new_transcript_with_events(
            Some(session.clone()),
            vec![make_msg("assistant", "latest answer")],
            Some("[auto-compacted 2 older messages]\nsummary".to_string()),
            None,
            Vec::new(),
            Vec::new(),
            Some("active"),
        );
        store_transcript(&session, transcript);

        let prompt = prompt_state_json(&session);
        assert_eq!(
            prompt.summary.as_deref(),
            Some("[auto-compacted 2 older messages]\nsummary")
        );
        assert_eq!(prompt.messages.len(), 2);
        assert_eq!(prompt.messages[0]["role"].as_str(), Some("user"));
        assert_eq!(
            prompt.messages[0]["content"].as_str(),
            Some("[auto-compacted 2 older messages]\nsummary"),
        );
        assert_eq!(prompt.messages[1]["role"].as_str(), Some("assistant"));
    }
}