1use std::collections::{BTreeMap, HashMap};
2use std::convert::TryInto;
3use std::fmt;
4use std::sync::atomic::{AtomicU64, Ordering};
5use std::sync::{Arc, LazyLock, Mutex};
6use std::time::{SystemTime, UNIX_EPOCH};
7
8use serde_json::{Map, Number, Value};
9
10use crate::app;
11use crate::app::FirebaseApp;
12use crate::component::types::{
13 ComponentError, DynService, InstanceFactoryOptions, InstantiationMode,
14};
15use crate::component::{Component, ComponentType};
16use crate::database::backend::{select_backend, DatabaseBackend};
17use crate::database::constants::DATABASE_COMPONENT_NAME;
18use crate::database::error::{internal_error, invalid_argument, DatabaseResult};
19use crate::database::on_disconnect::OnDisconnect;
20use crate::database::push_id::next_push_id;
21use crate::database::query::{QueryBound, QueryIndex, QueryLimit, QueryParams};
22
23#[derive(Clone, Debug)]
24pub struct Database {
25 inner: Arc<DatabaseInner>,
26}
27
28struct DatabaseInner {
29 app: FirebaseApp,
30 backend: Arc<dyn DatabaseBackend>,
31 listeners: Mutex<HashMap<u64, Listener>>,
32 next_listener_id: AtomicU64,
33}
34
35impl fmt::Debug for DatabaseInner {
36 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
37 f.debug_struct("DatabaseInner")
38 .field("app", &self.app.name())
39 .field("backend", &"dynamic")
40 .finish()
41 }
42}
43
44#[derive(Clone, Debug)]
45pub struct DatabaseReference {
46 database: Database,
47 path: Vec<String>,
48}
49
50#[derive(Clone, Debug)]
53pub struct DatabaseQuery {
54 reference: DatabaseReference,
55 params: QueryParams,
56}
57
58#[derive(Clone, Debug)]
61pub struct QueryConstraint {
62 kind: QueryConstraintKind,
63}
64
65#[derive(Clone, Debug)]
66enum QueryConstraintKind {
67 OrderByChild(String),
68 OrderByKey,
69 OrderByValue,
70 OrderByPriority,
71 Start {
72 value: Value,
73 name: Option<String>,
74 inclusive: bool,
75 },
76 End {
77 value: Value,
78 name: Option<String>,
79 inclusive: bool,
80 },
81 LimitFirst(u32),
82 LimitLast(u32),
83 EqualTo {
84 value: Value,
85 name: Option<String>,
86 },
87}
88
89type ValueListenerCallback = Arc<dyn Fn(DataSnapshot) + Send + Sync>;
90type ChildListenerCallback = Arc<dyn Fn(DataSnapshot, Option<String>) + Send + Sync>;
91
92#[derive(Clone, Copy, Debug, PartialEq, Eq)]
93enum ChildEventType {
94 Added,
95 Changed,
96 Removed,
97}
98
99#[derive(Clone)]
100enum ListenerKind {
101 Value(ValueListenerCallback),
102 Child {
103 event: ChildEventType,
104 callback: ChildListenerCallback,
105 },
106}
107
108#[derive(Clone)]
109struct Listener {
110 target: ListenerTarget,
111 kind: ListenerKind,
112}
113
114#[derive(Clone)]
115enum ListenerTarget {
116 Reference(Vec<String>),
117 Query {
118 path: Vec<String>,
119 params: QueryParams,
120 },
121}
122
123impl ListenerTarget {
124 fn matches(&self, changed_path: &[String]) -> bool {
125 match self {
126 ListenerTarget::Reference(path) => paths_related(path, changed_path),
127 ListenerTarget::Query { path, .. } => paths_related(path, changed_path),
128 }
129 }
130}
131
132#[derive(Clone, Debug)]
135pub struct DataSnapshot {
136 reference: DatabaseReference,
137 value: Value,
138}
139
140impl DataSnapshot {
141 pub fn reference(&self) -> &DatabaseReference {
142 &self.reference
143 }
144
145 pub fn value(&self) -> &Value {
146 &self.value
147 }
148
149 pub fn exists(&self) -> bool {
150 !self.value.is_null()
151 }
152
153 pub fn key(&self) -> Option<&str> {
154 self.reference.key()
155 }
156
157 pub fn into_value(self) -> Value {
158 self.value
159 }
160
161 pub fn child(&self, relative_path: &str) -> DatabaseResult<DataSnapshot> {
164 let segments = normalize_path(relative_path)?;
165 let child_reference = self.reference.child(relative_path)?;
166 let value = get_value_at_path(&self.value, &segments).unwrap_or(Value::Null);
167 Ok(DataSnapshot {
168 reference: child_reference,
169 value,
170 })
171 }
172
173 pub fn has_child(&self, relative_path: &str) -> DatabaseResult<bool> {
176 let segments = normalize_path(relative_path)?;
177 Ok(get_value_at_path(&self.value, &segments)
178 .map(|value| !value.is_null())
179 .unwrap_or(false))
180 }
181
182 pub fn has_children(&self) -> bool {
185 match extract_data_ref(&self.value) {
186 Value::Object(map) => !map.is_empty(),
187 Value::Array(array) => !array.is_empty(),
188 _ => false,
189 }
190 }
191
192 pub fn size(&self) -> usize {
194 match extract_data_ref(&self.value) {
195 Value::Object(map) => map.len(),
196 Value::Array(array) => array.len(),
197 _ => 0,
198 }
199 }
200
201 pub fn to_json(&self) -> Value {
203 self.value.clone()
204 }
205}
206
207pub struct ListenerRegistration {
210 database: Database,
211 id: Option<u64>,
212}
213
214impl ListenerRegistration {
215 fn new(database: Database, id: u64) -> Self {
216 Self {
217 database,
218 id: Some(id),
219 }
220 }
221
222 pub fn detach(mut self) {
223 if let Some(id) = self.id.take() {
224 self.database.remove_listener(id);
225 }
226 }
227}
228
229impl Drop for ListenerRegistration {
230 fn drop(&mut self) {
231 if let Some(id) = self.id.take() {
232 self.database.remove_listener(id);
233 }
234 }
235}
236
237impl QueryConstraint {
238 fn new(kind: QueryConstraintKind) -> Self {
239 Self { kind }
240 }
241
242 fn apply(self, query: DatabaseQuery) -> DatabaseResult<DatabaseQuery> {
243 match self.kind {
244 QueryConstraintKind::OrderByChild(path) => query.order_by_child(&path),
245 QueryConstraintKind::OrderByKey => query.order_by_key(),
246 QueryConstraintKind::OrderByValue => query.order_by_value(),
247 QueryConstraintKind::OrderByPriority => query.order_by_priority(),
248 QueryConstraintKind::Start {
249 value,
250 name,
251 inclusive,
252 } => {
253 if inclusive {
254 query.start_at_with_key(value, name)
255 } else {
256 query.start_after_with_key(value, name)
257 }
258 }
259 QueryConstraintKind::End {
260 value,
261 name,
262 inclusive,
263 } => {
264 if inclusive {
265 query.end_at_with_key(value, name)
266 } else {
267 query.end_before_with_key(value, name)
268 }
269 }
270 QueryConstraintKind::LimitFirst(limit) => query.limit_to_first(limit),
271 QueryConstraintKind::LimitLast(limit) => query.limit_to_last(limit),
272 QueryConstraintKind::EqualTo { value, name } => query.equal_to_with_key(value, name),
273 }
274 }
275}
276
277pub fn query(
280 reference: DatabaseReference,
281 constraints: impl IntoIterator<Item = QueryConstraint>,
282) -> DatabaseResult<DatabaseQuery> {
283 let mut current = reference.query();
284 for constraint in constraints {
285 current = constraint.apply(current)?;
286 }
287 Ok(current)
288}
289
290pub fn order_by_child(path: impl Into<String>) -> QueryConstraint {
293 QueryConstraint::new(QueryConstraintKind::OrderByChild(path.into()))
294}
295
296pub fn order_by_key() -> QueryConstraint {
298 QueryConstraint::new(QueryConstraintKind::OrderByKey)
299}
300
301pub fn order_by_priority() -> QueryConstraint {
303 QueryConstraint::new(QueryConstraintKind::OrderByPriority)
304}
305
306pub fn order_by_value() -> QueryConstraint {
308 QueryConstraint::new(QueryConstraintKind::OrderByValue)
309}
310
311pub fn start_at<V>(value: V) -> QueryConstraint
313where
314 V: Into<Value>,
315{
316 QueryConstraint::new(QueryConstraintKind::Start {
317 value: value.into(),
318 name: None,
319 inclusive: true,
320 })
321}
322
323pub fn start_at_with_key<V, S>(value: V, name: S) -> QueryConstraint
325where
326 V: Into<Value>,
327 S: Into<String>,
328{
329 QueryConstraint::new(QueryConstraintKind::Start {
330 value: value.into(),
331 name: Some(name.into()),
332 inclusive: true,
333 })
334}
335
336pub fn start_after<V>(value: V) -> QueryConstraint
338where
339 V: Into<Value>,
340{
341 QueryConstraint::new(QueryConstraintKind::Start {
342 value: value.into(),
343 name: None,
344 inclusive: false,
345 })
346}
347
348pub fn start_after_with_key<V, S>(value: V, name: S) -> QueryConstraint
350where
351 V: Into<Value>,
352 S: Into<String>,
353{
354 QueryConstraint::new(QueryConstraintKind::Start {
355 value: value.into(),
356 name: Some(name.into()),
357 inclusive: false,
358 })
359}
360
361pub fn end_at<V>(value: V) -> QueryConstraint
363where
364 V: Into<Value>,
365{
366 QueryConstraint::new(QueryConstraintKind::End {
367 value: value.into(),
368 name: None,
369 inclusive: true,
370 })
371}
372
373pub fn end_at_with_key<V, S>(value: V, name: S) -> QueryConstraint
375where
376 V: Into<Value>,
377 S: Into<String>,
378{
379 QueryConstraint::new(QueryConstraintKind::End {
380 value: value.into(),
381 name: Some(name.into()),
382 inclusive: true,
383 })
384}
385
386pub fn end_before<V>(value: V) -> QueryConstraint
388where
389 V: Into<Value>,
390{
391 QueryConstraint::new(QueryConstraintKind::End {
392 value: value.into(),
393 name: None,
394 inclusive: false,
395 })
396}
397
398pub fn end_before_with_key<V, S>(value: V, name: S) -> QueryConstraint
400where
401 V: Into<Value>,
402 S: Into<String>,
403{
404 QueryConstraint::new(QueryConstraintKind::End {
405 value: value.into(),
406 name: Some(name.into()),
407 inclusive: false,
408 })
409}
410
411pub fn limit_to_first(limit: u32) -> QueryConstraint {
413 QueryConstraint::new(QueryConstraintKind::LimitFirst(limit))
414}
415
416pub fn limit_to_last(limit: u32) -> QueryConstraint {
418 QueryConstraint::new(QueryConstraintKind::LimitLast(limit))
419}
420
421pub fn equal_to<V>(value: V) -> QueryConstraint
423where
424 V: Into<Value>,
425{
426 QueryConstraint::new(QueryConstraintKind::EqualTo {
427 value: value.into(),
428 name: None,
429 })
430}
431
432pub fn equal_to_with_key<V, S>(value: V, name: S) -> QueryConstraint
434where
435 V: Into<Value>,
436 S: Into<String>,
437{
438 QueryConstraint::new(QueryConstraintKind::EqualTo {
439 value: value.into(),
440 name: Some(name.into()),
441 })
442}
443
444pub fn push(reference: &DatabaseReference) -> DatabaseResult<DatabaseReference> {
449 reference.push()
450}
451
452pub fn push_with_value<V>(
457 reference: &DatabaseReference,
458 value: V,
459) -> DatabaseResult<DatabaseReference>
460where
461 V: Into<Value>,
462{
463 reference.push_with_value(value)
464}
465
466pub fn on_child_added<F>(
468 reference: &DatabaseReference,
469 callback: F,
470) -> DatabaseResult<ListenerRegistration>
471where
472 F: Fn(DataSnapshot, Option<String>) + Send + Sync + 'static,
473{
474 reference.on_child_added(callback)
475}
476
477pub fn on_child_changed<F>(
479 reference: &DatabaseReference,
480 callback: F,
481) -> DatabaseResult<ListenerRegistration>
482where
483 F: Fn(DataSnapshot, Option<String>) + Send + Sync + 'static,
484{
485 reference.on_child_changed(callback)
486}
487
488pub fn on_child_removed<F>(
490 reference: &DatabaseReference,
491 callback: F,
492) -> DatabaseResult<ListenerRegistration>
493where
494 F: Fn(DataSnapshot, Option<String>) + Send + Sync + 'static,
495{
496 reference.on_child_removed(callback)
497}
498
499pub fn run_transaction<F>(reference: &DatabaseReference, update: F) -> DatabaseResult<()>
501where
502 F: Fn(Value) -> Value + Send + Sync + 'static,
503{
504 reference.run_transaction(update)
505}
506
507pub fn set_with_priority<V, P>(
510 reference: &DatabaseReference,
511 value: V,
512 priority: P,
513) -> DatabaseResult<()>
514where
515 V: Into<Value>,
516 P: Into<Value>,
517{
518 reference.set_with_priority(value, priority)
519}
520
521pub fn set_priority<P>(reference: &DatabaseReference, priority: P) -> DatabaseResult<()>
524where
525 P: Into<Value>,
526{
527 reference.set_priority(priority)
528}
529
530impl Database {
531 fn new(app: FirebaseApp) -> Self {
532 Self {
533 inner: Arc::new(DatabaseInner {
534 backend: select_backend(&app),
535 app,
536 listeners: Mutex::new(HashMap::new()),
537 next_listener_id: AtomicU64::new(1),
538 }),
539 }
540 }
541
542 pub fn app(&self) -> &FirebaseApp {
543 &self.inner.app
544 }
545
546 pub fn reference(&self, path: &str) -> DatabaseResult<DatabaseReference> {
547 let segments = normalize_path(path)?;
548 Ok(DatabaseReference {
549 database: self.clone(),
550 path: segments,
551 })
552 }
553
554 fn reference_from_segments(&self, segments: Vec<String>) -> DatabaseReference {
555 DatabaseReference {
556 database: self.clone(),
557 path: segments,
558 }
559 }
560
561 fn register_listener(
562 &self,
563 target: ListenerTarget,
564 kind: ListenerKind,
565 ) -> DatabaseResult<ListenerRegistration> {
566 let id = self.inner.next_listener_id.fetch_add(1, Ordering::SeqCst);
567
568 {
569 let mut listeners = self.inner.listeners.lock().unwrap();
570 listeners.insert(
571 id,
572 Listener {
573 target: target.clone(),
574 kind: kind.clone(),
575 },
576 );
577 }
578
579 let current_root = match self.root_snapshot() {
580 Ok(root) => root,
581 Err(err) => {
582 self.remove_listener(id);
583 return Err(err);
584 }
585 };
586 match kind {
587 ListenerKind::Value(callback) => {
588 let snapshot = self.snapshot_from_root(&target, ¤t_root)?;
589 callback(snapshot);
590 }
591 ListenerKind::Child { event, callback } => {
592 if let Err(err) =
593 self.fire_initial_child_events(&target, event, &callback, ¤t_root)
594 {
595 self.remove_listener(id);
596 return Err(err);
597 }
598 }
599 }
600
601 Ok(ListenerRegistration::new(self.clone(), id))
602 }
603
604 fn remove_listener(&self, id: u64) {
605 let mut listeners = self.inner.listeners.lock().unwrap();
606 listeners.remove(&id);
607 }
608
609 fn dispatch_listeners(
610 &self,
611 changed_path: &[String],
612 old_root: &Value,
613 new_root: &Value,
614 ) -> DatabaseResult<()> {
615 let listeners: Vec<Listener> = {
616 let listeners = self.inner.listeners.lock().unwrap();
617 listeners
618 .values()
619 .filter(|listener| listener.target.matches(changed_path))
620 .cloned()
621 .collect()
622 };
623
624 for listener in listeners {
625 match &listener.kind {
626 ListenerKind::Value(callback) => {
627 let snapshot = self.snapshot_from_root(&listener.target, new_root)?;
628 callback(snapshot);
629 }
630 ListenerKind::Child { event, callback } => {
631 self.invoke_child_listener(&listener, *event, callback, old_root, new_root)?;
632 }
633 }
634 }
635 Ok(())
636 }
637
638 fn root_snapshot(&self) -> DatabaseResult<Value> {
639 self.inner.backend.get(&[], &[])
640 }
641
642 fn snapshot_from_root(
643 &self,
644 target: &ListenerTarget,
645 root: &Value,
646 ) -> DatabaseResult<DataSnapshot> {
647 match target {
648 ListenerTarget::Reference(path) => {
649 let value = value_at_path(root, path);
650 let reference = self.reference_from_segments(path.clone());
651 Ok(DataSnapshot { reference, value })
652 }
653 ListenerTarget::Query { .. } => self.snapshot_for_target(target),
654 }
655 }
656
657 fn fire_initial_child_events(
658 &self,
659 target: &ListenerTarget,
660 event: ChildEventType,
661 callback: &ChildListenerCallback,
662 root: &Value,
663 ) -> DatabaseResult<()> {
664 if event != ChildEventType::Added {
665 return Ok(());
666 }
667
668 if let ListenerTarget::Reference(path) = target {
669 let new_value = value_at_path(root, path);
670 let empty = Value::Null;
671 self.emit_child_events(path, event, callback, &empty, &new_value)?;
672 }
673 Ok(())
674 }
675
676 fn invoke_child_listener(
677 &self,
678 listener: &Listener,
679 event: ChildEventType,
680 callback: &ChildListenerCallback,
681 old_root: &Value,
682 new_root: &Value,
683 ) -> DatabaseResult<()> {
684 let ListenerTarget::Reference(path) = &listener.target else {
685 return Ok(());
686 };
687 let old_value = value_at_path(old_root, path);
688 let new_value = value_at_path(new_root, path);
689 self.emit_child_events(path, event, callback, &old_value, &new_value)
690 }
691
692 fn emit_child_events(
693 &self,
694 parent_path: &[String],
695 event: ChildEventType,
696 callback: &ChildListenerCallback,
697 old_value: &Value,
698 new_value: &Value,
699 ) -> DatabaseResult<()> {
700 let old_children = children_map(old_value);
701 let new_children = children_map(new_value);
702
703 match event {
704 ChildEventType::Added => {
705 let new_keys: Vec<String> = new_children.keys().cloned().collect();
706 for key in new_keys.iter() {
707 if !old_children.contains_key(key) {
708 let value = new_children.get(key).cloned().unwrap_or(Value::Null);
709 let prev_name = previous_key(&new_keys, key);
710 let snapshot = self.child_snapshot(parent_path, key, value.clone());
711 callback(snapshot, prev_name);
712 }
713 }
714 }
715 ChildEventType::Changed => {
716 let new_keys: Vec<String> = new_children.keys().cloned().collect();
717 for key in new_keys.iter() {
718 if let Some(old_value_child) = old_children.get(key) {
719 let new_child = new_children.get(key).expect("child exists in map");
720 if old_value_child != new_child {
721 let value = new_child.clone();
722 let prev_name = previous_key(&new_keys, key);
723 let snapshot = self.child_snapshot(parent_path, key, value);
724 callback(snapshot, prev_name);
725 }
726 }
727 }
728 }
729 ChildEventType::Removed => {
730 let old_keys: Vec<String> = old_children.keys().cloned().collect();
731 for key in old_keys.iter() {
732 if !new_children.contains_key(key) {
733 let value = old_children.get(key).cloned().unwrap_or(Value::Null);
734 let prev_name = previous_key(&old_keys, key);
735 let snapshot = self.child_snapshot(parent_path, key, value);
736 callback(snapshot, prev_name);
737 }
738 }
739 }
740 }
741 Ok(())
742 }
743
744 fn child_snapshot(
745 &self,
746 parent_path: &[String],
747 child_key: &str,
748 value: Value,
749 ) -> DataSnapshot {
750 let mut segments = parent_path.to_vec();
751 segments.push(child_key.to_string());
752 let reference = self.reference_from_segments(segments);
753 DataSnapshot { reference, value }
754 }
755
756 fn snapshot_for_target(&self, target: &ListenerTarget) -> DatabaseResult<DataSnapshot> {
757 match target {
758 ListenerTarget::Reference(path) => {
759 let value = self.inner.backend.get(path, &[])?;
760 let reference = self.reference_from_segments(path.clone());
761 Ok(DataSnapshot { reference, value })
762 }
763 ListenerTarget::Query { path, params } => {
764 let rest_params = params.to_rest_params()?;
765 let value = self.inner.backend.get(path, rest_params.as_slice())?;
766 let reference = self.reference_from_segments(path.clone());
767 Ok(DataSnapshot { reference, value })
768 }
769 }
770 }
771}
772
773impl DatabaseReference {
774 pub fn child(&self, relative: &str) -> DatabaseResult<DatabaseReference> {
775 let mut segments = self.path.clone();
776 segments.extend(normalize_path(relative)?);
777 Ok(DatabaseReference {
778 database: self.database.clone(),
779 path: segments,
780 })
781 }
782
783 pub fn parent(&self) -> Option<DatabaseReference> {
785 if self.path.is_empty() {
786 None
787 } else {
788 let mut parent = self.path.clone();
789 parent.pop();
790 Some(DatabaseReference {
791 database: self.database.clone(),
792 path: parent,
793 })
794 }
795 }
796
797 pub fn root(&self) -> DatabaseReference {
799 DatabaseReference {
800 database: self.database.clone(),
801 path: Vec::new(),
802 }
803 }
804
805 pub fn set(&self, value: Value) -> DatabaseResult<()> {
806 let value = self.resolve_value_for_path(&self.path, value)?;
807 let old_root = self.database.root_snapshot()?;
808 self.database.inner.backend.set(&self.path, value)?;
809 let new_root = self.database.root_snapshot()?;
810 self.database
811 .dispatch_listeners(&self.path, &old_root, &new_root)?;
812 Ok(())
813 }
814
815 pub fn query(&self) -> DatabaseQuery {
817 DatabaseQuery {
818 reference: self.clone(),
819 params: QueryParams::default(),
820 }
821 }
822
823 pub fn order_by_child(&self, path: &str) -> DatabaseResult<DatabaseQuery> {
825 self.query().order_by_child(path)
826 }
827
828 pub fn order_by_key(&self) -> DatabaseResult<DatabaseQuery> {
830 self.query().order_by_key()
831 }
832
833 pub fn order_by_value(&self) -> DatabaseResult<DatabaseQuery> {
835 self.query().order_by_value()
836 }
837
838 pub fn order_by_priority(&self) -> DatabaseResult<DatabaseQuery> {
840 self.query().order_by_priority()
841 }
842
843 pub fn on_value<F>(&self, callback: F) -> DatabaseResult<ListenerRegistration>
845 where
846 F: Fn(DataSnapshot) + Send + Sync + 'static,
847 {
848 let user_fn: ValueListenerCallback = Arc::new(callback);
849 self.database.register_listener(
850 ListenerTarget::Reference(self.path.clone()),
851 ListenerKind::Value(user_fn),
852 )
853 }
854
855 pub fn on_child_added<F>(&self, callback: F) -> DatabaseResult<ListenerRegistration>
857 where
858 F: Fn(DataSnapshot, Option<String>) + Send + Sync + 'static,
859 {
860 let cb: ChildListenerCallback = Arc::new(callback);
861 self.database.register_listener(
862 ListenerTarget::Reference(self.path.clone()),
863 ListenerKind::Child {
864 event: ChildEventType::Added,
865 callback: cb,
866 },
867 )
868 }
869
870 pub fn on_child_changed<F>(&self, callback: F) -> DatabaseResult<ListenerRegistration>
872 where
873 F: Fn(DataSnapshot, Option<String>) + Send + Sync + 'static,
874 {
875 let cb: ChildListenerCallback = Arc::new(callback);
876 self.database.register_listener(
877 ListenerTarget::Reference(self.path.clone()),
878 ListenerKind::Child {
879 event: ChildEventType::Changed,
880 callback: cb,
881 },
882 )
883 }
884
885 pub fn on_child_removed<F>(&self, callback: F) -> DatabaseResult<ListenerRegistration>
887 where
888 F: Fn(DataSnapshot, Option<String>) + Send + Sync + 'static,
889 {
890 let cb: ChildListenerCallback = Arc::new(callback);
891 self.database.register_listener(
892 ListenerTarget::Reference(self.path.clone()),
893 ListenerKind::Child {
894 event: ChildEventType::Removed,
895 callback: cb,
896 },
897 )
898 }
899
900 pub fn on_disconnect(&self) -> OnDisconnect {
902 OnDisconnect::new(self.clone())
903 }
904
905 pub fn run_transaction<F>(&self, _update: F) -> DatabaseResult<()>
907 where
908 F: Fn(Value) -> Value + Send + Sync + 'static,
909 {
910 Err(internal_error(
911 "Transactions require realtime transport and are not yet implemented",
912 ))
913 }
914
915 pub fn update(&self, updates: serde_json::Map<String, Value>) -> DatabaseResult<()> {
921 if updates.is_empty() {
922 return Ok(());
923 }
924
925 let mut operations = Vec::with_capacity(updates.len());
926 for (key, value) in updates {
927 if key.trim().is_empty() {
928 return Err(invalid_argument("Database update path cannot be empty"));
929 }
930 let mut segments = self.path.clone();
931 let relative = normalize_path(&key)?;
932 if relative.is_empty() {
933 return Err(invalid_argument(
934 "Database update path cannot reference the current location",
935 ));
936 }
937 segments.extend(relative);
938 let resolved = self.resolve_value_for_path(&segments, value)?;
939 operations.push((segments, resolved));
940 }
941
942 let old_root = self.database.root_snapshot()?;
943 self.database.inner.backend.update(&self.path, operations)?;
944 let new_root = self.database.root_snapshot()?;
945 self.database
946 .dispatch_listeners(&self.path, &old_root, &new_root)?;
947 Ok(())
948 }
949
950 pub fn get(&self) -> DatabaseResult<Value> {
951 self.database.inner.backend.get(&self.path, &[])
952 }
953
954 pub fn remove(&self) -> DatabaseResult<()> {
956 let old_root = self.database.root_snapshot()?;
957 self.database.inner.backend.delete(&self.path)?;
958 let new_root = self.database.root_snapshot()?;
959 self.database
960 .dispatch_listeners(&self.path, &old_root, &new_root)?;
961 Ok(())
962 }
963
964 pub fn set_with_priority<V, P>(&self, value: V, priority: P) -> DatabaseResult<()>
967 where
968 V: Into<Value>,
969 P: Into<Value>,
970 {
971 let priority = priority.into();
972 validate_priority_value(&priority)?;
973 if matches!(self.key(), Some(".length" | ".keys")) {
974 return Err(invalid_argument(
975 "set_with_priority failed: read-only child key",
976 ));
977 }
978
979 let value = self.resolve_value_for_path(&self.path, value.into())?;
980 let payload = pack_with_priority(value, priority);
981 let old_root = self.database.root_snapshot()?;
982 self.database.inner.backend.set(&self.path, payload)?;
983 let new_root = self.database.root_snapshot()?;
984 self.database
985 .dispatch_listeners(&self.path, &old_root, &new_root)?;
986 Ok(())
987 }
988
989 pub fn set_priority<P>(&self, priority: P) -> DatabaseResult<()>
991 where
992 P: Into<Value>,
993 {
994 let priority = priority.into();
995 validate_priority_value(&priority)?;
996
997 let current = self.database.inner.backend.get(&self.path, &[])?;
998 let value = extract_data_owned(¤t);
999 let payload = pack_with_priority(value, priority);
1000 let old_root = self.database.root_snapshot()?;
1001 self.database.inner.backend.set(&self.path, payload)?;
1002 let new_root = self.database.root_snapshot()?;
1003 self.database
1004 .dispatch_listeners(&self.path, &old_root, &new_root)?;
1005 Ok(())
1006 }
1007
1008 pub fn push(&self) -> DatabaseResult<DatabaseReference> {
1023 self.push_internal(None)
1024 }
1025
1026 pub fn push_with_value<V>(&self, value: V) -> DatabaseResult<DatabaseReference>
1030 where
1031 V: Into<Value>,
1032 {
1033 self.push_internal(Some(value.into()))
1034 }
1035
1036 fn resolve_value_for_path(&self, path: &[String], value: Value) -> DatabaseResult<Value> {
1037 if contains_server_value(&value) {
1038 let current = self.database.inner.backend.get(path, &[])?;
1039 let current_ref = extract_data_ref(¤t);
1040 resolve_server_values(value, Some(current_ref))
1041 } else {
1042 Ok(value)
1043 }
1044 }
1045
1046 fn push_internal(&self, value: Option<Value>) -> DatabaseResult<DatabaseReference> {
1047 let timestamp = current_time_millis()?;
1048 let key = next_push_id(timestamp);
1049 let child = self.child(&key)?;
1050 if let Some(value) = value {
1051 child.set(value)?;
1052 }
1053 Ok(child)
1054 }
1055
1056 pub fn key(&self) -> Option<&str> {
1059 self.path.last().map(|segment| segment.as_str())
1060 }
1061
1062 pub fn path(&self) -> String {
1063 if self.path.is_empty() {
1064 "/".to_string()
1065 } else {
1066 format!("/{}/", self.path.join("/"))
1067 }
1068 }
1069}
1070
1071impl DatabaseQuery {
1072 pub fn reference(&self) -> &DatabaseReference {
1074 &self.reference
1075 }
1076
1077 pub fn order_by_child(mut self, path: &str) -> DatabaseResult<Self> {
1079 validate_order_by_child_target(path)?;
1080 let segments = normalize_path(path)?;
1081 if segments.is_empty() {
1082 return Err(invalid_argument("orderByChild path cannot be empty"));
1083 }
1084 let joined = segments.join("/");
1085 self.params.set_index(QueryIndex::Child(joined))?;
1086 Ok(self)
1087 }
1088
1089 pub fn order_by_key(mut self) -> DatabaseResult<Self> {
1091 self.params.set_index(QueryIndex::Key)?;
1092 Ok(self)
1093 }
1094
1095 pub fn order_by_value(mut self) -> DatabaseResult<Self> {
1097 self.params.set_index(QueryIndex::Value)?;
1098 Ok(self)
1099 }
1100
1101 pub fn order_by_priority(mut self) -> DatabaseResult<Self> {
1103 self.params.set_index(QueryIndex::Priority)?;
1104 Ok(self)
1105 }
1106
1107 pub fn start_at(self, value: Value) -> DatabaseResult<Self> {
1109 self.start_at_with_key(value, None)
1110 }
1111
1112 pub fn start_at_with_key(mut self, value: Value, name: Option<String>) -> DatabaseResult<Self> {
1114 let bound = QueryBound {
1115 value,
1116 name,
1117 inclusive: true,
1118 };
1119 self.params.set_start(bound)?;
1120 Ok(self)
1121 }
1122
1123 pub fn start_after(self, value: Value) -> DatabaseResult<Self> {
1125 self.start_after_with_key(value, None)
1126 }
1127
1128 pub fn start_after_with_key(
1130 mut self,
1131 value: Value,
1132 name: Option<String>,
1133 ) -> DatabaseResult<Self> {
1134 let bound = QueryBound {
1135 value,
1136 name,
1137 inclusive: false,
1138 };
1139 self.params.set_start(bound)?;
1140 Ok(self)
1141 }
1142
1143 pub fn end_at(self, value: Value) -> DatabaseResult<Self> {
1145 self.end_at_with_key(value, None)
1146 }
1147
1148 pub fn end_at_with_key(mut self, value: Value, name: Option<String>) -> DatabaseResult<Self> {
1150 let bound = QueryBound {
1151 value,
1152 name,
1153 inclusive: true,
1154 };
1155 self.params.set_end(bound)?;
1156 Ok(self)
1157 }
1158
1159 pub fn end_before(self, value: Value) -> DatabaseResult<Self> {
1161 self.end_before_with_key(value, None)
1162 }
1163
1164 pub fn end_before_with_key(
1166 mut self,
1167 value: Value,
1168 name: Option<String>,
1169 ) -> DatabaseResult<Self> {
1170 let bound = QueryBound {
1171 value,
1172 name,
1173 inclusive: false,
1174 };
1175 self.params.set_end(bound)?;
1176 Ok(self)
1177 }
1178
1179 pub fn limit_to_first(mut self, limit: u32) -> DatabaseResult<Self> {
1181 if limit == 0 {
1182 return Err(invalid_argument("limitToFirst must be greater than zero"));
1183 }
1184 self.params.set_limit(QueryLimit::First(limit))?;
1185 Ok(self)
1186 }
1187
1188 pub fn limit_to_last(mut self, limit: u32) -> DatabaseResult<Self> {
1190 if limit == 0 {
1191 return Err(invalid_argument("limitToLast must be greater than zero"));
1192 }
1193 self.params.set_limit(QueryLimit::Last(limit))?;
1194 Ok(self)
1195 }
1196
1197 pub fn equal_to(self, value: Value) -> DatabaseResult<Self> {
1199 self.equal_to_with_key(value, None)
1200 }
1201
1202 pub fn equal_to_with_key(mut self, value: Value, name: Option<String>) -> DatabaseResult<Self> {
1204 let start_bound = QueryBound {
1205 value: value.clone(),
1206 name: name.clone(),
1207 inclusive: true,
1208 };
1209 let end_bound = QueryBound {
1210 value,
1211 name,
1212 inclusive: true,
1213 };
1214 self.params.set_start(start_bound)?;
1215 self.params.set_end(end_bound)?;
1216 Ok(self)
1217 }
1218
1219 pub fn get(&self) -> DatabaseResult<Value> {
1221 let params = self.params.to_rest_params()?;
1222 self.reference
1223 .database
1224 .inner
1225 .backend
1226 .get(&self.reference.path, params.as_slice())
1227 }
1228
1229 pub fn on_value<F>(&self, callback: F) -> DatabaseResult<ListenerRegistration>
1231 where
1232 F: Fn(DataSnapshot) + Send + Sync + 'static,
1233 {
1234 let user_fn: ValueListenerCallback = Arc::new(callback);
1235 self.reference.database.register_listener(
1236 ListenerTarget::Query {
1237 path: self.reference.path.clone(),
1238 params: self.params.clone(),
1239 },
1240 ListenerKind::Value(user_fn),
1241 )
1242 }
1243}
1244
1245fn normalize_path(path: &str) -> DatabaseResult<Vec<String>> {
1246 let trimmed = path.trim_matches('/');
1247 if trimmed.is_empty() {
1248 return Ok(Vec::new());
1249 }
1250 let mut segments = Vec::new();
1251 for segment in trimmed.split('/') {
1252 if segment.is_empty() {
1253 return Err(invalid_argument(
1254 "Database path cannot contain empty segments",
1255 ));
1256 }
1257 segments.push(segment.to_string());
1258 }
1259 Ok(segments)
1260}
1261
1262fn validate_order_by_child_target(path: &str) -> DatabaseResult<()> {
1263 match path {
1264 "$key" => Err(invalid_argument(
1265 "order_by_child(\"$key\") is invalid; call order_by_key() instead",
1266 )),
1267 "$priority" => Err(invalid_argument(
1268 "order_by_child(\"$priority\") is invalid; call order_by_priority() instead",
1269 )),
1270 "$value" => Err(invalid_argument(
1271 "order_by_child(\"$value\") is invalid; call order_by_value() instead",
1272 )),
1273 _ => Ok(()),
1274 }
1275}
1276
1277fn paths_related(a: &[String], b: &[String]) -> bool {
1278 is_prefix(a, b) || is_prefix(b, a)
1279}
1280
1281fn is_prefix(prefix: &[String], path: &[String]) -> bool {
1282 if prefix.len() > path.len() {
1283 return false;
1284 }
1285 prefix
1286 .iter()
1287 .zip(path.iter())
1288 .all(|(left, right)| left == right)
1289}
1290
1291fn validate_priority_value(priority: &Value) -> DatabaseResult<()> {
1292 match priority {
1293 Value::Null | Value::Number(_) | Value::String(_) => Ok(()),
1294 _ => Err(invalid_argument(
1295 "Priority must be a string, number, or null",
1296 )),
1297 }
1298}
1299
1300fn pack_with_priority(value: Value, priority: Value) -> Value {
1301 let mut map = Map::with_capacity(2);
1302 map.insert(".value".to_string(), value);
1303 map.insert(".priority".to_string(), priority);
1304 Value::Object(map)
1305}
1306
1307fn extract_data_ref<'a>(value: &'a Value) -> &'a Value {
1308 value
1309 .as_object()
1310 .and_then(|obj| obj.get(".value"))
1311 .unwrap_or(value)
1312}
1313
1314fn extract_data_owned(value: &Value) -> Value {
1315 extract_data_ref(value).clone()
1316}
1317
1318fn contains_server_value(value: &Value) -> bool {
1319 match value {
1320 Value::Object(map) => {
1321 if map.contains_key(".sv") {
1322 return true;
1323 }
1324 map.values().any(contains_server_value)
1325 }
1326 Value::Array(items) => items.iter().any(contains_server_value),
1327 _ => false,
1328 }
1329}
1330
1331fn resolve_server_values(value: Value, current: Option<&Value>) -> DatabaseResult<Value> {
1332 match value {
1333 Value::Object(mut map) => {
1334 if let Some(spec) = map.remove(".sv") {
1335 return resolve_server_placeholder(spec, current.map(extract_data_ref));
1336 }
1337 let mut resolved = Map::with_capacity(map.len());
1338 for (key, child) in map.into_iter() {
1339 let child_current = current
1340 .and_then(|curr| match curr {
1341 Value::Object(obj) => obj.get(&key),
1342 Value::Array(arr) => key.parse::<usize>().ok().and_then(|idx| arr.get(idx)),
1343 _ => None,
1344 })
1345 .map(extract_data_ref);
1346 let child_resolved = resolve_server_values(child, child_current)?;
1347 resolved.insert(key, child_resolved);
1348 }
1349 Ok(Value::Object(resolved))
1350 }
1351 Value::Array(items) => {
1352 let mut resolved = Vec::with_capacity(items.len());
1353 for (index, child) in items.into_iter().enumerate() {
1354 let child_current = current
1355 .and_then(|curr| match curr {
1356 Value::Array(arr) => arr.get(index),
1357 _ => None,
1358 })
1359 .map(extract_data_ref);
1360 resolved.push(resolve_server_values(child, child_current)?);
1361 }
1362 Ok(Value::Array(resolved))
1363 }
1364 other => Ok(other),
1365 }
1366}
1367
1368fn resolve_server_placeholder(spec: Value, current: Option<&Value>) -> DatabaseResult<Value> {
1369 match spec {
1370 Value::String(token) if token == "timestamp" => {
1371 let millis = current_time_millis()?;
1372 Ok(Value::Number(Number::from(millis)))
1373 }
1374 Value::Object(mut map) => {
1375 if let Some(delta) = map.remove("increment") {
1376 let delta = delta.as_f64().ok_or_else(|| {
1377 invalid_argument("ServerValue.increment delta must be numeric")
1378 })?;
1379 let base = current
1380 .and_then(|value| match value {
1381 Value::Number(number) => number.as_f64(),
1382 _ => None,
1383 })
1384 .unwrap_or(0.0);
1385 let total = base + delta;
1386 let number = Number::from_f64(total).ok_or_else(|| {
1387 invalid_argument("ServerValue.increment produced an invalid number")
1388 })?;
1389 Ok(Value::Number(number))
1390 } else {
1391 Err(invalid_argument("Unsupported server value placeholder"))
1392 }
1393 }
1394 _ => Err(invalid_argument("Unsupported server value placeholder")),
1395 }
1396}
1397
1398fn value_at_path(root: &Value, path: &[String]) -> Value {
1399 if path.is_empty() {
1400 return extract_data_ref(root).clone();
1401 }
1402 get_value_at_path(root, path).unwrap_or(Value::Null)
1403}
1404
1405fn children_map(value: &Value) -> BTreeMap<String, Value> {
1406 let mut map = BTreeMap::new();
1407 match extract_data_ref(value) {
1408 Value::Object(obj) => {
1409 for (key, child) in obj.iter() {
1410 map.insert(key.clone(), child.clone());
1411 }
1412 }
1413 Value::Array(array) => {
1414 for (index, child) in array.iter().enumerate() {
1415 map.insert(index.to_string(), child.clone());
1416 }
1417 }
1418 _ => {}
1419 }
1420 map
1421}
1422
1423fn previous_key(keys: &[String], key: &str) -> Option<String> {
1424 let mut previous: Option<String> = None;
1425 for current in keys {
1426 if current == key {
1427 return previous;
1428 }
1429 previous = Some(current.clone());
1430 }
1431 None
1432}
1433
1434fn get_value_at_path(root: &Value, segments: &[String]) -> Option<Value> {
1435 if segments.is_empty() {
1436 return Some(extract_data_ref(root).clone());
1437 }
1438
1439 fn traverse<'a>(current: &'a Value, segments: &[String]) -> Option<&'a Value> {
1440 if segments.is_empty() {
1441 return Some(current);
1442 }
1443
1444 let data = extract_data_ref(current);
1445 let (first, rest) = segments.split_first().unwrap();
1446
1447 match data {
1448 Value::Object(map) => map.get(first).and_then(|child| traverse(child, rest)),
1449 Value::Array(array) => {
1450 let index = first.parse::<usize>().ok()?;
1451 array.get(index).and_then(|child| traverse(child, rest))
1452 }
1453 _ => None,
1454 }
1455 }
1456
1457 traverse(root, segments).map(|value| extract_data_ref(value).clone())
1458}
1459
1460fn current_time_millis() -> DatabaseResult<u64> {
1461 let duration = SystemTime::now()
1462 .duration_since(UNIX_EPOCH)
1463 .map_err(|_| internal_error("System time is before the Unix epoch"))?;
1464 let millis = duration.as_millis();
1465 millis
1466 .try_into()
1467 .map_err(|_| internal_error("Timestamp exceeds 64-bit range"))
1468}
1469
1470static DATABASE_COMPONENT: LazyLock<()> = LazyLock::new(|| {
1471 let component = Component::new(
1472 DATABASE_COMPONENT_NAME,
1473 Arc::new(database_factory),
1474 ComponentType::Public,
1475 )
1476 .with_instantiation_mode(InstantiationMode::Lazy);
1477 let _ = app::registry::register_component(component);
1478});
1479
1480fn database_factory(
1481 container: &crate::component::ComponentContainer,
1482 _options: InstanceFactoryOptions,
1483) -> Result<DynService, ComponentError> {
1484 let app = container.root_service::<FirebaseApp>().ok_or_else(|| {
1485 ComponentError::InitializationFailed {
1486 name: DATABASE_COMPONENT_NAME.to_string(),
1487 reason: "Firebase app not attached to component container".to_string(),
1488 }
1489 })?;
1490
1491 let database = Database::new((*app).clone());
1492 Ok(Arc::new(database) as DynService)
1493}
1494
1495fn ensure_registered() {
1496 LazyLock::force(&DATABASE_COMPONENT);
1497}
1498
1499pub fn register_database_component() {
1500 ensure_registered();
1501}
1502
1503pub fn get_database(app: Option<FirebaseApp>) -> DatabaseResult<Arc<Database>> {
1504 ensure_registered();
1505 let app = match app {
1506 Some(app) => app,
1507 None => crate::app::api::get_app(None).map_err(|err| internal_error(err.to_string()))?,
1508 };
1509
1510 let provider = app::registry::get_provider(&app, DATABASE_COMPONENT_NAME);
1511 if let Some(database) = provider.get_immediate::<Database>() {
1512 return Ok(database);
1513 }
1514
1515 match provider.initialize::<Database>(Value::Null, None) {
1516 Ok(service) => Ok(service),
1517 Err(crate::component::types::ComponentError::InstanceUnavailable { .. }) => provider
1518 .get_immediate::<Database>()
1519 .ok_or_else(|| internal_error("Database component not available")),
1520 Err(err) => Err(internal_error(err.to_string())),
1521 }
1522}
1523
1524#[cfg(test)]
1525mod tests {
1526 use super::*;
1527 use crate::app::api::initialize_app;
1528 use crate::app::{FirebaseAppSettings, FirebaseOptions};
1529 use crate::database::{
1530 equal_to_with_key, increment, limit_to_first, limit_to_last, order_by_child, order_by_key,
1531 query as compose_query, server_timestamp, start_at,
1532 };
1533 use httpmock::prelude::*;
1534 use httpmock::Method::{DELETE, GET, PATCH, PUT};
1535 use serde_json::{json, Value};
1536 use std::sync::{Arc, Mutex};
1537
1538 fn unique_settings() -> FirebaseAppSettings {
1539 use std::sync::atomic::{AtomicUsize, Ordering};
1540 static COUNTER: AtomicUsize = AtomicUsize::new(0);
1541 FirebaseAppSettings {
1542 name: Some(format!(
1543 "database-{}",
1544 COUNTER.fetch_add(1, Ordering::SeqCst)
1545 )),
1546 ..Default::default()
1547 }
1548 }
1549
1550 #[test]
1551 fn set_and_get_value() {
1552 let options = FirebaseOptions {
1553 project_id: Some("project".into()),
1554 ..Default::default()
1555 };
1556 let app = initialize_app(options, Some(unique_settings())).unwrap();
1557 let database = get_database(Some(app)).unwrap();
1558 let ref_root = database.reference("/messages").unwrap();
1559 ref_root.set(json!({ "greeting": "hello" })).expect("set");
1560 let value = ref_root.get().unwrap();
1561 assert_eq!(value, json!({ "greeting": "hello" }));
1562 }
1563
1564 #[test]
1565 fn push_generates_monotonic_keys() {
1566 let options = FirebaseOptions {
1567 project_id: Some("project".into()),
1568 ..Default::default()
1569 };
1570 let app = initialize_app(options, Some(unique_settings())).unwrap();
1571 let database = get_database(Some(app)).unwrap();
1572 let queue = database.reference("queue").unwrap();
1573
1574 let keys: Vec<String> = (0..5)
1575 .map(|_| queue.push().unwrap().key().unwrap().to_string())
1576 .collect();
1577
1578 let mut sorted = keys.clone();
1579 sorted.sort();
1580 assert_eq!(keys, sorted);
1581 }
1582
1583 #[test]
1584 fn push_with_value_persists_data() {
1585 let options = FirebaseOptions {
1586 project_id: Some("project".into()),
1587 ..Default::default()
1588 };
1589 let app = initialize_app(options, Some(unique_settings())).unwrap();
1590 let database = get_database(Some(app)).unwrap();
1591 let messages = database.reference("messages").unwrap();
1592
1593 let payload = json!({ "text": "hello" });
1594 let child = messages
1595 .push_with_value(payload.clone())
1596 .expect("push with value");
1597
1598 let stored = child.get().unwrap();
1599 assert_eq!(stored, payload);
1600
1601 let parent = messages.get().unwrap();
1602 let key = child.key().unwrap();
1603 assert_eq!(parent.get(key), Some(&payload));
1604 }
1605
1606 #[test]
1607 fn child_updates_merge() {
1608 let options = FirebaseOptions {
1609 project_id: Some("project".into()),
1610 ..Default::default()
1611 };
1612 let app = initialize_app(options, Some(unique_settings())).unwrap();
1613 let database = get_database(Some(app)).unwrap();
1614 let root = database.reference("items").unwrap();
1615 root.set(json!({ "a": { "count": 1 } })).unwrap();
1616 root.child("a/count").unwrap().set(json!(2)).unwrap();
1617 let value = root.get().unwrap();
1618 assert_eq!(value, json!({ "a": { "count": 2 } }));
1619 }
1620
1621 #[test]
1622 fn set_with_priority_wraps_value() {
1623 let options = FirebaseOptions {
1624 project_id: Some("project".into()),
1625 ..Default::default()
1626 };
1627 let app = initialize_app(options, Some(unique_settings())).unwrap();
1628 let database = get_database(Some(app)).unwrap();
1629 let item = database.reference("items/main").unwrap();
1630
1631 item.set_with_priority(json!({ "count": 1 }), json!(5))
1632 .unwrap();
1633
1634 let stored = item.get().unwrap();
1635 assert_eq!(
1636 stored,
1637 json!({
1638 ".value": { "count": 1 },
1639 ".priority": 5
1640 })
1641 );
1642 }
1643
1644 #[test]
1645 fn set_priority_updates_existing_value() {
1646 let options = FirebaseOptions {
1647 project_id: Some("project".into()),
1648 ..Default::default()
1649 };
1650 let app = initialize_app(options, Some(unique_settings())).unwrap();
1651 let database = get_database(Some(app)).unwrap();
1652 let item = database.reference("items/main").unwrap();
1653
1654 item.set(json!({ "count": 4 })).unwrap();
1655 item.set_priority(json!(10)).unwrap();
1656
1657 let stored = item.get().unwrap();
1658 assert_eq!(
1659 stored,
1660 json!({
1661 ".value": { "count": 4 },
1662 ".priority": 10
1663 })
1664 );
1665 }
1666
1667 #[test]
1668 fn server_timestamp_is_resolved_on_set() {
1669 let options = FirebaseOptions {
1670 project_id: Some("project".into()),
1671 ..Default::default()
1672 };
1673 let app = initialize_app(options, Some(unique_settings())).unwrap();
1674 let database = get_database(Some(app)).unwrap();
1675 let created_at = database.reference("meta/created_at").unwrap();
1676
1677 created_at.set(server_timestamp()).unwrap();
1678
1679 let value = created_at.get().unwrap();
1680 let ts = value.as_u64().expect("timestamp as u64");
1681 let now = SystemTime::now()
1682 .duration_since(std::time::UNIX_EPOCH)
1683 .unwrap()
1684 .as_millis() as u64;
1685 assert!(now >= ts);
1686 assert!(now - ts < 5_000);
1687 }
1688
1689 #[test]
1690 fn server_increment_updates_value() {
1691 let options = FirebaseOptions {
1692 project_id: Some("project".into()),
1693 ..Default::default()
1694 };
1695 let app = initialize_app(options, Some(unique_settings())).unwrap();
1696 let database = get_database(Some(app)).unwrap();
1697 let counter = database.reference("counters/main").unwrap();
1698
1699 counter.set(json!(1)).unwrap();
1700 counter.set(increment(2.0)).unwrap();
1701
1702 let value = counter.get().unwrap();
1703 assert_eq!(value.as_f64().unwrap(), 3.0);
1704 }
1705
1706 #[test]
1707 fn update_supports_server_increment() {
1708 let options = FirebaseOptions {
1709 project_id: Some("project".into()),
1710 ..Default::default()
1711 };
1712 let app = initialize_app(options, Some(unique_settings())).unwrap();
1713 let database = get_database(Some(app)).unwrap();
1714 let scores = database.reference("scores").unwrap();
1715
1716 scores.set(json!({ "alice": 4 })).unwrap();
1717 let mut delta = serde_json::Map::new();
1718 delta.insert("alice".to_string(), increment(3.0));
1719 scores.update(delta).unwrap();
1720
1721 let stored = scores.get().unwrap();
1722 assert_eq!(stored.get("alice").unwrap().as_f64().unwrap(), 7.0);
1723 }
1724
1725 #[test]
1726 fn update_rejects_empty_key() {
1727 let options = FirebaseOptions {
1728 project_id: Some("project".into()),
1729 ..Default::default()
1730 };
1731 let app = initialize_app(options, Some(unique_settings())).unwrap();
1732 let database = get_database(Some(app)).unwrap();
1733 let reference = database.reference("items").unwrap();
1734
1735 let mut updates = serde_json::Map::new();
1736 updates.insert("".to_string(), json!(1));
1737
1738 let err = reference.update(updates).unwrap_err();
1739 assert_eq!(err.code_str(), "database/invalid-argument");
1740 }
1741
1742 #[test]
1743 fn rest_backend_performs_http_requests() {
1744 let server = MockServer::start();
1745
1746 let set_mock = server.mock(|when, then| {
1747 when.method(PUT)
1748 .path("/messages.json")
1749 .query_param("print", "silent")
1750 .json_body(json!({ "greeting": "hello" }));
1751 then.status(200)
1752 .header("content-type", "application/json")
1753 .body("null");
1754 });
1755
1756 let get_mock = server.mock(|when, then| {
1757 when.method(GET)
1758 .path("/messages.json")
1759 .query_param("format", "export");
1760 then.status(200)
1761 .header("content-type", "application/json")
1762 .body(r#"{"greeting":"hello"}"#);
1763 });
1764
1765 let options = FirebaseOptions {
1766 project_id: Some("project".into()),
1767 database_url: Some(server.url("/")),
1768 ..Default::default()
1769 };
1770 let app = initialize_app(options, Some(unique_settings())).unwrap();
1771 let database = get_database(Some(app)).unwrap();
1772 let reference = database.reference("/messages").unwrap();
1773
1774 reference
1775 .set(json!({ "greeting": "hello" }))
1776 .expect("set over REST");
1777 let value = reference.get().expect("get over REST");
1778
1779 assert_eq!(value, json!({ "greeting": "hello" }));
1780 set_mock.assert();
1781 get_mock.assert();
1782 }
1783
1784 #[test]
1785 fn reference_parent_and_root() {
1786 let options = FirebaseOptions {
1787 project_id: Some("project".into()),
1788 ..Default::default()
1789 };
1790 let app = initialize_app(options, Some(unique_settings())).unwrap();
1791 let database = get_database(Some(app)).unwrap();
1792
1793 let nested = database.reference("users/alice/profile").unwrap();
1794 let parent = nested.parent().expect("parent reference");
1795 assert_eq!(parent.path(), "/users/alice/");
1796 assert_eq!(parent.parent().unwrap().path(), "/users/");
1797
1798 let root = nested.root();
1799 assert_eq!(root.path(), "/");
1800 assert!(root.parent().is_none());
1801 }
1802
1803 #[test]
1804 fn datasnapshot_child_and_metadata_helpers() {
1805 let options = FirebaseOptions {
1806 project_id: Some("project".into()),
1807 ..Default::default()
1808 };
1809 let app = initialize_app(options, Some(unique_settings())).unwrap();
1810 let database = get_database(Some(app)).unwrap();
1811 let profiles = database.reference("profiles").unwrap();
1812
1813 profiles
1814 .set(json!({
1815 "alice": { "age": 31, "city": "Rome" },
1816 "bob": { "age": 29 }
1817 }))
1818 .unwrap();
1819
1820 let captured = Arc::new(Mutex::new(None));
1821 let holder = captured.clone();
1822 profiles
1823 .on_value(move |snapshot| {
1824 *holder.lock().unwrap() = Some(snapshot);
1825 })
1826 .unwrap();
1827
1828 let snapshot = captured.lock().unwrap().clone().expect("initial snapshot");
1829 assert!(snapshot.exists());
1830 assert!(snapshot.has_children());
1831 assert_eq!(snapshot.size(), 2);
1832
1833 let alice = snapshot.child("alice").unwrap();
1834 assert_eq!(alice.key(), Some("alice"));
1835 assert_eq!(alice.size(), 2);
1836 assert!(alice.has_children());
1837 assert_eq!(alice.child("age").unwrap().value(), &json!(31));
1838 assert!(snapshot.has_child("bob").unwrap());
1839 assert!(!snapshot.has_child("carol").unwrap());
1840
1841 let json_output = snapshot.to_json();
1842 assert_eq!(
1843 json_output,
1844 json!({
1845 "alice": { "age": 31, "city": "Rome" },
1846 "bob": { "age": 29 }
1847 })
1848 );
1849 }
1850
1851 #[test]
1852 fn child_event_listeners_receive_updates() {
1853 let options = FirebaseOptions {
1854 project_id: Some("project".into()),
1855 ..Default::default()
1856 };
1857 let app = initialize_app(options, Some(unique_settings())).unwrap();
1858 let database = get_database(Some(app)).unwrap();
1859 let items = database.reference("items").unwrap();
1860
1861 items
1862 .set(json!({
1863 "a": { "count": 1 },
1864 "b": { "count": 2 }
1865 }))
1866 .unwrap();
1867
1868 let added_events = Arc::new(Mutex::new(Vec::<(String, Option<String>)>::new()));
1869 let capture = added_events.clone();
1870 let registration = items
1871 .on_child_added(move |snapshot, prev| {
1872 capture
1873 .lock()
1874 .unwrap()
1875 .push((snapshot.key().unwrap().to_string(), prev.clone()));
1876 })
1877 .unwrap();
1878
1879 {
1880 let events = added_events.lock().unwrap();
1881 assert_eq!(events.len(), 2);
1882 assert_eq!(events[0].0, "a");
1883 assert_eq!(events[1].0, "b");
1884 }
1885
1886 items
1887 .child("c")
1888 .unwrap()
1889 .set(json!({ "count": 3 }))
1890 .unwrap();
1891
1892 {
1893 let events = added_events.lock().unwrap();
1894 assert_eq!(events.len(), 3);
1895 assert_eq!(events[2].0, "c");
1896 }
1897
1898 registration.detach();
1899 }
1900
1901 #[test]
1902 fn rest_backend_set_with_priority_includes_metadata() {
1903 let server = MockServer::start();
1904
1905 let put_mock = server.mock(|when, then| {
1906 when.method(PUT)
1907 .path("/items.json")
1908 .query_param("print", "silent")
1909 .json_body(json!({
1910 ".value": { "count": 1 },
1911 ".priority": 3
1912 }));
1913 then.status(200)
1914 .header("content-type", "application/json")
1915 .body("null");
1916 });
1917
1918 let options = FirebaseOptions {
1919 project_id: Some("project".into()),
1920 database_url: Some(server.url("/")),
1921 ..Default::default()
1922 };
1923 let app = initialize_app(options, Some(unique_settings())).unwrap();
1924 let database = get_database(Some(app)).unwrap();
1925 let reference = database.reference("items").unwrap();
1926
1927 reference
1928 .set_with_priority(json!({ "count": 1 }), json!(3))
1929 .unwrap();
1930
1931 put_mock.assert();
1932 }
1933
1934 #[test]
1935 fn push_with_value_rest_backend_performs_put() {
1936 let server = MockServer::start();
1937
1938 let push_mock = server.mock(|when, then| {
1939 when.method(PUT)
1940 .path_contains("/messages/")
1941 .query_param("print", "silent")
1942 .json_body(json!({ "text": "hello" }));
1943 then.status(200)
1944 .header("content-type", "application/json")
1945 .body("null");
1946 });
1947
1948 let options = FirebaseOptions {
1949 project_id: Some("project".into()),
1950 database_url: Some(server.url("/")),
1951 ..Default::default()
1952 };
1953 let app = initialize_app(options, Some(unique_settings())).unwrap();
1954 let database = get_database(Some(app)).unwrap();
1955 let messages = database.reference("messages").unwrap();
1956
1957 let child = messages
1958 .push_with_value(json!({ "text": "hello" }))
1959 .expect("push with value rest");
1960
1961 assert_eq!(child.key().unwrap().len(), 20);
1962 push_mock.assert();
1963 }
1964
1965 #[test]
1966 fn rest_backend_uses_patch_for_updates() {
1967 let server = MockServer::start();
1968
1969 let patch_mock = server.mock(|when, then| {
1970 when.method(PATCH)
1971 .path("/items.json")
1972 .query_param("print", "silent")
1973 .json_body(json!({ "a/count": 2, "b": { "flag": true } }));
1974 then.status(200)
1975 .header("content-type", "application/json")
1976 .body("null");
1977 });
1978
1979 let options = FirebaseOptions {
1980 project_id: Some("project".into()),
1981 database_url: Some(server.url("/")),
1982 ..Default::default()
1983 };
1984 let app = initialize_app(options, Some(unique_settings())).unwrap();
1985 let database = get_database(Some(app)).unwrap();
1986 let reference = database.reference("items").unwrap();
1987
1988 let mut updates = serde_json::Map::new();
1989 updates.insert("a/count".to_string(), json!(2));
1990 updates.insert("b".to_string(), json!({ "flag": true }));
1991 reference.update(updates).expect("patch update");
1992
1993 patch_mock.assert();
1994 }
1995
1996 #[test]
1997 fn rest_backend_delete_supports_remove() {
1998 let server = MockServer::start();
1999
2000 let delete_mock = server.mock(|when, then| {
2001 when.method(DELETE)
2002 .path("/items.json")
2003 .query_param("print", "silent");
2004 then.status(200).body("null");
2005 });
2006
2007 let options = FirebaseOptions {
2008 project_id: Some("project".into()),
2009 database_url: Some(server.url("/")),
2010 ..Default::default()
2011 };
2012 let app = initialize_app(options, Some(unique_settings())).unwrap();
2013 let database = get_database(Some(app)).unwrap();
2014 let reference = database.reference("items").unwrap();
2015
2016 reference.remove().expect("delete request");
2017 delete_mock.assert();
2018 }
2019
2020 #[test]
2021 fn rest_backend_preserves_namespace_query_parameter() {
2022 let server = MockServer::start();
2023
2024 let set_mock = server.mock(|when, then| {
2025 when.method(PUT)
2026 .path("/messages.json")
2027 .query_param("ns", "demo-ns")
2028 .query_param("print", "silent")
2029 .json_body(json!({ "value": 1 }));
2030 then.status(200).body("null");
2031 });
2032
2033 let options = FirebaseOptions {
2034 project_id: Some("project".into()),
2035 database_url: Some(format!("{}?ns=demo-ns", server.url("/"))),
2036 ..Default::default()
2037 };
2038 let app = initialize_app(options, Some(unique_settings())).unwrap();
2039 let database = get_database(Some(app)).unwrap();
2040 let reference = database.reference("messages").unwrap();
2041
2042 reference.set(json!({ "value": 1 })).unwrap();
2043 set_mock.assert();
2044 }
2045
2046 #[test]
2047 fn rest_query_order_by_child_and_limit() {
2048 let server = MockServer::start();
2049
2050 let get_mock = server.mock(|when, then| {
2051 when.method(GET)
2052 .path("/items.json")
2053 .query_param("orderBy", "\"score\"")
2054 .query_param("startAt", "100")
2055 .query_param("limitToFirst", "5")
2056 .query_param("format", "export");
2057 then.status(200)
2058 .header("content-type", "application/json")
2059 .body(r#"{"a":{"score":120}}"#);
2060 });
2061
2062 let options = FirebaseOptions {
2063 project_id: Some("project".into()),
2064 database_url: Some(server.url("/")),
2065 ..Default::default()
2066 };
2067 let app = initialize_app(options, Some(unique_settings())).unwrap();
2068 let database = get_database(Some(app)).unwrap();
2069 let reference = database.reference("items").unwrap();
2070 let filtered = compose_query(
2071 reference,
2072 vec![order_by_child("score"), start_at(100), limit_to_first(5)],
2073 )
2074 .expect("compose query with constraints");
2075
2076 let value = filtered.get().unwrap();
2077 assert_eq!(value, json!({ "a": { "score": 120 } }));
2078 get_mock.assert();
2079 }
2080
2081 #[test]
2082 fn rest_query_equal_to_with_key() {
2083 let server = MockServer::start();
2084
2085 let get_mock = server.mock(|when, then| {
2086 when.method(GET)
2087 .path("/items.json")
2088 .query_param("orderBy", "\"$key\"")
2089 .query_param("startAt", "\"item-1\",\"item-1\"")
2090 .query_param("endAt", "\"item-1\",\"item-1\"")
2091 .query_param("format", "export");
2092 then.status(200)
2093 .header("content-type", "application/json")
2094 .body(r#"{"item-1":{"value":true}}"#);
2095 });
2096
2097 let options = FirebaseOptions {
2098 project_id: Some("project".into()),
2099 database_url: Some(server.url("/")),
2100 ..Default::default()
2101 };
2102 let app = initialize_app(options, Some(unique_settings())).unwrap();
2103 let database = get_database(Some(app)).unwrap();
2104 let filtered = compose_query(
2105 database.reference("items").unwrap(),
2106 vec![order_by_key(), equal_to_with_key("item-1", "item-1")],
2107 )
2108 .expect("compose equal_to query");
2109
2110 let value = filtered.get().unwrap();
2111 assert_eq!(value, json!({ "item-1": { "value": true } }));
2112 get_mock.assert();
2113 }
2114
2115 #[test]
2116 fn limit_to_first_rejects_zero() {
2117 let options = FirebaseOptions {
2118 project_id: Some("project".into()),
2119 ..Default::default()
2120 };
2121 let app = initialize_app(options, Some(unique_settings())).unwrap();
2122 let database = get_database(Some(app)).unwrap();
2123
2124 let err = database
2125 .reference("items")
2126 .unwrap()
2127 .query()
2128 .limit_to_first(0)
2129 .unwrap_err();
2130
2131 assert_eq!(err.code_str(), "database/invalid-argument");
2132 }
2133
2134 #[test]
2135 fn order_by_child_rejects_empty_path() {
2136 let options = FirebaseOptions {
2137 project_id: Some("project".into()),
2138 ..Default::default()
2139 };
2140 let app = initialize_app(options, Some(unique_settings())).unwrap();
2141 let database = get_database(Some(app)).unwrap();
2142
2143 let err = database
2144 .reference("items")
2145 .unwrap()
2146 .order_by_child("")
2147 .unwrap_err();
2148
2149 assert_eq!(err.code_str(), "database/invalid-argument");
2150 }
2151
2152 #[test]
2153 fn query_rejects_multiple_order_by_constraints() {
2154 let options = FirebaseOptions {
2155 project_id: Some("project".into()),
2156 ..Default::default()
2157 };
2158 let app = initialize_app(options, Some(unique_settings())).unwrap();
2159 let database = get_database(Some(app)).unwrap();
2160 let reference = database.reference("items").unwrap();
2161
2162 let err =
2163 compose_query(reference, vec![order_by_key(), order_by_child("score")]).unwrap_err();
2164
2165 assert_eq!(err.code_str(), "database/invalid-argument");
2166 }
2167
2168 #[test]
2169 fn on_value_listener_receives_updates() {
2170 let options = FirebaseOptions {
2171 project_id: Some("project".into()),
2172 ..Default::default()
2173 };
2174 let app = initialize_app(options, Some(unique_settings())).unwrap();
2175 let database = get_database(Some(app)).unwrap();
2176 let reference = database.reference("counters/main").unwrap();
2177
2178 let events = Arc::new(Mutex::new(Vec::<Value>::new()));
2179 let captured = events.clone();
2180
2181 let registration = reference
2182 .on_value(move |snapshot| {
2183 captured.lock().unwrap().push(snapshot.value().clone());
2184 })
2185 .expect("on_value registration");
2186
2187 reference.set(json!(1)).unwrap();
2188 reference.set(json!(2)).unwrap();
2189
2190 {
2191 let events = events.lock().unwrap();
2192 assert_eq!(events.len(), 3);
2193 assert_eq!(events[0], Value::Null);
2194 assert_eq!(events[1], json!(1));
2195 assert_eq!(events[2], json!(2));
2196 }
2197
2198 registration.detach();
2199 reference.set(json!(3)).unwrap();
2200
2201 let events = events.lock().unwrap();
2202 assert_eq!(events.len(), 3);
2203 }
2204
2205 #[test]
2206 fn query_on_value_reacts_to_changes() {
2207 let options = FirebaseOptions {
2208 project_id: Some("project".into()),
2209 ..Default::default()
2210 };
2211 let app = initialize_app(options, Some(unique_settings())).unwrap();
2212 let database = get_database(Some(app)).unwrap();
2213 let scores = database.reference("scores").unwrap();
2214
2215 scores
2216 .set(json!({
2217 "a": { "score": 10 },
2218 "b": { "score": 20 },
2219 "c": { "score": 30 }
2220 }))
2221 .unwrap();
2222
2223 let events = Arc::new(Mutex::new(Vec::<Value>::new()));
2224 let captured = events.clone();
2225
2226 let _registration = compose_query(
2227 scores.clone(),
2228 vec![order_by_child("score"), limit_to_last(1)],
2229 )
2230 .unwrap()
2231 .on_value(move |snapshot| {
2232 captured.lock().unwrap().push(snapshot.value().clone());
2233 })
2234 .unwrap();
2235
2236 {
2237 let events = events.lock().unwrap();
2238 assert_eq!(events.len(), 1);
2239 assert_eq!(
2240 events[0],
2241 json!({
2242 "a": { "score": 10 },
2243 "b": { "score": 20 },
2244 "c": { "score": 30 }
2245 })
2246 );
2247 }
2248
2249 scores
2250 .child("d")
2251 .unwrap()
2252 .set(json!({ "score": 50 }))
2253 .unwrap();
2254
2255 let events = events.lock().unwrap();
2256 assert_eq!(events.len(), 2);
2257 assert_eq!(
2258 events[1],
2259 json!({
2260 "a": { "score": 10 },
2261 "b": { "score": 20 },
2262 "c": { "score": 30 },
2263 "d": { "score": 50 }
2264 })
2265 );
2266 }
2267}