Skip to main content

dittolive_ditto/store/query_builder/live_query/
mod.rs

1use_prelude!();
2
3use std::sync::Weak;
4
5use ffi_sdk::{self, COrderByParam, ChangeHandlerWithDocsDiff, LiveQueryAvailability};
6
7use crate::{
8    ditto::TryUpgrade,
9    error::{DittoError, ErrorKind},
10    subscription::Subscription,
11};
12
13mod event;
14mod single_document_event;
15
16#[path = "move.rs"]
17#[doc(hidden)]
18mod move_module;
19
20pub use event::LiveQueryEvent;
21pub use move_module::LiveQueryMove;
22pub use single_document_event::SingleDocumentLiveQueryEvent;
23
24trait_alias! {
25    /// A closure which is called on each event for a single document live query
26    /// (`find_by_id(...).observe_local(...)`)
27    pub
28    trait SingleDocumentEventHandler =
29        FnMut(Option<BoxedDocument>, SingleDocumentLiveQueryEvent)
30        + Send // can be dropped in another thread
31        + 'static // cannot dangle
32}
33
34trait_alias! {
35    /// A closure which is called on each event
36    pub
37    trait EventHandler =
38        FnMut(Vec<BoxedDocument>, LiveQueryEvent)
39        + Send // can be dropped in another thread
40        + 'static // cannot dangle
41}
42
43/// Use [`.observe_local(...)`] to create a `LiveQuery` with a callback that fires when
44/// any queried documents are changed in the local store.
45///
46/// The `LiveQuery` object itself is a handle that keeps the registered callback alive.
47/// Dropping a `LiveQuery` will cancel any future calls to the callback, so be sure to
48/// keep it in scope for as long as you wish to continue receiving updates.
49///
50/// # Example
51///
52/// ```
53/// # use dittolive_ditto::prelude::*;
54/// # fn example(ditto: &Ditto) -> anyhow::Result<()> {
55/// // Hold the LiveQuery to keep the callback alive
56/// let _live_query =
57///     ditto
58///         .store()
59///         .collection("cars")?
60///         .find_all()
61///         .observe_local(|docs, _event| {
62///             let typed_docs: Vec<serde_json::Value> = docs
63///                 .into_iter()
64///                 .flat_map(|doc| doc.typed::<serde_json::Value>().ok())
65///                 .collect();
66///             println!("Received doc updates: {typed_docs:?}");
67///         })?;
68///
69/// // Drop the LiveQuery to cancel the callback
70/// drop(_live_query);
71/// # Ok(())
72/// # }
73/// ```
74///
75/// [`.observe_local(...)`]: crate::store::query_builder::PendingCursorOperation::observe_local
76#[must_use = "Dropping a `LiveQuery` will cancel its event handlers"]
77pub struct LiveQuery {
78    ditto: Weak<BoxedDitto>,
79    pub query: char_p::Box,
80    pub collection_name: char_p::Box,
81    // TODO: remove, as this is now always `None`. But removing it is technically a breaking change
82    pub subscription: Option<Box<Subscription>>,
83    pub id: i64,
84}
85
/// Marker value meaning "no live query id has been assigned"; an id equal to
/// this value is reported as invalid when a live query is registered.
const UNASSIGNED_ID_SENTINEL: i64 = -1;
87
88impl LiveQuery {
89    #[doc(hidden)]
90    #[deprecated(note = "Use `pending_cursor_op.observe_local(...)` to obtain a LiveQuery")]
91    #[rustfmt::skip]
92    #[allow(clippy::too_many_arguments)]
93    pub
94    fn with_handler<F> (
95        ditto: Weak<BoxedDitto>,
96        query: char_p::Box,
97        query_args: Option<Vec<u8>>,
98        collection_name: char_p::Box,
99        order_by: &'_ [COrderByParam<'_>], // Note COrderByParam is not opaque
100        limit: i32,
101        offset: u32,
102        event_handler: F,
103    ) -> Result<Self, DittoError>
104    where
105        F : EventHandler,
106    {
107        let strong_ditto = ditto.try_upgrade()?;
108
109        let event_handler: Arc<F> = Arc::new(event_handler);
110        let ctx = Arc::as_ptr(&event_handler) as *mut c_void;
111        let retain_ctx = {
112            unsafe extern "C"
113            fn retain<F>(ctx: *mut c_void) {
114                Arc::<F>::increment_strong_count(ctx.cast());
115            }
116
117            retain::<F>
118        };
119        let release_ctx = {
120            unsafe extern "C"
121            fn release<F>(ctx: *mut c_void) {
122                drop(Arc::<F>::from_raw(ctx.cast()));
123            }
124
125            release::<F>
126        };
127
128        #[allow(improper_ctypes_definitions)] // false positive
129        unsafe extern "C"
130        fn c_cb<F : EventHandler> (ctx: *mut c_void, p: ChangeHandlerWithDocsDiff)
131        {
132
133            let ctx: *mut F = ctx.cast();
134            // Note: this assumes non-concurrent calls from the FFI, which is currently true.
135            // Should it not be the case, we'd have to `Mutex`-wrap the `F` so as to `.lock()`,
136            // here (or require `EventHandler : Sync + Fn`, which would make the caller be the one
137            // having to do that, but it would be a breaking change).
138            let event_handler: &mut F = ctx.as_mut().expect("Got NULL");
139            let event = if p.is_initial { // more explicit flag for "initial" status
140                LiveQueryEvent::Initial
141            } else {
142                let moves: Vec<LiveQueryMove> =
143                    p   .moves
144                        .as_ref()
145                        .unwrap()
146                        .as_slice()
147                        .chunks_exact(2)
148                        .map(|it| if let [from, to] = *it {
149                            LiveQueryMove { from, to }
150                        } else { unreachable!() })
151                        .collect()
152                ;
153                LiveQueryEvent::Update {
154                    old_documents: p.old_documents.unwrap().into(),
155                    insertions: p.insertions.unwrap().into(),
156                    deletions: p.deletions.unwrap().into(),
157                    updates: p.updates.unwrap().into(),
158                    moves,
159                }
160            };
161            event_handler(p.documents.into(), event);
162        }
163
164        let mut this = LiveQuery {
165            ditto,
166            query,
167            collection_name,
168            subscription: None,
169            id: UNASSIGNED_ID_SENTINEL,
170        };
171
172        let id = &mut this.id;
173        #[allow(deprecated)] // not deprecated on the SDK side.
174        let availability = LiveQueryAvailability::Always;
175
176
177        *id = unsafe {
178            // TODO(v5): Remove QueryBuilder entirely
179            #[allow(deprecated)]
180            ffi_sdk::ditto_live_query_register_str(
181                &*strong_ditto,
182                this.collection_name.as_ref(),
183                this.query.as_ref(),
184                query_args.as_ref().map(|qa| (&qa[..]).into()),
185                order_by.into(),
186                limit,
187                offset,
188                availability,
189                ctx,
190                Some(retain_ctx),
191                Some(release_ctx),
192                c_cb::<F>,
193            ).ok_or(ErrorKind::InvalidInput)?
194        };
195
196        if *id == UNASSIGNED_ID_SENTINEL {
197            error!("live query was not given a valid id");
198        } else {
199            debug!(%id, "live query id");
200        }
201
202        // start the query
203        {
204            ffi_sdk::ditto_live_query_start(&*strong_ditto, *id);
205        }
206        Ok(this)
207    }
208}
209
210impl Drop for LiveQuery {
211    fn drop(self: &'_ mut LiveQuery) {
212        // Signals to the FFI that it is now free to *eventually* release the
213        // live query's event handler.
214        debug!(id = %self.id, "dropping LiveQuery");
215
216        if let Some(ditto) = self.ditto.upgrade() {
217            // TODO(v5): Remove QueryBuilder entirely
218            #[allow(deprecated)]
219            ffi_sdk::ditto_live_query_stop(&*ditto, self.id);
220        }
221    }
222}
223
224#[allow(deprecated)]
225impl crate::observer::Observer for LiveQuery {}