1use std::cell::{Cell, RefCell};
22use std::collections::{BTreeMap, HashMap, HashSet};
23use std::future::Future;
24use std::path::{Path, PathBuf};
25use std::rc::Rc;
26use std::time::Instant;
27
28use crate::runtime_limits::RuntimeLimits;
29use crate::value::VmValue;
30use crate::workspace_anchor::{
31 MountMode, MountedRoot, WorkspaceAnchor, WorkspacePolicy, WORKSPACE_ANCHOR_METADATA_KEY,
32};
33
/// Default upper bound on the number of sessions kept in the thread-local
/// store, taken from `RuntimeLimits::DEFAULT.max_agent_sessions`.
pub const DEFAULT_SESSION_CAP: usize = RuntimeLimits::DEFAULT.max_agent_sessions;

/// Default `max_messages` used by `SessionTranscriptBudgetPolicy::default()`.
pub const DEFAULT_TRANSCRIPT_MESSAGE_CAP: usize = 4096;

/// Default `max_events` used by `SessionTranscriptBudgetPolicy::default()`.
pub const DEFAULT_TRANSCRIPT_EVENT_CAP: usize = 32768;
// Diagnostic identifier; only compiled into builds with debug assertions.
// NOTE(review): its consumer is not visible in this part of the file.
#[cfg(debug_assertions)]
const CACHE_STABLE_SYSTEM_PROMPT_DIAGNOSTIC: &str = "HARN-CACHE-001";
48
/// Recovery strategy attached to a [`SessionTranscriptBudgetPolicy`].
///
/// NOTE(review): the enforcement routine (`apply_transcript_with_budget`) is
/// not visible in this part of the file; the variant notes below describe
/// only what the visible helpers do with each variant.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum TranscriptBudgetRecovery {
    /// Reported as `"reject"` in audit JSON and error messages.
    Reject,
    /// Reported as `"trim"`; `keep_last` is the requested number of trailing
    /// messages to keep (further capped by the policy limits when trimming).
    Trim { keep_last: usize },
    /// Reported as `"compact"`; `keep_last` is the requested size of the
    /// message tail handed to the compaction configuration.
    Compact { keep_last: usize },
}
55
56#[derive(Clone, Debug, PartialEq, Eq)]
57pub struct SessionTranscriptBudgetPolicy {
58 pub max_messages: usize,
59 pub max_events: usize,
60 pub max_approx_bytes: Option<usize>,
61 pub recovery: TranscriptBudgetRecovery,
62}
63
64impl SessionTranscriptBudgetPolicy {
65 pub fn reject(max_messages: usize, max_events: usize) -> Self {
66 Self {
67 max_messages: max_messages.max(1),
68 max_events: max_events.max(1),
69 max_approx_bytes: None,
70 recovery: TranscriptBudgetRecovery::Reject,
71 }
72 }
73
74 pub fn trim(max_messages: usize, max_events: usize, keep_last: usize) -> Self {
75 Self {
76 max_messages: max_messages.max(1),
77 max_events: max_events.max(1),
78 max_approx_bytes: None,
79 recovery: TranscriptBudgetRecovery::Trim { keep_last },
80 }
81 }
82
83 pub fn compact(max_messages: usize, max_events: usize, keep_last: usize) -> Self {
84 Self {
85 max_messages: max_messages.max(1),
86 max_events: max_events.max(1),
87 max_approx_bytes: None,
88 recovery: TranscriptBudgetRecovery::Compact { keep_last },
89 }
90 }
91
92 pub fn with_max_approx_bytes(mut self, max_approx_bytes: Option<usize>) -> Self {
93 self.max_approx_bytes = max_approx_bytes.map(|limit| limit.max(1));
94 self
95 }
96
97 fn normalized(&self) -> Self {
98 Self {
99 max_messages: self.max_messages.max(1),
100 max_events: self.max_events.max(1),
101 max_approx_bytes: self.max_approx_bytes.map(|limit| limit.max(1)),
102 recovery: self.recovery.clone(),
103 }
104 }
105}
106
107impl Default for SessionTranscriptBudgetPolicy {
108 fn default() -> Self {
109 Self::reject(DEFAULT_TRANSCRIPT_MESSAGE_CAP, DEFAULT_TRANSCRIPT_EVENT_CAP)
110 }
111}
112
/// In-memory state for one agent session, stored in the thread-local
/// `SESSIONS` map.
pub struct SessionState {
    /// Session identifier; also the key this state is stored under.
    pub id: String,
    /// Transcript value. Read throughout this module as a dict holding
    /// `messages` and `events` lists.
    pub transcript: VmValue,
    // NOTE(review): subscriber semantics are not visible in this part of the file.
    pub subscribers: Vec<VmValue>,
    /// Creation timestamp string produced by `crate::orchestration::now_rfc3339()`.
    pub created_at: String,
    /// Refreshed by most accessors/mutators; `open_or_create` evicts the
    /// session with the smallest value when the store is at capacity.
    pub last_accessed: Instant,
    /// Parent session id, if this session was linked as a child.
    pub parent_id: Option<String>,
    /// Ids of sessions linked as children of this one.
    pub child_ids: Vec<String>,
    /// Event index in the parent at which this session branched.
    /// NOTE(review): written by `update_lineage`, which is not visible here.
    pub branched_at_event_index: Option<usize>,
    // NOTE(review): usage of `active_skills` is not visible in this part of the file.
    pub active_skills: Vec<String>,
    /// Cleared by `reset_transcript`; copied to the destination by `fork`.
    pub tool_format: Option<String>,
    /// Cleared by `reset_transcript`; copied to the destination by `fork`.
    pub system_prompt: Option<String>,
    /// Copied to the destination by `fork`; not touched by `reset_transcript`.
    pub pinned_model: Option<String>,
    /// Copied to the destination by `fork`; not touched by `reset_transcript`.
    pub pinned_reasoning_policy: Option<String>,
    /// Copied to the destination by `fork`.
    pub workspace_policy: WorkspacePolicy,
    /// Copied to the destination by `fork`.
    pub workspace_anchor: Option<WorkspaceAnchor>,
    /// Budget policy for this session's transcript; initialised from the
    /// thread-local default policy.
    pub transcript_budget_policy: SessionTranscriptBudgetPolicy,
    /// Cleared by `reset_transcript`.
    /// NOTE(review): presumably the audit record of the most recent budget
    /// enforcement — confirm against `apply_transcript_with_budget`.
    pub last_transcript_budget_action: Option<serde_json::Value>,
}

