1use std::cell::{Cell, RefCell};
20use std::collections::{BTreeMap, HashMap, HashSet};
21use std::future::Future;
22use std::path::{Path, PathBuf};
23use std::time::Instant;
24
25use crate::runtime_limits::RuntimeLimits;
26use crate::value::VmValue;
27use crate::workspace_anchor::{
28 MountMode, MountedRoot, WorkspaceAnchor, WorkspacePolicy, WORKSPACE_ANCHOR_METADATA_KEY,
29};
30
/// Default cap on the number of sessions kept in the per-thread store,
/// taken from `RuntimeLimits::DEFAULT.max_agent_sessions`.
pub const DEFAULT_SESSION_CAP: usize = RuntimeLimits::DEFAULT.max_agent_sessions;

/// Message limit used by `SessionTranscriptBudgetPolicy::default()`.
pub const DEFAULT_TRANSCRIPT_MESSAGE_CAP: usize = 4096;

/// Event limit used by `SessionTranscriptBudgetPolicy::default()`.
pub const DEFAULT_TRANSCRIPT_EVENT_CAP: usize = 32768;
/// Largest JSON-serialized scratchpad size, in bytes, accepted by
/// `validate_scratchpad_value`.
pub const MAX_SCRATCHPAD_BYTES: usize = 16 * 1024;
/// Diagnostic code string; only compiled into debug builds.
// NOTE(review): the use site is outside this part of the file — confirm what
// condition emits this code before relying on the name.
#[cfg(debug_assertions)]
const CACHE_STABLE_SYSTEM_PROMPT_DIAGNOSTIC: &str = "HARN-CACHE-001";
46
/// Recovery strategy recorded on a [`SessionTranscriptBudgetPolicy`].
///
/// The code that applies a strategy lives outside this definition; the
/// variants only carry the parameters that strategy needs.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum TranscriptBudgetRecovery {
    /// Reported as `"reject"` in audit and error output.
    Reject,
    /// Reported as `"trim"`; `keep_last` is the tail size to keep.
    Trim { keep_last: usize },
    /// Reported as `"compact"`; `keep_last` is the tail size to keep.
    Compact { keep_last: usize },
}

/// Limits placed on a session transcript plus the recovery strategy to use
/// when they are exceeded.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct SessionTranscriptBudgetPolicy {
    /// Maximum number of transcript messages.
    pub max_messages: usize,
    /// Maximum number of transcript events.
    pub max_events: usize,
    /// Optional cap on the approximate serialized size in bytes.
    pub max_approx_bytes: Option<usize>,
    /// Strategy selected for an over-budget transcript.
    pub recovery: TranscriptBudgetRecovery,
}

impl SessionTranscriptBudgetPolicy {
    /// Shared constructor: clamps both limits to at least 1 and starts with
    /// no byte cap.
    fn from_limits(
        max_messages: usize,
        max_events: usize,
        recovery: TranscriptBudgetRecovery,
    ) -> Self {
        Self {
            max_messages: max_messages.max(1),
            max_events: max_events.max(1),
            max_approx_bytes: None,
            recovery,
        }
    }

    /// Policy with the `Reject` recovery strategy.
    pub fn reject(max_messages: usize, max_events: usize) -> Self {
        Self::from_limits(max_messages, max_events, TranscriptBudgetRecovery::Reject)
    }

    /// Policy with the `Trim { keep_last }` recovery strategy.
    pub fn trim(max_messages: usize, max_events: usize, keep_last: usize) -> Self {
        Self::from_limits(
            max_messages,
            max_events,
            TranscriptBudgetRecovery::Trim { keep_last },
        )
    }

    /// Policy with the `Compact { keep_last }` recovery strategy.
    pub fn compact(max_messages: usize, max_events: usize, keep_last: usize) -> Self {
        Self::from_limits(
            max_messages,
            max_events,
            TranscriptBudgetRecovery::Compact { keep_last },
        )
    }

    /// Returns the policy with its byte cap replaced; a `Some` cap is
    /// clamped to at least 1, `None` removes the cap.
    pub fn with_max_approx_bytes(self, max_approx_bytes: Option<usize>) -> Self {
        Self {
            max_approx_bytes: max_approx_bytes.map(|limit| limit.max(1)),
            ..self
        }
    }

