// pond/wire.rs

1use std::collections::BTreeMap;
2
3use chrono::{DateTime, Utc};
4use serde::{Deserialize, Serialize};
5use serde_json::{Map, Value};
6use uuid::Uuid;
7
8use crate::PROTOCOL_VERSION;
9use crate::adapter::Extracted;
10
/// Free-form option bag: string keys mapped to arbitrary JSON values.
/// Being a `BTreeMap`, keys iterate (and therefore serialize) in sorted order.
pub type ProviderOptions = BTreeMap<String, Value>;
12
/// A session record: identity, optional parent pointers, originating agent,
/// creation time, project, and an option bag.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct Session {
    /// Session identifier.
    pub id: String,
    /// Identifier of the parent session, if any. Omitted from serialized
    /// output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub parent_session_id: Option<String>,
    /// spec.md#model-parent-pointer-coherence: when set, `parent_session_id`
    /// MUST also be set. Spawn-only sources (claude-code subagents,
    /// nanoclaw) leave this `None`; fork-with-cut-point sources
    /// (pi-coding-agent) populate both pointers together.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub parent_message_id: Option<String>,
    /// Identifier of the agent the session came from.
    pub source_agent: String,
    /// Session creation time (UTC).
    pub created_at: DateTime<Utc>,
    /// Project string, wrapped in `Extracted` (declared in `crate::adapter`).
    pub project: Extracted<String>,
    /// Option bag; an absent field deserializes to an empty map.
    #[serde(default)]
    pub options: ProviderOptions,
}
30
/// A message, internally tagged by `role` (`system` / `user` / `assistant` /
/// `tool`) in its serialized form. Every variant carries the same identity
/// fields, a timestamp and an option bag; only `System` carries content
/// directly on the message.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(tag = "role", rename_all = "snake_case")]
pub enum Message {
    /// System-role message; the only variant with inline `content`.
    System {
        /// Message identifier.
        id: String,
        /// Identifier of the owning session.
        session_id: String,
        /// Message timestamp (UTC).
        timestamp: DateTime<Utc>,
        /// `None` when the source row carried no content. The seal on
        /// `Extracted<String>` means adapters CANNOT pass a synthesized
        /// or sentinel string here - the value either flows from a
        /// `Source` extraction or the field is `None`. Distinguishes
        /// "source said content=''" (Some(extracted_empty)) from
        /// "source had no content field" (None).
        #[serde(default, skip_serializing_if = "Option::is_none")]
        content: Option<Extracted<String>>,
        /// Option bag; an absent field deserializes to an empty map.
        #[serde(default)]
        options: ProviderOptions,
    },
    /// User-role message.
    User {
        /// Message identifier.
        id: String,
        /// Identifier of the owning session.
        session_id: String,
        /// Message timestamp (UTC).
        timestamp: DateTime<Utc>,
        /// Option bag; an absent field deserializes to an empty map.
        #[serde(default)]
        options: ProviderOptions,
    },
    /// Assistant-role message.
    Assistant {
        /// Message identifier.
        id: String,
        /// Identifier of the owning session.
        session_id: String,
        /// Message timestamp (UTC).
        timestamp: DateTime<Utc>,
        /// Option bag; an absent field deserializes to an empty map.
        #[serde(default)]
        options: ProviderOptions,
    },
    /// Tool-role message.
    Tool {
        /// Message identifier.
        id: String,
        /// Identifier of the owning session.
        session_id: String,
        /// Message timestamp (UTC).
        timestamp: DateTime<Utc>,
        /// Option bag; an absent field deserializes to an empty map.
        #[serde(default)]
        options: ProviderOptions,
    },
}
71
72impl Message {
73    pub fn id(&self) -> &str {
74        match self {
75            Self::System { id, .. }
76            | Self::User { id, .. }
77            | Self::Assistant { id, .. }
78            | Self::Tool { id, .. } => id,
79        }
80    }
81
82    pub fn session_id(&self) -> &str {
83        match self {
84            Self::System { session_id, .. }
85            | Self::User { session_id, .. }
86            | Self::Assistant { session_id, .. }
87            | Self::Tool { session_id, .. } => session_id,
88        }
89    }
90
91    pub fn role(&self) -> Role {
92        match self {
93            Self::System { .. } => Role::System,
94            Self::User { .. } => Role::User,
95            Self::Assistant { .. } => Role::Assistant,
96            Self::Tool { .. } => Role::Tool,
97        }
98    }
99
100    pub fn timestamp(&self) -> DateTime<Utc> {
101        match self {
102            Self::System { timestamp, .. }
103            | Self::User { timestamp, .. }
104            | Self::Assistant { timestamp, .. }
105            | Self::Tool { timestamp, .. } => *timestamp,
106        }
107    }
108
109    pub fn options(&self) -> &ProviderOptions {
110        match self {
111            Self::System { options, .. }
112            | Self::User { options, .. }
113            | Self::Assistant { options, .. }
114            | Self::Tool { options, .. } => options,
115        }
116    }
117
118    pub fn options_mut(&mut self) -> &mut ProviderOptions {
119        match self {
120            Self::System { options, .. }
121            | Self::User { options, .. }
122            | Self::Assistant { options, .. }
123            | Self::Tool { options, .. } => options,
124        }
125    }
126
127    pub fn system_content(&self) -> Option<&str> {
128        match self {
129            // Two layers of `as_deref`: the outer `Option<Extracted<String>>`
130            // becomes `Option<&Extracted<String>>`, then `Extracted: Deref`
131            // unwraps to `&str`.
132            Self::System { content, .. } => content.as_deref().map(|e| &**e),
133            Self::User { .. } | Self::Assistant { .. } | Self::Tool { .. } => None,
134        }
135    }
136}
137
/// Message role. Serialized as a snake_case string (`"system"`, `"user"`,
/// `"assistant"`, `"tool"`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum Role {
    /// Role of a `Message::System`.
    System,
    /// Role of a `Message::User`.
    User,
    /// Role of a `Message::Assistant`.
    Assistant,
    /// Role of a `Message::Tool`.
    Tool,
}
146
147impl Role {
148    pub fn as_str(self) -> &'static str {
149        match self {
150            Self::System => "system",
151            Self::User => "user",
152            Self::Assistant => "assistant",
153            Self::Tool => "tool",
154        }
155    }
156}
157
/// Whether a Part's content is conversation or harness-injected scaffolding
/// (spec.md#model-part-provenance). No `Default` and no `#[serde(default)]` on the
/// `Part.provenance` field below: constructing a Part without classifying it
/// MUST be a compile error (spec.md#adapter-provenance-required).
///
/// Serialized as `"conversational"` / `"injected"`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum Provenance {
    /// The part is conversation content.
    Conversational,
    /// The part is harness-injected scaffolding.
    Injected,
}
168
169impl Provenance {
170    pub fn as_str(self) -> &'static str {
171        match self {
172            Self::Conversational => "conversational",
173            Self::Injected => "injected",
174        }
175    }
176}
177
/// One part of a message: identity and ordering fields, a mandatory
/// provenance, an option bag, and the kind-specific payload. The payload is
/// flattened, so its `type` tag and fields sit on the same JSON object as the
/// fields declared here.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct Part {
    /// Identifier of the owning session.
    pub session_id: String,
    /// Part identifier.
    pub id: String,
    /// Identifier of the owning message.
    pub message_id: String,
    /// Ordinal of the part (presumably its position within the message -
    /// TODO confirm against the spec).
    pub ordinal: i32,
    /// Conversation vs harness-injected (spec.md#model-part-provenance). Mandatory,
    /// no serde default - search reads it to exclude injected scaffolding.
    pub provenance: Provenance,
    /// Option bag; an absent field deserializes to an empty map.
    #[serde(default)]
    pub options: ProviderOptions,
    /// Kind-specific payload, flattened into this object.
    #[serde(flatten)]
    pub kind: PartKind,
}
192
/// Kind-specific payload of a [`Part`], internally tagged by `type` with
/// snake_case variant names (the same strings `PartKind::type_name` returns).
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum PartKind {
    /// Text part.
    Text {
        /// `None` when the source row had no text field. The seal on
        /// `Extracted<String>` means adapters CANNOT pass a synthesized
        /// empty string or any other placeholder here - the value either
        /// flows from a `Source` extraction or the field is `None`.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        text: Option<Extracted<String>>,
    },
    /// Reasoning part.
    Reasoning {
        /// `None` when the source row had no reasoning text. Type-system
        /// guard against `unwrap_or_default()`-style fallbacks: the
        /// `Extracted<String>` seal forces the adapter to either get the
        /// value from a `Source` or admit it is absent.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        text: Option<Extracted<String>>,
    },
    /// File part.
    File {
        /// `None` when the source row carried no MIME hint. Sealed against
        /// `unwrap_or("application/octet-stream")`-style fallbacks: an absent
        /// type is faithfully absent, not a synthesized default
        /// (spec.md#model-no-synthesis).
        ///
        /// NOTE(review): unlike the sibling sealed fields this is a plain
        /// `Option<String>`, not `Option<Extracted<String>>`, so the type
        /// itself does not enforce the seal - confirm this is intended.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        media_type: Option<String>,
        /// File name, when one is known; omitted from output when `None`.
        #[serde(skip_serializing_if = "Option::is_none")]
        file_name: Option<String>,
        /// The file payload itself.
        data: FileData,
    },
    /// Tool invocation.
    ToolCall {
        /// `None` when the source carried no call_id (rare; malformed).
        /// Sealed via `Extracted<String>` - empty-string sentinels are
        /// not constructable from adapter code.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        call_id: Option<Extracted<String>>,
        /// `None` when the source carried no tool name. claude-code
        /// always carries it on `tool_use` rows; codex-cli sometimes
        /// has placeholder shapes. The seal makes synthesized names
        /// unconstructable from adapter code (spec.md#model-no-synthesis).
        #[serde(default, skip_serializing_if = "Option::is_none")]
        name: Option<Extracted<String>>,
        /// Call parameters as an arbitrary JSON value.
        params: Value,
        /// Presumably whether the provider (rather than the client) executed
        /// the tool - TODO confirm against the spec.
        provider_executed: bool,
    },
    /// Result of a tool invocation.
    ToolResult {
        /// `None` when the source carried no `tool_use_id` link.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        call_id: Option<Extracted<String>>,
        /// `None` when the adapter could not resolve the tool name.
        /// In claude-code, name lives only on the prior `tool_use` row;
        /// the adapter resolves via a per-file `tool_use_id -> name`
        /// map and surfaces a miss (e.g. compaction pruned the originating
        /// call) as `None`, never as a fabricated string
        /// (spec.md#model-no-synthesis).
        #[serde(default, skip_serializing_if = "Option::is_none")]
        name: Option<Extracted<String>>,
        /// Whether the result is a failure; `PartSummary::for_kind` appends
        /// ` (failed)` to the label when this is set and a name is present.
        is_failure: bool,
        /// Result payload as an arbitrary JSON value.
        result: Value,
    },
    /// Request for approval of a tool call.
    ToolApprovalRequest {
        /// Identifier of the approval.
        approval_id: String,
        /// Identifier of the tool call the approval concerns.
        tool_call_id: String,
    },
    /// Response to a tool approval request.
    ToolApprovalResponse {
        /// Identifier of the approval being answered.
        approval_id: String,
        /// `true` when approved; `PartSummary::for_kind` renders `false` as
        /// "denied".
        approved: bool,
        /// Optional reason; omitted from output when `None`.
        #[serde(skip_serializing_if = "Option::is_none")]
        reason: Option<String>,
    },
}
264
265impl PartKind {
266    pub fn type_name(&self) -> &'static str {
267        match self {
268            Self::Text { .. } => "text",
269            Self::Reasoning { .. } => "reasoning",
270            Self::File { .. } => "file",
271            Self::ToolCall { .. } => "tool_call",
272            Self::ToolResult { .. } => "tool_result",
273            Self::ToolApprovalRequest { .. } => "tool_approval_request",
274            Self::ToolApprovalResponse { .. } => "tool_approval_response",
275        }
276    }
277}
278
/// Payload of a file part. Adjacently tagged: serialized as
/// `{"kind": "<variant>", "value": <payload>}` with snake_case variant names.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(tag = "kind", content = "value", rename_all = "snake_case")]
pub enum FileData {
    /// Payload held as a string.
    String(String),
    /// Payload held as raw bytes.
    Bytes(Vec<u8>),
    /// Payload referenced by a string (presumably a URL, per the variant
    /// name; no validation is visible here).
    Url(String),
}
286
/// Machine-readable error code carried in an [`ErrorBody`]. Serialized as a
/// snake_case string (e.g. `"validation_failed"`).
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ErrorCode {
    /// Produced for `crate::Error::Validation`.
    ValidationFailed,
    /// Produced by `validate_protocol` on a protocol version mismatch.
    VersionUnsupported,
    /// Produced for `crate::Error::NotFound`.
    NotFound,
    /// Produced for `crate::Error::NamespaceUnknown`.
    NamespaceUnknown,
    /// Produced by `storage_error` for storage failures.
    StorageUnavailable,
    /// Produced for `crate::Error::Conflict`.
    Conflict,
    /// Produced for `crate::Error::Internal`.
    Internal,
}
298
/// Error payload: a code, a human-readable message, and structured details.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct ErrorBody {
    /// Machine-readable error code.
    pub code: ErrorCode,
    /// Human-readable message.
    pub message: String,
    /// Structured details as arbitrary JSON; an absent field deserializes to
    /// `Value::Null` (the `Default` of `serde_json::Value`).
    #[serde(default)]
    pub details: Value,
}
306
/// Error response wrapper: serializes as `{"error": { ... }}`.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct ErrorEnvelope {
    /// The error payload.
    pub error: ErrorBody,
}
311
/// `pond_get` result: either a success payload or an error envelope.
/// Untagged, so the JSON is the bare inner value with no wrapper; on
/// deserialization serde tries the variants in declaration order.
// The success/error size gap is fine here: a `GetEnvelope` is one per-request
// return value, serialized immediately - never stored in bulk where the gap
// would waste memory.
#[allow(clippy::large_enum_variant)]
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(untagged)]
pub enum GetEnvelope {
    /// Successful response.
    Success(GetResponse),
    /// Error response.
    Error(ErrorEnvelope),
}
322
/// `pond_get` request. Scoped either to a session (`session_id`) or to a
/// single message (`message_id`); the remaining fields tune paging and how
/// much of each message is returned.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct GetRequest {
    /// Protocol version the client speaks (required; presumably checked with
    /// `validate_protocol` by the handler).
    pub protocol_version: u16,
    /// Optional namespace; `None` when absent.
    #[serde(default)]
    pub namespace: Option<String>,
    // Mutually exclusive scopes.
    /// Session-mode scope.
    #[serde(default)]
    pub session_id: Option<String>,
    /// Message-mode scope.
    #[serde(default)]
    pub message_id: Option<String>,
    /// `message_id` mode only: symmetric window radius, mirroring `grep -C`.
    /// Returns up to `2*context_depth` siblings around the target. Ignored in
    /// session mode.
    #[serde(default)]
    pub context_depth: usize,
    /// Page size; defaults to 20 (`default_get_limit`) when absent.
    #[serde(default = "default_get_limit")]
    pub limit: usize,
    /// Session mode: how much of each message to materialize. Message mode:
    /// which siblings fill the context window (conversational by default;
    /// complete/verbatim include system/tool carriers). The target message
    /// always returns its full parts regardless.
    #[serde(default)]
    pub response_mode: ResponseMode,
    /// Exclusive continuation anchor: last `message_id` in session mode, last
    /// `part_id` in message mode. The append-only invariant means one id is
    /// enough state - no opaque cursor.
    #[serde(default)]
    pub after_id: Option<String>,
    /// Session mode only: which end to read `limit` messages from - `start`
    /// (oldest, default) or `end` (most recent). Results stay chronological;
    /// ignored in message mode.
    #[serde(default)]
    pub session_from: SessionFrom,
}
357
/// How much of each message `pond_get` materializes (spec.md#protocol).
///
/// NOTE(review): this doc previously said the mode is ignored when
/// `message_id` is set, but the `GetRequest::response_mode` field doc says
/// that in message mode it selects which siblings fill the context window.
/// Confirm which statement matches the handler.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ResponseMode {
    /// `search_text IS NOT NULL` conversational view + part summaries.
    /// This is the default mode.
    #[default]
    Conversational,
    /// All messages (including carriers) + part summaries.
    Complete,
    /// All messages + full parts inline (heaviest mode).
    Verbatim,
}
371
/// Which end of a session `pond_get` reads its page from (spec.md#protocol).
/// Serialized as `"start"` / `"end"`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum SessionFrom {
    /// Oldest messages first (the session's start). This is the default.
    #[default]
    Start,
    /// Most recent messages (the session's tail), still chronological.
    End,
}
382
/// The session header is always present; `result` carries the mode-specific
/// payload, discriminated by a `scope` tag (spec.md#protocol). Flattened so a
/// client reads `session` / `scope` / payload fields off one object - no
/// `session.session` nesting.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct GetResponse {
    /// Trimmed header of the session the response concerns.
    pub session: GetSession,
    /// Mode-specific payload; its `scope` tag and fields are flattened into
    /// this object.
    #[serde(flatten)]
    pub result: GetResult,
}
393
/// Trimmed session header (spec.md#protocol): adapter-redundant `options`,
/// parent pointers (served by `restore_lineage`), and per-message session id
/// dropped to keep `pond_get` responses lean for agent context windows.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct GetSession {
    /// Session identifier.
    pub id: String,
    /// Identifier of the agent the session came from.
    pub source_agent: String,
    /// Project as a plain `String` (the `Extracted` wrapper of
    /// `Session::project` is unwrapped by `from_session`).
    pub project: String,
    /// Session creation time (UTC).
    pub created_at: DateTime<Utc>,
}
404
405impl GetSession {
406    pub fn from_session(session: &Session) -> Self {
407        Self {
408            id: session.id.clone(),
409            source_agent: session.source_agent.clone(),
410            project: (*session.project).clone(),
411            created_at: session.created_at,
412        }
413    }
414}
415
/// Per-message view in a `pond_get` response. `parts_summary` is omitted from
/// the serialized form when empty (and deserializes back to an empty `Vec`);
/// `parts` is populated only in Verbatim mode.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct MessageView {
    /// Message identifier.
    pub id: String,
    /// Message role.
    pub role: Role,
    /// Message timestamp (UTC).
    pub timestamp: DateTime<Utc>,
    /// Conversational text (`search_text`); absent for carrier rows.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub text: Option<String>,
    /// System-message content string, when the source carried one.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub content: Option<String>,
    /// Compact per-part descriptors; skipped on output when empty.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub parts_summary: Vec<PartSummary>,
    /// Full parts; skipped on output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub parts: Option<Vec<ResponsePart>>,
}
434
/// Compact per-part descriptor (spec.md#protocol): enough to tell what a
/// message carries without paying for full content. `call_id` is populated
/// for `tool_call` / `tool_result` only.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct PartSummary {
    /// Part type name; `for_kind` fills it from `PartKind::type_name`.
    pub kind: String,
    /// Short human-readable label (file name or media type, tool name,
    /// approval id - see `for_kind`); omitted from output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub label: Option<String>,
    /// Tool call identifier; omitted from output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub call_id: Option<String>,
}
446
447impl PartSummary {
448    /// Project a canonical [`PartKind`] into its compact response descriptor, or
449    /// `None` for a kind that does not earn a summary. Exhaustive on purpose - a
450    /// new `PartKind` variant must decide here. `call_id` is carried for
451    /// `tool_call` / `tool_result` only.
452    ///
453    /// `text` and `reasoning` return `None`: a text part's content already rides
454    /// the message's `text`/`content` (a summary would duplicate it), and
455    /// reasoning is deliberately not surfaced by default (its full body is still
456    /// reachable in verbatim mode). The kinds that survive are exactly
457    /// [`SUMMARY_PART_TYPES`].
458    pub fn for_kind(kind: &PartKind) -> Option<Self> {
459        let (label, call_id) = match kind {
460            PartKind::Text { .. } | PartKind::Reasoning { .. } => return None,
461            PartKind::File {
462                media_type,
463                file_name,
464                ..
465            } => (file_name.clone().or_else(|| media_type.clone()), None),
466            PartKind::ToolCall { name, call_id, .. } => {
467                (name.as_deref().cloned(), call_id.as_deref().cloned())
468            }
469            PartKind::ToolResult {
470                name,
471                call_id,
472                is_failure,
473                ..
474            } => {
475                let label = name.as_deref().map(|name| {
476                    if *is_failure {
477                        format!("{name} (failed)")
478                    } else {
479                        name.clone()
480                    }
481                });
482                (label, call_id.as_deref().cloned())
483            }
484            PartKind::ToolApprovalRequest { approval_id, .. } => (Some(approval_id.clone()), None),
485            PartKind::ToolApprovalResponse {
486                approval_id,
487                approved,
488                ..
489            } => {
490                let verb = if *approved { "approved" } else { "denied" };
491                (Some(format!("{approval_id} ({verb})")), None)
492            }
493        };
494        Some(Self {
495            kind: kind.type_name().to_owned(),
496            label,
497            call_id,
498        })
499    }
500}
501
/// Canonical part `type` names that yield a [`PartSummary`] - every kind except
/// `text` and `reasoning` (see [`PartSummary::for_kind`], the source of truth).
/// The summary read paths filter the parts scan to these so a text/reasoning
/// heavy session never loads parts that would summarize to nothing.
///
/// Must be kept in step by hand with `PartKind::type_name` and
/// `PartSummary::for_kind`; nothing in this file checks the correspondence.
pub const SUMMARY_PART_TYPES: &[&str] = &[
    "file",
    "tool_call",
    "tool_result",
    "tool_approval_request",
    "tool_approval_response",
];
513
/// A `Part` as it rides a `pond_get` response (spec.md#protocol): the canonical
/// part minus `session_id` / `message_id`, which the enclosing session and
/// message already identify. Built from a canonical [`Part`] in the handler.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct ResponsePart {
    /// Part identifier.
    pub id: String,
    /// Ordinal copied from the canonical part.
    pub ordinal: i32,
    /// Conversation vs harness-injected classification.
    pub provenance: Provenance,
    /// Option bag; unlike `Part::options`, skipped on output when empty.
    #[serde(default, skip_serializing_if = "ProviderOptions::is_empty")]
    pub options: ProviderOptions,
    /// Kind-specific payload, flattened into this object.
    #[serde(flatten)]
    pub kind: PartKind,
}
527
528impl ResponsePart {
529    pub fn from_part(part: Part) -> Self {
530        Self {
531            id: part.id,
532            ordinal: part.ordinal,
533            provenance: part.provenance,
534            options: part.options,
535            kind: part.kind,
536        }
537    }
538}
539
/// Mode-specific `pond_get` payload, tagged by `scope` and flattened into
/// `GetResponse` alongside the shared session header. The tag value is
/// `"session"` or `"message"`.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(tag = "scope", rename_all = "snake_case")]
pub enum GetResult {
    /// Session-mode payload: one page of messages.
    Session {
        /// Messages on this page.
        messages: Vec<MessageView>,
        /// Count of messages remaining (presumably those after this page -
        /// TODO confirm in the handler).
        messages_remaining: usize,
    },
    /// Message-mode payload: the target message, its parts, and context.
    Message {
        /// The requested message.
        target: MessageView,
        /// Parts of the target message on this page.
        target_parts: Vec<ResponsePart>,
        /// Count of target parts remaining (presumably those after this
        /// page - TODO confirm in the handler).
        target_parts_remaining: usize,
        /// Up to `2*context_depth` messages around the target (target excluded).
        siblings: Vec<MessageView>,
    },
}
557
/// Search result: either a success payload or an error envelope. Untagged, so
/// the JSON is the bare inner value; on deserialization serde tries the
/// variants in declaration order.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(untagged)]
pub enum SearchEnvelope {
    /// Successful response.
    Success(SearchResponse),
    /// Error response.
    Error(ErrorEnvelope),
}
564
/// Filter on a session's project string.
///
/// JSON shape is externally tagged: `{"contains": "pond"}` or
/// `{"regex": "^/Users/.*"}`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ProjectFilter {
    /// Substring to look for (matching rules live in the search code, not here).
    Contains(String),
    /// Regular-expression pattern, carried as an unvalidated string here.
    Regex(String),
}
573
/// Search request: a query string plus optional filters, an optional
/// retrieval-mode override, and a result limit.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct SearchRequest {
    /// Protocol version the client speaks (required; presumably checked with
    /// `validate_protocol` by the handler).
    pub protocol_version: u16,
    /// Optional namespace; `None` when absent.
    #[serde(default)]
    pub namespace: Option<String>,
    /// The search query (required).
    pub query: String,
    // Server normally decides between hybrid and FTS-only from the embedder +
    // embeddings-coverage state (spec.md#search); `mode_override` is the
    // operator-tooling escape hatch. Production callers (MCP, HTTP agents)
    // should leave it `None`.
    /// Forced retrieval mode; omitted from output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub mode_override: Option<SearchModeWire>,
    /// Filters; an absent field deserializes to `SearchFilters::default()`.
    #[serde(default)]
    pub filters: SearchFilters,
    /// Result limit; defaults to 10 (`default_limit`) when absent.
    #[serde(default = "default_limit")]
    pub limit: usize,
}
591
/// Wire-level retrieval mode override (spec.md#search). Not normally set on
/// the wire - the server decides hybrid vs FTS-only from embedding
/// availability. The variant exists so operator tooling (`pond search --mode`,
/// the embeddings-benchmark harness) can force one arm without an env-var
/// backdoor.
///
/// Serialized as `"fts"` / `"vector"` / `"hybrid"`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum SearchModeWire {
    /// Force the FTS arm only.
    Fts,
    /// Force the vector arm only (presumably embedding similarity - confirm
    /// against spec.md#search).
    Vector,
    /// Force hybrid retrieval.
    Hybrid,
}
604
/// Optional search filters. Every field has a serde default and is skipped on
/// output when it holds that default, so an unfiltered request serializes as
/// an empty object.
#[derive(Debug, Clone, PartialEq, Default, Serialize, Deserialize)]
pub struct SearchFilters {
    /// Restrict by project string.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub project: Option<ProjectFilter>,
    /// Restrict to one session.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub session_id: Option<String>,
    /// Restrict to one source agent.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub source_agent: Option<String>,
    /// Lower date bound, carried as an unparsed string (format and
    /// inclusivity are not visible here - see the search handler).
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub from_date: Option<String>,
    /// Upper date bound, carried as an unparsed string (format and
    /// inclusivity are not visible here - see the search handler).
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub to_date: Option<String>,
    /// Score floor; hits below it are dropped. Not an absence signal: present
    /// and absent content score in overlapping bands (see
    /// `docs/researches/embeddings.md`), so the default stays 0 and the
    /// response's `searchable_in_scope` carries the honesty instead.
    // Skip the default 0.0 so an unfiltered request stays compact.
    #[serde(default, skip_serializing_if = "is_zero_f64")]
    pub min_score: f64,
    /// Include subagent sessions (`source_agent` like `claude-code/<name>`).
    /// Default false: a search targets the human-facing main sessions.
    #[serde(default, skip_serializing_if = "is_false")]
    pub include_subagents: bool,
}
629
/// `skip_serializing_if` predicate: `true` when the flag is `false`.
/// Takes a reference because serde calls the predicate with `&T`.
fn is_false(value: &bool) -> bool {
    let flag = *value;
    !flag
}
633
/// `skip_serializing_if` predicate: `true` when the number compares equal to
/// `0.0` (which includes `-0.0`; NaN never does). Takes a reference because
/// serde calls the predicate with `&T`.
fn is_zero_f64(value: &f64) -> bool {
    let number = *value;
    number == 0.0
}
637
/// Search response: hits grouped by session, plus totals describing the
/// search universe.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct SearchResponse {
    /// Matching sessions, each carrying its own matches.
    pub sessions: Vec<SearchSession>,
    /// Total number of matches (exact counting rules live in the search
    /// handler).
    pub matched_total: usize,
    /// How many messages with conversational text the caller's filters left
    /// in scope - the universe the search actually ran over. The absence
    /// signal: 0 means the filters excluded everything before retrieval, and
    /// a small value warns that "no relevant hits" covers a thin slice.
    #[serde(default)]
    pub searchable_in_scope: usize,
    /// Whether more results exist beyond this response.
    pub has_more: bool,
}
650
/// One session's worth of search hits.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct SearchSession {
    /// Session identifier.
    pub session_id: String,
    /// Project string of the session.
    pub project: String,
    /// Identifier of the agent the session came from.
    pub source_agent: String,
    /// Number of messages in the session.
    pub session_messages_count: usize,
    /// Number of messages in the session that matched.
    pub matched_message_count: usize,
    /// The matching messages returned for this session.
    pub matches: Vec<SearchResult>,
}
660
/// A single matching message.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct SearchResult {
    /// Identifier of the matching message.
    pub message_id: String,
    /// Role of the matching message.
    pub role: Role,
    /// Message timestamp (UTC).
    pub timestamp: DateTime<Utc>,
    /// Text of the hit.
    pub text: String,
    /// Relevance score; compared against `SearchFilters::min_score`.
    pub score: f64,
    /// Populated only for user-role hits: distinguishes a plain-text prompt
    /// from one carrying file attachments or multi-part scaffolding.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub parts_summary: Vec<PartSummary>,
}
673
/// `pond_ingest` result: either a success payload or an error envelope.
/// Untagged, so the JSON is the bare inner value; on deserialization serde
/// tries the variants in declaration order.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(untagged)]
pub enum IngestEnvelope {
    /// Successful response.
    Success(IngestResponse),
    /// Error response.
    Error(ErrorEnvelope),
}
680
/// `pond_ingest` request: a batch of events to ingest.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct IngestRequest {
    /// Protocol version the client speaks (required; presumably checked with
    /// `validate_protocol` by the handler).
    pub protocol_version: u16,
    /// Optional namespace; `None` when absent.
    #[serde(default)]
    pub namespace: Option<String>,
    /// Events to ingest; `IngestResult::index` refers to positions in this
    /// array.
    pub events: Vec<crate::sessions::IngestEvent>,
}
688
/// `pond_ingest` response (spec.md#protocol). `accepted = inserted + matched`,
/// `rejected = error`; both derived from `results`. Per-row `results[]` is
/// the contract clients rely on to reconcile retries (the PK is echoed so
/// the client can match outcomes back to its input even when `index` is not
/// enough). Each result reports the input event's `index`, `kind`, `pk`,
/// `status`, and an `error` body when `status = "error"`.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct IngestResponse {
    /// Count of results whose status is `inserted` or `matched`.
    pub accepted: usize,
    /// Count of results whose status is `error`.
    pub rejected: usize,
    /// Per-row outcomes.
    pub results: Vec<IngestResult>,
}
701
/// One row of `pond_ingest` per-row output (spec.md#protocol).
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct IngestResult {
    /// Position in the request's `events` array (0-based).
    pub index: usize,
    /// `"session"` | `"message"` | `"part"`, matching `IngestEvent::kind`.
    pub kind: String,
    /// Echoed primary key: scalar for session, `[session_id, message_id]` for
    /// message, `[session_id, message_id, part_id]` for part. Lets clients reconcile
    /// against their own state on retry.
    pub pk: Value,
    /// Outcome for this row.
    pub status: IngestStatus,
    /// Set only when `status = "error"`. Carries the same shape as the
    /// envelope-level error body. Omitted from output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub error: Option<ErrorBody>,
}
719
/// Per-row ingest outcome. Serialized as `"inserted"` / `"matched"` /
/// `"error"`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum IngestStatus {
    /// New PK; `merge_insert` wrote a fresh row.
    Inserted,
    /// PK existed; `merge_insert` matched it (no-op per spec.md#adapter-integrity-additive-sync).
    Matched,
    /// Per-row failure: validation or storage error. See `error` field.
    Error,
}
730
/// Serde default for `SearchRequest::limit`, used when the field is absent.
fn default_limit() -> usize {
    const DEFAULT_SEARCH_LIMIT: usize = 10;
    DEFAULT_SEARCH_LIMIT
}
734
735pub fn new_request_id() -> String {
736    format!("req_{}", Uuid::now_v7())
737}
738
/// Name of the default namespace.
pub const DEFAULT_NAMESPACE: &str = "local";

