1use std::cell::{Cell, RefCell};
22use std::collections::{BTreeMap, HashMap, HashSet};
23use std::future::Future;
24use std::path::{Path, PathBuf};
25use std::rc::Rc;
26use std::time::Instant;
27
28use crate::runtime_limits::RuntimeLimits;
29use crate::value::VmValue;
30use crate::workspace_anchor::{
31 MountMode, MountedRoot, WorkspaceAnchor, WorkspacePolicy, WORKSPACE_ANCHOR_METADATA_KEY,
32};
33
/// Default maximum number of sessions kept in the store, taken from
/// `RuntimeLimits::DEFAULT.max_agent_sessions`. It seeds the thread-local
/// `SESSION_CAP`.
pub const DEFAULT_SESSION_CAP: usize = RuntimeLimits::DEFAULT.max_agent_sessions;

/// Message limit used by `SessionTranscriptBudgetPolicy::default()`.
pub const DEFAULT_TRANSCRIPT_MESSAGE_CAP: usize = 4096;

/// Event limit used by `SessionTranscriptBudgetPolicy::default()`.
pub const DEFAULT_TRANSCRIPT_EVENT_CAP: usize = 32768;
/// Diagnostic code string, compiled only into debug builds.
/// NOTE(review): the name suggests a cache-stability check on the system
/// prompt; its use is not visible in this part of the file — confirm.
#[cfg(debug_assertions)]
const CACHE_STABLE_SYSTEM_PROMPT_DIAGNOSTIC: &str = "HARN-CACHE-001";
48
/// Recovery strategy attached to a `SessionTranscriptBudgetPolicy`.
///
/// The names serialize as `"reject"`, `"trim"` and `"compact"` in audit JSON.
/// NOTE(review): the dispatch on this enum happens in
/// `apply_transcript_with_budget`, which is not visible here; the variant
/// descriptions below are inferred from the helper functions in this file.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum TranscriptBudgetRecovery {
    /// Presumably refuses the over-budget transcript — confirm against
    /// `apply_transcript_with_budget`.
    Reject,
    /// Keep only the most recent messages; `keep_last` is the requested tail
    /// length (see `trim_transcript_for_budget`).
    Trim { keep_last: usize },
    /// Replace older messages with a summary message and keep a recent tail
    /// of up to `keep_last` messages (see `compact_transcript_for_budget`).
    Compact { keep_last: usize },
}
55
56#[derive(Clone, Debug, PartialEq, Eq)]
57pub struct SessionTranscriptBudgetPolicy {
58 pub max_messages: usize,
59 pub max_events: usize,
60 pub max_approx_bytes: Option<usize>,
61 pub recovery: TranscriptBudgetRecovery,
62}
63
64impl SessionTranscriptBudgetPolicy {
65 pub fn reject(max_messages: usize, max_events: usize) -> Self {
66 Self {
67 max_messages: max_messages.max(1),
68 max_events: max_events.max(1),
69 max_approx_bytes: None,
70 recovery: TranscriptBudgetRecovery::Reject,
71 }
72 }
73
74 pub fn trim(max_messages: usize, max_events: usize, keep_last: usize) -> Self {
75 Self {
76 max_messages: max_messages.max(1),
77 max_events: max_events.max(1),
78 max_approx_bytes: None,
79 recovery: TranscriptBudgetRecovery::Trim { keep_last },
80 }
81 }
82
83 pub fn compact(max_messages: usize, max_events: usize, keep_last: usize) -> Self {
84 Self {
85 max_messages: max_messages.max(1),
86 max_events: max_events.max(1),
87 max_approx_bytes: None,
88 recovery: TranscriptBudgetRecovery::Compact { keep_last },
89 }
90 }
91
92 pub fn with_max_approx_bytes(mut self, max_approx_bytes: Option<usize>) -> Self {
93 self.max_approx_bytes = max_approx_bytes.map(|limit| limit.max(1));
94 self
95 }
96
97 fn normalized(&self) -> Self {
98 Self {
99 max_messages: self.max_messages.max(1),
100 max_events: self.max_events.max(1),
101 max_approx_bytes: self.max_approx_bytes.map(|limit| limit.max(1)),
102 recovery: self.recovery.clone(),
103 }
104 }
105}
106
107impl Default for SessionTranscriptBudgetPolicy {
108 fn default() -> Self {
109 Self::reject(DEFAULT_TRANSCRIPT_MESSAGE_CAP, DEFAULT_TRANSCRIPT_EVENT_CAP)
110 }
111}
112
113pub struct SessionState {
114 pub id: String,
115 pub transcript: VmValue,
116 pub subscribers: Vec<VmValue>,
117 pub created_at: String,
118 pub last_accessed: Instant,
119 pub parent_id: Option<String>,
120 pub child_ids: Vec<String>,
121 pub branched_at_event_index: Option<usize>,
122 pub active_skills: Vec<String>,
128 pub tool_format: Option<String>,
132 pub system_prompt: Option<String>,
136 pub pinned_model: Option<String>,
142 pub pinned_reasoning_policy: Option<String>,
148 pub workspace_policy: WorkspacePolicy,
151 pub workspace_anchor: Option<WorkspaceAnchor>,
157 pub transcript_budget_policy: SessionTranscriptBudgetPolicy,
158 pub last_transcript_budget_action: Option<serde_json::Value>,
159}
160
161impl SessionState {
162 fn new(id: String) -> Self {
163 let now = Instant::now();
164 let transcript = empty_transcript(&id);
165 Self {
166 id,
167 transcript,
168 subscribers: Vec::new(),
169 created_at: crate::orchestration::now_rfc3339(),
170 last_accessed: now,
171 parent_id: None,
172 child_ids: Vec::new(),
173 branched_at_event_index: None,
174 active_skills: Vec::new(),
175 tool_format: None,
176 system_prompt: None,
177 pinned_model: None,
178 pinned_reasoning_policy: None,
179 workspace_policy: WorkspacePolicy::default(),
180 workspace_anchor: None,
181 transcript_budget_policy: default_transcript_budget_policy(),
182 last_transcript_budget_action: None,
183 }
184 }
185}
186
/// Lineage summary for a session, as returned by `ancestry`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct SessionAncestry {
    /// Direct parent of the session, if any.
    pub parent_id: Option<String>,
    /// Direct children of the session.
    pub child_ids: Vec<String>,
    /// Topmost ancestor reached by following `parent_id` links; equals the
    /// session's own id when it has no parent.
    pub root_id: String,
}
193
/// Outcome of truncating a session transcript (see `truncate_state`).
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct SessionTruncateResult {
    /// Number of messages kept (`keep_first` capped at the message count).
    pub kept_turn_count: usize,
    /// Number of messages removed from the end of the transcript.
    pub removed_turn_count: usize,
    /// Value returned by `turn_event_id_for_count` for the kept prefix, if any.
    pub new_tip_turn_id: Option<String>,
}
200
/// Report describing a reminder injection.
/// NOTE(review): the producer of this type is not visible in this part of
/// the file; field meanings below are inferred from their names — confirm.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ReminderInjectionReport {
    /// Identifier of the reminder.
    pub reminder_id: String,
    /// Presumably the number of reminders removed as duplicates.
    pub deduped_count: usize,
}
206
thread_local! {
    // Per-thread session store keyed by session id.
    static SESSIONS: RefCell<HashMap<String, SessionState>> = RefCell::new(HashMap::new());
    // Maximum number of sessions kept in the store; see `set_session_cap`.
    static SESSION_CAP: Cell<usize> = const { Cell::new(DEFAULT_SESSION_CAP) };
    // Budget policy assigned to newly created sessions.
    static DEFAULT_TRANSCRIPT_BUDGET_POLICY: RefCell<SessionTranscriptBudgetPolicy> =
        RefCell::new(SessionTranscriptBudgetPolicy::default());
    // Stack of entered session ids; the top is the "current" session.
    static CURRENT_SESSION_STACK: RefCell<Vec<String>> = const { RefCell::new(Vec::new()) };
    // Stack of entered tool-call ids; used when no task-local id is set.
    static CURRENT_TOOL_CALL_STACK: RefCell<Vec<String>> = const { RefCell::new(Vec::new()) };
}
215
tokio::task_local! {
    // Tool-call id scoped to the current task by `scope_current_tool_call`;
    // `current_tool_call_id` consults it before the thread-local stack.
    static CURRENT_TOOL_CALL_TASK: String;
}
219
220pub struct CurrentSessionGuard {
221 active: bool,
222}
223
224impl Drop for CurrentSessionGuard {
225 fn drop(&mut self) {
226 if self.active {
227 pop_current_session();
228 }
229 }
230}
231
232pub struct CurrentToolCallGuard {
238 active: bool,
239}
240
241impl Drop for CurrentToolCallGuard {
242 fn drop(&mut self) {
243 if self.active {
244 pop_current_tool_call();
245 }
246 }
247}
248
249pub fn set_session_cap(cap: usize) {
252 SESSION_CAP.with(|c| c.set(cap.max(1)));
253}
254
255pub fn session_cap() -> usize {
256 SESSION_CAP.with(|c| c.get())
257}
258
259pub fn set_default_transcript_budget_policy(policy: SessionTranscriptBudgetPolicy) {
260 DEFAULT_TRANSCRIPT_BUDGET_POLICY.with(|cell| {
261 *cell.borrow_mut() = policy.normalized();
262 });
263}
264
265pub fn reset_default_transcript_budget_policy() {
266 set_default_transcript_budget_policy(SessionTranscriptBudgetPolicy::default());
267}
268
269pub fn default_transcript_budget_policy() -> SessionTranscriptBudgetPolicy {
270 DEFAULT_TRANSCRIPT_BUDGET_POLICY.with(|cell| cell.borrow().clone())
271}
272
273pub fn transcript_budget_policy(id: &str) -> Option<SessionTranscriptBudgetPolicy> {
274 SESSIONS.with(|s| {
275 s.borrow()
276 .get(id)
277 .map(|state| state.transcript_budget_policy.clone())
278 })
279}
280
281pub fn set_transcript_budget_policy(
282 id: &str,
283 policy: SessionTranscriptBudgetPolicy,
284) -> Result<(), String> {
285 SESSIONS.with(|s| {
286 let mut map = s.borrow_mut();
287 let Some(state) = map.get_mut(id) else {
288 return Err(format!("agent session '{id}' does not exist"));
289 };
290 let previous = state.transcript_budget_policy.clone();
291 let previous_action = state.last_transcript_budget_action.clone();
292 state.transcript_budget_policy = policy.normalized();
293 let candidate = state.transcript.clone();
294 if let Err(error) = apply_transcript_with_budget(state, candidate, "policy_update") {
295 state.transcript_budget_policy = previous;
296 state.last_transcript_budget_action = previous_action;
297 return Err(error);
298 }
299 state.last_accessed = Instant::now();
300 Ok(())
301 })
302}
303
304pub fn reset_session_store() {
306 SESSIONS.with(|s| s.borrow_mut().clear());
307 CURRENT_SESSION_STACK.with(|stack| stack.borrow_mut().clear());
308 CURRENT_TOOL_CALL_STACK.with(|stack| stack.borrow_mut().clear());
309 reset_default_transcript_budget_policy();
310}
311
312pub(crate) fn push_current_session(id: String) {
313 if id.is_empty() {
314 return;
315 }
316 CURRENT_SESSION_STACK.with(|stack| stack.borrow_mut().push(id));
317}
318
319pub(crate) fn pop_current_session() {
320 CURRENT_SESSION_STACK.with(|stack| {
321 let _ = stack.borrow_mut().pop();
322 });
323}
324
325pub fn current_session_id() -> Option<String> {
326 CURRENT_SESSION_STACK.with(|stack| stack.borrow().last().cloned())
327}
328
329pub fn enter_current_session(id: impl Into<String>) -> CurrentSessionGuard {
330 let id = id.into();
331 if id.trim().is_empty() {
332 return CurrentSessionGuard { active: false };
333 }
334 push_current_session(id);
335 CurrentSessionGuard { active: true }
336}
337
338fn push_current_tool_call(id: String) {
339 if id.is_empty() {
340 return;
341 }
342 CURRENT_TOOL_CALL_STACK.with(|stack| stack.borrow_mut().push(id));
343}
344
345fn pop_current_tool_call() {
346 CURRENT_TOOL_CALL_STACK.with(|stack| {
347 let _ = stack.borrow_mut().pop();
348 });
349}
350
351pub fn current_tool_call_id() -> Option<String> {
356 if let Ok(id) = CURRENT_TOOL_CALL_TASK.try_with(Clone::clone) {
357 if !id.trim().is_empty() {
358 return Some(id);
359 }
360 }
361 CURRENT_TOOL_CALL_STACK.with(|stack| stack.borrow().last().cloned())
362}
363
364pub async fn scope_current_tool_call<F, T>(id: impl Into<String>, future: F) -> T
370where
371 F: Future<Output = T>,
372{
373 let id = id.into();
374 if id.trim().is_empty() {
375 future.await
376 } else {
377 CURRENT_TOOL_CALL_TASK.scope(id, future).await
378 }
379}
380
381pub fn enter_current_tool_call(id: impl Into<String>) -> CurrentToolCallGuard {
383 let id = id.into();
384 if id.trim().is_empty() {
385 return CurrentToolCallGuard { active: false };
386 }
387 push_current_tool_call(id);
388 CurrentToolCallGuard { active: true }
389}
390
391pub fn exists(id: &str) -> bool {
392 SESSIONS.with(|s| s.borrow().contains_key(id))
393}
394
395pub fn length(id: &str) -> Option<usize> {
396 SESSIONS.with(|s| {
397 s.borrow().get(id).map(|state| {
398 state
399 .transcript
400 .as_dict()
401 .and_then(|d| d.get("messages"))
402 .and_then(|v| match v {
403 VmValue::List(list) => Some(list.len()),
404 _ => None,
405 })
406 .unwrap_or(0)
407 })
408 })
409}
410
411pub fn snapshot(id: &str) -> Option<VmValue> {
412 SESSIONS.with(|s| s.borrow().get(id).map(session_snapshot))
413}
414
415pub fn transcript(id: &str) -> Option<VmValue> {
418 SESSIONS.with(|s| {
419 s.borrow()
420 .get(id)
421 .map(|state| transcript_with_session_metadata(state.transcript.clone(), state))
422 })
423}
424
425pub fn open_or_create(id: Option<String>) -> String {
434 let resolved = id.unwrap_or_else(|| uuid::Uuid::now_v7().to_string());
435 let parent_session = current_session_id();
436 let mut was_new = false;
437 SESSIONS.with(|s| {
438 let mut map = s.borrow_mut();
439 if let Some(state) = map.get_mut(&resolved) {
440 state.last_accessed = Instant::now();
441 return;
442 }
443 was_new = true;
444 let cap = SESSION_CAP.with(|c| c.get());
445 if map.len() >= cap {
446 if let Some(victim) = map
447 .iter()
448 .min_by_key(|(_, state)| state.last_accessed)
449 .map(|(id, _)| id.clone())
450 {
451 map.remove(&victim);
452 }
453 }
454 map.insert(resolved.clone(), SessionState::new(resolved.clone()));
455 });
456 if was_new {
457 if let Some(parent) = parent_session.as_deref() {
458 crate::agent_events::mirror_session_sinks(parent, &resolved);
459 }
460 try_register_event_log(&resolved);
461 }
462 resolved
463}
464
465pub fn open_child_session(parent_id: &str, id: Option<String>) -> String {
466 let resolved = open_or_create(id);
467 link_child_session(parent_id, &resolved);
468 resolved
469}
470
471pub fn link_child_session(parent_id: &str, child_id: &str) {
472 link_child_session_with_branch(parent_id, child_id, None);
473}
474
475pub fn link_child_session_with_branch(
476 parent_id: &str,
477 child_id: &str,
478 branched_at_event_index: Option<usize>,
479) {
480 if parent_id == child_id {
481 return;
482 }
483 open_or_create(Some(parent_id.to_string()));
484 open_or_create(Some(child_id.to_string()));
485 SESSIONS.with(|s| {
486 let mut map = s.borrow_mut();
487 update_lineage(&mut map, parent_id, child_id, branched_at_event_index);
488 });
489}
490
491pub fn parent_id(id: &str) -> Option<String> {
492 SESSIONS.with(|s| s.borrow().get(id).and_then(|state| state.parent_id.clone()))
493}
494
495pub fn child_ids(id: &str) -> Vec<String> {
496 SESSIONS.with(|s| {
497 s.borrow()
498 .get(id)
499 .map(|state| state.child_ids.clone())
500 .unwrap_or_default()
501 })
502}
503
504pub fn ancestry(id: &str) -> Option<SessionAncestry> {
505 SESSIONS.with(|s| {
506 let map = s.borrow();
507 let state = map.get(id)?;
508 let mut root_id = state.id.clone();
509 let mut cursor = state.parent_id.clone();
510 let mut seen = HashSet::from([state.id.clone()]);
511 while let Some(parent_id) = cursor {
512 if !seen.insert(parent_id.clone()) {
513 break;
514 }
515 root_id = parent_id.clone();
516 cursor = map
517 .get(&parent_id)
518 .and_then(|parent| parent.parent_id.clone());
519 }
520 Some(SessionAncestry {
521 parent_id: state.parent_id.clone(),
522 child_ids: state.child_ids.clone(),
523 root_id,
524 })
525 })
526}
527
528fn try_register_event_log(session_id: &str) {
532 if let Some(log) = crate::event_log::active_event_log() {
533 crate::agent_events::register_sink(
534 session_id,
535 crate::agent_events::EventLogSink::new(log, session_id),
536 );
537 return;
538 }
539 let Ok(dir) = std::env::var("HARN_EVENT_LOG_DIR") else {
540 return;
541 };
542 if dir.is_empty() {
543 return;
544 }
545 let path = std::path::PathBuf::from(dir).join(format!("event_log-{session_id}.jsonl"));
546 if let Ok(sink) = crate::agent_events::JsonlEventSink::open(&path) {
547 crate::agent_events::register_sink(session_id, sink);
548 }
549}
550
551pub fn register_event_log_sink(session_id: &str) {
552 try_register_event_log(session_id);
553}
554
555pub fn close(id: &str) {
556 SESSIONS.with(|s| {
557 s.borrow_mut().remove(id);
558 });
559 crate::orchestration::agent_inbox::clear_session(id);
563 crate::agent_events::clear_session_sinks(id);
564}
565
566pub fn close_with_status(
567 id: &str,
568 reason: impl Into<String>,
569 status: impl Into<String>,
570 metadata: serde_json::Value,
571) -> bool {
572 if !exists(id) {
573 return false;
574 }
575 let reason = reason.into();
576 let status = status.into();
577 let event_metadata = serde_json::json!({
578 "reason": reason,
579 "status": status,
580 "metadata": metadata,
581 });
582 let transcript_event = crate::llm::helpers::transcript_event(
583 "agent_session_closed",
584 "system",
585 "internal",
586 "Agent session closed",
587 Some(event_metadata.clone()),
588 );
589 let _ = append_event(id, transcript_event);
590 crate::llm::emit_live_agent_event_sync(&crate::agent_events::AgentEvent::SessionClosed {
591 session_id: id.to_string(),
592 reason,
593 status,
594 metadata,
595 });
596 close(id);
597 true
598}
599
600pub fn reset_transcript(id: &str) -> bool {
601 SESSIONS.with(|s| {
602 let mut map = s.borrow_mut();
603 let Some(state) = map.get_mut(id) else {
604 return false;
605 };
606 state.transcript = empty_transcript(id);
607 state.tool_format = None;
608 state.system_prompt = None;
609 state.last_transcript_budget_action = None;
610 state.last_accessed = Instant::now();
611 true
612 })
613}
614
615pub fn fork(src_id: &str, dst_id: Option<String>) -> Option<String> {
622 let (
623 src_transcript,
624 src_tool_format,
625 src_system_prompt,
626 src_pinned_model,
627 src_pinned_reasoning_policy,
628 src_workspace_anchor,
629 src_workspace_policy,
630 src_transcript_budget_policy,
631 src_last_transcript_budget_action,
632 dst,
633 ) = SESSIONS.with(|s| {
634 let mut map = s.borrow_mut();
635 let src = map.get_mut(src_id)?;
636 src.last_accessed = Instant::now();
637 let dst = dst_id.unwrap_or_else(|| uuid::Uuid::now_v7().to_string());
638 let forked_transcript = clone_transcript_with_id(&src.transcript, &dst);
639 Some((
640 forked_transcript,
641 src.tool_format.clone(),
642 src.system_prompt.clone(),
643 src.pinned_model.clone(),
644 src.pinned_reasoning_policy.clone(),
645 src.workspace_anchor.clone(),
646 src.workspace_policy.clone(),
647 src.transcript_budget_policy.clone(),
648 src.last_transcript_budget_action.clone(),
649 dst,
650 ))
651 })?;
652 open_or_create(Some(dst.clone()));
654 SESSIONS.with(|s| {
655 let mut map = s.borrow_mut();
656 if let Some(state) = map.get_mut(&dst) {
657 state.transcript = src_transcript;
658 state.tool_format = src_tool_format;
659 state.system_prompt = src_system_prompt;
660 state.pinned_model = src_pinned_model;
661 state.pinned_reasoning_policy = src_pinned_reasoning_policy;
662 state.workspace_anchor = src_workspace_anchor;
663 state.workspace_policy = src_workspace_policy;
664 state.transcript_budget_policy = src_transcript_budget_policy;
665 state.last_transcript_budget_action = src_last_transcript_budget_action;
666 state.last_accessed = Instant::now();
667 }
668 update_lineage(&mut map, src_id, &dst, None);
669 });
670 let budget_ok = SESSIONS.with(|s| {
671 let mut map = s.borrow_mut();
672 let Some(state) = map.get_mut(&dst) else {
673 return false;
674 };
675 let candidate = state.transcript.clone();
676 apply_transcript_with_budget(state, candidate, "fork").is_ok()
677 });
678 if !budget_ok {
679 close(&dst);
680 return None;
681 }
682 if exists(&dst) {
686 Some(dst)
687 } else {
688 None
689 }
690}
691
692pub fn fork_at(src_id: &str, keep_first: usize, dst_id: Option<String>) -> Option<String> {
703 let branched_at_event_index = SESSIONS.with(|s| {
704 let map = s.borrow();
705 let src = map.get(src_id)?;
706 Some(branch_event_index(&src.transcript, keep_first))
707 })?;
708 let new_id = fork(src_id, dst_id)?;
709 link_child_session_with_branch(src_id, &new_id, Some(branched_at_event_index));
710 let _ = truncate(&new_id, keep_first);
711 Some(new_id)
712}
713
714pub fn truncate(id: &str, keep_first: usize) -> Option<SessionTruncateResult> {
718 SESSIONS.with(|s| {
719 let mut map = s.borrow_mut();
720 let state = map.get_mut(id)?;
721 let result = truncate_state(state, keep_first)?;
722 Some(result)
723 })
724}
725
726fn truncate_state(state: &mut SessionState, keep_first: usize) -> Option<SessionTruncateResult> {
727 let dict = state
728 .transcript
729 .as_dict()
730 .cloned()
731 .unwrap_or_else(BTreeMap::new);
732 let messages: Vec<VmValue> = match dict.get("messages") {
733 Some(VmValue::List(list)) => list.iter().cloned().collect(),
734 _ => Vec::new(),
735 };
736 let existing_events = match dict.get("events") {
737 Some(VmValue::List(list)) => Some(list.iter().cloned().collect::<Vec<_>>()),
738 _ => None,
739 };
740 let kept_turn_count = keep_first.min(messages.len());
741 let removed_turn_count = messages.len().saturating_sub(kept_turn_count);
742 let mut new_tip_turn_id = existing_events
743 .as_ref()
744 .map(|events| turn_event_id_for_count(events, kept_turn_count))
745 .unwrap_or_else(|| {
746 let events = crate::llm::helpers::transcript_events_from_messages(&messages);
747 turn_event_id_for_count(&events, kept_turn_count)
748 });
749
750 if removed_turn_count > 0 {
751 let retained: Vec<VmValue> = messages.into_iter().take(kept_turn_count).collect();
752 let retained_events = match existing_events {
753 Some(events) => {
754 let keep_event_count = event_prefix_len_for_messages(&events, kept_turn_count);
755 events.into_iter().take(keep_event_count).collect()
756 }
757 None => crate::llm::helpers::transcript_events_from_messages(&retained),
758 };
759 new_tip_turn_id = turn_event_id_for_count(&retained_events, kept_turn_count);
760 let mut next = dict;
761 next.insert(
762 "events".to_string(),
763 VmValue::List(Rc::new(retained_events)),
764 );
765 next.insert("messages".to_string(), VmValue::List(Rc::new(retained)));
766 next.remove("summary");
767 apply_transcript_with_budget(state, VmValue::Dict(Rc::new(next)), "truncate").ok()?;
768 }
769 state.last_accessed = Instant::now();
770 Some(SessionTruncateResult {
771 kept_turn_count,
772 removed_turn_count,
773 new_tip_turn_id,
774 })
775}
776
777pub fn pop_last_if_assistant(id: &str) -> Result<bool, String> {
784 SESSIONS.with(|s| {
785 let mut map = s.borrow_mut();
786 let Some(state) = map.get_mut(id) else {
787 return Err(format!(
788 "pop_last_if_assistant: unknown session id '{id}'"
789 ));
790 };
791 let messages: Vec<VmValue> = match state.transcript.as_dict() {
792 Some(dict) => match dict.get("messages") {
793 Some(VmValue::List(list)) => list.iter().cloned().collect(),
794 _ => Vec::new(),
795 },
796 None => Vec::new(),
797 };
798 if messages.is_empty() {
799 return Ok(false);
800 }
801 let trailing_role = messages
802 .last()
803 .and_then(|m| m.as_dict())
804 .and_then(|d| d.get("role"))
805 .map(|v| v.display())
806 .unwrap_or_default();
807 if trailing_role != "assistant" {
808 return Err(format!(
809 "pop_last_if_assistant: trailing message role is '{trailing_role}', expected 'assistant'"
810 ));
811 }
812 let keep = messages.len() - 1;
813 truncate_state(state, keep);
814 Ok(true)
815 })
816}
817
818pub fn trim(id: &str, keep_last: usize) -> Option<usize> {
821 SESSIONS.with(|s| {
822 let mut map = s.borrow_mut();
823 let state = map.get_mut(id)?;
824 let dict = state.transcript.as_dict()?.clone();
825 let messages: Vec<VmValue> = match dict.get("messages") {
826 Some(VmValue::List(list)) => list.iter().cloned().collect(),
827 _ => Vec::new(),
828 };
829 let start = messages.len().saturating_sub(keep_last);
830 let retained: Vec<VmValue> = messages.into_iter().skip(start).collect();
831 let kept = retained.len();
832 let mut next = dict;
833 next.insert(
834 "events".to_string(),
835 VmValue::List(Rc::new(
836 crate::llm::helpers::transcript_events_from_messages(&retained),
837 )),
838 );
839 next.insert("messages".to_string(), VmValue::List(Rc::new(retained)));
840 apply_transcript_with_budget(state, VmValue::Dict(Rc::new(next)), "trim").ok()?;
841 state.last_accessed = Instant::now();
842 Some(kept)
843 })
844}
845
846pub fn inject_message(id: &str, message: VmValue) -> Result<(), String> {
849 let Some(msg_dict) = message.as_dict().cloned() else {
850 return Err("agent_session_inject: message must be a dict".into());
851 };
852 let role_ok = matches!(msg_dict.get("role"), Some(VmValue::String(_)));
853 if !role_ok {
854 return Err(
855 "agent_session_inject: message must have a string `role` (user|assistant|tool_result|system)"
856 .into(),
857 );
858 }
859 SESSIONS.with(|s| {
860 let mut map = s.borrow_mut();
861 let Some(state) = map.get_mut(id) else {
862 return Err(format!("agent_session_inject: unknown session id '{id}'"));
863 };
864 let dict = state
865 .transcript
866 .as_dict()
867 .cloned()
868 .unwrap_or_else(BTreeMap::new);
869 let mut messages: Vec<VmValue> = match dict.get("messages") {
870 Some(VmValue::List(list)) => list.iter().cloned().collect(),
871 _ => Vec::new(),
872 };
873 let mut events: Vec<VmValue> = match dict.get("events") {
874 Some(VmValue::List(list)) => list.iter().cloned().collect(),
875 _ => crate::llm::helpers::transcript_events_from_messages(&messages),
876 };
877 let new_message = VmValue::Dict(Rc::new(msg_dict));
878 let message_index = messages.len();
879 events.push(crate::llm::helpers::transcript_event_from_message(
880 &new_message,
881 ));
882 messages.push(new_message);
883 let mut next = dict;
884 next.insert("events".to_string(), VmValue::List(Rc::new(events)));
885 next.insert("messages".to_string(), VmValue::List(Rc::new(messages)));
886 let persisted_message = next
887 .get("messages")
888 .and_then(|value| match value {
889 VmValue::List(list) => list.get(message_index).cloned(),
890 _ => None,
891 })
892 .unwrap_or(VmValue::Nil);
893 apply_transcript_with_budget(state, VmValue::Dict(Rc::new(next)), "inject_message")?;
894 emit_identified_user_message_event(id, &persisted_message);
895 emit_llm_message_event(id, message_index, &persisted_message);
896 state.last_accessed = Instant::now();
897 Ok(())
898 })
899}
900
901fn emit_identified_user_message_event(session_id: &str, message: &VmValue) {
902 let message_json = crate::llm::helpers::vm_value_to_json(message);
903 let role = message_json.get("role").and_then(|value| value.as_str());
904 if role != Some("user") {
905 return;
906 }
907 let Some(message_id) = message_json
908 .get("messageId")
909 .or_else(|| message_json.get("message_id"))
910 .and_then(|value| value.as_str())
911 .filter(|value| !value.trim().is_empty())
912 else {
913 return;
914 };
915 let content = message_json
916 .get("content")
917 .map(user_message_content_blocks)
918 .unwrap_or_default();
919 crate::agent_events::emit_event(&crate::agent_events::AgentEvent::UserMessage {
920 session_id: session_id.to_string(),
921 message_id: message_id.to_string(),
922 content,
923 });
924}
925
926fn user_message_content_blocks(content: &serde_json::Value) -> Vec<serde_json::Value> {
927 match content {
928 serde_json::Value::Array(items) => items.clone(),
929 serde_json::Value::String(text) => vec![serde_json::json!({
930 "type": "text",
931 "text": text,
932 })],
933 other => vec![serde_json::json!({
934 "type": "text",
935 "text": other.to_string(),
936 })],
937 }
938}
939
/// Measured size of a transcript, as computed by `transcript_usage`.
#[derive(Clone, Debug, PartialEq, Eq)]
struct TranscriptBudgetUsage {
    /// Number of entries in the transcript's `messages` list.
    message_count: usize,
    /// Number of entries in the transcript's `events` list (derived from the
    /// messages when the list is missing).
    event_count: usize,
    /// Length of the transcript's JSON serialization in bytes; `None` when
    /// byte accounting was not requested.
    approx_bytes: Option<usize>,
}
946
947fn transcript_messages_from_dict(dict: &BTreeMap<String, VmValue>) -> Vec<VmValue> {
948 match dict.get("messages") {
949 Some(VmValue::List(list)) => list.iter().cloned().collect(),
950 _ => Vec::new(),
951 }
952}
953
954fn transcript_events_from_dict(dict: &BTreeMap<String, VmValue>) -> Vec<VmValue> {
955 match dict.get("events") {
956 Some(VmValue::List(list)) => list.iter().cloned().collect(),
957 _ => {
958 let messages = transcript_messages_from_dict(dict);
959 crate::llm::helpers::transcript_events_from_messages(&messages)
960 }
961 }
962}
963
964fn transcript_usage(transcript: &VmValue, include_bytes: bool) -> TranscriptBudgetUsage {
965 let Some(dict) = transcript.as_dict() else {
966 return TranscriptBudgetUsage {
967 message_count: 0,
968 event_count: 0,
969 approx_bytes: include_bytes.then_some(0),
970 };
971 };
972 let approx_bytes = if include_bytes {
973 serde_json::to_vec(&crate::llm::helpers::vm_value_to_json(transcript))
974 .map(|bytes| bytes.len())
975 .ok()
976 .or(Some(usize::MAX))
977 } else {
978 None
979 };
980 TranscriptBudgetUsage {
981 message_count: transcript_messages_from_dict(dict).len(),
982 event_count: transcript_events_from_dict(dict).len(),
983 approx_bytes,
984 }
985}
986
987fn transcript_budget_exceeded_reason(
988 usage: &TranscriptBudgetUsage,
989 policy: &SessionTranscriptBudgetPolicy,
990) -> Option<&'static str> {
991 if usage.message_count > policy.max_messages {
992 return Some("message_count");
993 }
994 if usage.event_count > policy.max_events {
995 return Some("event_count");
996 }
997 if let (Some(bytes), Some(limit)) = (usage.approx_bytes, policy.max_approx_bytes) {
998 if bytes > limit {
999 return Some("approx_bytes");
1000 }
1001 }
1002 None
1003}
1004
1005fn transcript_budget_usage_json(usage: &TranscriptBudgetUsage) -> serde_json::Value {
1006 serde_json::json!({
1007 "messages": usage.message_count,
1008 "events": usage.event_count,
1009 "approx_bytes": usage.approx_bytes,
1010 })
1011}
1012
1013fn transcript_budget_policy_json(policy: &SessionTranscriptBudgetPolicy) -> serde_json::Value {
1014 let recovery = match &policy.recovery {
1015 TranscriptBudgetRecovery::Reject => serde_json::json!({"action": "reject"}),
1016 TranscriptBudgetRecovery::Trim { keep_last } => {
1017 serde_json::json!({"action": "trim", "keep_last": keep_last})
1018 }
1019 TranscriptBudgetRecovery::Compact { keep_last } => {
1020 serde_json::json!({"action": "compact", "keep_last": keep_last})
1021 }
1022 };
1023 serde_json::json!({
1024 "max_messages": policy.max_messages,
1025 "max_events": policy.max_events,
1026 "max_approx_bytes": policy.max_approx_bytes,
1027 "recovery": recovery,
1028 })
1029}
1030
1031fn transcript_budget_recovery_name(recovery: &TranscriptBudgetRecovery) -> &'static str {
1032 match recovery {
1033 TranscriptBudgetRecovery::Reject => "reject",
1034 TranscriptBudgetRecovery::Trim { .. } => "trim",
1035 TranscriptBudgetRecovery::Compact { .. } => "compact",
1036 }
1037}
1038
1039fn transcript_budget_error(
1040 state: &SessionState,
1041 policy: &SessionTranscriptBudgetPolicy,
1042 usage: &TranscriptBudgetUsage,
1043 reason: &str,
1044) -> String {
1045 let byte_suffix = match (usage.approx_bytes, policy.max_approx_bytes) {
1046 (Some(bytes), Some(limit)) => format!(", approx_bytes {bytes}/{limit}"),
1047 _ => String::new(),
1048 };
1049 format!(
1050 "transcript budget exceeded for session '{}': {reason} (messages {}/{}, events {}/{}{}; recovery={})",
1051 state.id,
1052 usage.message_count,
1053 policy.max_messages,
1054 usage.event_count,
1055 policy.max_events,
1056 byte_suffix,
1057 transcript_budget_recovery_name(&policy.recovery),
1058 )
1059}
1060
1061fn transcript_budget_audit_json(
1062 action: &str,
1063 source: &str,
1064 reason: &str,
1065 policy: &SessionTranscriptBudgetPolicy,
1066 usage_before: &TranscriptBudgetUsage,
1067 usage_attempted: &TranscriptBudgetUsage,
1068 usage_after: &TranscriptBudgetUsage,
1069) -> serde_json::Value {
1070 serde_json::json!({
1071 "action": action,
1072 "source": source,
1073 "reason": reason,
1074 "policy": transcript_budget_policy_json(policy),
1075 "usage_before": transcript_budget_usage_json(usage_before),
1076 "usage_attempted": transcript_budget_usage_json(usage_attempted),
1077 "usage_after": transcript_budget_usage_json(usage_after),
1078 "removed_messages": usage_attempted.message_count.saturating_sub(usage_after.message_count),
1079 "removed_events": usage_attempted.event_count.saturating_sub(usage_after.event_count),
1080 })
1081}
1082
1083fn transcript_budget_event(audit: &serde_json::Value) -> VmValue {
1084 let action = audit
1085 .get("action")
1086 .and_then(serde_json::Value::as_str)
1087 .unwrap_or("enforced");
1088 crate::llm::helpers::transcript_event(
1089 "transcript_budget",
1090 "system",
1091 "internal",
1092 &format!("Transcript budget {action}."),
1093 Some(audit.clone()),
1094 )
1095}
1096
1097fn append_event_to_transcript(transcript: VmValue, event: VmValue) -> VmValue {
1098 let Some(dict) = transcript.as_dict() else {
1099 return transcript;
1100 };
1101 let mut next = dict.clone();
1102 let mut events = transcript_events_from_dict(&next);
1103 events.push(event);
1104 next.insert("events".to_string(), VmValue::List(Rc::new(events)));
1105 VmValue::Dict(Rc::new(next))
1106}
1107
1108fn summary_message_vm(summary: &str) -> VmValue {
1109 crate::stdlib::json_to_vm_value(&summary_message_json(summary))
1110}
1111
1112fn tail_message_capacity(
1113 policy: &SessionTranscriptBudgetPolicy,
1114 reserve_audit_event: bool,
1115) -> usize {
1116 let event_capacity = if reserve_audit_event {
1117 policy.max_events.saturating_sub(1)
1118 } else {
1119 policy.max_events
1120 };
1121 policy.max_messages.min(event_capacity)
1122}
1123
1124fn trim_transcript_for_budget(
1125 transcript: &VmValue,
1126 policy: &SessionTranscriptBudgetPolicy,
1127 keep_last: usize,
1128) -> VmValue {
1129 let dict = transcript.as_dict().cloned().unwrap_or_else(BTreeMap::new);
1130 let messages = transcript_messages_from_dict(&dict);
1131 let keep = keep_last.min(tail_message_capacity(policy, true));
1132 let start = messages.len().saturating_sub(keep);
1133 let retained: Vec<VmValue> = messages.into_iter().skip(start).collect();
1134 let mut next = dict;
1135 next.insert(
1136 "events".to_string(),
1137 VmValue::List(Rc::new(
1138 crate::llm::helpers::transcript_events_from_messages(&retained),
1139 )),
1140 );
1141 next.insert("messages".to_string(), VmValue::List(Rc::new(retained)));
1142 next.remove("summary");
1143 VmValue::Dict(Rc::new(next))
1144}
1145
1146fn compact_transcript_for_budget(
1147 transcript: &VmValue,
1148 policy: &SessionTranscriptBudgetPolicy,
1149 keep_last: usize,
1150 reason: &str,
1151) -> VmValue {
1152 let dict = transcript.as_dict().cloned().unwrap_or_else(BTreeMap::new);
1153 let messages = transcript_messages_from_dict(&dict);
1154 let message_capacity = tail_message_capacity(policy, true);
1155 let mut retained = Vec::new();
1156 let mut summary = None;
1157
1158 if messages.len() > message_capacity {
1159 if message_capacity > 0 {
1160 let tail_keep = keep_last.min(message_capacity.saturating_sub(1));
1161 let archived = messages.len().saturating_sub(tail_keep);
1162 let summary_text = format!(
1163 "[auto-compacted {archived} older message(s) under transcript budget]\nSession transcript exceeded the {reason} budget; retained the most recent {tail_keep} message(s)."
1164 );
1165 retained.push(summary_message_vm(&summary_text));
1166 retained.extend(messages.into_iter().skip(archived).take(tail_keep));
1167 summary = Some(summary_text);
1168 }
1169 } else {
1170 retained = messages;
1171 }
1172
1173 let mut next = dict;
1174 next.insert(
1175 "events".to_string(),
1176 VmValue::List(Rc::new(
1177 crate::llm::helpers::transcript_events_from_messages(&retained),
1178 )),
1179 );
1180 next.insert("messages".to_string(), VmValue::List(Rc::new(retained)));
1181 if let Some(summary) = summary {
1182 next.insert("summary".to_string(), VmValue::String(Rc::from(summary)));
1183 } else {
1184 next.remove("summary");
1185 }
1186 VmValue::Dict(Rc::new(next))
1187}
1188
1189fn recovered_transcript_with_audit(
1190 recovered: VmValue,
1191 action: &str,
1192 source: &str,
1193 reason: &str,
1194 policy: &SessionTranscriptBudgetPolicy,
1195 usage_before: &TranscriptBudgetUsage,
1196 usage_attempted: &TranscriptBudgetUsage,
1197 include_bytes: bool,
1198) -> (VmValue, serde_json::Value, TranscriptBudgetUsage) {
1199 let usage_after_without_audit = transcript_usage(&recovered, include_bytes);
1200 let initial_audit = transcript_budget_audit_json(
1201 action,
1202 source,
1203 reason,
1204 policy,
1205 usage_before,
1206 usage_attempted,
1207 &usage_after_without_audit,
1208 );
1209 let with_initial_audit =
1210 append_event_to_transcript(recovered.clone(), transcript_budget_event(&initial_audit));
1211 let usage_after = transcript_usage(&with_initial_audit, include_bytes);
1212 let audit = transcript_budget_audit_json(
1213 action,
1214 source,
1215 reason,
1216 policy,
1217 usage_before,
1218 usage_attempted,
1219 &usage_after,
1220 );
1221 let with_audit = append_event_to_transcript(recovered, transcript_budget_event(&audit));
1222 let usage_after = transcript_usage(&with_audit, include_bytes);
1223 (with_audit, audit, usage_after)
1224}
1225
1226fn apply_transcript_with_budget(
1227 state: &mut SessionState,
1228 candidate: VmValue,
1229 source: &str,
1230) -> Result<(), String> {
1231 let policy = state.transcript_budget_policy.normalized();
1232 let include_bytes = policy.max_approx_bytes.is_some();
1233 let usage_before = transcript_usage(&state.transcript, include_bytes);
1234 let usage_attempted = transcript_usage(&candidate, include_bytes);
1235 let Some(reason) = transcript_budget_exceeded_reason(&usage_attempted, &policy) else {
1236 state.transcript = candidate;
1237 return Ok(());
1238 };
1239
1240 match policy.recovery.clone() {
1241 TranscriptBudgetRecovery::Reject => {
1242 let audit = transcript_budget_audit_json(
1243 "rejected",
1244 source,
1245 reason,
1246 &policy,
1247 &usage_before,
1248 &usage_attempted,
1249 &usage_before,
1250 );
1251 state.last_transcript_budget_action = Some(audit);
1252 Err(transcript_budget_error(
1253 state,
1254 &policy,
1255 &usage_attempted,
1256 reason,
1257 ))
1258 }
1259 TranscriptBudgetRecovery::Trim { keep_last } => {
1260 let recovered = trim_transcript_for_budget(&candidate, &policy, keep_last);
1261 let (with_audit, audit, usage_after) = recovered_transcript_with_audit(
1262 recovered,
1263 "trimmed",
1264 source,
1265 reason,
1266 &policy,
1267 &usage_before,
1268 &usage_attempted,
1269 include_bytes,
1270 );
1271 if transcript_budget_exceeded_reason(&usage_after, &policy).is_some() {
1272 let rejected = transcript_budget_audit_json(
1273 "rejected",
1274 source,
1275 reason,
1276 &policy,
1277 &usage_before,
1278 &usage_attempted,
1279 &usage_after,
1280 );
1281 state.last_transcript_budget_action = Some(rejected);
1282 return Err(transcript_budget_error(
1283 state,
1284 &policy,
1285 &usage_after,
1286 reason,
1287 ));
1288 }
1289 state.last_transcript_budget_action = Some(audit);
1290 state.transcript = with_audit;
1291 Ok(())
1292 }
1293 TranscriptBudgetRecovery::Compact { keep_last } => {
1294 let recovered = compact_transcript_for_budget(&candidate, &policy, keep_last, reason);
1295 let (with_audit, audit, usage_after) = recovered_transcript_with_audit(
1296 recovered,
1297 "compacted",
1298 source,
1299 reason,
1300 &policy,
1301 &usage_before,
1302 &usage_attempted,
1303 include_bytes,
1304 );
1305 if transcript_budget_exceeded_reason(&usage_after, &policy).is_some() {
1306 let rejected = transcript_budget_audit_json(
1307 "rejected",
1308 source,
1309 reason,
1310 &policy,
1311 &usage_before,
1312 &usage_attempted,
1313 &usage_after,
1314 );
1315 state.last_transcript_budget_action = Some(rejected);
1316 return Err(transcript_budget_error(
1317 state,
1318 &policy,
1319 &usage_after,
1320 reason,
1321 ));
1322 }
1323 state.last_transcript_budget_action = Some(audit);
1324 state.transcript = with_audit;
1325 Ok(())
1326 }
1327 }
1328}
1329
1330fn emit_llm_message_event(session_id: &str, message_index: usize, message: &VmValue) {
1331 let mut fields = serde_json::Map::new();
1332 fields.insert(
1333 "session_id".to_string(),
1334 serde_json::Value::String(session_id.to_string()),
1335 );
1336 fields.insert(
1337 "message_index".to_string(),
1338 serde_json::json!(message_index),
1339 );
1340 let message_json = crate::llm::helpers::vm_value_to_json(message);
1341 if let Some(role) = message_json.get("role").and_then(|value| value.as_str()) {
1342 fields.insert(
1343 "role".to_string(),
1344 serde_json::Value::String(role.to_string()),
1345 );
1346 }
1347 if let Some(content) = message_json.get("content") {
1348 fields.insert("content".to_string(), content.clone());
1349 }
1350 fields.insert("message".to_string(), message_json);
1351 crate::llm::append_observability_sidecar_entry("message", fields);
1352}
1353
1354pub fn seed_from_messages(
1360 id: Option<String>,
1361 messages: &[serde_json::Value],
1362 metadata: serde_json::Value,
1363 system_prompt: Option<String>,
1364 tool_format: Option<String>,
1365) -> Result<String, String> {
1366 let resolved = id.unwrap_or_else(|| uuid::Uuid::now_v7().to_string());
1367 if exists(&resolved) {
1368 return Err(format!("agent session '{resolved}' already exists"));
1369 }
1370 open_or_create(Some(resolved.clone()));
1371 SESSIONS.with(|s| {
1372 let mut map = s.borrow_mut();
1373 let Some(state) = map.get_mut(&resolved) else {
1374 return Err(format!("failed to create agent session '{resolved}'"));
1375 };
1376 state.tool_format = tool_format.filter(|value| !value.trim().is_empty());
1377 state.system_prompt = system_prompt.filter(|value| !value.trim().is_empty());
1378
1379 let mut metadata = metadata
1380 .as_object()
1381 .cloned()
1382 .unwrap_or_else(serde_json::Map::new);
1383 if let Some(tool_format) = state.tool_format.as_ref() {
1384 metadata.insert(
1385 "tool_format".to_string(),
1386 serde_json::Value::String(tool_format.clone()),
1387 );
1388 metadata.insert(
1389 "tool_mode_locked".to_string(),
1390 serde_json::Value::Bool(true),
1391 );
1392 }
1393 if let Some(system_prompt) = state.system_prompt.as_ref() {
1394 metadata.insert(
1395 "system_prompt".to_string(),
1396 crate::llm::helpers::system_prompt_metadata(system_prompt),
1397 );
1398 }
1399 let vm_messages = crate::llm::helpers::json_messages_to_vm(messages);
1400 let candidate = crate::llm::helpers::new_transcript_with(
1401 Some(resolved.clone()),
1402 vm_messages,
1403 None,
1404 Some(crate::stdlib::json_to_vm_value(&serde_json::Value::Object(
1405 metadata,
1406 ))),
1407 );
1408 apply_transcript_with_budget(state, candidate, "seed_from_messages")?;
1409 state.last_accessed = Instant::now();
1410 Ok(resolved)
1411 })
1412}
1413
1414pub fn messages_json(id: &str) -> Vec<serde_json::Value> {
1418 SESSIONS.with(|s| {
1419 let map = s.borrow();
1420 let Some(state) = map.get(id) else {
1421 return Vec::new();
1422 };
1423 let Some(dict) = state.transcript.as_dict() else {
1424 return Vec::new();
1425 };
1426 match dict.get("messages") {
1427 Some(VmValue::List(list)) => list
1428 .iter()
1429 .map(crate::llm::helpers::vm_value_to_json)
1430 .collect(),
1431 _ => Vec::new(),
1432 }
1433 })
1434}
1435
/// Prompt-facing snapshot of a session transcript, as produced by
/// `prompt_state_json`.
#[derive(Clone, Debug, Default)]
pub struct SessionPromptState {
    /// Transcript messages as JSON. When a summary exists and the list does
    /// not already start with it, `prompt_state_json` prepends a synthetic
    /// summary message.
    pub messages: Vec<serde_json::Value>,
    /// The transcript's `summary` string, if present and not blank.
    pub summary: Option<String>,
}
1441
1442fn summary_message_json(summary: &str) -> serde_json::Value {
1443 serde_json::json!({
1444 "role": "user",
1445 "content": summary,
1446 })
1447}
1448
1449fn messages_begin_with_summary(messages: &[serde_json::Value], summary: &str) -> bool {
1450 messages.first().is_some_and(|message| {
1451 message.get("role").and_then(|value| value.as_str()) == Some("user")
1452 && message.get("content").and_then(|value| value.as_str()) == Some(summary)
1453 })
1454}
1455
1456pub fn prompt_state_json(id: &str) -> SessionPromptState {
1464 SESSIONS.with(|s| {
1465 let map = s.borrow();
1466 let Some(state) = map.get(id) else {
1467 return SessionPromptState::default();
1468 };
1469 let Some(dict) = state.transcript.as_dict() else {
1470 return SessionPromptState::default();
1471 };
1472 let mut messages = match dict.get("messages") {
1473 Some(VmValue::List(list)) => list
1474 .iter()
1475 .map(crate::llm::helpers::vm_value_to_json)
1476 .collect::<Vec<_>>(),
1477 _ => Vec::new(),
1478 };
1479 let summary = dict.get("summary").and_then(|value| match value {
1480 VmValue::String(text) if !text.trim().is_empty() => Some(text.to_string()),
1481 _ => None,
1482 });
1483 if let Some(summary_text) = summary.as_deref() {
1484 if !messages_begin_with_summary(&messages, summary_text) {
1485 messages.insert(0, summary_message_json(summary_text));
1486 }
1487 }
1488 SessionPromptState { messages, summary }
1489 })
1490}
1491
1492pub fn store_transcript(id: &str, transcript: VmValue) -> Result<(), String> {
1495 SESSIONS.with(|s| {
1496 let mut map = s.borrow_mut();
1497 let Some(state) = map.get_mut(id) else {
1498 return Err(format!(
1499 "agent_session_store_transcript: unknown session id '{id}'"
1500 ));
1501 };
1502 let transcript = transcript_with_session_metadata(transcript, state);
1503 apply_transcript_with_budget(state, transcript, "store_transcript")?;
1504 state.last_accessed = Instant::now();
1505 Ok(())
1506 })
1507}
1508
1509pub fn prune_invalid_reminder_events(id: &str) -> usize {
1513 SESSIONS.with(|s| {
1514 let mut map = s.borrow_mut();
1515 let Some(state) = map.get_mut(id) else {
1516 return 0;
1517 };
1518 let Some(dict) = state.transcript.as_dict().cloned() else {
1519 return 0;
1520 };
1521 let Some(VmValue::List(events)) = dict.get("events") else {
1522 return 0;
1523 };
1524 let mut pruned = 0_usize;
1525 let mut kept = Vec::with_capacity(events.len());
1526 for event in events.iter().cloned() {
1527 let is_reminder = event
1528 .as_dict()
1529 .and_then(|event| event.get("kind"))
1530 .map(VmValue::display)
1531 .as_deref()
1532 == Some(crate::llm::helpers::SYSTEM_REMINDER_EVENT_KIND);
1533 if !is_reminder {
1534 kept.push(event);
1535 continue;
1536 }
1537 let valid = crate::llm::helpers::reminder_from_event(&event)
1538 .is_some_and(|reminder| !reminder.body.trim().is_empty());
1539 if valid {
1540 kept.push(event);
1541 } else {
1542 pruned += 1;
1543 }
1544 }
1545 if pruned > 0 {
1546 let mut next = dict;
1547 next.insert("events".to_string(), VmValue::List(Rc::new(kept)));
1548 let _ = apply_transcript_with_budget(
1549 state,
1550 VmValue::Dict(Rc::new(next)),
1551 "prune_invalid_reminder_events",
1552 );
1553 state.last_accessed = Instant::now();
1554 }
1555 pruned
1556 })
1557}
1558
1559pub fn apply_reminder_post_turn(id: &str, turn: i64) -> Result<serde_json::Value, String> {
1564 let report = SESSIONS.with(|s| {
1565 let mut map = s.borrow_mut();
1566 let Some(state) = map.get_mut(id) else {
1567 return Err(format!(
1568 "agent_session_apply_reminder_post_turn: unknown session id '{id}'"
1569 ));
1570 };
1571 let report = crate::llm::helpers::apply_reminder_post_turn(&state.transcript, turn);
1572 if report.decremented_count > 0 || !report.expired.is_empty() {
1573 if let Some(next) = report.transcript.clone() {
1574 apply_transcript_with_budget(state, next, "apply_reminder_post_turn")?;
1575 }
1576 state.last_accessed = Instant::now();
1577 }
1578 Ok(report)
1579 })?;
1580
1581 for reminder in &report.expired {
1582 let mut payload = crate::llm::helpers::reminder_lifecycle_payload(Some(id), reminder);
1583 if let Some(obj) = payload.as_object_mut() {
1584 obj.insert(
1585 "transcript_id".to_string(),
1586 serde_json::Value::String(id.to_string()),
1587 );
1588 obj.insert(
1589 "reason".to_string(),
1590 serde_json::Value::String("ttl".to_string()),
1591 );
1592 obj.insert(
1593 "ttl_turns_before".to_string(),
1594 serde_json::json!(&reminder.ttl_turns),
1595 );
1596 obj.insert("expired_at_turn".to_string(), serde_json::json!(turn));
1597 }
1598 crate::llm::helpers::emit_reminder_lifecycle_event(
1599 crate::llm::helpers::REMINDER_EXPIRED_EVENT_KIND,
1600 payload,
1601 );
1602 }
1603
1604 Ok(serde_json::json!({
1605 "expired_count": report.expired.len(),
1606 "decremented_count": report.decremented_count,
1607 "remaining_count": report.remaining_count,
1608 }))
1609}
1610
1611pub fn inject_reminder(
1616 id: &str,
1617 reminder: crate::llm::helpers::SystemReminder,
1618) -> Result<ReminderInjectionReport, String> {
1619 let reminder_id = reminder.id.clone();
1620 let dedupe_key = reminder.dedupe_key.clone();
1621 let mut deduped_reminder_ids = Vec::new();
1622 SESSIONS.with(|s| {
1623 let mut map = s.borrow_mut();
1624 let Some(state) = map.get_mut(id) else {
1625 return Err(format!(
1626 "agent_session_inject_reminder: unknown session id '{id}'"
1627 ));
1628 };
1629 let dict = state
1630 .transcript
1631 .as_dict()
1632 .cloned()
1633 .unwrap_or_else(BTreeMap::new);
1634 let mut events: Vec<VmValue> = match dict.get("events") {
1635 Some(VmValue::List(list)) => list.iter().cloned().collect(),
1636 _ => dict
1637 .get("messages")
1638 .and_then(|value| match value {
1639 VmValue::List(list) => Some(list.iter().cloned().collect::<Vec<_>>()),
1640 _ => None,
1641 })
1642 .map(|messages| crate::llm::helpers::transcript_events_from_messages(&messages))
1643 .unwrap_or_default(),
1644 };
1645 if let Some(expected_key) = dedupe_key.as_deref() {
1646 events.retain(|event| {
1647 let Some(existing) = crate::llm::helpers::reminder_from_event(event) else {
1648 return true;
1649 };
1650 if existing.dedupe_key.as_deref() == Some(expected_key) {
1651 deduped_reminder_ids.push(existing.id);
1652 false
1653 } else {
1654 true
1655 }
1656 });
1657 }
1658 events.push(crate::llm::helpers::transcript_reminder_event(&reminder));
1659 let mut next = dict;
1660 next.insert("events".to_string(), VmValue::List(Rc::new(events)));
1661 apply_transcript_with_budget(state, VmValue::Dict(Rc::new(next)), "inject_reminder")?;
1662 state.last_accessed = Instant::now();
1663 Ok(())
1664 })?;
1665
1666 if !deduped_reminder_ids.is_empty() {
1667 let dropped_count = deduped_reminder_ids.len();
1668 crate::llm::helpers::emit_reminder_lifecycle_event(
1669 crate::llm::helpers::REMINDER_DEDUPED_EVENT_KIND,
1670 serde_json::json!({
1671 "session_id": id,
1672 "transcript_id": id,
1673 "reminder_id": &reminder_id,
1674 "replacing_id": &reminder_id,
1675 "replaced_id": deduped_reminder_ids.first(),
1676 "replaced_ids": &deduped_reminder_ids,
1677 "dedupe_key": &dedupe_key,
1678 "dropped_reminder_ids": &deduped_reminder_ids,
1679 "dropped_count": dropped_count,
1680 }),
1681 );
1682 }
1683
1684 crate::llm::helpers::emit_reminder_lifecycle_event(
1685 crate::llm::helpers::REMINDER_INJECTED_EVENT_KIND,
1686 crate::llm::helpers::reminder_lifecycle_payload(Some(id), &reminder),
1687 );
1688
1689 Ok(ReminderInjectionReport {
1690 reminder_id,
1691 deduped_count: deduped_reminder_ids.len(),
1692 })
1693}
1694
1695pub fn append_event(id: &str, event: VmValue) -> Result<(), String> {
1701 let Some(event_dict) = event.as_dict() else {
1702 return Err("agent_session_append_event: event must be a dict".into());
1703 };
1704 let kind_ok = matches!(event_dict.get("kind"), Some(VmValue::String(_)));
1705 if !kind_ok {
1706 return Err("agent_session_append_event: event must have a string `kind`".into());
1707 }
1708 SESSIONS.with(|s| {
1709 let mut map = s.borrow_mut();
1710 let Some(state) = map.get_mut(id) else {
1711 return Err(format!(
1712 "agent_session_append_event: unknown session id '{id}'"
1713 ));
1714 };
1715 append_event_to_state(state, event, "append_event")?;
1716 state.last_accessed = Instant::now();
1717 Ok(())
1718 })
1719}
1720
1721fn append_event_to_state(
1722 state: &mut SessionState,
1723 event: VmValue,
1724 action: &str,
1725) -> Result<(), String> {
1726 let dict = state
1727 .transcript
1728 .as_dict()
1729 .cloned()
1730 .unwrap_or_else(BTreeMap::new);
1731 let mut events: Vec<VmValue> = match dict.get("events") {
1732 Some(VmValue::List(list)) => list.iter().cloned().collect(),
1733 _ => dict
1734 .get("messages")
1735 .and_then(|value| match value {
1736 VmValue::List(list) => Some(list.iter().cloned().collect::<Vec<_>>()),
1737 _ => None,
1738 })
1739 .map(|messages| crate::llm::helpers::transcript_events_from_messages(&messages))
1740 .unwrap_or_default(),
1741 };
1742 events.push(event);
1743 let mut next = dict;
1744 next.insert("events".to_string(), VmValue::List(Rc::new(events)));
1745 apply_transcript_with_budget(state, VmValue::Dict(Rc::new(next)), action)
1746}
1747
1748pub fn replace_messages(id: &str, messages: &[serde_json::Value]) -> Result<(), String> {
1751 replace_messages_with_summary(id, messages, None)
1752}
1753
1754pub fn replace_messages_with_summary(
1759 id: &str,
1760 messages: &[serde_json::Value],
1761 summary: Option<&str>,
1762) -> Result<(), String> {
1763 SESSIONS.with(|s| {
1764 let mut map = s.borrow_mut();
1765 let Some(state) = map.get_mut(id) else {
1766 return Err(format!(
1767 "agent_session_replace_messages: unknown session id '{id}'"
1768 ));
1769 };
1770 let dict = state
1771 .transcript
1772 .as_dict()
1773 .cloned()
1774 .unwrap_or_else(BTreeMap::new);
1775 let vm_messages: Vec<VmValue> = messages
1776 .iter()
1777 .map(crate::stdlib::json_to_vm_value)
1778 .collect();
1779 let mut next = dict;
1780 next.insert(
1781 "events".to_string(),
1782 VmValue::List(Rc::new(
1783 crate::llm::helpers::transcript_events_from_messages(&vm_messages),
1784 )),
1785 );
1786 next.insert("messages".to_string(), VmValue::List(Rc::new(vm_messages)));
1787 if let Some(summary) = summary {
1788 next.insert(
1789 "summary".to_string(),
1790 VmValue::String(Rc::from(summary.to_string())),
1791 );
1792 } else {
1793 next.remove("summary");
1794 }
1795 apply_transcript_with_budget(state, VmValue::Dict(Rc::new(next)), "replace_messages")?;
1796 state.last_accessed = Instant::now();
1797 Ok(())
1798 })
1799}
1800
1801pub fn append_subscriber(id: &str, callback: VmValue) {
1802 open_or_create(Some(id.to_string()));
1803 SESSIONS.with(|s| {
1804 if let Some(state) = s.borrow_mut().get_mut(id) {
1805 state.subscribers.push(callback);
1806 state.last_accessed = Instant::now();
1807 }
1808 });
1809}
1810
1811pub fn subscribers_for(id: &str) -> Vec<VmValue> {
1812 SESSIONS.with(|s| {
1813 s.borrow()
1814 .get(id)
1815 .map(|state| state.subscribers.clone())
1816 .unwrap_or_default()
1817 })
1818}
1819
1820pub fn subscriber_count(id: &str) -> usize {
1821 SESSIONS.with(|s| {
1822 s.borrow()
1823 .get(id)
1824 .map(|state| state.subscribers.len())
1825 .unwrap_or(0)
1826 })
1827}
1828
1829pub fn set_active_skills(id: &str, skills: Vec<String>) {
1833 SESSIONS.with(|s| {
1834 if let Some(state) = s.borrow_mut().get_mut(id) {
1835 state.active_skills = skills;
1836 state.last_accessed = Instant::now();
1837 }
1838 });
1839}
1840
1841pub fn active_skills(id: &str) -> Vec<String> {
1845 SESSIONS.with(|s| {
1846 s.borrow()
1847 .get(id)
1848 .map(|state| state.active_skills.clone())
1849 .unwrap_or_default()
1850 })
1851}
1852
1853pub fn claim_tool_format(id: &str, tool_format: &str) -> Result<(), String> {
1859 let tool_format = tool_format.trim();
1860 if tool_format.is_empty() {
1861 return Ok(());
1862 }
1863 SESSIONS.with(|s| {
1864 let mut map = s.borrow_mut();
1865 let Some(state) = map.get_mut(id) else {
1866 return Err(format!("agent session '{id}' does not exist"));
1867 };
1868 match state.tool_format.as_deref() {
1869 Some(existing) if existing != tool_format => Err(format!(
1870 "agent session '{id}' is locked to tool_format='{existing}', but this run requested tool_format='{tool_format}'. Start a new session or fork/reset the transcript before changing tool mode."
1871 )),
1872 Some(_) => {
1873 state.last_accessed = Instant::now();
1874 Ok(())
1875 }
1876 None => {
1877 state.tool_format = Some(tool_format.to_string());
1878 state.last_accessed = Instant::now();
1879 Ok(())
1880 }
1881 }
1882 })
1883}
1884
1885pub fn tool_format(id: &str) -> Option<String> {
1886 SESSIONS.with(|s| {
1887 s.borrow()
1888 .get(id)
1889 .and_then(|state| state.tool_format.clone())
1890 })
1891}
1892
1893pub fn record_system_prompt(id: &str, system_prompt: &str) -> Result<(), String> {
1894 let system_prompt = system_prompt.trim();
1895 if system_prompt.is_empty() {
1896 return Ok(());
1897 }
1898 assert_cache_stable_system_prompt(system_prompt);
1899 SESSIONS.with(|s| {
1900 let mut map = s.borrow_mut();
1901 let Some(state) = map.get_mut(id) else {
1902 return Err(format!("agent session '{id}' does not exist"));
1903 };
1904 let changed = state.system_prompt.as_deref() != Some(system_prompt);
1905 state.system_prompt = Some(system_prompt.to_string());
1906 let dict = state
1907 .transcript
1908 .as_dict()
1909 .cloned()
1910 .unwrap_or_else(BTreeMap::new);
1911 let mut next = dict;
1912 apply_system_prompt_metadata(&mut next, system_prompt);
1913 if changed {
1914 let mut events: Vec<VmValue> = match next.get("events") {
1915 Some(VmValue::List(list)) => list.iter().cloned().collect(),
1916 _ => Vec::new(),
1917 };
1918 events.push(crate::llm::helpers::transcript_event(
1919 "system_prompt",
1920 "system",
1921 "internal",
1922 "",
1923 Some(crate::llm::helpers::system_prompt_event_metadata(
1924 system_prompt,
1925 )),
1926 ));
1927 next.insert("events".to_string(), VmValue::List(Rc::new(events)));
1928 }
1929 apply_transcript_with_budget(state, VmValue::Dict(Rc::new(next)), "record_system_prompt")?;
1930 state.last_accessed = Instant::now();
1931 Ok(())
1932 })
1933}
1934
1935pub fn system_prompt(id: &str) -> Option<String> {
1936 SESSIONS.with(|s| {
1937 s.borrow()
1938 .get(id)
1939 .and_then(|state| state.system_prompt.clone())
1940 })
1941}
1942
/// Scans `system_prompt` for a `{{` template opener followed (after optional
/// whitespace) by an identifier starting with `workspace_` or `project_`.
///
/// Returns the offending prefix for the first such opener, or `None` when no
/// opener is followed by a forbidden prefix.
#[cfg(debug_assertions)]
fn forbidden_workspace_prompt_token(system_prompt: &str) -> Option<&'static str> {
    const FORBIDDEN_PREFIXES: [&str; 2] = ["workspace_", "project_"];

    system_prompt.match_indices("{{").find_map(|(open, _)| {
        // Look at what follows the two braces, ignoring leading whitespace.
        let after_braces = system_prompt[open + 2..].trim_start();
        FORBIDDEN_PREFIXES
            .iter()
            .copied()
            .find(|prefix| after_braces.starts_with(*prefix))
    })
}
1958
1959#[cfg(debug_assertions)]
1960fn assert_cache_stable_system_prompt(system_prompt: &str) {
1961 if let Some(prefix) = forbidden_workspace_prompt_token(system_prompt) {
1962 panic!(
1963 "{CACHE_STABLE_SYSTEM_PROMPT_DIAGNOSTIC}: session system prompts must not interpolate `{{{{{prefix}...` tokens; move workspace/project context into the workspace-anchor reminder"
1964 );
1965 }
1966}
1967
1968#[cfg(not(debug_assertions))]
1969fn assert_cache_stable_system_prompt(_system_prompt: &str) {}
1970
1971pub fn set_pinned_model(id: &str, model: Option<String>) -> Result<bool, String> {
1976 let normalized = model
1977 .map(|value| value.trim().to_string())
1978 .filter(|value| !value.is_empty());
1979 SESSIONS.with(|s| {
1980 let mut map = s.borrow_mut();
1981 let Some(state) = map.get_mut(id) else {
1982 return Err(format!("agent session '{id}' does not exist"));
1983 };
1984 let changed = state.pinned_model != normalized;
1985 state.pinned_model = normalized;
1986 state.last_accessed = Instant::now();
1987 Ok(changed)
1988 })
1989}
1990
1991pub fn pinned_model(id: &str) -> Option<String> {
1995 SESSIONS.with(|s| {
1996 s.borrow()
1997 .get(id)
1998 .and_then(|state| state.pinned_model.clone())
1999 })
2000}
2001
2002pub fn set_pinned_reasoning_policy(id: &str, policy: Option<String>) -> Result<bool, String> {
2004 let normalized = match policy {
2005 Some(value) => crate::llm::reasoning_policy::normalize_policy_selector(&value)?,
2006 None => None,
2007 };
2008 SESSIONS.with(|s| {
2009 let mut map = s.borrow_mut();
2010 let Some(state) = map.get_mut(id) else {
2011 return Err(format!("agent session '{id}' does not exist"));
2012 };
2013 let changed = state.pinned_reasoning_policy != normalized;
2014 state.pinned_reasoning_policy = normalized;
2015 state.last_accessed = Instant::now();
2016 Ok(changed)
2017 })
2018}
2019
2020pub fn pinned_reasoning_policy(id: &str) -> Option<String> {
2022 SESSIONS.with(|s| {
2023 s.borrow()
2024 .get(id)
2025 .and_then(|state| state.pinned_reasoning_policy.clone())
2026 })
2027}
2028
2029pub fn set_workspace_anchor(id: &str, anchor: Option<WorkspaceAnchor>) -> Result<bool, String> {
2033 SESSIONS.with(|s| {
2034 let mut map = s.borrow_mut();
2035 let Some(state) = map.get_mut(id) else {
2036 return Err(format!("agent session '{id}' does not exist"));
2037 };
2038 let changed = state.workspace_anchor != anchor;
2039 state.workspace_anchor = anchor;
2040 if changed {
2041 crate::llm::permissions::clear_session_grants(id);
2042 }
2043 state.last_accessed = Instant::now();
2044 Ok(changed)
2045 })
2046}
2047
2048pub fn workspace_anchor(id: &str) -> Option<WorkspaceAnchor> {
2050 SESSIONS.with(|s| {
2051 s.borrow()
2052 .get(id)
2053 .and_then(|state| state.workspace_anchor.clone())
2054 })
2055}
2056
/// Result of `reanchor_session`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ReanchorOutcome {
    /// The anchor the session had before the call, if any.
    pub previous: Option<WorkspaceAnchor>,
    /// The anchor the session has after the call.
    pub current: WorkspaceAnchor,
    /// Whether `current` differs from `previous`.
    pub changed: bool,
}
2066
2067pub fn reanchor_session(
2072 id: &str,
2073 new_anchor: WorkspaceAnchor,
2074 carry_transcript: bool,
2075 compacted: bool,
2076 reason: Option<String>,
2077) -> Result<ReanchorOutcome, String> {
2078 let outcome = SESSIONS.with(|s| {
2079 let mut map = s.borrow_mut();
2080 let Some(state) = map.get_mut(id) else {
2081 return Err(format!("agent session '{id}' does not exist"));
2082 };
2083 let previous = state.workspace_anchor.clone();
2084 let changed = previous.as_ref() != Some(&new_anchor);
2085 state.workspace_anchor = Some(new_anchor.clone());
2086 if changed {
2087 crate::llm::permissions::clear_session_grants(id);
2088 }
2089 state.last_accessed = Instant::now();
2090 Ok(ReanchorOutcome {
2091 previous,
2092 current: new_anchor,
2093 changed,
2094 })
2095 })?;
2096 if !outcome.changed {
2097 return Ok(outcome);
2098 }
2099 let previous_json = outcome.previous.as_ref().map(WorkspaceAnchor::to_json);
2100 let current_json = outcome.current.to_json();
2101 let event_metadata = serde_json::json!({
2102 "previous": previous_json,
2103 "current": current_json,
2104 "carry_transcript": carry_transcript,
2105 "compacted": compacted,
2106 "reason": reason,
2107 });
2108 let event = crate::llm::helpers::transcript_event(
2109 "AnchorChanged",
2110 "system",
2111 "internal",
2112 "",
2113 Some(event_metadata),
2114 );
2115 let _ = append_event(id, event);
2116 crate::llm::emit_live_agent_event_sync(&crate::agent_events::AgentEvent::AnchorChanged {
2117 session_id: id.to_string(),
2118 previous: previous_json,
2119 current: current_json,
2120 carry_transcript,
2121 compacted,
2122 reason,
2123 });
2124 Ok(outcome)
2125}
2126
2127pub fn set_workspace_policy(id: &str, policy: WorkspacePolicy) -> Result<bool, String> {
2130 SESSIONS.with(|s| {
2131 let mut map = s.borrow_mut();
2132 let Some(state) = map.get_mut(id) else {
2133 return Err(format!("agent session '{id}' does not exist"));
2134 };
2135 let changed = state.workspace_policy != policy;
2136 state.workspace_policy = policy;
2137 state.last_accessed = Instant::now();
2138 Ok(changed)
2139 })
2140}
2141
2142pub fn workspace_policy(id: &str) -> Option<WorkspacePolicy> {
2144 SESSIONS.with(|s| {
2145 s.borrow()
2146 .get(id)
2147 .map(|state| state.workspace_policy.clone())
2148 })
2149}
2150
2151pub fn add_workspace_root(
2155 id: &str,
2156 root: &str,
2157 mount_mode: Option<MountMode>,
2158 reason: Option<String>,
2159) -> Result<String, String> {
2160 let normalized_root = validate_workspace_root_path(root)?;
2161 let mounted_at = crate::orchestration::now_rfc3339();
2162 SESSIONS.with(|s| {
2163 let mut map = s.borrow_mut();
2164 let Some(state) = map.get_mut(id) else {
2165 return Err(format!("agent session '{id}' does not exist"));
2166 };
2167 let default_mount_mode = state.workspace_policy.default_mount_mode;
2168 let Some(anchor) = state.workspace_anchor.as_mut() else {
2169 return Err(format!("agent session '{id}' has no workspace anchor"));
2170 };
2171 let resolved_mount_mode = mount_mode.unwrap_or(default_mount_mode);
2172 if let Some(existing) = anchor
2173 .additional_roots
2174 .iter_mut()
2175 .find(|entry| entry.path == normalized_root)
2176 {
2177 existing.mount_mode = resolved_mount_mode;
2178 existing.mounted_at = mounted_at.clone();
2179 } else {
2180 anchor.additional_roots.push(MountedRoot {
2181 path: normalized_root.clone(),
2182 mount_mode: resolved_mount_mode,
2183 mounted_at: mounted_at.clone(),
2184 });
2185 }
2186 let event = crate::llm::helpers::transcript_event(
2187 "RootMounted",
2188 "system",
2189 "internal",
2190 "",
2191 Some(serde_json::json!({
2192 "path": normalized_root.to_string_lossy(),
2193 "mount_mode": resolved_mount_mode.as_str(),
2194 "mounted_at": mounted_at.clone(),
2195 "reason": reason,
2196 })),
2197 );
2198 append_event_to_state(state, event, "add_workspace_root")?;
2199 crate::llm::permissions::clear_session_grants(id);
2200 state.last_accessed = Instant::now();
2201 Ok(mounted_at.clone())
2202 })
2203}
2204
2205pub fn remove_workspace_root(id: &str, root: &str) -> Result<bool, String> {
2208 let normalized_root = normalize_workspace_root_path(root);
2209 SESSIONS.with(|s| {
2210 let mut map = s.borrow_mut();
2211 let Some(state) = map.get_mut(id) else {
2212 return Err(format!("agent session '{id}' does not exist"));
2213 };
2214 let Some(anchor) = state.workspace_anchor.as_mut() else {
2215 return Err(format!("agent session '{id}' has no workspace anchor"));
2216 };
2217 let before = anchor.additional_roots.len();
2218 anchor
2219 .additional_roots
2220 .retain(|entry| entry.path != normalized_root);
2221 let removed = anchor.additional_roots.len() != before;
2222 if removed {
2223 crate::llm::permissions::clear_session_grants(id);
2224 }
2225 state.last_accessed = Instant::now();
2226 Ok(removed)
2227 })
2228}
2229
2230pub fn list_workspace_roots(id: &str) -> Result<(PathBuf, Vec<MountedRoot>), String> {
2232 SESSIONS.with(|s| {
2233 let map = s.borrow();
2234 let Some(state) = map.get(id) else {
2235 return Err(format!("agent session '{id}' does not exist"));
2236 };
2237 let Some(anchor) = state.workspace_anchor.as_ref() else {
2238 return Err(format!("agent session '{id}' has no workspace anchor"));
2239 };
2240 Ok((anchor.primary.clone(), anchor.additional_roots.clone()))
2241 })
2242}
2243
2244fn validate_workspace_root_path(root: &str) -> Result<PathBuf, String> {
2245 let normalized = normalize_workspace_root_path(root);
2246 let canonical = std::fs::canonicalize(&normalized)
2247 .map_err(|error| format!("workspace root '{root}' must exist and be readable: {error}"))?;
2248 let metadata = std::fs::metadata(&canonical)
2249 .map_err(|error| format!("workspace root '{root}' must exist and be readable: {error}"))?;
2250 if !metadata.is_dir() {
2251 return Err(format!("workspace root '{root}' must be a directory"));
2252 }
2253 std::fs::read_dir(&canonical)
2254 .map_err(|error| format!("workspace root '{root}' must be readable: {error}"))?;
2255 Ok(canonical)
2256}
2257
2258fn normalize_workspace_root_path(root: &str) -> PathBuf {
2259 let absolute = crate::stdlib::process::normalize_context_path(Path::new(root));
2260 std::fs::canonicalize(&absolute).unwrap_or(absolute)
2261}
2262
2263fn empty_transcript(id: &str) -> VmValue {
2264 use crate::llm::helpers::new_transcript_with;
2265 new_transcript_with(Some(id.to_string()), Vec::new(), None, None)
2266}
2267
2268fn clone_transcript_with_id(transcript: &VmValue, new_id: &str) -> VmValue {
2269 let Some(dict) = transcript.as_dict() else {
2270 return empty_transcript(new_id);
2271 };
2272 let mut next = dict.clone();
2273 next.insert(
2274 "id".to_string(),
2275 VmValue::String(Rc::from(new_id.to_string())),
2276 );
2277 VmValue::Dict(Rc::new(next))
2278}
2279
2280fn clone_transcript_with_parent(transcript: &VmValue, parent_id: &str) -> VmValue {
2281 let Some(dict) = transcript.as_dict() else {
2282 return transcript.clone();
2283 };
2284 let mut next = dict.clone();
2285 let metadata = match next.get("metadata") {
2286 Some(VmValue::Dict(metadata)) => {
2287 let mut metadata = metadata.as_ref().clone();
2288 metadata.insert(
2289 "parent_session_id".to_string(),
2290 VmValue::String(Rc::from(parent_id.to_string())),
2291 );
2292 VmValue::Dict(Rc::new(metadata))
2293 }
2294 _ => VmValue::Dict(Rc::new(BTreeMap::from([(
2295 "parent_session_id".to_string(),
2296 VmValue::String(Rc::from(parent_id.to_string())),
2297 )]))),
2298 };
2299 next.insert("metadata".to_string(), metadata);
2300 VmValue::Dict(Rc::new(next))
2301}
2302
2303fn apply_system_prompt_metadata(next: &mut BTreeMap<String, VmValue>, system_prompt: &str) {
2304 let mut metadata = match next.get("metadata") {
2305 Some(VmValue::Dict(metadata)) => metadata.as_ref().clone(),
2306 _ => BTreeMap::new(),
2307 };
2308 metadata.insert(
2309 "system_prompt".to_string(),
2310 crate::stdlib::json_to_vm_value(&crate::llm::helpers::system_prompt_metadata(
2311 system_prompt,
2312 )),
2313 );
2314 next.insert("metadata".to_string(), VmValue::Dict(Rc::new(metadata)));
2315}
2316
2317fn transcript_with_session_metadata(transcript: VmValue, state: &SessionState) -> VmValue {
2318 let Some(dict) = transcript.as_dict() else {
2319 return transcript;
2320 };
2321 let mut next = dict.clone();
2322 let mut metadata = match next.get("metadata") {
2323 Some(VmValue::Dict(metadata)) => metadata.as_ref().clone(),
2324 _ => BTreeMap::new(),
2325 };
2326 if let Some(tool_format) = state.tool_format.as_ref() {
2327 metadata.insert(
2328 "tool_format".to_string(),
2329 VmValue::String(Rc::from(tool_format.clone())),
2330 );
2331 metadata.insert("tool_mode_locked".to_string(), VmValue::Bool(true));
2332 }
2333 if let Some(system_prompt) = state.system_prompt.as_ref() {
2334 metadata.insert(
2335 "system_prompt".to_string(),
2336 crate::stdlib::json_to_vm_value(&crate::llm::helpers::system_prompt_metadata(
2337 system_prompt,
2338 )),
2339 );
2340 }
2341 if let Some(anchor) = state.workspace_anchor.as_ref() {
2342 metadata.insert(
2343 WORKSPACE_ANCHOR_METADATA_KEY.to_string(),
2344 anchor.to_vm_value(),
2345 );
2346 } else {
2347 metadata.remove(WORKSPACE_ANCHOR_METADATA_KEY);
2348 }
2349 if let Some(last_action) = state.last_transcript_budget_action.as_ref() {
2350 let usage = transcript_usage(
2351 &VmValue::Dict(Rc::new(next.clone())),
2352 state.transcript_budget_policy.max_approx_bytes.is_some(),
2353 );
2354 metadata.insert(
2355 "transcript_budget".to_string(),
2356 crate::stdlib::json_to_vm_value(&serde_json::json!({
2357 "policy": transcript_budget_policy_json(&state.transcript_budget_policy.normalized()),
2358 "usage": transcript_budget_usage_json(&usage),
2359 "last_action": last_action,
2360 })),
2361 );
2362 }
2363 if !metadata.is_empty() {
2364 next.insert("metadata".to_string(), VmValue::Dict(Rc::new(metadata)));
2365 }
2366 VmValue::Dict(Rc::new(next))
2367}
2368
2369fn session_snapshot(state: &SessionState) -> VmValue {
2370 let transcript = transcript_with_session_metadata(state.transcript.clone(), state);
2371 let Some(dict) = transcript.as_dict() else {
2372 return state.transcript.clone();
2373 };
2374 let mut next = dict.clone();
2375 let length = next
2376 .get("messages")
2377 .and_then(|value| match value {
2378 VmValue::List(list) => Some(list.len() as i64),
2379 _ => None,
2380 })
2381 .unwrap_or(0);
2382 next.insert("length".to_string(), VmValue::Int(length));
2383 next.insert(
2384 "created_at".to_string(),
2385 VmValue::String(Rc::from(state.created_at.clone())),
2386 );
2387 next.insert(
2388 "parent_id".to_string(),
2389 state
2390 .parent_id
2391 .as_ref()
2392 .map(|id| VmValue::String(Rc::from(id.clone())))
2393 .unwrap_or(VmValue::Nil),
2394 );
2395 next.insert(
2396 "child_ids".to_string(),
2397 VmValue::List(Rc::new(
2398 state
2399 .child_ids
2400 .iter()
2401 .cloned()
2402 .map(|id| VmValue::String(Rc::from(id)))
2403 .collect(),
2404 )),
2405 );
2406 next.insert(
2407 "branched_at_event_index".to_string(),
2408 state
2409 .branched_at_event_index
2410 .map(|index| VmValue::Int(index as i64))
2411 .unwrap_or(VmValue::Nil),
2412 );
2413 next.insert(
2414 "system_prompt".to_string(),
2415 state
2416 .system_prompt
2417 .as_ref()
2418 .map(|prompt| VmValue::String(Rc::from(prompt.clone())))
2419 .unwrap_or(VmValue::Nil),
2420 );
2421 next.insert(
2422 "tool_format".to_string(),
2423 state
2424 .tool_format
2425 .as_ref()
2426 .map(|format| VmValue::String(Rc::from(format.clone())))
2427 .unwrap_or(VmValue::Nil),
2428 );
2429 next.insert(
2430 "pinned_model".to_string(),
2431 state
2432 .pinned_model
2433 .as_ref()
2434 .map(|model| VmValue::String(Rc::from(model.clone())))
2435 .unwrap_or(VmValue::Nil),
2436 );
2437 next.insert(
2438 "pinned_reasoning_policy".to_string(),
2439 state
2440 .pinned_reasoning_policy
2441 .as_ref()
2442 .map(|policy| VmValue::String(Rc::from(policy.clone())))
2443 .unwrap_or(VmValue::Nil),
2444 );
2445 next.insert(
2446 "workspace_anchor".to_string(),
2447 state
2448 .workspace_anchor
2449 .as_ref()
2450 .map(WorkspaceAnchor::to_vm_value)
2451 .unwrap_or(VmValue::Nil),
2452 );
2453 next.insert(
2454 "workspace_policy".to_string(),
2455 state.workspace_policy.to_vm_value(),
2456 );
2457 VmValue::Dict(Rc::new(next))
2458}
2459
2460fn update_lineage(
2461 map: &mut HashMap<String, SessionState>,
2462 parent_id: &str,
2463 child_id: &str,
2464 branched_at_event_index: Option<usize>,
2465) {
2466 let old_parent_id = map.get(child_id).and_then(|child| child.parent_id.clone());
2467 if let Some(old_parent_id) = old_parent_id.filter(|old_parent_id| old_parent_id != parent_id) {
2468 if let Some(old_parent) = map.get_mut(&old_parent_id) {
2469 old_parent.child_ids.retain(|id| id != child_id);
2470 old_parent.last_accessed = Instant::now();
2471 }
2472 }
2473 if let Some(parent) = map.get_mut(parent_id) {
2474 parent.last_accessed = Instant::now();
2475 if !parent.child_ids.iter().any(|id| id == child_id) {
2476 parent.child_ids.push(child_id.to_string());
2477 }
2478 }
2479 if let Some(child) = map.get_mut(child_id) {
2480 child.last_accessed = Instant::now();
2481 child.parent_id = Some(parent_id.to_string());
2482 child.branched_at_event_index = branched_at_event_index;
2483 child.transcript = clone_transcript_with_parent(&child.transcript, parent_id);
2484 }
2485}
2486
2487fn branch_event_index(transcript: &VmValue, keep_first: usize) -> usize {
2488 if keep_first == 0 {
2489 return 0;
2490 }
2491 let Some(dict) = transcript.as_dict() else {
2492 return keep_first;
2493 };
2494 let Some(VmValue::List(events)) = dict.get("events") else {
2495 return keep_first;
2496 };
2497 event_prefix_len_for_messages(events, keep_first)
2498}
2499
2500fn event_kind(event: &VmValue) -> Option<String> {
2501 event
2502 .as_dict()
2503 .and_then(|dict| dict.get("kind"))
2504 .map(VmValue::display)
2505}
2506
2507fn event_id(event: &VmValue) -> Option<String> {
2508 event
2509 .as_dict()
2510 .and_then(|dict| dict.get("id"))
2511 .map(VmValue::display)
2512}
2513
2514fn is_turn_event(event: &VmValue) -> bool {
2515 matches!(
2516 event_kind(event).as_deref(),
2517 Some("message" | "tool_result")
2518 )
2519}
2520
2521fn event_prefix_len_for_messages(events: &[VmValue], keep_first: usize) -> usize {
2522 if keep_first == 0 {
2523 return 0;
2524 }
2525 let mut retained_messages = 0usize;
2526 for (index, event) in events.iter().enumerate() {
2527 if is_turn_event(event) {
2528 retained_messages += 1;
2529 if retained_messages == keep_first {
2530 return index + 1;
2531 }
2532 }
2533 }
2534 events.len()
2535}
2536
2537fn turn_event_id_for_count(events: &[VmValue], keep_first: usize) -> Option<String> {
2538 if keep_first == 0 {
2539 return None;
2540 }
2541 let mut retained_messages = 0usize;
2542 for event in events {
2543 if is_turn_event(event) {
2544 retained_messages += 1;
2545 if retained_messages == keep_first {
2546 return event_id(event);
2547 }
2548 }
2549 }
2550 None
2551}
2552
2553#[cfg(test)]
2554#[path = "agent_sessions_tests.rs"]
2555mod tests;