1use std::sync::Arc;
10
11use crate::{config::SearchConfig, embed::LazyEmbedder, sessions::Store};
12
/// Shared state handed to every HTTP `/v1/*` handler (via `with_state`) and cloned
/// into each `PondMcp` instance built by the MCP service factory.
///
/// `store` and `embedder` are behind `Arc`, so cloning the state shares them;
/// `search` is cloned along with the rest.
#[derive(Clone)]
pub struct AppState {
    /// Session store used by search, get, ingest, and the SQL tool.
    pub store: Arc<Store>,
    /// Embedder passed to `pond_search` (the type name suggests lazy
    /// initialisation — NOTE(review): confirm against `embed::LazyEmbedder`).
    pub embedder: Arc<LazyEmbedder>,
    /// Search configuration passed through to `pond_search`.
    pub search: SearchConfig,
}
23
24pub mod http {
25 use std::net::{IpAddr, SocketAddr};
29
30 use anyhow::Context;
31 use axum::{
32 Json, Router,
33 extract::{DefaultBodyLimit, State},
34 http::{HeaderValue, StatusCode},
35 response::{IntoResponse, Response},
36 routing::post,
37 };
38 use rmcp::transport::streamable_http_server::{
39 StreamableHttpServerConfig, StreamableHttpService, session::local::LocalSessionManager,
40 };
41 use tokio::net::TcpListener;
42
43 use super::AppState;
44 use crate::{
45 handlers::{pond_get, pond_ingest, pond_search},
46 wire::{
47 ErrorCode, GetEnvelope, GetRequest, IngestEnvelope, IngestRequest, SearchEnvelope,
48 SearchRequest, default_namespace, new_request_id,
49 },
50 };
51
/// Maximum request body accepted by the `/v1/*` JSON routes: 8 MiB
/// (installed as a `DefaultBodyLimit` layer in `router`).
pub const HTTP_BODY_LIMIT_BYTES: usize = 8 << 20;
57
58 pub fn router(state: AppState) -> Router {
62 let mcp_state = state.clone();
63 let mcp = StreamableHttpService::new(
64 move || Ok(super::mcp::PondMcp::new(mcp_state.clone())),
65 LocalSessionManager::default().into(),
66 StreamableHttpServerConfig::default(),
67 );
68 Router::new()
69 .route("/v1/search", post(search))
70 .route("/v1/get", post(get))
71 .route("/v1/ingest", post(ingest))
72 .layer(DefaultBodyLimit::max(HTTP_BODY_LIMIT_BYTES))
73 .with_state(state)
74 .nest_service("/mcp", mcp)
75 }
76
77 pub async fn serve(state: AppState, host: String, port: u16) -> anyhow::Result<()> {
81 let ip: IpAddr = host
82 .parse()
83 .with_context(|| format!("invalid --host {host:?}"))?;
84 if ip.is_unspecified() {
85 tracing::warn!(
86 %host,
87 "binding to an unspecified address exposes pond on the LAN; \
88 the personal pond is single-user"
89 );
90 }
91 let listener = TcpListener::bind(SocketAddr::new(ip, port))
92 .await
93 .with_context(|| format!("failed to bind {host}:{port}"))?;
94 let local = listener
95 .local_addr()
96 .context("failed to read bound address")?;
97 tracing::info!(%local, "pond serve listening (HTTP /v1/*, MCP /mcp)");
98 axum::serve(listener, router(state))
99 .with_graceful_shutdown(shutdown_signal())
100 .await
101 .context("axum server error")
102 }
103
104 async fn shutdown_signal() {
105 let _ = tokio::signal::ctrl_c().await;
106 }
107
108 async fn search(
109 State(state): State<AppState>,
110 Json(mut request): Json<SearchRequest>,
111 ) -> Response {
112 request.namespace.get_or_insert_with(default_namespace);
113 let envelope = pond_search(&state.store, &state.embedder, request, &state.search).await;
114 let status = match &envelope {
115 SearchEnvelope::Success(_) => StatusCode::OK,
116 SearchEnvelope::Error(error) => status_for(&error.error.code),
117 };
118 with_request_id((status, Json(envelope)).into_response())
119 }
120
121 async fn get(State(state): State<AppState>, Json(mut request): Json<GetRequest>) -> Response {
122 request.namespace.get_or_insert_with(default_namespace);
123 let envelope = pond_get(&state.store, request).await;
124 let status = match &envelope {
125 GetEnvelope::Success(_) => StatusCode::OK,
126 GetEnvelope::Error(error) => status_for(&error.error.code),
127 };
128 with_request_id((status, Json(envelope)).into_response())
129 }
130
131 async fn ingest(
132 State(state): State<AppState>,
133 Json(mut request): Json<IngestRequest>,
134 ) -> Response {
135 request.namespace.get_or_insert_with(default_namespace);
136 let envelope = pond_ingest(&state.store, request).await;
137 let status = match &envelope {
141 IngestEnvelope::Success(_) => StatusCode::OK,
142 IngestEnvelope::Error(error) => status_for(&error.error.code),
143 };
144 with_request_id((status, Json(envelope)).into_response())
145 }
146
147 fn with_request_id(mut response: Response) -> Response {
148 if let Ok(value) = HeaderValue::from_str(&new_request_id()) {
149 response.headers_mut().insert("x-pond-request-id", value);
150 }
151 response
152 }
153
154 fn status_for(code: &ErrorCode) -> StatusCode {
157 match code {
158 ErrorCode::ValidationFailed
159 | ErrorCode::VersionUnsupported
160 | ErrorCode::NamespaceUnknown => StatusCode::BAD_REQUEST,
161 ErrorCode::NotFound => StatusCode::NOT_FOUND,
162 ErrorCode::Conflict => StatusCode::CONFLICT,
163 ErrorCode::StorageUnavailable => StatusCode::SERVICE_UNAVAILABLE,
164 ErrorCode::Internal => StatusCode::INTERNAL_SERVER_ERROR,
165 }
166 }
167}
168
169pub mod mcp {
170 use anyhow::Context;
177 use base64::{Engine, engine::general_purpose::STANDARD};
178 use rmcp::{
179 ErrorData, RoleServer, ServerHandler, ServiceExt,
180 handler::server::{router::tool::ToolRouter, wrapper::Parameters},
181 model::{
182 AnnotateAble, CallToolResult, Content, ErrorCode as JsonRpcErrorCode, Implementation,
183 ListResourcesResult, ListToolsResult, Meta, PaginatedRequestParams, RawResource,
184 ReadResourceRequestParams, ReadResourceResult, ResourceContents, ServerCapabilities,
185 ServerInfo,
186 },
187 schemars,
188 service::RequestContext,
189 tool, tool_handler, tool_router,
190 transport::stdio,
191 };
192 use serde::Deserialize;
193 use uuid::Uuid;
194
195 use super::AppState;
196 use crate::{
197 PROTOCOL_VERSION,
198 handlers::pond_get as run_get,
199 handlers::pond_search as run_search,
200 sql,
201 substrate::Table,
202 wire::{
203 ErrorCode as WireErrorCode, ErrorEnvelope, GetEnvelope, GetRequest, GetResponse,
204 GetResult, MessageView, PartKind, PartSummary, ProjectFilter, ResponseMode,
205 ResponsePart, SearchEnvelope, SearchFilters, SearchRequest, SearchResponse,
206 SessionFrom, default_namespace,
207 },
208 };
209
/// Long-form reference for `pond_search` and `pond_get`: every filter, the
/// transcript response format, multilingual behaviour, and paging.
///
/// NOTE(review): presumably served as the body of the `schema://pond` resource
/// listed by `list_resources`; `read_resource` is outside this view — confirm.
/// This text is read by agents at runtime, so edits change tool guidance.
const SCHEMA_DOC: &str = "\
pond_search filters: query (semantic - concepts, not project names), limit \
(returned sessions; default 10, max 200 - also the want-more knob, there is \
no pagination), project (path substring), session_id (exact session match - \
semantic search within one session), source_agent, from_date / to_date \
(YYYY-MM-DD), format (text|json).

pond_search response: a transcript (or structured JSON when format=json). The \
first line states totals (`matched_total` is the message count before `limit` \
and byte-budget truncation), then results are grouped by session, ordered by \
each session's best hit. Each session lists up to 3 top-scoring hits, \
score-desc; each hit is a `--- [n] score | role | time | message_id | project \
| agent | session ---` rule followed by its matched text (a ~600-char indexed \
window). `score` is normalized to [0.0, 1.0] within one response. `has_more` \
warns the ranked set was cut by `limit` or the byte budget - raise `limit` to \
see the rest.

pond_search multilingual: pond's embedder (multilingual-e5-small) is trained \
for cross-lingual retrieval, so a query in language A can match indexed text \
in language B via the vector arm. The FTS arm is character-ngram-based and \
only matches surface tokens, so for cross-lingual queries expect most signal \
to come from the vector arm.

pond_get: message_id (the target message, marked `>`, plus context_depth \
sibling messages each side) OR session_id (the whole session). Output is a \
transcript - each message is a `--- [n] role | time | message_id ---` rule, \
then its text/content as real lines, then parts (`-> name [call_id]` tool \
call, `<- name [call_id] (ok|failed)` result). Session mode takes \
response_mode: \"conversational\" (default - human/model text only), \
\"complete\" (all messages incl. carriers, tools as one-liners), or \
\"verbatim\" (full part bodies inline; heaviest). limit defaults to 20, caps \
at 1000. Bounded by a size budget: when the footer shows `after_id=`, pass it \
back to page. A whole-session response also lists the session's subagents (each \
stored as its own session) in a footer; pass a listed id back as session_id to \
open it. Not for bulk export - use `pond export`.";
247
/// Long-form reference for `pond_sql_query`: table/column lists, join keys,
/// indexed columns, JSON access rules, full-text search, the function
/// quick-reference, output modes, pagination, and worked examples.
///
/// NOTE(review): the `pond_sql_query` tool description points agents at resource
/// `schema://pond-sql` for this material; presumably `read_resource` returns this
/// constant there — confirm. Runtime text: edits change what agents are told.
const SQL_SCHEMA_DOC: &str = "\
pond_sql_query runs ONE read-only SELECT (DataFusion SQL, PostgreSQL-compatible) \
over three registered tables. Read-only is hard-enforced: anything other than a \
single SELECT/WITH (or EXPLAIN of one) is rejected (no INSERT/UPDATE/DELETE/\
CREATE/DROP/COPY/SET).

Routing - pick the right surface before writing SQL:
- counts, group-by, time buckets, joins over metadata -> this tool, on \
messages/sessions.
- which tools ran / failed, tool params -> this tool, on parts (type = \
'tool_call' / 'tool_result'); worked example below.
- find text in conversations -> WHERE contains_tokens(search_text, '...') to \
filter, FROM fts('messages', ...) to rank (both below), or pond_search for \
meaning-based recall. Never LIKE over parts - tool bodies are JSON with no \
substring index (yet), and the conversational text is messages.search_text.
- read a transcript (a session, a message with context) -> pond_get, not SQL.

Tables and columns:
- messages(session_id text, message_id text, timestamp timestamp(us, UTC), role \
text {user|assistant|system|tool}, source_agent text, project text, content text \
NULL [system-role messages only], search_text text NULL [the conversational text - \
null for system/tool messages], embedding_model text NULL, options json). The \
embedding `vector` column exists but is never returned (omitted from results) and \
explicit projection of it is rejected; you may still filter on it in WHERE, e.g. \
`vector IS NOT NULL`. For semantic search, use pond_search.
- sessions(session_id text, parent_session_id text NULL, parent_message_id text \
NULL, source_agent text, created_at timestamp(us, UTC), project text, options json).
- parts(session_id text, message_id text, id text, ordinal int, type text \
{text|reasoning|file|tool_call|tool_result|tool_approval_request|\
tool_approval_response - exact strings, underscores not hyphens}, provenance \
text {conversational|injected}, variant_data json, options json). The verbatim \
part body lives in `variant_data`; its fields follow the part type, e.g. \
tool_call carries {call_id, name, params}, tool_result carries {call_id, name, \
is_failure, result}, text/reasoning carry {text}. FilePart binary payloads are \
not exposed in SQL.
Enum literals matter: a wrong value (e.g. 'tool-call') is valid SQL and silently \
returns zero rows. Discovery from SQL works too: SELECT table_name, column_name, \
data_type FROM information_schema.columns.

Join keys: messages.session_id = sessions.session_id; parts.session_id = \
messages.session_id AND parts.message_id = messages.message_id. Subagents are \
sessions whose source_agent matches '%/%' (e.g. 'claude-code/general-purpose').

Indexed (fast) filter columns: messages.project / session_id / timestamp / role / \
source_agent / message_id; parts.session_id / message_id; sessions.session_id. \
Prefer equality/range predicates on these. Known limitation: prefix LIKE ('x%') and starts_with() FAIL \
on bitmap-indexed columns (messages.source_agent, messages.role) with \"LIKE \
prefix queries are not supported for bitmap indexes\". Workarounds: equality, \
split_part(source_agent, '/', 1) = 'claude-code', or an infix pattern \
(LIKE '%/%' is fine - leading-wildcard patterns are not pushed to the index).

JSON columns (options, variant_data) are binary JSONB. Rules:
- NEVER CAST a JSON column (`variant_data::text` is rejected at plan time - the \
binary encoding can otherwise silently render as garbage). Stringify with \
json_extract(col, '$').
- A leading-wildcard LIKE over the whole document \
(`json_extract(variant_data, '$') LIKE '%...%'`) is rejected at plan time: it \
stringifies and scans every row and never finishes over parts. Match a single \
field (`json_extract(variant_data, '$.field') LIKE '...'`), scope to one session, \
or use contains_tokens for conversational text. (Substring search over tool \
bodies arrives with the FM-Index, #47.)
- json_extract(col, '$.a.b') takes a full JSONPath and returns JSON text of ANY \
value (objects/arrays serialize) - the right call for deeply nested or mixed-type \
fields, e.g. json_extract(variant_data, '$.params.command').
- json_get_string|json_get_int|json_get_float|json_get_bool(col, 'key', ...) walk \
a key path - json_get_string(options, 'anthropic', 'model') - array steps by \
numeric index. json_get_string serializes non-string values; the typed getters \
return NULL on a non-coercible value.
- json_get(col, 'key') returns JSONB for chaining: \
json_get_string(json_get(variant_data, 'params'), 'command').
- Also: json_array_contains(col, 'key', value), json_array_length(col, 'key').

Worked example - tool usage and failure rates over the last week:

 SELECT json_get_string(c.variant_data, 'name') AS tool,
 COUNT(*) AS calls,
 SUM(CASE WHEN json_get_bool(r.variant_data, 'is_failure') THEN 1 \
ELSE 0 END) AS failures
 FROM parts c
 JOIN messages m ON m.session_id = c.session_id AND m.message_id = c.message_id
 LEFT JOIN parts r ON r.session_id = c.session_id
 AND r.type = 'tool_result'
 AND json_get_string(r.variant_data, 'call_id') = \
json_get_string(c.variant_data, 'call_id')
 WHERE c.type = 'tool_call' AND m.timestamp >= now() - INTERVAL '7 days'
 GROUP BY tool ORDER BY calls DESC;

Full-text search in SQL is a pair - filter form and ranked form:
- Filtering (WHERE): contains_tokens(search_text, 'word1 word2') - true when the \
text contains ALL the words (split on punctuation/whitespace, case-sensitive \
tokens); accelerated by the FTS index. The right tool for exact strings, \
identifiers, and error messages - compose freely with other predicates: \
SELECT message_id FROM messages WHERE contains_tokens(search_text, 'OCC retry') \
AND project LIKE '%pond%'.
- Ranking (FROM): the fts() table function returns matches plus `_score` (BM25 \
relevance, a regular projectable column): SELECT message_id, _score, search_text \
FROM fts('messages', '{\"match\":{\"column\":\"search_text\",\"terms\":\"...\"}}') \
ORDER BY _score DESC - compose with WHERE/JOIN/GROUP BY around it. AND semantics: \
add \"operator\":\"And\" to the match; \"boolean\" queries (must/should/must_not \
over match clauses) also work. \"phrase\" queries are unavailable (index built \
without positions) - use contains_tokens or match + operator And, optionally with \
LIKE post-filters, for exact substrings.
fts() in WHERE is a plan-time error that points back here. Unlike pond_search, \
both forms cover subagent sessions (filter them out with WHERE NOT (source_agent \
LIKE '%/%') if unwanted). Vector/semantic search is NOT available in SQL; use \
pond_search for that.

Function quick-reference (exact DataFusion names so the model doesn't have to \
guess):
- aggregates: count, count(distinct ...), sum, avg, min, max, any_value, stddev, \
median, approx_distinct, approx_percentile_cont, array_agg, string_agg
- date/time: now(), date_trunc('day'|'hour'|'minute'|..., ts), date_part('year'|..., \
ts), date_bin(interval, ts, origin), to_char(ts, fmt), to_timestamp(text), \
extract(field FROM ts), age(t1, t2)
- intervals: `INTERVAL '7 days'`, `INTERVAL '1 hour'` (single-quoted, postgres-style)
- string: length, lower, upper, substr, position, split_part, regexp_like, \
regexp_match, regexp_replace, like, ilike, starts_with, ends_with, concat, \
concat_ws
- text search: contains_tokens(col, 'words') in WHERE; fts(table, query_json) in \
FROM (see above)
- numeric: round, floor, ceil, abs, sign, log, exp, power, sqrt
- conditional: CASE WHEN ... THEN ... ELSE ..., coalesce, nullif, greatest, least
- cast: CAST(x AS TYPE) or x::TYPE - but never on JSON columns (see the JSON \
rules above)
Quote identifiers with double quotes when they collide with keywords (e.g. \
\"timestamp\"); string literals use single quotes.

EXPLAIN is allowed: `EXPLAIN <query>` or `EXPLAIN ANALYZE <query>` returns the \
DataFusion plan (and per-operator timings for ANALYZE) so you can self-diagnose \
slow queries without leaving SQL.

Output modes (the `format` arg):
- text (default): a row-capped rendered ASCII table with a header showing \
`{total_rows} in {elapsed_ms} ms; showing {shown}` and, on truncation, a \
keyset-pagination hint.
- json: same row-capped payload as `text` but delivered as a JSON object \
{total_rows, shown_rows, truncated, elapsed_ms, columns, rows: [{col: val, ...}]}. \
Spec-compliant dual delivery: the structured JSON rides MCP's `structuredContent` \
field; clients that don't surface that channel get the same JSON as a text block. \
Empirically validated on Claude Code 2.1.165 - the agent reads the structured form.
- parquet | ndjson: write the FULL result set to a file and return a \
`pond-sql-export://<id>` resource link; read it via MCP resources/read. On a \
local/stdio install the response also names the on-disk path so you can open it \
directly with duckdb/polars.

Pagination - keyset (preferred):
Use ORDER BY on indexed columns plus a composite seek key for stable tie-breaking. \
The agent owns the cursor (the last sort value it saw); no server-side state.

 -- page 1: most recent 100 messages in pond
 SELECT message_id, timestamp, role, project
 FROM messages
 WHERE project LIKE '%pond%'
 ORDER BY timestamp DESC, message_id DESC
 LIMIT 100;

 -- page 2: pass back the last (timestamp, message_id) the agent saw
 SELECT message_id, timestamp, role, project
 FROM messages
 WHERE project LIKE '%pond%'
 AND (timestamp, message_id) < (TIMESTAMP '2026-06-05T08:14:22.123456Z', 'last-id')
 ORDER BY timestamp DESC, message_id DESC
 LIMIT 100;

Keyset stays stable across concurrent ingest (older rows don't shift) and uses \
the btree on `timestamp`/`message_id` directly. For known-bounded full results, skip \
pagination entirely: format=parquet writes everything in one call. OFFSET works \
but scans-and-discards prior rows and shifts pages under writes - prefer keyset.

Drilling from aggregates to content (instead of N round-trips of pond_get):
JOIN to messages/parts directly. Example - top 10 longest sessions with first \
user message:

 WITH top_sessions AS (
 SELECT session_id, COUNT(*) AS msgs
 FROM messages
 GROUP BY session_id
 ORDER BY msgs DESC
 LIMIT 10
 )
 SELECT ts.session_id, ts.msgs, s.project, s.source_agent,
 m.search_text AS first_user_msg
 FROM top_sessions ts
 JOIN sessions s ON s.session_id = ts.session_id
 LEFT JOIN messages m
 ON m.session_id = ts.session_id
 AND m.role = 'user'
 AND m.timestamp = (
 SELECT MIN(timestamp) FROM messages
 WHERE session_id = ts.session_id AND role = 'user'
 );

One call, agent picks exactly which columns to hydrate. When you want the \
pond_get-style rendered transcript (tool-call lines, subagent footer), call \
pond_get with the session_id - that's its job.

Examples (4 patterns the agent should recognize):

 -- 1. Activity by project this week
 SELECT project, COUNT(*) AS msgs, COUNT(DISTINCT session_id) AS sessions
 FROM messages
 WHERE timestamp >= now() - INTERVAL '7 days'
 GROUP BY project
 ORDER BY msgs DESC
 LIMIT 20;

 -- 2. Subagent breakdown
 SELECT source_agent, COUNT(*) AS n
 FROM sessions
 WHERE source_agent LIKE '%/%'
 GROUP BY source_agent
 ORDER BY n DESC;

 -- 3. Text filter in WHERE (all words must appear), composed with metadata
 SELECT message_id, timestamp, project, substr(search_text, 1, 120) AS preview
 FROM messages
 WHERE contains_tokens(search_text, 'race condition')
 AND timestamp >= now() - INTERVAL '30 days'
 ORDER BY timestamp DESC
 LIMIT 50;

 -- 4. BM25 search in FROM, joined with metadata, relevance-ranked
 SELECT m.session_id, m.timestamp, m.project, f._score, m.search_text
 FROM fts('messages', \
'{\"match\":{\"column\":\"search_text\",\"terms\":\"race condition\"}}') f
 JOIN messages m ON m.message_id = f.message_id
 WHERE m.project LIKE '%pond%'
 ORDER BY f._score DESC
 LIMIT 50;";
487
// Arguments accepted by the `pond_search` MCP tool. schemars derives the tool's JSON
// input schema from this struct. Only plain `//` comments are used here, because
// `///` doc comments would be copied into that schema as field descriptions.
#[derive(Debug, Deserialize, schemars::JsonSchema)]
struct McpSearchParams {
    // The only required argument: the semantic search query.
    query: String,
    // Session cap; `pond_search` falls back to 10 when absent.
    #[serde(default)]
    limit: Option<usize>,
    // Mapped to `ProjectFilter::Contains` by `pond_search` (substring match).
    #[serde(default)]
    project: Option<String>,
    // Restrict the search to a single session.
    #[serde(default)]
    session_id: Option<String>,
    // Agent filter, e.g. "claude-code" or "claude-code/general-purpose".
    #[serde(default)]
    source_agent: Option<String>,
    // Subagent sessions are excluded unless this is `Some(true)`.
    #[serde(default)]
    include_subagents: Option<bool>,
    // Date bounds passed through to `SearchFilters` unchanged.
    #[serde(default)]
    from_date: Option<String>,
    #[serde(default)]
    to_date: Option<String>,
    // Only the exact value "json" selects structured output; anything else is text.
    #[serde(default)]
    format: Option<String>,
}
526
// Arguments accepted by the `pond_get` MCP tool. The JSON input schema is derived by
// schemars; plain `//` comments keep these notes out of the generated descriptions.
#[derive(Debug, Deserialize, schemars::JsonSchema)]
struct McpGetParams {
    // Target message; read together with `context_depth` siblings on each side.
    #[serde(default)]
    message_id: Option<String>,
    // Session to read; with no `message_id` the tool treats the call as session mode
    // and may append a subagent footer.
    #[serde(default)]
    session_id: Option<String>,
    // Sibling messages on each side of `message_id`; `pond_get` defaults it to 0.
    #[serde(default)]
    context_depth: Option<usize>,
    // Message cap; `pond_get` defaults it to 20.
    #[serde(default)]
    limit: Option<usize>,
    // "complete" | "verbatim"; any other value (or none) means conversational.
    #[serde(default)]
    response_mode: Option<String>,
    // "end" reads the most recent messages; any other value (or none) reads from the start.
    #[serde(default)]
    session_from: Option<String>,
    // Paging cursor: the value a previous response's footer showed.
    #[serde(default)]
    after_id: Option<String>,
}
564
// Arguments accepted by the `pond_sql_query` MCP tool. The JSON input schema is derived
// by schemars; plain `//` comments keep these notes out of the generated descriptions.
#[derive(Debug, Deserialize, schemars::JsonSchema)]
struct McpSqlParams {
    // The single read-only statement to run; clients may also send it under the key `sql`.
    #[serde(alias = "sql")]
    query: String,
    // "text" (default) | "json" | "parquet" | "ndjson"; other values are rejected by the tool.
    #[serde(default)]
    format: Option<String>,
}
597
598 fn parse_session_from(value: Option<String>) -> SessionFrom {
599 match value.as_deref() {
600 Some("end") => SessionFrom::End,
601 _ => SessionFrom::Start,
602 }
603 }
604
605 fn parse_response_mode(value: Option<String>) -> ResponseMode {
606 match value.as_deref() {
607 Some("complete") => ResponseMode::Complete,
608 Some("verbatim") => ResponseMode::Verbatim,
609 _ => ResponseMode::Conversational,
611 }
612 }
613
/// MCP server exposing pond's tools (`pond_search`, `pond_get`, `pond_sql_query`)
/// and its reference resources.
#[derive(Clone)]
pub struct PondMcp {
    // Shared store / embedder / search configuration, cloned from the server's state.
    state: AppState,
    // Tool dispatch table produced by the `#[tool_router]` macro (`Self::tool_router()`).
    tool_router: ToolRouter<PondMcp>,
}
620
621 #[tool_router]
622 impl PondMcp {
623 pub fn new(state: AppState) -> Self {
624 Self {
625 state,
626 tool_router: Self::tool_router(),
627 }
628 }
629
630 #[tool(
631 description = "Hybrid (vector + BM25) search over stored conversation history. \
632 Returns a readable transcript: a leading `key:` line explains the \
633 format and the first line states totals plus how many searchable \
634 messages the filters left in scope (the absence signal - search only \
635 sees conversational text, never tool calls/results), then results are \
636 grouped by session, ordered by each session's best hit. Each hit is a \
637 `--- [n] score | role | time | message_id | project | agent | session \
638 ---` delimiter rule followed by the matched text. Pass a returned \
639 `message_id` to `pond_get` for full text. Common args: \
640 query (semantic - concepts, not project names), then project / \
641 from_date / to_date to scope, limit to widen (no pagination - raise \
642 limit for more). Advanced: source_agent (e.g. \"claude-code\", or \
643 \"claude-code/general-purpose\" for subagents), session_id (search \
644 within one long session), include_subagents (subagent sessions are \
645 excluded by default), format (\"text\" default, or \"json\" for \
646 structured hits). \
647 Scores are relative within one response; there is no min_score. For \
648 exact strings, identifiers, or error messages, pond_sql_query is the \
649 sharper tool - WHERE contains_tokens(search_text, 'words') to \
650 filter, FROM fts('messages', ...) for BM25 ranking - and it sees \
651 subagent sessions too.",
652 annotations(read_only_hint = true, idempotent_hint = true, open_world_hint = false)
653 )]
654 async fn pond_search(
655 &self,
656 Parameters(params): Parameters<McpSearchParams>,
657 ) -> Result<CallToolResult, ErrorData> {
658 let json = matches!(params.format.as_deref(), Some("json"));
659 let request = SearchRequest {
660 protocol_version: PROTOCOL_VERSION,
661 namespace: Some(default_namespace()),
662 query: params.query,
663 filters: SearchFilters {
664 project: params.project.map(ProjectFilter::Contains),
665 session_id: params.session_id,
666 source_agent: params.source_agent,
667 from_date: params.from_date,
668 to_date: params.to_date,
669 min_score: 0.0,
674 include_subagents: params.include_subagents.unwrap_or(false),
675 },
676 limit: params.limit.unwrap_or(10),
677 mode_override: None,
678 };
679 match run_search(
680 &self.state.store,
681 &self.state.embedder,
682 request.clone(),
683 &self.state.search,
684 )
685 .await
686 {
687 SearchEnvelope::Success(response) if json => {
688 Ok(CallToolResult::structured(
691 serde_json::to_value(&response).map_err(|error| {
692 ErrorData::internal_error(
693 format!("failed to serialize search response: {error}"),
694 None,
695 )
696 })?,
697 ))
698 }
699 SearchEnvelope::Success(response) => {
700 Ok(tool_result(render_search_transcript(&response, &request)))
701 }
702 SearchEnvelope::Error(envelope) => Err(to_error_data(&envelope)),
703 }
704 }
705
706 #[tool(
707 description = "Retrieve stored conversation content as a readable transcript \
708 (a leading `key:` line explains the format). Common: session_id \
709 (whole session; pair with response_mode \
710 conversational|complete|verbatim) OR message_id (that message \
711 marked `>`, plus context_depth sibling messages each side, with \
712 its tool/file parts in full). A session_id response lists the \
713 session's subagents in a footer so you can open each. Advanced: \
714 limit (cap), after_id (paging - pass the value the footer shows), \
715 session_from (\"start\"|\"end\"; \"end\" returns the most recent \
716 messages, \
717 e.g. to recover context after compaction). \
718 Tool/result lines render as `-> name [call_id]` / `<- name \
719 [call_id] (ok|failed)`. Not for bulk export - use `pond export`.",
720 annotations(read_only_hint = true, idempotent_hint = true, open_world_hint = false)
721 )]
722 async fn pond_get(
723 &self,
724 Parameters(params): Parameters<McpGetParams>,
725 ) -> Result<CallToolResult, ErrorData> {
726 let request = GetRequest {
727 protocol_version: PROTOCOL_VERSION,
728 namespace: Some(default_namespace()),
729 session_id: params.session_id,
730 message_id: params.message_id,
731 context_depth: params.context_depth.unwrap_or(0),
732 limit: params.limit.unwrap_or(20),
733 response_mode: parse_response_mode(params.response_mode),
734 session_from: parse_session_from(params.session_from),
735 after_id: params.after_id,
736 };
737 match run_get(&self.state.store, request.clone()).await {
738 GetEnvelope::Success(response) => {
739 let mut transcript = render_get_transcript(&response, &request);
740 if request.message_id.is_none()
746 && request.after_id.is_none()
747 && let Ok(children) =
748 self.state.store.child_sessions(&response.session.id).await
749 && !children.is_empty()
750 {
751 transcript.push_str(&render_subagents_footer(&children));
752 }
753 Ok(tool_result(transcript))
754 }
755 GetEnvelope::Error(envelope) => Err(to_error_data(&envelope)),
756 }
757 }
758
759 #[tool(
760 description = "Run ONE read-only SQL query (DataFusion / PostgreSQL-compatible) \
761 over the stored corpus as three tables: sessions, messages, parts. \
762 For filtering, joins, and aggregation (counts, group-by, time \
763 buckets) - the analytic complement to pond_search's semantic \
764 recall. SELECT/WITH only (or EXPLAIN of one); writes and side- \
765 effecting statements are rejected. The exact column lists are in \
766 the `query` parameter description - use those names, do not guess \
767 (column discovery also works: SELECT column_name FROM \
768 information_schema.columns WHERE table_name = 'messages'). \
769 Routing: metadata analytics -> SQL on messages/sessions; tool-call \
770 analytics -> parts WHERE type = 'tool_call' with \
771 json_get_string(variant_data, 'name'); text search -> WHERE \
772 contains_tokens(search_text, 'words') to filter or FROM \
773 fts('messages', '{...json...}') for BM25-ranked results, or \
774 pond_search for semantic recall; reading a transcript -> pond_get, \
775 not SQL. The embedding `vector` column is never returned (explicit \
776 projection is rejected; filtering in WHERE is fine). Control row \
777 count with SQL `LIMIT`; inline output (format text|json) is capped \
778 at 100 rows. format defaults to text (a row-capped rendered table); \
779 set format=json for a structured JSON payload (delivered via MCP \
780 structuredContent), or format=parquet|ndjson to write the full \
781 result to a file returned as a pond-sql-export:// resource. Read \
782 resource schema://pond-sql \
783 for joins, indexed columns, JSON access rules, the function \
784 quick-reference, pagination + drilling patterns, and worked \
785 examples.",
786 annotations(read_only_hint = true, idempotent_hint = true, open_world_hint = false)
787 )]
788 async fn pond_sql_query(
789 &self,
790 Parameters(params): Parameters<McpSqlParams>,
791 ) -> Result<CallToolResult, ErrorData> {
792 let mode = match params.format.as_deref() {
793 None | Some("text") => sql::Mode::Inline,
794 Some("json") => sql::Mode::InlineJson,
795 Some("parquet") => sql::Mode::Export(sql::Format::Parquet),
796 Some("ndjson") => sql::Mode::Export(sql::Format::Ndjson),
797 Some(other) => {
798 return Ok(CallToolResult::error(vec![Content::text(format!(
799 "unknown format {other:?}; use \"text\", \"json\", \"parquet\", \
800 or \"ndjson\""
801 ))]));
802 }
803 };
804 let inline_rows = sql::DEFAULT_INLINE_ROWS;
805
806 let store = &self.state.store;
809 let tables = match tokio::try_join!(
810 store.dataset(Table::Sessions),
811 store.dataset(Table::Messages),
812 store.dataset(Table::Parts),
813 ) {
814 Ok((sessions, messages, parts)) => sql::Tables {
815 sessions,
816 messages,
817 parts,
818 },
819 Err(_) => {
820 return Err(ErrorData::internal_error(
821 "sql datasets unavailable".to_owned(),
822 None,
823 ));
824 }
825 };
826
827 match sql::run(&tables, ¶ms.query, mode, inline_rows).await {
828 Ok(sql::Outcome::Inline(text)) => Ok(tool_result(text)),
829 Ok(sql::Outcome::InlineJson(value)) => Ok(CallToolResult::structured(value)),
830 Ok(sql::Outcome::Export {
831 bytes,
832 format,
833 rows,
834 columns,
835 }) => {
836 let name = format!("{}.{}", Uuid::now_v7(), format.ext());
837 match store.export_write(&name, &bytes).await {
838 Ok(_) => Ok(export_result(
839 store,
840 &name,
841 format,
842 rows,
843 &columns,
844 bytes.len(),
845 )),
846 Err(error) => Err(ErrorData::internal_error(
847 format!("export write failed: {error}"),
848 None,
849 )),
850 }
851 }
852 Err(sql::SqlError::Query(message)) => {
853 Ok(CallToolResult::error(vec![Content::text(message)]))
854 }
855 Err(sql::SqlError::Infra(error)) => Err(ErrorData::internal_error(
856 format!("sql execution failed: {error}"),
857 None,
858 )),
859 }
860 }
861 }
862
863 #[tool_handler(router = self.tool_router)]
867 impl ServerHandler for PondMcp {
868 fn get_info(&self) -> ServerInfo {
869 ServerInfo::new(
870 ServerCapabilities::builder()
871 .enable_tools()
872 .enable_resources()
873 .build(),
874 )
875 .with_server_info(Implementation::new("pond", env!("CARGO_PKG_VERSION")))
878 .with_instructions(
879 "pond recalls past agent sessions (Claude Code and others) - prior work, \
880 decisions, and context across sessions, not the live conversation. \
881 Workflow: pond_search to find relevant messages, then pond_get to read \
882 full text by message_id or a whole session by session_id; both return \
883 readable transcripts, not JSON. Scope with filters, not the query: project \
884 (path substring), session_id, source_agent, from_date / to_date - \
885 keep query semantic (concepts, not project names). Scores are relative \
886 within one response; there is no min_score. Subagents are stored as their \
887 own sessions (source_agent like \"claude-code/general-purpose\"); pond_get \
888 on a parent session lists them in a footer so you can open each. Recover \
889 context lost to compaction: find this session via pond_search (a distinctive \
890 recent topic + project + from_date=today), then pond_get(session_id, \
891 session_from=\"end\") for the recent pre-compaction turns. Deeper \
892 reference on demand: resource schema://pond (all filters + response format), \
893 stats://pond (corpus + embedding stats). For structured/analytic queries \
894 (filtering, joins, counts, group-by) use pond_sql_query: read-only SQL \
895 (SELECT only) over the sessions/messages/parts tables, with optional \
896 parquet/ndjson export; see resource schema://pond-sql. Search only indexes \
897 conversational text (tool calls/results are invisible to it), and a \
898 zero/weak result is not proof of absence - for exact strings, \
899 identifiers, or error messages run pond_sql_query with WHERE \
900 contains_tokens(search_text, 'words') (all words must match; \
901 index-accelerated), or FROM fts('messages', \
902 '{\"match\":{\"column\":\"search_text\",\"terms\":\"...\"}}') for \
903 BM25-ranked results; both cover subagent sessions too.",
904 )
905 }
906
907 async fn list_resources(
908 &self,
909 _request: Option<PaginatedRequestParams>,
910 _context: RequestContext<RoleServer>,
911 ) -> Result<ListResourcesResult, ErrorData> {
912 Ok(ListResourcesResult {
913 resources: vec![
914 RawResource::new("schema://pond", "pond search schema").no_annotation(),
915 RawResource::new("schema://pond-sql", "pond SQL table schema").no_annotation(),
916 RawResource::new("stats://pond", "pond corpus stats").no_annotation(),
917 ],
918 next_cursor: None,
919 meta: None,
920 })
921 }
922
923 async fn read_resource(
924 &self,
925 request: ReadResourceRequestParams,
926 _context: RequestContext<RoleServer>,
927 ) -> Result<ReadResourceResult, ErrorData> {
928 match request.uri.as_str() {
929 "schema://pond" => Ok(ReadResourceResult::new(vec![ResourceContents::text(
930 SCHEMA_DOC,
931 request.uri,
932 )])),
933 "schema://pond-sql" => Ok(ReadResourceResult::new(vec![ResourceContents::text(
934 SQL_SCHEMA_DOC,
935 request.uri,
936 )])),
937 uri if uri.starts_with("pond-sql-export://") => {
941 let name = uri.trim_start_matches("pond-sql-export://").to_owned();
942 if !valid_export_name(&name) {
943 return Err(ErrorData::resource_not_found(
944 format!("invalid export id: {name}"),
945 None,
946 ));
947 }
948 let bytes = self.state.store.export_read(&name).await.map_err(|error| {
949 ErrorData::resource_not_found(format!("export not found: {error}"), None)
950 })?;
951 let contents = if name.ends_with(".ndjson") {
952 ResourceContents::text(
953 String::from_utf8_lossy(&bytes).into_owned(),
954 request.uri,
955 )
956 .with_mime_type("application/x-ndjson")
957 } else {
958 ResourceContents::blob(STANDARD.encode(&bytes), request.uri)
959 .with_mime_type("application/vnd.apache.parquet")
960 };
961 Ok(ReadResourceResult::new(vec![contents]))
962 }
963 "stats://pond" => {
964 let store = &self.state.store;
965 let map_err = |error: anyhow::Error| {
966 ErrorData::internal_error(format!("stats unavailable: {error}"), None)
967 };
968 let (sessions, messages, parts) = store.row_counts().await.map_err(&map_err)?;
969 let embedding = store.embedding_progress().await.map_err(&map_err)?;
970 let stale = store.stale_embedding_count().await.map_err(&map_err)?;
971 let indices = store.index_status().await.map_err(&map_err)?;
972
973 let embedded_percent = if embedding.total == 0 {
974 0.0
975 } else {
976 #[allow(clippy::cast_precision_loss)]
977 let pct = (embedding.embedded as f64 / embedding.total as f64) * 100.0;
978 (pct * 10.0).round() / 10.0
979 };
980 let index_rows = indices
981 .iter()
982 .map(|status| {
983 serde_json::json!({
984 "table": status.table.as_str(),
985 "intent": status.intent_name,
986 "exists": status.exists,
987 "fragments_covered": status.fragments_covered,
988 "unindexed_rows": status.unindexed_rows,
989 })
990 })
991 .collect::<Vec<_>>();
992
993 let stats = serde_json::json!({
998 "corpus": {
999 "sessions": sessions,
1000 "messages": messages,
1001 "searchable_messages": embedding.total,
1002 "parts": parts,
1003 },
1004 "embeddings": {
1005 "model": embedding.model,
1006 "embedded": embedding.embedded,
1007 "searchable_total": embedding.total,
1008 "embedded_percent": embedded_percent,
1009 "stale_under_other_model": stale,
1010 },
1011 "indices": index_rows,
1012 });
1013 Ok(ReadResourceResult::new(vec![ResourceContents::text(
1014 stats.to_string(),
1015 request.uri,
1016 )]))
1017 }
1018 other => Err(ErrorData::resource_not_found(
1019 format!("unknown resource: {other}"),
1020 None,
1021 )),
1022 }
1023 }
1024
1025 async fn list_tools(
1026 &self,
1027 request: Option<PaginatedRequestParams>,
1028 context: RequestContext<RoleServer>,
1029 ) -> Result<ListToolsResult, ErrorData> {
1030 let _ = (request, context);
1031 let mut result = ListToolsResult {
1032 tools: self.tool_router.list_all(),
1033 next_cursor: None,
1034 meta: None,
1035 };
1036 annotate_tool_limits(&mut result);
1037 Ok(result)
1038 }
1039 }
1040
1041 fn annotate_tool_limits(result: &mut ListToolsResult) {
1042 for tool in &mut result.tools {
1043 let chars = match tool.name.as_ref() {
1044 "pond_search" => 80_000,
1045 "pond_get" => 200_000,
1046 "pond_sql_query" => 80_000,
1047 _ => continue,
1048 };
1049 let mut meta = serde_json::Map::new();
1050 meta.insert(
1051 "anthropic/maxResultSizeChars".to_owned(),
1052 serde_json::json!(chars),
1053 );
1054 tool.meta = Some(Meta(meta));
1055 }
1056 }
1057
1058 pub async fn serve_stdio(state: AppState) -> anyhow::Result<()> {
1062 let service = PondMcp::new(state)
1063 .serve(stdio())
1064 .await
1065 .context("failed to start stdio MCP server")?;
1066 service.waiting().await.context("stdio MCP server error")?;
1067 Ok(())
1068 }
1069
1070 fn tool_result(transcript: String) -> CallToolResult {
1076 CallToolResult::success(vec![Content::text(transcript)])
1077 }
1078
1079 fn export_result(
1086 store: &crate::sessions::Store,
1087 name: &str,
1088 format: sql::Format,
1089 rows: usize,
1090 columns: &[String],
1091 bytes: usize,
1092 ) -> CallToolResult {
1093 let uri = format!("pond-sql-export://{name}");
1094 let column_list = if columns.is_empty() {
1095 "(none)".to_owned()
1096 } else {
1097 columns.join(", ")
1098 };
1099 let mut summary = format!(
1100 "Exported {rows} row(s), {bytes} bytes ({}). Columns: {column_list}.\n\
1101 Fetch via MCP resources/read on {uri}.",
1102 format.ext()
1103 );
1104 if let Some(path) = store.export_local_path(name) {
1105 summary.push_str(&format!(
1106 "\nLocal file: {} - on this (stdio) install you can read it directly \
1107 (e.g. duckdb, polars).",
1108 path.display()
1109 ));
1110 }
1111 let link = RawResource::new(uri, name.to_owned())
1112 .with_description(format!("pond SQL export ({}, {rows} rows)", format.ext()))
1113 .with_mime_type(format.mime().to_owned())
1114 .with_size(u32::try_from(bytes).unwrap_or(u32::MAX));
1115 CallToolResult::success(vec![Content::text(summary), Content::resource_link(link)])
1116 }
1117
/// Check that `name` looks like an export file pond itself wrote:
/// a non-empty stem of ASCII hex digits and dashes, then `.parquet` or
/// `.ndjson`. Because the stem cannot contain `.` or `/`, accepted names
/// cannot escape the export directory.
fn valid_export_name(name: &str) -> bool {
    name.rsplit_once('.').is_some_and(|(stem, ext)| {
        let known_ext = ext == "parquet" || ext == "ndjson";
        let stem_ok = !stem.is_empty()
            && stem.chars().all(|c| c.is_ascii_hexdigit() || c == '-');
        known_ext && stem_ok
    })
}
1128
1129 fn render_subagents_footer(children: &[crate::wire::Session]) -> String {
1134 use std::fmt::Write;
1135 let mut out = String::new();
1136 let _ = writeln!(out);
1137 let _ = writeln!(
1138 out,
1139 "subagents ({}) - pass an id to pond_get(session_id=...):",
1140 children.len()
1141 );
1142 for child in children {
1143 let _ = writeln!(out, " {} | {}", child.id, child.source_agent);
1144 }
1145 out
1146 }
1147
1148 fn fmt_ts(ts: &chrono::DateTime<chrono::Utc>) -> String {
1150 ts.format("%Y-%m-%d %H:%M:%SZ").to_string()
1151 }
1152
1153 fn opt_name(value: &Option<crate::adapter::extract::Extracted<String>>) -> &str {
1156 value.as_deref().map(String::as_str).unwrap_or("?")
1157 }
1158
/// Append each line of `body` to `out`, prefixed with `indent` and terminated
/// with `\n`.
///
/// Lines are split with `str::lines`, so `\r\n` endings are normalised. An
/// empty body appends nothing.
fn push_lines(out: &mut String, body: &str, indent: &str) {
    body.lines().for_each(|line| {
        out.push_str(indent);
        out.push_str(line);
        out.push('\n');
    });
}
1168
1169 fn render_search_transcript(response: &SearchResponse, request: &SearchRequest) -> String {
1170 use std::fmt::Write;
1171 let subagent_note = if !request.filters.include_subagents
1173 && request.filters.session_id.is_none()
1174 && request.filters.source_agent.is_none()
1175 {
1176 " Subagent sessions excluded; pass include_subagents=true to include them."
1177 } else {
1178 ""
1179 };
1180 if response.sessions.is_empty() {
1181 if response.searchable_in_scope == 0 {
1185 return format!(
1186 "pond_search: 0 searchable messages in scope - the filters exclude \
1187 everything before retrieval. Widen or drop project/date filters.\
1188 {subagent_note}\n"
1189 );
1190 }
1191 let fts_hint = " For exact strings or identifiers, try pond_sql_query: SELECT \
1192 message_id, session_id, search_text FROM messages WHERE \
1193 contains_tokens(search_text, '...').";
1194 return format!(
1195 "pond_search: no matches for {:?} across {} searchable messages in \
1196 scope.{subagent_note}{fts_hint}\n",
1197 request.query, response.searchable_in_scope
1198 );
1199 }
1200 let shown: usize = response.sessions.iter().map(|s| s.matches.len()).sum();
1201 let mut out = String::new();
1202 let _ = writeln!(
1203 out,
1204 "pond_search: {} matching messages ({} searchable in scope), showing {} hits from {} \
1205 sessions.{}",
1206 response.matched_total,
1207 response.searchable_in_scope,
1208 shown,
1209 response.sessions.len(),
1210 subagent_note,
1211 );
1212 let _ = writeln!(
1213 out,
1214 "key: session rules group hits by session, ordered by best hit; \"--- [n] score | role | time | message_id | project | agent | session ---\" delimits each hit + matched text. pond_get <message_id> for full; raise limit for more (no pagination)."
1215 );
1216 let mut index = 0;
1217 for (session_index, session) in response.sessions.iter().enumerate() {
1218 let best = session
1219 .matches
1220 .first()
1221 .map(|hit| hit.score)
1222 .unwrap_or_default();
1223 let _ = writeln!(out);
1224 let _ = writeln!(
1225 out,
1226 "{}",
1227 rule_line(&format!(
1228 "session [{}] best {:.2} | {}/{} matched | {} | {} | {}",
1229 session_index + 1,
1230 best,
1231 session.matched_message_count,
1232 session.session_messages_count,
1233 session.project,
1234 session.source_agent,
1235 session.session_id,
1236 )),
1237 );
1238 for hit in &session.matches {
1239 index += 1;
1240 let _ = writeln!(out);
1241 let _ = writeln!(
1242 out,
1243 "{}",
1244 rule_line(&format!(
1245 "[{index}] {:.2} | {} | {} | {} | {} | {} | {}",
1246 hit.score,
1247 hit.role.as_str(),
1248 fmt_ts(&hit.timestamp),
1249 hit.message_id,
1250 session.project,
1251 session.source_agent,
1252 session.session_id,
1253 )),
1254 );
1255 push_lines(&mut out, &hit.text, "");
1256 }
1257 }
1258 out
1259 }
1260
/// Render a `pond_get` response as a plain-text transcript.
///
/// The output has one of two shapes, depending on `response.result`:
/// - `GetResult::Session`: a header naming the session and the requested
///   response mode, a legend line, and each message in the order given. A
///   `session <id> | <agent> | <project>` footer follows. When
///   `messages_remaining > 0`, a paging hint is added whose wording depends on
///   `request.session_from`.
/// - `GetResult::Message`: a header naming the target message and context
///   depth, then a legend line. The target and its siblings are merged into
///   one thread sorted by `(timestamp, id)`, with content-less siblings
///   dropped. The same session footer follows, plus a hint when more of the
///   target's parts remain.
fn render_get_transcript(response: &GetResponse, request: &GetRequest) -> String {
    use std::fmt::Write;
    let session = &response.session;
    let mut out = String::new();
    match &response.result {
        GetResult::Session {
            messages,
            messages_remaining,
        } => {
            // Human-readable name of the requested mode, echoed in the header.
            let mode = match request.response_mode {
                ResponseMode::Conversational => "conversational",
                ResponseMode::Complete => "complete",
                ResponseMode::Verbatim => "verbatim",
            };
            let more = if *messages_remaining > 0 {
                " (more)"
            } else {
                ""
            };
            let _ = writeln!(
                out,
                "pond_get: session {} ({mode}), {} messages{more}.",
                session.id,
                messages.len(),
            );
            let _ = writeln!(
                out,
                "key: \"--- [n] role | time | message_id ---\" delimits each message; \"->\" tool call, \"<-\" result. Pass after_id=<id> to page."
            );
            for (idx, message) in messages.iter().enumerate() {
                let _ = writeln!(out);
                // Each message renders its own full parts when present,
                // otherwise `render_message` falls back to the parts summary.
                render_message(
                    &mut out,
                    idx + 1,
                    message,
                    message.parts.as_deref(),
                    &message.parts_summary,
                    false,
                );
            }
            let _ = writeln!(out);
            let _ = writeln!(
                out,
                "session {} | {} | {}",
                session.id, session.source_agent, session.project,
            );
            // A paging hint is only emitted when something was shown and
            // more remains.
            if *messages_remaining > 0
                && let Some(last) = messages.last()
            {
                match request.session_from {
                    SessionFrom::Start => {
                        // Forward paging continues after the last rendered message.
                        let _ = writeln!(
                            out,
                            "... {} more messages; pass after_id={} to pond_get to continue",
                            messages_remaining, last.id,
                        );
                    }
                    SessionFrom::End => {
                        // Reading from the end: the remaining messages precede
                        // the rendered tail, so point at session_from="start"
                        // rather than after_id.
                        let _ = writeln!(
                            out,
                            "... {messages_remaining} earlier messages precede this tail; call pond_get with session_from=\"start\" to read from the beginning",
                        );
                    }
                }
            }
        }
        GetResult::Message {
            target,
            target_parts,
            target_parts_remaining,
            siblings,
        } => {
            let _ = writeln!(
                out,
                "pond_get: thread around {} in session {} (context +/-{}).",
                target.id, session.id, request.context_depth,
            );
            let _ = writeln!(
                out,
                "key: \"--- [n] role | time | message_id ---\" delimits each message; \">\" = the one you requested; \"->\" tool call, \"<-\" result. pond_get <message_id> to expand any line."
            );
            // One thread of (message, is_target) pairs: siblings plus the target.
            let mut thread: Vec<(&MessageView, bool)> =
                siblings.iter().map(|view| (view, false)).collect();
            thread.push((target, true));
            // Chronological order, with message id as a deterministic tiebreak.
            thread.sort_by(|a, b| {
                a.0.timestamp
                    .cmp(&b.0.timestamp)
                    .then_with(|| a.0.id.cmp(&b.0.id))
            });
            // Drop siblings with nothing to show; the target is always kept.
            thread.retain(|(view, is_target)| *is_target || message_has_content(view));
            for (idx, (view, is_target)) in thread.iter().enumerate() {
                let _ = writeln!(out);
                // The target renders its separately paged `target_parts`;
                // siblings render whatever full parts they carry, if any.
                let parts: Option<&[ResponsePart]> = if *is_target {
                    Some(target_parts.as_slice())
                } else {
                    view.parts.as_deref()
                };
                render_message(
                    &mut out,
                    idx + 1,
                    view,
                    parts,
                    &view.parts_summary,
                    *is_target,
                );
            }
            let _ = writeln!(out);
            let _ = writeln!(
                out,
                "session {} | {} | {}",
                session.id, session.source_agent, session.project,
            );
            // Part-level paging for a long target message: continue after the
            // last rendered part.
            if *target_parts_remaining > 0
                && let Some(last) = target_parts.last()
            {
                let _ = writeln!(
                    out,
                    "... {} more parts of {}; pass after_id={} to pond_get to continue",
                    target_parts_remaining, target.id, last.id,
                );
            }
        }
    }
    out
}
1395
1396 fn message_has_content(view: &MessageView) -> bool {
1400 view.text.as_deref().is_some_and(|t| !t.trim().is_empty())
1401 || view
1402 .content
1403 .as_deref()
1404 .is_some_and(|c| !c.trim().is_empty())
1405 || view.parts.as_deref().is_some_and(|p| !p.is_empty())
1406 || !view.parts_summary.is_empty()
1407 }
1408
/// Target width, in chars, of a `--- ... ---` rule line.
const RULE_WIDTH: usize = 72;

