1mod builder;
6pub mod comms_impl;
7pub mod compact;
8mod extraction;
9mod hook_impl;
10mod runner;
11pub mod skills;
12mod state;
13
14use crate::budget::Budget;
15use crate::comms::{
16 CommsCommand, EventStream, PeerDirectoryEntry, SendError, SendReceipt, StreamError,
17 StreamScope, TrustedPeerSpec,
18};
19use crate::compact::SessionCompactionCadence;
20use crate::config::{AgentConfig, HookRunOverrides};
21use crate::error::AgentError;
22use crate::event::ExternalToolDelta;
23use crate::hooks::HookEngine;
24use crate::ops_lifecycle::{OperationKind, OperationStatus, OperationTerminalOutcome};
25use crate::retry::RetryPolicy;
26use crate::schema::{CompiledSchema, SchemaError};
27use crate::session::Session;
28use crate::state::LoopState;
29#[cfg(target_arch = "wasm32")]
30use crate::tokio;
31use crate::tool_catalog::{
32 ToolCatalogCapabilities, ToolCatalogEntry, ToolCatalogMode, deferred_session_entry_count,
33 select_catalog_mode_from_snapshot,
34};
35use crate::tool_scope::ToolScope;
36use crate::types::{
37 AssistantBlock, BlockAssistantMessage, Message, OutputSchema, StopReason, ToolCallView,
38 ToolDef, Usage,
39};
40use async_trait::async_trait;
41use serde::{Deserialize, Serialize};
42use serde_json::Value;
43use std::collections::{BTreeSet, HashSet};
44use std::sync::Arc;
45
46pub use builder::AgentBuilder;
47pub use runner::AgentRunner;
48
49#[deprecated(
54 since = "0.2.0",
55 note = "Use ToolError::CallbackPending or AgentError::CallbackPending instead"
56)]
57pub const CALLBACK_TOOL_PREFIX: &str = "CALLBACK_TOOL_PENDING:";
58
59#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
61#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
62pub trait AgentLlmClient: Send + Sync {
63 async fn stream_response(
65 &self,
66 messages: &[Message],
67 tools: &[Arc<ToolDef>],
68 max_tokens: u32,
69 temperature: Option<f32>,
70 provider_params: Option<&Value>,
71 ) -> Result<LlmStreamResult, AgentError>;
72
73 fn provider(&self) -> &'static str;
75
76 fn model(&self) -> &str;
82
83 fn compile_schema(&self, output_schema: &OutputSchema) -> Result<CompiledSchema, SchemaError> {
89 Ok(CompiledSchema {
91 schema: output_schema.schema.as_value().clone(),
92 warnings: Vec::new(),
93 })
94 }
95}
96
97pub struct LlmStreamResult {
99 blocks: Vec<AssistantBlock>,
100 stop_reason: StopReason,
101 usage: Usage,
102}
103
104impl LlmStreamResult {
105 pub fn new(blocks: Vec<AssistantBlock>, stop_reason: StopReason, usage: Usage) -> Self {
106 Self {
107 blocks,
108 stop_reason,
109 usage,
110 }
111 }
112
113 pub fn blocks(&self) -> &[AssistantBlock] {
114 &self.blocks
115 }
116 pub fn stop_reason(&self) -> StopReason {
117 self.stop_reason
118 }
119 pub fn usage(&self) -> &Usage {
120 &self.usage
121 }
122
123 pub fn into_message(self) -> BlockAssistantMessage {
124 BlockAssistantMessage {
125 blocks: self.blocks,
126 stop_reason: self.stop_reason,
127 }
128 }
129
130 pub fn into_parts(self) -> (Vec<AssistantBlock>, StopReason, Usage) {
131 (self.blocks, self.stop_reason, self.usage)
132 }
133}
134
135#[derive(Debug, Clone, Default)]
139pub struct ExternalToolUpdate {
140 pub notices: Vec<ExternalToolDelta>,
142 pub pending: Vec<String>,
144 pub background_completions: Vec<DetachedOpCompletion>,
146}
147
148#[derive(Debug, Clone, Serialize, Deserialize)]
155pub struct DetachedOpCompletion {
156 pub job_id: String,
158 pub kind: OperationKind,
160 pub status: OperationStatus,
162 pub terminal_outcome: Option<OperationTerminalOutcome>,
164 pub display_name: String,
166 pub detail: String,
168 pub elapsed_ms: Option<u64>,
170}
171
172#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
177pub struct DispatcherCapabilities {
178 pub ops_lifecycle: bool,
180}
181
182pub enum BindOutcome {
193 Bound(Arc<dyn AgentToolDispatcher>),
195 Skipped(Arc<dyn AgentToolDispatcher>),
198}
199
200impl BindOutcome {
201 pub fn into_dispatcher(self) -> Arc<dyn AgentToolDispatcher> {
203 match self {
204 Self::Bound(d) | Self::Skipped(d) => d,
205 }
206 }
207
208 pub fn was_bound(&self) -> bool {
210 matches!(self, Self::Bound(_))
211 }
212}
213
214#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
216#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
217pub trait AgentToolDispatcher: Send + Sync {
218 fn tools(&self) -> Arc<[Arc<ToolDef>]>;
220
221 fn tool_catalog_capabilities(&self) -> ToolCatalogCapabilities {
227 ToolCatalogCapabilities::default()
228 }
229
230 fn tool_catalog(&self) -> Arc<[ToolCatalogEntry]> {
236 self.tools()
237 .iter()
238 .map(|tool| ToolCatalogEntry::session_inline(Arc::clone(tool), true))
239 .collect::<Vec<_>>()
240 .into()
241 }
242
243 fn pending_catalog_sources(&self) -> Arc<[String]> {
248 Arc::from([])
249 }
250
251 async fn dispatch(
257 &self,
258 call: ToolCallView<'_>,
259 ) -> Result<crate::ops::ToolDispatchOutcome, crate::error::ToolError>;
260
261 async fn poll_external_updates(&self) -> ExternalToolUpdate {
267 ExternalToolUpdate::default()
268 }
269
270 fn capabilities(&self) -> DispatcherCapabilities {
272 DispatcherCapabilities::default()
273 }
274
275 fn bind_ops_lifecycle(
280 self: Arc<Self>,
281 _registry: Arc<dyn crate::ops_lifecycle::OpsLifecycleRegistry>,
282 _owner_session_id: crate::types::SessionId,
283 ) -> Result<BindOutcome, OpsLifecycleBindError> {
284 Err(OpsLifecycleBindError::Unsupported)
285 }
286
287 fn completion_enrichment(
292 &self,
293 ) -> Option<Arc<dyn crate::completion_feed::CompletionEnrichmentProvider>> {
294 None
295 }
296}
297
298pub fn select_tool_catalog_mode<T>(dispatcher: &T) -> ToolCatalogMode
300where
301 T: AgentToolDispatcher + ?Sized,
302{
303 let capabilities = dispatcher.tool_catalog_capabilities();
304 if !capabilities.exact_catalog {
305 return ToolCatalogMode::Inline;
306 }
307 let pending_sources = dispatcher.pending_catalog_sources();
308 let catalog = dispatcher.tool_catalog();
309 select_catalog_mode_from_snapshot(
310 capabilities.exact_catalog,
311 catalog.as_ref(),
312 pending_sources.as_ref(),
313 )
314}
315
316pub fn should_compose_tool_catalog_control_plane<T>(dispatcher: &T) -> bool
319where
320 T: AgentToolDispatcher + ?Sized,
321{
322 let capabilities = dispatcher.tool_catalog_capabilities();
323 if !capabilities.exact_catalog {
324 return false;
325 }
326 if capabilities.may_require_catalog_control_plane {
327 return true;
328 }
329
330 let pending_sources = dispatcher.pending_catalog_sources();
331 if !pending_sources.is_empty() {
332 return true;
333 }
334
335 let catalog = dispatcher.tool_catalog();
336 deferred_session_entry_count(catalog.as_ref()) > 0
337}
338
339#[derive(Debug, Clone, thiserror::Error, PartialEq, Eq)]
341pub enum OpsLifecycleBindError {
342 #[error("ops lifecycle binding is unsupported")]
343 Unsupported,
344 #[error("dispatcher has shared ownership and cannot be rebound")]
345 SharedOwnership,
346}
347
348pub struct FilteredToolDispatcher<T: AgentToolDispatcher + ?Sized> {
354 inner: Arc<T>,
355 allowed_tools: HashSet<String>,
356 filtered_tools: Arc<[Arc<ToolDef>]>,
358}
359
360impl<T: AgentToolDispatcher + ?Sized> FilteredToolDispatcher<T> {
361 pub fn new(inner: Arc<T>, allowed_tools: Vec<String>) -> Self {
362 let allowed_set: HashSet<String> = allowed_tools.into_iter().collect();
363
364 let inner_tools = inner.tools();
366 let filtered: Vec<Arc<ToolDef>> = inner_tools
367 .iter()
368 .filter(|t| allowed_set.contains(t.name.as_str()))
369 .map(Arc::clone)
370 .collect();
371
372 Self {
373 inner,
374 allowed_tools: allowed_set,
375 filtered_tools: filtered.into(),
376 }
377 }
378}
379
380#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
381#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
382impl<T: AgentToolDispatcher + ?Sized + 'static> AgentToolDispatcher for FilteredToolDispatcher<T> {
383 fn tools(&self) -> Arc<[Arc<ToolDef>]> {
384 Arc::clone(&self.filtered_tools)
385 }
386
387 async fn dispatch(
388 &self,
389 call: ToolCallView<'_>,
390 ) -> Result<crate::ops::ToolDispatchOutcome, crate::error::ToolError> {
391 if !self.allowed_tools.contains(call.name) {
392 return Err(crate::error::ToolError::access_denied(call.name));
393 }
394 self.inner.dispatch(call).await
395 }
396
397 async fn poll_external_updates(&self) -> ExternalToolUpdate {
398 self.inner.poll_external_updates().await
399 }
400
401 fn capabilities(&self) -> DispatcherCapabilities {
402 self.inner.capabilities()
403 }
404
405 fn bind_ops_lifecycle(
406 self: Arc<Self>,
407 registry: Arc<dyn crate::ops_lifecycle::OpsLifecycleRegistry>,
408 owner_session_id: crate::types::SessionId,
409 ) -> Result<BindOutcome, OpsLifecycleBindError> {
410 let owned = Arc::try_unwrap(self).map_err(|_| OpsLifecycleBindError::SharedOwnership)?;
411 if Arc::strong_count(&owned.inner) == 1 {
412 let outcome = owned.inner.bind_ops_lifecycle(registry, owner_session_id)?;
413 let bound = outcome.was_bound();
414 let d = outcome.into_dispatcher();
415 Ok(if bound {
416 BindOutcome::Bound(Arc::new(FilteredToolDispatcher {
417 inner: d,
418 allowed_tools: owned.allowed_tools,
419 filtered_tools: owned.filtered_tools,
420 }))
421 } else {
422 BindOutcome::Skipped(Arc::new(FilteredToolDispatcher {
423 inner: d,
424 allowed_tools: owned.allowed_tools,
425 filtered_tools: owned.filtered_tools,
426 }))
427 })
428 } else {
429 Ok(BindOutcome::Skipped(Arc::new(FilteredToolDispatcher {
430 inner: owned.inner,
431 allowed_tools: owned.allowed_tools,
432 filtered_tools: owned.filtered_tools,
433 })))
434 }
435 }
436
437 fn completion_enrichment(
438 &self,
439 ) -> Option<Arc<dyn crate::completion_feed::CompletionEnrichmentProvider>> {
440 self.inner.completion_enrichment()
441 }
442}
443
444#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
446#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
447pub trait AgentSessionStore: Send + Sync {
448 async fn save(&self, session: &Session) -> Result<(), AgentError>;
449 async fn load(&self, id: &str) -> Result<Option<Session>, AgentError>;
450}
451
452#[derive(Debug, Clone, Copy, PartialEq, Eq)]
454pub enum InlinePeerNotificationPolicy {
455 Always,
457 Never,
459 AtMost(usize),
461}
462
463pub const DEFAULT_MAX_INLINE_PEER_NOTIFICATIONS: usize = 50;
465
466impl InlinePeerNotificationPolicy {
467 pub fn try_from_raw(raw: Option<i32>) -> Result<Self, i32> {
469 match raw {
470 None => Ok(Self::AtMost(DEFAULT_MAX_INLINE_PEER_NOTIFICATIONS)),
471 Some(-1) => Ok(Self::Always),
472 Some(0) => Ok(Self::Never),
473 Some(v) if v > 0 => Ok(Self::AtMost(v as usize)),
474 Some(v) => Err(v),
475 }
476 }
477}
478
479#[derive(Debug, thiserror::Error)]
481pub enum CommsCapabilityError {
482 #[error("comms capability not supported: {0}")]
484 Unsupported(String),
485}
486
487#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
489#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
490pub trait CommsRuntime: Send + Sync {
491 fn public_key(&self) -> Option<String> {
495 None
496 }
497
498 async fn add_trusted_peer(&self, _peer: TrustedPeerSpec) -> Result<(), SendError> {
504 Err(SendError::Unsupported(
505 "add_trusted_peer not supported for this CommsRuntime".to_string(),
506 ))
507 }
508
509 async fn remove_trusted_peer(&self, _peer_id: &str) -> Result<bool, SendError> {
515 Err(SendError::Unsupported(
516 "remove_trusted_peer not supported for this CommsRuntime".to_string(),
517 ))
518 }
519
520 async fn send(&self, _cmd: CommsCommand) -> Result<SendReceipt, SendError> {
522 Err(SendError::Unsupported(
523 "send not implemented for this CommsRuntime".to_string(),
524 ))
525 }
526
527 #[doc(hidden)]
528 fn stream(&self, scope: StreamScope) -> Result<EventStream, StreamError> {
529 let scope_desc = match scope {
530 StreamScope::Session(session_id) => format!("session {session_id}"),
531 };
532 Err(StreamError::NotFound(scope_desc))
533 }
534
535 async fn peers(&self) -> Vec<PeerDirectoryEntry> {
537 Vec::new()
538 }
539
540 async fn peer_count(&self) -> usize {
544 self.peers().await.len()
545 }
546
547 async fn drain_messages(&self) -> Vec<String>;
549 fn inbox_notify(&self) -> Arc<tokio::sync::Notify>;
551 fn dismiss_received(&self) -> bool {
553 false
554 }
555 fn event_injector(&self) -> Option<Arc<dyn crate::EventInjector>> {
560 None
561 }
562
563 #[doc(hidden)]
565 fn interaction_event_injector(
566 &self,
567 ) -> Option<Arc<dyn crate::event_injector::SubscribableInjector>> {
568 None
569 }
570
571 async fn drain_inbox_interactions(&self) -> Vec<crate::interaction::InboxInteraction> {
576 self.drain_messages()
577 .await
578 .into_iter()
579 .map(|text| crate::interaction::InboxInteraction {
580 id: crate::interaction::InteractionId(uuid::Uuid::new_v4()),
581 from: "unknown".into(),
582 content: crate::interaction::InteractionContent::Message {
583 body: text.clone(),
584 blocks: None,
585 },
586 rendered_text: text,
587 handling_mode: crate::types::HandlingMode::Queue,
588 render_metadata: None,
589 })
590 .collect()
591 }
592
593 fn interaction_subscriber(
598 &self,
599 _id: &crate::interaction::InteractionId,
600 ) -> Option<tokio::sync::mpsc::Sender<crate::event::AgentEvent>> {
601 None
602 }
603
604 fn take_interaction_stream_sender(
606 &self,
607 _id: &crate::interaction::InteractionId,
608 ) -> Option<tokio::sync::mpsc::Sender<crate::event::AgentEvent>> {
609 self.interaction_subscriber(_id)
610 }
611
612 fn mark_interaction_complete(&self, _id: &crate::interaction::InteractionId) {}
618
619 async fn drain_peer_input_candidates(&self) -> Vec<crate::interaction::PeerInputCandidate>;
623}
624
625pub struct Agent<C, T, S>
627where
628 C: AgentLlmClient + ?Sized,
629 T: AgentToolDispatcher + ?Sized,
630 S: AgentSessionStore + ?Sized,
631{
632 config: AgentConfig,
633 client: Arc<C>,
634 tools: Arc<T>,
635 tool_scope: ToolScope,
636 store: Arc<S>,
637 session: Session,
638 budget: Budget,
639 retry_policy: RetryPolicy,
640 state: LoopState,
641 depth: u32,
642 pub(super) comms_runtime: Option<Arc<dyn CommsRuntime>>,
643 pub(super) hook_engine: Option<Arc<dyn HookEngine>>,
644 pub(super) hook_run_overrides: HookRunOverrides,
645 pub(crate) compactor: Option<Arc<dyn crate::compact::Compactor>>,
647 pub(crate) last_input_tokens: u64,
649 pub(crate) compaction_cadence: SessionCompactionCadence,
651 pub(crate) memory_store: Option<Arc<dyn crate::memory::MemoryStore>>,
653 pub(crate) skill_engine: Option<Arc<crate::skills::SkillRuntime>>,
655 pub pending_skill_references: Option<Vec<crate::skills::SkillKey>>,
658 pub(crate) event_tap: crate::event_tap::EventTap,
660 pub(crate) system_context_state:
662 Arc<std::sync::Mutex<crate::session::SessionSystemContextState>>,
663 pub(crate) default_event_tx: Option<tokio::sync::mpsc::Sender<crate::event::AgentEvent>>,
666 #[allow(dead_code)] pub(crate) checkpointer: Option<Arc<dyn crate::checkpoint::SessionCheckpointer>>,
669 pub(crate) blob_store: Option<Arc<dyn crate::BlobStore>>,
671 pub(crate) pending_fatal_diagnostic: Option<AgentError>,
679 #[allow(dead_code)] pub(crate) silent_comms_intents: Vec<String>,
683 pub(crate) ops_lifecycle: Option<Arc<dyn crate::ops_lifecycle::OpsLifecycleRegistry>>,
688 pub(crate) completion_feed: Option<Arc<dyn crate::completion_feed::CompletionFeed>>,
690 pub(crate) epoch_cursor_state: Option<Arc<crate::runtime_epoch::EpochCursorState>>,
692 pub(crate) applied_cursor: crate::completion_feed::CompletionSeq,
694 pub(crate) completion_enrichment:
696 Option<Arc<dyn crate::completion_feed::CompletionEnrichmentProvider>>,
697 pub(crate) mob_authority_handle:
702 Option<Arc<std::sync::RwLock<crate::service::MobToolAuthorityContext>>>,
703 pub(crate) turn_authority: crate::turn_execution_authority::TurnExecutionAuthority,
705 pub(crate) model_defaults_resolver:
708 Option<Arc<dyn crate::model_defaults::ModelOperationalDefaultsResolver>>,
709 pub(crate) call_timeout_override: crate::config::CallTimeoutOverride,
712 pub(crate) extraction_result: Option<serde_json::Value>,
714 pub(crate) extraction_schema_warnings: Option<Vec<crate::schema::SchemaWarning>>,
716 pub(crate) extraction_last_error: Option<String>,
718 pub(crate) last_hidden_deferred_catalog_names: BTreeSet<String>,
720 pub(crate) last_pending_catalog_sources: BTreeSet<String>,
722}
723
724#[cfg(test)]
725mod tests {
726 use super::{
727 CommsRuntime, DEFAULT_MAX_INLINE_PEER_NOTIFICATIONS, InlinePeerNotificationPolicy,
728 };
729 use crate::comms::{SendError, TrustedPeerSpec};
730 use async_trait::async_trait;
731 use std::sync::Arc;
732 use tokio::sync::Notify;
733
734 struct NoopCommsRuntime {
735 notify: Arc<Notify>,
736 }
737
738 #[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
739 #[cfg_attr(not(target_arch = "wasm32"), async_trait)]
740 impl CommsRuntime for NoopCommsRuntime {
741 async fn drain_messages(&self) -> Vec<String> {
742 Vec::new()
743 }
744
745 fn inbox_notify(&self) -> std::sync::Arc<Notify> {
746 self.notify.clone()
747 }
748
749 async fn drain_peer_input_candidates(&self) -> Vec<crate::interaction::PeerInputCandidate> {
750 Vec::new()
751 }
752 }
753
754 #[tokio::test]
755 async fn test_comms_runtime_trait_defaults_hide_unimplemented_features() {
756 let runtime = NoopCommsRuntime {
757 notify: Arc::new(Notify::new()),
758 };
759 assert!(<NoopCommsRuntime as CommsRuntime>::public_key(&runtime).is_none());
760 let peer = TrustedPeerSpec {
761 name: "peer-a".to_string(),
762 peer_id: "ed25519:test".to_string(),
763 address: "inproc://peer-a".to_string(),
764 };
765 let result = <NoopCommsRuntime as CommsRuntime>::add_trusted_peer(&runtime, peer).await;
766 assert!(matches!(result, Err(SendError::Unsupported(_))));
767 }
768
769 #[tokio::test]
770 async fn test_remove_trusted_peer_default_unsupported() {
771 let runtime = NoopCommsRuntime {
772 notify: Arc::new(Notify::new()),
773 };
774 let result =
775 <NoopCommsRuntime as CommsRuntime>::remove_trusted_peer(&runtime, "ed25519:test").await;
776 assert!(matches!(result, Err(SendError::Unsupported(_))));
777 }
778
779 #[test]
780 fn test_inline_peer_notification_policy_from_raw() {
781 assert_eq!(
782 InlinePeerNotificationPolicy::try_from_raw(None),
783 Ok(InlinePeerNotificationPolicy::AtMost(
784 DEFAULT_MAX_INLINE_PEER_NOTIFICATIONS
785 ))
786 );
787 assert_eq!(
788 InlinePeerNotificationPolicy::try_from_raw(Some(-1)),
789 Ok(InlinePeerNotificationPolicy::Always)
790 );
791 assert_eq!(
792 InlinePeerNotificationPolicy::try_from_raw(Some(0)),
793 Ok(InlinePeerNotificationPolicy::Never)
794 );
795 assert_eq!(
796 InlinePeerNotificationPolicy::try_from_raw(Some(25)),
797 Ok(InlinePeerNotificationPolicy::AtMost(25))
798 );
799 assert_eq!(
800 InlinePeerNotificationPolicy::try_from_raw(Some(-42)),
801 Err(-42)
802 );
803 }
804
805 #[test]
808 fn unit_001_terminal_status_values() {
809 use crate::ops_lifecycle::OperationStatus;
810 assert!(OperationStatus::Completed.is_terminal());
811 assert!(OperationStatus::Failed.is_terminal());
812 assert!(OperationStatus::Cancelled.is_terminal());
813 assert!(OperationStatus::Aborted.is_terminal());
814 assert!(OperationStatus::Retired.is_terminal());
815 assert!(OperationStatus::Terminated.is_terminal());
816 assert!(!OperationStatus::Running.is_terminal());
817 assert!(!OperationStatus::Provisioning.is_terminal());
818 assert!(!OperationStatus::Retiring.is_terminal());
819 assert!(!OperationStatus::Absent.is_terminal());
820 }
821
822 #[test]
825 fn unit_002_detached_op_completion_has_no_operation_id() {
826 use crate::agent::DetachedOpCompletion;
827 use crate::ops_lifecycle::{OperationKind, OperationStatus};
828
829 let completion = DetachedOpCompletion {
830 job_id: "j_test".into(),
831 kind: OperationKind::BackgroundToolOp,
832 status: OperationStatus::Completed,
833 terminal_outcome: None,
834 display_name: "test cmd".into(),
835 detail: "ok".into(),
836 elapsed_ms: None,
837 };
838 #[allow(clippy::unwrap_used)]
839 let json = serde_json::to_value(&completion).unwrap();
840 assert!(
841 json.get("operation_id").is_none(),
842 "operation_id must not appear in serialized DetachedOpCompletion (CONTRACT-003)"
843 );
844 assert!(
845 json.get("job_id").is_some(),
846 "job_id must be the app-facing control noun"
847 );
848 }
849}