Skip to main content

myko/wire/
query.rs

1use std::sync::Arc;
2
3#[cfg(not(target_arch = "wasm32"))]
4use hyphae::MapDiff;
5use serde::{Deserialize, Serialize};
6use serde_json::Value;
7use ts_rs::TS;
8
9use super::{item::ErasedWrappedItem, shared::value_with_tx};
10use crate::core::{
11    item::AnyItem,
12    query::{QueryId, QueryItemType},
13};
14
15#[derive(Debug, Clone, Serialize)]
16#[serde(rename_all = "camelCase")]
17pub struct QueryResponse {
18    #[serde(default, skip_serializing_if = "Vec::is_empty")]
19    pub changes: Vec<QueryChange>,
20
21    pub deletes: Vec<Arc<str>>,
22
23    pub upserts: Vec<ErasedWrappedItem>,
24
25    pub sequence: u64,
26
27    pub tx: Arc<str>,
28
29    #[serde(default, skip_serializing_if = "Option::is_none")]
30    pub total_count: Option<usize>,
31
32    #[serde(default, skip_serializing_if = "Option::is_none")]
33    pub window: Option<QueryWindow>,
34}
35
36/// Custom Deserialize for QueryResponse: deserializes using Value-based items,
37/// then parses each through the item registry to produce Arc<dyn AnyItem>.
38/// Falls back to a no-op AnyItem wrapper when running on the client side (WASM)
39/// or when the item type is unknown.
40impl<'de> Deserialize<'de> for QueryResponse {
41    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
42    where
43        D: serde::Deserializer<'de>,
44    {
45        let raw = ClientQueryResponse::deserialize(deserializer)?;
46        Ok(QueryResponse {
47            changes: raw
48                .changes
49                .into_iter()
50                .map(|c| match c {
51                    ClientQueryChange::Upsert { item } => QueryChange::Upsert {
52                        item: ErasedWrappedItem {
53                            item: Arc::new(ValueItem {
54                                value: item.item,
55                                item_type: item.item_type.clone(),
56                            }),
57                            item_type: item.item_type,
58                        },
59                    },
60                    ClientQueryChange::Delete { id } => QueryChange::Delete { id },
61                    ClientQueryChange::WindowOrder {
62                        ids,
63                        total_count,
64                        window,
65                    } => QueryChange::WindowOrder {
66                        ids,
67                        total_count,
68                        window,
69                    },
70                })
71                .collect(),
72            deletes: raw.deletes,
73            upserts: raw
74                .upserts
75                .into_iter()
76                .map(|item| ErasedWrappedItem {
77                    item: Arc::new(ValueItem {
78                        value: item.item,
79                        item_type: item.item_type.clone(),
80                    }),
81                    item_type: item.item_type,
82                })
83                .collect(),
84            sequence: raw.sequence,
85            tx: raw.tx,
86            total_count: raw.total_count,
87            window: raw.window,
88        })
89    }
90}
91
92/// Thin AnyItem wrapper around a serde_json::Value, used when deserializing
93/// QueryResponse on the client side where concrete entity types are unavailable.
94#[derive(Debug, Clone)]
95struct ValueItem {
96    value: Value,
97    item_type: Arc<str>,
98}
99
100impl Serialize for ValueItem {
101    fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
102        self.value.serialize(serializer)
103    }
104}
105
106impl crate::common::with_id::WithId for ValueItem {
107    fn id(&self) -> Arc<str> {
108        self.value
109            .get("id")
110            .and_then(|v| v.as_str())
111            .unwrap_or("")
112            .into()
113    }
114}
115
116impl AnyItem for ValueItem {
117    fn as_any(&self) -> &dyn std::any::Any {
118        self
119    }
120    fn entity_type(&self) -> &'static str {
121        // NOTE(ts): leak is acceptable here because entity types are a fixed set
122        Box::leak(self.item_type.to_string().into_boxed_str())
123    }
124    fn equals(&self, other: &dyn AnyItem) -> bool {
125        other
126            .as_any()
127            .downcast_ref::<Self>()
128            .map(|typed| self.value == typed.value)
129            .unwrap_or(false)
130    }
131}
132
133#[derive(Debug, Clone, Serialize)]
134#[serde(tag = "kind", rename_all = "camelCase")]
135pub enum QueryChange {
136    Upsert {
137        item: ErasedWrappedItem,
138    },
139    Delete {
140        id: Arc<str>,
141    },
142    WindowOrder {
143        ids: Vec<Arc<str>>,
144        total_count: usize,
145        #[serde(default, skip_serializing_if = "Option::is_none")]
146        window: Option<QueryWindow>,
147    },
148}
149
150pub struct QueryResult<T> {
151    pub deletes: Vec<String>,
152    pub upserts: Vec<T>,
153    pub sequence: u64,
154    pub tx: String,
155}
156
157impl<T> QueryResult<T> {
158    pub fn new(tx: String, upserts: Vec<T>) -> QueryResult<T> {
159        QueryResult {
160            deletes: vec![],
161            upserts,
162            sequence: 0,
163            tx,
164        }
165    }
166}
167
168impl QueryResponse {
169    pub fn new(tx: Arc<str>, _result: Vec<Value>) -> QueryResponse {
170        QueryResponse {
171            changes: vec![],
172            sequence: 0,
173            upserts: vec![],
174            deletes: vec![],
175            tx,
176            total_count: None,
177            window: None,
178        }
179    }
180
181    /// Create a QueryResponse from a MapDiff.
182    #[cfg(not(target_arch = "wasm32"))]
183    pub fn from_diff(
184        diff: &MapDiff<Arc<str>, Arc<dyn AnyItem>>,
185        tx: Arc<str>,
186        sequence: u64,
187    ) -> QueryResponse {
188        fn push_change(
189            diff: &MapDiff<Arc<str>, Arc<dyn AnyItem>>,
190            upserts: &mut Vec<ErasedWrappedItem>,
191            deletes: &mut Vec<Arc<str>>,
192            changes: &mut Vec<QueryChange>,
193        ) {
194            match diff {
195                MapDiff::Initial { entries } => {
196                    for (_, item) in entries {
197                        let wrapped = ErasedWrappedItem {
198                            item: item.clone(),
199                            item_type: item.entity_type().into(),
200                        };
201                        changes.push(QueryChange::Upsert {
202                            item: wrapped.clone(),
203                        });
204                        upserts.push(wrapped);
205                    }
206                }
207                MapDiff::Insert { key: _, value } => {
208                    let wrapped = ErasedWrappedItem {
209                        item: value.clone(),
210                        item_type: value.entity_type().into(),
211                    };
212                    changes.push(QueryChange::Upsert {
213                        item: wrapped.clone(),
214                    });
215                    upserts.push(wrapped);
216                }
217                MapDiff::Update {
218                    key: _,
219                    old_value: _,
220                    new_value,
221                } => {
222                    let wrapped = ErasedWrappedItem {
223                        item: new_value.clone(),
224                        item_type: new_value.entity_type().into(),
225                    };
226                    changes.push(QueryChange::Upsert {
227                        item: wrapped.clone(),
228                    });
229                    upserts.push(wrapped);
230                }
231                MapDiff::Remove { key, old_value: _ } => {
232                    deletes.push(key.clone());
233                    changes.push(QueryChange::Delete { id: key.clone() });
234                }
235                MapDiff::Batch { changes: batch } => {
236                    for change in batch {
237                        push_change(change, upserts, deletes, changes);
238                    }
239                }
240            }
241        }
242
243        match diff {
244            MapDiff::Initial { entries } => {
245                let upserts: Vec<ErasedWrappedItem> = entries
246                    .iter()
247                    .map(|(_, item)| ErasedWrappedItem {
248                        item: item.clone(),
249                        item_type: item.entity_type().into(),
250                    })
251                    .collect();
252                let changes = upserts
253                    .iter()
254                    .cloned()
255                    .map(|item| QueryChange::Upsert { item })
256                    .collect();
257                QueryResponse {
258                    tx,
259                    sequence,
260                    changes,
261                    upserts,
262                    deletes: vec![],
263                    total_count: None,
264                    window: None,
265                }
266            }
267            MapDiff::Insert { key: _, value } => {
268                let upserts = vec![ErasedWrappedItem {
269                    item: value.clone(),
270                    item_type: value.entity_type().into(),
271                }];
272                let changes = upserts
273                    .iter()
274                    .cloned()
275                    .map(|item| QueryChange::Upsert { item })
276                    .collect();
277                QueryResponse {
278                    tx,
279                    sequence,
280                    changes,
281                    upserts,
282                    deletes: vec![],
283                    total_count: None,
284                    window: None,
285                }
286            }
287            MapDiff::Update {
288                key: _,
289                old_value: _,
290                new_value,
291            } => {
292                let upserts = vec![ErasedWrappedItem {
293                    item: new_value.clone(),
294                    item_type: new_value.entity_type().into(),
295                }];
296                let changes = upserts
297                    .iter()
298                    .cloned()
299                    .map(|item| QueryChange::Upsert { item })
300                    .collect();
301                QueryResponse {
302                    tx,
303                    sequence,
304                    changes,
305                    upserts,
306                    deletes: vec![],
307                    total_count: None,
308                    window: None,
309                }
310            }
311            MapDiff::Remove { key, old_value: _ } => QueryResponse {
312                tx,
313                sequence,
314                changes: vec![QueryChange::Delete { id: key.clone() }],
315                upserts: vec![],
316                deletes: vec![key.clone()],
317                total_count: None,
318                window: None,
319            },
320            MapDiff::Batch { .. } => {
321                let mut upserts = Vec::new();
322                let mut deletes = Vec::new();
323                let mut changes = Vec::new();
324                push_change(diff, &mut upserts, &mut deletes, &mut changes);
325                QueryResponse {
326                    tx,
327                    sequence,
328                    changes,
329                    upserts,
330                    deletes,
331                    total_count: None,
332                    window: None,
333                }
334            }
335        }
336    }
337
338    pub fn to_string(&self) -> Result<String, serde_json::Error> {
339        serde_json::to_string(self)
340    }
341}
342
343impl QueryResponse {
344    pub fn get_tx(&self) -> Arc<str> {
345        self.tx.clone()
346    }
347}
348
349#[derive(Debug, Clone, Serialize, Deserialize, TS)]
350#[serde(rename_all = "camelCase")]
351pub struct QueryWindow {
352    pub offset: usize,
353    pub limit: usize,
354}
355
356#[derive(Debug, Clone, Serialize, Deserialize, TS)]
357#[serde(rename_all = "camelCase")]
358pub struct QueryWindowUpdate {
359    pub tx: String,
360    #[serde(default, skip_serializing_if = "Option::is_none")]
361    pub window: Option<QueryWindow>,
362}
363
364#[derive(Debug, Clone, Serialize, Deserialize, TS)]
365#[serde(rename_all = "camelCase")]
366pub struct WrappedQuery {
367    pub query: Value,
368    pub query_id: Arc<str>,
369    pub query_item_type: Arc<str>,
370    #[serde(default, skip_serializing_if = "Option::is_none")]
371    pub window: Option<QueryWindow>,
372}
373
374#[derive(Debug, Clone, Serialize, Deserialize, TS)]
375#[serde(rename_all = "camelCase")]
376pub struct QueryError {
377    pub tx: String,
378    pub query_id: String,
379    pub message: String,
380}
381
382// ─────────────────────────────────────────────────────────────────────────────
383// Client-side (inbound) deserialization types
384// ─────────────────────────────────────────────────────────────────────────────
385
386/// Client-side query response for deserialization. Uses WrappedItem (JSON values)
387/// instead of the server-side ErasedWrappedItem (type-erased AnyItem).
388#[derive(Debug, Clone, Deserialize)]
389#[serde(rename_all = "camelCase")]
390pub struct ClientQueryResponse {
391    #[serde(default)]
392    pub changes: Vec<ClientQueryChange>,
393
394    pub deletes: Vec<Arc<str>>,
395
396    pub upserts: Vec<super::item::WrappedItem>,
397
398    pub sequence: u64,
399
400    pub tx: Arc<str>,
401
402    #[serde(default)]
403    pub total_count: Option<usize>,
404
405    #[serde(default)]
406    pub window: Option<QueryWindow>,
407}
408
409#[derive(Debug, Clone, Deserialize)]
410#[serde(tag = "kind", rename_all = "camelCase")]
411pub enum ClientQueryChange {
412    Upsert {
413        item: super::item::WrappedItem,
414    },
415    Delete {
416        id: Arc<str>,
417    },
418    WindowOrder {
419        ids: Vec<Arc<str>>,
420        total_count: usize,
421        #[serde(default)]
422        window: Option<QueryWindow>,
423    },
424}
425
426pub fn wrap_query<Q: QueryId + QueryItemType + Serialize + Clone>(
427    tx: Arc<str>,
428    query: &Q,
429) -> Result<WrappedQuery, serde_json::Error> {
430    Ok(WrappedQuery {
431        query: value_with_tx(tx, query)?,
432        query_id: query.query_id(),
433        query_item_type: query.query_item_type(),
434        window: None,
435    })
436}