impl SessionState {
    /// Builds a fresh session with an empty transcript, no lineage, default
    /// workspace policy, and the current thread-local default budget policy.
    fn new(id: String) -> Self {
        let now = Instant::now();
        // Build the transcript before `id` is moved into the struct.
        let transcript = empty_transcript(&id);
        Self {
            id,
            transcript,
            subscribers: Vec::new(),
            created_at: crate::orchestration::now_rfc3339(),
            last_accessed: now,
            parent_id: None,
            child_ids: Vec::new(),
            branched_at_event_index: None,
            active_skills: Vec::new(),
            tool_format: None,
            system_prompt: None,
            pinned_model: None,
            pinned_reasoning_policy: None,
            workspace_policy: WorkspacePolicy::default(),
            workspace_anchor: None,
            transcript_budget_policy: default_transcript_budget_policy(),
            last_transcript_budget_action: None,
        }
    }
}
186
/// Lineage summary for one session, as returned by `ancestry`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct SessionAncestry {
    /// Direct parent of the session, if any.
    pub parent_id: Option<String>,
    /// Direct children of the session.
    pub child_ids: Vec<String>,
    /// Last id reached by following `parent_id` links upward; equals the
    /// session's own id when it has no parent.
    pub root_id: String,
}
193
/// Outcome of truncating a session transcript to its first N messages.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct SessionTruncateResult {
    /// Number of messages kept: the requested count capped at the transcript length.
    pub kept_turn_count: usize,
    /// Number of messages that fell beyond the kept prefix.
    pub removed_turn_count: usize,
    /// Event id computed by `turn_event_id_for_count` for the kept prefix.
    /// NOTE(review): that helper is not visible in this part of the file.
    pub new_tip_turn_id: Option<String>,
}
200
/// Report describing a reminder injection.
///
/// NOTE(review): no producer or consumer of this type is visible in this part
/// of the file; field meanings below are inferred from their names only.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ReminderInjectionReport {
    /// Identifier of the reminder (presumably the injected one — confirm).
    pub reminder_id: String,
    /// Presumably the number of reminders removed as duplicates — confirm.
    pub deduped_count: usize,
}
206
// Thread-local session store state: every thread gets its own independent copy.
thread_local! {
    // Sessions on this thread, keyed by session id.
    static SESSIONS: RefCell<HashMap<String, SessionState>> = RefCell::new(HashMap::new());
    // Maximum number of sessions kept in `SESSIONS` before `open_or_create` evicts one.
    static SESSION_CAP: Cell<usize> = const { Cell::new(DEFAULT_SESSION_CAP) };
    // Budget policy given to newly created sessions.
    static DEFAULT_TRANSCRIPT_BUDGET_POLICY: RefCell<SessionTranscriptBudgetPolicy> =
        RefCell::new(SessionTranscriptBudgetPolicy::default());
    // Stack of entered session ids; the top entry is the "current" session.
    static CURRENT_SESSION_STACK: RefCell<Vec<String>> = const { RefCell::new(Vec::new()) };
    // Stack of entered tool-call ids; consulted when no task-local id is set.
    static CURRENT_TOOL_CALL_STACK: RefCell<Vec<String>> = const { RefCell::new(Vec::new()) };
}
215
// Task-local tool-call id set by `scope_current_tool_call`; when present and
// non-blank it takes precedence over `CURRENT_TOOL_CALL_STACK` in
// `current_tool_call_id`.
tokio::task_local! {
    static CURRENT_TOOL_CALL_TASK: String;
}
219
220pub struct CurrentSessionGuard {
221 active: bool,
222}
223
224impl Drop for CurrentSessionGuard {
225 fn drop(&mut self) {
226 if self.active {
227 pop_current_session();
228 }
229 }
230}
231
232pub struct CurrentToolCallGuard {
238 active: bool,
239}
240
241impl Drop for CurrentToolCallGuard {
242 fn drop(&mut self) {
243 if self.active {
244 pop_current_tool_call();
245 }
246 }
247}
248
249pub fn set_session_cap(cap: usize) {
252 SESSION_CAP.with(|c| c.set(cap.max(1)));
253}
254
255pub fn session_cap() -> usize {
256 SESSION_CAP.with(|c| c.get())
257}
258
259pub fn set_default_transcript_budget_policy(policy: SessionTranscriptBudgetPolicy) {
260 DEFAULT_TRANSCRIPT_BUDGET_POLICY.with(|cell| {
261 *cell.borrow_mut() = policy.normalized();
262 });
263}
264
265pub fn reset_default_transcript_budget_policy() {
266 set_default_transcript_budget_policy(SessionTranscriptBudgetPolicy::default());
267}
268
269pub fn default_transcript_budget_policy() -> SessionTranscriptBudgetPolicy {
270 DEFAULT_TRANSCRIPT_BUDGET_POLICY.with(|cell| cell.borrow().clone())
271}
272
273pub fn transcript_budget_policy(id: &str) -> Option<SessionTranscriptBudgetPolicy> {
274 SESSIONS.with(|s| {
275 s.borrow()
276 .get(id)
277 .map(|state| state.transcript_budget_policy.clone())
278 })
279}
280
281pub fn set_transcript_budget_policy(
282 id: &str,
283 policy: SessionTranscriptBudgetPolicy,
284) -> Result<(), String> {
285 SESSIONS.with(|s| {
286 let mut map = s.borrow_mut();
287 let Some(state) = map.get_mut(id) else {
288 return Err(format!("agent session '{id}' does not exist"));
289 };
290 let previous = state.transcript_budget_policy.clone();
291 let previous_action = state.last_transcript_budget_action.clone();
292 state.transcript_budget_policy = policy.normalized();
293 let candidate = state.transcript.clone();
294 if let Err(error) = apply_transcript_with_budget(state, candidate, "policy_update") {
295 state.transcript_budget_policy = previous;
296 state.last_transcript_budget_action = previous_action;
297 return Err(error);
298 }
299 state.last_accessed = Instant::now();
300 Ok(())
301 })
302}
303
304pub fn reset_session_store() {
306 SESSIONS.with(|s| s.borrow_mut().clear());
307 CURRENT_SESSION_STACK.with(|stack| stack.borrow_mut().clear());
308 CURRENT_TOOL_CALL_STACK.with(|stack| stack.borrow_mut().clear());
309 reset_default_transcript_budget_policy();
310}
311
312pub(crate) fn push_current_session(id: String) {
313 if id.is_empty() {
314 return;
315 }
316 CURRENT_SESSION_STACK.with(|stack| stack.borrow_mut().push(id));
317}
318
319pub(crate) fn pop_current_session() {
320 CURRENT_SESSION_STACK.with(|stack| {
321 let _ = stack.borrow_mut().pop();
322 });
323}
324
325pub fn current_session_id() -> Option<String> {
326 CURRENT_SESSION_STACK.with(|stack| stack.borrow().last().cloned())
327}
328
329pub fn enter_current_session(id: impl Into<String>) -> CurrentSessionGuard {
330 let id = id.into();
331 if id.trim().is_empty() {
332 return CurrentSessionGuard { active: false };
333 }
334 push_current_session(id);
335 CurrentSessionGuard { active: true }
336}
337
338fn push_current_tool_call(id: String) {
339 if id.is_empty() {
340 return;
341 }
342 CURRENT_TOOL_CALL_STACK.with(|stack| stack.borrow_mut().push(id));
343}
344
345fn pop_current_tool_call() {
346 CURRENT_TOOL_CALL_STACK.with(|stack| {
347 let _ = stack.borrow_mut().pop();
348 });
349}
350
351pub fn current_tool_call_id() -> Option<String> {
356 if let Ok(id) = CURRENT_TOOL_CALL_TASK.try_with(Clone::clone) {
357 if !id.trim().is_empty() {
358 return Some(id);
359 }
360 }
361 CURRENT_TOOL_CALL_STACK.with(|stack| stack.borrow().last().cloned())
362}
363
364pub async fn scope_current_tool_call<F, T>(id: impl Into<String>, future: F) -> T
370where
371 F: Future<Output = T>,
372{
373 let id = id.into();
374 if id.trim().is_empty() {
375 future.await
376 } else {
377 CURRENT_TOOL_CALL_TASK.scope(id, future).await
378 }
379}
380
381pub fn enter_current_tool_call(id: impl Into<String>) -> CurrentToolCallGuard {
383 let id = id.into();
384 if id.trim().is_empty() {
385 return CurrentToolCallGuard { active: false };
386 }
387 push_current_tool_call(id);
388 CurrentToolCallGuard { active: true }
389}
390
391pub fn exists(id: &str) -> bool {
392 SESSIONS.with(|s| s.borrow().contains_key(id))
393}
394
395pub fn length(id: &str) -> Option<usize> {
396 SESSIONS.with(|s| {
397 s.borrow().get(id).map(|state| {
398 state
399 .transcript
400 .as_dict()
401 .and_then(|d| d.get("messages"))
402 .and_then(|v| match v {
403 VmValue::List(list) => Some(list.len()),
404 _ => None,
405 })
406 .unwrap_or(0)
407 })
408 })
409}
410
411pub fn snapshot(id: &str) -> Option<VmValue> {
412 SESSIONS.with(|s| s.borrow().get(id).map(session_snapshot))
413}
414
415pub fn transcript(id: &str) -> Option<VmValue> {
417 SESSIONS.with(|s| {
418 s.borrow()
419 .get(id)
420 .map(|state| transcript_with_session_metadata(state.transcript.clone(), state))
421 })
422}
423
424pub fn open_or_create(id: Option<String>) -> String {
433 let resolved = id.unwrap_or_else(|| uuid::Uuid::now_v7().to_string());
434 let parent_session = current_session_id();
435 let mut was_new = false;
436 SESSIONS.with(|s| {
437 let mut map = s.borrow_mut();
438 if let Some(state) = map.get_mut(&resolved) {
439 state.last_accessed = Instant::now();
440 return;
441 }
442 was_new = true;
443 let cap = SESSION_CAP.with(|c| c.get());
444 if map.len() >= cap {
445 if let Some(victim) = map
446 .iter()
447 .min_by_key(|(_, state)| state.last_accessed)
448 .map(|(id, _)| id.clone())
449 {
450 map.remove(&victim);
451 }
452 }
453 map.insert(resolved.clone(), SessionState::new(resolved.clone()));
454 });
455 if was_new {
456 if let Some(parent) = parent_session.as_deref() {
457 crate::agent_events::mirror_session_sinks(parent, &resolved);
458 }
459 try_register_event_log(&resolved);
460 }
461 resolved
462}
463
464pub fn open_child_session(parent_id: &str, id: Option<String>) -> String {
465 let resolved = open_or_create(id);
466 link_child_session(parent_id, &resolved);
467 resolved
468}
469
470pub fn link_child_session(parent_id: &str, child_id: &str) {
471 link_child_session_with_branch(parent_id, child_id, None);
472}
473
474pub fn link_child_session_with_branch(
475 parent_id: &str,
476 child_id: &str,
477 branched_at_event_index: Option<usize>,
478) {
479 if parent_id == child_id {
480 return;
481 }
482 open_or_create(Some(parent_id.to_string()));
483 open_or_create(Some(child_id.to_string()));
484 SESSIONS.with(|s| {
485 let mut map = s.borrow_mut();
486 update_lineage(&mut map, parent_id, child_id, branched_at_event_index);
487 });
488}
489
490pub fn parent_id(id: &str) -> Option<String> {
491 SESSIONS.with(|s| s.borrow().get(id).and_then(|state| state.parent_id.clone()))
492}
493
494pub fn child_ids(id: &str) -> Vec<String> {
495 SESSIONS.with(|s| {
496 s.borrow()
497 .get(id)
498 .map(|state| state.child_ids.clone())
499 .unwrap_or_default()
500 })
501}
502
503pub fn ancestry(id: &str) -> Option<SessionAncestry> {
504 SESSIONS.with(|s| {
505 let map = s.borrow();
506 let state = map.get(id)?;
507 let mut root_id = state.id.clone();
508 let mut cursor = state.parent_id.clone();
509 let mut seen = HashSet::from([state.id.clone()]);
510 while let Some(parent_id) = cursor {
511 if !seen.insert(parent_id.clone()) {
512 break;
513 }
514 root_id = parent_id.clone();
515 cursor = map
516 .get(&parent_id)
517 .and_then(|parent| parent.parent_id.clone());
518 }
519 Some(SessionAncestry {
520 parent_id: state.parent_id.clone(),
521 child_ids: state.child_ids.clone(),
522 root_id,
523 })
524 })
525}
526
527fn try_register_event_log(session_id: &str) {
531 if let Some(log) = crate::event_log::active_event_log() {
532 crate::agent_events::register_sink(
533 session_id,
534 crate::agent_events::EventLogSink::new(log, session_id),
535 );
536 return;
537 }
538 let Ok(dir) = std::env::var("HARN_EVENT_LOG_DIR") else {
539 return;
540 };
541 if dir.is_empty() {
542 return;
543 }
544 let path = std::path::PathBuf::from(dir).join(format!("event_log-{session_id}.jsonl"));
545 if let Ok(sink) = crate::agent_events::JsonlEventSink::open(&path) {
546 crate::agent_events::register_sink(session_id, sink);
547 }
548}
549
550pub fn register_event_log_sink(session_id: &str) {
551 try_register_event_log(session_id);
552}
553
554pub fn close(id: &str) {
555 SESSIONS.with(|s| {
556 s.borrow_mut().remove(id);
557 });
558 crate::orchestration::agent_inbox::clear_session(id);
562 crate::agent_events::clear_session_sinks(id);
563}
564
565pub fn close_with_status(
566 id: &str,
567 reason: impl Into<String>,
568 status: impl Into<String>,
569 metadata: serde_json::Value,
570) -> bool {
571 if !exists(id) {
572 return false;
573 }
574 let reason = reason.into();
575 let status = status.into();
576 let event_metadata = serde_json::json!({
577 "reason": reason,
578 "status": status,
579 "metadata": metadata,
580 });
581 let transcript_event = crate::llm::helpers::transcript_event(
582 "agent_session_closed",
583 "system",
584 "internal",
585 "Agent session closed",
586 Some(event_metadata),
587 );
588 let _ = append_event(id, transcript_event);
589 crate::llm::emit_live_agent_event_sync(&crate::agent_events::AgentEvent::SessionClosed {
590 session_id: id.to_string(),
591 reason,
592 status,
593 metadata,
594 });
595 close(id);
596 true
597}
598
599pub fn reset_transcript(id: &str) -> bool {
600 SESSIONS.with(|s| {
601 let mut map = s.borrow_mut();
602 let Some(state) = map.get_mut(id) else {
603 return false;
604 };
605 state.transcript = empty_transcript(id);
606 state.tool_format = None;
607 state.system_prompt = None;
608 state.last_transcript_budget_action = None;
609 state.last_accessed = Instant::now();
610 true
611 })
612}
613
/// Copies session `src_id` into `dst_id` (a fresh UUIDv7 when `None`) and
/// records the copy as a child of the source via `update_lineage`.
///
/// The transcript (re-stamped with the destination id), tool format, system
/// prompt, pinned model and reasoning policy, workspace anchor and policy,
/// budget policy, and last budget action are copied.
///
/// Returns `None` when the source does not exist, when applying the budget
/// to the copied transcript fails (the destination is closed in that case),
/// or when the destination is no longer in the store at the end.
pub fn fork(src_id: &str, dst_id: Option<String>) -> Option<String> {
    // Phase 1: snapshot everything needed from the source under one borrow.
    let (
        src_transcript,
        src_tool_format,
        src_system_prompt,
        src_pinned_model,
        src_pinned_reasoning_policy,
        src_workspace_anchor,
        src_workspace_policy,
        src_transcript_budget_policy,
        src_last_transcript_budget_action,
        dst,
    ) = SESSIONS.with(|s| {
        let mut map = s.borrow_mut();
        let src = map.get_mut(src_id)?;
        src.last_accessed = Instant::now();
        let dst = dst_id.unwrap_or_else(|| uuid::Uuid::now_v7().to_string());
        let forked_transcript = clone_transcript_with_id(&src.transcript, &dst);
        Some((
            forked_transcript,
            src.tool_format.clone(),
            src.system_prompt.clone(),
            src.pinned_model.clone(),
            src.pinned_reasoning_policy.clone(),
            src.workspace_anchor.clone(),
            src.workspace_policy.clone(),
            src.transcript_budget_policy.clone(),
            src.last_transcript_budget_action.clone(),
            dst,
        ))
    })?;
    // Phase 2: `open_or_create` borrows `SESSIONS` itself, so it has to run
    // between the closures rather than inside one.
    open_or_create(Some(dst.clone()));
    // Phase 3: overwrite the destination with the snapshot and link lineage.
    SESSIONS.with(|s| {
        let mut map = s.borrow_mut();
        if let Some(state) = map.get_mut(&dst) {
            state.transcript = src_transcript;
            state.tool_format = src_tool_format;
            state.system_prompt = src_system_prompt;
            state.pinned_model = src_pinned_model;
            state.pinned_reasoning_policy = src_pinned_reasoning_policy;
            state.workspace_anchor = src_workspace_anchor;
            state.workspace_policy = src_workspace_policy;
            state.transcript_budget_policy = src_transcript_budget_policy;
            state.last_transcript_budget_action = src_last_transcript_budget_action;
            state.last_accessed = Instant::now();
        }
        update_lineage(&mut map, src_id, &dst, None);
    });
    // Phase 4: run the copied transcript through budget enforcement.
    let budget_ok = SESSIONS.with(|s| {
        let mut map = s.borrow_mut();
        let Some(state) = map.get_mut(&dst) else {
            return false;
        };
        let candidate = state.transcript.clone();
        apply_transcript_with_budget(state, candidate, "fork").is_ok()
    });
    if !budget_ok {
        // Do not leave a half-initialised destination behind.
        close(&dst);
        return None;
    }
    // Final sanity check that the destination is still present.
    if exists(&dst) {
        Some(dst)
    } else {
        None
    }
}
690
691pub fn fork_at(src_id: &str, keep_first: usize, dst_id: Option<String>) -> Option<String> {
702 let branched_at_event_index = SESSIONS.with(|s| {
703 let map = s.borrow();
704 let src = map.get(src_id)?;
705 Some(branch_event_index(&src.transcript, keep_first))
706 })?;
707 let new_id = fork(src_id, dst_id)?;
708 link_child_session_with_branch(src_id, &new_id, Some(branched_at_event_index));
709 let _ = truncate(&new_id, keep_first);
710 Some(new_id)
711}
712
713pub fn truncate(id: &str, keep_first: usize) -> Option<SessionTruncateResult> {
717 SESSIONS.with(|s| {
718 let mut map = s.borrow_mut();
719 let state = map.get_mut(id)?;
720 let result = truncate_state(state, keep_first)?;
721 Some(result)
722 })
723}
724
/// Keeps the first `keep_first` messages of `state`'s transcript (capped at
/// the current message count) and the matching prefix of events.
///
/// When nothing has to be removed the transcript is left untouched and only
/// `last_accessed` is refreshed. When messages are removed, the `summary`
/// key is dropped and the new transcript goes through
/// `apply_transcript_with_budget`; `None` is returned if that fails.
fn truncate_state(state: &mut SessionState, keep_first: usize) -> Option<SessionTruncateResult> {
    // A non-dict transcript is treated as an empty dict.
    let dict = state
        .transcript
        .as_dict()
        .cloned()
        .unwrap_or_else(BTreeMap::new);
    let messages: Vec<VmValue> = match dict.get("messages") {
        Some(VmValue::List(list)) => list.iter().cloned().collect(),
        _ => Vec::new(),
    };
    // `None` means the transcript has no `events` list; events are then
    // derived from messages where needed.
    let existing_events = match dict.get("events") {
        Some(VmValue::List(list)) => Some(list.iter().cloned().collect::<Vec<_>>()),
        _ => None,
    };
    let kept_turn_count = keep_first.min(messages.len());
    let removed_turn_count = messages.len().saturating_sub(kept_turn_count);
    // Tip id for the no-op case; recomputed below when messages are removed.
    let mut new_tip_turn_id = existing_events
        .as_ref()
        .map(|events| turn_event_id_for_count(events, kept_turn_count))
        .unwrap_or_else(|| {
            let events = crate::llm::helpers::transcript_events_from_messages(&messages);
            turn_event_id_for_count(&events, kept_turn_count)
        });

    if removed_turn_count > 0 {
        let retained: Vec<VmValue> = messages.into_iter().take(kept_turn_count).collect();
        let retained_events = match existing_events {
            Some(events) => {
                // Keep only the event prefix that corresponds to the kept messages.
                let keep_event_count = event_prefix_len_for_messages(&events, kept_turn_count);
                events.into_iter().take(keep_event_count).collect()
            }
            None => crate::llm::helpers::transcript_events_from_messages(&retained),
        };
        new_tip_turn_id = turn_event_id_for_count(&retained_events, kept_turn_count);
        let mut next = dict;
        next.insert(
            "events".to_string(),
            VmValue::List(Rc::new(retained_events)),
        );
        next.insert("messages".to_string(), VmValue::List(Rc::new(retained)));
        next.remove("summary");
        // A budget failure aborts the truncation by returning `None`.
        apply_transcript_with_budget(state, VmValue::Dict(Rc::new(next)), "truncate").ok()?;
    }
    state.last_accessed = Instant::now();
    Some(SessionTruncateResult {
        kept_turn_count,
        removed_turn_count,
        new_tip_turn_id,
    })
}
775
776pub fn pop_last_if_assistant(id: &str) -> Result<bool, String> {
783 SESSIONS.with(|s| {
784 let mut map = s.borrow_mut();
785 let Some(state) = map.get_mut(id) else {
786 return Err(format!(
787 "pop_last_if_assistant: unknown session id '{id}'"
788 ));
789 };
790 let messages: Vec<VmValue> = match state.transcript.as_dict() {
791 Some(dict) => match dict.get("messages") {
792 Some(VmValue::List(list)) => list.iter().cloned().collect(),
793 _ => Vec::new(),
794 },
795 None => Vec::new(),
796 };
797 if messages.is_empty() {
798 return Ok(false);
799 }
800 let trailing_role = messages
801 .last()
802 .and_then(|m| m.as_dict())
803 .and_then(|d| d.get("role"))
804 .map(|v| v.display())
805 .unwrap_or_default();
806 if trailing_role != "assistant" {
807 return Err(format!(
808 "pop_last_if_assistant: trailing message role is '{trailing_role}', expected 'assistant'"
809 ));
810 }
811 let keep = messages.len() - 1;
812 truncate_state(state, keep);
813 Ok(true)
814 })
815}
816
817pub fn trim(id: &str, keep_last: usize) -> Option<usize> {
818 SESSIONS.with(|s| {
819 let mut map = s.borrow_mut();
820 let state = map.get_mut(id)?;
821 let dict = state.transcript.as_dict()?.clone();
822 let messages: Vec<VmValue> = match dict.get("messages") {
823 Some(VmValue::List(list)) => list.iter().cloned().collect(),
824 _ => Vec::new(),
825 };
826 let start = messages.len().saturating_sub(keep_last);
827 let retained: Vec<VmValue> = messages.into_iter().skip(start).collect();
828 let kept = retained.len();
829 let mut next = dict;
830 next.insert(
831 "events".to_string(),
832 VmValue::List(Rc::new(
833 crate::llm::helpers::transcript_events_from_messages(&retained),
834 )),
835 );
836 next.insert("messages".to_string(), VmValue::List(Rc::new(retained)));
837 apply_transcript_with_budget(state, VmValue::Dict(Rc::new(next)), "trim").ok()?;
838 state.last_accessed = Instant::now();
839 Some(kept)
840 })
841}
842
843pub fn inject_message(id: &str, message: VmValue) -> Result<(), String> {
846 let Some(msg_dict) = message.as_dict().cloned() else {
847 return Err("agent_session_inject: message must be a dict".into());
848 };
849 let role_ok = matches!(msg_dict.get("role"), Some(VmValue::String(_)));
850 if !role_ok {
851 return Err(
852 "agent_session_inject: message must have a string `role` (user|assistant|tool_result|system)"
853 .into(),
854 );
855 }
856 SESSIONS.with(|s| {
857 let mut map = s.borrow_mut();
858 let Some(state) = map.get_mut(id) else {
859 return Err(format!("agent_session_inject: unknown session id '{id}'"));
860 };
861 let dict = state
862 .transcript
863 .as_dict()
864 .cloned()
865 .unwrap_or_else(BTreeMap::new);
866 let mut messages: Vec<VmValue> = match dict.get("messages") {
867 Some(VmValue::List(list)) => list.iter().cloned().collect(),
868 _ => Vec::new(),
869 };
870 let mut events: Vec<VmValue> = match dict.get("events") {
871 Some(VmValue::List(list)) => list.iter().cloned().collect(),
872 _ => crate::llm::helpers::transcript_events_from_messages(&messages),
873 };
874 let new_message = VmValue::Dict(Rc::new(msg_dict));
875 let message_index = messages.len();
876 events.push(crate::llm::helpers::transcript_event_from_message(
877 &new_message,
878 ));
879 messages.push(new_message);
880 let mut next = dict;
881 next.insert("events".to_string(), VmValue::List(Rc::new(events)));
882 next.insert("messages".to_string(), VmValue::List(Rc::new(messages)));
883 let persisted_message = next
884 .get("messages")
885 .and_then(|value| match value {
886 VmValue::List(list) => list.get(message_index).cloned(),
887 _ => None,
888 })
889 .unwrap_or(VmValue::Nil);
890 apply_transcript_with_budget(state, VmValue::Dict(Rc::new(next)), "inject_message")?;
891 emit_identified_user_message_event(id, &persisted_message);
892 emit_llm_message_event(id, message_index, &persisted_message);
893 state.last_accessed = Instant::now();
894 Ok(())
895 })
896}
897
898fn emit_identified_user_message_event(session_id: &str, message: &VmValue) {
899 let message_json = crate::llm::helpers::vm_value_to_json(message);
900 let role = message_json.get("role").and_then(|value| value.as_str());
901 if role != Some("user") {
902 return;
903 }
904 let Some(message_id) = message_json
905 .get("messageId")
906 .or_else(|| message_json.get("message_id"))
907 .and_then(|value| value.as_str())
908 .filter(|value| !value.trim().is_empty())
909 else {
910 return;
911 };
912 let content = message_json
913 .get("content")
914 .map(user_message_content_blocks)
915 .unwrap_or_default();
916 crate::agent_events::emit_event(&crate::agent_events::AgentEvent::UserMessage {
917 session_id: session_id.to_string(),
918 message_id: message_id.to_string(),
919 content,
920 });
921}
922
923fn user_message_content_blocks(content: &serde_json::Value) -> Vec<serde_json::Value> {
924 match content {
925 serde_json::Value::Array(items) => items.clone(),
926 serde_json::Value::String(text) => vec![serde_json::json!({
927 "type": "text",
928 "text": text,
929 })],
930 other => vec![serde_json::json!({
931 "type": "text",
932 "text": other.to_string(),
933 })],
934 }
935}
936
/// Measured size of a transcript, compared against a
/// `SessionTranscriptBudgetPolicy`.
#[derive(Clone, Debug, PartialEq, Eq)]
struct TranscriptBudgetUsage {
    /// Number of entries in the transcript's `messages` list.
    message_count: usize,
    /// Number of events (derived from messages when no `events` list exists).
    event_count: usize,
    /// Size of the JSON serialization in bytes; `None` when not measured.
    approx_bytes: Option<usize>,
}
943
944fn transcript_messages_from_dict(dict: &BTreeMap<String, VmValue>) -> Vec<VmValue> {
945 match dict.get("messages") {
946 Some(VmValue::List(list)) => list.iter().cloned().collect(),
947 _ => Vec::new(),
948 }
949}
950
951fn transcript_events_from_dict(dict: &BTreeMap<String, VmValue>) -> Vec<VmValue> {
952 match dict.get("events") {
953 Some(VmValue::List(list)) => list.iter().cloned().collect(),
954 _ => {
955 let messages = transcript_messages_from_dict(dict);
956 crate::llm::helpers::transcript_events_from_messages(&messages)
957 }
958 }
959}
960
961fn transcript_usage(transcript: &VmValue, include_bytes: bool) -> TranscriptBudgetUsage {
962 let Some(dict) = transcript.as_dict() else {
963 return TranscriptBudgetUsage {
964 message_count: 0,
965 event_count: 0,
966 approx_bytes: include_bytes.then_some(0),
967 };
968 };
969 let approx_bytes = if include_bytes {
970 serde_json::to_vec(&crate::llm::helpers::vm_value_to_json(transcript))
971 .map(|bytes| bytes.len())
972 .ok()
973 .or(Some(usize::MAX))
974 } else {
975 None
976 };
977 TranscriptBudgetUsage {
978 message_count: transcript_messages_from_dict(dict).len(),
979 event_count: transcript_events_from_dict(dict).len(),
980 approx_bytes,
981 }
982}
983
984fn transcript_budget_exceeded_reason(
985 usage: &TranscriptBudgetUsage,
986 policy: &SessionTranscriptBudgetPolicy,
987) -> Option<&'static str> {
988 if usage.message_count > policy.max_messages {
989 return Some("message_count");
990 }
991 if usage.event_count > policy.max_events {
992 return Some("event_count");
993 }
994 if let (Some(bytes), Some(limit)) = (usage.approx_bytes, policy.max_approx_bytes) {
995 if bytes > limit {
996 return Some("approx_bytes");
997 }
998 }
999 None
1000}
1001
1002fn transcript_budget_usage_json(usage: &TranscriptBudgetUsage) -> serde_json::Value {
1003 serde_json::json!({
1004 "messages": usage.message_count,
1005 "events": usage.event_count,
1006 "approx_bytes": usage.approx_bytes,
1007 })
1008}
1009
1010fn transcript_budget_policy_json(policy: &SessionTranscriptBudgetPolicy) -> serde_json::Value {
1011 let recovery = match &policy.recovery {
1012 TranscriptBudgetRecovery::Reject => serde_json::json!({"action": "reject"}),
1013 TranscriptBudgetRecovery::Trim { keep_last } => {
1014 serde_json::json!({"action": "trim", "keep_last": keep_last})
1015 }
1016 TranscriptBudgetRecovery::Compact { keep_last } => {
1017 serde_json::json!({"action": "compact", "keep_last": keep_last})
1018 }
1019 };
1020 serde_json::json!({
1021 "max_messages": policy.max_messages,
1022 "max_events": policy.max_events,
1023 "max_approx_bytes": policy.max_approx_bytes,
1024 "recovery": recovery,
1025 })
1026}
1027
1028fn transcript_budget_recovery_name(recovery: &TranscriptBudgetRecovery) -> &'static str {
1029 match recovery {
1030 TranscriptBudgetRecovery::Reject => "reject",
1031 TranscriptBudgetRecovery::Trim { .. } => "trim",
1032 TranscriptBudgetRecovery::Compact { .. } => "compact",
1033 }
1034}
1035
1036fn transcript_budget_error(
1037 state: &SessionState,
1038 policy: &SessionTranscriptBudgetPolicy,
1039 usage: &TranscriptBudgetUsage,
1040 reason: &str,
1041) -> String {
1042 let byte_suffix = match (usage.approx_bytes, policy.max_approx_bytes) {
1043 (Some(bytes), Some(limit)) => format!(", approx_bytes {bytes}/{limit}"),
1044 _ => String::new(),
1045 };
1046 format!(
1047 "transcript budget exceeded for session '{}': {reason} (messages {}/{}, events {}/{}{}; recovery={})",
1048 state.id,
1049 usage.message_count,
1050 policy.max_messages,
1051 usage.event_count,
1052 policy.max_events,
1053 byte_suffix,
1054 transcript_budget_recovery_name(&policy.recovery),
1055 )
1056}
1057
1058fn transcript_budget_audit_json(
1059 action: &str,
1060 source: &str,
1061 reason: &str,
1062 policy: &SessionTranscriptBudgetPolicy,
1063 usage_before: &TranscriptBudgetUsage,
1064 usage_attempted: &TranscriptBudgetUsage,
1065 usage_after: &TranscriptBudgetUsage,
1066) -> serde_json::Value {
1067 serde_json::json!({
1068 "action": action,
1069 "source": source,
1070 "reason": reason,
1071 "policy": transcript_budget_policy_json(policy),
1072 "usage_before": transcript_budget_usage_json(usage_before),
1073 "usage_attempted": transcript_budget_usage_json(usage_attempted),
1074 "usage_after": transcript_budget_usage_json(usage_after),
1075 "removed_messages": usage_attempted.message_count.saturating_sub(usage_after.message_count),
1076 "removed_events": usage_attempted.event_count.saturating_sub(usage_after.event_count),
1077 })
1078}
1079
1080fn transcript_budget_event(audit: &serde_json::Value) -> VmValue {
1081 let action = audit
1082 .get("action")
1083 .and_then(serde_json::Value::as_str)
1084 .unwrap_or("enforced");
1085 crate::llm::helpers::transcript_event(
1086 "transcript_budget",
1087 "system",
1088 "internal",
1089 &format!("Transcript budget {action}."),
1090 Some(audit.clone()),
1091 )
1092}
1093
1094fn append_event_to_transcript(transcript: VmValue, event: VmValue) -> VmValue {
1095 let Some(dict) = transcript.as_dict() else {
1096 return transcript;
1097 };
1098 let mut next = dict.clone();
1099 let mut events = transcript_events_from_dict(&next);
1100 events.push(event);
1101 next.insert("events".to_string(), VmValue::List(Rc::new(events)));
1102 VmValue::Dict(Rc::new(next))
1103}
1104
1105fn tail_message_capacity(
1106 policy: &SessionTranscriptBudgetPolicy,
1107 reserve_audit_event: bool,
1108) -> usize {
1109 let event_capacity = tail_event_capacity(policy, usize::from(reserve_audit_event));
1110 policy.max_messages.min(event_capacity)
1111}
1112
1113fn tail_event_capacity(policy: &SessionTranscriptBudgetPolicy, reserved_events: usize) -> usize {
1114 policy.max_events.saturating_sub(reserved_events)
1115}
1116
1117fn trim_transcript_for_budget(
1118 transcript: &VmValue,
1119 policy: &SessionTranscriptBudgetPolicy,
1120 keep_last: usize,
1121) -> VmValue {
1122 let dict = transcript.as_dict().cloned().unwrap_or_else(BTreeMap::new);
1123 let messages = transcript_messages_from_dict(&dict);
1124 let keep = keep_last.min(tail_message_capacity(policy, true));
1125 let start = messages.len().saturating_sub(keep);
1126 let retained: Vec<VmValue> = messages.into_iter().skip(start).collect();
1127 let mut next = dict;
1128 next.insert(
1129 "events".to_string(),
1130 VmValue::List(Rc::new(
1131 crate::llm::helpers::transcript_events_from_messages(&retained),
1132 )),
1133 );
1134 next.insert("messages".to_string(), VmValue::List(Rc::new(retained)));
1135 next.remove("summary");
1136 VmValue::Dict(Rc::new(next))
1137}
1138
/// Data carried alongside a budget-driven compaction.
/// NOTE(review): the code that fills and consumes these fields lies outside
/// the visible part of the file — presumably used to emit a live
/// "transcript compacted" event; confirm.
struct BudgetCompactionLiveEvent {
    policy: crate::orchestration::CompactionPolicy,
    policy_strategy: String,
    metrics: crate::orchestration::TranscriptCompactedEventMetrics,
}
1144
/// Return value of `compact_transcript_for_budget`.
struct BudgetCompactionResult {
    /// The compacted transcript.
    transcript: VmValue,
    /// Optional live-event payload.
    /// NOTE(review): the conditions under which this is `Some` are not
    /// visible in this part of the file.
    live_event: Option<BudgetCompactionLiveEvent>,
}
1149
1150fn compact_transcript_for_budget(
1151 transcript: &VmValue,
1152 policy: &SessionTranscriptBudgetPolicy,
1153 keep_last: usize,
1154 session_id: &str,
1155) -> BudgetCompactionResult {
1156 let dict = transcript.as_dict().cloned().unwrap_or_else(BTreeMap::new);
1157 let messages = transcript_messages_from_dict(&dict);
1158 let message_capacity = policy.max_messages.min(tail_event_capacity(policy, 2));
1159 let tail_keep = keep_last.min(message_capacity.saturating_sub(2));
1162 let mut config = crate::orchestration::AutoCompactConfig {
1163 token_threshold: 0,
1164 keep_last: tail_keep,
1165 compact_strategy: crate::orchestration::CompactStrategy::Llm,
1166 hard_limit_strategy: crate::orchestration::CompactStrategy::Truncate,
1167 fallback_strategy: Some(crate::orchestration::CompactStrategy::Truncate),
1168 policy_strategy: crate::orchestration::compact_strategy_name(
1169 &crate::orchestration::CompactStrategy::Llm,
1170 )
1171 .to_string(),
1172 ..Default::default()
1173 };
1174
1175 let mut json_messages = messages
1176 .iter()
1177 .map(crate::llm::helpers::vm_value_to_json)
1178 .collect::<Vec<_>>();
1179 let lifecycle =
1180 crate::orchestration::CompactLifecycle::new(crate::orchestration::CompactMode::Auto)
1181 .with_session_id(Some(session_id))
1182 .with_trigger(crate::orchestration::CompactionTrigger::BudgetPressure)
1183 .with_hook_dispatch(false)
1184 .with_evaluate_providers(false);
1185 let llm_opts = crate::llm::extract_llm_options(&[
1186 VmValue::String(Rc::from("")),
1187 VmValue::Nil,
1188 VmValue::Nil,
1189 ])
1190 .ok();
1191 let outcome = futures::executor::block_on(crate::orchestration::run_compaction_lifecycle(
1192 &mut json_messages,
1193 &mut config,
1194 llm_opts.as_ref(),
1195 lifecycle,
1196 ))
1197 .ok()
1198 .flatten();
1199
1200 let retained = json_messages
1201 .iter()
1202 .map(crate::stdlib::json_to_vm_value)
1203 .collect::<Vec<_>>();
1204 let mut events = crate::llm::helpers::transcript_events_from_messages(&retained);
1205 let summary = outcome.as_ref().map(|outcome| outcome.summary.clone());
1206 let mut live_event = None;
1207 if let Some(outcome) = outcome {
1208 events.push(crate::llm::helpers::transcript_event(
1209 "compaction",
1210 "system",
1211 "internal",
1212 "",
1213 Some(outcome.event_metadata.clone()),
1214 ));
1215 live_event = Some(BudgetCompactionLiveEvent {
1216 policy: config.policy.clone(),
1217 policy_strategy: outcome.policy_strategy,
1218 metrics: crate::orchestration::TranscriptCompactedEventMetrics {
1219 archived_messages: outcome.archived_messages,
1220 estimated_tokens_before: outcome.estimated_tokens_before,
1221 estimated_tokens_after: outcome.estimated_tokens_after,
1222 snapshot_asset_id: outcome.snapshot_asset_id,
1223 },
1224 });
1225 }
1226
1227 let mut next = dict;
1228 next.insert("events".to_string(), VmValue::List(Rc::new(events)));
1229 next.insert("messages".to_string(), VmValue::List(Rc::new(retained)));
1230 if let Some(summary) = summary {
1231 next.insert("summary".to_string(), VmValue::String(Rc::from(summary)));
1232 } else {
1233 next.remove("summary");
1234 }
1235 BudgetCompactionResult {
1236 transcript: VmValue::Dict(Rc::new(next)),
1237 live_event,
1238 }
1239}
1240
1241fn recovered_transcript_with_audit(
1242 recovered: VmValue,
1243 action: &str,
1244 source: &str,
1245 reason: &str,
1246 policy: &SessionTranscriptBudgetPolicy,
1247 usage_before: &TranscriptBudgetUsage,
1248 usage_attempted: &TranscriptBudgetUsage,
1249 include_bytes: bool,
1250) -> (VmValue, serde_json::Value, TranscriptBudgetUsage) {
1251 let usage_after_without_audit = transcript_usage(&recovered, include_bytes);
1252 let initial_audit = transcript_budget_audit_json(
1253 action,
1254 source,
1255 reason,
1256 policy,
1257 usage_before,
1258 usage_attempted,
1259 &usage_after_without_audit,
1260 );
1261 let with_initial_audit =
1262 append_event_to_transcript(recovered.clone(), transcript_budget_event(&initial_audit));
1263 let usage_after = transcript_usage(&with_initial_audit, include_bytes);
1264 let audit = transcript_budget_audit_json(
1265 action,
1266 source,
1267 reason,
1268 policy,
1269 usage_before,
1270 usage_attempted,
1271 &usage_after,
1272 );
1273 let with_audit = append_event_to_transcript(recovered, transcript_budget_event(&audit));
1274 let usage_after = transcript_usage(&with_audit, include_bytes);
1275 (with_audit, audit, usage_after)
1276}
1277
1278fn apply_transcript_with_budget(
1279 state: &mut SessionState,
1280 candidate: VmValue,
1281 source: &str,
1282) -> Result<(), String> {
1283 let policy = state.transcript_budget_policy.normalized();
1284 let include_bytes = policy.max_approx_bytes.is_some();
1285 let usage_before = transcript_usage(&state.transcript, include_bytes);
1286 let usage_attempted = transcript_usage(&candidate, include_bytes);
1287 let Some(reason) = transcript_budget_exceeded_reason(&usage_attempted, &policy) else {
1288 state.transcript = candidate;
1289 return Ok(());
1290 };
1291
1292 match policy.recovery.clone() {
1293 TranscriptBudgetRecovery::Reject => {
1294 let audit = transcript_budget_audit_json(
1295 "rejected",
1296 source,
1297 reason,
1298 &policy,
1299 &usage_before,
1300 &usage_attempted,
1301 &usage_before,
1302 );
1303 state.last_transcript_budget_action = Some(audit);
1304 Err(transcript_budget_error(
1305 state,
1306 &policy,
1307 &usage_attempted,
1308 reason,
1309 ))
1310 }
1311 TranscriptBudgetRecovery::Trim { keep_last } => {
1312 let recovered = trim_transcript_for_budget(&candidate, &policy, keep_last);
1313 let (with_audit, audit, usage_after) = recovered_transcript_with_audit(
1314 recovered,
1315 "trimmed",
1316 source,
1317 reason,
1318 &policy,
1319 &usage_before,
1320 &usage_attempted,
1321 include_bytes,
1322 );
1323 if transcript_budget_exceeded_reason(&usage_after, &policy).is_some() {
1324 let rejected = transcript_budget_audit_json(
1325 "rejected",
1326 source,
1327 reason,
1328 &policy,
1329 &usage_before,
1330 &usage_attempted,
1331 &usage_after,
1332 );
1333 state.last_transcript_budget_action = Some(rejected);
1334 return Err(transcript_budget_error(
1335 state,
1336 &policy,
1337 &usage_after,
1338 reason,
1339 ));
1340 }
1341 state.last_transcript_budget_action = Some(audit);
1342 state.transcript = with_audit;
1343 Ok(())
1344 }
1345 TranscriptBudgetRecovery::Compact { keep_last } => {
1346 let compacted =
1347 compact_transcript_for_budget(&candidate, &policy, keep_last, &state.id);
1348 let (with_audit, audit, usage_after) = recovered_transcript_with_audit(
1349 compacted.transcript,
1350 "compacted",
1351 source,
1352 reason,
1353 &policy,
1354 &usage_before,
1355 &usage_attempted,
1356 include_bytes,
1357 );
1358 if transcript_budget_exceeded_reason(&usage_after, &policy).is_some() {
1359 let rejected = transcript_budget_audit_json(
1360 "rejected",
1361 source,
1362 reason,
1363 &policy,
1364 &usage_before,
1365 &usage_attempted,
1366 &usage_after,
1367 );
1368 state.last_transcript_budget_action = Some(rejected);
1369 return Err(transcript_budget_error(
1370 state,
1371 &policy,
1372 &usage_after,
1373 reason,
1374 ));
1375 }
1376 state.last_transcript_budget_action = Some(audit);
1377 state.transcript = with_audit;
1378 if let Some(event) = compacted.live_event {
1379 crate::orchestration::emit_transcript_compacted_event_sync(
1380 &state.id,
1381 crate::orchestration::CompactMode::Auto,
1382 crate::orchestration::CompactionTrigger::BudgetPressure
1383 .as_str()
1384 .to_string(),
1385 &event.policy,
1386 event.policy_strategy,
1387 event.metrics,
1388 );
1389 }
1390 Ok(())
1391 }
1392 }
1393}
1394
1395fn emit_llm_message_event(session_id: &str, message_index: usize, message: &VmValue) {
1396 let mut fields = serde_json::Map::new();
1397 fields.insert(
1398 "session_id".to_string(),
1399 serde_json::Value::String(session_id.to_string()),
1400 );
1401 fields.insert(
1402 "message_index".to_string(),
1403 serde_json::json!(message_index),
1404 );
1405 let message_json = crate::llm::helpers::vm_value_to_json(message);
1406 if let Some(role) = message_json.get("role").and_then(|value| value.as_str()) {
1407 fields.insert(
1408 "role".to_string(),
1409 serde_json::Value::String(role.to_string()),
1410 );
1411 }
1412 if let Some(content) = message_json.get("content") {
1413 fields.insert("content".to_string(), content.clone());
1414 }
1415 fields.insert("message".to_string(), message_json);
1416 crate::llm::append_observability_sidecar_entry("message", fields);
1417}
1418
1419pub fn seed_from_messages(
1425 id: Option<String>,
1426 messages: &[serde_json::Value],
1427 metadata: serde_json::Value,
1428 system_prompt: Option<String>,
1429 tool_format: Option<String>,
1430) -> Result<String, String> {
1431 let resolved = id.unwrap_or_else(|| uuid::Uuid::now_v7().to_string());
1432 if exists(&resolved) {
1433 return Err(format!("agent session '{resolved}' already exists"));
1434 }
1435 open_or_create(Some(resolved.clone()));
1436 SESSIONS.with(|s| {
1437 let mut map = s.borrow_mut();
1438 let Some(state) = map.get_mut(&resolved) else {
1439 return Err(format!("failed to create agent session '{resolved}'"));
1440 };
1441 state.tool_format = tool_format.filter(|value| !value.trim().is_empty());
1442 state.system_prompt = system_prompt.filter(|value| !value.trim().is_empty());
1443
1444 let mut metadata = metadata
1445 .as_object()
1446 .cloned()
1447 .unwrap_or_else(serde_json::Map::new);
1448 if let Some(tool_format) = state.tool_format.as_ref() {
1449 metadata.insert(
1450 "tool_format".to_string(),
1451 serde_json::Value::String(tool_format.clone()),
1452 );
1453 metadata.insert(
1454 "tool_mode_locked".to_string(),
1455 serde_json::Value::Bool(true),
1456 );
1457 }
1458 if let Some(system_prompt) = state.system_prompt.as_ref() {
1459 metadata.insert(
1460 "system_prompt".to_string(),
1461 crate::llm::helpers::system_prompt_metadata(system_prompt),
1462 );
1463 }
1464 let vm_messages = crate::llm::helpers::json_messages_to_vm(messages);
1465 let candidate = crate::llm::helpers::new_transcript_with(
1466 Some(resolved.clone()),
1467 vm_messages,
1468 None,
1469 Some(crate::stdlib::json_to_vm_value(&serde_json::Value::Object(
1470 metadata,
1471 ))),
1472 );
1473 apply_transcript_with_budget(state, candidate, "seed_from_messages")?;
1474 state.last_accessed = Instant::now();
1475 Ok(resolved)
1476 })
1477}
1478
1479pub fn messages_json(id: &str) -> Vec<serde_json::Value> {
1483 SESSIONS.with(|s| {
1484 let map = s.borrow();
1485 let Some(state) = map.get(id) else {
1486 return Vec::new();
1487 };
1488 let Some(dict) = state.transcript.as_dict() else {
1489 return Vec::new();
1490 };
1491 match dict.get("messages") {
1492 Some(VmValue::List(list)) => list
1493 .iter()
1494 .map(crate::llm::helpers::vm_value_to_json)
1495 .collect(),
1496 _ => Vec::new(),
1497 }
1498 })
1499}
1500
/// Prompt-facing view of a session transcript, as produced by
/// `prompt_state_json`.
#[derive(Clone, Debug, Default)]
pub struct SessionPromptState {
    /// Transcript messages as JSON; when a summary exists it is guaranteed to
    /// appear as the leading `user` message.
    pub messages: Vec<serde_json::Value>,
    /// The transcript's non-blank `summary` string, if any.
    pub summary: Option<String>,
}
1506
1507fn summary_message_json(summary: &str) -> serde_json::Value {
1508 serde_json::json!({
1509 "role": "user",
1510 "content": summary,
1511 })
1512}
1513
1514fn messages_begin_with_summary(messages: &[serde_json::Value], summary: &str) -> bool {
1515 messages.first().is_some_and(|message| {
1516 message.get("role").and_then(|value| value.as_str()) == Some("user")
1517 && message.get("content").and_then(|value| value.as_str()) == Some(summary)
1518 })
1519}
1520
1521pub fn prompt_state_json(id: &str) -> SessionPromptState {
1529 SESSIONS.with(|s| {
1530 let map = s.borrow();
1531 let Some(state) = map.get(id) else {
1532 return SessionPromptState::default();
1533 };
1534 let Some(dict) = state.transcript.as_dict() else {
1535 return SessionPromptState::default();
1536 };
1537 let mut messages = match dict.get("messages") {
1538 Some(VmValue::List(list)) => list
1539 .iter()
1540 .map(crate::llm::helpers::vm_value_to_json)
1541 .collect::<Vec<_>>(),
1542 _ => Vec::new(),
1543 };
1544 let summary = dict.get("summary").and_then(|value| match value {
1545 VmValue::String(text) if !text.trim().is_empty() => Some(text.to_string()),
1546 _ => None,
1547 });
1548 if let Some(summary_text) = summary.as_deref() {
1549 if !messages_begin_with_summary(&messages, summary_text) {
1550 messages.insert(0, summary_message_json(summary_text));
1551 }
1552 }
1553 SessionPromptState { messages, summary }
1554 })
1555}
1556
1557pub fn store_transcript(id: &str, transcript: VmValue) -> Result<(), String> {
1560 SESSIONS.with(|s| {
1561 let mut map = s.borrow_mut();
1562 let Some(state) = map.get_mut(id) else {
1563 return Err(format!(
1564 "agent_session_store_transcript: unknown session id '{id}'"
1565 ));
1566 };
1567 let transcript = transcript_with_session_metadata(transcript, state);
1568 apply_transcript_with_budget(state, transcript, "store_transcript")?;
1569 state.last_accessed = Instant::now();
1570 Ok(())
1571 })
1572}
1573
1574pub fn prune_invalid_reminder_events(id: &str) -> usize {
1578 SESSIONS.with(|s| {
1579 let mut map = s.borrow_mut();
1580 let Some(state) = map.get_mut(id) else {
1581 return 0;
1582 };
1583 let Some(dict) = state.transcript.as_dict().cloned() else {
1584 return 0;
1585 };
1586 let Some(VmValue::List(events)) = dict.get("events") else {
1587 return 0;
1588 };
1589 let mut pruned = 0_usize;
1590 let mut kept = Vec::with_capacity(events.len());
1591 for event in events.iter().cloned() {
1592 let is_reminder = event
1593 .as_dict()
1594 .and_then(|event| event.get("kind"))
1595 .map(VmValue::display)
1596 .as_deref()
1597 == Some(crate::llm::helpers::SYSTEM_REMINDER_EVENT_KIND);
1598 if !is_reminder {
1599 kept.push(event);
1600 continue;
1601 }
1602 let valid = crate::llm::helpers::reminder_from_event(&event)
1603 .is_some_and(|reminder| !reminder.body.trim().is_empty());
1604 if valid {
1605 kept.push(event);
1606 } else {
1607 pruned += 1;
1608 }
1609 }
1610 if pruned > 0 {
1611 let mut next = dict;
1612 next.insert("events".to_string(), VmValue::List(Rc::new(kept)));
1613 let _ = apply_transcript_with_budget(
1614 state,
1615 VmValue::Dict(Rc::new(next)),
1616 "prune_invalid_reminder_events",
1617 );
1618 state.last_accessed = Instant::now();
1619 }
1620 pruned
1621 })
1622}
1623
1624pub fn apply_reminder_post_turn(id: &str, turn: i64) -> Result<serde_json::Value, String> {
1629 let report = SESSIONS.with(|s| {
1630 let mut map = s.borrow_mut();
1631 let Some(state) = map.get_mut(id) else {
1632 return Err(format!(
1633 "agent_session_apply_reminder_post_turn: unknown session id '{id}'"
1634 ));
1635 };
1636 let report = crate::llm::helpers::apply_reminder_post_turn(&state.transcript, turn);
1637 if report.decremented_count > 0 || !report.expired.is_empty() {
1638 if let Some(next) = report.transcript.clone() {
1639 apply_transcript_with_budget(state, next, "apply_reminder_post_turn")?;
1640 }
1641 state.last_accessed = Instant::now();
1642 }
1643 Ok(report)
1644 })?;
1645
1646 for reminder in &report.expired {
1647 let mut payload = crate::llm::helpers::reminder_lifecycle_payload(Some(id), reminder);
1648 if let Some(obj) = payload.as_object_mut() {
1649 obj.insert(
1650 "transcript_id".to_string(),
1651 serde_json::Value::String(id.to_string()),
1652 );
1653 obj.insert(
1654 "reason".to_string(),
1655 serde_json::Value::String("ttl".to_string()),
1656 );
1657 obj.insert(
1658 "ttl_turns_before".to_string(),
1659 serde_json::json!(&reminder.ttl_turns),
1660 );
1661 obj.insert("expired_at_turn".to_string(), serde_json::json!(turn));
1662 }
1663 crate::llm::helpers::emit_reminder_lifecycle_event(
1664 crate::llm::helpers::REMINDER_EXPIRED_EVENT_KIND,
1665 payload,
1666 );
1667 }
1668
1669 Ok(serde_json::json!({
1670 "expired_count": report.expired.len(),
1671 "decremented_count": report.decremented_count,
1672 "remaining_count": report.remaining_count,
1673 }))
1674}
1675
1676pub fn inject_reminder(
1681 id: &str,
1682 reminder: crate::llm::helpers::SystemReminder,
1683) -> Result<ReminderInjectionReport, String> {
1684 let reminder_id = reminder.id.clone();
1685 let dedupe_key = reminder.dedupe_key.clone();
1686 let mut deduped_reminder_ids = Vec::new();
1687 SESSIONS.with(|s| {
1688 let mut map = s.borrow_mut();
1689 let Some(state) = map.get_mut(id) else {
1690 return Err(format!(
1691 "agent_session_inject_reminder: unknown session id '{id}'"
1692 ));
1693 };
1694 let dict = state
1695 .transcript
1696 .as_dict()
1697 .cloned()
1698 .unwrap_or_else(BTreeMap::new);
1699 let mut events: Vec<VmValue> = match dict.get("events") {
1700 Some(VmValue::List(list)) => list.iter().cloned().collect(),
1701 _ => dict
1702 .get("messages")
1703 .and_then(|value| match value {
1704 VmValue::List(list) => Some(list.iter().cloned().collect::<Vec<_>>()),
1705 _ => None,
1706 })
1707 .map(|messages| crate::llm::helpers::transcript_events_from_messages(&messages))
1708 .unwrap_or_default(),
1709 };
1710 if let Some(expected_key) = dedupe_key.as_deref() {
1711 events.retain(|event| {
1712 let Some(existing) = crate::llm::helpers::reminder_from_event(event) else {
1713 return true;
1714 };
1715 if existing.dedupe_key.as_deref() == Some(expected_key) {
1716 deduped_reminder_ids.push(existing.id);
1717 false
1718 } else {
1719 true
1720 }
1721 });
1722 }
1723 events.push(crate::llm::helpers::transcript_reminder_event(&reminder));
1724 let mut next = dict;
1725 next.insert("events".to_string(), VmValue::List(Rc::new(events)));
1726 apply_transcript_with_budget(state, VmValue::Dict(Rc::new(next)), "inject_reminder")?;
1727 state.last_accessed = Instant::now();
1728 Ok(())
1729 })?;
1730
1731 if !deduped_reminder_ids.is_empty() {
1732 let dropped_count = deduped_reminder_ids.len();
1733 crate::llm::helpers::emit_reminder_lifecycle_event(
1734 crate::llm::helpers::REMINDER_DEDUPED_EVENT_KIND,
1735 serde_json::json!({
1736 "session_id": id,
1737 "transcript_id": id,
1738 "reminder_id": &reminder_id,
1739 "replacing_id": &reminder_id,
1740 "replaced_id": deduped_reminder_ids.first(),
1741 "replaced_ids": &deduped_reminder_ids,
1742 "dedupe_key": &dedupe_key,
1743 "dropped_reminder_ids": &deduped_reminder_ids,
1744 "dropped_count": dropped_count,
1745 }),
1746 );
1747 }
1748
1749 crate::llm::helpers::emit_reminder_lifecycle_event(
1750 crate::llm::helpers::REMINDER_INJECTED_EVENT_KIND,
1751 crate::llm::helpers::reminder_lifecycle_payload(Some(id), &reminder),
1752 );
1753
1754 Ok(ReminderInjectionReport {
1755 reminder_id,
1756 deduped_count: deduped_reminder_ids.len(),
1757 })
1758}
1759
1760pub fn append_event(id: &str, event: VmValue) -> Result<(), String> {
1766 let Some(event_dict) = event.as_dict() else {
1767 return Err("agent_session_append_event: event must be a dict".into());
1768 };
1769 let kind_ok = matches!(event_dict.get("kind"), Some(VmValue::String(_)));
1770 if !kind_ok {
1771 return Err("agent_session_append_event: event must have a string `kind`".into());
1772 }
1773 SESSIONS.with(|s| {
1774 let mut map = s.borrow_mut();
1775 let Some(state) = map.get_mut(id) else {
1776 return Err(format!(
1777 "agent_session_append_event: unknown session id '{id}'"
1778 ));
1779 };
1780 append_event_to_state(state, event, "append_event")?;
1781 state.last_accessed = Instant::now();
1782 Ok(())
1783 })
1784}
1785
1786fn append_event_to_state(
1787 state: &mut SessionState,
1788 event: VmValue,
1789 action: &str,
1790) -> Result<(), String> {
1791 let dict = state
1792 .transcript
1793 .as_dict()
1794 .cloned()
1795 .unwrap_or_else(BTreeMap::new);
1796 let mut events: Vec<VmValue> = match dict.get("events") {
1797 Some(VmValue::List(list)) => list.iter().cloned().collect(),
1798 _ => dict
1799 .get("messages")
1800 .and_then(|value| match value {
1801 VmValue::List(list) => Some(list.iter().cloned().collect::<Vec<_>>()),
1802 _ => None,
1803 })
1804 .map(|messages| crate::llm::helpers::transcript_events_from_messages(&messages))
1805 .unwrap_or_default(),
1806 };
1807 events.push(event);
1808 let mut next = dict;
1809 next.insert("events".to_string(), VmValue::List(Rc::new(events)));
1810 apply_transcript_with_budget(state, VmValue::Dict(Rc::new(next)), action)
1811}
1812
1813pub fn replace_messages(id: &str, messages: &[serde_json::Value]) -> Result<(), String> {
1816 replace_messages_with_summary(id, messages, None)
1817}
1818
1819pub fn replace_messages_with_summary(
1824 id: &str,
1825 messages: &[serde_json::Value],
1826 summary: Option<&str>,
1827) -> Result<(), String> {
1828 SESSIONS.with(|s| {
1829 let mut map = s.borrow_mut();
1830 let Some(state) = map.get_mut(id) else {
1831 return Err(format!(
1832 "agent_session_replace_messages: unknown session id '{id}'"
1833 ));
1834 };
1835 let dict = state
1836 .transcript
1837 .as_dict()
1838 .cloned()
1839 .unwrap_or_else(BTreeMap::new);
1840 let vm_messages: Vec<VmValue> = messages
1841 .iter()
1842 .map(crate::stdlib::json_to_vm_value)
1843 .collect();
1844 let mut next = dict;
1845 next.insert(
1846 "events".to_string(),
1847 VmValue::List(Rc::new(
1848 crate::llm::helpers::transcript_events_from_messages(&vm_messages),
1849 )),
1850 );
1851 next.insert("messages".to_string(), VmValue::List(Rc::new(vm_messages)));
1852 if let Some(summary) = summary {
1853 next.insert(
1854 "summary".to_string(),
1855 VmValue::String(Rc::from(summary.to_string())),
1856 );
1857 } else {
1858 next.remove("summary");
1859 }
1860 apply_transcript_with_budget(state, VmValue::Dict(Rc::new(next)), "replace_messages")?;
1861 state.last_accessed = Instant::now();
1862 Ok(())
1863 })
1864}
1865
1866pub fn append_subscriber(id: &str, callback: VmValue) {
1867 open_or_create(Some(id.to_string()));
1868 SESSIONS.with(|s| {
1869 if let Some(state) = s.borrow_mut().get_mut(id) {
1870 state.subscribers.push(callback);
1871 state.last_accessed = Instant::now();
1872 }
1873 });
1874}
1875
1876pub fn subscribers_for(id: &str) -> Vec<VmValue> {
1877 SESSIONS.with(|s| {
1878 s.borrow()
1879 .get(id)
1880 .map(|state| state.subscribers.clone())
1881 .unwrap_or_default()
1882 })
1883}
1884
1885pub fn subscriber_count(id: &str) -> usize {
1886 SESSIONS.with(|s| {
1887 s.borrow()
1888 .get(id)
1889 .map(|state| state.subscribers.len())
1890 .unwrap_or(0)
1891 })
1892}
1893
1894pub fn set_active_skills(id: &str, skills: Vec<String>) {
1898 SESSIONS.with(|s| {
1899 if let Some(state) = s.borrow_mut().get_mut(id) {
1900 state.active_skills = skills;
1901 state.last_accessed = Instant::now();
1902 }
1903 });
1904}
1905
1906pub fn active_skills(id: &str) -> Vec<String> {
1910 SESSIONS.with(|s| {
1911 s.borrow()
1912 .get(id)
1913 .map(|state| state.active_skills.clone())
1914 .unwrap_or_default()
1915 })
1916}
1917
1918pub fn claim_tool_format(id: &str, tool_format: &str) -> Result<(), String> {
1924 let tool_format = tool_format.trim();
1925 if tool_format.is_empty() {
1926 return Ok(());
1927 }
1928 SESSIONS.with(|s| {
1929 let mut map = s.borrow_mut();
1930 let Some(state) = map.get_mut(id) else {
1931 return Err(format!("agent session '{id}' does not exist"));
1932 };
1933 match state.tool_format.as_deref() {
1934 Some(existing) if existing != tool_format => Err(format!(
1935 "agent session '{id}' is locked to tool_format='{existing}', but this run requested tool_format='{tool_format}'. Start a new session or fork/reset the transcript before changing tool mode."
1936 )),
1937 Some(_) => {
1938 state.last_accessed = Instant::now();
1939 Ok(())
1940 }
1941 None => {
1942 state.tool_format = Some(tool_format.to_string());
1943 state.last_accessed = Instant::now();
1944 Ok(())
1945 }
1946 }
1947 })
1948}
1949
1950pub fn tool_format(id: &str) -> Option<String> {
1951 SESSIONS.with(|s| {
1952 s.borrow()
1953 .get(id)
1954 .and_then(|state| state.tool_format.clone())
1955 })
1956}
1957
1958pub fn record_system_prompt(id: &str, system_prompt: &str) -> Result<(), String> {
1959 let system_prompt = system_prompt.trim();
1960 if system_prompt.is_empty() {
1961 return Ok(());
1962 }
1963 assert_cache_stable_system_prompt(system_prompt);
1964 SESSIONS.with(|s| {
1965 let mut map = s.borrow_mut();
1966 let Some(state) = map.get_mut(id) else {
1967 return Err(format!("agent session '{id}' does not exist"));
1968 };
1969 let changed = state.system_prompt.as_deref() != Some(system_prompt);
1970 state.system_prompt = Some(system_prompt.to_string());
1971 let dict = state
1972 .transcript
1973 .as_dict()
1974 .cloned()
1975 .unwrap_or_else(BTreeMap::new);
1976 let mut next = dict;
1977 apply_system_prompt_metadata(&mut next, system_prompt);
1978 if changed {
1979 let mut events: Vec<VmValue> = match next.get("events") {
1980 Some(VmValue::List(list)) => list.iter().cloned().collect(),
1981 _ => Vec::new(),
1982 };
1983 events.push(crate::llm::helpers::transcript_event(
1984 "system_prompt",
1985 "system",
1986 "internal",
1987 "",
1988 Some(crate::llm::helpers::system_prompt_event_metadata(
1989 system_prompt,
1990 )),
1991 ));
1992 next.insert("events".to_string(), VmValue::List(Rc::new(events)));
1993 }
1994 apply_transcript_with_budget(state, VmValue::Dict(Rc::new(next)), "record_system_prompt")?;
1995 state.last_accessed = Instant::now();
1996 Ok(())
1997 })
1998}
1999
2000pub fn system_prompt(id: &str) -> Option<String> {
2001 SESSIONS.with(|s| {
2002 s.borrow()
2003 .get(id)
2004 .and_then(|state| state.system_prompt.clone())
2005 })
2006}
2007
/// Scans `system_prompt` for template tokens that interpolate workspace or
/// project context and returns the offending identifier prefix.
///
/// A token is an opening `{{` (optionally followed by further `{`
/// characters, as in `{{{name}}}`) and optional whitespace, followed by an
/// identifier starting with `workspace_` or `project_`. Returns `None` when
/// no such token is present.
#[cfg(debug_assertions)]
fn forbidden_workspace_prompt_token(system_prompt: &str) -> Option<&'static str> {
    let mut remaining = system_prompt;
    while let Some(index) = remaining.find("{{") {
        // Skip the opener plus any extra `{`: stepping over exactly two
        // characters would leave `{workspace_...` for `{{{workspace_...`,
        // which contains no further `{{` and would escape detection.
        let candidate = remaining[index + 2..]
            .trim_start_matches('{')
            .trim_start();
        for prefix in ["workspace_", "project_"] {
            if candidate.starts_with(prefix) {
                return Some(prefix);
            }
        }
        // `candidate` is strictly shorter than `remaining`, so the scan ends.
        remaining = candidate;
    }
    None
}
2023
2024#[cfg(debug_assertions)]
2025fn assert_cache_stable_system_prompt(system_prompt: &str) {
2026 if let Some(prefix) = forbidden_workspace_prompt_token(system_prompt) {
2027 panic!(
2028 "{CACHE_STABLE_SYSTEM_PROMPT_DIAGNOSTIC}: session system prompts must not interpolate `{{{{{prefix}...` tokens; move workspace/project context into the workspace-anchor reminder"
2029 );
2030 }
2031}
2032
2033#[cfg(not(debug_assertions))]
2034fn assert_cache_stable_system_prompt(_system_prompt: &str) {}
2035
2036pub fn set_pinned_model(id: &str, model: Option<String>) -> Result<bool, String> {
2041 let normalized = model
2042 .map(|value| value.trim().to_string())
2043 .filter(|value| !value.is_empty());
2044 SESSIONS.with(|s| {
2045 let mut map = s.borrow_mut();
2046 let Some(state) = map.get_mut(id) else {
2047 return Err(format!("agent session '{id}' does not exist"));
2048 };
2049 let changed = state.pinned_model != normalized;
2050 state.pinned_model = normalized;
2051 state.last_accessed = Instant::now();
2052 Ok(changed)
2053 })
2054}
2055
2056pub fn pinned_model(id: &str) -> Option<String> {
2060 SESSIONS.with(|s| {
2061 s.borrow()
2062 .get(id)
2063 .and_then(|state| state.pinned_model.clone())
2064 })
2065}
2066
2067pub fn set_pinned_reasoning_policy(id: &str, policy: Option<String>) -> Result<bool, String> {
2069 let normalized = match policy {
2070 Some(value) => crate::llm::reasoning_policy::normalize_policy_selector(&value)?,
2071 None => None,
2072 };
2073 SESSIONS.with(|s| {
2074 let mut map = s.borrow_mut();
2075 let Some(state) = map.get_mut(id) else {
2076 return Err(format!("agent session '{id}' does not exist"));
2077 };
2078 let changed = state.pinned_reasoning_policy != normalized;
2079 state.pinned_reasoning_policy = normalized;
2080 state.last_accessed = Instant::now();
2081 Ok(changed)
2082 })
2083}
2084
2085pub fn pinned_reasoning_policy(id: &str) -> Option<String> {
2087 SESSIONS.with(|s| {
2088 s.borrow()
2089 .get(id)
2090 .and_then(|state| state.pinned_reasoning_policy.clone())
2091 })
2092}
2093
2094pub fn set_workspace_anchor(id: &str, anchor: Option<WorkspaceAnchor>) -> Result<bool, String> {
2098 SESSIONS.with(|s| {
2099 let mut map = s.borrow_mut();
2100 let Some(state) = map.get_mut(id) else {
2101 return Err(format!("agent session '{id}' does not exist"));
2102 };
2103 let changed = state.workspace_anchor != anchor;
2104 state.workspace_anchor = anchor;
2105 if changed {
2106 crate::llm::permissions::clear_session_grants(id);
2107 }
2108 state.last_accessed = Instant::now();
2109 Ok(changed)
2110 })
2111}
2112
2113pub fn workspace_anchor(id: &str) -> Option<WorkspaceAnchor> {
2115 SESSIONS.with(|s| {
2116 s.borrow()
2117 .get(id)
2118 .and_then(|state| state.workspace_anchor.clone())
2119 })
2120}
2121
/// Result of `reanchor_session`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ReanchorOutcome {
    /// Anchor the session had before the call, if any.
    pub previous: Option<WorkspaceAnchor>,
    /// Anchor the session has after the call.
    pub current: WorkspaceAnchor,
    /// `true` when `current` differs from `previous`.
    pub changed: bool,
}
2131
2132pub fn reanchor_session(
2137 id: &str,
2138 new_anchor: WorkspaceAnchor,
2139 carry_transcript: bool,
2140 compacted: bool,
2141 reason: Option<String>,
2142) -> Result<ReanchorOutcome, String> {
2143 let outcome = SESSIONS.with(|s| {
2144 let mut map = s.borrow_mut();
2145 let Some(state) = map.get_mut(id) else {
2146 return Err(format!("agent session '{id}' does not exist"));
2147 };
2148 let previous = state.workspace_anchor.clone();
2149 let changed = previous.as_ref() != Some(&new_anchor);
2150 state.workspace_anchor = Some(new_anchor.clone());
2151 if changed {
2152 crate::llm::permissions::clear_session_grants(id);
2153 }
2154 state.last_accessed = Instant::now();
2155 Ok(ReanchorOutcome {
2156 previous,
2157 current: new_anchor,
2158 changed,
2159 })
2160 })?;
2161 if !outcome.changed {
2162 return Ok(outcome);
2163 }
2164 let previous_json = outcome.previous.as_ref().map(WorkspaceAnchor::to_json);
2165 let current_json = outcome.current.to_json();
2166 let event_metadata = serde_json::json!({
2167 "previous": previous_json,
2168 "current": current_json,
2169 "carry_transcript": carry_transcript,
2170 "compacted": compacted,
2171 "reason": reason,
2172 });
2173 let event = crate::llm::helpers::transcript_event(
2174 "AnchorChanged",
2175 "system",
2176 "internal",
2177 "",
2178 Some(event_metadata),
2179 );
2180 let _ = append_event(id, event);
2181 crate::llm::emit_live_agent_event_sync(&crate::agent_events::AgentEvent::AnchorChanged {
2182 session_id: id.to_string(),
2183 previous: previous_json,
2184 current: current_json,
2185 carry_transcript,
2186 compacted,
2187 reason,
2188 });
2189 Ok(outcome)
2190}
2191
2192pub fn set_workspace_policy(id: &str, policy: WorkspacePolicy) -> Result<bool, String> {
2195 SESSIONS.with(|s| {
2196 let mut map = s.borrow_mut();
2197 let Some(state) = map.get_mut(id) else {
2198 return Err(format!("agent session '{id}' does not exist"));
2199 };
2200 let changed = state.workspace_policy != policy;
2201 state.workspace_policy = policy;
2202 state.last_accessed = Instant::now();
2203 Ok(changed)
2204 })
2205}
2206
2207pub fn workspace_policy(id: &str) -> Option<WorkspacePolicy> {
2209 SESSIONS.with(|s| {
2210 s.borrow()
2211 .get(id)
2212 .map(|state| state.workspace_policy.clone())
2213 })
2214}
2215
2216pub fn add_workspace_root(
2220 id: &str,
2221 root: &str,
2222 mount_mode: Option<MountMode>,
2223 reason: Option<String>,
2224) -> Result<String, String> {
2225 let normalized_root = validate_workspace_root_path(root)?;
2226 let mounted_at = crate::orchestration::now_rfc3339();
2227 SESSIONS.with(|s| {
2228 let mut map = s.borrow_mut();
2229 let Some(state) = map.get_mut(id) else {
2230 return Err(format!("agent session '{id}' does not exist"));
2231 };
2232 let default_mount_mode = state.workspace_policy.default_mount_mode;
2233 let Some(anchor) = state.workspace_anchor.as_mut() else {
2234 return Err(format!("agent session '{id}' has no workspace anchor"));
2235 };
2236 let resolved_mount_mode = mount_mode.unwrap_or(default_mount_mode);
2237 if let Some(existing) = anchor
2238 .additional_roots
2239 .iter_mut()
2240 .find(|entry| entry.path == normalized_root)
2241 {
2242 existing.mount_mode = resolved_mount_mode;
2243 existing.mounted_at = mounted_at.clone();
2244 } else {
2245 anchor.additional_roots.push(MountedRoot {
2246 path: normalized_root.clone(),
2247 mount_mode: resolved_mount_mode,
2248 mounted_at: mounted_at.clone(),
2249 });
2250 }
2251 let event = crate::llm::helpers::transcript_event(
2252 "RootMounted",
2253 "system",
2254 "internal",
2255 "",
2256 Some(serde_json::json!({
2257 "path": normalized_root.to_string_lossy(),
2258 "mount_mode": resolved_mount_mode.as_str(),
2259 "mounted_at": mounted_at.clone(),
2260 "reason": reason,
2261 })),
2262 );
2263 append_event_to_state(state, event, "add_workspace_root")?;
2264 crate::llm::permissions::clear_session_grants(id);
2265 state.last_accessed = Instant::now();
2266 Ok(mounted_at.clone())
2267 })
2268}
2269
2270pub fn remove_workspace_root(id: &str, root: &str) -> Result<bool, String> {
2273 let normalized_root = normalize_workspace_root_path(root);
2274 SESSIONS.with(|s| {
2275 let mut map = s.borrow_mut();
2276 let Some(state) = map.get_mut(id) else {
2277 return Err(format!("agent session '{id}' does not exist"));
2278 };
2279 let Some(anchor) = state.workspace_anchor.as_mut() else {
2280 return Err(format!("agent session '{id}' has no workspace anchor"));
2281 };
2282 let before = anchor.additional_roots.len();
2283 anchor
2284 .additional_roots
2285 .retain(|entry| entry.path != normalized_root);
2286 let removed = anchor.additional_roots.len() != before;
2287 if removed {
2288 crate::llm::permissions::clear_session_grants(id);
2289 }
2290 state.last_accessed = Instant::now();
2291 Ok(removed)
2292 })
2293}
2294
2295pub fn list_workspace_roots(id: &str) -> Result<(PathBuf, Vec<MountedRoot>), String> {
2296 SESSIONS.with(|s| {
2297 let map = s.borrow();
2298 let Some(state) = map.get(id) else {
2299 return Err(format!("agent session '{id}' does not exist"));
2300 };
2301 let Some(anchor) = state.workspace_anchor.as_ref() else {
2302 return Err(format!("agent session '{id}' has no workspace anchor"));
2303 };
2304 Ok((anchor.primary.clone(), anchor.additional_roots.clone()))
2305 })
2306}
2307
2308fn validate_workspace_root_path(root: &str) -> Result<PathBuf, String> {
2309 let normalized = normalize_workspace_root_path(root);
2310 let canonical = std::fs::canonicalize(&normalized)
2311 .map_err(|error| format!("workspace root '{root}' must exist and be readable: {error}"))?;
2312 let metadata = std::fs::metadata(&canonical)
2313 .map_err(|error| format!("workspace root '{root}' must exist and be readable: {error}"))?;
2314 if !metadata.is_dir() {
2315 return Err(format!("workspace root '{root}' must be a directory"));
2316 }
2317 std::fs::read_dir(&canonical)
2318 .map_err(|error| format!("workspace root '{root}' must be readable: {error}"))?;
2319 Ok(canonical)
2320}
2321
2322fn normalize_workspace_root_path(root: &str) -> PathBuf {
2323 let absolute = crate::stdlib::process::normalize_context_path(Path::new(root));
2324 std::fs::canonicalize(&absolute).unwrap_or(absolute)
2325}
2326
2327fn empty_transcript(id: &str) -> VmValue {
2328 use crate::llm::helpers::new_transcript_with;
2329 new_transcript_with(Some(id.to_string()), Vec::new(), None, None)
2330}
2331
2332fn clone_transcript_with_id(transcript: &VmValue, new_id: &str) -> VmValue {
2333 let Some(dict) = transcript.as_dict() else {
2334 return empty_transcript(new_id);
2335 };
2336 let mut next = dict.clone();
2337 next.insert(
2338 "id".to_string(),
2339 VmValue::String(Rc::from(new_id.to_string())),
2340 );
2341 VmValue::Dict(Rc::new(next))
2342}
2343
2344fn clone_transcript_with_parent(transcript: &VmValue, parent_id: &str) -> VmValue {
2345 let Some(dict) = transcript.as_dict() else {
2346 return transcript.clone();
2347 };
2348 let mut next = dict.clone();
2349 let metadata = match next.get("metadata") {
2350 Some(VmValue::Dict(metadata)) => {
2351 let mut metadata = metadata.as_ref().clone();
2352 metadata.insert(
2353 "parent_session_id".to_string(),
2354 VmValue::String(Rc::from(parent_id.to_string())),
2355 );
2356 VmValue::Dict(Rc::new(metadata))
2357 }
2358 _ => VmValue::Dict(Rc::new(BTreeMap::from([(
2359 "parent_session_id".to_string(),
2360 VmValue::String(Rc::from(parent_id.to_string())),
2361 )]))),
2362 };
2363 next.insert("metadata".to_string(), metadata);
2364 VmValue::Dict(Rc::new(next))
2365}
2366
2367fn apply_system_prompt_metadata(next: &mut BTreeMap<String, VmValue>, system_prompt: &str) {
2368 let mut metadata = match next.get("metadata") {
2369 Some(VmValue::Dict(metadata)) => metadata.as_ref().clone(),
2370 _ => BTreeMap::new(),
2371 };
2372 metadata.insert(
2373 "system_prompt".to_string(),
2374 crate::stdlib::json_to_vm_value(&crate::llm::helpers::system_prompt_metadata(
2375 system_prompt,
2376 )),
2377 );
2378 next.insert("metadata".to_string(), VmValue::Dict(Rc::new(metadata)));
2379}
2380
2381fn transcript_with_session_metadata(transcript: VmValue, state: &SessionState) -> VmValue {
2382 let Some(dict) = transcript.as_dict() else {
2383 return transcript;
2384 };
2385 let mut next = dict.clone();
2386 let mut metadata = match next.get("metadata") {
2387 Some(VmValue::Dict(metadata)) => metadata.as_ref().clone(),
2388 _ => BTreeMap::new(),
2389 };
2390 if let Some(tool_format) = state.tool_format.as_ref() {
2391 metadata.insert(
2392 "tool_format".to_string(),
2393 VmValue::String(Rc::from(tool_format.clone())),
2394 );
2395 metadata.insert("tool_mode_locked".to_string(), VmValue::Bool(true));
2396 }
2397 if let Some(system_prompt) = state.system_prompt.as_ref() {
2398 metadata.insert(
2399 "system_prompt".to_string(),
2400 crate::stdlib::json_to_vm_value(&crate::llm::helpers::system_prompt_metadata(
2401 system_prompt,
2402 )),
2403 );
2404 }
2405 if let Some(anchor) = state.workspace_anchor.as_ref() {
2406 metadata.insert(
2407 WORKSPACE_ANCHOR_METADATA_KEY.to_string(),
2408 anchor.to_vm_value(),
2409 );
2410 } else {
2411 metadata.remove(WORKSPACE_ANCHOR_METADATA_KEY);
2412 }
2413 if let Some(last_action) = state.last_transcript_budget_action.as_ref() {
2414 let usage = transcript_usage(
2415 &VmValue::Dict(Rc::new(next.clone())),
2416 state.transcript_budget_policy.max_approx_bytes.is_some(),
2417 );
2418 metadata.insert(
2419 "transcript_budget".to_string(),
2420 crate::stdlib::json_to_vm_value(&serde_json::json!({
2421 "policy": transcript_budget_policy_json(&state.transcript_budget_policy.normalized()),
2422 "usage": transcript_budget_usage_json(&usage),
2423 "last_action": last_action,
2424 })),
2425 );
2426 }
2427 if !metadata.is_empty() {
2428 next.insert("metadata".to_string(), VmValue::Dict(Rc::new(metadata)));
2429 }
2430 VmValue::Dict(Rc::new(next))
2431}
2432
2433fn session_snapshot(state: &SessionState) -> VmValue {
2434 let transcript = transcript_with_session_metadata(state.transcript.clone(), state);
2435 let Some(dict) = transcript.as_dict() else {
2436 return state.transcript.clone();
2437 };
2438 let mut next = dict.clone();
2439 let length = next
2440 .get("messages")
2441 .and_then(|value| match value {
2442 VmValue::List(list) => Some(list.len() as i64),
2443 _ => None,
2444 })
2445 .unwrap_or(0);
2446 next.insert("length".to_string(), VmValue::Int(length));
2447 next.insert(
2448 "created_at".to_string(),
2449 VmValue::String(Rc::from(state.created_at.clone())),
2450 );
2451 next.insert(
2452 "parent_id".to_string(),
2453 state
2454 .parent_id
2455 .as_ref()
2456 .map(|id| VmValue::String(Rc::from(id.clone())))
2457 .unwrap_or(VmValue::Nil),
2458 );
2459 next.insert(
2460 "child_ids".to_string(),
2461 VmValue::List(Rc::new(
2462 state
2463 .child_ids
2464 .iter()
2465 .cloned()
2466 .map(|id| VmValue::String(Rc::from(id)))
2467 .collect(),
2468 )),
2469 );
2470 next.insert(
2471 "branched_at_event_index".to_string(),
2472 state
2473 .branched_at_event_index
2474 .map(|index| VmValue::Int(index as i64))
2475 .unwrap_or(VmValue::Nil),
2476 );
2477 next.insert(
2478 "system_prompt".to_string(),
2479 state
2480 .system_prompt
2481 .as_ref()
2482 .map(|prompt| VmValue::String(Rc::from(prompt.clone())))
2483 .unwrap_or(VmValue::Nil),
2484 );
2485 next.insert(
2486 "tool_format".to_string(),
2487 state
2488 .tool_format
2489 .as_ref()
2490 .map(|format| VmValue::String(Rc::from(format.clone())))
2491 .unwrap_or(VmValue::Nil),
2492 );
2493 next.insert(
2494 "pinned_model".to_string(),
2495 state
2496 .pinned_model
2497 .as_ref()
2498 .map(|model| VmValue::String(Rc::from(model.clone())))
2499 .unwrap_or(VmValue::Nil),
2500 );
2501 next.insert(
2502 "pinned_reasoning_policy".to_string(),
2503 state
2504 .pinned_reasoning_policy
2505 .as_ref()
2506 .map(|policy| VmValue::String(Rc::from(policy.clone())))
2507 .unwrap_or(VmValue::Nil),
2508 );
2509 next.insert(
2510 "workspace_anchor".to_string(),
2511 state
2512 .workspace_anchor
2513 .as_ref()
2514 .map(WorkspaceAnchor::to_vm_value)
2515 .unwrap_or(VmValue::Nil),
2516 );
2517 next.insert(
2518 "workspace_policy".to_string(),
2519 state.workspace_policy.to_vm_value(),
2520 );
2521 VmValue::Dict(Rc::new(next))
2522}
2523
2524fn update_lineage(
2525 map: &mut HashMap<String, SessionState>,
2526 parent_id: &str,
2527 child_id: &str,
2528 branched_at_event_index: Option<usize>,
2529) {
2530 let old_parent_id = map.get(child_id).and_then(|child| child.parent_id.clone());
2531 if let Some(old_parent_id) = old_parent_id.filter(|old_parent_id| old_parent_id != parent_id) {
2532 if let Some(old_parent) = map.get_mut(&old_parent_id) {
2533 old_parent.child_ids.retain(|id| id != child_id);
2534 old_parent.last_accessed = Instant::now();
2535 }
2536 }
2537 if let Some(parent) = map.get_mut(parent_id) {
2538 parent.last_accessed = Instant::now();
2539 if !parent.child_ids.iter().any(|id| id == child_id) {
2540 parent.child_ids.push(child_id.to_string());
2541 }
2542 }
2543 if let Some(child) = map.get_mut(child_id) {
2544 child.last_accessed = Instant::now();
2545 child.parent_id = Some(parent_id.to_string());
2546 child.branched_at_event_index = branched_at_event_index;
2547 child.transcript = clone_transcript_with_parent(&child.transcript, parent_id);
2548 }
2549}
2550
2551fn branch_event_index(transcript: &VmValue, keep_first: usize) -> usize {
2552 if keep_first == 0 {
2553 return 0;
2554 }
2555 let Some(dict) = transcript.as_dict() else {
2556 return keep_first;
2557 };
2558 let Some(VmValue::List(events)) = dict.get("events") else {
2559 return keep_first;
2560 };
2561 event_prefix_len_for_messages(events, keep_first)
2562}
2563
2564fn event_kind(event: &VmValue) -> Option<String> {
2565 event
2566 .as_dict()
2567 .and_then(|dict| dict.get("kind"))
2568 .map(VmValue::display)
2569}
2570
2571fn event_id(event: &VmValue) -> Option<String> {
2572 event
2573 .as_dict()
2574 .and_then(|dict| dict.get("id"))
2575 .map(VmValue::display)
2576}
2577
2578fn is_turn_event(event: &VmValue) -> bool {
2579 matches!(
2580 event_kind(event).as_deref(),
2581 Some("message" | "tool_result")
2582 )
2583}
2584
2585fn event_prefix_len_for_messages(events: &[VmValue], keep_first: usize) -> usize {
2586 if keep_first == 0 {
2587 return 0;
2588 }
2589 let mut retained_messages = 0usize;
2590 for (index, event) in events.iter().enumerate() {
2591 if is_turn_event(event) {
2592 retained_messages += 1;
2593 if retained_messages == keep_first {
2594 return index + 1;
2595 }
2596 }
2597 }
2598 events.len()
2599}
2600
2601fn turn_event_id_for_count(events: &[VmValue], keep_first: usize) -> Option<String> {
2602 if keep_first == 0 {
2603 return None;
2604 }
2605 let mut retained_messages = 0usize;
2606 for event in events {
2607 if is_turn_event(event) {
2608 retained_messages += 1;
2609 if retained_messages == keep_first {
2610 return event_id(event);
2611 }
2612 }
2613 }
2614 None
2615}
2616
2617#[cfg(test)]
2618#[path = "agent_sessions_tests.rs"]
2619mod tests;