1use std::cell::{Cell, RefCell};
22use std::collections::{BTreeMap, HashMap, HashSet};
23use std::rc::Rc;
24use std::time::Instant;
25
26use crate::value::VmValue;
27
28pub const DEFAULT_SESSION_CAP: usize = 128;
31
32pub struct SessionState {
33 pub id: String,
34 pub transcript: VmValue,
35 pub subscribers: Vec<VmValue>,
36 pub created_at: Instant,
37 pub last_accessed: Instant,
38 pub parent_id: Option<String>,
39 pub child_ids: Vec<String>,
40 pub branched_at_event_index: Option<usize>,
41 pub active_skills: Vec<String>,
47}
48
49impl SessionState {
50 fn new(id: String) -> Self {
51 let now = Instant::now();
52 let transcript = empty_transcript(&id);
53 Self {
54 id,
55 transcript,
56 subscribers: Vec::new(),
57 created_at: now,
58 last_accessed: now,
59 parent_id: None,
60 child_ids: Vec::new(),
61 branched_at_event_index: None,
62 active_skills: Vec::new(),
63 }
64 }
65}
66
67#[derive(Clone, Debug, PartialEq, Eq)]
68pub struct SessionAncestry {
69 pub parent_id: Option<String>,
70 pub child_ids: Vec<String>,
71 pub root_id: String,
72}
73
74thread_local! {
75 static SESSIONS: RefCell<HashMap<String, SessionState>> = RefCell::new(HashMap::new());
76 static SESSION_CAP: Cell<usize> = const { Cell::new(DEFAULT_SESSION_CAP) };
77 static CURRENT_SESSION_STACK: RefCell<Vec<String>> = const { RefCell::new(Vec::new()) };
78}
79
80pub struct CurrentSessionGuard {
81 active: bool,
82}
83
84impl Drop for CurrentSessionGuard {
85 fn drop(&mut self) {
86 if self.active {
87 pop_current_session();
88 }
89 }
90}
91
92pub fn set_session_cap(cap: usize) {
95 SESSION_CAP.with(|c| c.set(cap.max(1)));
96}
97
98pub fn session_cap() -> usize {
99 SESSION_CAP.with(|c| c.get())
100}
101
102pub fn reset_session_store() {
104 SESSIONS.with(|s| s.borrow_mut().clear());
105 CURRENT_SESSION_STACK.with(|stack| stack.borrow_mut().clear());
106}
107
108pub(crate) fn push_current_session(id: String) {
109 if id.is_empty() {
110 return;
111 }
112 CURRENT_SESSION_STACK.with(|stack| stack.borrow_mut().push(id));
113}
114
115pub(crate) fn pop_current_session() {
116 CURRENT_SESSION_STACK.with(|stack| {
117 let _ = stack.borrow_mut().pop();
118 });
119}
120
121pub fn current_session_id() -> Option<String> {
122 CURRENT_SESSION_STACK.with(|stack| stack.borrow().last().cloned())
123}
124
125pub fn enter_current_session(id: impl Into<String>) -> CurrentSessionGuard {
126 let id = id.into();
127 if id.trim().is_empty() {
128 return CurrentSessionGuard { active: false };
129 }
130 push_current_session(id);
131 CurrentSessionGuard { active: true }
132}
133
134pub fn exists(id: &str) -> bool {
135 SESSIONS.with(|s| s.borrow().contains_key(id))
136}
137
138pub fn length(id: &str) -> Option<usize> {
139 SESSIONS.with(|s| {
140 s.borrow().get(id).map(|state| {
141 state
142 .transcript
143 .as_dict()
144 .and_then(|d| d.get("messages"))
145 .and_then(|v| match v {
146 VmValue::List(list) => Some(list.len()),
147 _ => None,
148 })
149 .unwrap_or(0)
150 })
151 })
152}
153
154pub fn snapshot(id: &str) -> Option<VmValue> {
155 SESSIONS.with(|s| s.borrow().get(id).map(session_snapshot))
156}
157
158pub fn open_or_create(id: Option<String>) -> String {
167 let resolved = id.unwrap_or_else(|| uuid::Uuid::now_v7().to_string());
168 let mut was_new = false;
169 SESSIONS.with(|s| {
170 let mut map = s.borrow_mut();
171 if let Some(state) = map.get_mut(&resolved) {
172 state.last_accessed = Instant::now();
173 return;
174 }
175 was_new = true;
176 let cap = SESSION_CAP.with(|c| c.get());
177 if map.len() >= cap {
178 if let Some(victim) = map
179 .iter()
180 .min_by_key(|(_, state)| state.last_accessed)
181 .map(|(id, _)| id.clone())
182 {
183 map.remove(&victim);
184 }
185 }
186 map.insert(resolved.clone(), SessionState::new(resolved.clone()));
187 });
188 if was_new {
189 try_register_event_log(&resolved);
190 }
191 resolved
192}
193
194pub fn open_child_session(parent_id: &str, id: Option<String>) -> String {
195 let resolved = fork(parent_id, id.clone()).unwrap_or_else(|| open_or_create(id));
196 link_child_session(parent_id, &resolved);
197 resolved
198}
199
200pub fn link_child_session(parent_id: &str, child_id: &str) {
201 link_child_session_with_branch(parent_id, child_id, None);
202}
203
204pub fn link_child_session_with_branch(
205 parent_id: &str,
206 child_id: &str,
207 branched_at_event_index: Option<usize>,
208) {
209 if parent_id == child_id {
210 return;
211 }
212 open_or_create(Some(parent_id.to_string()));
213 open_or_create(Some(child_id.to_string()));
214 SESSIONS.with(|s| {
215 let mut map = s.borrow_mut();
216 update_lineage(&mut map, parent_id, child_id, branched_at_event_index);
217 });
218}
219
220pub fn parent_id(id: &str) -> Option<String> {
221 SESSIONS.with(|s| s.borrow().get(id).and_then(|state| state.parent_id.clone()))
222}
223
224pub fn child_ids(id: &str) -> Vec<String> {
225 SESSIONS.with(|s| {
226 s.borrow()
227 .get(id)
228 .map(|state| state.child_ids.clone())
229 .unwrap_or_default()
230 })
231}
232
233pub fn ancestry(id: &str) -> Option<SessionAncestry> {
234 SESSIONS.with(|s| {
235 let map = s.borrow();
236 let state = map.get(id)?;
237 let mut root_id = state.id.clone();
238 let mut cursor = state.parent_id.clone();
239 let mut seen = HashSet::from([state.id.clone()]);
240 while let Some(parent_id) = cursor {
241 if !seen.insert(parent_id.clone()) {
242 break;
243 }
244 root_id = parent_id.clone();
245 cursor = map
246 .get(&parent_id)
247 .and_then(|parent| parent.parent_id.clone());
248 }
249 Some(SessionAncestry {
250 parent_id: state.parent_id.clone(),
251 child_ids: state.child_ids.clone(),
252 root_id,
253 })
254 })
255}
256
257fn try_register_event_log(session_id: &str) {
261 if let Some(log) = crate::event_log::active_event_log() {
262 crate::agent_events::register_sink(
263 session_id,
264 crate::agent_events::EventLogSink::new(log, session_id),
265 );
266 return;
267 }
268 let Ok(dir) = std::env::var("HARN_EVENT_LOG_DIR") else {
269 return;
270 };
271 if dir.is_empty() {
272 return;
273 }
274 let path = std::path::PathBuf::from(dir).join(format!("event_log-{session_id}.jsonl"));
275 if let Ok(sink) = crate::agent_events::JsonlEventSink::open(&path) {
276 crate::agent_events::register_sink(session_id, sink);
277 }
278}
279
280pub fn close(id: &str) {
281 SESSIONS.with(|s| {
282 s.borrow_mut().remove(id);
283 });
284}
285
286pub fn reset_transcript(id: &str) -> bool {
287 SESSIONS.with(|s| {
288 let mut map = s.borrow_mut();
289 let Some(state) = map.get_mut(id) else {
290 return false;
291 };
292 state.transcript = empty_transcript(id);
293 state.last_accessed = Instant::now();
294 true
295 })
296}
297
298pub fn fork(src_id: &str, dst_id: Option<String>) -> Option<String> {
305 let (src_transcript, dst) = SESSIONS.with(|s| {
306 let mut map = s.borrow_mut();
307 let src = map.get_mut(src_id)?;
308 src.last_accessed = Instant::now();
309 let dst = dst_id.unwrap_or_else(|| uuid::Uuid::now_v7().to_string());
310 let forked_transcript = clone_transcript_with_id(&src.transcript, &dst);
311 Some((forked_transcript, dst))
312 })?;
313 open_or_create(Some(dst.clone()));
315 SESSIONS.with(|s| {
316 let mut map = s.borrow_mut();
317 if let Some(state) = map.get_mut(&dst) {
318 state.transcript = src_transcript;
319 state.last_accessed = Instant::now();
320 }
321 update_lineage(&mut map, src_id, &dst, None);
322 });
323 if exists(&dst) {
327 Some(dst)
328 } else {
329 None
330 }
331}
332
333pub fn fork_at(src_id: &str, keep_first: usize, dst_id: Option<String>) -> Option<String> {
344 let branched_at_event_index = SESSIONS.with(|s| {
345 let map = s.borrow();
346 let src = map.get(src_id)?;
347 Some(branch_event_index(&src.transcript, keep_first))
348 })?;
349 let new_id = fork(src_id, dst_id)?;
350 link_child_session_with_branch(src_id, &new_id, Some(branched_at_event_index));
351 retain_first(&new_id, keep_first);
352 Some(new_id)
353}
354
355fn retain_first(id: &str, keep_first: usize) {
359 SESSIONS.with(|s| {
360 let mut map = s.borrow_mut();
361 let Some(state) = map.get_mut(id) else {
362 return;
363 };
364 let Some(dict) = state.transcript.as_dict() else {
365 return;
366 };
367 let dict = dict.clone();
368 let messages: Vec<VmValue> = match dict.get("messages") {
369 Some(VmValue::List(list)) => list.iter().cloned().collect(),
370 _ => Vec::new(),
371 };
372 let retained: Vec<VmValue> = messages.into_iter().take(keep_first).collect();
373 let mut next = dict;
374 next.insert(
375 "events".to_string(),
376 VmValue::List(Rc::new(
377 crate::llm::helpers::transcript_events_from_messages(&retained),
378 )),
379 );
380 next.insert("messages".to_string(), VmValue::List(Rc::new(retained)));
381 state.transcript = VmValue::Dict(Rc::new(next));
382 state.last_accessed = Instant::now();
383 });
384}
385
386pub fn trim(id: &str, keep_last: usize) -> Option<usize> {
389 SESSIONS.with(|s| {
390 let mut map = s.borrow_mut();
391 let state = map.get_mut(id)?;
392 let dict = state.transcript.as_dict()?.clone();
393 let messages: Vec<VmValue> = match dict.get("messages") {
394 Some(VmValue::List(list)) => list.iter().cloned().collect(),
395 _ => Vec::new(),
396 };
397 let start = messages.len().saturating_sub(keep_last);
398 let retained: Vec<VmValue> = messages.into_iter().skip(start).collect();
399 let kept = retained.len();
400 let mut next = dict;
401 next.insert(
402 "events".to_string(),
403 VmValue::List(Rc::new(
404 crate::llm::helpers::transcript_events_from_messages(&retained),
405 )),
406 );
407 next.insert("messages".to_string(), VmValue::List(Rc::new(retained)));
408 state.transcript = VmValue::Dict(Rc::new(next));
409 state.last_accessed = Instant::now();
410 Some(kept)
411 })
412}
413
414pub fn inject_message(id: &str, message: VmValue) -> Result<(), String> {
417 let Some(msg_dict) = message.as_dict().cloned() else {
418 return Err("agent_session_inject: message must be a dict".into());
419 };
420 let role_ok = matches!(msg_dict.get("role"), Some(VmValue::String(_)));
421 if !role_ok {
422 return Err(
423 "agent_session_inject: message must have a string `role` (user|assistant|tool_result|system)"
424 .into(),
425 );
426 }
427 SESSIONS.with(|s| {
428 let mut map = s.borrow_mut();
429 let Some(state) = map.get_mut(id) else {
430 return Err(format!("agent_session_inject: unknown session id '{id}'"));
431 };
432 let dict = state
433 .transcript
434 .as_dict()
435 .cloned()
436 .unwrap_or_else(BTreeMap::new);
437 let mut messages: Vec<VmValue> = match dict.get("messages") {
438 Some(VmValue::List(list)) => list.iter().cloned().collect(),
439 _ => Vec::new(),
440 };
441 messages.push(VmValue::Dict(Rc::new(msg_dict)));
442 let mut next = dict;
443 next.insert(
444 "events".to_string(),
445 VmValue::List(Rc::new(
446 crate::llm::helpers::transcript_events_from_messages(&messages),
447 )),
448 );
449 next.insert("messages".to_string(), VmValue::List(Rc::new(messages)));
450 state.transcript = VmValue::Dict(Rc::new(next));
451 state.last_accessed = Instant::now();
452 Ok(())
453 })
454}
455
456pub fn messages_json(id: &str) -> Vec<serde_json::Value> {
460 SESSIONS.with(|s| {
461 let map = s.borrow();
462 let Some(state) = map.get(id) else {
463 return Vec::new();
464 };
465 let Some(dict) = state.transcript.as_dict() else {
466 return Vec::new();
467 };
468 match dict.get("messages") {
469 Some(VmValue::List(list)) => list
470 .iter()
471 .map(crate::llm::helpers::vm_value_to_json)
472 .collect(),
473 _ => Vec::new(),
474 }
475 })
476}
477
478#[derive(Clone, Debug, Default)]
479pub struct SessionPromptState {
480 pub messages: Vec<serde_json::Value>,
481 pub summary: Option<String>,
482}
483
484fn summary_message_json(summary: &str) -> serde_json::Value {
485 serde_json::json!({
486 "role": "user",
487 "content": summary,
488 })
489}
490
491fn messages_begin_with_summary(messages: &[serde_json::Value], summary: &str) -> bool {
492 messages.first().is_some_and(|message| {
493 message.get("role").and_then(|value| value.as_str()) == Some("user")
494 && message.get("content").and_then(|value| value.as_str()) == Some(summary)
495 })
496}
497
498pub fn prompt_state_json(id: &str) -> SessionPromptState {
506 SESSIONS.with(|s| {
507 let map = s.borrow();
508 let Some(state) = map.get(id) else {
509 return SessionPromptState::default();
510 };
511 let Some(dict) = state.transcript.as_dict() else {
512 return SessionPromptState::default();
513 };
514 let mut messages = match dict.get("messages") {
515 Some(VmValue::List(list)) => list
516 .iter()
517 .map(crate::llm::helpers::vm_value_to_json)
518 .collect::<Vec<_>>(),
519 _ => Vec::new(),
520 };
521 let summary = dict.get("summary").and_then(|value| match value {
522 VmValue::String(text) if !text.trim().is_empty() => Some(text.to_string()),
523 _ => None,
524 });
525 if let Some(summary_text) = summary.as_deref() {
526 if !messages_begin_with_summary(&messages, summary_text) {
527 messages.insert(0, summary_message_json(summary_text));
528 }
529 }
530 SessionPromptState { messages, summary }
531 })
532}
533
534pub fn store_transcript(id: &str, transcript: VmValue) {
537 SESSIONS.with(|s| {
538 if let Some(state) = s.borrow_mut().get_mut(id) {
539 state.transcript = transcript;
540 state.last_accessed = Instant::now();
541 }
542 });
543}
544
545pub fn append_event(id: &str, event: VmValue) -> Result<(), String> {
551 let Some(event_dict) = event.as_dict() else {
552 return Err("agent_session_append_event: event must be a dict".into());
553 };
554 let kind_ok = matches!(event_dict.get("kind"), Some(VmValue::String(_)));
555 if !kind_ok {
556 return Err("agent_session_append_event: event must have a string `kind`".into());
557 }
558 SESSIONS.with(|s| {
559 let mut map = s.borrow_mut();
560 let Some(state) = map.get_mut(id) else {
561 return Err(format!(
562 "agent_session_append_event: unknown session id '{id}'"
563 ));
564 };
565 let dict = state
566 .transcript
567 .as_dict()
568 .cloned()
569 .unwrap_or_else(BTreeMap::new);
570 let mut events: Vec<VmValue> = match dict.get("events") {
571 Some(VmValue::List(list)) => list.iter().cloned().collect(),
572 _ => dict
573 .get("messages")
574 .and_then(|value| match value {
575 VmValue::List(list) => Some(list.iter().cloned().collect::<Vec<_>>()),
576 _ => None,
577 })
578 .map(|messages| crate::llm::helpers::transcript_events_from_messages(&messages))
579 .unwrap_or_default(),
580 };
581 events.push(event);
582 let mut next = dict;
583 next.insert("events".to_string(), VmValue::List(Rc::new(events)));
584 state.transcript = VmValue::Dict(Rc::new(next));
585 state.last_accessed = Instant::now();
586 Ok(())
587 })
588}
589
590pub fn replace_messages(id: &str, messages: &[serde_json::Value]) {
593 SESSIONS.with(|s| {
594 let mut map = s.borrow_mut();
595 let Some(state) = map.get_mut(id) else {
596 return;
597 };
598 let dict = state
599 .transcript
600 .as_dict()
601 .cloned()
602 .unwrap_or_else(BTreeMap::new);
603 let vm_messages: Vec<VmValue> = messages
604 .iter()
605 .map(crate::stdlib::json_to_vm_value)
606 .collect();
607 let mut next = dict;
608 next.insert(
609 "events".to_string(),
610 VmValue::List(Rc::new(
611 crate::llm::helpers::transcript_events_from_messages(&vm_messages),
612 )),
613 );
614 next.insert("messages".to_string(), VmValue::List(Rc::new(vm_messages)));
615 state.transcript = VmValue::Dict(Rc::new(next));
616 state.last_accessed = Instant::now();
617 });
618}
619
620pub fn append_subscriber(id: &str, callback: VmValue) {
621 open_or_create(Some(id.to_string()));
622 SESSIONS.with(|s| {
623 if let Some(state) = s.borrow_mut().get_mut(id) {
624 state.subscribers.push(callback);
625 state.last_accessed = Instant::now();
626 }
627 });
628}
629
630pub fn subscribers_for(id: &str) -> Vec<VmValue> {
631 SESSIONS.with(|s| {
632 s.borrow()
633 .get(id)
634 .map(|state| state.subscribers.clone())
635 .unwrap_or_default()
636 })
637}
638
639pub fn subscriber_count(id: &str) -> usize {
640 SESSIONS.with(|s| {
641 s.borrow()
642 .get(id)
643 .map(|state| state.subscribers.len())
644 .unwrap_or(0)
645 })
646}
647
648pub fn set_active_skills(id: &str, skills: Vec<String>) {
652 SESSIONS.with(|s| {
653 if let Some(state) = s.borrow_mut().get_mut(id) {
654 state.active_skills = skills;
655 state.last_accessed = Instant::now();
656 }
657 });
658}
659
660pub fn active_skills(id: &str) -> Vec<String> {
664 SESSIONS.with(|s| {
665 s.borrow()
666 .get(id)
667 .map(|state| state.active_skills.clone())
668 .unwrap_or_default()
669 })
670}
671
672fn empty_transcript(id: &str) -> VmValue {
673 use crate::llm::helpers::new_transcript_with;
674 new_transcript_with(Some(id.to_string()), Vec::new(), None, None)
675}
676
677fn clone_transcript_with_id(transcript: &VmValue, new_id: &str) -> VmValue {
678 let Some(dict) = transcript.as_dict() else {
679 return empty_transcript(new_id);
680 };
681 let mut next = dict.clone();
682 next.insert(
683 "id".to_string(),
684 VmValue::String(Rc::from(new_id.to_string())),
685 );
686 VmValue::Dict(Rc::new(next))
687}
688
689fn clone_transcript_with_parent(transcript: &VmValue, parent_id: &str) -> VmValue {
690 let Some(dict) = transcript.as_dict() else {
691 return transcript.clone();
692 };
693 let mut next = dict.clone();
694 let metadata = match next.get("metadata") {
695 Some(VmValue::Dict(metadata)) => {
696 let mut metadata = metadata.as_ref().clone();
697 metadata.insert(
698 "parent_session_id".to_string(),
699 VmValue::String(Rc::from(parent_id.to_string())),
700 );
701 VmValue::Dict(Rc::new(metadata))
702 }
703 _ => VmValue::Dict(Rc::new(BTreeMap::from([(
704 "parent_session_id".to_string(),
705 VmValue::String(Rc::from(parent_id.to_string())),
706 )]))),
707 };
708 next.insert("metadata".to_string(), metadata);
709 VmValue::Dict(Rc::new(next))
710}
711
712fn session_snapshot(state: &SessionState) -> VmValue {
713 let Some(dict) = state.transcript.as_dict() else {
714 return state.transcript.clone();
715 };
716 let mut next = dict.clone();
717 next.insert(
718 "parent_id".to_string(),
719 state
720 .parent_id
721 .as_ref()
722 .map(|id| VmValue::String(Rc::from(id.clone())))
723 .unwrap_or(VmValue::Nil),
724 );
725 next.insert(
726 "child_ids".to_string(),
727 VmValue::List(Rc::new(
728 state
729 .child_ids
730 .iter()
731 .cloned()
732 .map(|id| VmValue::String(Rc::from(id)))
733 .collect(),
734 )),
735 );
736 next.insert(
737 "branched_at_event_index".to_string(),
738 state
739 .branched_at_event_index
740 .map(|index| VmValue::Int(index as i64))
741 .unwrap_or(VmValue::Nil),
742 );
743 VmValue::Dict(Rc::new(next))
744}
745
746fn update_lineage(
747 map: &mut HashMap<String, SessionState>,
748 parent_id: &str,
749 child_id: &str,
750 branched_at_event_index: Option<usize>,
751) {
752 let old_parent_id = map.get(child_id).and_then(|child| child.parent_id.clone());
753 if let Some(old_parent_id) = old_parent_id.filter(|old_parent_id| old_parent_id != parent_id) {
754 if let Some(old_parent) = map.get_mut(&old_parent_id) {
755 old_parent.child_ids.retain(|id| id != child_id);
756 old_parent.last_accessed = Instant::now();
757 }
758 }
759 if let Some(parent) = map.get_mut(parent_id) {
760 parent.last_accessed = Instant::now();
761 if !parent.child_ids.iter().any(|id| id == child_id) {
762 parent.child_ids.push(child_id.to_string());
763 }
764 }
765 if let Some(child) = map.get_mut(child_id) {
766 child.last_accessed = Instant::now();
767 child.parent_id = Some(parent_id.to_string());
768 child.branched_at_event_index = branched_at_event_index;
769 child.transcript = clone_transcript_with_parent(&child.transcript, parent_id);
770 }
771}
772
773fn branch_event_index(transcript: &VmValue, keep_first: usize) -> usize {
774 if keep_first == 0 {
775 return 0;
776 }
777 let Some(dict) = transcript.as_dict() else {
778 return keep_first;
779 };
780 let Some(VmValue::List(events)) = dict.get("events") else {
781 return keep_first;
782 };
783 let mut retained_messages = 0usize;
784 for (index, event) in events.iter().enumerate() {
785 let kind = event
786 .as_dict()
787 .and_then(|dict| dict.get("kind"))
788 .map(VmValue::display);
789 if matches!(kind.as_deref(), Some("message" | "tool_result")) {
790 retained_messages += 1;
791 if retained_messages == keep_first {
792 return index + 1;
793 }
794 }
795 }
796 events.len()
797}
798
799#[cfg(test)]
800mod tests {
801 use super::*;
802 use crate::agent_events::{
803 emit_event, reset_all_sinks, session_external_sink_count, AgentEvent,
804 };
805 use crate::event_log::{active_event_log, EventLog, Topic};
806 use std::collections::BTreeMap;
807
808 fn make_msg(role: &str, content: &str) -> VmValue {
809 let mut m: BTreeMap<String, VmValue> = BTreeMap::new();
810 m.insert("role".to_string(), VmValue::String(Rc::from(role)));
811 m.insert("content".to_string(), VmValue::String(Rc::from(content)));
812 VmValue::Dict(Rc::new(m))
813 }
814
815 fn message_count(id: &str) -> usize {
816 SESSIONS.with(|s| {
817 let map = s.borrow();
818 let Some(state) = map.get(id) else { return 0 };
819 let Some(dict) = state.transcript.as_dict() else {
820 return 0;
821 };
822 match dict.get("messages") {
823 Some(VmValue::List(list)) => list.len(),
824 _ => 0,
825 }
826 })
827 }
828
829 #[test]
830 fn fork_at_truncates_destination_to_keep_first() {
831 reset_session_store();
832 let src = open_or_create(Some("src-fork-at".into()));
833 inject_message(&src, make_msg("user", "a")).unwrap();
834 inject_message(&src, make_msg("assistant", "b")).unwrap();
835 inject_message(&src, make_msg("user", "c")).unwrap();
836 inject_message(&src, make_msg("assistant", "d")).unwrap();
837 assert_eq!(message_count(&src), 4);
838
839 let dst = fork_at(&src, 2, Some("dst-fork-at".into())).expect("fork_at");
840 assert_ne!(dst, src);
841 assert_eq!(message_count(&dst), 2, "branched at message index 2");
842 assert_eq!(
843 snapshot(&dst)
844 .and_then(|value| value.as_dict().cloned())
845 .and_then(|dict| dict
846 .get("branched_at_event_index")
847 .and_then(VmValue::as_int)),
848 Some(2)
849 );
850 assert_eq!(message_count(&src), 4);
852 assert_eq!(subscriber_count(&dst), 0);
854 reset_session_store();
855 }
856
857 #[test]
858 fn fork_at_on_unknown_source_returns_none() {
859 reset_session_store();
860 assert!(fork_at("does-not-exist", 3, None).is_none());
861 }
862
863 #[test]
864 fn child_sessions_record_parent_lineage() {
865 reset_session_store();
866 let parent = open_or_create(Some("parent-session".into()));
867 let child = open_child_session(&parent, Some("child-session".into()));
868 assert_eq!(parent_id(&child).as_deref(), Some("parent-session"));
869 assert_eq!(child_ids(&parent), vec!["child-session".to_string()]);
870 assert_eq!(
871 ancestry(&child),
872 Some(SessionAncestry {
873 parent_id: Some("parent-session".to_string()),
874 child_ids: Vec::new(),
875 root_id: "parent-session".to_string(),
876 })
877 );
878
879 let transcript = snapshot(&child).expect("child transcript");
880 let transcript = transcript.as_dict().expect("child snapshot");
881 let metadata = transcript
882 .get("metadata")
883 .and_then(|value| value.as_dict())
884 .expect("child metadata");
885 assert!(
886 matches!(transcript.get("parent_id"), Some(VmValue::String(value)) if value.as_ref() == "parent-session")
887 );
888 assert!(
889 matches!(transcript.get("child_ids"), Some(VmValue::List(children)) if children.is_empty())
890 );
891 assert!(matches!(
892 transcript.get("branched_at_event_index"),
893 Some(VmValue::Nil)
894 ));
895 assert!(matches!(
896 metadata.get("parent_session_id"),
897 Some(VmValue::String(value)) if value.as_ref() == "parent-session"
898 ));
899 }
900
901 #[test]
902 fn branch_event_index_counts_non_message_events() {
903 reset_session_store();
904 let src = open_or_create(Some("branch-event-index".into()));
905 let transcript = VmValue::Dict(Rc::new(BTreeMap::from([
906 ("id".to_string(), VmValue::String(Rc::from(src.clone()))),
907 (
908 "messages".to_string(),
909 VmValue::List(Rc::new(vec![
910 make_msg("user", "a"),
911 make_msg("assistant", "b"),
912 ])),
913 ),
914 (
915 "events".to_string(),
916 VmValue::List(Rc::new(vec![
917 VmValue::Dict(Rc::new(BTreeMap::from([(
918 "kind".to_string(),
919 VmValue::String(Rc::from("message")),
920 )]))),
921 VmValue::Dict(Rc::new(BTreeMap::from([(
922 "kind".to_string(),
923 VmValue::String(Rc::from("sub_agent_start")),
924 )]))),
925 VmValue::Dict(Rc::new(BTreeMap::from([(
926 "kind".to_string(),
927 VmValue::String(Rc::from("message")),
928 )]))),
929 ])),
930 ),
931 ])));
932 store_transcript(&src, transcript);
933
934 let dst = fork_at(&src, 2, Some("branch-event-index-child".into())).expect("fork_at");
935 assert_eq!(
936 snapshot(&dst)
937 .and_then(|value| value.as_dict().cloned())
938 .and_then(|dict| dict
939 .get("branched_at_event_index")
940 .and_then(VmValue::as_int)),
941 Some(3)
942 );
943 }
944
945 #[test]
946 fn child_session_forks_parent_transcript() {
947 reset_session_store();
948 let parent = open_or_create(Some("parent-fork-parent".into()));
949 inject_message(&parent, make_msg("user", "parent context")).unwrap();
950
951 let child = open_child_session(&parent, Some("parent-fork-child".into()));
952 assert_eq!(message_count(&parent), 1);
953 assert_eq!(message_count(&child), 1);
954
955 let child_messages = messages_json(&child);
956 assert_eq!(
957 child_messages[0]["content"].as_str(),
958 Some("parent context"),
959 );
960 }
961
962 #[test]
963 fn prompt_state_prepends_summary_message_when_missing_from_messages() {
964 reset_session_store();
965 let session = open_or_create(Some("prompt-state-summary".into()));
966 let transcript = crate::llm::helpers::new_transcript_with_events(
967 Some(session.clone()),
968 vec![make_msg("assistant", "latest answer")],
969 Some("[auto-compacted 2 older messages]\nsummary".to_string()),
970 None,
971 Vec::new(),
972 Vec::new(),
973 Some("active"),
974 );
975 store_transcript(&session, transcript);
976
977 let prompt = prompt_state_json(&session);
978 assert_eq!(
979 prompt.summary.as_deref(),
980 Some("[auto-compacted 2 older messages]\nsummary")
981 );
982 assert_eq!(prompt.messages.len(), 2);
983 assert_eq!(prompt.messages[0]["role"].as_str(), Some("user"));
984 assert_eq!(
985 prompt.messages[0]["content"].as_str(),
986 Some("[auto-compacted 2 older messages]\nsummary"),
987 );
988 assert_eq!(prompt.messages[1]["role"].as_str(), Some("assistant"));
989 }
990
991 #[tokio::test(flavor = "current_thread")]
992 async fn open_or_create_registers_event_log_sink_when_active_log_is_installed() {
993 reset_all_sinks();
994 crate::event_log::reset_active_event_log();
995 let dir = tempfile::tempdir().expect("tempdir");
996 crate::event_log::install_default_for_base_dir(dir.path()).expect("install event log");
997
998 let session = open_or_create(Some("event-log-session".into()));
999 assert_eq!(session_external_sink_count(&session), 1);
1000
1001 emit_event(&AgentEvent::TurnStart {
1002 session_id: session.clone(),
1003 iteration: 0,
1004 });
1005 tokio::time::sleep(std::time::Duration::from_millis(25)).await;
1006
1007 let topic = Topic::new("observability.agent_events.event-log-session").unwrap();
1008 let log = active_event_log().expect("active event log");
1009 let events = log.read_range(&topic, None, usize::MAX).await.unwrap();
1010 assert_eq!(events.len(), 1);
1011 assert_eq!(events[0].1.kind, "turn_start");
1012
1013 crate::event_log::reset_active_event_log();
1014 reset_all_sinks();
1015 }
1016}