Skip to main content

dittolive_ditto/store/
observer.rs

1use_prelude!();
2
3use std::hash::{self, Hash};
4
5use ffi_sdk::{
6    ffi_utils::repr_c, FfiDynChangeHandlerWithSignalNext, FfiDynSignalNext, FfiQueryResult,
7    FfiStoreObserver,
8};
9use uuid::Uuid;
10
11pub use super::*;
12use crate::{error::DittoError, utils::zstr::zstr};
13
14/// Use [`ditto.store().register_observer(...)`] to create an observer with a callback.
15///
16/// [`ditto.store().register_observer(...)`]: crate::store::Store::register_observer
17pub struct StoreObserver {
18    pub(crate) handle: repr_c::Box<FfiStoreObserver>,
19}
20
21trait_alias! {
22    /// A change handler is called whenever an active store observer receives new results.
23    pub trait ChangeHandler =
24        FnMut(QueryResult)
25        + Send // can be dropped in another thread
26        + Sync
27        + 'static // cannot dangle
28}
29
30trait_alias! {
31    /// A callback used to signal that the observer is ready to handle new events.
32    pub trait SignalNext =
33        FnOnce()
34        + 'static // cannot dangle
35        + Send // can be dropped in another thread
36}
37
38trait_alias! {
39    /// A change handler is called whenever an active store observer receives new results.
40    ///
41    /// Call the provided [`SignalNext`] function to signal that the handler is ready to
42    /// receive the next callback from the store observer.
43    pub trait ChangeHandlerWithSignalNext =
44        FnMut(QueryResult, Box<dyn SignalNext>)
45        + 'static // cannot dangle
46        + Send // can be dropped in another thread
47        + Sync
48}
49
50fn dittoffi_store_observer_register_safe<F>(
51    ditto: &'_ Ditto,
52    query: &zstr,
53    query_args: Option<&[u8]>,
54    mut observer: F,
55) -> Result<StoreObserver>
56where
57    F: ChangeHandlerWithSignalNext,
58{
59    let ffi_callback: repr_c::Box<FfiDynChangeHandlerWithSignalNext> = {
60        fn make_callback<F>(f: F) -> repr_c::Box<FfiDynChangeHandlerWithSignalNext>
61        where
62            F: FnMut(repr_c::Box<FfiQueryResult>, repr_c::Arc<FfiDynSignalNext>) + 'static + Send,
63        {
64            Box::new(f).into()
65        }
66
67        make_callback(
68            move |ffi_query_result: repr_c::Box<FfiQueryResult>,
69                  signal_next: repr_c::Arc<FfiDynSignalNext>| {
70                let signal_next = Box::new(move || {
71                    signal_next.call();
72                });
73
74                observer(QueryResult::from(ffi_query_result), signal_next);
75            },
76        )
77    };
78
79    let query = query.into();
80    let query_args_cbor = query_args.map(|qa| qa.into());
81
82    let handle = ffi_sdk::dittoffi_store_register_observer_throws(
83        &ditto.ditto,
84        query,
85        query_args_cbor,
86        ffi_callback,
87    )
88    .into_rust_result()?;
89
90    Ok(StoreObserver { handle })
91}
92
93impl StoreObserver {
94    pub(crate) fn new<F>(
95        ditto: &Ditto,
96        query: &zstr,
97        query_args: Option<&[u8]>,
98        mut on_change: F,
99    ) -> Result<Self, DittoError>
100    where
101        F: ChangeHandler,
102    {
103        Self::with_signal_next(ditto, query, query_args, move |args, signal_next| {
104            on_change(args);
105            signal_next();
106        })
107    }
108
109    pub(crate) fn with_signal_next<F>(
110        ditto: &Ditto,
111        query: &zstr,
112        query_args: Option<&[u8]>,
113        on_change: F,
114    ) -> Result<Self, DittoError>
115    where
116        F: ChangeHandlerWithSignalNext,
117    {
118        let on_change = {
119            let on_change = ::std::sync::Mutex::new(on_change);
120            move |arg: QueryResult, signal_next: Box<dyn SignalNext>| {
121                let mut on_change = on_change
122                    .lock()
123                    .expect("`on_change` observer not to be poisoned");
124                on_change(arg, signal_next)
125            }
126        };
127        dittoffi_store_observer_register_safe(ditto, query, query_args, on_change)
128    }
129
130    /// Return the query string used to create this `StoreObserver`.
131    pub fn query_string(&self) -> String {
132        let char_p = ffi_sdk::dittoffi_store_observer_query_string(&self.handle);
133        char_p.into_string()
134    }
135
136    /// Return the query arguments used to create this `StoreObserver`, if any.
137    pub fn query_arguments(&self) -> Option<serde_cbor::Value> {
138        let buffer: c_slice::Box<u8> =
139            ffi_sdk::dittoffi_store_observer_query_arguments(&self.handle)?;
140        let cbor = serde_cbor::from_slice(buffer.as_slice())
141            .unwrap_or_else(|error| panic!("bug: failed to deserialize CBOR from FFI: {error}"));
142
143        Some(cbor)
144    }
145
146    /// Cancels this [`StoreObserver`] and its callback.
147    ///
148    /// The callback registered with this [`StoreObserver`] will no longer be called
149    /// after calling [`.cancel()`].
150    ///
151    /// This call is a no-op if this [`StoreObserver`] has already been cancelled
152    /// or if the owning [`Ditto`] object has gone out of scope.
153    ///
154    /// [`.cancel()`]: Self::cancel
155    pub fn cancel(&self) {
156        ffi_sdk::dittoffi_store_observer_cancel(&self.handle);
157    }
158
159    /// Returns true if this [`StoreObserver`] has been cancelled.
160    pub fn is_cancelled(&self) -> bool {
161        ffi_sdk::dittoffi_store_observer_is_cancelled(&self.handle)
162    }
163
164    /// Returns the unique identifier for this [`StoreObserver`].
165    fn id(&self) -> Uuid {
166        let buffer = ffi_sdk::dittoffi_store_observer_id(&self.handle);
167        Uuid::from_slice(buffer.as_slice()).expect("bug: expected valid UUID")
168    }
169}
170
171impl StoreObserver {
172    fn comparable_parts(&self) -> impl '_ + Eq + Hash {
173        self.id()
174    }
175}
176
177impl Eq for StoreObserver {}
178impl PartialEq for StoreObserver {
179    fn eq(&self, other: &Self) -> bool {
180        self.comparable_parts() == other.comparable_parts()
181    }
182}
183
184impl Hash for StoreObserver {
185    fn hash<H: hash::Hasher>(&self, h: &mut H) {
186        self.comparable_parts().hash(h)
187    }
188}