1use std::cell::{Cell, RefCell};
22use std::collections::{BTreeMap, HashMap, HashSet};
23use std::future::Future;
24use std::path::{Path, PathBuf};
25use std::rc::Rc;
26use std::time::Instant;
27
28use crate::runtime_limits::RuntimeLimits;
29use crate::value::VmValue;
30use crate::workspace_anchor::{
31 MountMode, MountedRoot, WorkspaceAnchor, WorkspacePolicy, WORKSPACE_ANCHOR_METADATA_KEY,
32};
33
34pub const DEFAULT_SESSION_CAP: usize = RuntimeLimits::DEFAULT.max_agent_sessions;
37
38pub const DEFAULT_TRANSCRIPT_MESSAGE_CAP: usize = 4096;
42
43pub const DEFAULT_TRANSCRIPT_EVENT_CAP: usize = 32768;
46#[cfg(debug_assertions)]
47const CACHE_STABLE_SYSTEM_PROMPT_DIAGNOSTIC: &str = "HARN-CACHE-001";
48
/// Recovery strategy attached to a [`SessionTranscriptBudgetPolicy`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TranscriptBudgetRecovery {
    /// Reported as `"reject"`; carries no parameters.
    Reject,
    /// Reported as `"trim"`; `keep_last` is the number of most recent messages to keep.
    Trim { keep_last: usize },
    /// Reported as `"compact"`; `keep_last` is the number of most recent messages to keep.
    Compact { keep_last: usize },
}

/// Limits placed on a session transcript plus the recovery strategy to use with them.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SessionTranscriptBudgetPolicy {
    /// Maximum number of messages; the constructors clamp this to at least 1.
    pub max_messages: usize,
    /// Maximum number of events; the constructors clamp this to at least 1.
    pub max_events: usize,
    /// Optional limit on the approximate serialized size, in bytes.
    pub max_approx_bytes: Option<usize>,
    /// Strategy associated with this budget.
    pub recovery: TranscriptBudgetRecovery,
}

impl SessionTranscriptBudgetPolicy {
    /// Shared constructor body: clamps both count limits to at least 1 and leaves the
    /// byte limit unset.
    fn from_parts(
        max_messages: usize,
        max_events: usize,
        recovery: TranscriptBudgetRecovery,
    ) -> Self {
        Self {
            max_messages: max_messages.max(1),
            max_events: max_events.max(1),
            max_approx_bytes: None,
            recovery,
        }
    }

    /// Builds a policy with [`TranscriptBudgetRecovery::Reject`].
    pub fn reject(max_messages: usize, max_events: usize) -> Self {
        Self::from_parts(max_messages, max_events, TranscriptBudgetRecovery::Reject)
    }

    /// Builds a policy with [`TranscriptBudgetRecovery::Trim`].
    pub fn trim(max_messages: usize, max_events: usize, keep_last: usize) -> Self {
        Self::from_parts(
            max_messages,
            max_events,
            TranscriptBudgetRecovery::Trim { keep_last },
        )
    }

    /// Builds a policy with [`TranscriptBudgetRecovery::Compact`].
    pub fn compact(max_messages: usize, max_events: usize, keep_last: usize) -> Self {
        Self::from_parts(
            max_messages,
            max_events,
            TranscriptBudgetRecovery::Compact { keep_last },
        )
    }

    /// Returns the policy with its byte limit replaced; a `Some` limit is clamped to at
    /// least 1, `None` removes the limit.
    pub fn with_max_approx_bytes(self, max_approx_bytes: Option<usize>) -> Self {
        Self {
            max_approx_bytes: max_approx_bytes.map(|limit| limit.max(1)),
            ..self
        }
    }

