wavekat_platform_client/sync.rs
1//! Platform sync endpoints — uniform "batch upload + cursor list" shape.
2//!
3//! Every client→platform sync (calls today; recordings, transcripts,
4//! summaries later) goes through the [`SyncEndpoint`] trait. Each
5//! resource is a zero-sized marker type (e.g. [`crate::voice::VoiceCalls`])
6//! that nails down:
7//!
8//! - the URL segment under `/api/voice/` (`RESOURCE`);
9//! - the wire-shape [`SyncEndpoint::Record`] type;
10//! - the typed [`SyncEndpoint::Query`] for GET pagination.
11//!
12//! [`Client::sync`] and [`Client::list`] are the only two methods you
13//! need on the consumer side — both are parameterised by the marker.
14//!
15//! See `wavekat-voice/docs/21-platform-call-history-sync.md` for the
16//! full design rationale and the wire contract.
17
18use serde::{de::DeserializeOwned, Deserialize, Serialize};
19use serde_json::Value as JsonValue;
20
21use crate::client::Client;
22use crate::error::Result;
23
24/// Wire-level envelope that every sync record carries.
25///
26/// Embedded in each `SyncEndpoint::Record` via `#[serde(flatten)]`
27/// so the two fields end up at the top of the JSON object alongside
28/// the resource-specific columns. Lets the version-skew story live
29/// in one place — not duplicated on every resource type.
30///
31/// **`schema_version`**: which wire shape the daemon wrote this row
32/// with. `None` on the wire means "I'm not declaring one; treat as
33/// `1`." `Client::sync` fills this in from
34/// [`SyncEndpoint::CURRENT_SCHEMA_VERSION`] when a consumer leaves
35/// it [`None`].
36///
37/// **`extras`**: free-form JSON map for fields the consumer's
38/// schema version recognises but the platform's doesn't yet. The
39/// platform persists `extras` verbatim so a future deploy can
40/// promote a field out of it into a typed column without data loss.
41/// The platform deliberately does *not* echo `extras` back on GET —
42/// it's an internal-storage construct, not a public field.
43///
44/// Both fields are optional in serialization so a row that ships
45/// neither stays on the small/fast path.
46#[derive(Debug, Clone, Default, Serialize, Deserialize)]
47#[serde(rename_all = "camelCase")]
48pub struct SyncEnvelope {
49 #[serde(default, skip_serializing_if = "Option::is_none")]
50 pub schema_version: Option<u32>,
51 #[serde(default, skip_serializing_if = "Option::is_none")]
52 pub extras: Option<JsonValue>,
53}
54
55impl SyncEnvelope {
56 /// Build an envelope stamped with the endpoint's
57 /// `CURRENT_SCHEMA_VERSION`. Useful for daemon-side code that
58 /// wants to construct a record with the version already filled
59 /// in — `Client::sync` will also fill it in lazily, but doing it
60 /// at construction time keeps tests and logs honest.
61 pub fn for_endpoint<E: SyncEndpoint + ?Sized>() -> Self {
62 Self {
63 schema_version: Some(E::CURRENT_SCHEMA_VERSION),
64 extras: None,
65 }
66 }
67}
68
69/// Records that carry a [`SyncEnvelope`] expose it via this trait so
70/// the bridge crate can stamp the `schemaVersion` field uniformly
71/// across resources. One-line impl per record type:
72///
73/// ```ignore
74/// impl HasSyncEnvelope for VoiceCallRecord {
75/// fn envelope_mut(&mut self) -> &mut SyncEnvelope { &mut self.envelope }
76/// }
77/// ```
78pub trait HasSyncEnvelope {
79 fn envelope_mut(&mut self) -> &mut SyncEnvelope;
80}
81
82/// Clone `items` and fill in `schemaVersion` on every record whose
83/// envelope left it unset. Records that supplied an explicit version
84/// are passed through unchanged — useful for tests and for the rare
85/// "deliberately ship an older version during a rollback" case.
86fn stamp_schema_version<E: SyncEndpoint>(items: &[E::Record]) -> Vec<E::Record>
87where
88 E::Record: Clone + HasSyncEnvelope,
89{
90 let mut out = items.to_vec();
91 for item in &mut out {
92 let env = item.envelope_mut();
93 if env.schema_version.is_none() {
94 env.schema_version = Some(E::CURRENT_SCHEMA_VERSION);
95 }
96 }
97 out
98}
99
100/// One sync-able platform resource.
101///
102/// Implemented by zero-sized marker types — you call methods like
103/// `client.sync::<VoiceCalls>(&items)` rather than constructing a
104/// `VoiceCalls` value.
105pub trait SyncEndpoint {
106 /// Path segment under `/api/voice/`. e.g. `"calls"`, `"recordings"`.
107 ///
108 /// Combined into the full paths
109 /// `POST /api/voice/{RESOURCE}/sync` and
110 /// `GET /api/voice/{RESOURCE}`.
111 const RESOURCE: &'static str;
112
113 /// Current wire-schema version for this resource's `Record` type.
114 ///
115 /// Bumped when the meaning of an existing field changes (a rare,
116 /// deliberate event). Additive field changes don't require a
117 /// version bump — they ride on the additive-only policy
118 /// (see `wavekat-voice/docs/21-platform-call-history-sync.md`
119 /// §"Versioning and forward compatibility").
120 ///
121 /// Used by `Client::sync` so consumers don't manage the version
122 /// themselves — upgrading the bridge crate picks up the right
123 /// number automatically. Default is `1`.
124 const CURRENT_SCHEMA_VERSION: u32 = 1;
125
126 /// One row's worth of data. Must round-trip through JSON; the wire
127 /// shape uses camelCase per the platform's Hono/Zod convention
128 /// (apply `#[serde(rename_all = "camelCase")]` on your struct).
129 ///
130 /// Records must embed [`SyncEnvelope`] via
131 /// `#[serde(flatten)] pub envelope: SyncEnvelope` so the
132 /// `schemaVersion` + `extras` fields appear at the top of the
133 /// JSON object alongside the resource-specific columns. The
134 /// trait doesn't enforce this via an associated type because
135 /// `#[serde(flatten)]` is a serde attribute (not a Rust trait
136 /// bound), but every record type ships with the envelope and
137 /// `Client::sync` relies on the field name `schemaVersion`.
138 /// See `VoiceCallRecord` for the canonical shape.
139 type Record: Serialize + DeserializeOwned + Send + Sync + 'static;
140
141 /// Query params for `GET /api/voice/{RESOURCE}`. Typically a cursor
142 /// (`before` as RFC 3339) plus a `limit` and any resource-specific
143 /// filters (e.g. `account_id`). Serialized as URL query.
144 type Query: Serialize + Send + Sync;
145}
146
147/// Body shape for `POST /api/voice/{R}/sync`.
148///
149/// `items` is the batch. The server caps batches at 100 — chunking
150/// is the consumer's responsibility (the daemon's `Uploader<E>` does
151/// this automatically; ad-hoc callers should too).
152#[derive(Debug, Clone, Serialize, Deserialize)]
153#[serde(rename_all = "camelCase")]
154pub struct SyncRequest<R> {
155 pub items: Vec<R>,
156}
157
158/// Response from `POST /api/voice/{R}/sync`.
159///
160/// `accepted` counts rows the platform actually wrote (insert *or*
161/// idempotent update). `skipped` counts rows the platform deliberately
162/// ignored — reserved for future mutable resources where a stale
163/// revision should be dropped without erroring. Always 0 for the
164/// immutable calls/recordings/transcripts shipped today; consumers
165/// can ignore it for now and still be forward-compatible.
166#[derive(Debug, Clone, Serialize, Deserialize)]
167pub struct SyncResponse {
168 pub accepted: u32,
169 pub skipped: u32,
170}
171
172/// One page of `GET /api/voice/{R}`.
173///
174/// `items` is newest-first. `next_before` is the cursor for the next
175/// page (pass it back as the request's `before` field); absent/None
176/// means the caller has reached the start of history.
177#[derive(Debug, Clone, Serialize, Deserialize)]
178#[serde(rename_all = "camelCase")]
179pub struct Page<R> {
180 pub items: Vec<R>,
181 #[serde(default, skip_serializing_if = "Option::is_none")]
182 pub next_before: Option<String>,
183}
184
185impl Client {
186 /// `POST /api/voice/{E::RESOURCE}/sync` — idempotent batch upload.
187 ///
188 /// The platform upserts keyed by `(user_id, item.source_id)`, so
189 /// retries after a flaky connection are safe.
190 ///
191 /// **Batch size.** The platform rejects batches over 100 items with
192 /// HTTP 413. This method does *not* chunk for you — pass a slice
193 /// you're confident about, or use the daemon's `Uploader<E>` which
194 /// chunks at 50.
195 ///
196 /// **Schema version.** Records whose envelope leaves
197 /// `schemaVersion` unset (the common case — consumers don't need
198 /// to know the number) have it stamped with
199 /// [`SyncEndpoint::CURRENT_SCHEMA_VERSION`] before serialization,
200 /// so the platform always sees an explicit version. Records that
201 /// set it explicitly are passed through untouched.
202 pub async fn sync<E: SyncEndpoint>(&self, items: &[E::Record]) -> Result<SyncResponse>
203 where
204 E::Record: Clone + HasSyncEnvelope,
205 {
206 let path = format!("/api/voice/{}/sync", E::RESOURCE);
207 let body = SyncRequest {
208 items: stamp_schema_version::<E>(items),
209 };
210 self.post_json::<SyncResponse, _>(&path, &body).await
211 }
212
213 /// `GET /api/voice/{E::RESOURCE}` — one page of the caller's rows,
214 /// newest first, scoped server-side to the bearer's user.
215 pub async fn list<E: SyncEndpoint>(&self, query: &E::Query) -> Result<Page<E::Record>> {
216 let path = format!("/api/voice/{}", E::RESOURCE);
217 self.get_json_query::<Page<E::Record>, _>(&path, query)
218 .await
219 }
220}
221
222#[cfg(test)]
223mod tests {
224 use super::*;
225
226 // A minimal marker so the trait surface is exercised independently
227 // of any specific resource type.
228 struct DummyResource;
229
230 #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
231 #[serde(rename_all = "camelCase")]
232 struct DummyRecord {
233 source_id: String,
234 payload: String,
235 }
236
237 #[derive(Debug, Default, Serialize)]
238 #[serde(rename_all = "camelCase")]
239 struct DummyQuery {
240 before: Option<String>,
241 limit: Option<u32>,
242 }
243
244 impl SyncEndpoint for DummyResource {
245 const RESOURCE: &'static str = "dummy";
246 type Record = DummyRecord;
247 type Query = DummyQuery;
248 }
249
250 #[test]
251 fn sync_request_serializes_with_items_field() {
252 let body = SyncRequest::<DummyRecord> {
253 items: vec![
254 DummyRecord {
255 source_id: "a".into(),
256 payload: "x".into(),
257 },
258 DummyRecord {
259 source_id: "b".into(),
260 payload: "y".into(),
261 },
262 ],
263 };
264 let s = serde_json::to_string(&body).unwrap();
265 assert!(s.contains("\"items\":["), "missing items envelope: {s}");
266 assert!(
267 s.contains("\"sourceId\":\"a\""),
268 "wire should be camelCase: {s}"
269 );
270 }
271
272 #[test]
273 fn sync_response_parses_platform_shape() {
274 let raw = r#"{"accepted": 3, "skipped": 0}"#;
275 let parsed: SyncResponse = serde_json::from_str(raw).unwrap();
276 assert_eq!(parsed.accepted, 3);
277 assert_eq!(parsed.skipped, 0);
278 }
279
280 #[test]
281 fn page_round_trip_without_cursor() {
282 // The wire either omits next_before or sends null when there's
283 // no more history. Both should parse to None.
284 let with_null = r#"{"items": [], "nextBefore": null}"#;
285 let omitted = r#"{"items": []}"#;
286 let p1: Page<DummyRecord> = serde_json::from_str(with_null).unwrap();
287 let p2: Page<DummyRecord> = serde_json::from_str(omitted).unwrap();
288 assert!(p1.next_before.is_none());
289 assert!(p2.next_before.is_none());
290 }
291
292 #[test]
293 fn resource_const_drives_path() {
294 // Sanity check — the trait constant is what ends up in the URL.
295 assert_eq!(<DummyResource as SyncEndpoint>::RESOURCE, "dummy");
296 }
297
298 // A record that carries the envelope via flatten — exactly the
299 // shape every real resource type adopts. Verifies the
300 // stamp-on-send behaviour without depending on `VoiceCalls`
301 // (which lives in a sibling module).
302 #[derive(Debug, Clone, Default, Serialize, Deserialize)]
303 #[serde(rename_all = "camelCase")]
304 struct DummyEnvelopedRecord {
305 source_id: String,
306 #[serde(flatten, default)]
307 envelope: SyncEnvelope,
308 }
309
310 impl HasSyncEnvelope for DummyEnvelopedRecord {
311 fn envelope_mut(&mut self) -> &mut SyncEnvelope {
312 &mut self.envelope
313 }
314 }
315
316 struct EnvelopedResource;
317 impl SyncEndpoint for EnvelopedResource {
318 const RESOURCE: &'static str = "enveloped";
319 const CURRENT_SCHEMA_VERSION: u32 = 7;
320 type Record = DummyEnvelopedRecord;
321 type Query = DummyQuery;
322 }
323
324 #[test]
325 fn stamp_schema_version_fills_in_when_missing() {
326 let items = vec![DummyEnvelopedRecord {
327 source_id: "a".into(),
328 envelope: SyncEnvelope::default(),
329 }];
330 let stamped = stamp_schema_version::<EnvelopedResource>(&items);
331 assert_eq!(stamped[0].envelope.schema_version, Some(7));
332 }
333
334 #[test]
335 fn stamp_schema_version_preserves_explicit_value() {
336 // A consumer that deliberately set a version (e.g. a rollback
337 // test) should pass through unchanged.
338 let items = vec![DummyEnvelopedRecord {
339 source_id: "a".into(),
340 envelope: SyncEnvelope {
341 schema_version: Some(2),
342 extras: None,
343 },
344 }];
345 let stamped = stamp_schema_version::<EnvelopedResource>(&items);
346 assert_eq!(stamped[0].envelope.schema_version, Some(2));
347 }
348
349 #[test]
350 fn for_endpoint_returns_envelope_with_current_version() {
351 let env = SyncEnvelope::for_endpoint::<EnvelopedResource>();
352 assert_eq!(env.schema_version, Some(7));
353 assert!(env.extras.is_none());
354 }
355}