/// Owned copy of [`DEFAULT_NAMESPACE`].
pub fn default_namespace() -> String {
    String::from(DEFAULT_NAMESPACE)
}
744
/// Serde default for `GetRequest::limit`, used when the field is absent.
fn default_get_limit() -> usize {
    const DEFAULT_GET_LIMIT: usize = 20;
    DEFAULT_GET_LIMIT
}
748
749pub fn validate_protocol(version: u16) -> Result<(), ErrorEnvelope> {
750    if version == PROTOCOL_VERSION {
751        return Ok(());
752    }
753
754    Err(error(
755        ErrorCode::VersionUnsupported,
756        "unsupported protocol_version",
757        serde_json::json!({
758            "received": version,
759            "supported": [PROTOCOL_VERSION],
760        }),
761    ))
762}
763
764pub fn error(code: ErrorCode, message: impl Into<String>, details: Value) -> ErrorEnvelope {
765    ErrorEnvelope {
766        error: ErrorBody {
767            code,
768            message: message.into(),
769            details,
770        },
771    }
772}
773
774impl From<crate::Error> for ErrorEnvelope {
775    fn from(error_value: crate::Error) -> Self {
776        match error_value {
777            crate::Error::Validation {
778                message,
779                field,
780                value,
781                expected,
782            } => error(
783                ErrorCode::ValidationFailed,
784                message,
785                validation_details(field, value, expected),
786            ),
787            crate::Error::NotFound { message, kind, pk } => error(
788                ErrorCode::NotFound,
789                message,
790                serde_json::json!({ "kind": kind, "pk": pk }),
791            ),
792            crate::Error::NamespaceUnknown { namespace } => error(
793                ErrorCode::NamespaceUnknown,
794                "namespace unknown",
795                serde_json::json!({ "namespace": namespace }),
796            ),
797            crate::Error::Conflict { attempts } => error(
798                ErrorCode::Conflict,
799                "commit conflict after retries exhausted",
800                serde_json::json!({ "attempts": attempts }),
801            ),
802            crate::Error::Storage(error_value) => storage_error(error_value),
803            crate::Error::Internal(message) => {
804                error(ErrorCode::Internal, message, serde_json::json!({}))
805            }
806        }
807    }
808}
809
810fn validation_details(
811    field: Option<String>,
812    value: Option<Value>,
813    expected: Option<String>,
814) -> Value {
815    let mut details = Map::new();
816    if let Some(field) = field {
817        details.insert("field".to_owned(), Value::String(field));
818    }
819    if let Some(value) = value {
820        details.insert("value".to_owned(), value);
821    }
822    if let Some(expected) = expected {
823        details.insert("expected".to_owned(), Value::String(expected));
824    }
825    Value::Object(details)
826}
827
828pub fn storage_error(error_value: anyhow::Error) -> ErrorEnvelope {
829    error(
830        ErrorCode::StorageUnavailable,
831        "storage operation failed",
832        serde_json::json!({ "underlying": error_value.to_string() }),
833    )
834}
835
#[cfg(test)]
mod tests {
    // Unwrap/expect lints are relaxed for test code.
    #![allow(clippy::expect_used, clippy::unwrap_used)]

    use super::*;
    use serde_json::json;

    /// A `Conflict` error converts to the `Conflict` code and echoes the
    /// attempt count in its details.
    #[test]
    fn wire_envelope_carries_conflict_code_and_attempts_detail() {
        let source = crate::Error::Conflict { attempts: 3 };
        let ErrorEnvelope { error: body } = ErrorEnvelope::from(source);
        assert_eq!(body.code, ErrorCode::Conflict);
        assert_eq!(body.details, json!({ "attempts": 3 }));
    }
}