1mod builder;
6pub mod comms_impl;
7pub mod compact;
8mod extraction;
9mod hook_impl;
10#[cfg(test)]
11mod hooks_behavior_tests;
12mod runner;
13pub mod skills;
14mod state;
15#[doc(hidden)]
16pub mod test_turn_state_handle;
17use crate::budget::Budget;
18use crate::comms::{
19 CommsCommand, EventStream, PeerDirectoryEntry, PeerId, SendAndStreamError, SendError,
20 SendReceipt, StreamError, StreamScope, TrustedPeerDescriptor,
21};
22use crate::compact::SessionCompactionCadence;
23use crate::completion_feed::CompletionSeq;
24use crate::config::{AgentConfig, HookRunOverrides};
25use crate::error::AgentError;
26use crate::event::ExternalToolDelta;
27use crate::hooks::HookEngine;
28use crate::lifecycle::RunId;
29use crate::lifecycle::run_primitive::ProviderParamsOverride;
30use crate::ops::OperationId;
31use crate::ops_lifecycle::{OperationKind, OperationStatus, OperationTerminalOutcome};
32use crate::retry::RetryPolicy;
33use crate::schema::{CompiledSchema, SchemaError};
34use crate::session::Session;
35use crate::state::LoopState;
36#[cfg(target_arch = "wasm32")]
37use crate::tokio;
38use crate::tool_catalog::{
39 ToolCatalogCapabilities, ToolCatalogEntry, ToolCatalogMode, deferred_session_entry_count,
40 select_catalog_mode_from_snapshot,
41};
42use crate::tool_scope::ToolScope;
43use crate::turn_execution_authority::{
44 ContentShape, TurnPhase, TurnPrimitiveKind, TurnTerminalCauseKind, TurnTerminalOutcome,
45};
46use crate::types::{
47 AssistantBlock, BlockAssistantMessage, Message, OutputSchema, StopReason, ToolCallView,
48 ToolDef, ToolName, ToolNameSet, Usage,
49};
50use async_trait::async_trait;
51use serde::{Deserialize, Serialize};
52use std::collections::BTreeSet;
53use std::sync::Arc;
54
55pub use builder::{AgentBuildPolicyError, AgentBuilder};
56pub use runner::AgentRunner;
57
/// Interface the agent uses to talk to an LLM backend.
///
/// Implementors must be `Send + Sync`. On `wasm32` the `async_trait` expansion is asked
/// not to require `Send` futures (`?Send`); on all other targets the default expansion
/// is used.
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
pub trait AgentLlmClient: Send + Sync {
    /// Requests a model response for `messages`, offering `tools` to the model.
    ///
    /// `max_tokens`, `temperature` and `provider_params` are passed through as request
    /// parameters; `temperature` and `provider_params` are optional.
    ///
    /// # Errors
    ///
    /// Returns an [`AgentError`] when the implementation cannot produce a result.
    async fn stream_response(
        &self,
        messages: &[Message],
        tools: &[Arc<ToolDef>],
        max_tokens: u32,
        temperature: Option<f32>,
        provider_params: Option<&ProviderParamsOverride>,
    ) -> Result<LlmStreamResult, AgentError>;

    /// Static string identifying the provider behind this client.
    fn provider(&self) -> &'static str;

    /// String identifying the model this client targets.
    fn model(&self) -> &str;

    /// Turns an [`OutputSchema`] into a [`CompiledSchema`].
    ///
    /// The default implementation performs no transformation: it clones the schema
    /// value as-is and reports no warnings. Implementations may override it
    /// (presumably to adapt the schema to provider restrictions — confirm per client).
    fn compile_schema(&self, output_schema: &OutputSchema) -> Result<CompiledSchema, SchemaError> {
        Ok(CompiledSchema {
            schema: output_schema.schema.as_value().clone(),
            warnings: Vec::new(),
        })
    }
}
95
/// Shared, thread-safe function that maps one LLM client to another.
///
/// NOTE(review): presumably used to wrap a client with extra behaviour; the call sites
/// are outside this section — confirm.
pub type AgentLlmClientDecorator =
    Arc<dyn Fn(Arc<dyn AgentLlmClient>) -> Arc<dyn AgentLlmClient> + Send + Sync + 'static>;
