1use std::cell::{Cell, RefCell};
21use std::collections::{BTreeMap, HashMap};
22use std::rc::Rc;
23use std::time::Instant;
24
25use crate::value::VmValue;
26
27pub const DEFAULT_SESSION_CAP: usize = 128;
30
31pub struct SessionState {
32 pub id: String,
33 pub transcript: VmValue,
34 pub subscribers: Vec<VmValue>,
35 pub created_at: Instant,
36 pub last_accessed: Instant,
37 pub active_skills: Vec<String>,
43}
44
45impl SessionState {
46 fn new(id: String) -> Self {
47 let now = Instant::now();
48 let transcript = empty_transcript(&id);
49 Self {
50 id,
51 transcript,
52 subscribers: Vec::new(),
53 created_at: now,
54 last_accessed: now,
55 active_skills: Vec::new(),
56 }
57 }
58}
59
60thread_local! {
61 static SESSIONS: RefCell<HashMap<String, SessionState>> = RefCell::new(HashMap::new());
62 static SESSION_CAP: Cell<usize> = const { Cell::new(DEFAULT_SESSION_CAP) };
63}
64
65pub fn set_session_cap(cap: usize) {
68 SESSION_CAP.with(|c| c.set(cap.max(1)));
69}
70
71pub fn session_cap() -> usize {
72 SESSION_CAP.with(|c| c.get())
73}
74
75pub fn reset_session_store() {
77 SESSIONS.with(|s| s.borrow_mut().clear());
78}
79
80pub fn exists(id: &str) -> bool {
81 SESSIONS.with(|s| s.borrow().contains_key(id))
82}
83
84pub fn length(id: &str) -> Option<usize> {
85 SESSIONS.with(|s| {
86 s.borrow().get(id).map(|state| {
87 state
88 .transcript
89 .as_dict()
90 .and_then(|d| d.get("messages"))
91 .and_then(|v| match v {
92 VmValue::List(list) => Some(list.len()),
93 _ => None,
94 })
95 .unwrap_or(0)
96 })
97 })
98}
99
100pub fn snapshot(id: &str) -> Option<VmValue> {
101 SESSIONS.with(|s| s.borrow().get(id).map(|state| state.transcript.clone()))
102}
103
104pub fn open_or_create(id: Option<String>) -> String {
113 let resolved = id.unwrap_or_else(|| uuid::Uuid::now_v7().to_string());
114 let mut was_new = false;
115 SESSIONS.with(|s| {
116 let mut map = s.borrow_mut();
117 if let Some(state) = map.get_mut(&resolved) {
118 state.last_accessed = Instant::now();
119 return;
120 }
121 was_new = true;
122 let cap = SESSION_CAP.with(|c| c.get());
123 if map.len() >= cap {
124 if let Some(victim) = map
125 .iter()
126 .min_by_key(|(_, state)| state.last_accessed)
127 .map(|(id, _)| id.clone())
128 {
129 map.remove(&victim);
130 }
131 }
132 map.insert(resolved.clone(), SessionState::new(resolved.clone()));
133 });
134 if was_new {
135 try_register_jsonl_event_log(&resolved);
136 }
137 resolved
138}
139
140fn try_register_jsonl_event_log(session_id: &str) {
145 let Ok(dir) = std::env::var("HARN_EVENT_LOG_DIR") else {
146 return;
147 };
148 if dir.is_empty() {
149 return;
150 }
151 let path = std::path::PathBuf::from(dir).join(format!("event_log-{session_id}.jsonl"));
152 if let Ok(sink) = crate::agent_events::JsonlEventSink::open(&path) {
153 crate::agent_events::register_sink(session_id, sink);
154 }
155}
156
157pub fn close(id: &str) {
158 SESSIONS.with(|s| {
159 s.borrow_mut().remove(id);
160 });
161}
162
163pub fn reset_transcript(id: &str) -> bool {
164 SESSIONS.with(|s| {
165 let mut map = s.borrow_mut();
166 let Some(state) = map.get_mut(id) else {
167 return false;
168 };
169 state.transcript = empty_transcript(id);
170 state.last_accessed = Instant::now();
171 true
172 })
173}
174
175pub fn fork(src_id: &str, dst_id: Option<String>) -> Option<String> {
182 let (src_transcript, dst) = SESSIONS.with(|s| {
183 let mut map = s.borrow_mut();
184 let src = map.get_mut(src_id)?;
185 src.last_accessed = Instant::now();
186 let dst = dst_id.unwrap_or_else(|| uuid::Uuid::now_v7().to_string());
187 let forked_transcript = clone_transcript_with_id(&src.transcript, &dst);
188 Some((forked_transcript, dst))
189 })?;
190 open_or_create(Some(dst.clone()));
192 SESSIONS.with(|s| {
193 if let Some(state) = s.borrow_mut().get_mut(&dst) {
194 state.transcript = src_transcript;
195 state.last_accessed = Instant::now();
196 }
197 });
198 if exists(&dst) {
202 Some(dst)
203 } else {
204 None
205 }
206}
207
208pub fn fork_at(src_id: &str, keep_first: usize, dst_id: Option<String>) -> Option<String> {
219 let new_id = fork(src_id, dst_id)?;
220 retain_first(&new_id, keep_first);
221 Some(new_id)
222}
223
224fn retain_first(id: &str, keep_first: usize) {
228 SESSIONS.with(|s| {
229 let mut map = s.borrow_mut();
230 let Some(state) = map.get_mut(id) else {
231 return;
232 };
233 let Some(dict) = state.transcript.as_dict() else {
234 return;
235 };
236 let dict = dict.clone();
237 let messages: Vec<VmValue> = match dict.get("messages") {
238 Some(VmValue::List(list)) => list.iter().cloned().collect(),
239 _ => Vec::new(),
240 };
241 let retained: Vec<VmValue> = messages.into_iter().take(keep_first).collect();
242 let mut next = dict;
243 next.insert(
244 "events".to_string(),
245 VmValue::List(Rc::new(
246 crate::llm::helpers::transcript_events_from_messages(&retained),
247 )),
248 );
249 next.insert("messages".to_string(), VmValue::List(Rc::new(retained)));
250 state.transcript = VmValue::Dict(Rc::new(next));
251 state.last_accessed = Instant::now();
252 });
253}
254
255pub fn trim(id: &str, keep_last: usize) -> Option<usize> {
258 SESSIONS.with(|s| {
259 let mut map = s.borrow_mut();
260 let state = map.get_mut(id)?;
261 let dict = state.transcript.as_dict()?.clone();
262 let messages: Vec<VmValue> = match dict.get("messages") {
263 Some(VmValue::List(list)) => list.iter().cloned().collect(),
264 _ => Vec::new(),
265 };
266 let start = messages.len().saturating_sub(keep_last);
267 let retained: Vec<VmValue> = messages.into_iter().skip(start).collect();
268 let kept = retained.len();
269 let mut next = dict;
270 next.insert(
271 "events".to_string(),
272 VmValue::List(Rc::new(
273 crate::llm::helpers::transcript_events_from_messages(&retained),
274 )),
275 );
276 next.insert("messages".to_string(), VmValue::List(Rc::new(retained)));
277 state.transcript = VmValue::Dict(Rc::new(next));
278 state.last_accessed = Instant::now();
279 Some(kept)
280 })
281}
282
283pub fn inject_message(id: &str, message: VmValue) -> Result<(), String> {
286 let Some(msg_dict) = message.as_dict().cloned() else {
287 return Err("agent_session_inject: message must be a dict".into());
288 };
289 let role_ok = matches!(msg_dict.get("role"), Some(VmValue::String(_)));
290 if !role_ok {
291 return Err(
292 "agent_session_inject: message must have a string `role` (user|assistant|tool_result|system)"
293 .into(),
294 );
295 }
296 SESSIONS.with(|s| {
297 let mut map = s.borrow_mut();
298 let Some(state) = map.get_mut(id) else {
299 return Err(format!("agent_session_inject: unknown session id '{id}'"));
300 };
301 let dict = state
302 .transcript
303 .as_dict()
304 .cloned()
305 .unwrap_or_else(BTreeMap::new);
306 let mut messages: Vec<VmValue> = match dict.get("messages") {
307 Some(VmValue::List(list)) => list.iter().cloned().collect(),
308 _ => Vec::new(),
309 };
310 messages.push(VmValue::Dict(Rc::new(msg_dict)));
311 let mut next = dict;
312 next.insert(
313 "events".to_string(),
314 VmValue::List(Rc::new(
315 crate::llm::helpers::transcript_events_from_messages(&messages),
316 )),
317 );
318 next.insert("messages".to_string(), VmValue::List(Rc::new(messages)));
319 state.transcript = VmValue::Dict(Rc::new(next));
320 state.last_accessed = Instant::now();
321 Ok(())
322 })
323}
324
325pub fn messages_json(id: &str) -> Vec<serde_json::Value> {
329 SESSIONS.with(|s| {
330 let map = s.borrow();
331 let Some(state) = map.get(id) else {
332 return Vec::new();
333 };
334 let Some(dict) = state.transcript.as_dict() else {
335 return Vec::new();
336 };
337 match dict.get("messages") {
338 Some(VmValue::List(list)) => list
339 .iter()
340 .map(crate::llm::helpers::vm_value_to_json)
341 .collect(),
342 _ => Vec::new(),
343 }
344 })
345}
346
347pub fn store_transcript(id: &str, transcript: VmValue) {
350 SESSIONS.with(|s| {
351 if let Some(state) = s.borrow_mut().get_mut(id) {
352 state.transcript = transcript;
353 state.last_accessed = Instant::now();
354 }
355 });
356}
357
358pub fn replace_messages(id: &str, messages: &[serde_json::Value]) {
361 SESSIONS.with(|s| {
362 let mut map = s.borrow_mut();
363 let Some(state) = map.get_mut(id) else {
364 return;
365 };
366 let dict = state
367 .transcript
368 .as_dict()
369 .cloned()
370 .unwrap_or_else(BTreeMap::new);
371 let vm_messages: Vec<VmValue> = messages
372 .iter()
373 .map(crate::stdlib::json_to_vm_value)
374 .collect();
375 let mut next = dict;
376 next.insert(
377 "events".to_string(),
378 VmValue::List(Rc::new(
379 crate::llm::helpers::transcript_events_from_messages(&vm_messages),
380 )),
381 );
382 next.insert("messages".to_string(), VmValue::List(Rc::new(vm_messages)));
383 state.transcript = VmValue::Dict(Rc::new(next));
384 state.last_accessed = Instant::now();
385 });
386}
387
388pub fn append_subscriber(id: &str, callback: VmValue) {
389 open_or_create(Some(id.to_string()));
390 SESSIONS.with(|s| {
391 if let Some(state) = s.borrow_mut().get_mut(id) {
392 state.subscribers.push(callback);
393 state.last_accessed = Instant::now();
394 }
395 });
396}
397
398pub fn subscribers_for(id: &str) -> Vec<VmValue> {
399 SESSIONS.with(|s| {
400 s.borrow()
401 .get(id)
402 .map(|state| state.subscribers.clone())
403 .unwrap_or_default()
404 })
405}
406
407pub fn subscriber_count(id: &str) -> usize {
408 SESSIONS.with(|s| {
409 s.borrow()
410 .get(id)
411 .map(|state| state.subscribers.len())
412 .unwrap_or(0)
413 })
414}
415
416pub fn set_active_skills(id: &str, skills: Vec<String>) {
420 SESSIONS.with(|s| {
421 if let Some(state) = s.borrow_mut().get_mut(id) {
422 state.active_skills = skills;
423 state.last_accessed = Instant::now();
424 }
425 });
426}
427
428pub fn active_skills(id: &str) -> Vec<String> {
432 SESSIONS.with(|s| {
433 s.borrow()
434 .get(id)
435 .map(|state| state.active_skills.clone())
436 .unwrap_or_default()
437 })
438}
439
440fn empty_transcript(id: &str) -> VmValue {
441 use crate::llm::helpers::new_transcript_with;
442 new_transcript_with(Some(id.to_string()), Vec::new(), None, None)
443}
444
445fn clone_transcript_with_id(transcript: &VmValue, new_id: &str) -> VmValue {
446 let Some(dict) = transcript.as_dict() else {
447 return empty_transcript(new_id);
448 };
449 let mut next = dict.clone();
450 next.insert(
451 "id".to_string(),
452 VmValue::String(Rc::from(new_id.to_string())),
453 );
454 VmValue::Dict(Rc::new(next))
455}
456
457#[cfg(test)]
458mod tests {
459 use super::*;
460 use std::collections::BTreeMap;
461
462 fn make_msg(role: &str, content: &str) -> VmValue {
463 let mut m: BTreeMap<String, VmValue> = BTreeMap::new();
464 m.insert("role".to_string(), VmValue::String(Rc::from(role)));
465 m.insert("content".to_string(), VmValue::String(Rc::from(content)));
466 VmValue::Dict(Rc::new(m))
467 }
468
469 fn message_count(id: &str) -> usize {
470 SESSIONS.with(|s| {
471 let map = s.borrow();
472 let Some(state) = map.get(id) else { return 0 };
473 let Some(dict) = state.transcript.as_dict() else {
474 return 0;
475 };
476 match dict.get("messages") {
477 Some(VmValue::List(list)) => list.len(),
478 _ => 0,
479 }
480 })
481 }
482
483 #[test]
484 fn fork_at_truncates_destination_to_keep_first() {
485 reset_session_store();
486 let src = open_or_create(Some("src-fork-at".into()));
487 inject_message(&src, make_msg("user", "a")).unwrap();
488 inject_message(&src, make_msg("assistant", "b")).unwrap();
489 inject_message(&src, make_msg("user", "c")).unwrap();
490 inject_message(&src, make_msg("assistant", "d")).unwrap();
491 assert_eq!(message_count(&src), 4);
492
493 let dst = fork_at(&src, 2, Some("dst-fork-at".into())).expect("fork_at");
494 assert_ne!(dst, src);
495 assert_eq!(message_count(&dst), 2, "branched at message index 2");
496 assert_eq!(message_count(&src), 4);
498 assert_eq!(subscriber_count(&dst), 0);
500 reset_session_store();
501 }
502
503 #[test]
504 fn fork_at_on_unknown_source_returns_none() {
505 reset_session_store();
506 assert!(fork_at("does-not-exist", 3, None).is_none());
507 }
508}