1use std::sync::Arc;
10
11use crate::{config::SearchConfig, embed::LazyEmbedder, sessions::Store};
12
13#[derive(Clone)]
18pub struct AppState {
19 pub store: Arc<Store>,
20 pub embedder: Arc<LazyEmbedder>,
21 pub search: SearchConfig,
22}
23
24pub mod http {
25 use std::net::{IpAddr, SocketAddr};
29
30 use anyhow::Context;
31 use axum::{
32 Json, Router,
33 extract::{DefaultBodyLimit, State},
34 http::{HeaderValue, StatusCode},
35 response::{IntoResponse, Response},
36 routing::post,
37 };
38 use rmcp::transport::streamable_http_server::{
39 StreamableHttpServerConfig, StreamableHttpService, session::local::LocalSessionManager,
40 };
41 use tokio::net::TcpListener;
42
43 use super::AppState;
44 use crate::{
45 handlers::{pond_get, pond_ingest, pond_search},
46 wire::{
47 ErrorCode, GetEnvelope, GetRequest, IngestEnvelope, IngestRequest, SearchEnvelope,
48 SearchRequest, default_namespace, new_request_id,
49 },
50 };
51
52 pub const HTTP_BODY_LIMIT_BYTES: usize = 8 * 1024 * 1024;
57
58 pub fn router(state: AppState) -> Router {
62 let mcp_state = state.clone();
63 let mcp = StreamableHttpService::new(
64 move || Ok(super::mcp::PondMcp::new(mcp_state.clone())),
65 LocalSessionManager::default().into(),
66 StreamableHttpServerConfig::default(),
67 );
68 Router::new()
69 .route("/v1/search", post(search))
70 .route("/v1/get", post(get))
71 .route("/v1/ingest", post(ingest))
72 .layer(DefaultBodyLimit::max(HTTP_BODY_LIMIT_BYTES))
73 .with_state(state)
74 .nest_service("/mcp", mcp)
75 }
76
77 pub async fn serve(state: AppState, host: String, port: u16) -> anyhow::Result<()> {
81 let ip: IpAddr = host
82 .parse()
83 .with_context(|| format!("invalid --host {host:?}"))?;
84 if ip.is_unspecified() {
85 tracing::warn!(
86 %host,
87 "binding to an unspecified address exposes pond on the LAN; \
88 the personal pond is single-user"
89 );
90 }
91 let listener = TcpListener::bind(SocketAddr::new(ip, port))
92 .await
93 .with_context(|| format!("failed to bind {host}:{port}"))?;
94 let local = listener
95 .local_addr()
96 .context("failed to read bound address")?;
97 tracing::info!(%local, "pond serve listening (HTTP /v1/*, MCP /mcp)");
98 axum::serve(listener, router(state))
99 .with_graceful_shutdown(shutdown_signal())
100 .await
101 .context("axum server error")
102 }
103
104 async fn shutdown_signal() {
105 let _ = tokio::signal::ctrl_c().await;
106 }
107
108 async fn search(
109 State(state): State<AppState>,
110 Json(mut request): Json<SearchRequest>,
111 ) -> Response {
112 request.namespace.get_or_insert_with(default_namespace);
113 let envelope = pond_search(&state.store, &state.embedder, request, &state.search).await;
114 let status = match &envelope {
115 SearchEnvelope::Success(_) => StatusCode::OK,
116 SearchEnvelope::Error(error) => status_for(&error.error.code),
117 };
118 with_request_id((status, Json(envelope)).into_response())
119 }
120
121 async fn get(State(state): State<AppState>, Json(mut request): Json<GetRequest>) -> Response {
122 request.namespace.get_or_insert_with(default_namespace);
123 let envelope = pond_get(&state.store, request).await;
124 let status = match &envelope {
125 GetEnvelope::Success(_) => StatusCode::OK,
126 GetEnvelope::Error(error) => status_for(&error.error.code),
127 };
128 with_request_id((status, Json(envelope)).into_response())
129 }
130
131 async fn ingest(
132 State(state): State<AppState>,
133 Json(mut request): Json<IngestRequest>,
134 ) -> Response {
135 request.namespace.get_or_insert_with(default_namespace);
136 let envelope = pond_ingest(&state.store, request).await;
137 let status = match &envelope {
141 IngestEnvelope::Success(_) => StatusCode::OK,
142 IngestEnvelope::Error(error) => status_for(&error.error.code),
143 };
144 with_request_id((status, Json(envelope)).into_response())
145 }
146
147 fn with_request_id(mut response: Response) -> Response {
148 if let Ok(value) = HeaderValue::from_str(&new_request_id()) {
149 response.headers_mut().insert("x-pond-request-id", value);
150 }
151 response
152 }
153
154 fn status_for(code: &ErrorCode) -> StatusCode {
157 match code {
158 ErrorCode::ValidationFailed
159 | ErrorCode::VersionUnsupported
160 | ErrorCode::NamespaceUnknown => StatusCode::BAD_REQUEST,
161 ErrorCode::NotFound => StatusCode::NOT_FOUND,
162 ErrorCode::Conflict => StatusCode::CONFLICT,
163 ErrorCode::StorageUnavailable => StatusCode::SERVICE_UNAVAILABLE,
164 ErrorCode::Internal => StatusCode::INTERNAL_SERVER_ERROR,
165 }
166 }
167}
168
169pub mod mcp {
170 use anyhow::Context;
175 use rmcp::{
176 ErrorData, RoleServer, ServerHandler, ServiceExt,
177 handler::server::{router::tool::ToolRouter, wrapper::Parameters},
178 model::{
179 AnnotateAble, CallToolResult, Content, ErrorCode as JsonRpcErrorCode,
180 ListResourcesResult, ListToolsResult, Meta, PaginatedRequestParams, RawResource,
181 ReadResourceRequestParams, ReadResourceResult, ResourceContents, ServerCapabilities,
182 ServerInfo,
183 },
184 schemars,
185 service::RequestContext,
186 tool, tool_handler, tool_router,
187 transport::stdio,
188 };
189 use serde::Deserialize;
190
191 use super::AppState;
192 use crate::{
193 PROTOCOL_VERSION,
194 handlers::pond_get as run_get,
195 handlers::pond_search as run_search,
196 wire::{
197 ErrorCode as WireErrorCode, ErrorEnvelope, GetEnvelope, GetRequest, GetResponse,
198 GetResult, MessageView, PartKind, PartSummary, ProjectFilter, ResponseMode,
199 ResponsePart, SearchEnvelope, SearchFilters, SearchRequest, SearchResponse,
200 SessionFrom, default_namespace,
201 },
202 };
203
204 const SCHEMA_DOC: &str = "\
207pond_search filters: query (semantic - concepts, not project names), limit \
208(returned sessions; default 10, max 200), project (path substring), session_id \
209(exact session match), source_agent, role (user|assistant|system|tool), \
210from_date / to_date (YYYY-MM-DD), cursor (opaque continuation token).
211
212pond_search response: a transcript. The first line states totals \
213(`matched_total` is the message count before `limit` and byte-budget \
214truncation), then results are grouped by session, ordered by each session's \
215best hit. Each session lists up to 3 top-scoring hits, score-desc; each hit is \
216a `--- [n] score | role | time | message_id | project | agent | session ---` \
217rule followed by its matched text (a ~600-char indexed window). `score` is \
218normalized to [0.0, 1.0] within one response. When more remain, a `cursor:` \
219footer carries the token to pass back as `cursor`; rank may shift between \
220pages if the corpus changes.
221
222pond_search multilingual: pond's embedder (multilingual-e5-small) is trained \
223for cross-lingual retrieval, so a query in language A can match indexed text \
224in language B via the vector arm. The FTS arm is character-ngram-based and \
225only matches surface tokens, so for cross-lingual queries expect most signal \
226to come from the vector arm.
227
228pond_get: message_id (the target message, marked `>`, plus context_depth \
229sibling messages each side) OR session_id (the whole session). Output is a \
230transcript - each message is a `--- [n] role | time | message_id ---` rule, \
231then its text/content as real lines, then parts (`-> name [call_id]` tool \
232call, `<- name [call_id] (ok|failed)` result). Session mode takes \
233response_mode: \"conversational\" (default - human/model text only), \
234\"complete\" (all messages incl. carriers, tools as one-liners), or \
235\"verbatim\" (full part bodies inline; heaviest). limit defaults to 20, caps \
236at 1000. Bounded by a size budget: when the footer shows `after_id=`, pass it \
237back to page. A whole-session response also lists the session's subagents (each \
238stored as its own session) in a footer; pass a listed id back as session_id to \
239open it. Not for bulk export - use `pond export`.";
240
241 #[derive(Debug, Deserialize, schemars::JsonSchema)]
243 struct McpSearchParams {
244 #[serde(default)]
250 query: Option<String>,
251 #[serde(default)]
253 limit: Option<usize>,
254 #[serde(default)]
256 project: Option<String>,
257 #[serde(default)]
259 session_id: Option<String>,
260 #[serde(default)]
263 source_agent: Option<String>,
264 #[serde(default)]
268 include_subagents: Option<bool>,
269 #[serde(default)]
271 role: Option<String>,
272 #[serde(default)]
274 from_date: Option<String>,
275 #[serde(default)]
277 to_date: Option<String>,
278 #[serde(default)]
284 similar_to: Option<String>,
285 #[serde(default)]
287 cursor: Option<String>,
288 }
289
290 #[derive(Debug, Deserialize, schemars::JsonSchema)]
293 struct McpGetParams {
294 #[serde(default)]
297 message_id: Option<String>,
298 #[serde(default)]
300 session_id: Option<String>,
301 #[serde(default)]
303 context_depth: Option<usize>,
304 #[serde(default)]
307 limit: Option<usize>,
308 #[serde(default)]
313 response_mode: Option<String>,
314 #[serde(default)]
319 session_from: Option<String>,
320 #[serde(default)]
323 after_id: Option<String>,
324 }
325
326 fn parse_session_from(value: Option<String>) -> SessionFrom {
327 match value.as_deref() {
328 Some("end") => SessionFrom::End,
329 _ => SessionFrom::Start,
330 }
331 }
332
333 fn parse_response_mode(value: Option<String>) -> ResponseMode {
334 match value.as_deref() {
335 Some("complete") => ResponseMode::Complete,
336 Some("verbatim") => ResponseMode::Verbatim,
337 _ => ResponseMode::Conversational,
339 }
340 }
341
342 #[derive(Clone)]
344 pub struct PondMcp {
345 state: AppState,
346 tool_router: ToolRouter<PondMcp>,
347 }
348
349 #[tool_router]
350 impl PondMcp {
351 pub fn new(state: AppState) -> Self {
352 Self {
353 state,
354 tool_router: Self::tool_router(),
355 }
356 }
357
358 #[tool(
359 description = "Hybrid (vector + BM25) search over stored conversation history. \
360 Returns a readable transcript: a leading `key:` line explains the \
361 format and the first line states totals, then results are grouped by \
362 session, ordered by each session's best hit. Each hit is a `--- [n] \
363 score | role | time | message_id | project | agent | session ---` \
364 delimiter rule followed by the matched text. Pass a returned \
365 `message_id` to `pond_get` for full text. Common args: \
366 query (semantic - concepts, not project names), then project / \
367 from_date / to_date to scope. Advanced: source_agent (e.g. \
368 \"claude-code\", or \"claude-code/general-purpose\" for subagents), \
369 similar_to (vector-only neighbors of a message_id), cursor (paging), \
370 include_subagents (subagent sessions are excluded by default). \
371 Scores are relative within one response; there is no min_score.",
372 annotations(read_only_hint = true, idempotent_hint = true, open_world_hint = false)
373 )]
374 async fn pond_search(
375 &self,
376 Parameters(params): Parameters<McpSearchParams>,
377 ) -> Result<CallToolResult, ErrorData> {
378 let request = SearchRequest {
379 protocol_version: PROTOCOL_VERSION,
380 namespace: Some(default_namespace()),
381 query: params.query.unwrap_or_default(),
382 filters: SearchFilters {
383 project: params.project.map(ProjectFilter::Contains),
384 session_id: params.session_id,
385 source_agent: params.source_agent,
386 from_date: params.from_date,
387 to_date: params.to_date,
388 role: params.role,
389 min_score: 0.0,
394 include_subagents: params.include_subagents.unwrap_or(false),
395 },
396 limit: params.limit.unwrap_or(10),
397 cursor: params.cursor,
398 mode_override: None,
399 similar_to: params.similar_to,
400 };
401 match run_search(
402 &self.state.store,
403 &self.state.embedder,
404 request.clone(),
405 &self.state.search,
406 )
407 .await
408 {
409 SearchEnvelope::Success(response) => {
410 Ok(tool_result(render_search_transcript(&response, &request)))
411 }
412 SearchEnvelope::Error(envelope) => Err(to_error_data(&envelope)),
413 }
414 }
415
416 #[tool(
417 description = "Retrieve stored conversation content as a readable transcript \
418 (a leading `key:` line explains the format). Common: session_id \
419 (whole session; pair with response_mode \
420 conversational|complete|verbatim) OR message_id (that message \
421 marked `>`, plus context_depth sibling messages each side, with \
422 its tool/file parts in full). A session_id response lists the \
423 session's subagents in a footer so you can open each. Advanced: \
424 limit (cap), after_id (paging - pass the value the footer shows), \
425 session_from (\"start\"|\"end\"; \"end\" returns the most recent \
426 messages, \
427 e.g. to recover context after compaction). \
428 Tool/result lines render as `-> name [call_id]` / `<- name \
429 [call_id] (ok|failed)`. Not for bulk export - use `pond export`.",
430 annotations(read_only_hint = true, idempotent_hint = true, open_world_hint = false)
431 )]
432 async fn pond_get(
433 &self,
434 Parameters(params): Parameters<McpGetParams>,
435 ) -> Result<CallToolResult, ErrorData> {
436 let request = GetRequest {
437 protocol_version: PROTOCOL_VERSION,
438 namespace: Some(default_namespace()),
439 session_id: params.session_id,
440 message_id: params.message_id,
441 context_depth: params.context_depth.unwrap_or(0),
442 limit: params.limit.unwrap_or(20),
443 response_mode: parse_response_mode(params.response_mode),
444 session_from: parse_session_from(params.session_from),
445 after_id: params.after_id,
446 };
447 match run_get(&self.state.store, request.clone()).await {
448 GetEnvelope::Success(response) => {
449 let mut transcript = render_get_transcript(&response, &request);
450 if request.message_id.is_none()
456 && request.after_id.is_none()
457 && let Ok(children) =
458 self.state.store.child_sessions(&response.session.id).await
459 && !children.is_empty()
460 {
461 transcript.push_str(&render_subagents_footer(&children));
462 }
463 Ok(tool_result(transcript))
464 }
465 GetEnvelope::Error(envelope) => Err(to_error_data(&envelope)),
466 }
467 }
468 }
469
470 #[tool_handler(router = self.tool_router)]
474 impl ServerHandler for PondMcp {
475 fn get_info(&self) -> ServerInfo {
476 ServerInfo::new(
477 ServerCapabilities::builder()
478 .enable_tools()
479 .enable_resources()
480 .build(),
481 )
482 .with_instructions(
483 "pond recalls past agent sessions (Claude Code and others) - prior work, \
484 decisions, and context across sessions, not the live conversation. \
485 Workflow: pond_search to find relevant messages, then pond_get to read \
486 full text by message_id or a whole session by session_id; both return \
487 readable transcripts, not JSON. Scope with filters, not the query: project \
488 (path substring), session_id, source_agent, role, from_date / to_date - \
489 keep query semantic (concepts, not project names). Scores are relative \
490 within one response; there is no min_score. Subagents are stored as their \
491 own sessions (source_agent like \"claude-code/general-purpose\"); pond_get \
492 on a parent session lists them in a footer so you can open each. Recover \
493 context lost to compaction: find this session via pond_search (a distinctive \
494 recent topic + project + from_date=today), then pond_get(session_id, \
495 session_from=\"end\") for the recent pre-compaction turns. Deeper \
496 reference on demand: resource schema://pond (all filters + response format), \
497 stats://pond (corpus + embedding stats).",
498 )
499 }
500
501 async fn list_resources(
502 &self,
503 _request: Option<PaginatedRequestParams>,
504 _context: RequestContext<RoleServer>,
505 ) -> Result<ListResourcesResult, ErrorData> {
506 Ok(ListResourcesResult {
507 resources: vec![
508 RawResource::new("schema://pond", "pond search schema").no_annotation(),
509 RawResource::new("stats://pond", "pond corpus stats").no_annotation(),
510 ],
511 next_cursor: None,
512 meta: None,
513 })
514 }
515
516 async fn read_resource(
517 &self,
518 request: ReadResourceRequestParams,
519 _context: RequestContext<RoleServer>,
520 ) -> Result<ReadResourceResult, ErrorData> {
521 match request.uri.as_str() {
522 "schema://pond" => Ok(ReadResourceResult::new(vec![ResourceContents::text(
523 SCHEMA_DOC,
524 request.uri,
525 )])),
526 "stats://pond" => {
527 let store = &self.state.store;
528 let map_err = |error: anyhow::Error| {
529 ErrorData::internal_error(format!("stats unavailable: {error}"), None)
530 };
531 let (sessions, messages, parts) = store.row_counts().await.map_err(&map_err)?;
532 let embedding = store.embedding_progress().await.map_err(&map_err)?;
533 let stale = store.stale_embedding_count().await.map_err(&map_err)?;
534 let indices = store.index_status().await.map_err(&map_err)?;
535
536 let embedded_percent = if embedding.total == 0 {
537 0.0
538 } else {
539 #[allow(clippy::cast_precision_loss)]
540 let pct = (embedding.embedded as f64 / embedding.total as f64) * 100.0;
541 (pct * 10.0).round() / 10.0
542 };
543 let index_rows = indices
544 .iter()
545 .map(|status| {
546 serde_json::json!({
547 "table": status.table.as_str(),
548 "intent": status.intent_name,
549 "exists": status.exists,
550 "fragments_covered": status.fragments_covered,
551 "unindexed_rows": status.unindexed_rows,
552 })
553 })
554 .collect::<Vec<_>>();
555
556 let stats = serde_json::json!({
561 "corpus": {
562 "sessions": sessions,
563 "messages": messages,
564 "searchable_messages": embedding.total,
565 "parts": parts,
566 },
567 "embeddings": {
568 "model": embedding.model,
569 "embedded": embedding.embedded,
570 "searchable_total": embedding.total,
571 "embedded_percent": embedded_percent,
572 "stale_under_other_model": stale,
573 },
574 "indices": index_rows,
575 });
576 Ok(ReadResourceResult::new(vec![ResourceContents::text(
577 stats.to_string(),
578 request.uri,
579 )]))
580 }
581 other => Err(ErrorData::resource_not_found(
582 format!("unknown resource: {other}"),
583 None,
584 )),
585 }
586 }
587
588 async fn list_tools(
589 &self,
590 request: Option<PaginatedRequestParams>,
591 context: RequestContext<RoleServer>,
592 ) -> Result<ListToolsResult, ErrorData> {
593 let _ = (request, context);
594 let mut result = ListToolsResult {
595 tools: self.tool_router.list_all(),
596 next_cursor: None,
597 meta: None,
598 };
599 annotate_tool_limits(&mut result);
600 Ok(result)
601 }
602 }
603
604 fn annotate_tool_limits(result: &mut ListToolsResult) {
605 for tool in &mut result.tools {
606 let chars = match tool.name.as_ref() {
607 "pond_search" => 80_000,
608 "pond_get" => 200_000,
609 _ => continue,
610 };
611 let mut meta = serde_json::Map::new();
612 meta.insert(
613 "anthropic/maxResultSizeChars".to_owned(),
614 serde_json::json!(chars),
615 );
616 tool.meta = Some(Meta(meta));
617 }
618 }
619
620 pub async fn serve_stdio(state: AppState) -> anyhow::Result<()> {
624 let service = PondMcp::new(state)
625 .serve(stdio())
626 .await
627 .context("failed to start stdio MCP server")?;
628 service.waiting().await.context("stdio MCP server error")?;
629 Ok(())
630 }
631
632 fn tool_result(transcript: String) -> CallToolResult {
638 CallToolResult::success(vec![Content::text(transcript)])
639 }
640
641 fn render_subagents_footer(children: &[crate::wire::Session]) -> String {
646 use std::fmt::Write;
647 let mut out = String::new();
648 let _ = writeln!(out);
649 let _ = writeln!(
650 out,
651 "subagents ({}) - pass an id to pond_get(session_id=...):",
652 children.len()
653 );
654 for child in children {
655 let _ = writeln!(out, " {} | {}", child.id, child.source_agent);
656 }
657 out
658 }
659
660 fn fmt_ts(ts: &chrono::DateTime<chrono::Utc>) -> String {
662 ts.format("%Y-%m-%d %H:%M:%SZ").to_string()
663 }
664
665 fn opt_name(value: &Option<crate::adapter::extract::Extracted<String>>) -> &str {
668 value.as_deref().map(String::as_str).unwrap_or("?")
669 }
670
671 fn push_lines(out: &mut String, body: &str, indent: &str) {
675 use std::fmt::Write;
676 for line in body.lines() {
677 let _ = writeln!(out, "{indent}{line}");
678 }
679 }
680
681 fn render_search_transcript(response: &SearchResponse, request: &SearchRequest) -> String {
682 use std::fmt::Write;
683 let subagent_note = if !request.filters.include_subagents
685 && request.filters.session_id.is_none()
686 && request.filters.source_agent.is_none()
687 {
688 " Subagent sessions excluded; pass include_subagents=true to include them."
689 } else {
690 ""
691 };
692 if response.sessions.is_empty() {
693 return match request.similar_to.as_deref() {
694 Some(id) => format!("pond_search: no matches similar to {id}.{subagent_note}\n"),
695 None => {
696 format!(
697 "pond_search: no matches for {:?}.{subagent_note}\n",
698 request.query
699 )
700 }
701 };
702 }
703 let shown: usize = response.sessions.iter().map(|s| s.matches.len()).sum();
704 let sim = request
705 .similar_to
706 .as_deref()
707 .map(|id| format!(" similar to {id}"))
708 .unwrap_or_default();
709 let mut out = String::new();
710 let _ = writeln!(
711 out,
712 "pond_search: {} matching messages, showing {} hits from {} sessions{}.{}",
713 response.matched_total,
714 shown,
715 response.sessions.len(),
716 sim,
717 subagent_note,
718 );
719 let _ = writeln!(
720 out,
721 "key: session rules group hits by session, ordered by best hit; \"--- [n] score | role | time | message_id | project | agent | session ---\" delimits each hit + matched text. pond_get <message_id> for full; pass cursor to page."
722 );
723 let mut index = 0;
724 for (session_index, session) in response.sessions.iter().enumerate() {
725 let best = session
726 .matches
727 .first()
728 .map(|hit| hit.score)
729 .unwrap_or_default();
730 let _ = writeln!(out);
731 let _ = writeln!(
732 out,
733 "{}",
734 rule_line(&format!(
735 "session [{}] best {:.2} | {}/{} matched | {} | {} | {}",
736 session_index + 1,
737 best,
738 session.matched_message_count,
739 session.session_messages_count,
740 session.project,
741 session.source_agent,
742 session.session_id,
743 )),
744 );
745 for hit in &session.matches {
746 index += 1;
747 let _ = writeln!(out);
748 let _ = writeln!(
749 out,
750 "{}",
751 rule_line(&format!(
752 "[{index}] {:.2} | {} | {} | {} | {} | {} | {}",
753 hit.score,
754 hit.role.as_str(),
755 fmt_ts(&hit.timestamp),
756 hit.message_id,
757 session.project,
758 session.source_agent,
759 session.session_id,
760 )),
761 );
762 push_lines(&mut out, &hit.text, "");
763 }
764 }
765 if let Some(cursor) = &response.next_cursor {
766 let _ = writeln!(out);
767 let _ = writeln!(out, "cursor: {cursor} (pass as `cursor` to page)");
768 }
769 out
770 }
771
772 fn render_get_transcript(response: &GetResponse, request: &GetRequest) -> String {
773 use std::fmt::Write;
774 let session = &response.session;
775 let mut out = String::new();
776 match &response.result {
777 GetResult::Session {
778 messages,
779 messages_remaining,
780 } => {
781 let mode = match request.response_mode {
782 ResponseMode::Conversational => "conversational",
783 ResponseMode::Complete => "complete",
784 ResponseMode::Verbatim => "verbatim",
785 };
786 let more = if *messages_remaining > 0 {
787 " (more)"
788 } else {
789 ""
790 };
791 let _ = writeln!(
792 out,
793 "pond_get: session {} ({mode}), {} messages{more}.",
794 session.id,
795 messages.len(),
796 );
797 let _ = writeln!(
798 out,
799 "key: \"--- [n] role | time | message_id ---\" delimits each message; \"->\" tool call, \"<-\" result. Pass after_id=<id> to page."
800 );
801 for (idx, message) in messages.iter().enumerate() {
802 let _ = writeln!(out);
803 render_message(
804 &mut out,
805 idx + 1,
806 message,
807 message.parts.as_deref(),
808 &message.parts_summary,
809 false,
810 );
811 }
812 let _ = writeln!(out);
813 let _ = writeln!(
814 out,
815 "session {} | {} | {}",
816 session.id, session.source_agent, session.project,
817 );
818 if *messages_remaining > 0
819 && let Some(last) = messages.last()
820 {
821 match request.session_from {
822 SessionFrom::Start => {
823 let _ = writeln!(
824 out,
825 "... {} more messages; pass after_id={} to pond_get to continue",
826 messages_remaining, last.id,
827 );
828 }
829 SessionFrom::End => {
833 let _ = writeln!(
834 out,
835 "... {messages_remaining} earlier messages precede this tail; call pond_get with session_from=\"start\" to read from the beginning",
836 );
837 }
838 }
839 }
840 }
841 GetResult::Message {
842 target,
843 target_parts,
844 target_parts_remaining,
845 siblings,
846 } => {
847 let _ = writeln!(
848 out,
849 "pond_get: thread around {} in session {} (context +/-{}).",
850 target.id, session.id, request.context_depth,
851 );
852 let _ = writeln!(
853 out,
854 "key: \"--- [n] role | time | message_id ---\" delimits each message; \">\" = the one you requested; \"->\" tool call, \"<-\" result. pond_get <message_id> to expand any line."
855 );
856 let mut thread: Vec<(&MessageView, bool)> =
863 siblings.iter().map(|view| (view, false)).collect();
864 thread.push((target, true));
865 thread.sort_by(|a, b| {
866 a.0.timestamp
867 .cmp(&b.0.timestamp)
868 .then_with(|| a.0.id.cmp(&b.0.id))
869 });
870 thread.retain(|(view, is_target)| *is_target || message_has_content(view));
871 for (idx, (view, is_target)) in thread.iter().enumerate() {
872 let _ = writeln!(out);
873 let parts: Option<&[ResponsePart]> = if *is_target {
874 Some(target_parts.as_slice())
875 } else {
876 view.parts.as_deref()
877 };
878 render_message(
879 &mut out,
880 idx + 1,
881 view,
882 parts,
883 &view.parts_summary,
884 *is_target,
885 );
886 }
887 let _ = writeln!(out);
888 let _ = writeln!(
889 out,
890 "session {} | {} | {}",
891 session.id, session.source_agent, session.project,
892 );
893 if *target_parts_remaining > 0
894 && let Some(last) = target_parts.last()
895 {
896 let _ = writeln!(
897 out,
898 "... {} more parts of {}; pass after_id={} to pond_get to continue",
899 target_parts_remaining, target.id, last.id,
900 );
901 }
902 }
903 }
904 out
905 }
906
907 fn message_has_content(view: &MessageView) -> bool {
911 view.text.as_deref().is_some_and(|t| !t.trim().is_empty())
912 || view
913 .content
914 .as_deref()
915 .is_some_and(|c| !c.trim().is_empty())
916 || view.parts.as_deref().is_some_and(|p| !p.is_empty())
917 || !view.parts_summary.is_empty()
918 }
919
920 const RULE_WIDTH: usize = 72;
922
923 fn rule_line(inner: &str) -> String {
927 let head = format!("--- {inner} ");
928 let pad = RULE_WIDTH.saturating_sub(head.chars().count()).max(3);
929 format!("{head}{}", "-".repeat(pad))
930 }
931
932 fn render_message(
937 out: &mut String,
938 index: usize,
939 view: &MessageView,
940 parts: Option<&[ResponsePart]>,
941 summary: &[PartSummary],
942 is_target: bool,
943 ) {
944 use std::fmt::Write;
945 let marker = if is_target { "> " } else { "" };
946 let _ = writeln!(
947 out,
948 "{}",
949 rule_line(&format!(
950 "[{index}] {marker}{} | {} | {}",
951 view.role.as_str(),
952 fmt_ts(&view.timestamp),
953 view.id,
954 )),
955 );
956 if let Some(text) = &view.text {
957 push_lines(out, text, "");
958 }
959 if let Some(content) = &view.content {
960 push_lines(out, content, "");
961 }
962 match parts {
963 Some(parts) => {
964 for part in parts {
965 render_part_full(out, part);
966 }
967 }
968 None => {
969 for part in summary {
970 render_part_summary(out, part);
971 }
972 }
973 }
974 }
975
976 fn render_part_full(out: &mut String, part: &ResponsePart) {
977 use std::fmt::Write;
978 match &part.kind {
979 PartKind::Text { text } => {
980 if let Some(text) = text {
981 push_lines(out, text, "");
982 }
983 }
984 PartKind::Reasoning { text } => {
985 let _ = writeln!(out, " (reasoning)");
986 if let Some(text) = text {
987 push_lines(out, text, " ");
988 }
989 }
990 PartKind::ToolCall {
991 name,
992 call_id,
993 params,
994 ..
995 } => {
996 let _ = writeln!(out, " -> {} [{}]", opt_name(name), opt_name(call_id));
997 push_lines(out, &value_to_text(params), " ");
998 }
999 PartKind::ToolResult {
1000 name,
1001 call_id,
1002 is_failure,
1003 result,
1004 } => {
1005 let status = if *is_failure { "failed" } else { "ok" };
1006 let _ = writeln!(
1007 out,
1008 " <- {} [{}] ({status})",
1009 opt_name(name),
1010 opt_name(call_id),
1011 );
1012 push_lines(out, &value_to_text(result), " ");
1013 }
1014 PartKind::File {
1015 media_type,
1016 file_name,
1017 ..
1018 } => {
1019 let label = file_name
1020 .as_deref()
1021 .or(media_type.as_deref())
1022 .unwrap_or("file");
1023 let _ = writeln!(out, " [file {label}]");
1024 }
1025 PartKind::ToolApprovalRequest { approval_id, .. } => {
1026 let _ = writeln!(out, " [approval request {approval_id}]");
1027 }
1028 PartKind::ToolApprovalResponse {
1029 approval_id,
1030 approved,
1031 ..
1032 } => {
1033 let verb = if *approved { "approved" } else { "denied" };
1034 let _ = writeln!(out, " [approval {approval_id} {verb}]");
1035 }
1036 }
1037 }
1038
1039 fn render_part_summary(out: &mut String, summary: &PartSummary) {
1040 use std::fmt::Write;
1041 let label = summary.label.as_deref().unwrap_or("");
1042 let call = summary
1043 .call_id
1044 .as_deref()
1045 .map(|id| format!(" [{id}]"))
1046 .unwrap_or_default();
1047 match summary.kind.as_str() {
1048 "tool_call" => {
1049 let _ = writeln!(out, " -> {label}{call}");
1050 }
1051 "tool_result" => {
1052 let _ = writeln!(out, " <- {label}{call}");
1053 }
1054 "file" => {
1055 let _ = writeln!(out, " [file {label}]");
1056 }
1057 other => {
1058 let _ = writeln!(out, " [{other} {label}]");
1059 }
1060 }
1061 }
1062
1063 fn value_to_text(value: &serde_json::Value) -> String {
1066 match value {
1067 serde_json::Value::String(text) => text.clone(),
1068 serde_json::Value::Null => String::new(),
1069 other => serde_json::to_string(other).unwrap_or_default(),
1070 }
1071 }
1072
1073 fn to_error_data(envelope: &ErrorEnvelope) -> ErrorData {
1079 let (jsonrpc_code, pond_code, retryable) = match envelope.error.code {
1080 WireErrorCode::ValidationFailed => (-32010, "validation_failed", false),
1081 WireErrorCode::VersionUnsupported => (-32011, "version_unsupported", false),
1082 WireErrorCode::NotFound => (-32012, "not_found", false),
1083 WireErrorCode::NamespaceUnknown => (-32013, "namespace_unknown", false),
1084 WireErrorCode::StorageUnavailable => (-32014, "storage_unavailable", true),
1085 WireErrorCode::Conflict => (-32015, "conflict", true),
1086 WireErrorCode::Internal => (-32016, "internal", false),
1087 };
1088 let mut data = match &envelope.error.details {
1089 serde_json::Value::Object(map) => map.clone(),
1090 _ => serde_json::Map::new(),
1091 };
1092 data.insert("pond_code".to_owned(), serde_json::json!(pond_code));
1093 data.insert("retryable".to_owned(), serde_json::json!(retryable));
1094 ErrorData::new(
1095 JsonRpcErrorCode(jsonrpc_code),
1096 envelope.error.message.clone(),
1097 Some(serde_json::Value::Object(data)),
1098 )
1099 }
1100
1101 #[cfg(test)]
1102 mod tests {
1103 #![allow(clippy::expect_used, clippy::unwrap_used)]
1104
1105 use std::sync::Arc;
1106
1107 use rmcp::model::{ErrorCode as JsonRpcErrorCode, Tool};
1108
1109 use super::*;
1110 use crate::wire::{ErrorBody, ErrorCode, Role, SearchResponse, SearchResult};
1111
1112 #[test]
1113 fn error_data_carries_code_and_retryability() {
1114 let cases = [
1115 (
1116 ErrorCode::ValidationFailed,
1117 -32010,
1118 "validation_failed",
1119 false,
1120 ),
1121 (
1122 ErrorCode::VersionUnsupported,
1123 -32011,
1124 "version_unsupported",
1125 false,
1126 ),
1127 (ErrorCode::NotFound, -32012, "not_found", false),
1128 (
1129 ErrorCode::NamespaceUnknown,
1130 -32013,
1131 "namespace_unknown",
1132 false,
1133 ),
1134 (
1135 ErrorCode::StorageUnavailable,
1136 -32014,
1137 "storage_unavailable",
1138 true,
1139 ),
1140 (ErrorCode::Conflict, -32015, "conflict", true),
1141 (ErrorCode::Internal, -32016, "internal", false),
1142 ];
1143 for (code, jsonrpc, pond_code, retryable) in cases {
1144 let error = to_error_data(&ErrorEnvelope {
1145 error: ErrorBody {
1146 code,
1147 message: "boom".to_owned(),
1148 details: serde_json::json!({"detail": 1}),
1149 },
1150 });
1151 assert_eq!(error.code, JsonRpcErrorCode(jsonrpc));
1152 let data = error.data.unwrap();
1153 assert_eq!(data["detail"], serde_json::json!(1));
1154 assert_eq!(data["pond_code"], serde_json::json!(pond_code));
1155 assert_eq!(data["retryable"], serde_json::json!(retryable));
1156 assert!(
1157 data.get("request_id").is_none(),
1158 "MCP errors use JSON-RPC ids for correlation"
1159 );
1160 }
1161 }
1162
1163 #[test]
1164 fn annotate_tool_limits_sets_anthropic_meta() {
1165 let schema = Arc::new(serde_json::Map::new());
1166 let mut result = ListToolsResult {
1167 tools: vec![
1168 Tool::new("pond_search", "Search", Arc::clone(&schema)),
1169 Tool::new("pond_get", "Get", Arc::clone(&schema)),
1170 ],
1171 next_cursor: None,
1172 meta: None,
1173 };
1174 annotate_tool_limits(&mut result);
1175 let value = |name: &str| {
1176 result
1177 .tools
1178 .iter()
1179 .find(|tool| tool.name == name)
1180 .and_then(|tool| tool.meta.as_ref())
1181 .and_then(|meta| meta.0.get("anthropic/maxResultSizeChars"))
1182 .and_then(serde_json::Value::as_i64)
1183 };
1184 assert_eq!(value("pond_search"), Some(80_000));
1185 assert_eq!(value("pond_get"), Some(200_000));
1186 }
1187
1188 #[test]
1189 fn get_transcript_marks_target_and_renders_tool_parts() {
1190 let ts = chrono::DateTime::from_timestamp(0, 0).unwrap();
1191 let tool_call: ResponsePart = serde_json::from_value(serde_json::json!({
1192 "id": "p1", "ordinal": 0, "provenance": "conversational",
1193 "type": "tool_call", "name": "Bash", "call_id": "toolu_x",
1194 "params": { "command": "ls" }, "provider_executed": false,
1195 }))
1196 .unwrap();
1197 let tool_result: ResponsePart = serde_json::from_value(serde_json::json!({
1198 "id": "p2", "ordinal": 1, "provenance": "conversational",
1199 "type": "tool_result", "name": "Bash", "call_id": "toolu_x",
1200 "is_failure": false, "result": "file.txt",
1201 }))
1202 .unwrap();
1203 let target = MessageView {
1204 id: "m1".to_owned(),
1205 role: crate::wire::Role::Assistant,
1206 timestamp: ts,
1207 text: Some("Let me list files.".to_owned()),
1208 content: None,
1209 parts_summary: Vec::new(),
1210 parts: None,
1211 };
1212 let response = GetResponse {
1213 session: crate::wire::GetSession {
1214 id: "s1".to_owned(),
1215 source_agent: "claude-code".to_owned(),
1216 project: "/p".to_owned(),
1217 created_at: ts,
1218 },
1219 result: GetResult::Message {
1220 target,
1221 target_parts: vec![tool_call, tool_result],
1222 target_parts_remaining: 0,
1223 siblings: Vec::new(),
1224 },
1225 };
1226 let request = GetRequest {
1227 protocol_version: crate::PROTOCOL_VERSION,
1228 namespace: None,
1229 session_id: None,
1230 message_id: Some("m1".to_owned()),
1231 context_depth: 0,
1232 limit: 20,
1233 response_mode: ResponseMode::default(),
1234 session_from: SessionFrom::default(),
1235 after_id: None,
1236 };
1237
1238 let transcript = render_get_transcript(&response, &request);
1239 assert!(transcript.contains("--- [1] > assistant | 1970-01-01 00:00:00Z | m1 ---"));
1240 assert!(transcript.contains("Let me list files."));
1241 assert!(transcript.contains(" -> Bash [toolu_x]"));
1242 assert!(transcript.contains(" <- Bash [toolu_x] (ok)"));
1243 assert!(transcript.contains("session s1 | claude-code | /p"));
1244 }
1245
1246 #[test]
1247 fn search_transcript_renders_header_and_hits() {
1248 let response = SearchResponse {
1249 sessions: vec![crate::wire::SearchSession {
1250 session_id: "s1".to_owned(),
1251 project: "pond".to_owned(),
1252 source_agent: "claude-code".to_owned(),
1253 session_messages_count: 2,
1254 matched_message_count: 1,
1255 matches: vec![SearchResult {
1256 message_id: "m1".to_owned(),
1257 role: Role::User,
1258 timestamp: chrono::DateTime::from_timestamp(0, 0).unwrap(),
1259 text: "hello\nworld".to_owned(),
1260 score: 1.0,
1261 parts_summary: Vec::new(),
1262 }],
1263 }],
1264 matched_total: 1,
1265 has_more: false,
1266 next_cursor: None,
1267 };
1268 let request = SearchRequest {
1269 protocol_version: crate::PROTOCOL_VERSION,
1270 namespace: None,
1271 query: "hi".to_owned(),
1272 mode_override: None,
1273 similar_to: None,
1274 filters: SearchFilters::default(),
1275 limit: 10,
1276 cursor: None,
1277 };
1278
1279 let transcript = render_search_transcript(&response, &request);
1280 assert!(
1281 transcript.starts_with(
1282 "pond_search: 1 matching messages, showing 1 hits from 1 sessions."
1283 )
1284 );
1285 assert!(
1286 transcript
1287 .contains("key: session rules group hits by session, ordered by best hit")
1288 );
1289 assert!(
1290 transcript
1291 .contains("--- session [1] best 1.00 | 1/2 matched | pond | claude-code | s1")
1292 );
1293 assert!(transcript.contains(
1296 "--- [1] 1.00 | user | 1970-01-01 00:00:00Z | m1 | pond | claude-code | s1"
1297 ));
1298 assert!(transcript.contains("hello\nworld"));
1300
1301 let result = tool_result(transcript);
1304 assert!(result.content[0].raw.as_text().is_some());
1305 assert!(result.structured_content.is_none());
1306 }
1307 }
1308}