103
/// Result of one [`AgentLlmClient::stream_response`] call.
///
/// Fields are private; use the accessors or the consuming `into_*` methods.
pub struct LlmStreamResult {
    // Assistant content blocks produced by the call.
    blocks: Vec<AssistantBlock>,
    // Stop reason reported for the response.
    stop_reason: StopReason,
    // Usage figures reported for the call.
    usage: Usage,
}
110
111impl LlmStreamResult {
112 pub fn new(blocks: Vec<AssistantBlock>, stop_reason: StopReason, usage: Usage) -> Self {
113 Self {
114 blocks,
115 stop_reason,
116 usage,
117 }
118 }
119
120 pub fn blocks(&self) -> &[AssistantBlock] {
121 &self.blocks
122 }
123 pub fn stop_reason(&self) -> StopReason {
124 self.stop_reason
125 }
126 pub fn usage(&self) -> &Usage {
127 &self.usage
128 }
129
130 pub fn into_message(self) -> BlockAssistantMessage {
131 BlockAssistantMessage::new(self.blocks, self.stop_reason)
132 }
133
134 pub fn into_parts(self) -> (Vec<AssistantBlock>, StopReason, Usage) {
135 (self.blocks, self.stop_reason, self.usage)
136 }
137}
138
/// Plain-data snapshot of an agent's execution state.
///
/// Every field is public and the type is comparable (`PartialEq`/`Eq`) and cloneable.
///
/// NOTE(review): the producer of this snapshot is outside this section; the grouping
/// comments below are inferred from field names and types — confirm against the code
/// that fills the struct.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AgentExecutionSnapshot {
    // Loop / turn position.
    pub loop_state: LoopState,
    pub turn_phase: TurnPhase,
    pub active_run_id: Option<RunId>,
    pub primitive_kind: TurnPrimitiveKind,
    pub admitted_content_shape: Option<ContentShape>,
    // Feature flags.
    pub vision_enabled: bool,
    pub image_tool_results_enabled: bool,
    // Pending tool calls and operations.
    pub tool_calls_pending: u32,
    pub pending_operation_ids: Option<Vec<OperationId>>,
    pub barrier_operation_ids: Vec<OperationId>,
    pub has_barrier_ops: bool,
    pub barrier_satisfied: bool,
    // Boundary accounting and cancellation request.
    pub boundary_count: u32,
    pub cancel_after_boundary: bool,
    // Terminal state of the turn.
    pub terminal_outcome: TurnTerminalOutcome,
    pub terminal_cause_kind: Option<TurnTerminalCauseKind>,
    // Extraction retry accounting.
    pub extraction_attempts: u32,
    pub max_extraction_retries: u32,
    // Completion-feed cursor.
    pub applied_cursor: CompletionSeq,
}
166
/// Value returned by [`AgentToolDispatcher::poll_external_updates`].
///
/// `Default` yields three empty lists, which is what the trait's default
/// implementation returns.
#[derive(Debug, Clone, Default)]
pub struct ExternalToolUpdate {
    /// External tool deltas reported by this poll.
    pub notices: Vec<ExternalToolDelta>,
    /// Names of items still pending (NOTE(review): exact meaning of the strings is not
    /// visible here — confirm with the producer).
    pub pending: Vec<String>,
    /// Completion records for detached/background operations.
    pub background_completions: Vec<DetachedOpCompletion>,
}
179
/// Extra context handed to [`AgentToolDispatcher::dispatch_with_context`].
///
/// The default value carries no current-turn content.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct ToolDispatchContext {
    // Content blocks of the current turn's input; `None` when the input was plain text
    // or when the context was default-constructed.
    current_turn: Option<CurrentTurnContent>,
}
190
191impl ToolDispatchContext {
192 pub fn from_current_turn_input(input: &crate::types::ContentInput) -> Self {
193 let blocks = match input {
194 crate::types::ContentInput::Text(_) => None,
195 crate::types::ContentInput::Blocks(blocks) => Some(blocks.clone()),
196 };
197 Self {
198 current_turn: blocks.map(CurrentTurnContent::new),
199 }
200 }
201
202 pub fn current_turn(&self) -> Option<&CurrentTurnContent> {
203 self.current_turn.as_ref()
204 }
205
206 pub fn current_turn_image(&self, index: usize) -> Option<&crate::types::ContentBlock> {
207 self.current_turn
208 .as_ref()
209 .and_then(|current_turn| current_turn.image(index))
210 }
211}
212
/// Content blocks belonging to the current turn's input.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CurrentTurnContent {
    // Blocks in the order they were supplied to `new`.
    blocks: Vec<crate::types::ContentBlock>,
}
218
219impl CurrentTurnContent {
220 pub fn new(blocks: Vec<crate::types::ContentBlock>) -> Self {
221 Self { blocks }
222 }
223
224 pub fn blocks(&self) -> &[crate::types::ContentBlock] {
225 &self.blocks
226 }
227
228 pub fn image(&self, index: usize) -> Option<&crate::types::ContentBlock> {
229 self.blocks
230 .iter()
231 .filter(|block| matches!(block, crate::types::ContentBlock::Image { .. }))
232 .nth(index)
233 }
234}
235
/// Serializable record describing a detached operation reported through
/// [`ExternalToolUpdate::background_completions`].
///
/// The record is keyed by `job_id`; it deliberately has no `operation_id` field (the
/// unit tests in this file assert that none is serialized).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DetachedOpCompletion {
    /// Identifier of the job this record refers to.
    pub job_id: String,
    /// Kind of operation.
    pub kind: OperationKind,
    /// Status of the operation.
    pub status: OperationStatus,
    /// Terminal outcome, when one is available.
    pub terminal_outcome: Option<OperationTerminalOutcome>,
    /// Display name for the operation.
    pub display_name: String,
    /// Free-form detail text.
    pub detail: String,
    /// Elapsed time in milliseconds, when known.
    pub elapsed_ms: Option<u64>,
}
259
/// Capability flags reported by [`AgentToolDispatcher::capabilities`].
///
/// The default value has every flag cleared.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct DispatcherCapabilities {
    /// Ops-lifecycle capability flag (NOTE(review): presumably signals that
    /// `bind_ops_lifecycle` is supported — confirm with implementors).
    pub ops_lifecycle: bool,
}
269
/// Successful result of [`AgentToolDispatcher::bind_ops_lifecycle`].
///
/// Both variants hand a dispatcher back to the caller; they differ only in whether the
/// binding took place.
pub enum BindOutcome {
    /// The binding was applied; carries the resulting dispatcher.
    Bound(Arc<dyn AgentToolDispatcher>),
    /// The binding was not applied; carries the dispatcher to keep using.
    Skipped(Arc<dyn AgentToolDispatcher>),
}
287
288impl BindOutcome {
289 pub fn into_dispatcher(self) -> Arc<dyn AgentToolDispatcher> {
291 match self {
292 Self::Bound(d) | Self::Skipped(d) => d,
293 }
294 }
295
296 pub fn was_bound(&self) -> bool {
298 matches!(self, Self::Bound(_))
299 }
300}
301
/// Interface through which the agent discovers and invokes tools.
///
/// Only [`tools`](Self::tools) and [`dispatch`](Self::dispatch) are required; every
/// other method has a default that reports "nothing extra" (empty, `None`, no-op or
/// `Unsupported`). On `wasm32` the `async_trait` expansion does not require `Send`
/// futures.
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
pub trait AgentToolDispatcher: Send + Sync {
    /// Tool definitions exposed by this dispatcher.
    fn tools(&self) -> Arc<[Arc<ToolDef>]>;

    /// Catalog capabilities of this dispatcher. Default: `ToolCatalogCapabilities::default()`.
    fn tool_catalog_capabilities(&self) -> ToolCatalogCapabilities {
        ToolCatalogCapabilities::default()
    }

    /// Catalog view of the tools.
    ///
    /// Default: one `ToolCatalogEntry::session_inline(tool, true)` entry per tool
    /// returned by [`tools`](Self::tools), in the same order.
    fn tool_catalog(&self) -> Arc<[ToolCatalogEntry]> {
        self.tools()
            .iter()
            .map(|tool| ToolCatalogEntry::session_inline(Arc::clone(tool), true))
            .collect::<Vec<_>>()
            .into()
    }