    /// Returns a copy with every limit clamped to at least 1.
    ///
    /// The fields are public, so a policy built with a struct literal may carry zeros
    /// that the constructors would have clamped.
    fn normalized(&self) -> Self {
        let at_least_one = |limit: usize| limit.max(1);
        Self {
            max_messages: at_least_one(self.max_messages),
            max_events: at_least_one(self.max_events),
            max_approx_bytes: self.max_approx_bytes.map(at_least_one),
            recovery: self.recovery.clone(),
        }
    }
}
106
107impl Default for SessionTranscriptBudgetPolicy {
108 fn default() -> Self {
109 Self::reject(DEFAULT_TRANSCRIPT_MESSAGE_CAP, DEFAULT_TRANSCRIPT_EVENT_CAP)
110 }
111}
112
113pub struct SessionState {
114 pub id: String,
115 pub transcript: VmValue,
116 pub subscribers: Vec<VmValue>,
117 pub created_at: String,
118 pub last_accessed: Instant,
119 pub parent_id: Option<String>,
120 pub child_ids: Vec<String>,
121 pub branched_at_event_index: Option<usize>,
122 pub active_skills: Vec<String>,
128 pub tool_format: Option<String>,
132 pub system_prompt: Option<String>,
136 pub pinned_model: Option<String>,
142 pub pinned_reasoning_policy: Option<String>,
148 pub workspace_policy: WorkspacePolicy,
151 pub workspace_anchor: Option<WorkspaceAnchor>,
157 pub transcript_budget_policy: SessionTranscriptBudgetPolicy,
158 pub last_transcript_budget_action: Option<serde_json::Value>,
159}
160
161impl SessionState {
162 fn new(id: String) -> Self {
163 let now = Instant::now();
164 let transcript = empty_transcript(&id);
165 Self {
166 id,
167 transcript,
168 subscribers: Vec::new(),
169 created_at: crate::orchestration::now_rfc3339(),
170 last_accessed: now,
171 parent_id: None,
172 child_ids: Vec::new(),
173 branched_at_event_index: None,
174 active_skills: Vec::new(),
175 tool_format: None,
176 system_prompt: None,
177 pinned_model: None,
178 pinned_reasoning_policy: None,
179 workspace_policy: WorkspacePolicy::default(),
180 workspace_anchor: None,
181 transcript_budget_policy: default_transcript_budget_policy(),
182 last_transcript_budget_action: None,
183 }
184 }
185}
186
/// Lineage summary for one session, as produced by `ancestry`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SessionAncestry {
    /// Direct parent of the session, if it has one.
    pub parent_id: Option<String>,
    /// Direct children of the session.
    pub child_ids: Vec<String>,
    /// Id reached by following parent links as far as possible; the session's own id
    /// when it has no parent.
    pub root_id: String,
}
193
/// Outcome of truncating a session transcript to its first N messages.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SessionTruncateResult {
    /// Number of messages kept (the requested count, capped at the message total).
    pub kept_turn_count: usize,
    /// Number of messages dropped from the end.
    pub removed_turn_count: usize,
    /// Event id associated with the last kept message, when one could be determined.
    pub new_tip_turn_id: Option<String>,
}
200
/// Report describing one reminder injection.
// NOTE(review): the producer of this type is outside the visible part of the file;
// the field descriptions below follow the field names only — confirm.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ReminderInjectionReport {
    /// Identifier of the reminder.
    pub reminder_id: String,
    /// Count of deduplicated entries.
    pub deduped_count: usize,
}
206
207thread_local! {
208 static SESSIONS: RefCell<HashMap<String, SessionState>> = RefCell::new(HashMap::new());
209 static SESSION_CAP: Cell<usize> = const { Cell::new(DEFAULT_SESSION_CAP) };
210 static DEFAULT_TRANSCRIPT_BUDGET_POLICY: RefCell<SessionTranscriptBudgetPolicy> =
211 RefCell::new(SessionTranscriptBudgetPolicy::default());
212 static CURRENT_SESSION_STACK: RefCell<Vec<String>> = const { RefCell::new(Vec::new()) };
213 static CURRENT_TOOL_CALL_STACK: RefCell<Vec<String>> = const { RefCell::new(Vec::new()) };
214}
215
216tokio::task_local! {
217 static CURRENT_TOOL_CALL_TASK: String;
218}
219
220pub struct CurrentSessionGuard {
221 active: bool,
222}
223
224impl Drop for CurrentSessionGuard {
225 fn drop(&mut self) {
226 if self.active {
227 pop_current_session();
228 }
229 }
230}
231
232pub struct CurrentToolCallGuard {
238 active: bool,
239}
240
241impl Drop for CurrentToolCallGuard {
242 fn drop(&mut self) {
243 if self.active {
244 pop_current_tool_call();
245 }
246 }
247}
248
249pub fn set_session_cap(cap: usize) {
252 SESSION_CAP.with(|c| c.set(cap.max(1)));
253}
254
255pub fn session_cap() -> usize {
256 SESSION_CAP.with(|c| c.get())
257}
258
259pub fn set_default_transcript_budget_policy(policy: SessionTranscriptBudgetPolicy) {
260 DEFAULT_TRANSCRIPT_BUDGET_POLICY.with(|cell| {
261 *cell.borrow_mut() = policy.normalized();
262 });
263}
264
265pub fn reset_default_transcript_budget_policy() {
266 set_default_transcript_budget_policy(SessionTranscriptBudgetPolicy::default());
267}
268
269pub fn default_transcript_budget_policy() -> SessionTranscriptBudgetPolicy {
270 DEFAULT_TRANSCRIPT_BUDGET_POLICY.with(|cell| cell.borrow().clone())
271}
272
273pub fn transcript_budget_policy(id: &str) -> Option<SessionTranscriptBudgetPolicy> {
274 SESSIONS.with(|s| {
275 s.borrow()
276 .get(id)
277 .map(|state| state.transcript_budget_policy.clone())
278 })
279}
280
281pub fn set_transcript_budget_policy(
282 id: &str,
283 policy: SessionTranscriptBudgetPolicy,
284) -> Result<(), String> {
285 SESSIONS.with(|s| {
286 let mut map = s.borrow_mut();
287 let Some(state) = map.get_mut(id) else {
288 return Err(format!("agent session '{id}' does not exist"));
289 };
290 let previous = state.transcript_budget_policy.clone();
291 let previous_action = state.last_transcript_budget_action.clone();
292 state.transcript_budget_policy = policy.normalized();
293 let candidate = state.transcript.clone();
294 if let Err(error) = apply_transcript_with_budget(state, candidate, "policy_update") {
295 state.transcript_budget_policy = previous;
296 state.last_transcript_budget_action = previous_action;
297 return Err(error);
298 }
299 state.last_accessed = Instant::now();
300 Ok(())
301 })
302}
303
304pub fn reset_session_store() {
306 SESSIONS.with(|s| s.borrow_mut().clear());
307 CURRENT_SESSION_STACK.with(|stack| stack.borrow_mut().clear());
308 CURRENT_TOOL_CALL_STACK.with(|stack| stack.borrow_mut().clear());
309 reset_default_transcript_budget_policy();
310}
311
312pub(crate) fn push_current_session(id: String) {
313 if id.is_empty() {
314 return;
315 }
316 CURRENT_SESSION_STACK.with(|stack| stack.borrow_mut().push(id));
317}
318
319pub(crate) fn pop_current_session() {
320 CURRENT_SESSION_STACK.with(|stack| {
321 let _ = stack.borrow_mut().pop();
322 });
323}
324
325pub fn current_session_id() -> Option<String> {
326 CURRENT_SESSION_STACK.with(|stack| stack.borrow().last().cloned())
327}
328
329pub fn enter_current_session(id: impl Into<String>) -> CurrentSessionGuard {
330 let id = id.into();
331 if id.trim().is_empty() {
332 return CurrentSessionGuard { active: false };
333 }
334 push_current_session(id);
335 CurrentSessionGuard { active: true }
336}
337
338fn push_current_tool_call(id: String) {
339 if id.is_empty() {
340 return;
341 }
342 CURRENT_TOOL_CALL_STACK.with(|stack| stack.borrow_mut().push(id));
343}
344
345fn pop_current_tool_call() {
346 CURRENT_TOOL_CALL_STACK.with(|stack| {
347 let _ = stack.borrow_mut().pop();
348 });
349}
350
351pub fn current_tool_call_id() -> Option<String> {
356 if let Ok(id) = CURRENT_TOOL_CALL_TASK.try_with(Clone::clone) {
357 if !id.trim().is_empty() {
358 return Some(id);
359 }
360 }
361 CURRENT_TOOL_CALL_STACK.with(|stack| stack.borrow().last().cloned())
362}
363
364pub async fn scope_current_tool_call<F, T>(id: impl Into<String>, future: F) -> T
370where
371 F: Future<Output = T>,
372{
373 let id = id.into();
374 if id.trim().is_empty() {
375 future.await
376 } else {
377 CURRENT_TOOL_CALL_TASK.scope(id, future).await
378 }
379}
380
381pub fn enter_current_tool_call(id: impl Into<String>) -> CurrentToolCallGuard {
383 let id = id.into();
384 if id.trim().is_empty() {
385 return CurrentToolCallGuard { active: false };
386 }
387 push_current_tool_call(id);
388 CurrentToolCallGuard { active: true }
389}
390
391pub fn exists(id: &str) -> bool {
392 SESSIONS.with(|s| s.borrow().contains_key(id))
393}
394
395pub fn length(id: &str) -> Option<usize> {
396 SESSIONS.with(|s| {
397 s.borrow().get(id).map(|state| {
398 state
399 .transcript
400 .as_dict()
401 .and_then(|d| d.get("messages"))
402 .and_then(|v| match v {
403 VmValue::List(list) => Some(list.len()),
404 _ => None,
405 })
406 .unwrap_or(0)
407 })
408 })
409}
410
411pub fn snapshot(id: &str) -> Option<VmValue> {
412 SESSIONS.with(|s| s.borrow().get(id).map(session_snapshot))
413}
414
415pub fn transcript(id: &str) -> Option<VmValue> {
417 SESSIONS.with(|s| {
418 s.borrow()
419 .get(id)
420 .map(|state| transcript_with_session_metadata(state.transcript.clone(), state))
421 })
422}
423
424pub fn open_or_create(id: Option<String>) -> String {
433 let resolved = id.unwrap_or_else(|| uuid::Uuid::now_v7().to_string());
434 let parent_session = current_session_id();
435 let mut was_new = false;
436 SESSIONS.with(|s| {
437 let mut map = s.borrow_mut();
438 if let Some(state) = map.get_mut(&resolved) {
439 state.last_accessed = Instant::now();
440 return;
441 }
442 was_new = true;
443 let cap = SESSION_CAP.with(|c| c.get());
444 if map.len() >= cap {
445 if let Some(victim) = map
446 .iter()
447 .min_by_key(|(_, state)| state.last_accessed)
448 .map(|(id, _)| id.clone())
449 {
450 map.remove(&victim);
451 }
452 }
453 map.insert(resolved.clone(), SessionState::new(resolved.clone()));
454 });
455 if was_new {
456 if let Some(parent) = parent_session.as_deref() {
457 crate::agent_events::mirror_session_sinks(parent, &resolved);
458 }
459 try_register_event_log(&resolved);
460 }
461 resolved
462}
463
464pub fn open_child_session(parent_id: &str, id: Option<String>) -> String {
465 let resolved = open_or_create(id);
466 link_child_session(parent_id, &resolved);
467 resolved
468}
469
470pub fn link_child_session(parent_id: &str, child_id: &str) {
471 link_child_session_with_branch(parent_id, child_id, None);
472}
473
474pub fn link_child_session_with_branch(
475 parent_id: &str,
476 child_id: &str,
477 branched_at_event_index: Option<usize>,
478) {
479 if parent_id == child_id {
480 return;
481 }
482 open_or_create(Some(parent_id.to_string()));
483 open_or_create(Some(child_id.to_string()));
484 SESSIONS.with(|s| {
485 let mut map = s.borrow_mut();
486 update_lineage(&mut map, parent_id, child_id, branched_at_event_index);
487 });
488}
489
490pub fn parent_id(id: &str) -> Option<String> {
491 SESSIONS.with(|s| s.borrow().get(id).and_then(|state| state.parent_id.clone()))
492}
493
494pub fn child_ids(id: &str) -> Vec<String> {
495 SESSIONS.with(|s| {
496 s.borrow()
497 .get(id)
498 .map(|state| state.child_ids.clone())
499 .unwrap_or_default()
500 })
501}
502
503pub fn ancestry(id: &str) -> Option<SessionAncestry> {
504 SESSIONS.with(|s| {
505 let map = s.borrow();
506 let state = map.get(id)?;
507 let mut root_id = state.id.clone();
508 let mut cursor = state.parent_id.clone();
509 let mut seen = HashSet::from([state.id.clone()]);
510 while let Some(parent_id) = cursor {
511 if !seen.insert(parent_id.clone()) {
512 break;
513 }
514 root_id = parent_id.clone();
515 cursor = map
516 .get(&parent_id)
517 .and_then(|parent| parent.parent_id.clone());
518 }
519 Some(SessionAncestry {
520 parent_id: state.parent_id.clone(),
521 child_ids: state.child_ids.clone(),
522 root_id,
523 })
524 })
525}
526
527fn try_register_event_log(session_id: &str) {
531 if let Some(log) = crate::event_log::active_event_log() {
532 crate::agent_events::register_sink(
533 session_id,
534 crate::agent_events::EventLogSink::new(log, session_id),
535 );
536 return;
537 }
538 let Ok(dir) = std::env::var("HARN_EVENT_LOG_DIR") else {
539 return;
540 };
541 if dir.is_empty() {
542 return;
543 }
544 let path = std::path::PathBuf::from(dir).join(format!("event_log-{session_id}.jsonl"));
545 if let Ok(sink) = crate::agent_events::JsonlEventSink::open(&path) {
546 crate::agent_events::register_sink(session_id, sink);
547 }
548}
549
550pub fn register_event_log_sink(session_id: &str) {
551 try_register_event_log(session_id);
552}
553
554pub fn close(id: &str) {
555 SESSIONS.with(|s| {
556 s.borrow_mut().remove(id);
557 });
558 crate::orchestration::agent_inbox::clear_session(id);
562 crate::agent_events::clear_session_sinks(id);
563}
564
565pub fn close_with_status(
566 id: &str,
567 reason: impl Into<String>,
568 status: impl Into<String>,
569 metadata: serde_json::Value,
570) -> bool {
571 if !exists(id) {
572 return false;
573 }
574 let reason = reason.into();
575 let status = status.into();
576 let event_metadata = serde_json::json!({
577 "reason": reason,
578 "status": status,
579 "metadata": metadata,
580 });
581 let transcript_event = crate::llm::helpers::transcript_event(
582 "agent_session_closed",
583 "system",
584 "internal",
585 "Agent session closed",
586 Some(event_metadata),
587 );
588 let _ = append_event(id, transcript_event);
589 crate::llm::emit_live_agent_event_sync(&crate::agent_events::AgentEvent::SessionClosed {
590 session_id: id.to_string(),
591 reason,
592 status,
593 metadata,
594 });
595 close(id);
596 true
597}
598
599pub fn reset_transcript(id: &str) -> bool {
600 SESSIONS.with(|s| {
601 let mut map = s.borrow_mut();
602 let Some(state) = map.get_mut(id) else {
603 return false;
604 };
605 state.transcript = empty_transcript(id);
606 state.tool_format = None;
607 state.system_prompt = None;
608 state.last_transcript_budget_action = None;
609 state.last_accessed = Instant::now();
610 true
611 })
612}
613
614pub fn fork(src_id: &str, dst_id: Option<String>) -> Option<String> {
621 let (
622 src_transcript,
623 src_tool_format,
624 src_system_prompt,
625 src_pinned_model,
626 src_pinned_reasoning_policy,
627 src_workspace_anchor,
628 src_workspace_policy,
629 src_transcript_budget_policy,
630 src_last_transcript_budget_action,
631 dst,
632 ) = SESSIONS.with(|s| {
633 let mut map = s.borrow_mut();
634 let src = map.get_mut(src_id)?;
635 src.last_accessed = Instant::now();
636 let dst = dst_id.unwrap_or_else(|| uuid::Uuid::now_v7().to_string());
637 let forked_transcript = clone_transcript_with_id(&src.transcript, &dst);
638 Some((
639 forked_transcript,
640 src.tool_format.clone(),
641 src.system_prompt.clone(),
642 src.pinned_model.clone(),
643 src.pinned_reasoning_policy.clone(),
644 src.workspace_anchor.clone(),
645 src.workspace_policy.clone(),
646 src.transcript_budget_policy.clone(),
647 src.last_transcript_budget_action.clone(),
648 dst,
649 ))
650 })?;
651 open_or_create(Some(dst.clone()));
653 SESSIONS.with(|s| {
654 let mut map = s.borrow_mut();
655 if let Some(state) = map.get_mut(&dst) {
656 state.transcript = src_transcript;
657 state.tool_format = src_tool_format;
658 state.system_prompt = src_system_prompt;
659 state.pinned_model = src_pinned_model;
660 state.pinned_reasoning_policy = src_pinned_reasoning_policy;
661 state.workspace_anchor = src_workspace_anchor;
662 state.workspace_policy = src_workspace_policy;
663 state.transcript_budget_policy = src_transcript_budget_policy;
664 state.last_transcript_budget_action = src_last_transcript_budget_action;
665 state.last_accessed = Instant::now();
666 }
667 update_lineage(&mut map, src_id, &dst, None);
668 });
669 let budget_ok = SESSIONS.with(|s| {
670 let mut map = s.borrow_mut();
671 let Some(state) = map.get_mut(&dst) else {
672 return false;
673 };
674 let candidate = state.transcript.clone();
675 apply_transcript_with_budget(state, candidate, "fork").is_ok()
676 });
677 if !budget_ok {
678 close(&dst);
679 return None;
680 }
681 if exists(&dst) {
685 Some(dst)
686 } else {
687 None
688 }
689}
690
691pub fn fork_at(src_id: &str, keep_first: usize, dst_id: Option<String>) -> Option<String> {
702 let branched_at_event_index = SESSIONS.with(|s| {
703 let map = s.borrow();
704 let src = map.get(src_id)?;
705 Some(branch_event_index(&src.transcript, keep_first))
706 })?;
707 let new_id = fork(src_id, dst_id)?;
708 link_child_session_with_branch(src_id, &new_id, Some(branched_at_event_index));
709 let _ = truncate(&new_id, keep_first);
710 Some(new_id)
711}
712
713pub fn truncate(id: &str, keep_first: usize) -> Option<SessionTruncateResult> {
717 SESSIONS.with(|s| {
718 let mut map = s.borrow_mut();
719 let state = map.get_mut(id)?;
720 let result = truncate_state(state, keep_first)?;
721 Some(result)
722 })
723}
724
725fn truncate_state(state: &mut SessionState, keep_first: usize) -> Option<SessionTruncateResult> {
726 let dict = state
727 .transcript
728 .as_dict()
729 .cloned()
730 .unwrap_or_else(BTreeMap::new);
731 let messages: Vec<VmValue> = match dict.get("messages") {
732 Some(VmValue::List(list)) => list.iter().cloned().collect(),
733 _ => Vec::new(),
734 };
735 let existing_events = match dict.get("events") {
736 Some(VmValue::List(list)) => Some(list.iter().cloned().collect::<Vec<_>>()),
737 _ => None,
738 };
739 let kept_turn_count = keep_first.min(messages.len());
740 let removed_turn_count = messages.len().saturating_sub(kept_turn_count);
741 let mut new_tip_turn_id = existing_events
742 .as_ref()
743 .map(|events| turn_event_id_for_count(events, kept_turn_count))
744 .unwrap_or_else(|| {
745 let events = crate::llm::helpers::transcript_events_from_messages(&messages);
746 turn_event_id_for_count(&events, kept_turn_count)
747 });
748
749 if removed_turn_count > 0 {
750 let retained: Vec<VmValue> = messages.into_iter().take(kept_turn_count).collect();
751 let retained_events = match existing_events {
752 Some(events) => {
753 let keep_event_count = event_prefix_len_for_messages(&events, kept_turn_count);
754 events.into_iter().take(keep_event_count).collect()
755 }
756 None => crate::llm::helpers::transcript_events_from_messages(&retained),
757 };
758 new_tip_turn_id = turn_event_id_for_count(&retained_events, kept_turn_count);
759 let mut next = dict;
760 next.insert(
761 "events".to_string(),
762 VmValue::List(Rc::new(retained_events)),
763 );
764 next.insert("messages".to_string(), VmValue::List(Rc::new(retained)));
765 next.remove("summary");
766 apply_transcript_with_budget(state, VmValue::Dict(Rc::new(next)), "truncate").ok()?;
767 }
768 state.last_accessed = Instant::now();
769 Some(SessionTruncateResult {
770 kept_turn_count,
771 removed_turn_count,
772 new_tip_turn_id,
773 })
774}
775
776pub fn pop_last_if_assistant(id: &str) -> Result<bool, String> {
783 SESSIONS.with(|s| {
784 let mut map = s.borrow_mut();
785 let Some(state) = map.get_mut(id) else {
786 return Err(format!(
787 "pop_last_if_assistant: unknown session id '{id}'"
788 ));
789 };
790 let messages: Vec<VmValue> = match state.transcript.as_dict() {
791 Some(dict) => match dict.get("messages") {
792 Some(VmValue::List(list)) => list.iter().cloned().collect(),
793 _ => Vec::new(),
794 },
795 None => Vec::new(),
796 };
797 if messages.is_empty() {
798 return Ok(false);
799 }
800 let trailing_role = messages
801 .last()
802 .and_then(|m| m.as_dict())
803 .and_then(|d| d.get("role"))
804 .map(|v| v.display())
805 .unwrap_or_default();
806 if trailing_role != "assistant" {
807 return Err(format!(
808 "pop_last_if_assistant: trailing message role is '{trailing_role}', expected 'assistant'"
809 ));
810 }
811 let keep = messages.len() - 1;
812 truncate_state(state, keep);
813 Ok(true)
814 })
815}
816
817pub fn trim(id: &str, keep_last: usize) -> Option<usize> {
818 SESSIONS.with(|s| {
819 let mut map = s.borrow_mut();
820 let state = map.get_mut(id)?;
821 let dict = state.transcript.as_dict()?.clone();
822 let messages: Vec<VmValue> = match dict.get("messages") {
823 Some(VmValue::List(list)) => list.iter().cloned().collect(),
824 _ => Vec::new(),
825 };
826 let start = messages.len().saturating_sub(keep_last);
827 let retained: Vec<VmValue> = messages.into_iter().skip(start).collect();
828 let kept = retained.len();
829 let mut next = dict;
830 next.insert(
831 "events".to_string(),
832 VmValue::List(Rc::new(
833 crate::llm::helpers::transcript_events_from_messages(&retained),
834 )),
835 );
836 next.insert("messages".to_string(), VmValue::List(Rc::new(retained)));
837 apply_transcript_with_budget(state, VmValue::Dict(Rc::new(next)), "trim").ok()?;
838 state.last_accessed = Instant::now();
839 Some(kept)
840 })
841}
842
843pub fn inject_message(id: &str, message: VmValue) -> Result<(), String> {
846 let Some(msg_dict) = message.as_dict().cloned() else {
847 return Err("agent_session_inject: message must be a dict".into());
848 };
849 let role_ok = matches!(msg_dict.get("role"), Some(VmValue::String(_)));
850 if !role_ok {
851 return Err(
852 "agent_session_inject: message must have a string `role` (user|assistant|tool_result|system)"
853 .into(),
854 );
855 }
856 SESSIONS.with(|s| {
857 let mut map = s.borrow_mut();
858 let Some(state) = map.get_mut(id) else {
859 return Err(format!("agent_session_inject: unknown session id '{id}'"));
860 };
861 let dict = state
862 .transcript
863 .as_dict()
864 .cloned()
865 .unwrap_or_else(BTreeMap::new);
866 let mut messages: Vec<VmValue> = match dict.get("messages") {
867 Some(VmValue::List(list)) => list.iter().cloned().collect(),
868 _ => Vec::new(),
869 };
870 let mut events: Vec<VmValue> = match dict.get("events") {
871 Some(VmValue::List(list)) => list.iter().cloned().collect(),
872 _ => crate::llm::helpers::transcript_events_from_messages(&messages),
873 };
874 let new_message = VmValue::Dict(Rc::new(msg_dict));
875 let message_index = messages.len();
876 events.push(crate::llm::helpers::transcript_event_from_message(
877 &new_message,
878 ));
879 messages.push(new_message);
880 let mut next = dict;
881 next.insert("events".to_string(), VmValue::List(Rc::new(events)));
882 next.insert("messages".to_string(), VmValue::List(Rc::new(messages)));
883 let persisted_message = next
884 .get("messages")
885 .and_then(|value| match value {
886 VmValue::List(list) => list.get(message_index).cloned(),
887 _ => None,
888 })
889 .unwrap_or(VmValue::Nil);
890 apply_transcript_with_budget(state, VmValue::Dict(Rc::new(next)), "inject_message")?;
891 emit_identified_user_message_event(id, &persisted_message);
892 emit_llm_message_event(id, message_index, &persisted_message);
893 state.last_accessed = Instant::now();
894 Ok(())
895 })
896}
897
898fn emit_identified_user_message_event(session_id: &str, message: &VmValue) {
899 let message_json = crate::llm::helpers::vm_value_to_json(message);
900 let role = message_json.get("role").and_then(|value| value.as_str());
901 if role != Some("user") {
902 return;
903 }
904 let Some(message_id) = message_json
905 .get("messageId")
906 .or_else(|| message_json.get("message_id"))
907 .and_then(|value| value.as_str())
908 .filter(|value| !value.trim().is_empty())
909 else {
910 return;
911 };
912 let content = message_json
913 .get("content")
914 .map(user_message_content_blocks)
915 .unwrap_or_default();
916 crate::agent_events::emit_event(&crate::agent_events::AgentEvent::UserMessage {
917 session_id: session_id.to_string(),
918 message_id: message_id.to_string(),
919 content,
920 });
921}
922
923fn user_message_content_blocks(content: &serde_json::Value) -> Vec<serde_json::Value> {
924 match content {
925 serde_json::Value::Array(items) => items.clone(),
926 serde_json::Value::String(text) => vec![serde_json::json!({
927 "type": "text",
928 "text": text,
929 })],
930 other => vec![serde_json::json!({
931 "type": "text",
932 "text": other.to_string(),
933 })],
934 }
935}
936
/// Measured size of a transcript, compared against a budget policy.
#[derive(Debug, Clone, PartialEq, Eq)]
struct TranscriptBudgetUsage {
    /// Number of messages in the transcript.
    message_count: usize,
    /// Number of events in the transcript.
    event_count: usize,
    /// Approximate serialized size in bytes; `None` when it was not measured.
    approx_bytes: Option<usize>,
}
943
944fn transcript_messages_from_dict(dict: &BTreeMap<String, VmValue>) -> Vec<VmValue> {
945 match dict.get("messages") {
946 Some(VmValue::List(list)) => list.iter().cloned().collect(),
947 _ => Vec::new(),
948 }
949}
950
951fn transcript_events_from_dict(dict: &BTreeMap<String, VmValue>) -> Vec<VmValue> {
952 match dict.get("events") {
953 Some(VmValue::List(list)) => list.iter().cloned().collect(),
954 _ => {
955 let messages = transcript_messages_from_dict(dict);
956 crate::llm::helpers::transcript_events_from_messages(&messages)
957 }
958 }
959}
960
961fn transcript_usage(transcript: &VmValue, include_bytes: bool) -> TranscriptBudgetUsage {
962 let Some(dict) = transcript.as_dict() else {
963 return TranscriptBudgetUsage {
964 message_count: 0,
965 event_count: 0,
966 approx_bytes: include_bytes.then_some(0),
967 };
968 };
969 let approx_bytes = if include_bytes {
970 serde_json::to_vec(&crate::llm::helpers::vm_value_to_json(transcript))
971 .map(|bytes| bytes.len())
972 .ok()
973 .or(Some(usize::MAX))
974 } else {
975 None
976 };
977 TranscriptBudgetUsage {
978 message_count: transcript_messages_from_dict(dict).len(),
979 event_count: transcript_events_from_dict(dict).len(),
980 approx_bytes,
981 }
982}
983
984fn transcript_budget_exceeded_reason(
985 usage: &TranscriptBudgetUsage,
986 policy: &SessionTranscriptBudgetPolicy,
987) -> Option<&'static str> {
988 if usage.message_count > policy.max_messages {
989 return Some("message_count");
990 }
991 if usage.event_count > policy.max_events {
992 return Some("event_count");
993 }
994 if let (Some(bytes), Some(limit)) = (usage.approx_bytes, policy.max_approx_bytes) {
995 if bytes > limit {
996 return Some("approx_bytes");
997 }
998 }
999 None
1000}
1001
1002fn transcript_budget_usage_json(usage: &TranscriptBudgetUsage) -> serde_json::Value {
1003 serde_json::json!({
1004 "messages": usage.message_count,
1005 "events": usage.event_count,
1006 "approx_bytes": usage.approx_bytes,
1007 })
1008}
1009
1010fn transcript_budget_policy_json(policy: &SessionTranscriptBudgetPolicy) -> serde_json::Value {
1011 let recovery = match &policy.recovery {
1012 TranscriptBudgetRecovery::Reject => serde_json::json!({"action": "reject"}),
1013 TranscriptBudgetRecovery::Trim { keep_last } => {
1014 serde_json::json!({"action": "trim", "keep_last": keep_last})
1015 }
1016 TranscriptBudgetRecovery::Compact { keep_last } => {
1017 serde_json::json!({"action": "compact", "keep_last": keep_last})
1018 }
1019 };
1020 serde_json::json!({
1021 "max_messages": policy.max_messages,
1022 "max_events": policy.max_events,
1023 "max_approx_bytes": policy.max_approx_bytes,
1024 "recovery": recovery,
1025 })
1026}
1027
1028fn transcript_budget_recovery_name(recovery: &TranscriptBudgetRecovery) -> &'static str {
1029 match recovery {
1030 TranscriptBudgetRecovery::Reject => "reject",
1031 TranscriptBudgetRecovery::Trim { .. } => "trim",
1032 TranscriptBudgetRecovery::Compact { .. } => "compact",
1033 }
1034}
1035
1036fn transcript_budget_error(
1037 state: &SessionState,
1038 policy: &SessionTranscriptBudgetPolicy,
1039 usage: &TranscriptBudgetUsage,
1040 reason: &str,
1041) -> String {
1042 let byte_suffix = match (usage.approx_bytes, policy.max_approx_bytes) {
1043 (Some(bytes), Some(limit)) => format!(", approx_bytes {bytes}/{limit}"),
1044 _ => String::new(),
1045 };
1046 format!(
1047 "transcript budget exceeded for session '{}': {reason} (messages {}/{}, events {}/{}{}; recovery={})",
1048 state.id,
1049 usage.message_count,
1050 policy.max_messages,
1051 usage.event_count,
1052 policy.max_events,
1053 byte_suffix,
1054 transcript_budget_recovery_name(&policy.recovery),
1055 )
1056}
1057
1058fn transcript_budget_audit_json(
1059 action: &str,
1060 source: &str,
1061 reason: &str,
1062 policy: &SessionTranscriptBudgetPolicy,
1063 usage_before: &TranscriptBudgetUsage,
1064 usage_attempted: &TranscriptBudgetUsage,
1065 usage_after: &TranscriptBudgetUsage,
1066) -> serde_json::Value {
1067 serde_json::json!({
1068 "action": action,
1069 "source": source,
1070 "reason": reason,
1071 "policy": transcript_budget_policy_json(policy),
1072 "usage_before": transcript_budget_usage_json(usage_before),
1073 "usage_attempted": transcript_budget_usage_json(usage_attempted),
1074 "usage_after": transcript_budget_usage_json(usage_after),
1075 "removed_messages": usage_attempted.message_count.saturating_sub(usage_after.message_count),
1076 "removed_events": usage_attempted.event_count.saturating_sub(usage_after.event_count),
1077 })
1078}
1079
1080fn transcript_budget_event(audit: &serde_json::Value) -> VmValue {
1081 let action = audit
1082 .get("action")
1083 .and_then(serde_json::Value::as_str)
1084 .unwrap_or("enforced");
1085 crate::llm::helpers::transcript_event(
1086 "transcript_budget",
1087 "system",
1088 "internal",
1089 &format!("Transcript budget {action}."),
1090 Some(audit.clone()),
1091 )
1092}
1093
1094fn append_event_to_transcript(transcript: VmValue, event: VmValue) -> VmValue {
1095 let Some(dict) = transcript.as_dict() else {
1096 return transcript;
1097 };
1098 let mut next = dict.clone();
1099 let mut events = transcript_events_from_dict(&next);
1100 events.push(event);
1101 next.insert("events".to_string(), VmValue::List(Rc::new(events)));
1102 VmValue::Dict(Rc::new(next))
1103}
1104
1105fn summary_message_vm(summary: &str) -> VmValue {
1106 crate::stdlib::json_to_vm_value(&summary_message_json(summary))
1107}
1108
1109fn tail_message_capacity(
1110 policy: &SessionTranscriptBudgetPolicy,
1111 reserve_audit_event: bool,
1112) -> usize {
1113 let event_capacity = if reserve_audit_event {
1114 policy.max_events.saturating_sub(1)
1115 } else {
1116 policy.max_events
1117 };
1118 policy.max_messages.min(event_capacity)
1119}
1120
1121fn trim_transcript_for_budget(
1122 transcript: &VmValue,
1123 policy: &SessionTranscriptBudgetPolicy,
1124 keep_last: usize,
1125) -> VmValue {
1126 let dict = transcript.as_dict().cloned().unwrap_or_else(BTreeMap::new);
1127 let messages = transcript_messages_from_dict(&dict);
1128 let keep = keep_last.min(tail_message_capacity(policy, true));
1129 let start = messages.len().saturating_sub(keep);
1130 let retained: Vec<VmValue> = messages.into_iter().skip(start).collect();
1131 let mut next = dict;
1132 next.insert(
1133 "events".to_string(),
1134 VmValue::List(Rc::new(
1135 crate::llm::helpers::transcript_events_from_messages(&retained),
1136 )),
1137 );
1138 next.insert("messages".to_string(), VmValue::List(Rc::new(retained)));
1139 next.remove("summary");
1140 VmValue::Dict(Rc::new(next))
1141}
1142
1143fn compact_transcript_for_budget(
1144 transcript: &VmValue,
1145 policy: &SessionTranscriptBudgetPolicy,
1146 keep_last: usize,
1147 reason: &str,
1148) -> VmValue {
1149 let dict = transcript.as_dict().cloned().unwrap_or_else(BTreeMap::new);
1150 let messages = transcript_messages_from_dict(&dict);
1151 let message_capacity = tail_message_capacity(policy, true);
1152 let mut retained = Vec::new();
1153 let mut summary = None;
1154
1155 if messages.len() > message_capacity {
1156 if message_capacity > 0 {
1157 let tail_keep = keep_last.min(message_capacity.saturating_sub(1));
1158 let archived = messages.len().saturating_sub(tail_keep);
1159 let summary_text = format!(
1160 "[auto-compacted {archived} older message(s) under transcript budget]\nSession transcript exceeded the {reason} budget; retained the most recent {tail_keep} message(s)."
1161 );
1162 retained.push(summary_message_vm(&summary_text));
1163 retained.extend(messages.into_iter().skip(archived).take(tail_keep));
1164 summary = Some(summary_text);
1165 }
1166 } else {
1167 retained = messages;
1168 }
1169
1170 let mut next = dict;
1171 next.insert(
1172 "events".to_string(),
1173 VmValue::List(Rc::new(
1174 crate::llm::helpers::transcript_events_from_messages(&retained),
1175 )),
1176 );
1177 next.insert("messages".to_string(), VmValue::List(Rc::new(retained)));
1178 if let Some(summary) = summary {
1179 next.insert("summary".to_string(), VmValue::String(Rc::from(summary)));
1180 } else {
1181 next.remove("summary");
1182 }
1183 VmValue::Dict(Rc::new(next))
1184}
1185
1186fn recovered_transcript_with_audit(
1187 recovered: VmValue,
1188 action: &str,
1189 source: &str,
1190 reason: &str,
1191 policy: &SessionTranscriptBudgetPolicy,
1192 usage_before: &TranscriptBudgetUsage,
1193 usage_attempted: &TranscriptBudgetUsage,
1194 include_bytes: bool,
1195) -> (VmValue, serde_json::Value, TranscriptBudgetUsage) {
1196 let usage_after_without_audit = transcript_usage(&recovered, include_bytes);
1197 let initial_audit = transcript_budget_audit_json(
1198 action,
1199 source,
1200 reason,
1201 policy,
1202 usage_before,
1203 usage_attempted,
1204 &usage_after_without_audit,
1205 );
1206 let with_initial_audit =
1207 append_event_to_transcript(recovered.clone(), transcript_budget_event(&initial_audit));
1208 let usage_after = transcript_usage(&with_initial_audit, include_bytes);
1209 let audit = transcript_budget_audit_json(
1210 action,
1211 source,
1212 reason,
1213 policy,
1214 usage_before,
1215 usage_attempted,
1216 &usage_after,
1217 );
1218 let with_audit = append_event_to_transcript(recovered, transcript_budget_event(&audit));
1219 let usage_after = transcript_usage(&with_audit, include_bytes);
1220 (with_audit, audit, usage_after)
1221}
1222
1223fn apply_transcript_with_budget(
1224 state: &mut SessionState,
1225 candidate: VmValue,
1226 source: &str,
1227) -> Result<(), String> {
1228 let policy = state.transcript_budget_policy.normalized();
1229 let include_bytes = policy.max_approx_bytes.is_some();
1230 let usage_before = transcript_usage(&state.transcript, include_bytes);
1231 let usage_attempted = transcript_usage(&candidate, include_bytes);
1232 let Some(reason) = transcript_budget_exceeded_reason(&usage_attempted, &policy) else {
1233 state.transcript = candidate;
1234 return Ok(());
1235 };
1236
1237 match policy.recovery.clone() {
1238 TranscriptBudgetRecovery::Reject => {
1239 let audit = transcript_budget_audit_json(
1240 "rejected",
1241 source,
1242 reason,
1243 &policy,
1244 &usage_before,
1245 &usage_attempted,
1246 &usage_before,
1247 );
1248 state.last_transcript_budget_action = Some(audit);
1249 Err(transcript_budget_error(
1250 state,
1251 &policy,
1252 &usage_attempted,
1253 reason,
1254 ))
1255 }
1256 TranscriptBudgetRecovery::Trim { keep_last } => {
1257 let recovered = trim_transcript_for_budget(&candidate, &policy, keep_last);
1258 let (with_audit, audit, usage_after) = recovered_transcript_with_audit(
1259 recovered,
1260 "trimmed",
1261 source,
1262 reason,
1263 &policy,
1264 &usage_before,
1265 &usage_attempted,
1266 include_bytes,
1267 );
1268 if transcript_budget_exceeded_reason(&usage_after, &policy).is_some() {
1269 let rejected = transcript_budget_audit_json(
1270 "rejected",
1271 source,
1272 reason,
1273 &policy,
1274 &usage_before,
1275 &usage_attempted,
1276 &usage_after,
1277 );
1278 state.last_transcript_budget_action = Some(rejected);
1279 return Err(transcript_budget_error(
1280 state,
1281 &policy,
1282 &usage_after,
1283 reason,
1284 ));
1285 }
1286 state.last_transcript_budget_action = Some(audit);
1287 state.transcript = with_audit;
1288 Ok(())
1289 }
1290 TranscriptBudgetRecovery::Compact { keep_last } => {
1291 let recovered = compact_transcript_for_budget(&candidate, &policy, keep_last, reason);
1292 let (with_audit, audit, usage_after) = recovered_transcript_with_audit(
1293 recovered,
1294 "compacted",
1295 source,
1296 reason,
1297 &policy,
1298 &usage_before,
1299 &usage_attempted,
1300 include_bytes,
1301 );
1302 if transcript_budget_exceeded_reason(&usage_after, &policy).is_some() {
1303 let rejected = transcript_budget_audit_json(
1304 "rejected",
1305 source,
1306 reason,
1307 &policy,
1308 &usage_before,
1309 &usage_attempted,
1310 &usage_after,
1311 );
1312 state.last_transcript_budget_action = Some(rejected);
1313 return Err(transcript_budget_error(
1314 state,
1315 &policy,
1316 &usage_after,
1317 reason,
1318 ));
1319 }
1320 state.last_transcript_budget_action = Some(audit);
1321 state.transcript = with_audit;
1322 Ok(())
1323 }
1324 }
1325}
1326
1327fn emit_llm_message_event(session_id: &str, message_index: usize, message: &VmValue) {
1328 let mut fields = serde_json::Map::new();
1329 fields.insert(
1330 "session_id".to_string(),
1331 serde_json::Value::String(session_id.to_string()),
1332 );
1333 fields.insert(
1334 "message_index".to_string(),
1335 serde_json::json!(message_index),
1336 );
1337 let message_json = crate::llm::helpers::vm_value_to_json(message);
1338 if let Some(role) = message_json.get("role").and_then(|value| value.as_str()) {
1339 fields.insert(
1340 "role".to_string(),
1341 serde_json::Value::String(role.to_string()),
1342 );
1343 }
1344 if let Some(content) = message_json.get("content") {
1345 fields.insert("content".to_string(), content.clone());
1346 }
1347 fields.insert("message".to_string(), message_json);
1348 crate::llm::append_observability_sidecar_entry("message", fields);
1349}
1350
1351pub fn seed_from_messages(
1357 id: Option<String>,
1358 messages: &[serde_json::Value],
1359 metadata: serde_json::Value,
1360 system_prompt: Option<String>,
1361 tool_format: Option<String>,
1362) -> Result<String, String> {
1363 let resolved = id.unwrap_or_else(|| uuid::Uuid::now_v7().to_string());
1364 if exists(&resolved) {
1365 return Err(format!("agent session '{resolved}' already exists"));
1366 }
1367 open_or_create(Some(resolved.clone()));
1368 SESSIONS.with(|s| {
1369 let mut map = s.borrow_mut();
1370 let Some(state) = map.get_mut(&resolved) else {
1371 return Err(format!("failed to create agent session '{resolved}'"));
1372 };
1373 state.tool_format = tool_format.filter(|value| !value.trim().is_empty());
1374 state.system_prompt = system_prompt.filter(|value| !value.trim().is_empty());
1375
1376 let mut metadata = metadata
1377 .as_object()
1378 .cloned()
1379 .unwrap_or_else(serde_json::Map::new);
1380 if let Some(tool_format) = state.tool_format.as_ref() {
1381 metadata.insert(
1382 "tool_format".to_string(),
1383 serde_json::Value::String(tool_format.clone()),
1384 );
1385 metadata.insert(
1386 "tool_mode_locked".to_string(),
1387 serde_json::Value::Bool(true),
1388 );
1389 }
1390 if let Some(system_prompt) = state.system_prompt.as_ref() {
1391 metadata.insert(
1392 "system_prompt".to_string(),
1393 crate::llm::helpers::system_prompt_metadata(system_prompt),
1394 );
1395 }
1396 let vm_messages = crate::llm::helpers::json_messages_to_vm(messages);
1397 let candidate = crate::llm::helpers::new_transcript_with(
1398 Some(resolved.clone()),
1399 vm_messages,
1400 None,
1401 Some(crate::stdlib::json_to_vm_value(&serde_json::Value::Object(
1402 metadata,
1403 ))),
1404 );
1405 apply_transcript_with_budget(state, candidate, "seed_from_messages")?;
1406 state.last_accessed = Instant::now();
1407 Ok(resolved)
1408 })
1409}
1410
1411pub fn messages_json(id: &str) -> Vec<serde_json::Value> {
1415 SESSIONS.with(|s| {
1416 let map = s.borrow();
1417 let Some(state) = map.get(id) else {
1418 return Vec::new();
1419 };
1420 let Some(dict) = state.transcript.as_dict() else {
1421 return Vec::new();
1422 };
1423 match dict.get("messages") {
1424 Some(VmValue::List(list)) => list
1425 .iter()
1426 .map(crate::llm::helpers::vm_value_to_json)
1427 .collect(),
1428 _ => Vec::new(),
1429 }
1430 })
1431}
1432
/// Prompt-facing view of a session transcript, as produced by `prompt_state_json`.
#[derive(Clone, Debug, Default)]
pub struct SessionPromptState {
    /// Transcript messages as JSON. `prompt_state_json` prepends a summary message when the
    /// transcript has a non-blank summary that is not already the first message.
    pub messages: Vec<serde_json::Value>,
    /// The transcript's `summary` string, or `None` when it is absent or blank.
    pub summary: Option<String>,
}
1438
1439fn summary_message_json(summary: &str) -> serde_json::Value {
1440 serde_json::json!({
1441 "role": "user",
1442 "content": summary,
1443 })
1444}
1445
1446fn messages_begin_with_summary(messages: &[serde_json::Value], summary: &str) -> bool {
1447 messages.first().is_some_and(|message| {
1448 message.get("role").and_then(|value| value.as_str()) == Some("user")
1449 && message.get("content").and_then(|value| value.as_str()) == Some(summary)
1450 })
1451}
1452
1453pub fn prompt_state_json(id: &str) -> SessionPromptState {
1461 SESSIONS.with(|s| {
1462 let map = s.borrow();
1463 let Some(state) = map.get(id) else {
1464 return SessionPromptState::default();
1465 };
1466 let Some(dict) = state.transcript.as_dict() else {
1467 return SessionPromptState::default();
1468 };
1469 let mut messages = match dict.get("messages") {
1470 Some(VmValue::List(list)) => list
1471 .iter()
1472 .map(crate::llm::helpers::vm_value_to_json)
1473 .collect::<Vec<_>>(),
1474 _ => Vec::new(),
1475 };
1476 let summary = dict.get("summary").and_then(|value| match value {
1477 VmValue::String(text) if !text.trim().is_empty() => Some(text.to_string()),
1478 _ => None,
1479 });
1480 if let Some(summary_text) = summary.as_deref() {
1481 if !messages_begin_with_summary(&messages, summary_text) {
1482 messages.insert(0, summary_message_json(summary_text));
1483 }
1484 }
1485 SessionPromptState { messages, summary }
1486 })
1487}
1488
1489pub fn store_transcript(id: &str, transcript: VmValue) -> Result<(), String> {
1492 SESSIONS.with(|s| {
1493 let mut map = s.borrow_mut();
1494 let Some(state) = map.get_mut(id) else {
1495 return Err(format!(
1496 "agent_session_store_transcript: unknown session id '{id}'"
1497 ));
1498 };
1499 let transcript = transcript_with_session_metadata(transcript, state);
1500 apply_transcript_with_budget(state, transcript, "store_transcript")?;
1501 state.last_accessed = Instant::now();
1502 Ok(())
1503 })
1504}
1505
1506pub fn prune_invalid_reminder_events(id: &str) -> usize {
1510 SESSIONS.with(|s| {
1511 let mut map = s.borrow_mut();
1512 let Some(state) = map.get_mut(id) else {
1513 return 0;
1514 };
1515 let Some(dict) = state.transcript.as_dict().cloned() else {
1516 return 0;
1517 };
1518 let Some(VmValue::List(events)) = dict.get("events") else {
1519 return 0;
1520 };
1521 let mut pruned = 0_usize;
1522 let mut kept = Vec::with_capacity(events.len());
1523 for event in events.iter().cloned() {
1524 let is_reminder = event
1525 .as_dict()
1526 .and_then(|event| event.get("kind"))
1527 .map(VmValue::display)
1528 .as_deref()
1529 == Some(crate::llm::helpers::SYSTEM_REMINDER_EVENT_KIND);
1530 if !is_reminder {
1531 kept.push(event);
1532 continue;
1533 }
1534 let valid = crate::llm::helpers::reminder_from_event(&event)
1535 .is_some_and(|reminder| !reminder.body.trim().is_empty());
1536 if valid {
1537 kept.push(event);
1538 } else {
1539 pruned += 1;
1540 }
1541 }
1542 if pruned > 0 {
1543 let mut next = dict;
1544 next.insert("events".to_string(), VmValue::List(Rc::new(kept)));
1545 let _ = apply_transcript_with_budget(
1546 state,
1547 VmValue::Dict(Rc::new(next)),
1548 "prune_invalid_reminder_events",
1549 );
1550 state.last_accessed = Instant::now();
1551 }
1552 pruned
1553 })
1554}
1555
1556pub fn apply_reminder_post_turn(id: &str, turn: i64) -> Result<serde_json::Value, String> {
1561 let report = SESSIONS.with(|s| {
1562 let mut map = s.borrow_mut();
1563 let Some(state) = map.get_mut(id) else {
1564 return Err(format!(
1565 "agent_session_apply_reminder_post_turn: unknown session id '{id}'"
1566 ));
1567 };
1568 let report = crate::llm::helpers::apply_reminder_post_turn(&state.transcript, turn);
1569 if report.decremented_count > 0 || !report.expired.is_empty() {
1570 if let Some(next) = report.transcript.clone() {
1571 apply_transcript_with_budget(state, next, "apply_reminder_post_turn")?;
1572 }
1573 state.last_accessed = Instant::now();
1574 }
1575 Ok(report)
1576 })?;
1577
1578 for reminder in &report.expired {
1579 let mut payload = crate::llm::helpers::reminder_lifecycle_payload(Some(id), reminder);
1580 if let Some(obj) = payload.as_object_mut() {
1581 obj.insert(
1582 "transcript_id".to_string(),
1583 serde_json::Value::String(id.to_string()),
1584 );
1585 obj.insert(
1586 "reason".to_string(),
1587 serde_json::Value::String("ttl".to_string()),
1588 );
1589 obj.insert(
1590 "ttl_turns_before".to_string(),
1591 serde_json::json!(&reminder.ttl_turns),
1592 );
1593 obj.insert("expired_at_turn".to_string(), serde_json::json!(turn));
1594 }
1595 crate::llm::helpers::emit_reminder_lifecycle_event(
1596 crate::llm::helpers::REMINDER_EXPIRED_EVENT_KIND,
1597 payload,
1598 );
1599 }
1600
1601 Ok(serde_json::json!({
1602 "expired_count": report.expired.len(),
1603 "decremented_count": report.decremented_count,
1604 "remaining_count": report.remaining_count,
1605 }))
1606}
1607
1608pub fn inject_reminder(
1613 id: &str,
1614 reminder: crate::llm::helpers::SystemReminder,
1615) -> Result<ReminderInjectionReport, String> {
1616 let reminder_id = reminder.id.clone();
1617 let dedupe_key = reminder.dedupe_key.clone();
1618 let mut deduped_reminder_ids = Vec::new();
1619 SESSIONS.with(|s| {
1620 let mut map = s.borrow_mut();
1621 let Some(state) = map.get_mut(id) else {
1622 return Err(format!(
1623 "agent_session_inject_reminder: unknown session id '{id}'"
1624 ));
1625 };
1626 let dict = state
1627 .transcript
1628 .as_dict()
1629 .cloned()
1630 .unwrap_or_else(BTreeMap::new);
1631 let mut events: Vec<VmValue> = match dict.get("events") {
1632 Some(VmValue::List(list)) => list.iter().cloned().collect(),
1633 _ => dict
1634 .get("messages")
1635 .and_then(|value| match value {
1636 VmValue::List(list) => Some(list.iter().cloned().collect::<Vec<_>>()),
1637 _ => None,
1638 })
1639 .map(|messages| crate::llm::helpers::transcript_events_from_messages(&messages))
1640 .unwrap_or_default(),
1641 };
1642 if let Some(expected_key) = dedupe_key.as_deref() {
1643 events.retain(|event| {
1644 let Some(existing) = crate::llm::helpers::reminder_from_event(event) else {
1645 return true;
1646 };
1647 if existing.dedupe_key.as_deref() == Some(expected_key) {
1648 deduped_reminder_ids.push(existing.id);
1649 false
1650 } else {
1651 true
1652 }
1653 });
1654 }
1655 events.push(crate::llm::helpers::transcript_reminder_event(&reminder));
1656 let mut next = dict;
1657 next.insert("events".to_string(), VmValue::List(Rc::new(events)));
1658 apply_transcript_with_budget(state, VmValue::Dict(Rc::new(next)), "inject_reminder")?;
1659 state.last_accessed = Instant::now();
1660 Ok(())
1661 })?;
1662
1663 if !deduped_reminder_ids.is_empty() {
1664 let dropped_count = deduped_reminder_ids.len();
1665 crate::llm::helpers::emit_reminder_lifecycle_event(
1666 crate::llm::helpers::REMINDER_DEDUPED_EVENT_KIND,
1667 serde_json::json!({
1668 "session_id": id,
1669 "transcript_id": id,
1670 "reminder_id": &reminder_id,
1671 "replacing_id": &reminder_id,
1672 "replaced_id": deduped_reminder_ids.first(),
1673 "replaced_ids": &deduped_reminder_ids,
1674 "dedupe_key": &dedupe_key,
1675 "dropped_reminder_ids": &deduped_reminder_ids,
1676 "dropped_count": dropped_count,
1677 }),
1678 );
1679 }
1680
1681 crate::llm::helpers::emit_reminder_lifecycle_event(
1682 crate::llm::helpers::REMINDER_INJECTED_EVENT_KIND,
1683 crate::llm::helpers::reminder_lifecycle_payload(Some(id), &reminder),
1684 );
1685
1686 Ok(ReminderInjectionReport {
1687 reminder_id,
1688 deduped_count: deduped_reminder_ids.len(),
1689 })
1690}
1691
1692pub fn append_event(id: &str, event: VmValue) -> Result<(), String> {
1698 let Some(event_dict) = event.as_dict() else {
1699 return Err("agent_session_append_event: event must be a dict".into());
1700 };
1701 let kind_ok = matches!(event_dict.get("kind"), Some(VmValue::String(_)));
1702 if !kind_ok {
1703 return Err("agent_session_append_event: event must have a string `kind`".into());
1704 }
1705 SESSIONS.with(|s| {
1706 let mut map = s.borrow_mut();
1707 let Some(state) = map.get_mut(id) else {
1708 return Err(format!(
1709 "agent_session_append_event: unknown session id '{id}'"
1710 ));
1711 };
1712 append_event_to_state(state, event, "append_event")?;
1713 state.last_accessed = Instant::now();
1714 Ok(())
1715 })
1716}
1717
1718fn append_event_to_state(
1719 state: &mut SessionState,
1720 event: VmValue,
1721 action: &str,
1722) -> Result<(), String> {
1723 let dict = state
1724 .transcript
1725 .as_dict()
1726 .cloned()
1727 .unwrap_or_else(BTreeMap::new);
1728 let mut events: Vec<VmValue> = match dict.get("events") {
1729 Some(VmValue::List(list)) => list.iter().cloned().collect(),
1730 _ => dict
1731 .get("messages")
1732 .and_then(|value| match value {
1733 VmValue::List(list) => Some(list.iter().cloned().collect::<Vec<_>>()),
1734 _ => None,
1735 })
1736 .map(|messages| crate::llm::helpers::transcript_events_from_messages(&messages))
1737 .unwrap_or_default(),
1738 };
1739 events.push(event);
1740 let mut next = dict;
1741 next.insert("events".to_string(), VmValue::List(Rc::new(events)));
1742 apply_transcript_with_budget(state, VmValue::Dict(Rc::new(next)), action)
1743}
1744
1745pub fn replace_messages(id: &str, messages: &[serde_json::Value]) -> Result<(), String> {
1748 replace_messages_with_summary(id, messages, None)
1749}
1750
1751pub fn replace_messages_with_summary(
1756 id: &str,
1757 messages: &[serde_json::Value],
1758 summary: Option<&str>,
1759) -> Result<(), String> {
1760 SESSIONS.with(|s| {
1761 let mut map = s.borrow_mut();
1762 let Some(state) = map.get_mut(id) else {
1763 return Err(format!(
1764 "agent_session_replace_messages: unknown session id '{id}'"
1765 ));
1766 };
1767 let dict = state
1768 .transcript
1769 .as_dict()
1770 .cloned()
1771 .unwrap_or_else(BTreeMap::new);
1772 let vm_messages: Vec<VmValue> = messages
1773 .iter()
1774 .map(crate::stdlib::json_to_vm_value)
1775 .collect();
1776 let mut next = dict;
1777 next.insert(
1778 "events".to_string(),
1779 VmValue::List(Rc::new(
1780 crate::llm::helpers::transcript_events_from_messages(&vm_messages),
1781 )),
1782 );
1783 next.insert("messages".to_string(), VmValue::List(Rc::new(vm_messages)));
1784 if let Some(summary) = summary {
1785 next.insert(
1786 "summary".to_string(),
1787 VmValue::String(Rc::from(summary.to_string())),
1788 );
1789 } else {
1790 next.remove("summary");
1791 }
1792 apply_transcript_with_budget(state, VmValue::Dict(Rc::new(next)), "replace_messages")?;
1793 state.last_accessed = Instant::now();
1794 Ok(())
1795 })
1796}
1797
1798pub fn append_subscriber(id: &str, callback: VmValue) {
1799 open_or_create(Some(id.to_string()));
1800 SESSIONS.with(|s| {
1801 if let Some(state) = s.borrow_mut().get_mut(id) {
1802 state.subscribers.push(callback);
1803 state.last_accessed = Instant::now();
1804 }
1805 });
1806}
1807
1808pub fn subscribers_for(id: &str) -> Vec<VmValue> {
1809 SESSIONS.with(|s| {
1810 s.borrow()
1811 .get(id)
1812 .map(|state| state.subscribers.clone())
1813 .unwrap_or_default()
1814 })
1815}
1816
1817pub fn subscriber_count(id: &str) -> usize {
1818 SESSIONS.with(|s| {
1819 s.borrow()
1820 .get(id)
1821 .map(|state| state.subscribers.len())
1822 .unwrap_or(0)
1823 })
1824}
1825
1826pub fn set_active_skills(id: &str, skills: Vec<String>) {
1830 SESSIONS.with(|s| {
1831 if let Some(state) = s.borrow_mut().get_mut(id) {
1832 state.active_skills = skills;
1833 state.last_accessed = Instant::now();
1834 }
1835 });
1836}
1837
1838pub fn active_skills(id: &str) -> Vec<String> {
1842 SESSIONS.with(|s| {
1843 s.borrow()
1844 .get(id)
1845 .map(|state| state.active_skills.clone())
1846 .unwrap_or_default()
1847 })
1848}
1849
1850pub fn claim_tool_format(id: &str, tool_format: &str) -> Result<(), String> {
1856 let tool_format = tool_format.trim();
1857 if tool_format.is_empty() {
1858 return Ok(());
1859 }
1860 SESSIONS.with(|s| {
1861 let mut map = s.borrow_mut();
1862 let Some(state) = map.get_mut(id) else {
1863 return Err(format!("agent session '{id}' does not exist"));
1864 };
1865 match state.tool_format.as_deref() {
1866 Some(existing) if existing != tool_format => Err(format!(
1867 "agent session '{id}' is locked to tool_format='{existing}', but this run requested tool_format='{tool_format}'. Start a new session or fork/reset the transcript before changing tool mode."
1868 )),
1869 Some(_) => {
1870 state.last_accessed = Instant::now();
1871 Ok(())
1872 }
1873 None => {
1874 state.tool_format = Some(tool_format.to_string());
1875 state.last_accessed = Instant::now();
1876 Ok(())
1877 }
1878 }
1879 })
1880}
1881
1882pub fn tool_format(id: &str) -> Option<String> {
1883 SESSIONS.with(|s| {
1884 s.borrow()
1885 .get(id)
1886 .and_then(|state| state.tool_format.clone())
1887 })
1888}
1889
1890pub fn record_system_prompt(id: &str, system_prompt: &str) -> Result<(), String> {
1891 let system_prompt = system_prompt.trim();
1892 if system_prompt.is_empty() {
1893 return Ok(());
1894 }
1895 assert_cache_stable_system_prompt(system_prompt);
1896 SESSIONS.with(|s| {
1897 let mut map = s.borrow_mut();
1898 let Some(state) = map.get_mut(id) else {
1899 return Err(format!("agent session '{id}' does not exist"));
1900 };
1901 let changed = state.system_prompt.as_deref() != Some(system_prompt);
1902 state.system_prompt = Some(system_prompt.to_string());
1903 let dict = state
1904 .transcript
1905 .as_dict()
1906 .cloned()
1907 .unwrap_or_else(BTreeMap::new);
1908 let mut next = dict;
1909 apply_system_prompt_metadata(&mut next, system_prompt);
1910 if changed {
1911 let mut events: Vec<VmValue> = match next.get("events") {
1912 Some(VmValue::List(list)) => list.iter().cloned().collect(),
1913 _ => Vec::new(),
1914 };
1915 events.push(crate::llm::helpers::transcript_event(
1916 "system_prompt",
1917 "system",
1918 "internal",
1919 "",
1920 Some(crate::llm::helpers::system_prompt_event_metadata(
1921 system_prompt,
1922 )),
1923 ));
1924 next.insert("events".to_string(), VmValue::List(Rc::new(events)));
1925 }
1926 apply_transcript_with_budget(state, VmValue::Dict(Rc::new(next)), "record_system_prompt")?;
1927 state.last_accessed = Instant::now();
1928 Ok(())
1929 })
1930}
1931
1932pub fn system_prompt(id: &str) -> Option<String> {
1933 SESSIONS.with(|s| {
1934 s.borrow()
1935 .get(id)
1936 .and_then(|state| state.system_prompt.clone())
1937 })
1938}
1939
/// Scans `system_prompt` for a `{{` opener whose following text (after optional leading
/// whitespace) starts with `workspace_` or `project_`.
///
/// Returns the matching prefix for the first such opener, or `None` when there is none.
/// Scanning resumes from the text after each non-matching opener.
#[cfg(debug_assertions)]
fn forbidden_workspace_prompt_token(system_prompt: &str) -> Option<&'static str> {
    const FORBIDDEN_PREFIXES: [&str; 2] = ["workspace_", "project_"];

    let mut rest = system_prompt;
    while let Some((_, after_open)) = rest.split_once("{{") {
        let token = after_open.trim_start();
        let hit = FORBIDDEN_PREFIXES
            .iter()
            .copied()
            .find(|prefix| token.starts_with(*prefix));
        if hit.is_some() {
            return hit;
        }
        rest = token;
    }
    None
}
1955
1956#[cfg(debug_assertions)]
1957fn assert_cache_stable_system_prompt(system_prompt: &str) {
1958 if let Some(prefix) = forbidden_workspace_prompt_token(system_prompt) {
1959 panic!(
1960 "{CACHE_STABLE_SYSTEM_PROMPT_DIAGNOSTIC}: session system prompts must not interpolate `{{{{{prefix}...` tokens; move workspace/project context into the workspace-anchor reminder"
1961 );
1962 }
1963}
1964
/// Build-without-debug-assertions counterpart of the system prompt check: performs no
/// validation and never panics.
#[cfg(not(debug_assertions))]
fn assert_cache_stable_system_prompt(_system_prompt: &str) {}
1967
1968pub fn set_pinned_model(id: &str, model: Option<String>) -> Result<bool, String> {
1973 let normalized = model
1974 .map(|value| value.trim().to_string())
1975 .filter(|value| !value.is_empty());
1976 SESSIONS.with(|s| {
1977 let mut map = s.borrow_mut();
1978 let Some(state) = map.get_mut(id) else {
1979 return Err(format!("agent session '{id}' does not exist"));
1980 };
1981 let changed = state.pinned_model != normalized;
1982 state.pinned_model = normalized;
1983 state.last_accessed = Instant::now();
1984 Ok(changed)
1985 })
1986}
1987
1988pub fn pinned_model(id: &str) -> Option<String> {
1992 SESSIONS.with(|s| {
1993 s.borrow()
1994 .get(id)
1995 .and_then(|state| state.pinned_model.clone())
1996 })
1997}
1998
1999pub fn set_pinned_reasoning_policy(id: &str, policy: Option<String>) -> Result<bool, String> {
2001 let normalized = match policy {
2002 Some(value) => crate::llm::reasoning_policy::normalize_policy_selector(&value)?,
2003 None => None,
2004 };
2005 SESSIONS.with(|s| {
2006 let mut map = s.borrow_mut();
2007 let Some(state) = map.get_mut(id) else {
2008 return Err(format!("agent session '{id}' does not exist"));
2009 };
2010 let changed = state.pinned_reasoning_policy != normalized;
2011 state.pinned_reasoning_policy = normalized;
2012 state.last_accessed = Instant::now();
2013 Ok(changed)
2014 })
2015}
2016
2017pub fn pinned_reasoning_policy(id: &str) -> Option<String> {
2019 SESSIONS.with(|s| {
2020 s.borrow()
2021 .get(id)
2022 .and_then(|state| state.pinned_reasoning_policy.clone())
2023 })
2024}
2025
2026pub fn set_workspace_anchor(id: &str, anchor: Option<WorkspaceAnchor>) -> Result<bool, String> {
2030 SESSIONS.with(|s| {
2031 let mut map = s.borrow_mut();
2032 let Some(state) = map.get_mut(id) else {
2033 return Err(format!("agent session '{id}' does not exist"));
2034 };
2035 let changed = state.workspace_anchor != anchor;
2036 state.workspace_anchor = anchor;
2037 if changed {
2038 crate::llm::permissions::clear_session_grants(id);
2039 }
2040 state.last_accessed = Instant::now();
2041 Ok(changed)
2042 })
2043}
2044
2045pub fn workspace_anchor(id: &str) -> Option<WorkspaceAnchor> {
2047 SESSIONS.with(|s| {
2048 s.borrow()
2049 .get(id)
2050 .and_then(|state| state.workspace_anchor.clone())
2051 })
2052}
2053
/// Result of `reanchor_session`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ReanchorOutcome {
    /// The anchor the session had before the call, if any.
    pub previous: Option<WorkspaceAnchor>,
    /// The anchor the session holds after the call.
    pub current: WorkspaceAnchor,
    /// Whether `previous` differed from the new anchor.
    pub changed: bool,
}
2063
2064pub fn reanchor_session(
2069 id: &str,
2070 new_anchor: WorkspaceAnchor,
2071 carry_transcript: bool,
2072 compacted: bool,
2073 reason: Option<String>,
2074) -> Result<ReanchorOutcome, String> {
2075 let outcome = SESSIONS.with(|s| {
2076 let mut map = s.borrow_mut();
2077 let Some(state) = map.get_mut(id) else {
2078 return Err(format!("agent session '{id}' does not exist"));
2079 };
2080 let previous = state.workspace_anchor.clone();
2081 let changed = previous.as_ref() != Some(&new_anchor);
2082 state.workspace_anchor = Some(new_anchor.clone());
2083 if changed {
2084 crate::llm::permissions::clear_session_grants(id);
2085 }
2086 state.last_accessed = Instant::now();
2087 Ok(ReanchorOutcome {
2088 previous,
2089 current: new_anchor,
2090 changed,
2091 })
2092 })?;
2093 if !outcome.changed {
2094 return Ok(outcome);
2095 }
2096 let previous_json = outcome.previous.as_ref().map(WorkspaceAnchor::to_json);
2097 let current_json = outcome.current.to_json();
2098 let event_metadata = serde_json::json!({
2099 "previous": previous_json,
2100 "current": current_json,
2101 "carry_transcript": carry_transcript,
2102 "compacted": compacted,
2103 "reason": reason,
2104 });
2105 let event = crate::llm::helpers::transcript_event(
2106 "AnchorChanged",
2107 "system",
2108 "internal",
2109 "",
2110 Some(event_metadata),
2111 );
2112 let _ = append_event(id, event);
2113 crate::llm::emit_live_agent_event_sync(&crate::agent_events::AgentEvent::AnchorChanged {
2114 session_id: id.to_string(),
2115 previous: previous_json,
2116 current: current_json,
2117 carry_transcript,
2118 compacted,
2119 reason,
2120 });
2121 Ok(outcome)
2122}
2123
2124pub fn set_workspace_policy(id: &str, policy: WorkspacePolicy) -> Result<bool, String> {
2127 SESSIONS.with(|s| {
2128 let mut map = s.borrow_mut();
2129 let Some(state) = map.get_mut(id) else {
2130 return Err(format!("agent session '{id}' does not exist"));
2131 };
2132 let changed = state.workspace_policy != policy;
2133 state.workspace_policy = policy;
2134 state.last_accessed = Instant::now();
2135 Ok(changed)
2136 })
2137}
2138
2139pub fn workspace_policy(id: &str) -> Option<WorkspacePolicy> {
2141 SESSIONS.with(|s| {
2142 s.borrow()
2143 .get(id)
2144 .map(|state| state.workspace_policy.clone())
2145 })
2146}
2147
2148pub fn add_workspace_root(
2152 id: &str,
2153 root: &str,
2154 mount_mode: Option<MountMode>,
2155 reason: Option<String>,
2156) -> Result<String, String> {
2157 let normalized_root = validate_workspace_root_path(root)?;
2158 let mounted_at = crate::orchestration::now_rfc3339();
2159 SESSIONS.with(|s| {
2160 let mut map = s.borrow_mut();
2161 let Some(state) = map.get_mut(id) else {
2162 return Err(format!("agent session '{id}' does not exist"));
2163 };
2164 let default_mount_mode = state.workspace_policy.default_mount_mode;
2165 let Some(anchor) = state.workspace_anchor.as_mut() else {
2166 return Err(format!("agent session '{id}' has no workspace anchor"));
2167 };
2168 let resolved_mount_mode = mount_mode.unwrap_or(default_mount_mode);
2169 if let Some(existing) = anchor
2170 .additional_roots
2171 .iter_mut()
2172 .find(|entry| entry.path == normalized_root)
2173 {
2174 existing.mount_mode = resolved_mount_mode;
2175 existing.mounted_at = mounted_at.clone();
2176 } else {
2177 anchor.additional_roots.push(MountedRoot {
2178 path: normalized_root.clone(),
2179 mount_mode: resolved_mount_mode,
2180 mounted_at: mounted_at.clone(),
2181 });
2182 }
2183 let event = crate::llm::helpers::transcript_event(
2184 "RootMounted",
2185 "system",
2186 "internal",
2187 "",
2188 Some(serde_json::json!({
2189 "path": normalized_root.to_string_lossy(),
2190 "mount_mode": resolved_mount_mode.as_str(),
2191 "mounted_at": mounted_at.clone(),
2192 "reason": reason,
2193 })),
2194 );
2195 append_event_to_state(state, event, "add_workspace_root")?;
2196 crate::llm::permissions::clear_session_grants(id);
2197 state.last_accessed = Instant::now();
2198 Ok(mounted_at.clone())
2199 })
2200}
2201
2202pub fn remove_workspace_root(id: &str, root: &str) -> Result<bool, String> {
2205 let normalized_root = normalize_workspace_root_path(root);
2206 SESSIONS.with(|s| {
2207 let mut map = s.borrow_mut();
2208 let Some(state) = map.get_mut(id) else {
2209 return Err(format!("agent session '{id}' does not exist"));
2210 };
2211 let Some(anchor) = state.workspace_anchor.as_mut() else {
2212 return Err(format!("agent session '{id}' has no workspace anchor"));
2213 };
2214 let before = anchor.additional_roots.len();
2215 anchor
2216 .additional_roots
2217 .retain(|entry| entry.path != normalized_root);
2218 let removed = anchor.additional_roots.len() != before;
2219 if removed {
2220 crate::llm::permissions::clear_session_grants(id);
2221 }
2222 state.last_accessed = Instant::now();
2223 Ok(removed)
2224 })
2225}
2226
2227pub fn list_workspace_roots(id: &str) -> Result<(PathBuf, Vec<MountedRoot>), String> {
2228 SESSIONS.with(|s| {
2229 let map = s.borrow();
2230 let Some(state) = map.get(id) else {
2231 return Err(format!("agent session '{id}' does not exist"));
2232 };
2233 let Some(anchor) = state.workspace_anchor.as_ref() else {
2234 return Err(format!("agent session '{id}' has no workspace anchor"));
2235 };
2236 Ok((anchor.primary.clone(), anchor.additional_roots.clone()))
2237 })
2238}
2239
2240fn validate_workspace_root_path(root: &str) -> Result<PathBuf, String> {
2241 let normalized = normalize_workspace_root_path(root);
2242 let canonical = std::fs::canonicalize(&normalized)
2243 .map_err(|error| format!("workspace root '{root}' must exist and be readable: {error}"))?;
2244 let metadata = std::fs::metadata(&canonical)
2245 .map_err(|error| format!("workspace root '{root}' must exist and be readable: {error}"))?;
2246 if !metadata.is_dir() {
2247 return Err(format!("workspace root '{root}' must be a directory"));
2248 }
2249 std::fs::read_dir(&canonical)
2250 .map_err(|error| format!("workspace root '{root}' must be readable: {error}"))?;
2251 Ok(canonical)
2252}
2253
2254fn normalize_workspace_root_path(root: &str) -> PathBuf {
2255 let absolute = crate::stdlib::process::normalize_context_path(Path::new(root));
2256 std::fs::canonicalize(&absolute).unwrap_or(absolute)
2257}
2258
2259fn empty_transcript(id: &str) -> VmValue {
2260 use crate::llm::helpers::new_transcript_with;
2261 new_transcript_with(Some(id.to_string()), Vec::new(), None, None)
2262}
2263
2264fn clone_transcript_with_id(transcript: &VmValue, new_id: &str) -> VmValue {
2265 let Some(dict) = transcript.as_dict() else {
2266 return empty_transcript(new_id);
2267 };
2268 let mut next = dict.clone();
2269 next.insert(
2270 "id".to_string(),
2271 VmValue::String(Rc::from(new_id.to_string())),
2272 );
2273 VmValue::Dict(Rc::new(next))
2274}
2275
2276fn clone_transcript_with_parent(transcript: &VmValue, parent_id: &str) -> VmValue {
2277 let Some(dict) = transcript.as_dict() else {
2278 return transcript.clone();
2279 };
2280 let mut next = dict.clone();
2281 let metadata = match next.get("metadata") {
2282 Some(VmValue::Dict(metadata)) => {
2283 let mut metadata = metadata.as_ref().clone();
2284 metadata.insert(
2285 "parent_session_id".to_string(),
2286 VmValue::String(Rc::from(parent_id.to_string())),
2287 );
2288 VmValue::Dict(Rc::new(metadata))
2289 }
2290 _ => VmValue::Dict(Rc::new(BTreeMap::from([(
2291 "parent_session_id".to_string(),
2292 VmValue::String(Rc::from(parent_id.to_string())),
2293 )]))),
2294 };
2295 next.insert("metadata".to_string(), metadata);
2296 VmValue::Dict(Rc::new(next))
2297}
2298
2299fn apply_system_prompt_metadata(next: &mut BTreeMap<String, VmValue>, system_prompt: &str) {
2300 let mut metadata = match next.get("metadata") {
2301 Some(VmValue::Dict(metadata)) => metadata.as_ref().clone(),
2302 _ => BTreeMap::new(),
2303 };
2304 metadata.insert(
2305 "system_prompt".to_string(),
2306 crate::stdlib::json_to_vm_value(&crate::llm::helpers::system_prompt_metadata(
2307 system_prompt,
2308 )),
2309 );
2310 next.insert("metadata".to_string(), VmValue::Dict(Rc::new(metadata)));
2311}
2312
2313fn transcript_with_session_metadata(transcript: VmValue, state: &SessionState) -> VmValue {
2314 let Some(dict) = transcript.as_dict() else {
2315 return transcript;
2316 };
2317 let mut next = dict.clone();
2318 let mut metadata = match next.get("metadata") {
2319 Some(VmValue::Dict(metadata)) => metadata.as_ref().clone(),
2320 _ => BTreeMap::new(),
2321 };
2322 if let Some(tool_format) = state.tool_format.as_ref() {
2323 metadata.insert(
2324 "tool_format".to_string(),
2325 VmValue::String(Rc::from(tool_format.clone())),
2326 );
2327 metadata.insert("tool_mode_locked".to_string(), VmValue::Bool(true));
2328 }
2329 if let Some(system_prompt) = state.system_prompt.as_ref() {
2330 metadata.insert(
2331 "system_prompt".to_string(),
2332 crate::stdlib::json_to_vm_value(&crate::llm::helpers::system_prompt_metadata(
2333 system_prompt,
2334 )),
2335 );
2336 }
2337 if let Some(anchor) = state.workspace_anchor.as_ref() {
2338 metadata.insert(
2339 WORKSPACE_ANCHOR_METADATA_KEY.to_string(),
2340 anchor.to_vm_value(),
2341 );
2342 } else {
2343 metadata.remove(WORKSPACE_ANCHOR_METADATA_KEY);
2344 }
2345 if let Some(last_action) = state.last_transcript_budget_action.as_ref() {
2346 let usage = transcript_usage(
2347 &VmValue::Dict(Rc::new(next.clone())),
2348 state.transcript_budget_policy.max_approx_bytes.is_some(),
2349 );
2350 metadata.insert(
2351 "transcript_budget".to_string(),
2352 crate::stdlib::json_to_vm_value(&serde_json::json!({
2353 "policy": transcript_budget_policy_json(&state.transcript_budget_policy.normalized()),
2354 "usage": transcript_budget_usage_json(&usage),
2355 "last_action": last_action,
2356 })),
2357 );
2358 }
2359 if !metadata.is_empty() {
2360 next.insert("metadata".to_string(), VmValue::Dict(Rc::new(metadata)));
2361 }
2362 VmValue::Dict(Rc::new(next))
2363}
2364
2365fn session_snapshot(state: &SessionState) -> VmValue {
2366 let transcript = transcript_with_session_metadata(state.transcript.clone(), state);
2367 let Some(dict) = transcript.as_dict() else {
2368 return state.transcript.clone();
2369 };
2370 let mut next = dict.clone();
2371 let length = next
2372 .get("messages")
2373 .and_then(|value| match value {
2374 VmValue::List(list) => Some(list.len() as i64),
2375 _ => None,
2376 })
2377 .unwrap_or(0);
2378 next.insert("length".to_string(), VmValue::Int(length));
2379 next.insert(
2380 "created_at".to_string(),
2381 VmValue::String(Rc::from(state.created_at.clone())),
2382 );
2383 next.insert(
2384 "parent_id".to_string(),
2385 state
2386 .parent_id
2387 .as_ref()
2388 .map(|id| VmValue::String(Rc::from(id.clone())))
2389 .unwrap_or(VmValue::Nil),
2390 );
2391 next.insert(
2392 "child_ids".to_string(),
2393 VmValue::List(Rc::new(
2394 state
2395 .child_ids
2396 .iter()
2397 .cloned()
2398 .map(|id| VmValue::String(Rc::from(id)))
2399 .collect(),
2400 )),
2401 );
2402 next.insert(
2403 "branched_at_event_index".to_string(),
2404 state
2405 .branched_at_event_index
2406 .map(|index| VmValue::Int(index as i64))
2407 .unwrap_or(VmValue::Nil),
2408 );
2409 next.insert(
2410 "system_prompt".to_string(),
2411 state
2412 .system_prompt
2413 .as_ref()
2414 .map(|prompt| VmValue::String(Rc::from(prompt.clone())))
2415 .unwrap_or(VmValue::Nil),
2416 );
2417 next.insert(
2418 "tool_format".to_string(),
2419 state
2420 .tool_format
2421 .as_ref()
2422 .map(|format| VmValue::String(Rc::from(format.clone())))
2423 .unwrap_or(VmValue::Nil),
2424 );
2425 next.insert(
2426 "pinned_model".to_string(),
2427 state
2428 .pinned_model
2429 .as_ref()
2430 .map(|model| VmValue::String(Rc::from(model.clone())))
2431 .unwrap_or(VmValue::Nil),
2432 );
2433 next.insert(
2434 "pinned_reasoning_policy".to_string(),
2435 state
2436 .pinned_reasoning_policy
2437 .as_ref()
2438 .map(|policy| VmValue::String(Rc::from(policy.clone())))
2439 .unwrap_or(VmValue::Nil),
2440 );
2441 next.insert(
2442 "workspace_anchor".to_string(),
2443 state
2444 .workspace_anchor
2445 .as_ref()
2446 .map(WorkspaceAnchor::to_vm_value)
2447 .unwrap_or(VmValue::Nil),
2448 );
2449 next.insert(
2450 "workspace_policy".to_string(),
2451 state.workspace_policy.to_vm_value(),
2452 );
2453 VmValue::Dict(Rc::new(next))
2454}
2455
2456fn update_lineage(
2457 map: &mut HashMap<String, SessionState>,
2458 parent_id: &str,
2459 child_id: &str,
2460 branched_at_event_index: Option<usize>,
2461) {
2462 let old_parent_id = map.get(child_id).and_then(|child| child.parent_id.clone());
2463 if let Some(old_parent_id) = old_parent_id.filter(|old_parent_id| old_parent_id != parent_id) {
2464 if let Some(old_parent) = map.get_mut(&old_parent_id) {
2465 old_parent.child_ids.retain(|id| id != child_id);
2466 old_parent.last_accessed = Instant::now();
2467 }
2468 }
2469 if let Some(parent) = map.get_mut(parent_id) {
2470 parent.last_accessed = Instant::now();
2471 if !parent.child_ids.iter().any(|id| id == child_id) {
2472 parent.child_ids.push(child_id.to_string());
2473 }
2474 }
2475 if let Some(child) = map.get_mut(child_id) {
2476 child.last_accessed = Instant::now();
2477 child.parent_id = Some(parent_id.to_string());
2478 child.branched_at_event_index = branched_at_event_index;
2479 child.transcript = clone_transcript_with_parent(&child.transcript, parent_id);
2480 }
2481}
2482
2483fn branch_event_index(transcript: &VmValue, keep_first: usize) -> usize {
2484 if keep_first == 0 {
2485 return 0;
2486 }
2487 let Some(dict) = transcript.as_dict() else {
2488 return keep_first;
2489 };
2490 let Some(VmValue::List(events)) = dict.get("events") else {
2491 return keep_first;
2492 };
2493 event_prefix_len_for_messages(events, keep_first)
2494}
2495
2496fn event_kind(event: &VmValue) -> Option<String> {
2497 event
2498 .as_dict()
2499 .and_then(|dict| dict.get("kind"))
2500 .map(VmValue::display)
2501}
2502
2503fn event_id(event: &VmValue) -> Option<String> {
2504 event
2505 .as_dict()
2506 .and_then(|dict| dict.get("id"))
2507 .map(VmValue::display)
2508}
2509
2510fn is_turn_event(event: &VmValue) -> bool {
2511 matches!(
2512 event_kind(event).as_deref(),
2513 Some("message" | "tool_result")
2514 )
2515}
2516
2517fn event_prefix_len_for_messages(events: &[VmValue], keep_first: usize) -> usize {
2518 if keep_first == 0 {
2519 return 0;
2520 }
2521 let mut retained_messages = 0usize;
2522 for (index, event) in events.iter().enumerate() {
2523 if is_turn_event(event) {
2524 retained_messages += 1;
2525 if retained_messages == keep_first {
2526 return index + 1;
2527 }
2528 }
2529 }
2530 events.len()
2531}
2532
2533fn turn_event_id_for_count(events: &[VmValue], keep_first: usize) -> Option<String> {
2534 if keep_first == 0 {
2535 return None;
2536 }
2537 let mut retained_messages = 0usize;
2538 for event in events {
2539 if is_turn_event(event) {
2540 retained_messages += 1;
2541 if retained_messages == keep_first {
2542 return event_id(event);
2543 }
2544 }
2545 }
2546 None
2547}
2548
2549#[cfg(test)]
2550#[path = "agent_sessions_tests.rs"]
2551mod tests;