1use std::cell::{Cell, RefCell};
22use std::collections::{BTreeMap, HashMap, HashSet};
23use std::rc::Rc;
24use std::time::Instant;
25
26use crate::value::VmValue;
27
/// Default upper bound on the number of sessions held in the per-thread
/// store; this is the initial value of the `SESSION_CAP` thread-local.
pub const DEFAULT_SESSION_CAP: usize = 128;
31
/// In-memory state tracked for a single agent session.
pub struct SessionState {
    /// Session identifier; the store inserts the state under this same key.
    pub id: String,
    /// Transcript value. Functions in this module read and write its
    /// `messages`, `events`, `summary`, and `metadata` dict entries.
    pub transcript: VmValue,
    /// Callbacks added through `append_subscriber`.
    pub subscribers: Vec<VmValue>,
    /// Instant at which this state was constructed.
    pub created_at: Instant,
    /// Refreshed by the mutating operations in this module; the session with
    /// the smallest value is evicted first when the store is at capacity.
    pub last_accessed: Instant,
    /// Id of the parent session, set when the session is linked into a lineage.
    pub parent_id: Option<String>,
    /// Ids of the sessions linked as children of this one.
    pub child_ids: Vec<String>,
    /// Event index recorded when this session was branched from its parent.
    pub branched_at_event_index: Option<usize>,
    /// Skill names stored via `set_active_skills`.
    pub active_skills: Vec<String>,
    /// Tool format claimed via `claim_tool_format`; cleared by `reset_transcript`.
    pub tool_format: Option<String>,
    /// Last system prompt stored via `record_system_prompt`; cleared by
    /// `reset_transcript`.
    pub system_prompt: Option<String>,
}
56
57impl SessionState {
58 fn new(id: String) -> Self {
59 let now = Instant::now();
60 let transcript = empty_transcript(&id);
61 Self {
62 id,
63 transcript,
64 subscribers: Vec::new(),
65 created_at: now,
66 last_accessed: now,
67 parent_id: None,
68 child_ids: Vec::new(),
69 branched_at_event_index: None,
70 active_skills: Vec::new(),
71 tool_format: None,
72 system_prompt: None,
73 }
74 }
75}
76
/// Lineage summary for one session, as returned by `ancestry`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct SessionAncestry {
    /// Direct parent of the session, if it has one.
    pub parent_id: Option<String>,
    /// Direct children of the session.
    pub child_ids: Vec<String>,
    /// Last ancestor reached by following parent links; the session's own id
    /// when it has no parent.
    pub root_id: String,
}
83
thread_local! {
    /// Per-thread session store, keyed by session id.
    static SESSIONS: RefCell<HashMap<String, SessionState>> = RefCell::new(HashMap::new());
    /// Maximum number of sessions `open_or_create` keeps before evicting one.
    static SESSION_CAP: Cell<usize> = const { Cell::new(DEFAULT_SESSION_CAP) };
    /// Stack of entered session ids; the last element is the current session.
    static CURRENT_SESSION_STACK: RefCell<Vec<String>> = const { RefCell::new(Vec::new()) };
}
89
/// Guard returned by `enter_current_session`. Dropping it pops the
/// current-session stack, but only when the guard is `active`.
pub struct CurrentSessionGuard {
    /// True when `enter_current_session` pushed an id for this guard.
    active: bool,
}
93
94impl Drop for CurrentSessionGuard {
95 fn drop(&mut self) {
96 if self.active {
97 pop_current_session();
98 }
99 }
100}
101
102pub fn set_session_cap(cap: usize) {
105 SESSION_CAP.with(|c| c.set(cap.max(1)));
106}
107
108pub fn session_cap() -> usize {
109 SESSION_CAP.with(|c| c.get())
110}
111
112pub fn reset_session_store() {
114 SESSIONS.with(|s| s.borrow_mut().clear());
115 CURRENT_SESSION_STACK.with(|stack| stack.borrow_mut().clear());
116}
117
118pub(crate) fn push_current_session(id: String) {
119 if id.is_empty() {
120 return;
121 }
122 CURRENT_SESSION_STACK.with(|stack| stack.borrow_mut().push(id));
123}
124
125pub(crate) fn pop_current_session() {
126 CURRENT_SESSION_STACK.with(|stack| {
127 let _ = stack.borrow_mut().pop();
128 });
129}
130
131pub fn current_session_id() -> Option<String> {
132 CURRENT_SESSION_STACK.with(|stack| stack.borrow().last().cloned())
133}
134
135pub fn enter_current_session(id: impl Into<String>) -> CurrentSessionGuard {
136 let id = id.into();
137 if id.trim().is_empty() {
138 return CurrentSessionGuard { active: false };
139 }
140 push_current_session(id);
141 CurrentSessionGuard { active: true }
142}
143
144pub fn exists(id: &str) -> bool {
145 SESSIONS.with(|s| s.borrow().contains_key(id))
146}
147
148pub fn length(id: &str) -> Option<usize> {
149 SESSIONS.with(|s| {
150 s.borrow().get(id).map(|state| {
151 state
152 .transcript
153 .as_dict()
154 .and_then(|d| d.get("messages"))
155 .and_then(|v| match v {
156 VmValue::List(list) => Some(list.len()),
157 _ => None,
158 })
159 .unwrap_or(0)
160 })
161 })
162}
163
164pub fn snapshot(id: &str) -> Option<VmValue> {
165 SESSIONS.with(|s| s.borrow().get(id).map(session_snapshot))
166}
167
168pub fn open_or_create(id: Option<String>) -> String {
177 let resolved = id.unwrap_or_else(|| uuid::Uuid::now_v7().to_string());
178 let parent_session = current_session_id();
179 let mut was_new = false;
180 SESSIONS.with(|s| {
181 let mut map = s.borrow_mut();
182 if let Some(state) = map.get_mut(&resolved) {
183 state.last_accessed = Instant::now();
184 return;
185 }
186 was_new = true;
187 let cap = SESSION_CAP.with(|c| c.get());
188 if map.len() >= cap {
189 if let Some(victim) = map
190 .iter()
191 .min_by_key(|(_, state)| state.last_accessed)
192 .map(|(id, _)| id.clone())
193 {
194 map.remove(&victim);
195 }
196 }
197 map.insert(resolved.clone(), SessionState::new(resolved.clone()));
198 });
199 if was_new {
200 if let Some(parent) = parent_session.as_deref() {
201 crate::agent_events::mirror_session_sinks(parent, &resolved);
202 }
203 try_register_event_log(&resolved);
204 }
205 resolved
206}
207
208pub fn open_child_session(parent_id: &str, id: Option<String>) -> String {
209 let resolved = open_or_create(id);
210 link_child_session(parent_id, &resolved);
211 resolved
212}
213
/// Links `child_id` under `parent_id` without recording a branch index
/// (delegates to `link_child_session_with_branch` with `None`).
pub fn link_child_session(parent_id: &str, child_id: &str) {
    link_child_session_with_branch(parent_id, child_id, None);
}
217
218pub fn link_child_session_with_branch(
219 parent_id: &str,
220 child_id: &str,
221 branched_at_event_index: Option<usize>,
222) {
223 if parent_id == child_id {
224 return;
225 }
226 open_or_create(Some(parent_id.to_string()));
227 open_or_create(Some(child_id.to_string()));
228 SESSIONS.with(|s| {
229 let mut map = s.borrow_mut();
230 update_lineage(&mut map, parent_id, child_id, branched_at_event_index);
231 });
232}
233
234pub fn parent_id(id: &str) -> Option<String> {
235 SESSIONS.with(|s| s.borrow().get(id).and_then(|state| state.parent_id.clone()))
236}
237
238pub fn child_ids(id: &str) -> Vec<String> {
239 SESSIONS.with(|s| {
240 s.borrow()
241 .get(id)
242 .map(|state| state.child_ids.clone())
243 .unwrap_or_default()
244 })
245}
246
247pub fn ancestry(id: &str) -> Option<SessionAncestry> {
248 SESSIONS.with(|s| {
249 let map = s.borrow();
250 let state = map.get(id)?;
251 let mut root_id = state.id.clone();
252 let mut cursor = state.parent_id.clone();
253 let mut seen = HashSet::from([state.id.clone()]);
254 while let Some(parent_id) = cursor {
255 if !seen.insert(parent_id.clone()) {
256 break;
257 }
258 root_id = parent_id.clone();
259 cursor = map
260 .get(&parent_id)
261 .and_then(|parent| parent.parent_id.clone());
262 }
263 Some(SessionAncestry {
264 parent_id: state.parent_id.clone(),
265 child_ids: state.child_ids.clone(),
266 root_id,
267 })
268 })
269}
270
271fn try_register_event_log(session_id: &str) {
275 if let Some(log) = crate::event_log::active_event_log() {
276 crate::agent_events::register_sink(
277 session_id,
278 crate::agent_events::EventLogSink::new(log, session_id),
279 );
280 return;
281 }
282 let Ok(dir) = std::env::var("HARN_EVENT_LOG_DIR") else {
283 return;
284 };
285 if dir.is_empty() {
286 return;
287 }
288 let path = std::path::PathBuf::from(dir).join(format!("event_log-{session_id}.jsonl"));
289 if let Ok(sink) = crate::agent_events::JsonlEventSink::open(&path) {
290 crate::agent_events::register_sink(session_id, sink);
291 }
292}
293
/// Public entry point for `try_register_event_log`: attempts to register an
/// event sink for `session_id`.
pub fn register_event_log_sink(session_id: &str) {
    try_register_event_log(session_id);
}
297
298pub fn close(id: &str) {
299 SESSIONS.with(|s| {
300 s.borrow_mut().remove(id);
301 });
302}
303
304pub fn reset_transcript(id: &str) -> bool {
305 SESSIONS.with(|s| {
306 let mut map = s.borrow_mut();
307 let Some(state) = map.get_mut(id) else {
308 return false;
309 };
310 state.transcript = empty_transcript(id);
311 state.tool_format = None;
312 state.system_prompt = None;
313 state.last_accessed = Instant::now();
314 true
315 })
316}
317
318pub fn fork(src_id: &str, dst_id: Option<String>) -> Option<String> {
325 let (src_transcript, src_tool_format, src_system_prompt, dst) = SESSIONS.with(|s| {
326 let mut map = s.borrow_mut();
327 let src = map.get_mut(src_id)?;
328 src.last_accessed = Instant::now();
329 let dst = dst_id.unwrap_or_else(|| uuid::Uuid::now_v7().to_string());
330 let forked_transcript = clone_transcript_with_id(&src.transcript, &dst);
331 Some((
332 forked_transcript,
333 src.tool_format.clone(),
334 src.system_prompt.clone(),
335 dst,
336 ))
337 })?;
338 open_or_create(Some(dst.clone()));
340 SESSIONS.with(|s| {
341 let mut map = s.borrow_mut();
342 if let Some(state) = map.get_mut(&dst) {
343 state.transcript = src_transcript;
344 state.tool_format = src_tool_format;
345 state.system_prompt = src_system_prompt;
346 state.last_accessed = Instant::now();
347 }
348 update_lineage(&mut map, src_id, &dst, None);
349 });
350 if exists(&dst) {
354 Some(dst)
355 } else {
356 None
357 }
358}
359
360pub fn fork_at(src_id: &str, keep_first: usize, dst_id: Option<String>) -> Option<String> {
371 let branched_at_event_index = SESSIONS.with(|s| {
372 let map = s.borrow();
373 let src = map.get(src_id)?;
374 Some(branch_event_index(&src.transcript, keep_first))
375 })?;
376 let new_id = fork(src_id, dst_id)?;
377 link_child_session_with_branch(src_id, &new_id, Some(branched_at_event_index));
378 retain_first(&new_id, keep_first);
379 Some(new_id)
380}
381
382fn retain_first(id: &str, keep_first: usize) {
386 SESSIONS.with(|s| {
387 let mut map = s.borrow_mut();
388 let Some(state) = map.get_mut(id) else {
389 return;
390 };
391 let Some(dict) = state.transcript.as_dict() else {
392 return;
393 };
394 let dict = dict.clone();
395 let messages: Vec<VmValue> = match dict.get("messages") {
396 Some(VmValue::List(list)) => list.iter().cloned().collect(),
397 _ => Vec::new(),
398 };
399 let retained: Vec<VmValue> = messages.into_iter().take(keep_first).collect();
400 let mut next = dict;
401 next.insert(
402 "events".to_string(),
403 VmValue::List(Rc::new(
404 crate::llm::helpers::transcript_events_from_messages(&retained),
405 )),
406 );
407 next.insert("messages".to_string(), VmValue::List(Rc::new(retained)));
408 state.transcript = VmValue::Dict(Rc::new(next));
409 state.last_accessed = Instant::now();
410 });
411}
412
413pub fn trim(id: &str, keep_last: usize) -> Option<usize> {
416 SESSIONS.with(|s| {
417 let mut map = s.borrow_mut();
418 let state = map.get_mut(id)?;
419 let dict = state.transcript.as_dict()?.clone();
420 let messages: Vec<VmValue> = match dict.get("messages") {
421 Some(VmValue::List(list)) => list.iter().cloned().collect(),
422 _ => Vec::new(),
423 };
424 let start = messages.len().saturating_sub(keep_last);
425 let retained: Vec<VmValue> = messages.into_iter().skip(start).collect();
426 let kept = retained.len();
427 let mut next = dict;
428 next.insert(
429 "events".to_string(),
430 VmValue::List(Rc::new(
431 crate::llm::helpers::transcript_events_from_messages(&retained),
432 )),
433 );
434 next.insert("messages".to_string(), VmValue::List(Rc::new(retained)));
435 state.transcript = VmValue::Dict(Rc::new(next));
436 state.last_accessed = Instant::now();
437 Some(kept)
438 })
439}
440
441pub fn inject_message(id: &str, message: VmValue) -> Result<(), String> {
444 let Some(msg_dict) = message.as_dict().cloned() else {
445 return Err("agent_session_inject: message must be a dict".into());
446 };
447 let role_ok = matches!(msg_dict.get("role"), Some(VmValue::String(_)));
448 if !role_ok {
449 return Err(
450 "agent_session_inject: message must have a string `role` (user|assistant|tool_result|system)"
451 .into(),
452 );
453 }
454 SESSIONS.with(|s| {
455 let mut map = s.borrow_mut();
456 let Some(state) = map.get_mut(id) else {
457 return Err(format!("agent_session_inject: unknown session id '{id}'"));
458 };
459 let dict = state
460 .transcript
461 .as_dict()
462 .cloned()
463 .unwrap_or_else(BTreeMap::new);
464 let mut messages: Vec<VmValue> = match dict.get("messages") {
465 Some(VmValue::List(list)) => list.iter().cloned().collect(),
466 _ => Vec::new(),
467 };
468 let mut events: Vec<VmValue> = match dict.get("events") {
469 Some(VmValue::List(list)) => list.iter().cloned().collect(),
470 _ => crate::llm::helpers::transcript_events_from_messages(&messages),
471 };
472 let new_message = VmValue::Dict(Rc::new(msg_dict));
473 events.push(crate::llm::helpers::transcript_event_from_message(
474 &new_message,
475 ));
476 messages.push(new_message);
477 let mut next = dict;
478 next.insert("events".to_string(), VmValue::List(Rc::new(events)));
479 next.insert("messages".to_string(), VmValue::List(Rc::new(messages)));
480 state.transcript = VmValue::Dict(Rc::new(next));
481 state.last_accessed = Instant::now();
482 Ok(())
483 })
484}
485
486pub fn messages_json(id: &str) -> Vec<serde_json::Value> {
490 SESSIONS.with(|s| {
491 let map = s.borrow();
492 let Some(state) = map.get(id) else {
493 return Vec::new();
494 };
495 let Some(dict) = state.transcript.as_dict() else {
496 return Vec::new();
497 };
498 match dict.get("messages") {
499 Some(VmValue::List(list)) => list
500 .iter()
501 .map(crate::llm::helpers::vm_value_to_json)
502 .collect(),
503 _ => Vec::new(),
504 }
505 })
506}
507
/// Prompt-ready view of a session, as produced by `prompt_state_json`.
#[derive(Clone, Debug, Default)]
pub struct SessionPromptState {
    /// Transcript messages as JSON; `prompt_state_json` puts a summary
    /// message first when the transcript carries a non-blank summary.
    pub messages: Vec<serde_json::Value>,
    /// The transcript's `summary` string, when present and not blank.
    pub summary: Option<String>,
}
513
514fn summary_message_json(summary: &str) -> serde_json::Value {
515 serde_json::json!({
516 "role": "user",
517 "content": summary,
518 })
519}
520
521fn messages_begin_with_summary(messages: &[serde_json::Value], summary: &str) -> bool {
522 messages.first().is_some_and(|message| {
523 message.get("role").and_then(|value| value.as_str()) == Some("user")
524 && message.get("content").and_then(|value| value.as_str()) == Some(summary)
525 })
526}
527
528pub fn prompt_state_json(id: &str) -> SessionPromptState {
536 SESSIONS.with(|s| {
537 let map = s.borrow();
538 let Some(state) = map.get(id) else {
539 return SessionPromptState::default();
540 };
541 let Some(dict) = state.transcript.as_dict() else {
542 return SessionPromptState::default();
543 };
544 let mut messages = match dict.get("messages") {
545 Some(VmValue::List(list)) => list
546 .iter()
547 .map(crate::llm::helpers::vm_value_to_json)
548 .collect::<Vec<_>>(),
549 _ => Vec::new(),
550 };
551 let summary = dict.get("summary").and_then(|value| match value {
552 VmValue::String(text) if !text.trim().is_empty() => Some(text.to_string()),
553 _ => None,
554 });
555 if let Some(summary_text) = summary.as_deref() {
556 if !messages_begin_with_summary(&messages, summary_text) {
557 messages.insert(0, summary_message_json(summary_text));
558 }
559 }
560 SessionPromptState { messages, summary }
561 })
562}
563
564pub fn store_transcript(id: &str, transcript: VmValue) {
567 SESSIONS.with(|s| {
568 if let Some(state) = s.borrow_mut().get_mut(id) {
569 state.transcript = transcript_with_session_metadata(transcript, state);
570 state.last_accessed = Instant::now();
571 }
572 });
573}
574
575pub fn append_event(id: &str, event: VmValue) -> Result<(), String> {
581 let Some(event_dict) = event.as_dict() else {
582 return Err("agent_session_append_event: event must be a dict".into());
583 };
584 let kind_ok = matches!(event_dict.get("kind"), Some(VmValue::String(_)));
585 if !kind_ok {
586 return Err("agent_session_append_event: event must have a string `kind`".into());
587 }
588 SESSIONS.with(|s| {
589 let mut map = s.borrow_mut();
590 let Some(state) = map.get_mut(id) else {
591 return Err(format!(
592 "agent_session_append_event: unknown session id '{id}'"
593 ));
594 };
595 let dict = state
596 .transcript
597 .as_dict()
598 .cloned()
599 .unwrap_or_else(BTreeMap::new);
600 let mut events: Vec<VmValue> = match dict.get("events") {
601 Some(VmValue::List(list)) => list.iter().cloned().collect(),
602 _ => dict
603 .get("messages")
604 .and_then(|value| match value {
605 VmValue::List(list) => Some(list.iter().cloned().collect::<Vec<_>>()),
606 _ => None,
607 })
608 .map(|messages| crate::llm::helpers::transcript_events_from_messages(&messages))
609 .unwrap_or_default(),
610 };
611 events.push(event);
612 let mut next = dict;
613 next.insert("events".to_string(), VmValue::List(Rc::new(events)));
614 state.transcript = VmValue::Dict(Rc::new(next));
615 state.last_accessed = Instant::now();
616 Ok(())
617 })
618}
619
/// Replaces the session's messages, leaving any existing summary unchanged
/// (delegates to `replace_messages_with_summary` with `None`).
pub fn replace_messages(id: &str, messages: &[serde_json::Value]) {
    replace_messages_with_summary(id, messages, None);
}
625
626pub fn replace_messages_with_summary(
631 id: &str,
632 messages: &[serde_json::Value],
633 summary: Option<&str>,
634) {
635 SESSIONS.with(|s| {
636 let mut map = s.borrow_mut();
637 let Some(state) = map.get_mut(id) else {
638 return;
639 };
640 let dict = state
641 .transcript
642 .as_dict()
643 .cloned()
644 .unwrap_or_else(BTreeMap::new);
645 let vm_messages: Vec<VmValue> = messages
646 .iter()
647 .map(crate::stdlib::json_to_vm_value)
648 .collect();
649 let mut next = dict;
650 next.insert(
651 "events".to_string(),
652 VmValue::List(Rc::new(
653 crate::llm::helpers::transcript_events_from_messages(&vm_messages),
654 )),
655 );
656 next.insert("messages".to_string(), VmValue::List(Rc::new(vm_messages)));
657 if let Some(summary) = summary {
658 next.insert(
659 "summary".to_string(),
660 VmValue::String(Rc::from(summary.to_string())),
661 );
662 }
663 state.transcript = VmValue::Dict(Rc::new(next));
664 state.last_accessed = Instant::now();
665 });
666}
667
668pub fn append_subscriber(id: &str, callback: VmValue) {
669 open_or_create(Some(id.to_string()));
670 SESSIONS.with(|s| {
671 if let Some(state) = s.borrow_mut().get_mut(id) {
672 state.subscribers.push(callback);
673 state.last_accessed = Instant::now();
674 }
675 });
676}
677
678pub fn subscribers_for(id: &str) -> Vec<VmValue> {
679 SESSIONS.with(|s| {
680 s.borrow()
681 .get(id)
682 .map(|state| state.subscribers.clone())
683 .unwrap_or_default()
684 })
685}
686
687pub fn subscriber_count(id: &str) -> usize {
688 SESSIONS.with(|s| {
689 s.borrow()
690 .get(id)
691 .map(|state| state.subscribers.len())
692 .unwrap_or(0)
693 })
694}
695
696pub fn set_active_skills(id: &str, skills: Vec<String>) {
700 SESSIONS.with(|s| {
701 if let Some(state) = s.borrow_mut().get_mut(id) {
702 state.active_skills = skills;
703 state.last_accessed = Instant::now();
704 }
705 });
706}
707
708pub fn active_skills(id: &str) -> Vec<String> {
712 SESSIONS.with(|s| {
713 s.borrow()
714 .get(id)
715 .map(|state| state.active_skills.clone())
716 .unwrap_or_default()
717 })
718}
719
720pub fn claim_tool_format(id: &str, tool_format: &str) -> Result<(), String> {
726 let tool_format = tool_format.trim();
727 if tool_format.is_empty() {
728 return Ok(());
729 }
730 SESSIONS.with(|s| {
731 let mut map = s.borrow_mut();
732 let Some(state) = map.get_mut(id) else {
733 return Err(format!("agent session '{id}' does not exist"));
734 };
735 match state.tool_format.as_deref() {
736 Some(existing) if existing != tool_format => Err(format!(
737 "agent session '{id}' is locked to tool_format='{existing}', but this run requested tool_format='{tool_format}'. Start a new session or fork/reset the transcript before changing tool mode."
738 )),
739 Some(_) => {
740 state.last_accessed = Instant::now();
741 Ok(())
742 }
743 None => {
744 state.tool_format = Some(tool_format.to_string());
745 state.last_accessed = Instant::now();
746 Ok(())
747 }
748 }
749 })
750}
751
752pub fn tool_format(id: &str) -> Option<String> {
753 SESSIONS.with(|s| {
754 s.borrow()
755 .get(id)
756 .and_then(|state| state.tool_format.clone())
757 })
758}
759
760pub fn record_system_prompt(id: &str, system_prompt: &str) -> Result<(), String> {
761 let system_prompt = system_prompt.trim();
762 if system_prompt.is_empty() {
763 return Ok(());
764 }
765 SESSIONS.with(|s| {
766 let mut map = s.borrow_mut();
767 let Some(state) = map.get_mut(id) else {
768 return Err(format!("agent session '{id}' does not exist"));
769 };
770 let changed = state.system_prompt.as_deref() != Some(system_prompt);
771 state.system_prompt = Some(system_prompt.to_string());
772 let dict = state
773 .transcript
774 .as_dict()
775 .cloned()
776 .unwrap_or_else(BTreeMap::new);
777 let mut next = dict;
778 apply_system_prompt_metadata(&mut next, system_prompt);
779 if changed {
780 let mut events: Vec<VmValue> = match next.get("events") {
781 Some(VmValue::List(list)) => list.iter().cloned().collect(),
782 _ => Vec::new(),
783 };
784 events.push(crate::llm::helpers::transcript_event(
785 "system_prompt",
786 "system",
787 "internal",
788 "",
789 Some(crate::llm::helpers::system_prompt_event_metadata(
790 system_prompt,
791 )),
792 ));
793 next.insert("events".to_string(), VmValue::List(Rc::new(events)));
794 }
795 state.transcript = VmValue::Dict(Rc::new(next));
796 state.last_accessed = Instant::now();
797 Ok(())
798 })
799}
800
801pub fn system_prompt(id: &str) -> Option<String> {
802 SESSIONS.with(|s| {
803 s.borrow()
804 .get(id)
805 .and_then(|state| state.system_prompt.clone())
806 })
807}
808
809fn empty_transcript(id: &str) -> VmValue {
810 use crate::llm::helpers::new_transcript_with;
811 new_transcript_with(Some(id.to_string()), Vec::new(), None, None)
812}
813
814fn clone_transcript_with_id(transcript: &VmValue, new_id: &str) -> VmValue {
815 let Some(dict) = transcript.as_dict() else {
816 return empty_transcript(new_id);
817 };
818 let mut next = dict.clone();
819 next.insert(
820 "id".to_string(),
821 VmValue::String(Rc::from(new_id.to_string())),
822 );
823 VmValue::Dict(Rc::new(next))
824}
825
826fn clone_transcript_with_parent(transcript: &VmValue, parent_id: &str) -> VmValue {
827 let Some(dict) = transcript.as_dict() else {
828 return transcript.clone();
829 };
830 let mut next = dict.clone();
831 let metadata = match next.get("metadata") {
832 Some(VmValue::Dict(metadata)) => {
833 let mut metadata = metadata.as_ref().clone();
834 metadata.insert(
835 "parent_session_id".to_string(),
836 VmValue::String(Rc::from(parent_id.to_string())),
837 );
838 VmValue::Dict(Rc::new(metadata))
839 }
840 _ => VmValue::Dict(Rc::new(BTreeMap::from([(
841 "parent_session_id".to_string(),
842 VmValue::String(Rc::from(parent_id.to_string())),
843 )]))),
844 };
845 next.insert("metadata".to_string(), metadata);
846 VmValue::Dict(Rc::new(next))
847}
848
849fn apply_system_prompt_metadata(next: &mut BTreeMap<String, VmValue>, system_prompt: &str) {
850 let mut metadata = match next.get("metadata") {
851 Some(VmValue::Dict(metadata)) => metadata.as_ref().clone(),
852 _ => BTreeMap::new(),
853 };
854 metadata.insert(
855 "system_prompt".to_string(),
856 crate::stdlib::json_to_vm_value(&crate::llm::helpers::system_prompt_metadata(
857 system_prompt,
858 )),
859 );
860 next.insert("metadata".to_string(), VmValue::Dict(Rc::new(metadata)));
861}
862
863fn transcript_with_session_metadata(transcript: VmValue, state: &SessionState) -> VmValue {
864 let Some(dict) = transcript.as_dict() else {
865 return transcript;
866 };
867 let mut next = dict.clone();
868 let mut metadata = match next.get("metadata") {
869 Some(VmValue::Dict(metadata)) => metadata.as_ref().clone(),
870 _ => BTreeMap::new(),
871 };
872 if let Some(tool_format) = state.tool_format.as_ref() {
873 metadata.insert(
874 "tool_format".to_string(),
875 VmValue::String(Rc::from(tool_format.clone())),
876 );
877 metadata.insert("tool_mode_locked".to_string(), VmValue::Bool(true));
878 }
879 if let Some(system_prompt) = state.system_prompt.as_ref() {
880 metadata.insert(
881 "system_prompt".to_string(),
882 crate::stdlib::json_to_vm_value(&crate::llm::helpers::system_prompt_metadata(
883 system_prompt,
884 )),
885 );
886 }
887 if !metadata.is_empty() {
888 next.insert("metadata".to_string(), VmValue::Dict(Rc::new(metadata)));
889 }
890 VmValue::Dict(Rc::new(next))
891}
892
893fn session_snapshot(state: &SessionState) -> VmValue {
894 let transcript = transcript_with_session_metadata(state.transcript.clone(), state);
895 let Some(dict) = transcript.as_dict() else {
896 return state.transcript.clone();
897 };
898 let mut next = dict.clone();
899 next.insert(
900 "parent_id".to_string(),
901 state
902 .parent_id
903 .as_ref()
904 .map(|id| VmValue::String(Rc::from(id.clone())))
905 .unwrap_or(VmValue::Nil),
906 );
907 next.insert(
908 "child_ids".to_string(),
909 VmValue::List(Rc::new(
910 state
911 .child_ids
912 .iter()
913 .cloned()
914 .map(|id| VmValue::String(Rc::from(id)))
915 .collect(),
916 )),
917 );
918 next.insert(
919 "branched_at_event_index".to_string(),
920 state
921 .branched_at_event_index
922 .map(|index| VmValue::Int(index as i64))
923 .unwrap_or(VmValue::Nil),
924 );
925 VmValue::Dict(Rc::new(next))
926}
927
928fn update_lineage(
929 map: &mut HashMap<String, SessionState>,
930 parent_id: &str,
931 child_id: &str,
932 branched_at_event_index: Option<usize>,
933) {
934 let old_parent_id = map.get(child_id).and_then(|child| child.parent_id.clone());
935 if let Some(old_parent_id) = old_parent_id.filter(|old_parent_id| old_parent_id != parent_id) {
936 if let Some(old_parent) = map.get_mut(&old_parent_id) {
937 old_parent.child_ids.retain(|id| id != child_id);
938 old_parent.last_accessed = Instant::now();
939 }
940 }
941 if let Some(parent) = map.get_mut(parent_id) {
942 parent.last_accessed = Instant::now();
943 if !parent.child_ids.iter().any(|id| id == child_id) {
944 parent.child_ids.push(child_id.to_string());
945 }
946 }
947 if let Some(child) = map.get_mut(child_id) {
948 child.last_accessed = Instant::now();
949 child.parent_id = Some(parent_id.to_string());
950 child.branched_at_event_index = branched_at_event_index;
951 child.transcript = clone_transcript_with_parent(&child.transcript, parent_id);
952 }
953}
954
955fn branch_event_index(transcript: &VmValue, keep_first: usize) -> usize {
956 if keep_first == 0 {
957 return 0;
958 }
959 let Some(dict) = transcript.as_dict() else {
960 return keep_first;
961 };
962 let Some(VmValue::List(events)) = dict.get("events") else {
963 return keep_first;
964 };
965 let mut retained_messages = 0usize;
966 for (index, event) in events.iter().enumerate() {
967 let kind = event
968 .as_dict()
969 .and_then(|dict| dict.get("kind"))
970 .map(VmValue::display);
971 if matches!(kind.as_deref(), Some("message" | "tool_result")) {
972 retained_messages += 1;
973 if retained_messages == keep_first {
974 return index + 1;
975 }
976 }
977 }
978 events.len()
979}
980
// Unit tests for the session store. Each test that touches the store starts
// by calling `reset_session_store()`.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::agent_events::{
        emit_event, reset_all_sinks, session_external_sink_count, AgentEvent,
    };
    use crate::event_log::{active_event_log, EventLog, Topic};
    use std::collections::BTreeMap;

    /// Builds a message dict with string `role` and `content` entries.
    fn make_msg(role: &str, content: &str) -> VmValue {
        let mut m: BTreeMap<String, VmValue> = BTreeMap::new();
        m.insert("role".to_string(), VmValue::String(Rc::from(role)));
        m.insert("content".to_string(), VmValue::String(Rc::from(content)));
        VmValue::Dict(Rc::new(m))
    }

    /// Number of entries in the stored transcript's `messages` list; 0 when
    /// the session, the dict, or the list is missing.
    fn message_count(id: &str) -> usize {
        SESSIONS.with(|s| {
            let map = s.borrow();
            let Some(state) = map.get(id) else { return 0 };
            let Some(dict) = state.transcript.as_dict() else {
                return 0;
            };
            match dict.get("messages") {
                Some(VmValue::List(list)) => list.len(),
                _ => 0,
            }
        })
    }

    /// Counts snapshot events whose displayed `kind` equals `expected_kind`.
    fn event_count_by_kind(id: &str, expected_kind: &str) -> usize {
        snapshot(id)
            .and_then(|snapshot| snapshot.as_dict().cloned())
            .and_then(|dict| dict.get("events").cloned())
            .and_then(|events| match events {
                VmValue::List(events) => Some(
                    events
                        .iter()
                        .filter(|event| {
                            event
                                .as_dict()
                                .and_then(|dict| dict.get("kind"))
                                .map(VmValue::display)
                                .as_deref()
                                == Some(expected_kind)
                        })
                        .count(),
                ),
                _ => None,
            })
            .unwrap_or(0)
    }

    // Recording the same prompt twice must produce one `system_prompt` event
    // and must not add a message.
    #[test]
    fn records_system_prompt_as_metadata_event_without_message() {
        reset_session_store();
        let id = open_or_create(Some("system-prompt-session".into()));
        record_system_prompt(&id, "Follow the workflow.").unwrap();
        record_system_prompt(&id, "Follow the workflow.").unwrap();
        inject_message(&id, make_msg("user", "hello")).unwrap();

        let snapshot = snapshot(&id).expect("session snapshot");
        let metadata = snapshot
            .as_dict()
            .and_then(|dict| dict.get("metadata"))
            .and_then(VmValue::as_dict)
            .expect("metadata");
        let system_prompt = metadata
            .get("system_prompt")
            .and_then(VmValue::as_dict)
            .expect("system prompt metadata");
        assert_eq!(
            system_prompt
                .get("content")
                .map(VmValue::display)
                .as_deref(),
            Some("Follow the workflow.")
        );
        assert_eq!(message_count(&id), 1);
        assert_eq!(event_count_by_kind(&id, "system_prompt"), 1);
    }

    // `fork_at` keeps the first N messages in the fork and leaves the source
    // untouched.
    #[test]
    fn fork_at_truncates_destination_to_keep_first() {
        reset_session_store();
        let src = open_or_create(Some("src-fork-at".into()));
        inject_message(&src, make_msg("user", "a")).unwrap();
        inject_message(&src, make_msg("assistant", "b")).unwrap();
        inject_message(&src, make_msg("user", "c")).unwrap();
        inject_message(&src, make_msg("assistant", "d")).unwrap();
        assert_eq!(message_count(&src), 4);

        let dst = fork_at(&src, 2, Some("dst-fork-at".into())).expect("fork_at");
        assert_ne!(dst, src);
        assert_eq!(message_count(&dst), 2, "branched at message index 2");
        assert_eq!(
            snapshot(&dst)
                .and_then(|value| value.as_dict().cloned())
                .and_then(|dict| dict
                    .get("branched_at_event_index")
                    .and_then(VmValue::as_int)),
            Some(2)
        );
        // The source keeps all of its messages.
        assert_eq!(message_count(&src), 4);
        // The fork starts without subscribers.
        assert_eq!(subscriber_count(&dst), 0);
        reset_session_store();
    }

    // Forking from a session that does not exist yields `None`.
    #[test]
    fn fork_at_on_unknown_source_returns_none() {
        reset_session_store();
        assert!(fork_at("does-not-exist", 3, None).is_none());
    }

    // Opening a child session records lineage on both sides, in the snapshot
    // and in the child's transcript metadata.
    #[test]
    fn child_sessions_record_parent_lineage() {
        reset_session_store();
        let parent = open_or_create(Some("parent-session".into()));
        let child = open_child_session(&parent, Some("child-session".into()));
        assert_eq!(parent_id(&child).as_deref(), Some("parent-session"));
        assert_eq!(child_ids(&parent), vec!["child-session".to_string()]);
        assert_eq!(
            ancestry(&child),
            Some(SessionAncestry {
                parent_id: Some("parent-session".to_string()),
                child_ids: Vec::new(),
                root_id: "parent-session".to_string(),
            })
        );

        let transcript = snapshot(&child).expect("child transcript");
        let transcript = transcript.as_dict().expect("child snapshot");
        let metadata = transcript
            .get("metadata")
            .and_then(|value| value.as_dict())
            .expect("child metadata");
        assert!(
            matches!(transcript.get("parent_id"), Some(VmValue::String(value)) if value.as_ref() == "parent-session")
        );
        assert!(
            matches!(transcript.get("child_ids"), Some(VmValue::List(children)) if children.is_empty())
        );
        assert!(matches!(
            transcript.get("branched_at_event_index"),
            Some(VmValue::Nil)
        ));
        assert!(matches!(
            metadata.get("parent_session_id"),
            Some(VmValue::String(value)) if value.as_ref() == "parent-session"
        ));
    }

    // With a non-message event between two message events, keeping two
    // messages must branch at event index 3, not 2.
    #[test]
    fn branch_event_index_counts_non_message_events() {
        reset_session_store();
        let src = open_or_create(Some("branch-event-index".into()));
        let transcript = VmValue::Dict(Rc::new(BTreeMap::from([
            ("id".to_string(), VmValue::String(Rc::from(src.clone()))),
            (
                "messages".to_string(),
                VmValue::List(Rc::new(vec![
                    make_msg("user", "a"),
                    make_msg("assistant", "b"),
                ])),
            ),
            (
                "events".to_string(),
                VmValue::List(Rc::new(vec![
                    VmValue::Dict(Rc::new(BTreeMap::from([(
                        "kind".to_string(),
                        VmValue::String(Rc::from("message")),
                    )]))),
                    VmValue::Dict(Rc::new(BTreeMap::from([(
                        "kind".to_string(),
                        VmValue::String(Rc::from("sub_agent_start")),
                    )]))),
                    VmValue::Dict(Rc::new(BTreeMap::from([(
                        "kind".to_string(),
                        VmValue::String(Rc::from("message")),
                    )]))),
                ])),
            ),
        ])));
        store_transcript(&src, transcript);

        let dst = fork_at(&src, 2, Some("branch-event-index-child".into())).expect("fork_at");
        assert_eq!(
            snapshot(&dst)
                .and_then(|value| value.as_dict().cloned())
                .and_then(|dict| dict
                    .get("branched_at_event_index")
                    .and_then(VmValue::as_int)),
            Some(3)
        );
    }

    // A child opened with `open_child_session` is linked to the parent but
    // does not inherit the parent's messages or tool format.
    #[test]
    fn child_session_records_lineage_without_reusing_parent_transcript() {
        reset_session_store();
        let parent = open_or_create(Some("parent-fork-parent".into()));
        inject_message(&parent, make_msg("user", "parent context")).unwrap();
        claim_tool_format(&parent, "native").unwrap();

        let child = open_child_session(&parent, Some("parent-fork-child".into()));
        assert_eq!(message_count(&parent), 1);
        assert_eq!(message_count(&child), 0);
        assert_eq!(tool_format(&child), None);
        assert_eq!(parent_id(&child).as_deref(), Some(parent.as_str()));
    }

    // When the stored messages do not start with the summary,
    // `prompt_state_json` puts a `user` summary message first.
    #[test]
    fn prompt_state_prepends_summary_message_when_missing_from_messages() {
        reset_session_store();
        let session = open_or_create(Some("prompt-state-summary".into()));
        let transcript = crate::llm::helpers::new_transcript_with_events(
            Some(session.clone()),
            vec![make_msg("assistant", "latest answer")],
            Some("[auto-compacted 2 older messages]\nsummary".to_string()),
            None,
            Vec::new(),
            Vec::new(),
            Some("active"),
        );
        store_transcript(&session, transcript);

        let prompt = prompt_state_json(&session);
        assert_eq!(
            prompt.summary.as_deref(),
            Some("[auto-compacted 2 older messages]\nsummary")
        );
        assert_eq!(prompt.messages.len(), 2);
        assert_eq!(prompt.messages[0]["role"].as_str(), Some("user"));
        assert_eq!(
            prompt.messages[0]["content"].as_str(),
            Some("[auto-compacted 2 older messages]\nsummary"),
        );
        assert_eq!(prompt.messages[1]["role"].as_str(), Some("assistant"));
    }

    // With an active event log installed, a new session gets one external
    // sink, and an emitted event is readable from the session's topic.
    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn open_or_create_registers_event_log_sink_when_active_log_is_installed() {
        reset_all_sinks();
        crate::event_log::reset_active_event_log();
        let dir = tempfile::tempdir().expect("tempdir");
        crate::event_log::install_default_for_base_dir(dir.path()).expect("install event log");

        let session = open_or_create(Some("event-log-session".into()));
        assert_eq!(session_external_sink_count(&session), 1);

        emit_event(&AgentEvent::TurnStart {
            session_id: session.clone(),
            iteration: 0,
        });
        // NOTE(review): presumably gives the sink time to write the event
        // before it is read back — confirm against the sink implementation.
        tokio::time::sleep(std::time::Duration::from_millis(25)).await;

        let topic = Topic::new("observability.agent_events.event-log-session").unwrap();
        let log = active_event_log().expect("active event log");
        let events = log.read_range(&topic, None, usize::MAX).await.unwrap();
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].1.kind, "turn_start");

        crate::event_log::reset_active_event_log();
        reset_all_sinks();
    }
}
1247}