    /// Names of catalog sources that are still pending. Default: empty.
    fn pending_catalog_sources(&self) -> Arc<[String]> {
        Arc::from([])
    }

    /// Executes the given tool call.
    ///
    /// # Errors
    ///
    /// Returns a `ToolError` when the call cannot be dispatched or fails.
    async fn dispatch(
        &self,
        call: ToolCallView<'_>,
    ) -> Result<crate::ops::ToolDispatchOutcome, crate::error::ToolError>;

    /// Executes the given tool call with access to a [`ToolDispatchContext`].
    ///
    /// Default: ignores the context and delegates to [`dispatch`](Self::dispatch).
    async fn dispatch_with_context(
        &self,
        call: ToolCallView<'_>,
        _context: &ToolDispatchContext,
    ) -> Result<crate::ops::ToolDispatchOutcome, crate::error::ToolError> {
        self.dispatch(call).await
    }

    /// Polls for external tool updates. Default: an empty [`ExternalToolUpdate`].
    async fn poll_external_updates(&self) -> ExternalToolUpdate {
        ExternalToolUpdate::default()
    }

    /// Snapshot of the external tool surface, if this dispatcher has one. Default: `None`.
    fn external_tool_surface_snapshot(&self) -> Option<crate::ExternalToolSurfaceSnapshot> {
        None
    }

    /// Capability flags of this dispatcher. Default: all flags cleared.
    fn capabilities(&self) -> DispatcherCapabilities {
        DispatcherCapabilities::default()
    }

    /// Binds an ops-lifecycle registry to this dispatcher, consuming the `Arc`.
    ///
    /// On success the returned [`BindOutcome`] carries the dispatcher to use from now on.
    ///
    /// # Errors
    ///
    /// Default: always `Err(OpsLifecycleBindError::Unsupported)`; note that the
    /// consumed `Arc<Self>` is not handed back in that case.
    fn bind_ops_lifecycle(
        self: Arc<Self>,
        _registry: Arc<dyn crate::ops_lifecycle::OpsLifecycleRegistry>,
        _owner_bridge_session_id: crate::types::SessionId,
    ) -> Result<BindOutcome, OpsLifecycleBindError> {
        Err(OpsLifecycleBindError::Unsupported)
    }

    /// Completion-enrichment provider offered by this dispatcher. Default: `None`.
    fn completion_enrichment(
        &self,
    ) -> Option<Arc<dyn crate::completion_feed::CompletionEnrichmentProvider>> {
        None
    }

    /// Hands an MCP server lifecycle handle to the dispatcher.
    /// Default: no-op (the handle is dropped).
    fn bind_mcp_server_lifecycle_handle(
        &self,
        _handle: Arc<dyn crate::handles::McpServerLifecycleHandle>,
    ) {
    }

