1use async_trait::async_trait;
13use futures_core::Stream;
14use serde::{Deserialize, Serialize};
15use serde_json::Value;
16use std::collections::HashMap;
17use std::fmt::Debug;
18use std::pin::Pin;
19
20use crate::prelude::*;
21
/// Kind of lock that can be requested on a document path.
///
/// Serialized with camelCase variant names (`"soft"` / `"hard"`).
///
/// NOTE(review): how each mode is enforced is decided by the adapter
/// implementation and is not visible in this file — confirm before relying
/// on a particular behavior.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub enum LockMode {
    /// Soft lock (presumably advisory — TODO confirm).
    Soft,
    /// Hard lock (presumably exclusive — TODO confirm).
    Hard,
}
29
/// Information about a lock held on a document path.
#[derive(Debug, Clone)]
pub struct LockInfo {
    /// Identifier of the user associated with the lock.
    pub user_id: Box<str>,
    /// Mode the lock was taken in.
    pub mode: LockMode,
    /// Moment the lock was acquired.
    /// NOTE(review): presumably a Unix timestamp; the unit is not visible here — confirm.
    pub acquired_at: u64,
    /// Time-to-live of the lock, in seconds.
    pub ttl_secs: u64,
}
38
/// A single aggregation to compute over a named field.
///
/// Serialized as an internally tagged object: the variant name goes into the
/// `"op"` key in camelCase (e.g. `{"op": "sum", "field": "price"}`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "op", rename_all = "camelCase")]
pub enum AggregateOp {
    /// Sum of `field`.
    Sum { field: String },
    /// Average of `field`.
    Avg { field: String },
    /// Minimum of `field`.
    Min { field: String },
    /// Maximum of `field`.
    Max { field: String },
}
48
/// Aggregation request attached to a query.
///
/// Field names are serialized in camelCase (`groupBy`, `ops`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct AggregateOptions {
    /// Name of the field whose value defines the groups.
    pub group_by: String,

    /// Aggregations to compute; omitted from the serialized form when empty
    /// and defaulting to an empty list when absent on input.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub ops: Vec<AggregateOp>,
}
60
61#[derive(Debug, Clone, Default, Serialize, Deserialize)]
66pub struct QueryFilter {
67 #[serde(default, skip_serializing_if = "HashMap::is_empty")]
69 pub equals: HashMap<String, Value>,
70
71 #[serde(default, skip_serializing_if = "HashMap::is_empty", rename = "notEquals")]
73 pub not_equals: HashMap<String, Value>,
74
75 #[serde(default, skip_serializing_if = "HashMap::is_empty", rename = "greaterThan")]
77 pub greater_than: HashMap<String, Value>,
78
79 #[serde(default, skip_serializing_if = "HashMap::is_empty", rename = "greaterThanOrEqual")]
81 pub greater_than_or_equal: HashMap<String, Value>,
82
83 #[serde(default, skip_serializing_if = "HashMap::is_empty", rename = "lessThan")]
85 pub less_than: HashMap<String, Value>,
86
87 #[serde(default, skip_serializing_if = "HashMap::is_empty", rename = "lessThanOrEqual")]
89 pub less_than_or_equal: HashMap<String, Value>,
90
91 #[serde(default, skip_serializing_if = "HashMap::is_empty", rename = "inArray")]
93 pub in_array: HashMap<String, Vec<Value>>,
94
95 #[serde(default, skip_serializing_if = "HashMap::is_empty", rename = "arrayContains")]
97 pub array_contains: HashMap<String, Value>,
98
99 #[serde(default, skip_serializing_if = "HashMap::is_empty", rename = "notInArray")]
101 pub not_in_array: HashMap<String, Vec<Value>>,
102
103 #[serde(default, skip_serializing_if = "HashMap::is_empty", rename = "arrayContainsAny")]
105 pub array_contains_any: HashMap<String, Vec<Value>>,
106
107 #[serde(default, skip_serializing_if = "HashMap::is_empty", rename = "arrayContainsAll")]
109 pub array_contains_all: HashMap<String, Vec<Value>>,
110}
111
112impl QueryFilter {
113 pub fn new() -> Self {
115 Self::default()
116 }
117
118 pub fn equals_one(field: impl Into<String>, value: Value) -> Self {
120 let mut equals = HashMap::new();
121 equals.insert(field.into(), value);
122 Self { equals, ..Default::default() }
123 }
124
125 pub fn with_equals(mut self, field: impl Into<String>, value: Value) -> Self {
127 self.equals.insert(field.into(), value);
128 self
129 }
130
131 pub fn with_not_equals(mut self, field: impl Into<String>, value: Value) -> Self {
133 self.not_equals.insert(field.into(), value);
134 self
135 }
136
137 pub fn with_greater_than(mut self, field: impl Into<String>, value: Value) -> Self {
139 self.greater_than.insert(field.into(), value);
140 self
141 }
142
143 pub fn with_greater_than_or_equal(mut self, field: impl Into<String>, value: Value) -> Self {
145 self.greater_than_or_equal.insert(field.into(), value);
146 self
147 }
148
149 pub fn with_less_than(mut self, field: impl Into<String>, value: Value) -> Self {
151 self.less_than.insert(field.into(), value);
152 self
153 }
154
155 pub fn with_less_than_or_equal(mut self, field: impl Into<String>, value: Value) -> Self {
157 self.less_than_or_equal.insert(field.into(), value);
158 self
159 }
160
161 pub fn with_in_array(mut self, field: impl Into<String>, values: Vec<Value>) -> Self {
163 self.in_array.insert(field.into(), values);
164 self
165 }
166
167 pub fn with_array_contains(mut self, field: impl Into<String>, value: Value) -> Self {
169 self.array_contains.insert(field.into(), value);
170 self
171 }
172
173 pub fn with_not_in_array(mut self, field: impl Into<String>, values: Vec<Value>) -> Self {
175 self.not_in_array.insert(field.into(), values);
176 self
177 }
178
179 pub fn with_array_contains_any(mut self, field: impl Into<String>, values: Vec<Value>) -> Self {
181 self.array_contains_any.insert(field.into(), values);
182 self
183 }
184
185 pub fn with_array_contains_all(mut self, field: impl Into<String>, values: Vec<Value>) -> Self {
187 self.array_contains_all.insert(field.into(), values);
188 self
189 }
190
191 pub fn matches(&self, doc: &Value) -> bool {
193 for (field, expected) in &self.equals {
195 if doc.get(field) != Some(expected) {
196 return false;
197 }
198 }
199
200 for (field, expected) in &self.not_equals {
202 if doc.get(field) == Some(expected) {
203 return false;
204 }
205 }
206
207 for (field, threshold) in &self.greater_than {
209 match doc.get(field) {
210 Some(actual)
211 if compare_json_values(Some(actual), Some(threshold))
212 == std::cmp::Ordering::Greater => {}
213 _ => return false,
214 }
215 }
216
217 for (field, threshold) in &self.greater_than_or_equal {
219 match doc.get(field) {
220 Some(actual) => {
221 let ord = compare_json_values(Some(actual), Some(threshold));
222 if ord != std::cmp::Ordering::Greater && ord != std::cmp::Ordering::Equal {
223 return false;
224 }
225 }
226 _ => return false,
227 }
228 }
229
230 for (field, threshold) in &self.less_than {
232 match doc.get(field) {
233 Some(actual)
234 if compare_json_values(Some(actual), Some(threshold))
235 == std::cmp::Ordering::Less => {}
236 _ => return false,
237 }
238 }
239
240 for (field, threshold) in &self.less_than_or_equal {
242 match doc.get(field) {
243 Some(actual) => {
244 let ord = compare_json_values(Some(actual), Some(threshold));
245 if ord != std::cmp::Ordering::Less && ord != std::cmp::Ordering::Equal {
246 return false;
247 }
248 }
249 _ => return false,
250 }
251 }
252
253 for (field, allowed_values) in &self.in_array {
255 match doc.get(field) {
256 Some(actual) if allowed_values.contains(actual) => {}
257 _ => return false,
258 }
259 }
260
261 for (field, required_value) in &self.array_contains {
263 match doc.get(field) {
264 Some(Value::Array(arr)) if arr.contains(required_value) => {}
265 _ => return false,
266 }
267 }
268
269 for (field, excluded_values) in &self.not_in_array {
271 if let Some(actual) = doc.get(field) {
272 if excluded_values.contains(actual) {
273 return false;
274 }
275 }
276 }
277
278 for (field, candidate_values) in &self.array_contains_any {
280 match doc.get(field) {
281 Some(Value::Array(arr)) if candidate_values.iter().any(|v| arr.contains(v)) => {}
282 _ => return false,
283 }
284 }
285
286 for (field, required_values) in &self.array_contains_all {
288 match doc.get(field) {
289 Some(Value::Array(arr)) if required_values.iter().all(|v| arr.contains(v)) => {}
290 _ => return false,
291 }
292 }
293
294 true
295 }
296
297 pub fn is_empty(&self) -> bool {
299 self.equals.is_empty()
300 && self.not_equals.is_empty()
301 && self.greater_than.is_empty()
302 && self.greater_than_or_equal.is_empty()
303 && self.less_than.is_empty()
304 && self.less_than_or_equal.is_empty()
305 && self.in_array.is_empty()
306 && self.array_contains.is_empty()
307 && self.not_in_array.is_empty()
308 && self.array_contains_any.is_empty()
309 && self.array_contains_all.is_empty()
310 }
311}
312
/// One sort key of a query: a field name plus a direction.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SortField {
    /// Name of the field to sort by.
    pub field: String,

    /// `true` for ascending order, `false` for descending order.
    pub ascending: bool,
}
322
323impl SortField {
324 pub fn asc(field: impl Into<String>) -> Self {
326 Self { field: field.into(), ascending: true }
327 }
328
329 pub fn desc(field: impl Into<String>) -> Self {
331 Self { field: field.into(), ascending: false }
332 }
333}
334
/// Optional parameters of a query; every part defaults to `None`.
#[derive(Debug, Clone, Default)]
pub struct QueryOptions {
    /// Conditions documents must satisfy.
    pub filter: Option<QueryFilter>,

