Skip to main content

bob_runtime/
agent_loop.rs

//! # Agent Loop
//!
//! High-level orchestration loop that unifies slash-command routing, tape
//! recording, and LLM pipeline execution.
//!
//! ## Architecture
//!
//! ```text
//! ┌─────────────┐       ┌────────────┐       ┌─────────────────┐
//! │  Channel    │ ───→  │ AgentLoop  │ ───→  │  AgentRuntime   │
//! │ (transport) │ ←───  │ (routing)  │ ←───  │  (LLM pipeline) │
//! └─────────────┘       └────────────┘       └─────────────────┘
//!                             │
//!                             ├─→ Router (slash commands)
//!                             ├─→ TapeStorePort (recording)
//!                             ├─→ SessionStore (/usage, /handoff)
//!                             └─→ ToolPort (tool listing for /tools)
//! ```
//!
//! The `AgentLoop` wraps an `AgentRuntime` and adds:
//!
//! 1. **Slash command routing** — deterministic commands bypass the LLM.
//! 2. **Tape recording** — conversation history is persisted to the tape.
//! 3. **System prompt override** — a custom system prompt, typically loaded from a
//!    workspace file by the caller and passed in as a string, replaces the runtime default.
24
25use std::sync::Arc;
26
27use bob_core::{
28    error::AgentError,
29    ports::{EventSink, SessionStore, ToolPort},
30    tape::{TapeEntryKind, TapeSearchResult},
31    types::{AgentRequest, AgentRunResult, RequestContext, TokenUsage},
32};
33
34// Re-export for convenience.
35pub use crate::router::help_text;
36use crate::{
37    AgentRuntime,
38    router::{self, RouteResult, SlashCommand},
39};
40
41/// Output from the agent loop after processing a single input.
42#[derive(Debug)]
43pub enum AgentLoopOutput {
44    /// A response from the LLM pipeline (normal conversation).
45    Response(AgentRunResult),
46    /// A deterministic command response (no LLM involved).
47    CommandOutput(String),
48    /// The user requested to quit the session.
49    Quit,
50}
51
52/// High-level agent orchestration loop.
53///
54/// Wraps an `AgentRuntime` with slash command routing, tape recording,
55/// and optional system prompt overrides.
56///
57/// ## Construction
58///
59/// ```rust,ignore
60/// let agent_loop = AgentLoop::new(runtime, tools)
61///     .with_tape(tape_store)
62///     .with_system_prompt("You are a helpful assistant.".to_string());
63/// ```
64pub struct AgentLoop {
65    runtime: Arc<dyn AgentRuntime>,
66    tools: Arc<dyn ToolPort>,
67    store: Option<Arc<dyn SessionStore>>,
68    tape: Option<Arc<dyn bob_core::ports::TapeStorePort>>,
69    events: Option<Arc<dyn EventSink>>,
70    system_prompt_override: Option<String>,
71}
72
73impl std::fmt::Debug for AgentLoop {
74    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
75        f.debug_struct("AgentLoop")
76            .field("has_store", &self.store.is_some())
77            .field("has_tape", &self.tape.is_some())
78            .field("has_system_prompt_override", &self.system_prompt_override.is_some())
79            .finish_non_exhaustive()
80    }
81}
82
83impl AgentLoop {
84    /// Create a new agent loop wrapping the given runtime and tool port.
85    #[must_use]
86    pub fn new(runtime: Arc<dyn AgentRuntime>, tools: Arc<dyn ToolPort>) -> Self {
87        Self { runtime, tools, store: None, tape: None, events: None, system_prompt_override: None }
88    }
89
90    /// Attach a session store for slash commands that inspect persisted state.
91    #[must_use]
92    pub fn with_store(mut self, store: Arc<dyn SessionStore>) -> Self {
93        self.store = Some(store);
94        self
95    }
96
97    /// Attach a tape store for persistent conversation recording.
98    #[must_use]
99    pub fn with_tape(mut self, tape: Arc<dyn bob_core::ports::TapeStorePort>) -> Self {
100        self.tape = Some(tape);
101        self
102    }
103
104    /// Attach an event sink for observability.
105    #[must_use]
106    pub fn with_events(mut self, events: Arc<dyn EventSink>) -> Self {
107        self.events = Some(events);
108        self
109    }
110
111    /// Set a system prompt that overrides the default runtime prompt.
112    ///
113    /// This is typically loaded from a workspace file (e.g. `.agent/system-prompt.md`)
114    /// at the composition root. The content is passed as a pre-loaded string to
115    /// keep the runtime free from filesystem dependencies.
116    #[must_use]
117    pub fn with_system_prompt(mut self, prompt: String) -> Self {
118        self.system_prompt_override = Some(prompt);
119        self
120    }
121
122    /// Process a single user input and return the appropriate output.
123    ///
124    /// Slash commands are handled deterministically. Natural language input
125    /// is forwarded to the LLM pipeline.
126    pub async fn handle_input(
127        &self,
128        input: &str,
129        session_id: &str,
130    ) -> Result<AgentLoopOutput, AgentError> {
131        self.handle_input_with_context(input, session_id, RequestContext::default()).await
132    }
133
134    /// Process a single user input using an explicit request context.
135    pub async fn handle_input_with_context(
136        &self,
137        input: &str,
138        session_id: &str,
139        context: RequestContext,
140    ) -> Result<AgentLoopOutput, AgentError> {
141        let sid = session_id.to_string();
142
143        match router::route(input) {
144            RouteResult::SlashCommand(cmd) => self.execute_command(cmd, &sid).await,
145            RouteResult::NaturalLanguage(text) => {
146                if let Some(ref tape) = self.tape {
147                    let _ = tape
148                        .append(
149                            &sid,
150                            TapeEntryKind::Message {
151                                role: bob_core::types::Role::User,
152                                content: text.clone(),
153                            },
154                        )
155                        .await;
156                }
157                self.execute_llm(&text, &sid, context).await
158            }
159        }
160    }
161
162    /// Execute a deterministic slash command.
163    async fn execute_command(
164        &self,
165        cmd: SlashCommand,
166        session_id: &String,
167    ) -> Result<AgentLoopOutput, AgentError> {
168        match cmd {
169            SlashCommand::Help => Ok(AgentLoopOutput::CommandOutput(help_text())),
170
171            SlashCommand::Tools => {
172                let tools = self.tools.list_tools().await?;
173                let mut out = String::from("Registered tools:\n");
174                for tool in &tools {
175                    out.push_str(&format!("  - {}: {}\n", tool.id, tool.description));
176                }
177                if tools.is_empty() {
178                    out.push_str("  (none)\n");
179                }
180                Ok(AgentLoopOutput::CommandOutput(out))
181            }
182
183            SlashCommand::ToolDescribe { name } => {
184                let tools = self.tools.list_tools().await?;
185                let found = tools.iter().find(|t| t.id == name);
186                let out = match found {
187                    Some(tool) => {
188                        format!(
189                            "Tool: {}\nDescription: {}\nSource: {:?}\nSchema:\n{}",
190                            tool.id,
191                            tool.description,
192                            tool.source,
193                            serde_json::to_string_pretty(&tool.input_schema).unwrap_or_default()
194                        )
195                    }
196                    None => {
197                        format!("Tool '{}' not found. Use /tools to list available tools.", name)
198                    }
199                };
200                Ok(AgentLoopOutput::CommandOutput(out))
201            }
202
203            SlashCommand::TapeSearch { query } => {
204                let out = if let Some(ref tape) = self.tape {
205                    let results = tape.search(session_id, &query).await?;
206                    format_search_results(&results)
207                } else {
208                    "Tape not configured.".to_string()
209                };
210                Ok(AgentLoopOutput::CommandOutput(out))
211            }
212
213            SlashCommand::TapeInfo => {
214                let out = if let Some(ref tape) = self.tape {
215                    let entries = tape.all_entries(session_id).await?;
216                    let anchors = tape.anchors(session_id).await?;
217                    format!("Tape: {} entries, {} anchors", entries.len(), anchors.len())
218                } else {
219                    "Tape not configured.".to_string()
220                };
221                Ok(AgentLoopOutput::CommandOutput(out))
222            }
223
224            SlashCommand::Anchors => {
225                let out = if let Some(ref tape) = self.tape {
226                    let anchors = tape.anchors(session_id).await?;
227                    if anchors.is_empty() {
228                        "No anchors in tape.".to_string()
229                    } else {
230                        let mut buf = String::from("Anchors:\n");
231                        for a in &anchors {
232                            if let TapeEntryKind::Anchor { ref name, .. } = a.kind {
233                                buf.push_str(&format!("  [{}] {}\n", a.id, name));
234                            }
235                        }
236                        buf
237                    }
238                } else {
239                    "Tape not configured.".to_string()
240                };
241                Ok(AgentLoopOutput::CommandOutput(out))
242            }
243
244            SlashCommand::Handoff { name } => {
245                let handoff_name = name.unwrap_or_else(|| "manual".to_string());
246                let reset_applied = if let Some(ref store) = self.store {
247                    let retained_usage = store
248                        .load(session_id)
249                        .await?
250                        .map_or_else(TokenUsage::default, |state| state.total_usage);
251                    store
252                        .save(
253                            session_id,
254                            &bob_core::types::SessionState {
255                                messages: Vec::new(),
256                                total_usage: retained_usage,
257                                ..Default::default()
258                            },
259                        )
260                        .await?;
261                    true
262                } else {
263                    false
264                };
265
266                if let Some(ref tape) = self.tape {
267                    let all = tape.all_entries(session_id).await?;
268                    let _ = tape
269                        .append(
270                            session_id,
271                            TapeEntryKind::Handoff {
272                                name: handoff_name.clone(),
273                                entries_before: all.len() as u64,
274                                summary: None,
275                            },
276                        )
277                        .await;
278                    let message = if reset_applied {
279                        format!("Handoff '{}' created. Context window reset.", handoff_name)
280                    } else {
281                        format!(
282                            "Handoff '{}' recorded, but session store is not configured so context was not reset.",
283                            handoff_name
284                        )
285                    };
286                    Ok(AgentLoopOutput::CommandOutput(message))
287                } else if reset_applied {
288                    Ok(AgentLoopOutput::CommandOutput(format!(
289                        "Context window reset for handoff '{}'. Tape not configured.",
290                        handoff_name
291                    )))
292                } else {
293                    Ok(AgentLoopOutput::CommandOutput(
294                        "Handoff requires a session store or tape configuration.".to_string(),
295                    ))
296                }
297            }
298
299            SlashCommand::Usage => {
300                let out = if let Some(ref store) = self.store {
301                    let session = store.load(session_id).await?;
302                    format_usage_summary(session.as_ref().map(|state| &state.total_usage))
303                } else {
304                    "Session store not configured.".to_string()
305                };
306                Ok(AgentLoopOutput::CommandOutput(out))
307            }
308
309            SlashCommand::Quit => Ok(AgentLoopOutput::Quit),
310
311            SlashCommand::Shell { command } => {
312                // Shell execution is a convenience fallback.
313                // In production, this should be gated by approval policies.
314                Ok(AgentLoopOutput::CommandOutput(format!(
315                    "Shell execution not yet implemented: {}",
316                    command
317                )))
318            }
319        }
320    }
321
322    /// Forward natural language input to the LLM pipeline.
323    async fn execute_llm(
324        &self,
325        text: &str,
326        session_id: &String,
327        mut context: RequestContext,
328    ) -> Result<AgentLoopOutput, AgentError> {
329        if let Some(ref prompt) = self.system_prompt_override {
330            context.system_prompt = Some(prompt.clone());
331        }
332
333        let req = AgentRequest {
334            input: text.to_string(),
335            session_id: session_id.clone(),
336            model: None,
337            context,
338            cancel_token: None,
339            output_schema: None,
340            max_output_retries: 0,
341        };
342
343        let result = self.runtime.run(req).await?;
344
345        // Record assistant response to tape.
346        if let Some(ref tape) = self.tape {
347            let AgentRunResult::Finished(ref resp) = result;
348            let _ = tape
349                .append(
350                    session_id,
351                    TapeEntryKind::Message {
352                        role: bob_core::types::Role::Assistant,
353                        content: resp.content.clone(),
354                    },
355                )
356                .await;
357        }
358
359        Ok(AgentLoopOutput::Response(result))
360    }
361}
362
363/// Format tape search results into a human-readable string.
364fn format_search_results(results: &[TapeSearchResult]) -> String {
365    if results.is_empty() {
366        return "No results found.".to_string();
367    }
368    let mut buf = format!("{} result(s):\n", results.len());
369    for r in results {
370        buf.push_str(&format!("  [{}] {}\n", r.entry.id, r.snippet));
371    }
372    buf
373}
374
375fn format_usage_summary(usage: Option<&TokenUsage>) -> String {
376    let usage = usage.cloned().unwrap_or_default();
377    format!(
378        "Session usage:\n  Prompt tokens: {}\n  Completion tokens: {}\n  Total tokens: {}",
379        usage.prompt_tokens,
380        usage.completion_tokens,
381        usage.total(),
382    )
383}
384
#[cfg(test)]
mod tests {
    use std::sync::{Arc, Mutex, MutexGuard, PoisonError};

