1use async_trait::async_trait;
13use futures_core::Stream;
14use serde::{Deserialize, Serialize};
15use serde_json::Value;
16use std::collections::HashMap;
17use std::fmt::Debug;
18use std::pin::Pin;
19
20use crate::prelude::*;
21use crate::types::TnId;
22
23#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
25#[serde(rename_all = "camelCase")]
26pub enum LockMode {
27 Soft,
28 Hard,
29}
30
31#[derive(Debug, Clone)]
33pub struct LockInfo {
34 pub user_id: Box<str>,
35 pub mode: LockMode,
36 pub acquired_at: u64,
37 pub ttl_secs: u64,
38}
39
40#[derive(Debug, Clone, Serialize, Deserialize)]
42#[serde(tag = "op", rename_all = "camelCase")]
43pub enum AggregateOp {
44 Sum { field: String },
45 Avg { field: String },
46 Min { field: String },
47 Max { field: String },
48}
49
50#[derive(Debug, Clone, Serialize, Deserialize)]
52#[serde(rename_all = "camelCase")]
53pub struct AggregateOptions {
54 pub group_by: String,
56
57 #[serde(default, skip_serializing_if = "Vec::is_empty")]
59 pub ops: Vec<AggregateOp>,
60}
61
62#[derive(Debug, Clone, Default, Serialize, Deserialize)]
67pub struct QueryFilter {
68 #[serde(default, skip_serializing_if = "HashMap::is_empty")]
70 pub equals: HashMap<String, Value>,
71
72 #[serde(default, skip_serializing_if = "HashMap::is_empty", rename = "notEquals")]
74 pub not_equals: HashMap<String, Value>,
75
76 #[serde(default, skip_serializing_if = "HashMap::is_empty", rename = "greaterThan")]
78 pub greater_than: HashMap<String, Value>,
79
80 #[serde(default, skip_serializing_if = "HashMap::is_empty", rename = "greaterThanOrEqual")]
82 pub greater_than_or_equal: HashMap<String, Value>,
83
84 #[serde(default, skip_serializing_if = "HashMap::is_empty", rename = "lessThan")]
86 pub less_than: HashMap<String, Value>,
87
88 #[serde(default, skip_serializing_if = "HashMap::is_empty", rename = "lessThanOrEqual")]
90 pub less_than_or_equal: HashMap<String, Value>,
91
92 #[serde(default, skip_serializing_if = "HashMap::is_empty", rename = "inArray")]
94 pub in_array: HashMap<String, Vec<Value>>,
95
96 #[serde(default, skip_serializing_if = "HashMap::is_empty", rename = "arrayContains")]
98 pub array_contains: HashMap<String, Value>,
99
100 #[serde(default, skip_serializing_if = "HashMap::is_empty", rename = "notInArray")]
102 pub not_in_array: HashMap<String, Vec<Value>>,
103
104 #[serde(default, skip_serializing_if = "HashMap::is_empty", rename = "arrayContainsAny")]
106 pub array_contains_any: HashMap<String, Vec<Value>>,
107
108 #[serde(default, skip_serializing_if = "HashMap::is_empty", rename = "arrayContainsAll")]
110 pub array_contains_all: HashMap<String, Vec<Value>>,
111}
112
113impl QueryFilter {
114 pub fn new() -> Self {
116 Self::default()
117 }
118
119 pub fn equals_one(field: impl Into<String>, value: Value) -> Self {
121 let mut equals = HashMap::new();
122 equals.insert(field.into(), value);
123 Self { equals, ..Default::default() }
124 }
125
126 pub fn with_equals(mut self, field: impl Into<String>, value: Value) -> Self {
128 self.equals.insert(field.into(), value);
129 self
130 }
131
132 pub fn with_not_equals(mut self, field: impl Into<String>, value: Value) -> Self {
134 self.not_equals.insert(field.into(), value);
135 self
136 }
137
138 pub fn with_greater_than(mut self, field: impl Into<String>, value: Value) -> Self {
140 self.greater_than.insert(field.into(), value);
141 self
142 }
143
144 pub fn with_greater_than_or_equal(mut self, field: impl Into<String>, value: Value) -> Self {
146 self.greater_than_or_equal.insert(field.into(), value);
147 self
148 }
149
150 pub fn with_less_than(mut self, field: impl Into<String>, value: Value) -> Self {
152 self.less_than.insert(field.into(), value);
153 self
154 }
155
156 pub fn with_less_than_or_equal(mut self, field: impl Into<String>, value: Value) -> Self {
158 self.less_than_or_equal.insert(field.into(), value);
159 self
160 }
161
162 pub fn with_in_array(mut self, field: impl Into<String>, values: Vec<Value>) -> Self {
164 self.in_array.insert(field.into(), values);
165 self
166 }
167
168 pub fn with_array_contains(mut self, field: impl Into<String>, value: Value) -> Self {
170 self.array_contains.insert(field.into(), value);
171 self
172 }
173
174 pub fn with_not_in_array(mut self, field: impl Into<String>, values: Vec<Value>) -> Self {
176 self.not_in_array.insert(field.into(), values);
177 self
178 }
179
180 pub fn with_array_contains_any(mut self, field: impl Into<String>, values: Vec<Value>) -> Self {
182 self.array_contains_any.insert(field.into(), values);
183 self
184 }
185
186 pub fn with_array_contains_all(mut self, field: impl Into<String>, values: Vec<Value>) -> Self {
188 self.array_contains_all.insert(field.into(), values);
189 self
190 }
191
192 pub fn matches(&self, doc: &Value) -> bool {
194 for (field, expected) in &self.equals {
196 match doc.get(field) {
197 Some(actual) if actual == expected => continue,
198 _ => return false,
199 }
200 }
201
202 for (field, expected) in &self.not_equals {
204 match doc.get(field) {
205 Some(actual) if actual == expected => return false,
206 _ => continue,
207 }
208 }
209
210 for (field, threshold) in &self.greater_than {
212 match doc.get(field) {
213 Some(actual)
214 if compare_json_values(Some(actual), Some(threshold))
215 == std::cmp::Ordering::Greater =>
216 {
217 continue
218 }
219 _ => return false,
220 }
221 }
222
223 for (field, threshold) in &self.greater_than_or_equal {
225 match doc.get(field) {
226 Some(actual) => {
227 let ord = compare_json_values(Some(actual), Some(threshold));
228 if ord == std::cmp::Ordering::Greater || ord == std::cmp::Ordering::Equal {
229 continue;
230 }
231 return false;
232 }
233 _ => return false,
234 }
235 }
236
237 for (field, threshold) in &self.less_than {
239 match doc.get(field) {
240 Some(actual)
241 if compare_json_values(Some(actual), Some(threshold))
242 == std::cmp::Ordering::Less =>
243 {
244 continue
245 }
246 _ => return false,
247 }
248 }
249
250 for (field, threshold) in &self.less_than_or_equal {
252 match doc.get(field) {
253 Some(actual) => {
254 let ord = compare_json_values(Some(actual), Some(threshold));
255 if ord == std::cmp::Ordering::Less || ord == std::cmp::Ordering::Equal {
256 continue;
257 }
258 return false;
259 }
260 _ => return false,
261 }
262 }
263
264 for (field, allowed_values) in &self.in_array {
266 match doc.get(field) {
267 Some(actual) if allowed_values.contains(actual) => continue,
268 _ => return false,
269 }
270 }
271
272 for (field, required_value) in &self.array_contains {
274 match doc.get(field) {
275 Some(Value::Array(arr)) if arr.contains(required_value) => continue,
276 _ => return false,
277 }
278 }
279
280 for (field, excluded_values) in &self.not_in_array {
282 match doc.get(field) {
283 Some(actual) if excluded_values.contains(actual) => return false,
284 _ => continue,
285 }
286 }
287
288 for (field, candidate_values) in &self.array_contains_any {
290 match doc.get(field) {
291 Some(Value::Array(arr)) if candidate_values.iter().any(|v| arr.contains(v)) => {
292 continue
293 }
294 _ => return false,
295 }
296 }
297
298 for (field, required_values) in &self.array_contains_all {
300 match doc.get(field) {
301 Some(Value::Array(arr)) if required_values.iter().all(|v| arr.contains(v)) => {
302 continue
303 }
304 _ => return false,
305 }
306 }
307
308 true
309 }
310
311 pub fn is_empty(&self) -> bool {
313 self.equals.is_empty()
314 && self.not_equals.is_empty()
315 && self.greater_than.is_empty()
316 && self.greater_than_or_equal.is_empty()
317 && self.less_than.is_empty()
318 && self.less_than_or_equal.is_empty()
319 && self.in_array.is_empty()
320 && self.array_contains.is_empty()
321 && self.not_in_array.is_empty()
322 && self.array_contains_any.is_empty()
323 && self.array_contains_all.is_empty()
324 }
325}
326
327#[derive(Debug, Clone, Serialize, Deserialize)]
329pub struct SortField {
330 pub field: String,
332
333 pub ascending: bool,
335}
336
337impl SortField {
338 pub fn asc(field: impl Into<String>) -> Self {
340 Self { field: field.into(), ascending: true }
341 }
342
343 pub fn desc(field: impl Into<String>) -> Self {
345 Self { field: field.into(), ascending: false }
346 }
347}
348
349#[derive(Debug, Clone, Default)]
351pub struct QueryOptions {
352 pub filter: Option<QueryFilter>,
354
355 pub sort: Option<Vec<SortField>>,
357
358 pub limit: Option<u32>,
360
361 pub offset: Option<u32>,
363
364 pub aggregate: Option<AggregateOptions>,
366}
367
368impl QueryOptions {
369 pub fn new() -> Self {
371 Self::default()
372 }
373
374 pub fn with_filter(mut self, filter: QueryFilter) -> Self {
376 self.filter = Some(filter);
377 self
378 }
379
380 pub fn with_sort(mut self, sort: Vec<SortField>) -> Self {
382 self.sort = Some(sort);
383 self
384 }
385
386 pub fn with_limit(mut self, limit: u32) -> Self {
388 self.limit = Some(limit);
389 self
390 }
391
392 pub fn with_offset(mut self, offset: u32) -> Self {
394 self.offset = Some(offset);
395 self
396 }
397
398 pub fn with_aggregate(mut self, aggregate: AggregateOptions) -> Self {
400 self.aggregate = Some(aggregate);
401 self
402 }
403}
404
405#[derive(Debug, Clone)]
407pub struct SubscriptionOptions {
408 pub path: Box<str>,
410
411 pub filter: Option<QueryFilter>,
413}
414
415impl SubscriptionOptions {
416 pub fn all(path: impl Into<Box<str>>) -> Self {
418 Self { path: path.into(), filter: None }
419 }
420
421 pub fn filtered(path: impl Into<Box<str>>, filter: QueryFilter) -> Self {
423 Self { path: path.into(), filter: Some(filter) }
424 }
425}
426
427#[derive(Debug, Clone, Serialize, Deserialize)]
429#[serde(tag = "action", rename_all = "camelCase")]
430pub enum ChangeEvent {
431 Create {
433 path: Box<str>,
435 data: Value,
437 },
438
439 Update {
441 path: Box<str>,
443 data: Value,
445 #[serde(default, skip_serializing_if = "Option::is_none")]
447 old_data: Option<Value>,
448 },
449
450 Delete {
452 path: Box<str>,
454 #[serde(default, skip_serializing_if = "Option::is_none")]
456 old_data: Option<Value>,
457 },
458
459 Lock {
461 path: Box<str>,
463 data: Value,
465 },
466
467 Unlock {
469 path: Box<str>,
471 data: Value,
473 },
474
475 Ready {
477 path: Box<str>,
479 #[serde(default, skip_serializing_if = "Option::is_none")]
481 data: Option<Value>,
482 },
483}
484
485impl ChangeEvent {
486 pub fn path(&self) -> &str {
488 match self {
489 ChangeEvent::Create { path, .. } => path,
490 ChangeEvent::Update { path, .. } => path,
491 ChangeEvent::Delete { path, .. } => path,
492 ChangeEvent::Lock { path, .. } => path,
493 ChangeEvent::Unlock { path, .. } => path,
494 ChangeEvent::Ready { path, .. } => path,
495 }
496 }
497
498 pub fn id(&self) -> Option<&str> {
500 self.path().split('/').next_back()
501 }
502
503 pub fn parent_path(&self) -> Option<&str> {
505 let path = self.path();
506 path.rfind('/').map(|pos| &path[..pos])
507 }
508
509 pub fn data(&self) -> Option<&Value> {
511 match self {
512 ChangeEvent::Create { data, .. } | ChangeEvent::Update { data, .. } => Some(data),
513 ChangeEvent::Lock { data, .. } | ChangeEvent::Unlock { data, .. } => Some(data),
514 ChangeEvent::Delete { .. } => None,
515 ChangeEvent::Ready { data, .. } => data.as_ref(),
516 }
517 }
518
519 pub fn is_create(&self) -> bool {
521 matches!(self, ChangeEvent::Create { .. })
522 }
523
524 pub fn is_update(&self) -> bool {
526 matches!(self, ChangeEvent::Update { .. })
527 }
528
529 pub fn is_delete(&self) -> bool {
531 matches!(self, ChangeEvent::Delete { .. })
532 }
533}
534
535fn compare_json_values(a: Option<&Value>, b: Option<&Value>) -> std::cmp::Ordering {
537 match (a, b) {
538 (None, None) => std::cmp::Ordering::Equal,
539 (None, Some(_)) => std::cmp::Ordering::Less,
540 (Some(_), None) => std::cmp::Ordering::Greater,
541 (Some(Value::Number(a)), Some(Value::Number(b))) => {
542 a.as_f64().partial_cmp(&b.as_f64()).unwrap_or(std::cmp::Ordering::Equal)
543 }
544 (Some(Value::String(a)), Some(Value::String(b))) => a.cmp(b),
545 (Some(Value::Bool(a)), Some(Value::Bool(b))) => a.cmp(b),
546 (Some(a), Some(b)) => a.to_string().cmp(&b.to_string()),
547 }
548}
549
550pub fn value_to_group_string(value: &Value) -> String {
552 match value {
553 Value::String(s) => s.clone(),
554 Value::Number(n) => n.to_string(),
555 Value::Bool(b) => b.to_string(),
556 Value::Null => "null".to_string(),
557 _ => serde_json::to_string(value).unwrap_or_default(),
558 }
559}
560
561#[derive(Debug, Clone, Serialize, Deserialize)]
563pub struct DbStats {
564 pub size_bytes: u64,
566
567 pub record_count: u64,
569
570 pub table_count: u32,
572}
573
574#[async_trait]
578pub trait Transaction: Send + Sync {
579 async fn create(&mut self, path: &str, data: Value) -> ClResult<Box<str>>;
581
582 async fn update(&mut self, path: &str, data: Value) -> ClResult<()>;
587
588 async fn delete(&mut self, path: &str) -> ClResult<()>;
590
591 async fn get(&self, path: &str) -> ClResult<Option<Value>>;
603
604 async fn commit(&mut self) -> ClResult<()>;
606
607 async fn rollback(&mut self) -> ClResult<()>;
609}
610
611#[async_trait]
616pub trait RtdbAdapter: Debug + Send + Sync {
617 async fn transaction(&self, tn_id: TnId, db_id: &str) -> ClResult<Box<dyn Transaction>>;
619
620 async fn close_db(&self, tn_id: TnId, db_id: &str) -> ClResult<()>;
622
623 async fn query(
625 &self,
626 tn_id: TnId,
627 db_id: &str,
628 path: &str,
629 opts: QueryOptions,
630 ) -> ClResult<Vec<Value>>;
631
632 async fn get(&self, tn_id: TnId, db_id: &str, path: &str) -> ClResult<Option<Value>>;
634
635 async fn subscribe(
637 &self,
638 tn_id: TnId,
639 db_id: &str,
640 opts: SubscriptionOptions,
641 ) -> ClResult<Pin<Box<dyn Stream<Item = ChangeEvent> + Send>>>;
642
643 async fn create_index(&self, tn_id: TnId, db_id: &str, path: &str, field: &str)
645 -> ClResult<()>;
646
647 async fn stats(&self, tn_id: TnId, db_id: &str) -> ClResult<DbStats>;
649
650 async fn export_all(&self, tn_id: TnId, db_id: &str) -> ClResult<Vec<(Box<str>, Value)>>;
655
656 async fn acquire_lock(
661 &self,
662 tn_id: TnId,
663 db_id: &str,
664 path: &str,
665 user_id: &str,
666 mode: LockMode,
667 conn_id: &str,
668 ) -> ClResult<Option<LockInfo>>;
669
670 async fn release_lock(
672 &self,
673 tn_id: TnId,
674 db_id: &str,
675 path: &str,
676 user_id: &str,
677 conn_id: &str,
678 ) -> ClResult<()>;
679
680 async fn check_lock(&self, tn_id: TnId, db_id: &str, path: &str) -> ClResult<Option<LockInfo>>;
682
683 async fn release_all_locks(
685 &self,
686 tn_id: TnId,
687 db_id: &str,
688 user_id: &str,
689 conn_id: &str,
690 ) -> ClResult<()>;
691}
692
693