    /// Hands an external tool surface handle to the dispatcher.
    /// Default: no-op (the handle is dropped).
    fn bind_external_tool_surface_handle(
        &self,
        _handle: Arc<dyn crate::handles::ExternalToolSurfaceHandle>,
    ) {
    }
}
434
435pub fn select_tool_catalog_mode<T>(dispatcher: &T) -> ToolCatalogMode
437where
438 T: AgentToolDispatcher + ?Sized,
439{
440 let capabilities = dispatcher.tool_catalog_capabilities();
441 if !capabilities.exact_catalog {
442 return ToolCatalogMode::Inline;
443 }
444 let pending_sources = dispatcher.pending_catalog_sources();
445 let catalog = dispatcher.tool_catalog();
446 select_catalog_mode_from_snapshot(
447 capabilities.exact_catalog,
448 catalog.as_ref(),
449 pending_sources.as_ref(),
450 )
451}
452
453pub fn should_compose_tool_catalog_control_plane<T>(dispatcher: &T) -> bool
456where
457 T: AgentToolDispatcher + ?Sized,
458{
459 let capabilities = dispatcher.tool_catalog_capabilities();
460 if !capabilities.exact_catalog {
461 return false;
462 }
463 if capabilities.may_require_catalog_control_plane {
464 return true;
465 }
466
467 let pending_sources = dispatcher.pending_catalog_sources();
468 if !pending_sources.is_empty() {
469 return true;
470 }
471
472 let catalog = dispatcher.tool_catalog();
473 deferred_session_entry_count(catalog.as_ref()) > 0
474}
475
/// Failure modes of [`AgentToolDispatcher::bind_ops_lifecycle`].
#[derive(Debug, Clone, thiserror::Error, PartialEq, Eq)]
pub enum OpsLifecycleBindError {
    /// The dispatcher does not support ops-lifecycle binding (the trait default).
    #[error("ops lifecycle binding is unsupported")]
    Unsupported,
    /// The dispatcher `Arc` has other owners, so it could not be taken apart and rebound.
    #[error("dispatcher has shared ownership and cannot be rebound")]
    SharedOwnership,
}
484
/// Dispatcher wrapper that exposes and dispatches only an allow-listed subset of the
/// inner dispatcher's tools.
pub struct FilteredToolDispatcher<T: AgentToolDispatcher + ?Sized> {
    // Wrapped dispatcher that actually executes calls.
    inner: Arc<T>,
    // Names of the tools callers are allowed to see and invoke.
    allowed_tools: ToolNameSet,
    // Allowed tool definitions captured at construction time; returned by `tools()`
    // when the inner dispatcher does not report an exact catalog.
    filtered_tools: Arc<[Arc<ToolDef>]>,
}
497
498impl<T: AgentToolDispatcher + ?Sized> FilteredToolDispatcher<T> {
499 pub fn new<I, N>(inner: Arc<T>, allowed_tools: I) -> Self
500 where
501 I: IntoIterator<Item = N>,
502 N: Into<ToolName>,
503 {
504 let allowed_set: ToolNameSet = allowed_tools
505 .into_iter()
506 .map(Into::into)
507 .collect::<ToolNameSet>();
508
509 let filtered: Vec<Arc<ToolDef>> = if inner.tool_catalog_capabilities().exact_catalog {
510 inner
511 .tool_catalog()
512 .iter()
513 .filter(|entry| entry.currently_callable())
514 .map(|entry| Arc::clone(&entry.tool))
515 .filter(|t| allowed_set.contains(t.name.as_str()))
516 .collect()
517 } else {
518 inner
519 .tools()
520 .iter()
521 .filter(|t| allowed_set.contains(t.name.as_str()))
522 .map(Arc::clone)
523 .collect()
524 };
525
526 Self {
527 inner,
528 allowed_tools: allowed_set,
529 filtered_tools: filtered.into(),
530 }
531 }
532}
533
534#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
535#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
536impl<T: AgentToolDispatcher + ?Sized + 'static> AgentToolDispatcher for FilteredToolDispatcher<T> {
537 fn tools(&self) -> Arc<[Arc<ToolDef>]> {
538 if self.inner.tool_catalog_capabilities().exact_catalog {
539 return self
540 .inner
541 .tool_catalog()
542 .iter()
543 .filter(|entry| entry.currently_callable())
544 .map(|entry| Arc::clone(&entry.tool))
545 .filter(|tool| self.allowed_tools.contains(tool.name.as_str()))
546 .collect::<Vec<_>>()
547 .into();
548 }
549 Arc::clone(&self.filtered_tools)
550 }
551
552 async fn dispatch(
553 &self,
554 call: ToolCallView<'_>,
555 ) -> Result<crate::ops::ToolDispatchOutcome, crate::error::ToolError> {
556 self.dispatch_with_context(call, &ToolDispatchContext::default())
557 .await
558 }
559
560 async fn dispatch_with_context(
561 &self,
562 call: ToolCallView<'_>,
563 context: &ToolDispatchContext,
564 ) -> Result<crate::ops::ToolDispatchOutcome, crate::error::ToolError> {
565 if !self.allowed_tools.contains(call.name) {
566 let inner_knows_tool = if self.inner.tool_catalog_capabilities().exact_catalog {
567 self.inner
568 .tool_catalog()
569 .iter()
570 .any(|entry| entry.tool.name == call.name)
571 } else {
572 self.inner.tools().iter().any(|tool| tool.name == call.name)
573 };
574 if !inner_knows_tool {
575 return Err(crate::error::ToolError::not_found(call.name));
576 }
577 return Err(crate::error::ToolError::access_denied(call.name));
578 }
579 self.inner.dispatch_with_context(call, context).await
580 }
581
582 fn tool_catalog_capabilities(&self) -> ToolCatalogCapabilities {
583 self.inner.tool_catalog_capabilities()
584 }
585
586 fn tool_catalog(&self) -> Arc<[ToolCatalogEntry]> {
587 if !self.inner.tool_catalog_capabilities().exact_catalog {
588 return self
589 .tools()
590 .iter()
591 .map(|tool| ToolCatalogEntry::session_inline(Arc::clone(tool), true))
592 .collect::<Vec<_>>()
593 .into();
594 }
595 self.inner
596 .tool_catalog()
597 .iter()
598 .filter(|entry| self.allowed_tools.contains(entry.tool.name.as_str()))
599 .cloned()
600 .collect::<Vec<_>>()
601 .into()
602 }
603
604 fn pending_catalog_sources(&self) -> Arc<[String]> {
605 self.inner.pending_catalog_sources()
606 }
607
608 async fn poll_external_updates(&self) -> ExternalToolUpdate {
609 self.inner.poll_external_updates().await
610 }
611
612 fn external_tool_surface_snapshot(&self) -> Option<crate::ExternalToolSurfaceSnapshot> {
613 self.inner.external_tool_surface_snapshot()
614 }
615
616 fn capabilities(&self) -> DispatcherCapabilities {
617 self.inner.capabilities()
618 }
619
620 fn bind_ops_lifecycle(
621 self: Arc<Self>,
622 registry: Arc<dyn crate::ops_lifecycle::OpsLifecycleRegistry>,
623 owner_bridge_session_id: crate::types::SessionId,
624 ) -> Result<BindOutcome, OpsLifecycleBindError> {
625 let owned = Arc::try_unwrap(self).map_err(|_| OpsLifecycleBindError::SharedOwnership)?;
626 if Arc::strong_count(&owned.inner) == 1 {
627 let outcome = owned
628 .inner
629 .bind_ops_lifecycle(registry, owner_bridge_session_id)?;
630 let bound = outcome.was_bound();
631 let d = outcome.into_dispatcher();
632 let allowed_tools = owned.allowed_tools.into_iter().collect::<Vec<_>>();
633 Ok(if bound {
634 BindOutcome::Bound(Arc::new(FilteredToolDispatcher::new(d, allowed_tools)))
635 } else {
636 BindOutcome::Skipped(Arc::new(FilteredToolDispatcher::new(d, allowed_tools)))
637 })
638 } else {
639 Ok(BindOutcome::Skipped(Arc::new(FilteredToolDispatcher {
640 inner: owned.inner,
641 allowed_tools: owned.allowed_tools,
642 filtered_tools: owned.filtered_tools,
643 })))
644 }
645 }
646
647 fn completion_enrichment(
648 &self,
649 ) -> Option<Arc<dyn crate::completion_feed::CompletionEnrichmentProvider>> {
650 self.inner.completion_enrichment()
651 }
652}
653
/// Persistence interface for [`Session`] values.
///
/// On `wasm32` the `async_trait` expansion does not require `Send` futures.
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
pub trait AgentSessionStore: Send + Sync {
    /// Persists `session`.
    async fn save(&self, session: &Session) -> Result<(), AgentError>;
    /// Loads the session stored under `id`; `Ok(None)` is available to signal that no
    /// such session exists.
    async fn load(&self, id: &str) -> Result<Option<Session>, AgentError>;
}
661
/// Policy for inline peer notifications, decoded from an optional raw integer by
/// [`InlinePeerNotificationPolicy::try_from_raw`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum InlinePeerNotificationPolicy {
    /// Raw value `-1`.
    Always,
    /// Raw value `0`.
    Never,
    /// Any positive raw value, carried as the limit; also used (with
    /// [`DEFAULT_MAX_INLINE_PEER_NOTIFICATIONS`]) when no raw value is given.
    AtMost(usize),
}