    /// Sort keys.
    /// NOTE(review): presumably applied in order of precedence — confirm with the adapter.
    pub sort: Option<Vec<SortField>>,

    /// Maximum number of results.
    pub limit: Option<u32>,

    /// Number of results to skip.
    pub offset: Option<u32>,

    /// Aggregation to perform instead of / in addition to returning documents
    /// (exact semantics are adapter-defined — TODO confirm).
    pub aggregate: Option<AggregateOptions>,
}
353
354impl QueryOptions {
355 pub fn new() -> Self {
357 Self::default()
358 }
359
360 pub fn with_filter(mut self, filter: QueryFilter) -> Self {
362 self.filter = Some(filter);
363 self
364 }
365
366 pub fn with_sort(mut self, sort: Vec<SortField>) -> Self {
368 self.sort = Some(sort);
369 self
370 }
371
372 pub fn with_limit(mut self, limit: u32) -> Self {
374 self.limit = Some(limit);
375 self
376 }
377
378 pub fn with_offset(mut self, offset: u32) -> Self {
380 self.offset = Some(offset);
381 self
382 }
383
384 pub fn with_aggregate(mut self, aggregate: AggregateOptions) -> Self {
386 self.aggregate = Some(aggregate);
387 self
388 }
389}
390
/// Parameters of a change subscription.
#[derive(Debug, Clone)]
pub struct SubscriptionOptions {
    /// Path to subscribe to.
    pub path: Box<str>,

