1use std::sync::Arc;
10
11use crate::{config::SearchConfig, embed::LazyEmbedder, sessions::Store};
12
13#[derive(Clone)]
18pub struct AppState {
19 pub store: Arc<Store>,
20 pub embedder: Arc<LazyEmbedder>,
21 pub search: SearchConfig,
22}
23
24pub mod http {
25 use std::net::{IpAddr, SocketAddr};
29
30 use anyhow::Context;
31 use axum::{
32 Json, Router,
33 extract::{DefaultBodyLimit, State},
34 http::{HeaderValue, StatusCode},
35 response::{IntoResponse, Response},
36 routing::post,
37 };
38 use rmcp::transport::streamable_http_server::{
39 StreamableHttpServerConfig, StreamableHttpService, session::local::LocalSessionManager,
40 };
41 use tokio::net::TcpListener;
42
43 use super::AppState;
44 use crate::{
45 handlers::{pond_get, pond_ingest, pond_search},
46 wire::{
47 ErrorCode, GetEnvelope, GetRequest, IngestEnvelope, IngestRequest, SearchEnvelope,
48 SearchRequest, default_namespace, new_request_id,
49 },
50 };
51
/// Request body cap, in bytes (8 MiB), installed by `router` through `DefaultBodyLimit`.
pub const HTTP_BODY_LIMIT_BYTES: usize = 8 << 20;
57
58 pub fn router(state: AppState) -> Router {
62 let mcp_state = state.clone();
63 let mcp = StreamableHttpService::new(
64 move || Ok(super::mcp::PondMcp::new(mcp_state.clone())),
65 LocalSessionManager::default().into(),
66 StreamableHttpServerConfig::default(),
67 );
68 Router::new()
69 .route("/v1/search", post(search))
70 .route("/v1/get", post(get))
71 .route("/v1/ingest", post(ingest))
72 .layer(DefaultBodyLimit::max(HTTP_BODY_LIMIT_BYTES))
73 .with_state(state)
74 .nest_service("/mcp", mcp)
75 }
76
77 pub async fn serve(state: AppState, host: String, port: u16) -> anyhow::Result<()> {
81 let ip: IpAddr = host
82 .parse()
83 .with_context(|| format!("invalid --host {host:?}"))?;
84 if ip.is_unspecified() {
85 tracing::warn!(
86 %host,
87 "binding to an unspecified address exposes pond on the LAN; \
88 the personal pond is single-user"
89 );
90 }
91 let listener = TcpListener::bind(SocketAddr::new(ip, port))
92 .await
93 .with_context(|| format!("failed to bind {host}:{port}"))?;
94 let local = listener
95 .local_addr()
96 .context("failed to read bound address")?;
97 tracing::info!(%local, "pond serve listening (HTTP /v1/*, MCP /mcp)");
98 axum::serve(listener, router(state))
99 .with_graceful_shutdown(shutdown_signal())
100 .await
101 .context("axum server error")
102 }
103
104 async fn shutdown_signal() {
105 let _ = tokio::signal::ctrl_c().await;
106 }
107
108 async fn search(
109 State(state): State<AppState>,
110 Json(mut request): Json<SearchRequest>,
111 ) -> Response {
112 request.namespace.get_or_insert_with(default_namespace);
113 let envelope = pond_search(&state.store, &state.embedder, request, &state.search).await;
114 let status = match &envelope {
115 SearchEnvelope::Success(_) => StatusCode::OK,
116 SearchEnvelope::Error(error) => status_for(&error.error.code),
117 };
118 with_request_id((status, Json(envelope)).into_response())
119 }
120
121 async fn get(State(state): State<AppState>, Json(mut request): Json<GetRequest>) -> Response {
122 request.namespace.get_or_insert_with(default_namespace);
123 let envelope = pond_get(&state.store, request).await;
124 let status = match &envelope {
125 GetEnvelope::Success(_) => StatusCode::OK,
126 GetEnvelope::Error(error) => status_for(&error.error.code),
127 };
128 with_request_id((status, Json(envelope)).into_response())
129 }
130
131 async fn ingest(
132 State(state): State<AppState>,
133 Json(mut request): Json<IngestRequest>,
134 ) -> Response {
135 request.namespace.get_or_insert_with(default_namespace);
136 let envelope = pond_ingest(&state.store, request).await;
137 let status = match &envelope {
141 IngestEnvelope::Success(_) => StatusCode::OK,
142 IngestEnvelope::Error(error) => status_for(&error.error.code),
143 };
144 with_request_id((status, Json(envelope)).into_response())
145 }
146
147 fn with_request_id(mut response: Response) -> Response {
148 if let Ok(value) = HeaderValue::from_str(&new_request_id()) {
149 response.headers_mut().insert("x-pond-request-id", value);
150 }
151 response
152 }
153
154 fn status_for(code: &ErrorCode) -> StatusCode {
157 match code {
158 ErrorCode::ValidationFailed
159 | ErrorCode::VersionUnsupported
160 | ErrorCode::NamespaceUnknown => StatusCode::BAD_REQUEST,
161 ErrorCode::NotFound => StatusCode::NOT_FOUND,
162 ErrorCode::Conflict => StatusCode::CONFLICT,
163 ErrorCode::StorageUnavailable => StatusCode::SERVICE_UNAVAILABLE,
164 ErrorCode::Internal => StatusCode::INTERNAL_SERVER_ERROR,
165 }
166 }
167}
168
169pub mod mcp {
170 use anyhow::Context;
175 use rmcp::{
176 ErrorData, RoleServer, ServerHandler, ServiceExt,
177 handler::server::{router::tool::ToolRouter, wrapper::Parameters},
178 model::{
179 AnnotateAble, CallToolResult, Content, ErrorCode as JsonRpcErrorCode,
180 ListResourcesResult, ListToolsResult, Meta, PaginatedRequestParams, RawResource,
181 ReadResourceRequestParams, ReadResourceResult, ResourceContents, ServerCapabilities,
182 ServerInfo,
183 },
184 schemars,
185 service::RequestContext,
186 tool, tool_handler, tool_router,
187 transport::stdio,
188 };
189 use serde::Deserialize;
190
191 use super::AppState;
192 use crate::{
193 PROTOCOL_VERSION,
194 handlers::pond_get as run_get,
195 handlers::pond_search as run_search,
196 wire::{
197 ErrorCode as WireErrorCode, ErrorEnvelope, GetEnvelope, GetRequest, GetResponse,
198 GetResult, MessageView, PartKind, PartSummary, ProjectFilter, ResponseMode,
199 ResponsePart, SearchEnvelope, SearchFilters, SearchRequest, SearchResponse,
200 default_namespace,
201 },
202 };
203
/// Text served verbatim for the `schema://pond` resource: a description of the
/// `pond_search` filters and response format and of the `pond_get` modes.
///
/// Lines ending in `\` are joined; blank lines are real paragraph breaks in the
/// served text, so the paragraph text must stay at column 0.
const SCHEMA_DOC: &str = "\
pond_search filters: query (semantic - concepts, not project names), limit \
(default 10, max 200), project (path substring), session_id (exact session \
match), source_agent, role (user|assistant|system|tool), from_date / to_date \
(YYYY-MM-DD), cursor (opaque continuation token).

pond_search response: a transcript. The first line states totals \
(`matched_total` is the message count before `limit` and byte-budget \
truncation), then each hit is a `--- [n] score | role | time | message_id | \
project | agent | session ---` rule followed by its matched text (a ~600-char \
indexed window). Up to 3 top-scoring hits per session, score-desc; `score` is \
normalized to [0.0, 1.0] within one response. When more remain, a `cursor:` \
footer carries the token to pass back as `cursor`; rank may shift between \
pages if the corpus changes.

pond_search multilingual: pond's embedder (multilingual-e5-small) is trained \
for cross-lingual retrieval, so a query in language A can match indexed text \
in language B via the vector arm. The FTS arm is character-ngram-based and \
only matches surface tokens, so for cross-lingual queries expect most signal \
to come from the vector arm.

pond_get: message_id (the target message, marked `>`, plus context_depth \
sibling messages each side) OR session_id (the whole session). Output is a \
transcript - each message is a `--- [n] role | time | message_id ---` rule, \
then its text/content as real lines, then parts (`-> name [call_id]` tool \
call, `<- name [call_id] (ok|failed)` result). Session mode takes \
response_mode: \"conversational\" (default - human/model text only), \
\"complete\" (all messages incl. carriers, tools as one-liners), or \
\"verbatim\" (full part bodies inline; heaviest). limit defaults to 20, caps \
at 1000. Bounded by a size budget: when the footer shows `after_id=`, pass it \
back to page. Not for bulk export - use `pond export`.";
237
238 #[derive(Debug, Deserialize, schemars::JsonSchema)]
240 struct McpSearchParams {
241 #[serde(default)]
247 query: Option<String>,
248 #[serde(default)]
250 limit: Option<usize>,
251 #[serde(default)]
253 project: Option<String>,
254 #[serde(default)]
256 session_id: Option<String>,
257 #[serde(default)]
259 source_agent: Option<String>,
260 #[serde(default)]
262 role: Option<String>,
263 #[serde(default)]
265 from_date: Option<String>,
266 #[serde(default)]
268 to_date: Option<String>,
269 #[serde(default)]
275 similar_to: Option<String>,
276 #[serde(default)]
278 cursor: Option<String>,
279 }
280
281 #[derive(Debug, Deserialize, schemars::JsonSchema)]
284 struct McpGetParams {
285 #[serde(default)]
288 message_id: Option<String>,
289 #[serde(default)]
291 session_id: Option<String>,
292 #[serde(default)]
294 context_depth: Option<usize>,
295 #[serde(default)]
298 limit: Option<usize>,
299 #[serde(default)]
304 response_mode: Option<String>,
305 #[serde(default)]
308 after_id: Option<String>,
309 }
310
311 fn parse_response_mode(value: Option<String>) -> ResponseMode {
312 match value.as_deref() {
313 Some("complete") => ResponseMode::Complete,
314 Some("verbatim") => ResponseMode::Verbatim,
315 _ => ResponseMode::Conversational,
317 }
318 }
319
320 #[derive(Clone)]
322 pub struct PondMcp {
323 state: AppState,
324 tool_router: ToolRouter<PondMcp>,
325 }
326
327 #[tool_router]
328 impl PondMcp {
329 pub fn new(state: AppState) -> Self {
330 Self {
331 state,
332 tool_router: Self::tool_router(),
333 }
334 }
335
336 #[tool(
337 description = "Hybrid (vector + BM25) search over stored conversation history. \
338 Returns a readable transcript: a leading `key:` line explains the \
339 format and the first line states totals, then each hit is a \
340 `--- [n] score | role | time | message_id | project | agent | \
341 session ---` delimiter rule followed by the matched text. Pass a \
342 returned `message_id` to `pond_get` for full text; pass `cursor` \
343 to page. Keep `query` semantic; use `project` / `session_id` \
344 filters for scope."
345 )]
346 async fn pond_search(
347 &self,
348 Parameters(params): Parameters<McpSearchParams>,
349 ) -> Result<CallToolResult, ErrorData> {
350 let request = SearchRequest {
351 protocol_version: PROTOCOL_VERSION,
352 namespace: Some(default_namespace()),
353 query: params.query.unwrap_or_default(),
354 filters: SearchFilters {
355 project: params.project.map(ProjectFilter::Contains),
356 session_id: params.session_id,
357 source_agent: params.source_agent,
358 from_date: params.from_date,
359 to_date: params.to_date,
360 role: params.role,
361 min_score: 0.0,
366 },
367 limit: params.limit.unwrap_or(10),
368 cursor: params.cursor,
369 mode_override: None,
370 similar_to: params.similar_to,
371 };
372 match run_search(
373 &self.state.store,
374 &self.state.embedder,
375 request.clone(),
376 &self.state.search,
377 )
378 .await
379 {
380 SearchEnvelope::Success(response) => {
381 Ok(tool_result(render_search_transcript(&response, &request)))
382 }
383 SearchEnvelope::Error(envelope) => Err(to_error_data(&envelope)),
384 }
385 }
386
387 #[tool(
388 description = "Retrieve stored conversation content as a readable transcript \
389 (a leading `key:` line explains the format). With `message_id`: \
390 the requested message (marked `>`) plus `context_depth` sibling \
391 messages each side, with its tool/file parts shown in full. With \
392 `session_id`: the session at one of three `response_mode`s - \
393 \"conversational\" (default; human/model text only), \"complete\" \
394 (all messages, tools as one-liners), or \"verbatim\" (full part \
395 bodies inline). Bounded by a size budget; when the footer shows an \
396 `after_id=`, pass it back to page on. Tool/result lines render as \
397 `-> name [call_id]` / `<- name [call_id] (ok|failed)`. Not for \
398 bulk export - use `pond export`."
399 )]
400 async fn pond_get(
401 &self,
402 Parameters(params): Parameters<McpGetParams>,
403 ) -> Result<CallToolResult, ErrorData> {
404 let request = GetRequest {
405 protocol_version: PROTOCOL_VERSION,
406 namespace: Some(default_namespace()),
407 session_id: params.session_id,
408 message_id: params.message_id,
409 context_depth: params.context_depth.unwrap_or(0),
410 limit: params.limit.unwrap_or(20),
411 response_mode: parse_response_mode(params.response_mode),
412 after_id: params.after_id,
413 };
414 match run_get(&self.state.store, request.clone()).await {
415 GetEnvelope::Success(response) => {
416 Ok(tool_result(render_get_transcript(&response, &request)))
417 }
418 GetEnvelope::Error(envelope) => Err(to_error_data(&envelope)),
419 }
420 }
421 }
422
423 #[tool_handler(router = self.tool_router)]
427 impl ServerHandler for PondMcp {
428 fn get_info(&self) -> ServerInfo {
429 ServerInfo::new(
430 ServerCapabilities::builder()
431 .enable_tools()
432 .enable_resources()
433 .build(),
434 )
435 .with_instructions(
436 "pond: session storage and retrieval. Tools: pond_search (hybrid search \
437 over conversation history), pond_get (retrieve a message with thread \
438 context, or a full session). Resources: schema://pond, stats://pond.",
439 )
440 }
441
442 async fn list_resources(
443 &self,
444 _request: Option<PaginatedRequestParams>,
445 _context: RequestContext<RoleServer>,
446 ) -> Result<ListResourcesResult, ErrorData> {
447 Ok(ListResourcesResult {
448 resources: vec![
449 RawResource::new("schema://pond", "pond search schema").no_annotation(),
450 RawResource::new("stats://pond", "pond corpus stats").no_annotation(),
451 ],
452 next_cursor: None,
453 meta: None,
454 })
455 }
456
457 async fn read_resource(
458 &self,
459 request: ReadResourceRequestParams,
460 _context: RequestContext<RoleServer>,
461 ) -> Result<ReadResourceResult, ErrorData> {
462 match request.uri.as_str() {
463 "schema://pond" => Ok(ReadResourceResult::new(vec![ResourceContents::text(
464 SCHEMA_DOC,
465 request.uri,
466 )])),
467 "stats://pond" => {
468 let store = &self.state.store;
469 let map_err = |error: anyhow::Error| {
470 ErrorData::internal_error(format!("stats unavailable: {error}"), None)
471 };
472 let (sessions, messages, parts) = store.row_counts().await.map_err(&map_err)?;
473 let embedding = store.embedding_progress().await.map_err(&map_err)?;
474 let stale = store.stale_embedding_count().await.map_err(&map_err)?;
475 let indices = store.index_status().await.map_err(&map_err)?;
476
477 let embedded_percent = if embedding.total == 0 {
478 0.0
479 } else {
480 #[allow(clippy::cast_precision_loss)]
481 let pct = (embedding.embedded as f64 / embedding.total as f64) * 100.0;
482 (pct * 10.0).round() / 10.0
483 };
484 let index_rows = indices
485 .iter()
486 .map(|status| {
487 serde_json::json!({
488 "table": status.table.as_str(),
489 "intent": status.intent_name,
490 "exists": status.exists,
491 "fragments_covered": status.fragments_covered,
492 "unindexed_rows": status.unindexed_rows,
493 })
494 })
495 .collect::<Vec<_>>();
496
497 let stats = serde_json::json!({
502 "corpus": {
503 "sessions": sessions,
504 "messages": messages,
505 "searchable_messages": embedding.total,
506 "parts": parts,
507 },
508 "embeddings": {
509 "model": embedding.model,
510 "embedded": embedding.embedded,
511 "searchable_total": embedding.total,
512 "embedded_percent": embedded_percent,
513 "stale_under_other_model": stale,
514 },
515 "indices": index_rows,
516 });
517 Ok(ReadResourceResult::new(vec![ResourceContents::text(
518 stats.to_string(),
519 request.uri,
520 )]))
521 }
522 other => Err(ErrorData::resource_not_found(
523 format!("unknown resource: {other}"),
524 None,
525 )),
526 }
527 }
528
529 async fn list_tools(
530 &self,
531 request: Option<PaginatedRequestParams>,
532 context: RequestContext<RoleServer>,
533 ) -> Result<ListToolsResult, ErrorData> {
534 let _ = (request, context);
535 let mut result = ListToolsResult {
536 tools: self.tool_router.list_all(),
537 next_cursor: None,
538 meta: None,
539 };
540 annotate_tool_limits(&mut result);
541 Ok(result)
542 }
543 }
544
545 fn annotate_tool_limits(result: &mut ListToolsResult) {
546 for tool in &mut result.tools {
547 let chars = match tool.name.as_ref() {
548 "pond_search" => 80_000,
549 "pond_get" => 200_000,
550 _ => continue,
551 };
552 let mut meta = serde_json::Map::new();
553 meta.insert(
554 "anthropic/maxResultSizeChars".to_owned(),
555 serde_json::json!(chars),
556 );
557 tool.meta = Some(Meta(meta));
558 }
559 }
560
561 pub async fn serve_stdio(state: AppState) -> anyhow::Result<()> {
565 let service = PondMcp::new(state)
566 .serve(stdio())
567 .await
568 .context("failed to start stdio MCP server")?;
569 service.waiting().await.context("stdio MCP server error")?;
570 Ok(())
571 }
572
573 fn tool_result(transcript: String) -> CallToolResult {
579 CallToolResult::success(vec![Content::text(transcript)])
580 }
581
582 fn fmt_ts(ts: &chrono::DateTime<chrono::Utc>) -> String {
584 ts.format("%Y-%m-%d %H:%M:%SZ").to_string()
585 }
586
587 fn opt_name(value: &Option<crate::adapter::extract::Extracted<String>>) -> &str {
590 value.as_deref().map(String::as_str).unwrap_or("?")
591 }
592
/// Appends every line of `body` to `out`, each prefixed with `indent` and terminated
/// with `\n`.
///
/// Lines are split with `str::lines`, so `\r\n` endings are normalized, a trailing
/// newline does not produce an extra empty line, and an empty `body` appends nothing.
fn push_lines(out: &mut String, body: &str, indent: &str) {
    body.lines().for_each(|line| {
        out.push_str(indent);
        out.push_str(line);
        out.push('\n');
    });
}
602
603 fn render_search_transcript(response: &SearchResponse, request: &SearchRequest) -> String {
604 use std::fmt::Write;
605 if response.sessions.is_empty() {
606 return match request.similar_to.as_deref() {
607 Some(id) => format!("pond_search: no matches similar to {id}.\n"),
608 None => format!("pond_search: no matches for {:?}.\n", request.query),
609 };
610 }
611 let shown: usize = response.sessions.iter().map(|s| s.matches.len()).sum();
612 let sim = request
613 .similar_to
614 .as_deref()
615 .map(|id| format!(" similar to {id}"))
616 .unwrap_or_default();
617 let mut out = String::new();
618 let _ = writeln!(
619 out,
620 "pond_search: {} matches in {} sessions, showing {}{}.",
621 response.matched_total,
622 response.sessions.len(),
623 shown,
624 sim,
625 );
626 let _ = writeln!(
627 out,
628 "key: \"--- [n] score | role | time | message_id | project | agent | session ---\" delimits each hit + matched text. pond_get <message_id> for full; pass cursor to page."
629 );
630 let mut index = 0;
631 for session in &response.sessions {
632 for hit in &session.matches {
633 index += 1;
634 let _ = writeln!(out);
635 let _ = writeln!(
636 out,
637 "{}",
638 rule_line(&format!(
639 "[{index}] {:.2} | {} | {} | {} | {} | {} | {}",
640 hit.score,
641 hit.role.as_str(),
642 fmt_ts(&hit.timestamp),
643 hit.message_id,
644 session.project,
645 session.source_agent,
646 session.session_id,
647 )),
648 );
649 push_lines(&mut out, &hit.text, "");
650 }
651 }
652 if let Some(cursor) = &response.next_cursor {
653 let _ = writeln!(out);
654 let _ = writeln!(out, "cursor: {cursor} (pass as `cursor` to page)");
655 }
656 out
657 }
658
659 fn render_get_transcript(response: &GetResponse, request: &GetRequest) -> String {
660 use std::fmt::Write;
661 let session = &response.session;
662 let mut out = String::new();
663 match &response.result {
664 GetResult::Session {
665 messages,
666 messages_remaining,
667 } => {
668 let mode = match request.response_mode {
669 ResponseMode::Conversational => "conversational",
670 ResponseMode::Complete => "complete",
671 ResponseMode::Verbatim => "verbatim",
672 };
673 let more = if *messages_remaining > 0 {
674 " (more)"
675 } else {
676 ""
677 };
678 let _ = writeln!(
679 out,
680 "pond_get: session {} ({mode}), {} messages{more}.",
681 session.id,
682 messages.len(),
683 );
684 let _ = writeln!(
685 out,
686 "key: \"--- [n] role | time | message_id ---\" delimits each message; \"->\" tool call, \"<-\" result. Pass after_id=<id> to page."
687 );
688 for (idx, message) in messages.iter().enumerate() {
689 let _ = writeln!(out);
690 render_message(
691 &mut out,
692 idx + 1,
693 message,
694 message.parts.as_deref(),
695 &message.parts_summary,
696 false,
697 );
698 }
699 let _ = writeln!(out);
700 let _ = writeln!(
701 out,
702 "session {} | {} | {}",
703 session.id, session.source_agent, session.project,
704 );
705 if *messages_remaining > 0
706 && let Some(last) = messages.last()
707 {
708 let _ = writeln!(
709 out,
710 "... {} more messages; pass after_id={} to pond_get to continue",
711 messages_remaining, last.id,
712 );
713 }
714 }
715 GetResult::Message {
716 target,
717 target_parts,
718 target_parts_remaining,
719 siblings,
720 } => {
721 let _ = writeln!(
722 out,
723 "pond_get: thread around {} in session {} (context +/-{}).",
724 target.id, session.id, request.context_depth,
725 );
726 let _ = writeln!(
727 out,
728 "key: \"--- [n] role | time | message_id ---\" delimits each message; \">\" = the one you requested; \"->\" tool call, \"<-\" result. pond_get <message_id> to expand any line."
729 );
730 let mut thread: Vec<(&MessageView, bool)> =
737 siblings.iter().map(|view| (view, false)).collect();
738 thread.push((target, true));
739 thread.sort_by(|a, b| {
740 a.0.timestamp
741 .cmp(&b.0.timestamp)
742 .then_with(|| a.0.id.cmp(&b.0.id))
743 });
744 thread.retain(|(view, is_target)| *is_target || message_has_content(view));
745 for (idx, (view, is_target)) in thread.iter().enumerate() {
746 let _ = writeln!(out);
747 let parts: Option<&[ResponsePart]> = if *is_target {
748 Some(target_parts.as_slice())
749 } else {
750 view.parts.as_deref()
751 };
752 render_message(
753 &mut out,
754 idx + 1,
755 view,
756 parts,
757 &view.parts_summary,
758 *is_target,
759 );
760 }
761 let _ = writeln!(out);
762 let _ = writeln!(
763 out,
764 "session {} | {} | {}",
765 session.id, session.source_agent, session.project,
766 );
767 if *target_parts_remaining > 0
768 && let Some(last) = target_parts.last()
769 {
770 let _ = writeln!(
771 out,
772 "... {} more parts of {}; pass after_id={} to pond_get to continue",
773 target_parts_remaining, target.id, last.id,
774 );
775 }
776 }
777 }
778 out
779 }
780
781 fn message_has_content(view: &MessageView) -> bool {
785 view.text.as_deref().is_some_and(|t| !t.trim().is_empty())
786 || view
787 .content
788 .as_deref()
789 .is_some_and(|c| !c.trim().is_empty())
790 || view.parts.as_deref().is_some_and(|p| !p.is_empty())
791 || !view.parts_summary.is_empty()
792 }
793
/// Target width, in characters, of a delimiter rule produced by [`rule_line`].
const RULE_WIDTH: usize = 72;

