Skip to main content

bob_core/
ports.rs

1//! # Port Traits
2//!
3//! Hexagonal port traits for the Bob Agent Framework.
4//!
5//! These are the 4 v1 boundaries that adapters must implement.
6//! All async traits use `async_trait` for dyn-compatibility.
7//!
8//! ## Architecture
9//!
10//! ```text
11//! ┌─────────────────────────────────────────┐
12//! │          Runtime / Application          │
13//! └─────────────────────────────────────────┘
14//!                  ↓ uses ports
15//! ┌─────────────────────────────────────────┐
16//! │            Port Traits (this module)    │
17//! │  ┌─────────┐ ┌─────────┐ ┌──────────┐  │
18//! │  │LlmPort  │ │ToolPort │ │Store ... │  │
19//! │  └─────────┘ └─────────┘ └──────────┘  │
20//! └─────────────────────────────────────────┘
21//!                  ↓ implemented by
22//! ┌─────────────────────────────────────────┐
23//! │            Adapters (bob-adapters)      │
24//! └─────────────────────────────────────────┘
25//! ```
26//!
27//! ## Implementing a Port
28//!
29//! To create a custom adapter:
30//!
31//! ```rust,ignore
32//! use bob_core::{
33//!     ports::LlmPort,
34//!     types::{LlmRequest, LlmResponse, LlmStream},
35//!     error::LlmError,
36//! };
37//! use async_trait::async_trait;
38//!
39//! pub struct MyCustomLlm {
40//!     // Your fields here
41//! }
42//!
43//! #[async_trait]
44//! impl LlmPort for MyCustomLlm {
45//!     async fn complete(&self, req: LlmRequest) -> Result<LlmResponse, LlmError> {
46//!         // Your implementation
47//!     }
48//!
49//!     async fn complete_stream(&self, req: LlmRequest) -> Result<LlmStream, LlmError> {
50//!         // Your implementation
51//!     }
52//! }
53//! ```
54
55use crate::{
56    error::{CostError, LlmError, StoreError, ToolError},
57    tape::{TapeEntry, TapeEntryKind, TapeSearchResult},
58    types::{
59        AccessDecision, ActivityEntry, ActivityQuery, AgentEvent, ApprovalContext,
60        ApprovalDecision, ArtifactRecord, ChannelAccessPolicy, InboundMessage, LlmCapabilities,
61        LlmRequest, LlmResponse, LlmStream, Message, OutboundMessage, SessionId, SessionState,
62        SubagentResult, TokenUsage, ToolCall, ToolDescriptor, ToolResult, TurnCheckpoint,
63    },
64};
65
66// ── LLM Port ─────────────────────────────────────────────────────────
67
68/// Port for LLM inference (complete and stream).
69#[async_trait::async_trait]
70pub trait LlmPort: Send + Sync {
71    /// Runtime capability declaration for dispatch decisions.
72    #[must_use]
73    fn capabilities(&self) -> LlmCapabilities {
74        LlmCapabilities::default()
75    }
76
77    /// Run a non-streaming inference call.
78    async fn complete(&self, req: LlmRequest) -> Result<LlmResponse, LlmError>;
79
80    /// Run a streaming inference call.
81    async fn complete_stream(&self, req: LlmRequest) -> Result<LlmStream, LlmError>;
82}
83
84/// Port for compacting persisted session transcript into prompt-ready history.
85#[async_trait::async_trait]
86pub trait ContextCompactorPort: Send + Sync {
87    /// Return the transcript entries that should be forwarded to the prompt builder.
88    async fn compact(&self, session: &SessionState) -> Vec<Message>;
89}
90
91// ── Tool Port ────────────────────────────────────────────────────────
92
93/// Port for tool discovery.
94#[async_trait::async_trait]
95pub trait ToolCatalogPort: Send + Sync {
96    /// List all available tools.
97    async fn list_tools(&self) -> Result<Vec<ToolDescriptor>, ToolError>;
98}
99
100/// Port for tool execution.
101#[async_trait::async_trait]
102pub trait ToolExecutorPort: Send + Sync {
103    /// Execute a tool call and return its result.
104    async fn call_tool(&self, call: ToolCall) -> Result<ToolResult, ToolError>;
105}
106
107/// Backward-compatible composite tool port.
108///
109/// New code should prefer depending on [`ToolCatalogPort`] and [`ToolExecutorPort`] separately.
110#[async_trait::async_trait]
111pub trait ToolPort: Send + Sync {
112    /// List all available tools.
113    async fn list_tools(&self) -> Result<Vec<ToolDescriptor>, ToolError>;
114
115    /// Execute a tool call and return its result.
116    async fn call_tool(&self, call: ToolCall) -> Result<ToolResult, ToolError>;
117}
118
119#[async_trait::async_trait]
120impl<T> ToolCatalogPort for T
121where
122    T: ToolPort + ?Sized,
123{
124    async fn list_tools(&self) -> Result<Vec<ToolDescriptor>, ToolError> {
125        ToolPort::list_tools(self).await
126    }
127}
128
129#[async_trait::async_trait]
130impl<T> ToolExecutorPort for T
131where
132    T: ToolPort + ?Sized,
133{
134    async fn call_tool(&self, call: ToolCall) -> Result<ToolResult, ToolError> {
135        ToolPort::call_tool(self, call).await
136    }
137}
138
139/// Tool policy decision boundary.
140pub trait ToolPolicyPort: Send + Sync {
141    /// Returns `true` when a tool is allowed for the effective request policy.
142    fn is_tool_allowed(
143        &self,
144        tool: &str,
145        deny_tools: &[String],
146        allow_tools: Option<&[String]>,
147    ) -> bool;
148}
149
150/// Tool call approval boundary for interactive/sensitive operations.
151#[async_trait::async_trait]
152pub trait ApprovalPort: Send + Sync {
153    /// Decide whether the tool call may proceed.
154    async fn approve_tool_call(
155        &self,
156        call: &ToolCall,
157        context: &ApprovalContext,
158    ) -> Result<ApprovalDecision, ToolError>;
159}
160
161/// Port for persisting turn checkpoints.
162#[async_trait::async_trait]
163pub trait TurnCheckpointStorePort: Send + Sync {
164    async fn save_checkpoint(&self, checkpoint: &TurnCheckpoint) -> Result<(), StoreError>;
165    async fn load_latest(
166        &self,
167        session_id: &SessionId,
168    ) -> Result<Option<TurnCheckpoint>, StoreError>;
169}
170
171/// Port for storing per-turn artifacts.
172#[async_trait::async_trait]
173pub trait ArtifactStorePort: Send + Sync {
174    async fn put(&self, artifact: ArtifactRecord) -> Result<(), StoreError>;
175    async fn list_by_session(
176        &self,
177        session_id: &SessionId,
178    ) -> Result<Vec<ArtifactRecord>, StoreError>;
179}
180
181/// Port for cost metering and budget checks.
182#[async_trait::async_trait]
183pub trait CostMeterPort: Send + Sync {
184    async fn check_budget(&self, session_id: &SessionId) -> Result<(), CostError>;
185    async fn record_llm_usage(
186        &self,
187        session_id: &SessionId,
188        model: &str,
189        usage: &TokenUsage,
190    ) -> Result<(), CostError>;
191    async fn record_tool_result(
192        &self,
193        session_id: &SessionId,
194        tool_result: &ToolResult,
195    ) -> Result<(), CostError>;
196}
197
198// ── Tape Store ───────────────────────────────────────────────────────
199
200/// Port for the append-only conversation tape.
201///
202/// The tape records all messages, events, anchors, and handoffs. It is
203/// separate from the session store: the session store tracks LLM context,
204/// while the tape provides a searchable audit log.
205#[async_trait::async_trait]
206pub trait TapeStorePort: Send + Sync {
207    /// Append a new entry to the session's tape.
208    async fn append(
209        &self,
210        session_id: &SessionId,
211        kind: TapeEntryKind,
212    ) -> Result<TapeEntry, StoreError>;
213
214    /// Return entries recorded since the most recent handoff.
215    ///
216    /// If no handoff exists, returns all entries.
217    async fn entries_since_last_handoff(
218        &self,
219        session_id: &SessionId,
220    ) -> Result<Vec<TapeEntry>, StoreError>;
221
222    /// Search the tape for entries matching a query string.
223    async fn search(
224        &self,
225        session_id: &SessionId,
226        query: &str,
227    ) -> Result<Vec<TapeSearchResult>, StoreError>;
228
229    /// Return all entries for a session.
230    async fn all_entries(&self, session_id: &SessionId) -> Result<Vec<TapeEntry>, StoreError>;
231
232    /// Return only anchor entries for a session.
233    async fn anchors(&self, session_id: &SessionId) -> Result<Vec<TapeEntry>, StoreError>;
234}
235
236// ── Session Store ────────────────────────────────────────────────────
237
238/// Port for session state persistence.
239#[async_trait::async_trait]
240pub trait SessionStore: Send + Sync {
241    /// Load a session by ID. Returns `None` if not found.
242    async fn load(&self, id: &SessionId) -> Result<Option<SessionState>, StoreError>;
243
244    /// Persist a session by ID. Increments the version unconditionally.
245    async fn save(&self, id: &SessionId, state: &SessionState) -> Result<(), StoreError>;
246
247    /// Persist a session only if the current version matches `expected_version`.
248    ///
249    /// On success the store increments the version. On version mismatch the
250    /// store returns [`StoreError::VersionConflict`] with the actual version
251    /// found, allowing the caller to reload and retry.
252    async fn save_if_version(
253        &self,
254        id: &SessionId,
255        state: &SessionState,
256        expected_version: u64,
257    ) -> Result<u64, StoreError> {
258        // Default fallback: simple load-check-save. Adapters should override
259        // this with an atomic CAS for correctness under concurrency.
260        let current = self.load(id).await?;
261        let current_version = current.as_ref().map_or(0, |s| s.version);
262        if current_version != expected_version {
263            return Err(StoreError::VersionConflict {
264                expected: expected_version,
265                actual: current_version,
266            });
267        }
268        self.save(id, state).await?;
269        Ok(current_version.saturating_add(1))
270    }
271}
272
273// ── Event Sink ───────────────────────────────────────────────────────
274
275/// Port for emitting observability events (fire-and-forget).
276pub trait EventSink: Send + Sync {
277    /// Emit an agent event. Must not block.
278    fn emit(&self, event: AgentEvent);
279}
280
281// ── Subagent Port ─────────────────────────────────────────────────────
282
283/// Port for spawning and managing background subagents.
284///
285/// Subagents run independently in their own Tokio tasks with their own
286/// session state and tool registry, but share the parent's LLM port.
287/// Recursive spawning is prevented by denying the `subagent/spawn` tool.
288#[async_trait::async_trait]
289pub trait SubagentPort: Send + Sync {
290    /// Spawn a new subagent task. Returns the subagent ID immediately.
291    async fn spawn(
292        &self,
293        task: String,
294        parent_session_id: SessionId,
295        model: Option<String>,
296        max_steps: Option<u32>,
297        extra_deny_tools: Vec<String>,
298    ) -> Result<SessionId, crate::error::AgentError>;
299
300    /// Await a subagent's result (blocks until completion).
301    async fn await_result(
302        &self,
303        subagent_id: &SessionId,
304    ) -> Result<SubagentResult, crate::error::AgentError>;
305
306    /// List currently active subagent IDs for a parent session.
307    async fn list_active(
308        &self,
309        parent_session_id: &SessionId,
310    ) -> Result<Vec<SessionId>, crate::error::AgentError>;
311
312    /// Cancel a running subagent.
313    async fn cancel(&self, subagent_id: &SessionId) -> Result<(), crate::error::AgentError>;
314}
315
316// ── Access Control ──────────────────────────────────────────────────
317
318/// Port for channel-level access control decisions.
319pub trait AccessControlPort: Send + Sync {
320    /// Check whether a sender is allowed on a channel.
321    fn check_access(&self, channel: &str, sender_id: &str) -> AccessDecision;
322    /// List all configured policies.
323    fn policies(&self) -> &[ChannelAccessPolicy];
324}
325
326// ── Message Bus Port ─────────────────────────────────────────────────
327
328/// Port for message bus communication between bot and agent layers.
329///
330/// Decouples chat adapters from the agent runtime by providing a typed
331/// async channel abstraction. The bot layer pushes [`InboundMessage`]s
332/// onto the bus, and the agent layer consumes them. The agent layer
333/// pushes [`OutboundMessage`]s for the bot layer to deliver to channels.
334#[async_trait::async_trait]
335pub trait MessageBusPort: Send + Sync {
336    /// Send an outbound message to the bus.
337    async fn send(&self, msg: OutboundMessage) -> Result<(), crate::error::AgentError>;
338
339    /// Receive the next inbound message (blocks until available).
340    async fn recv(&self) -> Result<InboundMessage, crate::error::AgentError>;
341}
342
343// ── Activity Journal Port ────────────────────────────────────────────
344
345/// Port for append-only activity journal with time-based queries.
346///
347/// Records agent activity (messages, system events) and supports querying
348/// entries within a symmetric time window around an anchor timestamp.
349#[async_trait::async_trait]
350pub trait ActivityJournalPort: Send + Sync {
351    /// Append an entry to the journal.
352    async fn append(&self, entry: ActivityEntry) -> Result<(), StoreError>;
353
354    /// Query entries within a time window.
355    async fn query(&self, query: &ActivityQuery) -> Result<Vec<ActivityEntry>, StoreError>;
356
357    /// Count total entries.
358    async fn count(&self) -> Result<u64, StoreError>;
359}
360
361// ── Tests ────────────────────────────────────────────────────────────
362
363#[cfg(test)]
364mod tests {
365    use std::sync::{Arc, Mutex};
366
367    use super::*;
368
369    // ── Mock LLM ─────────────────────────────────────────────────
370
371    struct MockLlm {
372        response: LlmResponse,
373    }
374
375    #[async_trait::async_trait]
376    impl LlmPort for MockLlm {
377        async fn complete(&self, _req: LlmRequest) -> Result<LlmResponse, LlmError> {
378            Ok(self.response.clone())
379        }
380
381        async fn complete_stream(&self, _req: LlmRequest) -> Result<LlmStream, LlmError> {
382            Err(LlmError::Provider("streaming not implemented in mock".into()))
383        }
384    }
385
386    // ── Mock Tool Port ───────────────────────────────────────────
387
388    struct MockToolPort {
389        tools: Vec<ToolDescriptor>,
390    }
391
392    #[async_trait::async_trait]
393    impl ToolPort for MockToolPort {
394        async fn list_tools(&self) -> Result<Vec<ToolDescriptor>, ToolError> {
395            Ok(self.tools.clone())
396        }
397
398        async fn call_tool(&self, call: ToolCall) -> Result<ToolResult, ToolError> {
399            Ok(ToolResult {
400                name: call.name,
401                output: serde_json::json!({"result": "mock"}),
402                is_error: false,
403            })
404        }
405    }
406
407    // ── Mock Session Store ───────────────────────────────────────
408
409    struct MockSessionStore {
410        inner: Mutex<std::collections::HashMap<SessionId, SessionState>>,
411    }
412
413    impl MockSessionStore {
414        fn new() -> Self {
415            Self { inner: Mutex::new(std::collections::HashMap::new()) }
416        }
417    }
418
419    #[async_trait::async_trait]
420    impl SessionStore for MockSessionStore {
421        async fn load(&self, id: &SessionId) -> Result<Option<SessionState>, StoreError> {
422            let map = self.inner.lock().unwrap_or_else(|poisoned| poisoned.into_inner());
423            Ok(map.get(id).cloned())
424        }
425
426        async fn save(&self, id: &SessionId, state: &SessionState) -> Result<(), StoreError> {
427            let mut map = self.inner.lock().unwrap_or_else(|poisoned| poisoned.into_inner());
428            map.insert(id.clone(), state.clone());
429            Ok(())
430        }
431    }
432
433    // ── Mock Event Sink ──────────────────────────────────────────
434
435    struct MockEventSink {
436        events: Mutex<Vec<AgentEvent>>,
437    }
438
439    impl MockEventSink {
440        fn new() -> Self {
441            Self { events: Mutex::new(Vec::new()) }
442        }
443    }
444
445    impl EventSink for MockEventSink {
446        fn emit(&self, event: AgentEvent) {
447            let mut events = self.events.lock().unwrap_or_else(|poisoned| poisoned.into_inner());
448            events.push(event);
449        }
450    }
451
452    // ── Tests ────────────────────────────────────────────────────
453
454    #[tokio::test]
455    async fn llm_port_complete() {
456        let llm: Arc<dyn LlmPort> = Arc::new(MockLlm {
457            response: LlmResponse {
458                content: "Hello!".into(),
459                usage: crate::types::TokenUsage { prompt_tokens: 10, completion_tokens: 5 },
460                finish_reason: crate::types::FinishReason::Stop,
461                tool_calls: Vec::new(),
462            },
463        });
464
465        let req = LlmRequest {
466            model: "test-model".into(),
467            messages: vec![],
468            tools: vec![],
469            output_schema: None,
470        };
471
472        let resp = llm.complete(req).await;
473        assert!(resp.is_ok(), "complete should succeed");
474        if let Ok(resp) = resp {
475            assert_eq!(resp.content, "Hello!");
476            assert_eq!(resp.usage.total(), 15);
477        }
478    }
479
480    #[tokio::test]
481    async fn tool_port_list_and_call() {
482        let tools: Arc<dyn ToolPort> =
483            Arc::new(MockToolPort { tools: vec![ToolDescriptor::new("test/echo", "Echo tool")] });
484
485        let listed = tools.list_tools().await;
486        assert!(listed.is_ok(), "list_tools should succeed");
487        if let Ok(listed) = listed {
488            assert_eq!(listed.len(), 1);
489            assert_eq!(listed[0].id, "test/echo");
490        }
491
492        let result =
493            tools.call_tool(ToolCall::new("test/echo", serde_json::json!({"msg": "hi"}))).await;
494        assert!(result.is_ok(), "call_tool should succeed");
495        if let Ok(result) = result {
496            assert!(!result.is_error);
497        }
498    }
499
500    #[tokio::test]
501    async fn session_store_roundtrip() {
502        let store: Arc<dyn SessionStore> = Arc::new(MockSessionStore::new());
503
504        let id = "session-1".to_string();
505        let loaded_before = store.load(&id).await;
506        assert!(matches!(loaded_before, Ok(None)));
507
508        let state = SessionState {
509            messages: vec![crate::types::Message::text(crate::types::Role::User, "hello")],
510            ..Default::default()
511        };
512
513        let saved = store.save(&id, &state).await;
514        assert!(saved.is_ok(), "save should succeed");
515
516        let loaded = store.load(&id).await;
517        assert!(matches!(loaded, Ok(Some(_))), "load should return stored state");
518        if let Ok(Some(loaded)) = loaded {
519            assert_eq!(loaded.messages.len(), 1);
520            assert_eq!(loaded.messages[0].content, "hello");
521        }
522    }
523
524    #[test]
525    fn event_sink_collect() {
526        let sink = MockEventSink::new();
527        sink.emit(AgentEvent::TurnStarted { session_id: "s1".into() });
528        sink.emit(AgentEvent::Error { session_id: "s1".into(), step: None, error: "boom".into() });
529
530        let events = sink.events.lock().unwrap_or_else(|poisoned| poisoned.into_inner());
531        assert_eq!(events.len(), 2);
532    }
533
534    #[test]
535    fn turn_policy_defaults() {
536        let policy = crate::types::TurnPolicy::default();
537        assert_eq!(policy.max_steps, 12);
538        assert_eq!(policy.max_tool_calls, 8);
539        assert_eq!(policy.max_consecutive_errors, 2);
540        assert_eq!(policy.turn_timeout_ms, 90_000);
541        assert_eq!(policy.tool_timeout_ms, 15_000);
542    }
543}