    /// Optional filter; `None` means no filtering is requested.
    pub filter: Option<QueryFilter>,
}
400
401impl SubscriptionOptions {
402 pub fn all(path: impl Into<Box<str>>) -> Self {
404 Self { path: path.into(), filter: None }
405 }
406
407 pub fn filtered(path: impl Into<Box<str>>, filter: QueryFilter) -> Self {
409 Self { path: path.into(), filter: Some(filter) }
410 }
411}
412
/// Event delivered to subscribers.
///
/// Serialized as an internally tagged object: the variant name is stored in
/// the `"action"` key in camelCase (`"create"`, `"update"`, `"delete"`,
/// `"lock"`, `"unlock"`, `"ready"`).
///
/// NOTE(review): `rename_all` on an enum renames the variants only, not the
/// fields of struct variants, so `old_data` is serialized under the key
/// `old_data` — confirm this is the intended wire name.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "action", rename_all = "camelCase")]
pub enum ChangeEvent {
    /// A document was created.
    Create {
        /// Path of the affected document.
        path: Box<str>,
        /// Data of the created document.
        data: Value,
    },

    /// A document was updated.
    Update {
        /// Path of the affected document.
        path: Box<str>,
        /// Data carried by the update.
        data: Value,
        /// Previous data, when available; omitted from the output when `None`.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        old_data: Option<Value>,
    },

