Skip to main content

pond/
wire.rs

1use std::collections::BTreeMap;
2
3use chrono::{DateTime, Utc};
4use serde::{Deserialize, Serialize};
5use serde_json::{Map, Value};
6use uuid::Uuid;
7
8use crate::PROTOCOL_VERSION;
9use crate::adapter::Extracted;
10
11pub type ProviderOptions = BTreeMap<String, Value>;
12
13#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
14pub struct Session {
15    pub id: String,
16    #[serde(skip_serializing_if = "Option::is_none")]
17    pub parent_session_id: Option<String>,
18    /// spec.md#model-parent-pointer-coherence: when set, `parent_session_id`
19    /// MUST also be set. Spawn-only sources (claude-code subagents,
20    /// nanoclaw) leave this `None`; fork-with-cut-point sources
21    /// (pi-mono) populate both pointers together.
22    #[serde(skip_serializing_if = "Option::is_none")]
23    pub parent_message_id: Option<String>,
24    pub source_agent: String,
25    pub created_at: DateTime<Utc>,
26    pub project: Extracted<String>,
27    #[serde(default)]
28    pub options: ProviderOptions,
29}
30
31#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
32#[serde(tag = "role", rename_all = "snake_case")]
33pub enum Message {
34    System {
35        id: String,
36        session_id: String,
37        timestamp: DateTime<Utc>,
38        /// `None` when the source row carried no content. The seal on
39        /// `Extracted<String>` means adapters CANNOT pass a synthesized
40        /// or sentinel string here - the value either flows from a
41        /// `Source` extraction or the field is `None`. Distinguishes
42        /// "source said content=''" (Some(extracted_empty)) from
43        /// "source had no content field" (None).
44        #[serde(default, skip_serializing_if = "Option::is_none")]
45        content: Option<Extracted<String>>,
46        #[serde(default)]
47        options: ProviderOptions,
48    },
49    User {
50        id: String,
51        session_id: String,
52        timestamp: DateTime<Utc>,
53        #[serde(default)]
54        options: ProviderOptions,
55    },
56    Assistant {
57        id: String,
58        session_id: String,
59        timestamp: DateTime<Utc>,
60        #[serde(default)]
61        options: ProviderOptions,
62    },
63    Tool {
64        id: String,
65        session_id: String,
66        timestamp: DateTime<Utc>,
67        #[serde(default)]
68        options: ProviderOptions,
69    },
70}
71
72impl Message {
73    pub fn id(&self) -> &str {
74        match self {
75            Self::System { id, .. }
76            | Self::User { id, .. }
77            | Self::Assistant { id, .. }
78            | Self::Tool { id, .. } => id,
79        }
80    }
81
82    pub fn session_id(&self) -> &str {
83        match self {
84            Self::System { session_id, .. }
85            | Self::User { session_id, .. }
86            | Self::Assistant { session_id, .. }
87            | Self::Tool { session_id, .. } => session_id,
88        }
89    }
90
91    pub fn role(&self) -> Role {
92        match self {
93            Self::System { .. } => Role::System,
94            Self::User { .. } => Role::User,
95            Self::Assistant { .. } => Role::Assistant,
96            Self::Tool { .. } => Role::Tool,
97        }
98    }
99
100    pub fn timestamp(&self) -> DateTime<Utc> {
101        match self {
102            Self::System { timestamp, .. }
103            | Self::User { timestamp, .. }
104            | Self::Assistant { timestamp, .. }
105            | Self::Tool { timestamp, .. } => *timestamp,
106        }
107    }
108
109    pub fn options(&self) -> &ProviderOptions {
110        match self {
111            Self::System { options, .. }
112            | Self::User { options, .. }
113            | Self::Assistant { options, .. }
114            | Self::Tool { options, .. } => options,
115        }
116    }
117
118    pub fn system_content(&self) -> Option<&str> {
119        match self {
120            // Two layers of `as_deref`: the outer `Option<Extracted<String>>`
121            // becomes `Option<&Extracted<String>>`, then `Extracted: Deref`
122            // unwraps to `&str`.
123            Self::System { content, .. } => content.as_deref().map(|e| &**e),
124            Self::User { .. } | Self::Assistant { .. } | Self::Tool { .. } => None,
125        }
126    }
127}
128
129#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
130#[serde(rename_all = "snake_case")]
131pub enum Role {
132    System,
133    User,
134    Assistant,
135    Tool,
136}
137
138impl Role {
139    pub fn as_str(self) -> &'static str {
140        match self {
141            Self::System => "system",
142            Self::User => "user",
143            Self::Assistant => "assistant",
144            Self::Tool => "tool",
145        }
146    }
147}
148
149/// Whether a Part's content is conversation or harness-injected scaffolding
150/// (spec.md#model-part-provenance). No `Default` and no `#[serde(default)]` on the
151/// `Part.provenance` field below: constructing a Part without classifying it
152/// MUST be a compile error (spec.md#adapter-provenance-required).
153#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
154#[serde(rename_all = "snake_case")]
155pub enum Provenance {
156    Conversational,
157    Injected,
158}
159
160impl Provenance {
161    pub fn as_str(self) -> &'static str {
162        match self {
163            Self::Conversational => "conversational",
164            Self::Injected => "injected",
165        }
166    }
167}
168
169#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
170pub struct Part {
171    pub session_id: String,
172    pub id: String,
173    pub message_id: String,
174    pub ordinal: i32,
175    /// Conversation vs harness-injected (spec.md#model-part-provenance). Mandatory,
176    /// no serde default - search reads it to exclude injected scaffolding.
177    pub provenance: Provenance,
178    #[serde(default)]
179    pub options: ProviderOptions,
180    #[serde(flatten)]
181    pub kind: PartKind,
182}
183
184#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
185#[serde(tag = "type", rename_all = "snake_case")]
186pub enum PartKind {
187    Text {
188        /// `None` when the source row had no text field. The seal on
189        /// `Extracted<String>` means adapters CANNOT pass a synthesized
190        /// empty string or any other placeholder here - the value either
191        /// flows from a `Source` extraction or the field is `None`.
192        #[serde(default, skip_serializing_if = "Option::is_none")]
193        text: Option<Extracted<String>>,
194    },
195    Reasoning {
196        /// `None` when the source row had no reasoning text. Type-system
197        /// guard against `unwrap_or_default()`-style fallbacks: the
198        /// `Extracted<String>` seal forces the adapter to either get the
199        /// value from a `Source` or admit it is absent.
200        #[serde(default, skip_serializing_if = "Option::is_none")]
201        text: Option<Extracted<String>>,
202    },
203    File {
204        media_type: String,
205        #[serde(skip_serializing_if = "Option::is_none")]
206        file_name: Option<String>,
207        data: FileData,
208    },
209    ToolCall {
210        /// `None` when the source carried no call_id (rare; malformed).
211        /// Sealed via `Extracted<String>` - empty-string sentinels are
212        /// not constructable from adapter code.
213        #[serde(default, skip_serializing_if = "Option::is_none")]
214        call_id: Option<Extracted<String>>,
215        /// `None` when the source carried no tool name. claude-code
216        /// always carries it on `tool_use` rows; codex-cli sometimes
217        /// has placeholder shapes. The seal makes synthesized names
218        /// unconstructable from adapter code (spec.md#model-no-synthesis).
219        #[serde(default, skip_serializing_if = "Option::is_none")]
220        name: Option<Extracted<String>>,
221        params: Value,
222        provider_executed: bool,
223    },
224    ToolResult {
225        /// `None` when the source carried no `tool_use_id` link.
226        #[serde(default, skip_serializing_if = "Option::is_none")]
227        call_id: Option<Extracted<String>>,
228        /// `None` when the adapter could not resolve the tool name.
229        /// In claude-code, name lives only on the prior `tool_use` row;
230        /// the adapter resolves via a per-file `tool_use_id -> name`
231        /// map and surfaces a miss (e.g. compaction pruned the originating
232        /// call) as `None`, never as a fabricated string
233        /// (spec.md#model-no-synthesis).
234        #[serde(default, skip_serializing_if = "Option::is_none")]
235        name: Option<Extracted<String>>,
236        is_failure: bool,
237        result: Value,
238    },
239    ToolApprovalRequest {
240        approval_id: String,
241        tool_call_id: String,
242    },
243    ToolApprovalResponse {
244        approval_id: String,
245        approved: bool,
246        #[serde(skip_serializing_if = "Option::is_none")]
247        reason: Option<String>,
248    },
249}
250
251impl PartKind {
252    pub fn type_name(&self) -> &'static str {
253        match self {
254            Self::Text { .. } => "text",
255            Self::Reasoning { .. } => "reasoning",
256            Self::File { .. } => "file",
257            Self::ToolCall { .. } => "tool_call",
258            Self::ToolResult { .. } => "tool_result",
259            Self::ToolApprovalRequest { .. } => "tool_approval_request",
260            Self::ToolApprovalResponse { .. } => "tool_approval_response",
261        }
262    }
263}
264
265#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
266#[serde(tag = "kind", content = "value", rename_all = "snake_case")]
267pub enum FileData {
268    String(String),
269    Bytes(Vec<u8>),
270    Url(String),
271}
272
273#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
274#[serde(rename_all = "snake_case")]
275pub enum ErrorCode {
276    ValidationFailed,
277    VersionUnsupported,
278    NotFound,
279    NamespaceUnknown,
280    StorageUnavailable,
281    Conflict,
282    Internal,
283}
284
285#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
286pub struct ErrorBody {
287    pub code: ErrorCode,
288    pub message: String,
289    #[serde(default)]
290    pub details: Value,
291}
292
293#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
294pub struct ErrorEnvelope {
295    pub error: ErrorBody,
296}
297
298// The success/error size gap is fine here: a `GetEnvelope` is one per-request
299// return value, serialized immediately - never stored in bulk where the gap
300// would waste memory.
301#[allow(clippy::large_enum_variant)]
302#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
303#[serde(untagged)]
304pub enum GetEnvelope {
305    Success(GetResponse),
306    Error(ErrorEnvelope),
307}
308
309#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
310pub struct GetRequest {
311    pub protocol_version: u16,
312    #[serde(default)]
313    pub namespace: Option<String>,
314    // Mutually exclusive scopes.
315    #[serde(default)]
316    pub session_id: Option<String>,
317    #[serde(default)]
318    pub message_id: Option<String>,
319    /// `message_id` mode only: symmetric window radius, mirroring `grep -C`.
320    /// Returns up to `2*context_depth` siblings around the target. Ignored in
321    /// session mode.
322    #[serde(default)]
323    pub context_depth: usize,
324    #[serde(default = "default_get_limit")]
325    pub limit: usize,
326    /// Ignored in `message_id` mode (that mode always returns the target with
327    /// its full parts, paginated).
328    #[serde(default)]
329    pub response_mode: ResponseMode,
330    /// Exclusive continuation anchor: last `message_id` in session mode, last
331    /// `part_id` in message mode. The append-only invariant means one id is
332    /// enough state - no opaque cursor.
333    #[serde(default)]
334    pub after_id: Option<String>,
335}
336
337/// How much of each message `pond_get` materializes (spec.md#protocol).
338/// Ignored when `message_id` is set.
339#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
340#[serde(rename_all = "snake_case")]
341pub enum ResponseMode {
342    /// `search_text IS NOT NULL` conversational view + part summaries.
343    #[default]
344    Conversational,
345    /// All messages (including carriers) + part summaries.
346    Complete,
347    /// All messages + full parts inline (heaviest mode).
348    Verbatim,
349}
350
351/// The session header is always present; `result` carries the mode-specific
352/// payload, discriminated by a `scope` tag (spec.md#protocol). Flattened so a
353/// client reads `session` / `scope` / payload fields off one object - no
354/// `session.session` nesting.
355#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
356pub struct GetResponse {
357    pub session: GetSession,
358    #[serde(flatten)]
359    pub result: GetResult,
360}
361
362/// Trimmed session header (spec.md#protocol): adapter-redundant `options`,
363/// parent pointers (served by `restore_lineage`), and per-message session id
364/// dropped to keep `pond_get` responses lean for agent context windows.
365#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
366pub struct GetSession {
367    pub id: String,
368    pub source_agent: String,
369    pub project: String,
370    pub created_at: DateTime<Utc>,
371}
372
373impl GetSession {
374    pub fn from_session(session: &Session) -> Self {
375        Self {
376            id: session.id.clone(),
377            source_agent: session.source_agent.clone(),
378            project: (*session.project).clone(),
379            created_at: session.created_at,
380        }
381    }
382}
383
384/// Per-message view in a `pond_get` response. `parts_summary` is always
385/// present (possibly empty); `parts` is populated only in Verbatim mode.
386#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
387pub struct MessageView {
388    pub id: String,
389    pub role: Role,
390    pub timestamp: DateTime<Utc>,
391    /// Conversational text (`search_text`); absent for carrier rows.
392    #[serde(default, skip_serializing_if = "Option::is_none")]
393    pub text: Option<String>,
394    /// System-message content string, when the source carried one.
395    #[serde(default, skip_serializing_if = "Option::is_none")]
396    pub content: Option<String>,
397    #[serde(default, skip_serializing_if = "Vec::is_empty")]
398    pub parts_summary: Vec<PartSummary>,
399    #[serde(default, skip_serializing_if = "Option::is_none")]
400    pub parts: Option<Vec<ResponsePart>>,
401}
402
403/// Compact per-part descriptor (spec.md#protocol): enough to tell what a
404/// message carries without paying for full content. `call_id` is populated
405/// for `tool_call` / `tool_result` only.
406#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
407pub struct PartSummary {
408    pub kind: String,
409    #[serde(default, skip_serializing_if = "Option::is_none")]
410    pub label: Option<String>,
411    #[serde(default, skip_serializing_if = "Option::is_none")]
412    pub call_id: Option<String>,
413}
414
415impl PartSummary {
416    /// Project a canonical [`PartKind`] into its compact response descriptor, or
417    /// `None` for a kind that does not earn a summary. Exhaustive on purpose - a
418    /// new `PartKind` variant must decide here. `call_id` is carried for
419    /// `tool_call` / `tool_result` only.
420    ///
421    /// `text` and `reasoning` return `None`: a text part's content already rides
422    /// the message's `text`/`content` (a summary would duplicate it), and
423    /// reasoning is deliberately not surfaced by default (its full body is still
424    /// reachable in verbatim mode). The kinds that survive are exactly
425    /// [`SUMMARY_PART_TYPES`].
426    pub fn for_kind(kind: &PartKind) -> Option<Self> {
427        let (label, call_id) = match kind {
428            PartKind::Text { .. } | PartKind::Reasoning { .. } => return None,
429            PartKind::File {
430                media_type,
431                file_name,
432                ..
433            } => (
434                Some(file_name.clone().unwrap_or_else(|| media_type.clone())),
435                None,
436            ),
437            PartKind::ToolCall { name, call_id, .. } => {
438                (name.as_deref().cloned(), call_id.as_deref().cloned())
439            }
440            PartKind::ToolResult {
441                name,
442                call_id,
443                is_failure,
444                ..
445            } => {
446                let label = name.as_deref().map(|name| {
447                    if *is_failure {
448                        format!("{name} (failed)")
449                    } else {
450                        name.clone()
451                    }
452                });
453                (label, call_id.as_deref().cloned())
454            }
455            PartKind::ToolApprovalRequest { approval_id, .. } => (Some(approval_id.clone()), None),
456            PartKind::ToolApprovalResponse {
457                approval_id,
458                approved,
459                ..
460            } => {
461                let verb = if *approved { "approved" } else { "denied" };
462                (Some(format!("{approval_id} ({verb})")), None)
463            }
464        };
465        Some(Self {
466            kind: kind.type_name().to_owned(),
467            label,
468            call_id,
469        })
470    }
471}
472
473/// Canonical part `type` names that yield a [`PartSummary`] - every kind except
474/// `text` and `reasoning` (see [`PartSummary::for_kind`], the source of truth).
475/// The summary read paths filter the parts scan to these so a text/reasoning
476/// heavy session never loads parts that would summarize to nothing.
477pub const SUMMARY_PART_TYPES: &[&str] = &[
478    "file",
479    "tool_call",
480    "tool_result",
481    "tool_approval_request",
482    "tool_approval_response",
483];
484
485/// A `Part` as it rides a `pond_get` response (spec.md#protocol): the canonical
486/// part minus `session_id` / `message_id`, which the enclosing session and
487/// message already identify. Built from a canonical [`Part`] in the handler.
488#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
489pub struct ResponsePart {
490    pub id: String,
491    pub ordinal: i32,
492    pub provenance: Provenance,
493    #[serde(default, skip_serializing_if = "ProviderOptions::is_empty")]
494    pub options: ProviderOptions,
495    #[serde(flatten)]
496    pub kind: PartKind,
497}
498
499impl ResponsePart {
500    pub fn from_part(part: Part) -> Self {
501        Self {
502            id: part.id,
503            ordinal: part.ordinal,
504            provenance: part.provenance,
505            options: part.options,
506            kind: part.kind,
507        }
508    }
509}
510
511/// Mode-specific `pond_get` payload, tagged by `scope` and flattened into
512/// `GetResponse` alongside the shared session header.
513#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
514#[serde(tag = "scope", rename_all = "snake_case")]
515pub enum GetResult {
516    Session {
517        messages: Vec<MessageView>,
518        messages_remaining: usize,
519    },
520    Message {
521        target: MessageView,
522        target_parts: Vec<ResponsePart>,
523        target_parts_remaining: usize,
524        /// Up to `2*context_depth` messages around the target (target excluded).
525        siblings: Vec<MessageView>,
526    },
527}
528
529#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
530#[serde(untagged)]
531pub enum SearchEnvelope {
532    Success(SearchResponse),
533    Error(ErrorEnvelope),
534}
535
536/// JSON shape is externally tagged: `{"contains": "pond"}` or
537/// `{"regex": "^/Users/.*"}`.
538#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
539#[serde(rename_all = "snake_case")]
540pub enum ProjectFilter {
541    Contains(String),
542    Regex(String),
543}
544
545#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
546pub struct SearchRequest {
547    pub protocol_version: u16,
548    #[serde(default)]
549    pub namespace: Option<String>,
550    pub query: String,
551    // Server normally decides between hybrid and FTS-only from the embedder +
552    // embeddings-coverage state (spec.md#search); `mode_override` is the
553    // operator-tooling escape hatch consumed by the `scripts/search-benchmarks/`
554    // harness. Production callers (MCP, HTTP agents) should leave it `None`.
555    #[serde(default, skip_serializing_if = "Option::is_none")]
556    pub mode_override: Option<SearchModeWire>,
557    /// When set, retrieve messages similar to this stored message - pond uses
558    /// the message's stored `vector` directly as the query, runs vector-only
559    /// kNN, and ignores `query` and the FTS arm. The stored vector was
560    /// derived from `search_text` (`spec.md#session-embed-from-canonical`), so the
561    /// signal is already filtered of harness-injected parts. Filters and
562    /// `limit` still apply.
563    #[serde(default, skip_serializing_if = "Option::is_none")]
564    pub similar_to: Option<String>,
565    #[serde(default)]
566    pub filters: SearchFilters,
567    #[serde(default = "default_limit")]
568    pub limit: usize,
569    #[serde(default, skip_serializing_if = "Option::is_none")]
570    pub cursor: Option<String>,
571}
572
573/// Wire-level retrieval mode override (spec.md#search). Not normally set on
574/// the wire - the server decides hybrid vs FTS-only from embedding
575/// availability. The variant exists so operator tooling (`pond search --mode`,
576/// the embeddings-benchmark harness) can force one arm without an env-var
577/// backdoor.
578#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
579#[serde(rename_all = "lowercase")]
580pub enum SearchModeWire {
581    Fts,
582    Vector,
583    Hybrid,
584}
585
586#[derive(Debug, Clone, PartialEq, Default, Serialize, Deserialize)]
587pub struct SearchFilters {
588    #[serde(default, skip_serializing_if = "Option::is_none")]
589    pub project: Option<ProjectFilter>,
590    #[serde(default, skip_serializing_if = "Option::is_none")]
591    pub session_id: Option<String>,
592    #[serde(default, skip_serializing_if = "Option::is_none")]
593    pub source_agent: Option<String>,
594    #[serde(default, skip_serializing_if = "Option::is_none")]
595    pub from_date: Option<String>,
596    #[serde(default, skip_serializing_if = "Option::is_none")]
597    pub to_date: Option<String>,
598    #[serde(default, skip_serializing_if = "Option::is_none")]
599    pub role: Option<String>,
600    // Skip the default 0.0 so an unfiltered cursor/request stays compact.
601    #[serde(default, skip_serializing_if = "is_zero_f64")]
602    pub min_score: f64,
603}
604
605impl SearchFilters {
606    fn is_default(&self) -> bool {
607        *self == Self::default()
608    }
609}
610
611fn is_zero_f64(value: &f64) -> bool {
612    *value == 0.0
613}
614
615#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
616pub struct SearchResponse {
617    pub sessions: Vec<SearchSession>,
618    pub matched_total: usize,
619    pub has_more: bool,
620    #[serde(default, skip_serializing_if = "Option::is_none")]
621    pub next_cursor: Option<String>,
622}
623
624#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
625pub struct SearchSession {
626    pub session_id: String,
627    pub project: String,
628    pub source_agent: String,
629    pub session_messages_count: usize,
630    pub matched_message_count: usize,
631    pub matches: Vec<SearchResult>,
632}
633
634#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
635pub struct SearchResult {
636    pub message_id: String,
637    pub role: Role,
638    pub timestamp: DateTime<Utc>,
639    pub text: String,
640    pub score: f64,
641    /// Populated only for user-role hits: distinguishes a plain-text prompt
642    /// from one carrying file attachments or multi-part scaffolding.
643    #[serde(default, skip_serializing_if = "Vec::is_empty")]
644    pub parts_summary: Vec<PartSummary>,
645}
646
647#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
648pub struct SearchCursor {
649    pub query: String,
650    #[serde(default, skip_serializing_if = "Option::is_none")]
651    pub similar_to: Option<String>,
652    #[serde(default, skip_serializing_if = "SearchFilters::is_default")]
653    pub filters: SearchFilters,
654    pub offset: usize,
655}
656
657#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
658#[serde(untagged)]
659pub enum IngestEnvelope {
660    Success(IngestResponse),
661    Error(ErrorEnvelope),
662}
663
664#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
665pub struct IngestRequest {
666    pub protocol_version: u16,
667    #[serde(default)]
668    pub namespace: Option<String>,
669    pub events: Vec<crate::sessions::IngestEvent>,
670}
671
672/// `pond_ingest` response (spec.md#protocol). `accepted = inserted + matched`,
673/// `rejected = error`; both derived from `results`. Per-row `results[]` is
674/// the contract clients rely on to reconcile retries (the PK is echoed so
675/// the client can match outcomes back to its input even when `index` is not
676/// enough). Each result reports the input event's `index`, `kind`, `pk`,
677/// `status`, and an `error` body when `status = "error"`.
678#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
679pub struct IngestResponse {
680    pub accepted: usize,
681    pub rejected: usize,
682    pub results: Vec<IngestResult>,
683}
684
685/// One row of `pond_ingest` per-row output (spec.md#protocol).
686#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
687pub struct IngestResult {
688    /// Position in the request's `events` array (0-based).
689    pub index: usize,
690    /// `"session"` | `"message"` | `"part"`, matching `IngestEvent::kind`.
691    pub kind: String,
692    /// Echoed primary key: scalar for session, `[session_id, message_id]` for
693    /// message, `[session_id, message_id, part_id]` for part. Lets clients reconcile
694    /// against their own state on retry.
695    pub pk: Value,
696    pub status: IngestStatus,
697    /// Set only when `status = "error"`. Carries the same shape as the
698    /// envelope-level error body.
699    #[serde(default, skip_serializing_if = "Option::is_none")]
700    pub error: Option<ErrorBody>,
701}
702
703#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
704#[serde(rename_all = "snake_case")]
705pub enum IngestStatus {
706    /// New PK; `merge_insert` wrote a fresh row.
707    Inserted,
708    /// PK existed; `merge_insert` matched it (no-op per spec.md#adapter-integrity-additive-sync).
709    Matched,
710    /// Per-row failure: validation or storage error. See `error` field.
711    Error,
712}
713
714fn default_limit() -> usize {
715    10
716}
717
718pub fn new_request_id() -> String {
719    format!("req_{}", Uuid::now_v7())
720}
721
722pub const DEFAULT_NAMESPACE: &str = "local";
723
724pub fn default_namespace() -> String {
725    DEFAULT_NAMESPACE.to_owned()
726}
727
728fn default_get_limit() -> usize {
729    20
730}
731
732pub fn validate_protocol(version: u16) -> Result<(), ErrorEnvelope> {
733    if version == PROTOCOL_VERSION {
734        return Ok(());
735    }
736
737    Err(error(
738        ErrorCode::VersionUnsupported,
739        "unsupported protocol_version",
740        serde_json::json!({
741            "received": version,
742            "supported": [PROTOCOL_VERSION],
743        }),
744    ))
745}
746
747pub fn error(code: ErrorCode, message: impl Into<String>, details: Value) -> ErrorEnvelope {
748    ErrorEnvelope {
749        error: ErrorBody {
750            code,
751            message: message.into(),
752            details,
753        },
754    }
755}
756
757impl From<crate::Error> for ErrorEnvelope {
758    fn from(error_value: crate::Error) -> Self {
759        match error_value {
760            crate::Error::Validation {
761                message,
762                field,
763                value,
764                expected,
765            } => error(
766                ErrorCode::ValidationFailed,
767                message,
768                validation_details(field, value, expected),
769            ),
770            crate::Error::NotFound { message, kind, pk } => error(
771                ErrorCode::NotFound,
772                message,
773                serde_json::json!({ "kind": kind, "pk": pk }),
774            ),
775            crate::Error::NamespaceUnknown { namespace } => error(
776                ErrorCode::NamespaceUnknown,
777                "namespace unknown",
778                serde_json::json!({ "namespace": namespace }),
779            ),
780            crate::Error::Conflict { attempts } => error(
781                ErrorCode::Conflict,
782                "commit conflict after retries exhausted",
783                serde_json::json!({ "attempts": attempts }),
784            ),
785            crate::Error::Storage(error_value) => storage_error(error_value),
786            crate::Error::Internal(message) => {
787                error(ErrorCode::Internal, message, serde_json::json!({}))
788            }
789        }
790    }
791}
792
793fn validation_details(
794    field: Option<String>,
795    value: Option<Value>,
796    expected: Option<String>,
797) -> Value {
798    let mut details = Map::new();
799    if let Some(field) = field {
800        details.insert("field".to_owned(), Value::String(field));
801    }
802    if let Some(value) = value {
803        details.insert("value".to_owned(), value);
804    }
805    if let Some(expected) = expected {
806        details.insert("expected".to_owned(), Value::String(expected));
807    }
808    Value::Object(details)
809}
810
811pub fn storage_error(error_value: anyhow::Error) -> ErrorEnvelope {
812    error(
813        ErrorCode::StorageUnavailable,
814        "storage operation failed",
815        serde_json::json!({ "underlying": error_value.to_string() }),
816    )
817}
818
819#[cfg(test)]
820mod tests {
821    #![allow(clippy::expect_used, clippy::unwrap_used)]
822
823    use super::*;
824    use serde_json::json;
825
826    #[test]
827    fn wire_envelope_carries_conflict_code_and_attempts_detail() {
828        let envelope: ErrorEnvelope = crate::Error::Conflict { attempts: 3 }.into();
829        assert_eq!(envelope.error.code, ErrorCode::Conflict);
830        assert_eq!(envelope.error.details, json!({ "attempts": 3 }));
831    }
832}