1use std::cell::{Cell, RefCell};
22use std::collections::{BTreeMap, HashMap, HashSet};
23use std::rc::Rc;
24use std::time::Instant;
25
26use crate::value::VmValue;
27
28pub const DEFAULT_SESSION_CAP: usize = 128;
31
32pub struct SessionState {
33 pub id: String,
34 pub transcript: VmValue,
35 pub subscribers: Vec<VmValue>,
36 pub created_at: String,
37 pub last_accessed: Instant,
38 pub parent_id: Option<String>,
39 pub child_ids: Vec<String>,
40 pub branched_at_event_index: Option<usize>,
41 pub active_skills: Vec<String>,
47 pub tool_format: Option<String>,
51 pub system_prompt: Option<String>,
55}
56
57impl SessionState {
58 fn new(id: String) -> Self {
59 let now = Instant::now();
60 let transcript = empty_transcript(&id);
61 Self {
62 id,
63 transcript,
64 subscribers: Vec::new(),
65 created_at: crate::orchestration::now_rfc3339(),
66 last_accessed: now,
67 parent_id: None,
68 child_ids: Vec::new(),
69 branched_at_event_index: None,
70 active_skills: Vec::new(),
71 tool_format: None,
72 system_prompt: None,
73 }
74 }
75}
76
77#[derive(Clone, Debug, PartialEq, Eq)]
78pub struct SessionAncestry {
79 pub parent_id: Option<String>,
80 pub child_ids: Vec<String>,
81 pub root_id: String,
82}
83
84thread_local! {
85 static SESSIONS: RefCell<HashMap<String, SessionState>> = RefCell::new(HashMap::new());
86 static SESSION_CAP: Cell<usize> = const { Cell::new(DEFAULT_SESSION_CAP) };
87 static CURRENT_SESSION_STACK: RefCell<Vec<String>> = const { RefCell::new(Vec::new()) };
88}
89
90pub struct CurrentSessionGuard {
91 active: bool,
92}
93
94impl Drop for CurrentSessionGuard {
95 fn drop(&mut self) {
96 if self.active {
97 pop_current_session();
98 }
99 }
100}
101
102pub fn set_session_cap(cap: usize) {
105 SESSION_CAP.with(|c| c.set(cap.max(1)));
106}
107
108pub fn session_cap() -> usize {
109 SESSION_CAP.with(|c| c.get())
110}
111
112pub fn reset_session_store() {
114 SESSIONS.with(|s| s.borrow_mut().clear());
115 CURRENT_SESSION_STACK.with(|stack| stack.borrow_mut().clear());
116}
117
118pub(crate) fn push_current_session(id: String) {
119 if id.is_empty() {
120 return;
121 }
122 CURRENT_SESSION_STACK.with(|stack| stack.borrow_mut().push(id));
123}
124
125pub(crate) fn pop_current_session() {
126 CURRENT_SESSION_STACK.with(|stack| {
127 let _ = stack.borrow_mut().pop();
128 });
129}
130
131pub fn current_session_id() -> Option<String> {
132 CURRENT_SESSION_STACK.with(|stack| stack.borrow().last().cloned())
133}
134
135pub fn enter_current_session(id: impl Into<String>) -> CurrentSessionGuard {
136 let id = id.into();
137 if id.trim().is_empty() {
138 return CurrentSessionGuard { active: false };
139 }
140 push_current_session(id);
141 CurrentSessionGuard { active: true }
142}
143
144pub fn exists(id: &str) -> bool {
145 SESSIONS.with(|s| s.borrow().contains_key(id))
146}
147
148pub fn length(id: &str) -> Option<usize> {
149 SESSIONS.with(|s| {
150 s.borrow().get(id).map(|state| {
151 state
152 .transcript
153 .as_dict()
154 .and_then(|d| d.get("messages"))
155 .and_then(|v| match v {
156 VmValue::List(list) => Some(list.len()),
157 _ => None,
158 })
159 .unwrap_or(0)
160 })
161 })
162}
163
164pub fn snapshot(id: &str) -> Option<VmValue> {
165 SESSIONS.with(|s| s.borrow().get(id).map(session_snapshot))
166}
167
168pub fn transcript(id: &str) -> Option<VmValue> {
171 SESSIONS.with(|s| {
172 s.borrow()
173 .get(id)
174 .map(|state| transcript_with_session_metadata(state.transcript.clone(), state))
175 })
176}
177
178pub fn open_or_create(id: Option<String>) -> String {
187 let resolved = id.unwrap_or_else(|| uuid::Uuid::now_v7().to_string());
188 let parent_session = current_session_id();
189 let mut was_new = false;
190 SESSIONS.with(|s| {
191 let mut map = s.borrow_mut();
192 if let Some(state) = map.get_mut(&resolved) {
193 state.last_accessed = Instant::now();
194 return;
195 }
196 was_new = true;
197 let cap = SESSION_CAP.with(|c| c.get());
198 if map.len() >= cap {
199 if let Some(victim) = map
200 .iter()
201 .min_by_key(|(_, state)| state.last_accessed)
202 .map(|(id, _)| id.clone())
203 {
204 map.remove(&victim);
205 }
206 }
207 map.insert(resolved.clone(), SessionState::new(resolved.clone()));
208 });
209 if was_new {
210 if let Some(parent) = parent_session.as_deref() {
211 crate::agent_events::mirror_session_sinks(parent, &resolved);
212 }
213 try_register_event_log(&resolved);
214 }
215 resolved
216}
217
218pub fn open_child_session(parent_id: &str, id: Option<String>) -> String {
219 let resolved = open_or_create(id);
220 link_child_session(parent_id, &resolved);
221 resolved
222}
223
224pub fn link_child_session(parent_id: &str, child_id: &str) {
225 link_child_session_with_branch(parent_id, child_id, None);
226}
227
228pub fn link_child_session_with_branch(
229 parent_id: &str,
230 child_id: &str,
231 branched_at_event_index: Option<usize>,
232) {
233 if parent_id == child_id {
234 return;
235 }
236 open_or_create(Some(parent_id.to_string()));
237 open_or_create(Some(child_id.to_string()));
238 SESSIONS.with(|s| {
239 let mut map = s.borrow_mut();
240 update_lineage(&mut map, parent_id, child_id, branched_at_event_index);
241 });
242}
243
244pub fn parent_id(id: &str) -> Option<String> {
245 SESSIONS.with(|s| s.borrow().get(id).and_then(|state| state.parent_id.clone()))
246}
247
248pub fn child_ids(id: &str) -> Vec<String> {
249 SESSIONS.with(|s| {
250 s.borrow()
251 .get(id)
252 .map(|state| state.child_ids.clone())
253 .unwrap_or_default()
254 })
255}
256
257pub fn ancestry(id: &str) -> Option<SessionAncestry> {
258 SESSIONS.with(|s| {
259 let map = s.borrow();
260 let state = map.get(id)?;
261 let mut root_id = state.id.clone();
262 let mut cursor = state.parent_id.clone();
263 let mut seen = HashSet::from([state.id.clone()]);
264 while let Some(parent_id) = cursor {
265 if !seen.insert(parent_id.clone()) {
266 break;
267 }
268 root_id = parent_id.clone();
269 cursor = map
270 .get(&parent_id)
271 .and_then(|parent| parent.parent_id.clone());
272 }
273 Some(SessionAncestry {
274 parent_id: state.parent_id.clone(),
275 child_ids: state.child_ids.clone(),
276 root_id,
277 })
278 })
279}
280
281fn try_register_event_log(session_id: &str) {
285 if let Some(log) = crate::event_log::active_event_log() {
286 crate::agent_events::register_sink(
287 session_id,
288 crate::agent_events::EventLogSink::new(log, session_id),
289 );
290 return;
291 }
292 let Ok(dir) = std::env::var("HARN_EVENT_LOG_DIR") else {
293 return;
294 };
295 if dir.is_empty() {
296 return;
297 }
298 let path = std::path::PathBuf::from(dir).join(format!("event_log-{session_id}.jsonl"));
299 if let Ok(sink) = crate::agent_events::JsonlEventSink::open(&path) {
300 crate::agent_events::register_sink(session_id, sink);
301 }
302}
303
304pub fn register_event_log_sink(session_id: &str) {
305 try_register_event_log(session_id);
306}
307
308pub fn close(id: &str) {
309 SESSIONS.with(|s| {
310 s.borrow_mut().remove(id);
311 });
312}
313
314pub fn close_with_status(
315 id: &str,
316 reason: impl Into<String>,
317 status: impl Into<String>,
318 metadata: serde_json::Value,
319) -> bool {
320 if !exists(id) {
321 return false;
322 }
323 let reason = reason.into();
324 let status = status.into();
325 let event_metadata = serde_json::json!({
326 "reason": reason,
327 "status": status,
328 "metadata": metadata,
329 });
330 let transcript_event = crate::llm::helpers::transcript_event(
331 "agent_session_closed",
332 "system",
333 "internal",
334 "Agent session closed",
335 Some(event_metadata.clone()),
336 );
337 let _ = append_event(id, transcript_event);
338 crate::llm::emit_live_agent_event_sync(&crate::agent_events::AgentEvent::SessionClosed {
339 session_id: id.to_string(),
340 reason,
341 status,
342 metadata,
343 });
344 close(id);
345 crate::agent_events::clear_session_sinks(id);
346 true
347}
348
349pub fn reset_transcript(id: &str) -> bool {
350 SESSIONS.with(|s| {
351 let mut map = s.borrow_mut();
352 let Some(state) = map.get_mut(id) else {
353 return false;
354 };
355 state.transcript = empty_transcript(id);
356 state.tool_format = None;
357 state.system_prompt = None;
358 state.last_accessed = Instant::now();
359 true
360 })
361}
362
363pub fn fork(src_id: &str, dst_id: Option<String>) -> Option<String> {
370 let (src_transcript, src_tool_format, src_system_prompt, dst) = SESSIONS.with(|s| {
371 let mut map = s.borrow_mut();
372 let src = map.get_mut(src_id)?;
373 src.last_accessed = Instant::now();
374 let dst = dst_id.unwrap_or_else(|| uuid::Uuid::now_v7().to_string());
375 let forked_transcript = clone_transcript_with_id(&src.transcript, &dst);
376 Some((
377 forked_transcript,
378 src.tool_format.clone(),
379 src.system_prompt.clone(),
380 dst,
381 ))
382 })?;
383 open_or_create(Some(dst.clone()));
385 SESSIONS.with(|s| {
386 let mut map = s.borrow_mut();
387 if let Some(state) = map.get_mut(&dst) {
388 state.transcript = src_transcript;
389 state.tool_format = src_tool_format;
390 state.system_prompt = src_system_prompt;
391 state.last_accessed = Instant::now();
392 }
393 update_lineage(&mut map, src_id, &dst, None);
394 });
395 if exists(&dst) {
399 Some(dst)
400 } else {
401 None
402 }
403}
404
405pub fn fork_at(src_id: &str, keep_first: usize, dst_id: Option<String>) -> Option<String> {
416 let branched_at_event_index = SESSIONS.with(|s| {
417 let map = s.borrow();
418 let src = map.get(src_id)?;
419 Some(branch_event_index(&src.transcript, keep_first))
420 })?;
421 let new_id = fork(src_id, dst_id)?;
422 link_child_session_with_branch(src_id, &new_id, Some(branched_at_event_index));
423 retain_first(&new_id, keep_first);
424 Some(new_id)
425}
426
427fn retain_first(id: &str, keep_first: usize) {
431 SESSIONS.with(|s| {
432 let mut map = s.borrow_mut();
433 let Some(state) = map.get_mut(id) else {
434 return;
435 };
436 let Some(dict) = state.transcript.as_dict() else {
437 return;
438 };
439 let dict = dict.clone();
440 let messages: Vec<VmValue> = match dict.get("messages") {
441 Some(VmValue::List(list)) => list.iter().cloned().collect(),
442 _ => Vec::new(),
443 };
444 let retained: Vec<VmValue> = messages.into_iter().take(keep_first).collect();
445 let mut next = dict;
446 next.insert(
447 "events".to_string(),
448 VmValue::List(Rc::new(
449 crate::llm::helpers::transcript_events_from_messages(&retained),
450 )),
451 );
452 next.insert("messages".to_string(), VmValue::List(Rc::new(retained)));
453 state.transcript = VmValue::Dict(Rc::new(next));
454 state.last_accessed = Instant::now();
455 });
456}
457
458pub fn trim(id: &str, keep_last: usize) -> Option<usize> {
461 SESSIONS.with(|s| {
462 let mut map = s.borrow_mut();
463 let state = map.get_mut(id)?;
464 let dict = state.transcript.as_dict()?.clone();
465 let messages: Vec<VmValue> = match dict.get("messages") {
466 Some(VmValue::List(list)) => list.iter().cloned().collect(),
467 _ => Vec::new(),
468 };
469 let start = messages.len().saturating_sub(keep_last);
470 let retained: Vec<VmValue> = messages.into_iter().skip(start).collect();
471 let kept = retained.len();
472 let mut next = dict;
473 next.insert(
474 "events".to_string(),
475 VmValue::List(Rc::new(
476 crate::llm::helpers::transcript_events_from_messages(&retained),
477 )),
478 );
479 next.insert("messages".to_string(), VmValue::List(Rc::new(retained)));
480 state.transcript = VmValue::Dict(Rc::new(next));
481 state.last_accessed = Instant::now();
482 Some(kept)
483 })
484}
485
486pub fn inject_message(id: &str, message: VmValue) -> Result<(), String> {
489 let Some(msg_dict) = message.as_dict().cloned() else {
490 return Err("agent_session_inject: message must be a dict".into());
491 };
492 let role_ok = matches!(msg_dict.get("role"), Some(VmValue::String(_)));
493 if !role_ok {
494 return Err(
495 "agent_session_inject: message must have a string `role` (user|assistant|tool_result|system)"
496 .into(),
497 );
498 }
499 SESSIONS.with(|s| {
500 let mut map = s.borrow_mut();
501 let Some(state) = map.get_mut(id) else {
502 return Err(format!("agent_session_inject: unknown session id '{id}'"));
503 };
504 let dict = state
505 .transcript
506 .as_dict()
507 .cloned()
508 .unwrap_or_else(BTreeMap::new);
509 let mut messages: Vec<VmValue> = match dict.get("messages") {
510 Some(VmValue::List(list)) => list.iter().cloned().collect(),
511 _ => Vec::new(),
512 };
513 let mut events: Vec<VmValue> = match dict.get("events") {
514 Some(VmValue::List(list)) => list.iter().cloned().collect(),
515 _ => crate::llm::helpers::transcript_events_from_messages(&messages),
516 };
517 let new_message = VmValue::Dict(Rc::new(msg_dict));
518 emit_llm_message_event(id, messages.len(), &new_message);
519 events.push(crate::llm::helpers::transcript_event_from_message(
520 &new_message,
521 ));
522 messages.push(new_message);
523 let mut next = dict;
524 next.insert("events".to_string(), VmValue::List(Rc::new(events)));
525 next.insert("messages".to_string(), VmValue::List(Rc::new(messages)));
526 state.transcript = VmValue::Dict(Rc::new(next));
527 state.last_accessed = Instant::now();
528 Ok(())
529 })
530}
531
532fn emit_llm_message_event(session_id: &str, message_index: usize, message: &VmValue) {
533 let mut fields = serde_json::Map::new();
534 fields.insert(
535 "session_id".to_string(),
536 serde_json::Value::String(session_id.to_string()),
537 );
538 fields.insert(
539 "message_index".to_string(),
540 serde_json::json!(message_index),
541 );
542 let message_json = crate::llm::helpers::vm_value_to_json(message);
543 if let Some(role) = message_json.get("role").and_then(|value| value.as_str()) {
544 fields.insert(
545 "role".to_string(),
546 serde_json::Value::String(role.to_string()),
547 );
548 }
549 if let Some(content) = message_json.get("content") {
550 fields.insert("content".to_string(), content.clone());
551 }
552 fields.insert("message".to_string(), message_json);
553 crate::llm::append_observability_sidecar_entry("message", fields);
554}
555
556pub fn seed_from_messages(
562 id: Option<String>,
563 messages: &[serde_json::Value],
564 metadata: serde_json::Value,
565 system_prompt: Option<String>,
566 tool_format: Option<String>,
567) -> Result<String, String> {
568 let resolved = id.unwrap_or_else(|| uuid::Uuid::now_v7().to_string());
569 if exists(&resolved) {
570 return Err(format!("agent session '{resolved}' already exists"));
571 }
572 open_or_create(Some(resolved.clone()));
573 SESSIONS.with(|s| {
574 let mut map = s.borrow_mut();
575 let Some(state) = map.get_mut(&resolved) else {
576 return Err(format!("failed to create agent session '{resolved}'"));
577 };
578 state.tool_format = tool_format.filter(|value| !value.trim().is_empty());
579 state.system_prompt = system_prompt.filter(|value| !value.trim().is_empty());
580
581 let mut metadata = metadata
582 .as_object()
583 .cloned()
584 .unwrap_or_else(serde_json::Map::new);
585 if let Some(tool_format) = state.tool_format.as_ref() {
586 metadata.insert(
587 "tool_format".to_string(),
588 serde_json::Value::String(tool_format.clone()),
589 );
590 metadata.insert(
591 "tool_mode_locked".to_string(),
592 serde_json::Value::Bool(true),
593 );
594 }
595 if let Some(system_prompt) = state.system_prompt.as_ref() {
596 metadata.insert(
597 "system_prompt".to_string(),
598 crate::llm::helpers::system_prompt_metadata(system_prompt),
599 );
600 }
601 let vm_messages = crate::llm::helpers::json_messages_to_vm(messages);
602 state.transcript = crate::llm::helpers::new_transcript_with(
603 Some(resolved.clone()),
604 vm_messages,
605 None,
606 Some(crate::stdlib::json_to_vm_value(&serde_json::Value::Object(
607 metadata,
608 ))),
609 );
610 state.last_accessed = Instant::now();
611 Ok(resolved)
612 })
613}
614
615pub fn messages_json(id: &str) -> Vec<serde_json::Value> {
619 SESSIONS.with(|s| {
620 let map = s.borrow();
621 let Some(state) = map.get(id) else {
622 return Vec::new();
623 };
624 let Some(dict) = state.transcript.as_dict() else {
625 return Vec::new();
626 };
627 match dict.get("messages") {
628 Some(VmValue::List(list)) => list
629 .iter()
630 .map(crate::llm::helpers::vm_value_to_json)
631 .collect(),
632 _ => Vec::new(),
633 }
634 })
635}
636
637#[derive(Clone, Debug, Default)]
638pub struct SessionPromptState {
639 pub messages: Vec<serde_json::Value>,
640 pub summary: Option<String>,
641}
642
643fn summary_message_json(summary: &str) -> serde_json::Value {
644 serde_json::json!({
645 "role": "user",
646 "content": summary,
647 })
648}
649
650fn messages_begin_with_summary(messages: &[serde_json::Value], summary: &str) -> bool {
651 messages.first().is_some_and(|message| {
652 message.get("role").and_then(|value| value.as_str()) == Some("user")
653 && message.get("content").and_then(|value| value.as_str()) == Some(summary)
654 })
655}
656
657pub fn prompt_state_json(id: &str) -> SessionPromptState {
665 SESSIONS.with(|s| {
666 let map = s.borrow();
667 let Some(state) = map.get(id) else {
668 return SessionPromptState::default();
669 };
670 let Some(dict) = state.transcript.as_dict() else {
671 return SessionPromptState::default();
672 };
673 let mut messages = match dict.get("messages") {
674 Some(VmValue::List(list)) => list
675 .iter()
676 .map(crate::llm::helpers::vm_value_to_json)
677 .collect::<Vec<_>>(),
678 _ => Vec::new(),
679 };
680 let summary = dict.get("summary").and_then(|value| match value {
681 VmValue::String(text) if !text.trim().is_empty() => Some(text.to_string()),
682 _ => None,
683 });
684 if let Some(summary_text) = summary.as_deref() {
685 if !messages_begin_with_summary(&messages, summary_text) {
686 messages.insert(0, summary_message_json(summary_text));
687 }
688 }
689 SessionPromptState { messages, summary }
690 })
691}
692
693pub fn store_transcript(id: &str, transcript: VmValue) {
696 SESSIONS.with(|s| {
697 if let Some(state) = s.borrow_mut().get_mut(id) {
698 state.transcript = transcript_with_session_metadata(transcript, state);
699 state.last_accessed = Instant::now();
700 }
701 });
702}
703
704pub fn append_event(id: &str, event: VmValue) -> Result<(), String> {
710 let Some(event_dict) = event.as_dict() else {
711 return Err("agent_session_append_event: event must be a dict".into());
712 };
713 let kind_ok = matches!(event_dict.get("kind"), Some(VmValue::String(_)));
714 if !kind_ok {
715 return Err("agent_session_append_event: event must have a string `kind`".into());
716 }
717 SESSIONS.with(|s| {
718 let mut map = s.borrow_mut();
719 let Some(state) = map.get_mut(id) else {
720 return Err(format!(
721 "agent_session_append_event: unknown session id '{id}'"
722 ));
723 };
724 let dict = state
725 .transcript
726 .as_dict()
727 .cloned()
728 .unwrap_or_else(BTreeMap::new);
729 let mut events: Vec<VmValue> = match dict.get("events") {
730 Some(VmValue::List(list)) => list.iter().cloned().collect(),
731 _ => dict
732 .get("messages")
733 .and_then(|value| match value {
734 VmValue::List(list) => Some(list.iter().cloned().collect::<Vec<_>>()),
735 _ => None,
736 })
737 .map(|messages| crate::llm::helpers::transcript_events_from_messages(&messages))
738 .unwrap_or_default(),
739 };
740 events.push(event);
741 let mut next = dict;
742 next.insert("events".to_string(), VmValue::List(Rc::new(events)));
743 state.transcript = VmValue::Dict(Rc::new(next));
744 state.last_accessed = Instant::now();
745 Ok(())
746 })
747}
748
749pub fn replace_messages(id: &str, messages: &[serde_json::Value]) {
752 replace_messages_with_summary(id, messages, None);
753}
754
755pub fn replace_messages_with_summary(
760 id: &str,
761 messages: &[serde_json::Value],
762 summary: Option<&str>,
763) {
764 SESSIONS.with(|s| {
765 let mut map = s.borrow_mut();
766 let Some(state) = map.get_mut(id) else {
767 return;
768 };
769 let dict = state
770 .transcript
771 .as_dict()
772 .cloned()
773 .unwrap_or_else(BTreeMap::new);
774 let vm_messages: Vec<VmValue> = messages
775 .iter()
776 .map(crate::stdlib::json_to_vm_value)
777 .collect();
778 let mut next = dict;
779 next.insert(
780 "events".to_string(),
781 VmValue::List(Rc::new(
782 crate::llm::helpers::transcript_events_from_messages(&vm_messages),
783 )),
784 );
785 next.insert("messages".to_string(), VmValue::List(Rc::new(vm_messages)));
786 if let Some(summary) = summary {
787 next.insert(
788 "summary".to_string(),
789 VmValue::String(Rc::from(summary.to_string())),
790 );
791 }
792 state.transcript = VmValue::Dict(Rc::new(next));
793 state.last_accessed = Instant::now();
794 });
795}
796
797pub fn append_subscriber(id: &str, callback: VmValue) {
798 open_or_create(Some(id.to_string()));
799 SESSIONS.with(|s| {
800 if let Some(state) = s.borrow_mut().get_mut(id) {
801 state.subscribers.push(callback);
802 state.last_accessed = Instant::now();
803 }
804 });
805}
806
807pub fn subscribers_for(id: &str) -> Vec<VmValue> {
808 SESSIONS.with(|s| {
809 s.borrow()
810 .get(id)
811 .map(|state| state.subscribers.clone())
812 .unwrap_or_default()
813 })
814}
815
816pub fn subscriber_count(id: &str) -> usize {
817 SESSIONS.with(|s| {
818 s.borrow()
819 .get(id)
820 .map(|state| state.subscribers.len())
821 .unwrap_or(0)
822 })
823}
824
825pub fn set_active_skills(id: &str, skills: Vec<String>) {
829 SESSIONS.with(|s| {
830 if let Some(state) = s.borrow_mut().get_mut(id) {
831 state.active_skills = skills;
832 state.last_accessed = Instant::now();
833 }
834 });
835}
836
837pub fn active_skills(id: &str) -> Vec<String> {
841 SESSIONS.with(|s| {
842 s.borrow()
843 .get(id)
844 .map(|state| state.active_skills.clone())
845 .unwrap_or_default()
846 })
847}
848
849pub fn claim_tool_format(id: &str, tool_format: &str) -> Result<(), String> {
855 let tool_format = tool_format.trim();
856 if tool_format.is_empty() {
857 return Ok(());
858 }
859 SESSIONS.with(|s| {
860 let mut map = s.borrow_mut();
861 let Some(state) = map.get_mut(id) else {
862 return Err(format!("agent session '{id}' does not exist"));
863 };
864 match state.tool_format.as_deref() {
865 Some(existing) if existing != tool_format => Err(format!(
866 "agent session '{id}' is locked to tool_format='{existing}', but this run requested tool_format='{tool_format}'. Start a new session or fork/reset the transcript before changing tool mode."
867 )),
868 Some(_) => {
869 state.last_accessed = Instant::now();
870 Ok(())
871 }
872 None => {
873 state.tool_format = Some(tool_format.to_string());
874 state.last_accessed = Instant::now();
875 Ok(())
876 }
877 }
878 })
879}
880
881pub fn tool_format(id: &str) -> Option<String> {
882 SESSIONS.with(|s| {
883 s.borrow()
884 .get(id)
885 .and_then(|state| state.tool_format.clone())
886 })
887}
888
889pub fn record_system_prompt(id: &str, system_prompt: &str) -> Result<(), String> {
890 let system_prompt = system_prompt.trim();
891 if system_prompt.is_empty() {
892 return Ok(());
893 }
894 SESSIONS.with(|s| {
895 let mut map = s.borrow_mut();
896 let Some(state) = map.get_mut(id) else {
897 return Err(format!("agent session '{id}' does not exist"));
898 };
899 let changed = state.system_prompt.as_deref() != Some(system_prompt);
900 state.system_prompt = Some(system_prompt.to_string());
901 let dict = state
902 .transcript
903 .as_dict()
904 .cloned()
905 .unwrap_or_else(BTreeMap::new);
906 let mut next = dict;
907 apply_system_prompt_metadata(&mut next, system_prompt);
908 if changed {
909 let mut events: Vec<VmValue> = match next.get("events") {
910 Some(VmValue::List(list)) => list.iter().cloned().collect(),
911 _ => Vec::new(),
912 };
913 events.push(crate::llm::helpers::transcript_event(
914 "system_prompt",
915 "system",
916 "internal",
917 "",
918 Some(crate::llm::helpers::system_prompt_event_metadata(
919 system_prompt,
920 )),
921 ));
922 next.insert("events".to_string(), VmValue::List(Rc::new(events)));
923 }
924 state.transcript = VmValue::Dict(Rc::new(next));
925 state.last_accessed = Instant::now();
926 Ok(())
927 })
928}
929
930pub fn system_prompt(id: &str) -> Option<String> {
931 SESSIONS.with(|s| {
932 s.borrow()
933 .get(id)
934 .and_then(|state| state.system_prompt.clone())
935 })
936}
937
938fn empty_transcript(id: &str) -> VmValue {
939 use crate::llm::helpers::new_transcript_with;
940 new_transcript_with(Some(id.to_string()), Vec::new(), None, None)
941}
942
943fn clone_transcript_with_id(transcript: &VmValue, new_id: &str) -> VmValue {
944 let Some(dict) = transcript.as_dict() else {
945 return empty_transcript(new_id);
946 };
947 let mut next = dict.clone();
948 next.insert(
949 "id".to_string(),
950 VmValue::String(Rc::from(new_id.to_string())),
951 );
952 VmValue::Dict(Rc::new(next))
953}
954
955fn clone_transcript_with_parent(transcript: &VmValue, parent_id: &str) -> VmValue {
956 let Some(dict) = transcript.as_dict() else {
957 return transcript.clone();
958 };
959 let mut next = dict.clone();
960 let metadata = match next.get("metadata") {
961 Some(VmValue::Dict(metadata)) => {
962 let mut metadata = metadata.as_ref().clone();
963 metadata.insert(
964 "parent_session_id".to_string(),
965 VmValue::String(Rc::from(parent_id.to_string())),
966 );
967 VmValue::Dict(Rc::new(metadata))
968 }
969 _ => VmValue::Dict(Rc::new(BTreeMap::from([(
970 "parent_session_id".to_string(),
971 VmValue::String(Rc::from(parent_id.to_string())),
972 )]))),
973 };
974 next.insert("metadata".to_string(), metadata);
975 VmValue::Dict(Rc::new(next))
976}
977
978fn apply_system_prompt_metadata(next: &mut BTreeMap<String, VmValue>, system_prompt: &str) {
979 let mut metadata = match next.get("metadata") {
980 Some(VmValue::Dict(metadata)) => metadata.as_ref().clone(),
981 _ => BTreeMap::new(),
982 };
983 metadata.insert(
984 "system_prompt".to_string(),
985 crate::stdlib::json_to_vm_value(&crate::llm::helpers::system_prompt_metadata(
986 system_prompt,
987 )),
988 );
989 next.insert("metadata".to_string(), VmValue::Dict(Rc::new(metadata)));
990}
991
992fn transcript_with_session_metadata(transcript: VmValue, state: &SessionState) -> VmValue {
993 let Some(dict) = transcript.as_dict() else {
994 return transcript;
995 };
996 let mut next = dict.clone();
997 let mut metadata = match next.get("metadata") {
998 Some(VmValue::Dict(metadata)) => metadata.as_ref().clone(),
999 _ => BTreeMap::new(),
1000 };
1001 if let Some(tool_format) = state.tool_format.as_ref() {
1002 metadata.insert(
1003 "tool_format".to_string(),
1004 VmValue::String(Rc::from(tool_format.clone())),
1005 );
1006 metadata.insert("tool_mode_locked".to_string(), VmValue::Bool(true));
1007 }
1008 if let Some(system_prompt) = state.system_prompt.as_ref() {
1009 metadata.insert(
1010 "system_prompt".to_string(),
1011 crate::stdlib::json_to_vm_value(&crate::llm::helpers::system_prompt_metadata(
1012 system_prompt,
1013 )),
1014 );
1015 }
1016 if !metadata.is_empty() {
1017 next.insert("metadata".to_string(), VmValue::Dict(Rc::new(metadata)));
1018 }
1019 VmValue::Dict(Rc::new(next))
1020}
1021
1022fn session_snapshot(state: &SessionState) -> VmValue {
1023 let transcript = transcript_with_session_metadata(state.transcript.clone(), state);
1024 let Some(dict) = transcript.as_dict() else {
1025 return state.transcript.clone();
1026 };
1027 let mut next = dict.clone();
1028 let length = next
1029 .get("messages")
1030 .and_then(|value| match value {
1031 VmValue::List(list) => Some(list.len() as i64),
1032 _ => None,
1033 })
1034 .unwrap_or(0);
1035 next.insert("length".to_string(), VmValue::Int(length));
1036 next.insert(
1037 "created_at".to_string(),
1038 VmValue::String(Rc::from(state.created_at.clone())),
1039 );
1040 next.insert(
1041 "parent_id".to_string(),
1042 state
1043 .parent_id
1044 .as_ref()
1045 .map(|id| VmValue::String(Rc::from(id.clone())))
1046 .unwrap_or(VmValue::Nil),
1047 );
1048 next.insert(
1049 "child_ids".to_string(),
1050 VmValue::List(Rc::new(
1051 state
1052 .child_ids
1053 .iter()
1054 .cloned()
1055 .map(|id| VmValue::String(Rc::from(id)))
1056 .collect(),
1057 )),
1058 );
1059 next.insert(
1060 "branched_at_event_index".to_string(),
1061 state
1062 .branched_at_event_index
1063 .map(|index| VmValue::Int(index as i64))
1064 .unwrap_or(VmValue::Nil),
1065 );
1066 next.insert(
1067 "system_prompt".to_string(),
1068 state
1069 .system_prompt
1070 .as_ref()
1071 .map(|prompt| VmValue::String(Rc::from(prompt.clone())))
1072 .unwrap_or(VmValue::Nil),
1073 );
1074 next.insert(
1075 "tool_format".to_string(),
1076 state
1077 .tool_format
1078 .as_ref()
1079 .map(|format| VmValue::String(Rc::from(format.clone())))
1080 .unwrap_or(VmValue::Nil),
1081 );
1082 VmValue::Dict(Rc::new(next))
1083}
1084
1085fn update_lineage(
1086 map: &mut HashMap<String, SessionState>,
1087 parent_id: &str,
1088 child_id: &str,
1089 branched_at_event_index: Option<usize>,
1090) {
1091 let old_parent_id = map.get(child_id).and_then(|child| child.parent_id.clone());
1092 if let Some(old_parent_id) = old_parent_id.filter(|old_parent_id| old_parent_id != parent_id) {
1093 if let Some(old_parent) = map.get_mut(&old_parent_id) {
1094 old_parent.child_ids.retain(|id| id != child_id);
1095 old_parent.last_accessed = Instant::now();
1096 }
1097 }
1098 if let Some(parent) = map.get_mut(parent_id) {
1099 parent.last_accessed = Instant::now();
1100 if !parent.child_ids.iter().any(|id| id == child_id) {
1101 parent.child_ids.push(child_id.to_string());
1102 }
1103 }
1104 if let Some(child) = map.get_mut(child_id) {
1105 child.last_accessed = Instant::now();
1106 child.parent_id = Some(parent_id.to_string());
1107 child.branched_at_event_index = branched_at_event_index;
1108 child.transcript = clone_transcript_with_parent(&child.transcript, parent_id);
1109 }
1110}
1111
1112fn branch_event_index(transcript: &VmValue, keep_first: usize) -> usize {
1113 if keep_first == 0 {
1114 return 0;
1115 }
1116 let Some(dict) = transcript.as_dict() else {
1117 return keep_first;
1118 };
1119 let Some(VmValue::List(events)) = dict.get("events") else {
1120 return keep_first;
1121 };
1122 let mut retained_messages = 0usize;
1123 for (index, event) in events.iter().enumerate() {
1124 let kind = event
1125 .as_dict()
1126 .and_then(|dict| dict.get("kind"))
1127 .map(VmValue::display);
1128 if matches!(kind.as_deref(), Some("message" | "tool_result")) {
1129 retained_messages += 1;
1130 if retained_messages == keep_first {
1131 return index + 1;
1132 }
1133 }
1134 }
1135 events.len()
1136}
1137
1138#[cfg(test)]
1139mod tests {
1140 use super::*;
1141 use crate::agent_events::{
1142 emit_event, register_sink, reset_all_sinks, session_external_sink_count, AgentEvent,
1143 AgentEventSink,
1144 };
1145 use crate::event_log::{active_event_log, EventLog, Topic};
1146 use std::collections::BTreeMap;
1147 use std::sync::{Arc, Mutex};
1148
1149 fn make_msg(role: &str, content: &str) -> VmValue {
1150 let mut m: BTreeMap<String, VmValue> = BTreeMap::new();
1151 m.insert("role".to_string(), VmValue::String(Rc::from(role)));
1152 m.insert("content".to_string(), VmValue::String(Rc::from(content)));
1153 VmValue::Dict(Rc::new(m))
1154 }
1155
1156 fn message_count(id: &str) -> usize {
1157 SESSIONS.with(|s| {
1158 let map = s.borrow();
1159 let Some(state) = map.get(id) else { return 0 };
1160 let Some(dict) = state.transcript.as_dict() else {
1161 return 0;
1162 };
1163 match dict.get("messages") {
1164 Some(VmValue::List(list)) => list.len(),
1165 _ => 0,
1166 }
1167 })
1168 }
1169
1170 fn event_count_by_kind(id: &str, expected_kind: &str) -> usize {
1171 snapshot(id)
1172 .and_then(|snapshot| snapshot.as_dict().cloned())
1173 .and_then(|dict| dict.get("events").cloned())
1174 .and_then(|events| match events {
1175 VmValue::List(events) => Some(
1176 events
1177 .iter()
1178 .filter(|event| {
1179 event
1180 .as_dict()
1181 .and_then(|dict| dict.get("kind"))
1182 .map(VmValue::display)
1183 .as_deref()
1184 == Some(expected_kind)
1185 })
1186 .count(),
1187 ),
1188 _ => None,
1189 })
1190 .unwrap_or(0)
1191 }
1192
1193 struct CapturingSink(Arc<Mutex<Vec<AgentEvent>>>);
1194
1195 impl AgentEventSink for CapturingSink {
1196 fn handle_event(&self, event: &AgentEvent) {
1197 self.0
1198 .lock()
1199 .expect("capture sink poisoned")
1200 .push(event.clone());
1201 }
1202 }
1203
1204 #[test]
1205 fn records_system_prompt_as_metadata_event_without_message() {
1206 reset_session_store();
1207 let id = open_or_create(Some("system-prompt-session".into()));
1208 record_system_prompt(&id, "Follow the workflow.").unwrap();
1209 record_system_prompt(&id, "Follow the workflow.").unwrap();
1210 inject_message(&id, make_msg("user", "hello")).unwrap();
1211
1212 let snapshot = snapshot(&id).expect("session snapshot");
1213 let snapshot_dict = snapshot.as_dict().expect("session snapshot dict");
1214 let metadata = snapshot_dict
1215 .get("metadata")
1216 .and_then(VmValue::as_dict)
1217 .expect("metadata");
1218 let system_prompt = metadata
1219 .get("system_prompt")
1220 .and_then(VmValue::as_dict)
1221 .expect("system prompt metadata");
1222 assert_eq!(
1223 system_prompt
1224 .get("content")
1225 .map(VmValue::display)
1226 .as_deref(),
1227 Some("Follow the workflow.")
1228 );
1229 assert!(
1230 matches!(snapshot_dict.get("system_prompt"), Some(VmValue::String(value)) if value.as_ref() == "Follow the workflow.")
1231 );
1232 assert!(matches!(snapshot_dict.get("length"), Some(VmValue::Int(1))));
1233
1234 let transcript = transcript(&id).expect("canonical transcript");
1235 let transcript_dict = transcript.as_dict().expect("canonical transcript dict");
1236 assert!(!transcript_dict.contains_key("system_prompt"));
1237 assert!(transcript_dict
1238 .get("metadata")
1239 .and_then(VmValue::as_dict)
1240 .and_then(|metadata| metadata.get("system_prompt"))
1241 .is_some());
1242 assert_eq!(message_count(&id), 1);
1243 assert_eq!(event_count_by_kind(&id, "system_prompt"), 1);
1244 }
1245
1246 #[test]
1247 fn close_with_status_emits_terminal_event_and_clears_sinks() {
1248 reset_all_sinks();
1249 let id = open_or_create(Some("close-reason-session".into()));
1250 inject_message(&id, make_msg("user", "hello")).unwrap();
1251 let captured = Arc::new(Mutex::new(Vec::new()));
1252 register_sink(&id, Arc::new(CapturingSink(captured.clone())));
1253 assert_eq!(session_external_sink_count(&id), 1);
1254
1255 assert!(close_with_status(
1256 &id,
1257 "timeout",
1258 "timeout",
1259 serde_json::json!({"idle_ms": 5000}),
1260 ));
1261
1262 assert!(!exists(&id));
1263 assert_eq!(session_external_sink_count(&id), 0);
1264 let events = captured.lock().expect("capture sink poisoned");
1265 assert_eq!(events.len(), 1);
1266 match &events[0] {
1267 AgentEvent::SessionClosed {
1268 session_id,
1269 reason,
1270 status,
1271 metadata,
1272 } => {
1273 assert_eq!(session_id, "close-reason-session");
1274 assert_eq!(reason, "timeout");
1275 assert_eq!(status, "timeout");
1276 assert_eq!(metadata["idle_ms"], 5000);
1277 }
1278 other => panic!("expected SessionClosed, got {other:?}"),
1279 }
1280 reset_all_sinks();
1281 }
1282
1283 #[test]
1284 fn fork_at_truncates_destination_to_keep_first() {
1285 reset_session_store();
1286 let src = open_or_create(Some("src-fork-at".into()));
1287 inject_message(&src, make_msg("user", "a")).unwrap();
1288 inject_message(&src, make_msg("assistant", "b")).unwrap();
1289 inject_message(&src, make_msg("user", "c")).unwrap();
1290 inject_message(&src, make_msg("assistant", "d")).unwrap();
1291 assert_eq!(message_count(&src), 4);
1292
1293 let dst = fork_at(&src, 2, Some("dst-fork-at".into())).expect("fork_at");
1294 assert_ne!(dst, src);
1295 assert_eq!(message_count(&dst), 2, "branched at message index 2");
1296 assert_eq!(
1297 snapshot(&dst)
1298 .and_then(|value| value.as_dict().cloned())
1299 .and_then(|dict| dict
1300 .get("branched_at_event_index")
1301 .and_then(VmValue::as_int)),
1302 Some(2)
1303 );
1304 assert_eq!(message_count(&src), 4);
1306 assert_eq!(subscriber_count(&dst), 0);
1308 reset_session_store();
1309 }
1310
1311 #[test]
1312 fn fork_at_on_unknown_source_returns_none() {
1313 reset_session_store();
1314 assert!(fork_at("does-not-exist", 3, None).is_none());
1315 }
1316
1317 #[test]
1318 fn child_sessions_record_parent_lineage() {
1319 reset_session_store();
1320 let parent = open_or_create(Some("parent-session".into()));
1321 let child = open_child_session(&parent, Some("child-session".into()));
1322 assert_eq!(parent_id(&child).as_deref(), Some("parent-session"));
1323 assert_eq!(child_ids(&parent), vec!["child-session".to_string()]);
1324 assert_eq!(
1325 ancestry(&child),
1326 Some(SessionAncestry {
1327 parent_id: Some("parent-session".to_string()),
1328 child_ids: Vec::new(),
1329 root_id: "parent-session".to_string(),
1330 })
1331 );
1332
1333 let transcript = snapshot(&child).expect("child transcript");
1334 let transcript = transcript.as_dict().expect("child snapshot");
1335 let metadata = transcript
1336 .get("metadata")
1337 .and_then(|value| value.as_dict())
1338 .expect("child metadata");
1339 assert!(
1340 matches!(transcript.get("parent_id"), Some(VmValue::String(value)) if value.as_ref() == "parent-session")
1341 );
1342 assert!(
1343 matches!(transcript.get("child_ids"), Some(VmValue::List(children)) if children.is_empty())
1344 );
1345 assert!(matches!(transcript.get("length"), Some(VmValue::Int(0))));
1346 assert!(
1347 matches!(transcript.get("created_at"), Some(VmValue::String(value)) if !value.is_empty())
1348 );
1349 assert!(matches!(
1350 transcript.get("system_prompt"),
1351 Some(VmValue::Nil)
1352 ));
1353 assert!(matches!(transcript.get("tool_format"), Some(VmValue::Nil)));
1354 assert!(matches!(
1355 transcript.get("branched_at_event_index"),
1356 Some(VmValue::Nil)
1357 ));
1358 assert!(matches!(
1359 metadata.get("parent_session_id"),
1360 Some(VmValue::String(value)) if value.as_ref() == "parent-session"
1361 ));
1362 }
1363
1364 #[test]
1365 fn branch_event_index_counts_non_message_events() {
1366 reset_session_store();
1367 let src = open_or_create(Some("branch-event-index".into()));
1368 let transcript = VmValue::Dict(Rc::new(BTreeMap::from([
1369 ("id".to_string(), VmValue::String(Rc::from(src.clone()))),
1370 (
1371 "messages".to_string(),
1372 VmValue::List(Rc::new(vec![
1373 make_msg("user", "a"),
1374 make_msg("assistant", "b"),
1375 ])),
1376 ),
1377 (
1378 "events".to_string(),
1379 VmValue::List(Rc::new(vec![
1380 VmValue::Dict(Rc::new(BTreeMap::from([(
1381 "kind".to_string(),
1382 VmValue::String(Rc::from("message")),
1383 )]))),
1384 VmValue::Dict(Rc::new(BTreeMap::from([(
1385 "kind".to_string(),
1386 VmValue::String(Rc::from("sub_agent_start")),
1387 )]))),
1388 VmValue::Dict(Rc::new(BTreeMap::from([(
1389 "kind".to_string(),
1390 VmValue::String(Rc::from("message")),
1391 )]))),
1392 ])),
1393 ),
1394 ])));
1395 store_transcript(&src, transcript);
1396
1397 let dst = fork_at(&src, 2, Some("branch-event-index-child".into())).expect("fork_at");
1398 assert_eq!(
1399 snapshot(&dst)
1400 .and_then(|value| value.as_dict().cloned())
1401 .and_then(|dict| dict
1402 .get("branched_at_event_index")
1403 .and_then(VmValue::as_int)),
1404 Some(3)
1405 );
1406 }
1407
1408 #[test]
1409 fn child_session_records_lineage_without_reusing_parent_transcript() {
1410 reset_session_store();
1411 let parent = open_or_create(Some("parent-fork-parent".into()));
1412 inject_message(&parent, make_msg("user", "parent context")).unwrap();
1413 claim_tool_format(&parent, "native").unwrap();
1414
1415 let child = open_child_session(&parent, Some("parent-fork-child".into()));
1416 assert_eq!(message_count(&parent), 1);
1417 assert_eq!(message_count(&child), 0);
1418 assert_eq!(tool_format(&child), None);
1419 assert_eq!(parent_id(&child).as_deref(), Some(parent.as_str()));
1420 }
1421
1422 #[test]
1423 fn prompt_state_prepends_summary_message_when_missing_from_messages() {
1424 reset_session_store();
1425 let session = open_or_create(Some("prompt-state-summary".into()));
1426 let transcript = crate::llm::helpers::new_transcript_with_events(
1427 Some(session.clone()),
1428 vec![make_msg("assistant", "latest answer")],
1429 Some("[auto-compacted 2 older messages]\nsummary".to_string()),
1430 None,
1431 Vec::new(),
1432 Vec::new(),
1433 Some("active"),
1434 );
1435 store_transcript(&session, transcript);
1436
1437 let prompt = prompt_state_json(&session);
1438 assert_eq!(
1439 prompt.summary.as_deref(),
1440 Some("[auto-compacted 2 older messages]\nsummary")
1441 );
1442 assert_eq!(prompt.messages.len(), 2);
1443 assert_eq!(prompt.messages[0]["role"].as_str(), Some("user"));
1444 assert_eq!(
1445 prompt.messages[0]["content"].as_str(),
1446 Some("[auto-compacted 2 older messages]\nsummary"),
1447 );
1448 assert_eq!(prompt.messages[1]["role"].as_str(), Some("assistant"));
1449 }
1450
1451 #[tokio::test(flavor = "current_thread", start_paused = true)]
1452 async fn open_or_create_registers_event_log_sink_when_active_log_is_installed() {
1453 reset_all_sinks();
1454 crate::event_log::reset_active_event_log();
1455 let dir = tempfile::tempdir().expect("tempdir");
1456 crate::event_log::install_default_for_base_dir(dir.path()).expect("install event log");
1457
1458 let session = open_or_create(Some("event-log-session".into()));
1459 assert_eq!(session_external_sink_count(&session), 1);
1460
1461 emit_event(&AgentEvent::TurnStart {
1462 session_id: session.clone(),
1463 iteration: 0,
1464 });
1465 tokio::time::sleep(std::time::Duration::from_millis(25)).await;
1466
1467 let topic = Topic::new("observability.agent_events.event-log-session").unwrap();
1468 let log = active_event_log().expect("active event log");
1469 let events = log.read_range(&topic, None, usize::MAX).await.unwrap();
1470 assert_eq!(events.len(), 1);
1471 assert_eq!(events[0].1.kind, "turn_start");
1472
1473 crate::event_log::reset_active_event_log();
1474 reset_all_sinks();
1475 }
1476}