1use std::sync::Arc;
2
3#[cfg(not(target_arch = "wasm32"))]
4use hyphae::MapDiff;
5use serde::{Deserialize, Serialize};
6use serde_json::Value;
7use ts_rs::TS;
8
9use super::{item::ErasedWrappedItem, shared::value_with_tx};
10use crate::core::{
11 item::AnyItem,
12 query::{QueryId, QueryItemType},
13};
14
15#[derive(Debug, Clone, Serialize)]
16#[serde(rename_all = "camelCase")]
17pub struct QueryResponse {
18 #[serde(default, skip_serializing_if = "Vec::is_empty")]
19 pub changes: Vec<QueryChange>,
20
21 pub deletes: Vec<Arc<str>>,
22
23 pub upserts: Vec<ErasedWrappedItem>,
24
25 pub sequence: u64,
26
27 pub tx: Arc<str>,
28
29 #[serde(default, skip_serializing_if = "Option::is_none")]
30 pub total_count: Option<usize>,
31
32 #[serde(default, skip_serializing_if = "Option::is_none")]
33 pub window: Option<QueryWindow>,
34}
35
36impl<'de> Deserialize<'de> for QueryResponse {
41 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
42 where
43 D: serde::Deserializer<'de>,
44 {
45 let raw = ClientQueryResponse::deserialize(deserializer)?;
46 Ok(QueryResponse {
47 changes: raw
48 .changes
49 .into_iter()
50 .map(|c| match c {
51 ClientQueryChange::Upsert { item } => QueryChange::Upsert {
52 item: ErasedWrappedItem {
53 item: Arc::new(ValueItem {
54 value: item.item,
55 item_type: item.item_type.clone(),
56 }),
57 item_type: item.item_type,
58 },
59 },
60 ClientQueryChange::Delete { id } => QueryChange::Delete { id },
61 ClientQueryChange::WindowOrder {
62 ids,
63 total_count,
64 window,
65 } => QueryChange::WindowOrder {
66 ids,
67 total_count,
68 window,
69 },
70 })
71 .collect(),
72 deletes: raw.deletes,
73 upserts: raw
74 .upserts
75 .into_iter()
76 .map(|item| ErasedWrappedItem {
77 item: Arc::new(ValueItem {
78 value: item.item,
79 item_type: item.item_type.clone(),
80 }),
81 item_type: item.item_type,
82 })
83 .collect(),
84 sequence: raw.sequence,
85 tx: raw.tx,
86 total_count: raw.total_count,
87 window: raw.window,
88 })
89 }
90}
91
92#[derive(Debug, Clone)]
95struct ValueItem {
96 value: Value,
97 item_type: Arc<str>,
98}
99
100impl Serialize for ValueItem {
101 fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
102 self.value.serialize(serializer)
103 }
104}
105
106impl crate::common::with_id::WithId for ValueItem {
107 fn id(&self) -> Arc<str> {
108 self.value
109 .get("id")
110 .and_then(|v| v.as_str())
111 .unwrap_or("")
112 .into()
113 }
114}
115
116impl AnyItem for ValueItem {
117 fn as_any(&self) -> &dyn std::any::Any {
118 self
119 }
120 fn entity_type(&self) -> &'static str {
121 Box::leak(self.item_type.to_string().into_boxed_str())
123 }
124 fn equals(&self, other: &dyn AnyItem) -> bool {
125 other
126 .as_any()
127 .downcast_ref::<Self>()
128 .map(|typed| self.value == typed.value)
129 .unwrap_or(false)
130 }
131}
132
133#[derive(Debug, Clone, Serialize)]
134#[serde(tag = "kind", rename_all = "camelCase")]
135pub enum QueryChange {
136 Upsert {
137 item: ErasedWrappedItem,
138 },
139 Delete {
140 id: Arc<str>,
141 },
142 WindowOrder {
143 ids: Vec<Arc<str>>,
144 total_count: usize,
145 #[serde(default, skip_serializing_if = "Option::is_none")]
146 window: Option<QueryWindow>,
147 },
148}
149
/// Query result holding upserted items of a concrete type `T`.
pub struct QueryResult<T> {
    /// Ids of removed entries.
    pub deletes: Vec<String>,
    /// Inserted or updated items.
    pub upserts: Vec<T>,
    /// Sequence number of this result.
    pub sequence: u64,
    /// The `tx` value this result belongs to.
    pub tx: String,
}

