1use async_trait::async_trait;
2use serde::Deserialize;
3use serde_json::json;
4use std::sync::Arc;
5
6use bamboo_agent_core::storage::Storage;
7use bamboo_agent_core::tools::{Tool, ToolError, ToolExecutionContext, ToolResult};
8use bamboo_agent_core::MessagePart;
9use bamboo_agent_core::{Message, Role, SessionKind};
10use bamboo_infrastructure::{SessionIndexEntry, SessionStoreV2};
11
12pub struct SessionInspectorTool {
20 session_store: Arc<SessionStoreV2>,
21 storage: Arc<dyn Storage>,
22}
23
24impl SessionInspectorTool {
25 pub fn new(session_store: Arc<SessionStoreV2>, storage: Arc<dyn Storage>) -> Self {
26 Self {
27 session_store,
28 storage,
29 }
30 }
31
32 async fn load_session(
33 &self,
34 session_id: &str,
35 ) -> Result<bamboo_agent_core::Session, ToolError> {
36 match self.storage.load_session(session_id).await {
37 Ok(Some(s)) => Ok(s),
38 Ok(None) => Err(ToolError::Execution(format!(
39 "session not found: {session_id}"
40 ))),
41 Err(e) => Err(ToolError::Execution(format!(
42 "failed to load session {session_id}: {e}"
43 ))),
44 }
45 }
46}
47
48#[derive(Debug, Deserialize)]
49#[serde(tag = "action", rename_all = "snake_case")]
50enum SessionInspectorArgs {
51 List {
53 #[serde(default)]
54 query: Option<String>,
55 #[serde(default)]
56 kind: Option<String>, #[serde(default)]
58 pinned: Option<bool>,
59 #[serde(default)]
60 parent_session_id: Option<String>,
61 #[serde(default)]
62 root_session_id: Option<String>,
63 #[serde(default)]
64 created_by_schedule_id: Option<String>,
65 #[serde(default)]
66 limit: Option<usize>,
67 #[serde(default)]
68 offset: Option<usize>,
69 },
70
71 GetMeta { session_id: String },
73
74 ReadMessages {
76 session_id: String,
77 #[serde(default)]
79 from_end: Option<bool>,
80 #[serde(default)]
82 offset: Option<usize>,
83 #[serde(default)]
85 limit: Option<usize>,
86 #[serde(default)]
88 truncate_chars: Option<usize>,
89 #[serde(default)]
91 include_system: Option<bool>,
92 #[serde(default)]
94 include_tool: Option<bool>,
95 #[serde(default)]
97 include_tool_calls: Option<bool>,
98 #[serde(default)]
100 include_image_urls: Option<bool>,
101 },
102
103 ReadCompressedCache {
105 session_id: String,
106 #[serde(default)]
107 offset: Option<usize>,
108 #[serde(default)]
109 limit: Option<usize>,
110 #[serde(default)]
111 truncate_chars: Option<usize>,
112 #[serde(default)]
113 include_summary: Option<bool>,
114 },
115
116 Search {
118 query: String,
119 #[serde(default)]
123 mode: Option<String>,
124 #[serde(default)]
126 max_sessions: Option<usize>,
127 #[serde(default)]
129 tail_messages: Option<usize>,
130 #[serde(default)]
132 case_sensitive: Option<bool>,
133 #[serde(default)]
135 max_matches: Option<usize>,
136 },
137}
138
/// Reports whether `haystack` contains `needle` as a substring.
///
/// When `case_sensitive` is `false`, both strings are lowercased with the
/// Unicode-aware `str::to_lowercase` before comparing, so non-ASCII text
/// such as "CAFÉ" matches "café". An empty `needle` always matches.
fn normalize_contains(haystack: &str, needle: &str, case_sensitive: bool) -> bool {
    if case_sensitive {
        return haystack.contains(needle);
    }
    // ASCII-only lowercasing would leave letters such as 'É' or 'Я' untouched
    // and miss matches that differ only in case.
    haystack.to_lowercase().contains(&needle.to_lowercase())
}
148
/// Returns `s` limited to at most `max_chars` characters.
///
/// A string that already fits is returned unchanged. A longer string is cut
/// after `max_chars` characters and suffixed with `"..."`. A `max_chars` of
/// zero yields an empty string with no suffix.
fn truncate_string(s: &str, max_chars: usize) -> String {
    if max_chars == 0 {
        return String::new();
    }
    // The byte offset of the character at position `max_chars` is where the
    // kept prefix ends; if no such character exists the whole string fits.
    match s.char_indices().nth(max_chars) {
        None => s.to_owned(),
        Some((cut, _)) => {
            let mut shortened = String::with_capacity(cut + 3);
            shortened.push_str(&s[..cut]);
            shortened.push_str("...");
            shortened
        }
    }
}
166
167fn map_index_entry(e: &SessionIndexEntry) -> serde_json::Value {
168 json!({
169 "id": e.id,
170 "kind": e.kind,
171 "title": e.title,
172 "pinned": e.pinned,
173 "parent_session_id": e.parent_session_id,
174 "root_session_id": e.root_session_id,
175 "spawn_depth": e.spawn_depth,
176 "created_by_schedule_id": e.created_by_schedule_id,
177 "schedule_run_id": e.schedule_run_id,
178 "created_at": e.created_at,
179 "updated_at": e.updated_at,
180 "last_activity_at": e.last_activity_at,
181 "message_count": e.message_count,
182 "has_attachments": e.has_attachments,
183 "token_usage": e.token_usage,
184 "rel_path": e.rel_path,
186 })
187}
188
189fn extract_image_urls(msg: &Message) -> Vec<String> {
190 let mut out = Vec::new();
191 let Some(parts) = msg.content_parts.as_ref() else {
192 return out;
193 };
194 for p in parts {
195 if let MessagePart::ImageUrl { image_url } = p {
196 out.push(image_url.url.clone());
197 }
198 }
199 out
200}
201
202fn role_to_str(role: &Role) -> &'static str {
203 match role {
204 Role::System => "system",
205 Role::User => "user",
206 Role::Assistant => "assistant",
207 Role::Tool => "tool",
208 }
209}
210
211#[async_trait]
212impl Tool for SessionInspectorTool {
213 fn name(&self) -> &str {
214 "session_history"
215 }
216
217 fn description(&self) -> &str {
218 "Read-only viewer over the local SQLite session history. Use this to list prior sessions, inspect metadata, read bounded message slices, read the compressed conversation cache, and full-text search prior conversation history before asking the user to repeat information. This is purely a read tool — it has no runtime control and cannot influence live sessions. Distinct from the `memory` tool, which manages durable cross-session knowledge."
219 }
220
221 fn parameters_schema(&self) -> serde_json::Value {
222 json!({
224 "type": "object",
225 "properties": {
226 "action": {
227 "type": "string",
228 "enum": ["list", "get_meta", "read_messages", "read_compressed_cache", "search"],
229 "description": "Which inspection action to perform."
230 },
231 "query": { "type": "string", "description": "Search string (list/search)." },
232 "kind": { "type": "string", "enum": ["root", "child"], "description": "Filter by session kind (list)." },
233 "pinned": { "type": "boolean", "description": "Filter pinned sessions (list)." },
234 "parent_session_id": { "type": "string", "description": "Filter child sessions by parent (list)." },
235 "root_session_id": { "type": "string", "description": "Filter by root session (list)." },
236 "created_by_schedule_id": { "type": "string", "description": "Filter sessions created by a schedule (list)." },
237 "limit": { "type": "number", "description": "Max items/messages to return (list/read_messages)." },
238 "offset": { "type": "number", "description": "Offset (list/read_messages)." },
239 "session_id": { "type": "string", "description": "Target session id (get_meta/read_messages)." },
240 "from_end": { "type": "boolean", "description": "Read from end (read_messages)." },
241 "truncate_chars": { "type": "number", "description": "Max chars per message (read_messages)." },
242 "include_system": { "type": "boolean" },
243 "include_tool": { "type": "boolean" },
244 "include_tool_calls": { "type": "boolean" },
245 "include_image_urls": { "type": "boolean" },
246 "include_summary": { "type": "boolean", "description": "Include cached conversation summary when available (read_compressed_cache)." },
247 "mode": { "type": "string", "enum": ["title", "tail_messages"] },
248 "max_sessions": { "type": "number" },
249 "tail_messages": { "type": "number" },
250 "case_sensitive": { "type": "boolean" },
251 "max_matches": { "type": "number" }
252 },
253 "required": ["action"]
254 })
255 }
256
257 async fn execute(&self, args: serde_json::Value) -> Result<ToolResult, ToolError> {
258 self.execute_with_context(args, ToolExecutionContext::none("tool_call"))
259 .await
260 }
261
262 async fn execute_with_context(
263 &self,
264 args: serde_json::Value,
265 ctx: ToolExecutionContext<'_>,
266 ) -> Result<ToolResult, ToolError> {
267 let _caller_session_id = ctx.session_id.ok_or_else(|| {
268 ToolError::Execution(
269 "session_history requires a session_id in tool context".to_string(),
270 )
271 })?;
272
273 let parsed: SessionInspectorArgs = serde_json::from_value(args).map_err(|e| {
274 ToolError::InvalidArguments(format!("Invalid session_history args: {e}"))
275 })?;
276
277 match parsed {
278 SessionInspectorArgs::List {
279 query,
280 kind,
281 pinned,
282 parent_session_id,
283 root_session_id,
284 created_by_schedule_id,
285 limit,
286 offset,
287 } => {
288 let limit = limit.unwrap_or(50).min(200);
289 let offset = offset.unwrap_or(0).min(10_000);
290
291 let mut items = self.session_store.list_index_entries().await;
292
293 let query = query.as_ref().map(|v| v.trim()).filter(|v| !v.is_empty());
294 let kind = kind.as_ref().map(|v| v.trim().to_ascii_lowercase());
295 let parent_session_id = parent_session_id
296 .as_ref()
297 .map(|v| v.trim())
298 .filter(|v| !v.is_empty());
299 let root_session_id = root_session_id
300 .as_ref()
301 .map(|v| v.trim())
302 .filter(|v| !v.is_empty());
303 let created_by_schedule_id = created_by_schedule_id
304 .as_ref()
305 .map(|v| v.trim())
306 .filter(|v| !v.is_empty());
307
308 items.retain(|e| {
309 if let Some(q) = query {
310 if !normalize_contains(&e.title, q, false)
311 && !normalize_contains(&e.id, q, false)
312 {
313 return false;
314 }
315 }
316 if let Some(ref k) = kind {
317 match k.as_str() {
318 "root" if e.kind != SessionKind::Root => return false,
319 "child" if e.kind != SessionKind::Child => return false,
320 _ => {}
321 }
322 }
323 if let Some(p) = pinned {
324 if e.pinned != p {
325 return false;
326 }
327 }
328 if let Some(pid) = parent_session_id {
329 if e.parent_session_id.as_deref() != Some(pid) {
330 return false;
331 }
332 }
333 if let Some(rid) = root_session_id {
334 if e.root_session_id != rid {
335 return false;
336 }
337 }
338 if let Some(sid) = created_by_schedule_id {
339 if e.created_by_schedule_id.as_deref() != Some(sid) {
340 return false;
341 }
342 }
343 true
344 });
345
346 let total = items.len();
347 let page = items
348 .into_iter()
349 .skip(offset)
350 .take(limit)
351 .map(|e| map_index_entry(&e))
352 .collect::<Vec<_>>();
353
354 Ok(ToolResult {
355 success: true,
356 result: json!({
357 "total": total,
358 "offset": offset,
359 "limit": limit,
360 "sessions": page,
361 "note": "Use get_meta/read_messages with a small limit. Keep inspection local unless the user explicitly asks for delegated sub-session work."
362 })
363 .to_string(),
364 display_preference: Some("Collapsible".to_string()),
365 })
366 }
367
368 SessionInspectorArgs::GetMeta { session_id } => {
369 let session_id = session_id.trim().to_string();
370 if session_id.is_empty() {
371 return Err(ToolError::InvalidArguments(
372 "session_id must be a non-empty string".to_string(),
373 ));
374 }
375
376 let Some(entry) = self.session_store.get_index_entry(&session_id).await else {
377 return Err(ToolError::Execution(format!(
378 "session not found: {session_id}"
379 )));
380 };
381
382 Ok(ToolResult {
383 success: true,
384 result: json!({ "session": map_index_entry(&entry) }).to_string(),
385 display_preference: Some("Collapsible".to_string()),
386 })
387 }
388
389 SessionInspectorArgs::ReadMessages {
390 session_id,
391 from_end,
392 offset,
393 limit,
394 truncate_chars,
395 include_system,
396 include_tool,
397 include_tool_calls,
398 include_image_urls,
399 } => {
400 let session_id = session_id.trim().to_string();
401 if session_id.is_empty() {
402 return Err(ToolError::InvalidArguments(
403 "session_id must be a non-empty string".to_string(),
404 ));
405 }
406
407 let from_end = from_end.unwrap_or(true);
408 let offset = offset.unwrap_or(0).min(50_000);
409 let limit = limit.unwrap_or(40).min(200);
410 let truncate_chars = truncate_chars.unwrap_or(800).min(4000);
411 let include_system = include_system.unwrap_or(false);
412 let include_tool = include_tool.unwrap_or(true);
413 let include_tool_calls = include_tool_calls.unwrap_or(false);
414 let include_image_urls = include_image_urls.unwrap_or(true);
415
416 let session = self.load_session(&session_id).await?;
417 let total = session.messages.len();
418
419 let mut messages: Vec<(usize, &Message)> = session
420 .messages
421 .iter()
422 .enumerate()
423 .filter(|(_, m)| {
424 if !include_system && matches!(m.role, Role::System) {
425 return false;
426 }
427 if !include_tool && matches!(m.role, Role::Tool) {
428 return false;
429 }
430 true
431 })
432 .collect();
433
434 let filtered_total = messages.len();
436 let (start, end) = if from_end {
437 let end = filtered_total.saturating_sub(offset);
438 let start = end.saturating_sub(limit);
439 (start, end)
440 } else {
441 let start = offset.min(filtered_total);
442 let end = (start + limit).min(filtered_total);
443 (start, end)
444 };
445
446 let slice = messages
447 .drain(start..end)
448 .map(|(idx, m)| {
449 let tool_calls_count = m.tool_calls.as_ref().map(|v| v.len()).unwrap_or(0);
450 let image_urls = if include_image_urls {
451 extract_image_urls(m)
452 } else {
453 Vec::new()
454 };
455 json!({
456 "index": idx,
457 "id": m.id,
458 "role": role_to_str(&m.role),
459 "created_at": m.created_at,
460 "content_len": m.content.len(),
461 "content": truncate_string(&m.content, truncate_chars),
462 "has_images": !image_urls.is_empty(),
463 "image_urls": image_urls,
464 "tool_calls_count": tool_calls_count,
465 "tool_call_id": if include_tool_calls { m.tool_call_id.clone() } else { None },
466 })
467 })
468 .collect::<Vec<_>>();
469
470 Ok(ToolResult {
471 success: true,
472 result: json!({
473 "session_id": session_id,
474 "message_count_total": total,
475 "message_count_filtered": filtered_total,
476 "from_end": from_end,
477 "offset": offset,
478 "limit": limit,
479 "slice_count": slice.len(),
480 "messages": slice,
481 "note": "If you need to read a lot of content, iterate with bounded read_messages calls. Only delegate to a child session if the user explicitly asks."
482 })
483 .to_string(),
484 display_preference: Some("Collapsible".to_string()),
485 })
486 }
487
488 SessionInspectorArgs::ReadCompressedCache {
489 session_id,
490 offset,
491 limit,
492 truncate_chars,
493 include_summary,
494 } => {
495 let session_id = session_id.trim().to_string();
496 if session_id.is_empty() {
497 return Err(ToolError::InvalidArguments(
498 "session_id must be a non-empty string".to_string(),
499 ));
500 }
501
502 let offset = offset.unwrap_or(0).min(1_000_000);
503 let limit = limit.unwrap_or(40).min(200);
504 let truncate_chars = truncate_chars.unwrap_or(1200).min(20_000);
505 let include_summary = include_summary.unwrap_or(true);
506
507 let sqlite_snapshot = self
508 .session_store
509 .search_index()
510 .read_compressed_cache(&session_id, offset, limit, truncate_chars)
511 .await;
512
513 let (source, summary, total_compressed, messages) = match sqlite_snapshot {
514 Ok(snapshot) if snapshot.total_compressed_messages > 0 => (
515 "sqlite_fts",
516 if include_summary {
517 snapshot.summary
518 } else {
519 None
520 },
521 snapshot.total_compressed_messages,
522 snapshot
523 .messages
524 .into_iter()
525 .map(|row| {
526 json!({
527 "id": row.message_id,
528 "index": row.message_index,
529 "role": row.role,
530 "created_at": row.created_at,
531 "content_len": row.content_len,
532 "content": row.content,
533 })
534 })
535 .collect::<Vec<_>>(),
536 ),
537 Ok(_) | Err(_) => {
538 let session = self.load_session(&session_id).await?;
539 let summary = if include_summary {
540 session
541 .conversation_summary
542 .as_ref()
543 .map(|value| value.content.clone())
544 } else {
545 None
546 };
547 let compressed_messages = session
548 .messages
549 .iter()
550 .enumerate()
551 .filter(|(_, message)| message.compressed)
552 .collect::<Vec<_>>();
553 let total = compressed_messages.len();
554 let slice = compressed_messages
555 .into_iter()
556 .skip(offset)
557 .take(limit)
558 .map(|(index, message)| {
559 json!({
560 "id": message.id,
561 "index": index,
562 "role": role_to_str(&message.role),
563 "created_at": message.created_at,
564 "content_len": message.content.chars().count(),
565 "content": truncate_string(&message.content, truncate_chars),
566 })
567 })
568 .collect::<Vec<_>>();
569 ("session_json_fallback", summary, total, slice)
570 }
571 };
572
573 Ok(ToolResult {
574 success: true,
575 result: json!({
576 "session_id": session_id,
577 "source": source,
578 "offset": offset,
579 "limit": limit,
580 "slice_count": messages.len(),
581 "total_compressed_messages": total_compressed,
582 "summary": summary,
583 "messages": messages,
584 "note": "Use this for bounded recall from compressed history. Prioritize current task list and recent turns when conflicts appear."
585 })
586 .to_string(),
587 display_preference: Some("Collapsible".to_string()),
588 })
589 }
590
591 SessionInspectorArgs::Search {
592 query,
593 mode,
594 max_sessions,
595 tail_messages,
596 case_sensitive,
597 max_matches,
598 } => {
599 let q = query.trim();
600 if q.is_empty() {
601 return Err(ToolError::InvalidArguments(
602 "query must be a non-empty string".to_string(),
603 ));
604 }
605 let case_sensitive = case_sensitive.unwrap_or(false);
606 let mode = mode
607 .as_deref()
608 .map(str::trim)
609 .filter(|v| !v.is_empty())
610 .unwrap_or("title")
611 .to_ascii_lowercase();
612 let max_matches = max_matches.unwrap_or(50).min(200);
613
614 if !case_sensitive {
615 match self
616 .session_store
617 .search_index()
618 .search(q, max_matches)
619 .await
620 {
621 Ok(fts_matches) if !fts_matches.is_empty() => {
622 let matches = fts_matches
623 .into_iter()
624 .map(|m| {
625 json!({
626 "type": if m.match_type == "session" { "title_match" } else { "message_match" },
627 "session_id": m.session_id,
628 "session_title": m.session_title,
629 "session_kind": m.session_kind,
630 "root_session_id": m.root_session_id,
631 "parent_session_id": m.parent_session_id,
632 "pinned": m.pinned,
633 "updated_at": m.updated_at,
634 "rank": m.rank,
635 "message_id": m.message_id,
636 "message_index": m.message_index,
637 "role": m.role,
638 "content_preview": m.content_preview,
639 })
640 })
641 .collect::<Vec<_>>();
642
643 return Ok(ToolResult {
644 success: true,
645 result: json!({
646 "query": q,
647 "mode": mode,
648 "case_sensitive": case_sensitive,
649 "search_backend": "sqlite_fts",
650 "matches": matches,
651 "note": "Results came from the local SQLite FTS session search index. Use read_messages for bounded inspection of matched sessions."
652 })
653 .to_string(),
654 display_preference: Some("Collapsible".to_string()),
655 });
656 }
657 Ok(_) => {}
658 Err(error) => {
659 tracing::warn!(
660 "session_history FTS search failed for query '{}': {}. Falling back to in-memory scan.",
661 q,
662 error
663 );
664 }
665 }
666 }
667
668 let entries = self.session_store.list_index_entries().await;
669 let mut results = Vec::new();
670
671 for e in entries.iter() {
673 if normalize_contains(&e.title, q, case_sensitive)
674 || normalize_contains(&e.id, q, case_sensitive)
675 {
676 results.push(json!({
677 "type": "title_match",
678 "session": map_index_entry(e),
679 }));
680 if results.len() >= max_matches {
681 break;
682 }
683 }
684 }
685
686 if mode != "title" && results.len() < max_matches {
687 let max_sessions = max_sessions.unwrap_or(30).min(200);
688 let tail_messages = tail_messages.unwrap_or(40).min(200);
689
690 for e in entries.into_iter().take(max_sessions) {
692 if results.len() >= max_matches {
693 break;
694 }
695 let Ok(session) = self.storage.load_session(&e.id).await else {
696 continue;
697 };
698 let Some(session) = session else {
699 continue;
700 };
701 let start = session.messages.len().saturating_sub(tail_messages);
702 for (idx, m) in session.messages.iter().enumerate().skip(start) {
703 if results.len() >= max_matches {
704 break;
705 }
706 if !normalize_contains(&m.content, q, case_sensitive) {
707 continue;
708 }
709 results.push(json!({
710 "type": "message_match",
711 "session_id": e.id,
712 "session_title": e.title,
713 "message_index": idx,
714 "message_id": m.id,
715 "role": role_to_str(&m.role),
716 "created_at": m.created_at,
717 "content_preview": truncate_string(&m.content, 240),
718 }));
719 }
720 }
721 }
722
723 Ok(ToolResult {
724 success: true,
725 result: json!({
726 "query": q,
727 "mode": mode,
728 "case_sensitive": case_sensitive,
729 "matches": results,
730 "note": "Consider narrowing by session_id + read_messages. Keep summarization local unless the user explicitly asks for delegated child-session work."
731 })
732 .to_string(),
733 display_preference: Some("Collapsible".to_string()),
734 })
735 }
736 }
737 }
738}