1use async_trait::async_trait;
16use futures_core::Stream;
17use serde::{Deserialize, Serialize};
18use serde_json::Value;
19use std::collections::HashMap;
20use std::fmt::Debug;
21use std::pin::Pin;
22
23use crate::prelude::*;
24
25#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
27#[serde(rename_all = "camelCase")]
28pub enum LockMode {
29 Soft,
30 Hard,
31}
32
33#[derive(Debug, Clone)]
35pub struct LockInfo {
36 pub user_id: Box<str>,
37 pub mode: LockMode,
38 pub acquired_at: u64,
39 pub ttl_secs: u64,
40}
41
42#[derive(Debug, Clone, Serialize, Deserialize)]
44#[serde(tag = "op", rename_all = "camelCase")]
45pub enum AggregateOp {
46 Sum { field: String },
47 Avg { field: String },
48 Min { field: String },
49 Max { field: String },
50}
51
52#[derive(Debug, Clone, Serialize, Deserialize)]
54#[serde(rename_all = "camelCase")]
55pub struct AggregateOptions {
56 pub group_by: String,
58
59 #[serde(default, skip_serializing_if = "Vec::is_empty")]
61 pub ops: Vec<AggregateOp>,
62}
63
64#[derive(Debug, Clone, Default, Serialize, Deserialize)]
69pub struct QueryFilter {
70 #[serde(default, skip_serializing_if = "HashMap::is_empty")]
72 pub equals: HashMap<String, Value>,
73
74 #[serde(default, skip_serializing_if = "HashMap::is_empty", rename = "notEquals")]
76 pub not_equals: HashMap<String, Value>,
77
78 #[serde(default, skip_serializing_if = "HashMap::is_empty", rename = "greaterThan")]
80 pub greater_than: HashMap<String, Value>,
81
82 #[serde(default, skip_serializing_if = "HashMap::is_empty", rename = "greaterThanOrEqual")]
84 pub greater_than_or_equal: HashMap<String, Value>,
85
86 #[serde(default, skip_serializing_if = "HashMap::is_empty", rename = "lessThan")]
88 pub less_than: HashMap<String, Value>,
89
90 #[serde(default, skip_serializing_if = "HashMap::is_empty", rename = "lessThanOrEqual")]
92 pub less_than_or_equal: HashMap<String, Value>,
93
94 #[serde(default, skip_serializing_if = "HashMap::is_empty", rename = "inArray")]
96 pub in_array: HashMap<String, Vec<Value>>,
97
98 #[serde(default, skip_serializing_if = "HashMap::is_empty", rename = "arrayContains")]
100 pub array_contains: HashMap<String, Value>,
101
102 #[serde(default, skip_serializing_if = "HashMap::is_empty", rename = "notInArray")]
104 pub not_in_array: HashMap<String, Vec<Value>>,
105
106 #[serde(default, skip_serializing_if = "HashMap::is_empty", rename = "arrayContainsAny")]
108 pub array_contains_any: HashMap<String, Vec<Value>>,
109
110 #[serde(default, skip_serializing_if = "HashMap::is_empty", rename = "arrayContainsAll")]
112 pub array_contains_all: HashMap<String, Vec<Value>>,
113}
114
115impl QueryFilter {
116 pub fn new() -> Self {
118 Self::default()
119 }
120
121 pub fn equals_one(field: impl Into<String>, value: Value) -> Self {
123 let mut equals = HashMap::new();
124 equals.insert(field.into(), value);
125 Self { equals, ..Default::default() }
126 }
127
128 pub fn with_equals(mut self, field: impl Into<String>, value: Value) -> Self {
130 self.equals.insert(field.into(), value);
131 self
132 }
133
134 pub fn with_not_equals(mut self, field: impl Into<String>, value: Value) -> Self {
136 self.not_equals.insert(field.into(), value);
137 self
138 }
139
140 pub fn with_greater_than(mut self, field: impl Into<String>, value: Value) -> Self {
142 self.greater_than.insert(field.into(), value);
143 self
144 }
145
146 pub fn with_greater_than_or_equal(mut self, field: impl Into<String>, value: Value) -> Self {
148 self.greater_than_or_equal.insert(field.into(), value);
149 self
150 }
151
152 pub fn with_less_than(mut self, field: impl Into<String>, value: Value) -> Self {
154 self.less_than.insert(field.into(), value);
155 self
156 }
157
158 pub fn with_less_than_or_equal(mut self, field: impl Into<String>, value: Value) -> Self {
160 self.less_than_or_equal.insert(field.into(), value);
161 self
162 }
163
164 pub fn with_in_array(mut self, field: impl Into<String>, values: Vec<Value>) -> Self {
166 self.in_array.insert(field.into(), values);
167 self
168 }
169
170 pub fn with_array_contains(mut self, field: impl Into<String>, value: Value) -> Self {
172 self.array_contains.insert(field.into(), value);
173 self
174 }
175
176 pub fn with_not_in_array(mut self, field: impl Into<String>, values: Vec<Value>) -> Self {
178 self.not_in_array.insert(field.into(), values);
179 self
180 }
181
182 pub fn with_array_contains_any(mut self, field: impl Into<String>, values: Vec<Value>) -> Self {
184 self.array_contains_any.insert(field.into(), values);
185 self
186 }
187
188 pub fn with_array_contains_all(mut self, field: impl Into<String>, values: Vec<Value>) -> Self {
190 self.array_contains_all.insert(field.into(), values);
191 self
192 }
193
194 pub fn matches(&self, doc: &Value) -> bool {
196 for (field, expected) in &self.equals {
198 if doc.get(field) != Some(expected) {
199 return false;
200 }
201 }
202
203 for (field, expected) in &self.not_equals {
205 if doc.get(field) == Some(expected) {
206 return false;
207 }
208 }
209
210 for (field, threshold) in &self.greater_than {
212 match doc.get(field) {
213 Some(actual)
214 if compare_json_values(Some(actual), Some(threshold))
215 == std::cmp::Ordering::Greater => {}
216 _ => return false,
217 }
218 }
219
220 for (field, threshold) in &self.greater_than_or_equal {
222 match doc.get(field) {
223 Some(actual) => {
224 let ord = compare_json_values(Some(actual), Some(threshold));
225 if ord != std::cmp::Ordering::Greater && ord != std::cmp::Ordering::Equal {
226 return false;
227 }
228 }
229 _ => return false,
230 }
231 }
232
233 for (field, threshold) in &self.less_than {
235 match doc.get(field) {
236 Some(actual)
237 if compare_json_values(Some(actual), Some(threshold))
238 == std::cmp::Ordering::Less => {}
239 _ => return false,
240 }
241 }
242
243 for (field, threshold) in &self.less_than_or_equal {
245 match doc.get(field) {
246 Some(actual) => {
247 let ord = compare_json_values(Some(actual), Some(threshold));
248 if ord != std::cmp::Ordering::Less && ord != std::cmp::Ordering::Equal {
249 return false;
250 }
251 }
252 _ => return false,
253 }
254 }
255
256 for (field, allowed_values) in &self.in_array {
258 match doc.get(field) {
259 Some(actual) if allowed_values.contains(actual) => {}
260 _ => return false,
261 }
262 }
263
264 for (field, required_value) in &self.array_contains {
266 match doc.get(field) {
267 Some(Value::Array(arr)) if arr.contains(required_value) => {}
268 _ => return false,
269 }
270 }
271
272 for (field, excluded_values) in &self.not_in_array {
274 if let Some(actual) = doc.get(field) {
275 if excluded_values.contains(actual) {
276 return false;
277 }
278 }
279 }
280
281 for (field, candidate_values) in &self.array_contains_any {
283 match doc.get(field) {
284 Some(Value::Array(arr)) if candidate_values.iter().any(|v| arr.contains(v)) => {}
285 _ => return false,
286 }
287 }
288
289 for (field, required_values) in &self.array_contains_all {
291 match doc.get(field) {
292 Some(Value::Array(arr)) if required_values.iter().all(|v| arr.contains(v)) => {}
293 _ => return false,
294 }
295 }
296
297 true
298 }
299
300 pub fn is_empty(&self) -> bool {
302 self.equals.is_empty()
303 && self.not_equals.is_empty()
304 && self.greater_than.is_empty()
305 && self.greater_than_or_equal.is_empty()
306 && self.less_than.is_empty()
307 && self.less_than_or_equal.is_empty()
308 && self.in_array.is_empty()
309 && self.array_contains.is_empty()
310 && self.not_in_array.is_empty()
311 && self.array_contains_any.is_empty()
312 && self.array_contains_all.is_empty()
313 }
314}
315
316#[derive(Debug, Clone, Serialize, Deserialize)]
318pub struct SortField {
319 pub field: String,
321
322 pub ascending: bool,
324}
325
326impl SortField {
327 pub fn asc(field: impl Into<String>) -> Self {
329 Self { field: field.into(), ascending: true }
330 }
331
332 pub fn desc(field: impl Into<String>) -> Self {
334 Self { field: field.into(), ascending: false }
335 }
336}
337
338#[derive(Debug, Clone, Default)]
340pub struct QueryOptions {
341 pub filter: Option<QueryFilter>,
343
344 pub sort: Option<Vec<SortField>>,
346
347 pub limit: Option<u32>,
349
350 pub offset: Option<u32>,
352
353 pub aggregate: Option<AggregateOptions>,
355}
356
357impl QueryOptions {
358 pub fn new() -> Self {
360 Self::default()
361 }
362
363 pub fn with_filter(mut self, filter: QueryFilter) -> Self {
365 self.filter = Some(filter);
366 self
367 }
368
369 pub fn with_sort(mut self, sort: Vec<SortField>) -> Self {
371 self.sort = Some(sort);
372 self
373 }
374
375 pub fn with_limit(mut self, limit: u32) -> Self {
377 self.limit = Some(limit);
378 self
379 }
380
381 pub fn with_offset(mut self, offset: u32) -> Self {
383 self.offset = Some(offset);
384 self
385 }
386
387 pub fn with_aggregate(mut self, aggregate: AggregateOptions) -> Self {
389 self.aggregate = Some(aggregate);
390 self
391 }
392}
393
394#[derive(Debug, Clone)]
396pub struct SubscriptionOptions {
397 pub path: Box<str>,
399
400 pub filter: Option<QueryFilter>,
402}
403
404impl SubscriptionOptions {
405 pub fn all(path: impl Into<Box<str>>) -> Self {
407 Self { path: path.into(), filter: None }
408 }
409
410 pub fn filtered(path: impl Into<Box<str>>, filter: QueryFilter) -> Self {
412 Self { path: path.into(), filter: Some(filter) }
413 }
414}
415
416#[derive(Debug, Clone, Serialize, Deserialize)]
418#[serde(tag = "action", rename_all = "camelCase")]
419pub enum ChangeEvent {
420 Create {
422 path: Box<str>,
424 data: Value,
426 },
427
428 Update {
430 path: Box<str>,
432 data: Value,
434 #[serde(default, skip_serializing_if = "Option::is_none")]
436 old_data: Option<Value>,
437 },
438
439 Delete {
441 path: Box<str>,
443 #[serde(default, skip_serializing_if = "Option::is_none")]
445 old_data: Option<Value>,
446 },
447
448 Lock {
450 path: Box<str>,
452 data: Value,
454 },
455
456 Unlock {
458 path: Box<str>,
460 data: Value,
462 },
463
464 Ready {
466 path: Box<str>,
468 #[serde(default, skip_serializing_if = "Option::is_none")]
470 data: Option<Value>,
471 },
472}
473
474impl ChangeEvent {
475 pub fn path(&self) -> &str {
477 match self {
478 ChangeEvent::Create { path, .. }
479 | ChangeEvent::Update { path, .. }
480 | ChangeEvent::Delete { path, .. }
481 | ChangeEvent::Lock { path, .. }
482 | ChangeEvent::Unlock { path, .. }
483 | ChangeEvent::Ready { path, .. } => path,
484 }
485 }
486
487 pub fn id(&self) -> Option<&str> {
489 self.path().split('/').next_back()
490 }
491
492 pub fn parent_path(&self) -> Option<&str> {
494 let path = self.path();
495 path.rfind('/').map(|pos| &path[..pos])
496 }
497
498 pub fn data(&self) -> Option<&Value> {
500 match self {
501 ChangeEvent::Create { data, .. }
502 | ChangeEvent::Update { data, .. }
503 | ChangeEvent::Lock { data, .. }
504 | ChangeEvent::Unlock { data, .. } => Some(data),
505 ChangeEvent::Delete { .. } => None,
506 ChangeEvent::Ready { data, .. } => data.as_ref(),
507 }
508 }
509
510 pub fn is_create(&self) -> bool {
512 matches!(self, ChangeEvent::Create { .. })
513 }
514
515 pub fn is_update(&self) -> bool {
517 matches!(self, ChangeEvent::Update { .. })
518 }
519
520 pub fn is_delete(&self) -> bool {
522 matches!(self, ChangeEvent::Delete { .. })
523 }
524}
525
526fn compare_json_values(a: Option<&Value>, b: Option<&Value>) -> std::cmp::Ordering {
528 match (a, b) {
529 (None, None) => std::cmp::Ordering::Equal,
530 (None, Some(_)) => std::cmp::Ordering::Less,
531 (Some(_), None) => std::cmp::Ordering::Greater,
532 (Some(Value::Number(a)), Some(Value::Number(b))) => {
533 a.as_f64().partial_cmp(&b.as_f64()).unwrap_or(std::cmp::Ordering::Equal)
534 }
535 (Some(Value::String(a)), Some(Value::String(b))) => a.cmp(b),
536 (Some(Value::Bool(a)), Some(Value::Bool(b))) => a.cmp(b),
537 (Some(a), Some(b)) => a.to_string().cmp(&b.to_string()),
538 }
539}
540
541pub fn value_to_group_string(value: &Value) -> String {
543 match value {
544 Value::String(s) => s.clone(),
545 Value::Number(n) => n.to_string(),
546 Value::Bool(b) => b.to_string(),
547 Value::Null => "null".to_string(),
548 _ => serde_json::to_string(value).unwrap_or_default(),
549 }
550}
551
552#[derive(Debug, Clone, Serialize, Deserialize)]
554pub struct DbStats {
555 pub size_bytes: u64,
557
558 pub record_count: u64,
560
561 pub table_count: u32,
563}
564
565#[async_trait]
569pub trait Transaction: Send + Sync {
570 async fn create(&mut self, path: &str, data: Value) -> ClResult<Box<str>>;
572
573 async fn update(&mut self, path: &str, data: Value) -> ClResult<()>;
578
579 async fn delete(&mut self, path: &str) -> ClResult<()>;
581
582 async fn get(&self, path: &str) -> ClResult<Option<Value>>;
594
595 async fn commit(&mut self) -> ClResult<()>;
597
598 async fn rollback(&mut self) -> ClResult<()>;
600}
601
602#[async_trait]
607pub trait RtdbAdapter: Debug + Send + Sync {
608 async fn transaction(&self, tn_id: TnId, db_id: &str) -> ClResult<Box<dyn Transaction>>;
610
611 async fn close_db(&self, tn_id: TnId, db_id: &str) -> ClResult<()>;
613
614 async fn query(
616 &self,
617 tn_id: TnId,
618 db_id: &str,
619 path: &str,
620 opts: QueryOptions,
621 ) -> ClResult<Vec<Value>>;
622
623 async fn get(&self, tn_id: TnId, db_id: &str, path: &str) -> ClResult<Option<Value>>;
625
626 async fn subscribe(
628 &self,
629 tn_id: TnId,
630 db_id: &str,
631 opts: SubscriptionOptions,
632 ) -> ClResult<Pin<Box<dyn Stream<Item = ChangeEvent> + Send>>>;
633
634 async fn create_index(&self, tn_id: TnId, db_id: &str, path: &str, field: &str)
636 -> ClResult<()>;
637
638 async fn stats(&self, tn_id: TnId, db_id: &str) -> ClResult<DbStats>;
640
641 async fn export_all(&self, tn_id: TnId, db_id: &str) -> ClResult<Vec<(Box<str>, Value)>>;
646
647 async fn acquire_lock(
652 &self,
653 tn_id: TnId,
654 db_id: &str,
655 path: &str,
656 user_id: &str,
657 mode: LockMode,
658 conn_id: &str,
659 ) -> ClResult<Option<LockInfo>>;
660
661 async fn release_lock(
663 &self,
664 tn_id: TnId,
665 db_id: &str,
666 path: &str,
667 user_id: &str,
668 conn_id: &str,
669 ) -> ClResult<()>;
670
671 async fn check_lock(&self, tn_id: TnId, db_id: &str, path: &str) -> ClResult<Option<LockInfo>>;
673
674 async fn release_all_locks(
676 &self,
677 tn_id: TnId,
678 db_id: &str,
679 user_id: &str,
680 conn_id: &str,
681 ) -> ClResult<()>;
682}
683
684