/// Build a rule line `--- <inner> ----...`, padded with dashes to
/// `RULE_WIDTH` chars. At least three trailing dashes are always added, so
/// long labels overflow the width instead of being truncated.
fn rule_line(inner: &str) -> String {
    let mut line = format!("--- {inner} ");
    let used = line.chars().count();
    let tail = if used >= RULE_WIDTH {
        3
    } else {
        (RULE_WIDTH - used).max(3)
    };
    line.push_str(&"-".repeat(tail));
    line
}
1420
1421 fn render_message(
1426 out: &mut String,
1427 index: usize,
1428 view: &MessageView,
1429 parts: Option<&[ResponsePart]>,
1430 summary: &[PartSummary],
1431 is_target: bool,
1432 ) {
1433 use std::fmt::Write;
1434 let marker = if is_target { "> " } else { "" };
1435 let _ = writeln!(
1436 out,
1437 "{}",
1438 rule_line(&format!(
1439 "[{index}] {marker}{} | {} | {}",
1440 view.role.as_str(),
1441 fmt_ts(&view.timestamp),
1442 view.id,
1443 )),
1444 );
1445 if let Some(text) = &view.text {
1446 push_lines(out, text, "");
1447 }
1448 if let Some(content) = &view.content {
1449 push_lines(out, content, "");
1450 }
1451 match parts {
1452 Some(parts) => {
1453 for part in parts {
1454 render_part_full(out, part);
1455 }
1456 }
1457 None => {
1458 for part in summary {
1459 render_part_summary(out, part);
1460 }
1461 }
1462 }
1463 }
1464
1465 fn render_part_full(out: &mut String, part: &ResponsePart) {
1466 use std::fmt::Write;
1467 match &part.kind {
1468 PartKind::Text { text } => {
1469 if let Some(text) = text {
1470 push_lines(out, text, "");
1471 }
1472 }
1473 PartKind::Reasoning { text } => {
1474 let _ = writeln!(out, " (reasoning)");
1475 if let Some(text) = text {
1476 push_lines(out, text, " ");
1477 }
1478 }
1479 PartKind::ToolCall {
1480 name,
1481 call_id,
1482 params,
1483 ..
1484 } => {
1485 let _ = writeln!(out, " -> {} [{}]", opt_name(name), opt_name(call_id));
1486 push_lines(out, &value_to_text(params), " ");
1487 }
1488 PartKind::ToolResult {
1489 name,
1490 call_id,
1491 is_failure,
1492 result,
1493 } => {
1494 let status = if *is_failure { "failed" } else { "ok" };
1495 let _ = writeln!(
1496 out,
1497 " <- {} [{}] ({status})",
1498 opt_name(name),
1499 opt_name(call_id),
1500 );
1501 push_lines(out, &value_to_text(result), " ");
1502 }
1503 PartKind::File {
1504 media_type,
1505 file_name,
1506 ..
1507 } => {
1508 let label = file_name
1509 .as_deref()
1510 .or(media_type.as_deref())
1511 .unwrap_or("file");
1512 let _ = writeln!(out, " [file {label}]");
1513 }
1514 PartKind::ToolApprovalRequest { approval_id, .. } => {
1515 let _ = writeln!(out, " [approval request {approval_id}]");
1516 }
1517 PartKind::ToolApprovalResponse {
1518 approval_id,
1519 approved,
1520 ..
1521 } => {
1522 let verb = if *approved { "approved" } else { "denied" };
1523 let _ = writeln!(out, " [approval {approval_id} {verb}]");
1524 }
1525 }
1526 }
1527
1528 fn render_part_summary(out: &mut String, summary: &PartSummary) {
1529 use std::fmt::Write;
1530 let label = summary.label.as_deref().unwrap_or("");
1531 let call = summary
1532 .call_id
1533 .as_deref()
1534 .map(|id| format!(" [{id}]"))
1535 .unwrap_or_default();
1536 match summary.kind.as_str() {
1537 "tool_call" => {
1538 let _ = writeln!(out, " -> {label}{call}");
1539 }
1540 "tool_result" => {
1541 let _ = writeln!(out, " <- {label}{call}");
1542 }
1543 "file" => {
1544 let _ = writeln!(out, " [file {label}]");
1545 }
1546 other => {
1547 let _ = writeln!(out, " [{other} {label}]");
1548 }
1549 }
1550 }
1551
1552 fn value_to_text(value: &serde_json::Value) -> String {
1555 match value {
1556 serde_json::Value::String(text) => text.clone(),
1557 serde_json::Value::Null => String::new(),
1558 other => serde_json::to_string(other).unwrap_or_default(),
1559 }
1560 }
1561
1562 fn to_error_data(envelope: &ErrorEnvelope) -> ErrorData {
1568 let (jsonrpc_code, pond_code, retryable) = match envelope.error.code {
1569 WireErrorCode::ValidationFailed => (-32010, "validation_failed", false),
1570 WireErrorCode::VersionUnsupported => (-32011, "version_unsupported", false),
1571 WireErrorCode::NotFound => (-32012, "not_found", false),
1572 WireErrorCode::NamespaceUnknown => (-32013, "namespace_unknown", false),
1573 WireErrorCode::StorageUnavailable => (-32014, "storage_unavailable", true),
1574 WireErrorCode::Conflict => (-32015, "conflict", true),
1575 WireErrorCode::Internal => (-32016, "internal", false),
1576 };
1577 let mut data = match &envelope.error.details {
1578 serde_json::Value::Object(map) => map.clone(),
1579 _ => serde_json::Map::new(),
1580 };
1581 data.insert("pond_code".to_owned(), serde_json::json!(pond_code));
1582 data.insert("retryable".to_owned(), serde_json::json!(retryable));
1583 ErrorData::new(
1584 JsonRpcErrorCode(jsonrpc_code),
1585 envelope.error.message.clone(),
1586 Some(serde_json::Value::Object(data)),
1587 )
1588 }
1589
#[cfg(test)]
mod tests {
    #![allow(clippy::expect_used, clippy::unwrap_used)]

