1use crate::value::VmDictExt;
20use std::cell::{Cell, RefCell};
21use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
22use std::future::Future;
23use std::path::{Path, PathBuf};
24use std::sync::{Mutex, OnceLock};
25use std::time::Instant;
26
27use crate::actor_chain::ActorChain;
28use crate::agent_transcript_budget::{
29 apply_transcript_with_budget, transcript_budget_policy_json, transcript_budget_usage_json,
30 transcript_message_count, transcript_usage,
31};
32use crate::runtime_limits::RuntimeLimits;
33use crate::value::VmValue;
34use crate::workspace_anchor::{
35 MountMode, MountedRoot, WorkspaceAnchor, WorkspacePolicy, WORKSPACE_ANCHOR_METADATA_KEY,
36};
37
38const LIVE_CLIENT_EVENT_KIND: &str = "live_session_client";
39const LIVE_CLIENT_PERMISSION_EVENT_KIND: &str = "live_session_permission_route";
40
41pub const DEFAULT_SESSION_CAP: usize = RuntimeLimits::DEFAULT.max_agent_sessions;
44
45pub const DEFAULT_TRANSCRIPT_MESSAGE_CAP: usize = 4096;
49
50pub const DEFAULT_TRANSCRIPT_EVENT_CAP: usize = 32768;
53pub const MAX_SCRATCHPAD_BYTES: usize = 16 * 1024;
54#[cfg(debug_assertions)]
55const CACHE_STABLE_SYSTEM_PROMPT_DIAGNOSTIC: &str = "HARN-CACHE-001";
56
57#[derive(Clone, Debug, PartialEq, Eq)]
58pub enum TranscriptBudgetRecovery {
59 Reject,
60 Trim { keep_last: usize },
61 Compact { keep_last: usize },
62}
63
64#[derive(Clone, Debug, PartialEq, Eq)]
65pub struct SessionTranscriptBudgetPolicy {
66 pub max_messages: usize,
67 pub max_events: usize,
68 pub max_approx_bytes: Option<usize>,
69 pub recovery: TranscriptBudgetRecovery,
70}
71
72impl SessionTranscriptBudgetPolicy {
73 pub fn reject(max_messages: usize, max_events: usize) -> Self {
74 Self {
75 max_messages: max_messages.max(1),
76 max_events: max_events.max(1),
77 max_approx_bytes: None,
78 recovery: TranscriptBudgetRecovery::Reject,
79 }
80 }
81
82 pub fn trim(max_messages: usize, max_events: usize, keep_last: usize) -> Self {
83 Self {
84 max_messages: max_messages.max(1),
85 max_events: max_events.max(1),
86 max_approx_bytes: None,
87 recovery: TranscriptBudgetRecovery::Trim { keep_last },
88 }
89 }
90
91 pub fn compact(max_messages: usize, max_events: usize, keep_last: usize) -> Self {
92 Self {
93 max_messages: max_messages.max(1),
94 max_events: max_events.max(1),
95 max_approx_bytes: None,
96 recovery: TranscriptBudgetRecovery::Compact { keep_last },
97 }
98 }
99
100 pub fn with_max_approx_bytes(mut self, max_approx_bytes: Option<usize>) -> Self {
101 self.max_approx_bytes = max_approx_bytes.map(|limit| limit.max(1));
102 self
103 }
104
105 pub(crate) fn normalized(&self) -> Self {
106 Self {
107 max_messages: self.max_messages.max(1),
108 max_events: self.max_events.max(1),
109 max_approx_bytes: self.max_approx_bytes.map(|limit| limit.max(1)),
110 recovery: self.recovery.clone(),
111 }
112 }
113}
114
115impl Default for SessionTranscriptBudgetPolicy {
116 fn default() -> Self {
117 Self::reject(DEFAULT_TRANSCRIPT_MESSAGE_CAP, DEFAULT_TRANSCRIPT_EVENT_CAP)
118 }
119}
120
121pub struct SessionState {
122 pub id: String,
123 pub transcript: VmValue,
124 pub subscribers: Vec<VmValue>,
125 pub created_at: String,
126 pub last_accessed: Instant,
127 pub parent_id: Option<String>,
128 pub child_ids: Vec<String>,
129 pub branched_at_event_index: Option<usize>,
130 pub actor_chain: Option<ActorChain>,
131 pub active_skills: Vec<String>,
137 pub tool_format: Option<String>,
141 pub system_prompt: Option<String>,
145 pub pinned_model: Option<String>,
151 pub pinned_reasoning_policy: Option<String>,
157 pub workspace_policy: WorkspacePolicy,
160 pub workspace_anchor: Option<WorkspaceAnchor>,
166 pub scratchpad: Option<VmValue>,
169 pub scratchpad_version: u64,
170 pub transcript_budget_policy: SessionTranscriptBudgetPolicy,
171 pub last_transcript_budget_action: Option<serde_json::Value>,
172 pub live_clients: BTreeMap<String, LiveSessionClient>,
173 pub live_controller_id: Option<String>,
174 pub completed_turn_checkpoints: Vec<SessionTurnCheckpoint>,
175 pub redo_stack: Vec<SessionRedoEntry>,
176}
177
178impl SessionState {
179 fn new(id: String) -> Self {
180 let now = Instant::now();
181 let transcript = empty_transcript(&id);
182 Self {
183 id,
184 transcript,
185 subscribers: Vec::new(),
186 created_at: crate::orchestration::now_rfc3339(),
187 last_accessed: now,
188 parent_id: None,
189 child_ids: Vec::new(),
190 branched_at_event_index: None,
191 actor_chain: None,
192 active_skills: Vec::new(),
193 tool_format: None,
194 system_prompt: None,
195 pinned_model: None,
196 pinned_reasoning_policy: None,
197 workspace_policy: WorkspacePolicy::default(),
198 workspace_anchor: None,
199 scratchpad: None,
200 scratchpad_version: 0,
201 transcript_budget_policy: default_transcript_budget_policy(),
202 last_transcript_budget_action: None,
203 live_clients: BTreeMap::new(),
204 live_controller_id: None,
205 completed_turn_checkpoints: Vec::new(),
206 redo_stack: Vec::new(),
207 }
208 }
209
210 fn touch(&mut self) {
211 self.last_accessed = Instant::now();
212 }
213
214 pub(crate) fn replace_transcript(&mut self, transcript: VmValue) {
215 if !crate::values_equal(&self.transcript, &transcript) {
216 self.redo_stack.clear();
217 }
218 self.transcript = transcript;
219 self.touch();
220 }
221}
222
223#[derive(Clone, Debug, PartialEq, Eq)]
224pub enum LiveClientMode {
225 Observer,
226 Controller,
227}
228
229impl LiveClientMode {
230 fn as_str(&self) -> &'static str {
231 match self {
232 Self::Observer => "observer",
233 Self::Controller => "controller",
234 }
235 }
236}
237
238#[derive(Clone, Debug, PartialEq, Eq)]
239pub struct LiveSessionClient {
240 pub client_id: String,
241 pub mode: LiveClientMode,
242 pub attached_at: String,
243 pub last_seen_at: String,
244 pub prompt_injection: bool,
245 pub permission_routing: bool,
246 pub metadata: serde_json::Value,
247}
248
249#[derive(Clone, Debug, PartialEq, Eq)]
250pub struct AttachLiveClient {
251 pub client_id: String,
252 pub mode: LiveClientMode,
253 pub takeover: bool,
254 pub prompt_injection: bool,
255 pub permission_routing: bool,
256 pub metadata: serde_json::Value,
257}
258
259#[derive(Clone, Debug, PartialEq, Eq)]
260pub struct LiveClientChange {
261 pub client: Option<LiveSessionClient>,
262 pub previous_controller_id: Option<String>,
263 pub active_controller_id: Option<String>,
264 pub clients: Vec<LiveSessionClient>,
265}
266
267#[derive(Clone, Debug, PartialEq, Eq)]
268pub struct SessionAncestry {
269 pub parent_id: Option<String>,
270 pub child_ids: Vec<String>,
271 pub root_id: String,
272}
273
274#[derive(Clone, Debug, PartialEq, Eq)]
275pub struct SessionTruncateResult {
276 pub kept_turn_count: usize,
277 pub removed_turn_count: usize,
278 pub new_tip_turn_id: Option<String>,
279}
280
281#[derive(Clone, Debug, PartialEq, Eq)]
282pub struct SessionCheckpointSummary {
283 pub checkpoint_id: String,
284 pub before_message_count: usize,
285 pub after_message_count: usize,
286 pub fs_snapshot_ids: Vec<String>,
287}
288
289#[derive(Clone, Debug)]
290pub struct SessionTurnCheckpoint {
291 pub checkpoint_id: String,
292 pub completed_at: String,
293 pub before_transcript: VmValue,
294 pub after_transcript: VmValue,
295 pub before_message_count: usize,
296 pub after_message_count: usize,
297 pub fs_snapshot_ids: Vec<String>,
298}
299
300#[derive(Clone, Debug)]
301pub struct SessionRedoEntry {
302 pub checkpoint: SessionTurnCheckpoint,
303 pub redo_fs_snapshot_ids: Vec<String>,
304}
305
306#[derive(Clone, Debug, PartialEq, Eq)]
307pub enum SessionCheckpointError {
308 UnknownSession,
309 NoCheckpoint,
310 NoRedo,
311}
312
313#[derive(Clone, Debug, PartialEq, Eq)]
314pub struct SessionCheckpointOutcome {
315 pub status: &'static str,
316 pub checkpoint: SessionCheckpointSummary,
317 pub redo_fs_snapshot_ids: Vec<String>,
318}
319
320#[derive(Clone, Debug, PartialEq, Eq)]
321pub struct ReminderInjectionReport {
322 pub reminder_id: String,
323 pub deduped_count: usize,
324}
325
326thread_local! {
327 static SESSIONS: RefCell<HashMap<String, SessionState>> = RefCell::new(HashMap::new());
328 static SESSION_CAP: Cell<usize> = const { Cell::new(DEFAULT_SESSION_CAP) };
329 static DEFAULT_TRANSCRIPT_BUDGET_POLICY: RefCell<SessionTranscriptBudgetPolicy> =
330 RefCell::new(SessionTranscriptBudgetPolicy::default());
331 static CURRENT_SESSION_STACK: RefCell<Vec<String>> = const { RefCell::new(Vec::new()) };
332 static CURRENT_TOOL_CALL_STACK: RefCell<Vec<String>> = const { RefCell::new(Vec::new()) };
333}
334
335tokio::task_local! {
336 static CURRENT_TOOL_CALL_TASK: String;
337}
338
339static SESSION_CHANGED_PATHS: OnceLock<Mutex<BTreeMap<String, BTreeSet<String>>>> = OnceLock::new();
349
350fn session_changed_paths_store() -> &'static Mutex<BTreeMap<String, BTreeSet<String>>> {
351 SESSION_CHANGED_PATHS.get_or_init(|| Mutex::new(BTreeMap::new()))
352}
353
354pub fn record_session_changed_path(session_id: &str, path: &str) {
357 if session_id.is_empty() || path.is_empty() {
358 return;
359 }
360 if let Ok(mut store) = session_changed_paths_store().lock() {
361 store
362 .entry(session_id.to_string())
363 .or_default()
364 .insert(path.to_string());
365 }
366}
367
368pub fn session_changed_paths(session_id: &str) -> Vec<String> {
371 session_changed_paths_store()
372 .lock()
373 .ok()
374 .and_then(|store| {
375 store
376 .get(session_id)
377 .map(|set| set.iter().cloned().collect())
378 })
379 .unwrap_or_default()
380}
381
382pub fn take_session_changed_paths(session_id: &str) -> Vec<String> {
386 session_changed_paths_store()
387 .lock()
388 .ok()
389 .and_then(|mut store| store.remove(session_id))
390 .map(|set| set.into_iter().collect())
391 .unwrap_or_default()
392}
393
394pub fn clear_session_changed_paths(session_id: &str) {
396 if let Ok(mut store) = session_changed_paths_store().lock() {
397 store.remove(session_id);
398 }
399}
400
401pub struct CurrentSessionGuard {
402 active: bool,
403}
404
405impl Drop for CurrentSessionGuard {
406 fn drop(&mut self) {
407 if self.active {
408 pop_current_session();
409 }
410 }
411}
412
413pub struct CurrentToolCallGuard {
419 active: bool,
420}
421
422impl Drop for CurrentToolCallGuard {
423 fn drop(&mut self) {
424 if self.active {
425 pop_current_tool_call();
426 }
427 }
428}
429
430pub fn set_session_cap(cap: usize) {
433 SESSION_CAP.with(|c| c.set(cap.max(1)));
434}
435
436pub fn session_cap() -> usize {
437 SESSION_CAP.with(|c| c.get())
438}
439
440pub fn set_default_transcript_budget_policy(policy: SessionTranscriptBudgetPolicy) {
441 DEFAULT_TRANSCRIPT_BUDGET_POLICY.with(|cell| {
442 *cell.borrow_mut() = policy.normalized();
443 });
444}
445
446pub fn reset_default_transcript_budget_policy() {
447 set_default_transcript_budget_policy(SessionTranscriptBudgetPolicy::default());
448}
449
450pub fn default_transcript_budget_policy() -> SessionTranscriptBudgetPolicy {
451 DEFAULT_TRANSCRIPT_BUDGET_POLICY.with(|cell| cell.borrow().clone())
452}
453
454pub fn transcript_budget_policy(id: &str) -> Option<SessionTranscriptBudgetPolicy> {
455 SESSIONS.with(|s| {
456 s.borrow()
457 .get(id)
458 .map(|state| state.transcript_budget_policy.clone())
459 })
460}
461
462pub fn set_transcript_budget_policy(
463 id: &str,
464 policy: SessionTranscriptBudgetPolicy,
465) -> Result<(), String> {
466 SESSIONS.with(|s| {
467 let mut map = s.borrow_mut();
468 let Some(state) = map.get_mut(id) else {
469 return Err(format!("agent session '{id}' does not exist"));
470 };
471 let previous = state.transcript_budget_policy.clone();
472 let previous_action = state.last_transcript_budget_action.clone();
473 state.transcript_budget_policy = policy.normalized();
474 let candidate = state.transcript.clone();
475 if let Err(error) = apply_transcript_with_budget(state, candidate, "policy_update") {
476 state.transcript_budget_policy = previous;
477 state.last_transcript_budget_action = previous_action;
478 return Err(error);
479 }
480 Ok(())
481 })
482}
483
484pub fn reset_session_store() {
486 SESSIONS.with(|s| s.borrow_mut().clear());
487 CURRENT_SESSION_STACK.with(|stack| stack.borrow_mut().clear());
488 CURRENT_TOOL_CALL_STACK.with(|stack| stack.borrow_mut().clear());
489 reset_default_transcript_budget_policy();
490}
491
492pub(crate) fn push_current_session(id: String) {
493 if id.is_empty() {
494 return;
495 }
496 CURRENT_SESSION_STACK.with(|stack| stack.borrow_mut().push(id));
497}
498
499pub(crate) fn swap_current_session_stack(replacement: Vec<String>) -> Vec<String> {
500 CURRENT_SESSION_STACK.with(|stack| std::mem::replace(&mut *stack.borrow_mut(), replacement))
501}
502
503pub(crate) fn pop_current_session() {
504 CURRENT_SESSION_STACK.with(|stack| {
505 let _ = stack.borrow_mut().pop();
506 });
507}
508
509pub fn current_session_id() -> Option<String> {
510 CURRENT_SESSION_STACK.with(|stack| stack.borrow().last().cloned())
511}
512
513pub fn current_actor_chain() -> Option<ActorChain> {
514 current_session_id().as_deref().and_then(actor_chain)
515}
516
517pub fn enter_current_session(id: impl Into<String>) -> CurrentSessionGuard {
518 let id = id.into();
519 if id.trim().is_empty() {
520 return CurrentSessionGuard { active: false };
521 }
522 push_current_session(id);
523 CurrentSessionGuard { active: true }
524}
525
526pub fn actor_chain(id: &str) -> Option<ActorChain> {
527 SESSIONS.with(|s| {
528 s.borrow()
529 .get(id)
530 .and_then(|state| state.actor_chain.clone())
531 })
532}
533
534pub fn set_actor_chain(id: &str, actor_chain: Option<ActorChain>) -> Result<bool, String> {
535 SESSIONS.with(|s| {
536 let mut map = s.borrow_mut();
537 let Some(state) = map.get_mut(id) else {
538 return Err(format!("agent session '{id}' does not exist"));
539 };
540 let changed = state.actor_chain != actor_chain;
541 state.actor_chain = actor_chain;
542 state.touch();
543 Ok(changed)
544 })
545}
546
547fn push_current_tool_call(id: String) {
548 if id.is_empty() {
549 return;
550 }
551 CURRENT_TOOL_CALL_STACK.with(|stack| stack.borrow_mut().push(id));
552}
553
554fn pop_current_tool_call() {
555 CURRENT_TOOL_CALL_STACK.with(|stack| {
556 let _ = stack.borrow_mut().pop();
557 });
558}
559
560pub fn current_tool_call_id() -> Option<String> {
565 if let Ok(id) = CURRENT_TOOL_CALL_TASK.try_with(Clone::clone) {
566 if !id.trim().is_empty() {
567 return Some(id);
568 }
569 }
570 CURRENT_TOOL_CALL_STACK.with(|stack| stack.borrow().last().cloned())
571}
572
573pub async fn scope_current_tool_call<F, T>(id: impl Into<String>, future: F) -> T
579where
580 F: Future<Output = T>,
581{
582 let id = id.into();
583 if id.trim().is_empty() {
584 future.await
585 } else {
586 CURRENT_TOOL_CALL_TASK.scope(id, future).await
587 }
588}
589
590pub fn enter_current_tool_call(id: impl Into<String>) -> CurrentToolCallGuard {
592 let id = id.into();
593 if id.trim().is_empty() {
594 return CurrentToolCallGuard { active: false };
595 }
596 push_current_tool_call(id);
597 CurrentToolCallGuard { active: true }
598}
599
600pub fn exists(id: &str) -> bool {
601 SESSIONS.with(|s| s.borrow().contains_key(id))
602}
603
604pub fn length(id: &str) -> Option<usize> {
605 SESSIONS.with(|s| {
606 s.borrow().get(id).map(|state| {
607 state
608 .transcript
609 .as_dict()
610 .and_then(|d| d.get("messages"))
611 .and_then(|v| match v {
612 VmValue::List(list) => Some(list.len()),
613 _ => None,
614 })
615 .unwrap_or(0)
616 })
617 })
618}
619
620pub fn scratchpad(id: &str) -> Option<VmValue> {
621 SESSIONS.with(|s| {
622 s.borrow()
623 .get(id)
624 .and_then(|state| state.scratchpad.clone())
625 })
626}
627
628pub fn scratchpad_version(id: &str) -> Option<u64> {
629 SESSIONS.with(|s| s.borrow().get(id).map(|state| state.scratchpad_version))
630}
631
632pub fn set_scratchpad(
633 id: &str,
634 scratchpad: VmValue,
635 source: impl Into<String>,
636 reason: Option<String>,
637 metadata: serde_json::Value,
638) -> Result<u64, String> {
639 validate_scratchpad_value(&scratchpad)?;
640 SESSIONS.with(|s| {
641 let mut map = s.borrow_mut();
642 let Some(state) = map.get_mut(id) else {
643 return Err(format!("agent session '{id}' does not exist"));
644 };
645 let version = state.scratchpad_version.saturating_add(1);
646 let event = scratchpad_transcript_event(
647 "set",
648 version,
649 Some(&scratchpad),
650 source.into(),
651 reason,
652 metadata,
653 );
654 append_event_to_state(state, event, "set_scratchpad")?;
655 state.scratchpad = Some(scratchpad);
656 state.scratchpad_version = version;
657 state.touch();
658 Ok(version)
659 })
660}
661
662pub fn clear_scratchpad(
663 id: &str,
664 source: impl Into<String>,
665 reason: Option<String>,
666 metadata: serde_json::Value,
667) -> Result<u64, String> {
668 SESSIONS.with(|s| {
669 let mut map = s.borrow_mut();
670 let Some(state) = map.get_mut(id) else {
671 return Err(format!("agent session '{id}' does not exist"));
672 };
673 let version = state.scratchpad_version.saturating_add(1);
674 let event =
675 scratchpad_transcript_event("clear", version, None, source.into(), reason, metadata);
676 append_event_to_state(state, event, "clear_scratchpad")?;
677 state.scratchpad = None;
678 state.scratchpad_version = version;
679 state.touch();
680 Ok(version)
681 })
682}
683
684fn validate_scratchpad_value(value: &VmValue) -> Result<(), String> {
685 if !matches!(value, VmValue::Dict(_)) {
686 return Err("agent session scratchpad must be a dict".to_string());
687 }
688 let json = crate::llm::helpers::vm_value_to_json(value);
689 let approx_bytes = serde_json::to_vec(&json)
690 .map(|bytes| bytes.len())
691 .unwrap_or(usize::MAX);
692 if approx_bytes > MAX_SCRATCHPAD_BYTES {
693 return Err(format!(
694 "agent session scratchpad is {approx_bytes} bytes; max is {MAX_SCRATCHPAD_BYTES}"
695 ));
696 }
697 Ok(())
698}
699
700fn scratchpad_transcript_event(
701 action: &str,
702 version: u64,
703 scratchpad: Option<&VmValue>,
704 source: String,
705 reason: Option<String>,
706 metadata: serde_json::Value,
707) -> VmValue {
708 let scratchpad_json = scratchpad.map(crate::llm::helpers::vm_value_to_json);
709 let approx_bytes = scratchpad_json
710 .as_ref()
711 .and_then(|value| serde_json::to_vec(value).ok().map(|bytes| bytes.len()))
712 .unwrap_or(0);
713 let event_metadata = serde_json::json!({
714 "action": action,
715 "version": version,
716 "source": normalize_scratchpad_source(source),
717 "reason": reason.unwrap_or_default(),
718 "approx_bytes": approx_bytes,
719 "counts": scratchpad_json
720 .as_ref()
721 .map(scratchpad_counts_json)
722 .unwrap_or_else(|| serde_json::json!({})),
723 "metadata": metadata,
724 });
725 let content = format!("Agent scratchpad {action}");
726 crate::llm::helpers::transcript_event(
727 "agent_scratchpad",
728 "system",
729 "internal",
730 &content,
731 Some(event_metadata),
732 )
733}
734
735fn normalize_scratchpad_source(source: String) -> String {
736 let trimmed = source.trim();
737 if trimmed.is_empty() {
738 "harn.agent_scratchpad".to_string()
739 } else {
740 trimmed.to_string()
741 }
742}
743
744fn scratchpad_counts_json(value: &serde_json::Value) -> serde_json::Value {
745 serde_json::json!({
746 "goals": scratchpad_array_len(value, "goals"),
747 "open_items": scratchpad_array_len(value, "open_items"),
748 "facts": scratchpad_array_len(value, "facts"),
749 "refs": scratchpad_array_len(value, "refs"),
750 })
751}
752
753fn scratchpad_array_len(value: &serde_json::Value, key: &str) -> usize {
754 value
755 .get(key)
756 .and_then(serde_json::Value::as_array)
757 .map(Vec::len)
758 .unwrap_or(0)
759}
760
761pub fn snapshot(id: &str) -> Option<VmValue> {
762 SESSIONS.with(|s| s.borrow().get(id).map(session_snapshot))
763}
764
765pub fn transcript(id: &str) -> Option<VmValue> {
767 SESSIONS.with(|s| {
768 s.borrow()
769 .get(id)
770 .map(|state| transcript_with_session_metadata(state.transcript.clone(), state))
771 })
772}
773
774pub fn open_or_create(id: Option<String>) -> String {
783 open_or_create_with_actor_chain(id, None)
784}
785
786pub fn open_or_create_with_actor_chain(
787 id: Option<String>,
788 requested_actor_chain: Option<ActorChain>,
789) -> String {
790 let resolved = id.unwrap_or_else(|| uuid::Uuid::now_v7().to_string());
791 let parent_session = current_session_id();
792 let inherited_actor_chain = requested_actor_chain
793 .clone()
794 .or_else(|| parent_session.as_deref().and_then(actor_chain));
795 let mut was_new = false;
796 SESSIONS.with(|s| {
797 let mut map = s.borrow_mut();
798 if let Some(state) = map.get_mut(&resolved) {
799 if let Some(actor_chain) = requested_actor_chain.clone() {
800 state.actor_chain = Some(actor_chain);
801 }
802 state.touch();
803 return;
804 }
805 was_new = true;
806 let cap = SESSION_CAP.with(|c| c.get());
807 if map.len() >= cap {
808 if let Some(victim) = map
809 .iter()
810 .min_by_key(|(_, state)| state.last_accessed)
811 .map(|(id, _)| id.clone())
812 {
813 map.remove(&victim);
814 }
815 }
816 let mut state = SessionState::new(resolved.clone());
817 state.actor_chain = inherited_actor_chain.clone();
818 map.insert(resolved.clone(), state);
819 });
820 if was_new {
821 if let Some(parent) = parent_session.as_deref() {
822 crate::agent_events::mirror_session_sinks(parent, &resolved);
823 }
824 try_register_event_log(&resolved);
825 }
826 resolved
827}
828
829pub fn open_child_session(parent_id: &str, id: Option<String>) -> String {
830 open_child_session_with_actor(parent_id, id, None)
831}
832
833pub fn open_child_session_with_actor(
834 parent_id: &str,
835 id: Option<String>,
836 actor: Option<&str>,
837) -> String {
838 let actor_chain = actor_chain(parent_id).map(|chain| match actor {
839 Some(actor) if !actor.trim().is_empty() => chain.pushed(actor.trim()),
840 _ => chain,
841 });
842 let resolved = open_or_create_with_actor_chain(id, actor_chain);
843 link_child_session(parent_id, &resolved);
844 resolved
845}
846
847pub fn link_child_session(parent_id: &str, child_id: &str) {
848 link_child_session_with_branch(parent_id, child_id, None);
849}
850
851pub fn link_child_session_with_branch(
852 parent_id: &str,
853 child_id: &str,
854 branched_at_event_index: Option<usize>,
855) {
856 if parent_id == child_id {
857 return;
858 }
859 open_or_create(Some(parent_id.to_string()));
860 open_or_create(Some(child_id.to_string()));
861 SESSIONS.with(|s| {
862 let mut map = s.borrow_mut();
863 update_lineage(&mut map, parent_id, child_id, branched_at_event_index);
864 });
865}
866
867pub fn parent_id(id: &str) -> Option<String> {
868 SESSIONS.with(|s| s.borrow().get(id).and_then(|state| state.parent_id.clone()))
869}
870
871pub fn child_ids(id: &str) -> Vec<String> {
872 SESSIONS.with(|s| {
873 s.borrow()
874 .get(id)
875 .map(|state| state.child_ids.clone())
876 .unwrap_or_default()
877 })
878}
879
880pub fn ancestry(id: &str) -> Option<SessionAncestry> {
881 SESSIONS.with(|s| {
882 let map = s.borrow();
883 let state = map.get(id)?;
884 let mut root_id = state.id.clone();
885 let mut cursor = state.parent_id.clone();
886 let mut seen = HashSet::from([state.id.clone()]);
887 while let Some(parent_id) = cursor {
888 if !seen.insert(parent_id.clone()) {
889 break;
890 }
891 root_id = parent_id.clone();
892 cursor = map
893 .get(&parent_id)
894 .and_then(|parent| parent.parent_id.clone());
895 }
896 Some(SessionAncestry {
897 parent_id: state.parent_id.clone(),
898 child_ids: state.child_ids.clone(),
899 root_id,
900 })
901 })
902}
903
904pub fn live_clients(id: &str) -> Option<Vec<LiveSessionClient>> {
905 SESSIONS.with(|s| {
906 s.borrow()
907 .get(id)
908 .map(|state| state.live_clients.values().cloned().collect())
909 })
910}
911
912pub fn attach_live_client(id: &str, request: AttachLiveClient) -> Result<LiveClientChange, String> {
913 SESSIONS.with(|s| {
914 let mut map = s.borrow_mut();
915 let Some(state) = map.get_mut(id) else {
916 return Err(format!("agent session '{id}' does not exist"));
917 };
918 let client_id = validate_live_client_id(request.client_id)?;
919 let now = crate::orchestration::now_rfc3339();
920 let previous_clients = state.live_clients.clone();
921 let previous_controller_id = state.live_controller_id.clone();
922
923 if request.mode == LiveClientMode::Controller {
924 let conflicting_controller = previous_controller_id
925 .as_ref()
926 .filter(|controller_id| *controller_id != &client_id)
927 .filter(|controller_id| state.live_clients.contains_key(*controller_id));
928 if let Some(previous) = conflicting_controller {
929 if !request.takeover {
930 return Err(format!("live session already has controller '{previous}'"));
931 }
932 if let Some(previous_client) = state.live_clients.get_mut(previous) {
933 previous_client.mode = LiveClientMode::Observer;
934 previous_client.prompt_injection = false;
935 previous_client.permission_routing = false;
936 previous_client.last_seen_at = now.clone();
937 }
938 }
939 state.live_controller_id = Some(client_id.clone());
940 } else if state.live_controller_id.as_deref() == Some(client_id.as_str()) {
941 state.live_controller_id = None;
942 }
943
944 let attached_at = state
945 .live_clients
946 .get(&client_id)
947 .map(|client| client.attached_at.clone())
948 .unwrap_or_else(|| now.clone());
949 let client = LiveSessionClient {
950 client_id: client_id.clone(),
951 mode: request.mode,
952 attached_at,
953 last_seen_at: now,
954 prompt_injection: request.prompt_injection,
955 permission_routing: request.permission_routing,
956 metadata: request.metadata,
957 };
958 state.live_clients.insert(client_id, client.clone());
959 state.touch();
960 let active_controller_id = state.live_controller_id.clone();
961 append_live_client_event(
962 state,
963 "attached",
964 Some(&client),
965 previous_controller_id.as_deref(),
966 active_controller_id.as_deref(),
967 serde_json::Value::Null,
968 )
969 .inspect_err(|_error| {
970 state.live_clients = previous_clients;
971 state.live_controller_id = previous_controller_id.clone();
972 })?;
973 Ok(live_client_change(
974 Some(client),
975 previous_controller_id,
976 state,
977 ))
978 })
979}
980
981pub fn takeover_live_client(
982 id: &str,
983 client_id: impl Into<String>,
984 metadata: serde_json::Value,
985) -> Result<LiveClientChange, String> {
986 attach_live_client(
987 id,
988 AttachLiveClient {
989 client_id: client_id.into(),
990 mode: LiveClientMode::Controller,
991 takeover: true,
992 prompt_injection: true,
993 permission_routing: true,
994 metadata,
995 },
996 )
997}
998
999pub fn detach_live_client(
1000 id: &str,
1001 client_id: impl Into<String>,
1002 reason: Option<String>,
1003 metadata: serde_json::Value,
1004) -> Result<LiveClientChange, String> {
1005 SESSIONS.with(|s| {
1006 let mut map = s.borrow_mut();
1007 let Some(state) = map.get_mut(id) else {
1008 return Err(format!("agent session '{id}' does not exist"));
1009 };
1010 let client_id = validate_live_client_id(client_id.into())?;
1011 let previous_clients = state.live_clients.clone();
1012 let previous_controller_id = state.live_controller_id.clone();
1013 let Some(mut client) = state.live_clients.remove(&client_id) else {
1014 return Err(format!("live client '{client_id}' is not attached"));
1015 };
1016 client.last_seen_at = crate::orchestration::now_rfc3339();
1017 if state.live_controller_id.as_deref() == Some(client_id.as_str()) {
1018 state.live_controller_id = None;
1019 }
1020 state.touch();
1021 let active_controller_id = state.live_controller_id.clone();
1022 append_live_client_event(
1023 state,
1024 "detached",
1025 Some(&client),
1026 previous_controller_id.as_deref(),
1027 active_controller_id.as_deref(),
1028 serde_json::json!({
1029 "reason": reason.unwrap_or_else(|| "client_detached".to_string()),
1030 "metadata": metadata,
1031 }),
1032 )
1033 .inspect_err(|_error| {
1034 state.live_clients = previous_clients;
1035 state.live_controller_id = previous_controller_id.clone();
1036 })?;
1037 Ok(live_client_change(None, previous_controller_id, state))
1038 })
1039}
1040
1041pub fn heartbeat_live_client(
1042 id: &str,
1043 client_id: impl Into<String>,
1044 metadata: serde_json::Value,
1045) -> Result<LiveClientChange, String> {
1046 SESSIONS.with(|s| {
1047 let mut map = s.borrow_mut();
1048 let Some(state) = map.get_mut(id) else {
1049 return Err(format!("agent session '{id}' does not exist"));
1050 };
1051 let client_id = validate_live_client_id(client_id.into())?;
1052 let previous_clients = state.live_clients.clone();
1053 let previous_controller_id = state.live_controller_id.clone();
1054 let Some(client) = state.live_clients.get_mut(&client_id) else {
1055 return Err(format!("live client '{client_id}' is not attached"));
1056 };
1057 client.last_seen_at = crate::orchestration::now_rfc3339();
1058 if !metadata.is_null() {
1059 client.metadata = metadata.clone();
1060 }
1061 let client = client.clone();
1062 state.touch();
1063 let active_controller_id = state.live_controller_id.clone();
1064 append_live_client_event(
1065 state,
1066 "heartbeat",
1067 Some(&client),
1068 previous_controller_id.as_deref(),
1069 active_controller_id.as_deref(),
1070 serde_json::json!({ "metadata": metadata }),
1071 )
1072 .inspect_err(|_error| {
1073 state.live_clients = previous_clients;
1074 state.live_controller_id = previous_controller_id.clone();
1075 })?;
1076 Ok(live_client_change(
1077 Some(client),
1078 previous_controller_id,
1079 state,
1080 ))
1081 })
1082}
1083
1084pub fn inject_prompt_from_live_client(
1085 id: &str,
1086 client_id: impl Into<String>,
1087 content: VmValue,
1088 metadata: serde_json::Value,
1089) -> Result<(), String> {
1090 let client_id = validate_live_client_id(client_id.into())?;
1091 ensure_live_controller(id, &client_id, LiveControllerCapability::PromptInjection)?;
1092 let mut message = BTreeMap::new();
1093 message.put_str("role", "user");
1094 message.insert("content".to_string(), content);
1095 message.insert(
1096 "metadata".to_string(),
1097 crate::stdlib::json_to_vm_value(&serde_json::json!({
1098 "live_session": {
1099 "client_id": client_id,
1100 "mode": "controller",
1101 "source": "live_session_attach",
1102 "metadata": metadata,
1103 }
1104 })),
1105 );
1106 inject_message(id, VmValue::dict(message))
1107}
1108
1109pub fn route_live_permission_request(
1110 id: &str,
1111 client_id: impl Into<String>,
1112 request: serde_json::Value,
1113 metadata: serde_json::Value,
1114) -> Result<serde_json::Value, String> {
1115 let client_id = validate_live_client_id(client_id.into())?;
1116 let client =
1117 ensure_live_controller(id, &client_id, LiveControllerCapability::PermissionRouting)?;
1118 let request_id = request
1119 .get("id")
1120 .or_else(|| request.get("request_id"))
1121 .and_then(serde_json::Value::as_str)
1122 .filter(|id| !id.trim().is_empty())
1123 .unwrap_or("permission_request");
1124 let event_metadata = serde_json::json!({
1125 "action": "permission_routed",
1126 "client": live_client_json(&client),
1127 "request_id": request_id,
1128 "request": request,
1129 "metadata": metadata,
1130 });
1131 let event = crate::llm::helpers::transcript_event(
1132 LIVE_CLIENT_PERMISSION_EVENT_KIND,
1133 "system",
1134 "internal",
1135 "Live session permission request routed",
1136 Some(event_metadata.clone()),
1137 );
1138 append_event(id, event)?;
1139 Ok(event_metadata)
1140}
1141
1142enum LiveControllerCapability {
1143 PromptInjection,
1144 PermissionRouting,
1145}
1146
1147fn ensure_live_controller(
1148 id: &str,
1149 client_id: &str,
1150 capability: LiveControllerCapability,
1151) -> Result<LiveSessionClient, String> {
1152 SESSIONS.with(|s| {
1153 let map = s.borrow();
1154 let Some(state) = map.get(id) else {
1155 return Err(format!("agent session '{id}' does not exist"));
1156 };
1157 if state.live_controller_id.as_deref() != Some(client_id) {
1158 return Err(format!(
1159 "live client '{client_id}' is not the active controller"
1160 ));
1161 }
1162 let Some(client) = state.live_clients.get(client_id) else {
1163 return Err(format!("live client '{client_id}' is not attached"));
1164 };
1165 match capability {
1166 LiveControllerCapability::PromptInjection if !client.prompt_injection => Err(format!(
1167 "live client '{client_id}' cannot inject prompts for this session"
1168 )),
1169 LiveControllerCapability::PermissionRouting if !client.permission_routing => Err(
1170 format!("live client '{client_id}' cannot route permissions for this session"),
1171 ),
1172 _ => Ok(client.clone()),
1173 }
1174 })
1175}
1176
1177fn append_live_client_event(
1178 state: &mut SessionState,
1179 action: &str,
1180 client: Option<&LiveSessionClient>,
1181 previous_controller_id: Option<&str>,
1182 active_controller_id: Option<&str>,
1183 extra: serde_json::Value,
1184) -> Result<(), String> {
1185 let metadata = serde_json::json!({
1186 "action": action,
1187 "session_id": state.id,
1188 "client": client.map(live_client_json),
1189 "previous_controller_id": previous_controller_id,
1190 "active_controller_id": active_controller_id,
1191 "clients": state
1192 .live_clients
1193 .values()
1194 .map(live_client_json)
1195 .collect::<Vec<_>>(),
1196 "extra": extra,
1197 });
1198 let event = crate::llm::helpers::transcript_event(
1199 LIVE_CLIENT_EVENT_KIND,
1200 "system",
1201 "internal",
1202 "Live session client lifecycle changed",
1203 Some(metadata),
1204 );
1205 append_event_to_state(state, event, "live_client")
1206}
1207
1208fn live_client_change(
1209 client: Option<LiveSessionClient>,
1210 previous_controller_id: Option<String>,
1211 state: &SessionState,
1212) -> LiveClientChange {
1213 LiveClientChange {
1214 client,
1215 previous_controller_id,
1216 active_controller_id: state.live_controller_id.clone(),
1217 clients: state.live_clients.values().cloned().collect(),
1218 }
1219}
1220
1221fn validate_live_client_id(id: impl Into<String>) -> Result<String, String> {
1222 let id = id.into();
1223 let trimmed = id.trim();
1224 if trimmed.is_empty() {
1225 return Err("live client id cannot be empty".to_string());
1226 }
1227 Ok(trimmed.to_string())
1228}
1229
1230pub fn live_client_json(client: &LiveSessionClient) -> serde_json::Value {
1231 serde_json::json!({
1232 "client_id": client.client_id,
1233 "mode": client.mode.as_str(),
1234 "attached_at": client.attached_at,
1235 "last_seen_at": client.last_seen_at,
1236 "prompt_injection": client.prompt_injection,
1237 "permission_routing": client.permission_routing,
1238 "metadata": client.metadata,
1239 })
1240}
1241
1242pub fn live_client_change_json(change: &LiveClientChange) -> serde_json::Value {
1243 serde_json::json!({
1244 "client": change.client.as_ref().map(live_client_json),
1245 "previous_controller_id": change.previous_controller_id,
1246 "active_controller_id": change.active_controller_id,
1247 "clients": change
1248 .clients
1249 .iter()
1250 .map(live_client_json)
1251 .collect::<Vec<_>>(),
1252 })
1253}
1254
1255fn try_register_event_log(session_id: &str) {
1259 if let Some(log) = crate::event_log::active_event_log() {
1260 crate::agent_events::register_sink(
1261 session_id,
1262 crate::agent_events::EventLogSink::new(log, session_id),
1263 );
1264 return;
1265 }
1266 let Ok(dir) = std::env::var("HARN_EVENT_LOG_DIR") else {
1267 return;
1268 };
1269 if dir.is_empty() {
1270 return;
1271 }
1272 let path = std::path::PathBuf::from(dir).join(format!("event_log-{session_id}.jsonl"));
1273 if let Ok(sink) = crate::agent_events::JsonlEventSink::open(&path) {
1274 crate::agent_events::register_sink(session_id, sink);
1275 }
1276}
1277
1278pub fn register_event_log_sink(session_id: &str) {
1279 try_register_event_log(session_id);
1280}
1281
1282pub fn close(id: &str) {
1283 SESSIONS.with(|s| {
1284 s.borrow_mut().remove(id);
1285 });
1286 crate::orchestration::agent_inbox::clear_session(id);
1290 crate::agent_events::clear_session_sinks(id);
1291}
1292
1293pub fn close_with_status(
1294 id: &str,
1295 reason: impl Into<String>,
1296 status: impl Into<String>,
1297 metadata: serde_json::Value,
1298) -> bool {
1299 if !exists(id) {
1300 return false;
1301 }
1302 let reason = reason.into();
1303 let status = status.into();
1304 let event_metadata = serde_json::json!({
1305 "reason": reason,
1306 "status": status,
1307 "metadata": metadata,
1308 });
1309 let transcript_event = crate::llm::helpers::transcript_event(
1310 "agent_session_closed",
1311 "system",
1312 "internal",
1313 "Agent session closed",
1314 Some(event_metadata),
1315 );
1316 let _ = append_event(id, transcript_event);
1317 crate::llm::emit_live_agent_event_sync(&crate::agent_events::AgentEvent::SessionClosed {
1318 session_id: id.to_string(),
1319 reason,
1320 status,
1321 metadata,
1322 });
1323 close(id);
1324 true
1325}
1326
1327pub fn reset_transcript(id: &str) -> bool {
1328 SESSIONS.with(|s| {
1329 let mut map = s.borrow_mut();
1330 let Some(state) = map.get_mut(id) else {
1331 return false;
1332 };
1333 state.transcript = empty_transcript(id);
1334 state.tool_format = None;
1335 state.system_prompt = None;
1336 state.scratchpad = None;
1337 state.scratchpad_version = 0;
1338 state.last_transcript_budget_action = None;
1339 state.completed_turn_checkpoints.clear();
1340 state.redo_stack.clear();
1341 state.touch();
1342 true
1343 })
1344}
1345
1346pub fn fork(src_id: &str, dst_id: Option<String>) -> Option<String> {
1353 let (
1354 src_transcript,
1355 src_tool_format,
1356 src_system_prompt,
1357 src_pinned_model,
1358 src_pinned_reasoning_policy,
1359 src_actor_chain,
1360 src_workspace_anchor,
1361 src_workspace_policy,
1362 src_scratchpad,
1363 src_scratchpad_version,
1364 src_transcript_budget_policy,
1365 src_last_transcript_budget_action,
1366 dst,
1367 ) = SESSIONS.with(|s| {
1368 let mut map = s.borrow_mut();
1369 let src = map.get_mut(src_id)?;
1370 src.touch();
1371 let dst = dst_id.unwrap_or_else(|| uuid::Uuid::now_v7().to_string());
1372 let forked_transcript = clone_transcript_with_id(&src.transcript, &dst);
1373 Some((
1374 forked_transcript,
1375 src.tool_format.clone(),
1376 src.system_prompt.clone(),
1377 src.pinned_model.clone(),
1378 src.pinned_reasoning_policy.clone(),
1379 src.actor_chain.clone(),
1380 src.workspace_anchor.clone(),
1381 src.workspace_policy.clone(),
1382 src.scratchpad.clone(),
1383 src.scratchpad_version,
1384 src.transcript_budget_policy.clone(),
1385 src.last_transcript_budget_action.clone(),
1386 dst,
1387 ))
1388 })?;
1389 open_or_create(Some(dst.clone()));
1391 SESSIONS.with(|s| {
1392 let mut map = s.borrow_mut();
1393 if let Some(state) = map.get_mut(&dst) {
1394 state.transcript = src_transcript;
1395 state.tool_format = src_tool_format;
1396 state.system_prompt = src_system_prompt;
1397 state.pinned_model = src_pinned_model;
1398 state.pinned_reasoning_policy = src_pinned_reasoning_policy;
1399 state.actor_chain = src_actor_chain;
1400 state.workspace_anchor = src_workspace_anchor;
1401 state.workspace_policy = src_workspace_policy;
1402 state.scratchpad = src_scratchpad;
1403 state.scratchpad_version = src_scratchpad_version;
1404 state.transcript_budget_policy = src_transcript_budget_policy;
1405 state.last_transcript_budget_action = src_last_transcript_budget_action;
1406 state.touch();
1407 }
1408 update_lineage(&mut map, src_id, &dst, None);
1409 });
1410 let budget_ok = SESSIONS.with(|s| {
1411 let mut map = s.borrow_mut();
1412 let Some(state) = map.get_mut(&dst) else {
1413 return false;
1414 };
1415 let candidate = state.transcript.clone();
1416 apply_transcript_with_budget(state, candidate, "fork").is_ok()
1417 });
1418 if !budget_ok {
1419 close(&dst);
1420 return None;
1421 }
1422 if exists(&dst) {
1426 Some(dst)
1427 } else {
1428 None
1429 }
1430}
1431
1432pub fn fork_at(src_id: &str, keep_first: usize, dst_id: Option<String>) -> Option<String> {
1443 let branched_at_event_index = SESSIONS.with(|s| {
1444 let map = s.borrow();
1445 let src = map.get(src_id)?;
1446 Some(branch_event_index(&src.transcript, keep_first))
1447 })?;
1448 let new_id = fork(src_id, dst_id)?;
1449 link_child_session_with_branch(src_id, &new_id, Some(branched_at_event_index));
1450 let _ = truncate(&new_id, keep_first);
1451 Some(new_id)
1452}
1453
1454pub fn truncate(id: &str, keep_first: usize) -> Option<SessionTruncateResult> {
1458 SESSIONS.with(|s| {
1459 let mut map = s.borrow_mut();
1460 let state = map.get_mut(id)?;
1461 let result = truncate_state(state, keep_first)?;
1462 Some(result)
1463 })
1464}
1465
1466fn truncate_state(state: &mut SessionState, keep_first: usize) -> Option<SessionTruncateResult> {
1467 let dict = state
1468 .transcript
1469 .as_dict()
1470 .cloned()
1471 .unwrap_or_else(crate::value::DictMap::new);
1472 let messages: Vec<VmValue> = match dict.get("messages") {
1473 Some(VmValue::List(list)) => list.iter().cloned().collect(),
1474 _ => Vec::new(),
1475 };
1476 let existing_events = match dict.get("events") {
1477 Some(VmValue::List(list)) => Some(list.iter().cloned().collect::<Vec<_>>()),
1478 _ => None,
1479 };
1480 let kept_turn_count = keep_first.min(messages.len());
1481 let removed_turn_count = messages.len().saturating_sub(kept_turn_count);
1482 let mut new_tip_turn_id = existing_events
1483 .as_ref()
1484 .map(|events| turn_event_id_for_count(events, kept_turn_count))
1485 .unwrap_or_else(|| {
1486 let events = crate::llm::helpers::transcript_events_from_messages(&messages);
1487 turn_event_id_for_count(&events, kept_turn_count)
1488 });
1489
1490 if removed_turn_count > 0 {
1491 let retained: Vec<VmValue> = messages.into_iter().take(kept_turn_count).collect();
1492 let retained_events = match existing_events {
1493 Some(events) => {
1494 let keep_event_count = event_prefix_len_for_messages(&events, kept_turn_count);
1495 events.into_iter().take(keep_event_count).collect()
1496 }
1497 None => crate::llm::helpers::transcript_events_from_messages(&retained),
1498 };
1499 new_tip_turn_id = turn_event_id_for_count(&retained_events, kept_turn_count);
1500 let mut next = dict;
1501 next.insert(
1502 crate::value::intern_key("events"),
1503 VmValue::List(std::sync::Arc::new(retained_events)),
1504 );
1505 next.insert(
1506 crate::value::intern_key("messages"),
1507 VmValue::List(std::sync::Arc::new(retained)),
1508 );
1509 next.remove("summary");
1510 apply_transcript_with_budget(state, VmValue::dict(next), "truncate").ok()?;
1511 }
1512 state.touch();
1513 Some(SessionTruncateResult {
1514 kept_turn_count,
1515 removed_turn_count,
1516 new_tip_turn_id,
1517 })
1518}
1519
1520pub fn pop_last_if_assistant(id: &str) -> Result<bool, String> {
1527 SESSIONS.with(|s| {
1528 let mut map = s.borrow_mut();
1529 let Some(state) = map.get_mut(id) else {
1530 return Err(format!(
1531 "pop_last_if_assistant: unknown session id '{id}'"
1532 ));
1533 };
1534 let messages: Vec<VmValue> = match state.transcript.as_dict() {
1535 Some(dict) => match dict.get("messages") {
1536 Some(VmValue::List(list)) => list.iter().cloned().collect(),
1537 _ => Vec::new(),
1538 },
1539 None => Vec::new(),
1540 };
1541 if messages.is_empty() {
1542 return Ok(false);
1543 }
1544 let trailing_role = messages
1545 .last()
1546 .and_then(|m| m.as_dict())
1547 .and_then(|d| d.get("role"))
1548 .map(|v| v.display())
1549 .unwrap_or_default();
1550 if trailing_role != "assistant" {
1551 return Err(format!(
1552 "pop_last_if_assistant: trailing message role is '{trailing_role}', expected 'assistant'"
1553 ));
1554 }
1555 let keep = messages.len() - 1;
1556 truncate_state(state, keep);
1557 Ok(true)
1558 })
1559}
1560
1561pub fn trim(id: &str, keep_last: usize) -> Option<usize> {
1562 SESSIONS.with(|s| {
1563 let mut map = s.borrow_mut();
1564 let state = map.get_mut(id)?;
1565 let dict = state.transcript.as_dict()?.clone();
1566 let messages: Vec<VmValue> = match dict.get("messages") {
1567 Some(VmValue::List(list)) => list.iter().cloned().collect(),
1568 _ => Vec::new(),
1569 };
1570 let start = messages.len().saturating_sub(keep_last);
1571 let retained: Vec<VmValue> = messages.into_iter().skip(start).collect();
1572 let kept = retained.len();
1573 let mut next = dict;
1574 next.insert(
1575 crate::value::intern_key("events"),
1576 VmValue::List(std::sync::Arc::new(
1577 crate::llm::helpers::transcript_events_from_messages(&retained),
1578 )),
1579 );
1580 next.insert(
1581 crate::value::intern_key("messages"),
1582 VmValue::List(std::sync::Arc::new(retained)),
1583 );
1584 apply_transcript_with_budget(state, VmValue::dict(next), "trim").ok()?;
1585 Some(kept)
1586 })
1587}
1588
1589pub fn inject_message(id: &str, message: VmValue) -> Result<(), String> {
1592 let Some(msg_dict) = message.as_dict().cloned() else {
1593 return Err("agent_session_inject: message must be a dict".into());
1594 };
1595 let role_ok = matches!(msg_dict.get("role"), Some(VmValue::String(_)));
1596 if !role_ok {
1597 return Err(
1598 "agent_session_inject: message must have a string `role` (user|assistant|tool_result|system)"
1599 .into(),
1600 );
1601 }
1602 SESSIONS.with(|s| {
1603 let mut map = s.borrow_mut();
1604 let Some(state) = map.get_mut(id) else {
1605 return Err(format!("agent_session_inject: unknown session id '{id}'"));
1606 };
1607 let dict = state
1608 .transcript
1609 .as_dict()
1610 .cloned()
1611 .unwrap_or_else(crate::value::DictMap::new);
1612 let mut messages: Vec<VmValue> = match dict.get("messages") {
1613 Some(VmValue::List(list)) => list.iter().cloned().collect(),
1614 _ => Vec::new(),
1615 };
1616 let mut events: Vec<VmValue> = match dict.get("events") {
1617 Some(VmValue::List(list)) => list.iter().cloned().collect(),
1618 _ => crate::llm::helpers::transcript_events_from_messages(&messages),
1619 };
1620 let new_message = VmValue::dict(msg_dict);
1621 let message_index = messages.len();
1622 events.push(crate::llm::helpers::transcript_event_from_message(
1623 &new_message,
1624 ));
1625 messages.push(new_message);
1626 let mut next = dict;
1627 next.insert(
1628 crate::value::intern_key("events"),
1629 VmValue::List(std::sync::Arc::new(events)),
1630 );
1631 next.insert(
1632 crate::value::intern_key("messages"),
1633 VmValue::List(std::sync::Arc::new(messages)),
1634 );
1635 let persisted_message = next
1636 .get("messages")
1637 .and_then(|value| match value {
1638 VmValue::List(list) => list.get(message_index).cloned(),
1639 _ => None,
1640 })
1641 .unwrap_or(VmValue::Nil);
1642 apply_transcript_with_budget(state, VmValue::dict(next), "inject_message")?;
1643 emit_identified_user_message_event(id, &persisted_message);
1644 emit_llm_message_event(id, message_index, &persisted_message);
1645 Ok(())
1646 })
1647}
1648
1649fn emit_identified_user_message_event(session_id: &str, message: &VmValue) {
1650 let message_json = crate::llm::helpers::vm_value_to_json(message);
1651 let role = message_json.get("role").and_then(|value| value.as_str());
1652 if role != Some("user") {
1653 return;
1654 }
1655 let Some(message_id) = message_json
1656 .get("messageId")
1657 .or_else(|| message_json.get("message_id"))
1658 .and_then(|value| value.as_str())
1659 .filter(|value| !value.trim().is_empty())
1660 else {
1661 return;
1662 };
1663 let content = message_json
1664 .get("content")
1665 .map(user_message_content_blocks)
1666 .unwrap_or_default();
1667 crate::agent_events::emit_event(&crate::agent_events::AgentEvent::UserMessage {
1668 session_id: session_id.to_string(),
1669 message_id: message_id.to_string(),
1670 content,
1671 });
1672}
1673
1674fn user_message_content_blocks(content: &serde_json::Value) -> Vec<serde_json::Value> {
1675 match content {
1676 serde_json::Value::Array(items) => items.clone(),
1677 serde_json::Value::String(text) => vec![serde_json::json!({
1678 "type": "text",
1679 "text": text,
1680 })],
1681 other => vec![serde_json::json!({
1682 "type": "text",
1683 "text": other.to_string(),
1684 })],
1685 }
1686}
1687
1688fn emit_llm_message_event(session_id: &str, message_index: usize, message: &VmValue) {
1689 let mut fields = serde_json::Map::new();
1690 fields.insert(
1691 "session_id".to_string(),
1692 serde_json::Value::String(session_id.to_string()),
1693 );
1694 fields.insert(
1695 "message_index".to_string(),
1696 serde_json::json!(message_index),
1697 );
1698 let message_json = crate::llm::helpers::vm_value_to_json(message);
1699 if let Some(role) = message_json.get("role").and_then(|value| value.as_str()) {
1700 fields.insert(
1701 "role".to_string(),
1702 serde_json::Value::String(role.to_string()),
1703 );
1704 }
1705 if let Some(content) = message_json.get("content") {
1706 fields.insert("content".to_string(), content.clone());
1707 }
1708 fields.insert("message".to_string(), message_json);
1709 crate::llm::append_observability_sidecar_entry("message", fields);
1710}
1711
1712pub fn seed_from_messages(
1718 id: Option<String>,
1719 messages: &[serde_json::Value],
1720 metadata: serde_json::Value,
1721 system_prompt: Option<String>,
1722 tool_format: Option<String>,
1723) -> Result<String, String> {
1724 let resolved = id.unwrap_or_else(|| uuid::Uuid::now_v7().to_string());
1725 if exists(&resolved) {
1726 return Err(format!("agent session '{resolved}' already exists"));
1727 }
1728 open_or_create(Some(resolved.clone()));
1729 SESSIONS.with(|s| {
1730 let mut map = s.borrow_mut();
1731 let Some(state) = map.get_mut(&resolved) else {
1732 return Err(format!("failed to create agent session '{resolved}'"));
1733 };
1734 state.tool_format = tool_format.filter(|value| !value.trim().is_empty());
1735 state.system_prompt = system_prompt.filter(|value| !value.trim().is_empty());
1736
1737 let mut metadata = metadata
1738 .as_object()
1739 .cloned()
1740 .unwrap_or_else(serde_json::Map::new);
1741 if let Some(tool_format) = state.tool_format.as_ref() {
1742 metadata.insert(
1743 "tool_format".to_string(),
1744 serde_json::Value::String(tool_format.clone()),
1745 );
1746 metadata.insert(
1747 "tool_mode_locked".to_string(),
1748 serde_json::Value::Bool(true),
1749 );
1750 }
1751 if let Some(system_prompt) = state.system_prompt.as_ref() {
1752 metadata.insert(
1753 "system_prompt".to_string(),
1754 crate::llm::helpers::system_prompt_metadata(system_prompt),
1755 );
1756 }
1757 let vm_messages = crate::llm::helpers::json_messages_to_vm(messages);
1758 let candidate = crate::llm::helpers::new_transcript_with(
1759 Some(resolved.clone()),
1760 vm_messages,
1761 None,
1762 Some(crate::stdlib::json_to_vm_value(&serde_json::Value::Object(
1763 metadata,
1764 ))),
1765 );
1766 apply_transcript_with_budget(state, candidate, "seed_from_messages")?;
1767 Ok(resolved)
1768 })
1769}
1770
1771pub fn messages_json(id: &str) -> Vec<serde_json::Value> {
1775 SESSIONS.with(|s| {
1776 let map = s.borrow();
1777 let Some(state) = map.get(id) else {
1778 return Vec::new();
1779 };
1780 let Some(dict) = state.transcript.as_dict() else {
1781 return Vec::new();
1782 };
1783 match dict.get("messages") {
1784 Some(VmValue::List(list)) => list
1785 .iter()
1786 .map(crate::llm::helpers::vm_value_to_json)
1787 .collect(),
1788 _ => Vec::new(),
1789 }
1790 })
1791}
1792
1793#[derive(Clone, Debug, Default)]
1794pub struct SessionPromptState {
1795 pub messages: Vec<serde_json::Value>,
1796 pub summary: Option<String>,
1797}
1798
1799fn summary_message_json(summary: &str) -> serde_json::Value {
1800 serde_json::json!({
1801 "role": "user",
1802 "content": summary,
1803 })
1804}
1805
1806fn messages_begin_with_summary(messages: &[serde_json::Value], summary: &str) -> bool {
1807 messages.first().is_some_and(|message| {
1808 message.get("role").and_then(|value| value.as_str()) == Some("user")
1809 && message.get("content").and_then(|value| value.as_str()) == Some(summary)
1810 })
1811}
1812
1813pub fn prompt_state_json(id: &str) -> SessionPromptState {
1821 SESSIONS.with(|s| {
1822 let map = s.borrow();
1823 let Some(state) = map.get(id) else {
1824 return SessionPromptState::default();
1825 };
1826 let Some(dict) = state.transcript.as_dict() else {
1827 return SessionPromptState::default();
1828 };
1829 let mut messages = match dict.get("messages") {
1830 Some(VmValue::List(list)) => list
1831 .iter()
1832 .map(crate::llm::helpers::vm_value_to_json)
1833 .collect::<Vec<_>>(),
1834 _ => Vec::new(),
1835 };
1836 let summary = dict.get("summary").and_then(|value| match value {
1837 VmValue::String(text) if !text.trim().is_empty() => Some(text.to_string()),
1838 _ => None,
1839 });
1840 if let Some(summary_text) = summary.as_deref() {
1841 if !messages_begin_with_summary(&messages, summary_text) {
1842 messages.insert(0, summary_message_json(summary_text));
1843 }
1844 }
1845 SessionPromptState { messages, summary }
1846 })
1847}
1848
1849pub fn store_transcript(id: &str, transcript: VmValue) -> Result<(), String> {
1852 SESSIONS.with(|s| {
1853 let mut map = s.borrow_mut();
1854 let Some(state) = map.get_mut(id) else {
1855 return Err(format!(
1856 "agent_session_store_transcript: unknown session id '{id}'"
1857 ));
1858 };
1859 let transcript = transcript_with_session_metadata(transcript, state);
1860 apply_transcript_with_budget(state, transcript, "store_transcript")?;
1861 Ok(())
1862 })
1863}
1864
1865fn checkpoint_summary(checkpoint: &SessionTurnCheckpoint) -> SessionCheckpointSummary {
1866 SessionCheckpointSummary {
1867 checkpoint_id: checkpoint.checkpoint_id.clone(),
1868 before_message_count: checkpoint.before_message_count,
1869 after_message_count: checkpoint.after_message_count,
1870 fs_snapshot_ids: checkpoint.fs_snapshot_ids.clone(),
1871 }
1872}
1873
1874fn checkpoint_error_status(error: SessionCheckpointError) -> &'static str {
1875 match error {
1876 SessionCheckpointError::UnknownSession => "unknown_session",
1877 SessionCheckpointError::NoCheckpoint => "no_checkpoint",
1878 SessionCheckpointError::NoRedo => "no_redo",
1879 }
1880}
1881
1882pub fn checkpoint_status_name(error: SessionCheckpointError) -> &'static str {
1883 checkpoint_error_status(error)
1884}
1885
1886pub fn invalidate_redo(id: &str) -> bool {
1889 SESSIONS.with(|s| {
1890 let mut map = s.borrow_mut();
1891 let Some(state) = map.get_mut(id) else {
1892 return false;
1893 };
1894 let had_redo = !state.redo_stack.is_empty();
1895 state.redo_stack.clear();
1896 state.touch();
1897 had_redo
1898 })
1899}
1900
1901pub fn record_completed_turn_checkpoint(
1908 id: &str,
1909 before_transcript: VmValue,
1910 fs_snapshot_ids: Vec<String>,
1911) -> Result<Option<SessionCheckpointSummary>, SessionCheckpointError> {
1912 SESSIONS.with(|s| {
1913 let mut map = s.borrow_mut();
1914 let Some(state) = map.get_mut(id) else {
1915 return Err(SessionCheckpointError::UnknownSession);
1916 };
1917 let after_transcript = transcript_with_session_metadata(state.transcript.clone(), state);
1918 let before_message_count = transcript_message_count(&before_transcript);
1919 let after_message_count = transcript_message_count(&after_transcript);
1920 if crate::values_equal(&before_transcript, &after_transcript) && fs_snapshot_ids.is_empty()
1921 {
1922 return Ok(None);
1923 }
1924 let checkpoint = SessionTurnCheckpoint {
1925 checkpoint_id: format!("turn_{}", uuid::Uuid::now_v7().simple()),
1926 completed_at: crate::orchestration::now_rfc3339(),
1927 before_message_count,
1928 after_message_count,
1929 before_transcript,
1930 after_transcript,
1931 fs_snapshot_ids,
1932 };
1933 state.redo_stack.clear();
1934 state.completed_turn_checkpoints.push(checkpoint.clone());
1935 state.touch();
1936 Ok(Some(checkpoint_summary(&checkpoint)))
1937 })
1938}
1939
1940pub fn rollback_plan(id: &str) -> Result<SessionCheckpointSummary, SessionCheckpointError> {
1941 SESSIONS.with(|s| {
1942 let map = s.borrow();
1943 let Some(state) = map.get(id) else {
1944 return Err(SessionCheckpointError::UnknownSession);
1945 };
1946 state
1947 .completed_turn_checkpoints
1948 .last()
1949 .map(checkpoint_summary)
1950 .ok_or(SessionCheckpointError::NoCheckpoint)
1951 })
1952}
1953
1954pub fn redo_plan(id: &str) -> Result<SessionCheckpointSummary, SessionCheckpointError> {
1955 SESSIONS.with(|s| {
1956 let map = s.borrow();
1957 let Some(state) = map.get(id) else {
1958 return Err(SessionCheckpointError::UnknownSession);
1959 };
1960 state
1961 .redo_stack
1962 .last()
1963 .map(|entry| {
1964 let mut summary = checkpoint_summary(&entry.checkpoint);
1965 summary.fs_snapshot_ids = entry.redo_fs_snapshot_ids.clone();
1966 summary
1967 })
1968 .ok_or(SessionCheckpointError::NoRedo)
1969 })
1970}
1971
1972pub fn rollback_last_completed_turn(
1973 id: &str,
1974 redo_fs_snapshot_ids: Vec<String>,
1975) -> Result<SessionCheckpointOutcome, SessionCheckpointError> {
1976 SESSIONS.with(|s| {
1977 let mut map = s.borrow_mut();
1978 let Some(state) = map.get_mut(id) else {
1979 return Err(SessionCheckpointError::UnknownSession);
1980 };
1981 let Some(checkpoint) = state.completed_turn_checkpoints.pop() else {
1982 return Err(SessionCheckpointError::NoCheckpoint);
1983 };
1984 state.transcript = checkpoint.before_transcript.clone();
1985 state.redo_stack.push(SessionRedoEntry {
1986 checkpoint: checkpoint.clone(),
1987 redo_fs_snapshot_ids: redo_fs_snapshot_ids.clone(),
1988 });
1989 state.touch();
1990 Ok(SessionCheckpointOutcome {
1991 status: "rolled_back",
1992 checkpoint: checkpoint_summary(&checkpoint),
1993 redo_fs_snapshot_ids,
1994 })
1995 })
1996}
1997
1998pub fn redo_last_rollback(id: &str) -> Result<SessionCheckpointOutcome, SessionCheckpointError> {
1999 SESSIONS.with(|s| {
2000 let mut map = s.borrow_mut();
2001 let Some(state) = map.get_mut(id) else {
2002 return Err(SessionCheckpointError::UnknownSession);
2003 };
2004 let Some(entry) = state.redo_stack.pop() else {
2005 return Err(SessionCheckpointError::NoRedo);
2006 };
2007 let checkpoint = entry.checkpoint;
2008 state.transcript = checkpoint.after_transcript.clone();
2009 state.completed_turn_checkpoints.push(checkpoint.clone());
2010 state.touch();
2011 Ok(SessionCheckpointOutcome {
2012 status: "redone",
2013 checkpoint: checkpoint_summary(&checkpoint),
2014 redo_fs_snapshot_ids: entry.redo_fs_snapshot_ids,
2015 })
2016 })
2017}
2018
2019pub fn prune_invalid_reminder_events(id: &str) -> usize {
2023 SESSIONS.with(|s| {
2024 let mut map = s.borrow_mut();
2025 let Some(state) = map.get_mut(id) else {
2026 return 0;
2027 };
2028 let Some(dict) = state.transcript.as_dict().cloned() else {
2029 return 0;
2030 };
2031 let Some(VmValue::List(events)) = dict.get("events") else {
2032 return 0;
2033 };
2034 let mut pruned = 0_usize;
2035 let mut kept = Vec::with_capacity(events.len());
2036 for event in events.iter().cloned() {
2037 let is_reminder = event
2038 .as_dict()
2039 .and_then(|event| event.get("kind"))
2040 .map(VmValue::display)
2041 .as_deref()
2042 == Some(crate::llm::helpers::SYSTEM_REMINDER_EVENT_KIND);
2043 if !is_reminder {
2044 kept.push(event);
2045 continue;
2046 }
2047 let valid = crate::llm::helpers::reminder_from_event(&event)
2048 .is_some_and(|reminder| !reminder.body.trim().is_empty());
2049 if valid {
2050 kept.push(event);
2051 } else {
2052 pruned += 1;
2053 }
2054 }
2055 if pruned > 0 {
2056 let mut next = dict;
2057 next.insert(
2058 crate::value::intern_key("events"),
2059 VmValue::List(std::sync::Arc::new(kept)),
2060 );
2061 let _ = apply_transcript_with_budget(
2062 state,
2063 VmValue::dict(next),
2064 "prune_invalid_reminder_events",
2065 );
2066 state.touch();
2067 }
2068 pruned
2069 })
2070}
2071
2072pub fn apply_reminder_post_turn(id: &str, turn: i64) -> Result<serde_json::Value, String> {
2077 let report = SESSIONS.with(|s| {
2078 let mut map = s.borrow_mut();
2079 let Some(state) = map.get_mut(id) else {
2080 return Err(format!(
2081 "agent_session_apply_reminder_post_turn: unknown session id '{id}'"
2082 ));
2083 };
2084 let report = crate::llm::helpers::apply_reminder_post_turn(&state.transcript, turn);
2085 if report.decremented_count > 0 || !report.expired.is_empty() {
2086 if let Some(next) = report.transcript.clone() {
2087 apply_transcript_with_budget(state, next, "apply_reminder_post_turn")?;
2088 }
2089 state.touch();
2090 }
2091 Ok(report)
2092 })?;
2093
2094 for reminder in &report.expired {
2095 let mut payload = crate::llm::helpers::reminder_lifecycle_payload(Some(id), reminder);
2096 if let Some(obj) = payload.as_object_mut() {
2097 obj.insert(
2098 "transcript_id".to_string(),
2099 serde_json::Value::String(id.to_string()),
2100 );
2101 obj.insert(
2102 "reason".to_string(),
2103 serde_json::Value::String("ttl".to_string()),
2104 );
2105 obj.insert(
2106 "ttl_turns_before".to_string(),
2107 serde_json::json!(&reminder.ttl_turns),
2108 );
2109 obj.insert("expired_at_turn".to_string(), serde_json::json!(turn));
2110 }
2111 crate::llm::helpers::emit_reminder_lifecycle_event(
2112 crate::llm::helpers::REMINDER_EXPIRED_EVENT_KIND,
2113 payload,
2114 );
2115 }
2116
2117 Ok(serde_json::json!({
2118 "expired_count": report.expired.len(),
2119 "decremented_count": report.decremented_count,
2120 "remaining_count": report.remaining_count,
2121 }))
2122}
2123
2124pub fn inject_reminder(
2129 id: &str,
2130 reminder: crate::llm::helpers::SystemReminder,
2131) -> Result<ReminderInjectionReport, String> {
2132 let reminder_id = reminder.id.clone();
2133 let dedupe_key = reminder.dedupe_key.clone();
2134 let mut deduped_reminder_ids = Vec::new();
2135 SESSIONS.with(|s| {
2136 let mut map = s.borrow_mut();
2137 let Some(state) = map.get_mut(id) else {
2138 return Err(format!(
2139 "agent_session_inject_reminder: unknown session id '{id}'"
2140 ));
2141 };
2142 let dict = state
2143 .transcript
2144 .as_dict()
2145 .cloned()
2146 .unwrap_or_else(crate::value::DictMap::new);
2147 let mut events: Vec<VmValue> = match dict.get("events") {
2148 Some(VmValue::List(list)) => list.iter().cloned().collect(),
2149 _ => dict
2150 .get("messages")
2151 .and_then(|value| match value {
2152 VmValue::List(list) => Some(list.iter().cloned().collect::<Vec<_>>()),
2153 _ => None,
2154 })
2155 .map(|messages| crate::llm::helpers::transcript_events_from_messages(&messages))
2156 .unwrap_or_default(),
2157 };
2158 if let Some(expected_key) = dedupe_key.as_deref() {
2159 events.retain(|event| {
2160 let Some(existing) = crate::llm::helpers::reminder_from_event(event) else {
2161 return true;
2162 };
2163 if existing.dedupe_key.as_deref() == Some(expected_key) {
2164 deduped_reminder_ids.push(existing.id);
2165 false
2166 } else {
2167 true
2168 }
2169 });
2170 }
2171 events.push(crate::llm::helpers::transcript_reminder_event(&reminder));
2172 let mut next = dict;
2173 next.insert(
2174 crate::value::intern_key("events"),
2175 VmValue::List(std::sync::Arc::new(events)),
2176 );
2177 apply_transcript_with_budget(state, VmValue::dict(next), "inject_reminder")?;
2178 state.touch();
2179 Ok(())
2180 })?;
2181
2182 if !deduped_reminder_ids.is_empty() {
2183 let dropped_count = deduped_reminder_ids.len();
2184 crate::llm::helpers::emit_reminder_lifecycle_event(
2185 crate::llm::helpers::REMINDER_DEDUPED_EVENT_KIND,
2186 serde_json::json!({
2187 "session_id": id,
2188 "transcript_id": id,
2189 "reminder_id": &reminder_id,
2190 "replacing_id": &reminder_id,
2191 "replaced_id": deduped_reminder_ids.first(),
2192 "replaced_ids": &deduped_reminder_ids,
2193 "dedupe_key": &dedupe_key,
2194 "dropped_reminder_ids": &deduped_reminder_ids,
2195 "dropped_count": dropped_count,
2196 }),
2197 );
2198 }
2199
2200 crate::llm::helpers::emit_reminder_lifecycle_event(
2201 crate::llm::helpers::REMINDER_INJECTED_EVENT_KIND,
2202 crate::llm::helpers::reminder_lifecycle_payload(Some(id), &reminder),
2203 );
2204
2205 Ok(ReminderInjectionReport {
2206 reminder_id,
2207 deduped_count: deduped_reminder_ids.len(),
2208 })
2209}
2210
2211pub fn append_event(id: &str, event: VmValue) -> Result<(), String> {
2217 let Some(event_dict) = event.as_dict() else {
2218 return Err("agent_session_append_event: event must be a dict".into());
2219 };
2220 let kind_ok = matches!(event_dict.get("kind"), Some(VmValue::String(_)));
2221 if !kind_ok {
2222 return Err("agent_session_append_event: event must have a string `kind`".into());
2223 }
2224 SESSIONS.with(|s| {
2225 let mut map = s.borrow_mut();
2226 let Some(state) = map.get_mut(id) else {
2227 return Err(format!(
2228 "agent_session_append_event: unknown session id '{id}'"
2229 ));
2230 };
2231 append_event_to_state(state, event, "append_event")?;
2232 Ok(())
2233 })
2234}
2235
2236fn append_event_to_state(
2237 state: &mut SessionState,
2238 event: VmValue,
2239 action: &str,
2240) -> Result<(), String> {
2241 let dict = state
2242 .transcript
2243 .as_dict()
2244 .cloned()
2245 .unwrap_or_else(crate::value::DictMap::new);
2246 let mut events: Vec<VmValue> = match dict.get("events") {
2247 Some(VmValue::List(list)) => list.iter().cloned().collect(),
2248 _ => dict
2249 .get("messages")
2250 .and_then(|value| match value {
2251 VmValue::List(list) => Some(list.iter().cloned().collect::<Vec<_>>()),
2252 _ => None,
2253 })
2254 .map(|messages| crate::llm::helpers::transcript_events_from_messages(&messages))
2255 .unwrap_or_default(),
2256 };
2257 events.push(event);
2258 let mut next = dict;
2259 next.insert(
2260 crate::value::intern_key("events"),
2261 VmValue::List(std::sync::Arc::new(events)),
2262 );
2263 apply_transcript_with_budget(state, VmValue::dict(next), action)
2264}
2265
2266pub fn replace_messages(id: &str, messages: &[serde_json::Value]) -> Result<(), String> {
2269 replace_messages_with_summary(id, messages, None)
2270}
2271
2272pub fn replace_messages_with_summary(
2277 id: &str,
2278 messages: &[serde_json::Value],
2279 summary: Option<&str>,
2280) -> Result<(), String> {
2281 SESSIONS.with(|s| {
2282 let mut map = s.borrow_mut();
2283 let Some(state) = map.get_mut(id) else {
2284 return Err(format!(
2285 "agent_session_replace_messages: unknown session id '{id}'"
2286 ));
2287 };
2288 let dict = state
2289 .transcript
2290 .as_dict()
2291 .cloned()
2292 .unwrap_or_else(crate::value::DictMap::new);
2293 let vm_messages: Vec<VmValue> = messages
2294 .iter()
2295 .map(crate::stdlib::json_to_vm_value)
2296 .collect();
2297 let mut next = dict;
2298 next.insert(
2299 crate::value::intern_key("events"),
2300 VmValue::List(std::sync::Arc::new(
2301 crate::llm::helpers::transcript_events_from_messages(&vm_messages),
2302 )),
2303 );
2304 next.insert(
2305 crate::value::intern_key("messages"),
2306 VmValue::List(std::sync::Arc::new(vm_messages)),
2307 );
2308 if let Some(summary) = summary {
2309 next.put_str("summary", summary);
2310 } else {
2311 next.remove("summary");
2312 }
2313 apply_transcript_with_budget(state, VmValue::dict(next), "replace_messages")?;
2314 Ok(())
2315 })
2316}
2317
2318pub fn append_subscriber(id: &str, callback: VmValue) {
2319 open_or_create(Some(id.to_string()));
2320 SESSIONS.with(|s| {
2321 if let Some(state) = s.borrow_mut().get_mut(id) {
2322 state.subscribers.push(callback);
2323 state.touch();
2324 }
2325 });
2326}
2327
2328pub fn subscribers_for(id: &str) -> Vec<VmValue> {
2329 SESSIONS.with(|s| {
2330 s.borrow()
2331 .get(id)
2332 .map(|state| state.subscribers.clone())
2333 .unwrap_or_default()
2334 })
2335}
2336
2337pub fn subscriber_count(id: &str) -> usize {
2338 SESSIONS.with(|s| {
2339 s.borrow()
2340 .get(id)
2341 .map(|state| state.subscribers.len())
2342 .unwrap_or(0)
2343 })
2344}
2345
2346pub fn set_active_skills(id: &str, skills: Vec<String>) {
2350 SESSIONS.with(|s| {
2351 if let Some(state) = s.borrow_mut().get_mut(id) {
2352 state.active_skills = skills;
2353 state.touch();
2354 }
2355 });
2356}
2357
2358pub fn active_skills(id: &str) -> Vec<String> {
2362 SESSIONS.with(|s| {
2363 s.borrow()
2364 .get(id)
2365 .map(|state| state.active_skills.clone())
2366 .unwrap_or_default()
2367 })
2368}
2369
2370pub fn claim_tool_format(id: &str, tool_format: &str) -> Result<(), String> {
2376 let tool_format = tool_format.trim();
2377 if tool_format.is_empty() {
2378 return Ok(());
2379 }
2380 SESSIONS.with(|s| {
2381 let mut map = s.borrow_mut();
2382 let Some(state) = map.get_mut(id) else {
2383 return Err(format!("agent session '{id}' does not exist"));
2384 };
2385 match state.tool_format.as_deref() {
2386 Some(existing) if existing != tool_format => Err(format!(
2387 "agent session '{id}' is locked to tool_format='{existing}', but this run requested tool_format='{tool_format}'. Start a new session or fork/reset the transcript before changing tool mode."
2388 )),
2389 Some(_) => {
2390 state.touch();
2391 Ok(())
2392 }
2393 None => {
2394 state.tool_format = Some(tool_format.to_string());
2395 state.touch();
2396 Ok(())
2397 }
2398 }
2399 })
2400}
2401
2402pub fn tool_format(id: &str) -> Option<String> {
2403 SESSIONS.with(|s| {
2404 s.borrow()
2405 .get(id)
2406 .and_then(|state| state.tool_format.clone())
2407 })
2408}
2409
2410pub fn record_system_prompt(id: &str, system_prompt: &str) -> Result<(), String> {
2411 let system_prompt = system_prompt.trim();
2412 if system_prompt.is_empty() {
2413 return Ok(());
2414 }
2415 assert_cache_stable_system_prompt(system_prompt);
2416 SESSIONS.with(|s| {
2417 let mut map = s.borrow_mut();
2418 let Some(state) = map.get_mut(id) else {
2419 return Err(format!("agent session '{id}' does not exist"));
2420 };
2421 let changed = state.system_prompt.as_deref() != Some(system_prompt);
2422 state.system_prompt = Some(system_prompt.to_string());
2423 let dict = state
2424 .transcript
2425 .as_dict()
2426 .cloned()
2427 .unwrap_or_else(crate::value::DictMap::new);
2428 let mut next = dict;
2429 apply_system_prompt_metadata(&mut next, system_prompt);
2430 if changed {
2431 let mut events: Vec<VmValue> = match next.get("events") {
2432 Some(VmValue::List(list)) => list.iter().cloned().collect(),
2433 _ => Vec::new(),
2434 };
2435 events.push(crate::llm::helpers::transcript_event(
2436 "system_prompt",
2437 "system",
2438 "internal",
2439 "",
2440 Some(crate::llm::helpers::system_prompt_event_metadata(
2441 system_prompt,
2442 )),
2443 ));
2444 next.insert(
2445 crate::value::intern_key("events"),
2446 VmValue::List(std::sync::Arc::new(events)),
2447 );
2448 }
2449 apply_transcript_with_budget(state, VmValue::dict(next), "record_system_prompt")?;
2450 Ok(())
2451 })
2452}
2453
2454pub fn system_prompt(id: &str) -> Option<String> {
2455 SESSIONS.with(|s| {
2456 s.borrow()
2457 .get(id)
2458 .and_then(|state| state.system_prompt.clone())
2459 })
2460}
2461
2462#[cfg(debug_assertions)]
2463fn forbidden_workspace_prompt_token(system_prompt: &str) -> Option<&'static str> {
2464 let mut remaining = system_prompt;
2465 while let Some(index) = remaining.find("{{") {
2466 let candidate = remaining[index + 2..].trim_start();
2467 if candidate.starts_with("workspace_") {
2468 return Some("workspace_");
2469 }
2470 if candidate.starts_with("project_") {
2471 return Some("project_");
2472 }
2473 remaining = candidate;
2474 }
2475 None
2476}
2477
2478#[cfg(debug_assertions)]
2479fn assert_cache_stable_system_prompt(system_prompt: &str) {
2480 if let Some(prefix) = forbidden_workspace_prompt_token(system_prompt) {
2481 panic!(
2482 "{CACHE_STABLE_SYSTEM_PROMPT_DIAGNOSTIC}: session system prompts must not interpolate `{{{{{prefix}...` tokens; move workspace/project context into the workspace-anchor reminder"
2483 );
2484 }
2485}
2486
2487#[cfg(not(debug_assertions))]
2488fn assert_cache_stable_system_prompt(_system_prompt: &str) {}
2489
2490pub fn set_pinned_model(id: &str, model: Option<String>) -> Result<bool, String> {
2495 let normalized = model
2496 .map(|value| value.trim().to_string())
2497 .filter(|value| !value.is_empty());
2498 SESSIONS.with(|s| {
2499 let mut map = s.borrow_mut();
2500 let Some(state) = map.get_mut(id) else {
2501 return Err(format!("agent session '{id}' does not exist"));
2502 };
2503 let changed = state.pinned_model != normalized;
2504 state.pinned_model = normalized;
2505 state.touch();
2506 Ok(changed)
2507 })
2508}
2509
2510pub fn pinned_model(id: &str) -> Option<String> {
2514 SESSIONS.with(|s| {
2515 s.borrow()
2516 .get(id)
2517 .and_then(|state| state.pinned_model.clone())
2518 })
2519}
2520
2521pub fn set_pinned_reasoning_policy(id: &str, policy: Option<String>) -> Result<bool, String> {
2523 let normalized = match policy {
2524 Some(value) => crate::llm::reasoning_policy::normalize_policy_selector(&value)?,
2525 None => None,
2526 };
2527 SESSIONS.with(|s| {
2528 let mut map = s.borrow_mut();
2529 let Some(state) = map.get_mut(id) else {
2530 return Err(format!("agent session '{id}' does not exist"));
2531 };
2532 let changed = state.pinned_reasoning_policy != normalized;
2533 state.pinned_reasoning_policy = normalized;
2534 state.touch();
2535 Ok(changed)
2536 })
2537}
2538
2539pub fn pinned_reasoning_policy(id: &str) -> Option<String> {
2541 SESSIONS.with(|s| {
2542 s.borrow()
2543 .get(id)
2544 .and_then(|state| state.pinned_reasoning_policy.clone())
2545 })
2546}
2547
2548pub fn set_workspace_anchor(id: &str, anchor: Option<WorkspaceAnchor>) -> Result<bool, String> {
2552 SESSIONS.with(|s| {
2553 let mut map = s.borrow_mut();
2554 let Some(state) = map.get_mut(id) else {
2555 return Err(format!("agent session '{id}' does not exist"));
2556 };
2557 let changed = state.workspace_anchor != anchor;
2558 state.workspace_anchor = anchor;
2559 if changed {
2560 state.redo_stack.clear();
2561 crate::llm::permissions::clear_session_grants(id);
2562 }
2563 state.touch();
2564 Ok(changed)
2565 })
2566}
2567
2568pub fn workspace_anchor(id: &str) -> Option<WorkspaceAnchor> {
2570 SESSIONS.with(|s| {
2571 s.borrow()
2572 .get(id)
2573 .and_then(|state| state.workspace_anchor.clone())
2574 })
2575}
2576
2577#[derive(Clone, Debug, PartialEq, Eq)]
2581pub struct ReanchorOutcome {
2582 pub previous: Option<WorkspaceAnchor>,
2583 pub current: WorkspaceAnchor,
2584 pub changed: bool,
2585}
2586
2587pub fn reanchor_session(
2592 id: &str,
2593 new_anchor: WorkspaceAnchor,
2594 carry_transcript: bool,
2595 compacted: bool,
2596 reason: Option<String>,
2597) -> Result<ReanchorOutcome, String> {
2598 let outcome = SESSIONS.with(|s| {
2599 let mut map = s.borrow_mut();
2600 let Some(state) = map.get_mut(id) else {
2601 return Err(format!("agent session '{id}' does not exist"));
2602 };
2603 let previous = state.workspace_anchor.clone();
2604 let changed = previous.as_ref() != Some(&new_anchor);
2605 state.workspace_anchor = Some(new_anchor.clone());
2606 if changed {
2607 crate::llm::permissions::clear_session_grants(id);
2608 }
2609 state.touch();
2610 Ok(ReanchorOutcome {
2611 previous,
2612 current: new_anchor,
2613 changed,
2614 })
2615 })?;
2616 if !outcome.changed {
2617 return Ok(outcome);
2618 }
2619 let previous_json = outcome.previous.as_ref().map(WorkspaceAnchor::to_json);
2620 let current_json = outcome.current.to_json();
2621 let event_metadata = serde_json::json!({
2622 "previous": previous_json,
2623 "current": current_json,
2624 "carry_transcript": carry_transcript,
2625 "compacted": compacted,
2626 "reason": reason,
2627 });
2628 let event = crate::llm::helpers::transcript_event(
2629 "AnchorChanged",
2630 "system",
2631 "internal",
2632 "",
2633 Some(event_metadata),
2634 );
2635 let _ = append_event(id, event);
2636 crate::llm::emit_live_agent_event_sync(&crate::agent_events::AgentEvent::AnchorChanged {
2637 session_id: id.to_string(),
2638 previous: previous_json,
2639 current: current_json,
2640 carry_transcript,
2641 compacted,
2642 reason,
2643 });
2644 Ok(outcome)
2645}
2646
2647pub fn set_workspace_policy(id: &str, policy: WorkspacePolicy) -> Result<bool, String> {
2650 SESSIONS.with(|s| {
2651 let mut map = s.borrow_mut();
2652 let Some(state) = map.get_mut(id) else {
2653 return Err(format!("agent session '{id}' does not exist"));
2654 };
2655 let changed = state.workspace_policy != policy;
2656 state.workspace_policy = policy;
2657 if changed {
2658 state.redo_stack.clear();
2659 }
2660 state.touch();
2661 Ok(changed)
2662 })
2663}
2664
2665pub fn workspace_policy(id: &str) -> Option<WorkspacePolicy> {
2667 SESSIONS.with(|s| {
2668 s.borrow()
2669 .get(id)
2670 .map(|state| state.workspace_policy.clone())
2671 })
2672}
2673
2674pub fn add_workspace_root(
2678 id: &str,
2679 root: &str,
2680 mount_mode: Option<MountMode>,
2681 reason: Option<String>,
2682) -> Result<String, String> {
2683 let normalized_root = validate_workspace_root_path(root)?;
2684 let mounted_at = crate::orchestration::now_rfc3339();
2685 SESSIONS.with(|s| {
2686 let mut map = s.borrow_mut();
2687 let Some(state) = map.get_mut(id) else {
2688 return Err(format!("agent session '{id}' does not exist"));
2689 };
2690 let default_mount_mode = state.workspace_policy.default_mount_mode;
2691 let Some(anchor) = state.workspace_anchor.as_mut() else {
2692 return Err(format!("agent session '{id}' has no workspace anchor"));
2693 };
2694 let resolved_mount_mode = mount_mode.unwrap_or(default_mount_mode);
2695 if let Some(existing) = anchor
2696 .additional_roots
2697 .iter_mut()
2698 .find(|entry| entry.path == normalized_root)
2699 {
2700 let changed =
2701 existing.mount_mode != resolved_mount_mode || existing.mounted_at != mounted_at;
2702 existing.mount_mode = resolved_mount_mode;
2703 existing.mounted_at = mounted_at.clone();
2704 if changed {
2705 state.redo_stack.clear();
2706 }
2707 } else {
2708 anchor.additional_roots.push(MountedRoot {
2709 path: normalized_root.clone(),
2710 mount_mode: resolved_mount_mode,
2711 mounted_at: mounted_at.clone(),
2712 });
2713 state.redo_stack.clear();
2714 }
2715 let event = crate::llm::helpers::transcript_event(
2716 "RootMounted",
2717 "system",
2718 "internal",
2719 "",
2720 Some(serde_json::json!({
2721 "path": normalized_root.to_string_lossy(),
2722 "mount_mode": resolved_mount_mode.as_str(),
2723 "mounted_at": mounted_at.clone(),
2724 "reason": reason,
2725 })),
2726 );
2727 append_event_to_state(state, event, "add_workspace_root")?;
2728 crate::llm::permissions::clear_session_grants(id);
2729 state.touch();
2730 Ok(mounted_at.clone())
2731 })
2732}
2733
2734pub fn remove_workspace_root(id: &str, root: &str) -> Result<bool, String> {
2737 let normalized_root = normalize_workspace_root_path(root);
2738 SESSIONS.with(|s| {
2739 let mut map = s.borrow_mut();
2740 let Some(state) = map.get_mut(id) else {
2741 return Err(format!("agent session '{id}' does not exist"));
2742 };
2743 let Some(anchor) = state.workspace_anchor.as_mut() else {
2744 return Err(format!("agent session '{id}' has no workspace anchor"));
2745 };
2746 let before = anchor.additional_roots.len();
2747 anchor
2748 .additional_roots
2749 .retain(|entry| entry.path != normalized_root);
2750 let removed = anchor.additional_roots.len() != before;
2751 if removed {
2752 state.redo_stack.clear();
2753 crate::llm::permissions::clear_session_grants(id);
2754 }
2755 state.touch();
2756 Ok(removed)
2757 })
2758}
2759
2760pub fn list_workspace_roots(id: &str) -> Result<(PathBuf, Vec<MountedRoot>), String> {
2761 SESSIONS.with(|s| {
2762 let map = s.borrow();
2763 let Some(state) = map.get(id) else {
2764 return Err(format!("agent session '{id}' does not exist"));
2765 };
2766 let Some(anchor) = state.workspace_anchor.as_ref() else {
2767 return Err(format!("agent session '{id}' has no workspace anchor"));
2768 };
2769 Ok((anchor.primary.clone(), anchor.additional_roots.clone()))
2770 })
2771}
2772
2773fn validate_workspace_root_path(root: &str) -> Result<PathBuf, String> {
2774 let normalized = normalize_workspace_root_path(root);
2775 let canonical = std::fs::canonicalize(&normalized)
2776 .map_err(|error| format!("workspace root '{root}' must exist and be readable: {error}"))?;
2777 let metadata = std::fs::metadata(&canonical)
2778 .map_err(|error| format!("workspace root '{root}' must exist and be readable: {error}"))?;
2779 if !metadata.is_dir() {
2780 return Err(format!("workspace root '{root}' must be a directory"));
2781 }
2782 std::fs::read_dir(&canonical)
2783 .map_err(|error| format!("workspace root '{root}' must be readable: {error}"))?;
2784 Ok(canonical)
2785}
2786
2787fn normalize_workspace_root_path(root: &str) -> PathBuf {
2788 let absolute = crate::stdlib::process::normalize_context_path(Path::new(root));
2789 std::fs::canonicalize(&absolute).unwrap_or(absolute)
2790}
2791
2792fn empty_transcript(id: &str) -> VmValue {
2793 use crate::llm::helpers::new_transcript_with;
2794 new_transcript_with(Some(id.to_string()), Vec::new(), None, None)
2795}
2796
2797fn clone_transcript_with_id(transcript: &VmValue, new_id: &str) -> VmValue {
2798 let Some(dict) = transcript.as_dict() else {
2799 return empty_transcript(new_id);
2800 };
2801 let mut next = dict.clone();
2802 next.put_str("id", new_id);
2803 VmValue::dict(next)
2804}
2805
2806fn clone_transcript_with_parent(transcript: &VmValue, parent_id: &str) -> VmValue {
2807 let Some(dict) = transcript.as_dict() else {
2808 return transcript.clone();
2809 };
2810 let mut next = dict.clone();
2811 let metadata = match next.get("metadata") {
2812 Some(VmValue::Dict(metadata)) => {
2813 let mut metadata = metadata.as_ref().clone();
2814 metadata.put_str("parent_session_id", parent_id);
2815 VmValue::dict(metadata)
2816 }
2817 _ => VmValue::dict(BTreeMap::from([(
2818 "parent_session_id".to_string(),
2819 VmValue::String(arcstr::ArcStr::from(parent_id.to_string())),
2820 )])),
2821 };
2822 next.insert(crate::value::intern_key("metadata"), metadata);
2823 VmValue::dict(next)
2824}
2825
2826fn apply_system_prompt_metadata(next: &mut crate::value::DictMap, system_prompt: &str) {
2827 let mut metadata = match next.get("metadata") {
2828 Some(VmValue::Dict(metadata)) => metadata.as_ref().clone(),
2829 _ => crate::value::DictMap::new(),
2830 };
2831 metadata.insert(
2832 crate::value::intern_key("system_prompt"),
2833 crate::stdlib::json_to_vm_value(&crate::llm::helpers::system_prompt_metadata(
2834 system_prompt,
2835 )),
2836 );
2837 next.insert(
2838 crate::value::intern_key("metadata"),
2839 VmValue::dict(metadata),
2840 );
2841}
2842
2843fn transcript_with_session_metadata(transcript: VmValue, state: &SessionState) -> VmValue {
2844 let Some(dict) = transcript.as_dict() else {
2845 return transcript;
2846 };
2847 let mut next = dict.clone();
2848 let mut metadata = match next.get("metadata") {
2849 Some(VmValue::Dict(metadata)) => metadata.as_ref().clone(),
2850 _ => crate::value::DictMap::new(),
2851 };
2852 if let Some(tool_format) = state.tool_format.as_ref() {
2853 metadata.put_str("tool_format", tool_format.clone());
2854 metadata.insert(
2855 crate::value::intern_key("tool_mode_locked"),
2856 VmValue::Bool(true),
2857 );
2858 }
2859 if let Some(system_prompt) = state.system_prompt.as_ref() {
2860 metadata.insert(
2861 crate::value::intern_key("system_prompt"),
2862 crate::stdlib::json_to_vm_value(&crate::llm::helpers::system_prompt_metadata(
2863 system_prompt,
2864 )),
2865 );
2866 }
2867 if let Some(actor_chain) = state.actor_chain.as_ref() {
2868 metadata.insert(
2869 crate::value::intern_key("actor_chain"),
2870 actor_chain.to_vm_value(),
2871 );
2872 } else {
2873 metadata.remove("actor_chain");
2874 }
2875 if let Some(anchor) = state.workspace_anchor.as_ref() {
2876 metadata.insert(
2877 crate::value::intern_key(WORKSPACE_ANCHOR_METADATA_KEY),
2878 anchor.to_vm_value(),
2879 );
2880 } else {
2881 metadata.remove(WORKSPACE_ANCHOR_METADATA_KEY);
2882 }
2883 if let Some(scratchpad) = state.scratchpad.as_ref() {
2884 metadata.insert(
2885 crate::value::intern_key("agent_scratchpad"),
2886 scratchpad.clone(),
2887 );
2888 metadata.insert(
2889 crate::value::intern_key("agent_scratchpad_version"),
2890 VmValue::Int(state.scratchpad_version as i64),
2891 );
2892 } else {
2893 metadata.remove("agent_scratchpad");
2894 metadata.remove("agent_scratchpad_version");
2895 }
2896 if let Some(last_action) = state.last_transcript_budget_action.as_ref() {
2897 let usage = transcript_usage(
2898 &VmValue::dict(next.clone()),
2899 state.transcript_budget_policy.max_approx_bytes.is_some(),
2900 );
2901 metadata.insert(
2902 crate::value::intern_key("transcript_budget"),
2903 crate::stdlib::json_to_vm_value(&serde_json::json!({
2904 "policy": transcript_budget_policy_json(&state.transcript_budget_policy.normalized()),
2905 "usage": transcript_budget_usage_json(&usage),
2906 "last_action": last_action,
2907 })),
2908 );
2909 }
2910 if !metadata.is_empty() {
2911 next.insert(
2912 crate::value::intern_key("metadata"),
2913 VmValue::dict(metadata),
2914 );
2915 }
2916 VmValue::dict(next)
2917}
2918
2919fn session_snapshot(state: &SessionState) -> VmValue {
2931 let transcript = transcript_with_session_metadata(state.transcript.clone(), state);
2932 let Some(dict) = transcript.as_dict() else {
2933 return state.transcript.clone();
2934 };
2935 let mut next = dict.clone();
2936 let length = next
2937 .get("messages")
2938 .and_then(|value| match value {
2939 VmValue::List(list) => Some(list.len() as i64),
2940 _ => None,
2941 })
2942 .unwrap_or(0);
2943 next.insert(crate::value::intern_key("length"), VmValue::Int(length));
2944 next.put_str("created_at", state.created_at.clone());
2945 next.insert(
2946 crate::value::intern_key("parent_id"),
2947 state
2948 .parent_id
2949 .as_ref()
2950 .map(|id| VmValue::String(arcstr::ArcStr::from(id.clone())))
2951 .unwrap_or(VmValue::Nil),
2952 );
2953 next.insert(
2954 crate::value::intern_key("child_ids"),
2955 VmValue::List(std::sync::Arc::new(
2956 state
2957 .child_ids
2958 .iter()
2959 .cloned()
2960 .map(|id| VmValue::String(arcstr::ArcStr::from(id)))
2961 .collect(),
2962 )),
2963 );
2964 next.insert(
2965 crate::value::intern_key("branched_at_event_index"),
2966 state
2967 .branched_at_event_index
2968 .map(|index| VmValue::Int(index as i64))
2969 .unwrap_or(VmValue::Nil),
2970 );
2971 next.insert(
2972 crate::value::intern_key("system_prompt"),
2973 state
2974 .system_prompt
2975 .as_ref()
2976 .map(|prompt| VmValue::String(arcstr::ArcStr::from(prompt.clone())))
2977 .unwrap_or(VmValue::Nil),
2978 );
2979 next.insert(
2980 crate::value::intern_key("tool_format"),
2981 state
2982 .tool_format
2983 .as_ref()
2984 .map(|format| VmValue::String(arcstr::ArcStr::from(format.clone())))
2985 .unwrap_or(VmValue::Nil),
2986 );
2987 next.insert(
2988 crate::value::intern_key("pinned_model"),
2989 state
2990 .pinned_model
2991 .as_ref()
2992 .map(|model| VmValue::String(arcstr::ArcStr::from(model.clone())))
2993 .unwrap_or(VmValue::Nil),
2994 );
2995 next.insert(
2996 crate::value::intern_key("pinned_reasoning_policy"),
2997 state
2998 .pinned_reasoning_policy
2999 .as_ref()
3000 .map(|policy| VmValue::String(arcstr::ArcStr::from(policy.clone())))
3001 .unwrap_or(VmValue::Nil),
3002 );
3003 next.insert(
3004 crate::value::intern_key("actor_chain"),
3005 state
3006 .actor_chain
3007 .as_ref()
3008 .map(ActorChain::to_vm_value)
3009 .unwrap_or(VmValue::Nil),
3010 );
3011 next.insert(
3012 crate::value::intern_key("scratchpad"),
3013 state.scratchpad.clone().unwrap_or(VmValue::Nil),
3014 );
3015 next.insert(
3016 crate::value::intern_key("scratchpad_version"),
3017 VmValue::Int(state.scratchpad_version as i64),
3018 );
3019 next.insert(
3020 crate::value::intern_key("workspace_anchor"),
3021 state
3022 .workspace_anchor
3023 .as_ref()
3024 .map(WorkspaceAnchor::to_vm_value)
3025 .unwrap_or(VmValue::Nil),
3026 );
3027 next.insert(
3028 crate::value::intern_key("workspace_policy"),
3029 state.workspace_policy.to_vm_value(),
3030 );
3031 next.insert(
3032 crate::value::intern_key("live_clients"),
3033 crate::stdlib::json_to_vm_value(&serde_json::Value::Array(
3034 state.live_clients.values().map(live_client_json).collect(),
3035 )),
3036 );
3037 next.insert(
3038 crate::value::intern_key("live_controller_id"),
3039 state
3040 .live_controller_id
3041 .as_ref()
3042 .map(|id| VmValue::String(arcstr::ArcStr::from(id.clone())))
3043 .unwrap_or(VmValue::Nil),
3044 );
3045 next.insert(
3046 crate::value::intern_key("completed_turn_checkpoint_count"),
3047 VmValue::Int(state.completed_turn_checkpoints.len() as i64),
3048 );
3049 next.insert(
3050 crate::value::intern_key("redo_checkpoint_count"),
3051 VmValue::Int(state.redo_stack.len() as i64),
3052 );
3053 VmValue::dict(next)
3054}
3055
3056fn update_lineage(
3057 map: &mut HashMap<String, SessionState>,
3058 parent_id: &str,
3059 child_id: &str,
3060 branched_at_event_index: Option<usize>,
3061) {
3062 let old_parent_id = map.get(child_id).and_then(|child| child.parent_id.clone());
3063 if let Some(old_parent_id) = old_parent_id.filter(|old_parent_id| old_parent_id != parent_id) {
3064 if let Some(old_parent) = map.get_mut(&old_parent_id) {
3065 old_parent.child_ids.retain(|id| id != child_id);
3066 old_parent.touch();
3067 }
3068 }
3069 if let Some(parent) = map.get_mut(parent_id) {
3070 parent.touch();
3071 if !parent.child_ids.iter().any(|id| id == child_id) {
3072 parent.child_ids.push(child_id.to_string());
3073 }
3074 }
3075 if let Some(child) = map.get_mut(child_id) {
3076 child.touch();
3077 child.parent_id = Some(parent_id.to_string());
3078 child.branched_at_event_index = branched_at_event_index;
3079 child.transcript = clone_transcript_with_parent(&child.transcript, parent_id);
3080 }
3081}
3082
3083fn branch_event_index(transcript: &VmValue, keep_first: usize) -> usize {
3084 if keep_first == 0 {
3085 return 0;
3086 }
3087 let Some(dict) = transcript.as_dict() else {
3088 return keep_first;
3089 };
3090 let Some(VmValue::List(events)) = dict.get("events") else {
3091 return keep_first;
3092 };
3093 event_prefix_len_for_messages(events, keep_first)
3094}
3095
3096fn event_kind(event: &VmValue) -> Option<String> {
3097 event
3098 .as_dict()
3099 .and_then(|dict| dict.get("kind"))
3100 .map(VmValue::display)
3101}
3102
3103fn event_id(event: &VmValue) -> Option<String> {
3104 event
3105 .as_dict()
3106 .and_then(|dict| dict.get("id"))
3107 .map(VmValue::display)
3108}
3109
3110fn is_turn_event(event: &VmValue) -> bool {
3111 matches!(
3112 event_kind(event).as_deref(),
3113 Some("message" | "tool_result")
3114 )
3115}
3116
3117fn event_prefix_len_for_messages(events: &[VmValue], keep_first: usize) -> usize {
3118 if keep_first == 0 {
3119 return 0;
3120 }
3121 let mut retained_messages = 0usize;
3122 for (index, event) in events.iter().enumerate() {
3123 if is_turn_event(event) {
3124 retained_messages += 1;
3125 if retained_messages == keep_first {
3126 return index + 1;
3127 }
3128 }
3129 }
3130 events.len()
3131}
3132
3133fn turn_event_id_for_count(events: &[VmValue], keep_first: usize) -> Option<String> {
3134 if keep_first == 0 {
3135 return None;
3136 }
3137 let mut retained_messages = 0usize;
3138 for event in events {
3139 if is_turn_event(event) {
3140 retained_messages += 1;
3141 if retained_messages == keep_first {
3142 return event_id(event);
3143 }
3144 }
3145 }
3146 None
3147}
3148
3149#[cfg(test)]
3150#[path = "agent_sessions_tests.rs"]
3151mod tests;