Skip to main content

bamboo_server/server_tools/
session_inspector.rs

1use async_trait::async_trait;
2use serde::Deserialize;
3use serde_json::json;
4use std::sync::Arc;
5
6use bamboo_agent_core::storage::Storage;
7use bamboo_agent_core::tools::{Tool, ToolError, ToolExecutionContext, ToolResult};
8use bamboo_agent_core::MessagePart;
9use bamboo_agent_core::{Message, Role, SessionKind};
10use bamboo_infrastructure::{SessionIndexEntry, SessionStoreV2};
11
/// Server-only tool for inspecting V2 sessions stored under the Bamboo home dir.
///
/// Design goals:
/// - Return metadata first (index-backed) so the model can narrow scope.
/// - Allow bounded reads (pagination; from end; truncation).
/// - Support lightweight search across session titles and (optionally) tail messages.
/// - Keep inspection local by default; use child-session delegation only if the user explicitly asks.
pub struct SessionInspectorTool {
    /// Session index store: used to list and look up index entries and to reach the
    /// search index (FTS search and compressed-cache reads).
    session_store: Arc<SessionStoreV2>,
    /// Storage backend used to load full sessions, including their messages.
    storage: Arc<dyn Storage>,
}
23
24impl SessionInspectorTool {
25    pub fn new(session_store: Arc<SessionStoreV2>, storage: Arc<dyn Storage>) -> Self {
26        Self {
27            session_store,
28            storage,
29        }
30    }
31
32    async fn load_session(
33        &self,
34        session_id: &str,
35    ) -> Result<bamboo_agent_core::Session, ToolError> {
36        match self.storage.load_session(session_id).await {
37            Ok(Some(s)) => Ok(s),
38            Ok(None) => Err(ToolError::Execution(format!(
39                "session not found: {session_id}"
40            ))),
41            Err(e) => Err(ToolError::Execution(format!(
42                "failed to load session {session_id}: {e}"
43            ))),
44        }
45    }
46}
47
/// Arguments accepted by the `session_history` tool.
///
/// Deserialized from the tool-call JSON as an internally tagged enum: the `action`
/// field selects the variant, using snake_case names (`list`, `get_meta`,
/// `read_messages`, `read_compressed_cache`, `search`). All defaults and caps
/// mentioned below are applied by the handler, not by deserialization.
#[derive(Debug, Deserialize)]
#[serde(tag = "action", rename_all = "snake_case")]
enum SessionInspectorArgs {
    /// List sessions from the global index, with filtering/pagination.
    List {
        /// Substring matched case-insensitively against the session title or id.
        /// Blank values are ignored.
        #[serde(default)]
        query: Option<String>,
        /// Session kind filter: "root" or "child" (trimmed, ASCII case-insensitive).
        /// Any other value applies no kind filter.
        #[serde(default)]
        kind: Option<String>,
        /// Keep only sessions whose pinned flag equals this value.
        #[serde(default)]
        pinned: Option<bool>,
        /// Keep only sessions whose parent session id equals this value (blank = ignored).
        #[serde(default)]
        parent_session_id: Option<String>,
        /// Keep only sessions whose root session id equals this value (blank = ignored).
        #[serde(default)]
        root_session_id: Option<String>,
        /// Keep only sessions created by this schedule id (blank = ignored).
        #[serde(default)]
        created_by_schedule_id: Option<String>,
        /// Page size (default 50, capped at 200).
        #[serde(default)]
        limit: Option<usize>,
        /// Number of filtered entries to skip (default 0, capped at 10_000).
        #[serde(default)]
        offset: Option<usize>,
    },

    /// Get a single session's index metadata (fast).
    GetMeta { session_id: String },