    use std::sync::Arc;

    use rmcp::model::{ErrorCode as JsonRpcErrorCode, Tool};

    use super::*;
    use crate::wire::{ErrorBody, ErrorCode, Role, SearchResponse, SearchResult};

    /// Every wire error code maps to its JSON-RPC code, pond code and
    /// retryability, while object details are preserved.
    #[test]
    fn error_data_carries_code_and_retryability() {
        let cases = [
            (
                ErrorCode::ValidationFailed,
                -32010,
                "validation_failed",
                false,
            ),
            (
                ErrorCode::VersionUnsupported,
                -32011,
                "version_unsupported",
                false,
            ),
            (ErrorCode::NotFound, -32012, "not_found", false),
            (
                ErrorCode::NamespaceUnknown,
                -32013,
                "namespace_unknown",
                false,
            ),
            (
                ErrorCode::StorageUnavailable,
                -32014,
                "storage_unavailable",
                true,
            ),
            (ErrorCode::Conflict, -32015, "conflict", true),
            (ErrorCode::Internal, -32016, "internal", false),
        ];
        for (code, jsonrpc, pond_code, retryable) in cases {
            let error = to_error_data(&ErrorEnvelope {
                error: ErrorBody {
                    code,
                    message: "boom".to_owned(),
                    details: serde_json::json!({"detail": 1}),
                },
            });
            assert_eq!(error.code, JsonRpcErrorCode(jsonrpc));
            let data = error.data.unwrap();
            assert_eq!(data["detail"], serde_json::json!(1));
            assert_eq!(data["pond_code"], serde_json::json!(pond_code));
            assert_eq!(data["retryable"], serde_json::json!(retryable));
            assert!(
                data.get("request_id").is_none(),
                "MCP errors use JSON-RPC ids for correlation"
            );
        }
    }