/// Builds a delimiter rule of the form `--- {inner} ---…`.
///
/// The trailing dashes pad the line to [`RULE_WIDTH`] characters (counted as `char`s,
/// not bytes). There are never fewer than three of them, so a long `inner` simply
/// makes the rule wider.
fn rule_line(inner: &str) -> String {
    let mut line = String::from("--- ");
    line.push_str(inner);
    line.push(' ');

    let used = line.chars().count();
    let dashes = RULE_WIDTH.checked_sub(used).map_or(3, |gap| gap.max(3));
    line.extend(std::iter::repeat('-').take(dashes));
    line
}
805
806 fn render_message(
811 out: &mut String,
812 index: usize,
813 view: &MessageView,
814 parts: Option<&[ResponsePart]>,
815 summary: &[PartSummary],
816 is_target: bool,
817 ) {
818 use std::fmt::Write;
819 let marker = if is_target { "> " } else { "" };
820 let _ = writeln!(
821 out,
822 "{}",
823 rule_line(&format!(
824 "[{index}] {marker}{} | {} | {}",
825 view.role.as_str(),
826 fmt_ts(&view.timestamp),
827 view.id,
828 )),
829 );
830 if let Some(text) = &view.text {
831 push_lines(out, text, "");
832 }
833 if let Some(content) = &view.content {
834 push_lines(out, content, "");
835 }
836 match parts {
837 Some(parts) => {
838 for part in parts {
839 render_part_full(out, part);
840 }
841 }
842 None => {
843 for part in summary {
844 render_part_summary(out, part);
845 }
846 }
847 }
848 }
849
850 fn render_part_full(out: &mut String, part: &ResponsePart) {
851 use std::fmt::Write;
852 match &part.kind {
853 PartKind::Text { text } => {
854 if let Some(text) = text {
855 push_lines(out, text, "");
856 }
857 }
858 PartKind::Reasoning { text } => {
859 let _ = writeln!(out, " (reasoning)");
860 if let Some(text) = text {
861 push_lines(out, text, " ");
862 }
863 }
864 PartKind::ToolCall {
865 name,
866 call_id,
867 params,
868 ..
869 } => {
870 let _ = writeln!(out, " -> {} [{}]", opt_name(name), opt_name(call_id));
871 push_lines(out, &value_to_text(params), " ");
872 }
873 PartKind::ToolResult {
874 name,
875 call_id,
876 is_failure,
877 result,
878 } => {
879 let status = if *is_failure { "failed" } else { "ok" };
880 let _ = writeln!(
881 out,
882 " <- {} [{}] ({status})",
883 opt_name(name),
884 opt_name(call_id),
885 );
886 push_lines(out, &value_to_text(result), " ");
887 }
888 PartKind::File {
889 media_type,
890 file_name,
891 ..
892 } => {
893 let _ = writeln!(
894 out,
895 " [file {}]",
896 file_name.as_deref().unwrap_or(media_type)
897 );
898 }
899 PartKind::ToolApprovalRequest { approval_id, .. } => {
900 let _ = writeln!(out, " [approval request {approval_id}]");
901 }
902 PartKind::ToolApprovalResponse {
903 approval_id,
904 approved,
905 ..
906 } => {
907 let verb = if *approved { "approved" } else { "denied" };
908 let _ = writeln!(out, " [approval {approval_id} {verb}]");
909 }
910 }
911 }
912
913 fn render_part_summary(out: &mut String, summary: &PartSummary) {
914 use std::fmt::Write;
915 let label = summary.label.as_deref().unwrap_or("");
916 let call = summary
917 .call_id
918 .as_deref()
919 .map(|id| format!(" [{id}]"))
920 .unwrap_or_default();
921 match summary.kind.as_str() {
922 "tool_call" => {
923 let _ = writeln!(out, " -> {label}{call}");
924 }
925 "tool_result" => {
926 let _ = writeln!(out, " <- {label}{call}");
927 }
928 "file" => {
929 let _ = writeln!(out, " [file {label}]");
930 }
931 other => {
932 let _ = writeln!(out, " [{other} {label}]");
933 }
934 }
935 }
936
937 fn value_to_text(value: &serde_json::Value) -> String {
940 match value {
941 serde_json::Value::String(text) => text.clone(),
942 serde_json::Value::Null => String::new(),
943 other => serde_json::to_string(other).unwrap_or_default(),
944 }
945 }
946
947 fn to_error_data(envelope: &ErrorEnvelope) -> ErrorData {
953 let (jsonrpc_code, pond_code, retryable) = match envelope.error.code {
954 WireErrorCode::ValidationFailed => (-32010, "validation_failed", false),
955 WireErrorCode::VersionUnsupported => (-32011, "version_unsupported", false),
956 WireErrorCode::NotFound => (-32012, "not_found", false),
957 WireErrorCode::NamespaceUnknown => (-32013, "namespace_unknown", false),
958 WireErrorCode::StorageUnavailable => (-32014, "storage_unavailable", true),
959 WireErrorCode::Conflict => (-32015, "conflict", true),
960 WireErrorCode::Internal => (-32016, "internal", false),
961 };
962 let mut data = match &envelope.error.details {
963 serde_json::Value::Object(map) => map.clone(),
964 _ => serde_json::Map::new(),
965 };
966 data.insert("pond_code".to_owned(), serde_json::json!(pond_code));
967 data.insert("retryable".to_owned(), serde_json::json!(retryable));
968 ErrorData::new(
969 JsonRpcErrorCode(jsonrpc_code),
970 envelope.error.message.clone(),
971 Some(serde_json::Value::Object(data)),
972 )
973 }
974
975 #[cfg(test)]
976 mod tests {
977 #![allow(clippy::expect_used, clippy::unwrap_used)]
978
979 use std::sync::Arc;
980
981 use rmcp::model::{ErrorCode as JsonRpcErrorCode, Tool};
982
983 use super::*;
984 use crate::wire::{ErrorBody, ErrorCode, Role, SearchResponse, SearchResult};
985
/// Every wire error code maps to its JSON-RPC code, `pond_code`, and `retryable`
/// flag, keeps the original details, and does not add a `request_id`.
#[test]
fn error_data_carries_code_and_retryability() {
    // (wire code, JSON-RPC code, pond_code, retryable)
    let expectations = [
        (
            ErrorCode::ValidationFailed,
            -32010,
            "validation_failed",
            false,
        ),
        (
            ErrorCode::VersionUnsupported,
            -32011,
            "version_unsupported",
            false,
        ),
        (ErrorCode::NotFound, -32012, "not_found", false),
        (
            ErrorCode::NamespaceUnknown,
            -32013,
            "namespace_unknown",
            false,
        ),
        (
            ErrorCode::StorageUnavailable,
            -32014,
            "storage_unavailable",
            true,
        ),
        (ErrorCode::Conflict, -32015, "conflict", true),
        (ErrorCode::Internal, -32016, "internal", false),
    ];

    for (code, jsonrpc, pond_code, retryable) in expectations {
        let body = ErrorBody {
            code,
            message: "boom".to_owned(),
            details: serde_json::json!({"detail": 1}),
        };
        let error = to_error_data(&ErrorEnvelope { error: body });

        assert_eq!(error.code, JsonRpcErrorCode(jsonrpc));
        let data = error.data.unwrap();
        assert_eq!(data["detail"], serde_json::json!(1));
        assert_eq!(data["pond_code"], serde_json::json!(pond_code));
        assert_eq!(data["retryable"], serde_json::json!(retryable));
        assert!(
            data.get("request_id").is_none(),
            "MCP errors use JSON-RPC ids for correlation"
        );
    }
}
1036
/// Both pond tools receive their `anthropic/maxResultSizeChars` metadata value.
#[test]
fn annotate_tool_limits_sets_anthropic_meta() {
    let schema = Arc::new(serde_json::Map::new());
    let tools = vec![
        Tool::new("pond_search", "Search", Arc::clone(&schema)),
        Tool::new("pond_get", "Get", Arc::clone(&schema)),
    ];
    let mut listing = ListToolsResult {
        tools,
        next_cursor: None,
        meta: None,
    };

    annotate_tool_limits(&mut listing);

    for (name, expected) in [("pond_search", 80_000_i64), ("pond_get", 200_000_i64)] {
        let actual = listing
            .tools
            .iter()
            .find(|tool| tool.name == name)
            .and_then(|tool| tool.meta.as_ref())
            .and_then(|meta| meta.0.get("anthropic/maxResultSizeChars"))
            .and_then(serde_json::Value::as_i64);
        assert_eq!(actual, Some(expected));
    }
}
1061
/// A message-mode transcript marks the target with `>`, prints its text, renders the
/// tool call/result parts, and ends with the session footer.
#[test]
fn get_transcript_marks_target_and_renders_tool_parts() {
    let epoch = chrono::DateTime::from_timestamp(0, 0).unwrap();

    // Parts are built through their JSON form.
    let call_part: ResponsePart = serde_json::from_value(serde_json::json!({
        "id": "p1", "ordinal": 0, "provenance": "conversational",
        "type": "tool_call", "name": "Bash", "call_id": "toolu_x",
        "params": { "command": "ls" }, "provider_executed": false,
    }))
    .unwrap();
    let result_part: ResponsePart = serde_json::from_value(serde_json::json!({
        "id": "p2", "ordinal": 1, "provenance": "conversational",
        "type": "tool_result", "name": "Bash", "call_id": "toolu_x",
        "is_failure": false, "result": "file.txt",
    }))
    .unwrap();

    let target = MessageView {
        id: "m1".to_owned(),
        role: crate::wire::Role::Assistant,
        timestamp: epoch,
        text: Some("Let me list files.".to_owned()),
        content: None,
        parts_summary: Vec::new(),
        parts: None,
    };
    let session = crate::wire::GetSession {
        id: "s1".to_owned(),
        source_agent: "claude-code".to_owned(),
        project: "/p".to_owned(),
        created_at: epoch,
    };
    let response = GetResponse {
        session,
        result: GetResult::Message {
            target,
            target_parts: vec![call_part, result_part],
            target_parts_remaining: 0,
            siblings: Vec::new(),
        },
    };
    let request = GetRequest {
        protocol_version: crate::PROTOCOL_VERSION,
        namespace: None,
        session_id: None,
        message_id: Some("m1".to_owned()),
        context_depth: 0,
        limit: 20,
        response_mode: ResponseMode::default(),
        after_id: None,
    };

    let rendered = render_get_transcript(&response, &request);

    let expected_fragments = [
        "--- [1] > assistant | 1970-01-01 00:00:00Z | m1 ---",
        "Let me list files.",
        " -> Bash [toolu_x]",
        " <- Bash [toolu_x] (ok)",
        "session s1 | claude-code | /p",
    ];
    for fragment in expected_fragments {
        assert!(rendered.contains(fragment));
    }
}
1118
/// A one-hit search renders the totals line, the `key:` line, the hit rule, and the
/// matched text on real lines; the tool result is text-only with no structured content.
#[test]
fn search_transcript_renders_header_and_hits() {
    let hit = SearchResult {
        message_id: "m1".to_owned(),
        role: Role::User,
        timestamp: chrono::DateTime::from_timestamp(0, 0).unwrap(),
        text: "hello\nworld".to_owned(),
        score: 1.0,
        parts_summary: Vec::new(),
    };
    let session = crate::wire::SearchSession {
        session_id: "s1".to_owned(),
        project: "pond".to_owned(),
        source_agent: "claude-code".to_owned(),
        session_messages_count: 2,
        matched_message_count: 1,
        matches: vec![hit],
    };
    let response = SearchResponse {
        sessions: vec![session],
        matched_total: 1,
        has_more: false,
        next_cursor: None,
    };
    let request = SearchRequest {
        protocol_version: crate::PROTOCOL_VERSION,
        namespace: None,
        query: "hi".to_owned(),
        mode_override: None,
        similar_to: None,
        filters: SearchFilters::default(),
        limit: 10,
        cursor: None,
    };

    let rendered = render_search_transcript(&response, &request);

    assert!(rendered.starts_with("pond_search: 1 matches in 1 sessions, showing 1."));
    assert!(rendered.contains("key: \"--- [n] score | role | time | message_id"));
    // Only the start of the rule is checked; the dash padding is not part of the claim.
    assert!(rendered.contains(
        "--- [1] 1.00 | user | 1970-01-01 00:00:00Z | m1 | pond | claude-code | s1"
    ));
    assert!(rendered.contains("hello\nworld"));

    let wrapped = tool_result(rendered);
    assert!(wrapped.content[0].raw.as_text().is_some());
    assert!(wrapped.structured_content.is_none());
}
1169 }
1170}