impl<T> QueryResult<T> {
    /// Creates a result for `tx` containing `upserts`, with no deletes and a
    /// sequence number of `0`.
    pub fn new(tx: String, upserts: Vec<T>) -> Self {
        Self {
            tx,
            upserts,
            sequence: 0,
            deletes: Vec::new(),
        }
    }
}
167
168impl QueryResponse {
169 pub fn new(tx: Arc<str>, _result: Vec<Value>) -> QueryResponse {
170 QueryResponse {
171 changes: vec![],
172 sequence: 0,
173 upserts: vec![],
174 deletes: vec![],
175 tx,
176 total_count: None,
177 window: None,
178 }
179 }
180
181 #[cfg(not(target_arch = "wasm32"))]
183 pub fn from_diff(
184 diff: &MapDiff<Arc<str>, Arc<dyn AnyItem>>,
185 tx: Arc<str>,
186 sequence: u64,
187 ) -> QueryResponse {
188 fn push_change(
189 diff: &MapDiff<Arc<str>, Arc<dyn AnyItem>>,
190 upserts: &mut Vec<ErasedWrappedItem>,
191 deletes: &mut Vec<Arc<str>>,
192 changes: &mut Vec<QueryChange>,
193 ) {
194 match diff {
195 MapDiff::Initial { entries } => {
196 for (_, item) in entries {
197 let wrapped = ErasedWrappedItem {
198 item: item.clone(),
199 item_type: item.entity_type().into(),
200 };
201 changes.push(QueryChange::Upsert {
202 item: wrapped.clone(),
203 });
204 upserts.push(wrapped);
205 }
206 }
207 MapDiff::Insert { key: _, value } => {
208 let wrapped = ErasedWrappedItem {
209 item: value.clone(),
210 item_type: value.entity_type().into(),
211 };
212 changes.push(QueryChange::Upsert {
213 item: wrapped.clone(),
214 });
215 upserts.push(wrapped);
216 }
217 MapDiff::Update {
218 key: _,
219 old_value: _,
220 new_value,
221 } => {
222 let wrapped = ErasedWrappedItem {
223 item: new_value.clone(),
224 item_type: new_value.entity_type().into(),
225 };
226 changes.push(QueryChange::Upsert {
227 item: wrapped.clone(),
228 });
229 upserts.push(wrapped);
230 }
231 MapDiff::Remove { key, old_value: _ } => {
232 deletes.push(key.clone());
233 changes.push(QueryChange::Delete { id: key.clone() });
234 }
235 MapDiff::Batch { changes: batch } => {
236 for change in batch {
237 push_change(change, upserts, deletes, changes);
238 }
239 }
240 }
241 }
242
243 match diff {
244 MapDiff::Initial { entries } => {
245 let upserts: Vec<ErasedWrappedItem> = entries
246 .iter()
247 .map(|(_, item)| ErasedWrappedItem {
248 item: item.clone(),
249 item_type: item.entity_type().into(),
250 })
251 .collect();
252 let changes = upserts
253 .iter()
254 .cloned()
255 .map(|item| QueryChange::Upsert { item })
256 .collect();
257 QueryResponse {
258 tx,
259 sequence,
260 changes,
261 upserts,
262 deletes: vec![],
263 total_count: None,
264 window: None,
265 }
266 }
267 MapDiff::Insert { key: _, value } => {
268 let upserts = vec![ErasedWrappedItem {
269 item: value.clone(),
270 item_type: value.entity_type().into(),
271 }];
272 let changes = upserts
273 .iter()
274 .cloned()
275 .map(|item| QueryChange::Upsert { item })
276 .collect();
277 QueryResponse {
278 tx,
279 sequence,
280 changes,
281 upserts,
282 deletes: vec![],
283 total_count: None,
284 window: None,
285 }
286 }
287 MapDiff::Update {
288 key: _,
289 old_value: _,
290 new_value,
291 } => {
292 let upserts = vec![ErasedWrappedItem {
293 item: new_value.clone(),
294 item_type: new_value.entity_type().into(),
295 }];
296 let changes = upserts
297 .iter()
298 .cloned()
299 .map(|item| QueryChange::Upsert { item })
300 .collect();
301 QueryResponse {
302 tx,
303 sequence,
304 changes,
305 upserts,
306 deletes: vec![],
307 total_count: None,
308 window: None,
309 }
310 }
311 MapDiff::Remove { key, old_value: _ } => QueryResponse {
312 tx,
313 sequence,
314 changes: vec![QueryChange::Delete { id: key.clone() }],
315 upserts: vec![],
316 deletes: vec![key.clone()],
317 total_count: None,
318 window: None,
319 },
320 MapDiff::Batch { .. } => {
321 let mut upserts = Vec::new();
322 let mut deletes = Vec::new();
323 let mut changes = Vec::new();
324 push_change(diff, &mut upserts, &mut deletes, &mut changes);
325 QueryResponse {
326 tx,
327 sequence,
328 changes,
329 upserts,
330 deletes,
331 total_count: None,
332 window: None,
333 }
334 }
335 }
336 }
337
338 pub fn to_string(&self) -> Result<String, serde_json::Error> {
339 serde_json::to_string(self)
340 }
341}
342
343impl QueryResponse {
344 pub fn get_tx(&self) -> Arc<str> {
345 self.tx.clone()
346 }
347}
348
349#[derive(Debug, Clone, Serialize, Deserialize, TS)]
350#[serde(rename_all = "camelCase")]
351pub struct QueryWindow {
352 pub offset: usize,
353 pub limit: usize,
354}
355
356#[derive(Debug, Clone, Serialize, Deserialize, TS)]
357#[serde(rename_all = "camelCase")]
358pub struct QueryWindowUpdate {
359 pub tx: String,
360 #[serde(default, skip_serializing_if = "Option::is_none")]
361 pub window: Option<QueryWindow>,
362}
363
364#[derive(Debug, Clone, Serialize, Deserialize, TS)]
365#[serde(rename_all = "camelCase")]
366pub struct WrappedQuery {
367 pub query: Value,
368 pub query_id: Arc<str>,
369 pub query_item_type: Arc<str>,
370 #[serde(default, skip_serializing_if = "Option::is_none")]
371 pub window: Option<QueryWindow>,
372}
373
374#[derive(Debug, Clone, Serialize, Deserialize, TS)]
375#[serde(rename_all = "camelCase")]
376pub struct QueryError {
377 pub tx: String,
378 pub query_id: String,
379 pub message: String,
380}
381
382#[derive(Debug, Clone, Deserialize)]
389#[serde(rename_all = "camelCase")]
390pub struct ClientQueryResponse {
391 #[serde(default)]
392 pub changes: Vec<ClientQueryChange>,
393
394 pub deletes: Vec<Arc<str>>,
395
396 pub upserts: Vec<super::item::WrappedItem>,
397
398 pub sequence: u64,
399
400 pub tx: Arc<str>,
401
402 #[serde(default)]
403 pub total_count: Option<usize>,
404
405 #[serde(default)]
406 pub window: Option<QueryWindow>,
407}
408
409#[derive(Debug, Clone, Deserialize)]
410#[serde(tag = "kind", rename_all = "camelCase")]
411pub enum ClientQueryChange {
412 Upsert {
413 item: super::item::WrappedItem,
414 },
415 Delete {
416 id: Arc<str>,
417 },
418 WindowOrder {
419 ids: Vec<Arc<str>>,
420 total_count: usize,
421 #[serde(default)]
422 window: Option<QueryWindow>,
423 },
424}
425
426pub fn wrap_query<Q: QueryId + QueryItemType + Serialize + Clone>(
427 tx: Arc<str>,
428 query: &Q,
429) -> Result<WrappedQuery, serde_json::Error> {
430 Ok(WrappedQuery {
431 query: value_with_tx(tx, query)?,
432 query_id: query.query_id(),
433 query_item_type: query.query_item_type(),
434 window: None,
435 })
436}