Skip to main content

wavekat_platform_client/
sync.rs

1//! Platform sync endpoints — uniform "batch upload + cursor list" shape.
2//!
3//! Every client→platform sync (calls today; recordings, transcripts,
4//! summaries later) goes through the [`SyncEndpoint`] trait. Each
5//! resource is a zero-sized marker type (e.g. [`crate::voice::VoiceCalls`])
6//! that nails down:
7//!
8//! - the URL segment under `/api/voice/` (`RESOURCE`);
9//! - the wire-shape [`SyncEndpoint::Record`] type;
10//! - the typed [`SyncEndpoint::Query`] for GET pagination.
11//!
12//! [`Client::sync`] and [`Client::list`] are the only two methods you
13//! need on the consumer side — both are parameterised by the marker.
14//!
15//! See `wavekat-voice/docs/21-platform-call-history-sync.md` for the
16//! full design rationale and the wire contract.
17
18use serde::{de::DeserializeOwned, Deserialize, Serialize};
19use serde_json::Value as JsonValue;
20
21use crate::client::Client;
22use crate::error::Result;
23
24/// Wire-level envelope that every sync record carries.
25///
26/// Embedded in each `SyncEndpoint::Record` via `#[serde(flatten)]`
27/// so the two fields end up at the top of the JSON object alongside
28/// the resource-specific columns. Lets the version-skew story live
29/// in one place — not duplicated on every resource type.
30///
31/// **`schema_version`**: which wire shape the daemon wrote this row
32/// with. `None` on the wire means "I'm not declaring one; treat as
33/// `1`." `Client::sync` fills this in from
34/// [`SyncEndpoint::CURRENT_SCHEMA_VERSION`] when a consumer leaves
35/// it [`None`].
36///
37/// **`extras`**: free-form JSON map for fields the consumer's
38/// schema version recognises but the platform's doesn't yet. The
39/// platform persists `extras` verbatim so a future deploy can
40/// promote a field out of it into a typed column without data loss.
41/// The platform deliberately does *not* echo `extras` back on GET —
42/// it's an internal-storage construct, not a public field.
43///
44/// Both fields are optional in serialization so a row that ships
45/// neither stays on the small/fast path.
46#[derive(Debug, Clone, Default, Serialize, Deserialize)]
47#[serde(rename_all = "camelCase")]
48pub struct SyncEnvelope {
49    #[serde(default, skip_serializing_if = "Option::is_none")]
50    pub schema_version: Option<u32>,
51    #[serde(default, skip_serializing_if = "Option::is_none")]
52    pub extras: Option<JsonValue>,
53}
54
55impl SyncEnvelope {
56    /// Build an envelope stamped with the endpoint's
57    /// `CURRENT_SCHEMA_VERSION`. Useful for daemon-side code that
58    /// wants to construct a record with the version already filled
59    /// in — `Client::sync` will also fill it in lazily, but doing it
60    /// at construction time keeps tests and logs honest.
61    pub fn for_endpoint<E: SyncEndpoint + ?Sized>() -> Self {
62        Self {
63            schema_version: Some(E::CURRENT_SCHEMA_VERSION),
64            extras: None,
65        }
66    }
67}
68
69/// Records that carry a [`SyncEnvelope`] expose it via this trait so
70/// the bridge crate can stamp the `schemaVersion` field uniformly
71/// across resources. One-line impl per record type:
72///
73/// ```ignore
74/// impl HasSyncEnvelope for VoiceCallRecord {
75///     fn envelope_mut(&mut self) -> &mut SyncEnvelope { &mut self.envelope }
76/// }
77/// ```
78pub trait HasSyncEnvelope {
79    fn envelope_mut(&mut self) -> &mut SyncEnvelope;
80}
81
82/// Clone `items` and fill in `schemaVersion` on every record whose
83/// envelope left it unset. Records that supplied an explicit version
84/// are passed through unchanged — useful for tests and for the rare
85/// "deliberately ship an older version during a rollback" case.
86fn stamp_schema_version<E: SyncEndpoint>(items: &[E::Record]) -> Vec<E::Record>
87where
88    E::Record: Clone + HasSyncEnvelope,
89{
90    let mut out = items.to_vec();
91    for item in &mut out {
92        let env = item.envelope_mut();
93        if env.schema_version.is_none() {
94            env.schema_version = Some(E::CURRENT_SCHEMA_VERSION);
95        }
96    }
97    out
98}
99
100/// One sync-able platform resource.
101///
102/// Implemented by zero-sized marker types — you call methods like
103/// `client.sync::<VoiceCalls>(&items)` rather than constructing a
104/// `VoiceCalls` value.
105pub trait SyncEndpoint {
106    /// Path segment under `/api/voice/`. e.g. `"calls"`, `"recordings"`.
107    ///
108    /// Combined into the full paths
109    /// `POST /api/voice/{RESOURCE}/sync` and
110    /// `GET  /api/voice/{RESOURCE}`.
111    const RESOURCE: &'static str;
112
113    /// Current wire-schema version for this resource's `Record` type.
114    ///
115    /// Bumped when the meaning of an existing field changes (a rare,
116    /// deliberate event). Additive field changes don't require a
117    /// version bump — they ride on the additive-only policy
118    /// (see `wavekat-voice/docs/21-platform-call-history-sync.md`
119    /// §"Versioning and forward compatibility").
120    ///
121    /// Used by `Client::sync` so consumers don't manage the version
122    /// themselves — upgrading the bridge crate picks up the right
123    /// number automatically. Default is `1`.
124    const CURRENT_SCHEMA_VERSION: u32 = 1;
125
126    /// One row's worth of data. Must round-trip through JSON; the wire
127    /// shape uses camelCase per the platform's Hono/Zod convention
128    /// (apply `#[serde(rename_all = "camelCase")]` on your struct).
129    ///
130    /// Records must embed [`SyncEnvelope`] via
131    /// `#[serde(flatten)] pub envelope: SyncEnvelope` so the
132    /// `schemaVersion` + `extras` fields appear at the top of the
133    /// JSON object alongside the resource-specific columns. The
134    /// trait doesn't enforce this via an associated type because
135    /// `#[serde(flatten)]` is a serde attribute (not a Rust trait
136    /// bound), but every record type ships with the envelope and
137    /// `Client::sync` relies on the field name `schemaVersion`.
138    /// See `VoiceCallRecord` for the canonical shape.
139    type Record: Serialize + DeserializeOwned + Send + Sync + 'static;
140
141    /// Query params for `GET /api/voice/{RESOURCE}`. Typically a cursor
142    /// (`before` as RFC 3339) plus a `limit` and any resource-specific
143    /// filters (e.g. `account_id`). Serialized as URL query.
144    type Query: Serialize + Send + Sync;
145}
146
147/// Body shape for `POST /api/voice/{R}/sync`.
148///
149/// `items` is the batch. The server caps batches at 100 — chunking
150/// is the consumer's responsibility (the daemon's `Uploader<E>` does
151/// this automatically; ad-hoc callers should too).
152#[derive(Debug, Clone, Serialize, Deserialize)]
153#[serde(rename_all = "camelCase")]
154pub struct SyncRequest<R> {
155    pub items: Vec<R>,
156}
157
158/// Response from `POST /api/voice/{R}/sync`.
159///
160/// `accepted` counts rows the platform actually wrote (insert *or*
161/// idempotent update). `skipped` counts rows the platform deliberately
162/// ignored — reserved for future mutable resources where a stale
163/// revision should be dropped without erroring. Always 0 for the
164/// immutable calls/recordings/transcripts shipped today; consumers
165/// can ignore it for now and still be forward-compatible.
166#[derive(Debug, Clone, Serialize, Deserialize)]
167pub struct SyncResponse {
168    pub accepted: u32,
169    pub skipped: u32,
170}
171
172/// One page of `GET /api/voice/{R}`.
173///
174/// `items` is newest-first. `next_before` is the cursor for the next
175/// page (pass it back as the request's `before` field); absent/None
176/// means the caller has reached the start of history.
177#[derive(Debug, Clone, Serialize, Deserialize)]
178#[serde(rename_all = "camelCase")]
179pub struct Page<R> {
180    pub items: Vec<R>,
181    #[serde(default, skip_serializing_if = "Option::is_none")]
182    pub next_before: Option<String>,
183}
184
185impl Client {
186    /// `POST /api/voice/{E::RESOURCE}/sync` — idempotent batch upload.
187    ///
188    /// The platform upserts keyed by `(user_id, item.source_id)`, so
189    /// retries after a flaky connection are safe.
190    ///
191    /// **Batch size.** The platform rejects batches over 100 items with
192    /// HTTP 413. This method does *not* chunk for you — pass a slice
193    /// you're confident about, or use the daemon's `Uploader<E>` which
194    /// chunks at 50.
195    ///
196    /// **Schema version.** Records whose envelope leaves
197    /// `schemaVersion` unset (the common case — consumers don't need
198    /// to know the number) have it stamped with
199    /// [`SyncEndpoint::CURRENT_SCHEMA_VERSION`] before serialization,
200    /// so the platform always sees an explicit version. Records that
201    /// set it explicitly are passed through untouched.
202    pub async fn sync<E: SyncEndpoint>(&self, items: &[E::Record]) -> Result<SyncResponse>
203    where
204        E::Record: Clone + HasSyncEnvelope,
205    {
206        let path = format!("/api/voice/{}/sync", E::RESOURCE);
207        let body = SyncRequest {
208            items: stamp_schema_version::<E>(items),
209        };
210        self.post_json::<SyncResponse, _>(&path, &body).await
211    }
212
213    /// `GET /api/voice/{E::RESOURCE}` — one page of the caller's rows,
214    /// newest first, scoped server-side to the bearer's user.
215    pub async fn list<E: SyncEndpoint>(&self, query: &E::Query) -> Result<Page<E::Record>> {
216        let path = format!("/api/voice/{}", E::RESOURCE);
217        self.get_json_query::<Page<E::Record>, _>(&path, query)
218            .await
219    }
220}
221
222#[cfg(test)]
223mod tests {
224    use super::*;
225
226    // A minimal marker so the trait surface is exercised independently
227    // of any specific resource type.
228    struct DummyResource;
229
230    #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
231    #[serde(rename_all = "camelCase")]
232    struct DummyRecord {
233        source_id: String,
234        payload: String,
235    }
236
237    #[derive(Debug, Default, Serialize)]
238    #[serde(rename_all = "camelCase")]
239    struct DummyQuery {
240        before: Option<String>,
241        limit: Option<u32>,
242    }
243
244    impl SyncEndpoint for DummyResource {
245        const RESOURCE: &'static str = "dummy";
246        type Record = DummyRecord;
247        type Query = DummyQuery;
248    }
249
250    #[test]
251    fn sync_request_serializes_with_items_field() {
252        let body = SyncRequest::<DummyRecord> {
253            items: vec![
254                DummyRecord {
255                    source_id: "a".into(),
256                    payload: "x".into(),
257                },
258                DummyRecord {
259                    source_id: "b".into(),
260                    payload: "y".into(),
261                },
262            ],
263        };
264        let s = serde_json::to_string(&body).unwrap();
265        assert!(s.contains("\"items\":["), "missing items envelope: {s}");
266        assert!(
267            s.contains("\"sourceId\":\"a\""),
268            "wire should be camelCase: {s}"
269        );
270    }
271
272    #[test]
273    fn sync_response_parses_platform_shape() {
274        let raw = r#"{"accepted": 3, "skipped": 0}"#;
275        let parsed: SyncResponse = serde_json::from_str(raw).unwrap();
276        assert_eq!(parsed.accepted, 3);
277        assert_eq!(parsed.skipped, 0);
278    }
279
280    #[test]
281    fn page_round_trip_without_cursor() {
282        // The wire either omits next_before or sends null when there's
283        // no more history. Both should parse to None.
284        let with_null = r#"{"items": [], "nextBefore": null}"#;
285        let omitted = r#"{"items": []}"#;
286        let p1: Page<DummyRecord> = serde_json::from_str(with_null).unwrap();
287        let p2: Page<DummyRecord> = serde_json::from_str(omitted).unwrap();
288        assert!(p1.next_before.is_none());
289        assert!(p2.next_before.is_none());
290    }
291
292    #[test]
293    fn resource_const_drives_path() {
294        // Sanity check — the trait constant is what ends up in the URL.
295        assert_eq!(<DummyResource as SyncEndpoint>::RESOURCE, "dummy");
296    }
297
298    // A record that carries the envelope via flatten — exactly the
299    // shape every real resource type adopts. Verifies the
300    // stamp-on-send behaviour without depending on `VoiceCalls`
301    // (which lives in a sibling module).
302    #[derive(Debug, Clone, Default, Serialize, Deserialize)]
303    #[serde(rename_all = "camelCase")]
304    struct DummyEnvelopedRecord {
305        source_id: String,
306        #[serde(flatten, default)]
307        envelope: SyncEnvelope,
308    }
309
310    impl HasSyncEnvelope for DummyEnvelopedRecord {
311        fn envelope_mut(&mut self) -> &mut SyncEnvelope {
312            &mut self.envelope
313        }
314    }
315
316    struct EnvelopedResource;
317    impl SyncEndpoint for EnvelopedResource {
318        const RESOURCE: &'static str = "enveloped";
319        const CURRENT_SCHEMA_VERSION: u32 = 7;
320        type Record = DummyEnvelopedRecord;
321        type Query = DummyQuery;
322    }
323
324    #[test]
325    fn stamp_schema_version_fills_in_when_missing() {
326        let items = vec![DummyEnvelopedRecord {
327            source_id: "a".into(),
328            envelope: SyncEnvelope::default(),
329        }];
330        let stamped = stamp_schema_version::<EnvelopedResource>(&items);
331        assert_eq!(stamped[0].envelope.schema_version, Some(7));
332    }
333
334    #[test]
335    fn stamp_schema_version_preserves_explicit_value() {
336        // A consumer that deliberately set a version (e.g. a rollback
337        // test) should pass through unchanged.
338        let items = vec![DummyEnvelopedRecord {
339            source_id: "a".into(),
340            envelope: SyncEnvelope {
341                schema_version: Some(2),
342                extras: None,
343            },
344        }];
345        let stamped = stamp_schema_version::<EnvelopedResource>(&items);
346        assert_eq!(stamped[0].envelope.schema_version, Some(2));
347    }
348
349    #[test]
350    fn for_endpoint_returns_envelope_with_current_version() {
351        let env = SyncEnvelope::for_endpoint::<EnvelopedResource>();
352        assert_eq!(env.schema_version, Some(7));
353        assert!(env.extras.is_none());
354    }
355}