    /// Read message slice from a session, bounded by limit/offset and content truncation.
    ReadMessages {
        /// Target session id; must be non-blank.
        session_id: String,
        /// Read from end of the conversation (default true).
        #[serde(default)]
        from_end: Option<bool>,
        /// Offset from the chosen side (0 = start/end depending on from_end).
        /// Counted in the role-filtered message sequence; default 0, capped at 50_000.
        #[serde(default)]
        offset: Option<usize>,
        /// Number of messages to return (default 40, capped at 200).
        #[serde(default)]
        limit: Option<usize>,
        /// Truncate each message content to at most this many chars
        /// (default 800, capped at 4000).
        #[serde(default)]
        truncate_chars: Option<usize>,
        /// Include System messages (default false).
        #[serde(default)]
        include_system: Option<bool>,
        /// Include Tool role messages (default true).
        #[serde(default)]
        include_tool: Option<bool>,
        /// Include each message's `tool_call_id` in the output (default false).
        /// The handler always reports `tool_calls_count` and does not emit the
        /// tool calls themselves.
        #[serde(default)]
        include_tool_calls: Option<bool>,
        /// Include image URLs found in content_parts (default true).
        #[serde(default)]
        include_image_urls: Option<bool>,
    },

    /// Read compressed historical context from local SQLite index cache.
    ///
    /// When the cache read fails or reports no compressed messages, the handler
    /// falls back to the stored session's messages flagged as compressed.
    ReadCompressedCache {
        /// Target session id; must be non-blank.
        session_id: String,
        /// Number of compressed messages to skip (default 0, capped at 1_000_000).
        #[serde(default)]
        offset: Option<usize>,
        /// Number of compressed messages to return (default 40, capped at 200).
        #[serde(default)]
        limit: Option<usize>,
        /// Max chars per message content (default 1200, capped at 20_000).
        #[serde(default)]
        truncate_chars: Option<usize>,
        /// Include the conversation summary when one is available (default true).
        #[serde(default)]
        include_summary: Option<bool>,
    },

