1use crate::value::VmDictExt;
20use std::cell::{Cell, RefCell};
21use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
22use std::future::Future;
23use std::path::{Path, PathBuf};
24use std::sync::{Mutex, OnceLock};
25use std::time::Instant;
26
27use crate::actor_chain::ActorChain;
28use crate::runtime_limits::RuntimeLimits;
29use crate::value::VmValue;
30use crate::workspace_anchor::{
31 MountMode, MountedRoot, WorkspaceAnchor, WorkspacePolicy, WORKSPACE_ANCHOR_METADATA_KEY,
32};
33
/// Transcript event kind recorded for live-client lifecycle changes
/// (the "attached", "detached" and "heartbeat" actions).
const LIVE_CLIENT_EVENT_KIND: &str = "live_session_client";
/// Transcript event kind recorded when a permission request is routed
/// through the active live controller.
const LIVE_CLIENT_PERMISSION_EVENT_KIND: &str = "live_session_permission_route";

/// Initial value of the per-thread session cap, taken from the default runtime limits.
pub const DEFAULT_SESSION_CAP: usize = RuntimeLimits::DEFAULT.max_agent_sessions;

/// Message limit used by the default transcript budget policy.
pub const DEFAULT_TRANSCRIPT_MESSAGE_CAP: usize = 4096;

/// Event limit used by the default transcript budget policy.
pub const DEFAULT_TRANSCRIPT_EVENT_CAP: usize = 32768;
/// Largest JSON-serialized size, in bytes, accepted for a session scratchpad.
pub const MAX_SCRATCHPAD_BYTES: usize = 16 * 1024;
// NOTE(review): diagnostic code compiled only in debug builds; its use site is
// not visible in this part of the file.
#[cfg(debug_assertions)]
const CACHE_STABLE_SYSTEM_PROMPT_DIAGNOSTIC: &str = "HARN-CACHE-001";
52
/// Recovery strategy attached to a transcript budget policy.
///
/// How each variant is applied is decided by the budget-enforcement code,
/// which lives outside this block.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum TranscriptBudgetRecovery {
    Reject,
    Trim { keep_last: usize },
    Compact { keep_last: usize },
}

/// Limits on a session transcript plus the recovery strategy to use with them.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct SessionTranscriptBudgetPolicy {
    /// Message limit; constructors clamp it to at least 1.
    pub max_messages: usize,
    /// Event limit; constructors clamp it to at least 1.
    pub max_events: usize,
    /// Optional approximate byte limit; clamped to at least 1 when present.
    pub max_approx_bytes: Option<usize>,
    /// Recovery strategy paired with the limits.
    pub recovery: TranscriptBudgetRecovery,
}

impl SessionTranscriptBudgetPolicy {
    /// Builds a policy with the [`TranscriptBudgetRecovery::Reject`] strategy
    /// and no byte limit.
    pub fn reject(max_messages: usize, max_events: usize) -> Self {
        Self::with_recovery(max_messages, max_events, TranscriptBudgetRecovery::Reject)
    }

    /// Builds a policy with the [`TranscriptBudgetRecovery::Trim`] strategy
    /// and no byte limit. `keep_last` is stored as given.
    pub fn trim(max_messages: usize, max_events: usize, keep_last: usize) -> Self {
        Self::with_recovery(
            max_messages,
            max_events,
            TranscriptBudgetRecovery::Trim { keep_last },
        )
    }

    /// Builds a policy with the [`TranscriptBudgetRecovery::Compact`] strategy
    /// and no byte limit. `keep_last` is stored as given.
    pub fn compact(max_messages: usize, max_events: usize, keep_last: usize) -> Self {
        Self::with_recovery(
            max_messages,
            max_events,
            TranscriptBudgetRecovery::Compact { keep_last },
        )
    }

    /// Replaces the approximate byte limit; `Some(0)` is raised to `Some(1)`.
    pub fn with_max_approx_bytes(mut self, max_approx_bytes: Option<usize>) -> Self {
        self.max_approx_bytes = match max_approx_bytes {
            Some(limit) => Some(limit.max(1)),
            None => None,
        };
        self
    }

    /// Returns a copy with every limit clamped to at least 1.
    ///
    /// The fields are public, so a hand-built value may carry zeros.
    fn normalized(&self) -> Self {
        Self::with_recovery(self.max_messages, self.max_events, self.recovery.clone())
            .with_max_approx_bytes(self.max_approx_bytes)
    }

    /// Shared constructor: clamps both count limits and leaves the byte limit unset.
    fn with_recovery(
        max_messages: usize,
        max_events: usize,
        recovery: TranscriptBudgetRecovery,
    ) -> Self {
        Self {
            max_messages: max_messages.max(1),
            max_events: max_events.max(1),
            max_approx_bytes: None,
            recovery,
        }
    }
}
110
111impl Default for SessionTranscriptBudgetPolicy {
112 fn default() -> Self {
113 Self::reject(DEFAULT_TRANSCRIPT_MESSAGE_CAP, DEFAULT_TRANSCRIPT_EVENT_CAP)
114 }
115}
116
117pub struct SessionState {
118 pub id: String,
119 pub transcript: VmValue,
120 pub subscribers: Vec<VmValue>,
121 pub created_at: String,
122 pub last_accessed: Instant,
123 pub parent_id: Option<String>,
124 pub child_ids: Vec<String>,
125 pub branched_at_event_index: Option<usize>,
126 pub actor_chain: Option<ActorChain>,
127 pub active_skills: Vec<String>,
133 pub tool_format: Option<String>,
137 pub system_prompt: Option<String>,
141 pub pinned_model: Option<String>,
147 pub pinned_reasoning_policy: Option<String>,
153 pub workspace_policy: WorkspacePolicy,
156 pub workspace_anchor: Option<WorkspaceAnchor>,
162 pub scratchpad: Option<VmValue>,
165 pub scratchpad_version: u64,
166 pub transcript_budget_policy: SessionTranscriptBudgetPolicy,
167 pub last_transcript_budget_action: Option<serde_json::Value>,
168 pub live_clients: BTreeMap<String, LiveSessionClient>,
169 pub live_controller_id: Option<String>,
170 pub completed_turn_checkpoints: Vec<SessionTurnCheckpoint>,
171 pub redo_stack: Vec<SessionRedoEntry>,
172}
173
174impl SessionState {
175 fn new(id: String) -> Self {
176 let now = Instant::now();
177 let transcript = empty_transcript(&id);
178 Self {
179 id,
180 transcript,
181 subscribers: Vec::new(),
182 created_at: crate::orchestration::now_rfc3339(),
183 last_accessed: now,
184 parent_id: None,
185 child_ids: Vec::new(),
186 branched_at_event_index: None,
187 actor_chain: None,
188 active_skills: Vec::new(),
189 tool_format: None,
190 system_prompt: None,
191 pinned_model: None,
192 pinned_reasoning_policy: None,
193 workspace_policy: WorkspacePolicy::default(),
194 workspace_anchor: None,
195 scratchpad: None,
196 scratchpad_version: 0,
197 transcript_budget_policy: default_transcript_budget_policy(),
198 last_transcript_budget_action: None,
199 live_clients: BTreeMap::new(),
200 live_controller_id: None,
201 completed_turn_checkpoints: Vec::new(),
202 redo_stack: Vec::new(),
203 }
204 }
205
206 fn touch(&mut self) {
207 self.last_accessed = Instant::now();
208 }
209
210 fn replace_transcript(&mut self, transcript: VmValue) {
211 if !crate::values_equal(&self.transcript, &transcript) {
212 self.redo_stack.clear();
213 }
214 self.transcript = transcript;
215 self.touch();
216 }
217}
218
/// Role a live client holds on a session.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum LiveClientMode {
    Observer,
    Controller,
}

impl LiveClientMode {
    /// Lower-case name of the mode, as written into JSON payloads.
    fn as_str(&self) -> &'static str {
        match *self {
            LiveClientMode::Controller => "controller",
            LiveClientMode::Observer => "observer",
        }
    }
}
233
/// Record of one client attached to a live session.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct LiveSessionClient {
    /// Trimmed, non-empty client identifier.
    pub client_id: String,
    /// Role the client currently holds.
    pub mode: LiveClientMode,
    /// Timestamp of the first attach; preserved when the same client re-attaches.
    pub attached_at: String,
    /// Timestamp of the latest attach, heartbeat, detach or demotion.
    pub last_seen_at: String,
    /// Whether the client may inject prompts while it is the active controller.
    pub prompt_injection: bool,
    /// Whether the client may route permission requests while it is the active controller.
    pub permission_routing: bool,
    /// Caller-supplied metadata stored with the client.
    pub metadata: serde_json::Value,
}
244
/// Request passed to `attach_live_client`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct AttachLiveClient {
    /// Client identifier; trimmed and rejected when empty.
    pub client_id: String,
    /// Role requested for the client.
    pub mode: LiveClientMode,
    /// When attaching as controller: demote a different existing controller
    /// to observer instead of failing.
    pub takeover: bool,
    /// Stored on the client as its prompt-injection capability.
    pub prompt_injection: bool,
    /// Stored on the client as its permission-routing capability.
    pub permission_routing: bool,
    /// Caller-supplied metadata stored with the client.
    pub metadata: serde_json::Value,
}
254
/// Result of a live-client operation (attach, detach or heartbeat).
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct LiveClientChange {
    /// The affected client; `None` is reported for a detach.
    pub client: Option<LiveSessionClient>,
    /// Controller id recorded before the operation.
    pub previous_controller_id: Option<String>,
    /// Controller id after the operation.
    pub active_controller_id: Option<String>,
    /// All clients attached after the operation.
    pub clients: Vec<LiveSessionClient>,
}
262
/// Lineage summary for one session, as returned by `ancestry`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct SessionAncestry {
    /// Direct parent of the session, if any.
    pub parent_id: Option<String>,
    /// Direct children of the session.
    pub child_ids: Vec<String>,
    /// Last id reached by following parent links; the session's own id
    /// when it has no parent.
    pub root_id: String,
}
269
/// Counts describing a transcript truncation.
///
/// NOTE(review): the producer of this type is not visible in this part of the
/// file; the field meanings below are read off the names — confirm at the use site.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct SessionTruncateResult {
    /// Presumably the number of turns kept.
    pub kept_turn_count: usize,
    /// Presumably the number of turns removed.
    pub removed_turn_count: usize,
    /// Presumably the id of the turn that is the new tip, if any.
    pub new_tip_turn_id: Option<String>,
}
276
/// Summary of a session checkpoint without the transcript payloads.
///
/// NOTE(review): the field names mirror those of `SessionTurnCheckpoint`; the
/// code that builds this summary is not visible here.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct SessionCheckpointSummary {
    pub checkpoint_id: String,
    pub before_message_count: usize,
    pub after_message_count: usize,
    pub fs_snapshot_ids: Vec<String>,
}
284
/// Checkpoint entry stored in `SessionState::completed_turn_checkpoints`.
///
/// NOTE(review): the code that creates and restores checkpoints is not visible
/// in this part of the file; confirm field semantics there.
#[derive(Clone, Debug)]
pub struct SessionTurnCheckpoint {
    pub checkpoint_id: String,
    pub completed_at: String,
    /// Transcript value captured as the "before" side of the checkpoint.
    pub before_transcript: VmValue,
    /// Transcript value captured as the "after" side of the checkpoint.
    pub after_transcript: VmValue,
    pub before_message_count: usize,
    pub after_message_count: usize,
    pub fs_snapshot_ids: Vec<String>,
}
295
/// Entry stored in `SessionState::redo_stack`.
///
/// NOTE(review): the undo/redo code is not visible in this part of the file.
#[derive(Clone, Debug)]
pub struct SessionRedoEntry {
    /// Checkpoint this redo entry refers to.
    pub checkpoint: SessionTurnCheckpoint,
    pub redo_fs_snapshot_ids: Vec<String>,
}
301
/// Failure reasons for checkpoint operations.
///
/// NOTE(review): the variants are not produced in this part of the file; the
/// names suggest a missing session, no checkpoint to act on, and nothing to redo.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum SessionCheckpointError {
    UnknownSession,
    NoCheckpoint,
    NoRedo,
}
308
/// Result of a checkpoint operation.
///
/// NOTE(review): the set of `status` strings is defined by code that is not
/// visible in this part of the file.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct SessionCheckpointOutcome {
    /// Static status label chosen by the producing operation.
    pub status: &'static str,
    /// Summary of the checkpoint the operation acted on.
    pub checkpoint: SessionCheckpointSummary,
    pub redo_fs_snapshot_ids: Vec<String>,
}
315
/// Report for a reminder injection.
///
/// NOTE(review): the producer is not visible in this part of the file.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ReminderInjectionReport {
    pub reminder_id: String,
    /// Presumably the number of reminders dropped as duplicates — confirm at the use site.
    pub deduped_count: usize,
}
321
// Per-thread session store state. Everything below is thread-local, so each
// thread sees its own sessions, cap, default policy and scope stacks.
thread_local! {
    // All sessions known to this thread, keyed by session id.
    static SESSIONS: RefCell<HashMap<String, SessionState>> = RefCell::new(HashMap::new());
    // Maximum number of sessions kept before the least recently accessed one is evicted.
    static SESSION_CAP: Cell<usize> = const { Cell::new(DEFAULT_SESSION_CAP) };
    // Transcript budget policy given to newly created sessions.
    static DEFAULT_TRANSCRIPT_BUDGET_POLICY: RefCell<SessionTranscriptBudgetPolicy> =
        RefCell::new(SessionTranscriptBudgetPolicy::default());
    // Stack of entered session ids; the last entry is the current session.
    static CURRENT_SESSION_STACK: RefCell<Vec<String>> = const { RefCell::new(Vec::new()) };
    // Stack of entered tool-call ids; consulted when no task-local id is set.
    static CURRENT_TOOL_CALL_STACK: RefCell<Vec<String>> = const { RefCell::new(Vec::new()) };
}
330
// Task-local tool-call id set by `scope_current_tool_call`; `current_tool_call_id`
// prefers it over the thread-local stack when it holds a non-blank value.
tokio::task_local! {
    static CURRENT_TOOL_CALL_TASK: String;
}
334
/// Process-wide map from session id to the set of paths recorded as changed.
/// Unlike the session map, this store sits behind a mutex and is not thread-local.
static SESSION_CHANGED_PATHS: OnceLock<Mutex<BTreeMap<String, BTreeSet<String>>>> = OnceLock::new();

