1use std::cell::{Cell, RefCell};
22use std::collections::{BTreeMap, HashMap, HashSet};
23use std::rc::Rc;
24use std::time::Instant;
25
26use crate::value::VmValue;
27
/// Default maximum number of sessions kept in the per-thread store.
///
/// This is the initial value of the `SESSION_CAP` thread-local; it can be
/// overridden at runtime with `set_session_cap`.
pub const DEFAULT_SESSION_CAP: usize = 128;
31
/// Everything the store tracks for a single session.
pub struct SessionState {
    /// Session id; also the key under which this state lives in `SESSIONS`.
    pub id: String,
    /// Transcript value. The functions in this file treat it as a dict and
    /// read/write its `messages`, `events`, `summary` and `metadata` keys.
    pub transcript: VmValue,
    /// Callbacks added through `append_subscriber`.
    pub subscribers: Vec<VmValue>,
    /// Instant at which this state was constructed.
    pub created_at: Instant,
    /// Instant of the most recent access; the eviction in `open_or_create`
    /// removes the session with the smallest value.
    pub last_accessed: Instant,
    /// Id of the parent session, when lineage has been recorded.
    pub parent_id: Option<String>,
    /// Ids of sessions linked as children of this one.
    pub child_ids: Vec<String>,
    /// Event index recorded when this session was linked to its parent with
    /// a branch point (see `fork_at`); `None` otherwise.
    pub branched_at_event_index: Option<usize>,
    /// Skill names stored by `set_active_skills`.
    pub active_skills: Vec<String>,
    /// Tool format claimed through `claim_tool_format`; once set, a claim
    /// for a different format is rejected until the transcript is reset.
    pub tool_format: Option<String>,
}
52
53impl SessionState {
54 fn new(id: String) -> Self {
55 let now = Instant::now();
56 let transcript = empty_transcript(&id);
57 Self {
58 id,
59 transcript,
60 subscribers: Vec::new(),
61 created_at: now,
62 last_accessed: now,
63 parent_id: None,
64 child_ids: Vec::new(),
65 branched_at_event_index: None,
66 active_skills: Vec::new(),
67 tool_format: None,
68 }
69 }
70}
71
/// Lineage summary for one session, as returned by `ancestry`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct SessionAncestry {
    /// Direct parent of the session, if one has been recorded.
    pub parent_id: Option<String>,
    /// Direct children of the session.
    pub child_ids: Vec<String>,
    /// Id reached by following `parent_id` links upward as far as possible;
    /// equals the session's own id when it has no parent.
    pub root_id: String,
}
78
// All session bookkeeping is thread-local: each thread sees its own store,
// its own cap and its own stack of "current" sessions.
thread_local! {
    // Session id -> state for every session known on this thread.
    static SESSIONS: RefCell<HashMap<String, SessionState>> = RefCell::new(HashMap::new());
    // Maximum number of sessions; consulted by `open_or_create` before it
    // inserts a new session. Starts at `DEFAULT_SESSION_CAP`.
    static SESSION_CAP: Cell<usize> = const { Cell::new(DEFAULT_SESSION_CAP) };
    // Stack of session ids; the last element is what `current_session_id`
    // reports.
    static CURRENT_SESSION_STACK: RefCell<Vec<String>> = const { RefCell::new(Vec::new()) };
}
84
/// Scope guard returned by `enter_current_session`.
///
/// While an active guard is alive, the session it was created for sits on
/// the current-session stack; dropping the guard pops the stack again.
/// Because the pop happens on drop, the guard must be bound to a variable
/// for the scope to have any extent, hence `#[must_use]`.
#[must_use = "dropping the guard immediately pops the session it just entered"]
pub struct CurrentSessionGuard {
    /// `true` when creating the guard pushed an entry that `drop` must pop.
    active: bool,
}
88
89impl Drop for CurrentSessionGuard {
90 fn drop(&mut self) {
91 if self.active {
92 pop_current_session();
93 }
94 }
95}
96
97pub fn set_session_cap(cap: usize) {
100 SESSION_CAP.with(|c| c.set(cap.max(1)));
101}
102
103pub fn session_cap() -> usize {
104 SESSION_CAP.with(|c| c.get())
105}
106
107pub fn reset_session_store() {
109 SESSIONS.with(|s| s.borrow_mut().clear());
110 CURRENT_SESSION_STACK.with(|stack| stack.borrow_mut().clear());
111}
112
113pub(crate) fn push_current_session(id: String) {
114 if id.is_empty() {
115 return;
116 }
117 CURRENT_SESSION_STACK.with(|stack| stack.borrow_mut().push(id));
118}
119
120pub(crate) fn pop_current_session() {
121 CURRENT_SESSION_STACK.with(|stack| {
122 let _ = stack.borrow_mut().pop();
123 });
124}
125
126pub fn current_session_id() -> Option<String> {
127 CURRENT_SESSION_STACK.with(|stack| stack.borrow().last().cloned())
128}
129
130pub fn enter_current_session(id: impl Into<String>) -> CurrentSessionGuard {
131 let id = id.into();
132 if id.trim().is_empty() {
133 return CurrentSessionGuard { active: false };
134 }
135 push_current_session(id);
136 CurrentSessionGuard { active: true }
137}
138
139pub fn exists(id: &str) -> bool {
140 SESSIONS.with(|s| s.borrow().contains_key(id))
141}
142
143pub fn length(id: &str) -> Option<usize> {
144 SESSIONS.with(|s| {
145 s.borrow().get(id).map(|state| {
146 state
147 .transcript
148 .as_dict()
149 .and_then(|d| d.get("messages"))
150 .and_then(|v| match v {
151 VmValue::List(list) => Some(list.len()),
152 _ => None,
153 })
154 .unwrap_or(0)
155 })
156 })
157}
158
159pub fn snapshot(id: &str) -> Option<VmValue> {
160 SESSIONS.with(|s| s.borrow().get(id).map(session_snapshot))
161}
162
163pub fn open_or_create(id: Option<String>) -> String {
172 let resolved = id.unwrap_or_else(|| uuid::Uuid::now_v7().to_string());
173 let parent_session = current_session_id();
174 let mut was_new = false;
175 SESSIONS.with(|s| {
176 let mut map = s.borrow_mut();
177 if let Some(state) = map.get_mut(&resolved) {
178 state.last_accessed = Instant::now();
179 return;
180 }
181 was_new = true;
182 let cap = SESSION_CAP.with(|c| c.get());
183 if map.len() >= cap {
184 if let Some(victim) = map
185 .iter()
186 .min_by_key(|(_, state)| state.last_accessed)
187 .map(|(id, _)| id.clone())
188 {
189 map.remove(&victim);
190 }
191 }
192 map.insert(resolved.clone(), SessionState::new(resolved.clone()));
193 });
194 if was_new {
195 if let Some(parent) = parent_session.as_deref() {
196 crate::agent_events::mirror_session_sinks(parent, &resolved);
197 }
198 try_register_event_log(&resolved);
199 }
200 resolved
201}
202
203pub fn open_child_session(parent_id: &str, id: Option<String>) -> String {
204 let resolved = open_or_create(id);
205 link_child_session(parent_id, &resolved);
206 resolved
207}
208
209pub fn link_child_session(parent_id: &str, child_id: &str) {
210 link_child_session_with_branch(parent_id, child_id, None);
211}
212
213pub fn link_child_session_with_branch(
214 parent_id: &str,
215 child_id: &str,
216 branched_at_event_index: Option<usize>,
217) {
218 if parent_id == child_id {
219 return;
220 }
221 open_or_create(Some(parent_id.to_string()));
222 open_or_create(Some(child_id.to_string()));
223 SESSIONS.with(|s| {
224 let mut map = s.borrow_mut();
225 update_lineage(&mut map, parent_id, child_id, branched_at_event_index);
226 });
227}
228
229pub fn parent_id(id: &str) -> Option<String> {
230 SESSIONS.with(|s| s.borrow().get(id).and_then(|state| state.parent_id.clone()))
231}
232
233pub fn child_ids(id: &str) -> Vec<String> {
234 SESSIONS.with(|s| {
235 s.borrow()
236 .get(id)
237 .map(|state| state.child_ids.clone())
238 .unwrap_or_default()
239 })
240}
241
242pub fn ancestry(id: &str) -> Option<SessionAncestry> {
243 SESSIONS.with(|s| {
244 let map = s.borrow();
245 let state = map.get(id)?;
246 let mut root_id = state.id.clone();
247 let mut cursor = state.parent_id.clone();
248 let mut seen = HashSet::from([state.id.clone()]);
249 while let Some(parent_id) = cursor {
250 if !seen.insert(parent_id.clone()) {
251 break;
252 }
253 root_id = parent_id.clone();
254 cursor = map
255 .get(&parent_id)
256 .and_then(|parent| parent.parent_id.clone());
257 }
258 Some(SessionAncestry {
259 parent_id: state.parent_id.clone(),
260 child_ids: state.child_ids.clone(),
261 root_id,
262 })
263 })
264}
265
266fn try_register_event_log(session_id: &str) {
270 if let Some(log) = crate::event_log::active_event_log() {
271 crate::agent_events::register_sink(
272 session_id,
273 crate::agent_events::EventLogSink::new(log, session_id),
274 );
275 return;
276 }
277 let Ok(dir) = std::env::var("HARN_EVENT_LOG_DIR") else {
278 return;
279 };
280 if dir.is_empty() {
281 return;
282 }
283 let path = std::path::PathBuf::from(dir).join(format!("event_log-{session_id}.jsonl"));
284 if let Ok(sink) = crate::agent_events::JsonlEventSink::open(&path) {
285 crate::agent_events::register_sink(session_id, sink);
286 }
287}
288
289pub fn register_event_log_sink(session_id: &str) {
290 try_register_event_log(session_id);
291}
292
293pub fn close(id: &str) {
294 SESSIONS.with(|s| {
295 s.borrow_mut().remove(id);
296 });
297}
298
299pub fn reset_transcript(id: &str) -> bool {
300 SESSIONS.with(|s| {
301 let mut map = s.borrow_mut();
302 let Some(state) = map.get_mut(id) else {
303 return false;
304 };
305 state.transcript = empty_transcript(id);
306 state.tool_format = None;
307 state.last_accessed = Instant::now();
308 true
309 })
310}
311
312pub fn fork(src_id: &str, dst_id: Option<String>) -> Option<String> {
319 let (src_transcript, src_tool_format, dst) = SESSIONS.with(|s| {
320 let mut map = s.borrow_mut();
321 let src = map.get_mut(src_id)?;
322 src.last_accessed = Instant::now();
323 let dst = dst_id.unwrap_or_else(|| uuid::Uuid::now_v7().to_string());
324 let forked_transcript = clone_transcript_with_id(&src.transcript, &dst);
325 Some((forked_transcript, src.tool_format.clone(), dst))
326 })?;
327 open_or_create(Some(dst.clone()));
329 SESSIONS.with(|s| {
330 let mut map = s.borrow_mut();
331 if let Some(state) = map.get_mut(&dst) {
332 state.transcript = src_transcript;
333 state.tool_format = src_tool_format;
334 state.last_accessed = Instant::now();
335 }
336 update_lineage(&mut map, src_id, &dst, None);
337 });
338 if exists(&dst) {
342 Some(dst)
343 } else {
344 None
345 }
346}
347
348pub fn fork_at(src_id: &str, keep_first: usize, dst_id: Option<String>) -> Option<String> {
359 let branched_at_event_index = SESSIONS.with(|s| {
360 let map = s.borrow();
361 let src = map.get(src_id)?;
362 Some(branch_event_index(&src.transcript, keep_first))
363 })?;
364 let new_id = fork(src_id, dst_id)?;
365 link_child_session_with_branch(src_id, &new_id, Some(branched_at_event_index));
366 retain_first(&new_id, keep_first);
367 Some(new_id)
368}
369
370fn retain_first(id: &str, keep_first: usize) {
374 SESSIONS.with(|s| {
375 let mut map = s.borrow_mut();
376 let Some(state) = map.get_mut(id) else {
377 return;
378 };
379 let Some(dict) = state.transcript.as_dict() else {
380 return;
381 };
382 let dict = dict.clone();
383 let messages: Vec<VmValue> = match dict.get("messages") {
384 Some(VmValue::List(list)) => list.iter().cloned().collect(),
385 _ => Vec::new(),
386 };
387 let retained: Vec<VmValue> = messages.into_iter().take(keep_first).collect();
388 let mut next = dict;
389 next.insert(
390 "events".to_string(),
391 VmValue::List(Rc::new(
392 crate::llm::helpers::transcript_events_from_messages(&retained),
393 )),
394 );
395 next.insert("messages".to_string(), VmValue::List(Rc::new(retained)));
396 state.transcript = VmValue::Dict(Rc::new(next));
397 state.last_accessed = Instant::now();
398 });
399}
400
401pub fn trim(id: &str, keep_last: usize) -> Option<usize> {
404 SESSIONS.with(|s| {
405 let mut map = s.borrow_mut();
406 let state = map.get_mut(id)?;
407 let dict = state.transcript.as_dict()?.clone();
408 let messages: Vec<VmValue> = match dict.get("messages") {
409 Some(VmValue::List(list)) => list.iter().cloned().collect(),
410 _ => Vec::new(),
411 };
412 let start = messages.len().saturating_sub(keep_last);
413 let retained: Vec<VmValue> = messages.into_iter().skip(start).collect();
414 let kept = retained.len();
415 let mut next = dict;
416 next.insert(
417 "events".to_string(),
418 VmValue::List(Rc::new(
419 crate::llm::helpers::transcript_events_from_messages(&retained),
420 )),
421 );
422 next.insert("messages".to_string(), VmValue::List(Rc::new(retained)));
423 state.transcript = VmValue::Dict(Rc::new(next));
424 state.last_accessed = Instant::now();
425 Some(kept)
426 })
427}
428
429pub fn inject_message(id: &str, message: VmValue) -> Result<(), String> {
432 let Some(msg_dict) = message.as_dict().cloned() else {
433 return Err("agent_session_inject: message must be a dict".into());
434 };
435 let role_ok = matches!(msg_dict.get("role"), Some(VmValue::String(_)));
436 if !role_ok {
437 return Err(
438 "agent_session_inject: message must have a string `role` (user|assistant|tool_result|system)"
439 .into(),
440 );
441 }
442 SESSIONS.with(|s| {
443 let mut map = s.borrow_mut();
444 let Some(state) = map.get_mut(id) else {
445 return Err(format!("agent_session_inject: unknown session id '{id}'"));
446 };
447 let dict = state
448 .transcript
449 .as_dict()
450 .cloned()
451 .unwrap_or_else(BTreeMap::new);
452 let mut messages: Vec<VmValue> = match dict.get("messages") {
453 Some(VmValue::List(list)) => list.iter().cloned().collect(),
454 _ => Vec::new(),
455 };
456 let mut events: Vec<VmValue> = match dict.get("events") {
457 Some(VmValue::List(list)) => list.iter().cloned().collect(),
458 _ => crate::llm::helpers::transcript_events_from_messages(&messages),
459 };
460 let new_message = VmValue::Dict(Rc::new(msg_dict));
461 events.push(crate::llm::helpers::transcript_event_from_message(
462 &new_message,
463 ));
464 messages.push(new_message);
465 let mut next = dict;
466 next.insert("events".to_string(), VmValue::List(Rc::new(events)));
467 next.insert("messages".to_string(), VmValue::List(Rc::new(messages)));
468 state.transcript = VmValue::Dict(Rc::new(next));
469 state.last_accessed = Instant::now();
470 Ok(())
471 })
472}
473
474pub fn messages_json(id: &str) -> Vec<serde_json::Value> {
478 SESSIONS.with(|s| {
479 let map = s.borrow();
480 let Some(state) = map.get(id) else {
481 return Vec::new();
482 };
483 let Some(dict) = state.transcript.as_dict() else {
484 return Vec::new();
485 };
486 match dict.get("messages") {
487 Some(VmValue::List(list)) => list
488 .iter()
489 .map(crate::llm::helpers::vm_value_to_json)
490 .collect(),
491 _ => Vec::new(),
492 }
493 })
494}
495
/// Prompt-ready view of a session, as produced by `prompt_state_json`.
#[derive(Clone, Debug, Default)]
pub struct SessionPromptState {
    /// Transcript messages as JSON; when a summary exists and is not already
    /// the first message, a `user` message carrying it is placed first.
    pub messages: Vec<serde_json::Value>,
    /// The transcript's `summary` string, if present and not blank.
    pub summary: Option<String>,
}
501
502fn summary_message_json(summary: &str) -> serde_json::Value {
503 serde_json::json!({
504 "role": "user",
505 "content": summary,
506 })
507}
508
509fn messages_begin_with_summary(messages: &[serde_json::Value], summary: &str) -> bool {
510 messages.first().is_some_and(|message| {
511 message.get("role").and_then(|value| value.as_str()) == Some("user")
512 && message.get("content").and_then(|value| value.as_str()) == Some(summary)
513 })
514}
515
516pub fn prompt_state_json(id: &str) -> SessionPromptState {
524 SESSIONS.with(|s| {
525 let map = s.borrow();
526 let Some(state) = map.get(id) else {
527 return SessionPromptState::default();
528 };
529 let Some(dict) = state.transcript.as_dict() else {
530 return SessionPromptState::default();
531 };
532 let mut messages = match dict.get("messages") {
533 Some(VmValue::List(list)) => list
534 .iter()
535 .map(crate::llm::helpers::vm_value_to_json)
536 .collect::<Vec<_>>(),
537 _ => Vec::new(),
538 };
539 let summary = dict.get("summary").and_then(|value| match value {
540 VmValue::String(text) if !text.trim().is_empty() => Some(text.to_string()),
541 _ => None,
542 });
543 if let Some(summary_text) = summary.as_deref() {
544 if !messages_begin_with_summary(&messages, summary_text) {
545 messages.insert(0, summary_message_json(summary_text));
546 }
547 }
548 SessionPromptState { messages, summary }
549 })
550}
551
552pub fn store_transcript(id: &str, transcript: VmValue) {
555 SESSIONS.with(|s| {
556 if let Some(state) = s.borrow_mut().get_mut(id) {
557 state.transcript = transcript_with_session_metadata(transcript, state);
558 state.last_accessed = Instant::now();
559 }
560 });
561}
562
563pub fn append_event(id: &str, event: VmValue) -> Result<(), String> {
569 let Some(event_dict) = event.as_dict() else {
570 return Err("agent_session_append_event: event must be a dict".into());
571 };
572 let kind_ok = matches!(event_dict.get("kind"), Some(VmValue::String(_)));
573 if !kind_ok {
574 return Err("agent_session_append_event: event must have a string `kind`".into());
575 }
576 SESSIONS.with(|s| {
577 let mut map = s.borrow_mut();
578 let Some(state) = map.get_mut(id) else {
579 return Err(format!(
580 "agent_session_append_event: unknown session id '{id}'"
581 ));
582 };
583 let dict = state
584 .transcript
585 .as_dict()
586 .cloned()
587 .unwrap_or_else(BTreeMap::new);
588 let mut events: Vec<VmValue> = match dict.get("events") {
589 Some(VmValue::List(list)) => list.iter().cloned().collect(),
590 _ => dict
591 .get("messages")
592 .and_then(|value| match value {
593 VmValue::List(list) => Some(list.iter().cloned().collect::<Vec<_>>()),
594 _ => None,
595 })
596 .map(|messages| crate::llm::helpers::transcript_events_from_messages(&messages))
597 .unwrap_or_default(),
598 };
599 events.push(event);
600 let mut next = dict;
601 next.insert("events".to_string(), VmValue::List(Rc::new(events)));
602 state.transcript = VmValue::Dict(Rc::new(next));
603 state.last_accessed = Instant::now();
604 Ok(())
605 })
606}
607
608pub fn replace_messages(id: &str, messages: &[serde_json::Value]) {
611 replace_messages_with_summary(id, messages, None);
612}
613
614pub fn replace_messages_with_summary(
619 id: &str,
620 messages: &[serde_json::Value],
621 summary: Option<&str>,
622) {
623 SESSIONS.with(|s| {
624 let mut map = s.borrow_mut();
625 let Some(state) = map.get_mut(id) else {
626 return;
627 };
628 let dict = state
629 .transcript
630 .as_dict()
631 .cloned()
632 .unwrap_or_else(BTreeMap::new);
633 let vm_messages: Vec<VmValue> = messages
634 .iter()
635 .map(crate::stdlib::json_to_vm_value)
636 .collect();
637 let mut next = dict;
638 next.insert(
639 "events".to_string(),
640 VmValue::List(Rc::new(
641 crate::llm::helpers::transcript_events_from_messages(&vm_messages),
642 )),
643 );
644 next.insert("messages".to_string(), VmValue::List(Rc::new(vm_messages)));
645 if let Some(summary) = summary {
646 next.insert(
647 "summary".to_string(),
648 VmValue::String(Rc::from(summary.to_string())),
649 );
650 }
651 state.transcript = VmValue::Dict(Rc::new(next));
652 state.last_accessed = Instant::now();
653 });
654}
655
656pub fn append_subscriber(id: &str, callback: VmValue) {
657 open_or_create(Some(id.to_string()));
658 SESSIONS.with(|s| {
659 if let Some(state) = s.borrow_mut().get_mut(id) {
660 state.subscribers.push(callback);
661 state.last_accessed = Instant::now();
662 }
663 });
664}
665
666pub fn subscribers_for(id: &str) -> Vec<VmValue> {
667 SESSIONS.with(|s| {
668 s.borrow()
669 .get(id)
670 .map(|state| state.subscribers.clone())
671 .unwrap_or_default()
672 })
673}
674
675pub fn subscriber_count(id: &str) -> usize {
676 SESSIONS.with(|s| {
677 s.borrow()
678 .get(id)
679 .map(|state| state.subscribers.len())
680 .unwrap_or(0)
681 })
682}
683
684pub fn set_active_skills(id: &str, skills: Vec<String>) {
688 SESSIONS.with(|s| {
689 if let Some(state) = s.borrow_mut().get_mut(id) {
690 state.active_skills = skills;
691 state.last_accessed = Instant::now();
692 }
693 });
694}
695
696pub fn active_skills(id: &str) -> Vec<String> {
700 SESSIONS.with(|s| {
701 s.borrow()
702 .get(id)
703 .map(|state| state.active_skills.clone())
704 .unwrap_or_default()
705 })
706}
707
708pub fn claim_tool_format(id: &str, tool_format: &str) -> Result<(), String> {
714 let tool_format = tool_format.trim();
715 if tool_format.is_empty() {
716 return Ok(());
717 }
718 SESSIONS.with(|s| {
719 let mut map = s.borrow_mut();
720 let Some(state) = map.get_mut(id) else {
721 return Err(format!("agent session '{id}' does not exist"));
722 };
723 match state.tool_format.as_deref() {
724 Some(existing) if existing != tool_format => Err(format!(
725 "agent session '{id}' is locked to tool_format='{existing}', but this run requested tool_format='{tool_format}'. Start a new session or fork/reset the transcript before changing tool mode."
726 )),
727 Some(_) => {
728 state.last_accessed = Instant::now();
729 Ok(())
730 }
731 None => {
732 state.tool_format = Some(tool_format.to_string());
733 state.last_accessed = Instant::now();
734 Ok(())
735 }
736 }
737 })
738}
739
740pub fn tool_format(id: &str) -> Option<String> {
741 SESSIONS.with(|s| {
742 s.borrow()
743 .get(id)
744 .and_then(|state| state.tool_format.clone())
745 })
746}
747
748fn empty_transcript(id: &str) -> VmValue {
749 use crate::llm::helpers::new_transcript_with;
750 new_transcript_with(Some(id.to_string()), Vec::new(), None, None)
751}
752
753fn clone_transcript_with_id(transcript: &VmValue, new_id: &str) -> VmValue {
754 let Some(dict) = transcript.as_dict() else {
755 return empty_transcript(new_id);
756 };
757 let mut next = dict.clone();
758 next.insert(
759 "id".to_string(),
760 VmValue::String(Rc::from(new_id.to_string())),
761 );
762 VmValue::Dict(Rc::new(next))
763}
764
765fn clone_transcript_with_parent(transcript: &VmValue, parent_id: &str) -> VmValue {
766 let Some(dict) = transcript.as_dict() else {
767 return transcript.clone();
768 };
769 let mut next = dict.clone();
770 let metadata = match next.get("metadata") {
771 Some(VmValue::Dict(metadata)) => {
772 let mut metadata = metadata.as_ref().clone();
773 metadata.insert(
774 "parent_session_id".to_string(),
775 VmValue::String(Rc::from(parent_id.to_string())),
776 );
777 VmValue::Dict(Rc::new(metadata))
778 }
779 _ => VmValue::Dict(Rc::new(BTreeMap::from([(
780 "parent_session_id".to_string(),
781 VmValue::String(Rc::from(parent_id.to_string())),
782 )]))),
783 };
784 next.insert("metadata".to_string(), metadata);
785 VmValue::Dict(Rc::new(next))
786}
787
788fn transcript_with_session_metadata(transcript: VmValue, state: &SessionState) -> VmValue {
789 let Some(dict) = transcript.as_dict() else {
790 return transcript;
791 };
792 let mut next = dict.clone();
793 let mut metadata = match next.get("metadata") {
794 Some(VmValue::Dict(metadata)) => metadata.as_ref().clone(),
795 _ => BTreeMap::new(),
796 };
797 if let Some(tool_format) = state.tool_format.as_ref() {
798 metadata.insert(
799 "tool_format".to_string(),
800 VmValue::String(Rc::from(tool_format.clone())),
801 );
802 metadata.insert("tool_mode_locked".to_string(), VmValue::Bool(true));
803 }
804 if !metadata.is_empty() {
805 next.insert("metadata".to_string(), VmValue::Dict(Rc::new(metadata)));
806 }
807 VmValue::Dict(Rc::new(next))
808}
809
810fn session_snapshot(state: &SessionState) -> VmValue {
811 let Some(dict) = state.transcript.as_dict() else {
812 return state.transcript.clone();
813 };
814 let mut next = dict.clone();
815 next.insert(
816 "parent_id".to_string(),
817 state
818 .parent_id
819 .as_ref()
820 .map(|id| VmValue::String(Rc::from(id.clone())))
821 .unwrap_or(VmValue::Nil),
822 );
823 next.insert(
824 "child_ids".to_string(),
825 VmValue::List(Rc::new(
826 state
827 .child_ids
828 .iter()
829 .cloned()
830 .map(|id| VmValue::String(Rc::from(id)))
831 .collect(),
832 )),
833 );
834 next.insert(
835 "branched_at_event_index".to_string(),
836 state
837 .branched_at_event_index
838 .map(|index| VmValue::Int(index as i64))
839 .unwrap_or(VmValue::Nil),
840 );
841 VmValue::Dict(Rc::new(next))
842}
843
844fn update_lineage(
845 map: &mut HashMap<String, SessionState>,
846 parent_id: &str,
847 child_id: &str,
848 branched_at_event_index: Option<usize>,
849) {
850 let old_parent_id = map.get(child_id).and_then(|child| child.parent_id.clone());
851 if let Some(old_parent_id) = old_parent_id.filter(|old_parent_id| old_parent_id != parent_id) {
852 if let Some(old_parent) = map.get_mut(&old_parent_id) {
853 old_parent.child_ids.retain(|id| id != child_id);
854 old_parent.last_accessed = Instant::now();
855 }
856 }
857 if let Some(parent) = map.get_mut(parent_id) {
858 parent.last_accessed = Instant::now();
859 if !parent.child_ids.iter().any(|id| id == child_id) {
860 parent.child_ids.push(child_id.to_string());
861 }
862 }
863 if let Some(child) = map.get_mut(child_id) {
864 child.last_accessed = Instant::now();
865 child.parent_id = Some(parent_id.to_string());
866 child.branched_at_event_index = branched_at_event_index;
867 child.transcript = clone_transcript_with_parent(&child.transcript, parent_id);
868 }
869}
870
871fn branch_event_index(transcript: &VmValue, keep_first: usize) -> usize {
872 if keep_first == 0 {
873 return 0;
874 }
875 let Some(dict) = transcript.as_dict() else {
876 return keep_first;
877 };
878 let Some(VmValue::List(events)) = dict.get("events") else {
879 return keep_first;
880 };
881 let mut retained_messages = 0usize;
882 for (index, event) in events.iter().enumerate() {
883 let kind = event
884 .as_dict()
885 .and_then(|dict| dict.get("kind"))
886 .map(VmValue::display);
887 if matches!(kind.as_deref(), Some("message" | "tool_result")) {
888 retained_messages += 1;
889 if retained_messages == keep_first {
890 return index + 1;
891 }
892 }
893 }
894 events.len()
895}
896
// Unit tests for the session store: forking, lineage bookkeeping, branch
// index computation, prompt-state assembly and event-log sink registration.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::agent_events::{
        emit_event, reset_all_sinks, session_external_sink_count, AgentEvent,
    };
    use crate::event_log::{active_event_log, EventLog, Topic};
    use std::collections::BTreeMap;

    // Builds a message dict with string `role` and `content` entries.
    fn make_msg(role: &str, content: &str) -> VmValue {
        let mut m: BTreeMap<String, VmValue> = BTreeMap::new();
        m.insert("role".to_string(), VmValue::String(Rc::from(role)));
        m.insert("content".to_string(), VmValue::String(Rc::from(content)));
        VmValue::Dict(Rc::new(m))
    }

    // Counts entries in the stored transcript's `messages` list, reading the
    // store directly; returns 0 for unknown sessions or malformed transcripts.
    fn message_count(id: &str) -> usize {
        SESSIONS.with(|s| {
            let map = s.borrow();
            let Some(state) = map.get(id) else { return 0 };
            let Some(dict) = state.transcript.as_dict() else {
                return 0;
            };
            match dict.get("messages") {
                Some(VmValue::List(list)) => list.len(),
                _ => 0,
            }
        })
    }

    // fork_at keeps the first N messages in the copy, records the branch
    // index, leaves the source intact and does not copy subscribers.
    #[test]
    fn fork_at_truncates_destination_to_keep_first() {
        reset_session_store();
        let src = open_or_create(Some("src-fork-at".into()));
        inject_message(&src, make_msg("user", "a")).unwrap();
        inject_message(&src, make_msg("assistant", "b")).unwrap();
        inject_message(&src, make_msg("user", "c")).unwrap();
        inject_message(&src, make_msg("assistant", "d")).unwrap();
        assert_eq!(message_count(&src), 4);

        let dst = fork_at(&src, 2, Some("dst-fork-at".into())).expect("fork_at");
        assert_ne!(dst, src);
        assert_eq!(message_count(&dst), 2, "branched at message index 2");
        assert_eq!(
            snapshot(&dst)
                .and_then(|value| value.as_dict().cloned())
                .and_then(|dict| dict
                    .get("branched_at_event_index")
                    .and_then(VmValue::as_int)),
            Some(2)
        );
        assert_eq!(message_count(&src), 4);
        assert_eq!(subscriber_count(&dst), 0);
        reset_session_store();
    }

    // Forking from a session that does not exist yields None.
    #[test]
    fn fork_at_on_unknown_source_returns_none() {
        reset_session_store();
        assert!(fork_at("does-not-exist", 3, None).is_none());
    }

    // open_child_session records lineage on both sides and exposes it via
    // ancestry(), the snapshot keys and the transcript metadata.
    #[test]
    fn child_sessions_record_parent_lineage() {
        reset_session_store();
        let parent = open_or_create(Some("parent-session".into()));
        let child = open_child_session(&parent, Some("child-session".into()));
        assert_eq!(parent_id(&child).as_deref(), Some("parent-session"));
        assert_eq!(child_ids(&parent), vec!["child-session".to_string()]);
        assert_eq!(
            ancestry(&child),
            Some(SessionAncestry {
                parent_id: Some("parent-session".to_string()),
                child_ids: Vec::new(),
                root_id: "parent-session".to_string(),
            })
        );

        let transcript = snapshot(&child).expect("child transcript");
        let transcript = transcript.as_dict().expect("child snapshot");
        let metadata = transcript
            .get("metadata")
            .and_then(|value| value.as_dict())
            .expect("child metadata");
        assert!(
            matches!(transcript.get("parent_id"), Some(VmValue::String(value)) if value.as_ref() == "parent-session")
        );
        assert!(
            matches!(transcript.get("child_ids"), Some(VmValue::List(children)) if children.is_empty())
        );
        assert!(matches!(
            transcript.get("branched_at_event_index"),
            Some(VmValue::Nil)
        ));
        assert!(matches!(
            metadata.get("parent_session_id"),
            Some(VmValue::String(value)) if value.as_ref() == "parent-session"
        ));
    }

    // With a non-message event between two message events, keeping two
    // messages must branch at event index 3, not 2.
    #[test]
    fn branch_event_index_counts_non_message_events() {
        reset_session_store();
        let src = open_or_create(Some("branch-event-index".into()));
        let transcript = VmValue::Dict(Rc::new(BTreeMap::from([
            ("id".to_string(), VmValue::String(Rc::from(src.clone()))),
            (
                "messages".to_string(),
                VmValue::List(Rc::new(vec![
                    make_msg("user", "a"),
                    make_msg("assistant", "b"),
                ])),
            ),
            (
                "events".to_string(),
                VmValue::List(Rc::new(vec![
                    VmValue::Dict(Rc::new(BTreeMap::from([(
                        "kind".to_string(),
                        VmValue::String(Rc::from("message")),
                    )]))),
                    VmValue::Dict(Rc::new(BTreeMap::from([(
                        "kind".to_string(),
                        VmValue::String(Rc::from("sub_agent_start")),
                    )]))),
                    VmValue::Dict(Rc::new(BTreeMap::from([(
                        "kind".to_string(),
                        VmValue::String(Rc::from("message")),
                    )]))),
                ])),
            ),
        ])));
        store_transcript(&src, transcript);

        let dst = fork_at(&src, 2, Some("branch-event-index-child".into())).expect("fork_at");
        assert_eq!(
            snapshot(&dst)
                .and_then(|value| value.as_dict().cloned())
                .and_then(|dict| dict
                    .get("branched_at_event_index")
                    .and_then(VmValue::as_int)),
            Some(3)
        );
    }

    // A child opened via open_child_session starts with an empty transcript
    // and no tool format; only the lineage link ties it to the parent.
    #[test]
    fn child_session_records_lineage_without_reusing_parent_transcript() {
        reset_session_store();
        let parent = open_or_create(Some("parent-fork-parent".into()));
        inject_message(&parent, make_msg("user", "parent context")).unwrap();
        claim_tool_format(&parent, "native").unwrap();

        let child = open_child_session(&parent, Some("parent-fork-child".into()));
        assert_eq!(message_count(&parent), 1);
        assert_eq!(message_count(&child), 0);
        assert_eq!(tool_format(&child), None);
        assert_eq!(parent_id(&child).as_deref(), Some(parent.as_str()));
    }

    // When the transcript has a summary that is not already the first
    // message, prompt_state_json puts a user message carrying it first.
    #[test]
    fn prompt_state_prepends_summary_message_when_missing_from_messages() {
        reset_session_store();
        let session = open_or_create(Some("prompt-state-summary".into()));
        let transcript = crate::llm::helpers::new_transcript_with_events(
            Some(session.clone()),
            vec![make_msg("assistant", "latest answer")],
            Some("[auto-compacted 2 older messages]\nsummary".to_string()),
            None,
            Vec::new(),
            Vec::new(),
            Some("active"),
        );
        store_transcript(&session, transcript);

        let prompt = prompt_state_json(&session);
        assert_eq!(
            prompt.summary.as_deref(),
            Some("[auto-compacted 2 older messages]\nsummary")
        );
        assert_eq!(prompt.messages.len(), 2);
        assert_eq!(prompt.messages[0]["role"].as_str(), Some("user"));
        assert_eq!(
            prompt.messages[0]["content"].as_str(),
            Some("[auto-compacted 2 older messages]\nsummary"),
        );
        assert_eq!(prompt.messages[1]["role"].as_str(), Some("assistant"));
    }

    // With an active event log installed, creating a session registers one
    // external sink, and an emitted event is readable from the log topic.
    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn open_or_create_registers_event_log_sink_when_active_log_is_installed() {
        reset_all_sinks();
        crate::event_log::reset_active_event_log();
        let dir = tempfile::tempdir().expect("tempdir");
        crate::event_log::install_default_for_base_dir(dir.path()).expect("install event log");

        let session = open_or_create(Some("event-log-session".into()));
        assert_eq!(session_external_sink_count(&session), 1);

        emit_event(&AgentEvent::TurnStart {
            session_id: session.clone(),
            iteration: 0,
        });
        // Give the sink a chance to run before reading the log back.
        tokio::time::sleep(std::time::Duration::from_millis(25)).await;

        let topic = Topic::new("observability.agent_events.event-log-session").unwrap();
        let log = active_event_log().expect("active event log");
        let events = log.read_range(&topic, None, usize::MAX).await.unwrap();
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].1.kind, "turn_start");

        crate::event_log::reset_active_event_log();
        reset_all_sinks();
    }
}