Skip to main content

harn_vm/
agent_sessions.rs

1//! First-class session storage.
2//!
3//! A session owns three things:
4//!
5//! 1. A transcript dict (messages, events, summary, metadata, …).
6//! 2. Closure subscribers that fire on agent-loop events for this session.
7//! 3. Its own lifecycle (open, reset, fork, trim, compact, close).
8//!
9//! Storage is thread-local because `VmValue` contains `Rc`, which is
10//! neither `Send` nor `Sync`. The agent loop runs on a tokio
11//! current-thread worker, so all session reads and writes happen on the
12//! same thread. The closure-subscribers register, fire, and unregister
13//! on that same thread.
14//!
15//! Lifecycle is explicit. Builtins (`agent_session_open`,
16//! `_reset`, `_fork`, `_close`, `_trim`, `_compact`, `_inject`,
17//! `_exists`, `_length`, `_snapshot`) drive the store directly — there
18//! is no "policy" config dict that performs lifecycle as a side effect.
19
20use std::cell::{Cell, RefCell};
21use std::collections::{BTreeMap, HashMap};
22use std::rc::Rc;
23use std::time::Instant;
24
25use crate::value::VmValue;
26
/// Default upper bound on the number of sessions a single VM thread
/// keeps alive at once. When the store is already at this size, the
/// next `open` that has to create a session first evicts the
/// least-recently-accessed one.
pub const DEFAULT_SESSION_CAP: usize = 128;
30
31pub struct SessionState {
32    pub id: String,
33    pub transcript: VmValue,
34    pub subscribers: Vec<VmValue>,
35    pub created_at: Instant,
36    pub last_accessed: Instant,
37    /// Names of skills that were active at the end of the most recent
38    /// `agent_loop` run on this session. Empty when no skills were
39    /// matched, when the skill system wasn't used, or when the
40    /// deactivation phase cleared them. Re-entering the session
41    /// restores these as the initial active set before matching runs.
42    pub active_skills: Vec<String>,
43}
44
45impl SessionState {
46    fn new(id: String) -> Self {
47        let now = Instant::now();
48        let transcript = empty_transcript(&id);
49        Self {
50            id,
51            transcript,
52            subscribers: Vec::new(),
53            created_at: now,
54            last_accessed: now,
55            active_skills: Vec::new(),
56        }
57    }
58}
59
60thread_local! {
61    static SESSIONS: RefCell<HashMap<String, SessionState>> = RefCell::new(HashMap::new());
62    static SESSION_CAP: Cell<usize> = const { Cell::new(DEFAULT_SESSION_CAP) };
63}
64
65/// Set the per-thread session cap. Primarily for tests; production VMs
66/// inherit the default.
67pub fn set_session_cap(cap: usize) {
68    SESSION_CAP.with(|c| c.set(cap.max(1)));
69}
70
71pub fn session_cap() -> usize {
72    SESSION_CAP.with(|c| c.get())
73}
74
75/// Clear the session store. Wired into `reset_llm_state` for test isolation.
76pub fn reset_session_store() {
77    SESSIONS.with(|s| s.borrow_mut().clear());
78}
79
80pub fn exists(id: &str) -> bool {
81    SESSIONS.with(|s| s.borrow().contains_key(id))
82}
83
84pub fn length(id: &str) -> Option<usize> {
85    SESSIONS.with(|s| {
86        s.borrow().get(id).map(|state| {
87            state
88                .transcript
89                .as_dict()
90                .and_then(|d| d.get("messages"))
91                .and_then(|v| match v {
92                    VmValue::List(list) => Some(list.len()),
93                    _ => None,
94                })
95                .unwrap_or(0)
96        })
97    })
98}
99
100pub fn snapshot(id: &str) -> Option<VmValue> {
101    SESSIONS.with(|s| s.borrow().get(id).map(|state| state.transcript.clone()))
102}
103
104/// Open a session, or create it if missing. Returns the resolved id.
105///
106/// When the `HARN_EVENT_LOG_DIR` environment variable points at an
107/// existing (or creatable) directory, a [`JsonlEventSink`] is
108/// auto-registered against the newly-created session so the agent
109/// loop's AgentEvent stream persists to `event_log-<session_id>.jsonl`.
110/// Re-opening an existing session does not re-register — sinks are
111/// per-session, owned by the first opener.
112pub fn open_or_create(id: Option<String>) -> String {
113    let resolved = id.unwrap_or_else(|| uuid::Uuid::now_v7().to_string());
114    let mut was_new = false;
115    SESSIONS.with(|s| {
116        let mut map = s.borrow_mut();
117        if let Some(state) = map.get_mut(&resolved) {
118            state.last_accessed = Instant::now();
119            return;
120        }
121        was_new = true;
122        let cap = SESSION_CAP.with(|c| c.get());
123        if map.len() >= cap {
124            if let Some(victim) = map
125                .iter()
126                .min_by_key(|(_, state)| state.last_accessed)
127                .map(|(id, _)| id.clone())
128            {
129                map.remove(&victim);
130            }
131        }
132        map.insert(resolved.clone(), SessionState::new(resolved.clone()));
133    });
134    if was_new {
135        try_register_jsonl_event_log(&resolved);
136    }
137    resolved
138}
139
140/// Auto-register a [`JsonlEventSink`] for a newly-created session
141/// when `HARN_EVENT_LOG_DIR` is set. Silent no-op when the env var
142/// is missing or the file can't be opened — a broken log sink must
143/// never prevent a session from starting.
144fn try_register_jsonl_event_log(session_id: &str) {
145    let Ok(dir) = std::env::var("HARN_EVENT_LOG_DIR") else {
146        return;
147    };
148    if dir.is_empty() {
149        return;
150    }
151    let path = std::path::PathBuf::from(dir).join(format!("event_log-{session_id}.jsonl"));
152    if let Ok(sink) = crate::agent_events::JsonlEventSink::open(&path) {
153        crate::agent_events::register_sink(session_id, sink);
154    }
155}
156
157pub fn close(id: &str) {
158    SESSIONS.with(|s| {
159        s.borrow_mut().remove(id);
160    });
161}
162
163pub fn reset_transcript(id: &str) -> bool {
164    SESSIONS.with(|s| {
165        let mut map = s.borrow_mut();
166        let Some(state) = map.get_mut(id) else {
167            return false;
168        };
169        state.transcript = empty_transcript(id);
170        state.last_accessed = Instant::now();
171        true
172    })
173}
174
175/// Copy `src`'s transcript into a new session id. Subscribers are NOT
176/// copied — a fork is a conversation branch, not an event fanout.
177///
178/// Touches `src`'s `last_accessed` before evicting, so the fork
179/// operation itself can't make `src` look stale and kick it out of
180/// the LRU just to make room for the new fork.
181pub fn fork(src_id: &str, dst_id: Option<String>) -> Option<String> {
182    let (src_transcript, dst) = SESSIONS.with(|s| {
183        let mut map = s.borrow_mut();
184        let src = map.get_mut(src_id)?;
185        src.last_accessed = Instant::now();
186        let dst = dst_id.unwrap_or_else(|| uuid::Uuid::now_v7().to_string());
187        let forked_transcript = clone_transcript_with_id(&src.transcript, &dst);
188        Some((forked_transcript, dst))
189    })?;
190    // Ensure cap is respected when inserting the fork.
191    open_or_create(Some(dst.clone()));
192    SESSIONS.with(|s| {
193        if let Some(state) = s.borrow_mut().get_mut(&dst) {
194            state.transcript = src_transcript;
195            state.last_accessed = Instant::now();
196        }
197    });
198    // open_or_create evicts BEFORE inserting, so the dst slot is
199    // guaranteed once we get here. The existence check is cheap
200    // insurance against a future refactor that breaks that invariant.
201    if exists(&dst) {
202        Some(dst)
203    } else {
204        None
205    }
206}
207
208/// Fork `src_id` and truncate the destination transcript to the
209/// first `keep_first` messages (#105 — branch-replay). Pairs with the
210/// scrubber: the IDE picks an event index, rebuilds a message count,
211/// and calls this to spawn a live sibling session that resumes from
212/// the rebuilt state. Subscribers are not carried over (same as
213/// `fork`), so sibling events don't double-fan into the parent's
214/// consumers.
215///
216/// Returns the new session id on success, `None` if `src_id` doesn't
217/// exist.
218pub fn fork_at(src_id: &str, keep_first: usize, dst_id: Option<String>) -> Option<String> {
219    let new_id = fork(src_id, dst_id)?;
220    retain_first(&new_id, keep_first);
221    Some(new_id)
222}
223
224/// Truncate the session transcript to the first `keep_first`
225/// messages (opposite of `trim`, which keeps the last N). Used by
226/// `fork_at` to cut a branch at a scrubber position.
227fn retain_first(id: &str, keep_first: usize) {
228    SESSIONS.with(|s| {
229        let mut map = s.borrow_mut();
230        let Some(state) = map.get_mut(id) else {
231            return;
232        };
233        let Some(dict) = state.transcript.as_dict() else {
234            return;
235        };
236        let dict = dict.clone();
237        let messages: Vec<VmValue> = match dict.get("messages") {
238            Some(VmValue::List(list)) => list.iter().cloned().collect(),
239            _ => Vec::new(),
240        };
241        let retained: Vec<VmValue> = messages.into_iter().take(keep_first).collect();
242        let mut next = dict;
243        next.insert(
244            "events".to_string(),
245            VmValue::List(Rc::new(
246                crate::llm::helpers::transcript_events_from_messages(&retained),
247            )),
248        );
249        next.insert("messages".to_string(), VmValue::List(Rc::new(retained)));
250        state.transcript = VmValue::Dict(Rc::new(next));
251        state.last_accessed = Instant::now();
252    });
253}
254
255/// Retain only the last `keep_last` messages in the session transcript.
256/// Returns the kept count (<= keep_last).
257pub fn trim(id: &str, keep_last: usize) -> Option<usize> {
258    SESSIONS.with(|s| {
259        let mut map = s.borrow_mut();
260        let state = map.get_mut(id)?;
261        let dict = state.transcript.as_dict()?.clone();
262        let messages: Vec<VmValue> = match dict.get("messages") {
263            Some(VmValue::List(list)) => list.iter().cloned().collect(),
264            _ => Vec::new(),
265        };
266        let start = messages.len().saturating_sub(keep_last);
267        let retained: Vec<VmValue> = messages.into_iter().skip(start).collect();
268        let kept = retained.len();
269        let mut next = dict;
270        next.insert(
271            "events".to_string(),
272            VmValue::List(Rc::new(
273                crate::llm::helpers::transcript_events_from_messages(&retained),
274            )),
275        );
276        next.insert("messages".to_string(), VmValue::List(Rc::new(retained)));
277        state.transcript = VmValue::Dict(Rc::new(next));
278        state.last_accessed = Instant::now();
279        Some(kept)
280    })
281}
282
283/// Append a message dict to the session transcript. The message must
284/// have at least a string `role`; anything else is merged verbatim.
285pub fn inject_message(id: &str, message: VmValue) -> Result<(), String> {
286    let Some(msg_dict) = message.as_dict().cloned() else {
287        return Err("agent_session_inject: message must be a dict".into());
288    };
289    let role_ok = matches!(msg_dict.get("role"), Some(VmValue::String(_)));
290    if !role_ok {
291        return Err(
292            "agent_session_inject: message must have a string `role` (user|assistant|tool_result|system)"
293                .into(),
294        );
295    }
296    SESSIONS.with(|s| {
297        let mut map = s.borrow_mut();
298        let Some(state) = map.get_mut(id) else {
299            return Err(format!("agent_session_inject: unknown session id '{id}'"));
300        };
301        let dict = state
302            .transcript
303            .as_dict()
304            .cloned()
305            .unwrap_or_else(BTreeMap::new);
306        let mut messages: Vec<VmValue> = match dict.get("messages") {
307            Some(VmValue::List(list)) => list.iter().cloned().collect(),
308            _ => Vec::new(),
309        };
310        messages.push(VmValue::Dict(Rc::new(msg_dict)));
311        let mut next = dict;
312        next.insert(
313            "events".to_string(),
314            VmValue::List(Rc::new(
315                crate::llm::helpers::transcript_events_from_messages(&messages),
316            )),
317        );
318        next.insert("messages".to_string(), VmValue::List(Rc::new(messages)));
319        state.transcript = VmValue::Dict(Rc::new(next));
320        state.last_accessed = Instant::now();
321        Ok(())
322    })
323}
324
325/// Load the messages vec (as JSON) for this session, for use as prefix
326/// to an agent_loop run. Returns an empty vec if the session doesn't
327/// exist or has no messages.
328pub fn messages_json(id: &str) -> Vec<serde_json::Value> {
329    SESSIONS.with(|s| {
330        let map = s.borrow();
331        let Some(state) = map.get(id) else {
332            return Vec::new();
333        };
334        let Some(dict) = state.transcript.as_dict() else {
335            return Vec::new();
336        };
337        match dict.get("messages") {
338            Some(VmValue::List(list)) => list
339                .iter()
340                .map(crate::llm::helpers::vm_value_to_json)
341                .collect(),
342            _ => Vec::new(),
343        }
344    })
345}
346
347/// Overwrite the transcript for this session. Used by `agent_loop` on
348/// exit to persist the synthesized transcript.
349pub fn store_transcript(id: &str, transcript: VmValue) {
350    SESSIONS.with(|s| {
351        if let Some(state) = s.borrow_mut().get_mut(id) {
352            state.transcript = transcript;
353            state.last_accessed = Instant::now();
354        }
355    });
356}
357
358/// Replace the transcript's message list wholesale. Used by the
359/// in-loop compaction path, which operates on JSON messages.
360pub fn replace_messages(id: &str, messages: &[serde_json::Value]) {
361    SESSIONS.with(|s| {
362        let mut map = s.borrow_mut();
363        let Some(state) = map.get_mut(id) else {
364            return;
365        };
366        let dict = state
367            .transcript
368            .as_dict()
369            .cloned()
370            .unwrap_or_else(BTreeMap::new);
371        let vm_messages: Vec<VmValue> = messages
372            .iter()
373            .map(crate::stdlib::json_to_vm_value)
374            .collect();
375        let mut next = dict;
376        next.insert(
377            "events".to_string(),
378            VmValue::List(Rc::new(
379                crate::llm::helpers::transcript_events_from_messages(&vm_messages),
380            )),
381        );
382        next.insert("messages".to_string(), VmValue::List(Rc::new(vm_messages)));
383        state.transcript = VmValue::Dict(Rc::new(next));
384        state.last_accessed = Instant::now();
385    });
386}
387
388pub fn append_subscriber(id: &str, callback: VmValue) {
389    open_or_create(Some(id.to_string()));
390    SESSIONS.with(|s| {
391        if let Some(state) = s.borrow_mut().get_mut(id) {
392            state.subscribers.push(callback);
393            state.last_accessed = Instant::now();
394        }
395    });
396}
397
398pub fn subscribers_for(id: &str) -> Vec<VmValue> {
399    SESSIONS.with(|s| {
400        s.borrow()
401            .get(id)
402            .map(|state| state.subscribers.clone())
403            .unwrap_or_default()
404    })
405}
406
407pub fn subscriber_count(id: &str) -> usize {
408    SESSIONS.with(|s| {
409        s.borrow()
410            .get(id)
411            .map(|state| state.subscribers.len())
412            .unwrap_or(0)
413    })
414}
415
416/// Persist the set of active skill names for session resume. Called at
417/// the end of an agent_loop run; the next `open_or_create` for this id
418/// reads them back via [`active_skills`].
419pub fn set_active_skills(id: &str, skills: Vec<String>) {
420    SESSIONS.with(|s| {
421        if let Some(state) = s.borrow_mut().get_mut(id) {
422            state.active_skills = skills;
423            state.last_accessed = Instant::now();
424        }
425    });
426}
427
428/// Skills that were active at the end of the previous agent_loop run
429/// against this session. Returns an empty vec when the session doesn't
430/// exist or nothing was persisted.
431pub fn active_skills(id: &str) -> Vec<String> {
432    SESSIONS.with(|s| {
433        s.borrow()
434            .get(id)
435            .map(|state| state.active_skills.clone())
436            .unwrap_or_default()
437    })
438}
439
440fn empty_transcript(id: &str) -> VmValue {
441    use crate::llm::helpers::new_transcript_with;
442    new_transcript_with(Some(id.to_string()), Vec::new(), None, None)
443}
444
445fn clone_transcript_with_id(transcript: &VmValue, new_id: &str) -> VmValue {
446    let Some(dict) = transcript.as_dict() else {
447        return empty_transcript(new_id);
448    };
449    let mut next = dict.clone();
450    next.insert(
451        "id".to_string(),
452        VmValue::String(Rc::from(new_id.to_string())),
453    );
454    VmValue::Dict(Rc::new(next))
455}
456
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::BTreeMap;

    /// Build a minimal `{role, content}` message dict.
    fn make_msg(role: &str, content: &str) -> VmValue {
        let mut fields: BTreeMap<String, VmValue> = BTreeMap::new();
        for (key, text) in [("role", role), ("content", content)] {
            fields.insert(key.to_string(), VmValue::String(Rc::from(text)));
        }
        VmValue::Dict(Rc::new(fields))
    }

    /// Number of messages stored for `id`, reading the store directly;
    /// zero when the session or its message list is missing.
    fn message_count(id: &str) -> usize {
        SESSIONS.with(|store| {
            let sessions = store.borrow();
            let messages = sessions
                .get(id)
                .and_then(|state| state.transcript.as_dict())
                .and_then(|dict| dict.get("messages"));
            match messages {
                Some(VmValue::List(list)) => list.len(),
                _ => 0,
            }
        })
    }

    #[test]
    fn fork_at_truncates_destination_to_keep_first() {
        reset_session_store();
        let src = open_or_create(Some("src-fork-at".into()));
        let conversation = [
            ("user", "a"),
            ("assistant", "b"),
            ("user", "c"),
            ("assistant", "d"),
        ];
        for (role, content) in conversation {
            inject_message(&src, make_msg(role, content)).unwrap();
        }
        assert_eq!(message_count(&src), 4);

        let dst = fork_at(&src, 2, Some("dst-fork-at".into())).expect("fork_at");
        assert_ne!(dst, src);
        assert_eq!(message_count(&dst), 2, "branched at message index 2");
        // The source keeps its full history.
        assert_eq!(message_count(&src), 4);
        // Forks start with an empty subscriber list.
        assert_eq!(subscriber_count(&dst), 0);
        reset_session_store();
    }

    #[test]
    fn fork_at_on_unknown_source_returns_none() {
        reset_session_store();
        let forked = fork_at("does-not-exist", 3, None);
        assert!(forked.is_none());
    }
}