/// Limit applied by [`InlinePeerNotificationPolicy::try_from_raw`] when the raw value
/// is absent.
pub const DEFAULT_MAX_INLINE_PEER_NOTIFICATIONS: usize = 50;

impl InlinePeerNotificationPolicy {
    /// Decodes a raw setting into a policy.
    ///
    /// | raw          | result                                           |
    /// |--------------|--------------------------------------------------|
    /// | `None`       | `AtMost(DEFAULT_MAX_INLINE_PEER_NOTIFICATIONS)`  |
    /// | `Some(-1)`   | `Always`                                         |
    /// | `Some(0)`    | `Never`                                          |
    /// | `Some(n > 0)`| `AtMost(n)`                                      |
    ///
    /// # Errors
    ///
    /// Returns the offending raw value for any other input (negative values other than
    /// `-1`, or a positive value that does not fit in `usize` on the target).
    pub fn try_from_raw(raw: Option<i32>) -> Result<Self, i32> {
        match raw {
            None => Ok(Self::AtMost(DEFAULT_MAX_INLINE_PEER_NOTIFICATIONS)),
            Some(-1) => Ok(Self::Always),
            Some(0) => Ok(Self::Never),
            // Checked conversion instead of an `as` cast: never truncates silently.
            Some(v) if v > 0 => usize::try_from(v).map(Self::AtMost).map_err(|_| v),
            Some(v) => Err(v),
        }
    }
}
688
/// Error returned by optional [`CommsRuntime`] capabilities.
#[derive(Debug, thiserror::Error)]
pub enum CommsCapabilityError {
    /// The runtime does not implement the capability; the default implementations in
    /// this file pass the name of the unsupported method as the payload.
    #[error("comms capability not supported: {0}")]
    Unsupported(String),
}
696
/// Communication runtime attached to an agent.
///
/// Only [`drain_messages`](Self::drain_messages) and
/// [`inbox_notify`](Self::inbox_notify) are required. Every other method has a default
/// that reports the feature as absent (`None`, empty, `false`, no-op, or an
/// `Unsupported`/`NotFound` error), so minimal runtimes compile without implementing
/// them. On `wasm32` the `async_trait` expansion does not require `Send` futures.
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
pub trait CommsRuntime: Send + Sync {
    /// Peer id of this runtime.
    ///
    /// Default: parses [`public_key`](Self::public_key) with `PeerId::parse`; `None`
    /// when there is no public key or it does not parse.
    fn peer_id(&self) -> Option<PeerId> {
        self.public_key()
            .as_deref()
            .and_then(|public_key| PeerId::parse(public_key).ok())
    }

    /// Public key in string form. Default: `None`.
    fn public_key(&self) -> Option<String> {
        None
    }

    /// Public key as 32 raw bytes. Default: `None`.
    fn public_key_bytes(&self) -> Option<[u8; 32]> {
        None
    }

    /// Name of this runtime on the comms layer. Default: `None`.
    fn comms_name(&self) -> Option<String> {
        None
    }

    /// Address this runtime advertises. Default: `None`.
    fn advertised_address(&self) -> Option<String> {
        None
    }

    /// Bridge bootstrap token. Default: `None`.
    fn bridge_bootstrap_token(&self) -> Option<String> {
        None
    }

    /// Adds a trusted peer. Default: `Err(SendError::Unsupported(..))`.
    async fn add_trusted_peer(&self, _peer: TrustedPeerDescriptor) -> Result<(), SendError> {
        Err(SendError::Unsupported(
            "add_trusted_peer not supported for this CommsRuntime".to_string(),
        ))
    }

    /// Removes a trusted peer by id. Default: `Err(SendError::Unsupported(..))`.
    async fn remove_trusted_peer(&self, _peer_id: &str) -> Result<bool, SendError> {
        Err(SendError::Unsupported(
            "remove_trusted_peer not supported for this CommsRuntime".to_string(),
        ))
    }

    /// Adds a private trusted peer. Default: `Err(SendError::Unsupported(..))`.
    async fn add_private_trusted_peer(
        &self,
        _peer: TrustedPeerDescriptor,
    ) -> Result<(), SendError> {
        Err(SendError::Unsupported(
            "add_private_trusted_peer not supported for this CommsRuntime".to_string(),
        ))
    }

    /// Removes a private trusted peer by id. Default: `Err(SendError::Unsupported(..))`.
    async fn remove_private_trusted_peer(&self, _peer_id: &str) -> Result<bool, SendError> {
        Err(SendError::Unsupported(
            "remove_private_trusted_peer not supported for this CommsRuntime".to_string(),
        ))
    }

    /// Sends a comms command. Default: `Err(SendError::Unsupported(..))`.
    async fn send(&self, _cmd: CommsCommand) -> Result<SendReceipt, SendError> {
        Err(SendError::Unsupported(
            "send not implemented for this CommsRuntime".to_string(),
        ))
    }

    /// Opens an event stream for `scope`.
    ///
    /// Default: `Err(StreamError::NotFound(..))` with a description of the requested
    /// scope (`"session <id>"` or `"interaction <id>"`).
    #[doc(hidden)]
    fn stream(&self, scope: StreamScope) -> Result<EventStream, StreamError> {
        let scope_desc = match scope {
            StreamScope::Session(session_id) => format!("session {session_id}"),
            StreamScope::Interaction(interaction_id) => format!("interaction {}", interaction_id.0),
        };
        Err(StreamError::NotFound(scope_desc))
    }

    /// Directory of known peers. Default: empty.
    async fn peers(&self) -> Vec<PeerDirectoryEntry> {
        Vec::new()
    }

    /// Number of known peers. Default: length of [`peers`](Self::peers).
    async fn peer_count(&self) -> usize {
        self.peers().await.len()
    }

