defect_agent/session/default.rs
1//! Default implementation of [`Session`] / [`AgentCore`].
2//!
3//! Assembly structure:
4//!
5//! ```text
6//! DefaultAgentCore
//! ├── Arc<ProviderRegistry> (injected at assembly, shared by all sessions in this core)
//! ├── Arc<dyn ToolRegistry> (built-in tools, shared by all sessions in this core)
//! ├── RwLock<TurnConfig> (default configuration, copied into each new session)
10//! └── DashMap<SessionId, Arc<dyn Session>>
11//!
12//! Note: "shared" here is scoped to the **`AgentCore` instance**, not process-global.
13//! When using defect as a library, a single process can assemble multiple `AgentCore`
14//! instances, each with its own provider / tool set / configuration.
15//!
16//! DefaultSession
17//! ├── id: SessionId
18//! ├── cwd: PathBuf
//! ├── history: Arc<dyn History>
//! ├── tools: Arc<dyn ToolRegistry> (CompositeRegistry: per-session + process)
//! ├── provider_state: RwLock<SessionProviderState> (active provider + hosted capabilities)
//! ├── events: Arc<EventEmitter>
//! ├── permissions: Arc<PermissionGate>
//! ├── turn_state: std::sync::Mutex<TurnSlot>
25//! └── config: RwLock<TurnConfig>
26//! ```
27//!
//! Turn mutual exclusion uses `Mutex<TurnSlot>` (a `std::sync::Mutex` held only briefly and
//! never across an await). At the outermost level, the turn entry point locks it and
//! returns `TurnError::TurnInProgress` if the slot already holds a [`CancellationToken`].
//! Otherwise it stores a fresh token for the current turn. `cancel_turn` extracts that
//! token and calls `cancel()`.
32
33use std::io;
34use std::path::PathBuf;
35use std::sync::{Arc, Mutex, RwLock};
36
37use agent_client_protocol_schema::{ContentBlock, McpServer, SessionId, StopReason, ToolCallId};
38use dashmap::DashMap;
39use futures::future::BoxFuture;
40use tokio_util::sync::CancellationToken;
41use tracing::Instrument;
42
43use crate::error::BoxError;
44use crate::event::{AgentEvent, PermissionResolution};
45use crate::fs::FsBackend;
46use crate::hooks::{HookCtx, HookEngine, NoopHookEngine};
47use crate::http::{HttpClient, NoopHttpClient};
48use crate::llm::{
49 HostedCapabilities, LlmProvider, Message, ModelCandidate, ModelInfo, ProviderError,
50 ProviderErrorKind, ProviderInfo, ProviderRegistry, ReasoningEffort,
51};
52use crate::policy::{AskWritesPolicy, ModeCatalog, SandboxPolicy};
53use crate::session::capabilities::{ResolvedSessionCapabilities, SessionCapabilitiesConfig};
54use crate::session::context::{Frontend, RunningContext};
55use crate::session::events::EventEmitter;
56use crate::session::permissions::PermissionGate;
57use crate::session::prompt::resolve_system_prompt;
58use crate::session::tool_registry::{CompositeRegistry, StaticToolRegistry};
59use crate::session::turn::{
60 CompactionCtx, RequestAuditTracker, TurnConfig, TurnRunner, run_sync_compaction,
61};
62use crate::session::{
63 AgentCore, AgentError, CompactionReport, ContextStatus, EventStream, History, ModelSelection,
64 Session, SessionCreateInfo, SessionLoader, SessionObserver, SessionToolFactory, ToolRegistry,
65 TurnError, VecHistory,
66};
67use crate::shell::ShellBackend;
68
69/// Default [`AgentCore`].
70pub struct DefaultAgentCore {
71 /// Provider registry wired at assembly time. Sessions share the same `Arc`; the
72 /// active model ID is used to resolve the corresponding [`LlmProvider`] — this type
73 /// no longer "holds a single provider".
74 registry: Arc<ProviderRegistry>,
75 process_tools: Arc<dyn ToolRegistry>,
76 /// Default policy — used as the session's active policy **only when no mode catalog
77 /// is present (`modes` is `None`)**. When a catalog is present, it is overridden by
78 /// the catalog's current mode.
79 policy: Arc<dyn SandboxPolicy>,
80 /// Permission mode catalog template. When `Some`, each session holds a cloned copy
81 /// and can switch independently via `session/set_mode`; when `None`, falls back to
82 /// the single non-switchable `policy`.
83 modes: Option<ModeCatalog>,
84 config: RwLock<TurnConfig>,
85 loader: Option<Arc<dyn SessionLoader>>,
86 session_tools: Option<Arc<dyn SessionToolFactory>>,
87 observers: Vec<Arc<dyn SessionObserver>>,
88 /// HTTP fetch backend. Shared across all sessions in this core — HTTP has no
89 /// per-client capability negotiation, and there is no need to isolate connection
90 /// pools between sessions. Constructed once at the CLI entry point from
91 /// `HttpClientConfig` and injected; tests and the `echo` provider use
92 /// [`NoopHttpClient`].
93 http: Arc<dyn HttpClient>,
94 /// Hook engine shared by all sessions in this core — hook configuration uses global +
95 /// per-session matchers. Assembled at CLI entry; without explicit injection, uses
96 /// [`NoopHookEngine`], equivalent to "no hooks configured = main loop unchanged".
97 hook_engine: Arc<dyn HookEngine>,
98 /// Background progress view configuration. Shared across all sessions (at the same
99 /// level as process-wide tool configuration).
100 /// Passed to each session when constructing `BackgroundTasks`.
101 background_progress: crate::session::BackgroundProgressConfig,
102 /// Shared state for the `--goal` goal-driven loop. When `Some`, sessions in this core
103 /// run in goal mode; injected by CLI assembly based on `--goal`. All sessions share
104 /// the same instance (a `--goal` process typically runs only one session). `None` =
105 /// non-goal mode (default).
106 goal: Option<Arc<crate::session::GoalState>>,
107 sessions: DashMap<SessionId, Arc<dyn Session>>,
108}
109
110impl DefaultAgentCore {
111 pub fn builder() -> DefaultAgentCoreBuilder {
112 DefaultAgentCoreBuilder::default()
113 }
114}
115
116#[derive(Default)]
117pub struct DefaultAgentCoreBuilder {
118 registry: Option<Arc<ProviderRegistry>>,
119 /// Convenience entry for a single provider: set [`Self::provider`] here, and at
120 /// `build()` time it is combined with `config.model` to produce a single-entry
121 /// [`ProviderRegistry`]. This field is ignored when `registry` has been explicitly
122 /// injected.
123 single_provider: Option<Arc<dyn LlmProvider>>,
124 /// Session capabilities for the single-provider path. Ignored when `registry` is
125 /// explicitly injected, as the entry provides its own.
126 single_capabilities: SessionCapabilitiesConfig,
127 process_tools: Option<Arc<dyn ToolRegistry>>,
128 policy: Option<Arc<dyn SandboxPolicy>>,
129 modes: Option<ModeCatalog>,
130 loader: Option<Arc<dyn SessionLoader>>,
131 session_tools: Option<Arc<dyn SessionToolFactory>>,
132 observers: Vec<Arc<dyn SessionObserver>>,
133 http: Option<Arc<dyn HttpClient>>,
134 hook_engine: Option<Arc<dyn HookEngine>>,
135 config: TurnConfig,
136 /// Background task progress view configuration (ring capacity / body limit). Each
137 /// session's `BackgroundTasks` builds its progress ring from this. When unset, falls
138 /// back to
139 /// [`BackgroundProgressConfig::default`](crate::session::BackgroundProgressConfig)
140 /// (bird's-eye view, no body content).
141 background_progress: crate::session::BackgroundProgressConfig,
142 /// Shared state for the `--goal` goal-driven loop. Injected by the CLI during
143 /// assembly based on `--goal`; if unset, goal mode is disabled.
144 goal: Option<Arc<crate::session::GoalState>>,
145}
146
147impl DefaultAgentCoreBuilder {
148 /// Injects the provider registry during assembly. Used by the CLI and real startup
149 /// paths; tests and single-provider scenarios should use [`Self::provider`] for
150 /// convenience.
151 pub fn registry(mut self, registry: Arc<ProviderRegistry>) -> Self {
152 self.registry = Some(registry);
153 self
154 }
155
156 /// Convenience entry point for a single provider. Wraps it into a single-entry
157 /// [`ProviderRegistry`] at `build()` time; default model = [`TurnConfig::model`].
158 /// Mutually exclusive with [`Self::registry`]; if both are set, `registry` takes
159 /// precedence.
160 pub fn provider(mut self, provider: Arc<dyn LlmProvider>) -> Self {
161 self.single_provider = Some(provider);
162 self
163 }
164
165 /// Session capabilities for the single-provider convenience path — these are merged
166 /// into the single-entry registry automatically constructed by `build()`. For the
167 /// multi-provider path, write capabilities directly into
168 /// [`ProviderEntry`](crate::llm::ProviderEntry) instead; this field is ignored.
169 pub fn capabilities(mut self, capabilities: SessionCapabilitiesConfig) -> Self {
170 self.single_capabilities = capabilities;
171 self
172 }
173
174 pub fn process_tools(mut self, tools: Arc<dyn ToolRegistry>) -> Self {
175 self.process_tools = Some(tools);
176 self
177 }
178
179 pub fn policy(mut self, policy: Arc<dyn SandboxPolicy>) -> Self {
180 self.policy = Some(policy);
181 self
182 }
183
184 /// Inject a permission-mode catalog. When `Some`, each session exposes an ACP
185 /// `SessionModeState` and supports `session/set_mode`; the catalog's current mode
186 /// overrides [`Self::policy`] as the session's initial active policy. If not called,
187 /// the session has no mode switching and is fixed to [`Self::policy`] (or the default
188 /// [`AskWritesPolicy`]).
189 pub fn modes(mut self, modes: ModeCatalog) -> Self {
190 self.modes = Some(modes);
191 self
192 }
193
194 pub fn session_loader(mut self, loader: Arc<dyn SessionLoader>) -> Self {
195 self.loader = Some(loader);
196 self
197 }
198
199 pub fn session_tool_factory(mut self, factory: Arc<dyn SessionToolFactory>) -> Self {
200 self.session_tools = Some(factory);
201 self
202 }
203
204 pub fn observe_session(mut self, observer: Arc<dyn SessionObserver>) -> Self {
205 self.observers.push(observer);
206 self
207 }
208
209 pub fn config(mut self, config: TurnConfig) -> Self {
210 self.config = config;
211 self
212 }
213
214 /// Inject shared state for the `--goal` goal-driven loop. When `Some`, the session
215 /// runs in goal mode: the top-level turn injects it into the `goal_done` tool via
216 /// [`crate::tool::ToolContext::goal`], and the `goal-gate` hook uses it to drive
217 /// multi-turn autonomous loops. If not called, the session runs in non-goal mode (the
218 /// default).
219 pub fn goal(mut self, goal: Arc<crate::session::GoalState>) -> Self {
220 self.goal = Some(goal);
221 self
222 }
223
224 /// Sets the background task progress view configuration (progress ring capacity /
225 /// per-block body character limit). When not called, defaults to ring size 64 and
226 /// body limit 0, meaning only summary/metadata is shown and sub-turn bodies are not
227 /// populated. During CLI assembly, this is injected from the `[tools.background]`
228 /// projection.
229 pub fn background_progress(mut self, config: crate::session::BackgroundProgressConfig) -> Self {
230 self.background_progress = config;
231 self
232 }
233
234 /// Sets the HTTP fetch backend for this core. When unset, defaults to
235 /// [`NoopHttpClient`]—any `fetch` call will fail with
236 /// [`crate::http::HttpClientError::Transport`], allowing tests or `echo` assemblies
237 /// that don't need networking to skip constructing a real HTTP stack.
238 pub fn http(mut self, http: Arc<dyn HttpClient>) -> Self {
239 self.http = Some(http);
240 self
241 }
242
243 /// Sets the hook engine for this core. When unset, falls back to [`NoopHookEngine`] —
244 /// all hook calls return `Pass` directly, and the main loop behaves as if the hook
245 /// system were not introduced.
246 pub fn hook_engine(mut self, hook_engine: Arc<dyn HookEngine>) -> Self {
247 self.hook_engine = Some(hook_engine);
248 self
249 }
250
251 /// # Panics
252 /// Neither `registry` nor `provider` is set; or, in the single-provider path,
253 /// `config.model` is an empty string (the registry must have at least one default
254 /// model).
255 pub fn build(mut self) -> DefaultAgentCore {
256 let registry = self.registry.take().unwrap_or_else(|| {
257 let provider = self
258 .single_provider
259 .take()
260 .expect("DefaultAgentCore requires a provider or a registry");
261 let vendor = provider.info().vendor;
262 // Under the single-provider path, config usually lacks a selected vendor —
263 // fill in the provider's own vendor so that `resolve_initial_provider` can
264 // find the entry by the (vendor, model) pair.
265 if self.config.provider.is_empty() {
266 self.config.provider = vendor.clone();
267 }
268 let model_id = self.config.model.clone();
269 assert!(
270 !model_id.is_empty(),
271 "DefaultAgentCoreBuilder::provider() requires TurnConfig::model to be set; \
272 use registry() for multi-provider setups"
273 );
274 // In the single-provider path, treat `TurnConfig::allowed_models` as the
275 // model candidate list — this mirrors the multi-provider assembly in the CLI:
276 // users declare candidates via `[providers.<p>.models]`, and the agent does
277 // not send a `list_models` network request to the adapter. When
278 // `allowed_models` is absent, fall back to exposing only the default model.
279 let model_ids = match self.config.allowed_models.as_ref() {
280 Some(ids) if !ids.is_empty() => ids.clone(),
281 _ => vec![model_id.clone()],
282 };
283 let mut model_infos: Vec<ModelInfo> = model_ids
284 .into_iter()
285 .map(|id| {
286 provider.model_info(&id).unwrap_or(ModelInfo {
287 id,
288 display_name: None,
289 context_window: None,
290 max_output_tokens: None,
291 deprecated: false,
292 capabilities_overrides: Default::default(),
293 })
294 })
295 .collect();
296 if !model_infos.iter().any(|m| m.id == model_id) {
297 model_infos.insert(
298 0,
299 ModelInfo {
300 id: model_id.clone(),
301 display_name: None,
302 context_window: None,
303 max_output_tokens: None,
304 deprecated: false,
305 capabilities_overrides: Default::default(),
306 },
307 );
308 }
309 Arc::new(
310 crate::llm::ProviderRegistry::new(
311 vec![crate::llm::ProviderEntry::new(
312 provider,
313 model_infos,
314 self.single_capabilities,
315 )],
316 &vendor,
317 &model_id,
318 )
319 .expect("single-entry registry must satisfy invariants"),
320 )
321 });
322
323 DefaultAgentCore {
324 registry,
325 process_tools: self
326 .process_tools
327 .unwrap_or_else(|| Arc::new(StaticToolRegistry::empty()) as Arc<dyn ToolRegistry>),
328 policy: self
329 .policy
330 .unwrap_or_else(|| Arc::new(AskWritesPolicy::new()) as Arc<dyn SandboxPolicy>),
331 modes: self.modes,
332 loader: self.loader,
333 session_tools: self.session_tools,
334 observers: self.observers,
335 http: self
336 .http
337 .unwrap_or_else(|| Arc::new(NoopHttpClient) as Arc<dyn HttpClient>),
338 hook_engine: self
339 .hook_engine
340 .unwrap_or_else(|| Arc::new(NoopHookEngine) as Arc<dyn HookEngine>),
341 config: RwLock::new(self.config),
342 background_progress: self.background_progress,
343 goal: self.goal,
344 sessions: DashMap::new(),
345 }
346 }
347}
348
349impl AgentCore for DefaultAgentCore {
350 fn create_session(
351 &self,
352 id: SessionId,
353 cwd: PathBuf,
354 mcp_servers: Vec<McpServer>,
355 fs: Arc<dyn FsBackend>,
356 shell: Arc<dyn ShellBackend>,
357 frontend: Frontend,
358 ) -> BoxFuture<'_, Result<Arc<dyn Session>, AgentError>> {
359 Box::pin(async move {
360 if !cwd.is_absolute() || !cwd.exists() {
361 return Err(AgentError::InvalidCwd(cwd));
362 }
363 let session_cwd = cwd.clone();
364 if self.sessions.contains_key(&id) {
365 return Err(AgentError::DuplicateSessionId(id));
366 }
367
368 let initial = self.resolve_initial_provider()?;
369
370 let session_tools = match &self.session_tools {
371 Some(factory) => factory
372 .build_registry(cwd.clone(), mcp_servers.clone())
373 .await
374 .map_err(|source| AgentError::McpStartup {
375 server: "session_tools".to_string(),
376 source,
377 })?,
378 None => Arc::new(StaticToolRegistry::empty()) as Arc<dyn ToolRegistry>,
379 };
380 let composite: Arc<dyn ToolRegistry> = Arc::new(CompositeRegistry::new(
381 session_tools,
382 self.process_tools.clone(),
383 ));
384
385 // After the session-enter hook, absorb any injected `additional_context` as
386 // candidate system-prompt suffixes.
387 let session_start_append = {
388 let cancel = CancellationToken::new();
389 let ctx = HookCtx::new(&id, &cwd, cancel);
390 let mut step = crate::hooks::step::AfterSessionEnter {
391 cwd: cwd.to_string_lossy().into_owned(),
392 source: crate::hooks::step::SessionSource::New,
393 additional_context: Vec::new(),
394 };
395 let _ = self.hook_engine.dispatch(&mut step, ctx).await;
396 step.additional_context
397 };
398
399 // Session-level cancellation token: both the driver loop exit signal and the
400 // source of background task cancellation tokens (same token).
401 let session_cancel = CancellationToken::new();
402 let (policy, modes) = self.session_policy_state();
403 let concrete = Arc::new(DefaultSession {
404 id: id.clone(),
405 cwd,
406 history: Arc::new(VecHistory::new()) as Arc<dyn History>,
407 tools: composite,
408 registry: self.registry.clone(),
409 provider_state: RwLock::new(initial),
410 policy,
411 modes,
412 events: Arc::new(EventEmitter::new()),
413 permissions: Arc::new(PermissionGate::new()),
414 turn_state: Mutex::new(TurnSlot::default()),
415 background: crate::session::BackgroundTasks::new(
416 session_cancel.clone(),
417 self.background_progress,
418 ),
419 goal: self.goal.clone(),
420 compaction_slot: crate::session::CompactionSlot::new(),
421 turn_freed: Arc::new(tokio::sync::Notify::new()),
422 session_cancel,
423 config: RwLock::new(
424 self.config
425 .read()
426 .expect("DefaultAgentCore config rwlock poisoned")
427 .clone(),
428 ),
429 fs,
430 shell,
431 frontend,
432 http: self.http.clone(),
433 hook_engine: self.hook_engine.clone(),
434 session_start_append,
435 request_audit: RequestAuditTracker::new(),
436 });
437 // Spawn the session driver (active keep-alive). The driver holds a `Weak`
438 // self-reference so that when all external strong references to the session
439 // are dropped, the driver's `upgrade` fails and it exits, preventing the
440 // session from living forever.
441 tokio::spawn(DefaultSession::drive(Arc::downgrade(&concrete)));
442 let session = concrete as Arc<dyn Session>;
443
444 let session_info = SessionCreateInfo {
445 id: id.clone(),
446 cwd: session_cwd,
447 mcp_servers,
448 };
449 for observer in &self.observers {
450 observer
451 .on_session_created(session.clone(), session_info.clone())
452 .map_err(AgentError::Observer)?;
453 }
454
455 self.sessions.insert(id, session.clone());
456 Ok(session)
457 })
458 }
459
460 fn load_session(
461 &self,
462 id: SessionId,
463 fs: Arc<dyn FsBackend>,
464 shell: Arc<dyn ShellBackend>,
465 frontend: Frontend,
466 ) -> BoxFuture<'_, Result<Arc<dyn Session>, AgentError>> {
467 Box::pin(async move {
468 if let Some(existing) = self.sessions.get(&id) {
469 return Ok(existing.value().clone());
470 }
471 let Some(loader) = &self.loader else {
472 return Err(AgentError::Restore(BoxError::new(io::Error::other(
473 "session loader not configured",
474 ))));
475 };
476 let loaded = loader
477 .load_session(id.clone())
478 .await
479 .map_err(AgentError::Restore)?;
480 let initial = self.resolve_initial_provider()?;
481 let session_tools = match &self.session_tools {
482 Some(factory) => factory
483 .build_registry(loaded.info.cwd.clone(), loaded.info.mcp_servers.clone())
484 .await
485 .map_err(AgentError::Restore)?,
486 None => Arc::new(StaticToolRegistry::empty()) as Arc<dyn ToolRegistry>,
487 };
488
489 // After session enter hook (resume path). Same as `create_session`: retrieve
490 // the injected context.
491 let session_start_append = {
492 let cancel = CancellationToken::new();
493 let ctx = HookCtx::new(&loaded.info.id, &loaded.info.cwd, cancel);
494 let mut step = crate::hooks::step::AfterSessionEnter {
495 cwd: loaded.info.cwd.to_string_lossy().into_owned(),
496 source: crate::hooks::step::SessionSource::Resume,
497 additional_context: Vec::new(),
498 };
499 let _ = self.hook_engine.dispatch(&mut step, ctx).await;
500 step.additional_context
501 };
502
503 let session_cancel = CancellationToken::new();
504 let (policy, modes) = self.session_policy_state();
505 let concrete = Arc::new(DefaultSession {
506 id: loaded.info.id.clone(),
507 cwd: loaded.info.cwd.clone(),
508 history: Arc::new(VecHistory::from_messages(loaded.history)) as Arc<dyn History>,
509 tools: Arc::new(CompositeRegistry::new(
510 session_tools,
511 self.process_tools.clone(),
512 )),
513 registry: self.registry.clone(),
514 provider_state: RwLock::new(initial),
515 policy,
516 modes,
517 events: Arc::new(EventEmitter::new()),
518 permissions: Arc::new(PermissionGate::new()),
519 turn_state: Mutex::new(TurnSlot::default()),
520 background: crate::session::BackgroundTasks::new(
521 session_cancel.clone(),
522 self.background_progress,
523 ),
524 goal: self.goal.clone(),
525 compaction_slot: crate::session::CompactionSlot::new(),
526 turn_freed: Arc::new(tokio::sync::Notify::new()),
527 session_cancel,
528 config: RwLock::new(
529 self.config
530 .read()
531 .expect("DefaultAgentCore config rwlock poisoned")
532 .clone(),
533 ),
534 fs,
535 shell,
536 frontend,
537 http: self.http.clone(),
538 hook_engine: self.hook_engine.clone(),
539 session_start_append,
540 request_audit: RequestAuditTracker::new(),
541 });
542 tokio::spawn(DefaultSession::drive(Arc::downgrade(&concrete)));
543 let session = concrete as Arc<dyn Session>;
544
545 let session_info = loaded.info;
546 for observer in &self.observers {
547 observer
548 .on_session_created(session.clone(), session_info.clone())
549 .map_err(AgentError::Observer)?;
550 }
551
552 self.sessions.insert(id, session.clone());
553 Ok(session)
554 })
555 }
556
557 fn session(&self, id: &SessionId) -> Option<Arc<dyn Session>> {
558 self.sessions.get(id).map(|r| r.value().clone())
559 }
560}
561
562impl DefaultAgentCore {
563 /// Look up the entry in the registry for the current [`TurnConfig::model`] and
564 /// resolve it into `(provider, hosted_capabilities)`. Shared by `create_session` /
565 /// `load_session`.
566 ///
567 /// The configured model must have an entry in the registry —
568 /// [`ProviderRegistry::new`] already validated the default model during CLI assembly,
569 /// so the only way to reach this error is builder misuse (registry and turn config
570 /// are inconsistent).
571 fn resolve_initial_provider(&self) -> Result<SessionProviderState, AgentError> {
572 let (vendor, model) = {
573 let cfg = self
574 .config
575 .read()
576 .expect("DefaultAgentCore config rwlock poisoned");
577 (cfg.provider.clone(), cfg.model.clone())
578 };
579 let entry = self.registry.entry_for(&vendor, &model).ok_or_else(|| {
580 AgentError::Other(BoxError::new(io::Error::other(format!(
581 "default model `{model}` is not declared by provider `{vendor}` in the registry"
582 ))))
583 })?;
584 let provider = entry.provider().clone();
585 let resolved = ResolvedSessionCapabilities::resolve(
586 entry.capabilities(),
587 provider.hosted_capabilities(),
588 &provider.info().vendor,
589 )?;
590 Ok(SessionProviderState {
591 provider,
592 hosted_capabilities: resolved.hosted,
593 })
594 }
595
596 /// Derive the initial `(active policy, mode catalog)` for a new session.
597 ///
598 /// When a [`ModeCatalog`] is configured: each session holds its own clone of the
599 /// catalog (so `current` can be switched independently), and the active policy is the
600 /// policy of the catalog's current mode. When not configured: the active policy is
601 /// the process-level `policy`, and there is no catalog (mode switching is
602 /// unavailable).
603 fn session_policy_state(&self) -> (RwLock<Arc<dyn SandboxPolicy>>, Option<Mutex<ModeCatalog>>) {
604 match &self.modes {
605 Some(catalog) => {
606 let catalog = catalog.clone();
607 let active = catalog.current_policy();
608 (RwLock::new(active), Some(Mutex::new(catalog)))
609 }
610 None => (RwLock::new(self.policy.clone()), None),
611 }
612 }
613}
614
615/// The currently selected real provider for the session, together with the parsed hosted
616/// capabilities of that provider.
617///
618/// Atomically replaced by `set_model` when switching providers.
619struct SessionProviderState {
620 provider: Arc<dyn LlmProvider>,
621 hosted_capabilities: HostedCapabilities,
622}
623
624pub struct DefaultSession {
625 id: SessionId,
626 cwd: PathBuf,
627 /// Use `Arc` instead of `Box` because the background compaction task
628 /// ([`CompactionSlot`](crate::session::CompactionSlot)) needs to hold it with a
629 /// `'static` lifetime across turns, requiring shared reference counting.
630 history: Arc<dyn History>,
631 tools: Arc<dyn ToolRegistry>,
632 /// Global provider directory. The session shares the same `Arc<ProviderRegistry>`
633 /// held by [`DefaultAgentCore`] — it is used to resolve candidates and owner
634 /// providers for `list_models` / `set_model`.
635 registry: Arc<ProviderRegistry>,
636 /// Current selected (provider, hosted_capabilities) state. `set_model` replaces the
637 /// entire pair when switching providers, ensuring `(provider, hosted_capabilities)`
638 /// is always consistent — there is no intermediate state where the provider has
639 /// changed but capabilities have not.
640 provider_state: RwLock<SessionProviderState>,
641 /// The currently active decision policy. Atomically replaced on `set_mode` (lock
642 /// order: after `modes`).
643 /// Uses `RwLock<Arc<_>>` rather than a bare `Arc` because the per-session permission
644 /// mode must be switchable at runtime; `run_turn` snapshots the policy via
645 /// `.read().clone()` at the start of each turn, so in-flight turns are unaffected by
646 /// subsequent switches (same semantics as `set_model`).
647 policy: RwLock<Arc<dyn SandboxPolicy>>,
648 /// Permission mode catalog. When `Some`, enables `session/set_mode` and ACP
649 /// `SessionModeState`; when `None`, `policy` is fixed and cannot be switched. Uses
650 /// `std::sync::Mutex` held only briefly, never across an await.
651 modes: Option<Mutex<ModeCatalog>>,
652 events: Arc<EventEmitter>,
653 permissions: Arc<PermissionGate>,
654 /// Single-turn mutex + cancel channel. `Some(token)` means a turn is running; `None`
655 /// means idle. The `std::sync::Mutex` is held briefly and never across an await.
656 turn_state: Mutex<TurnSlot>,
657 /// Session-level background task table (landing point for `run_in_background`). Holds
658 /// the task's `JoinHandle` to keep it alive past the originating turn; its internal
659 /// cancel token is independent of the turn's child token. `run_turn` clones it
660 /// through `TurnRunner` → `ToolContext` for injection into tools.
661 background: crate::session::BackgroundTasks,
662 /// Shared state for the `--goal` goal-driven loop. When `Some`, this session runs in
663 /// goal mode; the top-level turn injects it into tools via
664 /// [`crate::tool::ToolContext::goal`], and the `goal-gate` hook uses it to keep the
665 /// turn alive or let it proceed. Cloned from [`DefaultAgentCore::goal`]. `None` =
666 /// non-goal mode.
667 goal: Option<Arc<crate::session::GoalState>>,
668 /// Session-level single-flight compaction slot. When the soft watermark is exceeded,
669 /// the turn main loop asynchronously triggers one summary compaction without blocking
670 /// the current turn. See `session/turn/compaction_slot.rs`.
671 compaction_slot: crate::session::CompactionSlot,
672 /// Notifies when a turn slot is released. `TurnGuard::drop` calls `notify_one` — the
673 /// session driver waits on this after hitting `TurnInProgress`, so it can start an
674 /// autonomous turn continuation once the current turn ends (liveness guarantee for
675 /// autonomous continuation).
676 turn_freed: Arc<tokio::sync::Notify>,
677 /// Session-level cancellation token; cancelled when the session terminates, causing
678 /// the driver loop to exit. Also the source (same token) for cancellation tokens of
679 /// tasks inside `background`.
680 session_cancel: CancellationToken,
681 config: RwLock<TurnConfig>,
682 /// Session-level filesystem backend. Injected by [`AgentCore::create_session`];
683 /// `TurnRunner` borrows a `&dyn FsBackend` into [`crate::tool::ToolContext`] for
684 /// tools.
685 fs: Arc<dyn FsBackend>,
686 /// Session-level shell backend. Injected by [`AgentCore::create_session`] alongside
687 /// `fs`; the `bash` tool accesses it via [`crate::tool::ToolContext`].
688 shell: Arc<dyn ShellBackend>,
689 /// How the agent is accessed. Injected by [`AgentCore::create_session`] /
690 /// `load_session`, assembled into [`RunningContext`] during turn setup, and rendered
691 /// into the `# Environment` section of the system prompt.
692 frontend: Frontend,
693 /// HTTP fetch backend, shared across sessions in this core and held/cloned by
694 /// [`DefaultAgentCore`]. The `fetch` tool accesses it via
695 /// [`crate::tool::ToolContext`].
696 http: Arc<dyn HttpClient>,
697 /// Hook engine, shared across sessions in this core. When `run_turn` assembles
698 /// [`TurnRunner`], it borrows `&dyn HookEngine` to the main loop.
699 hook_engine: Arc<dyn HookEngine>,
700 /// Content appended by the `after_session_enter` hook during session startup (e.g.,
701 /// skill L1 manifest / always-on skill body). Populated by
702 /// [`AgentCore::create_session`] / `load_session` after the hook runs; on each turn,
703 /// when assembling the system prompt, [`merge_session_overlay`] merges it with the
704 /// explicit `config.system_prompt`, and
705 /// [`crate::session::prompt::resolve_system_prompt`] places it into the "Session
706 /// Instructions" section via hooks.
707 session_start_append: Vec<agent_client_protocol_schema::ContentBlock>,
708 /// Adjacent-request stability diagnostic. Emits a tracing record for every request
709 /// actually sent to the provider, helping to identify the source of cache misses.
710 request_audit: RequestAuditTracker,
711}
712
713impl DefaultSession {
714 fn current_provider(&self) -> Arc<dyn LlmProvider> {
715 self.provider_state
716 .read()
717 .expect("DefaultSession provider_state rwlock poisoned")
718 .provider
719 .clone()
720 }
721
722 fn current_hosted(&self) -> HostedCapabilities {
723 self.provider_state
724 .read()
725 .expect("DefaultSession provider_state rwlock poisoned")
726 .hosted_capabilities
727 }
728
729 /// Core execution of a turn, shared by user-initiated turns and automatic
730 /// continuation turns.
731 ///
732 /// `prompt` is either external input (user turn) or empty (automatic continuation
733 /// turn). In both cases, any completed background results are prepended as **prefix
734 /// blocks** to the prompt. An empty prompt with no background results does not start
735 /// a turn (returns `EndTurn` to avoid a no-op turn). Turn slot mutual exclusion is
736 /// still enforced at the top of this function.
737 async fn run_turn_core(
738 &self,
739 prompt: Vec<ContentBlock>,
740 ingest_source: crate::hooks::step::IngestSource,
741 ) -> Result<StopReason, TurnError> {
742 let span = tracing::info_span!(
743 "turn",
744 session_id = %short_id(self.id.0.as_ref()),
745 model = %self.current_model(),
746 );
747 async move {
748 let cancel = {
749 let mut slot = self
750 .turn_state
751 .lock()
752 .expect("DefaultSession turn_state mutex poisoned");
753 if slot.cancel.is_some() {
754 return Err(TurnError::TurnInProgress);
755 }
756 let cancel = CancellationToken::new();
757 slot.cancel = Some(cancel.clone());
758 cancel
759 };
760
761 // RAII guard: releases the slot and wakes the driver on any exit path
762 // (including panic inside an await).
763 let _guard = TurnGuard {
764 state: &self.turn_state,
765 freed: &self.turn_freed,
766 };
767
768 // Prepend completed background-task results as prefix blocks for the current
769 // prompt.
770 // Background task reflow.
771 let prompt = {
772 let outcomes = self.background.drain_completed();
773 if outcomes.is_empty() {
774 prompt
775 } else {
776 let mut blocks: Vec<ContentBlock> = outcomes
777 .iter()
778 .map(|o| {
779 ContentBlock::from(
780 crate::session::format_background_outcome(o).as_str(),
781 )
782 })
783 .collect();
784 blocks.extend(prompt);
785 blocks
786 }
787 };
788
789 // Empty prompt (autonomous turn with no background results to consume) — skip
790 // the turn to avoid spinning.
791 if prompt.is_empty() {
792 return Ok(StopReason::EndTurn);
793 }
794
795 let config = self
796 .config
797 .read()
798 .expect("DefaultSession config rwlock poisoned")
799 .clone();
800 // Snapshot (provider, hosted) once at turn start — concurrent set_model
801 // requests within the same turn still use the chosen provider; changes take
802 // effect on the next turn.
803 let provider = self.current_provider();
804 let hosted = self.current_hosted();
805 let running_ctx = RunningContext::new(self.frontend, &self.cwd);
806 // Merge session-scoped injection (the `additional_context` from the
807 // `after_session_enter` hook, e.g. skill L1 manifest / always-on skill body)
808 // with the explicit `system_prompt` into a single "Session Instructions"
809 // overlay — both originate from the same source and target the same location,
810 // so no extra parameter is needed.
811 let session_overlay =
812 merge_session_overlay(config.system_prompt.as_deref(), &self.session_start_append);
813 let system_prompt = resolve_system_prompt(
814 &running_ctx,
815 &provider.info().vendor,
816 &config.model,
817 &config.base_prompt,
818 &config.prompt,
819 session_overlay.as_deref(),
820 )
821 .map_err(|err| TurnError::Internal(BoxError::new(err)))?;
822 // Snapshot the active policy for this turn: an in-progress turn uses a fixed
823 // policy, so
824 // `session/set_mode` changes only affect subsequent turns (same semantics as
825 // `set_model`).
826 // Use an owned `Arc` rather than a borrow — it flows with `ToolContext` into
827 // `spawn_agent`,
828 // ensuring child agents capture the parent's actual policy at this moment,
829 // not a stale process-level default.
830 let policy = self
831 .policy
832 .read()
833 .expect("DefaultSession policy rwlock poisoned")
834 .clone();
835 let runner = TurnRunner {
836 history: self.history.as_ref(),
837 tools: self.tools.as_ref(),
838 provider: provider.as_ref(),
839 policy,
840 events: self.events.clone(),
841 permissions: self.permissions.as_ref(),
842 cancel,
843 config: &config,
844 system_prompt: system_prompt.map(Arc::from),
845 cwd: &self.cwd,
846 fs: self.fs.clone(),
847 shell: self.shell.clone(),
848 http: self.http.clone(),
849 hosted_capabilities: hosted,
850 hooks: self.hook_engine.as_ref(),
851 session_id: &self.id,
852 request_audit: &self.request_audit,
853 // Inject the session-level background task handle into the top-level
854 // turn, enabling the tool's `run_in_background` capability. Nested
855 // sub-agent turns do not receive this injection (see `spawn_agent`).
856 background: Some(self.background.clone()),
857 // Top-level turn injects the goal-loop state (`Some` under `--goal`
858 // mode); the `goal_done` tool and `goal-gate` hook use it to drive
859 // multi-turn autonomous loops. `None` in non-goal mode.
860 goal: self.goal.clone(),
861 // Inject the compaction slot, history Arc, and provider Arc into the
862 // top-level turn so that summary compaction can be triggered
863 // asynchronously when the soft watermark is exceeded. Sub-agent turns
864 // pass `None` for all of these (see `spawn_agent`).
865 compaction_slot: Some(self.compaction_slot.clone()),
866 history_arc: Some(self.history.clone()),
867 provider_arc: Some(provider.clone()),
868 session_cancel: Some(self.session_cancel.clone()),
869 ingest_source,
870 };
871
872 runner.run(prompt).await
873 }
874 .instrument(span)
875 .await
876 }
877
878 /// Session driver loop (autonomous turn continuation): a long-lived task that starts
879 /// an autonomous turn when a background task completes, consuming its results.
880 /// Spawned during `create_session` / `load_session`.
881 ///
882 /// Holds `Weak<Self>` instead of `Arc`: the driver must not keep the session alive
883 /// indefinitely. Each iteration first calls `upgrade` — when all external strong
884 /// references (the `AgentCore.sessions` DashMap) are gone, `upgrade` fails and the
885 /// driver exits. `session_cancel` is the explicit exit signal (process shutdown /
886 /// future session eviction).
887 ///
888 /// Two waiting paths:
889 /// - `background.wait_for_completion()`: a task completed → prepare to start an
890 /// autonomous turn;
891 /// - `session_cancel.cancelled()`: session terminated → exit the loop.
892 ///
893 /// If a `TurnInProgress` is encountered before starting a turn (a user turn is
894 /// running), wait for `turn_freed` and retry — this is exactly where user input and
895 /// background results contend for the same turn slot: if the user turn arrives first,
896 /// it runs, and the background result either hitches a ride (via `run_turn_core`'s
897 /// drain) or waits for it to finish before starting its own turn.
898 async fn drive(weak: std::sync::Weak<Self>) {
899 loop {
900 let Some(this) = weak.upgrade() else { break };
901 if this.session_cancel.is_cancelled() {
902 break;
903 }
904 // First take the notified() future, then check the queue — avoid missing
905 // completion notifications that arrive between the two steps.
906 let completion = this.background.wait_for_completion();
907 if this.background.has_completed() {
908 this.run_autonomous_turn_with_retry().await;
909 continue;
910 }
911 tokio::select! {
912 () = completion => {
913 this.run_autonomous_turn_with_retry().await;
914 }
915 () = this.session_cancel.cancelled() => break,
916 }
917 }
918 }
919
920 /// Run an autonomous turn; if the turn slot is occupied (a user turn is running),
921 /// wait for it to be released and retry, up to the point where the result is
922 /// consumed. Abort when `session_cancel` fires.
923 async fn run_autonomous_turn_with_retry(self: &Arc<Self>) {
924 loop {
925 if self.session_cancel.is_cancelled() {
926 return;
927 }
928 match self
929 .run_turn_core(Vec::new(), crate::hooks::step::IngestSource::Background)
930 .await
931 {
932 Err(TurnError::TurnInProgress) => {
933 // A user turn is in progress. Wait for it to finish — its
934 // `run_turn_core` will drain our background results (piggybacking),
935 // so `has_completed` will be empty here and we exit naturally.
936 tokio::select! {
937 () = self.turn_freed.notified() => {}
938 () = self.session_cancel.cancelled() => return,
939 }
940 if !self.background.has_completed() {
941 // Consumed by an in-flight user turn (piggybacking) — no need to
942 // start an autonomous turn.
943 return;
944 }
945 }
946 _ => return,
947 }
948 }
949 }
950}
951
952impl Drop for DefaultSession {
953 fn drop(&mut self) {
954 // On session drop, cancel `session_cancel`: this kills all in-flight background
955 // tasks and wakes the driver loop's `session_cancel.cancelled()` branch so it
956 // exits (the driver holds a `Weak`, which will now fail to upgrade).
957 self.session_cancel.cancel();
958 }
959}
960
/// Turn-occupancy state for one session, guarded by a std `Mutex` (`turn_state`).
///
/// `cancel` is `Some` while the slot is occupied:
/// - `run_turn_core` stores a fresh token when it claims the slot;
/// - `TurnGuard`'s `Drop` resets it to `None`;
/// - `cancel_turn` clones and cancels whatever token is present;
/// - `compact_now` checks it to detect a running turn.
#[derive(Default)]
struct TurnSlot {
    /// Cancellation token of the current slot holder, if any.
    cancel: Option<CancellationToken>,
}
965
/// Releases an already-claimed turn slot when dropped.
///
/// Constructing the guard does not claim anything. The caller must already have stored
/// its token in `TurnSlot::cancel` (under the `state` mutex) before building the guard.
/// After that, every exit path that drops the guard runs `Drop`: normal return, early
/// `return`/`?`, or unwinding under `panic = "unwind"`. `Drop` clears the slot
/// (skipped if the mutex is poisoned) and notifies `freed`.
struct TurnGuard<'a> {
    /// The session's turn slot; its `cancel` field is reset to `None` on drop.
    state: &'a Mutex<TurnSlot>,
    /// Notified (`notify_one`) after the slot is cleared, so a session driver parked on
    /// `turn_freed` can retry its autonomous turn (liveness for proactive turn renewal).
    freed: &'a tokio::sync::Notify,
}
973
974impl<'a> Drop for TurnGuard<'a> {
975 fn drop(&mut self) {
976 if let Ok(mut slot) = self.state.lock() {
977 slot.cancel = None;
978 }
979 // Turn slot is now empty; wake the driver that may be waiting to start its own
980 // turn.
981 self.freed.notify_one();
982 }
983}
984
985impl Session for DefaultSession {
986 fn id(&self) -> &SessionId {
987 &self.id
988 }
989
990 fn provider_info(&self) -> ProviderInfo {
991 self.current_provider().info()
992 }
993
994 fn current_model(&self) -> String {
995 self.config
996 .read()
997 .expect("DefaultSession config rwlock poisoned")
998 .model
999 .clone()
1000 }
1001
1002 fn list_models(&self) -> BoxFuture<'_, Result<Vec<ModelInfo>, ProviderError>> {
1003 Box::pin(async move {
1004 // Under multi-provider assembly, the candidate set comes from the registry —
1005 // each entry already carries its own model list (populated during CLI
1006 // assembly). It is then filtered against the session's `allowed_models`
1007 // allowlist. The registry makes no network requests, so this path always
1008 // succeeds.
1009 let allowed_models = self
1010 .config
1011 .read()
1012 .expect("DefaultSession config rwlock poisoned")
1013 .allowed_models
1014 .clone();
1015 let candidates = self.registry.list_candidates();
1016 let mut models: Vec<ModelInfo> = candidates
1017 .into_iter()
1018 .map(|candidate| {
1019 decorate_with_provider_display(candidate.model, &candidate.provider)
1020 })
1021 .collect();
1022 models = filter_allowed_models(models, allowed_models.as_deref());
1023 Ok(models)
1024 })
1025 }
1026
1027 fn list_candidates(&self) -> BoxFuture<'_, Result<Vec<ModelCandidate>, ProviderError>> {
1028 Box::pin(async move {
1029 let allowed_models = self
1030 .config
1031 .read()
1032 .expect("DefaultSession config rwlock poisoned")
1033 .allowed_models
1034 .clone();
1035 let candidates = self.registry.list_candidates();
1036 let candidates: Vec<ModelCandidate> = match allowed_models {
1037 Some(allowed) => candidates
1038 .into_iter()
1039 .filter(|c| allowed.iter().any(|id| id == &c.model.id))
1040 .collect(),
1041 None => candidates,
1042 };
1043 Ok(candidates)
1044 })
1045 }
1046
1047 fn set_model(&self, selection: ModelSelection) -> BoxFuture<'_, Result<(), ProviderError>> {
1048 Box::pin(async move {
1049 let ModelSelection { provider, model } = selection;
1050 let allowed_models = self
1051 .config
1052 .read()
1053 .expect("DefaultSession config rwlock poisoned")
1054 .allowed_models
1055 .clone();
1056 // Note: `allowed_models` is a flat list of model IDs without a vendor
1057 // dimension — models with the same name are allowed or denied uniformly
1058 // across all providers. Pairing is deferred to future work.
1059 if let Some(allowed_models) = allowed_models.as_ref()
1060 && !allowed_models.iter().any(|allowed| allowed == &model)
1061 {
1062 return Err(ProviderError::new(ProviderErrorKind::ModelNotFound {
1063 model,
1064 }));
1065 }
1066
1067 let Some(entry) = self.registry.entry_for(&provider, &model) else {
1068 return Err(ProviderError::new(ProviderErrorKind::ModelNotFound {
1069 model,
1070 }));
1071 };
1072
1073 // When switching providers, re-resolve hosted capabilities: each entry
1074 // carries its own [`SessionCapabilitiesConfig`], which is cross-referenced
1075 // with the provider's hosted_capabilities. If the delegate is not supported
1076 // by the provider, a `ProviderError` is returned — preserving the stable
1077 // failure semantics of `set_model`.
1078 let new_provider = entry.provider().clone();
1079 let resolved = ResolvedSessionCapabilities::resolve(
1080 entry.capabilities(),
1081 new_provider.hosted_capabilities(),
1082 &new_provider.info().vendor,
1083 )
1084 .map_err(|err| {
1085 ProviderError::new(ProviderErrorKind::Other(BoxError::new(io::Error::other(
1086 err.to_string(),
1087 ))))
1088 })?;
1089
1090 // Lock order: `provider_state` before `config`, matching the snapshot path in
1091 // `run_turn`.
1092 // The window where both write locks are held is very short (just a few
1093 // assignments) and will not block the main loop.
1094 {
1095 let mut state = self
1096 .provider_state
1097 .write()
1098 .expect("DefaultSession provider_state rwlock poisoned");
1099 state.provider = new_provider;
1100 state.hosted_capabilities = resolved.hosted;
1101 }
1102 let mut config = self
1103 .config
1104 .write()
1105 .expect("DefaultSession config rwlock poisoned");
1106 config.provider = provider;
1107 config.model = model;
1108 Ok(())
1109 })
1110 }
1111
1112 fn current_mode(&self) -> Option<String> {
1113 self.modes.as_ref().map(|m| {
1114 m.lock()
1115 .expect("DefaultSession modes mutex poisoned")
1116 .current_id()
1117 .to_string()
1118 })
1119 }
1120
1121 fn available_modes(&self) -> Vec<crate::session::ModeDescriptor> {
1122 let Some(modes) = self.modes.as_ref() else {
1123 return Vec::new();
1124 };
1125 modes
1126 .lock()
1127 .expect("DefaultSession modes mutex poisoned")
1128 .modes()
1129 .iter()
1130 .map(|m| crate::session::ModeDescriptor {
1131 id: m.id.clone(),
1132 name: m.name.clone(),
1133 description: m.description.clone(),
1134 })
1135 .collect()
1136 }
1137
1138 fn set_mode(&self, mode_id: String) -> Result<(), AgentError> {
1139 let Some(modes) = self.modes.as_ref() else {
1140 return Err(AgentError::ModeNotFound(mode_id));
1141 };
1142 // Lock order: `modes` before `policy` (no overlap with `run_turn`'s read path —
1143 // `run_turn` only reads `policy`). Both locks are held briefly and never across
1144 // an `.await`.
1145 let mut catalog = modes.lock().expect("DefaultSession modes mutex poisoned");
1146 if !catalog.set_current(&mode_id) {
1147 return Err(AgentError::ModeNotFound(mode_id));
1148 }
1149 let active = catalog.current_policy();
1150 *self
1151 .policy
1152 .write()
1153 .expect("DefaultSession policy rwlock poisoned") = active;
1154 Ok(())
1155 }
1156
1157 fn current_reasoning_effort(&self) -> Option<ReasoningEffort> {
1158 self.config
1159 .read()
1160 .expect("DefaultSession config rwlock poisoned")
1161 .sampling
1162 .reasoning_effort
1163 }
1164
1165 fn set_reasoning_effort(&self, effort: Option<ReasoningEffort>) {
1166 self.config
1167 .write()
1168 .expect("DefaultSession config rwlock poisoned")
1169 .sampling
1170 .reasoning_effort = effort;
1171 }
1172
1173 fn subscribe(&self) -> EventStream {
1174 self.events.subscribe()
1175 }
1176
1177 fn history_snapshot(&self) -> Vec<Message> {
1178 self.history.snapshot()
1179 }
1180
1181 fn run_turn(&self, prompt: Vec<ContentBlock>) -> BoxFuture<'_, Result<StopReason, TurnError>> {
1182 // User-driven turn: piggybacks completed background results as a prefix to the
1183 // prompt.
1184 // (Passive backflow, complementary to active continuation — active continuation
1185 // handles idle state, piggybacking handles "user happened to speak up".)
1186 Box::pin(self.run_turn_core(prompt, crate::hooks::step::IngestSource::User))
1187 }
1188
1189 fn cancel_turn(&self) {
1190 let token = {
1191 let slot = self
1192 .turn_state
1193 .lock()
1194 .expect("DefaultSession turn_state mutex poisoned");
1195 slot.cancel.clone()
1196 };
1197 if let Some(token) = token {
1198 token.cancel();
1199 }
1200 // No turn running → no-op (idempotent)
1201 }
1202
1203 fn resolve_permission(&self, id: ToolCallId, outcome: PermissionResolution) {
1204 self.permissions.resolve(&id, outcome);
1205 }
1206
1207 fn context_status(&self) -> ContextStatus {
1208 let used_tokens = self.history.token_estimate();
1209 let context_window = {
1210 let model = self
1211 .config
1212 .read()
1213 .expect("DefaultSession config rwlock poisoned")
1214 .model
1215 .clone();
1216 self.current_provider()
1217 .model_info(&model)
1218 .and_then(|m| m.context_window)
1219 };
1220 let ratio = match (used_tokens, context_window) {
1221 (Some(used), Some(window)) if window > 0 => Some(used as f64 / window as f64),
1222 _ => None,
1223 };
1224 ContextStatus {
1225 used_tokens,
1226 context_window,
1227 ratio,
1228 }
1229 }
1230
1231 fn compact_now(&self) -> BoxFuture<'_, Result<Option<CompactionReport>, TurnError>> {
1232 Box::pin(async move {
1233 // A turn rewrites history concurrently with compaction; refuse rather than
1234 // race. The caller should `/cancel` or wait. (Held briefly, never across await.)
1235 {
1236 let slot = self
1237 .turn_state
1238 .lock()
1239 .expect("DefaultSession turn_state mutex poisoned");
1240 if slot.cancel.is_some() {
1241 return Err(TurnError::TurnInProgress);
1242 }
1243 }
1244
1245 let (model, sampling) = {
1246 let config = self
1247 .config
1248 .read()
1249 .expect("DefaultSession config rwlock poisoned");
1250 (config.model.clone(), config.sampling.clone())
1251 };
1252 let ctx = CompactionCtx {
1253 provider: self.current_provider(),
1254 model,
1255 sampling,
1256 tools: self.tools.schemas(),
1257 cancel: self.session_cancel.clone(),
1258 };
1259
1260 // Force a compaction regardless of watermark: use the current estimate as the
1261 // threshold so boundary selection keeps a sensible tail. If history is empty /
1262 // has no estimate, there is nothing to compact.
1263 let Some(threshold) = self.history.token_estimate() else {
1264 return Ok(None);
1265 };
1266 let report = run_sync_compaction(self.history.as_ref(), &ctx, threshold).await;
1267 if let Some(report) = report {
1268 self.events
1269 .emit(AgentEvent::ContextCompressed {
1270 tokens_before: report.tokens_before,
1271 tokens_after: report.tokens_after,
1272 })
1273 .await;
1274 }
1275 Ok(report)
1276 })
1277 }
1278}
1279
1280fn filter_allowed_models(
1281 available_models: Vec<ModelInfo>,
1282 allowed_models: Option<&[String]>,
1283) -> Vec<ModelInfo> {
1284 let Some(allowed_models) = allowed_models else {
1285 return available_models;
1286 };
1287
1288 available_models
1289 .into_iter()
1290 .filter(|model| allowed_models.iter().any(|allowed| allowed == &model.id))
1291 .collect()
1292}
1293
1294/// Prepend the provider name to the model's `display_name` so that ACP clients can
1295/// distinguish the same model ID served by different providers — the only reliable
1296/// disambiguation when multiple gateways expose the same model (e.g. "OpenAI: gpt-4o" vs
1297/// "gw-b: gpt-4o").
1298fn decorate_with_provider_display(mut model: ModelInfo, provider: &ProviderInfo) -> ModelInfo {
1299 let name = model
1300 .display_name
1301 .clone()
1302 .unwrap_or_else(|| model.id.clone());
1303 model.display_name = Some(format!("{}: {name}", provider.display_name));
1304 model
1305}
1306
1307/// Generates a session ID as a random UUID v4.
1308///
1309/// The `defect-acp` `session/new` handler needs a [`SessionId`] (to construct an
1310/// `AcpFsBackend`) before calling [`AgentCore::create_session`]; this function is
1311/// public so that both acp and tests can produce IDs in a consistent format.
1312///
1313/// Using a globally unique UUID instead of an in-process counter plus timestamp
1314/// avoids collisions across process restarts and concurrent instances, and allows
1315/// downstream consumers (storage on-disk directories, observability trace
1316/// correlation) to use it as a stable primary key.
1317pub fn new_session_id() -> String {
1318 uuid::Uuid::new_v4().to_string()
1319}
1320
/// Returns at most the first 12 characters (not bytes) of `s`, for compact session
/// labels in tracing spans. Diagnostic use only.
///
/// Cutting on a `char_indices` boundary keeps the slice valid for multi-byte UTF-8.
fn short_id(s: &str) -> &str {
    s.char_indices()
        .nth(12)
        .map_or(s, |(byte_end, _)| &s[..byte_end])
}
1329
1330/// Merge the explicit `config.system_prompt` with the text-only content blocks from the
1331/// session-start hook's `append` into a single overlay string for the `session_overlay`
1332/// parameter of [`resolve_system_prompt`]. Returns `None` when both are empty (no empty
1333/// segment injected); when both are present they are separated by `\n\n`.
1334fn merge_session_overlay(system_prompt: Option<&str>, append: &[ContentBlock]) -> Option<String> {
1335 let appended: String = append
1336 .iter()
1337 .filter_map(|b| match b {
1338 ContentBlock::Text(t) => Some(t.text.as_str()),
1339 _ => None,
1340 })
1341 .collect::<Vec<_>>()
1342 .join("\n\n");
1343 match (system_prompt, appended.is_empty()) {
1344 (Some(sp), true) => Some(sp.to_owned()),
1345 (Some(sp), false) => Some(format!("{sp}\n\n{appended}")),
1346 (None, false) => Some(appended),
1347 (None, true) => None,
1348 }
1349}