/// Returns the changed-paths store, creating the empty map on first use.
fn session_changed_paths_store() -> &'static Mutex<BTreeMap<String, BTreeSet<String>>> {
    SESSION_CHANGED_PATHS.get_or_init(Mutex::default)
}
349
350pub fn record_session_changed_path(session_id: &str, path: &str) {
353 if session_id.is_empty() || path.is_empty() {
354 return;
355 }
356 if let Ok(mut store) = session_changed_paths_store().lock() {
357 store
358 .entry(session_id.to_string())
359 .or_default()
360 .insert(path.to_string());
361 }
362}
363
364pub fn session_changed_paths(session_id: &str) -> Vec<String> {
367 session_changed_paths_store()
368 .lock()
369 .ok()
370 .and_then(|store| {
371 store
372 .get(session_id)
373 .map(|set| set.iter().cloned().collect())
374 })
375 .unwrap_or_default()
376}
377
378pub fn take_session_changed_paths(session_id: &str) -> Vec<String> {
382 session_changed_paths_store()
383 .lock()
384 .ok()
385 .and_then(|mut store| store.remove(session_id))
386 .map(|set| set.into_iter().collect())
387 .unwrap_or_default()
388}
389
390pub fn clear_session_changed_paths(session_id: &str) {
392 if let Ok(mut store) = session_changed_paths_store().lock() {
393 store.remove(session_id);
394 }
395}
396
397pub struct CurrentSessionGuard {
398 active: bool,
399}
400
401impl Drop for CurrentSessionGuard {
402 fn drop(&mut self) {
403 if self.active {
404 pop_current_session();
405 }
406 }
407}
408
409pub struct CurrentToolCallGuard {
415 active: bool,
416}
417
418impl Drop for CurrentToolCallGuard {
419 fn drop(&mut self) {
420 if self.active {
421 pop_current_tool_call();
422 }
423 }
424}
425
426pub fn set_session_cap(cap: usize) {
429 SESSION_CAP.with(|c| c.set(cap.max(1)));
430}
431
432pub fn session_cap() -> usize {
433 SESSION_CAP.with(|c| c.get())
434}
435
436pub fn set_default_transcript_budget_policy(policy: SessionTranscriptBudgetPolicy) {
437 DEFAULT_TRANSCRIPT_BUDGET_POLICY.with(|cell| {
438 *cell.borrow_mut() = policy.normalized();
439 });
440}
441
442pub fn reset_default_transcript_budget_policy() {
443 set_default_transcript_budget_policy(SessionTranscriptBudgetPolicy::default());
444}
445
446pub fn default_transcript_budget_policy() -> SessionTranscriptBudgetPolicy {
447 DEFAULT_TRANSCRIPT_BUDGET_POLICY.with(|cell| cell.borrow().clone())
448}
449
450pub fn transcript_budget_policy(id: &str) -> Option<SessionTranscriptBudgetPolicy> {
451 SESSIONS.with(|s| {
452 s.borrow()
453 .get(id)
454 .map(|state| state.transcript_budget_policy.clone())
455 })
456}
457
458pub fn set_transcript_budget_policy(
459 id: &str,
460 policy: SessionTranscriptBudgetPolicy,
461) -> Result<(), String> {
462 SESSIONS.with(|s| {
463 let mut map = s.borrow_mut();
464 let Some(state) = map.get_mut(id) else {
465 return Err(format!("agent session '{id}' does not exist"));
466 };
467 let previous = state.transcript_budget_policy.clone();
468 let previous_action = state.last_transcript_budget_action.clone();
469 state.transcript_budget_policy = policy.normalized();
470 let candidate = state.transcript.clone();
471 if let Err(error) = apply_transcript_with_budget(state, candidate, "policy_update") {
472 state.transcript_budget_policy = previous;
473 state.last_transcript_budget_action = previous_action;
474 return Err(error);
475 }
476 Ok(())
477 })
478}
479
480pub fn reset_session_store() {
482 SESSIONS.with(|s| s.borrow_mut().clear());
483 CURRENT_SESSION_STACK.with(|stack| stack.borrow_mut().clear());
484 CURRENT_TOOL_CALL_STACK.with(|stack| stack.borrow_mut().clear());
485 reset_default_transcript_budget_policy();
486}
487
488pub(crate) fn push_current_session(id: String) {
489 if id.is_empty() {
490 return;
491 }
492 CURRENT_SESSION_STACK.with(|stack| stack.borrow_mut().push(id));
493}
494
495pub(crate) fn swap_current_session_stack(replacement: Vec<String>) -> Vec<String> {
496 CURRENT_SESSION_STACK.with(|stack| std::mem::replace(&mut *stack.borrow_mut(), replacement))
497}
498
499pub(crate) fn pop_current_session() {
500 CURRENT_SESSION_STACK.with(|stack| {
501 let _ = stack.borrow_mut().pop();
502 });
503}
504
505pub fn current_session_id() -> Option<String> {
506 CURRENT_SESSION_STACK.with(|stack| stack.borrow().last().cloned())
507}
508
509pub fn current_actor_chain() -> Option<ActorChain> {
510 current_session_id().as_deref().and_then(actor_chain)
511}
512
513pub fn enter_current_session(id: impl Into<String>) -> CurrentSessionGuard {
514 let id = id.into();
515 if id.trim().is_empty() {
516 return CurrentSessionGuard { active: false };
517 }
518 push_current_session(id);
519 CurrentSessionGuard { active: true }
520}
521
522pub fn actor_chain(id: &str) -> Option<ActorChain> {
523 SESSIONS.with(|s| {
524 s.borrow()
525 .get(id)
526 .and_then(|state| state.actor_chain.clone())
527 })
528}
529
530pub fn set_actor_chain(id: &str, actor_chain: Option<ActorChain>) -> Result<bool, String> {
531 SESSIONS.with(|s| {
532 let mut map = s.borrow_mut();
533 let Some(state) = map.get_mut(id) else {
534 return Err(format!("agent session '{id}' does not exist"));
535 };
536 let changed = state.actor_chain != actor_chain;
537 state.actor_chain = actor_chain;
538 state.touch();
539 Ok(changed)
540 })
541}
542
543fn push_current_tool_call(id: String) {
544 if id.is_empty() {
545 return;
546 }
547 CURRENT_TOOL_CALL_STACK.with(|stack| stack.borrow_mut().push(id));
548}
549
550fn pop_current_tool_call() {
551 CURRENT_TOOL_CALL_STACK.with(|stack| {
552 let _ = stack.borrow_mut().pop();
553 });
554}
555
556pub fn current_tool_call_id() -> Option<String> {
561 if let Ok(id) = CURRENT_TOOL_CALL_TASK.try_with(Clone::clone) {
562 if !id.trim().is_empty() {
563 return Some(id);
564 }
565 }
566 CURRENT_TOOL_CALL_STACK.with(|stack| stack.borrow().last().cloned())
567}
568
569pub async fn scope_current_tool_call<F, T>(id: impl Into<String>, future: F) -> T
575where
576 F: Future<Output = T>,
577{
578 let id = id.into();
579 if id.trim().is_empty() {
580 future.await
581 } else {
582 CURRENT_TOOL_CALL_TASK.scope(id, future).await
583 }
584}
585
586pub fn enter_current_tool_call(id: impl Into<String>) -> CurrentToolCallGuard {
588 let id = id.into();
589 if id.trim().is_empty() {
590 return CurrentToolCallGuard { active: false };
591 }
592 push_current_tool_call(id);
593 CurrentToolCallGuard { active: true }
594}
595
596pub fn exists(id: &str) -> bool {
597 SESSIONS.with(|s| s.borrow().contains_key(id))
598}
599
600pub fn length(id: &str) -> Option<usize> {
601 SESSIONS.with(|s| {
602 s.borrow().get(id).map(|state| {
603 state
604 .transcript
605 .as_dict()
606 .and_then(|d| d.get("messages"))
607 .and_then(|v| match v {
608 VmValue::List(list) => Some(list.len()),
609 _ => None,
610 })
611 .unwrap_or(0)
612 })
613 })
614}
615
616pub fn scratchpad(id: &str) -> Option<VmValue> {
617 SESSIONS.with(|s| {
618 s.borrow()
619 .get(id)
620 .and_then(|state| state.scratchpad.clone())
621 })
622}
623
624pub fn scratchpad_version(id: &str) -> Option<u64> {
625 SESSIONS.with(|s| s.borrow().get(id).map(|state| state.scratchpad_version))
626}
627
628pub fn set_scratchpad(
629 id: &str,
630 scratchpad: VmValue,
631 source: impl Into<String>,
632 reason: Option<String>,
633 metadata: serde_json::Value,
634) -> Result<u64, String> {
635 validate_scratchpad_value(&scratchpad)?;
636 SESSIONS.with(|s| {
637 let mut map = s.borrow_mut();
638 let Some(state) = map.get_mut(id) else {
639 return Err(format!("agent session '{id}' does not exist"));
640 };
641 let version = state.scratchpad_version.saturating_add(1);
642 let event = scratchpad_transcript_event(
643 "set",
644 version,
645 Some(&scratchpad),
646 source.into(),
647 reason,
648 metadata,
649 );
650 append_event_to_state(state, event, "set_scratchpad")?;
651 state.scratchpad = Some(scratchpad);
652 state.scratchpad_version = version;
653 state.touch();
654 Ok(version)
655 })
656}
657
658pub fn clear_scratchpad(
659 id: &str,
660 source: impl Into<String>,
661 reason: Option<String>,
662 metadata: serde_json::Value,
663) -> Result<u64, String> {
664 SESSIONS.with(|s| {
665 let mut map = s.borrow_mut();
666 let Some(state) = map.get_mut(id) else {
667 return Err(format!("agent session '{id}' does not exist"));
668 };
669 let version = state.scratchpad_version.saturating_add(1);
670 let event =
671 scratchpad_transcript_event("clear", version, None, source.into(), reason, metadata);
672 append_event_to_state(state, event, "clear_scratchpad")?;
673 state.scratchpad = None;
674 state.scratchpad_version = version;
675 state.touch();
676 Ok(version)
677 })
678}
679
680fn validate_scratchpad_value(value: &VmValue) -> Result<(), String> {
681 if !matches!(value, VmValue::Dict(_)) {
682 return Err("agent session scratchpad must be a dict".to_string());
683 }
684 let json = crate::llm::helpers::vm_value_to_json(value);
685 let approx_bytes = serde_json::to_vec(&json)
686 .map(|bytes| bytes.len())
687 .unwrap_or(usize::MAX);
688 if approx_bytes > MAX_SCRATCHPAD_BYTES {
689 return Err(format!(
690 "agent session scratchpad is {approx_bytes} bytes; max is {MAX_SCRATCHPAD_BYTES}"
691 ));
692 }
693 Ok(())
694}
695
696fn scratchpad_transcript_event(
697 action: &str,
698 version: u64,
699 scratchpad: Option<&VmValue>,
700 source: String,
701 reason: Option<String>,
702 metadata: serde_json::Value,
703) -> VmValue {
704 let scratchpad_json = scratchpad.map(crate::llm::helpers::vm_value_to_json);
705 let approx_bytes = scratchpad_json
706 .as_ref()
707 .and_then(|value| serde_json::to_vec(value).ok().map(|bytes| bytes.len()))
708 .unwrap_or(0);
709 let event_metadata = serde_json::json!({
710 "action": action,
711 "version": version,
712 "source": normalize_scratchpad_source(source),
713 "reason": reason.unwrap_or_default(),
714 "approx_bytes": approx_bytes,
715 "counts": scratchpad_json
716 .as_ref()
717 .map(scratchpad_counts_json)
718 .unwrap_or_else(|| serde_json::json!({})),
719 "metadata": metadata,
720 });
721 let content = format!("Agent scratchpad {action}");
722 crate::llm::helpers::transcript_event(
723 "agent_scratchpad",
724 "system",
725 "internal",
726 &content,
727 Some(event_metadata),
728 )
729}
730
/// Trims `source`, substituting `"harn.agent_scratchpad"` when nothing is left.
fn normalize_scratchpad_source(source: String) -> String {
    match source.trim() {
        "" => "harn.agent_scratchpad".to_string(),
        trimmed => trimmed.to_string(),
    }
}
739
740fn scratchpad_counts_json(value: &serde_json::Value) -> serde_json::Value {
741 serde_json::json!({
742 "goals": scratchpad_array_len(value, "goals"),
743 "open_items": scratchpad_array_len(value, "open_items"),
744 "facts": scratchpad_array_len(value, "facts"),
745 "refs": scratchpad_array_len(value, "refs"),
746 })
747}
748
749fn scratchpad_array_len(value: &serde_json::Value, key: &str) -> usize {
750 value
751 .get(key)
752 .and_then(serde_json::Value::as_array)
753 .map(Vec::len)
754 .unwrap_or(0)
755}
756
757pub fn snapshot(id: &str) -> Option<VmValue> {
758 SESSIONS.with(|s| s.borrow().get(id).map(session_snapshot))
759}
760
761pub fn transcript(id: &str) -> Option<VmValue> {
763 SESSIONS.with(|s| {
764 s.borrow()
765 .get(id)
766 .map(|state| transcript_with_session_metadata(state.transcript.clone(), state))
767 })
768}
769
770pub fn open_or_create(id: Option<String>) -> String {
779 open_or_create_with_actor_chain(id, None)
780}
781
782pub fn open_or_create_with_actor_chain(
783 id: Option<String>,
784 requested_actor_chain: Option<ActorChain>,
785) -> String {
786 let resolved = id.unwrap_or_else(|| uuid::Uuid::now_v7().to_string());
787 let parent_session = current_session_id();
788 let inherited_actor_chain = requested_actor_chain
789 .clone()
790 .or_else(|| parent_session.as_deref().and_then(actor_chain));
791 let mut was_new = false;
792 SESSIONS.with(|s| {
793 let mut map = s.borrow_mut();
794 if let Some(state) = map.get_mut(&resolved) {
795 if let Some(actor_chain) = requested_actor_chain.clone() {
796 state.actor_chain = Some(actor_chain);
797 }
798 state.touch();
799 return;
800 }
801 was_new = true;
802 let cap = SESSION_CAP.with(|c| c.get());
803 if map.len() >= cap {
804 if let Some(victim) = map
805 .iter()
806 .min_by_key(|(_, state)| state.last_accessed)
807 .map(|(id, _)| id.clone())
808 {
809 map.remove(&victim);
810 }
811 }
812 let mut state = SessionState::new(resolved.clone());
813 state.actor_chain = inherited_actor_chain.clone();
814 map.insert(resolved.clone(), state);
815 });
816 if was_new {
817 if let Some(parent) = parent_session.as_deref() {
818 crate::agent_events::mirror_session_sinks(parent, &resolved);
819 }
820 try_register_event_log(&resolved);
821 }
822 resolved
823}
824
825pub fn open_child_session(parent_id: &str, id: Option<String>) -> String {
826 open_child_session_with_actor(parent_id, id, None)
827}
828
829pub fn open_child_session_with_actor(
830 parent_id: &str,
831 id: Option<String>,
832 actor: Option<&str>,
833) -> String {
834 let actor_chain = actor_chain(parent_id).map(|chain| match actor {
835 Some(actor) if !actor.trim().is_empty() => chain.pushed(actor.trim()),
836 _ => chain,
837 });
838 let resolved = open_or_create_with_actor_chain(id, actor_chain);
839 link_child_session(parent_id, &resolved);
840 resolved
841}
842
843pub fn link_child_session(parent_id: &str, child_id: &str) {
844 link_child_session_with_branch(parent_id, child_id, None);
845}
846
847pub fn link_child_session_with_branch(
848 parent_id: &str,
849 child_id: &str,
850 branched_at_event_index: Option<usize>,
851) {
852 if parent_id == child_id {
853 return;
854 }
855 open_or_create(Some(parent_id.to_string()));
856 open_or_create(Some(child_id.to_string()));
857 SESSIONS.with(|s| {
858 let mut map = s.borrow_mut();
859 update_lineage(&mut map, parent_id, child_id, branched_at_event_index);
860 });
861}
862
863pub fn parent_id(id: &str) -> Option<String> {
864 SESSIONS.with(|s| s.borrow().get(id).and_then(|state| state.parent_id.clone()))
865}
866
867pub fn child_ids(id: &str) -> Vec<String> {
868 SESSIONS.with(|s| {
869 s.borrow()
870 .get(id)
871 .map(|state| state.child_ids.clone())
872 .unwrap_or_default()
873 })
874}
875
876pub fn ancestry(id: &str) -> Option<SessionAncestry> {
877 SESSIONS.with(|s| {
878 let map = s.borrow();
879 let state = map.get(id)?;
880 let mut root_id = state.id.clone();
881 let mut cursor = state.parent_id.clone();
882 let mut seen = HashSet::from([state.id.clone()]);
883 while let Some(parent_id) = cursor {
884 if !seen.insert(parent_id.clone()) {
885 break;
886 }
887 root_id = parent_id.clone();
888 cursor = map
889 .get(&parent_id)
890 .and_then(|parent| parent.parent_id.clone());
891 }
892 Some(SessionAncestry {
893 parent_id: state.parent_id.clone(),
894 child_ids: state.child_ids.clone(),
895 root_id,
896 })
897 })
898}
899
900pub fn live_clients(id: &str) -> Option<Vec<LiveSessionClient>> {
901 SESSIONS.with(|s| {
902 s.borrow()
903 .get(id)
904 .map(|state| state.live_clients.values().cloned().collect())
905 })
906}
907
/// Attaches (or re-attaches) a live client to session `id`.
///
/// Attaching as controller fails if a different, still-attached controller
/// exists and `request.takeover` is false. With takeover, the previous
/// controller is demoted to an observer and loses both capabilities.
/// Re-attaching the current controller as an observer clears the controller slot.
///
/// An "attached" lifecycle event is appended to the transcript. If that
/// fails, the client map and controller id are restored to their previous values.
///
/// # Errors
///
/// Fails when the session does not exist, the client id is blank, a
/// conflicting controller exists without takeover, or the event cannot be appended.
pub fn attach_live_client(id: &str, request: AttachLiveClient) -> Result<LiveClientChange, String> {
    SESSIONS.with(|s| {
        let mut map = s.borrow_mut();
        let Some(state) = map.get_mut(id) else {
            return Err(format!("agent session '{id}' does not exist"));
        };
        let client_id = validate_live_client_id(request.client_id)?;
        let now = crate::orchestration::now_rfc3339();
        // Snapshots used to roll back if the lifecycle event cannot be appended.
        let previous_clients = state.live_clients.clone();
        let previous_controller_id = state.live_controller_id.clone();

        if request.mode == LiveClientMode::Controller {
            // Only a *different* controller that is still attached is a conflict;
            // a stale controller id with no matching client is ignored.
            let conflicting_controller = previous_controller_id
                .as_ref()
                .filter(|controller_id| *controller_id != &client_id)
                .filter(|controller_id| state.live_clients.contains_key(*controller_id));
            if let Some(previous) = conflicting_controller {
                if !request.takeover {
                    return Err(format!("live session already has controller '{previous}'"));
                }
                // Takeover: demote the old controller and strip its capabilities.
                if let Some(previous_client) = state.live_clients.get_mut(previous) {
                    previous_client.mode = LiveClientMode::Observer;
                    previous_client.prompt_injection = false;
                    previous_client.permission_routing = false;
                    previous_client.last_seen_at = now.clone();
                }
            }
            state.live_controller_id = Some(client_id.clone());
        } else if state.live_controller_id.as_deref() == Some(client_id.as_str()) {
            // The current controller re-attached as an observer: release the slot.
            state.live_controller_id = None;
        }

        // Keep the original attach time when the client was already attached.
        let attached_at = state
            .live_clients
            .get(&client_id)
            .map(|client| client.attached_at.clone())
            .unwrap_or_else(|| now.clone());
        let client = LiveSessionClient {
            client_id: client_id.clone(),
            mode: request.mode,
            attached_at,
            last_seen_at: now,
            prompt_injection: request.prompt_injection,
            permission_routing: request.permission_routing,
            metadata: request.metadata,
        };
        state.live_clients.insert(client_id, client.clone());
        state.touch();
        let active_controller_id = state.live_controller_id.clone();
        append_live_client_event(
            state,
            "attached",
            Some(&client),
            previous_controller_id.as_deref(),
            active_controller_id.as_deref(),
            serde_json::Value::Null,
        )
        .inspect_err(|_error| {
            // Roll back the in-memory change so state matches the transcript.
            state.live_clients = previous_clients;
            state.live_controller_id = previous_controller_id.clone();
        })?;
        Ok(live_client_change(
            Some(client),
            previous_controller_id,
            state,
        ))
    })
}
976
977pub fn takeover_live_client(
978 id: &str,
979 client_id: impl Into<String>,
980 metadata: serde_json::Value,
981) -> Result<LiveClientChange, String> {
982 attach_live_client(
983 id,
984 AttachLiveClient {
985 client_id: client_id.into(),
986 mode: LiveClientMode::Controller,
987 takeover: true,
988 prompt_injection: true,
989 permission_routing: true,
990 metadata,
991 },
992 )
993}
994
995pub fn detach_live_client(
996 id: &str,
997 client_id: impl Into<String>,
998 reason: Option<String>,
999 metadata: serde_json::Value,
1000) -> Result<LiveClientChange, String> {
1001 SESSIONS.with(|s| {
1002 let mut map = s.borrow_mut();
1003 let Some(state) = map.get_mut(id) else {
1004 return Err(format!("agent session '{id}' does not exist"));
1005 };
1006 let client_id = validate_live_client_id(client_id.into())?;
1007 let previous_clients = state.live_clients.clone();
1008 let previous_controller_id = state.live_controller_id.clone();
1009 let Some(mut client) = state.live_clients.remove(&client_id) else {
1010 return Err(format!("live client '{client_id}' is not attached"));
1011 };
1012 client.last_seen_at = crate::orchestration::now_rfc3339();
1013 if state.live_controller_id.as_deref() == Some(client_id.as_str()) {
1014 state.live_controller_id = None;
1015 }
1016 state.touch();
1017 let active_controller_id = state.live_controller_id.clone();
1018 append_live_client_event(
1019 state,
1020 "detached",
1021 Some(&client),
1022 previous_controller_id.as_deref(),
1023 active_controller_id.as_deref(),
1024 serde_json::json!({
1025 "reason": reason.unwrap_or_else(|| "client_detached".to_string()),
1026 "metadata": metadata,
1027 }),
1028 )
1029 .inspect_err(|_error| {
1030 state.live_clients = previous_clients;
1031 state.live_controller_id = previous_controller_id.clone();
1032 })?;
1033 Ok(live_client_change(None, previous_controller_id, state))
1034 })
1035}
1036
1037pub fn heartbeat_live_client(
1038 id: &str,
1039 client_id: impl Into<String>,
1040 metadata: serde_json::Value,
1041) -> Result<LiveClientChange, String> {
1042 SESSIONS.with(|s| {
1043 let mut map = s.borrow_mut();
1044 let Some(state) = map.get_mut(id) else {
1045 return Err(format!("agent session '{id}' does not exist"));
1046 };
1047 let client_id = validate_live_client_id(client_id.into())?;
1048 let previous_clients = state.live_clients.clone();
1049 let previous_controller_id = state.live_controller_id.clone();
1050 let Some(client) = state.live_clients.get_mut(&client_id) else {
1051 return Err(format!("live client '{client_id}' is not attached"));
1052 };
1053 client.last_seen_at = crate::orchestration::now_rfc3339();
1054 if !metadata.is_null() {
1055 client.metadata = metadata.clone();
1056 }
1057 let client = client.clone();
1058 state.touch();
1059 let active_controller_id = state.live_controller_id.clone();
1060 append_live_client_event(
1061 state,
1062 "heartbeat",
1063 Some(&client),
1064 previous_controller_id.as_deref(),
1065 active_controller_id.as_deref(),
1066 serde_json::json!({ "metadata": metadata }),
1067 )
1068 .inspect_err(|_error| {
1069 state.live_clients = previous_clients;
1070 state.live_controller_id = previous_controller_id.clone();
1071 })?;
1072 Ok(live_client_change(
1073 Some(client),
1074 previous_controller_id,
1075 state,
1076 ))
1077 })
1078}
1079
1080pub fn inject_prompt_from_live_client(
1081 id: &str,
1082 client_id: impl Into<String>,
1083 content: VmValue,
1084 metadata: serde_json::Value,
1085) -> Result<(), String> {
1086 let client_id = validate_live_client_id(client_id.into())?;
1087 ensure_live_controller(id, &client_id, LiveControllerCapability::PromptInjection)?;
1088 let mut message = BTreeMap::new();
1089 message.put_str("role", "user");
1090 message.insert("content".to_string(), content);
1091 message.insert(
1092 "metadata".to_string(),
1093 crate::stdlib::json_to_vm_value(&serde_json::json!({
1094 "live_session": {
1095 "client_id": client_id,
1096 "mode": "controller",
1097 "source": "live_session_attach",
1098 "metadata": metadata,
1099 }
1100 })),
1101 );
1102 inject_message(id, VmValue::dict(message))
1103}
1104
1105pub fn route_live_permission_request(
1106 id: &str,
1107 client_id: impl Into<String>,
1108 request: serde_json::Value,
1109 metadata: serde_json::Value,
1110) -> Result<serde_json::Value, String> {
1111 let client_id = validate_live_client_id(client_id.into())?;
1112 let client =
1113 ensure_live_controller(id, &client_id, LiveControllerCapability::PermissionRouting)?;
1114 let request_id = request
1115 .get("id")
1116 .or_else(|| request.get("request_id"))
1117 .and_then(serde_json::Value::as_str)
1118 .filter(|id| !id.trim().is_empty())
1119 .unwrap_or("permission_request");
1120 let event_metadata = serde_json::json!({
1121 "action": "permission_routed",
1122 "client": live_client_json(&client),
1123 "request_id": request_id,
1124 "request": request,
1125 "metadata": metadata,
1126 });
1127 let event = crate::llm::helpers::transcript_event(
1128 LIVE_CLIENT_PERMISSION_EVENT_KIND,
1129 "system",
1130 "internal",
1131 "Live session permission request routed",
1132 Some(event_metadata.clone()),
1133 );
1134 append_event(id, event)?;
1135 Ok(event_metadata)
1136}
1137
/// Capability that `ensure_live_controller` checks on the active controller.
enum LiveControllerCapability {
    /// Requires the client's `prompt_injection` flag.
    PromptInjection,
    /// Requires the client's `permission_routing` flag.
    PermissionRouting,
}
1142
1143fn ensure_live_controller(
1144 id: &str,
1145 client_id: &str,
1146 capability: LiveControllerCapability,
1147) -> Result<LiveSessionClient, String> {
1148 SESSIONS.with(|s| {
1149 let map = s.borrow();
1150 let Some(state) = map.get(id) else {
1151 return Err(format!("agent session '{id}' does not exist"));
1152 };
1153 if state.live_controller_id.as_deref() != Some(client_id) {
1154 return Err(format!(
1155 "live client '{client_id}' is not the active controller"
1156 ));
1157 }
1158 let Some(client) = state.live_clients.get(client_id) else {
1159 return Err(format!("live client '{client_id}' is not attached"));
1160 };
1161 match capability {
1162 LiveControllerCapability::PromptInjection if !client.prompt_injection => Err(format!(
1163 "live client '{client_id}' cannot inject prompts for this session"
1164 )),
1165 LiveControllerCapability::PermissionRouting if !client.permission_routing => Err(
1166 format!("live client '{client_id}' cannot route permissions for this session"),
1167 ),
1168 _ => Ok(client.clone()),
1169 }
1170 })
1171}
1172
1173fn append_live_client_event(
1174 state: &mut SessionState,
1175 action: &str,
1176 client: Option<&LiveSessionClient>,
1177 previous_controller_id: Option<&str>,
1178 active_controller_id: Option<&str>,
1179 extra: serde_json::Value,
1180) -> Result<(), String> {
1181 let metadata = serde_json::json!({
1182 "action": action,
1183 "session_id": state.id,
1184 "client": client.map(live_client_json),
1185 "previous_controller_id": previous_controller_id,
1186 "active_controller_id": active_controller_id,
1187 "clients": state
1188 .live_clients
1189 .values()
1190 .map(live_client_json)
1191 .collect::<Vec<_>>(),
1192 "extra": extra,
1193 });
1194 let event = crate::llm::helpers::transcript_event(
1195 LIVE_CLIENT_EVENT_KIND,
1196 "system",
1197 "internal",
1198 "Live session client lifecycle changed",
1199 Some(metadata),
1200 );
1201 append_event_to_state(state, event, "live_client")
1202}
1203
1204fn live_client_change(
1205 client: Option<LiveSessionClient>,
1206 previous_controller_id: Option<String>,
1207 state: &SessionState,
1208) -> LiveClientChange {
1209 LiveClientChange {
1210 client,
1211 previous_controller_id,
1212 active_controller_id: state.live_controller_id.clone(),
1213 clients: state.live_clients.values().cloned().collect(),
1214 }
1215}
1216
/// Normalizes a live client id by trimming surrounding whitespace.
///
/// # Errors
/// Returns an error when the id is empty after trimming.
fn validate_live_client_id(id: impl Into<String>) -> Result<String, String> {
    let raw: String = id.into();
    match raw.trim() {
        "" => Err("live client id cannot be empty".to_string()),
        normalized => Ok(normalized.to_owned()),
    }
}
1225
1226pub fn live_client_json(client: &LiveSessionClient) -> serde_json::Value {
1227 serde_json::json!({
1228 "client_id": client.client_id,
1229 "mode": client.mode.as_str(),
1230 "attached_at": client.attached_at,
1231 "last_seen_at": client.last_seen_at,
1232 "prompt_injection": client.prompt_injection,
1233 "permission_routing": client.permission_routing,
1234 "metadata": client.metadata,
1235 })
1236}
1237
1238pub fn live_client_change_json(change: &LiveClientChange) -> serde_json::Value {
1239 serde_json::json!({
1240 "client": change.client.as_ref().map(live_client_json),
1241 "previous_controller_id": change.previous_controller_id,
1242 "active_controller_id": change.active_controller_id,
1243 "clients": change
1244 .clients
1245 .iter()
1246 .map(live_client_json)
1247 .collect::<Vec<_>>(),
1248 })
1249}
1250
1251fn try_register_event_log(session_id: &str) {
1255 if let Some(log) = crate::event_log::active_event_log() {
1256 crate::agent_events::register_sink(
1257 session_id,
1258 crate::agent_events::EventLogSink::new(log, session_id),
1259 );
1260 return;
1261 }
1262 let Ok(dir) = std::env::var("HARN_EVENT_LOG_DIR") else {
1263 return;
1264 };
1265 if dir.is_empty() {
1266 return;
1267 }
1268 let path = std::path::PathBuf::from(dir).join(format!("event_log-{session_id}.jsonl"));
1269 if let Ok(sink) = crate::agent_events::JsonlEventSink::open(&path) {
1270 crate::agent_events::register_sink(session_id, sink);
1271 }
1272}
1273
/// Public entry point that registers an event-log sink for `session_id`.
///
/// Delegates to `try_register_event_log`, which is best effort and reports no
/// error to the caller.
pub fn register_event_log_sink(session_id: &str) {
    try_register_event_log(session_id);
}
1277
1278pub fn close(id: &str) {
1279 SESSIONS.with(|s| {
1280 s.borrow_mut().remove(id);
1281 });
1282 crate::orchestration::agent_inbox::clear_session(id);
1286 crate::agent_events::clear_session_sinks(id);
1287}
1288
1289pub fn close_with_status(
1290 id: &str,
1291 reason: impl Into<String>,
1292 status: impl Into<String>,
1293 metadata: serde_json::Value,
1294) -> bool {
1295 if !exists(id) {
1296 return false;
1297 }
1298 let reason = reason.into();
1299 let status = status.into();
1300 let event_metadata = serde_json::json!({
1301 "reason": reason,
1302 "status": status,
1303 "metadata": metadata,
1304 });
1305 let transcript_event = crate::llm::helpers::transcript_event(
1306 "agent_session_closed",
1307 "system",
1308 "internal",
1309 "Agent session closed",
1310 Some(event_metadata),
1311 );
1312 let _ = append_event(id, transcript_event);
1313 crate::llm::emit_live_agent_event_sync(&crate::agent_events::AgentEvent::SessionClosed {
1314 session_id: id.to_string(),
1315 reason,
1316 status,
1317 metadata,
1318 });
1319 close(id);
1320 true
1321}
1322
1323pub fn reset_transcript(id: &str) -> bool {
1324 SESSIONS.with(|s| {
1325 let mut map = s.borrow_mut();
1326 let Some(state) = map.get_mut(id) else {
1327 return false;
1328 };
1329 state.transcript = empty_transcript(id);
1330 state.tool_format = None;
1331 state.system_prompt = None;
1332 state.scratchpad = None;
1333 state.scratchpad_version = 0;
1334 state.last_transcript_budget_action = None;
1335 state.completed_turn_checkpoints.clear();
1336 state.redo_stack.clear();
1337 state.touch();
1338 true
1339 })
1340}
1341
1342pub fn fork(src_id: &str, dst_id: Option<String>) -> Option<String> {
1349 let (
1350 src_transcript,
1351 src_tool_format,
1352 src_system_prompt,
1353 src_pinned_model,
1354 src_pinned_reasoning_policy,
1355 src_actor_chain,
1356 src_workspace_anchor,
1357 src_workspace_policy,
1358 src_scratchpad,
1359 src_scratchpad_version,
1360 src_transcript_budget_policy,
1361 src_last_transcript_budget_action,
1362 dst,
1363 ) = SESSIONS.with(|s| {
1364 let mut map = s.borrow_mut();
1365 let src = map.get_mut(src_id)?;
1366 src.touch();
1367 let dst = dst_id.unwrap_or_else(|| uuid::Uuid::now_v7().to_string());
1368 let forked_transcript = clone_transcript_with_id(&src.transcript, &dst);
1369 Some((
1370 forked_transcript,
1371 src.tool_format.clone(),
1372 src.system_prompt.clone(),
1373 src.pinned_model.clone(),
1374 src.pinned_reasoning_policy.clone(),
1375 src.actor_chain.clone(),
1376 src.workspace_anchor.clone(),
1377 src.workspace_policy.clone(),
1378 src.scratchpad.clone(),
1379 src.scratchpad_version,
1380 src.transcript_budget_policy.clone(),
1381 src.last_transcript_budget_action.clone(),
1382 dst,
1383 ))
1384 })?;
1385 open_or_create(Some(dst.clone()));
1387 SESSIONS.with(|s| {
1388 let mut map = s.borrow_mut();
1389 if let Some(state) = map.get_mut(&dst) {
1390 state.transcript = src_transcript;
1391 state.tool_format = src_tool_format;
1392 state.system_prompt = src_system_prompt;
1393 state.pinned_model = src_pinned_model;
1394 state.pinned_reasoning_policy = src_pinned_reasoning_policy;
1395 state.actor_chain = src_actor_chain;
1396 state.workspace_anchor = src_workspace_anchor;
1397 state.workspace_policy = src_workspace_policy;
1398 state.scratchpad = src_scratchpad;
1399 state.scratchpad_version = src_scratchpad_version;
1400 state.transcript_budget_policy = src_transcript_budget_policy;
1401 state.last_transcript_budget_action = src_last_transcript_budget_action;
1402 state.touch();
1403 }
1404 update_lineage(&mut map, src_id, &dst, None);
1405 });
1406 let budget_ok = SESSIONS.with(|s| {
1407 let mut map = s.borrow_mut();
1408 let Some(state) = map.get_mut(&dst) else {
1409 return false;
1410 };
1411 let candidate = state.transcript.clone();
1412 apply_transcript_with_budget(state, candidate, "fork").is_ok()
1413 });
1414 if !budget_ok {
1415 close(&dst);
1416 return None;
1417 }
1418 if exists(&dst) {
1422 Some(dst)
1423 } else {
1424 None
1425 }
1426}
1427
1428pub fn fork_at(src_id: &str, keep_first: usize, dst_id: Option<String>) -> Option<String> {
1439 let branched_at_event_index = SESSIONS.with(|s| {
1440 let map = s.borrow();
1441 let src = map.get(src_id)?;
1442 Some(branch_event_index(&src.transcript, keep_first))
1443 })?;
1444 let new_id = fork(src_id, dst_id)?;
1445 link_child_session_with_branch(src_id, &new_id, Some(branched_at_event_index));
1446 let _ = truncate(&new_id, keep_first);
1447 Some(new_id)
1448}
1449
1450pub fn truncate(id: &str, keep_first: usize) -> Option<SessionTruncateResult> {
1454 SESSIONS.with(|s| {
1455 let mut map = s.borrow_mut();
1456 let state = map.get_mut(id)?;
1457 let result = truncate_state(state, keep_first)?;
1458 Some(result)
1459 })
1460}
1461
1462fn truncate_state(state: &mut SessionState, keep_first: usize) -> Option<SessionTruncateResult> {
1463 let dict = state
1464 .transcript
1465 .as_dict()
1466 .cloned()
1467 .unwrap_or_else(crate::value::DictMap::new);
1468 let messages: Vec<VmValue> = match dict.get("messages") {
1469 Some(VmValue::List(list)) => list.iter().cloned().collect(),
1470 _ => Vec::new(),
1471 };
1472 let existing_events = match dict.get("events") {
1473 Some(VmValue::List(list)) => Some(list.iter().cloned().collect::<Vec<_>>()),
1474 _ => None,
1475 };
1476 let kept_turn_count = keep_first.min(messages.len());
1477 let removed_turn_count = messages.len().saturating_sub(kept_turn_count);
1478 let mut new_tip_turn_id = existing_events
1479 .as_ref()
1480 .map(|events| turn_event_id_for_count(events, kept_turn_count))
1481 .unwrap_or_else(|| {
1482 let events = crate::llm::helpers::transcript_events_from_messages(&messages);
1483 turn_event_id_for_count(&events, kept_turn_count)
1484 });
1485
1486 if removed_turn_count > 0 {
1487 let retained: Vec<VmValue> = messages.into_iter().take(kept_turn_count).collect();
1488 let retained_events = match existing_events {
1489 Some(events) => {
1490 let keep_event_count = event_prefix_len_for_messages(&events, kept_turn_count);
1491 events.into_iter().take(keep_event_count).collect()
1492 }
1493 None => crate::llm::helpers::transcript_events_from_messages(&retained),
1494 };
1495 new_tip_turn_id = turn_event_id_for_count(&retained_events, kept_turn_count);
1496 let mut next = dict;
1497 next.insert(
1498 crate::value::intern_key("events"),
1499 VmValue::List(std::sync::Arc::new(retained_events)),
1500 );
1501 next.insert(
1502 crate::value::intern_key("messages"),
1503 VmValue::List(std::sync::Arc::new(retained)),
1504 );
1505 next.remove("summary");
1506 apply_transcript_with_budget(state, VmValue::dict(next), "truncate").ok()?;
1507 }
1508 state.touch();
1509 Some(SessionTruncateResult {
1510 kept_turn_count,
1511 removed_turn_count,
1512 new_tip_turn_id,
1513 })
1514}
1515
1516pub fn pop_last_if_assistant(id: &str) -> Result<bool, String> {
1523 SESSIONS.with(|s| {
1524 let mut map = s.borrow_mut();
1525 let Some(state) = map.get_mut(id) else {
1526 return Err(format!(
1527 "pop_last_if_assistant: unknown session id '{id}'"
1528 ));
1529 };
1530 let messages: Vec<VmValue> = match state.transcript.as_dict() {
1531 Some(dict) => match dict.get("messages") {
1532 Some(VmValue::List(list)) => list.iter().cloned().collect(),
1533 _ => Vec::new(),
1534 },
1535 None => Vec::new(),
1536 };
1537 if messages.is_empty() {
1538 return Ok(false);
1539 }
1540 let trailing_role = messages
1541 .last()
1542 .and_then(|m| m.as_dict())
1543 .and_then(|d| d.get("role"))
1544 .map(|v| v.display())
1545 .unwrap_or_default();
1546 if trailing_role != "assistant" {
1547 return Err(format!(
1548 "pop_last_if_assistant: trailing message role is '{trailing_role}', expected 'assistant'"
1549 ));
1550 }
1551 let keep = messages.len() - 1;
1552 truncate_state(state, keep);
1553 Ok(true)
1554 })
1555}
1556
1557pub fn trim(id: &str, keep_last: usize) -> Option<usize> {
1558 SESSIONS.with(|s| {
1559 let mut map = s.borrow_mut();
1560 let state = map.get_mut(id)?;
1561 let dict = state.transcript.as_dict()?.clone();
1562 let messages: Vec<VmValue> = match dict.get("messages") {
1563 Some(VmValue::List(list)) => list.iter().cloned().collect(),
1564 _ => Vec::new(),
1565 };
1566 let start = messages.len().saturating_sub(keep_last);
1567 let retained: Vec<VmValue> = messages.into_iter().skip(start).collect();
1568 let kept = retained.len();
1569 let mut next = dict;
1570 next.insert(
1571 crate::value::intern_key("events"),
1572 VmValue::List(std::sync::Arc::new(
1573 crate::llm::helpers::transcript_events_from_messages(&retained),
1574 )),
1575 );
1576 next.insert(
1577 crate::value::intern_key("messages"),
1578 VmValue::List(std::sync::Arc::new(retained)),
1579 );
1580 apply_transcript_with_budget(state, VmValue::dict(next), "trim").ok()?;
1581 Some(kept)
1582 })
1583}
1584
1585pub fn inject_message(id: &str, message: VmValue) -> Result<(), String> {
1588 let Some(msg_dict) = message.as_dict().cloned() else {
1589 return Err("agent_session_inject: message must be a dict".into());
1590 };
1591 let role_ok = matches!(msg_dict.get("role"), Some(VmValue::String(_)));
1592 if !role_ok {
1593 return Err(
1594 "agent_session_inject: message must have a string `role` (user|assistant|tool_result|system)"
1595 .into(),
1596 );
1597 }
1598 SESSIONS.with(|s| {
1599 let mut map = s.borrow_mut();
1600 let Some(state) = map.get_mut(id) else {
1601 return Err(format!("agent_session_inject: unknown session id '{id}'"));
1602 };
1603 let dict = state
1604 .transcript
1605 .as_dict()
1606 .cloned()
1607 .unwrap_or_else(crate::value::DictMap::new);
1608 let mut messages: Vec<VmValue> = match dict.get("messages") {
1609 Some(VmValue::List(list)) => list.iter().cloned().collect(),
1610 _ => Vec::new(),
1611 };
1612 let mut events: Vec<VmValue> = match dict.get("events") {
1613 Some(VmValue::List(list)) => list.iter().cloned().collect(),
1614 _ => crate::llm::helpers::transcript_events_from_messages(&messages),
1615 };
1616 let new_message = VmValue::dict(msg_dict);
1617 let message_index = messages.len();
1618 events.push(crate::llm::helpers::transcript_event_from_message(
1619 &new_message,
1620 ));
1621 messages.push(new_message);
1622 let mut next = dict;
1623 next.insert(
1624 crate::value::intern_key("events"),
1625 VmValue::List(std::sync::Arc::new(events)),
1626 );
1627 next.insert(
1628 crate::value::intern_key("messages"),
1629 VmValue::List(std::sync::Arc::new(messages)),
1630 );
1631 let persisted_message = next
1632 .get("messages")
1633 .and_then(|value| match value {
1634 VmValue::List(list) => list.get(message_index).cloned(),
1635 _ => None,
1636 })
1637 .unwrap_or(VmValue::Nil);
1638 apply_transcript_with_budget(state, VmValue::dict(next), "inject_message")?;
1639 emit_identified_user_message_event(id, &persisted_message);
1640 emit_llm_message_event(id, message_index, &persisted_message);
1641 Ok(())
1642 })
1643}
1644
1645fn emit_identified_user_message_event(session_id: &str, message: &VmValue) {
1646 let message_json = crate::llm::helpers::vm_value_to_json(message);
1647 let role = message_json.get("role").and_then(|value| value.as_str());
1648 if role != Some("user") {
1649 return;
1650 }
1651 let Some(message_id) = message_json
1652 .get("messageId")
1653 .or_else(|| message_json.get("message_id"))
1654 .and_then(|value| value.as_str())
1655 .filter(|value| !value.trim().is_empty())
1656 else {
1657 return;
1658 };
1659 let content = message_json
1660 .get("content")
1661 .map(user_message_content_blocks)
1662 .unwrap_or_default();
1663 crate::agent_events::emit_event(&crate::agent_events::AgentEvent::UserMessage {
1664 session_id: session_id.to_string(),
1665 message_id: message_id.to_string(),
1666 content,
1667 });
1668}
1669
1670fn user_message_content_blocks(content: &serde_json::Value) -> Vec<serde_json::Value> {
1671 match content {
1672 serde_json::Value::Array(items) => items.clone(),
1673 serde_json::Value::String(text) => vec![serde_json::json!({
1674 "type": "text",
1675 "text": text,
1676 })],
1677 other => vec![serde_json::json!({
1678 "type": "text",
1679 "text": other.to_string(),
1680 })],
1681 }
1682}
1683
/// Size measurements of a transcript, as compared against a
/// `SessionTranscriptBudgetPolicy`.
#[derive(Clone, Debug, PartialEq, Eq)]
struct TranscriptBudgetUsage {
    /// Number of entries in the transcript's `messages` list.
    message_count: usize,
    /// Number of transcript events (stored, or derived from the messages).
    event_count: usize,
    /// Serialized JSON size in bytes; `None` when byte accounting was not
    /// requested.
    approx_bytes: Option<usize>,
}
1690
1691fn transcript_messages_from_dict(dict: &crate::value::DictMap) -> Vec<VmValue> {
1692 match dict.get("messages") {
1693 Some(VmValue::List(list)) => list.iter().cloned().collect(),
1694 _ => Vec::new(),
1695 }
1696}
1697
1698fn transcript_message_count(transcript: &VmValue) -> usize {
1699 transcript
1700 .as_dict()
1701 .map(transcript_messages_from_dict)
1702 .map(|messages| messages.len())
1703 .unwrap_or(0)
1704}
1705
1706fn transcript_events_from_dict(dict: &crate::value::DictMap) -> Vec<VmValue> {
1707 match dict.get("events") {
1708 Some(VmValue::List(list)) => list.iter().cloned().collect(),
1709 _ => {
1710 let messages = transcript_messages_from_dict(dict);
1711 crate::llm::helpers::transcript_events_from_messages(&messages)
1712 }
1713 }
1714}
1715
1716fn transcript_usage(transcript: &VmValue, include_bytes: bool) -> TranscriptBudgetUsage {
1717 let Some(dict) = transcript.as_dict() else {
1718 return TranscriptBudgetUsage {
1719 message_count: 0,
1720 event_count: 0,
1721 approx_bytes: include_bytes.then_some(0),
1722 };
1723 };
1724 let approx_bytes = if include_bytes {
1725 serde_json::to_vec(&crate::llm::helpers::vm_value_to_json(transcript))
1726 .map(|bytes| bytes.len())
1727 .ok()
1728 .or(Some(usize::MAX))
1729 } else {
1730 None
1731 };
1732 TranscriptBudgetUsage {
1733 message_count: transcript_messages_from_dict(dict).len(),
1734 event_count: transcript_events_from_dict(dict).len(),
1735 approx_bytes,
1736 }
1737}
1738
1739fn transcript_budget_exceeded_reason(
1740 usage: &TranscriptBudgetUsage,
1741 policy: &SessionTranscriptBudgetPolicy,
1742) -> Option<&'static str> {
1743 if usage.message_count > policy.max_messages {
1744 return Some("message_count");
1745 }
1746 if usage.event_count > policy.max_events {
1747 return Some("event_count");
1748 }
1749 if let (Some(bytes), Some(limit)) = (usage.approx_bytes, policy.max_approx_bytes) {
1750 if bytes > limit {
1751 return Some("approx_bytes");
1752 }
1753 }
1754 None
1755}
1756
1757fn transcript_budget_usage_json(usage: &TranscriptBudgetUsage) -> serde_json::Value {
1758 serde_json::json!({
1759 "messages": usage.message_count,
1760 "events": usage.event_count,
1761 "approx_bytes": usage.approx_bytes,
1762 })
1763}
1764
1765fn transcript_budget_policy_json(policy: &SessionTranscriptBudgetPolicy) -> serde_json::Value {
1766 let recovery = match &policy.recovery {
1767 TranscriptBudgetRecovery::Reject => serde_json::json!({"action": "reject"}),
1768 TranscriptBudgetRecovery::Trim { keep_last } => {
1769 serde_json::json!({"action": "trim", "keep_last": keep_last})
1770 }
1771 TranscriptBudgetRecovery::Compact { keep_last } => {
1772 serde_json::json!({"action": "compact", "keep_last": keep_last})
1773 }
1774 };
1775 serde_json::json!({
1776 "max_messages": policy.max_messages,
1777 "max_events": policy.max_events,
1778 "max_approx_bytes": policy.max_approx_bytes,
1779 "recovery": recovery,
1780 })
1781}
1782
1783fn transcript_budget_recovery_name(recovery: &TranscriptBudgetRecovery) -> &'static str {
1784 match recovery {
1785 TranscriptBudgetRecovery::Reject => "reject",
1786 TranscriptBudgetRecovery::Trim { .. } => "trim",
1787 TranscriptBudgetRecovery::Compact { .. } => "compact",
1788 }
1789}
1790
1791fn transcript_budget_error(
1792 state: &SessionState,
1793 policy: &SessionTranscriptBudgetPolicy,
1794 usage: &TranscriptBudgetUsage,
1795 reason: &str,
1796) -> String {
1797 let byte_suffix = match (usage.approx_bytes, policy.max_approx_bytes) {
1798 (Some(bytes), Some(limit)) => format!(", approx_bytes {bytes}/{limit}"),
1799 _ => String::new(),
1800 };
1801 format!(
1802 "transcript budget exceeded for session '{}': {reason} (messages {}/{}, events {}/{}{}; recovery={})",
1803 state.id,
1804 usage.message_count,
1805 policy.max_messages,
1806 usage.event_count,
1807 policy.max_events,
1808 byte_suffix,
1809 transcript_budget_recovery_name(&policy.recovery),
1810 )
1811}
1812
1813fn transcript_budget_audit_json(
1814 action: &str,
1815 source: &str,
1816 reason: &str,
1817 policy: &SessionTranscriptBudgetPolicy,
1818 usage_before: &TranscriptBudgetUsage,
1819 usage_attempted: &TranscriptBudgetUsage,
1820 usage_after: &TranscriptBudgetUsage,
1821) -> serde_json::Value {
1822 serde_json::json!({
1823 "action": action,
1824 "source": source,
1825 "reason": reason,
1826 "policy": transcript_budget_policy_json(policy),
1827 "usage_before": transcript_budget_usage_json(usage_before),
1828 "usage_attempted": transcript_budget_usage_json(usage_attempted),
1829 "usage_after": transcript_budget_usage_json(usage_after),
1830 "removed_messages": usage_attempted.message_count.saturating_sub(usage_after.message_count),
1831 "removed_events": usage_attempted.event_count.saturating_sub(usage_after.event_count),
1832 })
1833}
1834
1835fn transcript_budget_event(audit: &serde_json::Value) -> VmValue {
1836 let action = audit
1837 .get("action")
1838 .and_then(serde_json::Value::as_str)
1839 .unwrap_or("enforced");
1840 crate::llm::helpers::transcript_event(
1841 "transcript_budget",
1842 "system",
1843 "internal",
1844 &format!("Transcript budget {action}."),
1845 Some(audit.clone()),
1846 )
1847}
1848
1849fn append_event_to_transcript(transcript: VmValue, event: VmValue) -> VmValue {
1850 let Some(dict) = transcript.as_dict() else {
1851 return transcript;
1852 };
1853 let mut next = dict.clone();
1854 let mut events = transcript_events_from_dict(&next);
1855 events.push(event);
1856 next.insert(
1857 crate::value::intern_key("events"),
1858 VmValue::List(std::sync::Arc::new(events)),
1859 );
1860 VmValue::dict(next)
1861}
1862
1863fn tail_message_capacity(
1864 policy: &SessionTranscriptBudgetPolicy,
1865 reserve_audit_event: bool,
1866) -> usize {
1867 let event_capacity = tail_event_capacity(policy, usize::from(reserve_audit_event));
1868 policy.max_messages.min(event_capacity)
1869}
1870
/// Number of events still available under `policy.max_events` after setting
/// aside `reserved_events`; saturates at zero.
fn tail_event_capacity(policy: &SessionTranscriptBudgetPolicy, reserved_events: usize) -> usize {
    policy.max_events.saturating_sub(reserved_events)
}
1874
1875fn trim_transcript_for_budget(
1876 transcript: &VmValue,
1877 policy: &SessionTranscriptBudgetPolicy,
1878 keep_last: usize,
1879) -> VmValue {
1880 let dict = transcript
1881 .as_dict()
1882 .cloned()
1883 .unwrap_or_else(crate::value::DictMap::new);
1884 let messages = transcript_messages_from_dict(&dict);
1885 let keep = keep_last.min(tail_message_capacity(policy, true));
1886 let start = messages.len().saturating_sub(keep);
1887 let retained: Vec<VmValue> = messages.into_iter().skip(start).collect();
1888 let mut next = dict;
1889 next.insert(
1890 crate::value::intern_key("events"),
1891 VmValue::List(std::sync::Arc::new(
1892 crate::llm::helpers::transcript_events_from_messages(&retained),
1893 )),
1894 );
1895 next.insert(
1896 crate::value::intern_key("messages"),
1897 VmValue::List(std::sync::Arc::new(retained)),
1898 );
1899 next.remove("summary");
1900 VmValue::dict(next)
1901}
1902
/// Data captured from a budget-driven compaction so the live
/// transcript-compacted event can be emitted after the compacted transcript
/// has been committed.
struct BudgetCompactionLiveEvent {
    /// Compaction policy taken from the config after the lifecycle ran.
    policy: crate::orchestration::CompactionPolicy,
    /// Strategy name reported by the compaction outcome.
    policy_strategy: String,
    /// Metrics copied from the compaction outcome.
    metrics: crate::orchestration::TranscriptCompactedEventMetrics,
}
1908
/// Result of compacting a transcript for budget recovery.
struct BudgetCompactionResult {
    /// The rebuilt transcript (messages, events and summary updated).
    transcript: VmValue,
    /// Present only when the compaction lifecycle produced an outcome.
    live_event: Option<BudgetCompactionLiveEvent>,
}
1913
/// Compacts `transcript` so it can fit the transcript budget.
///
/// Runs the compaction lifecycle synchronously over the transcript's messages
/// with an LLM compaction strategy and truncation as the hard-limit and
/// fallback strategy. The returned transcript carries the messages left by
/// the lifecycle, events rebuilt from those messages (plus a `compaction`
/// event when the lifecycle produced an outcome), and the outcome's summary.
/// When there is no outcome, any existing `summary` entry is removed and no
/// live event is returned.
fn compact_transcript_for_budget(
    transcript: &VmValue,
    policy: &SessionTranscriptBudgetPolicy,
    keep_last: usize,
    session_id: &str,
) -> BudgetCompactionResult {
    let dict = transcript
        .as_dict()
        .cloned()
        .unwrap_or_else(crate::value::DictMap::new);
    let messages = transcript_messages_from_dict(&dict);
    // Two events are reserved out of the event limit here — presumably the
    // `compaction` event pushed below and the audit event added by the caller.
    // NOTE(review): the tail is then reduced by a further two messages;
    // confirm what those slots are intended for.
    let message_capacity = policy.max_messages.min(tail_event_capacity(policy, 2));
    let tail_keep = keep_last.min(message_capacity.saturating_sub(2));
    let mut config = crate::orchestration::AutoCompactConfig {
        token_threshold: 0,
        keep_last: tail_keep,
        compact_strategy: crate::orchestration::CompactStrategy::Llm,
        hard_limit_strategy: crate::orchestration::CompactStrategy::Truncate,
        fallback_strategy: Some(crate::orchestration::CompactStrategy::Truncate),
        policy_strategy: crate::orchestration::compact_strategy_name(
            &crate::orchestration::CompactStrategy::Llm,
        )
        .to_string(),
        ..Default::default()
    };

    // The lifecycle operates on JSON messages and rewrites them in place.
    let mut json_messages = messages
        .iter()
        .map(crate::llm::helpers::vm_value_to_json)
        .collect::<Vec<_>>();
    let lifecycle =
        crate::orchestration::CompactLifecycle::new(crate::orchestration::CompactMode::Auto)
            .with_session_id(Some(session_id))
            .with_trigger(crate::orchestration::CompactionTrigger::BudgetPressure)
            .with_hook_dispatch(false)
            .with_evaluate_providers(false);
    // LLM options are optional: a failure to extract them yields `None`.
    let llm_opts = crate::llm::extract_llm_options(&[
        VmValue::String(arcstr::ArcStr::from("")),
        VmValue::Nil,
        VmValue::Nil,
    ])
    .ok();
    // Blocks the current thread until the lifecycle finishes. A lifecycle
    // error is treated the same as "no outcome".
    let outcome = futures::executor::block_on(crate::orchestration::run_compaction_lifecycle(
        &mut json_messages,
        &mut config,
        llm_opts.as_ref(),
        lifecycle,
    ))
    .ok()
    .flatten();

    // Rebuild the transcript from whatever messages the lifecycle left behind.
    let retained = json_messages
        .iter()
        .map(crate::stdlib::json_to_vm_value)
        .collect::<Vec<_>>();
    let mut events = crate::llm::helpers::transcript_events_from_messages(&retained);
    let summary = outcome.as_ref().map(|outcome| outcome.summary.clone());
    let mut live_event = None;
    if let Some(outcome) = outcome {
        events.push(crate::llm::helpers::transcript_event(
            "compaction",
            "system",
            "internal",
            "",
            Some(outcome.event_metadata.clone()),
        ));
        // `config.policy` is read after the lifecycle ran with `&mut config`.
        live_event = Some(BudgetCompactionLiveEvent {
            policy: config.policy.clone(),
            policy_strategy: outcome.policy_strategy,
            metrics: crate::orchestration::TranscriptCompactedEventMetrics {
                archived_messages: outcome.archived_messages,
                estimated_tokens_before: outcome.estimated_tokens_before,
                estimated_tokens_after: outcome.estimated_tokens_after,
                snapshot_asset_id: outcome.snapshot_asset_id,
            },
        });
    }

    let mut next = dict;
    next.insert(
        crate::value::intern_key("events"),
        VmValue::List(std::sync::Arc::new(events)),
    );
    next.insert(
        crate::value::intern_key("messages"),
        VmValue::List(std::sync::Arc::new(retained)),
    );
    if let Some(summary) = summary {
        next.put_str("summary", summary);
    } else {
        next.remove("summary");
    }
    BudgetCompactionResult {
        transcript: VmValue::dict(next),
        live_event,
    }
}
2013
2014fn recovered_transcript_with_audit(
2015 recovered: VmValue,
2016 action: &str,
2017 source: &str,
2018 reason: &str,
2019 policy: &SessionTranscriptBudgetPolicy,
2020 usage_before: &TranscriptBudgetUsage,
2021 usage_attempted: &TranscriptBudgetUsage,
2022 include_bytes: bool,
2023) -> (VmValue, serde_json::Value, TranscriptBudgetUsage) {
2024 let usage_after_without_audit = transcript_usage(&recovered, include_bytes);
2025 let initial_audit = transcript_budget_audit_json(
2026 action,
2027 source,
2028 reason,
2029 policy,
2030 usage_before,
2031 usage_attempted,
2032 &usage_after_without_audit,
2033 );
2034 let with_initial_audit =
2035 append_event_to_transcript(recovered.clone(), transcript_budget_event(&initial_audit));
2036 let usage_after = transcript_usage(&with_initial_audit, include_bytes);
2037 let audit = transcript_budget_audit_json(
2038 action,
2039 source,
2040 reason,
2041 policy,
2042 usage_before,
2043 usage_attempted,
2044 &usage_after,
2045 );
2046 let with_audit = append_event_to_transcript(recovered, transcript_budget_event(&audit));
2047 let usage_after = transcript_usage(&with_audit, include_bytes);
2048 (with_audit, audit, usage_after)
2049}
2050
2051fn apply_transcript_with_budget(
2052 state: &mut SessionState,
2053 candidate: VmValue,
2054 source: &str,
2055) -> Result<(), String> {
2056 let policy = state.transcript_budget_policy.normalized();
2057 let include_bytes = policy.max_approx_bytes.is_some();
2058 let usage_before = transcript_usage(&state.transcript, include_bytes);
2059 let usage_attempted = transcript_usage(&candidate, include_bytes);
2060 let Some(reason) = transcript_budget_exceeded_reason(&usage_attempted, &policy) else {
2061 state.replace_transcript(candidate);
2062 return Ok(());
2063 };
2064
2065 match policy.recovery.clone() {
2066 TranscriptBudgetRecovery::Reject => {
2067 let audit = transcript_budget_audit_json(
2068 "rejected",
2069 source,
2070 reason,
2071 &policy,
2072 &usage_before,
2073 &usage_attempted,
2074 &usage_before,
2075 );
2076 state.last_transcript_budget_action = Some(audit);
2077 Err(transcript_budget_error(
2078 state,
2079 &policy,
2080 &usage_attempted,
2081 reason,
2082 ))
2083 }
2084 TranscriptBudgetRecovery::Trim { keep_last } => {
2085 let recovered = trim_transcript_for_budget(&candidate, &policy, keep_last);
2086 let (with_audit, audit, usage_after) = recovered_transcript_with_audit(
2087 recovered,
2088 "trimmed",
2089 source,
2090 reason,
2091 &policy,
2092 &usage_before,
2093 &usage_attempted,
2094 include_bytes,
2095 );
2096 if transcript_budget_exceeded_reason(&usage_after, &policy).is_some() {
2097 let rejected = transcript_budget_audit_json(
2098 "rejected",
2099 source,
2100 reason,
2101 &policy,
2102 &usage_before,
2103 &usage_attempted,
2104 &usage_after,
2105 );
2106 state.last_transcript_budget_action = Some(rejected);
2107 return Err(transcript_budget_error(
2108 state,
2109 &policy,
2110 &usage_after,
2111 reason,
2112 ));
2113 }
2114 state.last_transcript_budget_action = Some(audit);
2115 state.replace_transcript(with_audit);
2116 Ok(())
2117 }
2118 TranscriptBudgetRecovery::Compact { keep_last } => {
2119 let compacted =
2120 compact_transcript_for_budget(&candidate, &policy, keep_last, &state.id);
2121 let (with_audit, audit, usage_after) = recovered_transcript_with_audit(
2122 compacted.transcript,
2123 "compacted",
2124 source,
2125 reason,
2126 &policy,
2127 &usage_before,
2128 &usage_attempted,
2129 include_bytes,
2130 );
2131 if transcript_budget_exceeded_reason(&usage_after, &policy).is_some() {
2132 let rejected = transcript_budget_audit_json(
2133 "rejected",
2134 source,
2135 reason,
2136 &policy,
2137 &usage_before,
2138 &usage_attempted,
2139 &usage_after,
2140 );
2141 state.last_transcript_budget_action = Some(rejected);
2142 return Err(transcript_budget_error(
2143 state,
2144 &policy,
2145 &usage_after,
2146 reason,
2147 ));
2148 }
2149 state.last_transcript_budget_action = Some(audit);
2150 state.replace_transcript(with_audit);
2151 if let Some(event) = compacted.live_event {
2152 crate::orchestration::emit_transcript_compacted_event_sync(
2153 &state.id,
2154 crate::orchestration::CompactMode::Auto,
2155 crate::orchestration::CompactionTrigger::BudgetPressure
2156 .as_str()
2157 .to_string(),
2158 &event.policy,
2159 event.policy_strategy,
2160 event.metrics,
2161 );
2162 }
2163 Ok(())
2164 }
2165 }
2166}
2167
2168fn emit_llm_message_event(session_id: &str, message_index: usize, message: &VmValue) {
2169 let mut fields = serde_json::Map::new();
2170 fields.insert(
2171 "session_id".to_string(),
2172 serde_json::Value::String(session_id.to_string()),
2173 );
2174 fields.insert(
2175 "message_index".to_string(),
2176 serde_json::json!(message_index),
2177 );
2178 let message_json = crate::llm::helpers::vm_value_to_json(message);
2179 if let Some(role) = message_json.get("role").and_then(|value| value.as_str()) {
2180 fields.insert(
2181 "role".to_string(),
2182 serde_json::Value::String(role.to_string()),
2183 );
2184 }
2185 if let Some(content) = message_json.get("content") {
2186 fields.insert("content".to_string(), content.clone());
2187 }
2188 fields.insert("message".to_string(), message_json);
2189 crate::llm::append_observability_sidecar_entry("message", fields);
2190}
2191
2192pub fn seed_from_messages(
2198 id: Option<String>,
2199 messages: &[serde_json::Value],
2200 metadata: serde_json::Value,
2201 system_prompt: Option<String>,
2202 tool_format: Option<String>,
2203) -> Result<String, String> {
2204 let resolved = id.unwrap_or_else(|| uuid::Uuid::now_v7().to_string());
2205 if exists(&resolved) {
2206 return Err(format!("agent session '{resolved}' already exists"));
2207 }
2208 open_or_create(Some(resolved.clone()));
2209 SESSIONS.with(|s| {
2210 let mut map = s.borrow_mut();
2211 let Some(state) = map.get_mut(&resolved) else {
2212 return Err(format!("failed to create agent session '{resolved}'"));
2213 };
2214 state.tool_format = tool_format.filter(|value| !value.trim().is_empty());
2215 state.system_prompt = system_prompt.filter(|value| !value.trim().is_empty());
2216
2217 let mut metadata = metadata
2218 .as_object()
2219 .cloned()
2220 .unwrap_or_else(serde_json::Map::new);
2221 if let Some(tool_format) = state.tool_format.as_ref() {
2222 metadata.insert(
2223 "tool_format".to_string(),
2224 serde_json::Value::String(tool_format.clone()),
2225 );
2226 metadata.insert(
2227 "tool_mode_locked".to_string(),
2228 serde_json::Value::Bool(true),
2229 );
2230 }
2231 if let Some(system_prompt) = state.system_prompt.as_ref() {
2232 metadata.insert(
2233 "system_prompt".to_string(),
2234 crate::llm::helpers::system_prompt_metadata(system_prompt),
2235 );
2236 }
2237 let vm_messages = crate::llm::helpers::json_messages_to_vm(messages);
2238 let candidate = crate::llm::helpers::new_transcript_with(
2239 Some(resolved.clone()),
2240 vm_messages,
2241 None,
2242 Some(crate::stdlib::json_to_vm_value(&serde_json::Value::Object(
2243 metadata,
2244 ))),
2245 );
2246 apply_transcript_with_budget(state, candidate, "seed_from_messages")?;
2247 Ok(resolved)
2248 })
2249}
2250
2251pub fn messages_json(id: &str) -> Vec<serde_json::Value> {
2255 SESSIONS.with(|s| {
2256 let map = s.borrow();
2257 let Some(state) = map.get(id) else {
2258 return Vec::new();
2259 };
2260 let Some(dict) = state.transcript.as_dict() else {
2261 return Vec::new();
2262 };
2263 match dict.get("messages") {
2264 Some(VmValue::List(list)) => list
2265 .iter()
2266 .map(crate::llm::helpers::vm_value_to_json)
2267 .collect(),
2268 _ => Vec::new(),
2269 }
2270 })
2271}
2272
2273#[derive(Clone, Debug, Default)]
2274pub struct SessionPromptState {
2275 pub messages: Vec<serde_json::Value>,
2276 pub summary: Option<String>,
2277}
2278
2279fn summary_message_json(summary: &str) -> serde_json::Value {
2280 serde_json::json!({
2281 "role": "user",
2282 "content": summary,
2283 })
2284}
2285
2286fn messages_begin_with_summary(messages: &[serde_json::Value], summary: &str) -> bool {
2287 messages.first().is_some_and(|message| {
2288 message.get("role").and_then(|value| value.as_str()) == Some("user")
2289 && message.get("content").and_then(|value| value.as_str()) == Some(summary)
2290 })
2291}
2292
2293pub fn prompt_state_json(id: &str) -> SessionPromptState {
2301 SESSIONS.with(|s| {
2302 let map = s.borrow();
2303 let Some(state) = map.get(id) else {
2304 return SessionPromptState::default();
2305 };
2306 let Some(dict) = state.transcript.as_dict() else {
2307 return SessionPromptState::default();
2308 };
2309 let mut messages = match dict.get("messages") {
2310 Some(VmValue::List(list)) => list
2311 .iter()
2312 .map(crate::llm::helpers::vm_value_to_json)
2313 .collect::<Vec<_>>(),
2314 _ => Vec::new(),
2315 };
2316 let summary = dict.get("summary").and_then(|value| match value {
2317 VmValue::String(text) if !text.trim().is_empty() => Some(text.to_string()),
2318 _ => None,
2319 });
2320 if let Some(summary_text) = summary.as_deref() {
2321 if !messages_begin_with_summary(&messages, summary_text) {
2322 messages.insert(0, summary_message_json(summary_text));
2323 }
2324 }
2325 SessionPromptState { messages, summary }
2326 })
2327}
2328
2329pub fn store_transcript(id: &str, transcript: VmValue) -> Result<(), String> {
2332 SESSIONS.with(|s| {
2333 let mut map = s.borrow_mut();
2334 let Some(state) = map.get_mut(id) else {
2335 return Err(format!(
2336 "agent_session_store_transcript: unknown session id '{id}'"
2337 ));
2338 };
2339 let transcript = transcript_with_session_metadata(transcript, state);
2340 apply_transcript_with_budget(state, transcript, "store_transcript")?;
2341 Ok(())
2342 })
2343}
2344
2345fn checkpoint_summary(checkpoint: &SessionTurnCheckpoint) -> SessionCheckpointSummary {
2346 SessionCheckpointSummary {
2347 checkpoint_id: checkpoint.checkpoint_id.clone(),
2348 before_message_count: checkpoint.before_message_count,
2349 after_message_count: checkpoint.after_message_count,
2350 fs_snapshot_ids: checkpoint.fs_snapshot_ids.clone(),
2351 }
2352}
2353
2354fn checkpoint_error_status(error: SessionCheckpointError) -> &'static str {
2355 match error {
2356 SessionCheckpointError::UnknownSession => "unknown_session",
2357 SessionCheckpointError::NoCheckpoint => "no_checkpoint",
2358 SessionCheckpointError::NoRedo => "no_redo",
2359 }
2360}
2361
2362pub fn checkpoint_status_name(error: SessionCheckpointError) -> &'static str {
2363 checkpoint_error_status(error)
2364}
2365
2366pub fn invalidate_redo(id: &str) -> bool {
2369 SESSIONS.with(|s| {
2370 let mut map = s.borrow_mut();
2371 let Some(state) = map.get_mut(id) else {
2372 return false;
2373 };
2374 let had_redo = !state.redo_stack.is_empty();
2375 state.redo_stack.clear();
2376 state.touch();
2377 had_redo
2378 })
2379}
2380
2381pub fn record_completed_turn_checkpoint(
2388 id: &str,
2389 before_transcript: VmValue,
2390 fs_snapshot_ids: Vec<String>,
2391) -> Result<Option<SessionCheckpointSummary>, SessionCheckpointError> {
2392 SESSIONS.with(|s| {
2393 let mut map = s.borrow_mut();
2394 let Some(state) = map.get_mut(id) else {
2395 return Err(SessionCheckpointError::UnknownSession);
2396 };
2397 let after_transcript = transcript_with_session_metadata(state.transcript.clone(), state);
2398 let before_message_count = transcript_message_count(&before_transcript);
2399 let after_message_count = transcript_message_count(&after_transcript);
2400 if crate::values_equal(&before_transcript, &after_transcript) && fs_snapshot_ids.is_empty()
2401 {
2402 return Ok(None);
2403 }
2404 let checkpoint = SessionTurnCheckpoint {
2405 checkpoint_id: format!("turn_{}", uuid::Uuid::now_v7().simple()),
2406 completed_at: crate::orchestration::now_rfc3339(),
2407 before_message_count,
2408 after_message_count,
2409 before_transcript,
2410 after_transcript,
2411 fs_snapshot_ids,
2412 };
2413 state.redo_stack.clear();
2414 state.completed_turn_checkpoints.push(checkpoint.clone());
2415 state.touch();
2416 Ok(Some(checkpoint_summary(&checkpoint)))
2417 })
2418}
2419
2420pub fn rollback_plan(id: &str) -> Result<SessionCheckpointSummary, SessionCheckpointError> {
2421 SESSIONS.with(|s| {
2422 let map = s.borrow();
2423 let Some(state) = map.get(id) else {
2424 return Err(SessionCheckpointError::UnknownSession);
2425 };
2426 state
2427 .completed_turn_checkpoints
2428 .last()
2429 .map(checkpoint_summary)
2430 .ok_or(SessionCheckpointError::NoCheckpoint)
2431 })
2432}
2433
2434pub fn redo_plan(id: &str) -> Result<SessionCheckpointSummary, SessionCheckpointError> {
2435 SESSIONS.with(|s| {
2436 let map = s.borrow();
2437 let Some(state) = map.get(id) else {
2438 return Err(SessionCheckpointError::UnknownSession);
2439 };
2440 state
2441 .redo_stack
2442 .last()
2443 .map(|entry| {
2444 let mut summary = checkpoint_summary(&entry.checkpoint);
2445 summary.fs_snapshot_ids = entry.redo_fs_snapshot_ids.clone();
2446 summary
2447 })
2448 .ok_or(SessionCheckpointError::NoRedo)
2449 })
2450}
2451
2452pub fn rollback_last_completed_turn(
2453 id: &str,
2454 redo_fs_snapshot_ids: Vec<String>,
2455) -> Result<SessionCheckpointOutcome, SessionCheckpointError> {
2456 SESSIONS.with(|s| {
2457 let mut map = s.borrow_mut();
2458 let Some(state) = map.get_mut(id) else {
2459 return Err(SessionCheckpointError::UnknownSession);
2460 };
2461 let Some(checkpoint) = state.completed_turn_checkpoints.pop() else {
2462 return Err(SessionCheckpointError::NoCheckpoint);
2463 };
2464 state.transcript = checkpoint.before_transcript.clone();
2465 state.redo_stack.push(SessionRedoEntry {
2466 checkpoint: checkpoint.clone(),
2467 redo_fs_snapshot_ids: redo_fs_snapshot_ids.clone(),
2468 });
2469 state.touch();
2470 Ok(SessionCheckpointOutcome {
2471 status: "rolled_back",
2472 checkpoint: checkpoint_summary(&checkpoint),
2473 redo_fs_snapshot_ids,
2474 })
2475 })
2476}
2477
2478pub fn redo_last_rollback(id: &str) -> Result<SessionCheckpointOutcome, SessionCheckpointError> {
2479 SESSIONS.with(|s| {
2480 let mut map = s.borrow_mut();
2481 let Some(state) = map.get_mut(id) else {
2482 return Err(SessionCheckpointError::UnknownSession);
2483 };
2484 let Some(entry) = state.redo_stack.pop() else {
2485 return Err(SessionCheckpointError::NoRedo);
2486 };
2487 let checkpoint = entry.checkpoint;
2488 state.transcript = checkpoint.after_transcript.clone();
2489 state.completed_turn_checkpoints.push(checkpoint.clone());
2490 state.touch();
2491 Ok(SessionCheckpointOutcome {
2492 status: "redone",
2493 checkpoint: checkpoint_summary(&checkpoint),
2494 redo_fs_snapshot_ids: entry.redo_fs_snapshot_ids,
2495 })
2496 })
2497}
2498
2499pub fn prune_invalid_reminder_events(id: &str) -> usize {
2503 SESSIONS.with(|s| {
2504 let mut map = s.borrow_mut();
2505 let Some(state) = map.get_mut(id) else {
2506 return 0;
2507 };
2508 let Some(dict) = state.transcript.as_dict().cloned() else {
2509 return 0;
2510 };
2511 let Some(VmValue::List(events)) = dict.get("events") else {
2512 return 0;
2513 };
2514 let mut pruned = 0_usize;
2515 let mut kept = Vec::with_capacity(events.len());
2516 for event in events.iter().cloned() {
2517 let is_reminder = event
2518 .as_dict()
2519 .and_then(|event| event.get("kind"))
2520 .map(VmValue::display)
2521 .as_deref()
2522 == Some(crate::llm::helpers::SYSTEM_REMINDER_EVENT_KIND);
2523 if !is_reminder {
2524 kept.push(event);
2525 continue;
2526 }
2527 let valid = crate::llm::helpers::reminder_from_event(&event)
2528 .is_some_and(|reminder| !reminder.body.trim().is_empty());
2529 if valid {
2530 kept.push(event);
2531 } else {
2532 pruned += 1;
2533 }
2534 }
2535 if pruned > 0 {
2536 let mut next = dict;
2537 next.insert(
2538 crate::value::intern_key("events"),
2539 VmValue::List(std::sync::Arc::new(kept)),
2540 );
2541 let _ = apply_transcript_with_budget(
2542 state,
2543 VmValue::dict(next),
2544 "prune_invalid_reminder_events",
2545 );
2546 state.touch();
2547 }
2548 pruned
2549 })
2550}
2551
2552pub fn apply_reminder_post_turn(id: &str, turn: i64) -> Result<serde_json::Value, String> {
2557 let report = SESSIONS.with(|s| {
2558 let mut map = s.borrow_mut();
2559 let Some(state) = map.get_mut(id) else {
2560 return Err(format!(
2561 "agent_session_apply_reminder_post_turn: unknown session id '{id}'"
2562 ));
2563 };
2564 let report = crate::llm::helpers::apply_reminder_post_turn(&state.transcript, turn);
2565 if report.decremented_count > 0 || !report.expired.is_empty() {
2566 if let Some(next) = report.transcript.clone() {
2567 apply_transcript_with_budget(state, next, "apply_reminder_post_turn")?;
2568 }
2569 state.touch();
2570 }
2571 Ok(report)
2572 })?;
2573
2574 for reminder in &report.expired {
2575 let mut payload = crate::llm::helpers::reminder_lifecycle_payload(Some(id), reminder);
2576 if let Some(obj) = payload.as_object_mut() {
2577 obj.insert(
2578 "transcript_id".to_string(),
2579 serde_json::Value::String(id.to_string()),
2580 );
2581 obj.insert(
2582 "reason".to_string(),
2583 serde_json::Value::String("ttl".to_string()),
2584 );
2585 obj.insert(
2586 "ttl_turns_before".to_string(),
2587 serde_json::json!(&reminder.ttl_turns),
2588 );
2589 obj.insert("expired_at_turn".to_string(), serde_json::json!(turn));
2590 }
2591 crate::llm::helpers::emit_reminder_lifecycle_event(
2592 crate::llm::helpers::REMINDER_EXPIRED_EVENT_KIND,
2593 payload,
2594 );
2595 }
2596
2597 Ok(serde_json::json!({
2598 "expired_count": report.expired.len(),
2599 "decremented_count": report.decremented_count,
2600 "remaining_count": report.remaining_count,
2601 }))
2602}
2603
2604pub fn inject_reminder(
2609 id: &str,
2610 reminder: crate::llm::helpers::SystemReminder,
2611) -> Result<ReminderInjectionReport, String> {
2612 let reminder_id = reminder.id.clone();
2613 let dedupe_key = reminder.dedupe_key.clone();
2614 let mut deduped_reminder_ids = Vec::new();
2615 SESSIONS.with(|s| {
2616 let mut map = s.borrow_mut();
2617 let Some(state) = map.get_mut(id) else {
2618 return Err(format!(
2619 "agent_session_inject_reminder: unknown session id '{id}'"
2620 ));
2621 };
2622 let dict = state
2623 .transcript
2624 .as_dict()
2625 .cloned()
2626 .unwrap_or_else(crate::value::DictMap::new);
2627 let mut events: Vec<VmValue> = match dict.get("events") {
2628 Some(VmValue::List(list)) => list.iter().cloned().collect(),
2629 _ => dict
2630 .get("messages")
2631 .and_then(|value| match value {
2632 VmValue::List(list) => Some(list.iter().cloned().collect::<Vec<_>>()),
2633 _ => None,
2634 })
2635 .map(|messages| crate::llm::helpers::transcript_events_from_messages(&messages))
2636 .unwrap_or_default(),
2637 };
2638 if let Some(expected_key) = dedupe_key.as_deref() {
2639 events.retain(|event| {
2640 let Some(existing) = crate::llm::helpers::reminder_from_event(event) else {
2641 return true;
2642 };
2643 if existing.dedupe_key.as_deref() == Some(expected_key) {
2644 deduped_reminder_ids.push(existing.id);
2645 false
2646 } else {
2647 true
2648 }
2649 });
2650 }
2651 events.push(crate::llm::helpers::transcript_reminder_event(&reminder));
2652 let mut next = dict;
2653 next.insert(
2654 crate::value::intern_key("events"),
2655 VmValue::List(std::sync::Arc::new(events)),
2656 );
2657 apply_transcript_with_budget(state, VmValue::dict(next), "inject_reminder")?;
2658 state.touch();
2659 Ok(())
2660 })?;
2661
2662 if !deduped_reminder_ids.is_empty() {
2663 let dropped_count = deduped_reminder_ids.len();
2664 crate::llm::helpers::emit_reminder_lifecycle_event(
2665 crate::llm::helpers::REMINDER_DEDUPED_EVENT_KIND,
2666 serde_json::json!({
2667 "session_id": id,
2668 "transcript_id": id,
2669 "reminder_id": &reminder_id,
2670 "replacing_id": &reminder_id,
2671 "replaced_id": deduped_reminder_ids.first(),
2672 "replaced_ids": &deduped_reminder_ids,
2673 "dedupe_key": &dedupe_key,
2674 "dropped_reminder_ids": &deduped_reminder_ids,
2675 "dropped_count": dropped_count,
2676 }),
2677 );
2678 }
2679
2680 crate::llm::helpers::emit_reminder_lifecycle_event(
2681 crate::llm::helpers::REMINDER_INJECTED_EVENT_KIND,
2682 crate::llm::helpers::reminder_lifecycle_payload(Some(id), &reminder),
2683 );
2684
2685 Ok(ReminderInjectionReport {
2686 reminder_id,
2687 deduped_count: deduped_reminder_ids.len(),
2688 })
2689}
2690
2691pub fn append_event(id: &str, event: VmValue) -> Result<(), String> {
2697 let Some(event_dict) = event.as_dict() else {
2698 return Err("agent_session_append_event: event must be a dict".into());
2699 };
2700 let kind_ok = matches!(event_dict.get("kind"), Some(VmValue::String(_)));
2701 if !kind_ok {
2702 return Err("agent_session_append_event: event must have a string `kind`".into());
2703 }
2704 SESSIONS.with(|s| {
2705 let mut map = s.borrow_mut();
2706 let Some(state) = map.get_mut(id) else {
2707 return Err(format!(
2708 "agent_session_append_event: unknown session id '{id}'"
2709 ));
2710 };
2711 append_event_to_state(state, event, "append_event")?;
2712 Ok(())
2713 })
2714}
2715
2716fn append_event_to_state(
2717 state: &mut SessionState,
2718 event: VmValue,
2719 action: &str,
2720) -> Result<(), String> {
2721 let dict = state
2722 .transcript
2723 .as_dict()
2724 .cloned()
2725 .unwrap_or_else(crate::value::DictMap::new);
2726 let mut events: Vec<VmValue> = match dict.get("events") {
2727 Some(VmValue::List(list)) => list.iter().cloned().collect(),
2728 _ => dict
2729 .get("messages")
2730 .and_then(|value| match value {
2731 VmValue::List(list) => Some(list.iter().cloned().collect::<Vec<_>>()),
2732 _ => None,
2733 })
2734 .map(|messages| crate::llm::helpers::transcript_events_from_messages(&messages))
2735 .unwrap_or_default(),
2736 };
2737 events.push(event);
2738 let mut next = dict;
2739 next.insert(
2740 crate::value::intern_key("events"),
2741 VmValue::List(std::sync::Arc::new(events)),
2742 );
2743 apply_transcript_with_budget(state, VmValue::dict(next), action)
2744}
2745
2746pub fn replace_messages(id: &str, messages: &[serde_json::Value]) -> Result<(), String> {
2749 replace_messages_with_summary(id, messages, None)
2750}
2751
2752pub fn replace_messages_with_summary(
2757 id: &str,
2758 messages: &[serde_json::Value],
2759 summary: Option<&str>,
2760) -> Result<(), String> {
2761 SESSIONS.with(|s| {
2762 let mut map = s.borrow_mut();
2763 let Some(state) = map.get_mut(id) else {
2764 return Err(format!(
2765 "agent_session_replace_messages: unknown session id '{id}'"
2766 ));
2767 };
2768 let dict = state
2769 .transcript
2770 .as_dict()
2771 .cloned()
2772 .unwrap_or_else(crate::value::DictMap::new);
2773 let vm_messages: Vec<VmValue> = messages
2774 .iter()
2775 .map(crate::stdlib::json_to_vm_value)
2776 .collect();
2777 let mut next = dict;
2778 next.insert(
2779 crate::value::intern_key("events"),
2780 VmValue::List(std::sync::Arc::new(
2781 crate::llm::helpers::transcript_events_from_messages(&vm_messages),
2782 )),
2783 );
2784 next.insert(
2785 crate::value::intern_key("messages"),
2786 VmValue::List(std::sync::Arc::new(vm_messages)),
2787 );
2788 if let Some(summary) = summary {
2789 next.put_str("summary", summary);
2790 } else {
2791 next.remove("summary");
2792 }
2793 apply_transcript_with_budget(state, VmValue::dict(next), "replace_messages")?;
2794 Ok(())
2795 })
2796}
2797
2798pub fn append_subscriber(id: &str, callback: VmValue) {
2799 open_or_create(Some(id.to_string()));
2800 SESSIONS.with(|s| {
2801 if let Some(state) = s.borrow_mut().get_mut(id) {
2802 state.subscribers.push(callback);
2803 state.touch();
2804 }
2805 });
2806}
2807
2808pub fn subscribers_for(id: &str) -> Vec<VmValue> {
2809 SESSIONS.with(|s| {
2810 s.borrow()
2811 .get(id)
2812 .map(|state| state.subscribers.clone())
2813 .unwrap_or_default()
2814 })
2815}
2816
2817pub fn subscriber_count(id: &str) -> usize {
2818 SESSIONS.with(|s| {
2819 s.borrow()
2820 .get(id)
2821 .map(|state| state.subscribers.len())
2822 .unwrap_or(0)
2823 })
2824}
2825
2826pub fn set_active_skills(id: &str, skills: Vec<String>) {
2830 SESSIONS.with(|s| {
2831 if let Some(state) = s.borrow_mut().get_mut(id) {
2832 state.active_skills = skills;
2833 state.touch();
2834 }
2835 });
2836}
2837
2838pub fn active_skills(id: &str) -> Vec<String> {
2842 SESSIONS.with(|s| {
2843 s.borrow()
2844 .get(id)
2845 .map(|state| state.active_skills.clone())
2846 .unwrap_or_default()
2847 })
2848}
2849
2850pub fn claim_tool_format(id: &str, tool_format: &str) -> Result<(), String> {
2856 let tool_format = tool_format.trim();
2857 if tool_format.is_empty() {
2858 return Ok(());
2859 }
2860 SESSIONS.with(|s| {
2861 let mut map = s.borrow_mut();
2862 let Some(state) = map.get_mut(id) else {
2863 return Err(format!("agent session '{id}' does not exist"));
2864 };
2865 match state.tool_format.as_deref() {
2866 Some(existing) if existing != tool_format => Err(format!(
2867 "agent session '{id}' is locked to tool_format='{existing}', but this run requested tool_format='{tool_format}'. Start a new session or fork/reset the transcript before changing tool mode."
2868 )),
2869 Some(_) => {
2870 state.touch();
2871 Ok(())
2872 }
2873 None => {
2874 state.tool_format = Some(tool_format.to_string());
2875 state.touch();
2876 Ok(())
2877 }
2878 }
2879 })
2880}
2881
2882pub fn tool_format(id: &str) -> Option<String> {
2883 SESSIONS.with(|s| {
2884 s.borrow()
2885 .get(id)
2886 .and_then(|state| state.tool_format.clone())
2887 })
2888}
2889
2890pub fn record_system_prompt(id: &str, system_prompt: &str) -> Result<(), String> {
2891 let system_prompt = system_prompt.trim();
2892 if system_prompt.is_empty() {
2893 return Ok(());
2894 }
2895 assert_cache_stable_system_prompt(system_prompt);
2896 SESSIONS.with(|s| {
2897 let mut map = s.borrow_mut();
2898 let Some(state) = map.get_mut(id) else {
2899 return Err(format!("agent session '{id}' does not exist"));
2900 };
2901 let changed = state.system_prompt.as_deref() != Some(system_prompt);
2902 state.system_prompt = Some(system_prompt.to_string());
2903 let dict = state
2904 .transcript
2905 .as_dict()
2906 .cloned()
2907 .unwrap_or_else(crate::value::DictMap::new);
2908 let mut next = dict;
2909 apply_system_prompt_metadata(&mut next, system_prompt);
2910 if changed {
2911 let mut events: Vec<VmValue> = match next.get("events") {
2912 Some(VmValue::List(list)) => list.iter().cloned().collect(),
2913 _ => Vec::new(),
2914 };
2915 events.push(crate::llm::helpers::transcript_event(
2916 "system_prompt",
2917 "system",
2918 "internal",
2919 "",
2920 Some(crate::llm::helpers::system_prompt_event_metadata(
2921 system_prompt,
2922 )),
2923 ));
2924 next.insert(
2925 crate::value::intern_key("events"),
2926 VmValue::List(std::sync::Arc::new(events)),
2927 );
2928 }
2929 apply_transcript_with_budget(state, VmValue::dict(next), "record_system_prompt")?;
2930 Ok(())
2931 })
2932}
2933
2934pub fn system_prompt(id: &str) -> Option<String> {
2935 SESSIONS.with(|s| {
2936 s.borrow()
2937 .get(id)
2938 .and_then(|state| state.system_prompt.clone())
2939 })
2940}
2941
/// Finds the first `{{ ... }}` opener in `system_prompt` whose content
/// (after optional leading whitespace) starts with `workspace_` or
/// `project_`, and returns that prefix.
///
/// Openers are scanned left to right without overlapping, so the text after
/// each `{{` is examined exactly once. Returns `None` when no opener is
/// followed by a forbidden prefix.
#[cfg(debug_assertions)]
fn forbidden_workspace_prompt_token(system_prompt: &str) -> Option<&'static str> {
    const FORBIDDEN_PREFIXES: [&str; 2] = ["workspace_", "project_"];

    system_prompt
        .split("{{")
        // The first piece precedes any opener and is never a token body.
        .skip(1)
        .map(str::trim_start)
        .find_map(|token_body| {
            FORBIDDEN_PREFIXES
                .iter()
                .copied()
                .find(|prefix| token_body.starts_with(*prefix))
        })
}
2957
2958#[cfg(debug_assertions)]
2959fn assert_cache_stable_system_prompt(system_prompt: &str) {
2960 if let Some(prefix) = forbidden_workspace_prompt_token(system_prompt) {
2961 panic!(
2962 "{CACHE_STABLE_SYSTEM_PROMPT_DIAGNOSTIC}: session system prompts must not interpolate `{{{{{prefix}...` tokens; move workspace/project context into the workspace-anchor reminder"
2963 );
2964 }
2965}
2966
2967#[cfg(not(debug_assertions))]
2968fn assert_cache_stable_system_prompt(_system_prompt: &str) {}
2969
2970pub fn set_pinned_model(id: &str, model: Option<String>) -> Result<bool, String> {
2975 let normalized = model
2976 .map(|value| value.trim().to_string())
2977 .filter(|value| !value.is_empty());
2978 SESSIONS.with(|s| {
2979 let mut map = s.borrow_mut();
2980 let Some(state) = map.get_mut(id) else {
2981 return Err(format!("agent session '{id}' does not exist"));
2982 };
2983 let changed = state.pinned_model != normalized;
2984 state.pinned_model = normalized;
2985 state.touch();
2986 Ok(changed)
2987 })
2988}
2989
2990pub fn pinned_model(id: &str) -> Option<String> {
2994 SESSIONS.with(|s| {
2995 s.borrow()
2996 .get(id)
2997 .and_then(|state| state.pinned_model.clone())
2998 })
2999}
3000
3001pub fn set_pinned_reasoning_policy(id: &str, policy: Option<String>) -> Result<bool, String> {
3003 let normalized = match policy {
3004 Some(value) => crate::llm::reasoning_policy::normalize_policy_selector(&value)?,
3005 None => None,
3006 };
3007 SESSIONS.with(|s| {
3008 let mut map = s.borrow_mut();
3009 let Some(state) = map.get_mut(id) else {
3010 return Err(format!("agent session '{id}' does not exist"));
3011 };
3012 let changed = state.pinned_reasoning_policy != normalized;
3013 state.pinned_reasoning_policy = normalized;
3014 state.touch();
3015 Ok(changed)
3016 })
3017}
3018
3019pub fn pinned_reasoning_policy(id: &str) -> Option<String> {
3021 SESSIONS.with(|s| {
3022 s.borrow()
3023 .get(id)
3024 .and_then(|state| state.pinned_reasoning_policy.clone())
3025 })
3026}
3027
3028pub fn set_workspace_anchor(id: &str, anchor: Option<WorkspaceAnchor>) -> Result<bool, String> {
3032 SESSIONS.with(|s| {
3033 let mut map = s.borrow_mut();
3034 let Some(state) = map.get_mut(id) else {
3035 return Err(format!("agent session '{id}' does not exist"));
3036 };
3037 let changed = state.workspace_anchor != anchor;
3038 state.workspace_anchor = anchor;
3039 if changed {
3040 state.redo_stack.clear();
3041 crate::llm::permissions::clear_session_grants(id);
3042 }
3043 state.touch();
3044 Ok(changed)
3045 })
3046}
3047
3048pub fn workspace_anchor(id: &str) -> Option<WorkspaceAnchor> {
3050 SESSIONS.with(|s| {
3051 s.borrow()
3052 .get(id)
3053 .and_then(|state| state.workspace_anchor.clone())
3054 })
3055}
3056
3057#[derive(Clone, Debug, PartialEq, Eq)]
3061pub struct ReanchorOutcome {
3062 pub previous: Option<WorkspaceAnchor>,
3063 pub current: WorkspaceAnchor,
3064 pub changed: bool,
3065}
3066
3067pub fn reanchor_session(
3072 id: &str,
3073 new_anchor: WorkspaceAnchor,
3074 carry_transcript: bool,
3075 compacted: bool,
3076 reason: Option<String>,
3077) -> Result<ReanchorOutcome, String> {
3078 let outcome = SESSIONS.with(|s| {
3079 let mut map = s.borrow_mut();
3080 let Some(state) = map.get_mut(id) else {
3081 return Err(format!("agent session '{id}' does not exist"));
3082 };
3083 let previous = state.workspace_anchor.clone();
3084 let changed = previous.as_ref() != Some(&new_anchor);
3085 state.workspace_anchor = Some(new_anchor.clone());
3086 if changed {
3087 crate::llm::permissions::clear_session_grants(id);
3088 }
3089 state.touch();
3090 Ok(ReanchorOutcome {
3091 previous,
3092 current: new_anchor,
3093 changed,
3094 })
3095 })?;
3096 if !outcome.changed {
3097 return Ok(outcome);
3098 }
3099 let previous_json = outcome.previous.as_ref().map(WorkspaceAnchor::to_json);
3100 let current_json = outcome.current.to_json();
3101 let event_metadata = serde_json::json!({
3102 "previous": previous_json,
3103 "current": current_json,
3104 "carry_transcript": carry_transcript,
3105 "compacted": compacted,
3106 "reason": reason,
3107 });
3108 let event = crate::llm::helpers::transcript_event(
3109 "AnchorChanged",
3110 "system",
3111 "internal",
3112 "",
3113 Some(event_metadata),
3114 );
3115 let _ = append_event(id, event);
3116 crate::llm::emit_live_agent_event_sync(&crate::agent_events::AgentEvent::AnchorChanged {
3117 session_id: id.to_string(),
3118 previous: previous_json,
3119 current: current_json,
3120 carry_transcript,
3121 compacted,
3122 reason,
3123 });
3124 Ok(outcome)
3125}
3126
3127pub fn set_workspace_policy(id: &str, policy: WorkspacePolicy) -> Result<bool, String> {
3130 SESSIONS.with(|s| {
3131 let mut map = s.borrow_mut();
3132 let Some(state) = map.get_mut(id) else {
3133 return Err(format!("agent session '{id}' does not exist"));
3134 };
3135 let changed = state.workspace_policy != policy;
3136 state.workspace_policy = policy;
3137 if changed {
3138 state.redo_stack.clear();
3139 }
3140 state.touch();
3141 Ok(changed)
3142 })
3143}
3144
3145pub fn workspace_policy(id: &str) -> Option<WorkspacePolicy> {
3147 SESSIONS.with(|s| {
3148 s.borrow()
3149 .get(id)
3150 .map(|state| state.workspace_policy.clone())
3151 })
3152}
3153
3154pub fn add_workspace_root(
3158 id: &str,
3159 root: &str,
3160 mount_mode: Option<MountMode>,
3161 reason: Option<String>,
3162) -> Result<String, String> {
3163 let normalized_root = validate_workspace_root_path(root)?;
3164 let mounted_at = crate::orchestration::now_rfc3339();
3165 SESSIONS.with(|s| {
3166 let mut map = s.borrow_mut();
3167 let Some(state) = map.get_mut(id) else {
3168 return Err(format!("agent session '{id}' does not exist"));
3169 };
3170 let default_mount_mode = state.workspace_policy.default_mount_mode;
3171 let Some(anchor) = state.workspace_anchor.as_mut() else {
3172 return Err(format!("agent session '{id}' has no workspace anchor"));
3173 };
3174 let resolved_mount_mode = mount_mode.unwrap_or(default_mount_mode);
3175 if let Some(existing) = anchor
3176 .additional_roots
3177 .iter_mut()
3178 .find(|entry| entry.path == normalized_root)
3179 {
3180 let changed =
3181 existing.mount_mode != resolved_mount_mode || existing.mounted_at != mounted_at;
3182 existing.mount_mode = resolved_mount_mode;
3183 existing.mounted_at = mounted_at.clone();
3184 if changed {
3185 state.redo_stack.clear();
3186 }
3187 } else {
3188 anchor.additional_roots.push(MountedRoot {
3189 path: normalized_root.clone(),
3190 mount_mode: resolved_mount_mode,
3191 mounted_at: mounted_at.clone(),
3192 });
3193 state.redo_stack.clear();
3194 }
3195 let event = crate::llm::helpers::transcript_event(
3196 "RootMounted",
3197 "system",
3198 "internal",
3199 "",
3200 Some(serde_json::json!({
3201 "path": normalized_root.to_string_lossy(),
3202 "mount_mode": resolved_mount_mode.as_str(),
3203 "mounted_at": mounted_at.clone(),
3204 "reason": reason,
3205 })),
3206 );
3207 append_event_to_state(state, event, "add_workspace_root")?;
3208 crate::llm::permissions::clear_session_grants(id);
3209 state.touch();
3210 Ok(mounted_at.clone())
3211 })
3212}
3213
3214pub fn remove_workspace_root(id: &str, root: &str) -> Result<bool, String> {
3217 let normalized_root = normalize_workspace_root_path(root);
3218 SESSIONS.with(|s| {
3219 let mut map = s.borrow_mut();
3220 let Some(state) = map.get_mut(id) else {
3221 return Err(format!("agent session '{id}' does not exist"));
3222 };
3223 let Some(anchor) = state.workspace_anchor.as_mut() else {
3224 return Err(format!("agent session '{id}' has no workspace anchor"));
3225 };
3226 let before = anchor.additional_roots.len();
3227 anchor
3228 .additional_roots
3229 .retain(|entry| entry.path != normalized_root);
3230 let removed = anchor.additional_roots.len() != before;
3231 if removed {
3232 state.redo_stack.clear();
3233 crate::llm::permissions::clear_session_grants(id);
3234 }
3235 state.touch();
3236 Ok(removed)
3237 })
3238}
3239
3240pub fn list_workspace_roots(id: &str) -> Result<(PathBuf, Vec<MountedRoot>), String> {
3241 SESSIONS.with(|s| {
3242 let map = s.borrow();
3243 let Some(state) = map.get(id) else {
3244 return Err(format!("agent session '{id}' does not exist"));
3245 };
3246 let Some(anchor) = state.workspace_anchor.as_ref() else {
3247 return Err(format!("agent session '{id}' has no workspace anchor"));
3248 };
3249 Ok((anchor.primary.clone(), anchor.additional_roots.clone()))
3250 })
3251}
3252
3253fn validate_workspace_root_path(root: &str) -> Result<PathBuf, String> {
3254 let normalized = normalize_workspace_root_path(root);
3255 let canonical = std::fs::canonicalize(&normalized)
3256 .map_err(|error| format!("workspace root '{root}' must exist and be readable: {error}"))?;
3257 let metadata = std::fs::metadata(&canonical)
3258 .map_err(|error| format!("workspace root '{root}' must exist and be readable: {error}"))?;
3259 if !metadata.is_dir() {
3260 return Err(format!("workspace root '{root}' must be a directory"));
3261 }
3262 std::fs::read_dir(&canonical)
3263 .map_err(|error| format!("workspace root '{root}' must be readable: {error}"))?;
3264 Ok(canonical)
3265}
3266
3267fn normalize_workspace_root_path(root: &str) -> PathBuf {
3268 let absolute = crate::stdlib::process::normalize_context_path(Path::new(root));
3269 std::fs::canonicalize(&absolute).unwrap_or(absolute)
3270}
3271
3272fn empty_transcript(id: &str) -> VmValue {
3273 use crate::llm::helpers::new_transcript_with;
3274 new_transcript_with(Some(id.to_string()), Vec::new(), None, None)
3275}
3276
3277fn clone_transcript_with_id(transcript: &VmValue, new_id: &str) -> VmValue {
3278 let Some(dict) = transcript.as_dict() else {
3279 return empty_transcript(new_id);
3280 };
3281 let mut next = dict.clone();
3282 next.put_str("id", new_id);
3283 VmValue::dict(next)
3284}
3285
3286fn clone_transcript_with_parent(transcript: &VmValue, parent_id: &str) -> VmValue {
3287 let Some(dict) = transcript.as_dict() else {
3288 return transcript.clone();
3289 };
3290 let mut next = dict.clone();
3291 let metadata = match next.get("metadata") {
3292 Some(VmValue::Dict(metadata)) => {
3293 let mut metadata = metadata.as_ref().clone();
3294 metadata.put_str("parent_session_id", parent_id);
3295 VmValue::dict(metadata)
3296 }
3297 _ => VmValue::dict(BTreeMap::from([(
3298 "parent_session_id".to_string(),
3299 VmValue::String(arcstr::ArcStr::from(parent_id.to_string())),
3300 )])),
3301 };
3302 next.insert(crate::value::intern_key("metadata"), metadata);
3303 VmValue::dict(next)
3304}
3305
3306fn apply_system_prompt_metadata(next: &mut crate::value::DictMap, system_prompt: &str) {
3307 let mut metadata = match next.get("metadata") {
3308 Some(VmValue::Dict(metadata)) => metadata.as_ref().clone(),
3309 _ => crate::value::DictMap::new(),
3310 };
3311 metadata.insert(
3312 crate::value::intern_key("system_prompt"),
3313 crate::stdlib::json_to_vm_value(&crate::llm::helpers::system_prompt_metadata(
3314 system_prompt,
3315 )),
3316 );
3317 next.insert(
3318 crate::value::intern_key("metadata"),
3319 VmValue::dict(metadata),
3320 );
3321}
3322
3323fn transcript_with_session_metadata(transcript: VmValue, state: &SessionState) -> VmValue {
3324 let Some(dict) = transcript.as_dict() else {
3325 return transcript;
3326 };
3327 let mut next = dict.clone();
3328 let mut metadata = match next.get("metadata") {
3329 Some(VmValue::Dict(metadata)) => metadata.as_ref().clone(),
3330 _ => crate::value::DictMap::new(),
3331 };
3332 if let Some(tool_format) = state.tool_format.as_ref() {
3333 metadata.put_str("tool_format", tool_format.clone());
3334 metadata.insert(
3335 crate::value::intern_key("tool_mode_locked"),
3336 VmValue::Bool(true),
3337 );
3338 }
3339 if let Some(system_prompt) = state.system_prompt.as_ref() {
3340 metadata.insert(
3341 crate::value::intern_key("system_prompt"),
3342 crate::stdlib::json_to_vm_value(&crate::llm::helpers::system_prompt_metadata(
3343 system_prompt,
3344 )),
3345 );
3346 }
3347 if let Some(actor_chain) = state.actor_chain.as_ref() {
3348 metadata.insert(
3349 crate::value::intern_key("actor_chain"),
3350 actor_chain.to_vm_value(),
3351 );
3352 } else {
3353 metadata.remove("actor_chain");
3354 }
3355 if let Some(anchor) = state.workspace_anchor.as_ref() {
3356 metadata.insert(
3357 crate::value::intern_key(WORKSPACE_ANCHOR_METADATA_KEY),
3358 anchor.to_vm_value(),
3359 );
3360 } else {
3361 metadata.remove(WORKSPACE_ANCHOR_METADATA_KEY);
3362 }
3363 if let Some(scratchpad) = state.scratchpad.as_ref() {
3364 metadata.insert(
3365 crate::value::intern_key("agent_scratchpad"),
3366 scratchpad.clone(),
3367 );
3368 metadata.insert(
3369 crate::value::intern_key("agent_scratchpad_version"),
3370 VmValue::Int(state.scratchpad_version as i64),
3371 );
3372 } else {
3373 metadata.remove("agent_scratchpad");
3374 metadata.remove("agent_scratchpad_version");
3375 }
3376 if let Some(last_action) = state.last_transcript_budget_action.as_ref() {
3377 let usage = transcript_usage(
3378 &VmValue::dict(next.clone()),
3379 state.transcript_budget_policy.max_approx_bytes.is_some(),
3380 );
3381 metadata.insert(
3382 crate::value::intern_key("transcript_budget"),
3383 crate::stdlib::json_to_vm_value(&serde_json::json!({
3384 "policy": transcript_budget_policy_json(&state.transcript_budget_policy.normalized()),
3385 "usage": transcript_budget_usage_json(&usage),
3386 "last_action": last_action,
3387 })),
3388 );
3389 }
3390 if !metadata.is_empty() {
3391 next.insert(
3392 crate::value::intern_key("metadata"),
3393 VmValue::dict(metadata),
3394 );
3395 }
3396 VmValue::dict(next)
3397}
3398
3399fn session_snapshot(state: &SessionState) -> VmValue {
3400 let transcript = transcript_with_session_metadata(state.transcript.clone(), state);
3401 let Some(dict) = transcript.as_dict() else {
3402 return state.transcript.clone();
3403 };
3404 let mut next = dict.clone();
3405 let length = next
3406 .get("messages")
3407 .and_then(|value| match value {
3408 VmValue::List(list) => Some(list.len() as i64),
3409 _ => None,
3410 })
3411 .unwrap_or(0);
3412 next.insert(crate::value::intern_key("length"), VmValue::Int(length));
3413 next.put_str("created_at", state.created_at.clone());
3414 next.insert(
3415 crate::value::intern_key("parent_id"),
3416 state
3417 .parent_id
3418 .as_ref()
3419 .map(|id| VmValue::String(arcstr::ArcStr::from(id.clone())))
3420 .unwrap_or(VmValue::Nil),
3421 );
3422 next.insert(
3423 crate::value::intern_key("child_ids"),
3424 VmValue::List(std::sync::Arc::new(
3425 state
3426 .child_ids
3427 .iter()
3428 .cloned()
3429 .map(|id| VmValue::String(arcstr::ArcStr::from(id)))
3430 .collect(),
3431 )),
3432 );
3433 next.insert(
3434 crate::value::intern_key("branched_at_event_index"),
3435 state
3436 .branched_at_event_index
3437 .map(|index| VmValue::Int(index as i64))
3438 .unwrap_or(VmValue::Nil),
3439 );
3440 next.insert(
3441 crate::value::intern_key("system_prompt"),
3442 state
3443 .system_prompt
3444 .as_ref()
3445 .map(|prompt| VmValue::String(arcstr::ArcStr::from(prompt.clone())))
3446 .unwrap_or(VmValue::Nil),
3447 );
3448 next.insert(
3449 crate::value::intern_key("tool_format"),
3450 state
3451 .tool_format
3452 .as_ref()
3453 .map(|format| VmValue::String(arcstr::ArcStr::from(format.clone())))
3454 .unwrap_or(VmValue::Nil),
3455 );
3456 next.insert(
3457 crate::value::intern_key("pinned_model"),
3458 state
3459 .pinned_model
3460 .as_ref()
3461 .map(|model| VmValue::String(arcstr::ArcStr::from(model.clone())))
3462 .unwrap_or(VmValue::Nil),
3463 );
3464 next.insert(
3465 crate::value::intern_key("pinned_reasoning_policy"),
3466 state
3467 .pinned_reasoning_policy
3468 .as_ref()
3469 .map(|policy| VmValue::String(arcstr::ArcStr::from(policy.clone())))
3470 .unwrap_or(VmValue::Nil),
3471 );
3472 next.insert(
3473 crate::value::intern_key("actor_chain"),
3474 state
3475 .actor_chain
3476 .as_ref()
3477 .map(ActorChain::to_vm_value)
3478 .unwrap_or(VmValue::Nil),
3479 );
3480 next.insert(
3481 crate::value::intern_key("scratchpad"),
3482 state.scratchpad.clone().unwrap_or(VmValue::Nil),
3483 );
3484 next.insert(
3485 crate::value::intern_key("scratchpad_version"),
3486 VmValue::Int(state.scratchpad_version as i64),
3487 );
3488 next.insert(
3489 crate::value::intern_key("workspace_anchor"),
3490 state
3491 .workspace_anchor
3492 .as_ref()
3493 .map(WorkspaceAnchor::to_vm_value)
3494 .unwrap_or(VmValue::Nil),
3495 );
3496 next.insert(
3497 crate::value::intern_key("workspace_policy"),
3498 state.workspace_policy.to_vm_value(),
3499 );
3500 next.insert(
3501 crate::value::intern_key("live_clients"),
3502 crate::stdlib::json_to_vm_value(&serde_json::Value::Array(
3503 state.live_clients.values().map(live_client_json).collect(),
3504 )),
3505 );
3506 next.insert(
3507 crate::value::intern_key("live_controller_id"),
3508 state
3509 .live_controller_id
3510 .as_ref()
3511 .map(|id| VmValue::String(arcstr::ArcStr::from(id.clone())))
3512 .unwrap_or(VmValue::Nil),
3513 );
3514 next.insert(
3515 crate::value::intern_key("completed_turn_checkpoint_count"),
3516 VmValue::Int(state.completed_turn_checkpoints.len() as i64),
3517 );
3518 next.insert(
3519 crate::value::intern_key("redo_checkpoint_count"),
3520 VmValue::Int(state.redo_stack.len() as i64),
3521 );
3522 VmValue::dict(next)
3523}
3524
3525fn update_lineage(
3526 map: &mut HashMap<String, SessionState>,
3527 parent_id: &str,
3528 child_id: &str,
3529 branched_at_event_index: Option<usize>,
3530) {
3531 let old_parent_id = map.get(child_id).and_then(|child| child.parent_id.clone());
3532 if let Some(old_parent_id) = old_parent_id.filter(|old_parent_id| old_parent_id != parent_id) {
3533 if let Some(old_parent) = map.get_mut(&old_parent_id) {
3534 old_parent.child_ids.retain(|id| id != child_id);
3535 old_parent.touch();
3536 }
3537 }
3538 if let Some(parent) = map.get_mut(parent_id) {
3539 parent.touch();
3540 if !parent.child_ids.iter().any(|id| id == child_id) {
3541 parent.child_ids.push(child_id.to_string());
3542 }
3543 }
3544 if let Some(child) = map.get_mut(child_id) {
3545 child.touch();
3546 child.parent_id = Some(parent_id.to_string());
3547 child.branched_at_event_index = branched_at_event_index;
3548 child.transcript = clone_transcript_with_parent(&child.transcript, parent_id);
3549 }
3550}
3551
3552fn branch_event_index(transcript: &VmValue, keep_first: usize) -> usize {
3553 if keep_first == 0 {
3554 return 0;
3555 }
3556 let Some(dict) = transcript.as_dict() else {
3557 return keep_first;
3558 };
3559 let Some(VmValue::List(events)) = dict.get("events") else {
3560 return keep_first;
3561 };
3562 event_prefix_len_for_messages(events, keep_first)
3563}
3564
3565fn event_kind(event: &VmValue) -> Option<String> {
3566 event
3567 .as_dict()
3568 .and_then(|dict| dict.get("kind"))
3569 .map(VmValue::display)
3570}
3571
3572fn event_id(event: &VmValue) -> Option<String> {
3573 event
3574 .as_dict()
3575 .and_then(|dict| dict.get("id"))
3576 .map(VmValue::display)
3577}
3578
3579fn is_turn_event(event: &VmValue) -> bool {
3580 matches!(
3581 event_kind(event).as_deref(),
3582 Some("message" | "tool_result")
3583 )
3584}
3585
3586fn event_prefix_len_for_messages(events: &[VmValue], keep_first: usize) -> usize {
3587 if keep_first == 0 {
3588 return 0;
3589 }
3590 let mut retained_messages = 0usize;
3591 for (index, event) in events.iter().enumerate() {
3592 if is_turn_event(event) {
3593 retained_messages += 1;
3594 if retained_messages == keep_first {
3595 return index + 1;
3596 }
3597 }
3598 }
3599 events.len()
3600}
3601
3602fn turn_event_id_for_count(events: &[VmValue], keep_first: usize) -> Option<String> {
3603 if keep_first == 0 {
3604 return None;
3605 }
3606 let mut retained_messages = 0usize;
3607 for event in events {
3608 if is_turn_event(event) {
3609 retained_messages += 1;
3610 if retained_messages == keep_first {
3611 return event_id(event);
3612 }
3613 }
3614 }
3615 None
3616}
3617
3618#[cfg(test)]
3619#[path = "agent_sessions_tests.rs"]
3620mod tests;