1use std::cell::{Cell, RefCell};
22use std::collections::{BTreeMap, HashMap, HashSet};
23use std::rc::Rc;
24use std::time::Instant;
25
26use crate::value::VmValue;
27
28pub const DEFAULT_SESSION_CAP: usize = 128;
31
32pub struct SessionState {
33 pub id: String,
34 pub transcript: VmValue,
35 pub subscribers: Vec<VmValue>,
36 pub created_at: Instant,
37 pub last_accessed: Instant,
38 pub parent_id: Option<String>,
39 pub child_ids: Vec<String>,
40 pub branched_at_event_index: Option<usize>,
41 pub active_skills: Vec<String>,
47}
48
49impl SessionState {
50 fn new(id: String) -> Self {
51 let now = Instant::now();
52 let transcript = empty_transcript(&id);
53 Self {
54 id,
55 transcript,
56 subscribers: Vec::new(),
57 created_at: now,
58 last_accessed: now,
59 parent_id: None,
60 child_ids: Vec::new(),
61 branched_at_event_index: None,
62 active_skills: Vec::new(),
63 }
64 }
65}
66
67#[derive(Clone, Debug, PartialEq, Eq)]
68pub struct SessionAncestry {
69 pub parent_id: Option<String>,
70 pub child_ids: Vec<String>,
71 pub root_id: String,
72}
73
74thread_local! {
75 static SESSIONS: RefCell<HashMap<String, SessionState>> = RefCell::new(HashMap::new());
76 static SESSION_CAP: Cell<usize> = const { Cell::new(DEFAULT_SESSION_CAP) };
77 static CURRENT_SESSION_STACK: RefCell<Vec<String>> = const { RefCell::new(Vec::new()) };
78}
79
80pub fn set_session_cap(cap: usize) {
83 SESSION_CAP.with(|c| c.set(cap.max(1)));
84}
85
86pub fn session_cap() -> usize {
87 SESSION_CAP.with(|c| c.get())
88}
89
90pub fn reset_session_store() {
92 SESSIONS.with(|s| s.borrow_mut().clear());
93 CURRENT_SESSION_STACK.with(|stack| stack.borrow_mut().clear());
94}
95
96pub(crate) fn push_current_session(id: String) {
97 if id.is_empty() {
98 return;
99 }
100 CURRENT_SESSION_STACK.with(|stack| stack.borrow_mut().push(id));
101}
102
103pub(crate) fn pop_current_session() {
104 CURRENT_SESSION_STACK.with(|stack| {
105 let _ = stack.borrow_mut().pop();
106 });
107}
108
109pub fn current_session_id() -> Option<String> {
110 CURRENT_SESSION_STACK.with(|stack| stack.borrow().last().cloned())
111}
112
113pub fn exists(id: &str) -> bool {
114 SESSIONS.with(|s| s.borrow().contains_key(id))
115}
116
117pub fn length(id: &str) -> Option<usize> {
118 SESSIONS.with(|s| {
119 s.borrow().get(id).map(|state| {
120 state
121 .transcript
122 .as_dict()
123 .and_then(|d| d.get("messages"))
124 .and_then(|v| match v {
125 VmValue::List(list) => Some(list.len()),
126 _ => None,
127 })
128 .unwrap_or(0)
129 })
130 })
131}
132
133pub fn snapshot(id: &str) -> Option<VmValue> {
134 SESSIONS.with(|s| s.borrow().get(id).map(session_snapshot))
135}
136
137pub fn open_or_create(id: Option<String>) -> String {
146 let resolved = id.unwrap_or_else(|| uuid::Uuid::now_v7().to_string());
147 let mut was_new = false;
148 SESSIONS.with(|s| {
149 let mut map = s.borrow_mut();
150 if let Some(state) = map.get_mut(&resolved) {
151 state.last_accessed = Instant::now();
152 return;
153 }
154 was_new = true;
155 let cap = SESSION_CAP.with(|c| c.get());
156 if map.len() >= cap {
157 if let Some(victim) = map
158 .iter()
159 .min_by_key(|(_, state)| state.last_accessed)
160 .map(|(id, _)| id.clone())
161 {
162 map.remove(&victim);
163 }
164 }
165 map.insert(resolved.clone(), SessionState::new(resolved.clone()));
166 });
167 if was_new {
168 try_register_event_log(&resolved);
169 }
170 resolved
171}
172
173pub fn open_child_session(parent_id: &str, id: Option<String>) -> String {
174 let resolved = fork(parent_id, id.clone()).unwrap_or_else(|| open_or_create(id));
175 link_child_session(parent_id, &resolved);
176 resolved
177}
178
179pub fn link_child_session(parent_id: &str, child_id: &str) {
180 link_child_session_with_branch(parent_id, child_id, None);
181}
182
183pub fn link_child_session_with_branch(
184 parent_id: &str,
185 child_id: &str,
186 branched_at_event_index: Option<usize>,
187) {
188 if parent_id == child_id {
189 return;
190 }
191 open_or_create(Some(parent_id.to_string()));
192 open_or_create(Some(child_id.to_string()));
193 SESSIONS.with(|s| {
194 let mut map = s.borrow_mut();
195 update_lineage(&mut map, parent_id, child_id, branched_at_event_index);
196 });
197}
198
199pub fn parent_id(id: &str) -> Option<String> {
200 SESSIONS.with(|s| s.borrow().get(id).and_then(|state| state.parent_id.clone()))
201}
202
203pub fn child_ids(id: &str) -> Vec<String> {
204 SESSIONS.with(|s| {
205 s.borrow()
206 .get(id)
207 .map(|state| state.child_ids.clone())
208 .unwrap_or_default()
209 })
210}
211
212pub fn ancestry(id: &str) -> Option<SessionAncestry> {
213 SESSIONS.with(|s| {
214 let map = s.borrow();
215 let state = map.get(id)?;
216 let mut root_id = state.id.clone();
217 let mut cursor = state.parent_id.clone();
218 let mut seen = HashSet::from([state.id.clone()]);
219 while let Some(parent_id) = cursor {
220 if !seen.insert(parent_id.clone()) {
221 break;
222 }
223 root_id = parent_id.clone();
224 cursor = map
225 .get(&parent_id)
226 .and_then(|parent| parent.parent_id.clone());
227 }
228 Some(SessionAncestry {
229 parent_id: state.parent_id.clone(),
230 child_ids: state.child_ids.clone(),
231 root_id,
232 })
233 })
234}
235
236fn try_register_event_log(session_id: &str) {
240 if let Some(log) = crate::event_log::active_event_log() {
241 crate::agent_events::register_sink(
242 session_id,
243 crate::agent_events::EventLogSink::new(log, session_id),
244 );
245 return;
246 }
247 let Ok(dir) = std::env::var("HARN_EVENT_LOG_DIR") else {
248 return;
249 };
250 if dir.is_empty() {
251 return;
252 }
253 let path = std::path::PathBuf::from(dir).join(format!("event_log-{session_id}.jsonl"));
254 if let Ok(sink) = crate::agent_events::JsonlEventSink::open(&path) {
255 crate::agent_events::register_sink(session_id, sink);
256 }
257}
258
259pub fn close(id: &str) {
260 SESSIONS.with(|s| {
261 s.borrow_mut().remove(id);
262 });
263}
264
265pub fn reset_transcript(id: &str) -> bool {
266 SESSIONS.with(|s| {
267 let mut map = s.borrow_mut();
268 let Some(state) = map.get_mut(id) else {
269 return false;
270 };
271 state.transcript = empty_transcript(id);
272 state.last_accessed = Instant::now();
273 true
274 })
275}
276
277pub fn fork(src_id: &str, dst_id: Option<String>) -> Option<String> {
284 let (src_transcript, dst) = SESSIONS.with(|s| {
285 let mut map = s.borrow_mut();
286 let src = map.get_mut(src_id)?;
287 src.last_accessed = Instant::now();
288 let dst = dst_id.unwrap_or_else(|| uuid::Uuid::now_v7().to_string());
289 let forked_transcript = clone_transcript_with_id(&src.transcript, &dst);
290 Some((forked_transcript, dst))
291 })?;
292 open_or_create(Some(dst.clone()));
294 SESSIONS.with(|s| {
295 let mut map = s.borrow_mut();
296 if let Some(state) = map.get_mut(&dst) {
297 state.transcript = src_transcript;
298 state.last_accessed = Instant::now();
299 }
300 update_lineage(&mut map, src_id, &dst, None);
301 });
302 if exists(&dst) {
306 Some(dst)
307 } else {
308 None
309 }
310}
311
312pub fn fork_at(src_id: &str, keep_first: usize, dst_id: Option<String>) -> Option<String> {
323 let branched_at_event_index = SESSIONS.with(|s| {
324 let map = s.borrow();
325 let src = map.get(src_id)?;
326 Some(branch_event_index(&src.transcript, keep_first))
327 })?;
328 let new_id = fork(src_id, dst_id)?;
329 link_child_session_with_branch(src_id, &new_id, Some(branched_at_event_index));
330 retain_first(&new_id, keep_first);
331 Some(new_id)
332}
333
334fn retain_first(id: &str, keep_first: usize) {
338 SESSIONS.with(|s| {
339 let mut map = s.borrow_mut();
340 let Some(state) = map.get_mut(id) else {
341 return;
342 };
343 let Some(dict) = state.transcript.as_dict() else {
344 return;
345 };
346 let dict = dict.clone();
347 let messages: Vec<VmValue> = match dict.get("messages") {
348 Some(VmValue::List(list)) => list.iter().cloned().collect(),
349 _ => Vec::new(),
350 };
351 let retained: Vec<VmValue> = messages.into_iter().take(keep_first).collect();
352 let mut next = dict;
353 next.insert(
354 "events".to_string(),
355 VmValue::List(Rc::new(
356 crate::llm::helpers::transcript_events_from_messages(&retained),
357 )),
358 );
359 next.insert("messages".to_string(), VmValue::List(Rc::new(retained)));
360 state.transcript = VmValue::Dict(Rc::new(next));
361 state.last_accessed = Instant::now();
362 });
363}
364
365pub fn trim(id: &str, keep_last: usize) -> Option<usize> {
368 SESSIONS.with(|s| {
369 let mut map = s.borrow_mut();
370 let state = map.get_mut(id)?;
371 let dict = state.transcript.as_dict()?.clone();
372 let messages: Vec<VmValue> = match dict.get("messages") {
373 Some(VmValue::List(list)) => list.iter().cloned().collect(),
374 _ => Vec::new(),
375 };
376 let start = messages.len().saturating_sub(keep_last);
377 let retained: Vec<VmValue> = messages.into_iter().skip(start).collect();
378 let kept = retained.len();
379 let mut next = dict;
380 next.insert(
381 "events".to_string(),
382 VmValue::List(Rc::new(
383 crate::llm::helpers::transcript_events_from_messages(&retained),
384 )),
385 );
386 next.insert("messages".to_string(), VmValue::List(Rc::new(retained)));
387 state.transcript = VmValue::Dict(Rc::new(next));
388 state.last_accessed = Instant::now();
389 Some(kept)
390 })
391}
392
393pub fn inject_message(id: &str, message: VmValue) -> Result<(), String> {
396 let Some(msg_dict) = message.as_dict().cloned() else {
397 return Err("agent_session_inject: message must be a dict".into());
398 };
399 let role_ok = matches!(msg_dict.get("role"), Some(VmValue::String(_)));
400 if !role_ok {
401 return Err(
402 "agent_session_inject: message must have a string `role` (user|assistant|tool_result|system)"
403 .into(),
404 );
405 }
406 SESSIONS.with(|s| {
407 let mut map = s.borrow_mut();
408 let Some(state) = map.get_mut(id) else {
409 return Err(format!("agent_session_inject: unknown session id '{id}'"));
410 };
411 let dict = state
412 .transcript
413 .as_dict()
414 .cloned()
415 .unwrap_or_else(BTreeMap::new);
416 let mut messages: Vec<VmValue> = match dict.get("messages") {
417 Some(VmValue::List(list)) => list.iter().cloned().collect(),
418 _ => Vec::new(),
419 };
420 messages.push(VmValue::Dict(Rc::new(msg_dict)));
421 let mut next = dict;
422 next.insert(
423 "events".to_string(),
424 VmValue::List(Rc::new(
425 crate::llm::helpers::transcript_events_from_messages(&messages),
426 )),
427 );
428 next.insert("messages".to_string(), VmValue::List(Rc::new(messages)));
429 state.transcript = VmValue::Dict(Rc::new(next));
430 state.last_accessed = Instant::now();
431 Ok(())
432 })
433}
434
435pub fn messages_json(id: &str) -> Vec<serde_json::Value> {
439 SESSIONS.with(|s| {
440 let map = s.borrow();
441 let Some(state) = map.get(id) else {
442 return Vec::new();
443 };
444 let Some(dict) = state.transcript.as_dict() else {
445 return Vec::new();
446 };
447 match dict.get("messages") {
448 Some(VmValue::List(list)) => list
449 .iter()
450 .map(crate::llm::helpers::vm_value_to_json)
451 .collect(),
452 _ => Vec::new(),
453 }
454 })
455}
456
457#[derive(Clone, Debug, Default)]
458pub struct SessionPromptState {
459 pub messages: Vec<serde_json::Value>,
460 pub summary: Option<String>,
461}
462
463fn summary_message_json(summary: &str) -> serde_json::Value {
464 serde_json::json!({
465 "role": "user",
466 "content": summary,
467 })
468}
469
470fn messages_begin_with_summary(messages: &[serde_json::Value], summary: &str) -> bool {
471 messages.first().is_some_and(|message| {
472 message.get("role").and_then(|value| value.as_str()) == Some("user")
473 && message.get("content").and_then(|value| value.as_str()) == Some(summary)
474 })
475}
476
477pub fn prompt_state_json(id: &str) -> SessionPromptState {
485 SESSIONS.with(|s| {
486 let map = s.borrow();
487 let Some(state) = map.get(id) else {
488 return SessionPromptState::default();
489 };
490 let Some(dict) = state.transcript.as_dict() else {
491 return SessionPromptState::default();
492 };
493 let mut messages = match dict.get("messages") {
494 Some(VmValue::List(list)) => list
495 .iter()
496 .map(crate::llm::helpers::vm_value_to_json)
497 .collect::<Vec<_>>(),
498 _ => Vec::new(),
499 };
500 let summary = dict.get("summary").and_then(|value| match value {
501 VmValue::String(text) if !text.trim().is_empty() => Some(text.to_string()),
502 _ => None,
503 });
504 if let Some(summary_text) = summary.as_deref() {
505 if !messages_begin_with_summary(&messages, summary_text) {
506 messages.insert(0, summary_message_json(summary_text));
507 }
508 }
509 SessionPromptState { messages, summary }
510 })
511}
512
513pub fn store_transcript(id: &str, transcript: VmValue) {
516 SESSIONS.with(|s| {
517 if let Some(state) = s.borrow_mut().get_mut(id) {
518 state.transcript = transcript;
519 state.last_accessed = Instant::now();
520 }
521 });
522}
523
524pub fn append_event(id: &str, event: VmValue) -> Result<(), String> {
530 let Some(event_dict) = event.as_dict() else {
531 return Err("agent_session_append_event: event must be a dict".into());
532 };
533 let kind_ok = matches!(event_dict.get("kind"), Some(VmValue::String(_)));
534 if !kind_ok {
535 return Err("agent_session_append_event: event must have a string `kind`".into());
536 }
537 SESSIONS.with(|s| {
538 let mut map = s.borrow_mut();
539 let Some(state) = map.get_mut(id) else {
540 return Err(format!(
541 "agent_session_append_event: unknown session id '{id}'"
542 ));
543 };
544 let dict = state
545 .transcript
546 .as_dict()
547 .cloned()
548 .unwrap_or_else(BTreeMap::new);
549 let mut events: Vec<VmValue> = match dict.get("events") {
550 Some(VmValue::List(list)) => list.iter().cloned().collect(),
551 _ => dict
552 .get("messages")
553 .and_then(|value| match value {
554 VmValue::List(list) => Some(list.iter().cloned().collect::<Vec<_>>()),
555 _ => None,
556 })
557 .map(|messages| crate::llm::helpers::transcript_events_from_messages(&messages))
558 .unwrap_or_default(),
559 };
560 events.push(event);
561 let mut next = dict;
562 next.insert("events".to_string(), VmValue::List(Rc::new(events)));
563 state.transcript = VmValue::Dict(Rc::new(next));
564 state.last_accessed = Instant::now();
565 Ok(())
566 })
567}
568
569pub fn replace_messages(id: &str, messages: &[serde_json::Value]) {
572 SESSIONS.with(|s| {
573 let mut map = s.borrow_mut();
574 let Some(state) = map.get_mut(id) else {
575 return;
576 };
577 let dict = state
578 .transcript
579 .as_dict()
580 .cloned()
581 .unwrap_or_else(BTreeMap::new);
582 let vm_messages: Vec<VmValue> = messages
583 .iter()
584 .map(crate::stdlib::json_to_vm_value)
585 .collect();
586 let mut next = dict;
587 next.insert(
588 "events".to_string(),
589 VmValue::List(Rc::new(
590 crate::llm::helpers::transcript_events_from_messages(&vm_messages),
591 )),
592 );
593 next.insert("messages".to_string(), VmValue::List(Rc::new(vm_messages)));
594 state.transcript = VmValue::Dict(Rc::new(next));
595 state.last_accessed = Instant::now();
596 });
597}
598
599pub fn append_subscriber(id: &str, callback: VmValue) {
600 open_or_create(Some(id.to_string()));
601 SESSIONS.with(|s| {
602 if let Some(state) = s.borrow_mut().get_mut(id) {
603 state.subscribers.push(callback);
604 state.last_accessed = Instant::now();
605 }
606 });
607}
608
609pub fn subscribers_for(id: &str) -> Vec<VmValue> {
610 SESSIONS.with(|s| {
611 s.borrow()
612 .get(id)
613 .map(|state| state.subscribers.clone())
614 .unwrap_or_default()
615 })
616}
617
618pub fn subscriber_count(id: &str) -> usize {
619 SESSIONS.with(|s| {
620 s.borrow()
621 .get(id)
622 .map(|state| state.subscribers.len())
623 .unwrap_or(0)
624 })
625}
626
627pub fn set_active_skills(id: &str, skills: Vec<String>) {
631 SESSIONS.with(|s| {
632 if let Some(state) = s.borrow_mut().get_mut(id) {
633 state.active_skills = skills;
634 state.last_accessed = Instant::now();
635 }
636 });
637}
638
639pub fn active_skills(id: &str) -> Vec<String> {
643 SESSIONS.with(|s| {
644 s.borrow()
645 .get(id)
646 .map(|state| state.active_skills.clone())
647 .unwrap_or_default()
648 })
649}
650
651fn empty_transcript(id: &str) -> VmValue {
652 use crate::llm::helpers::new_transcript_with;
653 new_transcript_with(Some(id.to_string()), Vec::new(), None, None)
654}
655
656fn clone_transcript_with_id(transcript: &VmValue, new_id: &str) -> VmValue {
657 let Some(dict) = transcript.as_dict() else {
658 return empty_transcript(new_id);
659 };
660 let mut next = dict.clone();
661 next.insert(
662 "id".to_string(),
663 VmValue::String(Rc::from(new_id.to_string())),
664 );
665 VmValue::Dict(Rc::new(next))
666}
667
668fn clone_transcript_with_parent(transcript: &VmValue, parent_id: &str) -> VmValue {
669 let Some(dict) = transcript.as_dict() else {
670 return transcript.clone();
671 };
672 let mut next = dict.clone();
673 let metadata = match next.get("metadata") {
674 Some(VmValue::Dict(metadata)) => {
675 let mut metadata = metadata.as_ref().clone();
676 metadata.insert(
677 "parent_session_id".to_string(),
678 VmValue::String(Rc::from(parent_id.to_string())),
679 );
680 VmValue::Dict(Rc::new(metadata))
681 }
682 _ => VmValue::Dict(Rc::new(BTreeMap::from([(
683 "parent_session_id".to_string(),
684 VmValue::String(Rc::from(parent_id.to_string())),
685 )]))),
686 };
687 next.insert("metadata".to_string(), metadata);
688 VmValue::Dict(Rc::new(next))
689}
690
691fn session_snapshot(state: &SessionState) -> VmValue {
692 let Some(dict) = state.transcript.as_dict() else {
693 return state.transcript.clone();
694 };
695 let mut next = dict.clone();
696 next.insert(
697 "parent_id".to_string(),
698 state
699 .parent_id
700 .as_ref()
701 .map(|id| VmValue::String(Rc::from(id.clone())))
702 .unwrap_or(VmValue::Nil),
703 );
704 next.insert(
705 "child_ids".to_string(),
706 VmValue::List(Rc::new(
707 state
708 .child_ids
709 .iter()
710 .cloned()
711 .map(|id| VmValue::String(Rc::from(id)))
712 .collect(),
713 )),
714 );
715 next.insert(
716 "branched_at_event_index".to_string(),
717 state
718 .branched_at_event_index
719 .map(|index| VmValue::Int(index as i64))
720 .unwrap_or(VmValue::Nil),
721 );
722 VmValue::Dict(Rc::new(next))
723}
724
725fn update_lineage(
726 map: &mut HashMap<String, SessionState>,
727 parent_id: &str,
728 child_id: &str,
729 branched_at_event_index: Option<usize>,
730) {
731 let old_parent_id = map.get(child_id).and_then(|child| child.parent_id.clone());
732 if let Some(old_parent_id) = old_parent_id.filter(|old_parent_id| old_parent_id != parent_id) {
733 if let Some(old_parent) = map.get_mut(&old_parent_id) {
734 old_parent.child_ids.retain(|id| id != child_id);
735 old_parent.last_accessed = Instant::now();
736 }
737 }
738 if let Some(parent) = map.get_mut(parent_id) {
739 parent.last_accessed = Instant::now();
740 if !parent.child_ids.iter().any(|id| id == child_id) {
741 parent.child_ids.push(child_id.to_string());
742 }
743 }
744 if let Some(child) = map.get_mut(child_id) {
745 child.last_accessed = Instant::now();
746 child.parent_id = Some(parent_id.to_string());
747 child.branched_at_event_index = branched_at_event_index;
748 child.transcript = clone_transcript_with_parent(&child.transcript, parent_id);
749 }
750}
751
752fn branch_event_index(transcript: &VmValue, keep_first: usize) -> usize {
753 if keep_first == 0 {
754 return 0;
755 }
756 let Some(dict) = transcript.as_dict() else {
757 return keep_first;
758 };
759 let Some(VmValue::List(events)) = dict.get("events") else {
760 return keep_first;
761 };
762 let mut retained_messages = 0usize;
763 for (index, event) in events.iter().enumerate() {
764 let kind = event
765 .as_dict()
766 .and_then(|dict| dict.get("kind"))
767 .map(VmValue::display);
768 if matches!(kind.as_deref(), Some("message" | "tool_result")) {
769 retained_messages += 1;
770 if retained_messages == keep_first {
771 return index + 1;
772 }
773 }
774 }
775 events.len()
776}
777
778#[cfg(test)]
779mod tests {
780 use super::*;
781 use crate::agent_events::{
782 emit_event, reset_all_sinks, session_external_sink_count, AgentEvent,
783 };
784 use crate::event_log::{active_event_log, EventLog, Topic};
785 use std::collections::BTreeMap;
786
787 fn make_msg(role: &str, content: &str) -> VmValue {
788 let mut m: BTreeMap<String, VmValue> = BTreeMap::new();
789 m.insert("role".to_string(), VmValue::String(Rc::from(role)));
790 m.insert("content".to_string(), VmValue::String(Rc::from(content)));
791 VmValue::Dict(Rc::new(m))
792 }
793
794 fn message_count(id: &str) -> usize {
795 SESSIONS.with(|s| {
796 let map = s.borrow();
797 let Some(state) = map.get(id) else { return 0 };
798 let Some(dict) = state.transcript.as_dict() else {
799 return 0;
800 };
801 match dict.get("messages") {
802 Some(VmValue::List(list)) => list.len(),
803 _ => 0,
804 }
805 })
806 }
807
808 #[test]
809 fn fork_at_truncates_destination_to_keep_first() {
810 reset_session_store();
811 let src = open_or_create(Some("src-fork-at".into()));
812 inject_message(&src, make_msg("user", "a")).unwrap();
813 inject_message(&src, make_msg("assistant", "b")).unwrap();
814 inject_message(&src, make_msg("user", "c")).unwrap();
815 inject_message(&src, make_msg("assistant", "d")).unwrap();
816 assert_eq!(message_count(&src), 4);
817
818 let dst = fork_at(&src, 2, Some("dst-fork-at".into())).expect("fork_at");
819 assert_ne!(dst, src);
820 assert_eq!(message_count(&dst), 2, "branched at message index 2");
821 assert_eq!(
822 snapshot(&dst)
823 .and_then(|value| value.as_dict().cloned())
824 .and_then(|dict| dict
825 .get("branched_at_event_index")
826 .and_then(VmValue::as_int)),
827 Some(2)
828 );
829 assert_eq!(message_count(&src), 4);
831 assert_eq!(subscriber_count(&dst), 0);
833 reset_session_store();
834 }
835
836 #[test]
837 fn fork_at_on_unknown_source_returns_none() {
838 reset_session_store();
839 assert!(fork_at("does-not-exist", 3, None).is_none());
840 }
841
842 #[test]
843 fn child_sessions_record_parent_lineage() {
844 reset_session_store();
845 let parent = open_or_create(Some("parent-session".into()));
846 let child = open_child_session(&parent, Some("child-session".into()));
847 assert_eq!(parent_id(&child).as_deref(), Some("parent-session"));
848 assert_eq!(child_ids(&parent), vec!["child-session".to_string()]);
849 assert_eq!(
850 ancestry(&child),
851 Some(SessionAncestry {
852 parent_id: Some("parent-session".to_string()),
853 child_ids: Vec::new(),
854 root_id: "parent-session".to_string(),
855 })
856 );
857
858 let transcript = snapshot(&child).expect("child transcript");
859 let transcript = transcript.as_dict().expect("child snapshot");
860 let metadata = transcript
861 .get("metadata")
862 .and_then(|value| value.as_dict())
863 .expect("child metadata");
864 assert!(
865 matches!(transcript.get("parent_id"), Some(VmValue::String(value)) if value.as_ref() == "parent-session")
866 );
867 assert!(
868 matches!(transcript.get("child_ids"), Some(VmValue::List(children)) if children.is_empty())
869 );
870 assert!(matches!(
871 transcript.get("branched_at_event_index"),
872 Some(VmValue::Nil)
873 ));
874 assert!(matches!(
875 metadata.get("parent_session_id"),
876 Some(VmValue::String(value)) if value.as_ref() == "parent-session"
877 ));
878 }
879
880 #[test]
881 fn branch_event_index_counts_non_message_events() {
882 reset_session_store();
883 let src = open_or_create(Some("branch-event-index".into()));
884 let transcript = VmValue::Dict(Rc::new(BTreeMap::from([
885 ("id".to_string(), VmValue::String(Rc::from(src.clone()))),
886 (
887 "messages".to_string(),
888 VmValue::List(Rc::new(vec![
889 make_msg("user", "a"),
890 make_msg("assistant", "b"),
891 ])),
892 ),
893 (
894 "events".to_string(),
895 VmValue::List(Rc::new(vec![
896 VmValue::Dict(Rc::new(BTreeMap::from([(
897 "kind".to_string(),
898 VmValue::String(Rc::from("message")),
899 )]))),
900 VmValue::Dict(Rc::new(BTreeMap::from([(
901 "kind".to_string(),
902 VmValue::String(Rc::from("sub_agent_start")),
903 )]))),
904 VmValue::Dict(Rc::new(BTreeMap::from([(
905 "kind".to_string(),
906 VmValue::String(Rc::from("message")),
907 )]))),
908 ])),
909 ),
910 ])));
911 store_transcript(&src, transcript);
912
913 let dst = fork_at(&src, 2, Some("branch-event-index-child".into())).expect("fork_at");
914 assert_eq!(
915 snapshot(&dst)
916 .and_then(|value| value.as_dict().cloned())
917 .and_then(|dict| dict
918 .get("branched_at_event_index")
919 .and_then(VmValue::as_int)),
920 Some(3)
921 );
922 }
923
924 #[test]
925 fn child_session_forks_parent_transcript() {
926 reset_session_store();
927 let parent = open_or_create(Some("parent-fork-parent".into()));
928 inject_message(&parent, make_msg("user", "parent context")).unwrap();
929
930 let child = open_child_session(&parent, Some("parent-fork-child".into()));
931 assert_eq!(message_count(&parent), 1);
932 assert_eq!(message_count(&child), 1);
933
934 let child_messages = messages_json(&child);
935 assert_eq!(
936 child_messages[0]["content"].as_str(),
937 Some("parent context"),
938 );
939 }
940
941 #[test]
942 fn prompt_state_prepends_summary_message_when_missing_from_messages() {
943 reset_session_store();
944 let session = open_or_create(Some("prompt-state-summary".into()));
945 let transcript = crate::llm::helpers::new_transcript_with_events(
946 Some(session.clone()),
947 vec![make_msg("assistant", "latest answer")],
948 Some("[auto-compacted 2 older messages]\nsummary".to_string()),
949 None,
950 Vec::new(),
951 Vec::new(),
952 Some("active"),
953 );
954 store_transcript(&session, transcript);
955
956 let prompt = prompt_state_json(&session);
957 assert_eq!(
958 prompt.summary.as_deref(),
959 Some("[auto-compacted 2 older messages]\nsummary")
960 );
961 assert_eq!(prompt.messages.len(), 2);
962 assert_eq!(prompt.messages[0]["role"].as_str(), Some("user"));
963 assert_eq!(
964 prompt.messages[0]["content"].as_str(),
965 Some("[auto-compacted 2 older messages]\nsummary"),
966 );
967 assert_eq!(prompt.messages[1]["role"].as_str(), Some("assistant"));
968 }
969
970 #[tokio::test(flavor = "current_thread")]
971 async fn open_or_create_registers_event_log_sink_when_active_log_is_installed() {
972 reset_all_sinks();
973 crate::event_log::reset_active_event_log();
974 let dir = tempfile::tempdir().expect("tempdir");
975 crate::event_log::install_default_for_base_dir(dir.path()).expect("install event log");
976
977 let session = open_or_create(Some("event-log-session".into()));
978 assert_eq!(session_external_sink_count(&session), 1);
979
980 emit_event(&AgentEvent::TurnStart {
981 session_id: session.clone(),
982 iteration: 0,
983 });
984 tokio::time::sleep(std::time::Duration::from_millis(25)).await;
985
986 let topic = Topic::new("observability.agent_events.event-log-session").unwrap();
987 let log = active_event_log().expect("active event log");
988 let events = log.read_range(&topic, None, usize::MAX).await.unwrap();
989 assert_eq!(events.len(), 1);
990 assert_eq!(events[0].1.kind, "turn_start");
991
992 crate::event_log::reset_active_event_log();
993 reset_all_sinks();
994 }
995}