    /// Copy of the policy with every limit clamped to at least 1. The fields
    /// are public, so a caller may have built a policy with zero limits.
    fn normalized(&self) -> Self {
        let mut policy = self.clone();
        policy.max_messages = policy.max_messages.max(1);
        policy.max_events = policy.max_events.max(1);
        policy.max_approx_bytes = policy.max_approx_bytes.map(|limit| limit.max(1));
        policy
    }
}
104
105impl Default for SessionTranscriptBudgetPolicy {
106 fn default() -> Self {
107 Self::reject(DEFAULT_TRANSCRIPT_MESSAGE_CAP, DEFAULT_TRANSCRIPT_EVENT_CAP)
108 }
109}
110
/// In-memory state for one agent session held in the per-thread `SESSIONS`
/// store.
pub struct SessionState {
    /// Session id; also the key under which this state is stored.
    pub id: String,
    /// Transcript value. Helpers in this file read it as a dict with
    /// `messages` and `events` lists.
    pub transcript: VmValue,
    /// Subscriber values attached to the session (starts empty).
    // NOTE(review): how subscribers are invoked is not visible here.
    pub subscribers: Vec<VmValue>,
    /// Creation timestamp string from `crate::orchestration::now_rfc3339()`.
    pub created_at: String,
    /// Last touch time; the session with the oldest value is evicted when the
    /// store is at capacity.
    pub last_accessed: Instant,
    /// Id of the parent session, if this session was linked to one.
    pub parent_id: Option<String>,
    /// Ids of sessions linked as children of this one.
    pub child_ids: Vec<String>,
    /// Event index recorded when the session is linked via
    /// `link_child_session_with_branch`.
    pub branched_at_event_index: Option<usize>,
    /// Names of active skills (starts empty).
    pub active_skills: Vec<String>,
    /// Tool format setting; copied by `fork`, cleared by `reset_transcript`.
    pub tool_format: Option<String>,
    /// System prompt; copied by `fork`, cleared by `reset_transcript`.
    pub system_prompt: Option<String>,
    /// Pinned model; copied by `fork` and left alone by `reset_transcript`.
    pub pinned_model: Option<String>,
    /// Pinned reasoning policy; copied by `fork` and left alone by
    /// `reset_transcript`.
    pub pinned_reasoning_policy: Option<String>,
    /// Workspace policy; copied by `fork`.
    pub workspace_policy: WorkspacePolicy,
    /// Workspace anchor, if any; copied by `fork`.
    pub workspace_anchor: Option<WorkspaceAnchor>,
    /// Current scratchpad value (`None` until set, or after a clear/reset).
    pub scratchpad: Option<VmValue>,
    /// Counter bumped on every scratchpad set or clear; reset to 0 by
    /// `reset_transcript`.
    pub scratchpad_version: u64,
    /// Budget policy applied to this session's transcript.
    pub transcript_budget_policy: SessionTranscriptBudgetPolicy,
    /// JSON record of the most recent budget action, if any.
    pub last_transcript_budget_action: Option<serde_json::Value>,
}
162
163impl SessionState {
164 fn new(id: String) -> Self {
165 let now = Instant::now();
166 let transcript = empty_transcript(&id);
167 Self {
168 id,
169 transcript,
170 subscribers: Vec::new(),
171 created_at: crate::orchestration::now_rfc3339(),
172 last_accessed: now,
173 parent_id: None,
174 child_ids: Vec::new(),
175 branched_at_event_index: None,
176 active_skills: Vec::new(),
177 tool_format: None,
178 system_prompt: None,
179 pinned_model: None,
180 pinned_reasoning_policy: None,
181 workspace_policy: WorkspacePolicy::default(),
182 workspace_anchor: None,
183 scratchpad: None,
184 scratchpad_version: 0,
185 transcript_budget_policy: default_transcript_budget_policy(),
186 last_transcript_budget_action: None,
187 }
188 }
189}
190
/// Lineage summary for a session, as returned by `ancestry`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct SessionAncestry {
    /// Direct parent of the session, if any.
    pub parent_id: Option<String>,
    /// Direct children of the session.
    pub child_ids: Vec<String>,
    /// Id reached by following parent links upward; the session's own id
    /// when it has no parent.
    pub root_id: String,
}
197
/// Outcome of truncating a session transcript to its first N messages.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct SessionTruncateResult {
    /// Number of messages kept (the requested count, capped at the
    /// transcript length).
    pub kept_turn_count: usize,
    /// Number of messages removed from the tail.
    pub removed_turn_count: usize,
    /// Event id computed by `turn_event_id_for_count` for the kept prefix,
    /// if any.
    pub new_tip_turn_id: Option<String>,
}
204
/// Report describing a reminder injection.
// NOTE(review): the producer of this report is outside this part of the
// file; field meanings below are read from the names — confirm.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ReminderInjectionReport {
    /// Id of the reminder.
    pub reminder_id: String,
    /// Presumably the number of reminders dropped as duplicates.
    pub deduped_count: usize,
}
210
thread_local! {
    // Per-thread session store, keyed by session id.
    static SESSIONS: RefCell<HashMap<String, SessionState>> = RefCell::new(HashMap::new());
    // Maximum number of sessions kept in `SESSIONS` before eviction.
    static SESSION_CAP: Cell<usize> = const { Cell::new(DEFAULT_SESSION_CAP) };
    // Budget policy handed to newly created sessions.
    static DEFAULT_TRANSCRIPT_BUDGET_POLICY: RefCell<SessionTranscriptBudgetPolicy> =
        RefCell::new(SessionTranscriptBudgetPolicy::default());
    // Stack of entered session ids; the top is the "current" session.
    static CURRENT_SESSION_STACK: RefCell<Vec<String>> = const { RefCell::new(Vec::new()) };
    // Stack of entered tool-call ids; used when no task-local id is set.
    static CURRENT_TOOL_CALL_STACK: RefCell<Vec<String>> = const { RefCell::new(Vec::new()) };
}
219
tokio::task_local! {
    // Tool-call id scoped to a future by `scope_current_tool_call`;
    // `current_tool_call_id` checks it before the thread-local stack.
    static CURRENT_TOOL_CALL_TASK: String;
}
223
224pub struct CurrentSessionGuard {
225 active: bool,
226}
227
228impl Drop for CurrentSessionGuard {
229 fn drop(&mut self) {
230 if self.active {
231 pop_current_session();
232 }
233 }
234}
235
236pub struct CurrentToolCallGuard {
242 active: bool,
243}
244
245impl Drop for CurrentToolCallGuard {
246 fn drop(&mut self) {
247 if self.active {
248 pop_current_tool_call();
249 }
250 }
251}
252
253pub fn set_session_cap(cap: usize) {
256 SESSION_CAP.with(|c| c.set(cap.max(1)));
257}
258
259pub fn session_cap() -> usize {
260 SESSION_CAP.with(|c| c.get())
261}
262
263pub fn set_default_transcript_budget_policy(policy: SessionTranscriptBudgetPolicy) {
264 DEFAULT_TRANSCRIPT_BUDGET_POLICY.with(|cell| {
265 *cell.borrow_mut() = policy.normalized();
266 });
267}
268
269pub fn reset_default_transcript_budget_policy() {
270 set_default_transcript_budget_policy(SessionTranscriptBudgetPolicy::default());
271}
272
273pub fn default_transcript_budget_policy() -> SessionTranscriptBudgetPolicy {
274 DEFAULT_TRANSCRIPT_BUDGET_POLICY.with(|cell| cell.borrow().clone())
275}
276
277pub fn transcript_budget_policy(id: &str) -> Option<SessionTranscriptBudgetPolicy> {
278 SESSIONS.with(|s| {
279 s.borrow()
280 .get(id)
281 .map(|state| state.transcript_budget_policy.clone())
282 })
283}
284
285pub fn set_transcript_budget_policy(
286 id: &str,
287 policy: SessionTranscriptBudgetPolicy,
288) -> Result<(), String> {
289 SESSIONS.with(|s| {
290 let mut map = s.borrow_mut();
291 let Some(state) = map.get_mut(id) else {
292 return Err(format!("agent session '{id}' does not exist"));
293 };
294 let previous = state.transcript_budget_policy.clone();
295 let previous_action = state.last_transcript_budget_action.clone();
296 state.transcript_budget_policy = policy.normalized();
297 let candidate = state.transcript.clone();
298 if let Err(error) = apply_transcript_with_budget(state, candidate, "policy_update") {
299 state.transcript_budget_policy = previous;
300 state.last_transcript_budget_action = previous_action;
301 return Err(error);
302 }
303 state.last_accessed = Instant::now();
304 Ok(())
305 })
306}
307
308pub fn reset_session_store() {
310 SESSIONS.with(|s| s.borrow_mut().clear());
311 CURRENT_SESSION_STACK.with(|stack| stack.borrow_mut().clear());
312 CURRENT_TOOL_CALL_STACK.with(|stack| stack.borrow_mut().clear());
313 reset_default_transcript_budget_policy();
314}
315
316pub(crate) fn push_current_session(id: String) {
317 if id.is_empty() {
318 return;
319 }
320 CURRENT_SESSION_STACK.with(|stack| stack.borrow_mut().push(id));
321}
322
323pub(crate) fn pop_current_session() {
324 CURRENT_SESSION_STACK.with(|stack| {
325 let _ = stack.borrow_mut().pop();
326 });
327}
328
329pub fn current_session_id() -> Option<String> {
330 CURRENT_SESSION_STACK.with(|stack| stack.borrow().last().cloned())
331}
332
333pub fn enter_current_session(id: impl Into<String>) -> CurrentSessionGuard {
334 let id = id.into();
335 if id.trim().is_empty() {
336 return CurrentSessionGuard { active: false };
337 }
338 push_current_session(id);
339 CurrentSessionGuard { active: true }
340}
341
342fn push_current_tool_call(id: String) {
343 if id.is_empty() {
344 return;
345 }
346 CURRENT_TOOL_CALL_STACK.with(|stack| stack.borrow_mut().push(id));
347}
348
349fn pop_current_tool_call() {
350 CURRENT_TOOL_CALL_STACK.with(|stack| {
351 let _ = stack.borrow_mut().pop();
352 });
353}
354
355pub fn current_tool_call_id() -> Option<String> {
360 if let Ok(id) = CURRENT_TOOL_CALL_TASK.try_with(Clone::clone) {
361 if !id.trim().is_empty() {
362 return Some(id);
363 }
364 }
365 CURRENT_TOOL_CALL_STACK.with(|stack| stack.borrow().last().cloned())
366}
367
368pub async fn scope_current_tool_call<F, T>(id: impl Into<String>, future: F) -> T
374where
375 F: Future<Output = T>,
376{
377 let id = id.into();
378 if id.trim().is_empty() {
379 future.await
380 } else {
381 CURRENT_TOOL_CALL_TASK.scope(id, future).await
382 }
383}
384
385pub fn enter_current_tool_call(id: impl Into<String>) -> CurrentToolCallGuard {
387 let id = id.into();
388 if id.trim().is_empty() {
389 return CurrentToolCallGuard { active: false };
390 }
391 push_current_tool_call(id);
392 CurrentToolCallGuard { active: true }
393}
394
395pub fn exists(id: &str) -> bool {
396 SESSIONS.with(|s| s.borrow().contains_key(id))
397}
398
399pub fn length(id: &str) -> Option<usize> {
400 SESSIONS.with(|s| {
401 s.borrow().get(id).map(|state| {
402 state
403 .transcript
404 .as_dict()
405 .and_then(|d| d.get("messages"))
406 .and_then(|v| match v {
407 VmValue::List(list) => Some(list.len()),
408 _ => None,
409 })
410 .unwrap_or(0)
411 })
412 })
413}
414
415pub fn scratchpad(id: &str) -> Option<VmValue> {
416 SESSIONS.with(|s| {
417 s.borrow()
418 .get(id)
419 .and_then(|state| state.scratchpad.clone())
420 })
421}
422
423pub fn scratchpad_version(id: &str) -> Option<u64> {
424 SESSIONS.with(|s| s.borrow().get(id).map(|state| state.scratchpad_version))
425}
426
427pub fn set_scratchpad(
428 id: &str,
429 scratchpad: VmValue,
430 source: impl Into<String>,
431 reason: Option<String>,
432 metadata: serde_json::Value,
433) -> Result<u64, String> {
434 validate_scratchpad_value(&scratchpad)?;
435 SESSIONS.with(|s| {
436 let mut map = s.borrow_mut();
437 let Some(state) = map.get_mut(id) else {
438 return Err(format!("agent session '{id}' does not exist"));
439 };
440 let version = state.scratchpad_version.saturating_add(1);
441 let event = scratchpad_transcript_event(
442 "set",
443 version,
444 Some(&scratchpad),
445 source.into(),
446 reason,
447 metadata,
448 );
449 append_event_to_state(state, event, "set_scratchpad")?;
450 state.scratchpad = Some(scratchpad);
451 state.scratchpad_version = version;
452 state.last_accessed = Instant::now();
453 Ok(version)
454 })
455}
456
457pub fn clear_scratchpad(
458 id: &str,
459 source: impl Into<String>,
460 reason: Option<String>,
461 metadata: serde_json::Value,
462) -> Result<u64, String> {
463 SESSIONS.with(|s| {
464 let mut map = s.borrow_mut();
465 let Some(state) = map.get_mut(id) else {
466 return Err(format!("agent session '{id}' does not exist"));
467 };
468 let version = state.scratchpad_version.saturating_add(1);
469 let event =
470 scratchpad_transcript_event("clear", version, None, source.into(), reason, metadata);
471 append_event_to_state(state, event, "clear_scratchpad")?;
472 state.scratchpad = None;
473 state.scratchpad_version = version;
474 state.last_accessed = Instant::now();
475 Ok(version)
476 })
477}
478
479fn validate_scratchpad_value(value: &VmValue) -> Result<(), String> {
480 if !matches!(value, VmValue::Dict(_)) {
481 return Err("agent session scratchpad must be a dict".to_string());
482 }
483 let json = crate::llm::helpers::vm_value_to_json(value);
484 let approx_bytes = serde_json::to_vec(&json)
485 .map(|bytes| bytes.len())
486 .unwrap_or(usize::MAX);
487 if approx_bytes > MAX_SCRATCHPAD_BYTES {
488 return Err(format!(
489 "agent session scratchpad is {approx_bytes} bytes; max is {MAX_SCRATCHPAD_BYTES}"
490 ));
491 }
492 Ok(())
493}
494
495fn scratchpad_transcript_event(
496 action: &str,
497 version: u64,
498 scratchpad: Option<&VmValue>,
499 source: String,
500 reason: Option<String>,
501 metadata: serde_json::Value,
502) -> VmValue {
503 let scratchpad_json = scratchpad.map(crate::llm::helpers::vm_value_to_json);
504 let approx_bytes = scratchpad_json
505 .as_ref()
506 .and_then(|value| serde_json::to_vec(value).ok().map(|bytes| bytes.len()))
507 .unwrap_or(0);
508 let event_metadata = serde_json::json!({
509 "action": action,
510 "version": version,
511 "source": normalize_scratchpad_source(source),
512 "reason": reason.unwrap_or_default(),
513 "approx_bytes": approx_bytes,
514 "counts": scratchpad_json
515 .as_ref()
516 .map(scratchpad_counts_json)
517 .unwrap_or_else(|| serde_json::json!({})),
518 "metadata": metadata,
519 });
520 let content = format!("Agent scratchpad {action}");
521 crate::llm::helpers::transcript_event(
522 "agent_scratchpad",
523 "system",
524 "internal",
525 &content,
526 Some(event_metadata),
527 )
528}
529
/// Trims `source`; a blank source becomes `"harn.agent_scratchpad"`.
fn normalize_scratchpad_source(source: String) -> String {
    match source.trim() {
        "" => "harn.agent_scratchpad".to_string(),
        label => label.to_string(),
    }
}
538
539fn scratchpad_counts_json(value: &serde_json::Value) -> serde_json::Value {
540 serde_json::json!({
541 "goals": scratchpad_array_len(value, "goals"),
542 "open_items": scratchpad_array_len(value, "open_items"),
543 "facts": scratchpad_array_len(value, "facts"),
544 "refs": scratchpad_array_len(value, "refs"),
545 })
546}
547
548fn scratchpad_array_len(value: &serde_json::Value, key: &str) -> usize {
549 value
550 .get(key)
551 .and_then(serde_json::Value::as_array)
552 .map(Vec::len)
553 .unwrap_or(0)
554}
555
556pub fn snapshot(id: &str) -> Option<VmValue> {
557 SESSIONS.with(|s| s.borrow().get(id).map(session_snapshot))
558}
559
560pub fn transcript(id: &str) -> Option<VmValue> {
562 SESSIONS.with(|s| {
563 s.borrow()
564 .get(id)
565 .map(|state| transcript_with_session_metadata(state.transcript.clone(), state))
566 })
567}
568
569pub fn open_or_create(id: Option<String>) -> String {
578 let resolved = id.unwrap_or_else(|| uuid::Uuid::now_v7().to_string());
579 let parent_session = current_session_id();
580 let mut was_new = false;
581 SESSIONS.with(|s| {
582 let mut map = s.borrow_mut();
583 if let Some(state) = map.get_mut(&resolved) {
584 state.last_accessed = Instant::now();
585 return;
586 }
587 was_new = true;
588 let cap = SESSION_CAP.with(|c| c.get());
589 if map.len() >= cap {
590 if let Some(victim) = map
591 .iter()
592 .min_by_key(|(_, state)| state.last_accessed)
593 .map(|(id, _)| id.clone())
594 {
595 map.remove(&victim);
596 }
597 }
598 map.insert(resolved.clone(), SessionState::new(resolved.clone()));
599 });
600 if was_new {
601 if let Some(parent) = parent_session.as_deref() {
602 crate::agent_events::mirror_session_sinks(parent, &resolved);
603 }
604 try_register_event_log(&resolved);
605 }
606 resolved
607}
608
609pub fn open_child_session(parent_id: &str, id: Option<String>) -> String {
610 let resolved = open_or_create(id);
611 link_child_session(parent_id, &resolved);
612 resolved
613}
614
615pub fn link_child_session(parent_id: &str, child_id: &str) {
616 link_child_session_with_branch(parent_id, child_id, None);
617}
618
619pub fn link_child_session_with_branch(
620 parent_id: &str,
621 child_id: &str,
622 branched_at_event_index: Option<usize>,
623) {
624 if parent_id == child_id {
625 return;
626 }
627 open_or_create(Some(parent_id.to_string()));
628 open_or_create(Some(child_id.to_string()));
629 SESSIONS.with(|s| {
630 let mut map = s.borrow_mut();
631 update_lineage(&mut map, parent_id, child_id, branched_at_event_index);
632 });
633}
634
635pub fn parent_id(id: &str) -> Option<String> {
636 SESSIONS.with(|s| s.borrow().get(id).and_then(|state| state.parent_id.clone()))
637}
638
639pub fn child_ids(id: &str) -> Vec<String> {
640 SESSIONS.with(|s| {
641 s.borrow()
642 .get(id)
643 .map(|state| state.child_ids.clone())
644 .unwrap_or_default()
645 })
646}
647
648pub fn ancestry(id: &str) -> Option<SessionAncestry> {
649 SESSIONS.with(|s| {
650 let map = s.borrow();
651 let state = map.get(id)?;
652 let mut root_id = state.id.clone();
653 let mut cursor = state.parent_id.clone();
654 let mut seen = HashSet::from([state.id.clone()]);
655 while let Some(parent_id) = cursor {
656 if !seen.insert(parent_id.clone()) {
657 break;
658 }
659 root_id = parent_id.clone();
660 cursor = map
661 .get(&parent_id)
662 .and_then(|parent| parent.parent_id.clone());
663 }
664 Some(SessionAncestry {
665 parent_id: state.parent_id.clone(),
666 child_ids: state.child_ids.clone(),
667 root_id,
668 })
669 })
670}
671
672fn try_register_event_log(session_id: &str) {
676 if let Some(log) = crate::event_log::active_event_log() {
677 crate::agent_events::register_sink(
678 session_id,
679 crate::agent_events::EventLogSink::new(log, session_id),
680 );
681 return;
682 }
683 let Ok(dir) = std::env::var("HARN_EVENT_LOG_DIR") else {
684 return;
685 };
686 if dir.is_empty() {
687 return;
688 }
689 let path = std::path::PathBuf::from(dir).join(format!("event_log-{session_id}.jsonl"));
690 if let Ok(sink) = crate::agent_events::JsonlEventSink::open(&path) {
691 crate::agent_events::register_sink(session_id, sink);
692 }
693}
694
695pub fn register_event_log_sink(session_id: &str) {
696 try_register_event_log(session_id);
697}
698
699pub fn close(id: &str) {
700 SESSIONS.with(|s| {
701 s.borrow_mut().remove(id);
702 });
703 crate::orchestration::agent_inbox::clear_session(id);
707 crate::agent_events::clear_session_sinks(id);
708}
709
710pub fn close_with_status(
711 id: &str,
712 reason: impl Into<String>,
713 status: impl Into<String>,
714 metadata: serde_json::Value,
715) -> bool {
716 if !exists(id) {
717 return false;
718 }
719 let reason = reason.into();
720 let status = status.into();
721 let event_metadata = serde_json::json!({
722 "reason": reason,
723 "status": status,
724 "metadata": metadata,
725 });
726 let transcript_event = crate::llm::helpers::transcript_event(
727 "agent_session_closed",
728 "system",
729 "internal",
730 "Agent session closed",
731 Some(event_metadata),
732 );
733 let _ = append_event(id, transcript_event);
734 crate::llm::emit_live_agent_event_sync(&crate::agent_events::AgentEvent::SessionClosed {
735 session_id: id.to_string(),
736 reason,
737 status,
738 metadata,
739 });
740 close(id);
741 true
742}
743
744pub fn reset_transcript(id: &str) -> bool {
745 SESSIONS.with(|s| {
746 let mut map = s.borrow_mut();
747 let Some(state) = map.get_mut(id) else {
748 return false;
749 };
750 state.transcript = empty_transcript(id);
751 state.tool_format = None;
752 state.system_prompt = None;
753 state.scratchpad = None;
754 state.scratchpad_version = 0;
755 state.last_transcript_budget_action = None;
756 state.last_accessed = Instant::now();
757 true
758 })
759}
760
/// Forks session `src_id` into `dst_id` (a fresh UUIDv7 string when `None`)
/// and returns the destination id.
///
/// The destination receives a copy of the source transcript (re-tagged with
/// the destination id via `clone_transcript_with_id`), tool format, system
/// prompt, pinned model and reasoning policy, workspace anchor and policy,
/// scratchpad and version, budget policy, and last budget action. It is then
/// linked as a child of the source.
///
/// Returns `None` when the source does not exist, when the copied transcript
/// cannot be applied under the budget policy (the destination is closed in
/// that case), or when the destination is no longer in the store at the end.
pub fn fork(src_id: &str, dst_id: Option<String>) -> Option<String> {
    // Step 1: snapshot everything to copy while holding the store borrow,
    // then release it, because `open_or_create` borrows the store again.
    let (
        src_transcript,
        src_tool_format,
        src_system_prompt,
        src_pinned_model,
        src_pinned_reasoning_policy,
        src_workspace_anchor,
        src_workspace_policy,
        src_scratchpad,
        src_scratchpad_version,
        src_transcript_budget_policy,
        src_last_transcript_budget_action,
        dst,
    ) = SESSIONS.with(|s| {
        let mut map = s.borrow_mut();
        let src = map.get_mut(src_id)?;
        src.last_accessed = Instant::now();
        let dst = dst_id.unwrap_or_else(|| uuid::Uuid::now_v7().to_string());
        let forked_transcript = clone_transcript_with_id(&src.transcript, &dst);
        Some((
            forked_transcript,
            src.tool_format.clone(),
            src.system_prompt.clone(),
            src.pinned_model.clone(),
            src.pinned_reasoning_policy.clone(),
            src.workspace_anchor.clone(),
            src.workspace_policy.clone(),
            src.scratchpad.clone(),
            src.scratchpad_version,
            src.transcript_budget_policy.clone(),
            src.last_transcript_budget_action.clone(),
            dst,
        ))
    })?;
    // Step 2: make sure the destination exists (may evict at capacity).
    open_or_create(Some(dst.clone()));
    // Step 3: copy the snapshot into the destination and record lineage.
    SESSIONS.with(|s| {
        let mut map = s.borrow_mut();
        if let Some(state) = map.get_mut(&dst) {
            state.transcript = src_transcript;
            state.tool_format = src_tool_format;
            state.system_prompt = src_system_prompt;
            state.pinned_model = src_pinned_model;
            state.pinned_reasoning_policy = src_pinned_reasoning_policy;
            state.workspace_anchor = src_workspace_anchor;
            state.workspace_policy = src_workspace_policy;
            state.scratchpad = src_scratchpad;
            state.scratchpad_version = src_scratchpad_version;
            state.transcript_budget_policy = src_transcript_budget_policy;
            state.last_transcript_budget_action = src_last_transcript_budget_action;
            state.last_accessed = Instant::now();
        }
        update_lineage(&mut map, src_id, &dst, None);
    });
    // Step 4: run the copied transcript through the budget policy.
    let budget_ok = SESSIONS.with(|s| {
        let mut map = s.borrow_mut();
        let Some(state) = map.get_mut(&dst) else {
            return false;
        };
        let candidate = state.transcript.clone();
        apply_transcript_with_budget(state, candidate, "fork").is_ok()
    });
    if !budget_ok {
        // NOTE(review): this also closes a destination that already existed
        // before the fork was attempted — confirm that is acceptable.
        close(&dst);
        return None;
    }
    if exists(&dst) {
        Some(dst)
    } else {
        None
    }
}
843
844pub fn fork_at(src_id: &str, keep_first: usize, dst_id: Option<String>) -> Option<String> {
855 let branched_at_event_index = SESSIONS.with(|s| {
856 let map = s.borrow();
857 let src = map.get(src_id)?;
858 Some(branch_event_index(&src.transcript, keep_first))
859 })?;
860 let new_id = fork(src_id, dst_id)?;
861 link_child_session_with_branch(src_id, &new_id, Some(branched_at_event_index));
862 let _ = truncate(&new_id, keep_first);
863 Some(new_id)
864}
865
866pub fn truncate(id: &str, keep_first: usize) -> Option<SessionTruncateResult> {
870 SESSIONS.with(|s| {
871 let mut map = s.borrow_mut();
872 let state = map.get_mut(id)?;
873 let result = truncate_state(state, keep_first)?;
874 Some(result)
875 })
876}
877
878fn truncate_state(state: &mut SessionState, keep_first: usize) -> Option<SessionTruncateResult> {
879 let dict = state
880 .transcript
881 .as_dict()
882 .cloned()
883 .unwrap_or_else(BTreeMap::new);
884 let messages: Vec<VmValue> = match dict.get("messages") {
885 Some(VmValue::List(list)) => list.iter().cloned().collect(),
886 _ => Vec::new(),
887 };
888 let existing_events = match dict.get("events") {
889 Some(VmValue::List(list)) => Some(list.iter().cloned().collect::<Vec<_>>()),
890 _ => None,
891 };
892 let kept_turn_count = keep_first.min(messages.len());
893 let removed_turn_count = messages.len().saturating_sub(kept_turn_count);
894 let mut new_tip_turn_id = existing_events
895 .as_ref()
896 .map(|events| turn_event_id_for_count(events, kept_turn_count))
897 .unwrap_or_else(|| {
898 let events = crate::llm::helpers::transcript_events_from_messages(&messages);
899 turn_event_id_for_count(&events, kept_turn_count)
900 });
901
902 if removed_turn_count > 0 {
903 let retained: Vec<VmValue> = messages.into_iter().take(kept_turn_count).collect();
904 let retained_events = match existing_events {
905 Some(events) => {
906 let keep_event_count = event_prefix_len_for_messages(&events, kept_turn_count);
907 events.into_iter().take(keep_event_count).collect()
908 }
909 None => crate::llm::helpers::transcript_events_from_messages(&retained),
910 };
911 new_tip_turn_id = turn_event_id_for_count(&retained_events, kept_turn_count);
912 let mut next = dict;
913 next.insert(
914 "events".to_string(),
915 VmValue::List(std::sync::Arc::new(retained_events)),
916 );
917 next.insert(
918 "messages".to_string(),
919 VmValue::List(std::sync::Arc::new(retained)),
920 );
921 next.remove("summary");
922 apply_transcript_with_budget(state, VmValue::Dict(std::sync::Arc::new(next)), "truncate")
923 .ok()?;
924 }
925 state.last_accessed = Instant::now();
926 Some(SessionTruncateResult {
927 kept_turn_count,
928 removed_turn_count,
929 new_tip_turn_id,
930 })
931}
932
933pub fn pop_last_if_assistant(id: &str) -> Result<bool, String> {
940 SESSIONS.with(|s| {
941 let mut map = s.borrow_mut();
942 let Some(state) = map.get_mut(id) else {
943 return Err(format!(
944 "pop_last_if_assistant: unknown session id '{id}'"
945 ));
946 };
947 let messages: Vec<VmValue> = match state.transcript.as_dict() {
948 Some(dict) => match dict.get("messages") {
949 Some(VmValue::List(list)) => list.iter().cloned().collect(),
950 _ => Vec::new(),
951 },
952 None => Vec::new(),
953 };
954 if messages.is_empty() {
955 return Ok(false);
956 }
957 let trailing_role = messages
958 .last()
959 .and_then(|m| m.as_dict())
960 .and_then(|d| d.get("role"))
961 .map(|v| v.display())
962 .unwrap_or_default();
963 if trailing_role != "assistant" {
964 return Err(format!(
965 "pop_last_if_assistant: trailing message role is '{trailing_role}', expected 'assistant'"
966 ));
967 }
968 let keep = messages.len() - 1;
969 truncate_state(state, keep);
970 Ok(true)
971 })
972}
973
974pub fn trim(id: &str, keep_last: usize) -> Option<usize> {
975 SESSIONS.with(|s| {
976 let mut map = s.borrow_mut();
977 let state = map.get_mut(id)?;
978 let dict = state.transcript.as_dict()?.clone();
979 let messages: Vec<VmValue> = match dict.get("messages") {
980 Some(VmValue::List(list)) => list.iter().cloned().collect(),
981 _ => Vec::new(),
982 };
983 let start = messages.len().saturating_sub(keep_last);
984 let retained: Vec<VmValue> = messages.into_iter().skip(start).collect();
985 let kept = retained.len();
986 let mut next = dict;
987 next.insert(
988 "events".to_string(),
989 VmValue::List(std::sync::Arc::new(
990 crate::llm::helpers::transcript_events_from_messages(&retained),
991 )),
992 );
993 next.insert(
994 "messages".to_string(),
995 VmValue::List(std::sync::Arc::new(retained)),
996 );
997 apply_transcript_with_budget(state, VmValue::Dict(std::sync::Arc::new(next)), "trim")
998 .ok()?;
999 state.last_accessed = Instant::now();
1000 Some(kept)
1001 })
1002}
1003
1004pub fn inject_message(id: &str, message: VmValue) -> Result<(), String> {
1007 let Some(msg_dict) = message.as_dict().cloned() else {
1008 return Err("agent_session_inject: message must be a dict".into());
1009 };
1010 let role_ok = matches!(msg_dict.get("role"), Some(VmValue::String(_)));
1011 if !role_ok {
1012 return Err(
1013 "agent_session_inject: message must have a string `role` (user|assistant|tool_result|system)"
1014 .into(),
1015 );
1016 }
1017 SESSIONS.with(|s| {
1018 let mut map = s.borrow_mut();
1019 let Some(state) = map.get_mut(id) else {
1020 return Err(format!("agent_session_inject: unknown session id '{id}'"));
1021 };
1022 let dict = state
1023 .transcript
1024 .as_dict()
1025 .cloned()
1026 .unwrap_or_else(BTreeMap::new);
1027 let mut messages: Vec<VmValue> = match dict.get("messages") {
1028 Some(VmValue::List(list)) => list.iter().cloned().collect(),
1029 _ => Vec::new(),
1030 };
1031 let mut events: Vec<VmValue> = match dict.get("events") {
1032 Some(VmValue::List(list)) => list.iter().cloned().collect(),
1033 _ => crate::llm::helpers::transcript_events_from_messages(&messages),
1034 };
1035 let new_message = VmValue::Dict(std::sync::Arc::new(msg_dict));
1036 let message_index = messages.len();
1037 events.push(crate::llm::helpers::transcript_event_from_message(
1038 &new_message,
1039 ));
1040 messages.push(new_message);
1041 let mut next = dict;
1042 next.insert(
1043 "events".to_string(),
1044 VmValue::List(std::sync::Arc::new(events)),
1045 );
1046 next.insert(
1047 "messages".to_string(),
1048 VmValue::List(std::sync::Arc::new(messages)),
1049 );
1050 let persisted_message = next
1051 .get("messages")
1052 .and_then(|value| match value {
1053 VmValue::List(list) => list.get(message_index).cloned(),
1054 _ => None,
1055 })
1056 .unwrap_or(VmValue::Nil);
1057 apply_transcript_with_budget(
1058 state,
1059 VmValue::Dict(std::sync::Arc::new(next)),
1060 "inject_message",
1061 )?;
1062 emit_identified_user_message_event(id, &persisted_message);
1063 emit_llm_message_event(id, message_index, &persisted_message);
1064 state.last_accessed = Instant::now();
1065 Ok(())
1066 })
1067}
1068
1069fn emit_identified_user_message_event(session_id: &str, message: &VmValue) {
1070 let message_json = crate::llm::helpers::vm_value_to_json(message);
1071 let role = message_json.get("role").and_then(|value| value.as_str());
1072 if role != Some("user") {
1073 return;
1074 }
1075 let Some(message_id) = message_json
1076 .get("messageId")
1077 .or_else(|| message_json.get("message_id"))
1078 .and_then(|value| value.as_str())
1079 .filter(|value| !value.trim().is_empty())
1080 else {
1081 return;
1082 };
1083 let content = message_json
1084 .get("content")
1085 .map(user_message_content_blocks)
1086 .unwrap_or_default();
1087 crate::agent_events::emit_event(&crate::agent_events::AgentEvent::UserMessage {
1088 session_id: session_id.to_string(),
1089 message_id: message_id.to_string(),
1090 content,
1091 });
1092}
1093
1094fn user_message_content_blocks(content: &serde_json::Value) -> Vec<serde_json::Value> {
1095 match content {
1096 serde_json::Value::Array(items) => items.clone(),
1097 serde_json::Value::String(text) => vec![serde_json::json!({
1098 "type": "text",
1099 "text": text,
1100 })],
1101 other => vec![serde_json::json!({
1102 "type": "text",
1103 "text": other.to_string(),
1104 })],
1105 }
1106}
1107
/// Measured size of a transcript, compared against a budget policy.
#[derive(Clone, Debug, PartialEq, Eq)]
struct TranscriptBudgetUsage {
    /// Number of entries in the transcript's `messages` list.
    message_count: usize,
    /// Number of entries in the `events` list, or of the events derived from
    /// the messages when that list is absent.
    event_count: usize,
    /// Approximate JSON-serialized size in bytes; `None` when byte counting
    /// was not requested.
    approx_bytes: Option<usize>,
}
1114
1115fn transcript_messages_from_dict(dict: &BTreeMap<String, VmValue>) -> Vec<VmValue> {
1116 match dict.get("messages") {
1117 Some(VmValue::List(list)) => list.iter().cloned().collect(),
1118 _ => Vec::new(),
1119 }
1120}
1121
1122fn transcript_events_from_dict(dict: &BTreeMap<String, VmValue>) -> Vec<VmValue> {
1123 match dict.get("events") {
1124 Some(VmValue::List(list)) => list.iter().cloned().collect(),
1125 _ => {
1126 let messages = transcript_messages_from_dict(dict);
1127 crate::llm::helpers::transcript_events_from_messages(&messages)
1128 }
1129 }
1130}
1131
1132fn transcript_usage(transcript: &VmValue, include_bytes: bool) -> TranscriptBudgetUsage {
1133 let Some(dict) = transcript.as_dict() else {
1134 return TranscriptBudgetUsage {
1135 message_count: 0,
1136 event_count: 0,
1137 approx_bytes: include_bytes.then_some(0),
1138 };
1139 };
1140 let approx_bytes = if include_bytes {
1141 serde_json::to_vec(&crate::llm::helpers::vm_value_to_json(transcript))
1142 .map(|bytes| bytes.len())
1143 .ok()
1144 .or(Some(usize::MAX))
1145 } else {
1146 None
1147 };
1148 TranscriptBudgetUsage {
1149 message_count: transcript_messages_from_dict(dict).len(),
1150 event_count: transcript_events_from_dict(dict).len(),
1151 approx_bytes,
1152 }
1153}
1154
1155fn transcript_budget_exceeded_reason(
1156 usage: &TranscriptBudgetUsage,
1157 policy: &SessionTranscriptBudgetPolicy,
1158) -> Option<&'static str> {
1159 if usage.message_count > policy.max_messages {
1160 return Some("message_count");
1161 }
1162 if usage.event_count > policy.max_events {
1163 return Some("event_count");
1164 }
1165 if let (Some(bytes), Some(limit)) = (usage.approx_bytes, policy.max_approx_bytes) {
1166 if bytes > limit {
1167 return Some("approx_bytes");
1168 }
1169 }
1170 None
1171}
1172
1173fn transcript_budget_usage_json(usage: &TranscriptBudgetUsage) -> serde_json::Value {
1174 serde_json::json!({
1175 "messages": usage.message_count,
1176 "events": usage.event_count,
1177 "approx_bytes": usage.approx_bytes,
1178 })
1179}
1180
1181fn transcript_budget_policy_json(policy: &SessionTranscriptBudgetPolicy) -> serde_json::Value {
1182 let recovery = match &policy.recovery {
1183 TranscriptBudgetRecovery::Reject => serde_json::json!({"action": "reject"}),
1184 TranscriptBudgetRecovery::Trim { keep_last } => {
1185 serde_json::json!({"action": "trim", "keep_last": keep_last})
1186 }
1187 TranscriptBudgetRecovery::Compact { keep_last } => {
1188 serde_json::json!({"action": "compact", "keep_last": keep_last})
1189 }
1190 };
1191 serde_json::json!({
1192 "max_messages": policy.max_messages,
1193 "max_events": policy.max_events,
1194 "max_approx_bytes": policy.max_approx_bytes,
1195 "recovery": recovery,
1196 })
1197}
1198
1199fn transcript_budget_recovery_name(recovery: &TranscriptBudgetRecovery) -> &'static str {
1200 match recovery {
1201 TranscriptBudgetRecovery::Reject => "reject",
1202 TranscriptBudgetRecovery::Trim { .. } => "trim",
1203 TranscriptBudgetRecovery::Compact { .. } => "compact",
1204 }
1205}
1206
1207fn transcript_budget_error(
1208 state: &SessionState,
1209 policy: &SessionTranscriptBudgetPolicy,
1210 usage: &TranscriptBudgetUsage,
1211 reason: &str,
1212) -> String {
1213 let byte_suffix = match (usage.approx_bytes, policy.max_approx_bytes) {
1214 (Some(bytes), Some(limit)) => format!(", approx_bytes {bytes}/{limit}"),
1215 _ => String::new(),
1216 };
1217 format!(
1218 "transcript budget exceeded for session '{}': {reason} (messages {}/{}, events {}/{}{}; recovery={})",
1219 state.id,
1220 usage.message_count,
1221 policy.max_messages,
1222 usage.event_count,
1223 policy.max_events,
1224 byte_suffix,
1225 transcript_budget_recovery_name(&policy.recovery),
1226 )
1227}
1228
1229fn transcript_budget_audit_json(
1230 action: &str,
1231 source: &str,
1232 reason: &str,
1233 policy: &SessionTranscriptBudgetPolicy,
1234 usage_before: &TranscriptBudgetUsage,
1235 usage_attempted: &TranscriptBudgetUsage,
1236 usage_after: &TranscriptBudgetUsage,
1237) -> serde_json::Value {
1238 serde_json::json!({
1239 "action": action,
1240 "source": source,
1241 "reason": reason,
1242 "policy": transcript_budget_policy_json(policy),
1243 "usage_before": transcript_budget_usage_json(usage_before),
1244 "usage_attempted": transcript_budget_usage_json(usage_attempted),
1245 "usage_after": transcript_budget_usage_json(usage_after),
1246 "removed_messages": usage_attempted.message_count.saturating_sub(usage_after.message_count),
1247 "removed_events": usage_attempted.event_count.saturating_sub(usage_after.event_count),
1248 })
1249}
1250
1251fn transcript_budget_event(audit: &serde_json::Value) -> VmValue {
1252 let action = audit
1253 .get("action")
1254 .and_then(serde_json::Value::as_str)
1255 .unwrap_or("enforced");
1256 crate::llm::helpers::transcript_event(
1257 "transcript_budget",
1258 "system",
1259 "internal",
1260 &format!("Transcript budget {action}."),
1261 Some(audit.clone()),
1262 )
1263}
1264
1265fn append_event_to_transcript(transcript: VmValue, event: VmValue) -> VmValue {
1266 let Some(dict) = transcript.as_dict() else {
1267 return transcript;
1268 };
1269 let mut next = dict.clone();
1270 let mut events = transcript_events_from_dict(&next);
1271 events.push(event);
1272 next.insert(
1273 "events".to_string(),
1274 VmValue::List(std::sync::Arc::new(events)),
1275 );
1276 VmValue::Dict(std::sync::Arc::new(next))
1277}
1278
1279fn tail_message_capacity(
1280 policy: &SessionTranscriptBudgetPolicy,
1281 reserve_audit_event: bool,
1282) -> usize {
1283 let event_capacity = tail_event_capacity(policy, usize::from(reserve_audit_event));
1284 policy.max_messages.min(event_capacity)
1285}
1286
1287fn tail_event_capacity(policy: &SessionTranscriptBudgetPolicy, reserved_events: usize) -> usize {
1288 policy.max_events.saturating_sub(reserved_events)
1289}
1290
1291fn trim_transcript_for_budget(
1292 transcript: &VmValue,
1293 policy: &SessionTranscriptBudgetPolicy,
1294 keep_last: usize,
1295) -> VmValue {
1296 let dict = transcript.as_dict().cloned().unwrap_or_else(BTreeMap::new);
1297 let messages = transcript_messages_from_dict(&dict);
1298 let keep = keep_last.min(tail_message_capacity(policy, true));
1299 let start = messages.len().saturating_sub(keep);
1300 let retained: Vec<VmValue> = messages.into_iter().skip(start).collect();
1301 let mut next = dict;
1302 next.insert(
1303 "events".to_string(),
1304 VmValue::List(std::sync::Arc::new(
1305 crate::llm::helpers::transcript_events_from_messages(&retained),
1306 )),
1307 );
1308 next.insert(
1309 "messages".to_string(),
1310 VmValue::List(std::sync::Arc::new(retained)),
1311 );
1312 next.remove("summary");
1313 VmValue::Dict(std::sync::Arc::new(next))
1314}
1315
/// Data captured during a budget-triggered compaction so that a
/// "transcript compacted" event can be emitted once the compacted transcript
/// has actually been stored.
struct BudgetCompactionLiveEvent {
    /// Compaction policy taken from the compaction config after the run.
    policy: crate::orchestration::CompactionPolicy,
    /// Strategy name reported by the compaction outcome.
    policy_strategy: String,
    /// Archived-message count, token estimates, and snapshot asset id from the outcome.
    metrics: crate::orchestration::TranscriptCompactedEventMetrics,
}
1321
/// Result of `compact_transcript_for_budget`.
struct BudgetCompactionResult {
    /// The rebuilt transcript dict (messages, events, and optional summary).
    transcript: VmValue,
    /// Present only when the compaction lifecycle produced an outcome.
    live_event: Option<BudgetCompactionLiveEvent>,
}
1326
/// Compacts `transcript` under budget pressure by running the compaction
/// lifecycle synchronously over its messages.
///
/// The returned transcript has its `messages` replaced by whatever the
/// lifecycle left behind, its `events` rebuilt from those messages (plus a
/// `compaction` event when an outcome was produced), and its `summary` set
/// from the outcome or removed when there is none. A non-dict transcript is
/// treated as an empty dict.
fn compact_transcript_for_budget(
    transcript: &VmValue,
    policy: &SessionTranscriptBudgetPolicy,
    keep_last: usize,
    session_id: &str,
) -> BudgetCompactionResult {
    let dict = transcript.as_dict().cloned().unwrap_or_else(BTreeMap::new);
    let messages = transcript_messages_from_dict(&dict);
    // Two event slots are reserved here, and two more message slots below.
    // NOTE(review): presumably these cover the compaction/audit events and the
    // summary messages the lifecycle may add — confirm against the lifecycle.
    let message_capacity = policy.max_messages.min(tail_event_capacity(policy, 2));
    let tail_keep = keep_last.min(message_capacity.saturating_sub(2));
    let mut config = crate::orchestration::AutoCompactConfig {
        token_threshold: 0,
        keep_last: tail_keep,
        compact_strategy: crate::orchestration::CompactStrategy::Llm,
        hard_limit_strategy: crate::orchestration::CompactStrategy::Truncate,
        fallback_strategy: Some(crate::orchestration::CompactStrategy::Truncate),
        policy_strategy: crate::orchestration::compact_strategy_name(
            &crate::orchestration::CompactStrategy::Llm,
        )
        .to_string(),
        ..Default::default()
    };

    // The lifecycle operates on JSON messages and mutates the list in place.
    let mut json_messages = messages
        .iter()
        .map(crate::llm::helpers::vm_value_to_json)
        .collect::<Vec<_>>();
    let lifecycle =
        crate::orchestration::CompactLifecycle::new(crate::orchestration::CompactMode::Auto)
            .with_session_id(Some(session_id))
            .with_trigger(crate::orchestration::CompactionTrigger::BudgetPressure)
            .with_hook_dispatch(false)
            .with_evaluate_providers(false);
    // Option extraction failures are tolerated: the lifecycle then runs without LLM options.
    let llm_opts = crate::llm::extract_llm_options(&[
        VmValue::String(std::sync::Arc::from("")),
        VmValue::Nil,
        VmValue::Nil,
    ])
    .ok();
    // Blocks the current thread until the async lifecycle finishes.
    // NOTE(review): callers in this file hold the session-map borrow while this
    // runs — confirm the lifecycle never re-enters the session store.
    // A lifecycle error and "no compaction happened" both collapse to `None`.
    let outcome = futures::executor::block_on(crate::orchestration::run_compaction_lifecycle(
        &mut json_messages,
        &mut config,
        llm_opts.as_ref(),
        lifecycle,
    ))
    .ok()
    .flatten();

    let retained = json_messages
        .iter()
        .map(crate::stdlib::json_to_vm_value)
        .collect::<Vec<_>>();
    let mut events = crate::llm::helpers::transcript_events_from_messages(&retained);
    let summary = outcome.as_ref().map(|outcome| outcome.summary.clone());
    let mut live_event = None;
    if let Some(outcome) = outcome {
        // Record the compaction in the transcript itself...
        events.push(crate::llm::helpers::transcript_event(
            "compaction",
            "system",
            "internal",
            "",
            Some(outcome.event_metadata.clone()),
        ));
        // ...and keep the data needed to emit the live event later.
        live_event = Some(BudgetCompactionLiveEvent {
            policy: config.policy.clone(),
            policy_strategy: outcome.policy_strategy,
            metrics: crate::orchestration::TranscriptCompactedEventMetrics {
                archived_messages: outcome.archived_messages,
                estimated_tokens_before: outcome.estimated_tokens_before,
                estimated_tokens_after: outcome.estimated_tokens_after,
                snapshot_asset_id: outcome.snapshot_asset_id,
            },
        });
    }

    let mut next = dict;
    next.insert(
        "events".to_string(),
        VmValue::List(std::sync::Arc::new(events)),
    );
    next.insert(
        "messages".to_string(),
        VmValue::List(std::sync::Arc::new(retained)),
    );
    if let Some(summary) = summary {
        next.insert(
            "summary".to_string(),
            VmValue::String(std::sync::Arc::from(summary)),
        );
    } else {
        // Without an outcome any previous summary is dropped.
        next.remove("summary");
    }
    BudgetCompactionResult {
        transcript: VmValue::Dict(std::sync::Arc::new(next)),
        live_event,
    }
}
1426
1427fn recovered_transcript_with_audit(
1428 recovered: VmValue,
1429 action: &str,
1430 source: &str,
1431 reason: &str,
1432 policy: &SessionTranscriptBudgetPolicy,
1433 usage_before: &TranscriptBudgetUsage,
1434 usage_attempted: &TranscriptBudgetUsage,
1435 include_bytes: bool,
1436) -> (VmValue, serde_json::Value, TranscriptBudgetUsage) {
1437 let usage_after_without_audit = transcript_usage(&recovered, include_bytes);
1438 let initial_audit = transcript_budget_audit_json(
1439 action,
1440 source,
1441 reason,
1442 policy,
1443 usage_before,
1444 usage_attempted,
1445 &usage_after_without_audit,
1446 );
1447 let with_initial_audit =
1448 append_event_to_transcript(recovered.clone(), transcript_budget_event(&initial_audit));
1449 let usage_after = transcript_usage(&with_initial_audit, include_bytes);
1450 let audit = transcript_budget_audit_json(
1451 action,
1452 source,
1453 reason,
1454 policy,
1455 usage_before,
1456 usage_attempted,
1457 &usage_after,
1458 );
1459 let with_audit = append_event_to_transcript(recovered, transcript_budget_event(&audit));
1460 let usage_after = transcript_usage(&with_audit, include_bytes);
1461 (with_audit, audit, usage_after)
1462}
1463
1464fn apply_transcript_with_budget(
1465 state: &mut SessionState,
1466 candidate: VmValue,
1467 source: &str,
1468) -> Result<(), String> {
1469 let policy = state.transcript_budget_policy.normalized();
1470 let include_bytes = policy.max_approx_bytes.is_some();
1471 let usage_before = transcript_usage(&state.transcript, include_bytes);
1472 let usage_attempted = transcript_usage(&candidate, include_bytes);
1473 let Some(reason) = transcript_budget_exceeded_reason(&usage_attempted, &policy) else {
1474 state.transcript = candidate;
1475 return Ok(());
1476 };
1477
1478 match policy.recovery.clone() {
1479 TranscriptBudgetRecovery::Reject => {
1480 let audit = transcript_budget_audit_json(
1481 "rejected",
1482 source,
1483 reason,
1484 &policy,
1485 &usage_before,
1486 &usage_attempted,
1487 &usage_before,
1488 );
1489 state.last_transcript_budget_action = Some(audit);
1490 Err(transcript_budget_error(
1491 state,
1492 &policy,
1493 &usage_attempted,
1494 reason,
1495 ))
1496 }
1497 TranscriptBudgetRecovery::Trim { keep_last } => {
1498 let recovered = trim_transcript_for_budget(&candidate, &policy, keep_last);
1499 let (with_audit, audit, usage_after) = recovered_transcript_with_audit(
1500 recovered,
1501 "trimmed",
1502 source,
1503 reason,
1504 &policy,
1505 &usage_before,
1506 &usage_attempted,
1507 include_bytes,
1508 );
1509 if transcript_budget_exceeded_reason(&usage_after, &policy).is_some() {
1510 let rejected = transcript_budget_audit_json(
1511 "rejected",
1512 source,
1513 reason,
1514 &policy,
1515 &usage_before,
1516 &usage_attempted,
1517 &usage_after,
1518 );
1519 state.last_transcript_budget_action = Some(rejected);
1520 return Err(transcript_budget_error(
1521 state,
1522 &policy,
1523 &usage_after,
1524 reason,
1525 ));
1526 }
1527 state.last_transcript_budget_action = Some(audit);
1528 state.transcript = with_audit;
1529 Ok(())
1530 }
1531 TranscriptBudgetRecovery::Compact { keep_last } => {
1532 let compacted =
1533 compact_transcript_for_budget(&candidate, &policy, keep_last, &state.id);
1534 let (with_audit, audit, usage_after) = recovered_transcript_with_audit(
1535 compacted.transcript,
1536 "compacted",
1537 source,
1538 reason,
1539 &policy,
1540 &usage_before,
1541 &usage_attempted,
1542 include_bytes,
1543 );
1544 if transcript_budget_exceeded_reason(&usage_after, &policy).is_some() {
1545 let rejected = transcript_budget_audit_json(
1546 "rejected",
1547 source,
1548 reason,
1549 &policy,
1550 &usage_before,
1551 &usage_attempted,
1552 &usage_after,
1553 );
1554 state.last_transcript_budget_action = Some(rejected);
1555 return Err(transcript_budget_error(
1556 state,
1557 &policy,
1558 &usage_after,
1559 reason,
1560 ));
1561 }
1562 state.last_transcript_budget_action = Some(audit);
1563 state.transcript = with_audit;
1564 if let Some(event) = compacted.live_event {
1565 crate::orchestration::emit_transcript_compacted_event_sync(
1566 &state.id,
1567 crate::orchestration::CompactMode::Auto,
1568 crate::orchestration::CompactionTrigger::BudgetPressure
1569 .as_str()
1570 .to_string(),
1571 &event.policy,
1572 event.policy_strategy,
1573 event.metrics,
1574 );
1575 }
1576 Ok(())
1577 }
1578 }
1579}
1580
1581fn emit_llm_message_event(session_id: &str, message_index: usize, message: &VmValue) {
1582 let mut fields = serde_json::Map::new();
1583 fields.insert(
1584 "session_id".to_string(),
1585 serde_json::Value::String(session_id.to_string()),
1586 );
1587 fields.insert(
1588 "message_index".to_string(),
1589 serde_json::json!(message_index),
1590 );
1591 let message_json = crate::llm::helpers::vm_value_to_json(message);
1592 if let Some(role) = message_json.get("role").and_then(|value| value.as_str()) {
1593 fields.insert(
1594 "role".to_string(),
1595 serde_json::Value::String(role.to_string()),
1596 );
1597 }
1598 if let Some(content) = message_json.get("content") {
1599 fields.insert("content".to_string(), content.clone());
1600 }
1601 fields.insert("message".to_string(), message_json);
1602 crate::llm::append_observability_sidecar_entry("message", fields);
1603}
1604
1605pub fn seed_from_messages(
1611 id: Option<String>,
1612 messages: &[serde_json::Value],
1613 metadata: serde_json::Value,
1614 system_prompt: Option<String>,
1615 tool_format: Option<String>,
1616) -> Result<String, String> {
1617 let resolved = id.unwrap_or_else(|| uuid::Uuid::now_v7().to_string());
1618 if exists(&resolved) {
1619 return Err(format!("agent session '{resolved}' already exists"));
1620 }
1621 open_or_create(Some(resolved.clone()));
1622 SESSIONS.with(|s| {
1623 let mut map = s.borrow_mut();
1624 let Some(state) = map.get_mut(&resolved) else {
1625 return Err(format!("failed to create agent session '{resolved}'"));
1626 };
1627 state.tool_format = tool_format.filter(|value| !value.trim().is_empty());
1628 state.system_prompt = system_prompt.filter(|value| !value.trim().is_empty());
1629
1630 let mut metadata = metadata
1631 .as_object()
1632 .cloned()
1633 .unwrap_or_else(serde_json::Map::new);
1634 if let Some(tool_format) = state.tool_format.as_ref() {
1635 metadata.insert(
1636 "tool_format".to_string(),
1637 serde_json::Value::String(tool_format.clone()),
1638 );
1639 metadata.insert(
1640 "tool_mode_locked".to_string(),
1641 serde_json::Value::Bool(true),
1642 );
1643 }
1644 if let Some(system_prompt) = state.system_prompt.as_ref() {
1645 metadata.insert(
1646 "system_prompt".to_string(),
1647 crate::llm::helpers::system_prompt_metadata(system_prompt),
1648 );
1649 }
1650 let vm_messages = crate::llm::helpers::json_messages_to_vm(messages);
1651 let candidate = crate::llm::helpers::new_transcript_with(
1652 Some(resolved.clone()),
1653 vm_messages,
1654 None,
1655 Some(crate::stdlib::json_to_vm_value(&serde_json::Value::Object(
1656 metadata,
1657 ))),
1658 );
1659 apply_transcript_with_budget(state, candidate, "seed_from_messages")?;
1660 state.last_accessed = Instant::now();
1661 Ok(resolved)
1662 })
1663}
1664
1665pub fn messages_json(id: &str) -> Vec<serde_json::Value> {
1669 SESSIONS.with(|s| {
1670 let map = s.borrow();
1671 let Some(state) = map.get(id) else {
1672 return Vec::new();
1673 };
1674 let Some(dict) = state.transcript.as_dict() else {
1675 return Vec::new();
1676 };
1677 match dict.get("messages") {
1678 Some(VmValue::List(list)) => list
1679 .iter()
1680 .map(crate::llm::helpers::vm_value_to_json)
1681 .collect(),
1682 _ => Vec::new(),
1683 }
1684 })
1685}
1686
/// Prompt-facing view of a session transcript, as built by `prompt_state_json`.
#[derive(Clone, Debug, Default)]
pub struct SessionPromptState {
    /// Transcript messages as JSON. When a summary exists and the list does
    /// not already begin with it, a `user` message carrying the summary is
    /// placed first.
    pub messages: Vec<serde_json::Value>,
    /// The transcript's summary, if it has a non-blank one.
    pub summary: Option<String>,
}
1692
1693fn summary_message_json(summary: &str) -> serde_json::Value {
1694 serde_json::json!({
1695 "role": "user",
1696 "content": summary,
1697 })
1698}
1699
1700fn messages_begin_with_summary(messages: &[serde_json::Value], summary: &str) -> bool {
1701 messages.first().is_some_and(|message| {
1702 message.get("role").and_then(|value| value.as_str()) == Some("user")
1703 && message.get("content").and_then(|value| value.as_str()) == Some(summary)
1704 })
1705}
1706
1707pub fn prompt_state_json(id: &str) -> SessionPromptState {
1715 SESSIONS.with(|s| {
1716 let map = s.borrow();
1717 let Some(state) = map.get(id) else {
1718 return SessionPromptState::default();
1719 };
1720 let Some(dict) = state.transcript.as_dict() else {
1721 return SessionPromptState::default();
1722 };
1723 let mut messages = match dict.get("messages") {
1724 Some(VmValue::List(list)) => list
1725 .iter()
1726 .map(crate::llm::helpers::vm_value_to_json)
1727 .collect::<Vec<_>>(),
1728 _ => Vec::new(),
1729 };
1730 let summary = dict.get("summary").and_then(|value| match value {
1731 VmValue::String(text) if !text.trim().is_empty() => Some(text.to_string()),
1732 _ => None,
1733 });
1734 if let Some(summary_text) = summary.as_deref() {
1735 if !messages_begin_with_summary(&messages, summary_text) {
1736 messages.insert(0, summary_message_json(summary_text));
1737 }
1738 }
1739 SessionPromptState { messages, summary }
1740 })
1741}
1742
1743pub fn store_transcript(id: &str, transcript: VmValue) -> Result<(), String> {
1746 SESSIONS.with(|s| {
1747 let mut map = s.borrow_mut();
1748 let Some(state) = map.get_mut(id) else {
1749 return Err(format!(
1750 "agent_session_store_transcript: unknown session id '{id}'"
1751 ));
1752 };
1753 let transcript = transcript_with_session_metadata(transcript, state);
1754 apply_transcript_with_budget(state, transcript, "store_transcript")?;
1755 state.last_accessed = Instant::now();
1756 Ok(())
1757 })
1758}
1759
1760pub fn prune_invalid_reminder_events(id: &str) -> usize {
1764 SESSIONS.with(|s| {
1765 let mut map = s.borrow_mut();
1766 let Some(state) = map.get_mut(id) else {
1767 return 0;
1768 };
1769 let Some(dict) = state.transcript.as_dict().cloned() else {
1770 return 0;
1771 };
1772 let Some(VmValue::List(events)) = dict.get("events") else {
1773 return 0;
1774 };
1775 let mut pruned = 0_usize;
1776 let mut kept = Vec::with_capacity(events.len());
1777 for event in events.iter().cloned() {
1778 let is_reminder = event
1779 .as_dict()
1780 .and_then(|event| event.get("kind"))
1781 .map(VmValue::display)
1782 .as_deref()
1783 == Some(crate::llm::helpers::SYSTEM_REMINDER_EVENT_KIND);
1784 if !is_reminder {
1785 kept.push(event);
1786 continue;
1787 }
1788 let valid = crate::llm::helpers::reminder_from_event(&event)
1789 .is_some_and(|reminder| !reminder.body.trim().is_empty());
1790 if valid {
1791 kept.push(event);
1792 } else {
1793 pruned += 1;
1794 }
1795 }
1796 if pruned > 0 {
1797 let mut next = dict;
1798 next.insert(
1799 "events".to_string(),
1800 VmValue::List(std::sync::Arc::new(kept)),
1801 );
1802 let _ = apply_transcript_with_budget(
1803 state,
1804 VmValue::Dict(std::sync::Arc::new(next)),
1805 "prune_invalid_reminder_events",
1806 );
1807 state.last_accessed = Instant::now();
1808 }
1809 pruned
1810 })
1811}
1812
1813pub fn apply_reminder_post_turn(id: &str, turn: i64) -> Result<serde_json::Value, String> {
1818 let report = SESSIONS.with(|s| {
1819 let mut map = s.borrow_mut();
1820 let Some(state) = map.get_mut(id) else {
1821 return Err(format!(
1822 "agent_session_apply_reminder_post_turn: unknown session id '{id}'"
1823 ));
1824 };
1825 let report = crate::llm::helpers::apply_reminder_post_turn(&state.transcript, turn);
1826 if report.decremented_count > 0 || !report.expired.is_empty() {
1827 if let Some(next) = report.transcript.clone() {
1828 apply_transcript_with_budget(state, next, "apply_reminder_post_turn")?;
1829 }
1830 state.last_accessed = Instant::now();
1831 }
1832 Ok(report)
1833 })?;
1834
1835 for reminder in &report.expired {
1836 let mut payload = crate::llm::helpers::reminder_lifecycle_payload(Some(id), reminder);
1837 if let Some(obj) = payload.as_object_mut() {
1838 obj.insert(
1839 "transcript_id".to_string(),
1840 serde_json::Value::String(id.to_string()),
1841 );
1842 obj.insert(
1843 "reason".to_string(),
1844 serde_json::Value::String("ttl".to_string()),
1845 );
1846 obj.insert(
1847 "ttl_turns_before".to_string(),
1848 serde_json::json!(&reminder.ttl_turns),
1849 );
1850 obj.insert("expired_at_turn".to_string(), serde_json::json!(turn));
1851 }
1852 crate::llm::helpers::emit_reminder_lifecycle_event(
1853 crate::llm::helpers::REMINDER_EXPIRED_EVENT_KIND,
1854 payload,
1855 );
1856 }
1857
1858 Ok(serde_json::json!({
1859 "expired_count": report.expired.len(),
1860 "decremented_count": report.decremented_count,
1861 "remaining_count": report.remaining_count,
1862 }))
1863}
1864
/// Adds a reminder event to the session transcript.
///
/// When the reminder has a dedupe key, existing reminder events with the same
/// key are removed first. The updated transcript is stored subject to the
/// transcript budget. Lifecycle events are emitted only after the store
/// succeeds: a "deduped" event when older reminders were replaced, then an
/// "injected" event.
///
/// # Errors
/// Fails for an unknown session id or when the budget rejects the update.
pub fn inject_reminder(
    id: &str,
    reminder: crate::llm::helpers::SystemReminder,
) -> Result<ReminderInjectionReport, String> {
    let reminder_id = reminder.id.clone();
    let dedupe_key = reminder.dedupe_key.clone();
    // Filled in by the `retain` closure below with the ids of replaced reminders.
    let mut deduped_reminder_ids = Vec::new();
    SESSIONS.with(|s| {
        let mut map = s.borrow_mut();
        let Some(state) = map.get_mut(id) else {
            return Err(format!(
                "agent_session_inject_reminder: unknown session id '{id}'"
            ));
        };
        let dict = state
            .transcript
            .as_dict()
            .cloned()
            .unwrap_or_else(BTreeMap::new);
        // Use the stored events, or derive them from the messages when absent.
        let mut events: Vec<VmValue> = match dict.get("events") {
            Some(VmValue::List(list)) => list.iter().cloned().collect(),
            _ => dict
                .get("messages")
                .and_then(|value| match value {
                    VmValue::List(list) => Some(list.iter().cloned().collect::<Vec<_>>()),
                    _ => None,
                })
                .map(|messages| crate::llm::helpers::transcript_events_from_messages(&messages))
                .unwrap_or_default(),
        };
        if let Some(expected_key) = dedupe_key.as_deref() {
            events.retain(|event| {
                // Events that are not reminders are always kept.
                let Some(existing) = crate::llm::helpers::reminder_from_event(event) else {
                    return true;
                };
                if existing.dedupe_key.as_deref() == Some(expected_key) {
                    deduped_reminder_ids.push(existing.id);
                    false
                } else {
                    true
                }
            });
        }
        events.push(crate::llm::helpers::transcript_reminder_event(&reminder));
        let mut next = dict;
        next.insert(
            "events".to_string(),
            VmValue::List(std::sync::Arc::new(events)),
        );
        apply_transcript_with_budget(
            state,
            VmValue::Dict(std::sync::Arc::new(next)),
            "inject_reminder",
        )?;
        state.last_accessed = Instant::now();
        Ok(())
    })?;

    if !deduped_reminder_ids.is_empty() {
        let dropped_count = deduped_reminder_ids.len();
        // Several keys below carry the same value under different names.
        // NOTE(review): presumably kept for consumers of older field names — confirm.
        crate::llm::helpers::emit_reminder_lifecycle_event(
            crate::llm::helpers::REMINDER_DEDUPED_EVENT_KIND,
            serde_json::json!({
                "session_id": id,
                "transcript_id": id,
                "reminder_id": &reminder_id,
                "replacing_id": &reminder_id,
                "replaced_id": deduped_reminder_ids.first(),
                "replaced_ids": &deduped_reminder_ids,
                "dedupe_key": &dedupe_key,
                "dropped_reminder_ids": &deduped_reminder_ids,
                "dropped_count": dropped_count,
            }),
        );
    }

    crate::llm::helpers::emit_reminder_lifecycle_event(
        crate::llm::helpers::REMINDER_INJECTED_EVENT_KIND,
        crate::llm::helpers::reminder_lifecycle_payload(Some(id), &reminder),
    );

    Ok(ReminderInjectionReport {
        reminder_id,
        deduped_count: deduped_reminder_ids.len(),
    })
}
1955
1956pub fn append_event(id: &str, event: VmValue) -> Result<(), String> {
1962 let Some(event_dict) = event.as_dict() else {
1963 return Err("agent_session_append_event: event must be a dict".into());
1964 };
1965 let kind_ok = matches!(event_dict.get("kind"), Some(VmValue::String(_)));
1966 if !kind_ok {
1967 return Err("agent_session_append_event: event must have a string `kind`".into());
1968 }
1969 SESSIONS.with(|s| {
1970 let mut map = s.borrow_mut();
1971 let Some(state) = map.get_mut(id) else {
1972 return Err(format!(
1973 "agent_session_append_event: unknown session id '{id}'"
1974 ));
1975 };
1976 append_event_to_state(state, event, "append_event")?;
1977 state.last_accessed = Instant::now();
1978 Ok(())
1979 })
1980}
1981
1982fn append_event_to_state(
1983 state: &mut SessionState,
1984 event: VmValue,
1985 action: &str,
1986) -> Result<(), String> {
1987 let dict = state
1988 .transcript
1989 .as_dict()
1990 .cloned()
1991 .unwrap_or_else(BTreeMap::new);
1992 let mut events: Vec<VmValue> = match dict.get("events") {
1993 Some(VmValue::List(list)) => list.iter().cloned().collect(),
1994 _ => dict
1995 .get("messages")
1996 .and_then(|value| match value {
1997 VmValue::List(list) => Some(list.iter().cloned().collect::<Vec<_>>()),
1998 _ => None,
1999 })
2000 .map(|messages| crate::llm::helpers::transcript_events_from_messages(&messages))
2001 .unwrap_or_default(),
2002 };
2003 events.push(event);
2004 let mut next = dict;
2005 next.insert(
2006 "events".to_string(),
2007 VmValue::List(std::sync::Arc::new(events)),
2008 );
2009 apply_transcript_with_budget(state, VmValue::Dict(std::sync::Arc::new(next)), action)
2010}
2011
/// Replaces the session's messages without a summary.
///
/// Delegates to `replace_messages_with_summary` with `None`, which also
/// removes any summary currently stored on the transcript.
pub fn replace_messages(id: &str, messages: &[serde_json::Value]) -> Result<(), String> {
    replace_messages_with_summary(id, messages, None)
}
2017
2018pub fn replace_messages_with_summary(
2023 id: &str,
2024 messages: &[serde_json::Value],
2025 summary: Option<&str>,
2026) -> Result<(), String> {
2027 SESSIONS.with(|s| {
2028 let mut map = s.borrow_mut();
2029 let Some(state) = map.get_mut(id) else {
2030 return Err(format!(
2031 "agent_session_replace_messages: unknown session id '{id}'"
2032 ));
2033 };
2034 let dict = state
2035 .transcript
2036 .as_dict()
2037 .cloned()
2038 .unwrap_or_else(BTreeMap::new);
2039 let vm_messages: Vec<VmValue> = messages
2040 .iter()
2041 .map(crate::stdlib::json_to_vm_value)
2042 .collect();
2043 let mut next = dict;
2044 next.insert(
2045 "events".to_string(),
2046 VmValue::List(std::sync::Arc::new(
2047 crate::llm::helpers::transcript_events_from_messages(&vm_messages),
2048 )),
2049 );
2050 next.insert(
2051 "messages".to_string(),
2052 VmValue::List(std::sync::Arc::new(vm_messages)),
2053 );
2054 if let Some(summary) = summary {
2055 next.insert(
2056 "summary".to_string(),
2057 VmValue::String(std::sync::Arc::from(summary.to_string())),
2058 );
2059 } else {
2060 next.remove("summary");
2061 }
2062 apply_transcript_with_budget(
2063 state,
2064 VmValue::Dict(std::sync::Arc::new(next)),
2065 "replace_messages",
2066 )?;
2067 state.last_accessed = Instant::now();
2068 Ok(())
2069 })
2070}
2071
2072pub fn append_subscriber(id: &str, callback: VmValue) {
2073 open_or_create(Some(id.to_string()));
2074 SESSIONS.with(|s| {
2075 if let Some(state) = s.borrow_mut().get_mut(id) {
2076 state.subscribers.push(callback);
2077 state.last_accessed = Instant::now();
2078 }
2079 });
2080}
2081
2082pub fn subscribers_for(id: &str) -> Vec<VmValue> {
2083 SESSIONS.with(|s| {
2084 s.borrow()
2085 .get(id)
2086 .map(|state| state.subscribers.clone())
2087 .unwrap_or_default()
2088 })
2089}
2090
2091pub fn subscriber_count(id: &str) -> usize {
2092 SESSIONS.with(|s| {
2093 s.borrow()
2094 .get(id)
2095 .map(|state| state.subscribers.len())
2096 .unwrap_or(0)
2097 })
2098}
2099
2100pub fn set_active_skills(id: &str, skills: Vec<String>) {
2104 SESSIONS.with(|s| {
2105 if let Some(state) = s.borrow_mut().get_mut(id) {
2106 state.active_skills = skills;
2107 state.last_accessed = Instant::now();
2108 }
2109 });
2110}
2111
2112pub fn active_skills(id: &str) -> Vec<String> {
2116 SESSIONS.with(|s| {
2117 s.borrow()
2118 .get(id)
2119 .map(|state| state.active_skills.clone())
2120 .unwrap_or_default()
2121 })
2122}
2123
2124pub fn claim_tool_format(id: &str, tool_format: &str) -> Result<(), String> {
2130 let tool_format = tool_format.trim();
2131 if tool_format.is_empty() {
2132 return Ok(());
2133 }
2134 SESSIONS.with(|s| {
2135 let mut map = s.borrow_mut();
2136 let Some(state) = map.get_mut(id) else {
2137 return Err(format!("agent session '{id}' does not exist"));
2138 };
2139 match state.tool_format.as_deref() {
2140 Some(existing) if existing != tool_format => Err(format!(
2141 "agent session '{id}' is locked to tool_format='{existing}', but this run requested tool_format='{tool_format}'. Start a new session or fork/reset the transcript before changing tool mode."
2142 )),
2143 Some(_) => {
2144 state.last_accessed = Instant::now();
2145 Ok(())
2146 }
2147 None => {
2148 state.tool_format = Some(tool_format.to_string());
2149 state.last_accessed = Instant::now();
2150 Ok(())
2151 }
2152 }
2153 })
2154}
2155
2156pub fn tool_format(id: &str) -> Option<String> {
2157 SESSIONS.with(|s| {
2158 s.borrow()
2159 .get(id)
2160 .and_then(|state| state.tool_format.clone())
2161 })
2162}
2163
2164pub fn record_system_prompt(id: &str, system_prompt: &str) -> Result<(), String> {
2165 let system_prompt = system_prompt.trim();
2166 if system_prompt.is_empty() {
2167 return Ok(());
2168 }
2169 assert_cache_stable_system_prompt(system_prompt);
2170 SESSIONS.with(|s| {
2171 let mut map = s.borrow_mut();
2172 let Some(state) = map.get_mut(id) else {
2173 return Err(format!("agent session '{id}' does not exist"));
2174 };
2175 let changed = state.system_prompt.as_deref() != Some(system_prompt);
2176 state.system_prompt = Some(system_prompt.to_string());
2177 let dict = state
2178 .transcript
2179 .as_dict()
2180 .cloned()
2181 .unwrap_or_else(BTreeMap::new);
2182 let mut next = dict;
2183 apply_system_prompt_metadata(&mut next, system_prompt);
2184 if changed {
2185 let mut events: Vec<VmValue> = match next.get("events") {
2186 Some(VmValue::List(list)) => list.iter().cloned().collect(),
2187 _ => Vec::new(),
2188 };
2189 events.push(crate::llm::helpers::transcript_event(
2190 "system_prompt",
2191 "system",
2192 "internal",
2193 "",
2194 Some(crate::llm::helpers::system_prompt_event_metadata(
2195 system_prompt,
2196 )),
2197 ));
2198 next.insert(
2199 "events".to_string(),
2200 VmValue::List(std::sync::Arc::new(events)),
2201 );
2202 }
2203 apply_transcript_with_budget(
2204 state,
2205 VmValue::Dict(std::sync::Arc::new(next)),
2206 "record_system_prompt",
2207 )?;
2208 state.last_accessed = Instant::now();
2209 Ok(())
2210 })
2211}
2212
2213pub fn system_prompt(id: &str) -> Option<String> {
2214 SESSIONS.with(|s| {
2215 s.borrow()
2216 .get(id)
2217 .and_then(|state| state.system_prompt.clone())
2218 })
2219}
2220
/// Finds the first template token in `system_prompt` that starts with a
/// forbidden prefix.
///
/// A token is `{{`, optional whitespace, then `workspace_` or `project_`.
/// Returns the matching prefix, or `None` when the prompt contains no such
/// token. Overlapping brace runs such as `{{{workspace_root}}}` are detected
/// as well.
#[cfg(debug_assertions)]
fn forbidden_workspace_prompt_token(system_prompt: &str) -> Option<&'static str> {
    const FORBIDDEN_PREFIXES: [&str; 2] = ["workspace_", "project_"];
    let mut remaining = system_prompt;
    while let Some(index) = remaining.find("{{") {
        let candidate = remaining[index + 2..].trim_start();
        let hit = FORBIDDEN_PREFIXES
            .iter()
            .copied()
            .find(|prefix| candidate.starts_with(*prefix));
        if hit.is_some() {
            return hit;
        }
        // Advance by a single brace (one byte, always a char boundary) rather
        // than past the whole `{{`, so a third `{` can start the next match.
        remaining = &remaining[index + 1..];
    }
    None
}
2236
2237#[cfg(debug_assertions)]
2238fn assert_cache_stable_system_prompt(system_prompt: &str) {
2239 if let Some(prefix) = forbidden_workspace_prompt_token(system_prompt) {
2240 panic!(
2241 "{CACHE_STABLE_SYSTEM_PROMPT_DIAGNOSTIC}: session system prompts must not interpolate `{{{{{prefix}...` tokens; move workspace/project context into the workspace-anchor reminder"
2242 );
2243 }
2244}
2245
/// Release-build counterpart of the debug check: the prompt is not inspected
/// and the function does nothing.
#[cfg(not(debug_assertions))]
fn assert_cache_stable_system_prompt(_system_prompt: &str) {}
2248
2249pub fn set_pinned_model(id: &str, model: Option<String>) -> Result<bool, String> {
2254 let normalized = model
2255 .map(|value| value.trim().to_string())
2256 .filter(|value| !value.is_empty());
2257 SESSIONS.with(|s| {
2258 let mut map = s.borrow_mut();
2259 let Some(state) = map.get_mut(id) else {
2260 return Err(format!("agent session '{id}' does not exist"));
2261 };
2262 let changed = state.pinned_model != normalized;
2263 state.pinned_model = normalized;
2264 state.last_accessed = Instant::now();
2265 Ok(changed)
2266 })
2267}
2268
2269pub fn pinned_model(id: &str) -> Option<String> {
2273 SESSIONS.with(|s| {
2274 s.borrow()
2275 .get(id)
2276 .and_then(|state| state.pinned_model.clone())
2277 })
2278}
2279
2280pub fn set_pinned_reasoning_policy(id: &str, policy: Option<String>) -> Result<bool, String> {
2282 let normalized = match policy {
2283 Some(value) => crate::llm::reasoning_policy::normalize_policy_selector(&value)?,
2284 None => None,
2285 };
2286 SESSIONS.with(|s| {
2287 let mut map = s.borrow_mut();
2288 let Some(state) = map.get_mut(id) else {
2289 return Err(format!("agent session '{id}' does not exist"));
2290 };
2291 let changed = state.pinned_reasoning_policy != normalized;
2292 state.pinned_reasoning_policy = normalized;
2293 state.last_accessed = Instant::now();
2294 Ok(changed)
2295 })
2296}
2297
2298pub fn pinned_reasoning_policy(id: &str) -> Option<String> {
2300 SESSIONS.with(|s| {
2301 s.borrow()
2302 .get(id)
2303 .and_then(|state| state.pinned_reasoning_policy.clone())
2304 })
2305}
2306
2307pub fn set_workspace_anchor(id: &str, anchor: Option<WorkspaceAnchor>) -> Result<bool, String> {
2311 SESSIONS.with(|s| {
2312 let mut map = s.borrow_mut();
2313 let Some(state) = map.get_mut(id) else {
2314 return Err(format!("agent session '{id}' does not exist"));
2315 };
2316 let changed = state.workspace_anchor != anchor;
2317 state.workspace_anchor = anchor;
2318 if changed {
2319 crate::llm::permissions::clear_session_grants(id);
2320 }
2321 state.last_accessed = Instant::now();
2322 Ok(changed)
2323 })
2324}
2325
2326pub fn workspace_anchor(id: &str) -> Option<WorkspaceAnchor> {
2328 SESSIONS.with(|s| {
2329 s.borrow()
2330 .get(id)
2331 .and_then(|state| state.workspace_anchor.clone())
2332 })
2333}
2334
2335#[derive(Clone, Debug, PartialEq, Eq)]
2339pub struct ReanchorOutcome {
2340 pub previous: Option<WorkspaceAnchor>,
2341 pub current: WorkspaceAnchor,
2342 pub changed: bool,
2343}
2344
2345pub fn reanchor_session(
2350 id: &str,
2351 new_anchor: WorkspaceAnchor,
2352 carry_transcript: bool,
2353 compacted: bool,
2354 reason: Option<String>,
2355) -> Result<ReanchorOutcome, String> {
2356 let outcome = SESSIONS.with(|s| {
2357 let mut map = s.borrow_mut();
2358 let Some(state) = map.get_mut(id) else {
2359 return Err(format!("agent session '{id}' does not exist"));
2360 };
2361 let previous = state.workspace_anchor.clone();
2362 let changed = previous.as_ref() != Some(&new_anchor);
2363 state.workspace_anchor = Some(new_anchor.clone());
2364 if changed {
2365 crate::llm::permissions::clear_session_grants(id);
2366 }
2367 state.last_accessed = Instant::now();
2368 Ok(ReanchorOutcome {
2369 previous,
2370 current: new_anchor,
2371 changed,
2372 })
2373 })?;
2374 if !outcome.changed {
2375 return Ok(outcome);
2376 }
2377 let previous_json = outcome.previous.as_ref().map(WorkspaceAnchor::to_json);
2378 let current_json = outcome.current.to_json();
2379 let event_metadata = serde_json::json!({
2380 "previous": previous_json,
2381 "current": current_json,
2382 "carry_transcript": carry_transcript,
2383 "compacted": compacted,
2384 "reason": reason,
2385 });
2386 let event = crate::llm::helpers::transcript_event(
2387 "AnchorChanged",
2388 "system",
2389 "internal",
2390 "",
2391 Some(event_metadata),
2392 );
2393 let _ = append_event(id, event);
2394 crate::llm::emit_live_agent_event_sync(&crate::agent_events::AgentEvent::AnchorChanged {
2395 session_id: id.to_string(),
2396 previous: previous_json,
2397 current: current_json,
2398 carry_transcript,
2399 compacted,
2400 reason,
2401 });
2402 Ok(outcome)
2403}
2404
2405pub fn set_workspace_policy(id: &str, policy: WorkspacePolicy) -> Result<bool, String> {
2408 SESSIONS.with(|s| {
2409 let mut map = s.borrow_mut();
2410 let Some(state) = map.get_mut(id) else {
2411 return Err(format!("agent session '{id}' does not exist"));
2412 };
2413 let changed = state.workspace_policy != policy;
2414 state.workspace_policy = policy;
2415 state.last_accessed = Instant::now();
2416 Ok(changed)
2417 })
2418}
2419
2420pub fn workspace_policy(id: &str) -> Option<WorkspacePolicy> {
2422 SESSIONS.with(|s| {
2423 s.borrow()
2424 .get(id)
2425 .map(|state| state.workspace_policy.clone())
2426 })
2427}
2428
2429pub fn add_workspace_root(
2433 id: &str,
2434 root: &str,
2435 mount_mode: Option<MountMode>,
2436 reason: Option<String>,
2437) -> Result<String, String> {
2438 let normalized_root = validate_workspace_root_path(root)?;
2439 let mounted_at = crate::orchestration::now_rfc3339();
2440 SESSIONS.with(|s| {
2441 let mut map = s.borrow_mut();
2442 let Some(state) = map.get_mut(id) else {
2443 return Err(format!("agent session '{id}' does not exist"));
2444 };
2445 let default_mount_mode = state.workspace_policy.default_mount_mode;
2446 let Some(anchor) = state.workspace_anchor.as_mut() else {
2447 return Err(format!("agent session '{id}' has no workspace anchor"));
2448 };
2449 let resolved_mount_mode = mount_mode.unwrap_or(default_mount_mode);
2450 if let Some(existing) = anchor
2451 .additional_roots
2452 .iter_mut()
2453 .find(|entry| entry.path == normalized_root)
2454 {
2455 existing.mount_mode = resolved_mount_mode;
2456 existing.mounted_at = mounted_at.clone();
2457 } else {
2458 anchor.additional_roots.push(MountedRoot {
2459 path: normalized_root.clone(),
2460 mount_mode: resolved_mount_mode,
2461 mounted_at: mounted_at.clone(),
2462 });
2463 }
2464 let event = crate::llm::helpers::transcript_event(
2465 "RootMounted",
2466 "system",
2467 "internal",
2468 "",
2469 Some(serde_json::json!({
2470 "path": normalized_root.to_string_lossy(),
2471 "mount_mode": resolved_mount_mode.as_str(),
2472 "mounted_at": mounted_at.clone(),
2473 "reason": reason,
2474 })),
2475 );
2476 append_event_to_state(state, event, "add_workspace_root")?;
2477 crate::llm::permissions::clear_session_grants(id);
2478 state.last_accessed = Instant::now();
2479 Ok(mounted_at.clone())
2480 })
2481}
2482
2483pub fn remove_workspace_root(id: &str, root: &str) -> Result<bool, String> {
2486 let normalized_root = normalize_workspace_root_path(root);
2487 SESSIONS.with(|s| {
2488 let mut map = s.borrow_mut();
2489 let Some(state) = map.get_mut(id) else {
2490 return Err(format!("agent session '{id}' does not exist"));
2491 };
2492 let Some(anchor) = state.workspace_anchor.as_mut() else {
2493 return Err(format!("agent session '{id}' has no workspace anchor"));
2494 };
2495 let before = anchor.additional_roots.len();
2496 anchor
2497 .additional_roots
2498 .retain(|entry| entry.path != normalized_root);
2499 let removed = anchor.additional_roots.len() != before;
2500 if removed {
2501 crate::llm::permissions::clear_session_grants(id);
2502 }
2503 state.last_accessed = Instant::now();
2504 Ok(removed)
2505 })
2506}
2507
2508pub fn list_workspace_roots(id: &str) -> Result<(PathBuf, Vec<MountedRoot>), String> {
2509 SESSIONS.with(|s| {
2510 let map = s.borrow();
2511 let Some(state) = map.get(id) else {
2512 return Err(format!("agent session '{id}' does not exist"));
2513 };
2514 let Some(anchor) = state.workspace_anchor.as_ref() else {
2515 return Err(format!("agent session '{id}' has no workspace anchor"));
2516 };
2517 Ok((anchor.primary.clone(), anchor.additional_roots.clone()))
2518 })
2519}
2520
2521fn validate_workspace_root_path(root: &str) -> Result<PathBuf, String> {
2522 let normalized = normalize_workspace_root_path(root);
2523 let canonical = std::fs::canonicalize(&normalized)
2524 .map_err(|error| format!("workspace root '{root}' must exist and be readable: {error}"))?;
2525 let metadata = std::fs::metadata(&canonical)
2526 .map_err(|error| format!("workspace root '{root}' must exist and be readable: {error}"))?;
2527 if !metadata.is_dir() {
2528 return Err(format!("workspace root '{root}' must be a directory"));
2529 }
2530 std::fs::read_dir(&canonical)
2531 .map_err(|error| format!("workspace root '{root}' must be readable: {error}"))?;
2532 Ok(canonical)
2533}
2534
2535fn normalize_workspace_root_path(root: &str) -> PathBuf {
2536 let absolute = crate::stdlib::process::normalize_context_path(Path::new(root));
2537 std::fs::canonicalize(&absolute).unwrap_or(absolute)
2538}
2539
2540fn empty_transcript(id: &str) -> VmValue {
2541 use crate::llm::helpers::new_transcript_with;
2542 new_transcript_with(Some(id.to_string()), Vec::new(), None, None)
2543}
2544
2545fn clone_transcript_with_id(transcript: &VmValue, new_id: &str) -> VmValue {
2546 let Some(dict) = transcript.as_dict() else {
2547 return empty_transcript(new_id);
2548 };
2549 let mut next = dict.clone();
2550 next.insert(
2551 "id".to_string(),
2552 VmValue::String(std::sync::Arc::from(new_id.to_string())),
2553 );
2554 VmValue::Dict(std::sync::Arc::new(next))
2555}
2556
2557fn clone_transcript_with_parent(transcript: &VmValue, parent_id: &str) -> VmValue {
2558 let Some(dict) = transcript.as_dict() else {
2559 return transcript.clone();
2560 };
2561 let mut next = dict.clone();
2562 let metadata = match next.get("metadata") {
2563 Some(VmValue::Dict(metadata)) => {
2564 let mut metadata = metadata.as_ref().clone();
2565 metadata.insert(
2566 "parent_session_id".to_string(),
2567 VmValue::String(std::sync::Arc::from(parent_id.to_string())),
2568 );
2569 VmValue::Dict(std::sync::Arc::new(metadata))
2570 }
2571 _ => VmValue::Dict(std::sync::Arc::new(BTreeMap::from([(
2572 "parent_session_id".to_string(),
2573 VmValue::String(std::sync::Arc::from(parent_id.to_string())),
2574 )]))),
2575 };
2576 next.insert("metadata".to_string(), metadata);
2577 VmValue::Dict(std::sync::Arc::new(next))
2578}
2579
2580fn apply_system_prompt_metadata(next: &mut BTreeMap<String, VmValue>, system_prompt: &str) {
2581 let mut metadata = match next.get("metadata") {
2582 Some(VmValue::Dict(metadata)) => metadata.as_ref().clone(),
2583 _ => BTreeMap::new(),
2584 };
2585 metadata.insert(
2586 "system_prompt".to_string(),
2587 crate::stdlib::json_to_vm_value(&crate::llm::helpers::system_prompt_metadata(
2588 system_prompt,
2589 )),
2590 );
2591 next.insert(
2592 "metadata".to_string(),
2593 VmValue::Dict(std::sync::Arc::new(metadata)),
2594 );
2595}
2596
2597fn transcript_with_session_metadata(transcript: VmValue, state: &SessionState) -> VmValue {
2598 let Some(dict) = transcript.as_dict() else {
2599 return transcript;
2600 };
2601 let mut next = dict.clone();
2602 let mut metadata = match next.get("metadata") {
2603 Some(VmValue::Dict(metadata)) => metadata.as_ref().clone(),
2604 _ => BTreeMap::new(),
2605 };
2606 if let Some(tool_format) = state.tool_format.as_ref() {
2607 metadata.insert(
2608 "tool_format".to_string(),
2609 VmValue::String(std::sync::Arc::from(tool_format.clone())),
2610 );
2611 metadata.insert("tool_mode_locked".to_string(), VmValue::Bool(true));
2612 }
2613 if let Some(system_prompt) = state.system_prompt.as_ref() {
2614 metadata.insert(
2615 "system_prompt".to_string(),
2616 crate::stdlib::json_to_vm_value(&crate::llm::helpers::system_prompt_metadata(
2617 system_prompt,
2618 )),
2619 );
2620 }
2621 if let Some(anchor) = state.workspace_anchor.as_ref() {
2622 metadata.insert(
2623 WORKSPACE_ANCHOR_METADATA_KEY.to_string(),
2624 anchor.to_vm_value(),
2625 );
2626 } else {
2627 metadata.remove(WORKSPACE_ANCHOR_METADATA_KEY);
2628 }
2629 if let Some(scratchpad) = state.scratchpad.as_ref() {
2630 metadata.insert("agent_scratchpad".to_string(), scratchpad.clone());
2631 metadata.insert(
2632 "agent_scratchpad_version".to_string(),
2633 VmValue::Int(state.scratchpad_version as i64),
2634 );
2635 } else {
2636 metadata.remove("agent_scratchpad");
2637 metadata.remove("agent_scratchpad_version");
2638 }
2639 if let Some(last_action) = state.last_transcript_budget_action.as_ref() {
2640 let usage = transcript_usage(
2641 &VmValue::Dict(std::sync::Arc::new(next.clone())),
2642 state.transcript_budget_policy.max_approx_bytes.is_some(),
2643 );
2644 metadata.insert(
2645 "transcript_budget".to_string(),
2646 crate::stdlib::json_to_vm_value(&serde_json::json!({
2647 "policy": transcript_budget_policy_json(&state.transcript_budget_policy.normalized()),
2648 "usage": transcript_budget_usage_json(&usage),
2649 "last_action": last_action,
2650 })),
2651 );
2652 }
2653 if !metadata.is_empty() {
2654 next.insert(
2655 "metadata".to_string(),
2656 VmValue::Dict(std::sync::Arc::new(metadata)),
2657 );
2658 }
2659 VmValue::Dict(std::sync::Arc::new(next))
2660}
2661
2662fn session_snapshot(state: &SessionState) -> VmValue {
2663 let transcript = transcript_with_session_metadata(state.transcript.clone(), state);
2664 let Some(dict) = transcript.as_dict() else {
2665 return state.transcript.clone();
2666 };
2667 let mut next = dict.clone();
2668 let length = next
2669 .get("messages")
2670 .and_then(|value| match value {
2671 VmValue::List(list) => Some(list.len() as i64),
2672 _ => None,
2673 })
2674 .unwrap_or(0);
2675 next.insert("length".to_string(), VmValue::Int(length));
2676 next.insert(
2677 "created_at".to_string(),
2678 VmValue::String(std::sync::Arc::from(state.created_at.clone())),
2679 );
2680 next.insert(
2681 "parent_id".to_string(),
2682 state
2683 .parent_id
2684 .as_ref()
2685 .map(|id| VmValue::String(std::sync::Arc::from(id.clone())))
2686 .unwrap_or(VmValue::Nil),
2687 );
2688 next.insert(
2689 "child_ids".to_string(),
2690 VmValue::List(std::sync::Arc::new(
2691 state
2692 .child_ids
2693 .iter()
2694 .cloned()
2695 .map(|id| VmValue::String(std::sync::Arc::from(id)))
2696 .collect(),
2697 )),
2698 );
2699 next.insert(
2700 "branched_at_event_index".to_string(),
2701 state
2702 .branched_at_event_index
2703 .map(|index| VmValue::Int(index as i64))
2704 .unwrap_or(VmValue::Nil),
2705 );
2706 next.insert(
2707 "system_prompt".to_string(),
2708 state
2709 .system_prompt
2710 .as_ref()
2711 .map(|prompt| VmValue::String(std::sync::Arc::from(prompt.clone())))
2712 .unwrap_or(VmValue::Nil),
2713 );
2714 next.insert(
2715 "tool_format".to_string(),
2716 state
2717 .tool_format
2718 .as_ref()
2719 .map(|format| VmValue::String(std::sync::Arc::from(format.clone())))
2720 .unwrap_or(VmValue::Nil),
2721 );
2722 next.insert(
2723 "pinned_model".to_string(),
2724 state
2725 .pinned_model
2726 .as_ref()
2727 .map(|model| VmValue::String(std::sync::Arc::from(model.clone())))
2728 .unwrap_or(VmValue::Nil),
2729 );
2730 next.insert(
2731 "pinned_reasoning_policy".to_string(),
2732 state
2733 .pinned_reasoning_policy
2734 .as_ref()
2735 .map(|policy| VmValue::String(std::sync::Arc::from(policy.clone())))
2736 .unwrap_or(VmValue::Nil),
2737 );
2738 next.insert(
2739 "scratchpad".to_string(),
2740 state.scratchpad.clone().unwrap_or(VmValue::Nil),
2741 );
2742 next.insert(
2743 "scratchpad_version".to_string(),
2744 VmValue::Int(state.scratchpad_version as i64),
2745 );
2746 next.insert(
2747 "workspace_anchor".to_string(),
2748 state
2749 .workspace_anchor
2750 .as_ref()
2751 .map(WorkspaceAnchor::to_vm_value)
2752 .unwrap_or(VmValue::Nil),
2753 );
2754 next.insert(
2755 "workspace_policy".to_string(),
2756 state.workspace_policy.to_vm_value(),
2757 );
2758 VmValue::Dict(std::sync::Arc::new(next))
2759}
2760
2761fn update_lineage(
2762 map: &mut HashMap<String, SessionState>,
2763 parent_id: &str,
2764 child_id: &str,
2765 branched_at_event_index: Option<usize>,
2766) {
2767 let old_parent_id = map.get(child_id).and_then(|child| child.parent_id.clone());
2768 if let Some(old_parent_id) = old_parent_id.filter(|old_parent_id| old_parent_id != parent_id) {
2769 if let Some(old_parent) = map.get_mut(&old_parent_id) {
2770 old_parent.child_ids.retain(|id| id != child_id);
2771 old_parent.last_accessed = Instant::now();
2772 }
2773 }
2774 if let Some(parent) = map.get_mut(parent_id) {
2775 parent.last_accessed = Instant::now();
2776 if !parent.child_ids.iter().any(|id| id == child_id) {
2777 parent.child_ids.push(child_id.to_string());
2778 }
2779 }
2780 if let Some(child) = map.get_mut(child_id) {
2781 child.last_accessed = Instant::now();
2782 child.parent_id = Some(parent_id.to_string());
2783 child.branched_at_event_index = branched_at_event_index;
2784 child.transcript = clone_transcript_with_parent(&child.transcript, parent_id);
2785 }
2786}
2787
2788fn branch_event_index(transcript: &VmValue, keep_first: usize) -> usize {
2789 if keep_first == 0 {
2790 return 0;
2791 }
2792 let Some(dict) = transcript.as_dict() else {
2793 return keep_first;
2794 };
2795 let Some(VmValue::List(events)) = dict.get("events") else {
2796 return keep_first;
2797 };
2798 event_prefix_len_for_messages(events, keep_first)
2799}
2800
2801fn event_kind(event: &VmValue) -> Option<String> {
2802 event
2803 .as_dict()
2804 .and_then(|dict| dict.get("kind"))
2805 .map(VmValue::display)
2806}
2807
2808fn event_id(event: &VmValue) -> Option<String> {
2809 event
2810 .as_dict()
2811 .and_then(|dict| dict.get("id"))
2812 .map(VmValue::display)
2813}
2814
2815fn is_turn_event(event: &VmValue) -> bool {
2816 matches!(
2817 event_kind(event).as_deref(),
2818 Some("message" | "tool_result")
2819 )
2820}
2821
2822fn event_prefix_len_for_messages(events: &[VmValue], keep_first: usize) -> usize {
2823 if keep_first == 0 {
2824 return 0;
2825 }
2826 let mut retained_messages = 0usize;
2827 for (index, event) in events.iter().enumerate() {
2828 if is_turn_event(event) {
2829 retained_messages += 1;
2830 if retained_messages == keep_first {
2831 return index + 1;
2832 }
2833 }
2834 }
2835 events.len()
2836}
2837
2838fn turn_event_id_for_count(events: &[VmValue], keep_first: usize) -> Option<String> {
2839 if keep_first == 0 {
2840 return None;
2841 }
2842 let mut retained_messages = 0usize;
2843 for event in events {
2844 if is_turn_event(event) {
2845 retained_messages += 1;
2846 if retained_messages == keep_first {
2847 return event_id(event);
2848 }
2849 }
2850 }
2851 None
2852}
2853
2854#[cfg(test)]
2855#[path = "agent_sessions_tests.rs"]
2856mod tests;