Skip to main content

pond/
wire.rs

1use std::collections::BTreeMap;
2
3use chrono::{DateTime, Utc};
4use serde::{Deserialize, Serialize};
5use serde_json::{Map, Value};
6use uuid::Uuid;
7
8use crate::PROTOCOL_VERSION;
9use crate::adapter::Extracted;
10
/// Open-ended key/value metadata carried in the `options` field of sessions,
/// messages and parts. Values are arbitrary JSON; `BTreeMap` keeps the keys
/// in sorted order.
pub type ProviderOptions = BTreeMap<String, Value>;
12
/// Canonical session record in its serialized (wire) shape.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct Session {
    pub id: String,
    /// Pointer to the parent session, if any. Omitted from the serialized
    /// form when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub parent_session_id: Option<String>,
    /// spec.md#model-parent-pointer-coherence: when set, `parent_session_id`
    /// MUST also be set. Spawn-only sources (claude-code subagents,
    /// nanoclaw) leave this `None`; fork-with-cut-point sources
    /// (pi-coding-agent) populate both pointers together.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub parent_message_id: Option<String>,
    pub source_agent: String,
    pub created_at: DateTime<Utc>,
    /// Project value wrapped in `Extracted`, i.e. it must come from a source
    /// extraction rather than being synthesized by the adapter.
    pub project: Extracted<String>,
    /// Defaults to an empty map when the field is missing on input.
    #[serde(default)]
    pub options: ProviderOptions,
}
30
/// Canonical message record, internally tagged on the wire by a `role`
/// field (`system` / `user` / `assistant` / `tool`). Every variant carries
/// the same identity fields (`id`, `session_id`, `timestamp`, `options`);
/// only `System` has an inline `content` string.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(tag = "role", rename_all = "snake_case")]
pub enum Message {
    System {
        id: String,
        session_id: String,
        timestamp: DateTime<Utc>,
        /// `None` when the source row carried no content. The seal on
        /// `Extracted<String>` means adapters CANNOT pass a synthesized
        /// or sentinel string here - the value either flows from a
        /// `Source` extraction or the field is `None`. Distinguishes
        /// "source said content=''" (Some(extracted_empty)) from
        /// "source had no content field" (None).
        #[serde(default, skip_serializing_if = "Option::is_none")]
        content: Option<Extracted<String>>,
        /// Defaults to an empty map when missing on input.
        #[serde(default)]
        options: ProviderOptions,
    },
    User {
        id: String,
        session_id: String,
        timestamp: DateTime<Utc>,
        /// Defaults to an empty map when missing on input.
        #[serde(default)]
        options: ProviderOptions,
    },
    Assistant {
        id: String,
        session_id: String,
        timestamp: DateTime<Utc>,
        /// Defaults to an empty map when missing on input.
        #[serde(default)]
        options: ProviderOptions,
    },
    Tool {
        id: String,
        session_id: String,
        timestamp: DateTime<Utc>,
        /// Defaults to an empty map when missing on input.
        #[serde(default)]
        options: ProviderOptions,
    },
}
71
72impl Message {
73    pub fn id(&self) -> &str {
74        match self {
75            Self::System { id, .. }
76            | Self::User { id, .. }
77            | Self::Assistant { id, .. }
78            | Self::Tool { id, .. } => id,
79        }
80    }
81
82    pub fn session_id(&self) -> &str {
83        match self {
84            Self::System { session_id, .. }
85            | Self::User { session_id, .. }
86            | Self::Assistant { session_id, .. }
87            | Self::Tool { session_id, .. } => session_id,
88        }
89    }
90
91    pub fn role(&self) -> Role {
92        match self {
93            Self::System { .. } => Role::System,
94            Self::User { .. } => Role::User,
95            Self::Assistant { .. } => Role::Assistant,
96            Self::Tool { .. } => Role::Tool,
97        }
98    }
99
100    pub fn timestamp(&self) -> DateTime<Utc> {
101        match self {
102            Self::System { timestamp, .. }
103            | Self::User { timestamp, .. }
104            | Self::Assistant { timestamp, .. }
105            | Self::Tool { timestamp, .. } => *timestamp,
106        }
107    }
108
109    pub fn options(&self) -> &ProviderOptions {
110        match self {
111            Self::System { options, .. }
112            | Self::User { options, .. }
113            | Self::Assistant { options, .. }
114            | Self::Tool { options, .. } => options,
115        }
116    }
117
118    pub fn system_content(&self) -> Option<&str> {
119        match self {
120            // Two layers of `as_deref`: the outer `Option<Extracted<String>>`
121            // becomes `Option<&Extracted<String>>`, then `Extracted: Deref`
122            // unwraps to `&str`.
123            Self::System { content, .. } => content.as_deref().map(|e| &**e),
124            Self::User { .. } | Self::Assistant { .. } | Self::Tool { .. } => None,
125        }
126    }
127}
128
/// Message role as a standalone value. Serialized in `snake_case`, which
/// matches the `role` tag values used by [`Message`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum Role {
    System,
    User,
    Assistant,
    Tool,
}
137
138impl Role {
139    pub fn as_str(self) -> &'static str {
140        match self {
141            Self::System => "system",
142            Self::User => "user",
143            Self::Assistant => "assistant",
144            Self::Tool => "tool",
145        }
146    }
147}
148
/// Whether a Part's content is conversation or harness-injected scaffolding
/// (spec.md#model-part-provenance). No `Default` and no `#[serde(default)]` on the
/// `Part.provenance` field below: constructing a Part without classifying it
/// MUST be a compile error (spec.md#adapter-provenance-required).
///
/// Serialized in `snake_case`: `"conversational"` / `"injected"`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum Provenance {
    Conversational,
    Injected,
}
159
160impl Provenance {
161    pub fn as_str(self) -> &'static str {
162        match self {
163            Self::Conversational => "conversational",
164            Self::Injected => "injected",
165        }
166    }
167}
168
/// Canonical part record. The variant-specific payload in `kind` is
/// flattened into the same JSON object, so its `type` tag and fields sit
/// alongside `id`, `ordinal` and the other shared fields.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct Part {
    pub session_id: String,
    pub id: String,
    pub message_id: String,
    pub ordinal: i32,
    /// Conversation vs harness-injected (spec.md#model-part-provenance). Mandatory,
    /// no serde default - search reads it to exclude injected scaffolding.
    pub provenance: Provenance,
    /// Defaults to an empty map when missing on input.
    #[serde(default)]
    pub options: ProviderOptions,
    /// Part payload; flattened, so it adds no `kind` nesting level.
    #[serde(flatten)]
    pub kind: PartKind,
}
183
/// Payload of a [`Part`], internally tagged by a `type` field whose values
/// are the `snake_case` variant names (`text`, `reasoning`, `file`,
/// `tool_call`, `tool_result`, `tool_approval_request`,
/// `tool_approval_response`).
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum PartKind {
    Text {
        /// `None` when the source row had no text field. The seal on
        /// `Extracted<String>` means adapters CANNOT pass a synthesized
        /// empty string or any other placeholder here - the value either
        /// flows from a `Source` extraction or the field is `None`.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        text: Option<Extracted<String>>,
    },
    Reasoning {
        /// `None` when the source row had no reasoning text. Type-system
        /// guard against `unwrap_or_default()`-style fallbacks: the
        /// `Extracted<String>` seal forces the adapter to either get the
        /// value from a `Source` or admit it is absent.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        text: Option<Extracted<String>>,
    },
    File {
        /// `None` when the source row carried no MIME hint. Sealed against
        /// `unwrap_or("application/octet-stream")`-style fallbacks: an absent
        /// type is faithfully absent, not a synthesized default
        /// (spec.md#model-no-synthesis).
        ///
        /// NOTE(review): unlike the text / tool fields in this enum, this is a
        /// plain `Option<String>`, not `Option<Extracted<String>>`, so the
        /// "sealed" guarantee is not enforced by the type here - confirm
        /// whether that is intended.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        media_type: Option<String>,
        /// Omitted from the serialized form when `None`.
        #[serde(skip_serializing_if = "Option::is_none")]
        file_name: Option<String>,
        /// File contents or a reference to them; see [`FileData`].
        data: FileData,
    },
    ToolCall {
        /// `None` when the source carried no call_id (rare; malformed).
        /// Sealed via `Extracted<String>` - empty-string sentinels are
        /// not constructable from adapter code.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        call_id: Option<Extracted<String>>,
        /// `None` when the source carried no tool name. claude-code
        /// always carries it on `tool_use` rows; codex-cli sometimes
        /// has placeholder shapes. The seal makes synthesized names
        /// unconstructable from adapter code (spec.md#model-no-synthesis).
        #[serde(default, skip_serializing_if = "Option::is_none")]
        name: Option<Extracted<String>>,
        /// Arbitrary JSON; required on input (no serde default).
        params: Value,
        /// Required on input (no serde default).
        provider_executed: bool,
    },
    ToolResult {
        /// `None` when the source carried no `tool_use_id` link.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        call_id: Option<Extracted<String>>,
        /// `None` when the adapter could not resolve the tool name.
        /// In claude-code, name lives only on the prior `tool_use` row;
        /// the adapter resolves via a per-file `tool_use_id -> name`
        /// map and surfaces a miss (e.g. compaction pruned the originating
        /// call) as `None`, never as a fabricated string
        /// (spec.md#model-no-synthesis).
        #[serde(default, skip_serializing_if = "Option::is_none")]
        name: Option<Extracted<String>>,
        /// Required on input; drives the `(failed)` suffix in
        /// `PartSummary::for_kind`.
        is_failure: bool,
        /// Arbitrary JSON; required on input (no serde default).
        result: Value,
    },
    ToolApprovalRequest {
        approval_id: String,
        tool_call_id: String,
    },
    ToolApprovalResponse {
        approval_id: String,
        approved: bool,
        /// Omitted from the serialized form when `None`.
        #[serde(skip_serializing_if = "Option::is_none")]
        reason: Option<String>,
    },
}
255
256impl PartKind {
257    pub fn type_name(&self) -> &'static str {
258        match self {
259            Self::Text { .. } => "text",
260            Self::Reasoning { .. } => "reasoning",
261            Self::File { .. } => "file",
262            Self::ToolCall { .. } => "tool_call",
263            Self::ToolResult { .. } => "tool_result",
264            Self::ToolApprovalRequest { .. } => "tool_approval_request",
265            Self::ToolApprovalResponse { .. } => "tool_approval_response",
266        }
267    }
268}
269
/// Payload of a `file` part. Adjacently tagged: serializes as
/// `{"kind": "string" | "bytes" | "url", "value": ...}`.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(tag = "kind", content = "value", rename_all = "snake_case")]
pub enum FileData {
    String(String),
    Bytes(Vec<u8>),
    Url(String),
}
277
/// Machine-readable error category carried in [`ErrorBody::code`].
/// Serialized in `snake_case` (e.g. `validation_failed`).
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ErrorCode {
    ValidationFailed,
    VersionUnsupported,
    NotFound,
    NamespaceUnknown,
    StorageUnavailable,
    Conflict,
    Internal,
}
289
/// The error payload: a code, a human-readable message and free-form
/// JSON details.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct ErrorBody {
    pub code: ErrorCode,
    pub message: String,
    /// Code-specific context. Defaults to JSON `null` when missing on input.
    #[serde(default)]
    pub details: Value,
}
297
/// Top-level error response: `{"error": { ... }}`.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct ErrorEnvelope {
    pub error: ErrorBody,
}
302
// The success/error size gap is fine here: a `GetEnvelope` is one per-request
// return value, serialized immediately - never stored in bulk where the gap
// would waste memory.
/// Result of a get request. Untagged: serializes as either the bare
/// [`GetResponse`] object or the bare [`ErrorEnvelope`], with no wrapper key.
#[allow(clippy::large_enum_variant)]
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(untagged)]
pub enum GetEnvelope {
    Success(GetResponse),
    Error(ErrorEnvelope),
}
313
/// Request body for a get call. Only `protocol_version` is required on
/// input; every other field has a serde default.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct GetRequest {
    pub protocol_version: u16,
    #[serde(default)]
    pub namespace: Option<String>,
    // Mutually exclusive scopes: `session_id` selects session mode,
    // `message_id` selects message mode.
    #[serde(default)]
    pub session_id: Option<String>,
    #[serde(default)]
    pub message_id: Option<String>,
    /// `message_id` mode only: symmetric window radius, mirroring `grep -C`.
    /// Returns up to `2*context_depth` siblings around the target. Ignored in
    /// session mode.
    #[serde(default)]
    pub context_depth: usize,
    /// Page size; defaults to `default_get_limit()` (20) when missing.
    #[serde(default = "default_get_limit")]
    pub limit: usize,
    /// Ignored in `message_id` mode (that mode always returns the target with
    /// its full parts, paginated).
    #[serde(default)]
    pub response_mode: ResponseMode,
    /// Exclusive continuation anchor: last `message_id` in session mode, last
    /// `part_id` in message mode. The append-only invariant means one id is
    /// enough state - no opaque cursor.
    #[serde(default)]
    pub after_id: Option<String>,
}
341
/// How much of each message `pond_get` materializes (spec.md#protocol).
/// Ignored when `message_id` is set.
///
/// Serialized in `snake_case`; `Conversational` is the default when the
/// field is missing from a request.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ResponseMode {
    /// `search_text IS NOT NULL` conversational view + part summaries.
    #[default]
    Conversational,
    /// All messages (including carriers) + part summaries.
    Complete,
    /// All messages + full parts inline (heaviest mode).
    Verbatim,
}
355
/// The session header is always present; `result` carries the mode-specific
/// payload, discriminated by a `scope` tag (spec.md#protocol). Flattened so a
/// client reads `session` / `scope` / payload fields off one object - no
/// `session.session` nesting.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct GetResponse {
    /// Trimmed header of the session the response refers to.
    pub session: GetSession,
    /// Mode-specific payload; its `scope` tag and fields are flattened into
    /// this object.
    #[serde(flatten)]
    pub result: GetResult,
}
366
/// Trimmed session header (spec.md#protocol): adapter-redundant `options`,
/// parent pointers (served by `restore_lineage`), and per-message session id
/// dropped to keep `pond_get` responses lean for agent context windows.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct GetSession {
    pub id: String,
    pub source_agent: String,
    /// Plain `String` here, unlike `Session::project` which is wrapped in
    /// `Extracted`.
    pub project: String,
    pub created_at: DateTime<Utc>,
}
377
378impl GetSession {
379    pub fn from_session(session: &Session) -> Self {
380        Self {
381            id: session.id.clone(),
382            source_agent: session.source_agent.clone(),
383            project: (*session.project).clone(),
384            created_at: session.created_at,
385        }
386    }
387}
388
/// Per-message view in a `pond_get` response. `parts_summary` is always
/// present on the struct (possibly empty) but is omitted from the serialized
/// form when empty; `parts` is populated only in Verbatim mode.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct MessageView {
    pub id: String,
    pub role: Role,
    pub timestamp: DateTime<Utc>,
    /// Conversational text (`search_text`); absent for carrier rows.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub text: Option<String>,
    /// System-message content string, when the source carried one.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub content: Option<String>,
    /// Compact part descriptors. Skipped on output when empty; a missing
    /// field deserializes to an empty vector.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub parts_summary: Vec<PartSummary>,
    /// Full parts; skipped on output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub parts: Option<Vec<ResponsePart>>,
}
407
/// Compact per-part descriptor (spec.md#protocol): enough to tell what a
/// message carries without paying for full content. `call_id` is populated
/// for `tool_call` / `tool_result` only.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct PartSummary {
    /// Part type name, as produced by `PartKind::type_name`.
    pub kind: String,
    /// Short human-readable label (file name, tool name, approval id, ...);
    /// skipped on output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub label: Option<String>,
    /// Skipped on output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub call_id: Option<String>,
}
419
420impl PartSummary {
421    /// Project a canonical [`PartKind`] into its compact response descriptor, or
422    /// `None` for a kind that does not earn a summary. Exhaustive on purpose - a
423    /// new `PartKind` variant must decide here. `call_id` is carried for
424    /// `tool_call` / `tool_result` only.
425    ///
426    /// `text` and `reasoning` return `None`: a text part's content already rides
427    /// the message's `text`/`content` (a summary would duplicate it), and
428    /// reasoning is deliberately not surfaced by default (its full body is still
429    /// reachable in verbatim mode). The kinds that survive are exactly
430    /// [`SUMMARY_PART_TYPES`].
431    pub fn for_kind(kind: &PartKind) -> Option<Self> {
432        let (label, call_id) = match kind {
433            PartKind::Text { .. } | PartKind::Reasoning { .. } => return None,
434            PartKind::File {
435                media_type,
436                file_name,
437                ..
438            } => (file_name.clone().or_else(|| media_type.clone()), None),
439            PartKind::ToolCall { name, call_id, .. } => {
440                (name.as_deref().cloned(), call_id.as_deref().cloned())
441            }
442            PartKind::ToolResult {
443                name,
444                call_id,
445                is_failure,
446                ..
447            } => {
448                let label = name.as_deref().map(|name| {
449                    if *is_failure {
450                        format!("{name} (failed)")
451                    } else {
452                        name.clone()
453                    }
454                });
455                (label, call_id.as_deref().cloned())
456            }
457            PartKind::ToolApprovalRequest { approval_id, .. } => (Some(approval_id.clone()), None),
458            PartKind::ToolApprovalResponse {
459                approval_id,
460                approved,
461                ..
462            } => {
463                let verb = if *approved { "approved" } else { "denied" };
464                (Some(format!("{approval_id} ({verb})")), None)
465            }
466        };
467        Some(Self {
468            kind: kind.type_name().to_owned(),
469            label,
470            call_id,
471        })
472    }
473}
474
/// Canonical part `type` names that yield a [`PartSummary`] - every kind except
/// `text` and `reasoning` (see [`PartSummary::for_kind`], the source of truth).
/// The summary read paths filter the parts scan to these so a text/reasoning
/// heavy session never loads parts that would summarize to nothing.
///
/// The strings must stay in step with `PartKind::type_name`.
pub const SUMMARY_PART_TYPES: &[&str] = &[
    "file",
    "tool_call",
    "tool_result",
    "tool_approval_request",
    "tool_approval_response",
];
486
/// A `Part` as it rides a `pond_get` response (spec.md#protocol): the canonical
/// part minus `session_id` / `message_id`, which the enclosing session and
/// message already identify. Built from a canonical [`Part`] in the handler.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct ResponsePart {
    pub id: String,
    pub ordinal: i32,
    pub provenance: Provenance,
    /// Skipped on output when empty (unlike `Part::options`, which is always
    /// written); defaults to an empty map when missing on input.
    #[serde(default, skip_serializing_if = "ProviderOptions::is_empty")]
    pub options: ProviderOptions,
    /// Part payload; flattened, so its `type` tag and fields sit alongside
    /// the fields above.
    #[serde(flatten)]
    pub kind: PartKind,
}
500
501impl ResponsePart {
502    pub fn from_part(part: Part) -> Self {
503        Self {
504            id: part.id,
505            ordinal: part.ordinal,
506            provenance: part.provenance,
507            options: part.options,
508            kind: part.kind,
509        }
510    }
511}
512
/// Mode-specific `pond_get` payload, tagged by `scope` and flattened into
/// `GetResponse` alongside the shared session header.
///
/// The tag values are `"session"` and `"message"` (`snake_case` variant names).
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(tag = "scope", rename_all = "snake_case")]
pub enum GetResult {
    /// Session-mode payload: one page of message views plus a remaining count.
    Session {
        messages: Vec<MessageView>,
        messages_remaining: usize,
    },
    /// Message-mode payload: the target message, one page of its parts plus
    /// a remaining count, and the surrounding messages.
    Message {
        target: MessageView,
        target_parts: Vec<ResponsePart>,
        target_parts_remaining: usize,
        /// Up to `2*context_depth` messages around the target (target excluded).
        siblings: Vec<MessageView>,
    },
}
530
/// Result of a search request. Untagged: serializes as either the bare
/// [`SearchResponse`] object or the bare [`ErrorEnvelope`].
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(untagged)]
pub enum SearchEnvelope {
    Success(SearchResponse),
    Error(ErrorEnvelope),
}
537
/// Project filter for search: either a substring (`Contains`) or a regular
/// expression (`Regex`) pattern, both carried as strings.
///
/// JSON shape is externally tagged: `{"contains": "pond"}` or
/// `{"regex": "^/Users/.*"}`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ProjectFilter {
    Contains(String),
    Regex(String),
}
546
/// Request body for a search call. `protocol_version` and `query` are
/// required on input; everything else has a serde default.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct SearchRequest {
    pub protocol_version: u16,
    #[serde(default)]
    pub namespace: Option<String>,
    /// Query text. Ignored when `similar_to` is set (see below).
    pub query: String,
    // Server normally decides between hybrid and FTS-only from the embedder +
    // embeddings-coverage state (spec.md#search); `mode_override` is the
    // operator-tooling escape hatch consumed by the `scripts/search-benchmarks/`
    // harness. Production callers (MCP, HTTP agents) should leave it `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub mode_override: Option<SearchModeWire>,
    /// When set, retrieve messages similar to this stored message - pond uses
    /// the message's stored `vector` directly as the query, runs vector-only
    /// kNN, and ignores `query` and the FTS arm. The stored vector was
    /// derived from `search_text` (`spec.md#session-embed-from-canonical`), so the
    /// signal is already filtered of harness-injected parts. Filters and
    /// `limit` still apply.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub similar_to: Option<String>,
    /// Defaults to `SearchFilters::default()` (no filtering) when missing.
    #[serde(default)]
    pub filters: SearchFilters,
    /// Page size; defaults to `default_limit()` (10) when missing.
    #[serde(default = "default_limit")]
    pub limit: usize,
    /// Continuation cursor as an opaque string; skipped on output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub cursor: Option<String>,
}
574
/// Wire-level retrieval mode override (spec.md#search). Not normally set on
/// the wire - the server decides hybrid vs FTS-only from embedding
/// availability. The variant exists so operator tooling (`pond search --mode`,
/// the embeddings-benchmark harness) can force one arm without an env-var
/// backdoor.
///
/// Serialized as lowercase: `"fts"`, `"vector"`, `"hybrid"`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum SearchModeWire {
    Fts,
    Vector,
    Hybrid,
}
587
/// Optional narrowing criteria for a search. Every unset filter (`None`, or
/// a `min_score` of `0.0`) is left out of the serialized form, so the default
/// value serializes to an empty object.
#[derive(Debug, Clone, PartialEq, Default, Serialize, Deserialize)]
pub struct SearchFilters {
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub project: Option<ProjectFilter>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub session_id: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub source_agent: Option<String>,
    // NOTE(review): date bounds travel as strings; the expected format is not
    // visible in this file - confirm against the handler.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub from_date: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub to_date: Option<String>,
    // NOTE(review): a string rather than the `Role` enum - presumably
    // validated downstream; confirm.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub role: Option<String>,
    // Skip the default 0.0 so an unfiltered cursor/request stays compact.
    #[serde(default, skip_serializing_if = "is_zero_f64")]
    pub min_score: f64,
}
606
607impl SearchFilters {
608    fn is_default(&self) -> bool {
609        *self == Self::default()
610    }
611}
612
/// serde `skip_serializing_if` predicate for `f64` fields: `true` for `0.0`
/// (and `-0.0`), `false` for everything else including NaN. Takes a
/// reference because that is the signature serde calls.
fn is_zero_f64(value: &f64) -> bool {
    let score = *value;
    score == 0.0
}
616
/// Successful search result: hits grouped per session plus paging state.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct SearchResponse {
    pub sessions: Vec<SearchSession>,
    pub matched_total: usize,
    pub has_more: bool,
    /// Cursor for the next page; skipped on output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub next_cursor: Option<String>,
}
625
/// One session's worth of search hits, with the session's identifying
/// fields and two counters alongside the matches themselves.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct SearchSession {
    pub session_id: String,
    pub project: String,
    pub source_agent: String,
    pub session_messages_count: usize,
    pub matched_message_count: usize,
    pub matches: Vec<SearchResult>,
}
635
/// A single matching message inside a [`SearchSession`].
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct SearchResult {
    pub message_id: String,
    pub role: Role,
    pub timestamp: DateTime<Utc>,
    pub text: String,
    pub score: f64,
    /// Populated only for user-role hits: distinguishes a plain-text prompt
    /// from one carrying file attachments or multi-part scaffolding.
    /// Skipped on output when empty.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub parts_summary: Vec<PartSummary>,
}
648
/// Search continuation state: the query parameters plus an offset.
/// Presumably this is what gets encoded into the opaque `cursor` /
/// `next_cursor` strings - the encoding is not visible in this file.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct SearchCursor {
    pub query: String,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub similar_to: Option<String>,
    /// Skipped on output when equal to `SearchFilters::default()`.
    #[serde(default, skip_serializing_if = "SearchFilters::is_default")]
    pub filters: SearchFilters,
    pub offset: usize,
}
658
/// Result of an ingest request. Untagged: serializes as either the bare
/// [`IngestResponse`] object or the bare [`ErrorEnvelope`].
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(untagged)]
pub enum IngestEnvelope {
    Success(IngestResponse),
    Error(ErrorEnvelope),
}
665
/// Request body for an ingest call: a batch of events to store.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct IngestRequest {
    pub protocol_version: u16,
    #[serde(default)]
    pub namespace: Option<String>,
    /// Events in submission order; `IngestResult::index` refers to positions
    /// in this array.
    pub events: Vec<crate::sessions::IngestEvent>,
}
673
/// `pond_ingest` response (spec.md#protocol). `accepted = inserted + matched`,
/// `rejected = error`; both derived from `results`. Per-row `results[]` is
/// the contract clients rely on to reconcile retries (the PK is echoed so
/// the client can match outcomes back to its input even when `index` is not
/// enough). Each result reports the input event's `index`, `kind`, `pk`,
/// `status`, and an `error` body when `status = "error"`.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct IngestResponse {
    /// Count of rows whose status is `inserted` or `matched`.
    pub accepted: usize,
    /// Count of rows whose status is `error`.
    pub rejected: usize,
    pub results: Vec<IngestResult>,
}
686
/// One row of `pond_ingest` per-row output (spec.md#protocol).
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct IngestResult {
    /// Position in the request's `events` array (0-based).
    pub index: usize,
    /// `"session"` | `"message"` | `"part"`, matching `IngestEvent::kind`.
    pub kind: String,
    /// Echoed primary key: scalar for session, `[session_id, message_id]` for
    /// message, `[session_id, message_id, part_id]` for part. Lets clients reconcile
    /// against their own state on retry.
    pub pk: Value,
    /// Outcome for this row; see [`IngestStatus`].
    pub status: IngestStatus,
    /// Set only when `status = "error"`. Carries the same shape as the
    /// envelope-level error body. Skipped on output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub error: Option<ErrorBody>,
}
704
/// Per-row ingest outcome. Serialized in `snake_case`: `"inserted"`,
/// `"matched"`, `"error"`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum IngestStatus {
    /// New PK; `merge_insert` wrote a fresh row.
    Inserted,
    /// PK existed; `merge_insert` matched it (no-op per spec.md#adapter-integrity-additive-sync).
    Matched,
    /// Per-row failure: validation or storage error. See `error` field.
    Error,
}
715
/// serde default for `SearchRequest::limit`: ten results per page.
fn default_limit() -> usize {
    const SEARCH_PAGE_SIZE: usize = 10;
    SEARCH_PAGE_SIZE
}
719
720pub fn new_request_id() -> String {
721    format!("req_{}", Uuid::now_v7())
722}
723
/// Name of the namespace used by default.
pub const DEFAULT_NAMESPACE: &str = "local";

