firebase_rs_sdk/database/
api.rs

1use std::collections::{BTreeMap, HashMap};
2use std::convert::TryInto;
3use std::fmt;
4use std::sync::atomic::{AtomicU64, Ordering};
5use std::sync::{Arc, LazyLock, Mutex};
6use std::time::{SystemTime, UNIX_EPOCH};
7
8use serde_json::{Map, Number, Value};
9
10use crate::app;
11use crate::app::FirebaseApp;
12use crate::component::types::{
13    ComponentError, DynService, InstanceFactoryOptions, InstantiationMode,
14};
15use crate::component::{Component, ComponentType};
16use crate::database::backend::{select_backend, DatabaseBackend};
17use crate::database::constants::DATABASE_COMPONENT_NAME;
18use crate::database::error::{internal_error, invalid_argument, DatabaseError, DatabaseResult};
19use crate::database::on_disconnect::OnDisconnect;
20use crate::database::push_id::next_push_id;
21use crate::database::query::{QueryBound, QueryIndex, QueryLimit, QueryParams};
22use crate::database::realtime::{ListenSpec, Repo};
23use crate::logger::Logger;
24use crate::platform::runtime;
25
/// Logger used by the realtime code paths in this file. It is constructed
/// lazily on first access under the name `@firebase/database/realtime`.
static REALTIME_LOGGER: LazyLock<Logger> =
    LazyLock::new(|| Logger::new("@firebase/database/realtime"));