    /// All three pond tools get a size hint; unrelated tools are left alone.
    #[test]
    fn annotate_tool_limits_sets_anthropic_meta() {
        let schema = Arc::new(serde_json::Map::new());
        let mut result = ListToolsResult {
            tools: vec![
                Tool::new("pond_search", "Search", Arc::clone(&schema)),
                Tool::new("pond_get", "Get", Arc::clone(&schema)),
                Tool::new("pond_sql_query", "SQL", Arc::clone(&schema)),
                Tool::new("other_tool", "Other", Arc::clone(&schema)),
            ],
            next_cursor: None,
            meta: None,
        };
        annotate_tool_limits(&mut result);
        let value = |name: &str| {
            result
                .tools
                .iter()
                .find(|tool| tool.name == name)
                .and_then(|tool| tool.meta.as_ref())
                .and_then(|meta| meta.0.get("anthropic/maxResultSizeChars"))
                .and_then(serde_json::Value::as_i64)
        };
        assert_eq!(value("pond_search"), Some(80_000));
        assert_eq!(value("pond_get"), Some(200_000));
        assert_eq!(value("pond_sql_query"), Some(80_000));
        assert_eq!(value("other_tool"), None);
    }

    /// Export names must be `<hex-or-dash>.{parquet,ndjson}`; anything that
    /// could address another path is rejected.
    #[test]
    fn valid_export_name_rejects_non_export_paths() {
        assert!(valid_export_name(
            "0190a1b2-c3d4-7e5f-8a9b-0c1d2e3f4a5b.parquet"
        ));
        assert!(valid_export_name(
            "0190a1b2-c3d4-7e5f-8a9b-0c1d2e3f4a5b.ndjson"
        ));
        assert!(!valid_export_name("0190a1b2.csv"));
        assert!(!valid_export_name(".parquet"));
        assert!(!valid_export_name("no-extension"));
        assert!(!valid_export_name("../secret.parquet"));
        assert!(!valid_export_name("a/b.ndjson"));
    }

