1use std::cell::{Cell, RefCell};
21use std::collections::{BTreeMap, HashMap};
22use std::rc::Rc;
23use std::time::Instant;
24
25use crate::value::VmValue;
26
/// Initial value of the per-thread session cap; it can be changed at runtime
/// with `set_session_cap`.
pub const DEFAULT_SESSION_CAP: usize = 128;
30
/// In-memory record for one session held in this module's thread-local store.
pub struct SessionState {
    /// Key under which the session is stored.
    pub id: String,
    /// Transcript value. The accessors in this file read it as a dict and use
    /// its `messages`, `events`, `summary` and `metadata` entries.
    pub transcript: VmValue,
    /// Values registered through `append_subscriber`.
    pub subscribers: Vec<VmValue>,
    /// Time at which this record was constructed.
    pub created_at: Instant,
    /// Refreshed by the mutating operations in this module; `open_or_create`
    /// evicts the entry with the smallest value when the store is full.
    pub last_accessed: Instant,
    /// Parent session id, set by `link_child_session`.
    pub parent_id: Option<String>,
    /// Ids of sessions linked as children by `link_child_session`.
    pub child_ids: Vec<String>,
    /// Skill names stored by `set_active_skills`.
    pub active_skills: Vec<String>,
}
46
47impl SessionState {
48 fn new(id: String) -> Self {
49 let now = Instant::now();
50 let transcript = empty_transcript(&id);
51 Self {
52 id,
53 transcript,
54 subscribers: Vec::new(),
55 created_at: now,
56 last_accessed: now,
57 parent_id: None,
58 child_ids: Vec::new(),
59 active_skills: Vec::new(),
60 }
61 }
62}
63
thread_local! {
    // Per-thread session store, keyed by session id.
    static SESSIONS: RefCell<HashMap<String, SessionState>> = RefCell::new(HashMap::new());
    // Maximum number of sessions kept before `open_or_create` evicts one.
    static SESSION_CAP: Cell<usize> = const { Cell::new(DEFAULT_SESSION_CAP) };
    // Stack of session ids; its last entry is what `current_session_id` reports.
    static CURRENT_SESSION_STACK: RefCell<Vec<String>> = const { RefCell::new(Vec::new()) };
}
69
70pub fn set_session_cap(cap: usize) {
73 SESSION_CAP.with(|c| c.set(cap.max(1)));
74}
75
76pub fn session_cap() -> usize {
77 SESSION_CAP.with(|c| c.get())
78}
79
80pub fn reset_session_store() {
82 SESSIONS.with(|s| s.borrow_mut().clear());
83 CURRENT_SESSION_STACK.with(|stack| stack.borrow_mut().clear());
84}
85
86pub(crate) fn push_current_session(id: String) {
87 if id.is_empty() {
88 return;
89 }
90 CURRENT_SESSION_STACK.with(|stack| stack.borrow_mut().push(id));
91}
92
93pub(crate) fn pop_current_session() {
94 CURRENT_SESSION_STACK.with(|stack| {
95 let _ = stack.borrow_mut().pop();
96 });
97}
98
99pub(crate) fn current_session_id() -> Option<String> {
100 CURRENT_SESSION_STACK.with(|stack| stack.borrow().last().cloned())
101}
102
103pub fn exists(id: &str) -> bool {
104 SESSIONS.with(|s| s.borrow().contains_key(id))
105}
106
107pub fn length(id: &str) -> Option<usize> {
108 SESSIONS.with(|s| {
109 s.borrow().get(id).map(|state| {
110 state
111 .transcript
112 .as_dict()
113 .and_then(|d| d.get("messages"))
114 .and_then(|v| match v {
115 VmValue::List(list) => Some(list.len()),
116 _ => None,
117 })
118 .unwrap_or(0)
119 })
120 })
121}
122
123pub fn snapshot(id: &str) -> Option<VmValue> {
124 SESSIONS.with(|s| s.borrow().get(id).map(|state| state.transcript.clone()))
125}
126
127pub fn open_or_create(id: Option<String>) -> String {
136 let resolved = id.unwrap_or_else(|| uuid::Uuid::now_v7().to_string());
137 let mut was_new = false;
138 SESSIONS.with(|s| {
139 let mut map = s.borrow_mut();
140 if let Some(state) = map.get_mut(&resolved) {
141 state.last_accessed = Instant::now();
142 return;
143 }
144 was_new = true;
145 let cap = SESSION_CAP.with(|c| c.get());
146 if map.len() >= cap {
147 if let Some(victim) = map
148 .iter()
149 .min_by_key(|(_, state)| state.last_accessed)
150 .map(|(id, _)| id.clone())
151 {
152 map.remove(&victim);
153 }
154 }
155 map.insert(resolved.clone(), SessionState::new(resolved.clone()));
156 });
157 if was_new {
158 try_register_jsonl_event_log(&resolved);
159 }
160 resolved
161}
162
163pub fn open_child_session(parent_id: &str, id: Option<String>) -> String {
164 let resolved = fork(parent_id, id.clone()).unwrap_or_else(|| open_or_create(id));
165 link_child_session(parent_id, &resolved);
166 resolved
167}
168
169pub fn link_child_session(parent_id: &str, child_id: &str) {
170 if parent_id == child_id {
171 return;
172 }
173 open_or_create(Some(parent_id.to_string()));
174 open_or_create(Some(child_id.to_string()));
175 SESSIONS.with(|s| {
176 let mut map = s.borrow_mut();
177 if let Some(parent) = map.get_mut(parent_id) {
178 parent.last_accessed = Instant::now();
179 if !parent.child_ids.iter().any(|id| id == child_id) {
180 parent.child_ids.push(child_id.to_string());
181 }
182 }
183 if let Some(child) = map.get_mut(child_id) {
184 child.last_accessed = Instant::now();
185 child.parent_id = Some(parent_id.to_string());
186 child.transcript = clone_transcript_with_parent(&child.transcript, parent_id);
187 }
188 });
189}
190
191pub fn parent_id(id: &str) -> Option<String> {
192 SESSIONS.with(|s| s.borrow().get(id).and_then(|state| state.parent_id.clone()))
193}
194
195pub fn child_ids(id: &str) -> Vec<String> {
196 SESSIONS.with(|s| {
197 s.borrow()
198 .get(id)
199 .map(|state| state.child_ids.clone())
200 .unwrap_or_default()
201 })
202}
203
204fn try_register_jsonl_event_log(session_id: &str) {
209 let Ok(dir) = std::env::var("HARN_EVENT_LOG_DIR") else {
210 return;
211 };
212 if dir.is_empty() {
213 return;
214 }
215 let path = std::path::PathBuf::from(dir).join(format!("event_log-{session_id}.jsonl"));
216 if let Ok(sink) = crate::agent_events::JsonlEventSink::open(&path) {
217 crate::agent_events::register_sink(session_id, sink);
218 }
219}
220
221pub fn close(id: &str) {
222 SESSIONS.with(|s| {
223 s.borrow_mut().remove(id);
224 });
225}
226
227pub fn reset_transcript(id: &str) -> bool {
228 SESSIONS.with(|s| {
229 let mut map = s.borrow_mut();
230 let Some(state) = map.get_mut(id) else {
231 return false;
232 };
233 state.transcript = empty_transcript(id);
234 state.last_accessed = Instant::now();
235 true
236 })
237}
238
239pub fn fork(src_id: &str, dst_id: Option<String>) -> Option<String> {
246 let (src_transcript, dst) = SESSIONS.with(|s| {
247 let mut map = s.borrow_mut();
248 let src = map.get_mut(src_id)?;
249 src.last_accessed = Instant::now();
250 let dst = dst_id.unwrap_or_else(|| uuid::Uuid::now_v7().to_string());
251 let forked_transcript = clone_transcript_with_id(&src.transcript, &dst);
252 Some((forked_transcript, dst))
253 })?;
254 open_or_create(Some(dst.clone()));
256 SESSIONS.with(|s| {
257 if let Some(state) = s.borrow_mut().get_mut(&dst) {
258 state.transcript = src_transcript;
259 state.last_accessed = Instant::now();
260 }
261 });
262 if exists(&dst) {
266 Some(dst)
267 } else {
268 None
269 }
270}
271
272pub fn fork_at(src_id: &str, keep_first: usize, dst_id: Option<String>) -> Option<String> {
283 let new_id = fork(src_id, dst_id)?;
284 retain_first(&new_id, keep_first);
285 Some(new_id)
286}
287
288fn retain_first(id: &str, keep_first: usize) {
292 SESSIONS.with(|s| {
293 let mut map = s.borrow_mut();
294 let Some(state) = map.get_mut(id) else {
295 return;
296 };
297 let Some(dict) = state.transcript.as_dict() else {
298 return;
299 };
300 let dict = dict.clone();
301 let messages: Vec<VmValue> = match dict.get("messages") {
302 Some(VmValue::List(list)) => list.iter().cloned().collect(),
303 _ => Vec::new(),
304 };
305 let retained: Vec<VmValue> = messages.into_iter().take(keep_first).collect();
306 let mut next = dict;
307 next.insert(
308 "events".to_string(),
309 VmValue::List(Rc::new(
310 crate::llm::helpers::transcript_events_from_messages(&retained),
311 )),
312 );
313 next.insert("messages".to_string(), VmValue::List(Rc::new(retained)));
314 state.transcript = VmValue::Dict(Rc::new(next));
315 state.last_accessed = Instant::now();
316 });
317}
318
319pub fn trim(id: &str, keep_last: usize) -> Option<usize> {
322 SESSIONS.with(|s| {
323 let mut map = s.borrow_mut();
324 let state = map.get_mut(id)?;
325 let dict = state.transcript.as_dict()?.clone();
326 let messages: Vec<VmValue> = match dict.get("messages") {
327 Some(VmValue::List(list)) => list.iter().cloned().collect(),
328 _ => Vec::new(),
329 };
330 let start = messages.len().saturating_sub(keep_last);
331 let retained: Vec<VmValue> = messages.into_iter().skip(start).collect();
332 let kept = retained.len();
333 let mut next = dict;
334 next.insert(
335 "events".to_string(),
336 VmValue::List(Rc::new(
337 crate::llm::helpers::transcript_events_from_messages(&retained),
338 )),
339 );
340 next.insert("messages".to_string(), VmValue::List(Rc::new(retained)));
341 state.transcript = VmValue::Dict(Rc::new(next));
342 state.last_accessed = Instant::now();
343 Some(kept)
344 })
345}
346
347pub fn inject_message(id: &str, message: VmValue) -> Result<(), String> {
350 let Some(msg_dict) = message.as_dict().cloned() else {
351 return Err("agent_session_inject: message must be a dict".into());
352 };
353 let role_ok = matches!(msg_dict.get("role"), Some(VmValue::String(_)));
354 if !role_ok {
355 return Err(
356 "agent_session_inject: message must have a string `role` (user|assistant|tool_result|system)"
357 .into(),
358 );
359 }
360 SESSIONS.with(|s| {
361 let mut map = s.borrow_mut();
362 let Some(state) = map.get_mut(id) else {
363 return Err(format!("agent_session_inject: unknown session id '{id}'"));
364 };
365 let dict = state
366 .transcript
367 .as_dict()
368 .cloned()
369 .unwrap_or_else(BTreeMap::new);
370 let mut messages: Vec<VmValue> = match dict.get("messages") {
371 Some(VmValue::List(list)) => list.iter().cloned().collect(),
372 _ => Vec::new(),
373 };
374 messages.push(VmValue::Dict(Rc::new(msg_dict)));
375 let mut next = dict;
376 next.insert(
377 "events".to_string(),
378 VmValue::List(Rc::new(
379 crate::llm::helpers::transcript_events_from_messages(&messages),
380 )),
381 );
382 next.insert("messages".to_string(), VmValue::List(Rc::new(messages)));
383 state.transcript = VmValue::Dict(Rc::new(next));
384 state.last_accessed = Instant::now();
385 Ok(())
386 })
387}
388
389pub fn messages_json(id: &str) -> Vec<serde_json::Value> {
393 SESSIONS.with(|s| {
394 let map = s.borrow();
395 let Some(state) = map.get(id) else {
396 return Vec::new();
397 };
398 let Some(dict) = state.transcript.as_dict() else {
399 return Vec::new();
400 };
401 match dict.get("messages") {
402 Some(VmValue::List(list)) => list
403 .iter()
404 .map(crate::llm::helpers::vm_value_to_json)
405 .collect(),
406 _ => Vec::new(),
407 }
408 })
409}
410
/// Result of `prompt_state_json`: the session's messages as JSON plus its
/// summary, if any.
#[derive(Clone, Debug, Default)]
pub struct SessionPromptState {
    /// Transcript messages as JSON; `prompt_state_json` puts a summary
    /// message first when a summary exists and is not already leading.
    pub messages: Vec<serde_json::Value>,
    /// The transcript's `summary` string when present and not blank.
    pub summary: Option<String>,
}
416
417fn summary_message_json(summary: &str) -> serde_json::Value {
418 serde_json::json!({
419 "role": "user",
420 "content": summary,
421 })
422}
423
424fn messages_begin_with_summary(messages: &[serde_json::Value], summary: &str) -> bool {
425 messages.first().is_some_and(|message| {
426 message.get("role").and_then(|value| value.as_str()) == Some("user")
427 && message.get("content").and_then(|value| value.as_str()) == Some(summary)
428 })
429}
430
431pub fn prompt_state_json(id: &str) -> SessionPromptState {
439 SESSIONS.with(|s| {
440 let map = s.borrow();
441 let Some(state) = map.get(id) else {
442 return SessionPromptState::default();
443 };
444 let Some(dict) = state.transcript.as_dict() else {
445 return SessionPromptState::default();
446 };
447 let mut messages = match dict.get("messages") {
448 Some(VmValue::List(list)) => list
449 .iter()
450 .map(crate::llm::helpers::vm_value_to_json)
451 .collect::<Vec<_>>(),
452 _ => Vec::new(),
453 };
454 let summary = dict.get("summary").and_then(|value| match value {
455 VmValue::String(text) if !text.trim().is_empty() => Some(text.to_string()),
456 _ => None,
457 });
458 if let Some(summary_text) = summary.as_deref() {
459 if !messages_begin_with_summary(&messages, summary_text) {
460 messages.insert(0, summary_message_json(summary_text));
461 }
462 }
463 SessionPromptState { messages, summary }
464 })
465}
466
467pub fn store_transcript(id: &str, transcript: VmValue) {
470 SESSIONS.with(|s| {
471 if let Some(state) = s.borrow_mut().get_mut(id) {
472 state.transcript = transcript;
473 state.last_accessed = Instant::now();
474 }
475 });
476}
477
478pub fn append_event(id: &str, event: VmValue) -> Result<(), String> {
484 let Some(event_dict) = event.as_dict() else {
485 return Err("agent_session_append_event: event must be a dict".into());
486 };
487 let kind_ok = matches!(event_dict.get("kind"), Some(VmValue::String(_)));
488 if !kind_ok {
489 return Err("agent_session_append_event: event must have a string `kind`".into());
490 }
491 SESSIONS.with(|s| {
492 let mut map = s.borrow_mut();
493 let Some(state) = map.get_mut(id) else {
494 return Err(format!(
495 "agent_session_append_event: unknown session id '{id}'"
496 ));
497 };
498 let dict = state
499 .transcript
500 .as_dict()
501 .cloned()
502 .unwrap_or_else(BTreeMap::new);
503 let mut events: Vec<VmValue> = match dict.get("events") {
504 Some(VmValue::List(list)) => list.iter().cloned().collect(),
505 _ => dict
506 .get("messages")
507 .and_then(|value| match value {
508 VmValue::List(list) => Some(list.iter().cloned().collect::<Vec<_>>()),
509 _ => None,
510 })
511 .map(|messages| crate::llm::helpers::transcript_events_from_messages(&messages))
512 .unwrap_or_default(),
513 };
514 events.push(event);
515 let mut next = dict;
516 next.insert("events".to_string(), VmValue::List(Rc::new(events)));
517 state.transcript = VmValue::Dict(Rc::new(next));
518 state.last_accessed = Instant::now();
519 Ok(())
520 })
521}
522
523pub fn replace_messages(id: &str, messages: &[serde_json::Value]) {
526 SESSIONS.with(|s| {
527 let mut map = s.borrow_mut();
528 let Some(state) = map.get_mut(id) else {
529 return;
530 };
531 let dict = state
532 .transcript
533 .as_dict()
534 .cloned()
535 .unwrap_or_else(BTreeMap::new);
536 let vm_messages: Vec<VmValue> = messages
537 .iter()
538 .map(crate::stdlib::json_to_vm_value)
539 .collect();
540 let mut next = dict;
541 next.insert(
542 "events".to_string(),
543 VmValue::List(Rc::new(
544 crate::llm::helpers::transcript_events_from_messages(&vm_messages),
545 )),
546 );
547 next.insert("messages".to_string(), VmValue::List(Rc::new(vm_messages)));
548 state.transcript = VmValue::Dict(Rc::new(next));
549 state.last_accessed = Instant::now();
550 });
551}
552
553pub fn append_subscriber(id: &str, callback: VmValue) {
554 open_or_create(Some(id.to_string()));
555 SESSIONS.with(|s| {
556 if let Some(state) = s.borrow_mut().get_mut(id) {
557 state.subscribers.push(callback);
558 state.last_accessed = Instant::now();
559 }
560 });
561}
562
563pub fn subscribers_for(id: &str) -> Vec<VmValue> {
564 SESSIONS.with(|s| {
565 s.borrow()
566 .get(id)
567 .map(|state| state.subscribers.clone())
568 .unwrap_or_default()
569 })
570}
571
572pub fn subscriber_count(id: &str) -> usize {
573 SESSIONS.with(|s| {
574 s.borrow()
575 .get(id)
576 .map(|state| state.subscribers.len())
577 .unwrap_or(0)
578 })
579}
580
581pub fn set_active_skills(id: &str, skills: Vec<String>) {
585 SESSIONS.with(|s| {
586 if let Some(state) = s.borrow_mut().get_mut(id) {
587 state.active_skills = skills;
588 state.last_accessed = Instant::now();
589 }
590 });
591}
592
593pub fn active_skills(id: &str) -> Vec<String> {
597 SESSIONS.with(|s| {
598 s.borrow()
599 .get(id)
600 .map(|state| state.active_skills.clone())
601 .unwrap_or_default()
602 })
603}
604
605fn empty_transcript(id: &str) -> VmValue {
606 use crate::llm::helpers::new_transcript_with;
607 new_transcript_with(Some(id.to_string()), Vec::new(), None, None)
608}
609
610fn clone_transcript_with_id(transcript: &VmValue, new_id: &str) -> VmValue {
611 let Some(dict) = transcript.as_dict() else {
612 return empty_transcript(new_id);
613 };
614 let mut next = dict.clone();
615 next.insert(
616 "id".to_string(),
617 VmValue::String(Rc::from(new_id.to_string())),
618 );
619 VmValue::Dict(Rc::new(next))
620}
621
622fn clone_transcript_with_parent(transcript: &VmValue, parent_id: &str) -> VmValue {
623 let Some(dict) = transcript.as_dict() else {
624 return transcript.clone();
625 };
626 let mut next = dict.clone();
627 let metadata = match next.get("metadata") {
628 Some(VmValue::Dict(metadata)) => {
629 let mut metadata = metadata.as_ref().clone();
630 metadata.insert(
631 "parent_session_id".to_string(),
632 VmValue::String(Rc::from(parent_id.to_string())),
633 );
634 VmValue::Dict(Rc::new(metadata))
635 }
636 _ => VmValue::Dict(Rc::new(BTreeMap::from([(
637 "parent_session_id".to_string(),
638 VmValue::String(Rc::from(parent_id.to_string())),
639 )]))),
640 };
641 next.insert("metadata".to_string(), metadata);
642 VmValue::Dict(Rc::new(next))
643}
644
// Unit tests for the session store. Each test begins by calling
// `reset_session_store` to start from an empty thread-local store.
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::BTreeMap;

    // Builds a message dict with string `role` and `content` entries.
    fn make_msg(role: &str, content: &str) -> VmValue {
        let mut m: BTreeMap<String, VmValue> = BTreeMap::new();
        m.insert("role".to_string(), VmValue::String(Rc::from(role)));
        m.insert("content".to_string(), VmValue::String(Rc::from(content)));
        VmValue::Dict(Rc::new(m))
    }

    // Number of entries in the session's `messages` list; 0 when the session,
    // the dict, or the list is missing.
    fn message_count(id: &str) -> usize {
        SESSIONS.with(|s| {
            let map = s.borrow();
            let Some(state) = map.get(id) else { return 0 };
            let Some(dict) = state.transcript.as_dict() else {
                return 0;
            };
            match dict.get("messages") {
                Some(VmValue::List(list)) => list.len(),
                _ => 0,
            }
        })
    }

    // `fork_at` keeps only the first N messages in the destination and leaves
    // the source untouched.
    #[test]
    fn fork_at_truncates_destination_to_keep_first() {
        reset_session_store();
        let src = open_or_create(Some("src-fork-at".into()));
        inject_message(&src, make_msg("user", "a")).unwrap();
        inject_message(&src, make_msg("assistant", "b")).unwrap();
        inject_message(&src, make_msg("user", "c")).unwrap();
        inject_message(&src, make_msg("assistant", "d")).unwrap();
        assert_eq!(message_count(&src), 4);

        let dst = fork_at(&src, 2, Some("dst-fork-at".into())).expect("fork_at");
        assert_ne!(dst, src);
        assert_eq!(message_count(&dst), 2, "branched at message index 2");
        assert_eq!(message_count(&src), 4);
        // The freshly created destination starts without subscribers.
        assert_eq!(subscriber_count(&dst), 0);
        reset_session_store();
    }

    // Forking from a session that does not exist yields `None`.
    #[test]
    fn fork_at_on_unknown_source_returns_none() {
        reset_session_store();
        assert!(fork_at("does-not-exist", 3, None).is_none());
    }

    // Opening a child records the link on both sides and in the child's
    // transcript metadata.
    #[test]
    fn child_sessions_record_parent_lineage() {
        reset_session_store();
        let parent = open_or_create(Some("parent-session".into()));
        let child = open_child_session(&parent, Some("child-session".into()));
        assert_eq!(parent_id(&child).as_deref(), Some("parent-session"));
        assert_eq!(child_ids(&parent), vec!["child-session".to_string()]);

        let transcript = snapshot(&child).expect("child transcript");
        let metadata = transcript
            .as_dict()
            .and_then(|dict| dict.get("metadata"))
            .and_then(|value| value.as_dict())
            .expect("child metadata");
        assert!(matches!(
            metadata.get("parent_session_id"),
            Some(VmValue::String(value)) if value.as_ref() == "parent-session"
        ));
    }

    // A child opened from an existing parent starts with the parent's messages.
    #[test]
    fn child_session_forks_parent_transcript() {
        reset_session_store();
        let parent = open_or_create(Some("parent-fork-parent".into()));
        inject_message(&parent, make_msg("user", "parent context")).unwrap();

        let child = open_child_session(&parent, Some("parent-fork-child".into()));
        assert_eq!(message_count(&parent), 1);
        assert_eq!(message_count(&child), 1);

        let child_messages = messages_json(&child);
        assert_eq!(
            child_messages[0]["content"].as_str(),
            Some("parent context"),
        );
    }

    // When the transcript has a summary that is not already the first
    // message, `prompt_state_json` inserts it as a leading `user` message.
    #[test]
    fn prompt_state_prepends_summary_message_when_missing_from_messages() {
        reset_session_store();
        let session = open_or_create(Some("prompt-state-summary".into()));
        let transcript = crate::llm::helpers::new_transcript_with_events(
            Some(session.clone()),
            vec![make_msg("assistant", "latest answer")],
            Some("[auto-compacted 2 older messages]\nsummary".to_string()),
            None,
            Vec::new(),
            Vec::new(),
            Some("active"),
        );
        store_transcript(&session, transcript);

        let prompt = prompt_state_json(&session);
        assert_eq!(
            prompt.summary.as_deref(),
            Some("[auto-compacted 2 older messages]\nsummary")
        );
        assert_eq!(prompt.messages.len(), 2);
        assert_eq!(prompt.messages[0]["role"].as_str(), Some("user"));
        assert_eq!(
            prompt.messages[0]["content"].as_str(),
            Some("[auto-compacted 2 older messages]\nsummary"),
        );
        assert_eq!(prompt.messages[1]["role"].as_str(), Some("assistant"));
    }
}