Skip to main content

pond/
transport.rs

1//! The HTTP+JSON and stdio-MCP transports: thin adapters over the shared wire
2//! handlers. Both transports dispatch to the same handler functions - no
3//! per-transport behavior divergence.
4//!
5//! HTTP exposes `POST /v1/search`, `POST /v1/get`, and `POST /v1/ingest`. MCP
6//! exposes `pond_search` / `pond_get` (the kb-parity surface) plus
//! `pond_sql_query` (read-only SQL); ingest is not exposed over MCP - it is HTTP- and CLI-only.
8
9use std::sync::Arc;
10
11use crate::{config::SearchConfig, embed::LazyEmbedder, sessions::Store};
12
/// Shared state handed to both transports. `embedder` holds a lazy handle:
/// the model isn't loaded until the first hybrid search asks for it, so
/// `pond mcp` idles at ~50 MB resident and only pays the ~600 MB load cost on
/// the first query that needs it (spec.md#search opt-in).
///
/// `Clone` is derived because `http::router` clones the state: once for the
/// MCP service factory, and again inside that factory each time it builds a
/// `PondMcp`. `store` and `embedder` sit behind `Arc`, so cloning them only
/// bumps a reference count.
#[derive(Clone)]
pub struct AppState {
    /// Session store passed to the search, get, and ingest handlers.
    pub store: Arc<Store>,
    /// Lazily loaded embedder handle; the HTTP layer hands it to `pond_search`.
    pub embedder: Arc<LazyEmbedder>,
    /// Search configuration passed by reference to `pond_search`.
    pub search: SearchConfig,
}
23
24pub mod http {
    //! axum HTTP+JSON server: `POST /v1/search`, `POST /v1/get`, `POST /v1/ingest`,
    //! and the `/mcp` route carrying rmcp's streamable-HTTP MCP transport.
27
28    use std::net::{IpAddr, SocketAddr};
29
30    use anyhow::Context;
31    use axum::{
32        Json, Router,
33        extract::{DefaultBodyLimit, State},
34        http::{HeaderValue, StatusCode},
35        response::{IntoResponse, Response},
36        routing::post,
37    };
38    use rmcp::transport::streamable_http_server::{
39        StreamableHttpServerConfig, StreamableHttpService, session::local::LocalSessionManager,
40    };
41    use tokio::net::TcpListener;
42
43    use super::AppState;
44    use crate::{
45        handlers::{pond_get, pond_ingest, pond_search},
46        wire::{
47            ErrorCode, GetEnvelope, GetRequest, IngestEnvelope, IngestRequest, SearchEnvelope,
48            SearchRequest, default_namespace, new_request_id,
49        },
50    };
51
    /// HTTP body cap for `POST /v1/*` JSON handlers (spec.md#protocol): 8 MB.
    /// Replaces axum's 2 MB default - that default is more restrictive than the
    /// design's intent and would surface oversize ingests as a generic 413
    /// instead of pond's typed `validation_failed`.
    ///
    /// Installed by `router` through `DefaultBodyLimit::max` on the three
    /// `/v1/*` routes; the `/mcp` service is nested after that layer call.
    pub const HTTP_BODY_LIMIT_BYTES: usize = 8 * 1024 * 1024;
57
58    /// Build the axum router: the `/v1/*` JSON handlers plus the nested `/mcp`
59    /// streamable-HTTP MCP service. Public so the integration test can drive it
60    /// without binding a socket.
61    pub fn router(state: AppState) -> Router {
62        let mcp_state = state.clone();
63        let mcp = StreamableHttpService::new(
64            move || Ok(super::mcp::PondMcp::new(mcp_state.clone())),
65            LocalSessionManager::default().into(),
66            StreamableHttpServerConfig::default(),
67        );
68        Router::new()
69            .route("/v1/search", post(search))
70            .route("/v1/get", post(get))
71            .route("/v1/ingest", post(ingest))
72            .layer(DefaultBodyLimit::max(HTTP_BODY_LIMIT_BYTES))
73            .with_state(state)
74            .nest_service("/mcp", mcp)
75    }
76
77    /// Bind and serve until ctrl-c. `--port 0` selects an OS-assigned free port;
78    /// an unspecified host (`0.0.0.0` / `::`) logs a security notice because the
79    /// personal pond is single-user and LAN exposure is opt-in (spec.md#scope).
80    pub async fn serve(state: AppState, host: String, port: u16) -> anyhow::Result<()> {
81        let ip: IpAddr = host
82            .parse()
83            .with_context(|| format!("invalid --host {host:?}"))?;
84        if ip.is_unspecified() {
85            tracing::warn!(
86                %host,
87                "binding to an unspecified address exposes pond on the LAN; \
88                 the personal pond is single-user"
89            );
90        }
91        let listener = TcpListener::bind(SocketAddr::new(ip, port))
92            .await
93            .with_context(|| format!("failed to bind {host}:{port}"))?;
94        let local = listener
95            .local_addr()
96            .context("failed to read bound address")?;
97        tracing::info!(%local, "pond serve listening (HTTP /v1/*, MCP /mcp)");
98        axum::serve(listener, router(state))
99            .with_graceful_shutdown(shutdown_signal())
100            .await
101            .context("axum server error")
102    }
103
104    async fn shutdown_signal() {
105        let _ = tokio::signal::ctrl_c().await;
106    }
107
108    async fn search(
109        State(state): State<AppState>,
110        Json(mut request): Json<SearchRequest>,
111    ) -> Response {
112        request.namespace.get_or_insert_with(default_namespace);
113        let envelope = pond_search(&state.store, &state.embedder, request, &state.search).await;
114        let status = match &envelope {
115            SearchEnvelope::Success(_) => StatusCode::OK,
116            SearchEnvelope::Error(error) => status_for(&error.error.code),
117        };
118        with_request_id((status, Json(envelope)).into_response())
119    }
120
121    async fn get(State(state): State<AppState>, Json(mut request): Json<GetRequest>) -> Response {
122        request.namespace.get_or_insert_with(default_namespace);
123        let envelope = pond_get(&state.store, request).await;
124        let status = match &envelope {
125            GetEnvelope::Success(_) => StatusCode::OK,
126            GetEnvelope::Error(error) => status_for(&error.error.code),
127        };
128        with_request_id((status, Json(envelope)).into_response())
129    }
130
131    async fn ingest(
132        State(state): State<AppState>,
133        Json(mut request): Json<IngestRequest>,
134    ) -> Response {
135        request.namespace.get_or_insert_with(default_namespace);
136        let envelope = pond_ingest(&state.store, request).await;
137        // Per-row errors in `results[]` are not request-level failures, so
138        // the envelope success path always returns 200; only transport-level
139        // failures (validation_failed, namespace_unknown, etc.) map to 4xx/5xx.
140        let status = match &envelope {
141            IngestEnvelope::Success(_) => StatusCode::OK,
142            IngestEnvelope::Error(error) => status_for(&error.error.code),
143        };
144        with_request_id((status, Json(envelope)).into_response())
145    }
146
147    fn with_request_id(mut response: Response) -> Response {
148        if let Ok(value) = HeaderValue::from_str(&new_request_id()) {
149            response.headers_mut().insert("x-pond-request-id", value);
150        }
151        response
152    }
153
154    /// Map a wire error code to an HTTP status. The envelope body still carries
155    /// the full typed error; the status is the coarse signal.
156    fn status_for(code: &ErrorCode) -> StatusCode {
157        match code {
158            ErrorCode::ValidationFailed
159            | ErrorCode::VersionUnsupported
160            | ErrorCode::NamespaceUnknown => StatusCode::BAD_REQUEST,
161            ErrorCode::NotFound => StatusCode::NOT_FOUND,
162            ErrorCode::Conflict => StatusCode::CONFLICT,
163            ErrorCode::StorageUnavailable => StatusCode::SERVICE_UNAVAILABLE,
164            ErrorCode::Internal => StatusCode::INTERNAL_SERVER_ERROR,
165        }
166    }
167}
168
169pub mod mcp {
170    //! The rmcp MCP layer: `pond_search` / `pond_get` / `pond_sql_query` tools
171    //! and `schema://pond` / `schema://pond-sql` / `stats://pond` (plus
172    //! `pond-sql-export://` export artifacts) resources, transport-agnostic.
173    //! Mounted on stdio (via `pond mcp`) and on the `/mcp` HTTP route (via
174    //! `pond serve`).
175
176    use anyhow::Context;
177    use base64::{Engine, engine::general_purpose::STANDARD};
178    use rmcp::{
179        ErrorData, RoleServer, ServerHandler, ServiceExt,
180        handler::server::{router::tool::ToolRouter, wrapper::Parameters},
181        model::{
182            AnnotateAble, CallToolResult, Content, ErrorCode as JsonRpcErrorCode, Implementation,
183            ListResourcesResult, ListToolsResult, Meta, PaginatedRequestParams, RawResource,
184            ReadResourceRequestParams, ReadResourceResult, ResourceContents, ServerCapabilities,
185            ServerInfo,
186        },
187        schemars,
188        service::RequestContext,
189        tool, tool_handler, tool_router,
190        transport::stdio,
191    };
192    use serde::Deserialize;
193    use uuid::Uuid;
194
195    use super::AppState;
196    use crate::{
197        PROTOCOL_VERSION,
198        handlers::pond_get as run_get,
199        handlers::pond_search as run_search,
200        sql,
201        substrate::Table,
202        wire::{
203            ErrorCode as WireErrorCode, ErrorEnvelope, GetEnvelope, GetRequest, GetResponse,
204            GetResult, MessageView, PartKind, PartSummary, ProjectFilter, ResponseMode,
205            ResponsePart, SearchEnvelope, SearchFilters, SearchRequest, SearchResponse,
206            SessionFrom, default_namespace,
207        },
208    };
209
210    /// Static documentation served as the `schema://pond` resource. Detail
211    /// agents load on demand; the per-tool descriptions below stay tight.
212    const SCHEMA_DOC: &str = "\
213pond_search filters: query (semantic - concepts, not project names), limit \
214(returned sessions; default 10, max 200 - also the want-more knob, there is \
215no pagination), project (path substring), session_id (exact session match - \
216semantic search within one session), source_agent, from_date / to_date \
217(YYYY-MM-DD), format (text|json).
218
219pond_search response: a transcript (or structured JSON when format=json). The \
220first line states totals (`matched_total` is the message count before `limit` \
221and byte-budget truncation), then results are grouped by session, ordered by \
222each session's best hit. Each session lists up to 3 top-scoring hits, \
223score-desc; each hit is a `--- [n] score | role | time | message_id | project \
224| agent | session ---` rule followed by its matched text (a ~600-char indexed \
225window). `score` is normalized to [0.0, 1.0] within one response. `has_more` \
226warns the ranked set was cut by `limit` or the byte budget - raise `limit` to \
227see the rest.
228
229pond_search multilingual: pond's embedder (multilingual-e5-small) is trained \
230for cross-lingual retrieval, so a query in language A can match indexed text \
231in language B via the vector arm. The FTS arm is character-ngram-based and \
232only matches surface tokens, so for cross-lingual queries expect most signal \
233to come from the vector arm.
234
235pond_get: message_id (the target message, marked `>`, plus context_depth \
236sibling messages each side) OR session_id (the whole session). Output is a \
237transcript - each message is a `--- [n] role | time | message_id ---` rule, \
238then its text/content as real lines, then parts (`-> name [call_id]` tool \
239call, `<- name [call_id] (ok|failed)` result). Session mode takes \
240response_mode: \"conversational\" (default - human/model text only), \
241\"complete\" (all messages incl. carriers, tools as one-liners), or \
242\"verbatim\" (full part bodies inline; heaviest). limit defaults to 20, caps \
243at 1000. Bounded by a size budget: when the footer shows `after_id=`, pass it \
244back to page. A whole-session response also lists the session's subagents (each \
245stored as its own session) in a footer; pass a listed id back as session_id to \
246open it. Not for bulk export - use `pond export`.";
247
    /// Static documentation served as the `schema://pond-sql` resource: the
    /// table/column schema, dialect, function set, output modes, pagination
    /// pattern, drilling pattern, and worked examples for `pond_sql_query`.
    /// Loaded on demand so the tool description stays tight.
    ///
    /// Layout note: a trailing `\` inside the literal is Rust's string
    /// continuation - it drops the newline and the next line's leading
    /// whitespace - so wrapped prose collapses onto one line of the served
    /// text, while lines without it (blank lines, most SQL example lines)
    /// keep their line breaks and indentation.
    ///
    /// TODO(#47): when the lance v8 FM-Index on parts.variant_data lands,
    /// tool-body substring search becomes `contains(variant_data, 'needle')`;
    /// update the routing guidance below (drop the "Never LIKE over parts ...
    /// no substring index (yet)" framing) and the timeout message in
    /// src/sql.rs.
    const SQL_SCHEMA_DOC: &str = "\
pond_sql_query runs ONE read-only SELECT (DataFusion SQL, PostgreSQL-compatible) \
over three registered tables. Read-only is hard-enforced: anything other than a \
single SELECT/WITH (or EXPLAIN of one) is rejected (no INSERT/UPDATE/DELETE/\
CREATE/DROP/COPY/SET).

Routing - pick the right surface before writing SQL:
- counts, group-by, time buckets, joins over metadata -> this tool, on \
messages/sessions.
- which tools ran / failed, tool params -> this tool, on parts (type = \
'tool_call' / 'tool_result'); worked example below.
- find text in conversations -> WHERE contains_tokens(search_text, '...') to \
filter, FROM fts('messages', ...) to rank (both below), or pond_search for \
meaning-based recall. Never LIKE over parts - tool bodies are JSON with no \
substring index (yet), and the conversational text is messages.search_text.
- read a transcript (a session, a message with context) -> pond_get, not SQL.

Tables and columns:
- messages(session_id text, message_id text, timestamp timestamp(us, UTC), role \
text {user|assistant|system|tool}, source_agent text, project text, content text \
NULL [system-role messages only], search_text text NULL [the conversational text - \
null for system/tool messages], embedding_model text NULL, options json). The \
embedding `vector` column exists but is never returned (omitted from results) and \
explicit projection of it is rejected; you may still filter on it in WHERE, e.g. \
`vector IS NOT NULL`. For semantic search, use pond_search.
- sessions(session_id text, parent_session_id text NULL, parent_message_id text \
NULL, source_agent text, created_at timestamp(us, UTC), project text, options json).
- parts(session_id text, message_id text, id text, ordinal int, type text \
{text|reasoning|file|tool_call|tool_result|tool_approval_request|\
tool_approval_response - exact strings, underscores not hyphens}, provenance \
text {conversational|injected}, variant_data json, options json). The verbatim \
part body lives in `variant_data`; its fields follow the part type, e.g. \
tool_call carries {call_id, name, params}, tool_result carries {call_id, name, \
is_failure, result}, text/reasoning carry {text}. FilePart binary payloads are \
not exposed in SQL.
Enum literals matter: a wrong value (e.g. 'tool-call') is valid SQL and silently \
returns zero rows. Discovery from SQL works too: SELECT table_name, column_name, \
data_type FROM information_schema.columns.

Join keys: messages.session_id = sessions.session_id; parts.session_id = \
messages.session_id AND parts.message_id = messages.message_id. Subagents are \
sessions whose source_agent matches '%/%' (e.g. 'claude-code/general-purpose').

Indexed (fast) filter columns: messages.project / session_id / timestamp / role / \
source_agent / message_id; parts.session_id / message_id; sessions.session_id. \
Prefer equality/range predicates on these. Known limitation: prefix LIKE ('x%') and starts_with() FAIL \
on bitmap-indexed columns (messages.source_agent, messages.role) with \"LIKE \
prefix queries are not supported for bitmap indexes\". Workarounds: equality, \
split_part(source_agent, '/', 1) = 'claude-code', or an infix pattern \
(LIKE '%/%' is fine - leading-wildcard patterns are not pushed to the index).

JSON columns (options, variant_data) are binary JSONB. Rules:
- NEVER CAST a JSON column (`variant_data::text` is rejected at plan time - the \
binary encoding can otherwise silently render as garbage). Stringify with \
json_extract(col, '$').
- A leading-wildcard LIKE over the whole document \
(`json_extract(variant_data, '$') LIKE '%...%'`) is rejected at plan time: it \
stringifies and scans every row and never finishes over parts. Match a single \
field (`json_extract(variant_data, '$.field') LIKE '...'`), scope to one session, \
or use contains_tokens for conversational text. (Substring search over tool \
bodies arrives with the FM-Index, #47.)
- json_extract(col, '$.a.b') takes a full JSONPath and returns JSON text of ANY \
value (objects/arrays serialize) - the right call for deeply nested or mixed-type \
fields, e.g. json_extract(variant_data, '$.params.command').
- json_get_string|json_get_int|json_get_float|json_get_bool(col, 'key', ...) walk \
a key path - json_get_string(options, 'anthropic', 'model') - array steps by \
numeric index. json_get_string serializes non-string values; the typed getters \
return NULL on a non-coercible value.
- json_get(col, 'key') returns JSONB for chaining: \
json_get_string(json_get(variant_data, 'params'), 'command').
- Also: json_array_contains(col, 'key', value), json_array_length(col, 'key').

Worked example - tool usage and failure rates over the last week:

  SELECT json_get_string(c.variant_data, 'name') AS tool,
         COUNT(*) AS calls,
         SUM(CASE WHEN json_get_bool(r.variant_data, 'is_failure') THEN 1 \
ELSE 0 END) AS failures
  FROM parts c
  JOIN messages m ON m.session_id = c.session_id AND m.message_id = c.message_id
  LEFT JOIN parts r ON r.session_id = c.session_id
   AND r.type = 'tool_result'
   AND json_get_string(r.variant_data, 'call_id') = \
json_get_string(c.variant_data, 'call_id')
  WHERE c.type = 'tool_call' AND m.timestamp >= now() - INTERVAL '7 days'
  GROUP BY tool ORDER BY calls DESC;

Full-text search in SQL is a pair - filter form and ranked form:
- Filtering (WHERE): contains_tokens(search_text, 'word1 word2') - true when the \
text contains ALL the words (split on punctuation/whitespace, case-sensitive \
tokens); accelerated by the FTS index. The right tool for exact strings, \
identifiers, and error messages - compose freely with other predicates: \
SELECT message_id FROM messages WHERE contains_tokens(search_text, 'OCC retry') \
AND project LIKE '%pond%'.
- Ranking (FROM): the fts() table function returns matches plus `_score` (BM25 \
relevance, a regular projectable column): SELECT message_id, _score, search_text \
FROM fts('messages', '{\"match\":{\"column\":\"search_text\",\"terms\":\"...\"}}') \
ORDER BY _score DESC - compose with WHERE/JOIN/GROUP BY around it. AND semantics: \
add \"operator\":\"And\" to the match; \"boolean\" queries (must/should/must_not \
over match clauses) also work. \"phrase\" queries are unavailable (index built \
without positions) - use contains_tokens or match + operator And, optionally with \
LIKE post-filters, for exact substrings.
fts() in WHERE is a plan-time error that points back here. Unlike pond_search, \
both forms cover subagent sessions (filter them out with WHERE NOT (source_agent \
LIKE '%/%') if unwanted). Vector/semantic search is NOT available in SQL; use \
pond_search for that.

Function quick-reference (exact DataFusion names so the model doesn't have to \
guess):
- aggregates: count, count(distinct ...), sum, avg, min, max, any_value, stddev, \
median, approx_distinct, approx_percentile_cont, array_agg, string_agg
- date/time: now(), date_trunc('day'|'hour'|'minute'|..., ts), date_part('year'|..., \
ts), date_bin(interval, ts, origin), to_char(ts, fmt), to_timestamp(text), \
extract(field FROM ts), age(t1, t2)
- intervals: `INTERVAL '7 days'`, `INTERVAL '1 hour'` (single-quoted, postgres-style)
- string: length, lower, upper, substr, position, split_part, regexp_like, \
regexp_match, regexp_replace, like, ilike, starts_with, ends_with, concat, \
concat_ws
- text search: contains_tokens(col, 'words') in WHERE; fts(table, query_json) in \
FROM (see above)
- numeric: round, floor, ceil, abs, sign, log, exp, power, sqrt
- conditional: CASE WHEN ... THEN ... ELSE ..., coalesce, nullif, greatest, least
- cast: CAST(x AS TYPE) or x::TYPE - but never on JSON columns (see the JSON \
rules above)
Quote identifiers with double quotes when they collide with keywords (e.g. \
\"timestamp\"); string literals use single quotes.

EXPLAIN is allowed: `EXPLAIN <query>` or `EXPLAIN ANALYZE <query>` returns the \
DataFusion plan (and per-operator timings for ANALYZE) so you can self-diagnose \
slow queries without leaving SQL.

Output modes (the `format` arg):
- text (default): a row-capped rendered ASCII table with a header showing \
`{total_rows} in {elapsed_ms} ms; showing {shown}` and, on truncation, a \
keyset-pagination hint.
- json: same row-capped payload as `text` but delivered as a JSON object \
{total_rows, shown_rows, truncated, elapsed_ms, columns, rows: [{col: val, ...}]}. \
Spec-compliant dual delivery: the structured JSON rides MCP's `structuredContent` \
field; clients that don't surface that channel get the same JSON as a text block. \
Empirically validated on Claude Code 2.1.165 - the agent reads the structured form.
- parquet | ndjson: write the FULL result set to a file and return a \
`pond-sql-export://<id>` resource link; read it via MCP resources/read. On a \
local/stdio install the response also names the on-disk path so you can open it \
directly with duckdb/polars.

Pagination - keyset (preferred):
Use ORDER BY on indexed columns plus a composite seek key for stable tie-breaking. \
The agent owns the cursor (the last sort value it saw); no server-side state.

  -- page 1: most recent 100 messages in pond
  SELECT message_id, timestamp, role, project
  FROM messages
  WHERE project LIKE '%pond%'
  ORDER BY timestamp DESC, message_id DESC
  LIMIT 100;

  -- page 2: pass back the last (timestamp, message_id) the agent saw
  SELECT message_id, timestamp, role, project
  FROM messages
  WHERE project LIKE '%pond%'
    AND (timestamp, message_id) < (TIMESTAMP '2026-06-05T08:14:22.123456Z', 'last-id')
  ORDER BY timestamp DESC, message_id DESC
  LIMIT 100;

Keyset stays stable across concurrent ingest (older rows don't shift) and uses \
the btree on `timestamp`/`message_id` directly. For known-bounded full results, skip \
pagination entirely: format=parquet writes everything in one call. OFFSET works \
but scans-and-discards prior rows and shifts pages under writes - prefer keyset.

Drilling from aggregates to content (instead of N round-trips of pond_get):
JOIN to messages/parts directly. Example - top 10 longest sessions with first \
user message:

  WITH top_sessions AS (
    SELECT session_id, COUNT(*) AS msgs
    FROM messages
    GROUP BY session_id
    ORDER BY msgs DESC
    LIMIT 10
  )
  SELECT ts.session_id, ts.msgs, s.project, s.source_agent,
         m.search_text AS first_user_msg
  FROM top_sessions ts
  JOIN sessions s ON s.session_id = ts.session_id
  LEFT JOIN messages m
    ON m.session_id = ts.session_id
   AND m.role = 'user'
   AND m.timestamp = (
     SELECT MIN(timestamp) FROM messages
     WHERE session_id = ts.session_id AND role = 'user'
   );

One call, agent picks exactly which columns to hydrate. When you want the \
pond_get-style rendered transcript (tool-call lines, subagent footer), call \
pond_get with the session_id - that's its job.

Examples (4 patterns the agent should recognize):

  -- 1. Activity by project this week
  SELECT project, COUNT(*) AS msgs, COUNT(DISTINCT session_id) AS sessions
  FROM messages
  WHERE timestamp >= now() - INTERVAL '7 days'
  GROUP BY project
  ORDER BY msgs DESC
  LIMIT 20;

  -- 2. Subagent breakdown
  SELECT source_agent, COUNT(*) AS n
  FROM sessions
  WHERE source_agent LIKE '%/%'
  GROUP BY source_agent
  ORDER BY n DESC;

  -- 3. Text filter in WHERE (all words must appear), composed with metadata
  SELECT message_id, timestamp, project, substr(search_text, 1, 120) AS preview
  FROM messages
  WHERE contains_tokens(search_text, 'race condition')
    AND timestamp >= now() - INTERVAL '30 days'
  ORDER BY timestamp DESC
  LIMIT 50;

  -- 4. BM25 search in FROM, joined with metadata, relevance-ranked
  SELECT m.session_id, m.timestamp, m.project, f._score, m.search_text
  FROM fts('messages', \
'{\"match\":{\"column\":\"search_text\",\"terms\":\"race condition\"}}') f
  JOIN messages m ON m.message_id = f.message_id
  WHERE m.project LIKE '%pond%'
  ORDER BY f._score DESC
  LIMIT 50;";
487
    /// `pond_search` MCP tool parameters.
    // NOTE(review): `schemars::JsonSchema` is derived here, so the `///` text
    // on this struct and its fields presumably surfaces as the descriptions in
    // the tool's input schema - treat it as agent-facing wording and confirm
    // against the rendered schema before rewording.
    //
    // Only `query` is required; every other field is an `Option` with
    // `#[serde(default)]`, so an omitted argument deserializes to `None`.
    #[derive(Debug, Deserialize, schemars::JsonSchema)]
    struct McpSearchParams {
        /// What to search for: concepts and keywords. Keep it semantic - do
        /// not put project names in the query, use the `project` filter
        /// instead.
        query: String,
        /// Max sessions to return. Default 10, server-capped at 200. This is
        /// also the "want more results" knob - raise it; there is no pagination.
        #[serde(default)]
        limit: Option<usize>,
        /// Filter to projects whose path contains this substring.
        #[serde(default)]
        project: Option<String>,
        /// Filter to one session (exact match) - semantic search within a
        /// single, possibly long, session.
        #[serde(default)]
        session_id: Option<String>,
        /// Filter to one source agent, e.g. "claude-code" or
        /// "claude-code/general-purpose" (a subagent).
        #[serde(default)]
        source_agent: Option<String>,
        /// Include subagent / sub-task sessions. Default false: search targets
        /// the main sessions where the human and agent talked. Set true to
        /// include subagent sessions (source_agent like "claude-code/<name>").
        #[serde(default)]
        include_subagents: Option<bool>,
        /// Only messages on or after this date (YYYY-MM-DD).
        #[serde(default)]
        from_date: Option<String>,
        /// Only messages on or before this date (YYYY-MM-DD).
        #[serde(default)]
        to_date: Option<String>,
        /// Output shape: "text" (default - a rendered transcript of the ranked
        /// hits) or "json" (the same hits as structured data).
        #[serde(default)]
        format: Option<String>,
    }
526
    /// `pond_get` MCP tool parameters. Exactly one of `message_id` /
    /// `session_id` is required.
    // NOTE(review): `schemars::JsonSchema` is derived here, so the `///` text
    // on this struct and its fields presumably surfaces as the descriptions in
    // the tool's input schema - treat it as agent-facing wording.
    //
    // Both ids are `Option`s with `#[serde(default)]`, so deserialization
    // itself accepts neither or both; the "exactly one" rule has to be
    // enforced after parsing (not visible in this struct).
    #[derive(Debug, Deserialize, schemars::JsonSchema)]
    struct McpGetParams {
        /// Retrieve this message: its full parts plus `context_depth` sibling
        /// messages each side (conversational siblings by default; set
        /// response_mode to widen).
        #[serde(default)]
        message_id: Option<String>,
        /// Retrieve this whole session (mutually exclusive with message_id).
        #[serde(default)]
        session_id: Option<String>,
        /// With message_id: messages of thread context to include on each side.
        #[serde(default)]
        context_depth: Option<usize>,
        /// Cap on returned messages (session mode) or parts (message mode).
        /// Default 20, max 1000.
        #[serde(default)]
        limit: Option<usize>,
        /// Depth: "conversational" (default; human/model text only, with part
        /// summaries), "complete" (all messages incl. system/tool carriers,
        /// with part summaries), or "verbatim" (all messages with full parts
        /// inline; session mode only for the parts). In message mode it
        /// selects which siblings fill the context window.
        // Presumably decoded by `parse_response_mode`, which maps any
        // unrecognized string to the conversational default.
        #[serde(default)]
        response_mode: Option<String>,
        /// Session mode only: which end to read `limit` messages from -
        /// "start" (oldest, default) or "end" (most recent, e.g. to recover
        /// recent context after compaction). Results stay chronological;
        /// ignored in message mode.
        // Presumably decoded by `parse_session_from`, which treats anything
        // other than "end" as "start".
        #[serde(default)]
        session_from: Option<String>,
        /// Exclusive continuation anchor from a prior response: the last
        /// `message_id` (session mode) or last `part_id` (message mode).
        #[serde(default)]
        after_id: Option<String>,
    }
564
    /// `pond_sql_query` MCP tool parameters.
    // NOTE(review): `schemars::JsonSchema` is derived here, so the `///` text
    // on this struct and its fields presumably surfaces as the descriptions in
    // the tool's input schema - treat it as agent-facing wording.
    #[derive(Debug, Deserialize, schemars::JsonSchema)]
    struct McpSqlParams {
        /// One read-only SQL statement (DataFusion / PostgreSQL-compatible).
        /// SELECT/WITH only (or EXPLAIN of one); writes and side-effecting
        /// statements are rejected. Exact columns - messages(session_id,
        /// message_id, timestamp, role, source_agent, project, content
        /// [system-role only], search_text [the conversational text],
        /// embedding_model, options) | sessions(session_id,
        /// parent_session_id, parent_message_id, source_agent, created_at,
        /// project, options) | parts(session_id, message_id, id, ordinal,
        /// type, provenance, variant_data, options). parts.type enums use
        /// underscores: 'tool_call', 'tool_result', 'text', 'reasoning',
        /// 'file'. JSON columns (variant_data, options) are JSONB: read
        /// fields with json_extract(col, '$.a.b') or json_get_string(col,
        /// 'key', ...), never CAST them. Text search: WHERE
        /// contains_tokens(search_text, 'words') to filter, FROM
        /// fts('messages', '{...}') for BM25-ranked results. Control row count
        /// with SQL `LIMIT`; inline output is capped at 100 rows (use
        /// format=parquet|ndjson to get every row). See the `schema://pond-sql`
        /// resource for joins, JSON/FTS functions, pagination + drilling
        /// patterns, and worked examples.
        // The serde alias lets callers send the statement under the key `sql`
        // as well as `query`.
        #[serde(alias = "sql")]
        query: String,
        /// Output format: "text" (default; rendered ASCII table with metrics
        /// footer), "json" (same row-capped data as a structured JSON object,
        /// delivered via MCP structuredContent), "parquet", or "ndjson". For
        /// parquet/ndjson the full result set is written to a file and a
        /// `pond-sql-export://` resource link is returned (no truncation).
        #[serde(default)]
        format: Option<String>,
    }
597
598    fn parse_session_from(value: Option<String>) -> SessionFrom {
599        match value.as_deref() {
600            Some("end") => SessionFrom::End,
601            _ => SessionFrom::Start,
602        }
603    }
604
605    fn parse_response_mode(value: Option<String>) -> ResponseMode {
606        match value.as_deref() {
607            Some("complete") => ResponseMode::Complete,
608            Some("verbatim") => ResponseMode::Verbatim,
609            // None or any other value falls back to the conversational default.
610            _ => ResponseMode::Conversational,
611        }
612    }
613
    /// The pond MCP server: holds the shared state and the generated tool router.
    ///
    /// `Clone` is derived; both fields are cloned with the server.
    #[derive(Clone)]
    pub struct PondMcp {
        /// Shared application state (store, lazy embedder, search config).
        state: AppState,
        /// Tool router obtained from `Self::tool_router()` in `new`.
        tool_router: ToolRouter<PondMcp>,
    }
620
621    #[tool_router]
622    impl PondMcp {
623        pub fn new(state: AppState) -> Self {
624            Self {
625                state,
626                tool_router: Self::tool_router(),
627            }
628        }
629
630        #[tool(
631            description = "Hybrid (vector + BM25) search over stored conversation history. \
632                           Returns a readable transcript: a leading `key:` line explains the \
633                           format and the first line states totals plus how many searchable \
634                           messages the filters left in scope (the absence signal - search only \
635                           sees conversational text, never tool calls/results), then results are \
636                           grouped by session, ordered by each session's best hit. Each hit is a \
637                           `--- [n] score | role | time | message_id | project | agent | session \
638                           ---` delimiter rule followed by the matched text. Pass a returned \
639                           `message_id` to `pond_get` for full text. Common args: \
640                           query (semantic - concepts, not project names), then project / \
641                           from_date / to_date to scope, limit to widen (no pagination - raise \
642                           limit for more). Advanced: source_agent (e.g. \"claude-code\", or \
643                           \"claude-code/general-purpose\" for subagents), session_id (search \
644                           within one long session), include_subagents (subagent sessions are \
645                           excluded by default), format (\"text\" default, or \"json\" for \
646                           structured hits). \
647                           Scores are relative within one response; there is no min_score. For \
648                           exact strings, identifiers, or error messages, pond_sql_query is the \
649                           sharper tool - WHERE contains_tokens(search_text, 'words') to \
650                           filter, FROM fts('messages', ...) for BM25 ranking - and it sees \
651                           subagent sessions too.",
652            annotations(read_only_hint = true, idempotent_hint = true, open_world_hint = false)
653        )]
654        async fn pond_search(
655            &self,
656            Parameters(params): Parameters<McpSearchParams>,
657        ) -> Result<CallToolResult, ErrorData> {
658            let json = matches!(params.format.as_deref(), Some("json"));
659            let request = SearchRequest {
660                protocol_version: PROTOCOL_VERSION,
661                namespace: Some(default_namespace()),
662                query: params.query,
663                filters: SearchFilters {
664                    project: params.project.map(ProjectFilter::Contains),
665                    session_id: params.session_id,
666                    source_agent: params.source_agent,
667                    from_date: params.from_date,
668                    to_date: params.to_date,
669                    // min_score is intentionally not on the MCP surface; scores
670                    // are response-relative, so a server-side threshold is a
671                    // footgun for agent callers. CLI / HTTP still exposes it
672                    // for the bench harness.
673                    min_score: 0.0,
674                    include_subagents: params.include_subagents.unwrap_or(false),
675                },
676                limit: params.limit.unwrap_or(10),
677                mode_override: None,
678            };
679            match run_search(
680                &self.state.store,
681                &self.state.embedder,
682                request.clone(),
683                &self.state.search,
684            )
685            .await
686            {
687                SearchEnvelope::Success(response) if json => {
688                    // `structured()` mirrors the same bytes into the text
689                    // content block, so shadowing clients still get the data.
690                    Ok(CallToolResult::structured(
691                        serde_json::to_value(&response).map_err(|error| {
692                            ErrorData::internal_error(
693                                format!("failed to serialize search response: {error}"),
694                                None,
695                            )
696                        })?,
697                    ))
698                }
699                SearchEnvelope::Success(response) => {
700                    Ok(tool_result(render_search_transcript(&response, &request)))
701                }
702                SearchEnvelope::Error(envelope) => Err(to_error_data(&envelope)),
703            }
704        }
705
706        #[tool(
707            description = "Retrieve stored conversation content as a readable transcript \
708                           (a leading `key:` line explains the format). Common: session_id \
709                           (whole session; pair with response_mode \
710                           conversational|complete|verbatim) OR message_id (that message \
711                           marked `>`, plus context_depth sibling messages each side, with \
712                           its tool/file parts in full). A session_id response lists the \
713                           session's subagents in a footer so you can open each. Advanced: \
714                           limit (cap), after_id (paging - pass the value the footer shows), \
715                           session_from (\"start\"|\"end\"; \"end\" returns the most recent \
716                           messages, \
717                           e.g. to recover context after compaction). \
718                           Tool/result lines render as `-> name [call_id]` / `<- name \
719                           [call_id] (ok|failed)`. Not for bulk export - use `pond export`.",
720            annotations(read_only_hint = true, idempotent_hint = true, open_world_hint = false)
721        )]
722        async fn pond_get(
723            &self,
724            Parameters(params): Parameters<McpGetParams>,
725        ) -> Result<CallToolResult, ErrorData> {
726            let request = GetRequest {
727                protocol_version: PROTOCOL_VERSION,
728                namespace: Some(default_namespace()),
729                session_id: params.session_id,
730                message_id: params.message_id,
731                context_depth: params.context_depth.unwrap_or(0),
732                limit: params.limit.unwrap_or(20),
733                response_mode: parse_response_mode(params.response_mode),
734                session_from: parse_session_from(params.session_from),
735                after_id: params.after_id,
736            };
737            match run_get(&self.state.store, request.clone()).await {
738                GetEnvelope::Success(response) => {
739                    let mut transcript = render_get_transcript(&response, &request);
740                    // Spawn-only subagents are stored as their own sessions
741                    // (spec.md#datasets); surface them on the parent's first page
742                    // so an agent can open each (otherwise they are undiscoverable
743                    // from the MCP surface). Best-effort: a lookup failure just
744                    // omits the footer rather than failing the get.
745                    if request.message_id.is_none()
746                        && request.after_id.is_none()
747                        && let Ok(children) =
748                            self.state.store.child_sessions(&response.session.id).await
749                        && !children.is_empty()
750                    {
751                        transcript.push_str(&render_subagents_footer(&children));
752                    }
753                    Ok(tool_result(transcript))
754                }
755                GetEnvelope::Error(envelope) => Err(to_error_data(&envelope)),
756            }
757        }
758
759        #[tool(
760            description = "Run ONE read-only SQL query (DataFusion / PostgreSQL-compatible) \
761                           over the stored corpus as three tables: sessions, messages, parts. \
762                           For filtering, joins, and aggregation (counts, group-by, time \
763                           buckets) - the analytic complement to pond_search's semantic \
764                           recall. SELECT/WITH only (or EXPLAIN of one); writes and side- \
765                           effecting statements are rejected. The exact column lists are in \
766                           the `query` parameter description - use those names, do not guess \
767                           (column discovery also works: SELECT column_name FROM \
768                           information_schema.columns WHERE table_name = 'messages'). \
769                           Routing: metadata analytics -> SQL on messages/sessions; tool-call \
770                           analytics -> parts WHERE type = 'tool_call' with \
771                           json_get_string(variant_data, 'name'); text search -> WHERE \
772                           contains_tokens(search_text, 'words') to filter or FROM \
773                           fts('messages', '{...json...}') for BM25-ranked results, or \
774                           pond_search for semantic recall; reading a transcript -> pond_get, \
775                           not SQL. The embedding `vector` column is never returned (explicit \
776                           projection is rejected; filtering in WHERE is fine). Control row \
777                           count with SQL `LIMIT`; inline output (format text|json) is capped \
778                           at 100 rows. format defaults to text (a row-capped rendered table); \
779                           set format=json for a structured JSON payload (delivered via MCP \
780                           structuredContent), or format=parquet|ndjson to write the full \
781                           result to a file returned as a pond-sql-export:// resource. Read \
782                           resource schema://pond-sql \
783                           for joins, indexed columns, JSON access rules, the function \
784                           quick-reference, pagination + drilling patterns, and worked \
785                           examples.",
786            annotations(read_only_hint = true, idempotent_hint = true, open_world_hint = false)
787        )]
788        async fn pond_sql_query(
789            &self,
790            Parameters(params): Parameters<McpSqlParams>,
791        ) -> Result<CallToolResult, ErrorData> {
792            let mode = match params.format.as_deref() {
793                None | Some("text") => sql::Mode::Inline,
794                Some("json") => sql::Mode::InlineJson,
795                Some("parquet") => sql::Mode::Export(sql::Format::Parquet),
796                Some("ndjson") => sql::Mode::Export(sql::Format::Ndjson),
797                Some(other) => {
798                    return Ok(CallToolResult::error(vec![Content::text(format!(
799                        "unknown format {other:?}; use \"text\", \"json\", \"parquet\", \
800                         or \"ndjson\""
801                    ))]));
802                }
803            };
804            let inline_rows = sql::DEFAULT_INLINE_ROWS;
805
806            // The three tables are independent (per-table caches/mutexes), so
807            // overlap their freshness/manifest fetches rather than serialize.
808            let store = &self.state.store;
809            let tables = match tokio::try_join!(
810                store.dataset(Table::Sessions),
811                store.dataset(Table::Messages),
812                store.dataset(Table::Parts),
813            ) {
814                Ok((sessions, messages, parts)) => sql::Tables {
815                    sessions,
816                    messages,
817                    parts,
818                },
819                Err(_) => {
820                    return Err(ErrorData::internal_error(
821                        "sql datasets unavailable".to_owned(),
822                        None,
823                    ));
824                }
825            };
826
827            match sql::run(&tables, &params.query, mode, inline_rows).await {
828                Ok(sql::Outcome::Inline(text)) => Ok(tool_result(text)),
829                Ok(sql::Outcome::InlineJson(value)) => Ok(CallToolResult::structured(value)),
830                Ok(sql::Outcome::Export {
831                    bytes,
832                    format,
833                    rows,
834                    columns,
835                }) => {
836                    let name = format!("{}.{}", Uuid::now_v7(), format.ext());
837                    match store.export_write(&name, &bytes).await {
838                        Ok(_) => Ok(export_result(
839                            store,
840                            &name,
841                            format,
842                            rows,
843                            &columns,
844                            bytes.len(),
845                        )),
846                        Err(error) => Err(ErrorData::internal_error(
847                            format!("export write failed: {error}"),
848                            None,
849                        )),
850                    }
851                }
852                Err(sql::SqlError::Query(message)) => {
853                    Ok(CallToolResult::error(vec![Content::text(message)]))
854                }
855                Err(sql::SqlError::Infra(error)) => Err(ErrorData::internal_error(
856                    format!("sql execution failed: {error}"),
857                    None,
858                )),
859            }
860        }
861    }
862
863    // `router = self.tool_router` makes the generated `call_tool` / `list_tools`
864    // read the cached router field; the bare-`#[tool_handler]` default rebuilds
865    // the router via `Self::tool_router()` on every call instead.
866    #[tool_handler(router = self.tool_router)]
867    impl ServerHandler for PondMcp {
868        fn get_info(&self) -> ServerInfo {
869            ServerInfo::new(
870                ServerCapabilities::builder()
871                    .enable_tools()
872                    .enable_resources()
873                    .build(),
874            )
875            // rmcp's default `from_build_env` reports the rmcp crate (name +
876            // version) - clients display the server's own identity, so set it.
877            .with_server_info(Implementation::new("pond", env!("CARGO_PKG_VERSION")))
878            .with_instructions(
879                "pond recalls past agent sessions (Claude Code and others) - prior work, \
880                 decisions, and context across sessions, not the live conversation. \
881                 Workflow: pond_search to find relevant messages, then pond_get to read \
882                 full text by message_id or a whole session by session_id; both return \
883                 readable transcripts, not JSON. Scope with filters, not the query: project \
884                 (path substring), session_id, source_agent, from_date / to_date - \
885                 keep query semantic (concepts, not project names). Scores are relative \
886                 within one response; there is no min_score. Subagents are stored as their \
887                 own sessions (source_agent like \"claude-code/general-purpose\"); pond_get \
888                 on a parent session lists them in a footer so you can open each. Recover \
889                 context lost to compaction: find this session via pond_search (a distinctive \
890                 recent topic + project + from_date=today), then pond_get(session_id, \
891                 session_from=\"end\") for the recent pre-compaction turns. Deeper \
892                 reference on demand: resource schema://pond (all filters + response format), \
893                 stats://pond (corpus + embedding stats). For structured/analytic queries \
894                 (filtering, joins, counts, group-by) use pond_sql_query: read-only SQL \
895                 (SELECT only) over the sessions/messages/parts tables, with optional \
896                 parquet/ndjson export; see resource schema://pond-sql. Search only indexes \
897                 conversational text (tool calls/results are invisible to it), and a \
898                 zero/weak result is not proof of absence - for exact strings, \
899                 identifiers, or error messages run pond_sql_query with WHERE \
900                 contains_tokens(search_text, 'words') (all words must match; \
901                 index-accelerated), or FROM fts('messages', \
902                 '{\"match\":{\"column\":\"search_text\",\"terms\":\"...\"}}') for \
903                 BM25-ranked results; both cover subagent sessions too.",
904            )
905        }
906
907        async fn list_resources(
908            &self,
909            _request: Option<PaginatedRequestParams>,
910            _context: RequestContext<RoleServer>,
911        ) -> Result<ListResourcesResult, ErrorData> {
912            Ok(ListResourcesResult {
913                resources: vec![
914                    RawResource::new("schema://pond", "pond search schema").no_annotation(),
915                    RawResource::new("schema://pond-sql", "pond SQL table schema").no_annotation(),
916                    RawResource::new("stats://pond", "pond corpus stats").no_annotation(),
917                ],
918                next_cursor: None,
919                meta: None,
920            })
921        }
922
923        async fn read_resource(
924            &self,
925            request: ReadResourceRequestParams,
926            _context: RequestContext<RoleServer>,
927        ) -> Result<ReadResourceResult, ErrorData> {
928            match request.uri.as_str() {
929                "schema://pond" => Ok(ReadResourceResult::new(vec![ResourceContents::text(
930                    SCHEMA_DOC,
931                    request.uri,
932                )])),
933                "schema://pond-sql" => Ok(ReadResourceResult::new(vec![ResourceContents::text(
934                    SQL_SCHEMA_DOC,
935                    request.uri,
936                )])),
937                // `pond_sql_query` export artifacts: read the file pond wrote
938                // (parquet -> base64 blob, ndjson -> text). The filename is
939                // validated to a minted `<uuid>.<ext>` so the URI can't traverse.
940                uri if uri.starts_with("pond-sql-export://") => {
941                    let name = uri.trim_start_matches("pond-sql-export://").to_owned();
942                    if !valid_export_name(&name) {
943                        return Err(ErrorData::resource_not_found(
944                            format!("invalid export id: {name}"),
945                            None,
946                        ));
947                    }
948                    let bytes = self.state.store.export_read(&name).await.map_err(|error| {
949                        ErrorData::resource_not_found(format!("export not found: {error}"), None)
950                    })?;
951                    let contents = if name.ends_with(".ndjson") {
952                        ResourceContents::text(
953                            String::from_utf8_lossy(&bytes).into_owned(),
954                            request.uri,
955                        )
956                        .with_mime_type("application/x-ndjson")
957                    } else {
958                        ResourceContents::blob(STANDARD.encode(&bytes), request.uri)
959                            .with_mime_type("application/vnd.apache.parquet")
960                    };
961                    Ok(ReadResourceResult::new(vec![contents]))
962                }
963                "stats://pond" => {
964                    let store = &self.state.store;
965                    let map_err = |error: anyhow::Error| {
966                        ErrorData::internal_error(format!("stats unavailable: {error}"), None)
967                    };
968                    let (sessions, messages, parts) = store.row_counts().await.map_err(&map_err)?;
969                    let embedding = store.embedding_progress().await.map_err(&map_err)?;
970                    let stale = store.stale_embedding_count().await.map_err(&map_err)?;
971                    let indices = store.index_status().await.map_err(&map_err)?;
972
973                    let embedded_percent = if embedding.total == 0 {
974                        0.0
975                    } else {
976                        #[allow(clippy::cast_precision_loss)]
977                        let pct = (embedding.embedded as f64 / embedding.total as f64) * 100.0;
978                        (pct * 10.0).round() / 10.0
979                    };
980                    let index_rows = indices
981                        .iter()
982                        .map(|status| {
983                            serde_json::json!({
984                                "table": status.table.as_str(),
985                                "intent": status.intent_name,
986                                "exists": status.exists,
987                                "fragments_covered": status.fragments_covered,
988                                "unindexed_rows": status.unindexed_rows,
989                            })
990                        })
991                        .collect::<Vec<_>>();
992
993                    // spec.md#search: `search_text` is the conversational text
994                    // (filtered of harness-injected parts at the adapter seam).
995                    // `embedding.total` is the searchable population - that is
996                    // the right denominator for "% embedded", not total messages.
997                    let stats = serde_json::json!({
998                        "corpus": {
999                            "sessions": sessions,
1000                            "messages": messages,
1001                            "searchable_messages": embedding.total,
1002                            "parts": parts,
1003                        },
1004                        "embeddings": {
1005                            "model": embedding.model,
1006                            "embedded": embedding.embedded,
1007                            "searchable_total": embedding.total,
1008                            "embedded_percent": embedded_percent,
1009                            "stale_under_other_model": stale,
1010                        },
1011                        "indices": index_rows,
1012                    });
1013                    Ok(ReadResourceResult::new(vec![ResourceContents::text(
1014                        stats.to_string(),
1015                        request.uri,
1016                    )]))
1017                }
1018                other => Err(ErrorData::resource_not_found(
1019                    format!("unknown resource: {other}"),
1020                    None,
1021                )),
1022            }
1023        }
1024
1025        async fn list_tools(
1026            &self,
1027            request: Option<PaginatedRequestParams>,
1028            context: RequestContext<RoleServer>,
1029        ) -> Result<ListToolsResult, ErrorData> {
1030            let _ = (request, context);
1031            let mut result = ListToolsResult {
1032                tools: self.tool_router.list_all(),
1033                next_cursor: None,
1034                meta: None,
1035            };
1036            annotate_tool_limits(&mut result);
1037            Ok(result)
1038        }
1039    }
1040
1041    fn annotate_tool_limits(result: &mut ListToolsResult) {
1042        for tool in &mut result.tools {
1043            let chars = match tool.name.as_ref() {
1044                "pond_search" => 80_000,
1045                "pond_get" => 200_000,
1046                "pond_sql_query" => 80_000,
1047                _ => continue,
1048            };
1049            let mut meta = serde_json::Map::new();
1050            meta.insert(
1051                "anthropic/maxResultSizeChars".to_owned(),
1052                serde_json::json!(chars),
1053            );
1054            tool.meta = Some(Meta(meta));
1055        }
1056    }
1057
1058    /// Run the stdio MCP server until the client disconnects. All diagnostics
1059    /// go to stderr (the shared `tracing` subscriber); stdout carries only
1060    /// JSON-RPC frames, written by rmcp's stdio transport (spec.md#scope).
1061    pub async fn serve_stdio(state: AppState) -> anyhow::Result<()> {
1062        let service = PondMcp::new(state)
1063            .serve(stdio())
1064            .await
1065            .context("failed to start stdio MCP server")?;
1066        service.waiting().await.context("stdio MCP server error")?;
1067        Ok(())
1068    }
1069
1070    /// Build an MCP tool result from a rendered transcript. Deliberately text
1071    /// only: Claude Code surfaces `structuredContent` over the text block when
1072    /// both are present, which would shadow the transcript - the readable view
1073    /// is the whole point on the MCP surface. Programmatic clients that want the
1074    /// structured wire shape use the HTTP `/v1/*` JSON API instead.
1075    fn tool_result(transcript: String) -> CallToolResult {
1076        CallToolResult::success(vec![Content::text(transcript)])
1077    }
1078
1079    /// Build the `pond_sql_query` export result: a text summary plus a
1080    /// `resource_link` to the artifact (the spec-canonical way to hand back a
1081    /// tool-produced file - the bytes ride `resources/read`, not the tool
1082    /// result, so they don't load into context unless the host fetches them).
1083    /// On a `file://` install the summary also names the on-disk path so a
1084    /// co-located agent can read it directly.
1085    fn export_result(
1086        store: &crate::sessions::Store,
1087        name: &str,
1088        format: sql::Format,
1089        rows: usize,
1090        columns: &[String],
1091        bytes: usize,
1092    ) -> CallToolResult {
1093        let uri = format!("pond-sql-export://{name}");
1094        let column_list = if columns.is_empty() {
1095            "(none)".to_owned()
1096        } else {
1097            columns.join(", ")
1098        };
1099        let mut summary = format!(
1100            "Exported {rows} row(s), {bytes} bytes ({}). Columns: {column_list}.\n\
1101             Fetch via MCP resources/read on {uri}.",
1102            format.ext()
1103        );
1104        if let Some(path) = store.export_local_path(name) {
1105            summary.push_str(&format!(
1106                "\nLocal file: {} - on this (stdio) install you can read it directly \
1107                 (e.g. duckdb, polars).",
1108                path.display()
1109            ));
1110        }
1111        let link = RawResource::new(uri, name.to_owned())
1112            .with_description(format!("pond SQL export ({}, {rows} rows)", format.ext()))
1113            .with_mime_type(format.mime().to_owned())
1114            .with_size(u32::try_from(bytes).unwrap_or(u32::MAX));
1115        CallToolResult::success(vec![Content::text(summary), Content::resource_link(link)])
1116    }
1117
1118    /// Accept only the export filenames pond mints (`<uuid>.parquet|ndjson`),
1119    /// guarding the `pond-sql-export://` resource against path traversal.
1120    fn valid_export_name(name: &str) -> bool {
1121        let Some((stem, ext)) = name.rsplit_once('.') else {
1122            return false;
1123        };
1124        matches!(ext, "parquet" | "ndjson")
1125            && !stem.is_empty()
1126            && stem.bytes().all(|b| b.is_ascii_hexdigit() || b == b'-')
1127    }
1128
1129    /// Footer for a `pond_get` session response listing the session's spawn-only
1130    /// subagents. Each subagent is its own session (spec.md#datasets) addressable
1131    /// by the printed id, so the caller can open any with `pond_get(session_id)`;
1132    /// without this they are invisible from the MCP surface.
1133    fn render_subagents_footer(children: &[crate::wire::Session]) -> String {
1134        use std::fmt::Write;
1135        let mut out = String::new();
1136        let _ = writeln!(out);
1137        let _ = writeln!(
1138            out,
1139            "subagents ({}) - pass an id to pond_get(session_id=...):",
1140            children.len()
1141        );
1142        for child in children {
1143            let _ = writeln!(out, "  {} | {}", child.id, child.source_agent);
1144        }
1145        out
1146    }
1147
1148    /// `YYYY-MM-DD HH:MM:SSZ` - compact, sortable, timezone-explicit.
1149    fn fmt_ts(ts: &chrono::DateTime<chrono::Utc>) -> String {
1150        ts.format("%Y-%m-%d %H:%M:%SZ").to_string()
1151    }
1152
1153    /// Inner string of an `Extracted<String>` option, or `?` when the source
1154    /// carried none (spec.md#model-no-synthesis: absence is real, not a blank).
1155    fn opt_name(value: &Option<crate::adapter::extract::Extracted<String>>) -> &str {
1156        value.as_deref().map(String::as_str).unwrap_or("?")
1157    }
1158
1159    /// Append each line of `body` to `out`, so escaped `\n` in stored text
1160    /// renders as real line breaks. A trailing blank line in the source is
1161    /// dropped (lines() already does this).
1162    fn push_lines(out: &mut String, body: &str, indent: &str) {
1163        use std::fmt::Write;
1164        for line in body.lines() {
1165            let _ = writeln!(out, "{indent}{line}");
1166        }
1167    }
1168
1169    fn render_search_transcript(response: &SearchResponse, request: &SearchRequest) -> String {
1170        use std::fmt::Write;
1171        // Must mirror build_filter's default-exclusion condition, else the note lies.
1172        let subagent_note = if !request.filters.include_subagents
1173            && request.filters.session_id.is_none()
1174            && request.filters.source_agent.is_none()
1175        {
1176            " Subagent sessions excluded; pass include_subagents=true to include them."
1177        } else {
1178            ""
1179        };
1180        if response.sessions.is_empty() {
1181            // spec.md#search-absence-honesty: name the scope size and the
1182            // recovery path - a zero-hit response must distinguish "nothing
1183            // relevant exists" from "the filters excluded everything".
1184            if response.searchable_in_scope == 0 {
1185                return format!(
1186                    "pond_search: 0 searchable messages in scope - the filters exclude \
1187                     everything before retrieval. Widen or drop project/date filters.\
1188                     {subagent_note}\n"
1189                );
1190            }
1191            let fts_hint = " For exact strings or identifiers, try pond_sql_query: SELECT \
1192                            message_id, session_id, search_text FROM messages WHERE \
1193                            contains_tokens(search_text, '...').";
1194            return format!(
1195                "pond_search: no matches for {:?} across {} searchable messages in \
1196                 scope.{subagent_note}{fts_hint}\n",
1197                request.query, response.searchable_in_scope
1198            );
1199        }
1200        let shown: usize = response.sessions.iter().map(|s| s.matches.len()).sum();
1201        let mut out = String::new();
1202        let _ = writeln!(
1203            out,
1204            "pond_search: {} matching messages ({} searchable in scope), showing {} hits from {} \
1205             sessions.{}",
1206            response.matched_total,
1207            response.searchable_in_scope,
1208            shown,
1209            response.sessions.len(),
1210            subagent_note,
1211        );
1212        let _ = writeln!(
1213            out,
1214            "key: session rules group hits by session, ordered by best hit; \"--- [n] score | role | time | message_id | project | agent | session ---\" delimits each hit + matched text. pond_get <message_id> for full; raise limit for more (no pagination)."
1215        );
1216        let mut index = 0;
1217        for (session_index, session) in response.sessions.iter().enumerate() {
1218            let best = session
1219                .matches
1220                .first()
1221                .map(|hit| hit.score)
1222                .unwrap_or_default();
1223            let _ = writeln!(out);
1224            let _ = writeln!(
1225                out,
1226                "{}",
1227                rule_line(&format!(
1228                    "session [{}] best {:.2} | {}/{} matched | {} | {} | {}",
1229                    session_index + 1,
1230                    best,
1231                    session.matched_message_count,
1232                    session.session_messages_count,
1233                    session.project,
1234                    session.source_agent,
1235                    session.session_id,
1236                )),
1237            );
1238            for hit in &session.matches {
1239                index += 1;
1240                let _ = writeln!(out);
1241                let _ = writeln!(
1242                    out,
1243                    "{}",
1244                    rule_line(&format!(
1245                        "[{index}] {:.2} | {} | {} | {} | {} | {} | {}",
1246                        hit.score,
1247                        hit.role.as_str(),
1248                        fmt_ts(&hit.timestamp),
1249                        hit.message_id,
1250                        session.project,
1251                        session.source_agent,
1252                        session.session_id,
1253                    )),
1254                );
1255                push_lines(&mut out, &hit.text, "");
1256            }
1257        }
1258        out
1259    }
1260
1261    fn render_get_transcript(response: &GetResponse, request: &GetRequest) -> String {
1262        use std::fmt::Write;
1263        let session = &response.session;
1264        let mut out = String::new();
1265        match &response.result {
1266            GetResult::Session {
1267                messages,
1268                messages_remaining,
1269            } => {
1270                let mode = match request.response_mode {
1271                    ResponseMode::Conversational => "conversational",
1272                    ResponseMode::Complete => "complete",
1273                    ResponseMode::Verbatim => "verbatim",
1274                };
1275                let more = if *messages_remaining > 0 {
1276                    " (more)"
1277                } else {
1278                    ""
1279                };
1280                let _ = writeln!(
1281                    out,
1282                    "pond_get: session {} ({mode}), {} messages{more}.",
1283                    session.id,
1284                    messages.len(),
1285                );
1286                let _ = writeln!(
1287                    out,
1288                    "key: \"--- [n] role | time | message_id ---\" delimits each message; \"->\" tool call, \"<-\" result. Pass after_id=<id> to page."
1289                );
1290                for (idx, message) in messages.iter().enumerate() {
1291                    let _ = writeln!(out);
1292                    render_message(
1293                        &mut out,
1294                        idx + 1,
1295                        message,
1296                        message.parts.as_deref(),
1297                        &message.parts_summary,
1298                        false,
1299                    );
1300                }
1301                let _ = writeln!(out);
1302                let _ = writeln!(
1303                    out,
1304                    "session {} | {} | {}",
1305                    session.id, session.source_agent, session.project,
1306                );
1307                if *messages_remaining > 0
1308                    && let Some(last) = messages.last()
1309                {
1310                    match request.session_from {
1311                        SessionFrom::Start => {
1312                            let _ = writeln!(
1313                                out,
1314                                "... {} more messages; pass after_id={} to pond_get to continue",
1315                                messages_remaining, last.id,
1316                            );
1317                        }
1318                        // Tail page: the remaining messages are *earlier*, before this
1319                        // page. after_id only pages forward, so it can't reach them -
1320                        // point back to the start instead of a cursor that dead-ends.
1321                        SessionFrom::End => {
1322                            let _ = writeln!(
1323                                out,
1324                                "... {messages_remaining} earlier messages precede this tail; call pond_get with session_from=\"start\" to read from the beginning",
1325                            );
1326                        }
1327                    }
1328                }
1329            }
1330            GetResult::Message {
1331                target,
1332                target_parts,
1333                target_parts_remaining,
1334                siblings,
1335            } => {
1336                let _ = writeln!(
1337                    out,
1338                    "pond_get: thread around {} in session {} (context +/-{}).",
1339                    target.id, session.id, request.context_depth,
1340                );
1341                let _ = writeln!(
1342                    out,
1343                    "key: \"--- [n] role | time | message_id ---\" delimits each message; \">\" = the one you requested; \"->\" tool call, \"<-\" result. pond_get <message_id> to expand any line."
1344                );
1345                // Interleave target with siblings, ordered by (timestamp, id) to
1346                // match storage - codex writes many messages at the same
1347                // timestamp, so the id is the real tiebreak (a bare timestamp
1348                // sort scrambles them). Drop context siblings with nothing to
1349                // render (carrier turns with no text/content/parts); the
1350                // requested target always stays, even if empty.
1351                let mut thread: Vec<(&MessageView, bool)> =
1352                    siblings.iter().map(|view| (view, false)).collect();
1353                thread.push((target, true));
1354                thread.sort_by(|a, b| {
1355                    a.0.timestamp
1356                        .cmp(&b.0.timestamp)
1357                        .then_with(|| a.0.id.cmp(&b.0.id))
1358                });
1359                thread.retain(|(view, is_target)| *is_target || message_has_content(view));
1360                for (idx, (view, is_target)) in thread.iter().enumerate() {
1361                    let _ = writeln!(out);
1362                    let parts: Option<&[ResponsePart]> = if *is_target {
1363                        Some(target_parts.as_slice())
1364                    } else {
1365                        view.parts.as_deref()
1366                    };
1367                    render_message(
1368                        &mut out,
1369                        idx + 1,
1370                        view,
1371                        parts,
1372                        &view.parts_summary,
1373                        *is_target,
1374                    );
1375                }
1376                let _ = writeln!(out);
1377                let _ = writeln!(
1378                    out,
1379                    "session {} | {} | {}",
1380                    session.id, session.source_agent, session.project,
1381                );
1382                if *target_parts_remaining > 0
1383                    && let Some(last) = target_parts.last()
1384                {
1385                    let _ = writeln!(
1386                        out,
1387                        "... {} more parts of {}; pass after_id={} to pond_get to continue",
1388                        target_parts_remaining, target.id, last.id,
1389                    );
1390                }
1391            }
1392        }
1393        out
1394    }
1395
1396    /// Whether a message view has anything to render below its header: real
1397    /// text/content, or any parts (full or summarized). Used to drop empty
1398    /// carrier turns from message-mode context.
1399    fn message_has_content(view: &MessageView) -> bool {
1400        view.text.as_deref().is_some_and(|t| !t.trim().is_empty())
1401            || view
1402                .content
1403                .as_deref()
1404                .is_some_and(|c| !c.trim().is_empty())
1405            || view.parts.as_deref().is_some_and(|p| !p.is_empty())
1406            || !view.parts_summary.is_empty()
1407    }
1408
1409    /// Target column width for a delimiter-rule header.
1410    const RULE_WIDTH: usize = 72;
1411
1412    /// Wrap `inner` as a delimiter rule: `--- {inner} ----...` padded to
1413    /// [`RULE_WIDTH`] (always at least a 3-dash tail when `inner` is already
1414    /// wide). Used for both search hits and get message headers.
1415    fn rule_line(inner: &str) -> String {
1416        let head = format!("--- {inner} ");
1417        let pad = RULE_WIDTH.saturating_sub(head.chars().count()).max(3);
1418        format!("{head}{}", "-".repeat(pad))
1419    }
1420
1421    /// One message block: an indexed `--- [n] role | time | id ---` delimiter
1422    /// rule (unambiguous even when the body has blank lines or `##` headings),
1423    /// then text/content as real lines, then parts - full bodies when `parts`
1424    /// is present, else one-line summaries.
1425    fn render_message(
1426        out: &mut String,
1427        index: usize,
1428        view: &MessageView,
1429        parts: Option<&[ResponsePart]>,
1430        summary: &[PartSummary],
1431        is_target: bool,
1432    ) {
1433        use std::fmt::Write;
1434        let marker = if is_target { "> " } else { "" };
1435        let _ = writeln!(
1436            out,
1437            "{}",
1438            rule_line(&format!(
1439                "[{index}] {marker}{} | {} | {}",
1440                view.role.as_str(),
1441                fmt_ts(&view.timestamp),
1442                view.id,
1443            )),
1444        );
1445        if let Some(text) = &view.text {
1446            push_lines(out, text, "");
1447        }
1448        if let Some(content) = &view.content {
1449            push_lines(out, content, "");
1450        }
1451        match parts {
1452            Some(parts) => {
1453                for part in parts {
1454                    render_part_full(out, part);
1455                }
1456            }
1457            None => {
1458                for part in summary {
1459                    render_part_summary(out, part);
1460                }
1461            }
1462        }
1463    }
1464
1465    fn render_part_full(out: &mut String, part: &ResponsePart) {
1466        use std::fmt::Write;
1467        match &part.kind {
1468            PartKind::Text { text } => {
1469                if let Some(text) = text {
1470                    push_lines(out, text, "");
1471                }
1472            }
1473            PartKind::Reasoning { text } => {
1474                let _ = writeln!(out, "  (reasoning)");
1475                if let Some(text) = text {
1476                    push_lines(out, text, "  ");
1477                }
1478            }
1479            PartKind::ToolCall {
1480                name,
1481                call_id,
1482                params,
1483                ..
1484            } => {
1485                let _ = writeln!(out, "  -> {} [{}]", opt_name(name), opt_name(call_id));
1486                push_lines(out, &value_to_text(params), "     ");
1487            }
1488            PartKind::ToolResult {
1489                name,
1490                call_id,
1491                is_failure,
1492                result,
1493            } => {
1494                let status = if *is_failure { "failed" } else { "ok" };
1495                let _ = writeln!(
1496                    out,
1497                    "  <- {} [{}] ({status})",
1498                    opt_name(name),
1499                    opt_name(call_id),
1500                );
1501                push_lines(out, &value_to_text(result), "     ");
1502            }
1503            PartKind::File {
1504                media_type,
1505                file_name,
1506                ..
1507            } => {
1508                let label = file_name
1509                    .as_deref()
1510                    .or(media_type.as_deref())
1511                    .unwrap_or("file");
1512                let _ = writeln!(out, "  [file {label}]");
1513            }
1514            PartKind::ToolApprovalRequest { approval_id, .. } => {
1515                let _ = writeln!(out, "  [approval request {approval_id}]");
1516            }
1517            PartKind::ToolApprovalResponse {
1518                approval_id,
1519                approved,
1520                ..
1521            } => {
1522                let verb = if *approved { "approved" } else { "denied" };
1523                let _ = writeln!(out, "  [approval {approval_id} {verb}]");
1524            }
1525        }
1526    }
1527
1528    fn render_part_summary(out: &mut String, summary: &PartSummary) {
1529        use std::fmt::Write;
1530        let label = summary.label.as_deref().unwrap_or("");
1531        let call = summary
1532            .call_id
1533            .as_deref()
1534            .map(|id| format!(" [{id}]"))
1535            .unwrap_or_default();
1536        match summary.kind.as_str() {
1537            "tool_call" => {
1538                let _ = writeln!(out, "  -> {label}{call}");
1539            }
1540            "tool_result" => {
1541                let _ = writeln!(out, "  <- {label}{call}");
1542            }
1543            "file" => {
1544                let _ = writeln!(out, "  [file {label}]");
1545            }
1546            other => {
1547                let _ = writeln!(out, "  [{other} {label}]");
1548            }
1549        }
1550    }
1551
1552    /// Render a tool param/result `Value` for the transcript: a JSON string
1553    /// shows as its text; anything else as compact JSON. `null` shows nothing.
1554    fn value_to_text(value: &serde_json::Value) -> String {
1555        match value {
1556            serde_json::Value::String(text) => text.clone(),
1557            serde_json::Value::Null => String::new(),
1558            other => serde_json::to_string(other).unwrap_or_default(),
1559        }
1560    }
1561
1562    /// Map a wire error envelope to a JSON-RPC error. rmcp ships no app-level
1563    /// codes, so pond defines its own `-32000`-family set here. The `data`
1564    /// payload carries pond's canonical string code and a `retryable` flag
1565    /// (per spec.md#error-model) so MCP callers can branch on retry semantics
1566    /// without parsing message strings or knowing the JSON-RPC code mapping.
1567    fn to_error_data(envelope: &ErrorEnvelope) -> ErrorData {
1568        let (jsonrpc_code, pond_code, retryable) = match envelope.error.code {
1569            WireErrorCode::ValidationFailed => (-32010, "validation_failed", false),
1570            WireErrorCode::VersionUnsupported => (-32011, "version_unsupported", false),
1571            WireErrorCode::NotFound => (-32012, "not_found", false),
1572            WireErrorCode::NamespaceUnknown => (-32013, "namespace_unknown", false),
1573            WireErrorCode::StorageUnavailable => (-32014, "storage_unavailable", true),
1574            WireErrorCode::Conflict => (-32015, "conflict", true),
1575            WireErrorCode::Internal => (-32016, "internal", false),
1576        };
1577        let mut data = match &envelope.error.details {
1578            serde_json::Value::Object(map) => map.clone(),
1579            _ => serde_json::Map::new(),
1580        };
1581        data.insert("pond_code".to_owned(), serde_json::json!(pond_code));
1582        data.insert("retryable".to_owned(), serde_json::json!(retryable));
1583        ErrorData::new(
1584            JsonRpcErrorCode(jsonrpc_code),
1585            envelope.error.message.clone(),
1586            Some(serde_json::Value::Object(data)),
1587        )
1588    }
1589
1590    #[cfg(test)]
1591    mod tests {
1592        #![allow(clippy::expect_used, clippy::unwrap_used)]
1593
1594        use std::sync::Arc;
1595
1596        use rmcp::model::{ErrorCode as JsonRpcErrorCode, Tool};
1597
1598        use super::*;
1599        use crate::wire::{ErrorBody, ErrorCode, Role, SearchResponse, SearchResult};
1600
1601        #[test]
1602        fn error_data_carries_code_and_retryability() {
1603            let cases = [
1604                (
1605                    ErrorCode::ValidationFailed,
1606                    -32010,
1607                    "validation_failed",
1608                    false,
1609                ),
1610                (
1611                    ErrorCode::VersionUnsupported,
1612                    -32011,
1613                    "version_unsupported",
1614                    false,
1615                ),
1616                (ErrorCode::NotFound, -32012, "not_found", false),
1617                (
1618                    ErrorCode::NamespaceUnknown,
1619                    -32013,
1620                    "namespace_unknown",
1621                    false,
1622                ),
1623                (
1624                    ErrorCode::StorageUnavailable,
1625                    -32014,
1626                    "storage_unavailable",
1627                    true,
1628                ),
1629                (ErrorCode::Conflict, -32015, "conflict", true),
1630                (ErrorCode::Internal, -32016, "internal", false),
1631            ];
1632            for (code, jsonrpc, pond_code, retryable) in cases {
1633                let error = to_error_data(&ErrorEnvelope {
1634                    error: ErrorBody {
1635                        code,
1636                        message: "boom".to_owned(),
1637                        details: serde_json::json!({"detail": 1}),
1638                    },
1639                });
1640                assert_eq!(error.code, JsonRpcErrorCode(jsonrpc));
1641                let data = error.data.unwrap();
1642                assert_eq!(data["detail"], serde_json::json!(1));
1643                assert_eq!(data["pond_code"], serde_json::json!(pond_code));
1644                assert_eq!(data["retryable"], serde_json::json!(retryable));
1645                assert!(
1646                    data.get("request_id").is_none(),
1647                    "MCP errors use JSON-RPC ids for correlation"
1648                );
1649            }
1650        }
1651
1652        #[test]
1653        fn annotate_tool_limits_sets_anthropic_meta() {
1654            let schema = Arc::new(serde_json::Map::new());
1655            let mut result = ListToolsResult {
1656                tools: vec![
1657                    Tool::new("pond_search", "Search", Arc::clone(&schema)),
1658                    Tool::new("pond_get", "Get", Arc::clone(&schema)),
1659                ],
1660                next_cursor: None,
1661                meta: None,
1662            };
1663            annotate_tool_limits(&mut result);
1664            let value = |name: &str| {
1665                result
1666                    .tools
1667                    .iter()
1668                    .find(|tool| tool.name == name)
1669                    .and_then(|tool| tool.meta.as_ref())
1670                    .and_then(|meta| meta.0.get("anthropic/maxResultSizeChars"))
1671                    .and_then(serde_json::Value::as_i64)
1672            };
1673            assert_eq!(value("pond_search"), Some(80_000));
1674            assert_eq!(value("pond_get"), Some(200_000));
1675        }
1676
1677        #[test]
1678        fn get_transcript_marks_target_and_renders_tool_parts() {
1679            let ts = chrono::DateTime::from_timestamp(0, 0).unwrap();
1680            let tool_call: ResponsePart = serde_json::from_value(serde_json::json!({
1681                "id": "p1", "ordinal": 0, "provenance": "conversational",
1682                "type": "tool_call", "name": "Bash", "call_id": "toolu_x",
1683                "params": { "command": "ls" }, "provider_executed": false,
1684            }))
1685            .unwrap();
1686            let tool_result: ResponsePart = serde_json::from_value(serde_json::json!({
1687                "id": "p2", "ordinal": 1, "provenance": "conversational",
1688                "type": "tool_result", "name": "Bash", "call_id": "toolu_x",
1689                "is_failure": false, "result": "file.txt",
1690            }))
1691            .unwrap();
1692            let target = MessageView {
1693                id: "m1".to_owned(),
1694                role: crate::wire::Role::Assistant,
1695                timestamp: ts,
1696                text: Some("Let me list files.".to_owned()),
1697                content: None,
1698                parts_summary: Vec::new(),
1699                parts: None,
1700            };
1701            let response = GetResponse {
1702                session: crate::wire::GetSession {
1703                    id: "s1".to_owned(),
1704                    source_agent: "claude-code".to_owned(),
1705                    project: "/p".to_owned(),
1706                    created_at: ts,
1707                },
1708                result: GetResult::Message {
1709                    target,
1710                    target_parts: vec![tool_call, tool_result],
1711                    target_parts_remaining: 0,
1712                    siblings: Vec::new(),
1713                },
1714            };
1715            let request = GetRequest {
1716                protocol_version: crate::PROTOCOL_VERSION,
1717                namespace: None,
1718                session_id: None,
1719                message_id: Some("m1".to_owned()),
1720                context_depth: 0,
1721                limit: 20,
1722                response_mode: ResponseMode::default(),
1723                session_from: SessionFrom::default(),
1724                after_id: None,
1725            };
1726
1727            let transcript = render_get_transcript(&response, &request);
1728            assert!(transcript.contains("--- [1] > assistant | 1970-01-01 00:00:00Z | m1 ---"));
1729            assert!(transcript.contains("Let me list files."));
1730            assert!(transcript.contains("  -> Bash [toolu_x]"));
1731            assert!(transcript.contains("  <- Bash [toolu_x] (ok)"));
1732            assert!(transcript.contains("session s1 | claude-code | /p"));
1733        }
1734
1735        #[test]
1736        fn search_transcript_renders_header_and_hits() {
1737            let response = SearchResponse {
1738                sessions: vec![crate::wire::SearchSession {
1739                    session_id: "s1".to_owned(),
1740                    project: "pond".to_owned(),
1741                    source_agent: "claude-code".to_owned(),
1742                    session_messages_count: 2,
1743                    matched_message_count: 1,
1744                    matches: vec![SearchResult {
1745                        message_id: "m1".to_owned(),
1746                        role: Role::User,
1747                        timestamp: chrono::DateTime::from_timestamp(0, 0).unwrap(),
1748                        text: "hello\nworld".to_owned(),
1749                        score: 1.0,
1750                        parts_summary: Vec::new(),
1751                    }],
1752                }],
1753                matched_total: 1,
1754                searchable_in_scope: 2,
1755                has_more: false,
1756            };
1757            let request = SearchRequest {
1758                protocol_version: crate::PROTOCOL_VERSION,
1759                namespace: None,
1760                query: "hi".to_owned(),
1761                mode_override: None,
1762                filters: SearchFilters::default(),
1763                limit: 10,
1764            };
1765
1766            let transcript = render_search_transcript(&response, &request);
1767            assert!(transcript.starts_with(
1768                "pond_search: 1 matching messages (2 searchable in scope), showing 1 hits from 1 \
1769                 sessions."
1770            ));
1771            assert!(
1772                transcript
1773                    .contains("key: session rules group hits by session, ordered by best hit")
1774            );
1775            assert!(
1776                transcript
1777                    .contains("--- session [1] best 1.00 | 1/2 matched | pond | claude-code | s1")
1778            );
1779            // Hit lines stay flat and indexed so callers can still extract
1780            // message_id from the same delimiter shape.
1781            assert!(transcript.contains(
1782                "--- [1] 1.00 | user | 1970-01-01 00:00:00Z | m1 | pond | claude-code | s1"
1783            ));
1784            // Stored "\n" renders as a real line break, not an escape.
1785            assert!(transcript.contains("hello\nworld"));
1786
1787            // The MCP result is transcript-only text (no structuredContent to
1788            // shadow it on the Claude Code client).
1789            let result = tool_result(transcript);
1790            assert!(result.content[0].raw.as_text().is_some());
1791            assert!(result.structured_content.is_none());
1792        }
1793    }
1794}