    /// Rule lines fill `RULE_WIDTH` chars but always end in at least three dashes.
    #[test]
    fn rule_line_pads_to_width_with_minimum_tail() {
        let short = rule_line("x");
        assert!(short.starts_with("--- x -"));
        assert_eq!(short.chars().count(), RULE_WIDTH);
        let long = "y".repeat(RULE_WIDTH);
        assert_eq!(rule_line(&long), format!("--- {long} ---"));
    }

    #[test]
    fn get_transcript_marks_target_and_renders_tool_parts() {
        let ts = chrono::DateTime::from_timestamp(0, 0).unwrap();
        let tool_call: ResponsePart = serde_json::from_value(serde_json::json!({
            "id": "p1", "ordinal": 0, "provenance": "conversational",
            "type": "tool_call", "name": "Bash", "call_id": "toolu_x",
            "params": { "command": "ls" }, "provider_executed": false,
        }))
        .unwrap();
        let tool_result: ResponsePart = serde_json::from_value(serde_json::json!({
            "id": "p2", "ordinal": 1, "provenance": "conversational",
            "type": "tool_result", "name": "Bash", "call_id": "toolu_x",
            "is_failure": false, "result": "file.txt",
        }))
        .unwrap();
        let target = MessageView {
            id: "m1".to_owned(),
            role: crate::wire::Role::Assistant,
            timestamp: ts,
            text: Some("Let me list files.".to_owned()),
            content: None,
            parts_summary: Vec::new(),
            parts: None,
        };
        let response = GetResponse {
            session: crate::wire::GetSession {
                id: "s1".to_owned(),
                source_agent: "claude-code".to_owned(),
                project: "/p".to_owned(),
                created_at: ts,
            },
            result: GetResult::Message {
                target,
                target_parts: vec![tool_call, tool_result],
                target_parts_remaining: 0,
                siblings: Vec::new(),
            },
        };
        let request = GetRequest {
            protocol_version: crate::PROTOCOL_VERSION,
            namespace: None,
            session_id: None,
            message_id: Some("m1".to_owned()),
            context_depth: 0,
            limit: 20,
            response_mode: ResponseMode::default(),
            session_from: SessionFrom::default(),
            after_id: None,
        };

        let transcript = render_get_transcript(&response, &request);
        assert!(transcript.contains("--- [1] > assistant | 1970-01-01 00:00:00Z | m1 ---"));
        assert!(transcript.contains("Let me list files."));
        assert!(transcript.contains(" -> Bash [toolu_x]"));
        assert!(transcript.contains(" <- Bash [toolu_x] (ok)"));
        assert!(transcript.contains("session s1 | claude-code | /p"));
    }