    use bob_core::{
        error::{AgentError, StoreError, ToolError},
        ports::{TapeStorePort, ToolPort},
        tape::{TapeEntry, TapeEntryKind, TapeSearchResult},
        types::{
            AgentEventStream, AgentResponse, FinishReason, HealthStatus, Message, Role,
            RuntimeHealth, SessionId, SessionState, ToolCall, ToolDescriptor, ToolResult,
        },
    };

    use super::*;

    /// Session id used by every test in this module.
    const SESSION: &str = "session-1";

    /// Locks `mutex`, recovering the guarded data if a previous holder panicked.
    fn lock<T>(mutex: &Mutex<T>) -> MutexGuard<'_, T> {
        mutex.lock().unwrap_or_else(PoisonError::into_inner)
    }

    /// Runtime double that always finishes with the reply `"stub"`.
    struct StubRuntime;

    #[async_trait::async_trait]
    impl AgentRuntime for StubRuntime {
        async fn run(&self, _req: AgentRequest) -> Result<AgentRunResult, AgentError> {
            let response = AgentResponse {
                content: String::from("stub"),
                tool_transcript: vec![],
                usage: TokenUsage::default(),
                finish_reason: FinishReason::Stop,
            };
            Ok(AgentRunResult::Finished(response))
        }

        async fn run_stream(&self, _req: AgentRequest) -> Result<AgentEventStream, AgentError> {
            Err(AgentError::Config(String::from("unused in test")))
        }

        async fn health(&self) -> RuntimeHealth {
            RuntimeHealth { status: HealthStatus::Healthy, llm_ready: true, mcp_pool_ready: true }
        }
    }

    /// Tool port with an empty registry; every call reports the tool as missing.
    struct StubTools;

    #[async_trait::async_trait]
    impl ToolPort for StubTools {
        async fn list_tools(&self) -> Result<Vec<ToolDescriptor>, ToolError> {
            Ok(vec![])
        }

        async fn call_tool(&self, call: ToolCall) -> Result<ToolResult, ToolError> {
            Err(ToolError::NotFound { name: call.name })
        }
    }

    /// Session store that always loads the same state and discards saves.
    struct StaticSessionStore {
        state: SessionState,
    }

    #[async_trait::async_trait]
    impl SessionStore for StaticSessionStore {
        async fn load(&self, _id: &SessionId) -> Result<Option<SessionState>, StoreError> {
            Ok(Some(self.state.clone()))
        }

        async fn save(&self, _id: &SessionId, _state: &SessionState) -> Result<(), StoreError> {
            Ok(())
        }
    }

    /// Single-slot in-memory session store (ignores the session id).
    #[derive(Default)]
    struct MemorySessionStore {
        state: Mutex<Option<SessionState>>,
    }

    #[async_trait::async_trait]
    impl SessionStore for MemorySessionStore {
        async fn load(&self, _id: &SessionId) -> Result<Option<SessionState>, StoreError> {
            Ok(lock(&self.state).clone())
        }

        async fn save(&self, _id: &SessionId, state: &SessionState) -> Result<(), StoreError> {
            *lock(&self.state) = Some(state.clone());
            Ok(())
        }
    }

    /// In-memory tape that records appended entries with 1-based ids; queries other than
    /// `all_entries` return nothing.
    #[derive(Default)]
    struct MemoryTapeStore {
        entries: Mutex<Vec<TapeEntry>>,
    }

    #[async_trait::async_trait]
    impl TapeStorePort for MemoryTapeStore {
        async fn append(
            &self,
            _session_id: &SessionId,
            kind: TapeEntryKind,
        ) -> Result<TapeEntry, StoreError> {
            let mut entries = lock(&self.entries);
            let id = entries.len() as u64 + 1;
            let entry = TapeEntry { id, kind, timestamp_ms: 0 };
            entries.push(entry.clone());
            Ok(entry)
        }

        async fn entries_since_last_handoff(
            &self,
            _session_id: &SessionId,
        ) -> Result<Vec<TapeEntry>, StoreError> {
            Ok(vec![])
        }

        async fn search(
            &self,
            _session_id: &SessionId,
            _query: &str,
        ) -> Result<Vec<TapeSearchResult>, StoreError> {
            Ok(vec![])
        }

        async fn all_entries(&self, _session_id: &SessionId) -> Result<Vec<TapeEntry>, StoreError> {
            Ok(lock(&self.entries).clone())
        }

        async fn anchors(&self, _session_id: &SessionId) -> Result<Vec<TapeEntry>, StoreError> {
            Ok(vec![])
        }
    }

    /// Builds a loop over the stub runtime and stub tools with nothing else attached.
    fn loop_with_stubs() -> AgentLoop {
        AgentLoop::new(Arc::new(StubRuntime), Arc::new(StubTools))
    }

    /// Builds an otherwise-default session state carrying `total_usage` and no messages.
    fn state_with_usage(total_usage: TokenUsage) -> SessionState {
        SessionState { messages: Vec::new(), total_usage, ..Default::default() }
    }

    /// Extracts the text of a command response, panicking with a `command`-specific message
    /// on any other outcome.
    fn command_body(output: Result<AgentLoopOutput, AgentError>, command: &str) -> String {
        match output {
            Ok(AgentLoopOutput::CommandOutput(body)) => body,
            Ok(other) => panic!("expected {command} command output, got {other:?}"),
            Err(err) => panic!("{command} command failed: {err}"),
        }
    }

    /// Reads every tape entry recorded for [`SESSION`].
    async fn tape_entries(tape: &MemoryTapeStore) -> Vec<TapeEntry> {
        match tape.all_entries(&SESSION.to_string()).await {
            Ok(entries) => entries,
            Err(err) => panic!("failed to read tape entries: {err}"),
        }
    }

    #[tokio::test]
    async fn usage_command_reads_total_usage_from_store() {
        let store = Arc::new(StaticSessionStore {
            state: state_with_usage(TokenUsage { prompt_tokens: 12, completion_tokens: 8 }),
        });
        let agent = loop_with_stubs().with_store(store);

        let body = command_body(agent.handle_input("/usage", SESSION).await, "usage");

        for expected in ["Prompt tokens: 12", "Completion tokens: 8", "Total tokens: 20"] {
            assert!(body.contains(expected));
        }
    }

    #[tokio::test]
    async fn slash_commands_do_not_append_user_messages_to_tape() {
        let store = Arc::new(StaticSessionStore {
            state: state_with_usage(TokenUsage { prompt_tokens: 12, completion_tokens: 8 }),
        });
        let tape = Arc::new(MemoryTapeStore::default());
        let agent = loop_with_stubs().with_store(store).with_tape(Arc::clone(&tape));

        let body = command_body(agent.handle_input("/usage", SESSION).await, "usage");
        assert!(body.contains("Total tokens: 20"));

        let entries = tape_entries(&tape).await;
        assert!(entries.is_empty(), "slash commands should not be recorded as tape messages");
    }

    #[tokio::test]
    async fn natural_language_turns_still_append_user_and_assistant_messages_to_tape() {
        let tape = Arc::new(MemoryTapeStore::default());
        let agent = loop_with_stubs().with_tape(Arc::clone(&tape));

        match agent.handle_input("hello world", SESSION).await {
            Ok(AgentLoopOutput::Response(AgentRunResult::Finished(resp))) => {
                assert_eq!(resp.content, "stub");
            }
            Ok(other) => panic!("expected LLM response output, got {other:?}"),
            Err(err) => panic!("natural language turn failed: {err}"),
        }

        let entries = tape_entries(&tape).await;
        assert_eq!(
            entries.len(),
            2,
            "natural language turns should record both user and assistant"
        );

        let kinds: Vec<&TapeEntryKind> = entries.iter().map(|entry| &entry.kind).collect();
        assert!(matches!(
            kinds.first().copied(),
            Some(TapeEntryKind::Message { role: Role::User, content }) if content == "hello world"
        ));
        assert!(matches!(
            kinds.get(1).copied(),
            Some(TapeEntryKind::Message { role: Role::Assistant, content }) if content == "stub"
        ));
    }

    #[tokio::test]
    async fn handoff_resets_session_messages_but_keeps_usage() {
        let mut seeded = state_with_usage(TokenUsage { prompt_tokens: 21, completion_tokens: 13 });
        seeded.messages =
            vec![Message::text(Role::User, "before"), Message::text(Role::Assistant, "answer")];
        let store = Arc::new(MemorySessionStore { state: Mutex::new(Some(seeded)) });
        let tape = Arc::new(MemoryTapeStore::default());
        let agent =
            loop_with_stubs().with_store(Arc::clone(&store)).with_tape(Arc::clone(&tape));

        let body =
            command_body(agent.handle_input("/handoff phase-2", SESSION).await, "handoff");
        assert!(body.contains("Context window reset"));

        let saved = match store.load(&SESSION.to_string()).await {
            Ok(Some(state)) => state,
            other => panic!("expected saved session state, got {other:?}"),
        };
        assert!(saved.messages.is_empty(), "handoff should clear session messages");
        assert_eq!(saved.total_usage.total(), 34, "handoff should preserve cumulative usage");

        let entries = tape_entries(&tape).await;
        assert_eq!(entries.len(), 1, "handoff should not leave a slash-command message on tape");
        let handoff_recorded = entries.iter().any(|entry| {
            matches!(&entry.kind, TapeEntryKind::Handoff { name, .. } if name == "phase-2")
        });
        assert!(handoff_recorded, "handoff should be recorded to the tape");
    }
}