    /// Sends a command and attaches an event stream to it.
    ///
    /// Default: performs the [`send`](Self::send) (propagating its error), then always
    /// fails with `SendAndStreamError::StreamAttach`, which carries the receipt of the
    /// send that already happened.
    #[doc(hidden)]
    async fn send_and_stream(
        &self,
        cmd: CommsCommand,
    ) -> Result<(SendReceipt, EventStream), SendAndStreamError> {
        let receipt = self.send(cmd).await?;
        Err(SendAndStreamError::StreamAttach {
            receipt,
            error: StreamError::Internal(
                "send_and_stream is not implemented for this runtime".to_string(),
            ),
        })
    }

    /// Drains pending inbox messages as text. Required.
    async fn drain_messages(&self) -> Vec<String>;
    /// Notifier associated with the inbox. Required.
    fn inbox_notify(&self) -> Arc<tokio::sync::Notify>;
    /// Default: `false`.
    /// NOTE(review): the exact meaning of the flag is not visible here — confirm with
    /// implementors.
    fn dismiss_received(&self) -> bool {
        false
    }
    /// Event injector offered by this runtime. Default: `None`.
    fn event_injector(&self) -> Option<Arc<dyn crate::EventInjector>> {
        None
    }

    /// Subscribable event injector for interactions. Default: `None`.
    #[doc(hidden)]
    fn interaction_event_injector(
        &self,
    ) -> Option<Arc<dyn crate::event_injector::SubscribableInjector>> {
        None
    }

    /// Drains the inbox as structured interactions.
    ///
    /// Default: wraps each text returned by [`drain_messages`](Self::drain_messages) in
    /// an `InboxInteraction` with a freshly generated UUID id, no route, sender
    /// `"unknown"`, the text as both message body and rendered text, and
    /// `HandlingMode::Queue`.
    async fn drain_inbox_interactions(&self) -> Vec<crate::interaction::InboxInteraction> {
        self.drain_messages()
            .await
            .into_iter()
            .map(|text| crate::interaction::InboxInteraction {
                id: crate::interaction::InteractionId(uuid::Uuid::new_v4()),
                from_route: None,
                from: "unknown".into(),
                content: crate::interaction::InteractionContent::Message {
                    body: text.clone(),
                    blocks: None,
                },
                rendered_text: text,
                handling_mode: crate::types::HandlingMode::Queue,
                render_metadata: None,
            })
            .collect()
    }

    /// Event sender subscribed to the given interaction. Default: `None`.
    fn interaction_subscriber(
        &self,
        _id: &crate::interaction::InteractionId,
    ) -> Option<tokio::sync::mpsc::Sender<crate::event::AgentEvent>> {
        None
    }

    /// Takes the stream sender for the given interaction.
    ///
    /// Default: delegates to [`interaction_subscriber`](Self::interaction_subscriber).
    fn take_interaction_stream_sender(
        &self,
        _id: &crate::interaction::InteractionId,
    ) -> Option<tokio::sync::mpsc::Sender<crate::event::AgentEvent>> {
        self.interaction_subscriber(_id)
    }

    /// Marks the given interaction as complete. Default: no-op.
    fn mark_interaction_complete(&self, _id: &crate::interaction::InteractionId) {}

    /// Peer interaction handle. Default: `None`.
    fn peer_interaction_handle(
        &self,
    ) -> Option<std::sync::Arc<dyn crate::handles::PeerInteractionHandle>> {
        None
    }

    /// Handle acting as peer request/response authority (same handle type as
    /// [`peer_interaction_handle`](Self::peer_interaction_handle)). Default: `None`.
    fn peer_request_response_authority_handle(
        &self,
    ) -> Option<std::sync::Arc<dyn crate::handles::PeerInteractionHandle>> {
        None
    }

    /// Drains the inbox as classified interactions.
    ///
    /// Default: `Err(CommsCapabilityError::Unsupported(..))`.
    async fn drain_classified_inbox_interactions(
        &self,
    ) -> Result<Vec<crate::interaction::ClassifiedInboxInteraction>, CommsCapabilityError> {
        Err(CommsCapabilityError::Unsupported(
            "drain_classified_inbox_interactions".to_string(),
        ))
    }

    /// Drains peer input candidates.
    ///
    /// Default: calls
    /// [`drain_classified_inbox_interactions`](Self::drain_classified_inbox_interactions)
    /// and substitutes an empty list when that call fails, so an unsupported capability
    /// is indistinguishable from an empty inbox here.
    async fn drain_peer_input_candidates(&self) -> Vec<crate::interaction::PeerInputCandidate> {
        self.drain_classified_inbox_interactions()
            .await
            .unwrap_or_default()
    }

    /// Snapshot of the peer ingress queue.
    /// Default: `Err(CommsCapabilityError::Unsupported(..))`.
    async fn peer_ingress_queue_snapshot(
        &self,
    ) -> Result<crate::interaction::PeerIngressQueueSnapshot, CommsCapabilityError> {
        Err(CommsCapabilityError::Unsupported(
            "peer_ingress_queue_snapshot".to_string(),
        ))
    }

    /// Snapshot of the peer ingress runtime.
    /// Default: `Err(CommsCapabilityError::Unsupported(..))`.
    async fn peer_ingress_runtime_snapshot(
        &self,
    ) -> Result<crate::interaction::PeerIngressRuntimeSnapshot, CommsCapabilityError> {
        Err(CommsCapabilityError::Unsupported(
            "peer_ingress_runtime_snapshot".to_string(),
        ))
    }

