Skip to main content

pond/
transport.rs

1//! The HTTP+JSON and stdio-MCP transports: thin adapters over the shared wire
2//! handlers. Both transports dispatch to the same handler functions - no
3//! per-transport behavior divergence.
4//!
5//! HTTP exposes `POST /v1/search`, `POST /v1/get`, and `POST /v1/ingest`. MCP
6//! exposes only `pond_search` / `pond_get` (the kb-parity surface); ingest
7//! stays HTTP-only and CLI-only.
8
9use std::sync::Arc;
10
11use crate::{config::SearchConfig, embed::LazyEmbedder, sessions::Store};
12
/// Shared state handed to both transports. `embedder` holds a lazy handle:
/// the model isn't loaded until the first hybrid search asks for it, so
/// `pond mcp` idles at ~50 MB resident and only pays the ~600 MB load cost on
/// the first query that needs it (spec.md#search opt-in).
///
/// `Clone` is derived because the HTTP router clones the state into the MCP
/// service factory; `store` and `embedder` sit behind `Arc`, so those clones
/// share the same underlying values.
#[derive(Clone)]
pub struct AppState {
    /// Session store, passed by reference to the search / get / ingest handlers.
    pub store: Arc<Store>,
    /// Lazy embedder handle, passed by reference to the search handler.
    pub embedder: Arc<LazyEmbedder>,
    /// Search configuration, passed by reference to every search call.
    pub search: SearchConfig,
}
23
24pub mod http {
    //! axum HTTP+JSON server: `POST /v1/search`, `POST /v1/get`,
    //! `POST /v1/ingest`, and the `/mcp` route carrying rmcp's
    //! streamable-HTTP MCP transport.
27
28    use std::net::{IpAddr, SocketAddr};
29
30    use anyhow::Context;
31    use axum::{
32        Json, Router,
33        extract::{DefaultBodyLimit, State},
34        http::{HeaderValue, StatusCode},
35        response::{IntoResponse, Response},
36        routing::post,
37    };
38    use rmcp::transport::streamable_http_server::{
39        StreamableHttpServerConfig, StreamableHttpService, session::local::LocalSessionManager,
40    };
41    use tokio::net::TcpListener;
42
43    use super::AppState;
44    use crate::{
45        handlers::{pond_get, pond_ingest, pond_search},
46        wire::{
47            ErrorCode, GetEnvelope, GetRequest, IngestEnvelope, IngestRequest, SearchEnvelope,
48            SearchRequest, default_namespace, new_request_id,
49        },
50    };
51
    /// HTTP body cap for `POST /v1/*` JSON handlers (spec.md#protocol): 8 MB.
    /// Replaces axum's 2 MB default - that default is more restrictive than the
    /// design's intent and would surface oversize ingests as a generic 413
    /// instead of pond's typed `validation_failed`.
    ///
    /// Installed by `router` through `DefaultBodyLimit::max`.
    pub const HTTP_BODY_LIMIT_BYTES: usize = 8 * 1024 * 1024;
57
58    /// Build the axum router: the `/v1/*` JSON handlers plus the nested `/mcp`
59    /// streamable-HTTP MCP service. Public so the integration test can drive it
60    /// without binding a socket.
61    pub fn router(state: AppState) -> Router {
62        let mcp_state = state.clone();
63        let mcp = StreamableHttpService::new(
64            move || Ok(super::mcp::PondMcp::new(mcp_state.clone())),
65            LocalSessionManager::default().into(),
66            StreamableHttpServerConfig::default(),
67        );
68        Router::new()
69            .route("/v1/search", post(search))
70            .route("/v1/get", post(get))
71            .route("/v1/ingest", post(ingest))
72            .layer(DefaultBodyLimit::max(HTTP_BODY_LIMIT_BYTES))
73            .with_state(state)
74            .nest_service("/mcp", mcp)
75    }
76
77    /// Bind and serve until ctrl-c. `--port 0` selects an OS-assigned free port;
78    /// an unspecified host (`0.0.0.0` / `::`) logs a security notice because the
79    /// personal pond is single-user and LAN exposure is opt-in (spec.md#scope).
80    pub async fn serve(state: AppState, host: String, port: u16) -> anyhow::Result<()> {
81        let ip: IpAddr = host
82            .parse()
83            .with_context(|| format!("invalid --host {host:?}"))?;
84        if ip.is_unspecified() {
85            tracing::warn!(
86                %host,
87                "binding to an unspecified address exposes pond on the LAN; \
88                 the personal pond is single-user"
89            );
90        }
91        let listener = TcpListener::bind(SocketAddr::new(ip, port))
92            .await
93            .with_context(|| format!("failed to bind {host}:{port}"))?;
94        let local = listener
95            .local_addr()
96            .context("failed to read bound address")?;
97        tracing::info!(%local, "pond serve listening (HTTP /v1/*, MCP /mcp)");
98        axum::serve(listener, router(state))
99            .with_graceful_shutdown(shutdown_signal())
100            .await
101            .context("axum server error")
102    }
103
104    async fn shutdown_signal() {
105        let _ = tokio::signal::ctrl_c().await;
106    }
107
108    async fn search(
109        State(state): State<AppState>,
110        Json(mut request): Json<SearchRequest>,
111    ) -> Response {
112        request.namespace.get_or_insert_with(default_namespace);
113        let envelope = pond_search(&state.store, &state.embedder, request, &state.search).await;
114        let status = match &envelope {
115            SearchEnvelope::Success(_) => StatusCode::OK,
116            SearchEnvelope::Error(error) => status_for(&error.error.code),
117        };
118        with_request_id((status, Json(envelope)).into_response())
119    }
120
121    async fn get(State(state): State<AppState>, Json(mut request): Json<GetRequest>) -> Response {
122        request.namespace.get_or_insert_with(default_namespace);
123        let envelope = pond_get(&state.store, request).await;
124        let status = match &envelope {
125            GetEnvelope::Success(_) => StatusCode::OK,
126            GetEnvelope::Error(error) => status_for(&error.error.code),
127        };
128        with_request_id((status, Json(envelope)).into_response())
129    }
130
131    async fn ingest(
132        State(state): State<AppState>,
133        Json(mut request): Json<IngestRequest>,
134    ) -> Response {
135        request.namespace.get_or_insert_with(default_namespace);
136        let envelope = pond_ingest(&state.store, request).await;
137        // Per-row errors in `results[]` are not request-level failures, so
138        // the envelope success path always returns 200; only transport-level
139        // failures (validation_failed, namespace_unknown, etc.) map to 4xx/5xx.
140        let status = match &envelope {
141            IngestEnvelope::Success(_) => StatusCode::OK,
142            IngestEnvelope::Error(error) => status_for(&error.error.code),
143        };
144        with_request_id((status, Json(envelope)).into_response())
145    }
146
147    fn with_request_id(mut response: Response) -> Response {
148        if let Ok(value) = HeaderValue::from_str(&new_request_id()) {
149            response.headers_mut().insert("x-pond-request-id", value);
150        }
151        response
152    }
153
154    /// Map a wire error code to an HTTP status. The envelope body still carries
155    /// the full typed error; the status is the coarse signal.
156    fn status_for(code: &ErrorCode) -> StatusCode {
157        match code {
158            ErrorCode::ValidationFailed
159            | ErrorCode::VersionUnsupported
160            | ErrorCode::NamespaceUnknown => StatusCode::BAD_REQUEST,
161            ErrorCode::NotFound => StatusCode::NOT_FOUND,
162            ErrorCode::Conflict => StatusCode::CONFLICT,
163            ErrorCode::StorageUnavailable => StatusCode::SERVICE_UNAVAILABLE,
164            ErrorCode::Internal => StatusCode::INTERNAL_SERVER_ERROR,
165        }
166    }
167}
168
169pub mod mcp {
170    //! The rmcp MCP layer: `pond_search` / `pond_get` tools and `schema://pond`
171    //! / `stats://pond` resources, transport-agnostic. Mounted on stdio (via
172    //! `pond mcp`) and on the `/mcp` HTTP route (via `pond serve`).
173
174    use anyhow::Context;
175    use rmcp::{
176        ErrorData, RoleServer, ServerHandler, ServiceExt,
177        handler::server::{router::tool::ToolRouter, wrapper::Parameters},
178        model::{
179            AnnotateAble, CallToolResult, Content, ErrorCode as JsonRpcErrorCode,
180            ListResourcesResult, ListToolsResult, Meta, PaginatedRequestParams, RawResource,
181            ReadResourceRequestParams, ReadResourceResult, ResourceContents, ServerCapabilities,
182            ServerInfo,
183        },
184        schemars,
185        service::RequestContext,
186        tool, tool_handler, tool_router,
187        transport::stdio,
188    };
189    use serde::Deserialize;
190
191    use super::AppState;
192    use crate::{
193        PROTOCOL_VERSION,
194        handlers::pond_get as run_get,
195        handlers::pond_search as run_search,
196        wire::{
197            ErrorCode as WireErrorCode, ErrorEnvelope, GetEnvelope, GetRequest, GetResponse,
198            GetResult, MessageView, PartKind, PartSummary, ProjectFilter, ResponseMode,
199            ResponsePart, SearchEnvelope, SearchFilters, SearchRequest, SearchResponse,
200            default_namespace,
201        },
202    };
203
204    /// Static documentation served as the `schema://pond` resource. Detail
205    /// agents load on demand; the per-tool descriptions below stay tight.
206    const SCHEMA_DOC: &str = "\
207pond_search filters: query (semantic - concepts, not project names), limit \
208(default 10, max 200), project (path substring), session_id (exact session \
209match), source_agent, role (user|assistant|system|tool), from_date / to_date \
210(YYYY-MM-DD), cursor (opaque continuation token).
211
212pond_search response: a transcript. The first line states totals \
213(`matched_total` is the message count before `limit` and byte-budget \
214truncation), then each hit is a `--- [n] score | role | time | message_id | \
215project | agent | session ---` rule followed by its matched text (a ~600-char \
216indexed window). Up to 3 top-scoring hits per session, score-desc; `score` is \
217normalized to [0.0, 1.0] within one response. When more remain, a `cursor:` \
218footer carries the token to pass back as `cursor`; rank may shift between \
219pages if the corpus changes.
220
221pond_search multilingual: pond's embedder (multilingual-e5-small) is trained \
222for cross-lingual retrieval, so a query in language A can match indexed text \
223in language B via the vector arm. The FTS arm is character-ngram-based and \
224only matches surface tokens, so for cross-lingual queries expect most signal \
225to come from the vector arm.
226
227pond_get: message_id (the target message, marked `>`, plus context_depth \
228sibling messages each side) OR session_id (the whole session). Output is a \
229transcript - each message is a `--- [n] role | time | message_id ---` rule, \
230then its text/content as real lines, then parts (`-> name [call_id]` tool \
231call, `<- name [call_id] (ok|failed)` result). Session mode takes \
232response_mode: \"conversational\" (default - human/model text only), \
233\"complete\" (all messages incl. carriers, tools as one-liners), or \
234\"verbatim\" (full part bodies inline; heaviest). limit defaults to 20, caps \
235at 1000. Bounded by a size budget: when the footer shows `after_id=`, pass it \
236back to page. Not for bulk export - use `pond export`.";
237
238    /// `pond_search` MCP tool parameters.
239    #[derive(Debug, Deserialize, schemars::JsonSchema)]
240    struct McpSearchParams {
241        /// What to search for: concepts and keywords. Keep it semantic - do
242        /// not put project names in the query, use the `project` filter
243        /// instead. Optional only when `similar_to` is set (vector-only mode
244        /// uses the stored vector and ignores the query text); required in
245        /// every other call.
246        #[serde(default)]
247        query: Option<String>,
248        /// Max hits to return. Default 10, server-capped at 200.
249        #[serde(default)]
250        limit: Option<usize>,
251        /// Filter to projects whose path contains this substring.
252        #[serde(default)]
253        project: Option<String>,
254        /// Filter to one session (exact match).
255        #[serde(default)]
256        session_id: Option<String>,
257        /// Filter to one source agent (e.g. "claude-code").
258        #[serde(default)]
259        source_agent: Option<String>,
260        /// Filter by message role: "user" or "assistant".
261        #[serde(default)]
262        role: Option<String>,
263        /// Only messages on or after this date (YYYY-MM-DD).
264        #[serde(default)]
265        from_date: Option<String>,
266        /// Only messages on or before this date (YYYY-MM-DD).
267        #[serde(default)]
268        to_date: Option<String>,
269        /// "Find similar messages to this one." When set, pond uses the
270        /// stored vector for `similar_to` as the kNN query and ignores the
271        /// `query` text; vector-only, no embedder load. Compose with
272        /// `pond_search` -> read top hit -> `pond_search(similar_to=<that
273        /// message_id>)` to explore neighbors of any returned hit.
274        #[serde(default)]
275        similar_to: Option<String>,
276        /// Opaque continuation token from a prior response's `next_cursor`.
277        #[serde(default)]
278        cursor: Option<String>,
279    }
280
    // Maintainer note: the `///` comments on this struct and its fields are
    // picked up by the `schemars::JsonSchema` derive as schema descriptions,
    // so they are agent-facing text. Every field is optional on the wire
    // (`#[serde(default)]`); the `pond_get` tool method fills in the defaults
    // (`context_depth` 0, `limit` 20, `response_mode` via
    // `parse_response_mode`).
    /// `pond_get` MCP tool parameters. Exactly one of `message_id` /
    /// `session_id` is required.
    #[derive(Debug, Deserialize, schemars::JsonSchema)]
    struct McpGetParams {
        /// Retrieve this message: its full parts plus `context_depth` sibling
        /// messages each side. `response_mode` is ignored in this mode.
        #[serde(default)]
        message_id: Option<String>,
        /// Retrieve this whole session (mutually exclusive with message_id).
        #[serde(default)]
        session_id: Option<String>,
        /// With message_id: messages of thread context to include on each side.
        #[serde(default)]
        context_depth: Option<usize>,
        /// Cap on returned messages (session mode) or parts (message mode).
        /// Default 20, max 1000.
        #[serde(default)]
        limit: Option<usize>,
        /// Session-mode depth: "conversational" (default; human/model text
        /// only, with part summaries), "complete" (all messages incl. carriers,
        /// with part summaries), or "verbatim" (all messages with full parts
        /// inline). Ignored in message mode.
        // Kept as a free-form string here; unrecognized values are not
        // rejected, `parse_response_mode` maps them to the default.
        #[serde(default)]
        response_mode: Option<String>,
        /// Exclusive continuation anchor from a prior response: the last
        /// `message_id` (session mode) or last `part_id` (message mode).
        #[serde(default)]
        after_id: Option<String>,
    }
310
311    fn parse_response_mode(value: Option<String>) -> ResponseMode {
312        match value.as_deref() {
313            Some("complete") => ResponseMode::Complete,
314            Some("verbatim") => ResponseMode::Verbatim,
315            // None or any other value falls back to the conversational default.
316            _ => ResponseMode::Conversational,
317        }
318    }
319
    /// The pond MCP server: holds the shared state and the generated tool router.
    #[derive(Clone)]
    pub struct PondMcp {
        /// Shared store / embedder / search config that the tool methods and
        /// the `stats://pond` resource read from.
        state: AppState,
        /// Tool router built once in `new` from `Self::tool_router()` and
        /// reused by the `ServerHandler` impl.
        tool_router: ToolRouter<PondMcp>,
    }
326
327    #[tool_router]
328    impl PondMcp {
329        pub fn new(state: AppState) -> Self {
330            Self {
331                state,
332                tool_router: Self::tool_router(),
333            }
334        }
335
336        #[tool(
337            description = "Hybrid (vector + BM25) search over stored conversation history. \
338                           Returns a readable transcript: a leading `key:` line explains the \
339                           format and the first line states totals, then each hit is a \
340                           `--- [n] score | role | time | message_id | project | agent | \
341                           session ---` delimiter rule followed by the matched text. Pass a \
342                           returned `message_id` to `pond_get` for full text; pass `cursor` \
343                           to page. Keep `query` semantic; use `project` / `session_id` \
344                           filters for scope."
345        )]
346        async fn pond_search(
347            &self,
348            Parameters(params): Parameters<McpSearchParams>,
349        ) -> Result<CallToolResult, ErrorData> {
350            let request = SearchRequest {
351                protocol_version: PROTOCOL_VERSION,
352                namespace: Some(default_namespace()),
353                query: params.query.unwrap_or_default(),
354                filters: SearchFilters {
355                    project: params.project.map(ProjectFilter::Contains),
356                    session_id: params.session_id,
357                    source_agent: params.source_agent,
358                    from_date: params.from_date,
359                    to_date: params.to_date,
360                    role: params.role,
361                    // min_score is intentionally not on the MCP surface; scores
362                    // are response-relative, so a server-side threshold is a
363                    // footgun for agent callers. CLI / HTTP still exposes it
364                    // for the bench harness.
365                    min_score: 0.0,
366                },
367                limit: params.limit.unwrap_or(10),
368                cursor: params.cursor,
369                mode_override: None,
370                similar_to: params.similar_to,
371            };
372            match run_search(
373                &self.state.store,
374                &self.state.embedder,
375                request.clone(),
376                &self.state.search,
377            )
378            .await
379            {
380                SearchEnvelope::Success(response) => {
381                    Ok(tool_result(render_search_transcript(&response, &request)))
382                }
383                SearchEnvelope::Error(envelope) => Err(to_error_data(&envelope)),
384            }
385        }
386
387        #[tool(
388            description = "Retrieve stored conversation content as a readable transcript \
389                           (a leading `key:` line explains the format). With `message_id`: \
390                           the requested message (marked `>`) plus `context_depth` sibling \
391                           messages each side, with its tool/file parts shown in full. With \
392                           `session_id`: the session at one of three `response_mode`s - \
393                           \"conversational\" (default; human/model text only), \"complete\" \
394                           (all messages, tools as one-liners), or \"verbatim\" (full part \
395                           bodies inline). Bounded by a size budget; when the footer shows an \
396                           `after_id=`, pass it back to page on. Tool/result lines render as \
397                           `-> name [call_id]` / `<- name [call_id] (ok|failed)`. Not for \
398                           bulk export - use `pond export`."
399        )]
400        async fn pond_get(
401            &self,
402            Parameters(params): Parameters<McpGetParams>,
403        ) -> Result<CallToolResult, ErrorData> {
404            let request = GetRequest {
405                protocol_version: PROTOCOL_VERSION,
406                namespace: Some(default_namespace()),
407                session_id: params.session_id,
408                message_id: params.message_id,
409                context_depth: params.context_depth.unwrap_or(0),
410                limit: params.limit.unwrap_or(20),
411                response_mode: parse_response_mode(params.response_mode),
412                after_id: params.after_id,
413            };
414            match run_get(&self.state.store, request.clone()).await {
415                GetEnvelope::Success(response) => {
416                    Ok(tool_result(render_get_transcript(&response, &request)))
417                }
418                GetEnvelope::Error(envelope) => Err(to_error_data(&envelope)),
419            }
420        }
421    }
422
423    // `router = self.tool_router` makes the generated `call_tool` / `list_tools`
424    // read the cached router field; the bare-`#[tool_handler]` default rebuilds
425    // the router via `Self::tool_router()` on every call instead.
426    #[tool_handler(router = self.tool_router)]
427    impl ServerHandler for PondMcp {
428        fn get_info(&self) -> ServerInfo {
429            ServerInfo::new(
430                ServerCapabilities::builder()
431                    .enable_tools()
432                    .enable_resources()
433                    .build(),
434            )
435            .with_instructions(
436                "pond: session storage and retrieval. Tools: pond_search (hybrid search \
437                 over conversation history), pond_get (retrieve a message with thread \
438                 context, or a full session). Resources: schema://pond, stats://pond.",
439            )
440        }
441
442        async fn list_resources(
443            &self,
444            _request: Option<PaginatedRequestParams>,
445            _context: RequestContext<RoleServer>,
446        ) -> Result<ListResourcesResult, ErrorData> {
447            Ok(ListResourcesResult {
448                resources: vec![
449                    RawResource::new("schema://pond", "pond search schema").no_annotation(),
450                    RawResource::new("stats://pond", "pond corpus stats").no_annotation(),
451                ],
452                next_cursor: None,
453                meta: None,
454            })
455        }
456
457        async fn read_resource(
458            &self,
459            request: ReadResourceRequestParams,
460            _context: RequestContext<RoleServer>,
461        ) -> Result<ReadResourceResult, ErrorData> {
462            match request.uri.as_str() {
463                "schema://pond" => Ok(ReadResourceResult::new(vec![ResourceContents::text(
464                    SCHEMA_DOC,
465                    request.uri,
466                )])),
467                "stats://pond" => {
468                    let store = &self.state.store;
469                    let map_err = |error: anyhow::Error| {
470                        ErrorData::internal_error(format!("stats unavailable: {error}"), None)
471                    };
472                    let (sessions, messages, parts) = store.row_counts().await.map_err(&map_err)?;
473                    let embedding = store.embedding_progress().await.map_err(&map_err)?;
474                    let stale = store.stale_embedding_count().await.map_err(&map_err)?;
475                    let indices = store.index_status().await.map_err(&map_err)?;
476
477                    let embedded_percent = if embedding.total == 0 {
478                        0.0
479                    } else {
480                        #[allow(clippy::cast_precision_loss)]
481                        let pct = (embedding.embedded as f64 / embedding.total as f64) * 100.0;
482                        (pct * 10.0).round() / 10.0
483                    };
484                    let index_rows = indices
485                        .iter()
486                        .map(|status| {
487                            serde_json::json!({
488                                "table": status.table.as_str(),
489                                "intent": status.intent_name,
490                                "exists": status.exists,
491                                "fragments_covered": status.fragments_covered,
492                                "unindexed_rows": status.unindexed_rows,
493                            })
494                        })
495                        .collect::<Vec<_>>();
496
497                    // spec.md#search: `search_text` is the conversational text
498                    // (filtered of harness-injected parts at the adapter seam).
499                    // `embedding.total` is the searchable population - that is
500                    // the right denominator for "% embedded", not total messages.
501                    let stats = serde_json::json!({
502                        "corpus": {
503                            "sessions": sessions,
504                            "messages": messages,
505                            "searchable_messages": embedding.total,
506                            "parts": parts,
507                        },
508                        "embeddings": {
509                            "model": embedding.model,
510                            "embedded": embedding.embedded,
511                            "searchable_total": embedding.total,
512                            "embedded_percent": embedded_percent,
513                            "stale_under_other_model": stale,
514                        },
515                        "indices": index_rows,
516                    });
517                    Ok(ReadResourceResult::new(vec![ResourceContents::text(
518                        stats.to_string(),
519                        request.uri,
520                    )]))
521                }
522                other => Err(ErrorData::resource_not_found(
523                    format!("unknown resource: {other}"),
524                    None,
525                )),
526            }
527        }
528
529        async fn list_tools(
530            &self,
531            request: Option<PaginatedRequestParams>,
532            context: RequestContext<RoleServer>,
533        ) -> Result<ListToolsResult, ErrorData> {
534            let _ = (request, context);
535            let mut result = ListToolsResult {
536                tools: self.tool_router.list_all(),
537                next_cursor: None,
538                meta: None,
539            };
540            annotate_tool_limits(&mut result);
541            Ok(result)
542        }
543    }
544
545    fn annotate_tool_limits(result: &mut ListToolsResult) {
546        for tool in &mut result.tools {
547            let chars = match tool.name.as_ref() {
548                "pond_search" => 80_000,
549                "pond_get" => 200_000,
550                _ => continue,
551            };
552            let mut meta = serde_json::Map::new();
553            meta.insert(
554                "anthropic/maxResultSizeChars".to_owned(),
555                serde_json::json!(chars),
556            );
557            tool.meta = Some(Meta(meta));
558        }
559    }
560
561    /// Run the stdio MCP server until the client disconnects. All diagnostics
562    /// go to stderr (the shared `tracing` subscriber); stdout carries only
563    /// JSON-RPC frames, written by rmcp's stdio transport (spec.md#scope).
564    pub async fn serve_stdio(state: AppState) -> anyhow::Result<()> {
565        let service = PondMcp::new(state)
566            .serve(stdio())
567            .await
568            .context("failed to start stdio MCP server")?;
569        service.waiting().await.context("stdio MCP server error")?;
570        Ok(())
571    }
572
573    /// Build an MCP tool result from a rendered transcript. Deliberately text
574    /// only: Claude Code surfaces `structuredContent` over the text block when
575    /// both are present, which would shadow the transcript - the readable view
576    /// is the whole point on the MCP surface. Programmatic clients that want the
577    /// structured wire shape use the HTTP `/v1/*` JSON API instead.
578    fn tool_result(transcript: String) -> CallToolResult {
579        CallToolResult::success(vec![Content::text(transcript)])
580    }
581
582    /// `YYYY-MM-DD HH:MM:SSZ` - compact, sortable, timezone-explicit.
583    fn fmt_ts(ts: &chrono::DateTime<chrono::Utc>) -> String {
584        ts.format("%Y-%m-%d %H:%M:%SZ").to_string()
585    }
586
587    /// Inner string of an `Extracted<String>` option, or `?` when the source
588    /// carried none (spec.md#model-no-synthesis: absence is real, not a blank).
589    fn opt_name(value: &Option<crate::adapter::extract::Extracted<String>>) -> &str {
590        value.as_deref().map(String::as_str).unwrap_or("?")
591    }
592
593    /// Append each line of `body` to `out`, so escaped `\n` in stored text
594    /// renders as real line breaks. A trailing blank line in the source is
595    /// dropped (lines() already does this).
596    fn push_lines(out: &mut String, body: &str, indent: &str) {
597        use std::fmt::Write;
598        for line in body.lines() {
599            let _ = writeln!(out, "{indent}{line}");
600        }
601    }
602
603    fn render_search_transcript(response: &SearchResponse, request: &SearchRequest) -> String {
604        use std::fmt::Write;
605        if response.sessions.is_empty() {
606            return match request.similar_to.as_deref() {
607                Some(id) => format!("pond_search: no matches similar to {id}.\n"),
608                None => format!("pond_search: no matches for {:?}.\n", request.query),
609            };
610        }
611        let shown: usize = response.sessions.iter().map(|s| s.matches.len()).sum();
612        let sim = request
613            .similar_to
614            .as_deref()
615            .map(|id| format!(" similar to {id}"))
616            .unwrap_or_default();
617        let mut out = String::new();
618        let _ = writeln!(
619            out,
620            "pond_search: {} matches in {} sessions, showing {}{}.",
621            response.matched_total,
622            response.sessions.len(),
623            shown,
624            sim,
625        );
626        let _ = writeln!(
627            out,
628            "key: \"--- [n] score | role | time | message_id | project | agent | session ---\" delimits each hit + matched text. pond_get <message_id> for full; pass cursor to page."
629        );
630        let mut index = 0;
631        for session in &response.sessions {
632            for hit in &session.matches {
633                index += 1;
634                let _ = writeln!(out);
635                let _ = writeln!(
636                    out,
637                    "{}",
638                    rule_line(&format!(
639                        "[{index}] {:.2} | {} | {} | {} | {} | {} | {}",
640                        hit.score,
641                        hit.role.as_str(),
642                        fmt_ts(&hit.timestamp),
643                        hit.message_id,
644                        session.project,
645                        session.source_agent,
646                        session.session_id,
647                    )),
648                );
649                push_lines(&mut out, &hit.text, "");
650            }
651        }
652        if let Some(cursor) = &response.next_cursor {
653            let _ = writeln!(out);
654            let _ = writeln!(out, "cursor: {cursor} (pass as `cursor` to page)");
655        }
656        out
657    }
658
659    fn render_get_transcript(response: &GetResponse, request: &GetRequest) -> String {
660        use std::fmt::Write;
661        let session = &response.session;
662        let mut out = String::new();
663        match &response.result {
664            GetResult::Session {
665                messages,
666                messages_remaining,
667            } => {
668                let mode = match request.response_mode {
669                    ResponseMode::Conversational => "conversational",
670                    ResponseMode::Complete => "complete",
671                    ResponseMode::Verbatim => "verbatim",
672                };
673                let more = if *messages_remaining > 0 {
674                    " (more)"
675                } else {
676                    ""
677                };
678                let _ = writeln!(
679                    out,
680                    "pond_get: session {} ({mode}), {} messages{more}.",
681                    session.id,
682                    messages.len(),
683                );
684                let _ = writeln!(
685                    out,
686                    "key: \"--- [n] role | time | message_id ---\" delimits each message; \"->\" tool call, \"<-\" result. Pass after_id=<id> to page."
687                );
688                for (idx, message) in messages.iter().enumerate() {
689                    let _ = writeln!(out);
690                    render_message(
691                        &mut out,
692                        idx + 1,
693                        message,
694                        message.parts.as_deref(),
695                        &message.parts_summary,
696                        false,
697                    );
698                }
699                let _ = writeln!(out);
700                let _ = writeln!(
701                    out,
702                    "session {} | {} | {}",
703                    session.id, session.source_agent, session.project,
704                );
705                if *messages_remaining > 0
706                    && let Some(last) = messages.last()
707                {
708                    let _ = writeln!(
709                        out,
710                        "... {} more messages; pass after_id={} to pond_get to continue",
711                        messages_remaining, last.id,
712                    );
713                }
714            }
715            GetResult::Message {
716                target,
717                target_parts,
718                target_parts_remaining,
719                siblings,
720            } => {
721                let _ = writeln!(
722                    out,
723                    "pond_get: thread around {} in session {} (context +/-{}).",
724                    target.id, session.id, request.context_depth,
725                );
726                let _ = writeln!(
727                    out,
728                    "key: \"--- [n] role | time | message_id ---\" delimits each message; \">\" = the one you requested; \"->\" tool call, \"<-\" result. pond_get <message_id> to expand any line."
729                );
730                // Interleave target with siblings, ordered by (timestamp, id) to
731                // match storage - codex writes many messages at the same
732                // timestamp, so the id is the real tiebreak (a bare timestamp
733                // sort scrambles them). Drop context siblings with nothing to
734                // render (carrier turns with no text/content/parts); the
735                // requested target always stays, even if empty.
736                let mut thread: Vec<(&MessageView, bool)> =
737                    siblings.iter().map(|view| (view, false)).collect();
738                thread.push((target, true));
739                thread.sort_by(|a, b| {
740                    a.0.timestamp
741                        .cmp(&b.0.timestamp)
742                        .then_with(|| a.0.id.cmp(&b.0.id))
743                });
744                thread.retain(|(view, is_target)| *is_target || message_has_content(view));
745                for (idx, (view, is_target)) in thread.iter().enumerate() {
746                    let _ = writeln!(out);
747                    let parts: Option<&[ResponsePart]> = if *is_target {
748                        Some(target_parts.as_slice())
749                    } else {
750                        view.parts.as_deref()
751                    };
752                    render_message(
753                        &mut out,
754                        idx + 1,
755                        view,
756                        parts,
757                        &view.parts_summary,
758                        *is_target,
759                    );
760                }
761                let _ = writeln!(out);
762                let _ = writeln!(
763                    out,
764                    "session {} | {} | {}",
765                    session.id, session.source_agent, session.project,
766                );
767                if *target_parts_remaining > 0
768                    && let Some(last) = target_parts.last()
769                {
770                    let _ = writeln!(
771                        out,
772                        "... {} more parts of {}; pass after_id={} to pond_get to continue",
773                        target_parts_remaining, target.id, last.id,
774                    );
775                }
776            }
777        }
778        out
779    }
780
781    /// Whether a message view has anything to render below its header: real
782    /// text/content, or any parts (full or summarized). Used to drop empty
783    /// carrier turns from message-mode context.
784    fn message_has_content(view: &MessageView) -> bool {
785        view.text.as_deref().is_some_and(|t| !t.trim().is_empty())
786            || view
787                .content
788                .as_deref()
789                .is_some_and(|c| !c.trim().is_empty())
790            || view.parts.as_deref().is_some_and(|p| !p.is_empty())
791            || !view.parts_summary.is_empty()
792    }
793
794    /// Target column width for a delimiter-rule header.
795    const RULE_WIDTH: usize = 72;
796
797    /// Wrap `inner` as a delimiter rule: `--- {inner} ----...` padded to
798    /// [`RULE_WIDTH`] (always at least a 3-dash tail when `inner` is already
799    /// wide). Used for both search hits and get message headers.
800    fn rule_line(inner: &str) -> String {
801        let head = format!("--- {inner} ");
802        let pad = RULE_WIDTH.saturating_sub(head.chars().count()).max(3);
803        format!("{head}{}", "-".repeat(pad))
804    }
805
806    /// One message block: an indexed `--- [n] role | time | id ---` delimiter
807    /// rule (unambiguous even when the body has blank lines or `##` headings),
808    /// then text/content as real lines, then parts - full bodies when `parts`
809    /// is present, else one-line summaries.
810    fn render_message(
811        out: &mut String,
812        index: usize,
813        view: &MessageView,
814        parts: Option<&[ResponsePart]>,
815        summary: &[PartSummary],
816        is_target: bool,
817    ) {
818        use std::fmt::Write;
819        let marker = if is_target { "> " } else { "" };
820        let _ = writeln!(
821            out,
822            "{}",
823            rule_line(&format!(
824                "[{index}] {marker}{} | {} | {}",
825                view.role.as_str(),
826                fmt_ts(&view.timestamp),
827                view.id,
828            )),
829        );
830        if let Some(text) = &view.text {
831            push_lines(out, text, "");
832        }
833        if let Some(content) = &view.content {
834            push_lines(out, content, "");
835        }
836        match parts {
837            Some(parts) => {
838                for part in parts {
839                    render_part_full(out, part);
840                }
841            }
842            None => {
843                for part in summary {
844                    render_part_summary(out, part);
845                }
846            }
847        }
848    }
849
850    fn render_part_full(out: &mut String, part: &ResponsePart) {
851        use std::fmt::Write;
852        match &part.kind {
853            PartKind::Text { text } => {
854                if let Some(text) = text {
855                    push_lines(out, text, "");
856                }
857            }
858            PartKind::Reasoning { text } => {
859                let _ = writeln!(out, "  (reasoning)");
860                if let Some(text) = text {
861                    push_lines(out, text, "  ");
862                }
863            }
864            PartKind::ToolCall {
865                name,
866                call_id,
867                params,
868                ..
869            } => {
870                let _ = writeln!(out, "  -> {} [{}]", opt_name(name), opt_name(call_id));
871                push_lines(out, &value_to_text(params), "     ");
872            }
873            PartKind::ToolResult {
874                name,
875                call_id,
876                is_failure,
877                result,
878            } => {
879                let status = if *is_failure { "failed" } else { "ok" };
880                let _ = writeln!(
881                    out,
882                    "  <- {} [{}] ({status})",
883                    opt_name(name),
884                    opt_name(call_id),
885                );
886                push_lines(out, &value_to_text(result), "     ");
887            }
888            PartKind::File {
889                media_type,
890                file_name,
891                ..
892            } => {
893                let _ = writeln!(
894                    out,
895                    "  [file {}]",
896                    file_name.as_deref().unwrap_or(media_type)
897                );
898            }
899            PartKind::ToolApprovalRequest { approval_id, .. } => {
900                let _ = writeln!(out, "  [approval request {approval_id}]");
901            }
902            PartKind::ToolApprovalResponse {
903                approval_id,
904                approved,
905                ..
906            } => {
907                let verb = if *approved { "approved" } else { "denied" };
908                let _ = writeln!(out, "  [approval {approval_id} {verb}]");
909            }
910        }
911    }
912
913    fn render_part_summary(out: &mut String, summary: &PartSummary) {
914        use std::fmt::Write;
915        let label = summary.label.as_deref().unwrap_or("");
916        let call = summary
917            .call_id
918            .as_deref()
919            .map(|id| format!(" [{id}]"))
920            .unwrap_or_default();
921        match summary.kind.as_str() {
922            "tool_call" => {
923                let _ = writeln!(out, "  -> {label}{call}");
924            }
925            "tool_result" => {
926                let _ = writeln!(out, "  <- {label}{call}");
927            }
928            "file" => {
929                let _ = writeln!(out, "  [file {label}]");
930            }
931            other => {
932                let _ = writeln!(out, "  [{other} {label}]");
933            }
934        }
935    }
936
937    /// Render a tool param/result `Value` for the transcript: a JSON string
938    /// shows as its text; anything else as compact JSON. `null` shows nothing.
939    fn value_to_text(value: &serde_json::Value) -> String {
940        match value {
941            serde_json::Value::String(text) => text.clone(),
942            serde_json::Value::Null => String::new(),
943            other => serde_json::to_string(other).unwrap_or_default(),
944        }
945    }
946
947    /// Map a wire error envelope to a JSON-RPC error. rmcp ships no app-level
948    /// codes, so pond defines its own `-32000`-family set here. The `data`
949    /// payload carries pond's canonical string code and a `retryable` flag
950    /// (per spec.md#error-model) so MCP callers can branch on retry semantics
951    /// without parsing message strings or knowing the JSON-RPC code mapping.
952    fn to_error_data(envelope: &ErrorEnvelope) -> ErrorData {
953        let (jsonrpc_code, pond_code, retryable) = match envelope.error.code {
954            WireErrorCode::ValidationFailed => (-32010, "validation_failed", false),
955            WireErrorCode::VersionUnsupported => (-32011, "version_unsupported", false),
956            WireErrorCode::NotFound => (-32012, "not_found", false),
957            WireErrorCode::NamespaceUnknown => (-32013, "namespace_unknown", false),
958            WireErrorCode::StorageUnavailable => (-32014, "storage_unavailable", true),
959            WireErrorCode::Conflict => (-32015, "conflict", true),
960            WireErrorCode::Internal => (-32016, "internal", false),
961        };
962        let mut data = match &envelope.error.details {
963            serde_json::Value::Object(map) => map.clone(),
964            _ => serde_json::Map::new(),
965        };
966        data.insert("pond_code".to_owned(), serde_json::json!(pond_code));
967        data.insert("retryable".to_owned(), serde_json::json!(retryable));
968        ErrorData::new(
969            JsonRpcErrorCode(jsonrpc_code),
970            envelope.error.message.clone(),
971            Some(serde_json::Value::Object(data)),
972        )
973    }
974
975    #[cfg(test)]
976    mod tests {
977        #![allow(clippy::expect_used, clippy::unwrap_used)]
978
979        use std::sync::Arc;
980
981        use rmcp::model::{ErrorCode as JsonRpcErrorCode, Tool};
982
983        use super::*;
984        use crate::wire::{ErrorBody, ErrorCode, Role, SearchResponse, SearchResult};
985
986        #[test]
987        fn error_data_carries_code_and_retryability() {
988            let cases = [
989                (
990                    ErrorCode::ValidationFailed,
991                    -32010,
992                    "validation_failed",
993                    false,
994                ),
995                (
996                    ErrorCode::VersionUnsupported,
997                    -32011,
998                    "version_unsupported",
999                    false,
1000                ),
1001                (ErrorCode::NotFound, -32012, "not_found", false),
1002                (
1003                    ErrorCode::NamespaceUnknown,
1004                    -32013,
1005                    "namespace_unknown",
1006                    false,
1007                ),
1008                (
1009                    ErrorCode::StorageUnavailable,
1010                    -32014,
1011                    "storage_unavailable",
1012                    true,
1013                ),
1014                (ErrorCode::Conflict, -32015, "conflict", true),
1015                (ErrorCode::Internal, -32016, "internal", false),
1016            ];
1017            for (code, jsonrpc, pond_code, retryable) in cases {
1018                let error = to_error_data(&ErrorEnvelope {
1019                    error: ErrorBody {
1020                        code,
1021                        message: "boom".to_owned(),
1022                        details: serde_json::json!({"detail": 1}),
1023                    },
1024                });
1025                assert_eq!(error.code, JsonRpcErrorCode(jsonrpc));
1026                let data = error.data.unwrap();
1027                assert_eq!(data["detail"], serde_json::json!(1));
1028                assert_eq!(data["pond_code"], serde_json::json!(pond_code));
1029                assert_eq!(data["retryable"], serde_json::json!(retryable));
1030                assert!(
1031                    data.get("request_id").is_none(),
1032                    "MCP errors use JSON-RPC ids for correlation"
1033                );
1034            }
1035        }
1036
1037        #[test]
1038        fn annotate_tool_limits_sets_anthropic_meta() {
1039            let schema = Arc::new(serde_json::Map::new());
1040            let mut result = ListToolsResult {
1041                tools: vec![
1042                    Tool::new("pond_search", "Search", Arc::clone(&schema)),
1043                    Tool::new("pond_get", "Get", Arc::clone(&schema)),
1044                ],
1045                next_cursor: None,
1046                meta: None,
1047            };
1048            annotate_tool_limits(&mut result);
1049            let value = |name: &str| {
1050                result
1051                    .tools
1052                    .iter()
1053                    .find(|tool| tool.name == name)
1054                    .and_then(|tool| tool.meta.as_ref())
1055                    .and_then(|meta| meta.0.get("anthropic/maxResultSizeChars"))
1056                    .and_then(serde_json::Value::as_i64)
1057            };
1058            assert_eq!(value("pond_search"), Some(80_000));
1059            assert_eq!(value("pond_get"), Some(200_000));
1060        }
1061
1062        #[test]
1063        fn get_transcript_marks_target_and_renders_tool_parts() {
1064            let ts = chrono::DateTime::from_timestamp(0, 0).unwrap();
1065            let tool_call: ResponsePart = serde_json::from_value(serde_json::json!({
1066                "id": "p1", "ordinal": 0, "provenance": "conversational",
1067                "type": "tool_call", "name": "Bash", "call_id": "toolu_x",
1068                "params": { "command": "ls" }, "provider_executed": false,
1069            }))
1070            .unwrap();
1071            let tool_result: ResponsePart = serde_json::from_value(serde_json::json!({
1072                "id": "p2", "ordinal": 1, "provenance": "conversational",
1073                "type": "tool_result", "name": "Bash", "call_id": "toolu_x",
1074                "is_failure": false, "result": "file.txt",
1075            }))
1076            .unwrap();
1077            let target = MessageView {
1078                id: "m1".to_owned(),
1079                role: crate::wire::Role::Assistant,
1080                timestamp: ts,
1081                text: Some("Let me list files.".to_owned()),
1082                content: None,
1083                parts_summary: Vec::new(),
1084                parts: None,
1085            };
1086            let response = GetResponse {
1087                session: crate::wire::GetSession {
1088                    id: "s1".to_owned(),
1089                    source_agent: "claude-code".to_owned(),
1090                    project: "/p".to_owned(),
1091                    created_at: ts,
1092                },
1093                result: GetResult::Message {
1094                    target,
1095                    target_parts: vec![tool_call, tool_result],
1096                    target_parts_remaining: 0,
1097                    siblings: Vec::new(),
1098                },
1099            };
1100            let request = GetRequest {
1101                protocol_version: crate::PROTOCOL_VERSION,
1102                namespace: None,
1103                session_id: None,
1104                message_id: Some("m1".to_owned()),
1105                context_depth: 0,
1106                limit: 20,
1107                response_mode: ResponseMode::default(),
1108                after_id: None,
1109            };
1110
1111            let transcript = render_get_transcript(&response, &request);
1112            assert!(transcript.contains("--- [1] > assistant | 1970-01-01 00:00:00Z | m1 ---"));
1113            assert!(transcript.contains("Let me list files."));
1114            assert!(transcript.contains("  -> Bash [toolu_x]"));
1115            assert!(transcript.contains("  <- Bash [toolu_x] (ok)"));
1116            assert!(transcript.contains("session s1 | claude-code | /p"));
1117        }
1118
1119        #[test]
1120        fn search_transcript_renders_header_and_hits() {
1121            let response = SearchResponse {
1122                sessions: vec![crate::wire::SearchSession {
1123                    session_id: "s1".to_owned(),
1124                    project: "pond".to_owned(),
1125                    source_agent: "claude-code".to_owned(),
1126                    session_messages_count: 2,
1127                    matched_message_count: 1,
1128                    matches: vec![SearchResult {
1129                        message_id: "m1".to_owned(),
1130                        role: Role::User,
1131                        timestamp: chrono::DateTime::from_timestamp(0, 0).unwrap(),
1132                        text: "hello\nworld".to_owned(),
1133                        score: 1.0,
1134                        parts_summary: Vec::new(),
1135                    }],
1136                }],
1137                matched_total: 1,
1138                has_more: false,
1139                next_cursor: None,
1140            };
1141            let request = SearchRequest {
1142                protocol_version: crate::PROTOCOL_VERSION,
1143                namespace: None,
1144                query: "hi".to_owned(),
1145                mode_override: None,
1146                similar_to: None,
1147                filters: SearchFilters::default(),
1148                limit: 10,
1149                cursor: None,
1150            };
1151
1152            let transcript = render_search_transcript(&response, &request);
1153            assert!(transcript.starts_with("pond_search: 1 matches in 1 sessions, showing 1."));
1154            assert!(transcript.contains("key: \"--- [n] score | role | time | message_id"));
1155            // Flat indexed hit: score/role/time/id and the session metadata
1156            // folded into one delimiter rule, no separate `#` header.
1157            assert!(transcript.contains(
1158                "--- [1] 1.00 | user | 1970-01-01 00:00:00Z | m1 | pond | claude-code | s1"
1159            ));
1160            // Stored "\n" renders as a real line break, not an escape.
1161            assert!(transcript.contains("hello\nworld"));
1162
1163            // The MCP result is transcript-only text (no structuredContent to
1164            // shadow it on the Claude Code client).
1165            let result = tool_result(transcript);
1166            assert!(result.content[0].raw.as_text().is_some());
1167            assert!(result.structured_content.is_none());
1168        }
1169    }
1170}