1use std::collections::{BTreeMap, HashMap};
2use std::convert::TryInto;
3use std::fmt;
4use std::sync::atomic::{AtomicU64, Ordering};
5use std::sync::{Arc, LazyLock, Mutex};
6use std::time::{SystemTime, UNIX_EPOCH};
7
8use serde_json::{Map, Number, Value};
9
10use crate::app;
11use crate::app::FirebaseApp;
12use crate::component::types::{
13 ComponentError, DynService, InstanceFactoryOptions, InstantiationMode,
14};
15use crate::component::{Component, ComponentType};
16use crate::database::backend::{select_backend, DatabaseBackend};
17use crate::database::constants::DATABASE_COMPONENT_NAME;
18use crate::database::error::{internal_error, invalid_argument, DatabaseError, DatabaseResult};
19use crate::database::on_disconnect::OnDisconnect;
20use crate::database::push_id::next_push_id;
21use crate::database::query::{QueryBound, QueryIndex, QueryLimit, QueryParams};
22use crate::database::realtime::{ListenSpec, Repo};
23use crate::logger::Logger;
24use crate::platform::runtime;
25
26static REALTIME_LOGGER: LazyLock<Logger> =
27 LazyLock::new(|| Logger::new("@firebase/database/realtime"));
28
29#[derive(Clone, Debug)]
30pub struct Database {
31 inner: Arc<DatabaseInner>,
32}
33
34struct DatabaseInner {
35 app: FirebaseApp,
36 backend: Arc<dyn DatabaseBackend>,
37 repo: Arc<Repo>,
38 listeners: Mutex<HashMap<u64, Listener>>,
39 next_listener_id: AtomicU64,
40 root_cache: Mutex<Option<Value>>,
41}
42
43impl fmt::Debug for DatabaseInner {
44 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
45 f.debug_struct("DatabaseInner")
46 .field("app", &self.app.name())
47 .field("backend", &"dynamic")
48 .field("repo", &"realtime")
49 .finish()
50 }
51}
52
53#[derive(Clone, Debug)]
54pub struct DatabaseReference {
55 database: Database,
56 path: Vec<String>,
57}
58
59#[derive(Clone, Debug)]
62pub struct DatabaseQuery {
63 reference: DatabaseReference,
64 params: QueryParams,
65}
66
67#[derive(Clone, Debug)]
70pub struct QueryConstraint {
71 kind: QueryConstraintKind,
72}
73
74#[derive(Clone, Debug)]
75enum QueryConstraintKind {
76 OrderByChild(String),
77 OrderByKey,
78 OrderByValue,
79 OrderByPriority,
80 Start {
81 value: Value,
82 name: Option<String>,
83 inclusive: bool,
84 },
85 End {
86 value: Value,
87 name: Option<String>,
88 inclusive: bool,
89 },
90 LimitFirst(u32),
91 LimitLast(u32),
92 EqualTo {
93 value: Value,
94 name: Option<String>,
95 },
96}
97
98type ValueListenerCallback = Arc<dyn Fn(Result<DataSnapshot, DatabaseError>) + Send + Sync>;
99type ChildListenerCallback = Arc<dyn Fn(Result<ChildEvent, DatabaseError>) + Send + Sync>;
100
101#[derive(Clone, Copy, Debug, PartialEq, Eq)]
102pub enum ChildEventType {
103 Added,
104 Changed,
105 Removed,
106}
107
108#[derive(Clone)]
109pub struct ChildEvent {
110 pub event: ChildEventType,
111 pub snapshot: DataSnapshot,
112 pub previous_name: Option<String>,
113}
114
115#[derive(Clone)]
116enum ListenerKind {
117 Value(ValueListenerCallback),
118 Child {
119 event: ChildEventType,
120 callback: ChildListenerCallback,
121 },
122}
123
124#[derive(Clone)]
125struct Listener {
126 target: ListenerTarget,
127 kind: ListenerKind,
128 spec: ListenSpec,
129}
130
131#[derive(Clone)]
132enum ListenerTarget {
133 Reference(Vec<String>),
134 Query {
135 path: Vec<String>,
136 params: QueryParams,
137 },
138}
139
140impl ListenerTarget {
141 fn matches(&self, changed_path: &[String]) -> bool {
142 match self {
143 ListenerTarget::Reference(path) => paths_related(path, changed_path),
144 ListenerTarget::Query { path, .. } => paths_related(path, changed_path),
145 }
146 }
147}
148
149#[derive(Clone, Debug)]
152pub struct DataSnapshot {
153 reference: DatabaseReference,
154 value: Value,
155}
156
157impl DataSnapshot {
158 pub fn reference(&self) -> &DatabaseReference {
159 &self.reference
160 }
161
162 pub fn value(&self) -> &Value {
163 &self.value
164 }
165
166 pub fn exists(&self) -> bool {
167 !self.value.is_null()
168 }
169
170 pub fn key(&self) -> Option<&str> {
171 self.reference.key()
172 }
173
174 pub fn into_value(self) -> Value {
175 self.value
176 }
177
178 pub fn child(&self, relative_path: &str) -> DatabaseResult<DataSnapshot> {
181 let segments = normalize_path(relative_path)?;
182 let child_reference = self.reference.child(relative_path)?;
183 let value = get_value_at_path(&self.value, &segments).unwrap_or(Value::Null);
184 Ok(DataSnapshot {
185 reference: child_reference,
186 value,
187 })
188 }
189
190 pub fn has_child(&self, relative_path: &str) -> DatabaseResult<bool> {
193 let segments = normalize_path(relative_path)?;
194 Ok(get_value_at_path(&self.value, &segments)
195 .map(|value| !value.is_null())
196 .unwrap_or(false))
197 }
198
199 pub fn has_children(&self) -> bool {
202 match extract_data_ref(&self.value) {
203 Value::Object(map) => !map.is_empty(),
204 Value::Array(array) => !array.is_empty(),
205 _ => false,
206 }
207 }
208
209 pub fn size(&self) -> usize {
211 match extract_data_ref(&self.value) {
212 Value::Object(map) => map.len(),
213 Value::Array(array) => array.len(),
214 _ => 0,
215 }
216 }
217
218 pub fn to_json(&self) -> Value {
220 self.value.clone()
221 }
222}
223
224pub struct ListenerRegistration {
227 database: Database,
228 id: Option<u64>,
229}
230
231impl ListenerRegistration {
232 fn new(database: Database, id: u64) -> Self {
233 Self {
234 database,
235 id: Some(id),
236 }
237 }
238
239 pub fn detach(mut self) {
240 if let Some(id) = self.id.take() {
241 self.database.remove_listener(id);
242 }
243 }
244}
245
246#[derive(Clone, Debug)]
248pub struct TransactionResult {
249 pub committed: bool,
252 pub snapshot: DataSnapshot,
254}
255
256impl Drop for ListenerRegistration {
257 fn drop(&mut self) {
258 if let Some(id) = self.id.take() {
259 self.database.remove_listener(id);
260 }
261 }
262}
263
264impl QueryConstraint {
265 fn new(kind: QueryConstraintKind) -> Self {
266 Self { kind }
267 }
268
269 fn apply(self, query: DatabaseQuery) -> DatabaseResult<DatabaseQuery> {
270 match self.kind {
271 QueryConstraintKind::OrderByChild(path) => query.order_by_child(&path),
272 QueryConstraintKind::OrderByKey => query.order_by_key(),
273 QueryConstraintKind::OrderByValue => query.order_by_value(),
274 QueryConstraintKind::OrderByPriority => query.order_by_priority(),
275 QueryConstraintKind::Start {
276 value,
277 name,
278 inclusive,
279 } => {
280 if inclusive {
281 query.start_at_with_key(value, name)
282 } else {
283 query.start_after_with_key(value, name)
284 }
285 }
286 QueryConstraintKind::End {
287 value,
288 name,
289 inclusive,
290 } => {
291 if inclusive {
292 query.end_at_with_key(value, name)
293 } else {
294 query.end_before_with_key(value, name)
295 }
296 }
297 QueryConstraintKind::LimitFirst(limit) => query.limit_to_first(limit),
298 QueryConstraintKind::LimitLast(limit) => query.limit_to_last(limit),
299 QueryConstraintKind::EqualTo { value, name } => query.equal_to_with_key(value, name),
300 }
301 }
302}
303
304pub fn query(
307 reference: DatabaseReference,
308 constraints: impl IntoIterator<Item = QueryConstraint>,
309) -> DatabaseResult<DatabaseQuery> {
310 let mut current = reference.query();
311 for constraint in constraints {
312 current = constraint.apply(current)?;
313 }
314 Ok(current)
315}
316
317pub fn order_by_child(path: impl Into<String>) -> QueryConstraint {
320 QueryConstraint::new(QueryConstraintKind::OrderByChild(path.into()))
321}
322
323pub fn order_by_key() -> QueryConstraint {
325 QueryConstraint::new(QueryConstraintKind::OrderByKey)
326}
327
328pub fn order_by_priority() -> QueryConstraint {
330 QueryConstraint::new(QueryConstraintKind::OrderByPriority)
331}
332
333pub fn order_by_value() -> QueryConstraint {
335 QueryConstraint::new(QueryConstraintKind::OrderByValue)
336}
337
338pub fn start_at<V>(value: V) -> QueryConstraint
340where
341 V: Into<Value>,
342{
343 QueryConstraint::new(QueryConstraintKind::Start {
344 value: value.into(),
345 name: None,
346 inclusive: true,
347 })
348}
349
350pub fn start_at_with_key<V, S>(value: V, name: S) -> QueryConstraint
352where
353 V: Into<Value>,
354 S: Into<String>,
355{
356 QueryConstraint::new(QueryConstraintKind::Start {
357 value: value.into(),
358 name: Some(name.into()),
359 inclusive: true,
360 })
361}
362
363pub fn start_after<V>(value: V) -> QueryConstraint
365where
366 V: Into<Value>,
367{
368 QueryConstraint::new(QueryConstraintKind::Start {
369 value: value.into(),
370 name: None,
371 inclusive: false,
372 })
373}
374
375pub fn start_after_with_key<V, S>(value: V, name: S) -> QueryConstraint
377where
378 V: Into<Value>,
379 S: Into<String>,
380{
381 QueryConstraint::new(QueryConstraintKind::Start {
382 value: value.into(),
383 name: Some(name.into()),
384 inclusive: false,
385 })
386}
387
388pub fn end_at<V>(value: V) -> QueryConstraint
390where
391 V: Into<Value>,
392{
393 QueryConstraint::new(QueryConstraintKind::End {
394 value: value.into(),
395 name: None,
396 inclusive: true,
397 })
398}
399
400pub fn end_at_with_key<V, S>(value: V, name: S) -> QueryConstraint
402where
403 V: Into<Value>,
404 S: Into<String>,
405{
406 QueryConstraint::new(QueryConstraintKind::End {
407 value: value.into(),
408 name: Some(name.into()),
409 inclusive: true,
410 })
411}
412
413pub fn end_before<V>(value: V) -> QueryConstraint
415where
416 V: Into<Value>,
417{
418 QueryConstraint::new(QueryConstraintKind::End {
419 value: value.into(),
420 name: None,
421 inclusive: false,
422 })
423}
424
425pub fn end_before_with_key<V, S>(value: V, name: S) -> QueryConstraint
427where
428 V: Into<Value>,
429 S: Into<String>,
430{
431 QueryConstraint::new(QueryConstraintKind::End {
432 value: value.into(),
433 name: Some(name.into()),
434 inclusive: false,
435 })
436}
437
438pub fn limit_to_first(limit: u32) -> QueryConstraint {
440 QueryConstraint::new(QueryConstraintKind::LimitFirst(limit))
441}
442
443pub fn limit_to_last(limit: u32) -> QueryConstraint {
445 QueryConstraint::new(QueryConstraintKind::LimitLast(limit))
446}
447
448pub fn equal_to<V>(value: V) -> QueryConstraint
450where
451 V: Into<Value>,
452{
453 QueryConstraint::new(QueryConstraintKind::EqualTo {
454 value: value.into(),
455 name: None,
456 })
457}
458
459pub fn equal_to_with_key<V, S>(value: V, name: S) -> QueryConstraint
461where
462 V: Into<Value>,
463 S: Into<String>,
464{
465 QueryConstraint::new(QueryConstraintKind::EqualTo {
466 value: value.into(),
467 name: Some(name.into()),
468 })
469}
470
471pub async fn push(reference: &DatabaseReference) -> DatabaseResult<DatabaseReference> {
476 reference.push().await
477}
478
479pub async fn push_with_value<V>(
484 reference: &DatabaseReference,
485 value: V,
486) -> DatabaseResult<DatabaseReference>
487where
488 V: Into<Value>,
489{
490 reference.push_with_value(value).await
491}
492
493#[allow(dead_code)]
495pub async fn on_value<F>(
496 reference: &DatabaseReference,
497 callback: F,
498) -> DatabaseResult<ListenerRegistration>
499where
500 F: Fn(Result<DataSnapshot, DatabaseError>) + Send + Sync + 'static,
501{
502 reference.on_value(callback).await
503}
504
505pub async fn on_child_added<F>(
507 reference: &DatabaseReference,
508 callback: F,
509) -> DatabaseResult<ListenerRegistration>
510where
511 F: Fn(Result<ChildEvent, DatabaseError>) + Send + Sync + 'static,
512{
513 reference.on_child_added(callback).await
514}
515
516pub async fn on_child_changed<F>(
518 reference: &DatabaseReference,
519 callback: F,
520) -> DatabaseResult<ListenerRegistration>
521where
522 F: Fn(Result<ChildEvent, DatabaseError>) + Send + Sync + 'static,
523{
524 reference.on_child_changed(callback).await
525}
526
527pub async fn on_child_removed<F>(
529 reference: &DatabaseReference,
530 callback: F,
531) -> DatabaseResult<ListenerRegistration>
532where
533 F: Fn(Result<ChildEvent, DatabaseError>) + Send + Sync + 'static,
534{
535 reference.on_child_removed(callback).await
536}
537
538pub async fn run_transaction<F>(
545 reference: &DatabaseReference,
546 mut update: F,
547) -> DatabaseResult<TransactionResult>
548where
549 F: FnMut(Value) -> Option<Value>,
550{
551 reference.run_transaction(|value| update(value)).await
552}
553
554pub async fn set_with_priority<V, P>(
557 reference: &DatabaseReference,
558 value: V,
559 priority: P,
560) -> DatabaseResult<()>
561where
562 V: Into<Value>,
563 P: Into<Value>,
564{
565 reference.set_with_priority(value, priority).await
566}
567
568pub async fn set_priority<P>(reference: &DatabaseReference, priority: P) -> DatabaseResult<()>
571where
572 P: Into<Value>,
573{
574 reference.set_priority(priority).await
575}
576
577impl Database {
578 fn new(app: FirebaseApp) -> Self {
579 let repo = Repo::new_for_app(&app);
580 let inner = Arc::new(DatabaseInner {
581 backend: select_backend(&app),
582 repo: repo.clone(),
583 app,
584 listeners: Mutex::new(HashMap::new()),
585 next_listener_id: AtomicU64::new(1),
586 root_cache: Mutex::new(None),
587 });
588 let database = Self { inner };
589 let handler_db = database.clone();
590 repo.set_event_handler(Arc::new(move |action, body| {
591 let database = handler_db.clone();
592 Box::pin(async move { database.handle_realtime_action(&action, &body).await })
593 }));
594 database
595 }
596
597 fn cache_root(&self, value: Value) {
598 *self.inner.root_cache.lock().unwrap() = Some(value);
599 }
600
601 pub(crate) fn repo(&self) -> Arc<Repo> {
602 self.inner.repo.clone()
603 }
604
605 #[allow(dead_code)]
606 #[cfg(test)]
607 fn clear_root_cache_for_test(&self) {
608 *self.inner.root_cache.lock().unwrap() = None;
609 }
610
611 async fn handle_realtime_action(
612 &self,
613 action: &str,
614 body: &serde_json::Value,
615 ) -> DatabaseResult<()> {
616 match action {
617 "d" | "m" => self.handle_realtime_data(action, body).await,
618 "c" => {
619 REALTIME_LOGGER.warn("listener revoked by server".to_string());
620 self.revoke_listener(body).await;
621 Ok(())
622 }
623 "ac" | "apc" => {
624 REALTIME_LOGGER.warn(format!("credential revoked by server ({action})"));
625 self.fail_listeners(internal_error(format!(
626 "realtime credential revoked: {action}"
627 )));
628 Ok(())
629 }
630 "error" => {
631 let message = body.as_str().unwrap_or("realtime connection error");
632 self.fail_listeners(internal_error(message.to_string()));
633 Ok(())
634 }
635 "sd" => Ok(()),
636 other => Err(internal_error(format!(
637 "unhandled realtime action '{other}'"
638 ))),
639 }
640 }
641
642 async fn handle_realtime_data(
643 &self,
644 action: &str,
645 body: &serde_json::Value,
646 ) -> DatabaseResult<()> {
647 let Some(path) = body.get("p").and_then(|value| value.as_str()) else {
648 return Ok(());
649 };
650 let data = body.get("d").cloned().unwrap_or(serde_json::Value::Null);
651
652 let segments = normalize_path(path)?;
653 let old_root = self.root_snapshot().await?;
654 let mut new_root = old_root.clone();
655
656 match action {
657 "d" => apply_realtime_value(&mut new_root, &segments, data.clone()),
658 "m" => {
659 let Value::Object(map) = &data else {
660 return Err(invalid_argument(
661 "Realtime merge payload must be a JSON object",
662 ));
663 };
664 for (key, value) in map.iter() {
665 let mut child_path = segments.clone();
666 child_path.extend(normalize_path(key)?);
667 apply_realtime_value(&mut new_root, &child_path, value.clone());
668 }
669 }
670 _ => {}
671 }
672
673 REALTIME_LOGGER.debug(format!(
674 "realtime payload action={action} path={path} data={data:?}"
675 ));
676
677 let new_root_for_cache = new_root.clone();
678 self.dispatch_listeners(&segments, &old_root, &new_root)
679 .await?;
680 self.cache_root(new_root_for_cache);
681 Ok(())
682 }
683
684 async fn revoke_listener(&self, body: &serde_json::Value) {
685 let Some(path) = body.get("p").and_then(|value| value.as_str()) else {
686 return;
687 };
688 let segments = match normalize_path(path) {
689 Ok(segments) => segments,
690 Err(err) => {
691 REALTIME_LOGGER.warn(format!("failed to normalise revoked path: {err}"));
692 return;
693 }
694 };
695
696 let (removed, should_disconnect) = {
697 let mut listeners = self.inner.listeners.lock().unwrap();
698 let cancelled: Vec<u64> = listeners
699 .iter()
700 .filter(|(_, listener)| match &listener.target {
701 ListenerTarget::Reference(path) => path == &segments,
702 ListenerTarget::Query { path, .. } => path == &segments,
703 })
704 .map(|(id, _)| *id)
705 .collect();
706
707 let removed = cancelled
708 .into_iter()
709 .filter_map(|id| listeners.remove(&id))
710 .collect::<Vec<_>>();
711 let should_disconnect = listeners.is_empty();
712 (removed, should_disconnect)
713 };
714
715 if removed.is_empty() {
716 return;
717 }
718
719 for listener in &removed {
720 if let Err(err) = self.inner.repo.unlisten(listener.spec.clone()).await {
721 REALTIME_LOGGER.warn(format!("failed to detach revoked realtime listener: {err}"));
722 }
723 }
724
725 let error = internal_error("listener revoked by server".to_string());
726 for listener in removed {
727 match listener.kind {
728 ListenerKind::Value(callback) => {
729 callback(Err(error.clone()));
730 }
731 ListenerKind::Child { callback, .. } => {
732 callback(Err(error.clone()));
733 }
734 }
735 }
736
737 if should_disconnect {
738 if let Err(err) = self.go_offline().await {
739 REALTIME_LOGGER.warn(format!(
740 "failed to go offline after listener revocation: {err}"
741 ));
742 }
743 }
744 }
745
746 fn fail_listeners(&self, err: DatabaseError) {
747 let mut guard = self.inner.listeners.lock().unwrap();
748 let listeners = guard.values().cloned().collect::<Vec<_>>();
749 guard.clear();
750 drop(guard);
751 for listener in listeners {
752 REALTIME_LOGGER.warn(format!("listener cancelled due to error: {err}"));
753 match listener.kind {
754 ListenerKind::Value(callback) => {
755 callback(Err(err.clone()));
756 }
757 ListenerKind::Child { callback, .. } => {
758 callback(Err(err.clone()));
759 }
760 }
761 }
762 }
763
764 pub async fn go_online(&self) -> DatabaseResult<()> {
765 self.inner.repo.go_online().await
766 }
767
768 pub async fn go_offline(&self) -> DatabaseResult<()> {
769 self.inner.repo.go_offline().await
770 }
771
772 pub fn app(&self) -> &FirebaseApp {
773 &self.inner.app
774 }
775
776 pub fn reference(&self, path: &str) -> DatabaseResult<DatabaseReference> {
777 let segments = normalize_path(path)?;
778 Ok(DatabaseReference {
779 database: self.clone(),
780 path: segments,
781 })
782 }
783
784 fn reference_from_segments(&self, segments: Vec<String>) -> DatabaseReference {
785 DatabaseReference {
786 database: self.clone(),
787 path: segments,
788 }
789 }
790
791 fn listen_spec_for_target(&self, target: &ListenerTarget) -> DatabaseResult<ListenSpec> {
792 match target {
793 ListenerTarget::Reference(path) => Ok(ListenSpec::new(path.clone(), Vec::new())),
794 ListenerTarget::Query { path, params } => {
795 let mut rest_params = params.to_rest_params()?;
796 if rest_params.iter().all(|(key, _)| key != "format") {
802 rest_params.push(("format".to_string(), "export".to_string()));
803 }
804 Ok(ListenSpec::new(path.clone(), rest_params))
805 }
806 }
807 }
808
809 async fn register_listener(
810 &self,
811 target: ListenerTarget,
812 kind: ListenerKind,
813 ) -> DatabaseResult<ListenerRegistration> {
814 let spec = self.listen_spec_for_target(&target)?;
815 let id = self.inner.next_listener_id.fetch_add(1, Ordering::SeqCst);
816
817 let first_listener = {
818 let mut listeners = self.inner.listeners.lock().unwrap();
819 let was_empty = listeners.is_empty();
820 listeners.insert(
821 id,
822 Listener {
823 target: target.clone(),
824 kind: kind.clone(),
825 spec: spec.clone(),
826 },
827 );
828 was_empty
829 };
830
831 if first_listener {
832 if let Err(err) = self.go_online().await {
833 let mut listeners = self.inner.listeners.lock().unwrap();
834 listeners.remove(&id);
835 return Err(err);
836 }
837 }
838
839 if let Err(err) = self.inner.repo.listen(spec.clone()).await {
840 let mut listeners = self.inner.listeners.lock().unwrap();
841 listeners.remove(&id);
842 if first_listener {
843 let _ = self.go_offline().await;
844 }
845 return Err(err);
846 }
847
848 let current_root = match self.root_snapshot().await {
849 Ok(root) => root,
850 Err(err) => {
851 self.remove_listener(id);
852 return Err(err);
853 }
854 };
855 match kind {
856 ListenerKind::Value(callback) => {
857 let snapshot = self.snapshot_from_root(&target, ¤t_root).await?;
858 callback(Ok(snapshot));
859 }
860 ListenerKind::Child { event, callback } => {
861 if let Err(err) =
862 self.fire_initial_child_events(&target, event, &callback, ¤t_root)
863 {
864 self.remove_listener(id);
865 return Err(err);
866 }
867 }
868 }
869
870 Ok(ListenerRegistration::new(self.clone(), id))
871 }
872
873 fn remove_listener(&self, id: u64) {
874 let (listener, should_disconnect) = {
875 let mut listeners = self.inner.listeners.lock().unwrap();
876 let removed = listeners.remove(&id);
877 let should_disconnect = listeners.is_empty();
878 (removed, should_disconnect)
879 };
880
881 if let Some(listener) = listener {
882 let repo = self.inner.repo.clone();
883 let spec = listener.spec.clone();
884 runtime::spawn_detached(async move {
885 if let Err(err) = repo.unlisten(spec).await {
886 REALTIME_LOGGER.warn(format!(
887 "failed to detach realtime listener during cleanup: {err}"
888 ));
889 }
890 });
891 }
892
893 if should_disconnect {
894 let database = self.clone();
895 runtime::spawn_detached(async move {
896 if let Err(err) = database.go_offline().await {
897 REALTIME_LOGGER.warn(format!(
898 "failed to go offline after removing last listener: {err}"
899 ));
900 }
901 });
902 }
903 }
904
905 async fn dispatch_listeners(
906 &self,
907 changed_path: &[String],
908 old_root: &Value,
909 new_root: &Value,
910 ) -> DatabaseResult<()> {
911 let listeners: Vec<Listener> = {
912 let listeners = self.inner.listeners.lock().unwrap();
913 listeners
914 .values()
915 .filter(|listener| listener.target.matches(changed_path))
916 .cloned()
917 .collect()
918 };
919
920 for listener in listeners {
921 match &listener.kind {
922 ListenerKind::Value(callback) => {
923 let snapshot = self.snapshot_from_root(&listener.target, new_root).await?;
924 callback(Ok(snapshot));
925 }
926 ListenerKind::Child { event, callback } => {
927 self.invoke_child_listener(&listener, *event, callback, old_root, new_root)?;
928 }
929 }
930 }
931 Ok(())
932 }
933
934 async fn root_snapshot(&self) -> DatabaseResult<Value> {
935 if let Some(value) = self.inner.root_cache.lock().unwrap().clone() {
936 return Ok(value);
937 }
938 let value = self.inner.backend.get(&[], &[]).await?;
939 *self.inner.root_cache.lock().unwrap() = Some(value.clone());
940 Ok(value)
941 }
942
943 async fn snapshot_from_root(
944 &self,
945 target: &ListenerTarget,
946 root: &Value,
947 ) -> DatabaseResult<DataSnapshot> {
948 match target {
949 ListenerTarget::Reference(path) => {
950 let value = value_at_path(root, path);
951 let reference = self.reference_from_segments(path.clone());
952 Ok(DataSnapshot { reference, value })
953 }
954 ListenerTarget::Query { .. } => self.snapshot_for_target(target).await,
955 }
956 }
957
958 fn fire_initial_child_events(
959 &self,
960 target: &ListenerTarget,
961 event: ChildEventType,
962 callback: &ChildListenerCallback,
963 root: &Value,
964 ) -> DatabaseResult<()> {
965 if event != ChildEventType::Added {
966 return Ok(());
967 }
968
969 if let ListenerTarget::Reference(path) = target {
970 let new_value = value_at_path(root, path);
971 let empty = Value::Null;
972 self.emit_child_events(path, event, callback, &empty, &new_value)?;
973 }
974 Ok(())
975 }
976
977 fn invoke_child_listener(
978 &self,
979 listener: &Listener,
980 event: ChildEventType,
981 callback: &ChildListenerCallback,
982 old_root: &Value,
983 new_root: &Value,
984 ) -> DatabaseResult<()> {
985 let ListenerTarget::Reference(path) = &listener.target else {
986 return Ok(());
987 };
988 let old_value = value_at_path(old_root, path);
989 let new_value = value_at_path(new_root, path);
990 self.emit_child_events(path, event, callback, &old_value, &new_value)
991 }
992
993 fn emit_child_events(
994 &self,
995 parent_path: &[String],
996 event: ChildEventType,
997 callback: &ChildListenerCallback,
998 old_value: &Value,
999 new_value: &Value,
1000 ) -> DatabaseResult<()> {
1001 let old_children = children_map(old_value);
1002 let new_children = children_map(new_value);
1003
1004 match event {
1005 ChildEventType::Added => {
1006 let new_keys: Vec<String> = new_children.keys().cloned().collect();
1007 for key in new_keys.iter() {
1008 if !old_children.contains_key(key) {
1009 let value = new_children.get(key).cloned().unwrap_or(Value::Null);
1010 let prev_name = previous_key(&new_keys, key);
1011 let snapshot = self.child_snapshot(parent_path, key, value.clone());
1012 callback(Ok(ChildEvent {
1013 event,
1014 snapshot,
1015 previous_name: prev_name,
1016 }));
1017 }
1018 }
1019 }
1020 ChildEventType::Changed => {
1021 let new_keys: Vec<String> = new_children.keys().cloned().collect();
1022 for key in new_keys.iter() {
1023 if let Some(old_value_child) = old_children.get(key) {
1024 let new_child = new_children.get(key).expect("child exists in map");
1025 if old_value_child != new_child {
1026 let value = new_child.clone();
1027 let prev_name = previous_key(&new_keys, key);
1028 let snapshot = self.child_snapshot(parent_path, key, value);
1029 callback(Ok(ChildEvent {
1030 event,
1031 snapshot,
1032 previous_name: prev_name,
1033 }));
1034 }
1035 }
1036 }
1037 }
1038 ChildEventType::Removed => {
1039 let old_keys: Vec<String> = old_children.keys().cloned().collect();
1040 for key in old_keys.iter() {
1041 if !new_children.contains_key(key) {
1042 let value = old_children.get(key).cloned().unwrap_or(Value::Null);
1043 let prev_name = previous_key(&old_keys, key);
1044 let snapshot = self.child_snapshot(parent_path, key, value);
1045 callback(Ok(ChildEvent {
1046 event,
1047 snapshot,
1048 previous_name: prev_name,
1049 }));
1050 }
1051 }
1052 }
1053 }
1054 Ok(())
1055 }
1056
1057 fn child_snapshot(
1058 &self,
1059 parent_path: &[String],
1060 child_key: &str,
1061 value: Value,
1062 ) -> DataSnapshot {
1063 let mut segments = parent_path.to_vec();
1064 segments.push(child_key.to_string());
1065 let reference = self.reference_from_segments(segments);
1066 DataSnapshot { reference, value }
1067 }
1068
1069 async fn snapshot_for_target(&self, target: &ListenerTarget) -> DatabaseResult<DataSnapshot> {
1070 match target {
1071 ListenerTarget::Reference(path) => {
1072 let value = self.inner.backend.get(path, &[]).await?;
1073 let reference = self.reference_from_segments(path.clone());
1074 Ok(DataSnapshot { reference, value })
1075 }
1076 ListenerTarget::Query { path, params } => {
1077 let rest_params = params.to_rest_params()?;
1078 let value = self.inner.backend.get(path, rest_params.as_slice()).await?;
1079 let reference = self.reference_from_segments(path.clone());
1080 Ok(DataSnapshot { reference, value })
1081 }
1082 }
1083 }
1084}
1085
1086impl DatabaseReference {
1087 pub fn child(&self, relative: &str) -> DatabaseResult<DatabaseReference> {
1088 let mut segments = self.path.clone();
1089 segments.extend(normalize_path(relative)?);
1090 Ok(DatabaseReference {
1091 database: self.database.clone(),
1092 path: segments,
1093 })
1094 }
1095
1096 pub fn parent(&self) -> Option<DatabaseReference> {
1098 if self.path.is_empty() {
1099 None
1100 } else {
1101 let mut parent = self.path.clone();
1102 parent.pop();
1103 Some(DatabaseReference {
1104 database: self.database.clone(),
1105 path: parent,
1106 })
1107 }
1108 }
1109
1110 pub fn root(&self) -> DatabaseReference {
1112 DatabaseReference {
1113 database: self.database.clone(),
1114 path: Vec::new(),
1115 }
1116 }
1117
1118 pub async fn set(&self, value: Value) -> DatabaseResult<()> {
1119 let value = self.resolve_value_for_path(&self.path, value).await?;
1120 let old_root = self.database.root_snapshot().await?;
1121 let value_for_local = value.clone();
1122 self.database.inner.backend.set(&self.path, value).await?;
1123
1124 let mut new_root = old_root.clone();
1125 apply_realtime_value(&mut new_root, &self.path, value_for_local);
1126
1127 self.database
1128 .dispatch_listeners(&self.path, &old_root, &new_root)
1129 .await?;
1130 self.database.cache_root(new_root);
1131 Ok(())
1132 }
1133
1134 pub fn query(&self) -> DatabaseQuery {
1136 DatabaseQuery {
1137 reference: self.clone(),
1138 params: QueryParams::default(),
1139 }
1140 }
1141
1142 pub fn order_by_child(&self, path: &str) -> DatabaseResult<DatabaseQuery> {
1144 self.query().order_by_child(path)
1145 }
1146
1147 pub fn order_by_key(&self) -> DatabaseResult<DatabaseQuery> {
1149 self.query().order_by_key()
1150 }
1151
1152 pub fn order_by_value(&self) -> DatabaseResult<DatabaseQuery> {
1154 self.query().order_by_value()
1155 }
1156
1157 pub fn order_by_priority(&self) -> DatabaseResult<DatabaseQuery> {
1159 self.query().order_by_priority()
1160 }
1161
1162 pub async fn on_value<F>(&self, callback: F) -> DatabaseResult<ListenerRegistration>
1164 where
1165 F: Fn(Result<DataSnapshot, DatabaseError>) + Send + Sync + 'static,
1166 {
1167 let user_fn: ValueListenerCallback = Arc::new(callback);
1168 self.database
1169 .register_listener(
1170 ListenerTarget::Reference(self.path.clone()),
1171 ListenerKind::Value(user_fn),
1172 )
1173 .await
1174 }
1175
1176 pub async fn on_child_added<F>(&self, callback: F) -> DatabaseResult<ListenerRegistration>
1178 where
1179 F: Fn(Result<ChildEvent, DatabaseError>) + Send + Sync + 'static,
1180 {
1181 let cb: ChildListenerCallback = Arc::new(callback);
1182 self.database
1183 .register_listener(
1184 ListenerTarget::Reference(self.path.clone()),
1185 ListenerKind::Child {
1186 event: ChildEventType::Added,
1187 callback: cb,
1188 },
1189 )
1190 .await
1191 }
1192
1193 pub async fn on_child_changed<F>(&self, callback: F) -> DatabaseResult<ListenerRegistration>
1195 where
1196 F: Fn(Result<ChildEvent, DatabaseError>) + Send + Sync + 'static,
1197 {
1198 let cb: ChildListenerCallback = Arc::new(callback);
1199 self.database
1200 .register_listener(
1201 ListenerTarget::Reference(self.path.clone()),
1202 ListenerKind::Child {
1203 event: ChildEventType::Changed,
1204 callback: cb,
1205 },
1206 )
1207 .await
1208 }
1209
1210 pub async fn on_child_removed<F>(&self, callback: F) -> DatabaseResult<ListenerRegistration>
1212 where
1213 F: Fn(Result<ChildEvent, DatabaseError>) + Send + Sync + 'static,
1214 {
1215 let cb: ChildListenerCallback = Arc::new(callback);
1216 self.database
1217 .register_listener(
1218 ListenerTarget::Reference(self.path.clone()),
1219 ListenerKind::Child {
1220 event: ChildEventType::Removed,
1221 callback: cb,
1222 },
1223 )
1224 .await
1225 }
1226
1227 pub fn on_disconnect(&self) -> OnDisconnect {
1229 OnDisconnect::new(self.clone())
1230 }
1231
1232 pub(crate) fn database(&self) -> &Database {
1233 &self.database
1234 }
1235
1236 pub(crate) fn path_segments(&self) -> Vec<String> {
1237 self.path.clone()
1238 }
1239
1240 pub(crate) async fn resolve_for_current_path(&self, value: Value) -> DatabaseResult<Value> {
1241 self.resolve_value_for_path(&self.path, value).await
1242 }
1243
1244 pub(crate) async fn resolve_for_absolute_path(
1245 &self,
1246 path: &[String],
1247 value: Value,
1248 ) -> DatabaseResult<Value> {
1249 self.resolve_value_for_path(path, value).await
1250 }
1251
1252 pub async fn run_transaction<F>(&self, mut update: F) -> DatabaseResult<TransactionResult>
1255 where
1256 F: FnMut(Value) -> Option<Value>,
1257 {
1258 let current_value = self.get().await?;
1259 let maybe_new = update(current_value.clone());
1260
1261 match maybe_new {
1262 Some(new_value) => {
1263 self.set(new_value.clone()).await?;
1264 let snapshot = DataSnapshot {
1265 reference: self.clone(),
1266 value: new_value,
1267 };
1268 Ok(TransactionResult {
1269 committed: true,
1270 snapshot,
1271 })
1272 }
1273 None => Ok(TransactionResult {
1274 committed: false,
1275 snapshot: DataSnapshot {
1276 reference: self.clone(),
1277 value: current_value,
1278 },
1279 }),
1280 }
1281 }
1282
1283 pub async fn update(&self, updates: serde_json::Map<String, Value>) -> DatabaseResult<()> {
1289 if updates.is_empty() {
1290 return Ok(());
1291 }
1292
1293 let mut operations = Vec::with_capacity(updates.len());
1294 for (key, value) in updates {
1295 if key.trim().is_empty() {
1296 return Err(invalid_argument("Database update path cannot be empty"));
1297 }
1298 let mut segments = self.path.clone();
1299 let relative = normalize_path(&key)?;
1300 if relative.is_empty() {
1301 return Err(invalid_argument(
1302 "Database update path cannot reference the current location",
1303 ));
1304 }
1305 segments.extend(relative);
1306 let resolved = self.resolve_value_for_path(&segments, value).await?;
1307 operations.push((segments, resolved));
1308 }
1309
1310 let old_root = self.database.root_snapshot().await?;
1311 let ops_for_local = operations.clone();
1312 self.database
1313 .inner
1314 .backend
1315 .update(&self.path, operations)
1316 .await?;
1317
1318 let mut new_root = old_root.clone();
1319 for (absolute, value) in ops_for_local {
1320 apply_realtime_value(&mut new_root, &absolute, value);
1321 }
1322
1323 self.database
1324 .dispatch_listeners(&self.path, &old_root, &new_root)
1325 .await?;
1326 self.database.cache_root(new_root);
1327 Ok(())
1328 }
1329
1330 pub async fn get(&self) -> DatabaseResult<Value> {
1331 if let Some(root) = self.database.inner.root_cache.lock().unwrap().clone() {
1332 return Ok(value_at_path(&root, &self.path));
1333 }
1334 self.database.inner.backend.get(&self.path, &[]).await
1335 }
1336
1337 pub async fn remove(&self) -> DatabaseResult<()> {
1339 let old_root = self.database.root_snapshot().await?;
1340 self.database.inner.backend.delete(&self.path).await?;
1341 let mut new_root = old_root.clone();
1342 apply_realtime_value(&mut new_root, &self.path, Value::Null);
1343 self.database
1344 .dispatch_listeners(&self.path, &old_root, &new_root)
1345 .await?;
1346 self.database.cache_root(new_root);
1347 Ok(())
1348 }
1349
1350 pub async fn set_with_priority<V, P>(&self, value: V, priority: P) -> DatabaseResult<()>
1353 where
1354 V: Into<Value>,
1355 P: Into<Value>,
1356 {
1357 let priority = priority.into();
1358 validate_priority_value(&priority)?;
1359 if matches!(self.key(), Some(".length" | ".keys")) {
1360 return Err(invalid_argument(
1361 "set_with_priority failed: read-only child key",
1362 ));
1363 }
1364
1365 let value = self
1366 .resolve_value_for_path(&self.path, value.into())
1367 .await?;
1368 let payload = pack_with_priority(value, priority);
1369 let payload_for_local = payload.clone();
1370 let old_root = self.database.root_snapshot().await?;
1371 self.database.inner.backend.set(&self.path, payload).await?;
1372
1373 let mut new_root = old_root.clone();
1374 apply_realtime_value(&mut new_root, &self.path, payload_for_local);
1375
1376 self.database
1377 .dispatch_listeners(&self.path, &old_root, &new_root)
1378 .await?;
1379 self.database.cache_root(new_root);
1380 Ok(())
1381 }
1382
1383 pub async fn set_priority<P>(&self, priority: P) -> DatabaseResult<()>
1385 where
1386 P: Into<Value>,
1387 {
1388 let priority = priority.into();
1389 validate_priority_value(&priority)?;
1390
1391 let current = self.database.inner.backend.get(&self.path, &[]).await?;
1392 let value = extract_data_owned(¤t);
1393 let payload = pack_with_priority(value, priority);
1394 let payload_for_local = payload.clone();
1395 let old_root = self.database.root_snapshot().await?;
1396 self.database.inner.backend.set(&self.path, payload).await?;
1397
1398 let mut new_root = old_root.clone();
1399 apply_realtime_value(&mut new_root, &self.path, payload_for_local);
1400
1401 self.database
1402 .dispatch_listeners(&self.path, &old_root, &new_root)
1403 .await?;
1404 self.database.cache_root(new_root);
1405 Ok(())
1406 }
1407
1408 pub async fn push(&self) -> DatabaseResult<DatabaseReference> {
1423 self.push_internal(None).await
1424 }
1425
1426 pub async fn push_with_value<V>(&self, value: V) -> DatabaseResult<DatabaseReference>
1430 where
1431 V: Into<Value>,
1432 {
1433 self.push_internal(Some(value.into())).await
1434 }
1435
1436 async fn resolve_value_for_path(&self, path: &[String], value: Value) -> DatabaseResult<Value> {
1437 if contains_server_value(&value) {
1438 let current = self.database.inner.backend.get(path, &[]).await?;
1439 let current_ref = extract_data_ref(¤t);
1440 resolve_server_values(value, Some(current_ref))
1441 } else {
1442 Ok(value)
1443 }
1444 }
1445
1446 async fn push_internal(&self, value: Option<Value>) -> DatabaseResult<DatabaseReference> {
1447 let timestamp = current_time_millis()?;
1448 let key = next_push_id(timestamp);
1449 let child = self.child(&key)?;
1450 if let Some(value) = value {
1451 child.set(value).await?;
1452 }
1453 Ok(child)
1454 }
1455
1456 pub fn key(&self) -> Option<&str> {
1459 self.path.last().map(|segment| segment.as_str())
1460 }
1461
1462 pub fn path(&self) -> String {
1463 if self.path.is_empty() {
1464 "/".to_string()
1465 } else {
1466 format!("/{}/", self.path.join("/"))
1467 }
1468 }
1469}
1470
1471impl DatabaseQuery {
1472 pub fn reference(&self) -> &DatabaseReference {
1474 &self.reference
1475 }
1476
1477 pub fn order_by_child(mut self, path: &str) -> DatabaseResult<Self> {
1479 validate_order_by_child_target(path)?;
1480 let segments = normalize_path(path)?;
1481 if segments.is_empty() {
1482 return Err(invalid_argument("orderByChild path cannot be empty"));
1483 }
1484 let joined = segments.join("/");
1485 self.params.set_index(QueryIndex::Child(joined))?;
1486 Ok(self)
1487 }
1488
1489 pub fn order_by_key(mut self) -> DatabaseResult<Self> {
1491 self.params.set_index(QueryIndex::Key)?;
1492 Ok(self)
1493 }
1494
1495 pub fn order_by_value(mut self) -> DatabaseResult<Self> {
1497 self.params.set_index(QueryIndex::Value)?;
1498 Ok(self)
1499 }
1500
1501 pub fn order_by_priority(mut self) -> DatabaseResult<Self> {
1503 self.params.set_index(QueryIndex::Priority)?;
1504 Ok(self)
1505 }
1506
1507 pub fn start_at(self, value: Value) -> DatabaseResult<Self> {
1509 self.start_at_with_key(value, None)
1510 }
1511
1512 pub fn start_at_with_key(mut self, value: Value, name: Option<String>) -> DatabaseResult<Self> {
1514 let bound = QueryBound {
1515 value,
1516 name,
1517 inclusive: true,
1518 };
1519 self.params.set_start(bound)?;
1520 Ok(self)
1521 }
1522
1523 pub fn start_after(self, value: Value) -> DatabaseResult<Self> {
1525 self.start_after_with_key(value, None)
1526 }
1527
1528 pub fn start_after_with_key(
1530 mut self,
1531 value: Value,
1532 name: Option<String>,
1533 ) -> DatabaseResult<Self> {
1534 let bound = QueryBound {
1535 value,
1536 name,
1537 inclusive: false,
1538 };
1539 self.params.set_start(bound)?;
1540 Ok(self)
1541 }
1542
1543 pub fn end_at(self, value: Value) -> DatabaseResult<Self> {
1545 self.end_at_with_key(value, None)
1546 }
1547
1548 pub fn end_at_with_key(mut self, value: Value, name: Option<String>) -> DatabaseResult<Self> {
1550 let bound = QueryBound {
1551 value,
1552 name,
1553 inclusive: true,
1554 };
1555 self.params.set_end(bound)?;
1556 Ok(self)
1557 }
1558
1559 pub fn end_before(self, value: Value) -> DatabaseResult<Self> {
1561 self.end_before_with_key(value, None)
1562 }
1563
1564 pub fn end_before_with_key(
1566 mut self,
1567 value: Value,
1568 name: Option<String>,
1569 ) -> DatabaseResult<Self> {
1570 let bound = QueryBound {
1571 value,
1572 name,
1573 inclusive: false,
1574 };
1575 self.params.set_end(bound)?;
1576 Ok(self)
1577 }
1578
1579 pub fn limit_to_first(mut self, limit: u32) -> DatabaseResult<Self> {
1581 if limit == 0 {
1582 return Err(invalid_argument("limitToFirst must be greater than zero"));
1583 }
1584 self.params.set_limit(QueryLimit::First(limit))?;
1585 Ok(self)
1586 }
1587
1588 pub fn limit_to_last(mut self, limit: u32) -> DatabaseResult<Self> {
1590 if limit == 0 {
1591 return Err(invalid_argument("limitToLast must be greater than zero"));
1592 }
1593 self.params.set_limit(QueryLimit::Last(limit))?;
1594 Ok(self)
1595 }
1596
1597 pub fn equal_to(self, value: Value) -> DatabaseResult<Self> {
1599 self.equal_to_with_key(value, None)
1600 }
1601
1602 pub fn equal_to_with_key(mut self, value: Value, name: Option<String>) -> DatabaseResult<Self> {
1604 let start_bound = QueryBound {
1605 value: value.clone(),
1606 name: name.clone(),
1607 inclusive: true,
1608 };
1609 let end_bound = QueryBound {
1610 value,
1611 name,
1612 inclusive: true,
1613 };
1614 self.params.set_start(start_bound)?;
1615 self.params.set_end(end_bound)?;
1616 Ok(self)
1617 }
1618
1619 pub async fn get(&self) -> DatabaseResult<Value> {
1621 let params = self.params.to_rest_params()?;
1622 self.reference
1623 .database
1624 .inner
1625 .backend
1626 .get(&self.reference.path, params.as_slice())
1627 .await
1628 }
1629
1630 pub async fn on_value<F>(&self, callback: F) -> DatabaseResult<ListenerRegistration>
1632 where
1633 F: Fn(Result<DataSnapshot, DatabaseError>) + Send + Sync + 'static,
1634 {
1635 let user_fn: ValueListenerCallback = Arc::new(callback);
1636 self.reference
1637 .database
1638 .register_listener(
1639 ListenerTarget::Query {
1640 path: self.reference.path.clone(),
1641 params: self.params.clone(),
1642 },
1643 ListenerKind::Value(user_fn),
1644 )
1645 .await
1646 }
1647}
1648
1649pub(crate) fn normalize_path(path: &str) -> DatabaseResult<Vec<String>> {
1650 let trimmed = path.trim_matches('/');
1651 if trimmed.is_empty() {
1652 return Ok(Vec::new());
1653 }
1654 let mut segments = Vec::new();
1655 for segment in trimmed.split('/') {
1656 if segment.is_empty() {
1657 return Err(invalid_argument(
1658 "Database path cannot contain empty segments",
1659 ));
1660 }
1661 segments.push(segment.to_string());
1662 }
1663 Ok(segments)
1664}
1665
1666fn validate_order_by_child_target(path: &str) -> DatabaseResult<()> {
1667 match path {
1668 "$key" => Err(invalid_argument(
1669 "order_by_child(\"$key\") is invalid; call order_by_key() instead",
1670 )),
1671 "$priority" => Err(invalid_argument(
1672 "order_by_child(\"$priority\") is invalid; call order_by_priority() instead",
1673 )),
1674 "$value" => Err(invalid_argument(
1675 "order_by_child(\"$value\") is invalid; call order_by_value() instead",
1676 )),
1677 _ => Ok(()),
1678 }
1679}
1680
1681fn paths_related(a: &[String], b: &[String]) -> bool {
1682 is_prefix(a, b) || is_prefix(b, a)
1683}
1684
1685fn is_prefix(prefix: &[String], path: &[String]) -> bool {
1686 if prefix.len() > path.len() {
1687 return false;
1688 }
1689 prefix
1690 .iter()
1691 .zip(path.iter())
1692 .all(|(left, right)| left == right)
1693}
1694
1695fn apply_realtime_value(root: &mut Value, path: &[String], value: Value) {
1696 if path.is_empty() {
1697 *root = value;
1698 return;
1699 }
1700
1701 let mut current = root;
1702 for segment in &path[..path.len() - 1] {
1703 if !current.is_object() {
1704 *current = Value::Object(Map::new());
1705 }
1706 let obj = current.as_object_mut().expect("object ensured");
1707 current = obj
1708 .entry(segment.clone())
1709 .or_insert(Value::Object(Map::new()));
1710 }
1711
1712 if value.is_null() {
1713 if let Some(obj) = current.as_object_mut() {
1714 obj.remove(path.last().expect("path non-empty"));
1715 }
1716 } else {
1717 if !current.is_object() {
1718 *current = Value::Object(Map::new());
1719 }
1720 current
1721 .as_object_mut()
1722 .expect("object ensured")
1723 .insert(path.last().expect("path non-empty").clone(), value);
1724 }
1725}
1726
1727pub(crate) fn validate_priority_value(priority: &Value) -> DatabaseResult<()> {
1728 match priority {
1729 Value::Null | Value::Number(_) | Value::String(_) => Ok(()),
1730 _ => Err(invalid_argument(
1731 "Priority must be a string, number, or null",
1732 )),
1733 }
1734}
1735
1736pub(crate) fn pack_with_priority(value: Value, priority: Value) -> Value {
1737 let mut map = Map::with_capacity(2);
1738 map.insert(".value".to_string(), value);
1739 map.insert(".priority".to_string(), priority);
1740 Value::Object(map)
1741}
1742
1743fn extract_data_ref<'a>(value: &'a Value) -> &'a Value {
1744 value
1745 .as_object()
1746 .and_then(|obj| obj.get(".value"))
1747 .unwrap_or(value)
1748}
1749
1750fn extract_data_owned(value: &Value) -> Value {
1751 extract_data_ref(value).clone()
1752}
1753
1754fn contains_server_value(value: &Value) -> bool {
1755 match value {
1756 Value::Object(map) => {
1757 if map.contains_key(".sv") {
1758 return true;
1759 }
1760 map.values().any(contains_server_value)
1761 }
1762 Value::Array(items) => items.iter().any(contains_server_value),
1763 _ => false,
1764 }
1765}
1766
1767fn resolve_server_values(value: Value, current: Option<&Value>) -> DatabaseResult<Value> {
1768 match value {
1769 Value::Object(mut map) => {
1770 if let Some(spec) = map.remove(".sv") {
1771 return resolve_server_placeholder(spec, current.map(extract_data_ref));
1772 }
1773 let mut resolved = Map::with_capacity(map.len());
1774 for (key, child) in map.into_iter() {
1775 let child_current = current
1776 .and_then(|curr| match curr {
1777 Value::Object(obj) => obj.get(&key),
1778 Value::Array(arr) => key.parse::<usize>().ok().and_then(|idx| arr.get(idx)),
1779 _ => None,
1780 })
1781 .map(extract_data_ref);
1782 let child_resolved = resolve_server_values(child, child_current)?;
1783 resolved.insert(key, child_resolved);
1784 }
1785 Ok(Value::Object(resolved))
1786 }
1787 Value::Array(items) => {
1788 let mut resolved = Vec::with_capacity(items.len());
1789 for (index, child) in items.into_iter().enumerate() {
1790 let child_current = current
1791 .and_then(|curr| match curr {
1792 Value::Array(arr) => arr.get(index),
1793 _ => None,
1794 })
1795 .map(extract_data_ref);
1796 resolved.push(resolve_server_values(child, child_current)?);
1797 }
1798 Ok(Value::Array(resolved))
1799 }
1800 other => Ok(other),
1801 }
1802}
1803
1804fn resolve_server_placeholder(spec: Value, current: Option<&Value>) -> DatabaseResult<Value> {
1805 match spec {
1806 Value::String(token) if token == "timestamp" => {
1807 let millis = current_time_millis()?;
1808 Ok(Value::Number(Number::from(millis)))
1809 }
1810 Value::Object(mut map) => {
1811 if let Some(delta) = map.remove("increment") {
1812 let delta = delta.as_f64().ok_or_else(|| {
1813 invalid_argument("ServerValue.increment delta must be numeric")
1814 })?;
1815 let base = current
1816 .and_then(|value| match value {
1817 Value::Number(number) => number.as_f64(),
1818 _ => None,
1819 })
1820 .unwrap_or(0.0);
1821 let total = base + delta;
1822 let number = Number::from_f64(total).ok_or_else(|| {
1823 invalid_argument("ServerValue.increment produced an invalid number")
1824 })?;
1825 Ok(Value::Number(number))
1826 } else {
1827 Err(invalid_argument("Unsupported server value placeholder"))
1828 }
1829 }
1830 _ => Err(invalid_argument("Unsupported server value placeholder")),
1831 }
1832}
1833
1834fn value_at_path(root: &Value, path: &[String]) -> Value {
1835 if path.is_empty() {
1836 return extract_data_ref(root).clone();
1837 }
1838 get_value_at_path(root, path).unwrap_or(Value::Null)
1839}
1840
1841fn children_map(value: &Value) -> BTreeMap<String, Value> {
1842 let mut map = BTreeMap::new();
1843 match extract_data_ref(value) {
1844 Value::Object(obj) => {
1845 for (key, child) in obj.iter() {
1846 map.insert(key.clone(), child.clone());
1847 }
1848 }
1849 Value::Array(array) => {
1850 for (index, child) in array.iter().enumerate() {
1851 map.insert(index.to_string(), child.clone());
1852 }
1853 }
1854 _ => {}
1855 }
1856 map
1857}
1858
1859fn previous_key(keys: &[String], key: &str) -> Option<String> {
1860 let mut previous: Option<String> = None;
1861 for current in keys {
1862 if current == key {
1863 return previous;
1864 }
1865 previous = Some(current.clone());
1866 }
1867 None
1868}
1869
1870fn get_value_at_path(root: &Value, segments: &[String]) -> Option<Value> {
1871 if segments.is_empty() {
1872 return Some(extract_data_ref(root).clone());
1873 }
1874
1875 let mut current = root;
1876 for segment in segments {
1877 match current {
1878 Value::Object(map) => {
1879 if let Some(child) = map.get(segment) {
1880 current = child;
1881 continue;
1882 }
1883
1884 if let Some(value_field) = map.get(".value") {
1885 current = match value_field {
1886 Value::Object(obj) => obj.get(segment)?,
1887 Value::Array(items) => {
1888 let index = segment.parse::<usize>().ok()?;
1889 items.get(index)?
1890 }
1891 _ => return None,
1892 };
1893 continue;
1894 }
1895
1896 return None;
1897 }
1898 Value::Array(array) => {
1899 let index = segment.parse::<usize>().ok()?;
1900 current = array.get(index)?;
1901 }
1902 _ => return None,
1903 }
1904 }
1905
1906 Some(current.clone())
1907}
1908
1909fn current_time_millis() -> DatabaseResult<u64> {
1910 let duration = SystemTime::now()
1911 .duration_since(UNIX_EPOCH)
1912 .map_err(|_| internal_error("System time is before the Unix epoch"))?;
1913 let millis = duration.as_millis();
1914 millis
1915 .try_into()
1916 .map_err(|_| internal_error("Timestamp exceeds 64-bit range"))
1917}
1918
1919static DATABASE_COMPONENT: LazyLock<()> = LazyLock::new(|| {
1920 let component = Component::new(
1921 DATABASE_COMPONENT_NAME,
1922 Arc::new(database_factory),
1923 ComponentType::Public,
1924 )
1925 .with_instantiation_mode(InstantiationMode::Lazy);
1926 let _ = app::register_component(component);
1927});
1928
1929fn database_factory(
1930 container: &crate::component::ComponentContainer,
1931 _options: InstanceFactoryOptions,
1932) -> Result<DynService, ComponentError> {
1933 let app = container.root_service::<FirebaseApp>().ok_or_else(|| {
1934 ComponentError::InitializationFailed {
1935 name: DATABASE_COMPONENT_NAME.to_string(),
1936 reason: "Firebase app not attached to component container".to_string(),
1937 }
1938 })?;
1939
1940 let database = Database::new((*app).clone());
1941 Ok(Arc::new(database) as DynService)
1942}
1943
1944fn ensure_registered() {
1945 LazyLock::force(&DATABASE_COMPONENT);
1946}
1947
1948pub fn register_database_component() {
1949 ensure_registered();
1950}
1951
1952pub async fn get_database(app: Option<FirebaseApp>) -> DatabaseResult<Arc<Database>> {
1953 ensure_registered();
1954 let app = match app {
1955 Some(app) => app,
1956 None => {
1957 #[cfg(all(feature = "wasm-web", target_arch = "wasm32"))]
1958 {
1959 return Err(internal_error(
1960 "get_database(None) is not supported on wasm; pass a FirebaseApp",
1961 ));
1962 }
1963 #[cfg(not(all(feature = "wasm-web", target_arch = "wasm32")))]
1964 {
1965 crate::app::get_app(None)
1966 .await
1967 .map_err(|err| internal_error(err.to_string()))?
1968 }
1969 }
1970 };
1971
1972 let provider = app::get_provider(&app, DATABASE_COMPONENT_NAME);
1973 if let Some(database) = provider.get_immediate::<Database>() {
1974 return Ok(database);
1975 }
1976
1977 match provider.initialize::<Database>(Value::Null, None) {
1978 Ok(service) => Ok(service),
1979 Err(crate::component::types::ComponentError::InstanceUnavailable { .. }) => provider
1980 .get_immediate::<Database>()
1981 .ok_or_else(|| internal_error("Database component not available")),
1982 Err(err) => Err(internal_error(err.to_string())),
1983 }
1984}
1985
1986#[cfg(all(test, not(target_arch = "wasm32")))]
1987mod tests {
1988 use super::*;
1989 use crate::app::initialize_app;
1990 use crate::app::{FirebaseAppSettings, FirebaseOptions};
1991 use crate::database::{
1992 equal_to_with_key, increment, limit_to_first, limit_to_last, order_by_child, order_by_key,
1993 query as compose_query, server_timestamp, start_at,
1994 };
1995 use httpmock::prelude::*;
1996 use httpmock::Method::{DELETE, GET, PATCH, PUT};
1997 use serde_json::{json, Value};
1998 use std::sync::{Arc, Mutex};
1999 fn unique_settings() -> FirebaseAppSettings {
2000 use std::sync::atomic::{AtomicUsize, Ordering};
2001 static COUNTER: AtomicUsize = AtomicUsize::new(0);
2002 FirebaseAppSettings {
2003 name: Some(format!(
2004 "database-{}",
2005 COUNTER.fetch_add(1, Ordering::SeqCst)
2006 )),
2007 ..Default::default()
2008 }
2009 }
2010
2011 #[tokio::test(flavor = "multi_thread")]
2012 async fn set_and_get_value() {
2013 let options = FirebaseOptions {
2014 project_id: Some("project".into()),
2015 ..Default::default()
2016 };
2017 let app = initialize_app(options, Some(unique_settings()))
2018 .await
2019 .unwrap();
2020 let database = get_database(Some(app)).await.unwrap();
2021 let ref_root = database.reference("/messages").unwrap();
2022 ref_root
2023 .set(json!({ "greeting": "hello" }))
2024 .await
2025 .expect("set");
2026 let value = ref_root.get().await.unwrap();
2027 assert_eq!(value, json!({ "greeting": "hello" }));
2028 }
2029
2030 #[tokio::test(flavor = "multi_thread")]
2031 async fn push_generates_monotonic_keys() {
2032 let options = FirebaseOptions {
2033 project_id: Some("project".into()),
2034 ..Default::default()
2035 };
2036 let app = initialize_app(options, Some(unique_settings()))
2037 .await
2038 .unwrap();
2039 let database = get_database(Some(app)).await.unwrap();
2040 let queue = database.reference("queue").unwrap();
2041
2042 let mut keys = Vec::new();
2043 for _ in 0..5 {
2044 let key = queue.push().await.unwrap().key().unwrap().to_string();
2045 keys.push(key);
2046 }
2047
2048 let mut sorted = keys.clone();
2049 sorted.sort();
2050 assert_eq!(keys, sorted);
2051 }
2052
2053 #[tokio::test(flavor = "multi_thread")]
2054 async fn push_with_value_persists_data() {
2055 let options = FirebaseOptions {
2056 project_id: Some("project".into()),
2057 ..Default::default()
2058 };
2059 let app = initialize_app(options, Some(unique_settings()))
2060 .await
2061 .unwrap();
2062 let database = get_database(Some(app)).await.unwrap();
2063 let messages = database.reference("messages").unwrap();
2064
2065 let payload = json!({ "text": "hello" });
2066 let child = messages
2067 .push_with_value(payload.clone())
2068 .await
2069 .expect("push with value");
2070
2071 let stored = child.get().await.unwrap();
2072 assert_eq!(stored, payload);
2073
2074 let parent = messages.get().await.unwrap();
2075 let key = child.key().unwrap();
2076 assert_eq!(parent.get(key), Some(&payload));
2077 }
2078
2079 #[tokio::test(flavor = "multi_thread")]
2080 async fn child_updates_merge() {
2081 let options = FirebaseOptions {
2082 project_id: Some("project".into()),
2083 ..Default::default()
2084 };
2085 let app = initialize_app(options, Some(unique_settings()))
2086 .await
2087 .unwrap();
2088 let database = get_database(Some(app)).await.unwrap();
2089 let root = database.reference("items").unwrap();
2090 root.set(json!({ "a": { "count": 1 } })).await.unwrap();
2091 root.child("a/count").unwrap().set(json!(2)).await.unwrap();
2092 let value = root.get().await.unwrap();
2093 assert_eq!(value, json!({ "a": { "count": 2 } }));
2094 }
2095
2096 #[tokio::test(flavor = "multi_thread")]
2097 async fn set_with_priority_wraps_value() {
2098 let options = FirebaseOptions {
2099 project_id: Some("project".into()),
2100 ..Default::default()
2101 };
2102 let app = initialize_app(options, Some(unique_settings()))
2103 .await
2104 .unwrap();
2105 let database = get_database(Some(app)).await.unwrap();
2106 let item = database.reference("items/main").unwrap();
2107
2108 item.set_with_priority(json!({ "count": 1 }), json!(5))
2109 .await
2110 .unwrap();
2111
2112 let stored = item.get().await.unwrap();
2113 assert_eq!(
2114 stored,
2115 json!({
2116 ".value": { "count": 1 },
2117 ".priority": 5
2118 })
2119 );
2120 }
2121
2122 #[tokio::test(flavor = "multi_thread")]
2123 async fn set_priority_updates_existing_value() {
2124 let options = FirebaseOptions {
2125 project_id: Some("project".into()),
2126 ..Default::default()
2127 };
2128 let app = initialize_app(options, Some(unique_settings()))
2129 .await
2130 .unwrap();
2131 let database = get_database(Some(app)).await.unwrap();
2132 let item = database.reference("items/main").unwrap();
2133
2134 item.set(json!({ "count": 4 })).await.unwrap();
2135 item.set_priority(json!(10)).await.unwrap();
2136
2137 let stored = item.get().await.unwrap();
2138 assert_eq!(
2139 stored,
2140 json!({
2141 ".value": { "count": 4 },
2142 ".priority": 10
2143 })
2144 );
2145 }
2146
    #[tokio::test(flavor = "multi_thread")]
    async fn server_timestamp_is_resolved_on_set() {
        let options = FirebaseOptions {
            project_id: Some("project".into()),
            ..Default::default()
        };
        let app = initialize_app(options, Some(unique_settings()))
            .await
            .unwrap();
        let database = get_database(Some(app)).await.unwrap();
        let created_at = database.reference("meta/created_at").unwrap();

        created_at.set(server_timestamp()).await.unwrap();

        let value = created_at.get().await.unwrap();
        let ts = value.as_u64().expect("timestamp as u64");
        let now = SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap()
            .as_millis() as u64;
        assert!(now >= ts);
        assert!(now - ts < 5_000);
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn server_increment_updates_value() {
        let options = FirebaseOptions {
            project_id: Some("project".into()),
            ..Default::default()
        };
        let app = initialize_app(options, Some(unique_settings()))
            .await
            .unwrap();
        let database = get_database(Some(app)).await.unwrap();
        let counter = database.reference("counters/main").unwrap();

        counter.set(json!(1)).await.unwrap();
        counter.set(increment(2.0)).await.unwrap();

        let value = counter.get().await.unwrap();
        assert_eq!(value.as_f64().unwrap(), 3.0);
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn update_supports_server_increment() {
        let options = FirebaseOptions {
            project_id: Some("project".into()),
            ..Default::default()
        };
        let app = initialize_app(options, Some(unique_settings()))
            .await
            .unwrap();
        let database = get_database(Some(app)).await.unwrap();
        let scores = database.reference("scores").unwrap();

        scores.set(json!({ "alice": 4 })).await.unwrap();
        let mut delta = serde_json::Map::new();
        delta.insert("alice".to_string(), increment(3.0));
        scores.update(delta).await.unwrap();

        let stored = scores.get().await.unwrap();
        assert_eq!(stored.get("alice").unwrap().as_f64().unwrap(), 7.0);
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn update_rejects_empty_key() {
        let options = FirebaseOptions {
            project_id: Some("project".into()),
            ..Default::default()
        };
        let app = initialize_app(options, Some(unique_settings()))
            .await
            .unwrap();
        let database = get_database(Some(app)).await.unwrap();
        let reference = database.reference("items").unwrap();

        let mut updates = serde_json::Map::new();
        updates.insert("".to_string(), json!(1));

        let err = reference.update(updates).await.unwrap_err();
        assert_eq!(err.code_str(), "database/invalid-argument");
    }

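    // Transactions: the closure receives the current value and returns
    // `Some(new_value)` to commit or `None` to abort; either way the result
    // carries the final snapshot and a `committed` flag.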
    #[tokio::test(flavor = "multi_thread")]
    async fn run_transaction_commits_update() {
        let options = FirebaseOptions {
            project_id: Some("project".into()),
            ..Default::default()
        };
        let app = initialize_app(options, Some(unique_settings()))
            .await
            .unwrap();
        let database = get_database(Some(app)).await.unwrap();
        let counter = database.reference("counters/main").unwrap();
        counter.set(json!(1)).await.unwrap();

        let result = counter
            .run_transaction(|current| {
                let next = current.as_i64().unwrap_or(0) + 1;
                Some(json!(next))
            })
            .await
            .unwrap();

        assert!(result.committed);
        assert_eq!(result.snapshot.into_value(), json!(2));
        let stored = counter.get().await.unwrap();
        assert_eq!(stored, json!(2));
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn run_transaction_abort_preserves_value() {
        let options = FirebaseOptions {
            project_id: Some("project".into()),
            ..Default::default()
        };
        let app = initialize_app(options, Some(unique_settings()))
            .await
            .unwrap();
        let database = get_database(Some(app)).await.unwrap();
        let flag = database.reference("flags/feature").unwrap();
        flag.set(json!(true)).await.unwrap();

        let result = flag
            .run_transaction(|current| {
                if current == Value::Bool(true) {
                    None
                } else {
                    Some(Value::Bool(true))
                }
            })
            .await
            .unwrap();

        assert!(!result.committed);
        assert_eq!(result.snapshot.into_value(), json!(true));
        let stored = flag.get().await.unwrap();
        assert_eq!(stored, json!(true));
    }

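    // The REST-backend tests point `database_url` at a local mock HTTP server.
    // The `MockServer`/`when`/`then` builder used below matches the `httpmock`
    // crate (assumed to be what the test module imports); each mock doubles as
    // documentation of the exact request shape the backend should emit.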
    #[tokio::test(flavor = "multi_thread")]
    async fn rest_backend_performs_http_requests() {
        let server = MockServer::start();

        let set_mock = server.mock(|when, then| {
            when.method(PUT)
                .path("/messages.json")
                .query_param("print", "silent")
                .json_body(json!({ "greeting": "hello" }));
            then.status(200)
                .header("content-type", "application/json")
                .body("null");
        });

        let get_mock = server.mock(|when, then| {
            when.method(GET)
                .path("/messages.json")
                .query_param("format", "export");
            then.status(200)
                .header("content-type", "application/json")
                .body(r#"{"greeting":"hello"}"#);
        });

        let options = FirebaseOptions {
            project_id: Some("project".into()),
            database_url: Some(server.url("/")),
            ..Default::default()
        };
        let app = initialize_app(options, Some(unique_settings()))
            .await
            .unwrap();
        let database = get_database(Some(app)).await.unwrap();
        let reference = database.reference("/messages").unwrap();

        reference
            .set(json!({ "greeting": "hello" }))
            .await
            .expect("set over REST");
        database.clear_root_cache_for_test();
        let value = reference.get().await.expect("get over REST");

        assert_eq!(value, json!({ "greeting": "hello" }));
        set_mock.assert();
        get_mock.assert();
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn reference_parent_and_root() {
        let options = FirebaseOptions {
            project_id: Some("project".into()),
            ..Default::default()
        };
        let app = initialize_app(options, Some(unique_settings()))
            .await
            .unwrap();
        let database = get_database(Some(app)).await.unwrap();

        let nested = database.reference("users/alice/profile").unwrap();
        let parent = nested.parent().expect("parent reference");
        assert_eq!(parent.path(), "/users/alice/");
        assert_eq!(parent.parent().unwrap().path(), "/users/");

        let root = nested.root();
        assert_eq!(root.path(), "/");
        assert!(root.parent().is_none());
    }

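    // Incoming realtime frames are injected directly through
    // `handle_realtime_action`: a "d" action overwrites the data at path `p`,
    // while "m" merges it, and both must be reflected in the local cache and in
    // any attached listeners.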
    #[tokio::test(flavor = "multi_thread")]
    async fn realtime_set_updates_cache_and_listeners() {
        let options = FirebaseOptions {
            project_id: Some("project".into()),
            ..Default::default()
        };
        let app = initialize_app(options, Some(unique_settings()))
            .await
            .unwrap();
        let database = get_database(Some(app)).await.unwrap();

        let reference = database.reference("items/foo").unwrap();
        let events: Arc<Mutex<Vec<Value>>> = Arc::new(Mutex::new(Vec::new()));
        let events_clone = events.clone();
        let _registration = reference
            .on_value(move |result| {
                if let Ok(snapshot) = result {
                    events_clone.lock().unwrap().push(snapshot.value().clone());
                }
            })
            .await
            .unwrap();

        database
            .handle_realtime_action(
                "d",
                &json!({
                    "p": "items/foo",
                    "d": { "count": 5 }
                }),
            )
            .await
            .unwrap();

        let values = events.lock().unwrap();
        assert_eq!(values.last().unwrap(), &json!({ "count": 5 }));
        drop(values);

        assert_eq!(reference.get().await.unwrap(), json!({ "count": 5 }));
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn realtime_merge_updates_cache() {
        let options = FirebaseOptions {
            project_id: Some("project".into()),
            ..Default::default()
        };
        let app = initialize_app(options, Some(unique_settings()))
            .await
            .unwrap();
        let database = get_database(Some(app)).await.unwrap();

        database
            .handle_realtime_action(
                "d",
                &json!({
                    "p": "items",
                    "d": {"foo": {"count": 1}}
                }),
            )
            .await
            .unwrap();

        database
            .handle_realtime_action(
                "m",
                &json!({
                    "p": "items",
                    "d": {
                        "foo/count": 2,
                        "bar": {"value": true}
                    }
                }),
            )
            .await
            .unwrap();

        let foo = database
            .reference("items/foo")
            .unwrap()
            .get()
            .await
            .unwrap();
        assert_eq!(foo, json!({"count": 2}));

        let bar = database
            .reference("items/bar")
            .unwrap()
            .get()
            .await
            .unwrap();
        assert_eq!(bar, json!({"value": true}));
    }

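    // DataSnapshot helpers: existence, child counts, keyed child navigation, and
    // a plain-JSON export via `to_json`.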
    #[tokio::test(flavor = "multi_thread")]
    async fn datasnapshot_child_and_metadata_helpers() {
        let options = FirebaseOptions {
            project_id: Some("project".into()),
            ..Default::default()
        };
        let app = initialize_app(options, Some(unique_settings()))
            .await
            .unwrap();
        let database = get_database(Some(app)).await.unwrap();
        let profiles = database.reference("profiles").unwrap();

        profiles
            .set(json!({
                "alice": { "age": 31, "city": "Rome" },
                "bob": { "age": 29 }
            }))
            .await
            .unwrap();

        let captured = Arc::new(Mutex::new(None));
        let holder = captured.clone();
        profiles
            .on_value(move |result| {
                if let Ok(snapshot) = result {
                    *holder.lock().unwrap() = Some(snapshot);
                }
            })
            .await
            .unwrap();

        let snapshot = captured.lock().unwrap().clone().expect("initial snapshot");
        assert!(snapshot.exists());
        assert!(snapshot.has_children());
        assert_eq!(snapshot.size(), 2);

        let alice = snapshot.child("alice").unwrap();
        assert_eq!(alice.key(), Some("alice"));
        assert_eq!(alice.size(), 2);
        assert!(alice.has_children());
        assert_eq!(alice.child("age").unwrap().value(), &json!(31));
        assert!(snapshot.has_child("bob").unwrap());
        assert!(!snapshot.has_child("carol").unwrap());

        let json_output = snapshot.to_json();
        assert_eq!(
            json_output,
            json!({
                "alice": { "age": 31, "city": "Rome" },
                "bob": { "age": 29 }
            })
        );
    }

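    // Child-added listeners replay the existing children on attach (in key order
    // here) and then report later additions; `detach` drops the registration.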
    #[tokio::test(flavor = "multi_thread")]
    async fn child_event_listeners_receive_updates() {
        let options = FirebaseOptions {
            project_id: Some("project".into()),
            ..Default::default()
        };
        let app = initialize_app(options, Some(unique_settings()))
            .await
            .unwrap();
        let database = get_database(Some(app)).await.unwrap();
        let items = database.reference("items").unwrap();

        items
            .set(json!({
                "a": { "count": 1 },
                "b": { "count": 2 }
            }))
            .await
            .unwrap();

        let added_events = Arc::new(Mutex::new(Vec::<(String, Option<String>)>::new()));
        let capture = added_events.clone();
        let registration = items
            .on_child_added(move |result| {
                if let Ok(event) = result {
                    capture.lock().unwrap().push((
                        event.snapshot.key().unwrap().to_string(),
                        event.previous_name.clone(),
                    ));
                }
            })
            .await
            .unwrap();

        {
            let events = added_events.lock().unwrap();
            assert_eq!(events.len(), 2);
            assert_eq!(events[0].0, "a");
            assert_eq!(events[1].0, "b");
        }

        items
            .child("c")
            .unwrap()
            .set(json!({ "count": 3 }))
            .await
            .unwrap();

        {
            let events = added_events.lock().unwrap();
            assert_eq!(events.len(), 3);
            assert_eq!(events[2].0, "c");
        }

        registration.detach();
    }

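    // The remaining REST mocks pin down write shapes: set_with_priority wraps the
    // payload in `.value`/`.priority`, push PUTs to a generated child key, update
    // issues a PATCH, remove a DELETE, and an `ns` parameter from the configured
    // database URL is carried through to the request.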
    #[tokio::test(flavor = "multi_thread")]
    async fn rest_backend_set_with_priority_includes_metadata() {
        let server = MockServer::start();

        let put_mock = server.mock(|when, then| {
            when.method(PUT)
                .path("/items.json")
                .query_param("print", "silent")
                .json_body(json!({
                    ".value": { "count": 1 },
                    ".priority": 3
                }));
            then.status(200)
                .header("content-type", "application/json")
                .body("null");
        });

        let options = FirebaseOptions {
            project_id: Some("project".into()),
            database_url: Some(server.url("/")),
            ..Default::default()
        };
        let app = initialize_app(options, Some(unique_settings()))
            .await
            .unwrap();
        let database = get_database(Some(app)).await.unwrap();
        let reference = database.reference("items").unwrap();

        reference
            .set_with_priority(json!({ "count": 1 }), json!(3))
            .await
            .unwrap();

        put_mock.assert();
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn push_with_value_rest_backend_performs_put() {
        let server = MockServer::start();

        let push_mock = server.mock(|when, then| {
            when.method(PUT)
                .path_contains("/messages/")
                .query_param("print", "silent")
                .json_body(json!({ "text": "hello" }));
            then.status(200)
                .header("content-type", "application/json")
                .body("null");
        });

        let options = FirebaseOptions {
            project_id: Some("project".into()),
            database_url: Some(server.url("/")),
            ..Default::default()
        };
        let app = initialize_app(options, Some(unique_settings()))
            .await
            .unwrap();
        let database = get_database(Some(app)).await.unwrap();
        let messages = database.reference("messages").unwrap();

        let child = messages
            .push_with_value(json!({ "text": "hello" }))
            .await
            .expect("push with value rest");

        assert_eq!(child.key().unwrap().len(), 20);
        push_mock.assert();
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn rest_backend_uses_patch_for_updates() {
        let server = MockServer::start();

        let patch_mock = server.mock(|when, then| {
            when.method(PATCH)
                .path("/items.json")
                .query_param("print", "silent")
                .json_body(json!({ "a/count": 2, "b": { "flag": true } }));
            then.status(200)
                .header("content-type", "application/json")
                .body("null");
        });

        let options = FirebaseOptions {
            project_id: Some("project".into()),
            database_url: Some(server.url("/")),
            ..Default::default()
        };
        let app = initialize_app(options, Some(unique_settings()))
            .await
            .unwrap();
        let database = get_database(Some(app)).await.unwrap();
        let reference = database.reference("items").unwrap();

        let mut updates = serde_json::Map::new();
        updates.insert("a/count".to_string(), json!(2));
        updates.insert("b".to_string(), json!({ "flag": true }));
        reference.update(updates).await.expect("patch update");

        patch_mock.assert();
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn rest_backend_delete_supports_remove() {
        let server = MockServer::start();

        let delete_mock = server.mock(|when, then| {
            when.method(DELETE)
                .path("/items.json")
                .query_param("print", "silent");
            then.status(200).body("null");
        });

        let options = FirebaseOptions {
            project_id: Some("project".into()),
            database_url: Some(server.url("/")),
            ..Default::default()
        };
        let app = initialize_app(options, Some(unique_settings()))
            .await
            .unwrap();
        let database = get_database(Some(app)).await.unwrap();
        let reference = database.reference("items").unwrap();

        reference.remove().await.expect("delete request");
        delete_mock.assert();
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn rest_backend_preserves_namespace_query_parameter() {
        let server = MockServer::start();

        let set_mock = server.mock(|when, then| {
            when.method(PUT)
                .path("/messages.json")
                .query_param("ns", "demo-ns")
                .query_param("print", "silent")
                .json_body(json!({ "value": 1 }));
            then.status(200).body("null");
        });

        let options = FirebaseOptions {
            project_id: Some("project".into()),
            database_url: Some(format!("{}?ns=demo-ns", server.url("/"))),
            ..Default::default()
        };
        let app = initialize_app(options, Some(unique_settings()))
            .await
            .unwrap();
        let database = get_database(Some(app)).await.unwrap();
        let reference = database.reference("messages").unwrap();

        reference.set(json!({ "value": 1 })).await.unwrap();
        set_mock.assert();
    }

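    // Query constraints are serialized onto the REST query string: `orderBy`
    // takes a JSON-encoded key, start/end/limit constraints become their own
    // parameters, and reads request `format=export`.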
    #[tokio::test(flavor = "multi_thread")]
    async fn rest_query_order_by_child_and_limit() {
        let server = MockServer::start();

        let get_mock = server.mock(|when, then| {
            when.method(GET)
                .path("/items.json")
                .query_param("orderBy", "\"score\"")
                .query_param("startAt", "100")
                .query_param("limitToFirst", "5")
                .query_param("format", "export");
            then.status(200)
                .header("content-type", "application/json")
                .body(r#"{"a":{"score":120}}"#);
        });

        let options = FirebaseOptions {
            project_id: Some("project".into()),
            database_url: Some(server.url("/")),
            ..Default::default()
        };
        let app = initialize_app(options, Some(unique_settings()))
            .await
            .unwrap();
        let database = get_database(Some(app)).await.unwrap();
        let reference = database.reference("items").unwrap();
        let filtered = compose_query(
            reference,
            vec![order_by_child("score"), start_at(100), limit_to_first(5)],
        )
        .expect("compose query with constraints");

        let value = filtered.get().await.unwrap();
        assert_eq!(value, json!({ "a": { "score": 120 } }));
        get_mock.assert();
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn rest_query_equal_to_with_key() {
        let server = MockServer::start();

        let get_mock = server.mock(|when, then| {
            when.method(GET)
                .path("/items.json")
                .query_param("orderBy", "\"$key\"")
                .query_param("startAt", "\"item-1\",\"item-1\"")
                .query_param("endAt", "\"item-1\",\"item-1\"")
                .query_param("format", "export");
            then.status(200)
                .header("content-type", "application/json")
                .body(r#"{"item-1":{"value":true}}"#);
        });

        let options = FirebaseOptions {
            project_id: Some("project".into()),
            database_url: Some(server.url("/")),
            ..Default::default()
        };
        let app = initialize_app(options, Some(unique_settings()))
            .await
            .unwrap();
        let database = get_database(Some(app)).await.unwrap();
        let filtered = compose_query(
            database.reference("items").unwrap(),
            vec![order_by_key(), equal_to_with_key("item-1", "item-1")],
        )
        .expect("compose equal_to query");

        let value = filtered.get().await.unwrap();
        assert_eq!(value, json!({ "item-1": { "value": true } }));
        get_mock.assert();
    }

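    // Constraint validation: zero limits, empty order-by paths, and conflicting
    // order-by constraints are rejected up front with `database/invalid-argument`.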
    #[tokio::test(flavor = "multi_thread")]
    async fn limit_to_first_rejects_zero() {
        let options = FirebaseOptions {
            project_id: Some("project".into()),
            ..Default::default()
        };
        let app = initialize_app(options, Some(unique_settings()))
            .await
            .unwrap();
        let database = get_database(Some(app)).await.unwrap();

        let err = database
            .reference("items")
            .unwrap()
            .query()
            .limit_to_first(0)
            .unwrap_err();

        assert_eq!(err.code_str(), "database/invalid-argument");
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn order_by_child_rejects_empty_path() {
        let options = FirebaseOptions {
            project_id: Some("project".into()),
            ..Default::default()
        };
        let app = initialize_app(options, Some(unique_settings()))
            .await
            .unwrap();
        let database = get_database(Some(app)).await.unwrap();

        let err = database
            .reference("items")
            .unwrap()
            .order_by_child("")
            .unwrap_err();

        assert_eq!(err.code_str(), "database/invalid-argument");
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn query_rejects_multiple_order_by_constraints() {
        let options = FirebaseOptions {
            project_id: Some("project".into()),
            ..Default::default()
        };
        let app = initialize_app(options, Some(unique_settings()))
            .await
            .unwrap();
        let database = get_database(Some(app)).await.unwrap();
        let reference = database.reference("items").unwrap();

        let err =
            compose_query(reference, vec![order_by_key(), order_by_child("score")]).unwrap_err();

        assert_eq!(err.code_str(), "database/invalid-argument");
    }

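    // Value listeners fire once with the current value on attach and again after
    // each write; detaching stops further deliveries. The query-scoped variant
    // below re-fires when data under the queried path changes.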
    #[tokio::test(flavor = "multi_thread")]
    async fn on_value_listener_receives_updates() {
        let options = FirebaseOptions {
            project_id: Some("project".into()),
            ..Default::default()
        };
        let app = initialize_app(options, Some(unique_settings()))
            .await
            .unwrap();
        let database = get_database(Some(app)).await.unwrap();
        let reference = database.reference("counters/main").unwrap();

        let events = Arc::new(Mutex::new(Vec::<Value>::new()));
        let captured = events.clone();

        let registration = reference
            .on_value(move |result| {
                if let Ok(snapshot) = result {
                    captured.lock().unwrap().push(snapshot.value().clone());
                }
            })
            .await
            .expect("on_value registration");

        reference.set(json!(1)).await.unwrap();
        reference.set(json!(2)).await.unwrap();

        {
            let events = events.lock().unwrap();
            assert_eq!(events.len(), 3);
            assert_eq!(events[0], Value::Null);
            assert_eq!(events[1], json!(1));
            assert_eq!(events[2], json!(2));
        }

        registration.detach();
        reference.set(json!(3)).await.unwrap();

        let events = events.lock().unwrap();
        assert_eq!(events.len(), 3);
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn query_on_value_reacts_to_changes() {
        let options = FirebaseOptions {
            project_id: Some("project".into()),
            ..Default::default()
        };
        let app = initialize_app(options, Some(unique_settings()))
            .await
            .unwrap();
        let database = get_database(Some(app)).await.unwrap();
        let scores = database.reference("scores").unwrap();

        scores
            .set(json!({
                "a": { "score": 10 },
                "b": { "score": 20 },
                "c": { "score": 30 }
            }))
            .await
            .unwrap();

        let events = Arc::new(Mutex::new(Vec::<Value>::new()));
        let captured = events.clone();

        let _registration = compose_query(
            scores.clone(),
            vec![order_by_child("score"), limit_to_last(1)],
        )
        .unwrap()
        .on_value(move |result| {
            if let Ok(snapshot) = result {
                captured.lock().unwrap().push(snapshot.value().clone());
            }
        })
        .await
        .unwrap();

        {
            let events = events.lock().unwrap();
            assert_eq!(events.len(), 1);
            assert_eq!(
                events[0],
                json!({
                    "a": { "score": 10 },
                    "b": { "score": 20 },
                    "c": { "score": 30 }
                })
            );
        }

        scores
            .child("d")
            .unwrap()
            .set(json!({ "score": 50 }))
            .await
            .unwrap();

        let events = events.lock().unwrap();
        assert_eq!(events.len(), 2);
        assert_eq!(
            events[1],
            json!({
                "a": { "score": 10 },
                "b": { "score": 20 },
                "c": { "score": 30 },
                "d": { "score": 50 }
            })
        );
    }