1use std::cell::{Cell, RefCell};
21use std::collections::{BTreeMap, HashMap};
22use std::rc::Rc;
23use std::time::Instant;
24
25use crate::value::VmValue;
26
/// Default value of the per-thread session cap, i.e. how many sessions the
/// store holds before `open_or_create` evicts one to admit a new id.
pub const DEFAULT_SESSION_CAP: usize = 128;
30
31pub struct SessionState {
32 pub id: String,
33 pub transcript: VmValue,
34 pub subscribers: Vec<VmValue>,
35 pub created_at: Instant,
36 pub last_accessed: Instant,
37 pub parent_id: Option<String>,
38 pub child_ids: Vec<String>,
39 pub active_skills: Vec<String>,
45}
46
47impl SessionState {
48 fn new(id: String) -> Self {
49 let now = Instant::now();
50 let transcript = empty_transcript(&id);
51 Self {
52 id,
53 transcript,
54 subscribers: Vec::new(),
55 created_at: now,
56 last_accessed: now,
57 parent_id: None,
58 child_ids: Vec::new(),
59 active_skills: Vec::new(),
60 }
61 }
62}
63
64thread_local! {
65 static SESSIONS: RefCell<HashMap<String, SessionState>> = RefCell::new(HashMap::new());
66 static SESSION_CAP: Cell<usize> = const { Cell::new(DEFAULT_SESSION_CAP) };
67 static CURRENT_SESSION_STACK: RefCell<Vec<String>> = const { RefCell::new(Vec::new()) };
68}
69
70pub fn set_session_cap(cap: usize) {
73 SESSION_CAP.with(|c| c.set(cap.max(1)));
74}
75
76pub fn session_cap() -> usize {
77 SESSION_CAP.with(|c| c.get())
78}
79
80pub fn reset_session_store() {
82 SESSIONS.with(|s| s.borrow_mut().clear());
83 CURRENT_SESSION_STACK.with(|stack| stack.borrow_mut().clear());
84}
85
86pub(crate) fn push_current_session(id: String) {
87 if id.is_empty() {
88 return;
89 }
90 CURRENT_SESSION_STACK.with(|stack| stack.borrow_mut().push(id));
91}
92
93pub(crate) fn pop_current_session() {
94 CURRENT_SESSION_STACK.with(|stack| {
95 let _ = stack.borrow_mut().pop();
96 });
97}
98
99pub(crate) fn current_session_id() -> Option<String> {
100 CURRENT_SESSION_STACK.with(|stack| stack.borrow().last().cloned())
101}
102
103pub fn exists(id: &str) -> bool {
104 SESSIONS.with(|s| s.borrow().contains_key(id))
105}
106
107pub fn length(id: &str) -> Option<usize> {
108 SESSIONS.with(|s| {
109 s.borrow().get(id).map(|state| {
110 state
111 .transcript
112 .as_dict()
113 .and_then(|d| d.get("messages"))
114 .and_then(|v| match v {
115 VmValue::List(list) => Some(list.len()),
116 _ => None,
117 })
118 .unwrap_or(0)
119 })
120 })
121}
122
123pub fn snapshot(id: &str) -> Option<VmValue> {
124 SESSIONS.with(|s| s.borrow().get(id).map(|state| state.transcript.clone()))
125}
126
127pub fn open_or_create(id: Option<String>) -> String {
136 let resolved = id.unwrap_or_else(|| uuid::Uuid::now_v7().to_string());
137 let mut was_new = false;
138 SESSIONS.with(|s| {
139 let mut map = s.borrow_mut();
140 if let Some(state) = map.get_mut(&resolved) {
141 state.last_accessed = Instant::now();
142 return;
143 }
144 was_new = true;
145 let cap = SESSION_CAP.with(|c| c.get());
146 if map.len() >= cap {
147 if let Some(victim) = map
148 .iter()
149 .min_by_key(|(_, state)| state.last_accessed)
150 .map(|(id, _)| id.clone())
151 {
152 map.remove(&victim);
153 }
154 }
155 map.insert(resolved.clone(), SessionState::new(resolved.clone()));
156 });
157 if was_new {
158 try_register_jsonl_event_log(&resolved);
159 }
160 resolved
161}
162
163pub fn open_child_session(parent_id: &str, id: Option<String>) -> String {
164 let resolved = open_or_create(id);
165 link_child_session(parent_id, &resolved);
166 resolved
167}
168
169pub fn link_child_session(parent_id: &str, child_id: &str) {
170 if parent_id == child_id {
171 return;
172 }
173 open_or_create(Some(parent_id.to_string()));
174 open_or_create(Some(child_id.to_string()));
175 SESSIONS.with(|s| {
176 let mut map = s.borrow_mut();
177 if let Some(parent) = map.get_mut(parent_id) {
178 parent.last_accessed = Instant::now();
179 if !parent.child_ids.iter().any(|id| id == child_id) {
180 parent.child_ids.push(child_id.to_string());
181 }
182 }
183 if let Some(child) = map.get_mut(child_id) {
184 child.last_accessed = Instant::now();
185 child.parent_id = Some(parent_id.to_string());
186 child.transcript = clone_transcript_with_parent(&child.transcript, parent_id);
187 }
188 });
189}
190
191pub fn parent_id(id: &str) -> Option<String> {
192 SESSIONS.with(|s| s.borrow().get(id).and_then(|state| state.parent_id.clone()))
193}
194
195pub fn child_ids(id: &str) -> Vec<String> {
196 SESSIONS.with(|s| {
197 s.borrow()
198 .get(id)
199 .map(|state| state.child_ids.clone())
200 .unwrap_or_default()
201 })
202}
203
204fn try_register_jsonl_event_log(session_id: &str) {
209 let Ok(dir) = std::env::var("HARN_EVENT_LOG_DIR") else {
210 return;
211 };
212 if dir.is_empty() {
213 return;
214 }
215 let path = std::path::PathBuf::from(dir).join(format!("event_log-{session_id}.jsonl"));
216 if let Ok(sink) = crate::agent_events::JsonlEventSink::open(&path) {
217 crate::agent_events::register_sink(session_id, sink);
218 }
219}
220
221pub fn close(id: &str) {
222 SESSIONS.with(|s| {
223 s.borrow_mut().remove(id);
224 });
225}
226
227pub fn reset_transcript(id: &str) -> bool {
228 SESSIONS.with(|s| {
229 let mut map = s.borrow_mut();
230 let Some(state) = map.get_mut(id) else {
231 return false;
232 };
233 state.transcript = empty_transcript(id);
234 state.last_accessed = Instant::now();
235 true
236 })
237}
238
239pub fn fork(src_id: &str, dst_id: Option<String>) -> Option<String> {
246 let (src_transcript, dst) = SESSIONS.with(|s| {
247 let mut map = s.borrow_mut();
248 let src = map.get_mut(src_id)?;
249 src.last_accessed = Instant::now();
250 let dst = dst_id.unwrap_or_else(|| uuid::Uuid::now_v7().to_string());
251 let forked_transcript = clone_transcript_with_id(&src.transcript, &dst);
252 Some((forked_transcript, dst))
253 })?;
254 open_or_create(Some(dst.clone()));
256 SESSIONS.with(|s| {
257 if let Some(state) = s.borrow_mut().get_mut(&dst) {
258 state.transcript = src_transcript;
259 state.last_accessed = Instant::now();
260 }
261 });
262 if exists(&dst) {
266 Some(dst)
267 } else {
268 None
269 }
270}
271
272pub fn fork_at(src_id: &str, keep_first: usize, dst_id: Option<String>) -> Option<String> {
283 let new_id = fork(src_id, dst_id)?;
284 retain_first(&new_id, keep_first);
285 Some(new_id)
286}
287
288fn retain_first(id: &str, keep_first: usize) {
292 SESSIONS.with(|s| {
293 let mut map = s.borrow_mut();
294 let Some(state) = map.get_mut(id) else {
295 return;
296 };
297 let Some(dict) = state.transcript.as_dict() else {
298 return;
299 };
300 let dict = dict.clone();
301 let messages: Vec<VmValue> = match dict.get("messages") {
302 Some(VmValue::List(list)) => list.iter().cloned().collect(),
303 _ => Vec::new(),
304 };
305 let retained: Vec<VmValue> = messages.into_iter().take(keep_first).collect();
306 let mut next = dict;
307 next.insert(
308 "events".to_string(),
309 VmValue::List(Rc::new(
310 crate::llm::helpers::transcript_events_from_messages(&retained),
311 )),
312 );
313 next.insert("messages".to_string(), VmValue::List(Rc::new(retained)));
314 state.transcript = VmValue::Dict(Rc::new(next));
315 state.last_accessed = Instant::now();
316 });
317}
318
319pub fn trim(id: &str, keep_last: usize) -> Option<usize> {
322 SESSIONS.with(|s| {
323 let mut map = s.borrow_mut();
324 let state = map.get_mut(id)?;
325 let dict = state.transcript.as_dict()?.clone();
326 let messages: Vec<VmValue> = match dict.get("messages") {
327 Some(VmValue::List(list)) => list.iter().cloned().collect(),
328 _ => Vec::new(),
329 };
330 let start = messages.len().saturating_sub(keep_last);
331 let retained: Vec<VmValue> = messages.into_iter().skip(start).collect();
332 let kept = retained.len();
333 let mut next = dict;
334 next.insert(
335 "events".to_string(),
336 VmValue::List(Rc::new(
337 crate::llm::helpers::transcript_events_from_messages(&retained),
338 )),
339 );
340 next.insert("messages".to_string(), VmValue::List(Rc::new(retained)));
341 state.transcript = VmValue::Dict(Rc::new(next));
342 state.last_accessed = Instant::now();
343 Some(kept)
344 })
345}
346
347pub fn inject_message(id: &str, message: VmValue) -> Result<(), String> {
350 let Some(msg_dict) = message.as_dict().cloned() else {
351 return Err("agent_session_inject: message must be a dict".into());
352 };
353 let role_ok = matches!(msg_dict.get("role"), Some(VmValue::String(_)));
354 if !role_ok {
355 return Err(
356 "agent_session_inject: message must have a string `role` (user|assistant|tool_result|system)"
357 .into(),
358 );
359 }
360 SESSIONS.with(|s| {
361 let mut map = s.borrow_mut();
362 let Some(state) = map.get_mut(id) else {
363 return Err(format!("agent_session_inject: unknown session id '{id}'"));
364 };
365 let dict = state
366 .transcript
367 .as_dict()
368 .cloned()
369 .unwrap_or_else(BTreeMap::new);
370 let mut messages: Vec<VmValue> = match dict.get("messages") {
371 Some(VmValue::List(list)) => list.iter().cloned().collect(),
372 _ => Vec::new(),
373 };
374 messages.push(VmValue::Dict(Rc::new(msg_dict)));
375 let mut next = dict;
376 next.insert(
377 "events".to_string(),
378 VmValue::List(Rc::new(
379 crate::llm::helpers::transcript_events_from_messages(&messages),
380 )),
381 );
382 next.insert("messages".to_string(), VmValue::List(Rc::new(messages)));
383 state.transcript = VmValue::Dict(Rc::new(next));
384 state.last_accessed = Instant::now();
385 Ok(())
386 })
387}
388
389pub fn messages_json(id: &str) -> Vec<serde_json::Value> {
393 SESSIONS.with(|s| {
394 let map = s.borrow();
395 let Some(state) = map.get(id) else {
396 return Vec::new();
397 };
398 let Some(dict) = state.transcript.as_dict() else {
399 return Vec::new();
400 };
401 match dict.get("messages") {
402 Some(VmValue::List(list)) => list
403 .iter()
404 .map(crate::llm::helpers::vm_value_to_json)
405 .collect(),
406 _ => Vec::new(),
407 }
408 })
409}
410
411pub fn store_transcript(id: &str, transcript: VmValue) {
414 SESSIONS.with(|s| {
415 if let Some(state) = s.borrow_mut().get_mut(id) {
416 state.transcript = transcript;
417 state.last_accessed = Instant::now();
418 }
419 });
420}
421
422pub fn replace_messages(id: &str, messages: &[serde_json::Value]) {
425 SESSIONS.with(|s| {
426 let mut map = s.borrow_mut();
427 let Some(state) = map.get_mut(id) else {
428 return;
429 };
430 let dict = state
431 .transcript
432 .as_dict()
433 .cloned()
434 .unwrap_or_else(BTreeMap::new);
435 let vm_messages: Vec<VmValue> = messages
436 .iter()
437 .map(crate::stdlib::json_to_vm_value)
438 .collect();
439 let mut next = dict;
440 next.insert(
441 "events".to_string(),
442 VmValue::List(Rc::new(
443 crate::llm::helpers::transcript_events_from_messages(&vm_messages),
444 )),
445 );
446 next.insert("messages".to_string(), VmValue::List(Rc::new(vm_messages)));
447 state.transcript = VmValue::Dict(Rc::new(next));
448 state.last_accessed = Instant::now();
449 });
450}
451
452pub fn append_subscriber(id: &str, callback: VmValue) {
453 open_or_create(Some(id.to_string()));
454 SESSIONS.with(|s| {
455 if let Some(state) = s.borrow_mut().get_mut(id) {
456 state.subscribers.push(callback);
457 state.last_accessed = Instant::now();
458 }
459 });
460}
461
462pub fn subscribers_for(id: &str) -> Vec<VmValue> {
463 SESSIONS.with(|s| {
464 s.borrow()
465 .get(id)
466 .map(|state| state.subscribers.clone())
467 .unwrap_or_default()
468 })
469}
470
471pub fn subscriber_count(id: &str) -> usize {
472 SESSIONS.with(|s| {
473 s.borrow()
474 .get(id)
475 .map(|state| state.subscribers.len())
476 .unwrap_or(0)
477 })
478}
479
480pub fn set_active_skills(id: &str, skills: Vec<String>) {
484 SESSIONS.with(|s| {
485 if let Some(state) = s.borrow_mut().get_mut(id) {
486 state.active_skills = skills;
487 state.last_accessed = Instant::now();
488 }
489 });
490}
491
492pub fn active_skills(id: &str) -> Vec<String> {
496 SESSIONS.with(|s| {
497 s.borrow()
498 .get(id)
499 .map(|state| state.active_skills.clone())
500 .unwrap_or_default()
501 })
502}
503
504fn empty_transcript(id: &str) -> VmValue {
505 use crate::llm::helpers::new_transcript_with;
506 new_transcript_with(Some(id.to_string()), Vec::new(), None, None)
507}
508
509fn clone_transcript_with_id(transcript: &VmValue, new_id: &str) -> VmValue {
510 let Some(dict) = transcript.as_dict() else {
511 return empty_transcript(new_id);
512 };
513 let mut next = dict.clone();
514 next.insert(
515 "id".to_string(),
516 VmValue::String(Rc::from(new_id.to_string())),
517 );
518 VmValue::Dict(Rc::new(next))
519}
520
521fn clone_transcript_with_parent(transcript: &VmValue, parent_id: &str) -> VmValue {
522 let Some(dict) = transcript.as_dict() else {
523 return transcript.clone();
524 };
525 let mut next = dict.clone();
526 let metadata = match next.get("metadata") {
527 Some(VmValue::Dict(metadata)) => {
528 let mut metadata = metadata.as_ref().clone();
529 metadata.insert(
530 "parent_session_id".to_string(),
531 VmValue::String(Rc::from(parent_id.to_string())),
532 );
533 VmValue::Dict(Rc::new(metadata))
534 }
535 _ => VmValue::Dict(Rc::new(BTreeMap::from([(
536 "parent_session_id".to_string(),
537 VmValue::String(Rc::from(parent_id.to_string())),
538 )]))),
539 };
540 next.insert("metadata".to_string(), metadata);
541 VmValue::Dict(Rc::new(next))
542}
543
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::BTreeMap;

    /// Builds a message dict with string `role` and `content` entries.
    fn make_msg(role: &str, content: &str) -> VmValue {
        let fields: BTreeMap<String, VmValue> = BTreeMap::from([
            ("role".to_string(), VmValue::String(Rc::from(role))),
            ("content".to_string(), VmValue::String(Rc::from(content))),
        ]);
        VmValue::Dict(Rc::new(fields))
    }

    /// Counts the entries of the session's `messages` list, reading the store
    /// directly; anything missing or malformed counts as zero.
    fn message_count(id: &str) -> usize {
        SESSIONS.with(|store| {
            let sessions = store.borrow();
            let messages = sessions
                .get(id)
                .and_then(|state| state.transcript.as_dict())
                .and_then(|dict| dict.get("messages"));
            match messages {
                Some(VmValue::List(items)) => items.len(),
                _ => 0,
            }
        })
    }

    #[test]
    fn fork_at_truncates_destination_to_keep_first() {
        reset_session_store();
        let src = open_or_create(Some("src-fork-at".into()));
        let turns = [
            ("user", "a"),
            ("assistant", "b"),
            ("user", "c"),
            ("assistant", "d"),
        ];
        for &(role, content) in &turns {
            inject_message(&src, make_msg(role, content)).unwrap();
        }
        assert_eq!(message_count(&src), 4);

        let dst = fork_at(&src, 2, Some("dst-fork-at".into())).expect("fork_at");
        assert_ne!(dst, src);
        assert_eq!(message_count(&dst), 2, "branched at message index 2");
        // The source must be unaffected by truncating the fork.
        assert_eq!(message_count(&src), 4);
        assert_eq!(subscriber_count(&dst), 0);
        reset_session_store();
    }

    #[test]
    fn fork_at_on_unknown_source_returns_none() {
        reset_session_store();
        let forked = fork_at("does-not-exist", 3, None);
        assert!(forked.is_none());
    }

    #[test]
    fn child_sessions_record_parent_lineage() {
        reset_session_store();
        let parent = open_or_create(Some("parent-session".into()));
        let child = open_child_session(&parent, Some("child-session".into()));

        // Both directions of the link are recorded.
        assert_eq!(parent_id(&child).as_deref(), Some("parent-session"));
        assert_eq!(child_ids(&parent), vec!["child-session".to_string()]);

        // The parent id is also written into the child's transcript metadata.
        let transcript = snapshot(&child).expect("child transcript");
        let metadata = transcript
            .as_dict()
            .and_then(|dict| dict.get("metadata"))
            .and_then(|value| value.as_dict())
            .expect("child metadata");
        let recorded = metadata.get("parent_session_id");
        assert!(matches!(
            recorded,
            Some(VmValue::String(value)) if &**value == "parent-session"
        ));
    }
}