dittolive_ditto/store/query_builder/live_query/
mod.rs1use_prelude!();
2
3use std::sync::Weak;
4
5use ffi_sdk::{self, COrderByParam, ChangeHandlerWithDocsDiff, LiveQueryAvailability};
6
7use crate::{
8 ditto::TryUpgrade,
9 error::{DittoError, ErrorKind},
10 subscription::Subscription,
11};
12
13mod event;
14mod single_document_event;
15
16#[path = "move.rs"]
17#[doc(hidden)]
18mod move_module;
19
20pub use event::LiveQueryEvent;
21pub use move_module::LiveQueryMove;
22pub use single_document_event::SingleDocumentLiveQueryEvent;
23
24trait_alias! {
25 pub
28 trait SingleDocumentEventHandler =
29 FnMut(Option<BoxedDocument>, SingleDocumentLiveQueryEvent)
30 + Send + 'static }
33
34trait_alias! {
35 pub
37 trait EventHandler =
38 FnMut(Vec<BoxedDocument>, LiveQueryEvent)
39 + Send + 'static }
42
43#[must_use = "Dropping a `LiveQuery` will cancel its event handlers"]
77pub struct LiveQuery {
78 ditto: Weak<BoxedDitto>,
79 pub query: char_p::Box,
80 pub collection_name: char_p::Box,
81 pub subscription: Option<Box<Subscription>>,
83 pub id: i64,
84}
85
86const UNASSIGNED_ID_SENTINEL: i64 = -1;
87
88impl LiveQuery {
89 #[doc(hidden)]
90 #[deprecated(note = "Use `pending_cursor_op.observe_local(...)` to obtain a LiveQuery")]
91 #[rustfmt::skip]
92 #[allow(clippy::too_many_arguments)]
93 pub
94 fn with_handler<F> (
95 ditto: Weak<BoxedDitto>,
96 query: char_p::Box,
97 query_args: Option<Vec<u8>>,
98 collection_name: char_p::Box,
99 order_by: &'_ [COrderByParam<'_>], limit: i32,
101 offset: u32,
102 event_handler: F,
103 ) -> Result<Self, DittoError>
104 where
105 F : EventHandler,
106 {
107 let strong_ditto = ditto.try_upgrade()?;
108
109 let event_handler: Arc<F> = Arc::new(event_handler);
110 let ctx = Arc::as_ptr(&event_handler) as *mut c_void;
111 let retain_ctx = {
112 unsafe extern "C"
113 fn retain<F>(ctx: *mut c_void) {
114 Arc::<F>::increment_strong_count(ctx.cast());
115 }
116
117 retain::<F>
118 };
119 let release_ctx = {
120 unsafe extern "C"
121 fn release<F>(ctx: *mut c_void) {
122 drop(Arc::<F>::from_raw(ctx.cast()));
123 }
124
125 release::<F>
126 };
127
128 #[allow(improper_ctypes_definitions)] unsafe extern "C"
130 fn c_cb<F : EventHandler> (ctx: *mut c_void, p: ChangeHandlerWithDocsDiff)
131 {
132
133 let ctx: *mut F = ctx.cast();
134 let event_handler: &mut F = ctx.as_mut().expect("Got NULL");
139 let event = if p.is_initial { LiveQueryEvent::Initial
141 } else {
142 let moves: Vec<LiveQueryMove> =
143 p .moves
144 .as_ref()
145 .unwrap()
146 .as_slice()
147 .chunks_exact(2)
148 .map(|it| if let [from, to] = *it {
149 LiveQueryMove { from, to }
150 } else { unreachable!() })
151 .collect()
152 ;
153 LiveQueryEvent::Update {
154 old_documents: p.old_documents.unwrap().into(),
155 insertions: p.insertions.unwrap().into(),
156 deletions: p.deletions.unwrap().into(),
157 updates: p.updates.unwrap().into(),
158 moves,
159 }
160 };
161 event_handler(p.documents.into(), event);
162 }
163
164 let mut this = LiveQuery {
165 ditto,
166 query,
167 collection_name,
168 subscription: None,
169 id: UNASSIGNED_ID_SENTINEL,
170 };
171
172 let id = &mut this.id;
173 #[allow(deprecated)] let availability = LiveQueryAvailability::Always;
175
176
177 *id = unsafe {
178 #[allow(deprecated)]
180 ffi_sdk::ditto_live_query_register_str(
181 &*strong_ditto,
182 this.collection_name.as_ref(),
183 this.query.as_ref(),
184 query_args.as_ref().map(|qa| (&qa[..]).into()),
185 order_by.into(),
186 limit,
187 offset,
188 availability,
189 ctx,
190 Some(retain_ctx),
191 Some(release_ctx),
192 c_cb::<F>,
193 ).ok_or(ErrorKind::InvalidInput)?
194 };
195
196 if *id == UNASSIGNED_ID_SENTINEL {
197 error!("live query was not given a valid id");
198 } else {
199 debug!(%id, "live query id");
200 }
201
202 {
204 ffi_sdk::ditto_live_query_start(&*strong_ditto, *id);
205 }
206 Ok(this)
207 }
208}
209
210impl Drop for LiveQuery {
211 fn drop(self: &'_ mut LiveQuery) {
212 debug!(id = %self.id, "dropping LiveQuery");
215
216 if let Some(ditto) = self.ditto.upgrade() {
217 #[allow(deprecated)]
219 ffi_sdk::ditto_live_query_stop(&*ditto, self.id);
220 }
221 }
222}
223
224#[allow(deprecated)]
225impl crate::observer::Observer for LiveQuery {}