    /// A document was deleted.
    Delete {
        /// Path of the affected document.
        path: Box<str>,
        /// Data before deletion, when available; omitted from the output when `None`.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        old_data: Option<Value>,
    },

    /// A lock was taken on a path.
    Lock {
        /// Path the lock applies to.
        path: Box<str>,
        /// Payload describing the lock (shape not visible here — TODO confirm).
        data: Value,
    },

    /// A lock on a path was released.
    Unlock {
        /// Path the lock applied to.
        path: Box<str>,
        /// Payload describing the unlock (shape not visible here — TODO confirm).
        data: Value,
    },

    /// Readiness notification.
    /// NOTE(review): presumably marks the end of the initial snapshot of a
    /// subscription — confirm against the adapter implementation.
    Ready {
        /// Path the notification refers to.
        path: Box<str>,
        /// Optional payload; omitted from the output when `None`.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        data: Option<Value>,
    },
}
470
471impl ChangeEvent {
472 pub fn path(&self) -> &str {
474 match self {
475 ChangeEvent::Create { path, .. }
476 | ChangeEvent::Update { path, .. }
477 | ChangeEvent::Delete { path, .. }
478 | ChangeEvent::Lock { path, .. }
479 | ChangeEvent::Unlock { path, .. }
480 | ChangeEvent::Ready { path, .. } => path,
481 }
482 }
483
484 pub fn id(&self) -> Option<&str> {
486 self.path().split('/').next_back()
487 }
488
489 pub fn parent_path(&self) -> Option<&str> {
491 let path = self.path();
492 path.rfind('/').map(|pos| &path[..pos])
493 }
494
495 pub fn data(&self) -> Option<&Value> {
497 match self {
498 ChangeEvent::Create { data, .. }
499 | ChangeEvent::Update { data, .. }
500 | ChangeEvent::Lock { data, .. }
501 | ChangeEvent::Unlock { data, .. } => Some(data),
502 ChangeEvent::Delete { .. } => None,
503 ChangeEvent::Ready { data, .. } => data.as_ref(),
504 }
505 }
506
507 pub fn is_create(&self) -> bool {
509 matches!(self, ChangeEvent::Create { .. })
510 }
511
512 pub fn is_update(&self) -> bool {
514 matches!(self, ChangeEvent::Update { .. })
515 }
516
517 pub fn is_delete(&self) -> bool {
519 matches!(self, ChangeEvent::Delete { .. })
520 }
521}
522
523fn compare_json_values(a: Option<&Value>, b: Option<&Value>) -> std::cmp::Ordering {
525 match (a, b) {
526 (None, None) => std::cmp::Ordering::Equal,
527 (None, Some(_)) => std::cmp::Ordering::Less,
528 (Some(_), None) => std::cmp::Ordering::Greater,
529 (Some(Value::Number(a)), Some(Value::Number(b))) => {
530 a.as_f64().partial_cmp(&b.as_f64()).unwrap_or(std::cmp::Ordering::Equal)
531 }
532 (Some(Value::String(a)), Some(Value::String(b))) => a.cmp(b),
533 (Some(Value::Bool(a)), Some(Value::Bool(b))) => a.cmp(b),
534 (Some(a), Some(b)) => a.to_string().cmp(&b.to_string()),
535 }
536}
537
538pub fn value_to_group_string(value: &Value) -> String {
540 match value {
541 Value::String(s) => s.clone(),
542 Value::Number(n) => n.to_string(),
543 Value::Bool(b) => b.to_string(),
544 Value::Null => "null".to_string(),
545 _ => serde_json::to_string(value).unwrap_or_default(),
546 }
547}
548
/// Statistics reported for a database.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DbStats {
    /// Size of the database in bytes.
    pub size_bytes: u64,

    /// Number of records.
    pub record_count: u64,

    /// Number of tables.
    pub table_count: u32,
}
561
/// A database transaction grouping several operations.
///
/// NOTE(review): atomicity and isolation guarantees are defined by the
/// implementing adapter and are not visible here — confirm before relying on them.
#[async_trait]
pub trait Transaction: Send + Sync {
    /// Creates a document under `path` with `data`.
    /// Returns a string — presumably the identifier of the new document (TODO confirm).
    async fn create(&mut self, path: &str, data: Value) -> ClResult<Box<str>>;