/// Owned copy of [`DEFAULT_NAMESPACE`], for call sites that need a `String`.
pub fn default_namespace() -> String {
    String::from(DEFAULT_NAMESPACE)
}
729
/// serde default for `GetRequest::limit`: twenty items per page.
fn default_get_limit() -> usize {
    const GET_PAGE_SIZE: usize = 20;
    GET_PAGE_SIZE
}
733
734pub fn validate_protocol(version: u16) -> Result<(), ErrorEnvelope> {
735    if version == PROTOCOL_VERSION {
736        return Ok(());
737    }
738
739    Err(error(
740        ErrorCode::VersionUnsupported,
741        "unsupported protocol_version",
742        serde_json::json!({
743            "received": version,
744            "supported": [PROTOCOL_VERSION],
745        }),
746    ))
747}
748
749pub fn error(code: ErrorCode, message: impl Into<String>, details: Value) -> ErrorEnvelope {
750    ErrorEnvelope {
751        error: ErrorBody {
752            code,
753            message: message.into(),
754            details,
755        },
756    }
757}
758
759impl From<crate::Error> for ErrorEnvelope {
760    fn from(error_value: crate::Error) -> Self {
761        match error_value {
762            crate::Error::Validation {
763                message,
764                field,
765                value,
766                expected,
767            } => error(
768                ErrorCode::ValidationFailed,
769                message,
770                validation_details(field, value, expected),
771            ),
772            crate::Error::NotFound { message, kind, pk } => error(
773                ErrorCode::NotFound,
774                message,
775                serde_json::json!({ "kind": kind, "pk": pk }),
776            ),
777            crate::Error::NamespaceUnknown { namespace } => error(
778                ErrorCode::NamespaceUnknown,
779                "namespace unknown",
780                serde_json::json!({ "namespace": namespace }),
781            ),
782            crate::Error::Conflict { attempts } => error(
783                ErrorCode::Conflict,
784                "commit conflict after retries exhausted",
785                serde_json::json!({ "attempts": attempts }),
786            ),
787            crate::Error::Storage(error_value) => storage_error(error_value),
788            crate::Error::Internal(message) => {
789                error(ErrorCode::Internal, message, serde_json::json!({}))
790            }
791        }
792    }
793}
794
795fn validation_details(
796    field: Option<String>,
797    value: Option<Value>,
798    expected: Option<String>,
799) -> Value {
800    let mut details = Map::new();
801    if let Some(field) = field {
802        details.insert("field".to_owned(), Value::String(field));
803    }
804    if let Some(value) = value {
805        details.insert("value".to_owned(), value);
806    }
807    if let Some(expected) = expected {
808        details.insert("expected".to_owned(), Value::String(expected));
809    }
810    Value::Object(details)
811}
812
813pub fn storage_error(error_value: anyhow::Error) -> ErrorEnvelope {
814    error(
815        ErrorCode::StorageUnavailable,
816        "storage operation failed",
817        serde_json::json!({ "underlying": error_value.to_string() }),
818    )
819}
820
#[cfg(test)]
mod tests {
    #![allow(clippy::expect_used, clippy::unwrap_used)]

    use super::*;
    use serde_json::json;

    /// A `Conflict` error must surface with the `Conflict` code and carry
    /// the attempt count in `details`.
    #[test]
    fn wire_envelope_carries_conflict_code_and_attempts_detail() {
        let ErrorEnvelope { error: body } =
            ErrorEnvelope::from(crate::Error::Conflict { attempts: 3 });
        assert_eq!(body.code, ErrorCode::Conflict);
        assert_eq!(body.details, json!({ "attempts": 3 }));
    }
}