    /// Notifier for actionable input.
    /// Default: `Err(CommsCapabilityError::Unsupported(..))`.
    fn actionable_input_notify(&self) -> Result<Arc<tokio::sync::Notify>, CommsCapabilityError> {
        Err(CommsCapabilityError::Unsupported(
            "actionable_input_notify".to_string(),
        ))
    }
}
1008
/// Agent state, generic over the LLM client `C`, the tool dispatcher `T` and the
/// session store `S`.
///
/// All three type parameters may be unsized (for example trait objects) and are held
/// behind `Arc`. Construction and the run loop live in the `builder` / `runner`
/// submodules declared at the top of this file.
///
/// NOTE(review): only the field types are visible in this section; the grouping
/// comments below are inferred from names and types — confirm against the code that
/// reads and writes each field.
pub struct Agent<C, T, S>
where
    C: AgentLlmClient + ?Sized,
    T: AgentToolDispatcher + ?Sized,
    S: AgentSessionStore + ?Sized,
{
    // Core collaborators and per-agent settings (private to this module).
    config: AgentConfig,
    client: Arc<C>,
    tools: Arc<T>,
    tool_scope: ToolScope,
    store: Arc<S>,
    session: Session,
    budget: Budget,
    retry_policy: RetryPolicy,
    depth: u32,
    // Optional comms runtime and hook machinery.
    pub(super) comms_runtime: Option<Arc<dyn CommsRuntime>>,
    pub(super) hook_engine: Option<Arc<dyn HookEngine>>,
    pub(super) hook_run_overrides: HookRunOverrides,
    // Compaction.
    pub(crate) compactor: Option<Arc<dyn crate::compact::Compactor>>,
    pub(crate) last_input_tokens: u64,
    pub(crate) compaction_cadence: SessionCompactionCadence,
    // Memory and skills.
    pub(crate) memory_store: Option<Arc<dyn crate::memory::MemoryStore>>,
    pub(crate) skill_engine: Option<Arc<crate::skills::SkillRuntime>>,
    // The only fully public field of the struct.
    pub pending_skill_references: Option<Vec<crate::skills::SkillKey>>,
    // Event plumbing and shared system-context state.
    pub(crate) event_tap: crate::event_tap::EventTap,
    pub(crate) system_context_state:
        Arc<std::sync::Mutex<crate::session::SessionSystemContextState>>,
    pub(crate) default_event_tx: Option<tokio::sync::mpsc::Sender<crate::event::AgentEvent>>,
    // Checkpointing and blob storage.
    pub(crate) checkpointer: Option<Arc<dyn crate::checkpoint::SessionCheckpointer>>,
    pub(crate) blob_store: Option<Arc<dyn crate::BlobStore>>,
    // Run-completion bookkeeping.
    pub(crate) terminal_error_detail: Option<String>,
    pub(crate) run_completed_hooks_applied: bool,
    pub(crate) run_completed_event_emitted: bool,
    #[allow(dead_code)] pub(crate) silent_comms_intents: Vec<String>,
    // Ops lifecycle and completion feed.
    pub(crate) ops_lifecycle: Option<Arc<dyn crate::ops_lifecycle::OpsLifecycleRegistry>>,
    pub(crate) completion_feed: Option<Arc<dyn crate::completion_feed::CompletionFeed>>,
    pub(crate) epoch_cursor_state: Option<Arc<crate::runtime_epoch::EpochCursorState>>,
    pub(crate) applied_cursor: crate::completion_feed::CompletionSeq,
    pub(crate) completion_enrichment:
        Option<Arc<dyn crate::completion_feed::CompletionEnrichmentProvider>>,
    // Authority and runtime handles.
    pub(crate) mob_authority_handle:
        Option<Arc<std::sync::RwLock<crate::service::MobToolAuthorityContext>>>,
    pub(crate) turn_state_handle: Option<Arc<dyn crate::TurnStateHandle>>,
    pub(crate) runtime_execution_kind_required: bool,
    pub(crate) runtime_execution_kind: Option<crate::lifecycle::RuntimeExecutionKind>,
    pub(crate) external_tool_surface_handle: Option<Arc<dyn crate::ExternalToolSurfaceHandle>>,
    pub(crate) auth_lease_handle: Option<Arc<dyn crate::handles::AuthLeaseHandle>>,
    pub(crate) mcp_server_lifecycle_handle:
        Option<Arc<dyn crate::handles::McpServerLifecycleHandle>>,
    // Shared atomic flag for a cancel-after-boundary request.
    pub(crate) cancel_after_boundary_requested: Arc<std::sync::atomic::AtomicBool>,
    // Model defaults and call-timeout override.
    pub(crate) model_defaults_resolver:
        Option<Arc<dyn crate::model_defaults::ModelOperationalDefaultsResolver>>,
    pub(crate) call_timeout_override: crate::config::CallTimeoutOverride,
    // Extraction state and last-seen tool catalog information.
    pub(crate) extraction_state: extraction::ExtractionState,
    pub(crate) last_hidden_deferred_catalog_names: BTreeSet<String>,
    pub(crate) last_pending_catalog_sources: BTreeSet<String>,
    // Context passed to tool dispatch.
    pub(crate) tool_dispatch_context: ToolDispatchContext,
}
1122
// Unit tests for the trait defaults, the filtering dispatcher and the small value
// types defined in this file.
#[cfg(test)]
#[allow(clippy::expect_used, clippy::unwrap_used, clippy::panic)]
mod tests {
    use super::{
        AgentToolDispatcher, CommsRuntime, DEFAULT_MAX_INLINE_PEER_NOTIFICATIONS,
        FilteredToolDispatcher, InlinePeerNotificationPolicy, ToolDispatchContext,
    };
    use crate::comms::{
        PeerAddress, PeerId, PeerName, PeerTransport, SendError, TrustedPeerDescriptor,
    };
    use crate::types::{ContentBlock, ContentInput, ToolCallView, ToolDef, ToolResult};
    use async_trait::async_trait;
    use serde_json::json;
    use std::sync::Arc;
    use tokio::sync::Notify;

    /// Runtime that implements only the two required `CommsRuntime` methods, so every
    /// other call exercises the trait defaults.
    struct NoopCommsRuntime {
        notify: Arc<Notify>,
    }

    /// Dispatcher whose result reports whether the dispatch context carried an image.
    struct ContextAwareToolDispatcher;

    #[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
    #[cfg_attr(not(target_arch = "wasm32"), async_trait)]
    impl AgentToolDispatcher for ContextAwareToolDispatcher {
        fn tools(&self) -> Arc<[Arc<ToolDef>]> {
            Arc::from([Arc::new(ToolDef {
                name: "inspect_context".into(),
                description: "inspect context".to_string(),
                input_schema: json!({"type": "object"}),
                provenance: None,
            })])
        }

        // Context-free path: always reports that no image was seen.
        async fn dispatch(
            &self,
            call: ToolCallView<'_>,
        ) -> Result<crate::ops::ToolDispatchOutcome, crate::error::ToolError> {
            Ok(ToolResult::new(
                call.id.to_string(),
                json!({"saw_context_image": false}).to_string(),
                false,
            )
            .into())
        }

