dittolive_ditto/store/
observer.rs1use_prelude!();
2
3use std::hash::{self, Hash};
4
5use ffi_sdk::{
6 ffi_utils::repr_c, FfiDynChangeHandlerWithSignalNext, FfiDynSignalNext, FfiQueryResult,
7 FfiStoreObserver,
8};
9use uuid::Uuid;
10
11pub use super::*;
12use crate::{error::DittoError, utils::zstr::zstr};
13
14pub struct StoreObserver {
18 pub(crate) handle: repr_c::Box<FfiStoreObserver>,
19}
20
21trait_alias! {
22 pub trait ChangeHandler =
24 FnMut(QueryResult)
25 + Send + Sync
27 + 'static }
29
30trait_alias! {
31 pub trait SignalNext =
33 FnOnce()
34 + 'static + Send }
37
38trait_alias! {
39 pub trait ChangeHandlerWithSignalNext =
44 FnMut(QueryResult, Box<dyn SignalNext>)
45 + 'static + Send + Sync
48}
49
50fn dittoffi_store_observer_register_safe<F>(
51 ditto: &'_ Ditto,
52 query: &zstr,
53 query_args: Option<&[u8]>,
54 mut observer: F,
55) -> Result<StoreObserver>
56where
57 F: ChangeHandlerWithSignalNext,
58{
59 let ffi_callback: repr_c::Box<FfiDynChangeHandlerWithSignalNext> = {
60 fn make_callback<F>(f: F) -> repr_c::Box<FfiDynChangeHandlerWithSignalNext>
61 where
62 F: FnMut(repr_c::Box<FfiQueryResult>, repr_c::Arc<FfiDynSignalNext>) + 'static + Send,
63 {
64 Box::new(f).into()
65 }
66
67 make_callback(
68 move |ffi_query_result: repr_c::Box<FfiQueryResult>,
69 signal_next: repr_c::Arc<FfiDynSignalNext>| {
70 let signal_next = Box::new(move || {
71 signal_next.call();
72 });
73
74 observer(QueryResult::from(ffi_query_result), signal_next);
75 },
76 )
77 };
78
79 let query = query.into();
80 let query_args_cbor = query_args.map(|qa| qa.into());
81
82 let handle = ffi_sdk::dittoffi_store_register_observer_throws(
83 &ditto.ditto,
84 query,
85 query_args_cbor,
86 ffi_callback,
87 )
88 .into_rust_result()?;
89
90 Ok(StoreObserver { handle })
91}
92
93impl StoreObserver {
94 pub(crate) fn new<F>(
95 ditto: &Ditto,
96 query: &zstr,
97 query_args: Option<&[u8]>,
98 mut on_change: F,
99 ) -> Result<Self, DittoError>
100 where
101 F: ChangeHandler,
102 {
103 Self::with_signal_next(ditto, query, query_args, move |args, signal_next| {
104 on_change(args);
105 signal_next();
106 })
107 }
108
109 pub(crate) fn with_signal_next<F>(
110 ditto: &Ditto,
111 query: &zstr,
112 query_args: Option<&[u8]>,
113 on_change: F,
114 ) -> Result<Self, DittoError>
115 where
116 F: ChangeHandlerWithSignalNext,
117 {
118 let on_change = {
119 let on_change = ::std::sync::Mutex::new(on_change);
120 move |arg: QueryResult, signal_next: Box<dyn SignalNext>| {
121 let mut on_change = on_change
122 .lock()
123 .expect("`on_change` observer not to be poisoned");
124 on_change(arg, signal_next)
125 }
126 };
127 dittoffi_store_observer_register_safe(ditto, query, query_args, on_change)
128 }
129
130 pub fn query_string(&self) -> String {
132 let char_p = ffi_sdk::dittoffi_store_observer_query_string(&self.handle);
133 char_p.into_string()
134 }
135
136 pub fn query_arguments(&self) -> Option<serde_cbor::Value> {
138 let buffer: c_slice::Box<u8> =
139 ffi_sdk::dittoffi_store_observer_query_arguments(&self.handle)?;
140 let cbor = serde_cbor::from_slice(buffer.as_slice())
141 .unwrap_or_else(|error| panic!("bug: failed to deserialize CBOR from FFI: {error}"));
142
143 Some(cbor)
144 }
145
146 pub fn cancel(&self) {
156 ffi_sdk::dittoffi_store_observer_cancel(&self.handle);
157 }
158
159 pub fn is_cancelled(&self) -> bool {
161 ffi_sdk::dittoffi_store_observer_is_cancelled(&self.handle)
162 }
163
164 fn id(&self) -> Uuid {
166 let buffer = ffi_sdk::dittoffi_store_observer_id(&self.handle);
167 Uuid::from_slice(buffer.as_slice()).expect("bug: expected valid UUID")
168 }
169}
170
171impl StoreObserver {
172 fn comparable_parts(&self) -> impl '_ + Eq + Hash {
173 self.id()
174 }
175}
176
177impl Eq for StoreObserver {}
178impl PartialEq for StoreObserver {
179 fn eq(&self, other: &Self) -> bool {
180 self.comparable_parts() == other.comparable_parts()
181 }
182}
183
184impl Hash for StoreObserver {
185 fn hash<H: hash::Hasher>(&self, h: &mut H) {
186 self.comparable_parts().hash(h)
187 }
188}