    /// Search session titles and (optionally) tail messages.
    ///
    /// For case-insensitive searches the handler first queries the search index and
    /// returns its matches when there are any; otherwise it scans the session index
    /// in memory as described by `mode`.
    Search {
        /// Text to look for; must be non-blank after trimming.
        query: String,
        /// Search mode.
        /// - "title" (default): search in titles only
        /// - "tail_messages": also scan the last `tail_messages` messages of each candidate
        ///
        /// Any value other than "title" enables the tail-message scan.
        #[serde(default)]
        mode: Option<String>,
        /// Max sessions to scan when mode != "title" (default 30, capped at 200).
        #[serde(default)]
        max_sessions: Option<usize>,
        /// Tail message count per session when scanning content (default 40, capped at 200).
        #[serde(default)]
        tail_messages: Option<usize>,
        /// Case-sensitive search (default false).
        #[serde(default)]
        case_sensitive: Option<bool>,
        /// Max matches to return (default 50, capped at 200).
        #[serde(default)]
        max_matches: Option<usize>,
    },
}
138
/// Reports whether `haystack` contains `needle` as a substring.
///
/// With `case_sensitive == true` the comparison is exact. Otherwise both strings are
/// lowercased with Unicode-aware rules before comparing, so non-ASCII text such as
/// "CAFÉ" / "café" matches as well (ASCII-only folding would miss it).
///
/// An empty `needle` matches every haystack.
fn normalize_contains(haystack: &str, needle: &str, case_sensitive: bool) -> bool {
    if case_sensitive {
        return haystack.contains(needle);
    }
    // Full Unicode lowercasing: session titles and messages are not guaranteed ASCII.
    haystack.to_lowercase().contains(&needle.to_lowercase())
}
148
/// Shortens `s` to at most `max_chars` characters (Unicode scalar values, not bytes).
///
/// - `max_chars == 0` yields an empty string.
/// - A string that already fits is returned unchanged.
/// - Otherwise the first `max_chars` characters are kept and `"..."` is appended.
fn truncate_string(s: &str, max_chars: usize) -> String {
    if max_chars == 0 {
        return String::new();
    }

    // The character at position `max_chars` exists only when the string is too long;
    // its byte offset is exactly where the kept prefix ends.
    match s.char_indices().nth(max_chars) {
        None => s.to_string(),
        Some((cut, _)) => {
            let mut shortened = String::with_capacity(cut + 3);
            shortened.push_str(&s[..cut]);
            shortened.push_str("...");
            shortened
        }
    }
}
166
167fn map_index_entry(e: &SessionIndexEntry) -> serde_json::Value {
168    json!({
169        "id": e.id,
170        "kind": e.kind,
171        "title": e.title,
172        "pinned": e.pinned,
173        "parent_session_id": e.parent_session_id,
174        "root_session_id": e.root_session_id,
175        "spawn_depth": e.spawn_depth,
176        "created_by_schedule_id": e.created_by_schedule_id,
177        "schedule_run_id": e.schedule_run_id,
178        "created_at": e.created_at,
179        "updated_at": e.updated_at,
180        "last_activity_at": e.last_activity_at,
181        "message_count": e.message_count,
182        "has_attachments": e.has_attachments,
183        "token_usage": e.token_usage,
184        // Expose rel_path so advanced workflows can inspect the Bamboo data dir directly if needed.
185        "rel_path": e.rel_path,
186    })
187}
188
189fn extract_image_urls(msg: &Message) -> Vec<String> {
190    let mut out = Vec::new();
191    let Some(parts) = msg.content_parts.as_ref() else {
192        return out;
193    };
194    for p in parts {
195        if let MessagePart::ImageUrl { image_url } = p {
196            out.push(image_url.url.clone());
197        }
198    }
199    out
200}
201
202fn role_to_str(role: &Role) -> &'static str {
203    match role {
204        Role::System => "system",
205        Role::User => "user",
206        Role::Assistant => "assistant",
207        Role::Tool => "tool",
208    }
209}
210
211#[async_trait]
212impl Tool for SessionInspectorTool {
213    fn name(&self) -> &str {
214        "session_history"
215    }
216
217    fn description(&self) -> &str {
218        "Read-only viewer over the local SQLite session history. Use this to list prior sessions, inspect metadata, read bounded message slices, read the compressed conversation cache, and full-text search prior conversation history before asking the user to repeat information. This is purely a read tool — it has no runtime control and cannot influence live sessions. Distinct from the `memory` tool, which manages durable cross-session knowledge."
219    }
220
221    fn parameters_schema(&self) -> serde_json::Value {
222        // Keep schema permissive; Rust parsing enforces action-specific requirements.
223        json!({
224            "type": "object",
225            "properties": {
226                "action": {
227                    "type": "string",
228                    "enum": ["list", "get_meta", "read_messages", "read_compressed_cache", "search"],
229                    "description": "Which inspection action to perform."
230                },
231                "query": { "type": "string", "description": "Search string (list/search)." },
232                "kind": { "type": "string", "enum": ["root", "child"], "description": "Filter by session kind (list)." },
233                "pinned": { "type": "boolean", "description": "Filter pinned sessions (list)." },
234                "parent_session_id": { "type": "string", "description": "Filter child sessions by parent (list)." },
235                "root_session_id": { "type": "string", "description": "Filter by root session (list)." },
236                "created_by_schedule_id": { "type": "string", "description": "Filter sessions created by a schedule (list)." },
237                "limit": { "type": "number", "description": "Max items/messages to return (list/read_messages)." },
238                "offset": { "type": "number", "description": "Offset (list/read_messages)." },
239                "session_id": { "type": "string", "description": "Target session id (get_meta/read_messages)." },
240                "from_end": { "type": "boolean", "description": "Read from end (read_messages)." },
241                "truncate_chars": { "type": "number", "description": "Max chars per message (read_messages)." },
242                "include_system": { "type": "boolean" },
243                "include_tool": { "type": "boolean" },
244                "include_tool_calls": { "type": "boolean" },
245                "include_image_urls": { "type": "boolean" },
246                "include_summary": { "type": "boolean", "description": "Include cached conversation summary when available (read_compressed_cache)." },
247                "mode": { "type": "string", "enum": ["title", "tail_messages"] },
248                "max_sessions": { "type": "number" },
249                "tail_messages": { "type": "number" },
250                "case_sensitive": { "type": "boolean" },
251                "max_matches": { "type": "number" }
252            },
253            "required": ["action"]
254        })
255    }
256
257    async fn execute(&self, args: serde_json::Value) -> Result<ToolResult, ToolError> {
258        self.execute_with_context(args, ToolExecutionContext::none("tool_call"))
259            .await
260    }
261
262    async fn execute_with_context(
263        &self,
264        args: serde_json::Value,
265        ctx: ToolExecutionContext<'_>,
266    ) -> Result<ToolResult, ToolError> {
267        let _caller_session_id = ctx.session_id.ok_or_else(|| {
268            ToolError::Execution(
269                "session_history requires a session_id in tool context".to_string(),
270            )
271        })?;
272
273        let parsed: SessionInspectorArgs = serde_json::from_value(args).map_err(|e| {
274            ToolError::InvalidArguments(format!("Invalid session_history args: {e}"))
275        })?;
276
277        match parsed {
278            SessionInspectorArgs::List {
279                query,
280                kind,
281                pinned,
282                parent_session_id,
283                root_session_id,
284                created_by_schedule_id,
285                limit,
286                offset,
287            } => {
288                let limit = limit.unwrap_or(50).min(200);
289                let offset = offset.unwrap_or(0).min(10_000);
290
291                let mut items = self.session_store.list_index_entries().await;
292
293                let query = query.as_ref().map(|v| v.trim()).filter(|v| !v.is_empty());
294                let kind = kind.as_ref().map(|v| v.trim().to_ascii_lowercase());
295                let parent_session_id = parent_session_id
296                    .as_ref()
297                    .map(|v| v.trim())
298                    .filter(|v| !v.is_empty());
299                let root_session_id = root_session_id
300                    .as_ref()
301                    .map(|v| v.trim())
302                    .filter(|v| !v.is_empty());
303                let created_by_schedule_id = created_by_schedule_id
304                    .as_ref()
305                    .map(|v| v.trim())
306                    .filter(|v| !v.is_empty());
307
308                items.retain(|e| {
309                    if let Some(q) = query {
310                        if !normalize_contains(&e.title, q, false)
311                            && !normalize_contains(&e.id, q, false)
312                        {
313                            return false;
314                        }
315                    }
316                    if let Some(ref k) = kind {
317                        match k.as_str() {
318                            "root" if e.kind != SessionKind::Root => return false,
319                            "child" if e.kind != SessionKind::Child => return false,
320                            _ => {}
321                        }
322                    }
323                    if let Some(p) = pinned {
324                        if e.pinned != p {
325                            return false;
326                        }
327                    }
328                    if let Some(pid) = parent_session_id {
329                        if e.parent_session_id.as_deref() != Some(pid) {
330                            return false;
331                        }
332                    }
333                    if let Some(rid) = root_session_id {
334                        if e.root_session_id != rid {
335                            return false;
336                        }
337                    }
338                    if let Some(sid) = created_by_schedule_id {
339                        if e.created_by_schedule_id.as_deref() != Some(sid) {
340                            return false;
341                        }
342                    }
343                    true
344                });
345
346                let total = items.len();
347                let page = items
348                    .into_iter()
349                    .skip(offset)
350                    .take(limit)
351                    .map(|e| map_index_entry(&e))
352                    .collect::<Vec<_>>();
353
354                Ok(ToolResult {
355                    success: true,
356                    result: json!({
357                        "total": total,
358                        "offset": offset,
359                        "limit": limit,
360                        "sessions": page,
361                        "note": "Use get_meta/read_messages with a small limit. Keep inspection local unless the user explicitly asks for delegated sub-session work."
362                    })
363                    .to_string(),
364                    display_preference: Some("Collapsible".to_string()),
365                })
366            }
367
368            SessionInspectorArgs::GetMeta { session_id } => {
369                let session_id = session_id.trim().to_string();
370                if session_id.is_empty() {
371                    return Err(ToolError::InvalidArguments(
372                        "session_id must be a non-empty string".to_string(),
373                    ));
374                }
375
376                let Some(entry) = self.session_store.get_index_entry(&session_id).await else {
377                    return Err(ToolError::Execution(format!(
378                        "session not found: {session_id}"
379                    )));
380                };
381
382                Ok(ToolResult {
383                    success: true,
384                    result: json!({ "session": map_index_entry(&entry) }).to_string(),
385                    display_preference: Some("Collapsible".to_string()),
386                })
387            }
388
389            SessionInspectorArgs::ReadMessages {
390                session_id,
391                from_end,
392                offset,
393                limit,
394                truncate_chars,
395                include_system,
396                include_tool,
397                include_tool_calls,
398                include_image_urls,
399            } => {
400                let session_id = session_id.trim().to_string();
401                if session_id.is_empty() {
402                    return Err(ToolError::InvalidArguments(
403                        "session_id must be a non-empty string".to_string(),
404                    ));
405                }
406
407                let from_end = from_end.unwrap_or(true);
408                let offset = offset.unwrap_or(0).min(50_000);
409                let limit = limit.unwrap_or(40).min(200);
410                let truncate_chars = truncate_chars.unwrap_or(800).min(4000);
411                let include_system = include_system.unwrap_or(false);
412                let include_tool = include_tool.unwrap_or(true);
413                let include_tool_calls = include_tool_calls.unwrap_or(false);
414                let include_image_urls = include_image_urls.unwrap_or(true);
415
416                let session = self.load_session(&session_id).await?;
417                let total = session.messages.len();
418
419                let mut messages: Vec<(usize, &Message)> = session
420                    .messages
421                    .iter()
422                    .enumerate()
423                    .filter(|(_, m)| {
424                        if !include_system && matches!(m.role, Role::System) {
425                            return false;
426                        }
427                        if !include_tool && matches!(m.role, Role::Tool) {
428                            return false;
429                        }
430                        true
431                    })
432                    .collect();
433
434                // Slice by index in the filtered sequence to keep semantics stable.
435                let filtered_total = messages.len();
436                let (start, end) = if from_end {
437                    let end = filtered_total.saturating_sub(offset);
438                    let start = end.saturating_sub(limit);
439                    (start, end)
440                } else {
441                    let start = offset.min(filtered_total);
442                    let end = (start + limit).min(filtered_total);
443                    (start, end)
444                };
445
446                let slice = messages
447                    .drain(start..end)
448                    .map(|(idx, m)| {
449                        let tool_calls_count = m.tool_calls.as_ref().map(|v| v.len()).unwrap_or(0);
450                        let image_urls = if include_image_urls {
451                            extract_image_urls(m)
452                        } else {
453                            Vec::new()
454                        };
455                        json!({
456                            "index": idx,
457                            "id": m.id,
458                            "role": role_to_str(&m.role),
459                            "created_at": m.created_at,
460                            "content_len": m.content.len(),
461                            "content": truncate_string(&m.content, truncate_chars),
462                            "has_images": !image_urls.is_empty(),
463                            "image_urls": image_urls,
464                            "tool_calls_count": tool_calls_count,
465                            "tool_call_id": if include_tool_calls { m.tool_call_id.clone() } else { None },
466                        })
467                    })
468                    .collect::<Vec<_>>();
469
470                Ok(ToolResult {
471                    success: true,
472                    result: json!({
473                        "session_id": session_id,
474                        "message_count_total": total,
475                        "message_count_filtered": filtered_total,
476                        "from_end": from_end,
477                        "offset": offset,
478                        "limit": limit,
479                        "slice_count": slice.len(),
480                        "messages": slice,
481                        "note": "If you need to read a lot of content, iterate with bounded read_messages calls. Only delegate to a child session if the user explicitly asks."
482                    })
483                    .to_string(),
484                    display_preference: Some("Collapsible".to_string()),
485                })
486            }
487
488            SessionInspectorArgs::ReadCompressedCache {
489                session_id,
490                offset,
491                limit,
492                truncate_chars,
493                include_summary,
494            } => {
495                let session_id = session_id.trim().to_string();
496                if session_id.is_empty() {
497                    return Err(ToolError::InvalidArguments(
498                        "session_id must be a non-empty string".to_string(),
499                    ));
500                }
501
502                let offset = offset.unwrap_or(0).min(1_000_000);
503                let limit = limit.unwrap_or(40).min(200);
504                let truncate_chars = truncate_chars.unwrap_or(1200).min(20_000);
505                let include_summary = include_summary.unwrap_or(true);
506
507                let sqlite_snapshot = self
508                    .session_store
509                    .search_index()
510                    .read_compressed_cache(&session_id, offset, limit, truncate_chars)
511                    .await;
512
513                let (source, summary, total_compressed, messages) = match sqlite_snapshot {
514                    Ok(snapshot) if snapshot.total_compressed_messages > 0 => (
515                        "sqlite_fts",
516                        if include_summary {
517                            snapshot.summary
518                        } else {
519                            None
520                        },
521                        snapshot.total_compressed_messages,
522                        snapshot
523                            .messages
524                            .into_iter()
525                            .map(|row| {
526                                json!({
527                                    "id": row.message_id,
528                                    "index": row.message_index,
529                                    "role": row.role,
530                                    "created_at": row.created_at,
531                                    "content_len": row.content_len,
532                                    "content": row.content,
533                                })
534                            })
535                            .collect::<Vec<_>>(),
536                    ),
537                    Ok(_) | Err(_) => {
538                        let session = self.load_session(&session_id).await?;
539                        let summary = if include_summary {
540                            session
541                                .conversation_summary
542                                .as_ref()
543                                .map(|value| value.content.clone())
544                        } else {
545                            None
546                        };
547                        let compressed_messages = session
548                            .messages
549                            .iter()
550                            .enumerate()
551                            .filter(|(_, message)| message.compressed)
552                            .collect::<Vec<_>>();
553                        let total = compressed_messages.len();
554                        let slice = compressed_messages
555                            .into_iter()
556                            .skip(offset)
557                            .take(limit)
558                            .map(|(index, message)| {
559                                json!({
560                                    "id": message.id,
561                                    "index": index,
562                                    "role": role_to_str(&message.role),
563                                    "created_at": message.created_at,
564                                    "content_len": message.content.chars().count(),
565                                    "content": truncate_string(&message.content, truncate_chars),
566                                })
567                            })
568                            .collect::<Vec<_>>();
569                        ("session_json_fallback", summary, total, slice)
570                    }
571                };
572
573                Ok(ToolResult {
574                    success: true,
575                    result: json!({
576                        "session_id": session_id,
577                        "source": source,
578                        "offset": offset,
579                        "limit": limit,
580                        "slice_count": messages.len(),
581                        "total_compressed_messages": total_compressed,
582                        "summary": summary,
583                        "messages": messages,
584                        "note": "Use this for bounded recall from compressed history. Prioritize current task list and recent turns when conflicts appear."
585                    })
586                    .to_string(),
587                    display_preference: Some("Collapsible".to_string()),
588                })
589            }
590
591            SessionInspectorArgs::Search {
592                query,
593                mode,
594                max_sessions,
595                tail_messages,
596                case_sensitive,
597                max_matches,
598            } => {
599                let q = query.trim();
600                if q.is_empty() {
601                    return Err(ToolError::InvalidArguments(
602                        "query must be a non-empty string".to_string(),
603                    ));
604                }
605                let case_sensitive = case_sensitive.unwrap_or(false);
606                let mode = mode
607                    .as_deref()
608                    .map(str::trim)
609                    .filter(|v| !v.is_empty())
610                    .unwrap_or("title")
611                    .to_ascii_lowercase();
612                let max_matches = max_matches.unwrap_or(50).min(200);
613
614                if !case_sensitive {
615                    match self
616                        .session_store
617                        .search_index()
618                        .search(q, max_matches)
619                        .await
620                    {
621                        Ok(fts_matches) if !fts_matches.is_empty() => {
622                            let matches = fts_matches
623                                .into_iter()
624                                .map(|m| {
625                                    json!({
626                                        "type": if m.match_type == "session" { "title_match" } else { "message_match" },
627                                        "session_id": m.session_id,
628                                        "session_title": m.session_title,
629                                        "session_kind": m.session_kind,
630                                        "root_session_id": m.root_session_id,
631                                        "parent_session_id": m.parent_session_id,
632                                        "pinned": m.pinned,
633                                        "updated_at": m.updated_at,
634                                        "rank": m.rank,
635                                        "message_id": m.message_id,
636                                        "message_index": m.message_index,
637                                        "role": m.role,
638                                        "content_preview": m.content_preview,
639                                    })
640                                })
641                                .collect::<Vec<_>>();
642
643                            return Ok(ToolResult {
644                                success: true,
645                                result: json!({
646                                    "query": q,
647                                    "mode": mode,
648                                    "case_sensitive": case_sensitive,
649                                    "search_backend": "sqlite_fts",
650                                    "matches": matches,
651                                    "note": "Results came from the local SQLite FTS session search index. Use read_messages for bounded inspection of matched sessions."
652                                })
653                                .to_string(),
654                                display_preference: Some("Collapsible".to_string()),
655                            });
656                        }
657                        Ok(_) => {}
658                        Err(error) => {
659                            tracing::warn!(
660                                "session_history FTS search failed for query '{}': {}. Falling back to in-memory scan.",
661                                q,
662                                error
663                            );
664                        }
665                    }
666                }
667
668                let entries = self.session_store.list_index_entries().await;
669                let mut results = Vec::new();
670
671                // Always search titles first.
672                for e in entries.iter() {
673                    if normalize_contains(&e.title, q, case_sensitive)
674                        || normalize_contains(&e.id, q, case_sensitive)
675                    {
676                        results.push(json!({
677                            "type": "title_match",
678                            "session": map_index_entry(e),
679                        }));
680                        if results.len() >= max_matches {
681                            break;
682                        }
683                    }
684                }
685
686                if mode != "title" && results.len() < max_matches {
687                    let max_sessions = max_sessions.unwrap_or(30).min(200);
688                    let tail_messages = tail_messages.unwrap_or(40).min(200);
689
690                    // Scan tail messages for additional matches (bounded).
691                    for e in entries.into_iter().take(max_sessions) {
692                        if results.len() >= max_matches {
693                            break;
694                        }
695                        let Ok(session) = self.storage.load_session(&e.id).await else {
696                            continue;
697                        };
698                        let Some(session) = session else {
699                            continue;
700                        };
701                        let start = session.messages.len().saturating_sub(tail_messages);
702                        for (idx, m) in session.messages.iter().enumerate().skip(start) {
703                            if results.len() >= max_matches {
704                                break;
705                            }
706                            if !normalize_contains(&m.content, q, case_sensitive) {
707                                continue;
708                            }
709                            results.push(json!({
710                                "type": "message_match",
711                                "session_id": e.id,
712                                "session_title": e.title,
713                                "message_index": idx,
714                                "message_id": m.id,
715                                "role": role_to_str(&m.role),
716                                "created_at": m.created_at,
717                                "content_preview": truncate_string(&m.content, 240),
718                            }));
719                        }
720                    }
721                }
722
723                Ok(ToolResult {
724                    success: true,
725                    result: json!({
726                        "query": q,
727                        "mode": mode,
728                        "case_sensitive": case_sensitive,
729                        "matches": results,
730                        "note": "Consider narrowing by session_id + read_messages. Keep summarization local unless the user explicitly asks for delegated child-session work."
731                    })
732                    .to_string(),
733                    display_preference: Some("Collapsible".to_string()),
734                })
735            }
736        }
737    }
738}