    /// Updates the document at `path` with `data`.
    /// NOTE(review): whether this replaces or merges is implementation-defined — confirm.
    async fn update(&mut self, path: &str, data: Value) -> ClResult<()>;

    /// Deletes the document at `path`.
    async fn delete(&mut self, path: &str) -> ClResult<()>;

    /// Reads the document at `path`; `None` presumably means it does not exist.
    /// NOTE(review): whether pending writes of this transaction are visible is
    /// not established here — confirm with the implementation.
    async fn get(&self, path: &str) -> ClResult<Option<Value>>;

    /// Commits the transaction.
    async fn commit(&mut self) -> ClResult<()>;

    /// Rolls the transaction back.
    async fn rollback(&mut self) -> ClResult<()>;
}
598
/// Storage backend interface of the real-time database.
///
/// Every operation is addressed by `tn_id` and `db_id`.
/// NOTE(review): `tn_id` is presumably the tenant identifier and `db_id` the
/// database identifier within it; `conn_id` presumably identifies the client
/// connection — confirm against the callers.
#[async_trait]
pub trait RtdbAdapter: Debug + Send + Sync {
    /// Opens a new transaction on the given database.
    async fn transaction(&self, tn_id: TnId, db_id: &str) -> ClResult<Box<dyn Transaction>>;

    /// Closes the given database.
    /// NOTE(review): what resources are released is implementation-defined.
    async fn close_db(&self, tn_id: TnId, db_id: &str) -> ClResult<()>;

    /// Runs a query on `path` with the given options and returns the
    /// resulting JSON values.
    async fn query(
        &self,
        tn_id: TnId,
        db_id: &str,
        path: &str,
        opts: QueryOptions,
    ) -> ClResult<Vec<Value>>;

    /// Reads the document at `path`; `None` presumably means it does not exist.
    async fn get(&self, tn_id: TnId, db_id: &str, path: &str) -> ClResult<Option<Value>>;

    /// Subscribes to changes described by `opts` and returns a stream of
    /// [`ChangeEvent`]s.
    async fn subscribe(
        &self,
        tn_id: TnId,
        db_id: &str,
        opts: SubscriptionOptions,
    ) -> ClResult<Pin<Box<dyn Stream<Item = ChangeEvent> + Send>>>;

    /// Creates an index on `field` for documents under `path`.
    async fn create_index(&self, tn_id: TnId, db_id: &str, path: &str, field: &str)
        -> ClResult<()>;

    /// Returns statistics about the given database.
    async fn stats(&self, tn_id: TnId, db_id: &str) -> ClResult<DbStats>;

    /// Exports the database content as `(string, value)` pairs —
    /// presumably `(path, document)` (TODO confirm).
    async fn export_all(&self, tn_id: TnId, db_id: &str) -> ClResult<Vec<(Box<str>, Value)>>;

    /// Attempts to take a lock of the given `mode` on `path` for `user_id` /
    /// `conn_id`.
    /// NOTE(review): the meaning of `Some` vs `None` in the result is not
    /// visible here (it may describe an already existing, conflicting lock) —
    /// confirm with the implementation.
    async fn acquire_lock(
        &self,
        tn_id: TnId,
        db_id: &str,
        path: &str,
        user_id: &str,
        mode: LockMode,
        conn_id: &str,
    ) -> ClResult<Option<LockInfo>>;

    /// Releases the lock on `path` for `user_id` / `conn_id`.
    async fn release_lock(
        &self,
        tn_id: TnId,
        db_id: &str,
        path: &str,
        user_id: &str,
        conn_id: &str,
    ) -> ClResult<()>;

    /// Returns information about the lock on `path`, if any.
    async fn check_lock(&self, tn_id: TnId, db_id: &str, path: &str) -> ClResult<Option<LockInfo>>;

    /// Releases every lock associated with `user_id` / `conn_id` in the given database.
    async fn release_all_locks(
        &self,
        tn_id: TnId,
        db_id: &str,
        user_id: &str,
        conn_id: &str,
    ) -> ClResult<()>;
}
680
681