1use std::cell::{Cell, RefCell};
20use std::collections::{BTreeMap, HashMap, HashSet};
21use std::future::Future;
22use std::path::{Path, PathBuf};
23use std::time::Instant;
24
25use crate::runtime_limits::RuntimeLimits;
26use crate::value::VmValue;
27use crate::workspace_anchor::{
28 MountMode, MountedRoot, WorkspaceAnchor, WorkspacePolicy, WORKSPACE_ANCHOR_METADATA_KEY,
29};
30
/// Transcript event kind recorded for live-client lifecycle changes
/// (attach, detach, heartbeat).
const LIVE_CLIENT_EVENT_KIND: &str = "live_session_client";
/// Transcript event kind recorded when a permission request is routed
/// through the active live controller.
const LIVE_CLIENT_PERMISSION_EVENT_KIND: &str = "live_session_permission_route";

/// Initial value of the per-thread session cap, taken from the default
/// runtime limits.
pub const DEFAULT_SESSION_CAP: usize = RuntimeLimits::DEFAULT.max_agent_sessions;

/// Message cap used by the default transcript budget policy.
pub const DEFAULT_TRANSCRIPT_MESSAGE_CAP: usize = 4096;

/// Event cap used by the default transcript budget policy.
pub const DEFAULT_TRANSCRIPT_EVENT_CAP: usize = 32768;
/// Largest scratchpad accepted, measured in bytes of its serialized JSON.
pub const MAX_SCRATCHPAD_BYTES: usize = 16 * 1024;
// NOTE(review): debug-build-only diagnostic code; its use site is not
// visible in this part of the file.
#[cfg(debug_assertions)]
const CACHE_STABLE_SYSTEM_PROMPT_DIAGNOSTIC: &str = "HARN-CACHE-001";
49
/// Recovery strategy carried by a [`SessionTranscriptBudgetPolicy`].
///
/// The code that interprets these variants lives outside this definition.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum TranscriptBudgetRecovery {
    Reject,
    Trim { keep_last: usize },
    Compact { keep_last: usize },
}

/// Size limits for a session transcript together with the recovery strategy
/// to use when they are exceeded.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct SessionTranscriptBudgetPolicy {
    pub max_messages: usize,
    pub max_events: usize,
    pub max_approx_bytes: Option<usize>,
    pub recovery: TranscriptBudgetRecovery,
}

impl SessionTranscriptBudgetPolicy {
    /// Builds a policy with [`TranscriptBudgetRecovery::Reject`].
    ///
    /// Both caps are raised to at least 1 and no byte limit is set.
    pub fn reject(max_messages: usize, max_events: usize) -> Self {
        Self {
            max_messages: usize::max(max_messages, 1),
            max_events: usize::max(max_events, 1),
            max_approx_bytes: None,
            recovery: TranscriptBudgetRecovery::Reject,
        }
    }

    /// Same limits as [`Self::reject`], but with
    /// [`TranscriptBudgetRecovery::Trim`] carrying `keep_last` unchanged.
    pub fn trim(max_messages: usize, max_events: usize, keep_last: usize) -> Self {
        Self {
            recovery: TranscriptBudgetRecovery::Trim { keep_last },
            ..Self::reject(max_messages, max_events)
        }
    }

    /// Same limits as [`Self::reject`], but with
    /// [`TranscriptBudgetRecovery::Compact`] carrying `keep_last` unchanged.
    pub fn compact(max_messages: usize, max_events: usize, keep_last: usize) -> Self {
        Self {
            recovery: TranscriptBudgetRecovery::Compact { keep_last },
            ..Self::reject(max_messages, max_events)
        }
    }

    /// Replaces the approximate byte limit; a `Some` limit is raised to at
    /// least 1, `None` removes the limit.
    pub fn with_max_approx_bytes(mut self, max_approx_bytes: Option<usize>) -> Self {
        self.max_approx_bytes = max_approx_bytes.map(|limit| usize::max(limit, 1));
        self
    }

    /// Returns a copy with every limit raised to at least 1.
    ///
    /// The fields are public, so a caller-built value may contain zeros.
    fn normalized(&self) -> Self {
        let clamped = self.clone().with_max_approx_bytes(self.max_approx_bytes);
        Self {
            max_messages: clamped.max_messages.max(1),
            max_events: clamped.max_events.max(1),
            ..clamped
        }
    }
}
107
108impl Default for SessionTranscriptBudgetPolicy {
109 fn default() -> Self {
110 Self::reject(DEFAULT_TRANSCRIPT_MESSAGE_CAP, DEFAULT_TRANSCRIPT_EVENT_CAP)
111 }
112}
113
/// In-memory state kept for one agent session in the thread-local store.
pub struct SessionState {
    /// Session identifier; also the key under which the state is stored.
    pub id: String,
    /// Transcript value; starts as `empty_transcript(id)`.
    pub transcript: VmValue,
    /// Subscriber values attached to the session (starts empty).
    pub subscribers: Vec<VmValue>,
    /// Creation timestamp produced by `now_rfc3339()`.
    pub created_at: String,
    /// Time of the latest access; `open_or_create` evicts the session with
    /// the oldest value when the session cap is reached.
    pub last_accessed: Instant,
    /// Parent session id, if this session has been linked to one.
    pub parent_id: Option<String>,
    /// Ids of sessions linked as children of this one.
    pub child_ids: Vec<String>,
    // NOTE(review): presumably the parent's event index at which this
    // session branched — confirm against `update_lineage`.
    pub branched_at_event_index: Option<usize>,
    /// Active skill names (starts empty).
    pub active_skills: Vec<String>,
    /// Tool format, if set; cleared by `reset_transcript`.
    pub tool_format: Option<String>,
    /// System prompt, if set; cleared by `reset_transcript`.
    pub system_prompt: Option<String>,
    /// Pinned model, if any; not touched by `reset_transcript`.
    pub pinned_model: Option<String>,
    /// Pinned reasoning policy, if any; not touched by `reset_transcript`.
    pub pinned_reasoning_policy: Option<String>,
    /// Workspace policy; starts as `WorkspacePolicy::default()`.
    pub workspace_policy: WorkspacePolicy,
    /// Workspace anchor, if one has been attached.
    pub workspace_anchor: Option<WorkspaceAnchor>,
    /// Current scratchpad (a dict when set through `set_scratchpad`).
    pub scratchpad: Option<VmValue>,
    /// Bumped (saturating) by every successful scratchpad set or clear;
    /// reset to 0 by `reset_transcript`.
    pub scratchpad_version: u64,
    /// Transcript budget policy; copied from the thread default at creation.
    pub transcript_budget_policy: SessionTranscriptBudgetPolicy,
    /// Record of the latest budget action, if any; cleared by
    /// `reset_transcript`.
    pub last_transcript_budget_action: Option<serde_json::Value>,
    /// Attached live clients keyed by client id.
    pub live_clients: BTreeMap<String, LiveSessionClient>,
    /// Id of the live client currently acting as controller, if any.
    pub live_controller_id: Option<String>,
    /// Completed turn checkpoints; cleared by `reset_transcript`.
    pub completed_turn_checkpoints: Vec<SessionTurnCheckpoint>,
    /// Redo entries; cleared by `reset_transcript`.
    pub redo_stack: Vec<SessionRedoEntry>,
}
169
170impl SessionState {
171 fn new(id: String) -> Self {
172 let now = Instant::now();
173 let transcript = empty_transcript(&id);
174 Self {
175 id,
176 transcript,
177 subscribers: Vec::new(),
178 created_at: crate::orchestration::now_rfc3339(),
179 last_accessed: now,
180 parent_id: None,
181 child_ids: Vec::new(),
182 branched_at_event_index: None,
183 active_skills: Vec::new(),
184 tool_format: None,
185 system_prompt: None,
186 pinned_model: None,
187 pinned_reasoning_policy: None,
188 workspace_policy: WorkspacePolicy::default(),
189 workspace_anchor: None,
190 scratchpad: None,
191 scratchpad_version: 0,
192 transcript_budget_policy: default_transcript_budget_policy(),
193 last_transcript_budget_action: None,
194 live_clients: BTreeMap::new(),
195 live_controller_id: None,
196 completed_turn_checkpoints: Vec::new(),
197 redo_stack: Vec::new(),
198 }
199 }
200}
201
/// Role a live client holds on a session.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum LiveClientMode {
    Observer,
    Controller,
}