    #[test]
    fn search_transcript_renders_header_and_hits() {
        let response = SearchResponse {
            sessions: vec![crate::wire::SearchSession {
                session_id: "s1".to_owned(),
                project: "pond".to_owned(),
                source_agent: "claude-code".to_owned(),
                session_messages_count: 2,
                matched_message_count: 1,
                matches: vec![SearchResult {
                    message_id: "m1".to_owned(),
                    role: Role::User,
                    timestamp: chrono::DateTime::from_timestamp(0, 0).unwrap(),
                    text: "hello\nworld".to_owned(),
                    score: 1.0,
                    parts_summary: Vec::new(),
                }],
            }],
            matched_total: 1,
            searchable_in_scope: 2,
            has_more: false,
        };
        let request = SearchRequest {
            protocol_version: crate::PROTOCOL_VERSION,
            namespace: None,
            query: "hi".to_owned(),
            mode_override: None,
            filters: SearchFilters::default(),
            limit: 10,
        };

        let transcript = render_search_transcript(&response, &request);
        assert!(transcript.starts_with(
            "pond_search: 1 matching messages (2 searchable in scope), showing 1 hits from 1 \
             sessions."
        ));
        assert!(
            transcript
                .contains("key: session rules group hits by session, ordered by best hit")
        );
        assert!(
            transcript
                .contains("--- session [1] best 1.00 | 1/2 matched | pond | claude-code | s1")
        );
        assert!(transcript.contains(
            "--- [1] 1.00 | user | 1970-01-01 00:00:00Z | m1 | pond | claude-code | s1"
        ));
        assert!(transcript.contains("hello\nworld"));

        // Transcripts are delivered as plain text with no structured payload.
        let result = tool_result(transcript);
        assert!(result.content[0].raw.as_text().is_some());
        assert!(result.structured_content.is_none());
    }
}
1794}