1use std::sync::Arc;
26
27use bob_core::{
28 error::AgentError,
29 ports::{EventSink, SessionStore, ToolPort},
30 tape::{TapeEntryKind, TapeSearchResult},
31 types::{AgentRequest, AgentRunResult, RequestContext, TokenUsage},
32};
33
34pub use crate::router::help_text;
36use crate::{
37 AgentRuntime,
38 router::{self, RouteResult, SlashCommand},
39};
40
41#[derive(Debug)]
43pub enum AgentLoopOutput {
44 Response(AgentRunResult),
46 CommandOutput(String),
48 Quit,
50}
51
52pub struct AgentLoop {
65 runtime: Arc<dyn AgentRuntime>,
66 tools: Arc<dyn ToolPort>,
67 store: Option<Arc<dyn SessionStore>>,
68 tape: Option<Arc<dyn bob_core::ports::TapeStorePort>>,
69 events: Option<Arc<dyn EventSink>>,
70 system_prompt_override: Option<String>,
71}
72
73impl std::fmt::Debug for AgentLoop {
74 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
75 f.debug_struct("AgentLoop")
76 .field("has_store", &self.store.is_some())
77 .field("has_tape", &self.tape.is_some())
78 .field("has_system_prompt_override", &self.system_prompt_override.is_some())
79 .finish_non_exhaustive()
80 }
81}
82
83impl AgentLoop {
84 #[must_use]
86 pub fn new(runtime: Arc<dyn AgentRuntime>, tools: Arc<dyn ToolPort>) -> Self {
87 Self { runtime, tools, store: None, tape: None, events: None, system_prompt_override: None }
88 }
89
90 #[must_use]
92 pub fn with_store(mut self, store: Arc<dyn SessionStore>) -> Self {
93 self.store = Some(store);
94 self
95 }
96
97 #[must_use]
99 pub fn with_tape(mut self, tape: Arc<dyn bob_core::ports::TapeStorePort>) -> Self {
100 self.tape = Some(tape);
101 self
102 }
103
104 #[must_use]
106 pub fn with_events(mut self, events: Arc<dyn EventSink>) -> Self {
107 self.events = Some(events);
108 self
109 }
110
111 #[must_use]
117 pub fn with_system_prompt(mut self, prompt: String) -> Self {
118 self.system_prompt_override = Some(prompt);
119 self
120 }
121
122 pub async fn handle_input(
127 &self,
128 input: &str,
129 session_id: &str,
130 ) -> Result<AgentLoopOutput, AgentError> {
131 self.handle_input_with_context(input, session_id, RequestContext::default()).await
132 }
133
134 pub async fn handle_input_with_context(
136 &self,
137 input: &str,
138 session_id: &str,
139 context: RequestContext,
140 ) -> Result<AgentLoopOutput, AgentError> {
141 let sid = session_id.to_string();
142
143 match router::route(input) {
144 RouteResult::SlashCommand(cmd) => self.execute_command(cmd, &sid).await,
145 RouteResult::NaturalLanguage(text) => {
146 if let Some(ref tape) = self.tape {
147 let _ = tape
148 .append(
149 &sid,
150 TapeEntryKind::Message {
151 role: bob_core::types::Role::User,
152 content: text.clone(),
153 },
154 )
155 .await;
156 }
157 self.execute_llm(&text, &sid, context).await
158 }
159 }
160 }
161
162 async fn execute_command(
164 &self,
165 cmd: SlashCommand,
166 session_id: &String,
167 ) -> Result<AgentLoopOutput, AgentError> {
168 match cmd {
169 SlashCommand::Help => Ok(AgentLoopOutput::CommandOutput(help_text())),
170
171 SlashCommand::Tools => {
172 let tools = self.tools.list_tools().await?;
173 let mut out = String::from("Registered tools:\n");
174 for tool in &tools {
175 out.push_str(&format!(" - {}: {}\n", tool.id, tool.description));
176 }
177 if tools.is_empty() {
178 out.push_str(" (none)\n");
179 }
180 Ok(AgentLoopOutput::CommandOutput(out))
181 }
182
183 SlashCommand::ToolDescribe { name } => {
184 let tools = self.tools.list_tools().await?;
185 let found = tools.iter().find(|t| t.id == name);
186 let out = match found {
187 Some(tool) => {
188 format!(
189 "Tool: {}\nDescription: {}\nSource: {:?}\nSchema:\n{}",
190 tool.id,
191 tool.description,
192 tool.source,
193 serde_json::to_string_pretty(&tool.input_schema).unwrap_or_default()
194 )
195 }
196 None => {
197 format!("Tool '{}' not found. Use /tools to list available tools.", name)
198 }
199 };
200 Ok(AgentLoopOutput::CommandOutput(out))
201 }
202
203 SlashCommand::TapeSearch { query } => {
204 let out = if let Some(ref tape) = self.tape {
205 let results = tape.search(session_id, &query).await?;
206 format_search_results(&results)
207 } else {
208 "Tape not configured.".to_string()
209 };
210 Ok(AgentLoopOutput::CommandOutput(out))
211 }
212
213 SlashCommand::TapeInfo => {
214 let out = if let Some(ref tape) = self.tape {
215 let entries = tape.all_entries(session_id).await?;
216 let anchors = tape.anchors(session_id).await?;
217 format!("Tape: {} entries, {} anchors", entries.len(), anchors.len())
218 } else {
219 "Tape not configured.".to_string()
220 };
221 Ok(AgentLoopOutput::CommandOutput(out))
222 }
223
224 SlashCommand::Anchors => {
225 let out = if let Some(ref tape) = self.tape {
226 let anchors = tape.anchors(session_id).await?;
227 if anchors.is_empty() {
228 "No anchors in tape.".to_string()
229 } else {
230 let mut buf = String::from("Anchors:\n");
231 for a in &anchors {
232 if let TapeEntryKind::Anchor { ref name, .. } = a.kind {
233 buf.push_str(&format!(" [{}] {}\n", a.id, name));
234 }
235 }
236 buf
237 }
238 } else {
239 "Tape not configured.".to_string()
240 };
241 Ok(AgentLoopOutput::CommandOutput(out))
242 }
243
244 SlashCommand::Handoff { name } => {
245 let handoff_name = name.unwrap_or_else(|| "manual".to_string());
246 let reset_applied = if let Some(ref store) = self.store {
247 let retained_usage = store
248 .load(session_id)
249 .await?
250 .map_or_else(TokenUsage::default, |state| state.total_usage);
251 store
252 .save(
253 session_id,
254 &bob_core::types::SessionState {
255 messages: Vec::new(),
256 total_usage: retained_usage,
257 ..Default::default()
258 },
259 )
260 .await?;
261 true
262 } else {
263 false
264 };
265
266 if let Some(ref tape) = self.tape {
267 let all = tape.all_entries(session_id).await?;
268 let _ = tape
269 .append(
270 session_id,
271 TapeEntryKind::Handoff {
272 name: handoff_name.clone(),
273 entries_before: all.len() as u64,
274 summary: None,
275 },
276 )
277 .await;
278 let message = if reset_applied {
279 format!("Handoff '{}' created. Context window reset.", handoff_name)
280 } else {
281 format!(
282 "Handoff '{}' recorded, but session store is not configured so context was not reset.",
283 handoff_name
284 )
285 };
286 Ok(AgentLoopOutput::CommandOutput(message))
287 } else if reset_applied {
288 Ok(AgentLoopOutput::CommandOutput(format!(
289 "Context window reset for handoff '{}'. Tape not configured.",
290 handoff_name
291 )))
292 } else {
293 Ok(AgentLoopOutput::CommandOutput(
294 "Handoff requires a session store or tape configuration.".to_string(),
295 ))
296 }
297 }
298
299 SlashCommand::Usage => {
300 let out = if let Some(ref store) = self.store {
301 let session = store.load(session_id).await?;
302 format_usage_summary(session.as_ref().map(|state| &state.total_usage))
303 } else {
304 "Session store not configured.".to_string()
305 };
306 Ok(AgentLoopOutput::CommandOutput(out))
307 }
308
309 SlashCommand::Quit => Ok(AgentLoopOutput::Quit),
310
311 SlashCommand::Shell { command } => {
312 Ok(AgentLoopOutput::CommandOutput(format!(
315 "Shell execution not yet implemented: {}",
316 command
317 )))
318 }
319 }
320 }
321
322 async fn execute_llm(
324 &self,
325 text: &str,
326 session_id: &String,
327 mut context: RequestContext,
328 ) -> Result<AgentLoopOutput, AgentError> {
329 if let Some(ref prompt) = self.system_prompt_override {
330 context.system_prompt = Some(prompt.clone());
331 }
332
333 let req = AgentRequest {
334 input: text.to_string(),
335 session_id: session_id.clone(),
336 model: None,
337 context,
338 cancel_token: None,
339 output_schema: None,
340 max_output_retries: 0,
341 };
342
343 let result = self.runtime.run(req).await?;
344
345 if let Some(ref tape) = self.tape {
347 let AgentRunResult::Finished(ref resp) = result;
348 let _ = tape
349 .append(
350 session_id,
351 TapeEntryKind::Message {
352 role: bob_core::types::Role::Assistant,
353 content: resp.content.clone(),
354 },
355 )
356 .await;
357 }
358
359 Ok(AgentLoopOutput::Response(result))
360 }
361}
362
363fn format_search_results(results: &[TapeSearchResult]) -> String {
365 if results.is_empty() {
366 return "No results found.".to_string();
367 }
368 let mut buf = format!("{} result(s):\n", results.len());
369 for r in results {
370 buf.push_str(&format!(" [{}] {}\n", r.entry.id, r.snippet));
371 }
372 buf
373}
374
375fn format_usage_summary(usage: Option<&TokenUsage>) -> String {
376 let usage = usage.cloned().unwrap_or_default();
377 format!(
378 "Session usage:\n Prompt tokens: {}\n Completion tokens: {}\n Total tokens: {}",
379 usage.prompt_tokens,
380 usage.completion_tokens,
381 usage.total(),
382 )
383}
384
385#[cfg(test)]
386mod tests {
387 use std::sync::{Arc, Mutex};
388
389 use bob_core::{
390 error::{AgentError, StoreError, ToolError},
391 ports::{TapeStorePort, ToolPort},
392 tape::{TapeEntry, TapeEntryKind, TapeSearchResult},
393 types::{
394 AgentEventStream, AgentResponse, FinishReason, RuntimeHealth, SessionId, SessionState,
395 ToolCall, ToolDescriptor, ToolResult,
396 },
397 };
398
399 use super::*;
400
401 struct StubRuntime;
402
403 #[async_trait::async_trait]
404 impl AgentRuntime for StubRuntime {
405 async fn run(&self, _req: AgentRequest) -> Result<AgentRunResult, AgentError> {
406 Ok(AgentRunResult::Finished(AgentResponse {
407 content: "stub".to_string(),
408 tool_transcript: Vec::new(),
409 usage: TokenUsage::default(),
410 finish_reason: FinishReason::Stop,
411 }))
412 }
413
414 async fn run_stream(&self, _req: AgentRequest) -> Result<AgentEventStream, AgentError> {
415 Err(AgentError::Config("unused in test".to_string()))
416 }
417
418 async fn health(&self) -> RuntimeHealth {
419 RuntimeHealth {
420 status: bob_core::types::HealthStatus::Healthy,
421 llm_ready: true,
422 mcp_pool_ready: true,
423 }
424 }
425 }
426
427 struct StubTools;
428
429 #[async_trait::async_trait]
430 impl ToolPort for StubTools {
431 async fn list_tools(&self) -> Result<Vec<ToolDescriptor>, ToolError> {
432 Ok(Vec::new())
433 }
434
435 async fn call_tool(&self, call: ToolCall) -> Result<ToolResult, ToolError> {
436 Err(ToolError::NotFound { name: call.name })
437 }
438 }
439
440 struct StaticSessionStore {
441 state: SessionState,
442 }
443
444 #[async_trait::async_trait]
445 impl SessionStore for StaticSessionStore {
446 async fn load(&self, _id: &SessionId) -> Result<Option<SessionState>, StoreError> {
447 Ok(Some(self.state.clone()))
448 }
449
450 async fn save(&self, _id: &SessionId, _state: &SessionState) -> Result<(), StoreError> {
451 Ok(())
452 }
453 }
454
455 #[derive(Default)]
456 struct MemorySessionStore {
457 state: Mutex<Option<SessionState>>,
458 }
459
460 #[async_trait::async_trait]
461 impl SessionStore for MemorySessionStore {
462 async fn load(&self, _id: &SessionId) -> Result<Option<SessionState>, StoreError> {
463 Ok(self.state.lock().unwrap_or_else(|poisoned| poisoned.into_inner()).clone())
464 }
465
466 async fn save(&self, _id: &SessionId, state: &SessionState) -> Result<(), StoreError> {
467 *self.state.lock().unwrap_or_else(|poisoned| poisoned.into_inner()) =
468 Some(state.clone());
469 Ok(())
470 }
471 }
472
473 #[derive(Default)]
474 struct MemoryTapeStore {
475 entries: Mutex<Vec<TapeEntry>>,
476 }
477
478 #[async_trait::async_trait]
479 impl TapeStorePort for MemoryTapeStore {
480 async fn append(
481 &self,
482 _session_id: &SessionId,
483 kind: TapeEntryKind,
484 ) -> Result<TapeEntry, StoreError> {
485 let mut entries = self.entries.lock().unwrap_or_else(|poisoned| poisoned.into_inner());
486 let entry = TapeEntry { id: entries.len() as u64 + 1, kind, timestamp_ms: 0 };
487 entries.push(entry.clone());
488 Ok(entry)
489 }
490
491 async fn entries_since_last_handoff(
492 &self,
493 _session_id: &SessionId,
494 ) -> Result<Vec<TapeEntry>, StoreError> {
495 Ok(Vec::new())
496 }
497
498 async fn search(
499 &self,
500 _session_id: &SessionId,
501 _query: &str,
502 ) -> Result<Vec<TapeSearchResult>, StoreError> {
503 Ok(Vec::new())
504 }
505
506 async fn all_entries(&self, _session_id: &SessionId) -> Result<Vec<TapeEntry>, StoreError> {
507 Ok(self.entries.lock().unwrap_or_else(|poisoned| poisoned.into_inner()).clone())
508 }
509
510 async fn anchors(&self, _session_id: &SessionId) -> Result<Vec<TapeEntry>, StoreError> {
511 Ok(Vec::new())
512 }
513 }
514
515 #[tokio::test]
516 async fn usage_command_reads_total_usage_from_store() {
517 let store = Arc::new(StaticSessionStore {
518 state: SessionState {
519 messages: Vec::new(),
520 total_usage: TokenUsage { prompt_tokens: 12, completion_tokens: 8 },
521 ..Default::default()
522 },
523 });
524 let loop_ = AgentLoop::new(Arc::new(StubRuntime), Arc::new(StubTools)).with_store(store);
525
526 let output = loop_.handle_input("/usage", "session-1").await;
527
528 match output {
529 Ok(AgentLoopOutput::CommandOutput(body)) => {
530 assert!(body.contains("Prompt tokens: 12"));
531 assert!(body.contains("Completion tokens: 8"));
532 assert!(body.contains("Total tokens: 20"));
533 }
534 Ok(other) => panic!("expected usage command output, got {other:?}"),
535 Err(err) => panic!("usage command failed: {err}"),
536 }
537 }
538
539 #[tokio::test]
540 async fn slash_commands_do_not_append_user_messages_to_tape() {
541 let store = Arc::new(StaticSessionStore {
542 state: SessionState {
543 messages: Vec::new(),
544 total_usage: TokenUsage { prompt_tokens: 12, completion_tokens: 8 },
545 ..Default::default()
546 },
547 });
548 let tape = Arc::new(MemoryTapeStore::default());
549 let loop_ = AgentLoop::new(Arc::new(StubRuntime), Arc::new(StubTools))
550 .with_store(store)
551 .with_tape(tape.clone());
552
553 let output = loop_.handle_input("/usage", "session-1").await;
554
555 match output {
556 Ok(AgentLoopOutput::CommandOutput(body)) => {
557 assert!(body.contains("Total tokens: 20"));
558 }
559 Ok(other) => panic!("expected usage command output, got {other:?}"),
560 Err(err) => panic!("usage command failed: {err}"),
561 }
562
563 let entries = tape.all_entries(&"session-1".to_string()).await;
564 let entries = match entries {
565 Ok(entries) => entries,
566 Err(err) => panic!("failed to read tape entries: {err}"),
567 };
568 assert!(entries.is_empty(), "slash commands should not be recorded as tape messages");
569 }
570
571 #[tokio::test]
572 async fn natural_language_turns_still_append_user_and_assistant_messages_to_tape() {
573 let tape = Arc::new(MemoryTapeStore::default());
574 let loop_ =
575 AgentLoop::new(Arc::new(StubRuntime), Arc::new(StubTools)).with_tape(tape.clone());
576
577 let output = loop_.handle_input("hello world", "session-1").await;
578
579 match output {
580 Ok(AgentLoopOutput::Response(AgentRunResult::Finished(resp))) => {
581 assert_eq!(resp.content, "stub");
582 }
583 Ok(other) => panic!("expected LLM response output, got {other:?}"),
584 Err(err) => panic!("natural language turn failed: {err}"),
585 }
586
587 let entries = tape.all_entries(&"session-1".to_string()).await;
588 let entries = match entries {
589 Ok(entries) => entries,
590 Err(err) => panic!("failed to read tape entries: {err}"),
591 };
592 assert_eq!(
593 entries.len(),
594 2,
595 "natural language turns should record both user and assistant"
596 );
597 assert!(matches!(
598 entries.first().map(|entry| &entry.kind),
599 Some(TapeEntryKind::Message { role: bob_core::types::Role::User, content })
600 if content == "hello world"
601 ));
602 assert!(matches!(
603 entries.get(1).map(|entry| &entry.kind),
604 Some(TapeEntryKind::Message { role: bob_core::types::Role::Assistant, content })
605 if content == "stub"
606 ));
607 }
608
609 #[tokio::test]
610 async fn handoff_resets_session_messages_but_keeps_usage() {
611 let store = Arc::new(MemorySessionStore {
612 state: Mutex::new(Some(SessionState {
613 messages: vec![
614 bob_core::types::Message::text(bob_core::types::Role::User, "before"),
615 bob_core::types::Message::text(bob_core::types::Role::Assistant, "answer"),
616 ],
617 total_usage: TokenUsage { prompt_tokens: 21, completion_tokens: 13 },
618 ..Default::default()
619 })),
620 });
621 let tape = Arc::new(MemoryTapeStore::default());
622 let loop_ = AgentLoop::new(Arc::new(StubRuntime), Arc::new(StubTools))
623 .with_store(store.clone())
624 .with_tape(tape.clone());
625
626 let output = loop_.handle_input("/handoff phase-2", "session-1").await;
627
628 match output {
629 Ok(AgentLoopOutput::CommandOutput(body)) => {
630 assert!(body.contains("Context window reset"));
631 }
632 Ok(other) => panic!("expected handoff command output, got {other:?}"),
633 Err(err) => panic!("handoff command failed: {err}"),
634 }
635
636 let saved = store.load(&"session-1".to_string()).await;
637 let saved = match saved {
638 Ok(Some(state)) => state,
639 other => panic!("expected saved session state, got {other:?}"),
640 };
641 assert!(saved.messages.is_empty(), "handoff should clear session messages");
642 assert_eq!(saved.total_usage.total(), 34, "handoff should preserve cumulative usage");
643
644 let entries = tape.all_entries(&"session-1".to_string()).await;
645 let entries = match entries {
646 Ok(entries) => entries,
647 Err(err) => panic!("failed to read tape entries: {err}"),
648 };
649 assert_eq!(entries.len(), 1, "handoff should not leave a slash-command message on tape");
650 assert!(
651 entries.iter().any(|entry| matches!(
652 entry.kind,
653 TapeEntryKind::Handoff { ref name, .. } if name == "phase-2"
654 )),
655 "handoff should be recorded to the tape",
656 );
657 }
658}