        // Context-aware path: reports whether the first current-turn image is present.
        async fn dispatch_with_context(
            &self,
            call: ToolCallView<'_>,
            context: &ToolDispatchContext,
        ) -> Result<crate::ops::ToolDispatchOutcome, crate::error::ToolError> {
            Ok(ToolResult::new(
                call.id.to_string(),
                json!({"saw_context_image": context.current_turn_image(0).is_some()}).to_string(),
                false,
            )
            .into())
        }
    }

    #[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
    #[cfg_attr(not(target_arch = "wasm32"), async_trait)]
    impl CommsRuntime for NoopCommsRuntime {
        async fn drain_messages(&self) -> Vec<String> {
            Vec::new()
        }

        fn inbox_notify(&self) -> std::sync::Arc<Notify> {
            self.notify.clone()
        }
    }

    /// Defaults: no public key, and `add_trusted_peer` is reported as unsupported.
    #[tokio::test]
    async fn test_comms_runtime_trait_defaults_hide_unimplemented_features() {
        let runtime = NoopCommsRuntime {
            notify: Arc::new(Notify::new()),
        };
        assert!(<NoopCommsRuntime as CommsRuntime>::public_key(&runtime).is_none());
        let peer = TrustedPeerDescriptor {
            peer_id: PeerId::new(),
            name: PeerName::new("peer-a").expect("valid peer name"),
            address: PeerAddress::new(PeerTransport::Inproc, "peer-a"),
            pubkey: [0u8; 32],
        };
        let result = <NoopCommsRuntime as CommsRuntime>::add_trusted_peer(&runtime, peer).await;
        assert!(matches!(result, Err(SendError::Unsupported(_))));
    }

    /// Default `remove_trusted_peer` is reported as unsupported.
    #[tokio::test]
    async fn test_remove_trusted_peer_default_unsupported() {
        let runtime = NoopCommsRuntime {
            notify: Arc::new(Notify::new()),
        };
        let peer_id = PeerId::new().to_string();
        let result =
            <NoopCommsRuntime as CommsRuntime>::remove_trusted_peer(&runtime, &peer_id).await;
        assert!(matches!(result, Err(SendError::Unsupported(_))));
    }

    /// The filtering wrapper must pass the dispatch context through to the inner
    /// dispatcher rather than falling back to the context-free `dispatch`.
    #[tokio::test]
    async fn filtered_tool_dispatcher_preserves_dispatch_context() {
        let dispatcher =
            FilteredToolDispatcher::new(Arc::new(ContextAwareToolDispatcher), ["inspect_context"]);
        let args = serde_json::value::RawValue::from_string("{}".to_string())
            .expect("empty object should be valid JSON");
        let call = ToolCallView {
            id: "ctx-1",
            name: "inspect_context",
            args: &args,
        };
        let context = ToolDispatchContext::from_current_turn_input(&ContentInput::Blocks(vec![
            ContentBlock::Image {
                media_type: "image/png".to_string(),
                data: "abc".into(),
            },
        ]));

        let outcome = dispatcher
            .dispatch_with_context(call, &context)
            .await
            .expect("filtered wrapper should dispatch");
        let payload: serde_json::Value =
            serde_json::from_str(&outcome.result.text_content()).expect("tool result JSON");
        assert_eq!(payload["saw_context_image"], true);
    }

    /// Covers every branch of `InlinePeerNotificationPolicy::try_from_raw`.
    #[test]
    fn test_inline_peer_notification_policy_from_raw() {
        assert_eq!(
            InlinePeerNotificationPolicy::try_from_raw(None),
            Ok(InlinePeerNotificationPolicy::AtMost(
                DEFAULT_MAX_INLINE_PEER_NOTIFICATIONS
            ))
        );
        assert_eq!(
            InlinePeerNotificationPolicy::try_from_raw(Some(-1)),
            Ok(InlinePeerNotificationPolicy::Always)
        );
        assert_eq!(
            InlinePeerNotificationPolicy::try_from_raw(Some(0)),
            Ok(InlinePeerNotificationPolicy::Never)
        );
        assert_eq!(
            InlinePeerNotificationPolicy::try_from_raw(Some(25)),
            Ok(InlinePeerNotificationPolicy::AtMost(25))
        );
        assert_eq!(
            InlinePeerNotificationPolicy::try_from_raw(Some(-42)),
            Err(-42)
        );
    }

    /// Pins which `OperationStatus` values count as terminal.
    #[test]
    fn unit_001_terminal_status_values() {
        use crate::ops_lifecycle::OperationStatus;
        assert!(OperationStatus::Completed.is_terminal());
        assert!(OperationStatus::Failed.is_terminal());
        assert!(OperationStatus::Cancelled.is_terminal());
        assert!(OperationStatus::Aborted.is_terminal());
        assert!(OperationStatus::Retired.is_terminal());
        assert!(OperationStatus::Terminated.is_terminal());
        assert!(!OperationStatus::Running.is_terminal());
        assert!(!OperationStatus::Provisioning.is_terminal());
        assert!(!OperationStatus::Retiring.is_terminal());
        assert!(!OperationStatus::Absent.is_terminal());
    }

    /// The serialized completion record exposes `job_id` and never `operation_id`.
    #[test]
    fn unit_002_detached_op_completion_has_no_operation_id() {
        use crate::agent::DetachedOpCompletion;
        use crate::ops_lifecycle::{OperationKind, OperationStatus};

        let completion = DetachedOpCompletion {
            job_id: "j_test".into(),
            kind: OperationKind::BackgroundToolOp,
            status: OperationStatus::Completed,
            terminal_outcome: None,
            display_name: "test cmd".into(),
            detail: "ok".into(),
            elapsed_ms: None,
        };
        #[allow(clippy::unwrap_used)]
        let json = serde_json::to_value(&completion).unwrap();
        assert!(
            json.get("operation_id").is_none(),
            "operation_id must not appear in serialized DetachedOpCompletion (CONTRACT-003)"
        );
        assert!(
            json.get("job_id").is_some(),
            "job_id must be the app-facing control noun"
        );
    }
}