28
/// Handle to a database instance.
///
/// The handle only wraps an `Arc<DatabaseInner>`, so clones are cheap and all
/// of them share the same listeners, repo and root cache.
#[derive(Clone, Debug)]
pub struct Database {
    // Shared state behind every clone of this handle.
    inner: Arc<DatabaseInner>,
}
33
/// State shared by every clone of a [`Database`] handle.
struct DatabaseInner {
    // Application this database belongs to; exposed through `Database::app`.
    app: FirebaseApp,
    // Backend chosen by `select_backend`; `root_snapshot` reads the root through it.
    backend: Arc<dyn DatabaseBackend>,
    // Realtime repo used to listen/unlisten and to go online/offline.
    repo: Arc<Repo>,
    // Currently registered listeners, keyed by their numeric id.
    listeners: Mutex<HashMap<u64, Listener>>,
    // Source of listener ids; initialised to 1 and bumped on every registration.
    next_listener_id: AtomicU64,
    // Last known root value; `None` until it has been fetched or cached.
    root_cache: Mutex<Option<Value>>,
}
42
43impl fmt::Debug for DatabaseInner {
44    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
45        f.debug_struct("DatabaseInner")
46            .field("app", &self.app.name())
47            .field("backend", &"dynamic")
48            .field("repo", &"realtime")
49            .finish()
50    }
51}
52
/// Points at a location inside a [`Database`], identified by its path segments.
#[derive(Clone, Debug)]
pub struct DatabaseReference {
    // Database the location belongs to.
    database: Database,
    // Normalised path segments from the root; `Database::reference` fills this
    // from the output of `normalize_path`.
    path: Vec<String>,
}
58
/// Represents a composable database query, analogous to the JS `QueryImpl`
/// (`packages/database/src/api/Reference_impl.ts`).
#[derive(Clone, Debug)]
pub struct DatabaseQuery {
    // Location the query runs against.
    reference: DatabaseReference,
    // Query parameters built up so far (ordering, bounds, limits).
    params: QueryParams,
}
66
/// Represents a single constraint produced by helpers such as `order_by_child()`
/// (`packages/database/src/api/Reference_impl.ts`).
///
/// Constraints are opaque to callers; they are consumed by [`query`], which
/// applies them to a [`DatabaseQuery`] one at a time.
#[derive(Clone, Debug)]
pub struct QueryConstraint {
    // The concrete constraint carried by this value.
    kind: QueryConstraintKind,
}
73
/// Internal representation of every constraint the public helper functions
/// can produce.
#[derive(Clone, Debug)]
enum QueryConstraintKind {
    /// Order by the child at the given path.
    OrderByChild(String),
    /// Order by key.
    OrderByKey,
    /// Order by value.
    OrderByValue,
    /// Order by priority.
    OrderByPriority,
    /// Lower bound: `inclusive == true` maps to `start_at`, `false` to `start_after`.
    Start {
        value: Value,
        // Optional key name supplied by the `_with_key` helpers.
        name: Option<String>,
        inclusive: bool,
    },
    /// Upper bound: `inclusive == true` maps to `end_at`, `false` to `end_before`.
    End {
        value: Value,
        // Optional key name supplied by the `_with_key` helpers.
        name: Option<String>,
        inclusive: bool,
    },
    /// Limit applied through `limit_to_first`.
    LimitFirst(u32),
    /// Limit applied through `limit_to_last`.
    LimitLast(u32),
    /// Equality constraint, optionally qualified by a key name.
    EqualTo {
        value: Value,
        name: Option<String>,
    },
}
97
/// Shared, thread-safe callback invoked with either a value snapshot or the
/// error that cancelled the listener.
type ValueListenerCallback = Arc<dyn Fn(Result<DataSnapshot, DatabaseError>) + Send + Sync>;
/// Shared, thread-safe callback invoked with either a child event or the
/// error that cancelled the listener.
type ChildListenerCallback = Arc<dyn Fn(Result<ChildEvent, DatabaseError>) + Send + Sync>;
100
/// Kind of child event a child listener subscribes to and receives.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ChildEventType {
    /// A child was added. This is the only kind replayed for existing data
    /// when a listener is first registered (see `fire_initial_child_events`).
    Added,
    /// A child changed.
    Changed,
    /// A child was removed.
    Removed,
}
107
108#[derive(Clone)]
109pub struct ChildEvent {
110    pub event: ChildEventType,
111    pub snapshot: DataSnapshot,
112    pub previous_name: Option<String>,
113}
114
/// The two flavours of listener callback that can be registered.
#[derive(Clone)]
enum ListenerKind {
    /// Callback that receives full value snapshots.
    Value(ValueListenerCallback),
    /// Callback that receives child events of one specific kind.
    Child {
        // The child event kind this listener subscribed to.
        event: ChildEventType,
        callback: ChildListenerCallback,
    },
}
123
/// A registered listener as stored in `DatabaseInner::listeners`.
#[derive(Clone)]
struct Listener {
    // What the listener observes (a plain reference or a query).
    target: ListenerTarget,
    // The callback and its flavour.
    kind: ListenerKind,
    // Spec handed to the repo on `listen`, kept so the same spec can be
    // passed to `unlisten` when the listener goes away.
    spec: ListenSpec,
}
130
/// Location observed by a listener.
#[derive(Clone)]
enum ListenerTarget {
    /// A plain reference, identified by its path segments.
    Reference(Vec<String>),
    /// A query: a path plus the query parameters applied to it.
    Query {
        path: Vec<String>,
        params: QueryParams,
    },
}
139
140impl ListenerTarget {
141    fn matches(&self, changed_path: &[String]) -> bool {
142        match self {
143            ListenerTarget::Reference(path) => paths_related(path, changed_path),
144            ListenerTarget::Query { path, .. } => paths_related(path, changed_path),
145        }
146    }
147}
148
/// Represents a data snapshot returned to listeners, analogous to the JS
/// `DataSnapshot` type.
#[derive(Clone, Debug)]
pub struct DataSnapshot {
    // Location the snapshot was taken at.
    reference: DatabaseReference,
    // JSON value captured at that location; `Value::Null` means "no data"
    // (see `exists`).
    value: Value,
}
156
157impl DataSnapshot {
158    pub fn reference(&self) -> &DatabaseReference {
159        &self.reference
160    }
161
162    pub fn value(&self) -> &Value {
163        &self.value
164    }
165
166    pub fn exists(&self) -> bool {
167        !self.value.is_null()
168    }
169
170    pub fn key(&self) -> Option<&str> {
171        self.reference.key()
172    }
173
174    pub fn into_value(self) -> Value {
175        self.value
176    }
177
178    /// Returns a snapshot for the provided relative path, mirroring
179    /// `DataSnapshot.child(path)` in `Reference_impl.ts`.
180    pub fn child(&self, relative_path: &str) -> DatabaseResult<DataSnapshot> {
181        let segments = normalize_path(relative_path)?;
182        let child_reference = self.reference.child(relative_path)?;
183        let value = get_value_at_path(&self.value, &segments).unwrap_or(Value::Null);
184        Ok(DataSnapshot {
185            reference: child_reference,
186            value,
187        })
188    }
189
190    /// Returns whether data exists at the provided child path, mirroring the JS
191    /// `DataSnapshot.hasChild()` helper.
192    pub fn has_child(&self, relative_path: &str) -> DatabaseResult<bool> {
193        let segments = normalize_path(relative_path)?;
194        Ok(get_value_at_path(&self.value, &segments)
195            .map(|value| !value.is_null())
196            .unwrap_or(false))
197    }
198
199    /// Returns whether the snapshot has any direct child properties, mirroring
200    /// `DataSnapshot.hasChildren()`.
201    pub fn has_children(&self) -> bool {
202        match extract_data_ref(&self.value) {
203            Value::Object(map) => !map.is_empty(),
204            Value::Array(array) => !array.is_empty(),
205            _ => false,
206        }
207    }
208
209    /// Returns the number of direct child properties, mirroring the JS `size` getter.
210    pub fn size(&self) -> usize {
211        match extract_data_ref(&self.value) {
212            Value::Object(map) => map.len(),
213            Value::Array(array) => array.len(),
214            _ => 0,
215        }
216    }
217
218    /// Returns the JSON representation (including priority metadata) of this snapshot.
219    pub fn to_json(&self) -> Value {
220        self.value.clone()
221    }
222}
223
/// RAII-style listener registration; dropping the handle detaches the
/// underlying listener.
///
/// Keep the value alive for as long as the callback should keep firing; call
/// `detach` to remove the listener explicitly.
pub struct ListenerRegistration {
    // Database the listener was registered on.
    database: Database,
    // Listener id; becomes `None` once the listener has been detached so the
    // removal happens only once.
    id: Option<u64>,
}
230
231impl ListenerRegistration {
232    fn new(database: Database, id: u64) -> Self {
233        Self {
234            database,
235            id: Some(id),
236        }
237    }
238
239    pub fn detach(mut self) {
240        if let Some(id) = self.id.take() {
241            self.database.remove_listener(id);
242        }
243    }
244}
245
/// Result returned by `run_transaction`, mirroring the JS SDK contract.
#[derive(Clone, Debug)]
pub struct TransactionResult {
    /// Indicates whether the transaction committed (i.e. the update function
    /// returned `Some` and the write succeeded).
    pub committed: bool,
    /// Snapshot reflecting the data at the location after the transaction.
    pub snapshot: DataSnapshot,
}
255
256impl Drop for ListenerRegistration {
257    fn drop(&mut self) {
258        if let Some(id) = self.id.take() {
259            self.database.remove_listener(id);
260        }
261    }
262}
263
264impl QueryConstraint {
265    fn new(kind: QueryConstraintKind) -> Self {
266        Self { kind }
267    }
268
269    fn apply(self, query: DatabaseQuery) -> DatabaseResult<DatabaseQuery> {
270        match self.kind {
271            QueryConstraintKind::OrderByChild(path) => query.order_by_child(&path),
272            QueryConstraintKind::OrderByKey => query.order_by_key(),
273            QueryConstraintKind::OrderByValue => query.order_by_value(),
274            QueryConstraintKind::OrderByPriority => query.order_by_priority(),
275            QueryConstraintKind::Start {
276                value,
277                name,
278                inclusive,
279            } => {
280                if inclusive {
281                    query.start_at_with_key(value, name)
282                } else {
283                    query.start_after_with_key(value, name)
284                }
285            }
286            QueryConstraintKind::End {
287                value,
288                name,
289                inclusive,
290            } => {
291                if inclusive {
292                    query.end_at_with_key(value, name)
293                } else {
294                    query.end_before_with_key(value, name)
295                }
296            }
297            QueryConstraintKind::LimitFirst(limit) => query.limit_to_first(limit),
298            QueryConstraintKind::LimitLast(limit) => query.limit_to_last(limit),
299            QueryConstraintKind::EqualTo { value, name } => query.equal_to_with_key(value, name),
300        }
301    }
302}
303
304/// Creates a derived query by applying the provided constraints, following the
305/// semantics of `query()` in `packages/database/src/api/Reference_impl.ts`.
306pub fn query(
307    reference: DatabaseReference,
308    constraints: impl IntoIterator<Item = QueryConstraint>,
309) -> DatabaseResult<DatabaseQuery> {
310    let mut current = reference.query();
311    for constraint in constraints {
312        current = constraint.apply(current)?;
313    }
314    Ok(current)
315}
316
317/// Produces a constraint that orders the results by the specified child path.
318/// Mirrors `orderByChild()` from the JS SDK.
319pub fn order_by_child(path: impl Into<String>) -> QueryConstraint {
320    QueryConstraint::new(QueryConstraintKind::OrderByChild(path.into()))
321}
322
323/// Produces a constraint that orders nodes by key (`orderByKey()` in JS).
324pub fn order_by_key() -> QueryConstraint {
325    QueryConstraint::new(QueryConstraintKind::OrderByKey)
326}
327
328/// Produces a constraint that orders nodes by priority (`orderByPriority()` in JS).
329pub fn order_by_priority() -> QueryConstraint {
330    QueryConstraint::new(QueryConstraintKind::OrderByPriority)
331}
332
333/// Produces a constraint that orders nodes by value (`orderByValue()` in JS).
334pub fn order_by_value() -> QueryConstraint {
335    QueryConstraint::new(QueryConstraintKind::OrderByValue)
336}
337
338/// Mirrors the JS `startAt()` constraint (`Reference_impl.ts`).
339pub fn start_at<V>(value: V) -> QueryConstraint
340where
341    V: Into<Value>,
342{
343    QueryConstraint::new(QueryConstraintKind::Start {
344        value: value.into(),
345        name: None,
346        inclusive: true,
347    })
348}
349
350/// Mirrors the JS `startAt(value, name)` overload (`Reference_impl.ts`).
351pub fn start_at_with_key<V, S>(value: V, name: S) -> QueryConstraint
352where
353    V: Into<Value>,
354    S: Into<String>,
355{
356    QueryConstraint::new(QueryConstraintKind::Start {
357        value: value.into(),
358        name: Some(name.into()),
359        inclusive: true,
360    })
361}
362
363/// Mirrors the JS `startAfter()` constraint (`Reference_impl.ts`).
364pub fn start_after<V>(value: V) -> QueryConstraint
365where
366    V: Into<Value>,
367{
368    QueryConstraint::new(QueryConstraintKind::Start {
369        value: value.into(),
370        name: None,
371        inclusive: false,
372    })
373}
374
375/// Mirrors the JS `startAfter(value, name)` overload (`Reference_impl.ts`).
376pub fn start_after_with_key<V, S>(value: V, name: S) -> QueryConstraint
377where
378    V: Into<Value>,
379    S: Into<String>,
380{
381    QueryConstraint::new(QueryConstraintKind::Start {
382        value: value.into(),
383        name: Some(name.into()),
384        inclusive: false,
385    })
386}
387
388/// Mirrors the JS `endAt()` constraint (`Reference_impl.ts`).
389pub fn end_at<V>(value: V) -> QueryConstraint
390where
391    V: Into<Value>,
392{
393    QueryConstraint::new(QueryConstraintKind::End {
394        value: value.into(),
395        name: None,
396        inclusive: true,
397    })
398}
399
400/// Mirrors the JS `endAt(value, name)` overload (`Reference_impl.ts`).
401pub fn end_at_with_key<V, S>(value: V, name: S) -> QueryConstraint
402where
403    V: Into<Value>,
404    S: Into<String>,
405{
406    QueryConstraint::new(QueryConstraintKind::End {
407        value: value.into(),
408        name: Some(name.into()),
409        inclusive: true,
410    })
411}
412
413/// Mirrors the JS `endBefore()` constraint (`Reference_impl.ts`).
414pub fn end_before<V>(value: V) -> QueryConstraint
415where
416    V: Into<Value>,
417{
418    QueryConstraint::new(QueryConstraintKind::End {
419        value: value.into(),
420        name: None,
421        inclusive: false,
422    })
423}
424
425/// Mirrors the JS `endBefore(value, name)` overload (`Reference_impl.ts`).
426pub fn end_before_with_key<V, S>(value: V, name: S) -> QueryConstraint
427where
428    V: Into<Value>,
429    S: Into<String>,
430{
431    QueryConstraint::new(QueryConstraintKind::End {
432        value: value.into(),
433        name: Some(name.into()),
434        inclusive: false,
435    })
436}
437
438/// Mirrors the JS `limitToFirst()` constraint (`Reference_impl.ts`).
439pub fn limit_to_first(limit: u32) -> QueryConstraint {
440    QueryConstraint::new(QueryConstraintKind::LimitFirst(limit))
441}
442
443/// Mirrors the JS `limitToLast()` constraint (`Reference_impl.ts`).
444pub fn limit_to_last(limit: u32) -> QueryConstraint {
445    QueryConstraint::new(QueryConstraintKind::LimitLast(limit))
446}
447
448/// Mirrors the JS `equalTo()` constraint (`Reference_impl.ts`).
449pub fn equal_to<V>(value: V) -> QueryConstraint
450where
451    V: Into<Value>,
452{
453    QueryConstraint::new(QueryConstraintKind::EqualTo {
454        value: value.into(),
455        name: None,
456    })
457}
458
459/// Mirrors the JS `equalTo(value, name)` overload (`Reference_impl.ts`).
460pub fn equal_to_with_key<V, S>(value: V, name: S) -> QueryConstraint
461where
462    V: Into<Value>,
463    S: Into<String>,
464{
465    QueryConstraint::new(QueryConstraintKind::EqualTo {
466        value: value.into(),
467        name: Some(name.into()),
468    })
469}
470
471/// Generates a child location with an auto-generated push ID.
472///
473/// Mirrors the modular `push()` helper from the JS SDK
474/// (`packages/database/src/api/Reference_impl.ts`).
475pub async fn push(reference: &DatabaseReference) -> DatabaseResult<DatabaseReference> {
476    reference.push().await
477}
478
479/// Generates a child location with an auto-generated push ID and writes the provided value.
480///
481/// Mirrors the modular `push(ref, value)` overload from the JS SDK
482/// (`packages/database/src/api/Reference_impl.ts`).
483pub async fn push_with_value<V>(
484    reference: &DatabaseReference,
485    value: V,
486) -> DatabaseResult<DatabaseReference>
487where
488    V: Into<Value>,
489{
490    reference.push_with_value(value).await
491}
492
493/// Registers a `value` listener for the provided reference.
494#[allow(dead_code)]
495pub async fn on_value<F>(
496    reference: &DatabaseReference,
497    callback: F,
498) -> DatabaseResult<ListenerRegistration>
499where
500    F: Fn(Result<DataSnapshot, DatabaseError>) + Send + Sync + 'static,
501{
502    reference.on_value(callback).await
503}
504
505/// Registers a `child_added` listener for the provided reference.
506pub async fn on_child_added<F>(
507    reference: &DatabaseReference,
508    callback: F,
509) -> DatabaseResult<ListenerRegistration>
510where
511    F: Fn(Result<ChildEvent, DatabaseError>) + Send + Sync + 'static,
512{
513    reference.on_child_added(callback).await
514}
515
516/// Registers a `child_changed` listener for the provided reference.
517pub async fn on_child_changed<F>(
518    reference: &DatabaseReference,
519    callback: F,
520) -> DatabaseResult<ListenerRegistration>
521where
522    F: Fn(Result<ChildEvent, DatabaseError>) + Send + Sync + 'static,
523{
524    reference.on_child_changed(callback).await
525}
526
527/// Registers a `child_removed` listener for the provided reference.
528pub async fn on_child_removed<F>(
529    reference: &DatabaseReference,
530    callback: F,
531) -> DatabaseResult<ListenerRegistration>
532where
533    F: Fn(Result<ChildEvent, DatabaseError>) + Send + Sync + 'static,
534{
535    reference.on_child_removed(callback).await
536}
537
538/// Runs a transaction at the provided reference, mirroring the JS SDK.
539///
540/// The update closure receives the current value and can return `Some(new_value)`
541/// to commit the write or `None` to abort. The operation currently uses a
542/// best-effort optimistic strategy when hitting the REST backend; concurrent
543/// writers may lead to retries from user-space code.
544pub async fn run_transaction<F>(
545    reference: &DatabaseReference,
546    mut update: F,
547) -> DatabaseResult<TransactionResult>
548where
549    F: FnMut(Value) -> Option<Value>,
550{
551    reference.run_transaction(|value| update(value)).await
552}
553
554/// Writes a value together with a priority, mirroring the modular `setWithPriority()` helper
555/// (`packages/database/src/api/Reference_impl.ts`).
556pub async fn set_with_priority<V, P>(
557    reference: &DatabaseReference,
558    value: V,
559    priority: P,
560) -> DatabaseResult<()>
561where
562    V: Into<Value>,
563    P: Into<Value>,
564{
565    reference.set_with_priority(value, priority).await
566}
567
568/// Updates the priority for the current location, mirroring the modular `setPriority()` helper
569/// (`packages/database/src/api/Reference_impl.ts`).
570pub async fn set_priority<P>(reference: &DatabaseReference, priority: P) -> DatabaseResult<()>
571where
572    P: Into<Value>,
573{
574    reference.set_priority(priority).await
575}
576
577impl Database {
578    fn new(app: FirebaseApp) -> Self {
579        let repo = Repo::new_for_app(&app);
580        let inner = Arc::new(DatabaseInner {
581            backend: select_backend(&app),
582            repo: repo.clone(),
583            app,
584            listeners: Mutex::new(HashMap::new()),
585            next_listener_id: AtomicU64::new(1),
586            root_cache: Mutex::new(None),
587        });
588        let database = Self { inner };
589        let handler_db = database.clone();
590        repo.set_event_handler(Arc::new(move |action, body| {
591            let database = handler_db.clone();
592            Box::pin(async move { database.handle_realtime_action(&action, &body).await })
593        }));
594        database
595    }
596
597    fn cache_root(&self, value: Value) {
598        *self.inner.root_cache.lock().unwrap() = Some(value);
599    }
600
601    pub(crate) fn repo(&self) -> Arc<Repo> {
602        self.inner.repo.clone()
603    }
604
605    #[allow(dead_code)]
606    #[cfg(test)]
607    fn clear_root_cache_for_test(&self) {
608        *self.inner.root_cache.lock().unwrap() = None;
609    }
610
611    async fn handle_realtime_action(
612        &self,
613        action: &str,
614        body: &serde_json::Value,
615    ) -> DatabaseResult<()> {
616        match action {
617            "d" | "m" => self.handle_realtime_data(action, body).await,
618            "c" => {
619                REALTIME_LOGGER.warn("listener revoked by server".to_string());
620                self.revoke_listener(body).await;
621                Ok(())
622            }
623            "ac" | "apc" => {
624                REALTIME_LOGGER.warn(format!("credential revoked by server ({action})"));
625                self.fail_listeners(internal_error(format!(
626                    "realtime credential revoked: {action}"
627                )));
628                Ok(())
629            }
630            "error" => {
631                let message = body.as_str().unwrap_or("realtime connection error");
632                self.fail_listeners(internal_error(message.to_string()));
633                Ok(())
634            }
635            "sd" => Ok(()),
636            other => Err(internal_error(format!(
637                "unhandled realtime action '{other}'"
638            ))),
639        }
640    }
641
642    async fn handle_realtime_data(
643        &self,
644        action: &str,
645        body: &serde_json::Value,
646    ) -> DatabaseResult<()> {
647        let Some(path) = body.get("p").and_then(|value| value.as_str()) else {
648            return Ok(());
649        };
650        let data = body.get("d").cloned().unwrap_or(serde_json::Value::Null);
651
652        let segments = normalize_path(path)?;
653        let old_root = self.root_snapshot().await?;
654        let mut new_root = old_root.clone();
655
656        match action {
657            "d" => apply_realtime_value(&mut new_root, &segments, data.clone()),
658            "m" => {
659                let Value::Object(map) = &data else {
660                    return Err(invalid_argument(
661                        "Realtime merge payload must be a JSON object",
662                    ));
663                };
664                for (key, value) in map.iter() {
665                    let mut child_path = segments.clone();
666                    child_path.extend(normalize_path(key)?);
667                    apply_realtime_value(&mut new_root, &child_path, value.clone());
668                }
669            }
670            _ => {}
671        }
672
673        REALTIME_LOGGER.debug(format!(
674            "realtime payload action={action} path={path} data={data:?}"
675        ));
676
677        let new_root_for_cache = new_root.clone();
678        self.dispatch_listeners(&segments, &old_root, &new_root)
679            .await?;
680        self.cache_root(new_root_for_cache);
681        Ok(())
682    }
683
684    async fn revoke_listener(&self, body: &serde_json::Value) {
685        let Some(path) = body.get("p").and_then(|value| value.as_str()) else {
686            return;
687        };
688        let segments = match normalize_path(path) {
689            Ok(segments) => segments,
690            Err(err) => {
691                REALTIME_LOGGER.warn(format!("failed to normalise revoked path: {err}"));
692                return;
693            }
694        };
695
696        let (removed, should_disconnect) = {
697            let mut listeners = self.inner.listeners.lock().unwrap();
698            let cancelled: Vec<u64> = listeners
699                .iter()
700                .filter(|(_, listener)| match &listener.target {
701                    ListenerTarget::Reference(path) => path == &segments,
702                    ListenerTarget::Query { path, .. } => path == &segments,
703                })
704                .map(|(id, _)| *id)
705                .collect();
706
707            let removed = cancelled
708                .into_iter()
709                .filter_map(|id| listeners.remove(&id))
710                .collect::<Vec<_>>();
711            let should_disconnect = listeners.is_empty();
712            (removed, should_disconnect)
713        };
714
715        if removed.is_empty() {
716            return;
717        }
718
719        for listener in &removed {
720            if let Err(err) = self.inner.repo.unlisten(listener.spec.clone()).await {
721                REALTIME_LOGGER.warn(format!("failed to detach revoked realtime listener: {err}"));
722            }
723        }
724
725        let error = internal_error("listener revoked by server".to_string());
726        for listener in removed {
727            match listener.kind {
728                ListenerKind::Value(callback) => {
729                    callback(Err(error.clone()));
730                }
731                ListenerKind::Child { callback, .. } => {
732                    callback(Err(error.clone()));
733                }
734            }
735        }
736
737        if should_disconnect {
738            if let Err(err) = self.go_offline().await {
739                REALTIME_LOGGER.warn(format!(
740                    "failed to go offline after listener revocation: {err}"
741                ));
742            }
743        }
744    }
745
746    fn fail_listeners(&self, err: DatabaseError) {
747        let mut guard = self.inner.listeners.lock().unwrap();
748        let listeners = guard.values().cloned().collect::<Vec<_>>();
749        guard.clear();
750        drop(guard);
751        for listener in listeners {
752            REALTIME_LOGGER.warn(format!("listener cancelled due to error: {err}"));
753            match listener.kind {
754                ListenerKind::Value(callback) => {
755                    callback(Err(err.clone()));
756                }
757                ListenerKind::Child { callback, .. } => {
758                    callback(Err(err.clone()));
759                }
760            }
761        }
762    }
763
764    pub async fn go_online(&self) -> DatabaseResult<()> {
765        self.inner.repo.go_online().await
766    }
767
768    pub async fn go_offline(&self) -> DatabaseResult<()> {
769        self.inner.repo.go_offline().await
770    }
771
772    pub fn app(&self) -> &FirebaseApp {
773        &self.inner.app
774    }
775
776    pub fn reference(&self, path: &str) -> DatabaseResult<DatabaseReference> {
777        let segments = normalize_path(path)?;
778        Ok(DatabaseReference {
779            database: self.clone(),
780            path: segments,
781        })
782    }
783
784    fn reference_from_segments(&self, segments: Vec<String>) -> DatabaseReference {
785        DatabaseReference {
786            database: self.clone(),
787            path: segments,
788        }
789    }
790
791    fn listen_spec_for_target(&self, target: &ListenerTarget) -> DatabaseResult<ListenSpec> {
792        match target {
793            ListenerTarget::Reference(path) => Ok(ListenSpec::new(path.clone(), Vec::new())),
794            ListenerTarget::Query { path, params } => {
795                let mut rest_params = params.to_rest_params()?;
796                // REST params may omit `format=export` when not required; add it
797                // to stabilise server-side hashing so multiple listeners with
798                // equivalent semantics collapse to the same spec. This mirrors
799                // the JS SDK behaviour where the listen ID incorporates the
800                // complete query object, including defaults.
801                if rest_params.iter().all(|(key, _)| key != "format") {
802                    rest_params.push(("format".to_string(), "export".to_string()));
803                }
804                Ok(ListenSpec::new(path.clone(), rest_params))
805            }
806        }
807    }
808
809    async fn register_listener(
810        &self,
811        target: ListenerTarget,
812        kind: ListenerKind,
813    ) -> DatabaseResult<ListenerRegistration> {
814        let spec = self.listen_spec_for_target(&target)?;
815        let id = self.inner.next_listener_id.fetch_add(1, Ordering::SeqCst);
816
817        let first_listener = {
818            let mut listeners = self.inner.listeners.lock().unwrap();
819            let was_empty = listeners.is_empty();
820            listeners.insert(
821                id,
822                Listener {
823                    target: target.clone(),
824                    kind: kind.clone(),
825                    spec: spec.clone(),
826                },
827            );
828            was_empty
829        };
830
831        if first_listener {
832            if let Err(err) = self.go_online().await {
833                let mut listeners = self.inner.listeners.lock().unwrap();
834                listeners.remove(&id);
835                return Err(err);
836            }
837        }
838
839        if let Err(err) = self.inner.repo.listen(spec.clone()).await {
840            let mut listeners = self.inner.listeners.lock().unwrap();
841            listeners.remove(&id);
842            if first_listener {
843                let _ = self.go_offline().await;
844            }
845            return Err(err);
846        }
847
848        let current_root = match self.root_snapshot().await {
849            Ok(root) => root,
850            Err(err) => {
851                self.remove_listener(id);
852                return Err(err);
853            }
854        };
855        match kind {
856            ListenerKind::Value(callback) => {
857                let snapshot = self.snapshot_from_root(&target, &current_root).await?;
858                callback(Ok(snapshot));
859            }
860            ListenerKind::Child { event, callback } => {
861                if let Err(err) =
862                    self.fire_initial_child_events(&target, event, &callback, &current_root)
863                {
864                    self.remove_listener(id);
865                    return Err(err);
866                }
867            }
868        }
869
870        Ok(ListenerRegistration::new(self.clone(), id))
871    }
872
873    fn remove_listener(&self, id: u64) {
874        let (listener, should_disconnect) = {
875            let mut listeners = self.inner.listeners.lock().unwrap();
876            let removed = listeners.remove(&id);
877            let should_disconnect = listeners.is_empty();
878            (removed, should_disconnect)
879        };
880
881        if let Some(listener) = listener {
882            let repo = self.inner.repo.clone();
883            let spec = listener.spec.clone();
884            runtime::spawn_detached(async move {
885                if let Err(err) = repo.unlisten(spec).await {
886                    REALTIME_LOGGER.warn(format!(
887                        "failed to detach realtime listener during cleanup: {err}"
888                    ));
889                }
890            });
891        }
892
893        if should_disconnect {
894            let database = self.clone();
895            runtime::spawn_detached(async move {
896                if let Err(err) = database.go_offline().await {
897                    REALTIME_LOGGER.warn(format!(
898                        "failed to go offline after removing last listener: {err}"
899                    ));
900                }
901            });
902        }
903    }
904
905    async fn dispatch_listeners(
906        &self,
907        changed_path: &[String],
908        old_root: &Value,
909        new_root: &Value,
910    ) -> DatabaseResult<()> {
911        let listeners: Vec<Listener> = {
912            let listeners = self.inner.listeners.lock().unwrap();
913            listeners
914                .values()
915                .filter(|listener| listener.target.matches(changed_path))
916                .cloned()
917                .collect()
918        };
919
920        for listener in listeners {
921            match &listener.kind {
922                ListenerKind::Value(callback) => {
923                    let snapshot = self.snapshot_from_root(&listener.target, new_root).await?;
924                    callback(Ok(snapshot));
925                }
926                ListenerKind::Child { event, callback } => {
927                    self.invoke_child_listener(&listener, *event, callback, old_root, new_root)?;
928                }
929            }
930        }
931        Ok(())
932    }
933
934    async fn root_snapshot(&self) -> DatabaseResult<Value> {
935        if let Some(value) = self.inner.root_cache.lock().unwrap().clone() {
936            return Ok(value);
937        }
938        let value = self.inner.backend.get(&[], &[]).await?;
939        *self.inner.root_cache.lock().unwrap() = Some(value.clone());
940        Ok(value)
941    }
942
943    async fn snapshot_from_root(
944        &self,
945        target: &ListenerTarget,
946        root: &Value,
947    ) -> DatabaseResult<DataSnapshot> {
948        match target {
949            ListenerTarget::Reference(path) => {
950                let value = value_at_path(root, path);
951                let reference = self.reference_from_segments(path.clone());
952                Ok(DataSnapshot { reference, value })
953            }
954            ListenerTarget::Query { .. } => self.snapshot_for_target(target).await,
955        }
956    }
957
958    fn fire_initial_child_events(
959        &self,
960        target: &ListenerTarget,
961        event: ChildEventType,
962        callback: &ChildListenerCallback,
963        root: &Value,
964    ) -> DatabaseResult<()> {
965        if event != ChildEventType::Added {
966            return Ok(());
967        }
968
969        if let ListenerTarget::Reference(path) = target {
970            let new_value = value_at_path(root, path);
971            let empty = Value::Null;
972            self.emit_child_events(path, event, callback, &empty, &new_value)?;
973        }
974        Ok(())
975    }
976
977    fn invoke_child_listener(
978        &self,
979        listener: &Listener,
980        event: ChildEventType,
981        callback: &ChildListenerCallback,
982        old_root: &Value,
983        new_root: &Value,
984    ) -> DatabaseResult<()> {
985        let ListenerTarget::Reference(path) = &listener.target else {
986            return Ok(());
987        };
988        let old_value = value_at_path(old_root, path);
989        let new_value = value_at_path(new_root, path);
990        self.emit_child_events(path, event, callback, &old_value, &new_value)
991    }
992
993    fn emit_child_events(
994        &self,
995        parent_path: &[String],
996        event: ChildEventType,
997        callback: &ChildListenerCallback,
998        old_value: &Value,
999        new_value: &Value,
1000    ) -> DatabaseResult<()> {
1001        let old_children = children_map(old_value);
1002        let new_children = children_map(new_value);
1003
1004        match event {
1005            ChildEventType::Added => {
1006                let new_keys: Vec<String> = new_children.keys().cloned().collect();
1007                for key in new_keys.iter() {
1008                    if !old_children.contains_key(key) {
1009                        let value = new_children.get(key).cloned().unwrap_or(Value::Null);
1010                        let prev_name = previous_key(&new_keys, key);
1011                        let snapshot = self.child_snapshot(parent_path, key, value.clone());
1012                        callback(Ok(ChildEvent {
1013                            event,
1014                            snapshot,
1015                            previous_name: prev_name,
1016                        }));
1017                    }
1018                }
1019            }
1020            ChildEventType::Changed => {
1021                let new_keys: Vec<String> = new_children.keys().cloned().collect();
1022                for key in new_keys.iter() {
1023                    if let Some(old_value_child) = old_children.get(key) {
1024                        let new_child = new_children.get(key).expect("child exists in map");
1025                        if old_value_child != new_child {
1026                            let value = new_child.clone();
1027                            let prev_name = previous_key(&new_keys, key);
1028                            let snapshot = self.child_snapshot(parent_path, key, value);
1029                            callback(Ok(ChildEvent {
1030                                event,
1031                                snapshot,
1032                                previous_name: prev_name,
1033                            }));
1034                        }
1035                    }
1036                }
1037            }
1038            ChildEventType::Removed => {
1039                let old_keys: Vec<String> = old_children.keys().cloned().collect();
1040                for key in old_keys.iter() {
1041                    if !new_children.contains_key(key) {
1042                        let value = old_children.get(key).cloned().unwrap_or(Value::Null);
1043                        let prev_name = previous_key(&old_keys, key);
1044                        let snapshot = self.child_snapshot(parent_path, key, value);
1045                        callback(Ok(ChildEvent {
1046                            event,
1047                            snapshot,
1048                            previous_name: prev_name,
1049                        }));
1050                    }
1051                }
1052            }
1053        }
1054        Ok(())
1055    }
1056
1057    fn child_snapshot(
1058        &self,
1059        parent_path: &[String],
1060        child_key: &str,
1061        value: Value,
1062    ) -> DataSnapshot {
1063        let mut segments = parent_path.to_vec();
1064        segments.push(child_key.to_string());
1065        let reference = self.reference_from_segments(segments);
1066        DataSnapshot { reference, value }
1067    }
1068
1069    async fn snapshot_for_target(&self, target: &ListenerTarget) -> DatabaseResult<DataSnapshot> {
1070        match target {
1071            ListenerTarget::Reference(path) => {
1072                let value = self.inner.backend.get(path, &[]).await?;
1073                let reference = self.reference_from_segments(path.clone());
1074                Ok(DataSnapshot { reference, value })
1075            }
1076            ListenerTarget::Query { path, params } => {
1077                let rest_params = params.to_rest_params()?;
1078                let value = self.inner.backend.get(path, rest_params.as_slice()).await?;
1079                let reference = self.reference_from_segments(path.clone());
1080                Ok(DataSnapshot { reference, value })
1081            }
1082        }
1083    }
1084}
1085
1086impl DatabaseReference {
1087    pub fn child(&self, relative: &str) -> DatabaseResult<DatabaseReference> {
1088        let mut segments = self.path.clone();
1089        segments.extend(normalize_path(relative)?);
1090        Ok(DatabaseReference {
1091            database: self.database.clone(),
1092            path: segments,
1093        })
1094    }
1095
1096    /// Returns the parent of this reference, mirroring `ref.parent` in the JS SDK.
1097    pub fn parent(&self) -> Option<DatabaseReference> {
1098        if self.path.is_empty() {
1099            None
1100        } else {
1101            let mut parent = self.path.clone();
1102            parent.pop();
1103            Some(DatabaseReference {
1104                database: self.database.clone(),
1105                path: parent,
1106            })
1107        }
1108    }
1109
1110    /// Returns the root of the database, mirroring `ref.root` in the JS SDK.
1111    pub fn root(&self) -> DatabaseReference {
1112        DatabaseReference {
1113            database: self.database.clone(),
1114            path: Vec::new(),
1115        }
1116    }
1117
1118    pub async fn set(&self, value: Value) -> DatabaseResult<()> {
1119        let value = self.resolve_value_for_path(&self.path, value).await?;
1120        let old_root = self.database.root_snapshot().await?;
1121        let value_for_local = value.clone();
1122        self.database.inner.backend.set(&self.path, value).await?;
1123
1124        let mut new_root = old_root.clone();
1125        apply_realtime_value(&mut new_root, &self.path, value_for_local);
1126
1127        self.database
1128            .dispatch_listeners(&self.path, &old_root, &new_root)
1129            .await?;
1130        self.database.cache_root(new_root);
1131        Ok(())
1132    }
1133
1134    /// Creates a query anchored at this reference, mirroring the JS `query()` helper.
1135    pub fn query(&self) -> DatabaseQuery {
1136        DatabaseQuery {
1137            reference: self.clone(),
1138            params: QueryParams::default(),
1139        }
1140    }
1141
1142    /// Returns a query ordered by the provided child path, mirroring `orderByChild()`.
1143    pub fn order_by_child(&self, path: &str) -> DatabaseResult<DatabaseQuery> {
1144        self.query().order_by_child(path)
1145    }
1146
1147    /// Returns a query ordered by key, mirroring `orderByKey()`.
1148    pub fn order_by_key(&self) -> DatabaseResult<DatabaseQuery> {
1149        self.query().order_by_key()
1150    }
1151
1152    /// Returns a query ordered by value, mirroring `orderByValue()`.
1153    pub fn order_by_value(&self) -> DatabaseResult<DatabaseQuery> {
1154        self.query().order_by_value()
1155    }
1156
1157    /// Returns a query ordered by priority, mirroring `orderByPriority()`.
1158    pub fn order_by_priority(&self) -> DatabaseResult<DatabaseQuery> {
1159        self.query().order_by_priority()
1160    }
1161
1162    /// Registers a value listener for this reference, mirroring `onValue()`.
1163    pub async fn on_value<F>(&self, callback: F) -> DatabaseResult<ListenerRegistration>
1164    where
1165        F: Fn(Result<DataSnapshot, DatabaseError>) + Send + Sync + 'static,
1166    {
1167        let user_fn: ValueListenerCallback = Arc::new(callback);
1168        self.database
1169            .register_listener(
1170                ListenerTarget::Reference(self.path.clone()),
1171                ListenerKind::Value(user_fn),
1172            )
1173            .await
1174    }
1175
1176    /// Registers an `onChildAdded` listener, mirroring the JS SDK.
1177    pub async fn on_child_added<F>(&self, callback: F) -> DatabaseResult<ListenerRegistration>
1178    where
1179        F: Fn(Result<ChildEvent, DatabaseError>) + Send + Sync + 'static,
1180    {
1181        let cb: ChildListenerCallback = Arc::new(callback);
1182        self.database
1183            .register_listener(
1184                ListenerTarget::Reference(self.path.clone()),
1185                ListenerKind::Child {
1186                    event: ChildEventType::Added,
1187                    callback: cb,
1188                },
1189            )
1190            .await
1191    }
1192
1193    /// Registers an `onChildChanged` listener, mirroring the JS SDK.
1194    pub async fn on_child_changed<F>(&self, callback: F) -> DatabaseResult<ListenerRegistration>
1195    where
1196        F: Fn(Result<ChildEvent, DatabaseError>) + Send + Sync + 'static,
1197    {
1198        let cb: ChildListenerCallback = Arc::new(callback);
1199        self.database
1200            .register_listener(
1201                ListenerTarget::Reference(self.path.clone()),
1202                ListenerKind::Child {
1203                    event: ChildEventType::Changed,
1204                    callback: cb,
1205                },
1206            )
1207            .await
1208    }
1209
1210    /// Registers an `onChildRemoved` listener, mirroring the JS SDK.
1211    pub async fn on_child_removed<F>(&self, callback: F) -> DatabaseResult<ListenerRegistration>
1212    where
1213        F: Fn(Result<ChildEvent, DatabaseError>) + Send + Sync + 'static,
1214    {
1215        let cb: ChildListenerCallback = Arc::new(callback);
1216        self.database
1217            .register_listener(
1218                ListenerTarget::Reference(self.path.clone()),
1219                ListenerKind::Child {
1220                    event: ChildEventType::Removed,
1221                    callback: cb,
1222                },
1223            )
1224            .await
1225    }
1226
1227    /// Returns a handle for configuring operations to run when the client disconnects.
1228    pub fn on_disconnect(&self) -> OnDisconnect {
1229        OnDisconnect::new(self.clone())
1230    }
1231
1232    pub(crate) fn database(&self) -> &Database {
1233        &self.database
1234    }
1235
1236    pub(crate) fn path_segments(&self) -> Vec<String> {
1237        self.path.clone()
1238    }
1239
1240    pub(crate) async fn resolve_for_current_path(&self, value: Value) -> DatabaseResult<Value> {
1241        self.resolve_value_for_path(&self.path, value).await
1242    }
1243
1244    pub(crate) async fn resolve_for_absolute_path(
1245        &self,
1246        path: &[String],
1247        value: Value,
1248    ) -> DatabaseResult<Value> {
1249        self.resolve_value_for_path(path, value).await
1250    }
1251
1252    /// Runs a transaction on this reference. The closure receives the current value and may
1253    /// return `Some(next)` to commit or `None` to abort, mirroring the JS SDK contract.
1254    pub async fn run_transaction<F>(&self, mut update: F) -> DatabaseResult<TransactionResult>
1255    where
1256        F: FnMut(Value) -> Option<Value>,
1257    {
1258        let current_value = self.get().await?;
1259        let maybe_new = update(current_value.clone());
1260
1261        match maybe_new {
1262            Some(new_value) => {
1263                self.set(new_value.clone()).await?;
1264                let snapshot = DataSnapshot {
1265                    reference: self.clone(),
1266                    value: new_value,
1267                };
1268                Ok(TransactionResult {
1269                    committed: true,
1270                    snapshot,
1271                })
1272            }
1273            None => Ok(TransactionResult {
1274                committed: false,
1275                snapshot: DataSnapshot {
1276                    reference: self.clone(),
1277                    value: current_value,
1278                },
1279            }),
1280        }
1281    }
1282
1283    /// Applies the provided partial updates to the current location using a single
1284    /// REST `PATCH` call when available.
1285    ///
1286    /// Each key represents a relative child path (e.g. `"profile/name"`).
1287    /// The method rejects empty keys to mirror the JS SDK behaviour.
1288    pub async fn update(&self, updates: serde_json::Map<String, Value>) -> DatabaseResult<()> {
1289        if updates.is_empty() {
1290            return Ok(());
1291        }
1292
1293        let mut operations = Vec::with_capacity(updates.len());
1294        for (key, value) in updates {
1295            if key.trim().is_empty() {
1296                return Err(invalid_argument("Database update path cannot be empty"));
1297            }
1298            let mut segments = self.path.clone();
1299            let relative = normalize_path(&key)?;
1300            if relative.is_empty() {
1301                return Err(invalid_argument(
1302                    "Database update path cannot reference the current location",
1303                ));
1304            }
1305            segments.extend(relative);
1306            let resolved = self.resolve_value_for_path(&segments, value).await?;
1307            operations.push((segments, resolved));
1308        }
1309
1310        let old_root = self.database.root_snapshot().await?;
1311        let ops_for_local = operations.clone();
1312        self.database
1313            .inner
1314            .backend
1315            .update(&self.path, operations)
1316            .await?;
1317
1318        let mut new_root = old_root.clone();
1319        for (absolute, value) in ops_for_local {
1320            apply_realtime_value(&mut new_root, &absolute, value);
1321        }
1322
1323        self.database
1324            .dispatch_listeners(&self.path, &old_root, &new_root)
1325            .await?;
1326        self.database.cache_root(new_root);
1327        Ok(())
1328    }
1329
1330    pub async fn get(&self) -> DatabaseResult<Value> {
1331        if let Some(root) = self.database.inner.root_cache.lock().unwrap().clone() {
1332            return Ok(value_at_path(&root, &self.path));
1333        }
1334        self.database.inner.backend.get(&self.path, &[]).await
1335    }
1336
1337    /// Deletes the value at this location using the backend's `DELETE` support.
1338    pub async fn remove(&self) -> DatabaseResult<()> {
1339        let old_root = self.database.root_snapshot().await?;
1340        self.database.inner.backend.delete(&self.path).await?;
1341        let mut new_root = old_root.clone();
1342        apply_realtime_value(&mut new_root, &self.path, Value::Null);
1343        self.database
1344            .dispatch_listeners(&self.path, &old_root, &new_root)
1345            .await?;
1346        self.database.cache_root(new_root);
1347        Ok(())
1348    }
1349
1350    /// Writes the provided value together with its priority, mirroring
1351    /// `setWithPriority()` in `packages/database/src/api/Reference_impl.ts`.
1352    pub async fn set_with_priority<V, P>(&self, value: V, priority: P) -> DatabaseResult<()>
1353    where
1354        V: Into<Value>,
1355        P: Into<Value>,
1356    {
1357        let priority = priority.into();
1358        validate_priority_value(&priority)?;
1359        if matches!(self.key(), Some(".length" | ".keys")) {
1360            return Err(invalid_argument(
1361                "set_with_priority failed: read-only child key",
1362            ));
1363        }
1364
1365        let value = self
1366            .resolve_value_for_path(&self.path, value.into())
1367            .await?;
1368        let payload = pack_with_priority(value, priority);
1369        let payload_for_local = payload.clone();
1370        let old_root = self.database.root_snapshot().await?;
1371        self.database.inner.backend.set(&self.path, payload).await?;
1372
1373        let mut new_root = old_root.clone();
1374        apply_realtime_value(&mut new_root, &self.path, payload_for_local);
1375
1376        self.database
1377            .dispatch_listeners(&self.path, &old_root, &new_root)
1378            .await?;
1379        self.database.cache_root(new_root);
1380        Ok(())
1381    }
1382
1383    /// Updates the priority for this location, mirroring `setPriority()` in the JS SDK.
1384    pub async fn set_priority<P>(&self, priority: P) -> DatabaseResult<()>
1385    where
1386        P: Into<Value>,
1387    {
1388        let priority = priority.into();
1389        validate_priority_value(&priority)?;
1390
1391        let current = self.database.inner.backend.get(&self.path, &[]).await?;
1392        let value = extract_data_owned(&current);
1393        let payload = pack_with_priority(value, priority);
1394        let payload_for_local = payload.clone();
1395        let old_root = self.database.root_snapshot().await?;
1396        self.database.inner.backend.set(&self.path, payload).await?;
1397
1398        let mut new_root = old_root.clone();
1399        apply_realtime_value(&mut new_root, &self.path, payload_for_local);
1400
1401        self.database
1402            .dispatch_listeners(&self.path, &old_root, &new_root)
1403            .await?;
1404        self.database.cache_root(new_root);
1405        Ok(())
1406    }
1407
1408    /// Creates a child location with an auto-generated key, mirroring `push()` from the JS SDK.
1409    ///
1410    /// Port of `push()` in `packages/database/src/api/Reference_impl.ts`.
1411    ///
1412    /// # Examples
1413    /// ```rust,ignore
1414    /// # use firebase_rs_sdk::database::{DatabaseReference, DatabaseResult};
1415    /// # use serde_json::json;
1416    /// # fn demo(messages: &DatabaseReference) -> DatabaseResult<()> {
1417    /// let new_message = messages.push_with_value(json!({ "text": "hi" })).await.expected("Failed to push message");
1418    /// assert!(new_message.key().is_some());
1419    /// # Ok(())
1420    /// # }
1421    /// ```
1422    pub async fn push(&self) -> DatabaseResult<DatabaseReference> {
1423        self.push_internal(None).await
1424    }
1425
1426    /// Creates a child location with an auto-generated key and writes the provided value.
1427    ///
1428    /// Mirrors the `push(ref, value)` overload from `packages/database/src/api/Reference_impl.ts`.
1429    pub async fn push_with_value<V>(&self, value: V) -> DatabaseResult<DatabaseReference>
1430    where
1431        V: Into<Value>,
1432    {
1433        self.push_internal(Some(value.into())).await
1434    }
1435
1436    async fn resolve_value_for_path(&self, path: &[String], value: Value) -> DatabaseResult<Value> {
1437        if contains_server_value(&value) {
1438            let current = self.database.inner.backend.get(path, &[]).await?;
1439            let current_ref = extract_data_ref(&current);
1440            resolve_server_values(value, Some(current_ref))
1441        } else {
1442            Ok(value)
1443        }
1444    }
1445
1446    async fn push_internal(&self, value: Option<Value>) -> DatabaseResult<DatabaseReference> {
1447        let timestamp = current_time_millis()?;
1448        let key = next_push_id(timestamp);
1449        let child = self.child(&key)?;
1450        if let Some(value) = value {
1451            child.set(value).await?;
1452        }
1453        Ok(child)
1454    }
1455
1456    /// Returns the last path segment (the key) for this reference, mirroring
1457    /// `ref.key` in the JS SDK.
1458    pub fn key(&self) -> Option<&str> {
1459        self.path.last().map(|segment| segment.as_str())
1460    }
1461
1462    pub fn path(&self) -> String {
1463        if self.path.is_empty() {
1464            "/".to_string()
1465        } else {
1466            format!("/{}/", self.path.join("/"))
1467        }
1468    }
1469}
1470
1471impl DatabaseQuery {
1472    /// Exposes the underlying reference backing this query.
1473    pub fn reference(&self) -> &DatabaseReference {
1474        &self.reference
1475    }
1476
1477    /// Orders children by the given path, mirroring `orderByChild()`.
1478    pub fn order_by_child(mut self, path: &str) -> DatabaseResult<Self> {
1479        validate_order_by_child_target(path)?;
1480        let segments = normalize_path(path)?;
1481        if segments.is_empty() {
1482            return Err(invalid_argument("orderByChild path cannot be empty"));
1483        }
1484        let joined = segments.join("/");
1485        self.params.set_index(QueryIndex::Child(joined))?;
1486        Ok(self)
1487    }
1488
1489    /// Orders children by key, mirroring `orderByKey()`.
1490    pub fn order_by_key(mut self) -> DatabaseResult<Self> {
1491        self.params.set_index(QueryIndex::Key)?;
1492        Ok(self)
1493    }
1494
1495    /// Orders children by value, mirroring `orderByValue()`.
1496    pub fn order_by_value(mut self) -> DatabaseResult<Self> {
1497        self.params.set_index(QueryIndex::Value)?;
1498        Ok(self)
1499    }
1500
1501    /// Orders children by priority, mirroring `orderByPriority()`.
1502    pub fn order_by_priority(mut self) -> DatabaseResult<Self> {
1503        self.params.set_index(QueryIndex::Priority)?;
1504        Ok(self)
1505    }
1506
1507    /// Applies a `startAt()` constraint to the query.
1508    pub fn start_at(self, value: Value) -> DatabaseResult<Self> {
1509        self.start_at_with_key(value, None)
1510    }
1511
1512    /// Applies a keyed `startAt(value, name)` constraint to the query.
1513    pub fn start_at_with_key(mut self, value: Value, name: Option<String>) -> DatabaseResult<Self> {
1514        let bound = QueryBound {
1515            value,
1516            name,
1517            inclusive: true,
1518        };
1519        self.params.set_start(bound)?;
1520        Ok(self)
1521    }
1522
1523    /// Applies a `startAfter()` constraint to the query.
1524    pub fn start_after(self, value: Value) -> DatabaseResult<Self> {
1525        self.start_after_with_key(value, None)
1526    }
1527
1528    /// Applies a keyed `startAfter(value, name)` constraint to the query.
1529    pub fn start_after_with_key(
1530        mut self,
1531        value: Value,
1532        name: Option<String>,
1533    ) -> DatabaseResult<Self> {
1534        let bound = QueryBound {
1535            value,
1536            name,
1537            inclusive: false,
1538        };
1539        self.params.set_start(bound)?;
1540        Ok(self)
1541    }
1542
1543    /// Applies an `endAt()` constraint to the query.
1544    pub fn end_at(self, value: Value) -> DatabaseResult<Self> {
1545        self.end_at_with_key(value, None)
1546    }
1547
1548    /// Applies a keyed `endAt(value, name)` constraint to the query.
1549    pub fn end_at_with_key(mut self, value: Value, name: Option<String>) -> DatabaseResult<Self> {
1550        let bound = QueryBound {
1551            value,
1552            name,
1553            inclusive: true,
1554        };
1555        self.params.set_end(bound)?;
1556        Ok(self)
1557    }
1558
1559    /// Applies an `endBefore()` constraint to the query.
1560    pub fn end_before(self, value: Value) -> DatabaseResult<Self> {
1561        self.end_before_with_key(value, None)
1562    }
1563
1564    /// Applies a keyed `endBefore(value, name)` constraint to the query.
1565    pub fn end_before_with_key(
1566        mut self,
1567        value: Value,
1568        name: Option<String>,
1569    ) -> DatabaseResult<Self> {
1570        let bound = QueryBound {
1571            value,
1572            name,
1573            inclusive: false,
1574        };
1575        self.params.set_end(bound)?;
1576        Ok(self)
1577    }
1578
1579    /// Applies `limitToFirst()` to the query.
1580    pub fn limit_to_first(mut self, limit: u32) -> DatabaseResult<Self> {
1581        if limit == 0 {
1582            return Err(invalid_argument("limitToFirst must be greater than zero"));
1583        }
1584        self.params.set_limit(QueryLimit::First(limit))?;
1585        Ok(self)
1586    }
1587
1588    /// Applies `limitToLast()` to the query.
1589    pub fn limit_to_last(mut self, limit: u32) -> DatabaseResult<Self> {
1590        if limit == 0 {
1591            return Err(invalid_argument("limitToLast must be greater than zero"));
1592        }
1593        self.params.set_limit(QueryLimit::Last(limit))?;
1594        Ok(self)
1595    }
1596
1597    /// Applies `equalTo()` to the query.
1598    pub fn equal_to(self, value: Value) -> DatabaseResult<Self> {
1599        self.equal_to_with_key(value, None)
1600    }
1601
1602    /// Applies a keyed `equalTo(value, name)` constraint to the query.
1603    pub fn equal_to_with_key(mut self, value: Value, name: Option<String>) -> DatabaseResult<Self> {
1604        let start_bound = QueryBound {
1605            value: value.clone(),
1606            name: name.clone(),
1607            inclusive: true,
1608        };
1609        let end_bound = QueryBound {
1610            value,
1611            name,
1612            inclusive: true,
1613        };
1614        self.params.set_start(start_bound)?;
1615        self.params.set_end(end_bound)?;
1616        Ok(self)
1617    }
1618
1619    /// Executes the query and returns the JSON payload, mirroring JS `get()`.
1620    pub async fn get(&self) -> DatabaseResult<Value> {
1621        let params = self.params.to_rest_params()?;
1622        self.reference
1623            .database
1624            .inner
1625            .backend
1626            .get(&self.reference.path, params.as_slice())
1627            .await
1628    }
1629
1630    /// Registers a value listener for this query, mirroring `onValue(query, cb)`.
1631    pub async fn on_value<F>(&self, callback: F) -> DatabaseResult<ListenerRegistration>
1632    where
1633        F: Fn(Result<DataSnapshot, DatabaseError>) + Send + Sync + 'static,
1634    {
1635        let user_fn: ValueListenerCallback = Arc::new(callback);
1636        self.reference
1637            .database
1638            .register_listener(
1639                ListenerTarget::Query {
1640                    path: self.reference.path.clone(),
1641                    params: self.params.clone(),
1642                },
1643                ListenerKind::Value(user_fn),
1644            )
1645            .await
1646    }
1647}
1648
1649pub(crate) fn normalize_path(path: &str) -> DatabaseResult<Vec<String>> {
1650    let trimmed = path.trim_matches('/');
1651    if trimmed.is_empty() {
1652        return Ok(Vec::new());
1653    }
1654    let mut segments = Vec::new();
1655    for segment in trimmed.split('/') {
1656        if segment.is_empty() {
1657            return Err(invalid_argument(
1658                "Database path cannot contain empty segments",
1659            ));
1660        }
1661        segments.push(segment.to_string());
1662    }
1663    Ok(segments)
1664}
1665
1666fn validate_order_by_child_target(path: &str) -> DatabaseResult<()> {
1667    match path {
1668        "$key" => Err(invalid_argument(
1669            "order_by_child(\"$key\") is invalid; call order_by_key() instead",
1670        )),
1671        "$priority" => Err(invalid_argument(
1672            "order_by_child(\"$priority\") is invalid; call order_by_priority() instead",
1673        )),
1674        "$value" => Err(invalid_argument(
1675            "order_by_child(\"$value\") is invalid; call order_by_value() instead",
1676        )),
1677        _ => Ok(()),
1678    }
1679}
1680
1681fn paths_related(a: &[String], b: &[String]) -> bool {
1682    is_prefix(a, b) || is_prefix(b, a)
1683}
1684
/// Returns `true` when `path` begins with every segment of `prefix`, in order.
///
/// An empty `prefix` matches any path, and a path is a prefix of itself.
fn is_prefix(prefix: &[String], path: &[String]) -> bool {
    path.starts_with(prefix)
}
1694
1695fn apply_realtime_value(root: &mut Value, path: &[String], value: Value) {
1696    if path.is_empty() {
1697        *root = value;
1698        return;
1699    }
1700
1701    let mut current = root;
1702    for segment in &path[..path.len() - 1] {
1703        if !current.is_object() {
1704            *current = Value::Object(Map::new());
1705        }
1706        let obj = current.as_object_mut().expect("object ensured");
1707        current = obj
1708            .entry(segment.clone())
1709            .or_insert(Value::Object(Map::new()));
1710    }
1711
1712    if value.is_null() {
1713        if let Some(obj) = current.as_object_mut() {
1714            obj.remove(path.last().expect("path non-empty"));
1715        }
1716    } else {
1717        if !current.is_object() {
1718            *current = Value::Object(Map::new());
1719        }
1720        current
1721            .as_object_mut()
1722            .expect("object ensured")
1723            .insert(path.last().expect("path non-empty").clone(), value);
1724    }
1725}
1726
1727pub(crate) fn validate_priority_value(priority: &Value) -> DatabaseResult<()> {
1728    match priority {
1729        Value::Null | Value::Number(_) | Value::String(_) => Ok(()),
1730        _ => Err(invalid_argument(
1731            "Priority must be a string, number, or null",
1732        )),
1733    }
1734}
1735
1736pub(crate) fn pack_with_priority(value: Value, priority: Value) -> Value {
1737    let mut map = Map::with_capacity(2);
1738    map.insert(".value".to_string(), value);
1739    map.insert(".priority".to_string(), priority);
1740    Value::Object(map)
1741}
1742
1743fn extract_data_ref<'a>(value: &'a Value) -> &'a Value {
1744    value
1745        .as_object()
1746        .and_then(|obj| obj.get(".value"))
1747        .unwrap_or(value)
1748}
1749
1750fn extract_data_owned(value: &Value) -> Value {
1751    extract_data_ref(value).clone()
1752}
1753
1754fn contains_server_value(value: &Value) -> bool {
1755    match value {
1756        Value::Object(map) => {
1757            if map.contains_key(".sv") {
1758                return true;
1759            }
1760            map.values().any(contains_server_value)
1761        }
1762        Value::Array(items) => items.iter().any(contains_server_value),
1763        _ => false,
1764    }
1765}
1766
1767fn resolve_server_values(value: Value, current: Option<&Value>) -> DatabaseResult<Value> {
1768    match value {
1769        Value::Object(mut map) => {
1770            if let Some(spec) = map.remove(".sv") {
1771                return resolve_server_placeholder(spec, current.map(extract_data_ref));
1772            }
1773            let mut resolved = Map::with_capacity(map.len());
1774            for (key, child) in map.into_iter() {
1775                let child_current = current
1776                    .and_then(|curr| match curr {
1777                        Value::Object(obj) => obj.get(&key),
1778                        Value::Array(arr) => key.parse::<usize>().ok().and_then(|idx| arr.get(idx)),
1779                        _ => None,
1780                    })
1781                    .map(extract_data_ref);
1782                let child_resolved = resolve_server_values(child, child_current)?;
1783                resolved.insert(key, child_resolved);
1784            }
1785            Ok(Value::Object(resolved))
1786        }
1787        Value::Array(items) => {
1788            let mut resolved = Vec::with_capacity(items.len());
1789            for (index, child) in items.into_iter().enumerate() {
1790                let child_current = current
1791                    .and_then(|curr| match curr {
1792                        Value::Array(arr) => arr.get(index),
1793                        _ => None,
1794                    })
1795                    .map(extract_data_ref);
1796                resolved.push(resolve_server_values(child, child_current)?);
1797            }
1798            Ok(Value::Array(resolved))
1799        }
1800        other => Ok(other),
1801    }
1802}
1803
1804fn resolve_server_placeholder(spec: Value, current: Option<&Value>) -> DatabaseResult<Value> {
1805    match spec {
1806        Value::String(token) if token == "timestamp" => {
1807            let millis = current_time_millis()?;
1808            Ok(Value::Number(Number::from(millis)))
1809        }
1810        Value::Object(mut map) => {
1811            if let Some(delta) = map.remove("increment") {
1812                let delta = delta.as_f64().ok_or_else(|| {
1813                    invalid_argument("ServerValue.increment delta must be numeric")
1814                })?;
1815                let base = current
1816                    .and_then(|value| match value {
1817                        Value::Number(number) => number.as_f64(),
1818                        _ => None,
1819                    })
1820                    .unwrap_or(0.0);
1821                let total = base + delta;
1822                let number = Number::from_f64(total).ok_or_else(|| {
1823                    invalid_argument("ServerValue.increment produced an invalid number")
1824                })?;
1825                Ok(Value::Number(number))
1826            } else {
1827                Err(invalid_argument("Unsupported server value placeholder"))
1828            }
1829        }
1830        _ => Err(invalid_argument("Unsupported server value placeholder")),
1831    }
1832}
1833
1834fn value_at_path(root: &Value, path: &[String]) -> Value {
1835    if path.is_empty() {
1836        return extract_data_ref(root).clone();
1837    }
1838    get_value_at_path(root, path).unwrap_or(Value::Null)
1839}
1840
1841fn children_map(value: &Value) -> BTreeMap<String, Value> {
1842    let mut map = BTreeMap::new();
1843    match extract_data_ref(value) {
1844        Value::Object(obj) => {
1845            for (key, child) in obj.iter() {
1846                map.insert(key.clone(), child.clone());
1847            }
1848        }
1849        Value::Array(array) => {
1850            for (index, child) in array.iter().enumerate() {
1851                map.insert(index.to_string(), child.clone());
1852            }
1853        }
1854        _ => {}
1855    }
1856    map
1857}
1858
/// Returns the entry that immediately precedes the first occurrence of `key`
/// in `keys`.
///
/// Yields `None` when `key` is absent or is the first element.
fn previous_key(keys: &[String], key: &str) -> Option<String> {
    let found_at = keys.iter().position(|candidate| candidate == key)?;
    found_at
        .checked_sub(1)
        .and_then(|before| keys.get(before))
        .cloned()
}
1869
1870fn get_value_at_path(root: &Value, segments: &[String]) -> Option<Value> {
1871    if segments.is_empty() {
1872        return Some(extract_data_ref(root).clone());
1873    }
1874
1875    let mut current = root;
1876    for segment in segments {
1877        match current {
1878            Value::Object(map) => {
1879                if let Some(child) = map.get(segment) {
1880                    current = child;
1881                    continue;
1882                }
1883
1884                if let Some(value_field) = map.get(".value") {
1885                    current = match value_field {
1886                        Value::Object(obj) => obj.get(segment)?,
1887                        Value::Array(items) => {
1888                            let index = segment.parse::<usize>().ok()?;
1889                            items.get(index)?
1890                        }
1891                        _ => return None,
1892                    };
1893                    continue;
1894                }
1895
1896                return None;
1897            }
1898            Value::Array(array) => {
1899                let index = segment.parse::<usize>().ok()?;
1900                current = array.get(index)?;
1901            }
1902            _ => return None,
1903        }
1904    }
1905
1906    Some(current.clone())
1907}
1908
1909fn current_time_millis() -> DatabaseResult<u64> {
1910    let duration = SystemTime::now()
1911        .duration_since(UNIX_EPOCH)
1912        .map_err(|_| internal_error("System time is before the Unix epoch"))?;
1913    let millis = duration.as_millis();
1914    millis
1915        .try_into()
1916        .map_err(|_| internal_error("Timestamp exceeds 64-bit range"))
1917}
1918
1919static DATABASE_COMPONENT: LazyLock<()> = LazyLock::new(|| {
1920    let component = Component::new(
1921        DATABASE_COMPONENT_NAME,
1922        Arc::new(database_factory),
1923        ComponentType::Public,
1924    )
1925    .with_instantiation_mode(InstantiationMode::Lazy);
1926    let _ = app::register_component(component);
1927});
1928
1929fn database_factory(
1930    container: &crate::component::ComponentContainer,
1931    _options: InstanceFactoryOptions,
1932) -> Result<DynService, ComponentError> {
1933    let app = container.root_service::<FirebaseApp>().ok_or_else(|| {
1934        ComponentError::InitializationFailed {
1935            name: DATABASE_COMPONENT_NAME.to_string(),
1936            reason: "Firebase app not attached to component container".to_string(),
1937        }
1938    })?;
1939
1940    let database = Database::new((*app).clone());
1941    Ok(Arc::new(database) as DynService)
1942}
1943
/// Forces `DATABASE_COMPONENT`, running the component registration closure
/// the first time this is called.
fn ensure_registered() {
    LazyLock::force(&DATABASE_COMPONENT);
}
1947
/// Public entry point for registering the database component.
///
/// Delegates to `ensure_registered`, so repeated calls are harmless.
pub fn register_database_component() {
    ensure_registered();
}
1951
1952pub async fn get_database(app: Option<FirebaseApp>) -> DatabaseResult<Arc<Database>> {
1953    ensure_registered();
1954    let app = match app {
1955        Some(app) => app,
1956        None => {
1957            #[cfg(all(feature = "wasm-web", target_arch = "wasm32"))]
1958            {
1959                return Err(internal_error(
1960                    "get_database(None) is not supported on wasm; pass a FirebaseApp",
1961                ));
1962            }
1963            #[cfg(not(all(feature = "wasm-web", target_arch = "wasm32")))]
1964            {
1965                crate::app::get_app(None)
1966                    .await
1967                    .map_err(|err| internal_error(err.to_string()))?
1968            }
1969        }
1970    };
1971
1972    let provider = app::get_provider(&app, DATABASE_COMPONENT_NAME);
1973    if let Some(database) = provider.get_immediate::<Database>() {
1974        return Ok(database);
1975    }
1976
1977    match provider.initialize::<Database>(Value::Null, None) {
1978        Ok(service) => Ok(service),
1979        Err(crate::component::types::ComponentError::InstanceUnavailable { .. }) => provider
1980            .get_immediate::<Database>()
1981            .ok_or_else(|| internal_error("Database component not available")),
1982        Err(err) => Err(internal_error(err.to_string())),
1983    }
1984}
1985
1986#[cfg(all(test, not(target_arch = "wasm32")))]
1987mod tests {
1988    use super::*;
1989    use crate::app::initialize_app;
1990    use crate::app::{FirebaseAppSettings, FirebaseOptions};
1991    use crate::database::{
1992        equal_to_with_key, increment, limit_to_first, limit_to_last, order_by_child, order_by_key,
1993        query as compose_query, server_timestamp, start_at,
1994    };
1995    use httpmock::prelude::*;
1996    use httpmock::Method::{DELETE, GET, PATCH, PUT};
1997    use serde_json::{json, Value};
1998    use std::sync::{Arc, Mutex};
1999    fn unique_settings() -> FirebaseAppSettings {
2000        use std::sync::atomic::{AtomicUsize, Ordering};
2001        static COUNTER: AtomicUsize = AtomicUsize::new(0);
2002        FirebaseAppSettings {
2003            name: Some(format!(
2004                "database-{}",
2005                COUNTER.fetch_add(1, Ordering::SeqCst)
2006            )),
2007            ..Default::default()
2008        }
2009    }
2010
2011    #[tokio::test(flavor = "multi_thread")]
2012    async fn set_and_get_value() {
2013        let options = FirebaseOptions {
2014            project_id: Some("project".into()),
2015            ..Default::default()
2016        };
2017        let app = initialize_app(options, Some(unique_settings()))
2018            .await
2019            .unwrap();
2020        let database = get_database(Some(app)).await.unwrap();
2021        let ref_root = database.reference("/messages").unwrap();
2022        ref_root
2023            .set(json!({ "greeting": "hello" }))
2024            .await
2025            .expect("set");
2026        let value = ref_root.get().await.unwrap();
2027        assert_eq!(value, json!({ "greeting": "hello" }));
2028    }
2029
2030    #[tokio::test(flavor = "multi_thread")]
2031    async fn push_generates_monotonic_keys() {
2032        let options = FirebaseOptions {
2033            project_id: Some("project".into()),
2034            ..Default::default()
2035        };
2036        let app = initialize_app(options, Some(unique_settings()))
2037            .await
2038            .unwrap();
2039        let database = get_database(Some(app)).await.unwrap();
2040        let queue = database.reference("queue").unwrap();
2041
2042        let mut keys = Vec::new();
2043        for _ in 0..5 {
2044            let key = queue.push().await.unwrap().key().unwrap().to_string();
2045            keys.push(key);
2046        }
2047
2048        let mut sorted = keys.clone();
2049        sorted.sort();
2050        assert_eq!(keys, sorted);
2051    }
2052
    /// `push_with_value` stores the payload under the returned child
    /// reference, and the payload is also visible from the parent under the
    /// child's key.
    #[tokio::test(flavor = "multi_thread")]
    async fn push_with_value_persists_data() {
        let options = FirebaseOptions {
            project_id: Some("project".into()),
            ..Default::default()
        };
        let app = initialize_app(options, Some(unique_settings()))
            .await
            .unwrap();
        let database = get_database(Some(app)).await.unwrap();
        let messages = database.reference("messages").unwrap();

        let payload = json!({ "text": "hello" });
        let child = messages
            .push_with_value(payload.clone())
            .await
            .expect("push with value");

        // Reading through the child reference returns the payload.
        let stored = child.get().await.unwrap();
        assert_eq!(stored, payload);

        // Reading the parent exposes the payload under the generated key.
        let parent = messages.get().await.unwrap();
        let key = child.key().unwrap();
        assert_eq!(parent.get(key), Some(&payload));
    }
2078
2079    #[tokio::test(flavor = "multi_thread")]
2080    async fn child_updates_merge() {
2081        let options = FirebaseOptions {
2082            project_id: Some("project".into()),
2083            ..Default::default()
2084        };
2085        let app = initialize_app(options, Some(unique_settings()))
2086            .await
2087            .unwrap();
2088        let database = get_database(Some(app)).await.unwrap();
2089        let root = database.reference("items").unwrap();
2090        root.set(json!({ "a": { "count": 1 } })).await.unwrap();
2091        root.child("a/count").unwrap().set(json!(2)).await.unwrap();
2092        let value = root.get().await.unwrap();
2093        assert_eq!(value, json!({ "a": { "count": 2 } }));
2094    }
2095
    /// After `set_with_priority`, `get` returns the data wrapped together
    /// with its priority as a `.value` / `.priority` object.
    #[tokio::test(flavor = "multi_thread")]
    async fn set_with_priority_wraps_value() {
        let options = FirebaseOptions {
            project_id: Some("project".into()),
            ..Default::default()
        };
        let app = initialize_app(options, Some(unique_settings()))
            .await
            .unwrap();
        let database = get_database(Some(app)).await.unwrap();
        let item = database.reference("items/main").unwrap();

        item.set_with_priority(json!({ "count": 1 }), json!(5))
            .await
            .unwrap();

        // The stored node is expected in wrapped form, not as the bare data.
        let stored = item.get().await.unwrap();
        assert_eq!(
            stored,
            json!({
                ".value": { "count": 1 },
                ".priority": 5
            })
        );
    }
2121
    /// `set_priority` on a node that already holds data keeps that data and
    /// attaches the priority, producing the `.value` / `.priority` wrapper.
    #[tokio::test(flavor = "multi_thread")]
    async fn set_priority_updates_existing_value() {
        let options = FirebaseOptions {
            project_id: Some("project".into()),
            ..Default::default()
        };
        let app = initialize_app(options, Some(unique_settings()))
            .await
            .unwrap();
        let database = get_database(Some(app)).await.unwrap();
        let item = database.reference("items/main").unwrap();

        // Plain write first, then assign a priority to the existing node.
        item.set(json!({ "count": 4 })).await.unwrap();
        item.set_priority(json!(10)).await.unwrap();

        let stored = item.get().await.unwrap();
        assert_eq!(
            stored,
            json!({
                ".value": { "count": 4 },
                ".priority": 10
            })
        );
    }
2146
2147    #[tokio::test(flavor = "multi_thread")]
2148    async fn server_timestamp_is_resolved_on_set() {
2149        let options = FirebaseOptions {
2150            project_id: Some("project".into()),
2151            ..Default::default()
2152        };
2153        let app = initialize_app(options, Some(unique_settings()))
2154            .await
2155            .unwrap();
2156        let database = get_database(Some(app)).await.unwrap();
2157        let created_at = database.reference("meta/created_at").unwrap();
2158
2159        created_at.set(server_timestamp()).await.unwrap();
2160
2161        let value = created_at.get().await.unwrap();
2162        let ts = value.as_u64().expect("timestamp as u64");
2163        let now = SystemTime::now()
2164            .duration_since(std::time::UNIX_EPOCH)
2165            .unwrap()
2166            .as_millis() as u64;
2167        assert!(now >= ts);
2168        assert!(now - ts < 5_000);
2169    }
2170
2171    #[tokio::test(flavor = "multi_thread")]
2172    async fn server_increment_updates_value() {
2173        let options = FirebaseOptions {
2174            project_id: Some("project".into()),
2175            ..Default::default()
2176        };
2177        let app = initialize_app(options, Some(unique_settings()))
2178            .await
2179            .unwrap();
2180        let database = get_database(Some(app)).await.unwrap();
2181        let counter = database.reference("counters/main").unwrap();
2182
2183        counter.set(json!(1)).await.unwrap();
2184        counter.set(increment(2.0)).await.unwrap();
2185
2186        let value = counter.get().await.unwrap();
2187        assert_eq!(value.as_f64().unwrap(), 3.0);
2188    }
2189
    /// An `increment` placeholder passed through `update` is applied to the
    /// existing child value (4 + 3 = 7).
    #[tokio::test(flavor = "multi_thread")]
    async fn update_supports_server_increment() {
        let options = FirebaseOptions {
            project_id: Some("project".into()),
            ..Default::default()
        };
        let app = initialize_app(options, Some(unique_settings()))
            .await
            .unwrap();
        let database = get_database(Some(app)).await.unwrap();
        let scores = database.reference("scores").unwrap();

        scores.set(json!({ "alice": 4 })).await.unwrap();
        // The update map carries a placeholder instead of a literal value.
        let mut delta = serde_json::Map::new();
        delta.insert("alice".to_string(), increment(3.0));
        scores.update(delta).await.unwrap();

        let stored = scores.get().await.unwrap();
        assert_eq!(stored.get("alice").unwrap().as_f64().unwrap(), 7.0);
    }
2210
2211    #[tokio::test(flavor = "multi_thread")]
2212    async fn update_rejects_empty_key() {
2213        let options = FirebaseOptions {
2214            project_id: Some("project".into()),
2215            ..Default::default()
2216        };
2217        let app = initialize_app(options, Some(unique_settings()))
2218            .await
2219            .unwrap();
2220        let database = get_database(Some(app)).await.unwrap();
2221        let reference = database.reference("items").unwrap();
2222
2223        let mut updates = serde_json::Map::new();
2224        updates.insert("".to_string(), json!(1));
2225
2226        let err = reference.update(updates).await.unwrap_err();
2227        assert_eq!(err.code_str(), "database/invalid-argument");
2228    }
2229
    /// A transaction whose closure returns `Some(new_value)` is reported as
    /// committed, and both the result snapshot and a later `get` show the new
    /// value.
    #[tokio::test(flavor = "multi_thread")]
    async fn run_transaction_commits_update() {
        let options = FirebaseOptions {
            project_id: Some("project".into()),
            ..Default::default()
        };
        let app = initialize_app(options, Some(unique_settings()))
            .await
            .unwrap();
        let database = get_database(Some(app)).await.unwrap();
        let counter = database.reference("counters/main").unwrap();
        counter.set(json!(1)).await.unwrap();

        let result = counter
            .run_transaction(|current| {
                // Increment the current value, treating a non-integer as 0.
                let next = current.as_i64().unwrap_or(0) + 1;
                Some(json!(next))
            })
            .await
            .unwrap();

        assert!(result.committed);
        assert_eq!(result.snapshot.into_value(), json!(2));
        let stored = counter.get().await.unwrap();
        assert_eq!(stored, json!(2));
    }
2256
    /// A transaction whose closure returns `None` is reported as not
    /// committed, and the stored value stays as it was.
    #[tokio::test(flavor = "multi_thread")]
    async fn run_transaction_abort_preserves_value() {
        let options = FirebaseOptions {
            project_id: Some("project".into()),
            ..Default::default()
        };
        let app = initialize_app(options, Some(unique_settings()))
            .await
            .unwrap();
        let database = get_database(Some(app)).await.unwrap();
        let flag = database.reference("flags/feature").unwrap();
        flag.set(json!(true)).await.unwrap();

        let result = flag
            .run_transaction(|current| {
                // The flag is already `true`, so this branch returns `None`.
                if current == Value::Bool(true) {
                    None
                } else {
                    Some(Value::Bool(true))
                }
            })
            .await
            .unwrap();

        assert!(!result.committed);
        assert_eq!(result.snapshot.into_value(), json!(true));
        let stored = flag.get().await.unwrap();
        assert_eq!(stored, json!(true));
    }
2286
    /// With `database_url` pointing at a mock HTTP server, `set` issues a
    /// `PUT /messages.json?print=silent` and `get` issues a
    /// `GET /messages.json?format=export`.
    #[tokio::test(flavor = "multi_thread")]
    async fn rest_backend_performs_http_requests() {
        let server = MockServer::start();

        // Expected write request: PUT with the JSON payload as body.
        let set_mock = server.mock(|when, then| {
            when.method(PUT)
                .path("/messages.json")
                .query_param("print", "silent")
                .json_body(json!({ "greeting": "hello" }));
            then.status(200)
                .header("content-type", "application/json")
                .body("null");
        });

        // Expected read request: GET answered with the stored object.
        let get_mock = server.mock(|when, then| {
            when.method(GET)
                .path("/messages.json")
                .query_param("format", "export");
            then.status(200)
                .header("content-type", "application/json")
                .body(r#"{"greeting":"hello"}"#);
        });

        let options = FirebaseOptions {
            project_id: Some("project".into()),
            database_url: Some(server.url("/")),
            ..Default::default()
        };
        let app = initialize_app(options, Some(unique_settings()))
            .await
            .unwrap();
        let database = get_database(Some(app)).await.unwrap();
        let reference = database.reference("/messages").unwrap();

        reference
            .set(json!({ "greeting": "hello" }))
            .await
            .expect("set over REST");
        // NOTE(review): presumably clears locally cached data so the `get`
        // below has to reach the mock server; confirm against the helper.
        database.clear_root_cache_for_test();
        let value = reference.get().await.expect("get over REST");

        assert_eq!(value, json!({ "greeting": "hello" }));
        set_mock.assert();
        get_mock.assert();
    }
2332
2333    #[tokio::test(flavor = "multi_thread")]
2334    async fn reference_parent_and_root() {
2335        let options = FirebaseOptions {
2336            project_id: Some("project".into()),
2337            ..Default::default()
2338        };
2339        let app = initialize_app(options, Some(unique_settings()))
2340            .await
2341            .unwrap();
2342        let database = get_database(Some(app)).await.unwrap();
2343
2344        let nested = database.reference("users/alice/profile").unwrap();
2345        let parent = nested.parent().expect("parent reference");
2346        assert_eq!(parent.path(), "/users/alice/");
2347        assert_eq!(parent.parent().unwrap().path(), "/users/");
2348
2349        let root = nested.root();
2350        assert_eq!(root.path(), "/");
2351        assert!(root.parent().is_none());
2352    }
2353
    /// A realtime `"d"` action for `items/foo` is delivered to a registered
    /// value listener and is afterwards visible through `get`.
    #[tokio::test(flavor = "multi_thread")]
    async fn realtime_set_updates_cache_and_listeners() {
        let options = FirebaseOptions {
            project_id: Some("project".into()),
            ..Default::default()
        };
        let app = initialize_app(options, Some(unique_settings()))
            .await
            .unwrap();
        let database = get_database(Some(app)).await.unwrap();

        let reference = database.reference("items/foo").unwrap();
        // Every successful snapshot delivered to the listener is recorded.
        let events: Arc<Mutex<Vec<Value>>> = Arc::new(Mutex::new(Vec::new()));
        let events_clone = events.clone();
        // Keep the registration bound for the rest of the test.
        let _registration = reference
            .on_value(move |result| {
                if let Ok(snapshot) = result {
                    events_clone.lock().unwrap().push(snapshot.value().clone());
                }
            })
            .await
            .unwrap();

        // Feed the action straight into the database, as if it had arrived
        // from the realtime connection.
        database
            .handle_realtime_action(
                "d",
                &json!({
                    "p": "items/foo",
                    "d": { "count": 5 }
                }),
            )
            .await
            .unwrap();

        let values = events.lock().unwrap();
        assert_eq!(values.last().unwrap(), &json!({ "count": 5 }));
        // Release the guard before the next `.await`.
        drop(values);

        assert_eq!(reference.get().await.unwrap(), json!({ "count": 5 }));
    }
2394
    /// A realtime `"m"` action applied after a `"d"` action updates a nested
    /// path (`foo/count`) and adds a new sibling (`bar`) under `items`.
    #[tokio::test(flavor = "multi_thread")]
    async fn realtime_merge_updates_cache() {
        let options = FirebaseOptions {
            project_id: Some("project".into()),
            ..Default::default()
        };
        let app = initialize_app(options, Some(unique_settings()))
            .await
            .unwrap();
        let database = get_database(Some(app)).await.unwrap();

        // Seed `items` with an initial value.
        database
            .handle_realtime_action(
                "d",
                &json!({
                    "p": "items",
                    "d": {"foo": {"count": 1}}
                }),
            )
            .await
            .unwrap();

        // The merge payload mixes a slash-separated deep key with a plain
        // child key.
        database
            .handle_realtime_action(
                "m",
                &json!({
                    "p": "items",
                    "d": {
                        "foo/count": 2,
                        "bar": {"value": true}
                    }
                }),
            )
            .await
            .unwrap();

        let foo = database
            .reference("items/foo")
            .unwrap()
            .get()
            .await
            .unwrap();
        assert_eq!(foo, json!({"count": 2}));

        let bar = database
            .reference("items/bar")
            .unwrap()
            .get()
            .await
            .unwrap();
        assert_eq!(bar, json!({"value": true}));
    }
2447
    /// Exercises the snapshot helpers (`exists`, `has_children`, `size`,
    /// `child`, `key`, `has_child`, `to_json`) on the snapshot delivered to a
    /// value listener.
    #[tokio::test(flavor = "multi_thread")]
    async fn datasnapshot_child_and_metadata_helpers() {
        let options = FirebaseOptions {
            project_id: Some("project".into()),
            ..Default::default()
        };
        let app = initialize_app(options, Some(unique_settings()))
            .await
            .unwrap();
        let database = get_database(Some(app)).await.unwrap();
        let profiles = database.reference("profiles").unwrap();

        profiles
            .set(json!({
                "alice": { "age": 31, "city": "Rome" },
                "bob": { "age": 29 }
            }))
            .await
            .unwrap();

        // The listener stores the most recent successful snapshot.
        let captured = Arc::new(Mutex::new(None));
        let holder = captured.clone();
        profiles
            .on_value(move |result| {
                if let Ok(snapshot) = result {
                    *holder.lock().unwrap() = Some(snapshot);
                }
            })
            .await
            .unwrap();

        // A snapshot must already have been delivered by the time `on_value`
        // has resolved.
        let snapshot = captured.lock().unwrap().clone().expect("initial snapshot");
        assert!(snapshot.exists());
        assert!(snapshot.has_children());
        assert_eq!(snapshot.size(), 2);

        let alice = snapshot.child("alice").unwrap();
        assert_eq!(alice.key(), Some("alice"));
        assert_eq!(alice.size(), 2);
        assert!(alice.has_children());
        assert_eq!(alice.child("age").unwrap().value(), &json!(31));
        assert!(snapshot.has_child("bob").unwrap());
        assert!(!snapshot.has_child("carol").unwrap());

        let json_output = snapshot.to_json();
        assert_eq!(
            json_output,
            json!({
                "alice": { "age": 31, "city": "Rome" },
                "bob": { "age": 29 }
            })
        );
    }
2501
    /// An `on_child_added` listener receives one event per existing child
    /// when it is registered, and a further event when a new child is
    /// written.
    #[tokio::test(flavor = "multi_thread")]
    async fn child_event_listeners_receive_updates() {
        let options = FirebaseOptions {
            project_id: Some("project".into()),
            ..Default::default()
        };
        let app = initialize_app(options, Some(unique_settings()))
            .await
            .unwrap();
        let database = get_database(Some(app)).await.unwrap();
        let items = database.reference("items").unwrap();

        items
            .set(json!({
                "a": { "count": 1 },
                "b": { "count": 2 }
            }))
            .await
            .unwrap();

        // Each event is recorded as (child key, previous sibling name); only
        // the key is asserted below.
        let added_events = Arc::new(Mutex::new(Vec::<(String, Option<String>)>::new()));
        let capture = added_events.clone();
        let registration = items
            .on_child_added(move |result| {
                if let Ok(event) = result {
                    capture.lock().unwrap().push((
                        event.snapshot.key().unwrap().to_string(),
                        event.previous_name.clone(),
                    ));
                }
            })
            .await
            .unwrap();

        // Scoped so the lock guard is dropped before the next `.await`.
        {
            let events = added_events.lock().unwrap();
            assert_eq!(events.len(), 2);
            assert_eq!(events[0].0, "a");
            assert_eq!(events[1].0, "b");
        }

        items
            .child("c")
            .unwrap()
            .set(json!({ "count": 3 }))
            .await
            .unwrap();

        {
            let events = added_events.lock().unwrap();
            assert_eq!(events.len(), 3);
            assert_eq!(events[2].0, "c");
        }

        registration.detach();
    }
2558
    /// With a mock HTTP server as `database_url`, `set_with_priority` sends a
    /// `PUT /items.json?print=silent` whose body is the `.value` /
    /// `.priority` wrapper.
    #[tokio::test(flavor = "multi_thread")]
    async fn rest_backend_set_with_priority_includes_metadata() {
        let server = MockServer::start();

        let put_mock = server.mock(|when, then| {
            when.method(PUT)
                .path("/items.json")
                .query_param("print", "silent")
                .json_body(json!({
                    ".value": { "count": 1 },
                    ".priority": 3
                }));
            then.status(200)
                .header("content-type", "application/json")
                .body("null");
        });

        let options = FirebaseOptions {
            project_id: Some("project".into()),
            database_url: Some(server.url("/")),
            ..Default::default()
        };
        let app = initialize_app(options, Some(unique_settings()))
            .await
            .unwrap();
        let database = get_database(Some(app)).await.unwrap();
        let reference = database.reference("items").unwrap();

        reference
            .set_with_priority(json!({ "count": 1 }), json!(3))
            .await
            .unwrap();

        // Fails unless the mocked request was actually received.
        put_mock.assert();
    }
2594
    /// With a mock HTTP server as `database_url`, `push_with_value` sends a
    /// `PUT` to a path under `/messages/` carrying the payload, and the
    /// returned child key is 20 characters long.
    #[tokio::test(flavor = "multi_thread")]
    async fn push_with_value_rest_backend_performs_put() {
        let server = MockServer::start();

        // The generated key is not known in advance, so only the path prefix
        // is matched.
        let push_mock = server.mock(|when, then| {
            when.method(PUT)
                .path_contains("/messages/")
                .query_param("print", "silent")
                .json_body(json!({ "text": "hello" }));
            then.status(200)
                .header("content-type", "application/json")
                .body("null");
        });

        let options = FirebaseOptions {
            project_id: Some("project".into()),
            database_url: Some(server.url("/")),
            ..Default::default()
        };
        let app = initialize_app(options, Some(unique_settings()))
            .await
            .unwrap();
        let database = get_database(Some(app)).await.unwrap();
        let messages = database.reference("messages").unwrap();

        let child = messages
            .push_with_value(json!({ "text": "hello" }))
            .await
            .expect("push with value rest");

        assert_eq!(child.key().unwrap().len(), 20);
        push_mock.assert();
    }
2628
    /// With a mock HTTP server as `database_url`, `update` sends a single
    /// `PATCH /items.json?print=silent` whose body contains the update map
    /// as given, including the slash-separated key.
    #[tokio::test(flavor = "multi_thread")]
    async fn rest_backend_uses_patch_for_updates() {
        let server = MockServer::start();

        let patch_mock = server.mock(|when, then| {
            when.method(PATCH)
                .path("/items.json")
                .query_param("print", "silent")
                .json_body(json!({ "a/count": 2, "b": { "flag": true } }));
            then.status(200)
                .header("content-type", "application/json")
                .body("null");
        });

        let options = FirebaseOptions {
            project_id: Some("project".into()),
            database_url: Some(server.url("/")),
            ..Default::default()
        };
        let app = initialize_app(options, Some(unique_settings()))
            .await
            .unwrap();
        let database = get_database(Some(app)).await.unwrap();
        let reference = database.reference("items").unwrap();

        let mut updates = serde_json::Map::new();
        updates.insert("a/count".to_string(), json!(2));
        updates.insert("b".to_string(), json!({ "flag": true }));
        reference.update(updates).await.expect("patch update");

        // Fails unless the mocked request was actually received.
        patch_mock.assert();
    }
2661
2662    #[tokio::test(flavor = "multi_thread")]
2663    async fn rest_backend_delete_supports_remove() {
2664        let server = MockServer::start();
2665
2666        let delete_mock = server.mock(|when, then| {
2667            when.method(DELETE)
2668                .path("/items.json")
2669                .query_param("print", "silent");
2670            then.status(200).body("null");
2671        });
2672
2673        let options = FirebaseOptions {
2674            project_id: Some("project".into()),
2675            database_url: Some(server.url("/")),
2676            ..Default::default()
2677        };
2678        let app = initialize_app(options, Some(unique_settings()))
2679            .await
2680            .unwrap();
2681        let database = get_database(Some(app)).await.unwrap();
2682        let reference = database.reference("items").unwrap();
2683
2684        reference.remove().await.expect("delete request");
2685        delete_mock.assert();
2686    }
2687
2688    #[tokio::test(flavor = "multi_thread")]
2689    async fn rest_backend_preserves_namespace_query_parameter() {
2690        let server = MockServer::start();
2691
2692        let set_mock = server.mock(|when, then| {
2693            when.method(PUT)
2694                .path("/messages.json")
2695                .query_param("ns", "demo-ns")
2696                .query_param("print", "silent")
2697                .json_body(json!({ "value": 1 }));
2698            then.status(200).body("null");
2699        });
2700
2701        let options = FirebaseOptions {
2702            project_id: Some("project".into()),
2703            database_url: Some(format!("{}?ns=demo-ns", server.url("/"))),
2704            ..Default::default()
2705        };
2706        let app = initialize_app(options, Some(unique_settings()))
2707            .await
2708            .unwrap();
2709        let database = get_database(Some(app)).await.unwrap();
2710        let reference = database.reference("messages").unwrap();
2711
2712        reference.set(json!({ "value": 1 })).await.unwrap();
2713        set_mock.assert();
2714    }
2715
2716    #[tokio::test(flavor = "multi_thread")]
2717    async fn rest_query_order_by_child_and_limit() {
2718        let server = MockServer::start();
2719
2720        let get_mock = server.mock(|when, then| {
2721            when.method(GET)
2722                .path("/items.json")
2723                .query_param("orderBy", "\"score\"")
2724                .query_param("startAt", "100")
2725                .query_param("limitToFirst", "5")
2726                .query_param("format", "export");
2727            then.status(200)
2728                .header("content-type", "application/json")
2729                .body(r#"{"a":{"score":120}}"#);
2730        });
2731
2732        let options = FirebaseOptions {
2733            project_id: Some("project".into()),
2734            database_url: Some(server.url("/")),
2735            ..Default::default()
2736        };
2737        let app = initialize_app(options, Some(unique_settings()))
2738            .await
2739            .unwrap();
2740        let database = get_database(Some(app)).await.unwrap();
2741        let reference = database.reference("items").unwrap();
2742        let filtered = compose_query(
2743            reference,
2744            vec![order_by_child("score"), start_at(100), limit_to_first(5)],
2745        )
2746        .expect("compose query with constraints");
2747
2748        let value = filtered.get().await.unwrap();
2749        assert_eq!(value, json!({ "a": { "score": 120 } }));
2750        get_mock.assert();
2751    }
2752
2753    #[tokio::test(flavor = "multi_thread")]
2754    async fn rest_query_equal_to_with_key() {
2755        let server = MockServer::start();
2756
2757        let get_mock = server.mock(|when, then| {
2758            when.method(GET)
2759                .path("/items.json")
2760                .query_param("orderBy", "\"$key\"")
2761                .query_param("startAt", "\"item-1\",\"item-1\"")
2762                .query_param("endAt", "\"item-1\",\"item-1\"")
2763                .query_param("format", "export");
2764            then.status(200)
2765                .header("content-type", "application/json")
2766                .body(r#"{"item-1":{"value":true}}"#);
2767        });
2768
2769        let options = FirebaseOptions {
2770            project_id: Some("project".into()),
2771            database_url: Some(server.url("/")),
2772            ..Default::default()
2773        };
2774        let app = initialize_app(options, Some(unique_settings()))
2775            .await
2776            .unwrap();
2777        let database = get_database(Some(app)).await.unwrap();
2778        let filtered = compose_query(
2779            database.reference("items").unwrap(),
2780            vec![order_by_key(), equal_to_with_key("item-1", "item-1")],
2781        )
2782        .expect("compose equal_to query");
2783
2784        let value = filtered.get().await.unwrap();
2785        assert_eq!(value, json!({ "item-1": { "value": true } }));
2786        get_mock.assert();
2787    }
2788
2789    #[tokio::test(flavor = "multi_thread")]
2790    async fn limit_to_first_rejects_zero() {
2791        let options = FirebaseOptions {
2792            project_id: Some("project".into()),
2793            ..Default::default()
2794        };
2795        let app = initialize_app(options, Some(unique_settings()))
2796            .await
2797            .unwrap();
2798        let database = get_database(Some(app)).await.unwrap();
2799
2800        let err = database
2801            .reference("items")
2802            .unwrap()
2803            .query()
2804            .limit_to_first(0)
2805            .unwrap_err();
2806
2807        assert_eq!(err.code_str(), "database/invalid-argument");
2808    }
2809
2810    #[tokio::test(flavor = "multi_thread")]
2811    async fn order_by_child_rejects_empty_path() {
2812        let options = FirebaseOptions {
2813            project_id: Some("project".into()),
2814            ..Default::default()
2815        };
2816        let app = initialize_app(options, Some(unique_settings()))
2817            .await
2818            .unwrap();
2819        let database = get_database(Some(app)).await.unwrap();
2820
2821        let err = database
2822            .reference("items")
2823            .unwrap()
2824            .order_by_child("")
2825            .unwrap_err();
2826
2827        assert_eq!(err.code_str(), "database/invalid-argument");
2828    }
2829
2830    #[tokio::test(flavor = "multi_thread")]
2831    async fn query_rejects_multiple_order_by_constraints() {
2832        let options = FirebaseOptions {
2833            project_id: Some("project".into()),
2834            ..Default::default()
2835        };
2836        let app = initialize_app(options, Some(unique_settings()))
2837            .await
2838            .unwrap();
2839        let database = get_database(Some(app)).await.unwrap();
2840        let reference = database.reference("items").unwrap();
2841
2842        let err =
2843            compose_query(reference, vec![order_by_key(), order_by_child("score")]).unwrap_err();
2844
2845        assert_eq!(err.code_str(), "database/invalid-argument");
2846    }
2847
2848    #[tokio::test(flavor = "multi_thread")]
2849    async fn on_value_listener_receives_updates() {
2850        let options = FirebaseOptions {
2851            project_id: Some("project".into()),
2852            ..Default::default()
2853        };
2854        let app = initialize_app(options, Some(unique_settings()))
2855            .await
2856            .unwrap();
2857        let database = get_database(Some(app)).await.unwrap();
2858        let reference = database.reference("counters/main").unwrap();
2859
2860        let events = Arc::new(Mutex::new(Vec::<Value>::new()));
2861        let captured = events.clone();
2862
2863        let registration = reference
2864            .on_value(move |result| {
2865                if let Ok(snapshot) = result {
2866                    captured.lock().unwrap().push(snapshot.value().clone());
2867                }
2868            })
2869            .await
2870            .expect("on_value registration");
2871
2872        reference.set(json!(1)).await.unwrap();
2873        reference.set(json!(2)).await.unwrap();
2874
2875        {
2876            let events = events.lock().unwrap();
2877            assert_eq!(events.len(), 3);
2878            assert_eq!(events[0], Value::Null);
2879            assert_eq!(events[1], json!(1));
2880            assert_eq!(events[2], json!(2));
2881        }
2882
2883        registration.detach();
2884        reference.set(json!(3)).await.unwrap();
2885
2886        let events = events.lock().unwrap();
2887        assert_eq!(events.len(), 3);
2888    }
2889
2890    #[tokio::test(flavor = "multi_thread")]
2891    async fn query_on_value_reacts_to_changes() {
2892        let options = FirebaseOptions {
2893            project_id: Some("project".into()),
2894            ..Default::default()
2895        };
2896        let app = initialize_app(options, Some(unique_settings()))
2897            .await
2898            .unwrap();
2899        let database = get_database(Some(app)).await.unwrap();
2900        let scores = database.reference("scores").unwrap();
2901
2902        scores
2903            .set(json!({
2904                "a": { "score": 10 },
2905                "b": { "score": 20 },
2906                "c": { "score": 30 }
2907            }))
2908            .await
2909            .unwrap();
2910
2911        let events = Arc::new(Mutex::new(Vec::<Value>::new()));
2912        let captured = events.clone();
2913
2914        let _registration = compose_query(
2915            scores.clone(),
2916            vec![order_by_child("score"), limit_to_last(1)],
2917        )
2918        .unwrap()
2919        .on_value(move |result| {
2920            if let Ok(snapshot) = result {
2921                captured.lock().unwrap().push(snapshot.value().clone());
2922            }
2923        })
2924        .await
2925        .unwrap();
2926
2927        {
2928            let events = events.lock().unwrap();
2929            assert_eq!(events.len(), 1);
2930            assert_eq!(
2931                events[0],
2932                json!({
2933                    "a": { "score": 10 },
2934                    "b": { "score": 20 },
2935                    "c": { "score": 30 }
2936                })
2937            );
2938        }
2939
2940        scores
2941            .child("d")
2942            .unwrap()
2943            .set(json!({ "score": 50 }))
2944            .await
2945            .unwrap();
2946
2947        let events = events.lock().unwrap();
2948        assert_eq!(events.len(), 2);
2949        assert_eq!(
2950            events[1],
2951            json!({
2952                "a": { "score": 10 },
2953                "b": { "score": 20 },
2954                "c": { "score": 30 },
2955                "d": { "score": 50 }
2956            })
2957        );
2958    }
2959}