Skip to main content

harn_vm/
agent_sessions.rs

//! First-class session storage.
//!
//! A session owns three things:
//!
//! 1. A transcript dict (messages, events, summary, metadata, …).
//! 2. Closure subscribers that fire on agent-loop events for this session.
//! 3. Its own lifecycle (open, reset, fork, trim, compact, close).
//!
//! Storage is thread-local because `VmValue` contains `Rc`, which is
//! neither `Send` nor `Sync`. The agent loop runs on a tokio
//! current-thread worker, so all session reads and writes happen on the
//! same thread. The closure subscribers register, fire, and unregister
//! on that same thread.
//!
//! Lifecycle is explicit. Builtins (`agent_session_open`,
//! `_reset`, `_fork`, `_close`, `_trim`, `_compact`, `_inject`,
//! `_exists`, `_length`, `_snapshot`) drive the store directly — there
//! is no "policy" config dict that performs lifecycle as a side effect.
19
20use std::cell::{Cell, RefCell};
21use std::collections::{BTreeMap, HashMap};
22use std::rc::Rc;
23use std::time::Instant;
24
25use crate::value::VmValue;
26
/// Number of sessions a VM thread may hold unless overridden with
/// `set_session_cap`. When creating a session would reach or exceed
/// this limit, the least-recently-accessed session is evicted first.
pub const DEFAULT_SESSION_CAP: usize = 128;
30
31pub struct SessionState {
32    pub id: String,
33    pub transcript: VmValue,
34    pub subscribers: Vec<VmValue>,
35    pub created_at: Instant,
36    pub last_accessed: Instant,
37    pub parent_id: Option<String>,
38    pub child_ids: Vec<String>,
39    /// Names of skills that were active at the end of the most recent
40    /// `agent_loop` run on this session. Empty when no skills were
41    /// matched, when the skill system wasn't used, or when the
42    /// deactivation phase cleared them. Re-entering the session
43    /// restores these as the initial active set before matching runs.
44    pub active_skills: Vec<String>,
45}
46
47impl SessionState {
48    fn new(id: String) -> Self {
49        let now = Instant::now();
50        let transcript = empty_transcript(&id);
51        Self {
52            id,
53            transcript,
54            subscribers: Vec::new(),
55            created_at: now,
56            last_accessed: now,
57            parent_id: None,
58            child_ids: Vec::new(),
59            active_skills: Vec::new(),
60        }
61    }
62}
63
64thread_local! {
65    static SESSIONS: RefCell<HashMap<String, SessionState>> = RefCell::new(HashMap::new());
66    static SESSION_CAP: Cell<usize> = const { Cell::new(DEFAULT_SESSION_CAP) };
67    static CURRENT_SESSION_STACK: RefCell<Vec<String>> = const { RefCell::new(Vec::new()) };
68}
69
70/// Set the per-thread session cap. Primarily for tests; production VMs
71/// inherit the default.
72pub fn set_session_cap(cap: usize) {
73    SESSION_CAP.with(|c| c.set(cap.max(1)));
74}
75
76pub fn session_cap() -> usize {
77    SESSION_CAP.with(|c| c.get())
78}
79
80/// Clear the session store. Wired into `reset_llm_state` for test isolation.
81pub fn reset_session_store() {
82    SESSIONS.with(|s| s.borrow_mut().clear());
83    CURRENT_SESSION_STACK.with(|stack| stack.borrow_mut().clear());
84}
85
86pub(crate) fn push_current_session(id: String) {
87    if id.is_empty() {
88        return;
89    }
90    CURRENT_SESSION_STACK.with(|stack| stack.borrow_mut().push(id));
91}
92
93pub(crate) fn pop_current_session() {
94    CURRENT_SESSION_STACK.with(|stack| {
95        let _ = stack.borrow_mut().pop();
96    });
97}
98
99pub(crate) fn current_session_id() -> Option<String> {
100    CURRENT_SESSION_STACK.with(|stack| stack.borrow().last().cloned())
101}
102
103pub fn exists(id: &str) -> bool {
104    SESSIONS.with(|s| s.borrow().contains_key(id))
105}
106
107pub fn length(id: &str) -> Option<usize> {
108    SESSIONS.with(|s| {
109        s.borrow().get(id).map(|state| {
110            state
111                .transcript
112                .as_dict()
113                .and_then(|d| d.get("messages"))
114                .and_then(|v| match v {
115                    VmValue::List(list) => Some(list.len()),
116                    _ => None,
117                })
118                .unwrap_or(0)
119        })
120    })
121}
122
123pub fn snapshot(id: &str) -> Option<VmValue> {
124    SESSIONS.with(|s| s.borrow().get(id).map(|state| state.transcript.clone()))
125}
126
127/// Open a session, or create it if missing. Returns the resolved id.
128///
129/// When the `HARN_EVENT_LOG_DIR` environment variable points at an
130/// existing (or creatable) directory, a [`JsonlEventSink`] is
131/// auto-registered against the newly-created session so the agent
132/// loop's AgentEvent stream persists to `event_log-<session_id>.jsonl`.
133/// Re-opening an existing session does not re-register — sinks are
134/// per-session, owned by the first opener.
135pub fn open_or_create(id: Option<String>) -> String {
136    let resolved = id.unwrap_or_else(|| uuid::Uuid::now_v7().to_string());
137    let mut was_new = false;
138    SESSIONS.with(|s| {
139        let mut map = s.borrow_mut();
140        if let Some(state) = map.get_mut(&resolved) {
141            state.last_accessed = Instant::now();
142            return;
143        }
144        was_new = true;
145        let cap = SESSION_CAP.with(|c| c.get());
146        if map.len() >= cap {
147            if let Some(victim) = map
148                .iter()
149                .min_by_key(|(_, state)| state.last_accessed)
150                .map(|(id, _)| id.clone())
151            {
152                map.remove(&victim);
153            }
154        }
155        map.insert(resolved.clone(), SessionState::new(resolved.clone()));
156    });
157    if was_new {
158        try_register_jsonl_event_log(&resolved);
159    }
160    resolved
161}
162
163pub fn open_child_session(parent_id: &str, id: Option<String>) -> String {
164    let resolved = open_or_create(id);
165    link_child_session(parent_id, &resolved);
166    resolved
167}
168
169pub fn link_child_session(parent_id: &str, child_id: &str) {
170    if parent_id == child_id {
171        return;
172    }
173    open_or_create(Some(parent_id.to_string()));
174    open_or_create(Some(child_id.to_string()));
175    SESSIONS.with(|s| {
176        let mut map = s.borrow_mut();
177        if let Some(parent) = map.get_mut(parent_id) {
178            parent.last_accessed = Instant::now();
179            if !parent.child_ids.iter().any(|id| id == child_id) {
180                parent.child_ids.push(child_id.to_string());
181            }
182        }
183        if let Some(child) = map.get_mut(child_id) {
184            child.last_accessed = Instant::now();
185            child.parent_id = Some(parent_id.to_string());
186            child.transcript = clone_transcript_with_parent(&child.transcript, parent_id);
187        }
188    });
189}
190
191pub fn parent_id(id: &str) -> Option<String> {
192    SESSIONS.with(|s| s.borrow().get(id).and_then(|state| state.parent_id.clone()))
193}
194
195pub fn child_ids(id: &str) -> Vec<String> {
196    SESSIONS.with(|s| {
197        s.borrow()
198            .get(id)
199            .map(|state| state.child_ids.clone())
200            .unwrap_or_default()
201    })
202}
203
204/// Auto-register a [`JsonlEventSink`] for a newly-created session
205/// when `HARN_EVENT_LOG_DIR` is set. Silent no-op when the env var
206/// is missing or the file can't be opened — a broken log sink must
207/// never prevent a session from starting.
208fn try_register_jsonl_event_log(session_id: &str) {
209    let Ok(dir) = std::env::var("HARN_EVENT_LOG_DIR") else {
210        return;
211    };
212    if dir.is_empty() {
213        return;
214    }
215    let path = std::path::PathBuf::from(dir).join(format!("event_log-{session_id}.jsonl"));
216    if let Ok(sink) = crate::agent_events::JsonlEventSink::open(&path) {
217        crate::agent_events::register_sink(session_id, sink);
218    }
219}
220
221pub fn close(id: &str) {
222    SESSIONS.with(|s| {
223        s.borrow_mut().remove(id);
224    });
225}
226
227pub fn reset_transcript(id: &str) -> bool {
228    SESSIONS.with(|s| {
229        let mut map = s.borrow_mut();
230        let Some(state) = map.get_mut(id) else {
231            return false;
232        };
233        state.transcript = empty_transcript(id);
234        state.last_accessed = Instant::now();
235        true
236    })
237}
238
239/// Copy `src`'s transcript into a new session id. Subscribers are NOT
240/// copied — a fork is a conversation branch, not an event fanout.
241///
242/// Touches `src`'s `last_accessed` before evicting, so the fork
243/// operation itself can't make `src` look stale and kick it out of
244/// the LRU just to make room for the new fork.
245pub fn fork(src_id: &str, dst_id: Option<String>) -> Option<String> {
246    let (src_transcript, dst) = SESSIONS.with(|s| {
247        let mut map = s.borrow_mut();
248        let src = map.get_mut(src_id)?;
249        src.last_accessed = Instant::now();
250        let dst = dst_id.unwrap_or_else(|| uuid::Uuid::now_v7().to_string());
251        let forked_transcript = clone_transcript_with_id(&src.transcript, &dst);
252        Some((forked_transcript, dst))
253    })?;
254    // Ensure cap is respected when inserting the fork.
255    open_or_create(Some(dst.clone()));
256    SESSIONS.with(|s| {
257        if let Some(state) = s.borrow_mut().get_mut(&dst) {
258            state.transcript = src_transcript;
259            state.last_accessed = Instant::now();
260        }
261    });
262    // open_or_create evicts BEFORE inserting, so the dst slot is
263    // guaranteed once we get here. The existence check is cheap
264    // insurance against a future refactor that breaks that invariant.
265    if exists(&dst) {
266        Some(dst)
267    } else {
268        None
269    }
270}
271
272/// Fork `src_id` and truncate the destination transcript to the
273/// first `keep_first` messages (#105 — branch-replay). Pairs with the
274/// scrubber: the IDE picks an event index, rebuilds a message count,
275/// and calls this to spawn a live sibling session that resumes from
276/// the rebuilt state. Subscribers are not carried over (same as
277/// `fork`), so sibling events don't double-fan into the parent's
278/// consumers.
279///
280/// Returns the new session id on success, `None` if `src_id` doesn't
281/// exist.
282pub fn fork_at(src_id: &str, keep_first: usize, dst_id: Option<String>) -> Option<String> {
283    let new_id = fork(src_id, dst_id)?;
284    retain_first(&new_id, keep_first);
285    Some(new_id)
286}
287
288/// Truncate the session transcript to the first `keep_first`
289/// messages (opposite of `trim`, which keeps the last N). Used by
290/// `fork_at` to cut a branch at a scrubber position.
291fn retain_first(id: &str, keep_first: usize) {
292    SESSIONS.with(|s| {
293        let mut map = s.borrow_mut();
294        let Some(state) = map.get_mut(id) else {
295            return;
296        };
297        let Some(dict) = state.transcript.as_dict() else {
298            return;
299        };
300        let dict = dict.clone();
301        let messages: Vec<VmValue> = match dict.get("messages") {
302            Some(VmValue::List(list)) => list.iter().cloned().collect(),
303            _ => Vec::new(),
304        };
305        let retained: Vec<VmValue> = messages.into_iter().take(keep_first).collect();
306        let mut next = dict;
307        next.insert(
308            "events".to_string(),
309            VmValue::List(Rc::new(
310                crate::llm::helpers::transcript_events_from_messages(&retained),
311            )),
312        );
313        next.insert("messages".to_string(), VmValue::List(Rc::new(retained)));
314        state.transcript = VmValue::Dict(Rc::new(next));
315        state.last_accessed = Instant::now();
316    });
317}
318
319/// Retain only the last `keep_last` messages in the session transcript.
320/// Returns the kept count (<= keep_last).
321pub fn trim(id: &str, keep_last: usize) -> Option<usize> {
322    SESSIONS.with(|s| {
323        let mut map = s.borrow_mut();
324        let state = map.get_mut(id)?;
325        let dict = state.transcript.as_dict()?.clone();
326        let messages: Vec<VmValue> = match dict.get("messages") {
327            Some(VmValue::List(list)) => list.iter().cloned().collect(),
328            _ => Vec::new(),
329        };
330        let start = messages.len().saturating_sub(keep_last);
331        let retained: Vec<VmValue> = messages.into_iter().skip(start).collect();
332        let kept = retained.len();
333        let mut next = dict;
334        next.insert(
335            "events".to_string(),
336            VmValue::List(Rc::new(
337                crate::llm::helpers::transcript_events_from_messages(&retained),
338            )),
339        );
340        next.insert("messages".to_string(), VmValue::List(Rc::new(retained)));
341        state.transcript = VmValue::Dict(Rc::new(next));
342        state.last_accessed = Instant::now();
343        Some(kept)
344    })
345}
346
347/// Append a message dict to the session transcript. The message must
348/// have at least a string `role`; anything else is merged verbatim.
349pub fn inject_message(id: &str, message: VmValue) -> Result<(), String> {
350    let Some(msg_dict) = message.as_dict().cloned() else {
351        return Err("agent_session_inject: message must be a dict".into());
352    };
353    let role_ok = matches!(msg_dict.get("role"), Some(VmValue::String(_)));
354    if !role_ok {
355        return Err(
356            "agent_session_inject: message must have a string `role` (user|assistant|tool_result|system)"
357                .into(),
358        );
359    }
360    SESSIONS.with(|s| {
361        let mut map = s.borrow_mut();
362        let Some(state) = map.get_mut(id) else {
363            return Err(format!("agent_session_inject: unknown session id '{id}'"));
364        };
365        let dict = state
366            .transcript
367            .as_dict()
368            .cloned()
369            .unwrap_or_else(BTreeMap::new);
370        let mut messages: Vec<VmValue> = match dict.get("messages") {
371            Some(VmValue::List(list)) => list.iter().cloned().collect(),
372            _ => Vec::new(),
373        };
374        messages.push(VmValue::Dict(Rc::new(msg_dict)));
375        let mut next = dict;
376        next.insert(
377            "events".to_string(),
378            VmValue::List(Rc::new(
379                crate::llm::helpers::transcript_events_from_messages(&messages),
380            )),
381        );
382        next.insert("messages".to_string(), VmValue::List(Rc::new(messages)));
383        state.transcript = VmValue::Dict(Rc::new(next));
384        state.last_accessed = Instant::now();
385        Ok(())
386    })
387}
388
389/// Load the messages vec (as JSON) for this session, for use as prefix
390/// to an agent_loop run. Returns an empty vec if the session doesn't
391/// exist or has no messages.
392pub fn messages_json(id: &str) -> Vec<serde_json::Value> {
393    SESSIONS.with(|s| {
394        let map = s.borrow();
395        let Some(state) = map.get(id) else {
396            return Vec::new();
397        };
398        let Some(dict) = state.transcript.as_dict() else {
399            return Vec::new();
400        };
401        match dict.get("messages") {
402            Some(VmValue::List(list)) => list
403                .iter()
404                .map(crate::llm::helpers::vm_value_to_json)
405                .collect(),
406            _ => Vec::new(),
407        }
408    })
409}
410
411/// Overwrite the transcript for this session. Used by `agent_loop` on
412/// exit to persist the synthesized transcript.
413pub fn store_transcript(id: &str, transcript: VmValue) {
414    SESSIONS.with(|s| {
415        if let Some(state) = s.borrow_mut().get_mut(id) {
416            state.transcript = transcript;
417            state.last_accessed = Instant::now();
418        }
419    });
420}
421
422/// Replace the transcript's message list wholesale. Used by the
423/// in-loop compaction path, which operates on JSON messages.
424pub fn replace_messages(id: &str, messages: &[serde_json::Value]) {
425    SESSIONS.with(|s| {
426        let mut map = s.borrow_mut();
427        let Some(state) = map.get_mut(id) else {
428            return;
429        };
430        let dict = state
431            .transcript
432            .as_dict()
433            .cloned()
434            .unwrap_or_else(BTreeMap::new);
435        let vm_messages: Vec<VmValue> = messages
436            .iter()
437            .map(crate::stdlib::json_to_vm_value)
438            .collect();
439        let mut next = dict;
440        next.insert(
441            "events".to_string(),
442            VmValue::List(Rc::new(
443                crate::llm::helpers::transcript_events_from_messages(&vm_messages),
444            )),
445        );
446        next.insert("messages".to_string(), VmValue::List(Rc::new(vm_messages)));
447        state.transcript = VmValue::Dict(Rc::new(next));
448        state.last_accessed = Instant::now();
449    });
450}
451
452pub fn append_subscriber(id: &str, callback: VmValue) {
453    open_or_create(Some(id.to_string()));
454    SESSIONS.with(|s| {
455        if let Some(state) = s.borrow_mut().get_mut(id) {
456            state.subscribers.push(callback);
457            state.last_accessed = Instant::now();
458        }
459    });
460}
461
462pub fn subscribers_for(id: &str) -> Vec<VmValue> {
463    SESSIONS.with(|s| {
464        s.borrow()
465            .get(id)
466            .map(|state| state.subscribers.clone())
467            .unwrap_or_default()
468    })
469}
470
471pub fn subscriber_count(id: &str) -> usize {
472    SESSIONS.with(|s| {
473        s.borrow()
474            .get(id)
475            .map(|state| state.subscribers.len())
476            .unwrap_or(0)
477    })
478}
479
480/// Persist the set of active skill names for session resume. Called at
481/// the end of an agent_loop run; the next `open_or_create` for this id
482/// reads them back via [`active_skills`].
483pub fn set_active_skills(id: &str, skills: Vec<String>) {
484    SESSIONS.with(|s| {
485        if let Some(state) = s.borrow_mut().get_mut(id) {
486            state.active_skills = skills;
487            state.last_accessed = Instant::now();
488        }
489    });
490}
491
492/// Skills that were active at the end of the previous agent_loop run
493/// against this session. Returns an empty vec when the session doesn't
494/// exist or nothing was persisted.
495pub fn active_skills(id: &str) -> Vec<String> {
496    SESSIONS.with(|s| {
497        s.borrow()
498            .get(id)
499            .map(|state| state.active_skills.clone())
500            .unwrap_or_default()
501    })
502}
503
504fn empty_transcript(id: &str) -> VmValue {
505    use crate::llm::helpers::new_transcript_with;
506    new_transcript_with(Some(id.to_string()), Vec::new(), None, None)
507}
508
509fn clone_transcript_with_id(transcript: &VmValue, new_id: &str) -> VmValue {
510    let Some(dict) = transcript.as_dict() else {
511        return empty_transcript(new_id);
512    };
513    let mut next = dict.clone();
514    next.insert(
515        "id".to_string(),
516        VmValue::String(Rc::from(new_id.to_string())),
517    );
518    VmValue::Dict(Rc::new(next))
519}
520
521fn clone_transcript_with_parent(transcript: &VmValue, parent_id: &str) -> VmValue {
522    let Some(dict) = transcript.as_dict() else {
523        return transcript.clone();
524    };
525    let mut next = dict.clone();
526    let metadata = match next.get("metadata") {
527        Some(VmValue::Dict(metadata)) => {
528            let mut metadata = metadata.as_ref().clone();
529            metadata.insert(
530                "parent_session_id".to_string(),
531                VmValue::String(Rc::from(parent_id.to_string())),
532            );
533            VmValue::Dict(Rc::new(metadata))
534        }
535        _ => VmValue::Dict(Rc::new(BTreeMap::from([(
536            "parent_session_id".to_string(),
537            VmValue::String(Rc::from(parent_id.to_string())),
538        )]))),
539    };
540    next.insert("metadata".to_string(), metadata);
541    VmValue::Dict(Rc::new(next))
542}
543
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::BTreeMap;

    /// Build a minimal `{role, content}` message dict.
    fn make_msg(role: &str, content: &str) -> VmValue {
        let fields: BTreeMap<String, VmValue> = BTreeMap::from([
            ("role".to_string(), VmValue::String(Rc::from(role))),
            ("content".to_string(), VmValue::String(Rc::from(content))),
        ]);
        VmValue::Dict(Rc::new(fields))
    }

    /// Count the stored messages by reading the store directly; `0`
    /// for a missing session or a malformed transcript.
    fn message_count(id: &str) -> usize {
        SESSIONS.with(|sessions| {
            let map = sessions.borrow();
            let messages = map
                .get(id)
                .and_then(|state| state.transcript.as_dict())
                .and_then(|dict| dict.get("messages"));
            match messages {
                Some(VmValue::List(list)) => list.len(),
                _ => 0,
            }
        })
    }

    #[test]
    fn fork_at_truncates_destination_to_keep_first() {
        reset_session_store();
        let src = open_or_create(Some("src-fork-at".into()));
        let script = [
            ("user", "a"),
            ("assistant", "b"),
            ("user", "c"),
            ("assistant", "d"),
        ];
        for (role, content) in script {
            inject_message(&src, make_msg(role, content)).unwrap();
        }
        assert_eq!(message_count(&src), 4);

        let dst = fork_at(&src, 2, Some("dst-fork-at".into())).expect("fork_at");
        assert_ne!(dst, src);
        assert_eq!(message_count(&dst), 2, "branched at message index 2");
        // The source keeps all of its messages.
        assert_eq!(message_count(&src), 4);
        // Forks start with a clean fanout list.
        assert_eq!(subscriber_count(&dst), 0);
        reset_session_store();
    }

    #[test]
    fn fork_at_on_unknown_source_returns_none() {
        reset_session_store();
        let forked = fork_at("does-not-exist", 3, None);
        assert!(forked.is_none());
    }

    #[test]
    fn child_sessions_record_parent_lineage() {
        reset_session_store();
        let parent = open_or_create(Some("parent-session".into()));
        let child = open_child_session(&parent, Some("child-session".into()));
        assert_eq!(parent_id(&child).as_deref(), Some("parent-session"));
        assert_eq!(child_ids(&parent), vec!["child-session".to_string()]);

        // The lineage is mirrored into the child's transcript metadata.
        let transcript = snapshot(&child).expect("child transcript");
        let metadata = transcript
            .as_dict()
            .and_then(|dict| dict.get("metadata"))
            .and_then(|value| value.as_dict())
            .expect("child metadata");
        let recorded = metadata.get("parent_session_id");
        assert!(matches!(
            recorded,
            Some(VmValue::String(value)) if value.as_ref() == "parent-session"
        ));
    }
}
611            metadata.get("parent_session_id"),
612            Some(VmValue::String(value)) if value.as_ref() == "parent-session"
613        ));
614    }
615}