impl LiveClientMode {
    /// Lower-case name used when the mode is serialized to JSON.
    fn as_str(&self) -> &'static str {
        match *self {
            LiveClientMode::Controller => "controller",
            LiveClientMode::Observer => "observer",
        }
    }
}
216
/// A live client currently attached to a session.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct LiveSessionClient {
    /// Trimmed, non-empty client id.
    pub client_id: String,
    /// Whether the client observes or controls the session.
    pub mode: LiveClientMode,
    /// Timestamp of the first attach; kept when the same id attaches again.
    pub attached_at: String,
    /// Timestamp refreshed on attach, heartbeat, detach, and when the client
    /// is demoted by a takeover.
    pub last_seen_at: String,
    /// Must be true for the controller to inject prompts.
    pub prompt_injection: bool,
    /// Must be true for the controller to route permission requests.
    pub permission_routing: bool,
    /// Caller-supplied metadata; a heartbeat replaces it when given a
    /// non-null value.
    pub metadata: serde_json::Value,
}
227
/// Request passed to `attach_live_client`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct AttachLiveClient {
    /// Client id; trimmed and rejected when empty.
    pub client_id: String,
    /// Role requested for the client.
    pub mode: LiveClientMode,
    /// When true, a controller attach may displace a different attached
    /// controller (which is demoted to observer); when false that attach
    /// is rejected.
    pub takeover: bool,
    /// Stored on the client as its prompt-injection capability.
    pub prompt_injection: bool,
    /// Stored on the client as its permission-routing capability.
    pub permission_routing: bool,
    /// Stored on the client as its metadata.
    pub metadata: serde_json::Value,
}
237
/// Result of a live-client attach, detach, or heartbeat.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct LiveClientChange {
    /// The affected client; `None` for a detach.
    pub client: Option<LiveSessionClient>,
    /// Controller id before the operation.
    pub previous_controller_id: Option<String>,
    /// Controller id after the operation.
    pub active_controller_id: Option<String>,
    /// All clients attached after the operation.
    pub clients: Vec<LiveSessionClient>,
}
245
/// Lineage summary returned by `ancestry`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct SessionAncestry {
    /// Direct parent of the session, if any.
    pub parent_id: Option<String>,
    /// Direct children of the session.
    pub child_ids: Vec<String>,
    /// Last id reached by following parent links; the session's own id when
    /// it has no parent.
    pub root_id: String,
}
252
/// Outcome of a transcript truncation.
// NOTE(review): the producer of this type is not visible in this part of
// the file; field meanings below are read from the names only — confirm.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct SessionTruncateResult {
    pub kept_turn_count: usize,
    pub removed_turn_count: usize,
    pub new_tip_turn_id: Option<String>,
}
259
/// Comparable summary of a turn checkpoint: the fields of
/// `SessionTurnCheckpoint` without its timestamp and transcript values.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct SessionCheckpointSummary {
    pub checkpoint_id: String,
    pub before_message_count: usize,
    pub after_message_count: usize,
    pub fs_snapshot_ids: Vec<String>,
}
267
/// Checkpoint stored in `SessionState::completed_turn_checkpoints`.
// NOTE(review): presumably captures the transcript before and after one
// completed turn; the code that records it is not visible here — confirm.
#[derive(Clone, Debug)]
pub struct SessionTurnCheckpoint {
    pub checkpoint_id: String,
    pub completed_at: String,
    pub before_transcript: VmValue,
    pub after_transcript: VmValue,
    pub before_message_count: usize,
    pub after_message_count: usize,
    pub fs_snapshot_ids: Vec<String>,
}
278
/// Entry stored in `SessionState::redo_stack`: a checkpoint plus the
/// filesystem snapshot ids associated with redoing it.
// NOTE(review): the undo/redo logic is not visible here — confirm semantics.
#[derive(Clone, Debug)]
pub struct SessionRedoEntry {
    pub checkpoint: SessionTurnCheckpoint,
    pub redo_fs_snapshot_ids: Vec<String>,
}
284
/// Failure reasons for checkpoint operations.
// NOTE(review): the functions returning these are not visible here; the
// variant meanings are taken from their names — confirm.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum SessionCheckpointError {
    UnknownSession,
    NoCheckpoint,
    NoRedo,
}
291
/// Result of a checkpoint operation.
// NOTE(review): the set of `status` strings is defined by code outside this
// part of the file.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct SessionCheckpointOutcome {
    pub status: &'static str,
    pub checkpoint: SessionCheckpointSummary,
    pub redo_fs_snapshot_ids: Vec<String>,
}
298
/// Report describing one reminder injection.
// NOTE(review): produced by code outside this part of the file;
// `deduped_count` presumably counts reminders removed as duplicates — confirm.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ReminderInjectionReport {
    pub reminder_id: String,
    pub deduped_count: usize,
}
304
// Per-thread session store state. Each thread that touches these gets its
// own independent copy.
thread_local! {
    // Session states keyed by session id.
    static SESSIONS: RefCell<HashMap<String, SessionState>> = RefCell::new(HashMap::new());
    // Maximum number of sessions kept before `open_or_create` evicts one.
    static SESSION_CAP: Cell<usize> = const { Cell::new(DEFAULT_SESSION_CAP) };
    // Budget policy copied into newly created sessions.
    static DEFAULT_TRANSCRIPT_BUDGET_POLICY: RefCell<SessionTranscriptBudgetPolicy> =
        RefCell::new(SessionTranscriptBudgetPolicy::default());
    // Stack of entered session ids; the last entry is the current session.
    static CURRENT_SESSION_STACK: RefCell<Vec<String>> = const { RefCell::new(Vec::new()) };
    // Stack of entered tool-call ids; consulted when no task-local id is set.
    static CURRENT_TOOL_CALL_STACK: RefCell<Vec<String>> = const { RefCell::new(Vec::new()) };
}
313
// Task-local tool-call id set by `scope_current_tool_call`; when present and
// non-blank it takes precedence over the thread-local stack in
// `current_tool_call_id`.
tokio::task_local! {
    static CURRENT_TOOL_CALL_TASK: String;
}
317
318pub struct CurrentSessionGuard {
319 active: bool,
320}
321
322impl Drop for CurrentSessionGuard {
323 fn drop(&mut self) {
324 if self.active {
325 pop_current_session();
326 }
327 }
328}
329
330pub struct CurrentToolCallGuard {
336 active: bool,
337}
338
339impl Drop for CurrentToolCallGuard {
340 fn drop(&mut self) {
341 if self.active {
342 pop_current_tool_call();
343 }
344 }
345}
346
347pub fn set_session_cap(cap: usize) {
350 SESSION_CAP.with(|c| c.set(cap.max(1)));
351}
352
353pub fn session_cap() -> usize {
354 SESSION_CAP.with(|c| c.get())
355}
356
357pub fn set_default_transcript_budget_policy(policy: SessionTranscriptBudgetPolicy) {
358 DEFAULT_TRANSCRIPT_BUDGET_POLICY.with(|cell| {
359 *cell.borrow_mut() = policy.normalized();
360 });
361}
362
363pub fn reset_default_transcript_budget_policy() {
364 set_default_transcript_budget_policy(SessionTranscriptBudgetPolicy::default());
365}
366
367pub fn default_transcript_budget_policy() -> SessionTranscriptBudgetPolicy {
368 DEFAULT_TRANSCRIPT_BUDGET_POLICY.with(|cell| cell.borrow().clone())
369}
370
371pub fn transcript_budget_policy(id: &str) -> Option<SessionTranscriptBudgetPolicy> {
372 SESSIONS.with(|s| {
373 s.borrow()
374 .get(id)
375 .map(|state| state.transcript_budget_policy.clone())
376 })
377}
378
379pub fn set_transcript_budget_policy(
380 id: &str,
381 policy: SessionTranscriptBudgetPolicy,
382) -> Result<(), String> {
383 SESSIONS.with(|s| {
384 let mut map = s.borrow_mut();
385 let Some(state) = map.get_mut(id) else {
386 return Err(format!("agent session '{id}' does not exist"));
387 };
388 let previous = state.transcript_budget_policy.clone();
389 let previous_action = state.last_transcript_budget_action.clone();
390 state.transcript_budget_policy = policy.normalized();
391 let candidate = state.transcript.clone();
392 if let Err(error) = apply_transcript_with_budget(state, candidate, "policy_update") {
393 state.transcript_budget_policy = previous;
394 state.last_transcript_budget_action = previous_action;
395 return Err(error);
396 }
397 state.last_accessed = Instant::now();
398 Ok(())
399 })
400}
401
402pub fn reset_session_store() {
404 SESSIONS.with(|s| s.borrow_mut().clear());
405 CURRENT_SESSION_STACK.with(|stack| stack.borrow_mut().clear());
406 CURRENT_TOOL_CALL_STACK.with(|stack| stack.borrow_mut().clear());
407 reset_default_transcript_budget_policy();
408}
409
410pub(crate) fn push_current_session(id: String) {
411 if id.is_empty() {
412 return;
413 }
414 CURRENT_SESSION_STACK.with(|stack| stack.borrow_mut().push(id));
415}
416
417pub(crate) fn pop_current_session() {
418 CURRENT_SESSION_STACK.with(|stack| {
419 let _ = stack.borrow_mut().pop();
420 });
421}
422
423pub fn current_session_id() -> Option<String> {
424 CURRENT_SESSION_STACK.with(|stack| stack.borrow().last().cloned())
425}
426
427pub fn enter_current_session(id: impl Into<String>) -> CurrentSessionGuard {
428 let id = id.into();
429 if id.trim().is_empty() {
430 return CurrentSessionGuard { active: false };
431 }
432 push_current_session(id);
433 CurrentSessionGuard { active: true }
434}
435
436fn push_current_tool_call(id: String) {
437 if id.is_empty() {
438 return;
439 }
440 CURRENT_TOOL_CALL_STACK.with(|stack| stack.borrow_mut().push(id));
441}
442
443fn pop_current_tool_call() {
444 CURRENT_TOOL_CALL_STACK.with(|stack| {
445 let _ = stack.borrow_mut().pop();
446 });
447}
448
449pub fn current_tool_call_id() -> Option<String> {
454 if let Ok(id) = CURRENT_TOOL_CALL_TASK.try_with(Clone::clone) {
455 if !id.trim().is_empty() {
456 return Some(id);
457 }
458 }
459 CURRENT_TOOL_CALL_STACK.with(|stack| stack.borrow().last().cloned())
460}
461
462pub async fn scope_current_tool_call<F, T>(id: impl Into<String>, future: F) -> T
468where
469 F: Future<Output = T>,
470{
471 let id = id.into();
472 if id.trim().is_empty() {
473 future.await
474 } else {
475 CURRENT_TOOL_CALL_TASK.scope(id, future).await
476 }
477}
478
479pub fn enter_current_tool_call(id: impl Into<String>) -> CurrentToolCallGuard {
481 let id = id.into();
482 if id.trim().is_empty() {
483 return CurrentToolCallGuard { active: false };
484 }
485 push_current_tool_call(id);
486 CurrentToolCallGuard { active: true }
487}
488
489pub fn exists(id: &str) -> bool {
490 SESSIONS.with(|s| s.borrow().contains_key(id))
491}
492
493pub fn length(id: &str) -> Option<usize> {
494 SESSIONS.with(|s| {
495 s.borrow().get(id).map(|state| {
496 state
497 .transcript
498 .as_dict()
499 .and_then(|d| d.get("messages"))
500 .and_then(|v| match v {
501 VmValue::List(list) => Some(list.len()),
502 _ => None,
503 })
504 .unwrap_or(0)
505 })
506 })
507}
508
509pub fn scratchpad(id: &str) -> Option<VmValue> {
510 SESSIONS.with(|s| {
511 s.borrow()
512 .get(id)
513 .and_then(|state| state.scratchpad.clone())
514 })
515}
516
517pub fn scratchpad_version(id: &str) -> Option<u64> {
518 SESSIONS.with(|s| s.borrow().get(id).map(|state| state.scratchpad_version))
519}
520
521pub fn set_scratchpad(
522 id: &str,
523 scratchpad: VmValue,
524 source: impl Into<String>,
525 reason: Option<String>,
526 metadata: serde_json::Value,
527) -> Result<u64, String> {
528 validate_scratchpad_value(&scratchpad)?;
529 SESSIONS.with(|s| {
530 let mut map = s.borrow_mut();
531 let Some(state) = map.get_mut(id) else {
532 return Err(format!("agent session '{id}' does not exist"));
533 };
534 let version = state.scratchpad_version.saturating_add(1);
535 let event = scratchpad_transcript_event(
536 "set",
537 version,
538 Some(&scratchpad),
539 source.into(),
540 reason,
541 metadata,
542 );
543 append_event_to_state(state, event, "set_scratchpad")?;
544 state.scratchpad = Some(scratchpad);
545 state.scratchpad_version = version;
546 state.last_accessed = Instant::now();
547 Ok(version)
548 })
549}
550
551pub fn clear_scratchpad(
552 id: &str,
553 source: impl Into<String>,
554 reason: Option<String>,
555 metadata: serde_json::Value,
556) -> Result<u64, String> {
557 SESSIONS.with(|s| {
558 let mut map = s.borrow_mut();
559 let Some(state) = map.get_mut(id) else {
560 return Err(format!("agent session '{id}' does not exist"));
561 };
562 let version = state.scratchpad_version.saturating_add(1);
563 let event =
564 scratchpad_transcript_event("clear", version, None, source.into(), reason, metadata);
565 append_event_to_state(state, event, "clear_scratchpad")?;
566 state.scratchpad = None;
567 state.scratchpad_version = version;
568 state.last_accessed = Instant::now();
569 Ok(version)
570 })
571}
572
573fn validate_scratchpad_value(value: &VmValue) -> Result<(), String> {
574 if !matches!(value, VmValue::Dict(_)) {
575 return Err("agent session scratchpad must be a dict".to_string());
576 }
577 let json = crate::llm::helpers::vm_value_to_json(value);
578 let approx_bytes = serde_json::to_vec(&json)
579 .map(|bytes| bytes.len())
580 .unwrap_or(usize::MAX);
581 if approx_bytes > MAX_SCRATCHPAD_BYTES {
582 return Err(format!(
583 "agent session scratchpad is {approx_bytes} bytes; max is {MAX_SCRATCHPAD_BYTES}"
584 ));
585 }
586 Ok(())
587}
588
589fn scratchpad_transcript_event(
590 action: &str,
591 version: u64,
592 scratchpad: Option<&VmValue>,
593 source: String,
594 reason: Option<String>,
595 metadata: serde_json::Value,
596) -> VmValue {
597 let scratchpad_json = scratchpad.map(crate::llm::helpers::vm_value_to_json);
598 let approx_bytes = scratchpad_json
599 .as_ref()
600 .and_then(|value| serde_json::to_vec(value).ok().map(|bytes| bytes.len()))
601 .unwrap_or(0);
602 let event_metadata = serde_json::json!({
603 "action": action,
604 "version": version,
605 "source": normalize_scratchpad_source(source),
606 "reason": reason.unwrap_or_default(),
607 "approx_bytes": approx_bytes,
608 "counts": scratchpad_json
609 .as_ref()
610 .map(scratchpad_counts_json)
611 .unwrap_or_else(|| serde_json::json!({})),
612 "metadata": metadata,
613 });
614 let content = format!("Agent scratchpad {action}");
615 crate::llm::helpers::transcript_event(
616 "agent_scratchpad",
617 "system",
618 "internal",
619 &content,
620 Some(event_metadata),
621 )
622}
623
/// Trims `source`; a blank source becomes `"harn.agent_scratchpad"`.
fn normalize_scratchpad_source(source: String) -> String {
    match source.trim() {
        "" => "harn.agent_scratchpad".to_string(),
        named => named.to_string(),
    }
}
632
633fn scratchpad_counts_json(value: &serde_json::Value) -> serde_json::Value {
634 serde_json::json!({
635 "goals": scratchpad_array_len(value, "goals"),
636 "open_items": scratchpad_array_len(value, "open_items"),
637 "facts": scratchpad_array_len(value, "facts"),
638 "refs": scratchpad_array_len(value, "refs"),
639 })
640}
641
642fn scratchpad_array_len(value: &serde_json::Value, key: &str) -> usize {
643 value
644 .get(key)
645 .and_then(serde_json::Value::as_array)
646 .map(Vec::len)
647 .unwrap_or(0)
648}
649
650pub fn snapshot(id: &str) -> Option<VmValue> {
651 SESSIONS.with(|s| s.borrow().get(id).map(session_snapshot))
652}
653
654pub fn transcript(id: &str) -> Option<VmValue> {
656 SESSIONS.with(|s| {
657 s.borrow()
658 .get(id)
659 .map(|state| transcript_with_session_metadata(state.transcript.clone(), state))
660 })
661}
662
663pub fn open_or_create(id: Option<String>) -> String {
672 let resolved = id.unwrap_or_else(|| uuid::Uuid::now_v7().to_string());
673 let parent_session = current_session_id();
674 let mut was_new = false;
675 SESSIONS.with(|s| {
676 let mut map = s.borrow_mut();
677 if let Some(state) = map.get_mut(&resolved) {
678 state.last_accessed = Instant::now();
679 return;
680 }
681 was_new = true;
682 let cap = SESSION_CAP.with(|c| c.get());
683 if map.len() >= cap {
684 if let Some(victim) = map
685 .iter()
686 .min_by_key(|(_, state)| state.last_accessed)
687 .map(|(id, _)| id.clone())
688 {
689 map.remove(&victim);
690 }
691 }
692 map.insert(resolved.clone(), SessionState::new(resolved.clone()));
693 });
694 if was_new {
695 if let Some(parent) = parent_session.as_deref() {
696 crate::agent_events::mirror_session_sinks(parent, &resolved);
697 }
698 try_register_event_log(&resolved);
699 }
700 resolved
701}
702
703pub fn open_child_session(parent_id: &str, id: Option<String>) -> String {
704 let resolved = open_or_create(id);
705 link_child_session(parent_id, &resolved);
706 resolved
707}
708
/// Links `child_id` under `parent_id` without recording a branch point.
///
/// Delegates to `link_child_session_with_branch` with `None`.
pub fn link_child_session(parent_id: &str, child_id: &str) {
    link_child_session_with_branch(parent_id, child_id, None);
}
712
713pub fn link_child_session_with_branch(
714 parent_id: &str,
715 child_id: &str,
716 branched_at_event_index: Option<usize>,
717) {
718 if parent_id == child_id {
719 return;
720 }
721 open_or_create(Some(parent_id.to_string()));
722 open_or_create(Some(child_id.to_string()));
723 SESSIONS.with(|s| {
724 let mut map = s.borrow_mut();
725 update_lineage(&mut map, parent_id, child_id, branched_at_event_index);
726 });
727}
728
729pub fn parent_id(id: &str) -> Option<String> {
730 SESSIONS.with(|s| s.borrow().get(id).and_then(|state| state.parent_id.clone()))
731}
732
733pub fn child_ids(id: &str) -> Vec<String> {
734 SESSIONS.with(|s| {
735 s.borrow()
736 .get(id)
737 .map(|state| state.child_ids.clone())
738 .unwrap_or_default()
739 })
740}
741
742pub fn ancestry(id: &str) -> Option<SessionAncestry> {
743 SESSIONS.with(|s| {
744 let map = s.borrow();
745 let state = map.get(id)?;
746 let mut root_id = state.id.clone();
747 let mut cursor = state.parent_id.clone();
748 let mut seen = HashSet::from([state.id.clone()]);
749 while let Some(parent_id) = cursor {
750 if !seen.insert(parent_id.clone()) {
751 break;
752 }
753 root_id = parent_id.clone();
754 cursor = map
755 .get(&parent_id)
756 .and_then(|parent| parent.parent_id.clone());
757 }
758 Some(SessionAncestry {
759 parent_id: state.parent_id.clone(),
760 child_ids: state.child_ids.clone(),
761 root_id,
762 })
763 })
764}
765
766pub fn live_clients(id: &str) -> Option<Vec<LiveSessionClient>> {
767 SESSIONS.with(|s| {
768 s.borrow()
769 .get(id)
770 .map(|state| state.live_clients.values().cloned().collect())
771 })
772}
773
774pub fn attach_live_client(id: &str, request: AttachLiveClient) -> Result<LiveClientChange, String> {
775 SESSIONS.with(|s| {
776 let mut map = s.borrow_mut();
777 let Some(state) = map.get_mut(id) else {
778 return Err(format!("agent session '{id}' does not exist"));
779 };
780 let client_id = validate_live_client_id(request.client_id)?;
781 let now = crate::orchestration::now_rfc3339();
782 let previous_clients = state.live_clients.clone();
783 let previous_controller_id = state.live_controller_id.clone();
784
785 if request.mode == LiveClientMode::Controller {
786 let conflicting_controller = previous_controller_id
787 .as_ref()
788 .filter(|controller_id| *controller_id != &client_id)
789 .filter(|controller_id| state.live_clients.contains_key(*controller_id));
790 if let Some(previous) = conflicting_controller {
791 if !request.takeover {
792 return Err(format!("live session already has controller '{previous}'"));
793 }
794 if let Some(previous_client) = state.live_clients.get_mut(previous) {
795 previous_client.mode = LiveClientMode::Observer;
796 previous_client.prompt_injection = false;
797 previous_client.permission_routing = false;
798 previous_client.last_seen_at = now.clone();
799 }
800 }
801 state.live_controller_id = Some(client_id.clone());
802 } else if state.live_controller_id.as_deref() == Some(client_id.as_str()) {
803 state.live_controller_id = None;
804 }
805
806 let attached_at = state
807 .live_clients
808 .get(&client_id)
809 .map(|client| client.attached_at.clone())
810 .unwrap_or_else(|| now.clone());
811 let client = LiveSessionClient {
812 client_id: client_id.clone(),
813 mode: request.mode,
814 attached_at,
815 last_seen_at: now,
816 prompt_injection: request.prompt_injection,
817 permission_routing: request.permission_routing,
818 metadata: request.metadata,
819 };
820 state.live_clients.insert(client_id, client.clone());
821 state.last_accessed = Instant::now();
822 let active_controller_id = state.live_controller_id.clone();
823 append_live_client_event(
824 state,
825 "attached",
826 Some(&client),
827 previous_controller_id.as_deref(),
828 active_controller_id.as_deref(),
829 serde_json::Value::Null,
830 )
831 .inspect_err(|_error| {
832 state.live_clients = previous_clients;
833 state.live_controller_id = previous_controller_id.clone();
834 })?;
835 Ok(live_client_change(
836 Some(client),
837 previous_controller_id,
838 state,
839 ))
840 })
841}
842
843pub fn takeover_live_client(
844 id: &str,
845 client_id: impl Into<String>,
846 metadata: serde_json::Value,
847) -> Result<LiveClientChange, String> {
848 attach_live_client(
849 id,
850 AttachLiveClient {
851 client_id: client_id.into(),
852 mode: LiveClientMode::Controller,
853 takeover: true,
854 prompt_injection: true,
855 permission_routing: true,
856 metadata,
857 },
858 )
859}
860
861pub fn detach_live_client(
862 id: &str,
863 client_id: impl Into<String>,
864 reason: Option<String>,
865 metadata: serde_json::Value,
866) -> Result<LiveClientChange, String> {
867 SESSIONS.with(|s| {
868 let mut map = s.borrow_mut();
869 let Some(state) = map.get_mut(id) else {
870 return Err(format!("agent session '{id}' does not exist"));
871 };
872 let client_id = validate_live_client_id(client_id.into())?;
873 let previous_clients = state.live_clients.clone();
874 let previous_controller_id = state.live_controller_id.clone();
875 let Some(mut client) = state.live_clients.remove(&client_id) else {
876 return Err(format!("live client '{client_id}' is not attached"));
877 };
878 client.last_seen_at = crate::orchestration::now_rfc3339();
879 if state.live_controller_id.as_deref() == Some(client_id.as_str()) {
880 state.live_controller_id = None;
881 }
882 state.last_accessed = Instant::now();
883 let active_controller_id = state.live_controller_id.clone();
884 append_live_client_event(
885 state,
886 "detached",
887 Some(&client),
888 previous_controller_id.as_deref(),
889 active_controller_id.as_deref(),
890 serde_json::json!({
891 "reason": reason.unwrap_or_else(|| "client_detached".to_string()),
892 "metadata": metadata,
893 }),
894 )
895 .inspect_err(|_error| {
896 state.live_clients = previous_clients;
897 state.live_controller_id = previous_controller_id.clone();
898 })?;
899 Ok(live_client_change(None, previous_controller_id, state))
900 })
901}
902
903pub fn heartbeat_live_client(
904 id: &str,
905 client_id: impl Into<String>,
906 metadata: serde_json::Value,
907) -> Result<LiveClientChange, String> {
908 SESSIONS.with(|s| {
909 let mut map = s.borrow_mut();
910 let Some(state) = map.get_mut(id) else {
911 return Err(format!("agent session '{id}' does not exist"));
912 };
913 let client_id = validate_live_client_id(client_id.into())?;
914 let previous_clients = state.live_clients.clone();
915 let previous_controller_id = state.live_controller_id.clone();
916 let Some(client) = state.live_clients.get_mut(&client_id) else {
917 return Err(format!("live client '{client_id}' is not attached"));
918 };
919 client.last_seen_at = crate::orchestration::now_rfc3339();
920 if !metadata.is_null() {
921 client.metadata = metadata.clone();
922 }
923 let client = client.clone();
924 state.last_accessed = Instant::now();
925 let active_controller_id = state.live_controller_id.clone();
926 append_live_client_event(
927 state,
928 "heartbeat",
929 Some(&client),
930 previous_controller_id.as_deref(),
931 active_controller_id.as_deref(),
932 serde_json::json!({ "metadata": metadata }),
933 )
934 .inspect_err(|_error| {
935 state.live_clients = previous_clients;
936 state.live_controller_id = previous_controller_id.clone();
937 })?;
938 Ok(live_client_change(
939 Some(client),
940 previous_controller_id,
941 state,
942 ))
943 })
944}
945
946pub fn inject_prompt_from_live_client(
947 id: &str,
948 client_id: impl Into<String>,
949 content: VmValue,
950 metadata: serde_json::Value,
951) -> Result<(), String> {
952 let client_id = validate_live_client_id(client_id.into())?;
953 ensure_live_controller(id, &client_id, LiveControllerCapability::PromptInjection)?;
954 let mut message = BTreeMap::new();
955 message.insert(
956 "role".to_string(),
957 VmValue::String(std::sync::Arc::from("user")),
958 );
959 message.insert("content".to_string(), content);
960 message.insert(
961 "metadata".to_string(),
962 crate::stdlib::json_to_vm_value(&serde_json::json!({
963 "live_session": {
964 "client_id": client_id,
965 "mode": "controller",
966 "source": "live_session_attach",
967 "metadata": metadata,
968 }
969 })),
970 );
971 inject_message(id, VmValue::Dict(std::sync::Arc::new(message)))
972}
973
974pub fn route_live_permission_request(
975 id: &str,
976 client_id: impl Into<String>,
977 request: serde_json::Value,
978 metadata: serde_json::Value,
979) -> Result<serde_json::Value, String> {
980 let client_id = validate_live_client_id(client_id.into())?;
981 let client =
982 ensure_live_controller(id, &client_id, LiveControllerCapability::PermissionRouting)?;
983 let request_id = request
984 .get("id")
985 .or_else(|| request.get("request_id"))
986 .and_then(serde_json::Value::as_str)
987 .filter(|id| !id.trim().is_empty())
988 .unwrap_or("permission_request");
989 let event_metadata = serde_json::json!({
990 "action": "permission_routed",
991 "client": live_client_json(&client),
992 "request_id": request_id,
993 "request": request,
994 "metadata": metadata,
995 });
996 let event = crate::llm::helpers::transcript_event(
997 LIVE_CLIENT_PERMISSION_EVENT_KIND,
998 "system",
999 "internal",
1000 "Live session permission request routed",
1001 Some(event_metadata.clone()),
1002 );
1003 append_event(id, event)?;
1004 Ok(event_metadata)
1005}
1006
/// Capability that `ensure_live_controller` checks on the active controller.
enum LiveControllerCapability {
    /// Requires the client's `prompt_injection` flag.
    PromptInjection,
    /// Requires the client's `permission_routing` flag.
    PermissionRouting,
}
1011
1012fn ensure_live_controller(
1013 id: &str,
1014 client_id: &str,
1015 capability: LiveControllerCapability,
1016) -> Result<LiveSessionClient, String> {
1017 SESSIONS.with(|s| {
1018 let map = s.borrow();
1019 let Some(state) = map.get(id) else {
1020 return Err(format!("agent session '{id}' does not exist"));
1021 };
1022 if state.live_controller_id.as_deref() != Some(client_id) {
1023 return Err(format!(
1024 "live client '{client_id}' is not the active controller"
1025 ));
1026 }
1027 let Some(client) = state.live_clients.get(client_id) else {
1028 return Err(format!("live client '{client_id}' is not attached"));
1029 };
1030 match capability {
1031 LiveControllerCapability::PromptInjection if !client.prompt_injection => Err(format!(
1032 "live client '{client_id}' cannot inject prompts for this session"
1033 )),
1034 LiveControllerCapability::PermissionRouting if !client.permission_routing => Err(
1035 format!("live client '{client_id}' cannot route permissions for this session"),
1036 ),
1037 _ => Ok(client.clone()),
1038 }
1039 })
1040}
1041
1042fn append_live_client_event(
1043 state: &mut SessionState,
1044 action: &str,
1045 client: Option<&LiveSessionClient>,
1046 previous_controller_id: Option<&str>,
1047 active_controller_id: Option<&str>,
1048 extra: serde_json::Value,
1049) -> Result<(), String> {
1050 let metadata = serde_json::json!({
1051 "action": action,
1052 "session_id": state.id,
1053 "client": client.map(live_client_json),
1054 "previous_controller_id": previous_controller_id,
1055 "active_controller_id": active_controller_id,
1056 "clients": state
1057 .live_clients
1058 .values()
1059 .map(live_client_json)
1060 .collect::<Vec<_>>(),
1061 "extra": extra,
1062 });
1063 let event = crate::llm::helpers::transcript_event(
1064 LIVE_CLIENT_EVENT_KIND,
1065 "system",
1066 "internal",
1067 "Live session client lifecycle changed",
1068 Some(metadata),
1069 );
1070 append_event_to_state(state, event, "live_client")
1071}
1072
1073fn live_client_change(
1074 client: Option<LiveSessionClient>,
1075 previous_controller_id: Option<String>,
1076 state: &SessionState,
1077) -> LiveClientChange {
1078 LiveClientChange {
1079 client,
1080 previous_controller_id,
1081 active_controller_id: state.live_controller_id.clone(),
1082 clients: state.live_clients.values().cloned().collect(),
1083 }
1084}
1085
/// Returns the trimmed client id.
///
/// # Errors
///
/// Fails when the id is empty or whitespace-only.
fn validate_live_client_id(id: impl Into<String>) -> Result<String, String> {
    let raw: String = id.into();
    match raw.trim() {
        "" => Err("live client id cannot be empty".to_string()),
        trimmed => Ok(trimmed.to_string()),
    }
}
1094
1095pub fn live_client_json(client: &LiveSessionClient) -> serde_json::Value {
1096 serde_json::json!({
1097 "client_id": client.client_id,
1098 "mode": client.mode.as_str(),
1099 "attached_at": client.attached_at,
1100 "last_seen_at": client.last_seen_at,
1101 "prompt_injection": client.prompt_injection,
1102 "permission_routing": client.permission_routing,
1103 "metadata": client.metadata,
1104 })
1105}
1106
1107pub fn live_client_change_json(change: &LiveClientChange) -> serde_json::Value {
1108 serde_json::json!({
1109 "client": change.client.as_ref().map(live_client_json),
1110 "previous_controller_id": change.previous_controller_id,
1111 "active_controller_id": change.active_controller_id,
1112 "clients": change
1113 .clients
1114 .iter()
1115 .map(live_client_json)
1116 .collect::<Vec<_>>(),
1117 })
1118}
1119
1120fn try_register_event_log(session_id: &str) {
1124 if let Some(log) = crate::event_log::active_event_log() {
1125 crate::agent_events::register_sink(
1126 session_id,
1127 crate::agent_events::EventLogSink::new(log, session_id),
1128 );
1129 return;
1130 }
1131 let Ok(dir) = std::env::var("HARN_EVENT_LOG_DIR") else {
1132 return;
1133 };
1134 if dir.is_empty() {
1135 return;
1136 }
1137 let path = std::path::PathBuf::from(dir).join(format!("event_log-{session_id}.jsonl"));
1138 if let Ok(sink) = crate::agent_events::JsonlEventSink::open(&path) {
1139 crate::agent_events::register_sink(session_id, sink);
1140 }
1141}
1142
/// Public entry point for best-effort event-log sink registration.
///
/// Delegates to `try_register_event_log`; failures are silently ignored.
pub fn register_event_log_sink(session_id: &str) {
    try_register_event_log(session_id);
}
1146
1147pub fn close(id: &str) {
1148 SESSIONS.with(|s| {
1149 s.borrow_mut().remove(id);
1150 });
1151 crate::orchestration::agent_inbox::clear_session(id);
1155 crate::agent_events::clear_session_sinks(id);
1156}
1157
1158pub fn close_with_status(
1159 id: &str,
1160 reason: impl Into<String>,
1161 status: impl Into<String>,
1162 metadata: serde_json::Value,
1163) -> bool {
1164 if !exists(id) {
1165 return false;
1166 }
1167 let reason = reason.into();
1168 let status = status.into();
1169 let event_metadata = serde_json::json!({
1170 "reason": reason,
1171 "status": status,
1172 "metadata": metadata,
1173 });
1174 let transcript_event = crate::llm::helpers::transcript_event(
1175 "agent_session_closed",
1176 "system",
1177 "internal",
1178 "Agent session closed",
1179 Some(event_metadata),
1180 );
1181 let _ = append_event(id, transcript_event);
1182 crate::llm::emit_live_agent_event_sync(&crate::agent_events::AgentEvent::SessionClosed {
1183 session_id: id.to_string(),
1184 reason,
1185 status,
1186 metadata,
1187 });
1188 close(id);
1189 true
1190}
1191
1192pub fn reset_transcript(id: &str) -> bool {
1193 SESSIONS.with(|s| {
1194 let mut map = s.borrow_mut();
1195 let Some(state) = map.get_mut(id) else {
1196 return false;
1197 };
1198 state.transcript = empty_transcript(id);
1199 state.tool_format = None;
1200 state.system_prompt = None;
1201 state.scratchpad = None;
1202 state.scratchpad_version = 0;
1203 state.last_transcript_budget_action = None;
1204 state.completed_turn_checkpoints.clear();
1205 state.redo_stack.clear();
1206 state.last_accessed = Instant::now();
1207 true
1208 })
1209}
1210
1211pub fn fork(src_id: &str, dst_id: Option<String>) -> Option<String> {
1218 let (
1219 src_transcript,
1220 src_tool_format,
1221 src_system_prompt,
1222 src_pinned_model,
1223 src_pinned_reasoning_policy,
1224 src_workspace_anchor,
1225 src_workspace_policy,
1226 src_scratchpad,
1227 src_scratchpad_version,
1228 src_transcript_budget_policy,
1229 src_last_transcript_budget_action,
1230 dst,
1231 ) = SESSIONS.with(|s| {
1232 let mut map = s.borrow_mut();
1233 let src = map.get_mut(src_id)?;
1234 src.last_accessed = Instant::now();
1235 let dst = dst_id.unwrap_or_else(|| uuid::Uuid::now_v7().to_string());
1236 let forked_transcript = clone_transcript_with_id(&src.transcript, &dst);
1237 Some((
1238 forked_transcript,
1239 src.tool_format.clone(),
1240 src.system_prompt.clone(),
1241 src.pinned_model.clone(),
1242 src.pinned_reasoning_policy.clone(),
1243 src.workspace_anchor.clone(),
1244 src.workspace_policy.clone(),
1245 src.scratchpad.clone(),
1246 src.scratchpad_version,
1247 src.transcript_budget_policy.clone(),
1248 src.last_transcript_budget_action.clone(),
1249 dst,
1250 ))
1251 })?;
1252 open_or_create(Some(dst.clone()));
1254 SESSIONS.with(|s| {
1255 let mut map = s.borrow_mut();
1256 if let Some(state) = map.get_mut(&dst) {
1257 state.transcript = src_transcript;
1258 state.tool_format = src_tool_format;
1259 state.system_prompt = src_system_prompt;
1260 state.pinned_model = src_pinned_model;
1261 state.pinned_reasoning_policy = src_pinned_reasoning_policy;
1262 state.workspace_anchor = src_workspace_anchor;
1263 state.workspace_policy = src_workspace_policy;
1264 state.scratchpad = src_scratchpad;
1265 state.scratchpad_version = src_scratchpad_version;
1266 state.transcript_budget_policy = src_transcript_budget_policy;
1267 state.last_transcript_budget_action = src_last_transcript_budget_action;
1268 state.last_accessed = Instant::now();
1269 }
1270 update_lineage(&mut map, src_id, &dst, None);
1271 });
1272 let budget_ok = SESSIONS.with(|s| {
1273 let mut map = s.borrow_mut();
1274 let Some(state) = map.get_mut(&dst) else {
1275 return false;
1276 };
1277 let candidate = state.transcript.clone();
1278 apply_transcript_with_budget(state, candidate, "fork").is_ok()
1279 });
1280 if !budget_ok {
1281 close(&dst);
1282 return None;
1283 }
1284 if exists(&dst) {
1288 Some(dst)
1289 } else {
1290 None
1291 }
1292}
1293
1294pub fn fork_at(src_id: &str, keep_first: usize, dst_id: Option<String>) -> Option<String> {
1305 let branched_at_event_index = SESSIONS.with(|s| {
1306 let map = s.borrow();
1307 let src = map.get(src_id)?;
1308 Some(branch_event_index(&src.transcript, keep_first))
1309 })?;
1310 let new_id = fork(src_id, dst_id)?;
1311 link_child_session_with_branch(src_id, &new_id, Some(branched_at_event_index));
1312 let _ = truncate(&new_id, keep_first);
1313 Some(new_id)
1314}
1315
1316pub fn truncate(id: &str, keep_first: usize) -> Option<SessionTruncateResult> {
1320 SESSIONS.with(|s| {
1321 let mut map = s.borrow_mut();
1322 let state = map.get_mut(id)?;
1323 let result = truncate_state(state, keep_first)?;
1324 Some(result)
1325 })
1326}
1327
1328fn truncate_state(state: &mut SessionState, keep_first: usize) -> Option<SessionTruncateResult> {
1329 let dict = state
1330 .transcript
1331 .as_dict()
1332 .cloned()
1333 .unwrap_or_else(BTreeMap::new);
1334 let messages: Vec<VmValue> = match dict.get("messages") {
1335 Some(VmValue::List(list)) => list.iter().cloned().collect(),
1336 _ => Vec::new(),
1337 };
1338 let existing_events = match dict.get("events") {
1339 Some(VmValue::List(list)) => Some(list.iter().cloned().collect::<Vec<_>>()),
1340 _ => None,
1341 };
1342 let kept_turn_count = keep_first.min(messages.len());
1343 let removed_turn_count = messages.len().saturating_sub(kept_turn_count);
1344 let mut new_tip_turn_id = existing_events
1345 .as_ref()
1346 .map(|events| turn_event_id_for_count(events, kept_turn_count))
1347 .unwrap_or_else(|| {
1348 let events = crate::llm::helpers::transcript_events_from_messages(&messages);
1349 turn_event_id_for_count(&events, kept_turn_count)
1350 });
1351
1352 if removed_turn_count > 0 {
1353 let retained: Vec<VmValue> = messages.into_iter().take(kept_turn_count).collect();
1354 let retained_events = match existing_events {
1355 Some(events) => {
1356 let keep_event_count = event_prefix_len_for_messages(&events, kept_turn_count);
1357 events.into_iter().take(keep_event_count).collect()
1358 }
1359 None => crate::llm::helpers::transcript_events_from_messages(&retained),
1360 };
1361 new_tip_turn_id = turn_event_id_for_count(&retained_events, kept_turn_count);
1362 let mut next = dict;
1363 next.insert(
1364 "events".to_string(),
1365 VmValue::List(std::sync::Arc::new(retained_events)),
1366 );
1367 next.insert(
1368 "messages".to_string(),
1369 VmValue::List(std::sync::Arc::new(retained)),
1370 );
1371 next.remove("summary");
1372 apply_transcript_with_budget(state, VmValue::Dict(std::sync::Arc::new(next)), "truncate")
1373 .ok()?;
1374 }
1375 state.last_accessed = Instant::now();
1376 Some(SessionTruncateResult {
1377 kept_turn_count,
1378 removed_turn_count,
1379 new_tip_turn_id,
1380 })
1381}
1382
1383pub fn pop_last_if_assistant(id: &str) -> Result<bool, String> {
1390 SESSIONS.with(|s| {
1391 let mut map = s.borrow_mut();
1392 let Some(state) = map.get_mut(id) else {
1393 return Err(format!(
1394 "pop_last_if_assistant: unknown session id '{id}'"
1395 ));
1396 };
1397 let messages: Vec<VmValue> = match state.transcript.as_dict() {
1398 Some(dict) => match dict.get("messages") {
1399 Some(VmValue::List(list)) => list.iter().cloned().collect(),
1400 _ => Vec::new(),
1401 },
1402 None => Vec::new(),
1403 };
1404 if messages.is_empty() {
1405 return Ok(false);
1406 }
1407 let trailing_role = messages
1408 .last()
1409 .and_then(|m| m.as_dict())
1410 .and_then(|d| d.get("role"))
1411 .map(|v| v.display())
1412 .unwrap_or_default();
1413 if trailing_role != "assistant" {
1414 return Err(format!(
1415 "pop_last_if_assistant: trailing message role is '{trailing_role}', expected 'assistant'"
1416 ));
1417 }
1418 let keep = messages.len() - 1;
1419 truncate_state(state, keep);
1420 Ok(true)
1421 })
1422}
1423
1424pub fn trim(id: &str, keep_last: usize) -> Option<usize> {
1425 SESSIONS.with(|s| {
1426 let mut map = s.borrow_mut();
1427 let state = map.get_mut(id)?;
1428 let dict = state.transcript.as_dict()?.clone();
1429 let messages: Vec<VmValue> = match dict.get("messages") {
1430 Some(VmValue::List(list)) => list.iter().cloned().collect(),
1431 _ => Vec::new(),
1432 };
1433 let start = messages.len().saturating_sub(keep_last);
1434 let retained: Vec<VmValue> = messages.into_iter().skip(start).collect();
1435 let kept = retained.len();
1436 let mut next = dict;
1437 next.insert(
1438 "events".to_string(),
1439 VmValue::List(std::sync::Arc::new(
1440 crate::llm::helpers::transcript_events_from_messages(&retained),
1441 )),
1442 );
1443 next.insert(
1444 "messages".to_string(),
1445 VmValue::List(std::sync::Arc::new(retained)),
1446 );
1447 apply_transcript_with_budget(state, VmValue::Dict(std::sync::Arc::new(next)), "trim")
1448 .ok()?;
1449 state.last_accessed = Instant::now();
1450 Some(kept)
1451 })
1452}
1453
1454pub fn inject_message(id: &str, message: VmValue) -> Result<(), String> {
1457 let Some(msg_dict) = message.as_dict().cloned() else {
1458 return Err("agent_session_inject: message must be a dict".into());
1459 };
1460 let role_ok = matches!(msg_dict.get("role"), Some(VmValue::String(_)));
1461 if !role_ok {
1462 return Err(
1463 "agent_session_inject: message must have a string `role` (user|assistant|tool_result|system)"
1464 .into(),
1465 );
1466 }
1467 SESSIONS.with(|s| {
1468 let mut map = s.borrow_mut();
1469 let Some(state) = map.get_mut(id) else {
1470 return Err(format!("agent_session_inject: unknown session id '{id}'"));
1471 };
1472 let dict = state
1473 .transcript
1474 .as_dict()
1475 .cloned()
1476 .unwrap_or_else(BTreeMap::new);
1477 let mut messages: Vec<VmValue> = match dict.get("messages") {
1478 Some(VmValue::List(list)) => list.iter().cloned().collect(),
1479 _ => Vec::new(),
1480 };
1481 let mut events: Vec<VmValue> = match dict.get("events") {
1482 Some(VmValue::List(list)) => list.iter().cloned().collect(),
1483 _ => crate::llm::helpers::transcript_events_from_messages(&messages),
1484 };
1485 let new_message = VmValue::Dict(std::sync::Arc::new(msg_dict));
1486 let message_index = messages.len();
1487 events.push(crate::llm::helpers::transcript_event_from_message(
1488 &new_message,
1489 ));
1490 messages.push(new_message);
1491 let mut next = dict;
1492 next.insert(
1493 "events".to_string(),
1494 VmValue::List(std::sync::Arc::new(events)),
1495 );
1496 next.insert(
1497 "messages".to_string(),
1498 VmValue::List(std::sync::Arc::new(messages)),
1499 );
1500 let persisted_message = next
1501 .get("messages")
1502 .and_then(|value| match value {
1503 VmValue::List(list) => list.get(message_index).cloned(),
1504 _ => None,
1505 })
1506 .unwrap_or(VmValue::Nil);
1507 apply_transcript_with_budget(
1508 state,
1509 VmValue::Dict(std::sync::Arc::new(next)),
1510 "inject_message",
1511 )?;
1512 emit_identified_user_message_event(id, &persisted_message);
1513 emit_llm_message_event(id, message_index, &persisted_message);
1514 state.last_accessed = Instant::now();
1515 Ok(())
1516 })
1517}
1518
1519fn emit_identified_user_message_event(session_id: &str, message: &VmValue) {
1520 let message_json = crate::llm::helpers::vm_value_to_json(message);
1521 let role = message_json.get("role").and_then(|value| value.as_str());
1522 if role != Some("user") {
1523 return;
1524 }
1525 let Some(message_id) = message_json
1526 .get("messageId")
1527 .or_else(|| message_json.get("message_id"))
1528 .and_then(|value| value.as_str())
1529 .filter(|value| !value.trim().is_empty())
1530 else {
1531 return;
1532 };
1533 let content = message_json
1534 .get("content")
1535 .map(user_message_content_blocks)
1536 .unwrap_or_default();
1537 crate::agent_events::emit_event(&crate::agent_events::AgentEvent::UserMessage {
1538 session_id: session_id.to_string(),
1539 message_id: message_id.to_string(),
1540 content,
1541 });
1542}
1543
1544fn user_message_content_blocks(content: &serde_json::Value) -> Vec<serde_json::Value> {
1545 match content {
1546 serde_json::Value::Array(items) => items.clone(),
1547 serde_json::Value::String(text) => vec![serde_json::json!({
1548 "type": "text",
1549 "text": text,
1550 })],
1551 other => vec![serde_json::json!({
1552 "type": "text",
1553 "text": other.to_string(),
1554 })],
1555 }
1556}
1557
/// Size measurements of a transcript, as compared against a budget policy.
#[derive(Debug, Clone, Eq, PartialEq)]
struct TranscriptBudgetUsage {
    /// Number of messages in the transcript.
    message_count: usize,
    /// Number of events in the transcript.
    event_count: usize,
    /// Approximate serialized size in bytes; `None` when bytes are not measured.
    approx_bytes: Option<usize>,
}
1564
1565fn transcript_messages_from_dict(dict: &BTreeMap<String, VmValue>) -> Vec<VmValue> {
1566 match dict.get("messages") {
1567 Some(VmValue::List(list)) => list.iter().cloned().collect(),
1568 _ => Vec::new(),
1569 }
1570}
1571
1572fn transcript_message_count(transcript: &VmValue) -> usize {
1573 transcript
1574 .as_dict()
1575 .map(transcript_messages_from_dict)
1576 .map(|messages| messages.len())
1577 .unwrap_or(0)
1578}
1579
1580fn transcript_events_from_dict(dict: &BTreeMap<String, VmValue>) -> Vec<VmValue> {
1581 match dict.get("events") {
1582 Some(VmValue::List(list)) => list.iter().cloned().collect(),
1583 _ => {
1584 let messages = transcript_messages_from_dict(dict);
1585 crate::llm::helpers::transcript_events_from_messages(&messages)
1586 }
1587 }
1588}
1589
1590fn transcript_usage(transcript: &VmValue, include_bytes: bool) -> TranscriptBudgetUsage {
1591 let Some(dict) = transcript.as_dict() else {
1592 return TranscriptBudgetUsage {
1593 message_count: 0,
1594 event_count: 0,
1595 approx_bytes: include_bytes.then_some(0),
1596 };
1597 };
1598 let approx_bytes = if include_bytes {
1599 serde_json::to_vec(&crate::llm::helpers::vm_value_to_json(transcript))
1600 .map(|bytes| bytes.len())
1601 .ok()
1602 .or(Some(usize::MAX))
1603 } else {
1604 None
1605 };
1606 TranscriptBudgetUsage {
1607 message_count: transcript_messages_from_dict(dict).len(),
1608 event_count: transcript_events_from_dict(dict).len(),
1609 approx_bytes,
1610 }
1611}
1612
1613fn transcript_budget_exceeded_reason(
1614 usage: &TranscriptBudgetUsage,
1615 policy: &SessionTranscriptBudgetPolicy,
1616) -> Option<&'static str> {
1617 if usage.message_count > policy.max_messages {
1618 return Some("message_count");
1619 }
1620 if usage.event_count > policy.max_events {
1621 return Some("event_count");
1622 }
1623 if let (Some(bytes), Some(limit)) = (usage.approx_bytes, policy.max_approx_bytes) {
1624 if bytes > limit {
1625 return Some("approx_bytes");
1626 }
1627 }
1628 None
1629}
1630
1631fn transcript_budget_usage_json(usage: &TranscriptBudgetUsage) -> serde_json::Value {
1632 serde_json::json!({
1633 "messages": usage.message_count,
1634 "events": usage.event_count,
1635 "approx_bytes": usage.approx_bytes,
1636 })
1637}
1638
1639fn transcript_budget_policy_json(policy: &SessionTranscriptBudgetPolicy) -> serde_json::Value {
1640 let recovery = match &policy.recovery {
1641 TranscriptBudgetRecovery::Reject => serde_json::json!({"action": "reject"}),
1642 TranscriptBudgetRecovery::Trim { keep_last } => {
1643 serde_json::json!({"action": "trim", "keep_last": keep_last})
1644 }
1645 TranscriptBudgetRecovery::Compact { keep_last } => {
1646 serde_json::json!({"action": "compact", "keep_last": keep_last})
1647 }
1648 };
1649 serde_json::json!({
1650 "max_messages": policy.max_messages,
1651 "max_events": policy.max_events,
1652 "max_approx_bytes": policy.max_approx_bytes,
1653 "recovery": recovery,
1654 })
1655}
1656
1657fn transcript_budget_recovery_name(recovery: &TranscriptBudgetRecovery) -> &'static str {
1658 match recovery {
1659 TranscriptBudgetRecovery::Reject => "reject",
1660 TranscriptBudgetRecovery::Trim { .. } => "trim",
1661 TranscriptBudgetRecovery::Compact { .. } => "compact",
1662 }
1663}
1664
1665fn transcript_budget_error(
1666 state: &SessionState,
1667 policy: &SessionTranscriptBudgetPolicy,
1668 usage: &TranscriptBudgetUsage,
1669 reason: &str,
1670) -> String {
1671 let byte_suffix = match (usage.approx_bytes, policy.max_approx_bytes) {
1672 (Some(bytes), Some(limit)) => format!(", approx_bytes {bytes}/{limit}"),
1673 _ => String::new(),
1674 };
1675 format!(
1676 "transcript budget exceeded for session '{}': {reason} (messages {}/{}, events {}/{}{}; recovery={})",
1677 state.id,
1678 usage.message_count,
1679 policy.max_messages,
1680 usage.event_count,
1681 policy.max_events,
1682 byte_suffix,
1683 transcript_budget_recovery_name(&policy.recovery),
1684 )
1685}
1686
1687fn transcript_budget_audit_json(
1688 action: &str,
1689 source: &str,
1690 reason: &str,
1691 policy: &SessionTranscriptBudgetPolicy,
1692 usage_before: &TranscriptBudgetUsage,
1693 usage_attempted: &TranscriptBudgetUsage,
1694 usage_after: &TranscriptBudgetUsage,
1695) -> serde_json::Value {
1696 serde_json::json!({
1697 "action": action,
1698 "source": source,
1699 "reason": reason,
1700 "policy": transcript_budget_policy_json(policy),
1701 "usage_before": transcript_budget_usage_json(usage_before),
1702 "usage_attempted": transcript_budget_usage_json(usage_attempted),
1703 "usage_after": transcript_budget_usage_json(usage_after),
1704 "removed_messages": usage_attempted.message_count.saturating_sub(usage_after.message_count),
1705 "removed_events": usage_attempted.event_count.saturating_sub(usage_after.event_count),
1706 })
1707}
1708
1709fn transcript_budget_event(audit: &serde_json::Value) -> VmValue {
1710 let action = audit
1711 .get("action")
1712 .and_then(serde_json::Value::as_str)
1713 .unwrap_or("enforced");
1714 crate::llm::helpers::transcript_event(
1715 "transcript_budget",
1716 "system",
1717 "internal",
1718 &format!("Transcript budget {action}."),
1719 Some(audit.clone()),
1720 )
1721}
1722
1723fn append_event_to_transcript(transcript: VmValue, event: VmValue) -> VmValue {
1724 let Some(dict) = transcript.as_dict() else {
1725 return transcript;
1726 };
1727 let mut next = dict.clone();
1728 let mut events = transcript_events_from_dict(&next);
1729 events.push(event);
1730 next.insert(
1731 "events".to_string(),
1732 VmValue::List(std::sync::Arc::new(events)),
1733 );
1734 VmValue::Dict(std::sync::Arc::new(next))
1735}
1736
1737fn tail_message_capacity(
1738 policy: &SessionTranscriptBudgetPolicy,
1739 reserve_audit_event: bool,
1740) -> usize {
1741 let event_capacity = tail_event_capacity(policy, usize::from(reserve_audit_event));
1742 policy.max_messages.min(event_capacity)
1743}
1744
1745fn tail_event_capacity(policy: &SessionTranscriptBudgetPolicy, reserved_events: usize) -> usize {
1746 policy.max_events.saturating_sub(reserved_events)
1747}
1748
1749fn trim_transcript_for_budget(
1750 transcript: &VmValue,
1751 policy: &SessionTranscriptBudgetPolicy,
1752 keep_last: usize,
1753) -> VmValue {
1754 let dict = transcript.as_dict().cloned().unwrap_or_else(BTreeMap::new);
1755 let messages = transcript_messages_from_dict(&dict);
1756 let keep = keep_last.min(tail_message_capacity(policy, true));
1757 let start = messages.len().saturating_sub(keep);
1758 let retained: Vec<VmValue> = messages.into_iter().skip(start).collect();
1759 let mut next = dict;
1760 next.insert(
1761 "events".to_string(),
1762 VmValue::List(std::sync::Arc::new(
1763 crate::llm::helpers::transcript_events_from_messages(&retained),
1764 )),
1765 );
1766 next.insert(
1767 "messages".to_string(),
1768 VmValue::List(std::sync::Arc::new(retained)),
1769 );
1770 next.remove("summary");
1771 VmValue::Dict(std::sync::Arc::new(next))
1772}
1773
1774struct BudgetCompactionLiveEvent {
1775 policy: crate::orchestration::CompactionPolicy,
1776 policy_strategy: String,
1777 metrics: crate::orchestration::TranscriptCompactedEventMetrics,
1778}
1779
1780struct BudgetCompactionResult {
1781 transcript: VmValue,
1782 live_event: Option<BudgetCompactionLiveEvent>,
1783}
1784
1785fn compact_transcript_for_budget(
1786 transcript: &VmValue,
1787 policy: &SessionTranscriptBudgetPolicy,
1788 keep_last: usize,
1789 session_id: &str,
1790) -> BudgetCompactionResult {
1791 let dict = transcript.as_dict().cloned().unwrap_or_else(BTreeMap::new);
1792 let messages = transcript_messages_from_dict(&dict);
1793 let message_capacity = policy.max_messages.min(tail_event_capacity(policy, 2));
1794 let tail_keep = keep_last.min(message_capacity.saturating_sub(2));
1797 let mut config = crate::orchestration::AutoCompactConfig {
1798 token_threshold: 0,
1799 keep_last: tail_keep,
1800 compact_strategy: crate::orchestration::CompactStrategy::Llm,
1801 hard_limit_strategy: crate::orchestration::CompactStrategy::Truncate,
1802 fallback_strategy: Some(crate::orchestration::CompactStrategy::Truncate),
1803 policy_strategy: crate::orchestration::compact_strategy_name(
1804 &crate::orchestration::CompactStrategy::Llm,
1805 )
1806 .to_string(),
1807 ..Default::default()
1808 };
1809
1810 let mut json_messages = messages
1811 .iter()
1812 .map(crate::llm::helpers::vm_value_to_json)
1813 .collect::<Vec<_>>();
1814 let lifecycle =
1815 crate::orchestration::CompactLifecycle::new(crate::orchestration::CompactMode::Auto)
1816 .with_session_id(Some(session_id))
1817 .with_trigger(crate::orchestration::CompactionTrigger::BudgetPressure)
1818 .with_hook_dispatch(false)
1819 .with_evaluate_providers(false);
1820 let llm_opts = crate::llm::extract_llm_options(&[
1821 VmValue::String(std::sync::Arc::from("")),
1822 VmValue::Nil,
1823 VmValue::Nil,
1824 ])
1825 .ok();
1826 let outcome = futures::executor::block_on(crate::orchestration::run_compaction_lifecycle(
1827 &mut json_messages,
1828 &mut config,
1829 llm_opts.as_ref(),
1830 lifecycle,
1831 ))
1832 .ok()
1833 .flatten();
1834
1835 let retained = json_messages
1836 .iter()
1837 .map(crate::stdlib::json_to_vm_value)
1838 .collect::<Vec<_>>();
1839 let mut events = crate::llm::helpers::transcript_events_from_messages(&retained);
1840 let summary = outcome.as_ref().map(|outcome| outcome.summary.clone());
1841 let mut live_event = None;
1842 if let Some(outcome) = outcome {
1843 events.push(crate::llm::helpers::transcript_event(
1844 "compaction",
1845 "system",
1846 "internal",
1847 "",
1848 Some(outcome.event_metadata.clone()),
1849 ));
1850 live_event = Some(BudgetCompactionLiveEvent {
1851 policy: config.policy.clone(),
1852 policy_strategy: outcome.policy_strategy,
1853 metrics: crate::orchestration::TranscriptCompactedEventMetrics {
1854 archived_messages: outcome.archived_messages,
1855 estimated_tokens_before: outcome.estimated_tokens_before,
1856 estimated_tokens_after: outcome.estimated_tokens_after,
1857 snapshot_asset_id: outcome.snapshot_asset_id,
1858 },
1859 });
1860 }
1861
1862 let mut next = dict;
1863 next.insert(
1864 "events".to_string(),
1865 VmValue::List(std::sync::Arc::new(events)),
1866 );
1867 next.insert(
1868 "messages".to_string(),
1869 VmValue::List(std::sync::Arc::new(retained)),
1870 );
1871 if let Some(summary) = summary {
1872 next.insert(
1873 "summary".to_string(),
1874 VmValue::String(std::sync::Arc::from(summary)),
1875 );
1876 } else {
1877 next.remove("summary");
1878 }
1879 BudgetCompactionResult {
1880 transcript: VmValue::Dict(std::sync::Arc::new(next)),
1881 live_event,
1882 }
1883}
1884
1885fn recovered_transcript_with_audit(
1886 recovered: VmValue,
1887 action: &str,
1888 source: &str,
1889 reason: &str,
1890 policy: &SessionTranscriptBudgetPolicy,
1891 usage_before: &TranscriptBudgetUsage,
1892 usage_attempted: &TranscriptBudgetUsage,
1893 include_bytes: bool,
1894) -> (VmValue, serde_json::Value, TranscriptBudgetUsage) {
1895 let usage_after_without_audit = transcript_usage(&recovered, include_bytes);
1896 let initial_audit = transcript_budget_audit_json(
1897 action,
1898 source,
1899 reason,
1900 policy,
1901 usage_before,
1902 usage_attempted,
1903 &usage_after_without_audit,
1904 );
1905 let with_initial_audit =
1906 append_event_to_transcript(recovered.clone(), transcript_budget_event(&initial_audit));
1907 let usage_after = transcript_usage(&with_initial_audit, include_bytes);
1908 let audit = transcript_budget_audit_json(
1909 action,
1910 source,
1911 reason,
1912 policy,
1913 usage_before,
1914 usage_attempted,
1915 &usage_after,
1916 );
1917 let with_audit = append_event_to_transcript(recovered, transcript_budget_event(&audit));
1918 let usage_after = transcript_usage(&with_audit, include_bytes);
1919 (with_audit, audit, usage_after)
1920}
1921
1922fn apply_transcript_with_budget(
1923 state: &mut SessionState,
1924 candidate: VmValue,
1925 source: &str,
1926) -> Result<(), String> {
1927 state.redo_stack.clear();
1928 let policy = state.transcript_budget_policy.normalized();
1929 let include_bytes = policy.max_approx_bytes.is_some();
1930 let usage_before = transcript_usage(&state.transcript, include_bytes);
1931 let usage_attempted = transcript_usage(&candidate, include_bytes);
1932 let Some(reason) = transcript_budget_exceeded_reason(&usage_attempted, &policy) else {
1933 state.transcript = candidate;
1934 return Ok(());
1935 };
1936
1937 match policy.recovery.clone() {
1938 TranscriptBudgetRecovery::Reject => {
1939 let audit = transcript_budget_audit_json(
1940 "rejected",
1941 source,
1942 reason,
1943 &policy,
1944 &usage_before,
1945 &usage_attempted,
1946 &usage_before,
1947 );
1948 state.last_transcript_budget_action = Some(audit);
1949 Err(transcript_budget_error(
1950 state,
1951 &policy,
1952 &usage_attempted,
1953 reason,
1954 ))
1955 }
1956 TranscriptBudgetRecovery::Trim { keep_last } => {
1957 let recovered = trim_transcript_for_budget(&candidate, &policy, keep_last);
1958 let (with_audit, audit, usage_after) = recovered_transcript_with_audit(
1959 recovered,
1960 "trimmed",
1961 source,
1962 reason,
1963 &policy,
1964 &usage_before,
1965 &usage_attempted,
1966 include_bytes,
1967 );
1968 if transcript_budget_exceeded_reason(&usage_after, &policy).is_some() {
1969 let rejected = transcript_budget_audit_json(
1970 "rejected",
1971 source,
1972 reason,
1973 &policy,
1974 &usage_before,
1975 &usage_attempted,
1976 &usage_after,
1977 );
1978 state.last_transcript_budget_action = Some(rejected);
1979 return Err(transcript_budget_error(
1980 state,
1981 &policy,
1982 &usage_after,
1983 reason,
1984 ));
1985 }
1986 state.last_transcript_budget_action = Some(audit);
1987 state.transcript = with_audit;
1988 Ok(())
1989 }
1990 TranscriptBudgetRecovery::Compact { keep_last } => {
1991 let compacted =
1992 compact_transcript_for_budget(&candidate, &policy, keep_last, &state.id);
1993 let (with_audit, audit, usage_after) = recovered_transcript_with_audit(
1994 compacted.transcript,
1995 "compacted",
1996 source,
1997 reason,
1998 &policy,
1999 &usage_before,
2000 &usage_attempted,
2001 include_bytes,
2002 );
2003 if transcript_budget_exceeded_reason(&usage_after, &policy).is_some() {
2004 let rejected = transcript_budget_audit_json(
2005 "rejected",
2006 source,
2007 reason,
2008 &policy,
2009 &usage_before,
2010 &usage_attempted,
2011 &usage_after,
2012 );
2013 state.last_transcript_budget_action = Some(rejected);
2014 return Err(transcript_budget_error(
2015 state,
2016 &policy,
2017 &usage_after,
2018 reason,
2019 ));
2020 }
2021 state.last_transcript_budget_action = Some(audit);
2022 state.transcript = with_audit;
2023 if let Some(event) = compacted.live_event {
2024 crate::orchestration::emit_transcript_compacted_event_sync(
2025 &state.id,
2026 crate::orchestration::CompactMode::Auto,
2027 crate::orchestration::CompactionTrigger::BudgetPressure
2028 .as_str()
2029 .to_string(),
2030 &event.policy,
2031 event.policy_strategy,
2032 event.metrics,
2033 );
2034 }
2035 Ok(())
2036 }
2037 }
2038}
2039
2040fn emit_llm_message_event(session_id: &str, message_index: usize, message: &VmValue) {
2041 let mut fields = serde_json::Map::new();
2042 fields.insert(
2043 "session_id".to_string(),
2044 serde_json::Value::String(session_id.to_string()),
2045 );
2046 fields.insert(
2047 "message_index".to_string(),
2048 serde_json::json!(message_index),
2049 );
2050 let message_json = crate::llm::helpers::vm_value_to_json(message);
2051 if let Some(role) = message_json.get("role").and_then(|value| value.as_str()) {
2052 fields.insert(
2053 "role".to_string(),
2054 serde_json::Value::String(role.to_string()),
2055 );
2056 }
2057 if let Some(content) = message_json.get("content") {
2058 fields.insert("content".to_string(), content.clone());
2059 }
2060 fields.insert("message".to_string(), message_json);
2061 crate::llm::append_observability_sidecar_entry("message", fields);
2062}
2063
2064pub fn seed_from_messages(
2070 id: Option<String>,
2071 messages: &[serde_json::Value],
2072 metadata: serde_json::Value,
2073 system_prompt: Option<String>,
2074 tool_format: Option<String>,
2075) -> Result<String, String> {
2076 let resolved = id.unwrap_or_else(|| uuid::Uuid::now_v7().to_string());
2077 if exists(&resolved) {
2078 return Err(format!("agent session '{resolved}' already exists"));
2079 }
2080 open_or_create(Some(resolved.clone()));
2081 SESSIONS.with(|s| {
2082 let mut map = s.borrow_mut();
2083 let Some(state) = map.get_mut(&resolved) else {
2084 return Err(format!("failed to create agent session '{resolved}'"));
2085 };
2086 state.tool_format = tool_format.filter(|value| !value.trim().is_empty());
2087 state.system_prompt = system_prompt.filter(|value| !value.trim().is_empty());
2088
2089 let mut metadata = metadata
2090 .as_object()
2091 .cloned()
2092 .unwrap_or_else(serde_json::Map::new);
2093 if let Some(tool_format) = state.tool_format.as_ref() {
2094 metadata.insert(
2095 "tool_format".to_string(),
2096 serde_json::Value::String(tool_format.clone()),
2097 );
2098 metadata.insert(
2099 "tool_mode_locked".to_string(),
2100 serde_json::Value::Bool(true),
2101 );
2102 }
2103 if let Some(system_prompt) = state.system_prompt.as_ref() {
2104 metadata.insert(
2105 "system_prompt".to_string(),
2106 crate::llm::helpers::system_prompt_metadata(system_prompt),
2107 );
2108 }
2109 let vm_messages = crate::llm::helpers::json_messages_to_vm(messages);
2110 let candidate = crate::llm::helpers::new_transcript_with(
2111 Some(resolved.clone()),
2112 vm_messages,
2113 None,
2114 Some(crate::stdlib::json_to_vm_value(&serde_json::Value::Object(
2115 metadata,
2116 ))),
2117 );
2118 apply_transcript_with_budget(state, candidate, "seed_from_messages")?;
2119 state.last_accessed = Instant::now();
2120 Ok(resolved)
2121 })
2122}
2123
2124pub fn messages_json(id: &str) -> Vec<serde_json::Value> {
2128 SESSIONS.with(|s| {
2129 let map = s.borrow();
2130 let Some(state) = map.get(id) else {
2131 return Vec::new();
2132 };
2133 let Some(dict) = state.transcript.as_dict() else {
2134 return Vec::new();
2135 };
2136 match dict.get("messages") {
2137 Some(VmValue::List(list)) => list
2138 .iter()
2139 .map(crate::llm::helpers::vm_value_to_json)
2140 .collect(),
2141 _ => Vec::new(),
2142 }
2143 })
2144}
2145
2146#[derive(Clone, Debug, Default)]
2147pub struct SessionPromptState {
2148 pub messages: Vec<serde_json::Value>,
2149 pub summary: Option<String>,
2150}
2151
2152fn summary_message_json(summary: &str) -> serde_json::Value {
2153 serde_json::json!({
2154 "role": "user",
2155 "content": summary,
2156 })
2157}
2158
2159fn messages_begin_with_summary(messages: &[serde_json::Value], summary: &str) -> bool {
2160 messages.first().is_some_and(|message| {
2161 message.get("role").and_then(|value| value.as_str()) == Some("user")
2162 && message.get("content").and_then(|value| value.as_str()) == Some(summary)
2163 })
2164}
2165
2166pub fn prompt_state_json(id: &str) -> SessionPromptState {
2174 SESSIONS.with(|s| {
2175 let map = s.borrow();
2176 let Some(state) = map.get(id) else {
2177 return SessionPromptState::default();
2178 };
2179 let Some(dict) = state.transcript.as_dict() else {
2180 return SessionPromptState::default();
2181 };
2182 let mut messages = match dict.get("messages") {
2183 Some(VmValue::List(list)) => list
2184 .iter()
2185 .map(crate::llm::helpers::vm_value_to_json)
2186 .collect::<Vec<_>>(),
2187 _ => Vec::new(),
2188 };
2189 let summary = dict.get("summary").and_then(|value| match value {
2190 VmValue::String(text) if !text.trim().is_empty() => Some(text.to_string()),
2191 _ => None,
2192 });
2193 if let Some(summary_text) = summary.as_deref() {
2194 if !messages_begin_with_summary(&messages, summary_text) {
2195 messages.insert(0, summary_message_json(summary_text));
2196 }
2197 }
2198 SessionPromptState { messages, summary }
2199 })
2200}
2201
2202pub fn store_transcript(id: &str, transcript: VmValue) -> Result<(), String> {
2205 SESSIONS.with(|s| {
2206 let mut map = s.borrow_mut();
2207 let Some(state) = map.get_mut(id) else {
2208 return Err(format!(
2209 "agent_session_store_transcript: unknown session id '{id}'"
2210 ));
2211 };
2212 let transcript = transcript_with_session_metadata(transcript, state);
2213 apply_transcript_with_budget(state, transcript, "store_transcript")?;
2214 state.last_accessed = Instant::now();
2215 Ok(())
2216 })
2217}
2218
2219fn checkpoint_summary(checkpoint: &SessionTurnCheckpoint) -> SessionCheckpointSummary {
2220 SessionCheckpointSummary {
2221 checkpoint_id: checkpoint.checkpoint_id.clone(),
2222 before_message_count: checkpoint.before_message_count,
2223 after_message_count: checkpoint.after_message_count,
2224 fs_snapshot_ids: checkpoint.fs_snapshot_ids.clone(),
2225 }
2226}
2227
2228fn checkpoint_error_status(error: SessionCheckpointError) -> &'static str {
2229 match error {
2230 SessionCheckpointError::UnknownSession => "unknown_session",
2231 SessionCheckpointError::NoCheckpoint => "no_checkpoint",
2232 SessionCheckpointError::NoRedo => "no_redo",
2233 }
2234}
2235
2236pub fn checkpoint_status_name(error: SessionCheckpointError) -> &'static str {
2237 checkpoint_error_status(error)
2238}
2239
2240pub fn invalidate_redo(id: &str) -> bool {
2243 SESSIONS.with(|s| {
2244 let mut map = s.borrow_mut();
2245 let Some(state) = map.get_mut(id) else {
2246 return false;
2247 };
2248 let had_redo = !state.redo_stack.is_empty();
2249 state.redo_stack.clear();
2250 state.last_accessed = Instant::now();
2251 had_redo
2252 })
2253}
2254
2255pub fn record_completed_turn_checkpoint(
2262 id: &str,
2263 before_transcript: VmValue,
2264 fs_snapshot_ids: Vec<String>,
2265) -> Result<Option<SessionCheckpointSummary>, SessionCheckpointError> {
2266 SESSIONS.with(|s| {
2267 let mut map = s.borrow_mut();
2268 let Some(state) = map.get_mut(id) else {
2269 return Err(SessionCheckpointError::UnknownSession);
2270 };
2271 let after_transcript = transcript_with_session_metadata(state.transcript.clone(), state);
2272 let before_message_count = transcript_message_count(&before_transcript);
2273 let after_message_count = transcript_message_count(&after_transcript);
2274 if crate::values_equal(&before_transcript, &after_transcript) && fs_snapshot_ids.is_empty()
2275 {
2276 return Ok(None);
2277 }
2278 let checkpoint = SessionTurnCheckpoint {
2279 checkpoint_id: format!("turn_{}", uuid::Uuid::now_v7().simple()),
2280 completed_at: crate::orchestration::now_rfc3339(),
2281 before_message_count,
2282 after_message_count,
2283 before_transcript,
2284 after_transcript,
2285 fs_snapshot_ids,
2286 };
2287 state.redo_stack.clear();
2288 state.completed_turn_checkpoints.push(checkpoint.clone());
2289 state.last_accessed = Instant::now();
2290 Ok(Some(checkpoint_summary(&checkpoint)))
2291 })
2292}
2293
2294pub fn rollback_plan(id: &str) -> Result<SessionCheckpointSummary, SessionCheckpointError> {
2295 SESSIONS.with(|s| {
2296 let map = s.borrow();
2297 let Some(state) = map.get(id) else {
2298 return Err(SessionCheckpointError::UnknownSession);
2299 };
2300 state
2301 .completed_turn_checkpoints
2302 .last()
2303 .map(checkpoint_summary)
2304 .ok_or(SessionCheckpointError::NoCheckpoint)
2305 })
2306}
2307
2308pub fn redo_plan(id: &str) -> Result<SessionCheckpointSummary, SessionCheckpointError> {
2309 SESSIONS.with(|s| {
2310 let map = s.borrow();
2311 let Some(state) = map.get(id) else {
2312 return Err(SessionCheckpointError::UnknownSession);
2313 };
2314 state
2315 .redo_stack
2316 .last()
2317 .map(|entry| {
2318 let mut summary = checkpoint_summary(&entry.checkpoint);
2319 summary.fs_snapshot_ids = entry.redo_fs_snapshot_ids.clone();
2320 summary
2321 })
2322 .ok_or(SessionCheckpointError::NoRedo)
2323 })
2324}
2325
2326pub fn rollback_last_completed_turn(
2327 id: &str,
2328 redo_fs_snapshot_ids: Vec<String>,
2329) -> Result<SessionCheckpointOutcome, SessionCheckpointError> {
2330 SESSIONS.with(|s| {
2331 let mut map = s.borrow_mut();
2332 let Some(state) = map.get_mut(id) else {
2333 return Err(SessionCheckpointError::UnknownSession);
2334 };
2335 let Some(checkpoint) = state.completed_turn_checkpoints.pop() else {
2336 return Err(SessionCheckpointError::NoCheckpoint);
2337 };
2338 state.transcript = checkpoint.before_transcript.clone();
2339 state.redo_stack.push(SessionRedoEntry {
2340 checkpoint: checkpoint.clone(),
2341 redo_fs_snapshot_ids: redo_fs_snapshot_ids.clone(),
2342 });
2343 state.last_accessed = Instant::now();
2344 Ok(SessionCheckpointOutcome {
2345 status: "rolled_back",
2346 checkpoint: checkpoint_summary(&checkpoint),
2347 redo_fs_snapshot_ids,
2348 })
2349 })
2350}
2351
2352pub fn redo_last_rollback(id: &str) -> Result<SessionCheckpointOutcome, SessionCheckpointError> {
2353 SESSIONS.with(|s| {
2354 let mut map = s.borrow_mut();
2355 let Some(state) = map.get_mut(id) else {
2356 return Err(SessionCheckpointError::UnknownSession);
2357 };
2358 let Some(entry) = state.redo_stack.pop() else {
2359 return Err(SessionCheckpointError::NoRedo);
2360 };
2361 let checkpoint = entry.checkpoint;
2362 state.transcript = checkpoint.after_transcript.clone();
2363 state.completed_turn_checkpoints.push(checkpoint.clone());
2364 state.last_accessed = Instant::now();
2365 Ok(SessionCheckpointOutcome {
2366 status: "redone",
2367 checkpoint: checkpoint_summary(&checkpoint),
2368 redo_fs_snapshot_ids: entry.redo_fs_snapshot_ids,
2369 })
2370 })
2371}
2372
2373pub fn prune_invalid_reminder_events(id: &str) -> usize {
2377 SESSIONS.with(|s| {
2378 let mut map = s.borrow_mut();
2379 let Some(state) = map.get_mut(id) else {
2380 return 0;
2381 };
2382 let Some(dict) = state.transcript.as_dict().cloned() else {
2383 return 0;
2384 };
2385 let Some(VmValue::List(events)) = dict.get("events") else {
2386 return 0;
2387 };
2388 let mut pruned = 0_usize;
2389 let mut kept = Vec::with_capacity(events.len());
2390 for event in events.iter().cloned() {
2391 let is_reminder = event
2392 .as_dict()
2393 .and_then(|event| event.get("kind"))
2394 .map(VmValue::display)
2395 .as_deref()
2396 == Some(crate::llm::helpers::SYSTEM_REMINDER_EVENT_KIND);
2397 if !is_reminder {
2398 kept.push(event);
2399 continue;
2400 }
2401 let valid = crate::llm::helpers::reminder_from_event(&event)
2402 .is_some_and(|reminder| !reminder.body.trim().is_empty());
2403 if valid {
2404 kept.push(event);
2405 } else {
2406 pruned += 1;
2407 }
2408 }
2409 if pruned > 0 {
2410 let mut next = dict;
2411 next.insert(
2412 "events".to_string(),
2413 VmValue::List(std::sync::Arc::new(kept)),
2414 );
2415 let _ = apply_transcript_with_budget(
2416 state,
2417 VmValue::Dict(std::sync::Arc::new(next)),
2418 "prune_invalid_reminder_events",
2419 );
2420 state.last_accessed = Instant::now();
2421 }
2422 pruned
2423 })
2424}
2425
2426pub fn apply_reminder_post_turn(id: &str, turn: i64) -> Result<serde_json::Value, String> {
2431 let report = SESSIONS.with(|s| {
2432 let mut map = s.borrow_mut();
2433 let Some(state) = map.get_mut(id) else {
2434 return Err(format!(
2435 "agent_session_apply_reminder_post_turn: unknown session id '{id}'"
2436 ));
2437 };
2438 let report = crate::llm::helpers::apply_reminder_post_turn(&state.transcript, turn);
2439 if report.decremented_count > 0 || !report.expired.is_empty() {
2440 if let Some(next) = report.transcript.clone() {
2441 apply_transcript_with_budget(state, next, "apply_reminder_post_turn")?;
2442 }
2443 state.last_accessed = Instant::now();
2444 }
2445 Ok(report)
2446 })?;
2447
2448 for reminder in &report.expired {
2449 let mut payload = crate::llm::helpers::reminder_lifecycle_payload(Some(id), reminder);
2450 if let Some(obj) = payload.as_object_mut() {
2451 obj.insert(
2452 "transcript_id".to_string(),
2453 serde_json::Value::String(id.to_string()),
2454 );
2455 obj.insert(
2456 "reason".to_string(),
2457 serde_json::Value::String("ttl".to_string()),
2458 );
2459 obj.insert(
2460 "ttl_turns_before".to_string(),
2461 serde_json::json!(&reminder.ttl_turns),
2462 );
2463 obj.insert("expired_at_turn".to_string(), serde_json::json!(turn));
2464 }
2465 crate::llm::helpers::emit_reminder_lifecycle_event(
2466 crate::llm::helpers::REMINDER_EXPIRED_EVENT_KIND,
2467 payload,
2468 );
2469 }
2470
2471 Ok(serde_json::json!({
2472 "expired_count": report.expired.len(),
2473 "decremented_count": report.decremented_count,
2474 "remaining_count": report.remaining_count,
2475 }))
2476}
2477
2478pub fn inject_reminder(
2483 id: &str,
2484 reminder: crate::llm::helpers::SystemReminder,
2485) -> Result<ReminderInjectionReport, String> {
2486 let reminder_id = reminder.id.clone();
2487 let dedupe_key = reminder.dedupe_key.clone();
2488 let mut deduped_reminder_ids = Vec::new();
2489 SESSIONS.with(|s| {
2490 let mut map = s.borrow_mut();
2491 let Some(state) = map.get_mut(id) else {
2492 return Err(format!(
2493 "agent_session_inject_reminder: unknown session id '{id}'"
2494 ));
2495 };
2496 let dict = state
2497 .transcript
2498 .as_dict()
2499 .cloned()
2500 .unwrap_or_else(BTreeMap::new);
2501 let mut events: Vec<VmValue> = match dict.get("events") {
2502 Some(VmValue::List(list)) => list.iter().cloned().collect(),
2503 _ => dict
2504 .get("messages")
2505 .and_then(|value| match value {
2506 VmValue::List(list) => Some(list.iter().cloned().collect::<Vec<_>>()),
2507 _ => None,
2508 })
2509 .map(|messages| crate::llm::helpers::transcript_events_from_messages(&messages))
2510 .unwrap_or_default(),
2511 };
2512 if let Some(expected_key) = dedupe_key.as_deref() {
2513 events.retain(|event| {
2514 let Some(existing) = crate::llm::helpers::reminder_from_event(event) else {
2515 return true;
2516 };
2517 if existing.dedupe_key.as_deref() == Some(expected_key) {
2518 deduped_reminder_ids.push(existing.id);
2519 false
2520 } else {
2521 true
2522 }
2523 });
2524 }
2525 events.push(crate::llm::helpers::transcript_reminder_event(&reminder));
2526 let mut next = dict;
2527 next.insert(
2528 "events".to_string(),
2529 VmValue::List(std::sync::Arc::new(events)),
2530 );
2531 apply_transcript_with_budget(
2532 state,
2533 VmValue::Dict(std::sync::Arc::new(next)),
2534 "inject_reminder",
2535 )?;
2536 state.last_accessed = Instant::now();
2537 Ok(())
2538 })?;
2539
2540 if !deduped_reminder_ids.is_empty() {
2541 let dropped_count = deduped_reminder_ids.len();
2542 crate::llm::helpers::emit_reminder_lifecycle_event(
2543 crate::llm::helpers::REMINDER_DEDUPED_EVENT_KIND,
2544 serde_json::json!({
2545 "session_id": id,
2546 "transcript_id": id,
2547 "reminder_id": &reminder_id,
2548 "replacing_id": &reminder_id,
2549 "replaced_id": deduped_reminder_ids.first(),
2550 "replaced_ids": &deduped_reminder_ids,
2551 "dedupe_key": &dedupe_key,
2552 "dropped_reminder_ids": &deduped_reminder_ids,
2553 "dropped_count": dropped_count,
2554 }),
2555 );
2556 }
2557
2558 crate::llm::helpers::emit_reminder_lifecycle_event(
2559 crate::llm::helpers::REMINDER_INJECTED_EVENT_KIND,
2560 crate::llm::helpers::reminder_lifecycle_payload(Some(id), &reminder),
2561 );
2562
2563 Ok(ReminderInjectionReport {
2564 reminder_id,
2565 deduped_count: deduped_reminder_ids.len(),
2566 })
2567}
2568
2569pub fn append_event(id: &str, event: VmValue) -> Result<(), String> {
2575 let Some(event_dict) = event.as_dict() else {
2576 return Err("agent_session_append_event: event must be a dict".into());
2577 };
2578 let kind_ok = matches!(event_dict.get("kind"), Some(VmValue::String(_)));
2579 if !kind_ok {
2580 return Err("agent_session_append_event: event must have a string `kind`".into());
2581 }
2582 SESSIONS.with(|s| {
2583 let mut map = s.borrow_mut();
2584 let Some(state) = map.get_mut(id) else {
2585 return Err(format!(
2586 "agent_session_append_event: unknown session id '{id}'"
2587 ));
2588 };
2589 append_event_to_state(state, event, "append_event")?;
2590 state.last_accessed = Instant::now();
2591 Ok(())
2592 })
2593}
2594
2595fn append_event_to_state(
2596 state: &mut SessionState,
2597 event: VmValue,
2598 action: &str,
2599) -> Result<(), String> {
2600 let dict = state
2601 .transcript
2602 .as_dict()
2603 .cloned()
2604 .unwrap_or_else(BTreeMap::new);
2605 let mut events: Vec<VmValue> = match dict.get("events") {
2606 Some(VmValue::List(list)) => list.iter().cloned().collect(),
2607 _ => dict
2608 .get("messages")
2609 .and_then(|value| match value {
2610 VmValue::List(list) => Some(list.iter().cloned().collect::<Vec<_>>()),
2611 _ => None,
2612 })
2613 .map(|messages| crate::llm::helpers::transcript_events_from_messages(&messages))
2614 .unwrap_or_default(),
2615 };
2616 events.push(event);
2617 let mut next = dict;
2618 next.insert(
2619 "events".to_string(),
2620 VmValue::List(std::sync::Arc::new(events)),
2621 );
2622 apply_transcript_with_budget(state, VmValue::Dict(std::sync::Arc::new(next)), action)
2623}
2624
2625pub fn replace_messages(id: &str, messages: &[serde_json::Value]) -> Result<(), String> {
2628 replace_messages_with_summary(id, messages, None)
2629}
2630
2631pub fn replace_messages_with_summary(
2636 id: &str,
2637 messages: &[serde_json::Value],
2638 summary: Option<&str>,
2639) -> Result<(), String> {
2640 SESSIONS.with(|s| {
2641 let mut map = s.borrow_mut();
2642 let Some(state) = map.get_mut(id) else {
2643 return Err(format!(
2644 "agent_session_replace_messages: unknown session id '{id}'"
2645 ));
2646 };
2647 let dict = state
2648 .transcript
2649 .as_dict()
2650 .cloned()
2651 .unwrap_or_else(BTreeMap::new);
2652 let vm_messages: Vec<VmValue> = messages
2653 .iter()
2654 .map(crate::stdlib::json_to_vm_value)
2655 .collect();
2656 let mut next = dict;
2657 next.insert(
2658 "events".to_string(),
2659 VmValue::List(std::sync::Arc::new(
2660 crate::llm::helpers::transcript_events_from_messages(&vm_messages),
2661 )),
2662 );
2663 next.insert(
2664 "messages".to_string(),
2665 VmValue::List(std::sync::Arc::new(vm_messages)),
2666 );
2667 if let Some(summary) = summary {
2668 next.insert(
2669 "summary".to_string(),
2670 VmValue::String(std::sync::Arc::from(summary.to_string())),
2671 );
2672 } else {
2673 next.remove("summary");
2674 }
2675 apply_transcript_with_budget(
2676 state,
2677 VmValue::Dict(std::sync::Arc::new(next)),
2678 "replace_messages",
2679 )?;
2680 state.last_accessed = Instant::now();
2681 Ok(())
2682 })
2683}
2684
2685pub fn append_subscriber(id: &str, callback: VmValue) {
2686 open_or_create(Some(id.to_string()));
2687 SESSIONS.with(|s| {
2688 if let Some(state) = s.borrow_mut().get_mut(id) {
2689 state.subscribers.push(callback);
2690 state.last_accessed = Instant::now();
2691 }
2692 });
2693}
2694
2695pub fn subscribers_for(id: &str) -> Vec<VmValue> {
2696 SESSIONS.with(|s| {
2697 s.borrow()
2698 .get(id)
2699 .map(|state| state.subscribers.clone())
2700 .unwrap_or_default()
2701 })
2702}
2703
2704pub fn subscriber_count(id: &str) -> usize {
2705 SESSIONS.with(|s| {
2706 s.borrow()
2707 .get(id)
2708 .map(|state| state.subscribers.len())
2709 .unwrap_or(0)
2710 })
2711}
2712
2713pub fn set_active_skills(id: &str, skills: Vec<String>) {
2717 SESSIONS.with(|s| {
2718 if let Some(state) = s.borrow_mut().get_mut(id) {
2719 state.active_skills = skills;
2720 state.last_accessed = Instant::now();
2721 }
2722 });
2723}
2724
2725pub fn active_skills(id: &str) -> Vec<String> {
2729 SESSIONS.with(|s| {
2730 s.borrow()
2731 .get(id)
2732 .map(|state| state.active_skills.clone())
2733 .unwrap_or_default()
2734 })
2735}
2736
2737pub fn claim_tool_format(id: &str, tool_format: &str) -> Result<(), String> {
2743 let tool_format = tool_format.trim();
2744 if tool_format.is_empty() {
2745 return Ok(());
2746 }
2747 SESSIONS.with(|s| {
2748 let mut map = s.borrow_mut();
2749 let Some(state) = map.get_mut(id) else {
2750 return Err(format!("agent session '{id}' does not exist"));
2751 };
2752 match state.tool_format.as_deref() {
2753 Some(existing) if existing != tool_format => Err(format!(
2754 "agent session '{id}' is locked to tool_format='{existing}', but this run requested tool_format='{tool_format}'. Start a new session or fork/reset the transcript before changing tool mode."
2755 )),
2756 Some(_) => {
2757 state.last_accessed = Instant::now();
2758 Ok(())
2759 }
2760 None => {
2761 state.tool_format = Some(tool_format.to_string());
2762 state.last_accessed = Instant::now();
2763 Ok(())
2764 }
2765 }
2766 })
2767}
2768
2769pub fn tool_format(id: &str) -> Option<String> {
2770 SESSIONS.with(|s| {
2771 s.borrow()
2772 .get(id)
2773 .and_then(|state| state.tool_format.clone())
2774 })
2775}
2776
2777pub fn record_system_prompt(id: &str, system_prompt: &str) -> Result<(), String> {
2778 let system_prompt = system_prompt.trim();
2779 if system_prompt.is_empty() {
2780 return Ok(());
2781 }
2782 assert_cache_stable_system_prompt(system_prompt);
2783 SESSIONS.with(|s| {
2784 let mut map = s.borrow_mut();
2785 let Some(state) = map.get_mut(id) else {
2786 return Err(format!("agent session '{id}' does not exist"));
2787 };
2788 let changed = state.system_prompt.as_deref() != Some(system_prompt);
2789 state.system_prompt = Some(system_prompt.to_string());
2790 let dict = state
2791 .transcript
2792 .as_dict()
2793 .cloned()
2794 .unwrap_or_else(BTreeMap::new);
2795 let mut next = dict;
2796 apply_system_prompt_metadata(&mut next, system_prompt);
2797 if changed {
2798 let mut events: Vec<VmValue> = match next.get("events") {
2799 Some(VmValue::List(list)) => list.iter().cloned().collect(),
2800 _ => Vec::new(),
2801 };
2802 events.push(crate::llm::helpers::transcript_event(
2803 "system_prompt",
2804 "system",
2805 "internal",
2806 "",
2807 Some(crate::llm::helpers::system_prompt_event_metadata(
2808 system_prompt,
2809 )),
2810 ));
2811 next.insert(
2812 "events".to_string(),
2813 VmValue::List(std::sync::Arc::new(events)),
2814 );
2815 }
2816 apply_transcript_with_budget(
2817 state,
2818 VmValue::Dict(std::sync::Arc::new(next)),
2819 "record_system_prompt",
2820 )?;
2821 state.last_accessed = Instant::now();
2822 Ok(())
2823 })
2824}
2825
2826pub fn system_prompt(id: &str) -> Option<String> {
2827 SESSIONS.with(|s| {
2828 s.borrow()
2829 .get(id)
2830 .and_then(|state| state.system_prompt.clone())
2831 })
2832}
2833
/// Scans `system_prompt` for a template opener (`{{`) followed, after
/// optional whitespace, by an identifier starting with `workspace_` or
/// `project_`, and returns the offending prefix.
///
/// Returns `None` when no such token is present.
#[cfg(debug_assertions)]
fn forbidden_workspace_prompt_token(system_prompt: &str) -> Option<&'static str> {
    const FORBIDDEN_PREFIXES: [&str; 2] = ["workspace_", "project_"];

    let mut remaining = system_prompt;
    while let Some(index) = remaining.find("{{") {
        let candidate = remaining[index + 2..].trim_start();
        let hit = FORBIDDEN_PREFIXES
            .iter()
            .copied()
            .find(|prefix| candidate.starts_with(*prefix));
        if hit.is_some() {
            return hit;
        }
        // Advance by a single brace rather than past the whole match so that
        // overlapping openers such as `{{{workspace_root}}}` are still
        // inspected. `{` is one byte, so the slice stays on a char boundary.
        remaining = &remaining[index + 1..];
    }
    None
}
2849
2850#[cfg(debug_assertions)]
2851fn assert_cache_stable_system_prompt(system_prompt: &str) {
2852 if let Some(prefix) = forbidden_workspace_prompt_token(system_prompt) {
2853 panic!(
2854 "{CACHE_STABLE_SYSTEM_PROMPT_DIAGNOSTIC}: session system prompts must not interpolate `{{{{{prefix}...` tokens; move workspace/project context into the workspace-anchor reminder"
2855 );
2856 }
2857}
2858
2859#[cfg(not(debug_assertions))]
2860fn assert_cache_stable_system_prompt(_system_prompt: &str) {}
2861
2862pub fn set_pinned_model(id: &str, model: Option<String>) -> Result<bool, String> {
2867 let normalized = model
2868 .map(|value| value.trim().to_string())
2869 .filter(|value| !value.is_empty());
2870 SESSIONS.with(|s| {
2871 let mut map = s.borrow_mut();
2872 let Some(state) = map.get_mut(id) else {
2873 return Err(format!("agent session '{id}' does not exist"));
2874 };
2875 let changed = state.pinned_model != normalized;
2876 state.pinned_model = normalized;
2877 state.last_accessed = Instant::now();
2878 Ok(changed)
2879 })
2880}
2881
2882pub fn pinned_model(id: &str) -> Option<String> {
2886 SESSIONS.with(|s| {
2887 s.borrow()
2888 .get(id)
2889 .and_then(|state| state.pinned_model.clone())
2890 })
2891}
2892
2893pub fn set_pinned_reasoning_policy(id: &str, policy: Option<String>) -> Result<bool, String> {
2895 let normalized = match policy {
2896 Some(value) => crate::llm::reasoning_policy::normalize_policy_selector(&value)?,
2897 None => None,
2898 };
2899 SESSIONS.with(|s| {
2900 let mut map = s.borrow_mut();
2901 let Some(state) = map.get_mut(id) else {
2902 return Err(format!("agent session '{id}' does not exist"));
2903 };
2904 let changed = state.pinned_reasoning_policy != normalized;
2905 state.pinned_reasoning_policy = normalized;
2906 state.last_accessed = Instant::now();
2907 Ok(changed)
2908 })
2909}
2910
2911pub fn pinned_reasoning_policy(id: &str) -> Option<String> {
2913 SESSIONS.with(|s| {
2914 s.borrow()
2915 .get(id)
2916 .and_then(|state| state.pinned_reasoning_policy.clone())
2917 })
2918}
2919
2920pub fn set_workspace_anchor(id: &str, anchor: Option<WorkspaceAnchor>) -> Result<bool, String> {
2924 SESSIONS.with(|s| {
2925 let mut map = s.borrow_mut();
2926 let Some(state) = map.get_mut(id) else {
2927 return Err(format!("agent session '{id}' does not exist"));
2928 };
2929 let changed = state.workspace_anchor != anchor;
2930 state.workspace_anchor = anchor;
2931 if changed {
2932 state.redo_stack.clear();
2933 crate::llm::permissions::clear_session_grants(id);
2934 }
2935 state.last_accessed = Instant::now();
2936 Ok(changed)
2937 })
2938}
2939
2940pub fn workspace_anchor(id: &str) -> Option<WorkspaceAnchor> {
2942 SESSIONS.with(|s| {
2943 s.borrow()
2944 .get(id)
2945 .and_then(|state| state.workspace_anchor.clone())
2946 })
2947}
2948
2949#[derive(Clone, Debug, PartialEq, Eq)]
2953pub struct ReanchorOutcome {
2954 pub previous: Option<WorkspaceAnchor>,
2955 pub current: WorkspaceAnchor,
2956 pub changed: bool,
2957}
2958
2959pub fn reanchor_session(
2964 id: &str,
2965 new_anchor: WorkspaceAnchor,
2966 carry_transcript: bool,
2967 compacted: bool,
2968 reason: Option<String>,
2969) -> Result<ReanchorOutcome, String> {
2970 let outcome = SESSIONS.with(|s| {
2971 let mut map = s.borrow_mut();
2972 let Some(state) = map.get_mut(id) else {
2973 return Err(format!("agent session '{id}' does not exist"));
2974 };
2975 let previous = state.workspace_anchor.clone();
2976 let changed = previous.as_ref() != Some(&new_anchor);
2977 state.workspace_anchor = Some(new_anchor.clone());
2978 if changed {
2979 crate::llm::permissions::clear_session_grants(id);
2980 }
2981 state.last_accessed = Instant::now();
2982 Ok(ReanchorOutcome {
2983 previous,
2984 current: new_anchor,
2985 changed,
2986 })
2987 })?;
2988 if !outcome.changed {
2989 return Ok(outcome);
2990 }
2991 let previous_json = outcome.previous.as_ref().map(WorkspaceAnchor::to_json);
2992 let current_json = outcome.current.to_json();
2993 let event_metadata = serde_json::json!({
2994 "previous": previous_json,
2995 "current": current_json,
2996 "carry_transcript": carry_transcript,
2997 "compacted": compacted,
2998 "reason": reason,
2999 });
3000 let event = crate::llm::helpers::transcript_event(
3001 "AnchorChanged",
3002 "system",
3003 "internal",
3004 "",
3005 Some(event_metadata),
3006 );
3007 let _ = append_event(id, event);
3008 crate::llm::emit_live_agent_event_sync(&crate::agent_events::AgentEvent::AnchorChanged {
3009 session_id: id.to_string(),
3010 previous: previous_json,
3011 current: current_json,
3012 carry_transcript,
3013 compacted,
3014 reason,
3015 });
3016 Ok(outcome)
3017}
3018
3019pub fn set_workspace_policy(id: &str, policy: WorkspacePolicy) -> Result<bool, String> {
3022 SESSIONS.with(|s| {
3023 let mut map = s.borrow_mut();
3024 let Some(state) = map.get_mut(id) else {
3025 return Err(format!("agent session '{id}' does not exist"));
3026 };
3027 let changed = state.workspace_policy != policy;
3028 state.workspace_policy = policy;
3029 if changed {
3030 state.redo_stack.clear();
3031 }
3032 state.last_accessed = Instant::now();
3033 Ok(changed)
3034 })
3035}
3036
3037pub fn workspace_policy(id: &str) -> Option<WorkspacePolicy> {
3039 SESSIONS.with(|s| {
3040 s.borrow()
3041 .get(id)
3042 .map(|state| state.workspace_policy.clone())
3043 })
3044}
3045
3046pub fn add_workspace_root(
3050 id: &str,
3051 root: &str,
3052 mount_mode: Option<MountMode>,
3053 reason: Option<String>,
3054) -> Result<String, String> {
3055 let normalized_root = validate_workspace_root_path(root)?;
3056 let mounted_at = crate::orchestration::now_rfc3339();
3057 SESSIONS.with(|s| {
3058 let mut map = s.borrow_mut();
3059 let Some(state) = map.get_mut(id) else {
3060 return Err(format!("agent session '{id}' does not exist"));
3061 };
3062 let default_mount_mode = state.workspace_policy.default_mount_mode;
3063 let Some(anchor) = state.workspace_anchor.as_mut() else {
3064 return Err(format!("agent session '{id}' has no workspace anchor"));
3065 };
3066 let resolved_mount_mode = mount_mode.unwrap_or(default_mount_mode);
3067 if let Some(existing) = anchor
3068 .additional_roots
3069 .iter_mut()
3070 .find(|entry| entry.path == normalized_root)
3071 {
3072 let changed =
3073 existing.mount_mode != resolved_mount_mode || existing.mounted_at != mounted_at;
3074 existing.mount_mode = resolved_mount_mode;
3075 existing.mounted_at = mounted_at.clone();
3076 if changed {
3077 state.redo_stack.clear();
3078 }
3079 } else {
3080 anchor.additional_roots.push(MountedRoot {
3081 path: normalized_root.clone(),
3082 mount_mode: resolved_mount_mode,
3083 mounted_at: mounted_at.clone(),
3084 });
3085 state.redo_stack.clear();
3086 }
3087 let event = crate::llm::helpers::transcript_event(
3088 "RootMounted",
3089 "system",
3090 "internal",
3091 "",
3092 Some(serde_json::json!({
3093 "path": normalized_root.to_string_lossy(),
3094 "mount_mode": resolved_mount_mode.as_str(),
3095 "mounted_at": mounted_at.clone(),
3096 "reason": reason,
3097 })),
3098 );
3099 append_event_to_state(state, event, "add_workspace_root")?;
3100 crate::llm::permissions::clear_session_grants(id);
3101 state.last_accessed = Instant::now();
3102 Ok(mounted_at.clone())
3103 })
3104}
3105
3106pub fn remove_workspace_root(id: &str, root: &str) -> Result<bool, String> {
3109 let normalized_root = normalize_workspace_root_path(root);
3110 SESSIONS.with(|s| {
3111 let mut map = s.borrow_mut();
3112 let Some(state) = map.get_mut(id) else {
3113 return Err(format!("agent session '{id}' does not exist"));
3114 };
3115 let Some(anchor) = state.workspace_anchor.as_mut() else {
3116 return Err(format!("agent session '{id}' has no workspace anchor"));
3117 };
3118 let before = anchor.additional_roots.len();
3119 anchor
3120 .additional_roots
3121 .retain(|entry| entry.path != normalized_root);
3122 let removed = anchor.additional_roots.len() != before;
3123 if removed {
3124 state.redo_stack.clear();
3125 crate::llm::permissions::clear_session_grants(id);
3126 }
3127 state.last_accessed = Instant::now();
3128 Ok(removed)
3129 })
3130}
3131
3132pub fn list_workspace_roots(id: &str) -> Result<(PathBuf, Vec<MountedRoot>), String> {
3133 SESSIONS.with(|s| {
3134 let map = s.borrow();
3135 let Some(state) = map.get(id) else {
3136 return Err(format!("agent session '{id}' does not exist"));
3137 };
3138 let Some(anchor) = state.workspace_anchor.as_ref() else {
3139 return Err(format!("agent session '{id}' has no workspace anchor"));
3140 };
3141 Ok((anchor.primary.clone(), anchor.additional_roots.clone()))
3142 })
3143}
3144
3145fn validate_workspace_root_path(root: &str) -> Result<PathBuf, String> {
3146 let normalized = normalize_workspace_root_path(root);
3147 let canonical = std::fs::canonicalize(&normalized)
3148 .map_err(|error| format!("workspace root '{root}' must exist and be readable: {error}"))?;
3149 let metadata = std::fs::metadata(&canonical)
3150 .map_err(|error| format!("workspace root '{root}' must exist and be readable: {error}"))?;
3151 if !metadata.is_dir() {
3152 return Err(format!("workspace root '{root}' must be a directory"));
3153 }
3154 std::fs::read_dir(&canonical)
3155 .map_err(|error| format!("workspace root '{root}' must be readable: {error}"))?;
3156 Ok(canonical)
3157}
3158
3159fn normalize_workspace_root_path(root: &str) -> PathBuf {
3160 let absolute = crate::stdlib::process::normalize_context_path(Path::new(root));
3161 std::fs::canonicalize(&absolute).unwrap_or(absolute)
3162}
3163
3164fn empty_transcript(id: &str) -> VmValue {
3165 use crate::llm::helpers::new_transcript_with;
3166 new_transcript_with(Some(id.to_string()), Vec::new(), None, None)
3167}
3168
3169fn clone_transcript_with_id(transcript: &VmValue, new_id: &str) -> VmValue {
3170 let Some(dict) = transcript.as_dict() else {
3171 return empty_transcript(new_id);
3172 };
3173 let mut next = dict.clone();
3174 next.insert(
3175 "id".to_string(),
3176 VmValue::String(std::sync::Arc::from(new_id.to_string())),
3177 );
3178 VmValue::Dict(std::sync::Arc::new(next))
3179}
3180
3181fn clone_transcript_with_parent(transcript: &VmValue, parent_id: &str) -> VmValue {
3182 let Some(dict) = transcript.as_dict() else {
3183 return transcript.clone();
3184 };
3185 let mut next = dict.clone();
3186 let metadata = match next.get("metadata") {
3187 Some(VmValue::Dict(metadata)) => {
3188 let mut metadata = metadata.as_ref().clone();
3189 metadata.insert(
3190 "parent_session_id".to_string(),
3191 VmValue::String(std::sync::Arc::from(parent_id.to_string())),
3192 );
3193 VmValue::Dict(std::sync::Arc::new(metadata))
3194 }
3195 _ => VmValue::Dict(std::sync::Arc::new(BTreeMap::from([(
3196 "parent_session_id".to_string(),
3197 VmValue::String(std::sync::Arc::from(parent_id.to_string())),
3198 )]))),
3199 };
3200 next.insert("metadata".to_string(), metadata);
3201 VmValue::Dict(std::sync::Arc::new(next))
3202}
3203
3204fn apply_system_prompt_metadata(next: &mut BTreeMap<String, VmValue>, system_prompt: &str) {
3205 let mut metadata = match next.get("metadata") {
3206 Some(VmValue::Dict(metadata)) => metadata.as_ref().clone(),
3207 _ => BTreeMap::new(),
3208 };
3209 metadata.insert(
3210 "system_prompt".to_string(),
3211 crate::stdlib::json_to_vm_value(&crate::llm::helpers::system_prompt_metadata(
3212 system_prompt,
3213 )),
3214 );
3215 next.insert(
3216 "metadata".to_string(),
3217 VmValue::Dict(std::sync::Arc::new(metadata)),
3218 );
3219}
3220
3221fn transcript_with_session_metadata(transcript: VmValue, state: &SessionState) -> VmValue {
3222 let Some(dict) = transcript.as_dict() else {
3223 return transcript;
3224 };
3225 let mut next = dict.clone();
3226 let mut metadata = match next.get("metadata") {
3227 Some(VmValue::Dict(metadata)) => metadata.as_ref().clone(),
3228 _ => BTreeMap::new(),
3229 };
3230 if let Some(tool_format) = state.tool_format.as_ref() {
3231 metadata.insert(
3232 "tool_format".to_string(),
3233 VmValue::String(std::sync::Arc::from(tool_format.clone())),
3234 );
3235 metadata.insert("tool_mode_locked".to_string(), VmValue::Bool(true));
3236 }
3237 if let Some(system_prompt) = state.system_prompt.as_ref() {
3238 metadata.insert(
3239 "system_prompt".to_string(),
3240 crate::stdlib::json_to_vm_value(&crate::llm::helpers::system_prompt_metadata(
3241 system_prompt,
3242 )),
3243 );
3244 }
3245 if let Some(anchor) = state.workspace_anchor.as_ref() {
3246 metadata.insert(
3247 WORKSPACE_ANCHOR_METADATA_KEY.to_string(),
3248 anchor.to_vm_value(),
3249 );
3250 } else {
3251 metadata.remove(WORKSPACE_ANCHOR_METADATA_KEY);
3252 }
3253 if let Some(scratchpad) = state.scratchpad.as_ref() {
3254 metadata.insert("agent_scratchpad".to_string(), scratchpad.clone());
3255 metadata.insert(
3256 "agent_scratchpad_version".to_string(),
3257 VmValue::Int(state.scratchpad_version as i64),
3258 );
3259 } else {
3260 metadata.remove("agent_scratchpad");
3261 metadata.remove("agent_scratchpad_version");
3262 }
3263 if let Some(last_action) = state.last_transcript_budget_action.as_ref() {
3264 let usage = transcript_usage(
3265 &VmValue::Dict(std::sync::Arc::new(next.clone())),
3266 state.transcript_budget_policy.max_approx_bytes.is_some(),
3267 );
3268 metadata.insert(
3269 "transcript_budget".to_string(),
3270 crate::stdlib::json_to_vm_value(&serde_json::json!({
3271 "policy": transcript_budget_policy_json(&state.transcript_budget_policy.normalized()),
3272 "usage": transcript_budget_usage_json(&usage),
3273 "last_action": last_action,
3274 })),
3275 );
3276 }
3277 if !metadata.is_empty() {
3278 next.insert(
3279 "metadata".to_string(),
3280 VmValue::Dict(std::sync::Arc::new(metadata)),
3281 );
3282 }
3283 VmValue::Dict(std::sync::Arc::new(next))
3284}
3285
3286fn session_snapshot(state: &SessionState) -> VmValue {
3287 let transcript = transcript_with_session_metadata(state.transcript.clone(), state);
3288 let Some(dict) = transcript.as_dict() else {
3289 return state.transcript.clone();
3290 };
3291 let mut next = dict.clone();
3292 let length = next
3293 .get("messages")
3294 .and_then(|value| match value {
3295 VmValue::List(list) => Some(list.len() as i64),
3296 _ => None,
3297 })
3298 .unwrap_or(0);
3299 next.insert("length".to_string(), VmValue::Int(length));
3300 next.insert(
3301 "created_at".to_string(),
3302 VmValue::String(std::sync::Arc::from(state.created_at.clone())),
3303 );
3304 next.insert(
3305 "parent_id".to_string(),
3306 state
3307 .parent_id
3308 .as_ref()
3309 .map(|id| VmValue::String(std::sync::Arc::from(id.clone())))
3310 .unwrap_or(VmValue::Nil),
3311 );
3312 next.insert(
3313 "child_ids".to_string(),
3314 VmValue::List(std::sync::Arc::new(
3315 state
3316 .child_ids
3317 .iter()
3318 .cloned()
3319 .map(|id| VmValue::String(std::sync::Arc::from(id)))
3320 .collect(),
3321 )),
3322 );
3323 next.insert(
3324 "branched_at_event_index".to_string(),
3325 state
3326 .branched_at_event_index
3327 .map(|index| VmValue::Int(index as i64))
3328 .unwrap_or(VmValue::Nil),
3329 );
3330 next.insert(
3331 "system_prompt".to_string(),
3332 state
3333 .system_prompt
3334 .as_ref()
3335 .map(|prompt| VmValue::String(std::sync::Arc::from(prompt.clone())))
3336 .unwrap_or(VmValue::Nil),
3337 );
3338 next.insert(
3339 "tool_format".to_string(),
3340 state
3341 .tool_format
3342 .as_ref()
3343 .map(|format| VmValue::String(std::sync::Arc::from(format.clone())))
3344 .unwrap_or(VmValue::Nil),
3345 );
3346 next.insert(
3347 "pinned_model".to_string(),
3348 state
3349 .pinned_model
3350 .as_ref()
3351 .map(|model| VmValue::String(std::sync::Arc::from(model.clone())))
3352 .unwrap_or(VmValue::Nil),
3353 );
3354 next.insert(
3355 "pinned_reasoning_policy".to_string(),
3356 state
3357 .pinned_reasoning_policy
3358 .as_ref()
3359 .map(|policy| VmValue::String(std::sync::Arc::from(policy.clone())))
3360 .unwrap_or(VmValue::Nil),
3361 );
3362 next.insert(
3363 "scratchpad".to_string(),
3364 state.scratchpad.clone().unwrap_or(VmValue::Nil),
3365 );
3366 next.insert(
3367 "scratchpad_version".to_string(),
3368 VmValue::Int(state.scratchpad_version as i64),
3369 );
3370 next.insert(
3371 "workspace_anchor".to_string(),
3372 state
3373 .workspace_anchor
3374 .as_ref()
3375 .map(WorkspaceAnchor::to_vm_value)
3376 .unwrap_or(VmValue::Nil),
3377 );
3378 next.insert(
3379 "workspace_policy".to_string(),
3380 state.workspace_policy.to_vm_value(),
3381 );
3382 next.insert(
3383 "live_clients".to_string(),
3384 crate::stdlib::json_to_vm_value(&serde_json::Value::Array(
3385 state.live_clients.values().map(live_client_json).collect(),
3386 )),
3387 );
3388 next.insert(
3389 "live_controller_id".to_string(),
3390 state
3391 .live_controller_id
3392 .as_ref()
3393 .map(|id| VmValue::String(std::sync::Arc::from(id.clone())))
3394 .unwrap_or(VmValue::Nil),
3395 );
3396 next.insert(
3397 "completed_turn_checkpoint_count".to_string(),
3398 VmValue::Int(state.completed_turn_checkpoints.len() as i64),
3399 );
3400 next.insert(
3401 "redo_checkpoint_count".to_string(),
3402 VmValue::Int(state.redo_stack.len() as i64),
3403 );
3404 VmValue::Dict(std::sync::Arc::new(next))
3405}
3406
3407fn update_lineage(
3408 map: &mut HashMap<String, SessionState>,
3409 parent_id: &str,
3410 child_id: &str,
3411 branched_at_event_index: Option<usize>,
3412) {
3413 let old_parent_id = map.get(child_id).and_then(|child| child.parent_id.clone());
3414 if let Some(old_parent_id) = old_parent_id.filter(|old_parent_id| old_parent_id != parent_id) {
3415 if let Some(old_parent) = map.get_mut(&old_parent_id) {
3416 old_parent.child_ids.retain(|id| id != child_id);
3417 old_parent.last_accessed = Instant::now();
3418 }
3419 }
3420 if let Some(parent) = map.get_mut(parent_id) {
3421 parent.last_accessed = Instant::now();
3422 if !parent.child_ids.iter().any(|id| id == child_id) {
3423 parent.child_ids.push(child_id.to_string());
3424 }
3425 }
3426 if let Some(child) = map.get_mut(child_id) {
3427 child.last_accessed = Instant::now();
3428 child.parent_id = Some(parent_id.to_string());
3429 child.branched_at_event_index = branched_at_event_index;
3430 child.transcript = clone_transcript_with_parent(&child.transcript, parent_id);
3431 }
3432}
3433
3434fn branch_event_index(transcript: &VmValue, keep_first: usize) -> usize {
3435 if keep_first == 0 {
3436 return 0;
3437 }
3438 let Some(dict) = transcript.as_dict() else {
3439 return keep_first;
3440 };
3441 let Some(VmValue::List(events)) = dict.get("events") else {
3442 return keep_first;
3443 };
3444 event_prefix_len_for_messages(events, keep_first)
3445}
3446
3447fn event_kind(event: &VmValue) -> Option<String> {
3448 event
3449 .as_dict()
3450 .and_then(|dict| dict.get("kind"))
3451 .map(VmValue::display)
3452}
3453
3454fn event_id(event: &VmValue) -> Option<String> {
3455 event
3456 .as_dict()
3457 .and_then(|dict| dict.get("id"))
3458 .map(VmValue::display)
3459}
3460
3461fn is_turn_event(event: &VmValue) -> bool {
3462 matches!(
3463 event_kind(event).as_deref(),
3464 Some("message" | "tool_result")
3465 )
3466}
3467
3468fn event_prefix_len_for_messages(events: &[VmValue], keep_first: usize) -> usize {
3469 if keep_first == 0 {
3470 return 0;
3471 }
3472 let mut retained_messages = 0usize;
3473 for (index, event) in events.iter().enumerate() {
3474 if is_turn_event(event) {
3475 retained_messages += 1;
3476 if retained_messages == keep_first {
3477 return index + 1;
3478 }
3479 }
3480 }
3481 events.len()
3482}
3483
3484fn turn_event_id_for_count(events: &[VmValue], keep_first: usize) -> Option<String> {
3485 if keep_first == 0 {
3486 return None;
3487 }
3488 let mut retained_messages = 0usize;
3489 for event in events {
3490 if is_turn_event(event) {
3491 retained_messages += 1;
3492 if retained_messages == keep_first {
3493 return event_id(event);
3494 }
3495 }
3496 }
3497 None
3498}
3499
// Test module for this file; compiled only for test builds and loaded from the
// `agent_sessions_tests.rs` path given below.
#[cfg(test)]
#[path = "agent_sessions_tests.rs"]
mod tests;