use async_trait::async_trait;
use serde::Deserialize;
use serde_json::json;
use std::sync::Arc;
use bamboo_agent_core::storage::Storage;
use bamboo_agent_core::tools::{Tool, ToolError, ToolExecutionContext, ToolResult};
use bamboo_agent_core::MessagePart;
use bamboo_agent_core::{Message, Role, SessionKind};
use bamboo_infrastructure::{SessionIndexEntry, SessionStoreV2};
pub struct SessionInspectorTool {
session_store: Arc<SessionStoreV2>,
storage: Arc<dyn Storage>,
}
impl SessionInspectorTool {
pub fn new(session_store: Arc<SessionStoreV2>, storage: Arc<dyn Storage>) -> Self {
Self {
session_store,
storage,
}
}
async fn load_session(
&self,
session_id: &str,
) -> Result<bamboo_agent_core::Session, ToolError> {
match self.storage.load_session(session_id).await {
Ok(Some(s)) => Ok(s),
Ok(None) => Err(ToolError::Execution(format!(
"session not found: {session_id}"
))),
Err(e) => Err(ToolError::Execution(format!(
"failed to load session {session_id}: {e}"
))),
}
}
}
#[derive(Debug, Deserialize)]
#[serde(tag = "action", rename_all = "snake_case")]
enum SessionInspectorArgs {
List {
#[serde(default)]
query: Option<String>,
#[serde(default)]
kind: Option<String>, #[serde(default)]
pinned: Option<bool>,
#[serde(default)]
parent_session_id: Option<String>,
#[serde(default)]
root_session_id: Option<String>,
#[serde(default)]
created_by_schedule_id: Option<String>,
#[serde(default)]
limit: Option<usize>,
#[serde(default)]
offset: Option<usize>,
},
GetMeta { session_id: String },
ReadMessages {
session_id: String,
#[serde(default)]
from_end: Option<bool>,
#[serde(default)]
offset: Option<usize>,
#[serde(default)]
limit: Option<usize>,
#[serde(default)]
truncate_chars: Option<usize>,
#[serde(default)]
include_system: Option<bool>,
#[serde(default)]
include_tool: Option<bool>,
#[serde(default)]
include_tool_calls: Option<bool>,
#[serde(default)]
include_image_urls: Option<bool>,
},
ReadCompressedCache {
session_id: String,
#[serde(default)]
offset: Option<usize>,
#[serde(default)]
limit: Option<usize>,
#[serde(default)]
truncate_chars: Option<usize>,
#[serde(default)]
include_summary: Option<bool>,
},
Search {
query: String,
#[serde(default)]
mode: Option<String>,
#[serde(default)]
max_sessions: Option<usize>,
#[serde(default)]
tail_messages: Option<usize>,
#[serde(default)]
case_sensitive: Option<bool>,
#[serde(default)]
max_matches: Option<usize>,
},
}
fn normalize_contains(haystack: &str, needle: &str, case_sensitive: bool) -> bool {
if case_sensitive {
haystack.contains(needle)
} else {
haystack
.to_ascii_lowercase()
.contains(&needle.to_ascii_lowercase())
}
}
fn truncate_string(s: &str, max_chars: usize) -> String {
if max_chars == 0 {
return String::new();
}
if s.chars().count() <= max_chars {
return s.to_string();
}
let mut out = String::with_capacity(max_chars + 3);
for (i, ch) in s.chars().enumerate() {
if i >= max_chars {
break;
}
out.push(ch);
}
out.push_str("...");
out
}
fn map_index_entry(e: &SessionIndexEntry) -> serde_json::Value {
json!({
"id": e.id,
"kind": e.kind,
"title": e.title,
"pinned": e.pinned,
"parent_session_id": e.parent_session_id,
"root_session_id": e.root_session_id,
"spawn_depth": e.spawn_depth,
"created_by_schedule_id": e.created_by_schedule_id,
"schedule_run_id": e.schedule_run_id,
"created_at": e.created_at,
"updated_at": e.updated_at,
"last_activity_at": e.last_activity_at,
"message_count": e.message_count,
"has_attachments": e.has_attachments,
"token_usage": e.token_usage,
"rel_path": e.rel_path,
})
}
fn extract_image_urls(msg: &Message) -> Vec<String> {
let mut out = Vec::new();
let Some(parts) = msg.content_parts.as_ref() else {
return out;
};
for p in parts {
if let MessagePart::ImageUrl { image_url } = p {
out.push(image_url.url.clone());
}
}
out
}
fn role_to_str(role: &Role) -> &'static str {
match role {
Role::System => "system",
Role::User => "user",
Role::Assistant => "assistant",
Role::Tool => "tool",
}
}
#[async_trait]
impl Tool for SessionInspectorTool {
fn name(&self) -> &str {
"recall"
}
fn description(&self) -> &str {
"Recall prior Bamboo context from local storage. Use this to list sessions, inspect metadata, read bounded message slices, read compressed recall, and search prior conversation history before asking the user to repeat information. Keep recall local by default; only use child-session delegation if the user explicitly asks for it."
}
fn parameters_schema(&self) -> serde_json::Value {
json!({
"type": "object",
"properties": {
"action": {
"type": "string",
"enum": ["list", "get_meta", "read_messages", "read_compressed_cache", "search"],
"description": "Which inspection action to perform."
},
"query": { "type": "string", "description": "Search string (list/search)." },
"kind": { "type": "string", "enum": ["root", "child"], "description": "Filter by session kind (list)." },
"pinned": { "type": "boolean", "description": "Filter pinned sessions (list)." },
"parent_session_id": { "type": "string", "description": "Filter child sessions by parent (list)." },
"root_session_id": { "type": "string", "description": "Filter by root session (list)." },
"created_by_schedule_id": { "type": "string", "description": "Filter sessions created by a schedule (list)." },
"limit": { "type": "number", "description": "Max items/messages to return (list/read_messages)." },
"offset": { "type": "number", "description": "Offset (list/read_messages)." },
"session_id": { "type": "string", "description": "Target session id (get_meta/read_messages)." },
"from_end": { "type": "boolean", "description": "Read from end (read_messages)." },
"truncate_chars": { "type": "number", "description": "Max chars per message (read_messages)." },
"include_system": { "type": "boolean" },
"include_tool": { "type": "boolean" },
"include_tool_calls": { "type": "boolean" },
"include_image_urls": { "type": "boolean" },
"include_summary": { "type": "boolean", "description": "Include cached conversation summary when available (read_compressed_cache)." },
"mode": { "type": "string", "enum": ["title", "tail_messages"] },
"max_sessions": { "type": "number" },
"tail_messages": { "type": "number" },
"case_sensitive": { "type": "boolean" },
"max_matches": { "type": "number" }
},
"required": ["action"]
})
}
async fn execute(&self, args: serde_json::Value) -> Result<ToolResult, ToolError> {
self.execute_with_context(args, ToolExecutionContext::none("tool_call"))
.await
}
async fn execute_with_context(
&self,
args: serde_json::Value,
ctx: ToolExecutionContext<'_>,
) -> Result<ToolResult, ToolError> {
let _caller_session_id = ctx.session_id.ok_or_else(|| {
ToolError::Execution("recall requires a session_id in tool context".to_string())
})?;
let parsed: SessionInspectorArgs = serde_json::from_value(args)
.map_err(|e| ToolError::InvalidArguments(format!("Invalid recall args: {e}")))?;
match parsed {
SessionInspectorArgs::List {
query,
kind,
pinned,
parent_session_id,
root_session_id,
created_by_schedule_id,
limit,
offset,
} => {
let limit = limit.unwrap_or(50).min(200);
let offset = offset.unwrap_or(0).min(10_000);
let mut items = self.session_store.list_index_entries().await;
let query = query.as_ref().map(|v| v.trim()).filter(|v| !v.is_empty());
let kind = kind.as_ref().map(|v| v.trim().to_ascii_lowercase());
let parent_session_id = parent_session_id
.as_ref()
.map(|v| v.trim())
.filter(|v| !v.is_empty());
let root_session_id = root_session_id
.as_ref()
.map(|v| v.trim())
.filter(|v| !v.is_empty());
let created_by_schedule_id = created_by_schedule_id
.as_ref()
.map(|v| v.trim())
.filter(|v| !v.is_empty());
items.retain(|e| {
if let Some(q) = query {
if !normalize_contains(&e.title, q, false)
&& !normalize_contains(&e.id, q, false)
{
return false;
}
}
if let Some(ref k) = kind {
match k.as_str() {
"root" if e.kind != SessionKind::Root => return false,
"child" if e.kind != SessionKind::Child => return false,
_ => {}
}
}
if let Some(p) = pinned {
if e.pinned != p {
return false;
}
}
if let Some(pid) = parent_session_id {
if e.parent_session_id.as_deref() != Some(pid) {
return false;
}
}
if let Some(rid) = root_session_id {
if e.root_session_id != rid {
return false;
}
}
if let Some(sid) = created_by_schedule_id {
if e.created_by_schedule_id.as_deref() != Some(sid) {
return false;
}
}
true
});
let total = items.len();
let page = items
.into_iter()
.skip(offset)
.take(limit)
.map(|e| map_index_entry(&e))
.collect::<Vec<_>>();
Ok(ToolResult {
success: true,
result: json!({
"total": total,
"offset": offset,
"limit": limit,
"sessions": page,
"note": "Use get_meta/read_messages with a small limit. Keep inspection local unless the user explicitly asks for delegated sub-session work."
})
.to_string(),
display_preference: Some("Collapsible".to_string()),
})
}
SessionInspectorArgs::GetMeta { session_id } => {
let session_id = session_id.trim().to_string();
if session_id.is_empty() {
return Err(ToolError::InvalidArguments(
"session_id must be a non-empty string".to_string(),
));
}
let Some(entry) = self.session_store.get_index_entry(&session_id).await else {
return Err(ToolError::Execution(format!(
"session not found: {session_id}"
)));
};
Ok(ToolResult {
success: true,
result: json!({ "session": map_index_entry(&entry) }).to_string(),
display_preference: Some("Collapsible".to_string()),
})
}
SessionInspectorArgs::ReadMessages {
session_id,
from_end,
offset,
limit,
truncate_chars,
include_system,
include_tool,
include_tool_calls,
include_image_urls,
} => {
let session_id = session_id.trim().to_string();
if session_id.is_empty() {
return Err(ToolError::InvalidArguments(
"session_id must be a non-empty string".to_string(),
));
}
let from_end = from_end.unwrap_or(true);
let offset = offset.unwrap_or(0).min(50_000);
let limit = limit.unwrap_or(40).min(200);
let truncate_chars = truncate_chars.unwrap_or(800).min(4000);
let include_system = include_system.unwrap_or(false);
let include_tool = include_tool.unwrap_or(true);
let include_tool_calls = include_tool_calls.unwrap_or(false);
let include_image_urls = include_image_urls.unwrap_or(true);
let session = self.load_session(&session_id).await?;
let total = session.messages.len();
let mut messages: Vec<(usize, &Message)> = session
.messages
.iter()
.enumerate()
.filter(|(_, m)| {
if !include_system && matches!(m.role, Role::System) {
return false;
}
if !include_tool && matches!(m.role, Role::Tool) {
return false;
}
true
})
.collect();
let filtered_total = messages.len();
let (start, end) = if from_end {
let end = filtered_total.saturating_sub(offset);
let start = end.saturating_sub(limit);
(start, end)
} else {
let start = offset.min(filtered_total);
let end = (start + limit).min(filtered_total);
(start, end)
};
let slice = messages
.drain(start..end)
.map(|(idx, m)| {
let tool_calls_count = m.tool_calls.as_ref().map(|v| v.len()).unwrap_or(0);
let image_urls = if include_image_urls {
extract_image_urls(m)
} else {
Vec::new()
};
json!({
"index": idx,
"id": m.id,
"role": role_to_str(&m.role),
"created_at": m.created_at,
"content_len": m.content.len(),
"content": truncate_string(&m.content, truncate_chars),
"has_images": !image_urls.is_empty(),
"image_urls": image_urls,
"tool_calls_count": tool_calls_count,
"tool_call_id": if include_tool_calls { m.tool_call_id.clone() } else { None },
})
})
.collect::<Vec<_>>();
Ok(ToolResult {
success: true,
result: json!({
"session_id": session_id,
"message_count_total": total,
"message_count_filtered": filtered_total,
"from_end": from_end,
"offset": offset,
"limit": limit,
"slice_count": slice.len(),
"messages": slice,
"note": "If you need to read a lot of content, iterate with bounded read_messages calls. Only delegate to a child session if the user explicitly asks."
})
.to_string(),
display_preference: Some("Collapsible".to_string()),
})
}
SessionInspectorArgs::ReadCompressedCache {
session_id,
offset,
limit,
truncate_chars,
include_summary,
} => {
let session_id = session_id.trim().to_string();
if session_id.is_empty() {
return Err(ToolError::InvalidArguments(
"session_id must be a non-empty string".to_string(),
));
}
let offset = offset.unwrap_or(0).min(1_000_000);
let limit = limit.unwrap_or(40).min(200);
let truncate_chars = truncate_chars.unwrap_or(1200).min(20_000);
let include_summary = include_summary.unwrap_or(true);
let sqlite_snapshot = self
.session_store
.search_index()
.read_compressed_cache(&session_id, offset, limit, truncate_chars)
.await;
let (source, summary, total_compressed, messages) = match sqlite_snapshot {
Ok(snapshot) if snapshot.total_compressed_messages > 0 => (
"sqlite_fts",
if include_summary {
snapshot.summary
} else {
None
},
snapshot.total_compressed_messages,
snapshot
.messages
.into_iter()
.map(|row| {
json!({
"id": row.message_id,
"index": row.message_index,
"role": row.role,
"created_at": row.created_at,
"content_len": row.content_len,
"content": row.content,
})
})
.collect::<Vec<_>>(),
),
Ok(_) | Err(_) => {
let session = self.load_session(&session_id).await?;
let summary = if include_summary {
session
.conversation_summary
.as_ref()
.map(|value| value.content.clone())
} else {
None
};
let compressed_messages = session
.messages
.iter()
.enumerate()
.filter(|(_, message)| message.compressed)
.collect::<Vec<_>>();
let total = compressed_messages.len();
let slice = compressed_messages
.into_iter()
.skip(offset)
.take(limit)
.map(|(index, message)| {
json!({
"id": message.id,
"index": index,
"role": role_to_str(&message.role),
"created_at": message.created_at,
"content_len": message.content.chars().count(),
"content": truncate_string(&message.content, truncate_chars),
})
})
.collect::<Vec<_>>();
("session_json_fallback", summary, total, slice)
}
};
Ok(ToolResult {
success: true,
result: json!({
"session_id": session_id,
"source": source,
"offset": offset,
"limit": limit,
"slice_count": messages.len(),
"total_compressed_messages": total_compressed,
"summary": summary,
"messages": messages,
"note": "Use this for bounded recall from compressed history. Prioritize current task list and recent turns when conflicts appear."
})
.to_string(),
display_preference: Some("Collapsible".to_string()),
})
}
SessionInspectorArgs::Search {
query,
mode,
max_sessions,
tail_messages,
case_sensitive,
max_matches,
} => {
let q = query.trim();
if q.is_empty() {
return Err(ToolError::InvalidArguments(
"query must be a non-empty string".to_string(),
));
}
let case_sensitive = case_sensitive.unwrap_or(false);
let mode = mode
.as_deref()
.map(str::trim)
.filter(|v| !v.is_empty())
.unwrap_or("title")
.to_ascii_lowercase();
let max_matches = max_matches.unwrap_or(50).min(200);
if !case_sensitive {
match self
.session_store
.search_index()
.search(q, max_matches)
.await
{
Ok(fts_matches) if !fts_matches.is_empty() => {
let matches = fts_matches
.into_iter()
.map(|m| {
json!({
"type": if m.match_type == "session" { "title_match" } else { "message_match" },
"session_id": m.session_id,
"session_title": m.session_title,
"session_kind": m.session_kind,
"root_session_id": m.root_session_id,
"parent_session_id": m.parent_session_id,
"pinned": m.pinned,
"updated_at": m.updated_at,
"rank": m.rank,
"message_id": m.message_id,
"message_index": m.message_index,
"role": m.role,
"content_preview": m.content_preview,
})
})
.collect::<Vec<_>>();
return Ok(ToolResult {
success: true,
result: json!({
"query": q,
"mode": mode,
"case_sensitive": case_sensitive,
"search_backend": "sqlite_fts",
"matches": matches,
"note": "Results came from the local SQLite FTS session search index. Use read_messages for bounded inspection of matched sessions."
})
.to_string(),
display_preference: Some("Collapsible".to_string()),
});
}
Ok(_) => {}
Err(error) => {
tracing::warn!(
"recall FTS search failed for query '{}': {}. Falling back to in-memory scan.",
q,
error
);
}
}
}
let entries = self.session_store.list_index_entries().await;
let mut results = Vec::new();
for e in entries.iter() {
if normalize_contains(&e.title, q, case_sensitive)
|| normalize_contains(&e.id, q, case_sensitive)
{
results.push(json!({
"type": "title_match",
"session": map_index_entry(e),
}));
if results.len() >= max_matches {
break;
}
}
}
if mode != "title" && results.len() < max_matches {
let max_sessions = max_sessions.unwrap_or(30).min(200);
let tail_messages = tail_messages.unwrap_or(40).min(200);
for e in entries.into_iter().take(max_sessions) {
if results.len() >= max_matches {
break;
}
let Ok(session) = self.storage.load_session(&e.id).await else {
continue;
};
let Some(session) = session else {
continue;
};
let start = session.messages.len().saturating_sub(tail_messages);
for (idx, m) in session.messages.iter().enumerate().skip(start) {
if results.len() >= max_matches {
break;
}
if !normalize_contains(&m.content, q, case_sensitive) {
continue;
}
results.push(json!({
"type": "message_match",
"session_id": e.id,
"session_title": e.title,
"message_index": idx,
"message_id": m.id,
"role": role_to_str(&m.role),
"created_at": m.created_at,
"content_preview": truncate_string(&m.content, 240),
}));
}
}
}
Ok(ToolResult {
success: true,
result: json!({
"query": q,
"mode": mode,
"case_sensitive": case_sensitive,
"matches": results,
"note": "Consider narrowing by session_id + read_messages. Keep summarization local unless the user explicitly asks for delegated child-session work."
})
.to_string(),
display_preference: Some("Collapsible".to_string()),
})
}
}
}
}