dittolive_ditto/sync/sync_subscription.rs
1use std::{
2 cmp::Ordering,
3 hash::{self, Hash},
4};
5
6use ffi_sdk::{
7 ffi_utils::{c_slice, char_p, repr_c},
8 FfiSyncSubscription,
9};
10
11use crate::{
12 ditto::Ditto,
13 error::DittoError,
14 utils::{extension_traits::FfiResultIntoRustResult, zstr::zstr},
15};
16
17/// Use [`ditto.sync().register_subscription(...)`] to create a `SyncSubscription`
18///
19/// The subscription will remain active until either:
20///
21/// - the `SyncSubscription` is explicitly canceled via [`.cancel()`], or
22/// - the owning [`Ditto`] object goes out of scope
23///
24/// [See the `sync` module documentation for more details][0].
25///
26/// [`ditto.sync().register_subscription(...)`]: crate::sync::Sync::register_subscription
27/// [`.cancel()`]: Self::cancel
28/// [0]: crate::sync
29pub struct SyncSubscription {
30 pub(crate) handle: repr_c::Box<FfiSyncSubscription>,
31}
32
33impl SyncSubscription {
34 pub(crate) fn new(
35 ditto: &Ditto,
36 query: &zstr,
37 query_args: Option<&[u8]>,
38 ) -> Result<Self, DittoError> {
39 let handle = ffi_sdk::dittoffi_sync_register_subscription_throws(
40 &ditto.ditto,
41 query.into(),
42 query_args.map(|a| a.into()),
43 )
44 .into_rust_result()?;
45
46 Ok(Self { handle })
47 }
48
49 /// Returns the DQL query string that this [`SyncSubscription`] is subscribed to.
50 ///
51 /// # Example
52 ///
53 /// ```
54 /// use dittolive_ditto::prelude::*;
55 ///
56 /// # fn example(ditto: &Ditto) -> anyhow::Result<()> {
57 /// let sync_subscription = ditto
58 /// .sync()
59 /// .register_subscription("SELECT * FROM cars", None)?;
60 ///
61 /// assert_eq!(sync_subscription.query_string(), "SELECT * FROM cars");
62 /// # Ok(())
63 /// # }
64 /// ```
65 pub fn query_string(&self) -> String {
66 let cbox: char_p::Box = ffi_sdk::dittoffi_sync_subscription_query_string(&self.handle);
67 cbox.into_string()
68 }
69
70 /// Returns the DQL query arguments that this [`SyncSubscription`] is subscribed to.
71 ///
72 /// # Example
73 ///
74 /// ```
75 /// use dittolive_ditto::prelude::*;
76 ///
77 /// # fn example(ditto: &Ditto) -> anyhow::Result<()> {
78 /// let sync_subscription = ditto
79 /// .sync()
80 /// .register_subscription("SELECT * FROM cars", Some(serde_json::json!({
81 /// "color": "red",
82 /// }).into()))?;
83 ///
84 /// let maybe_args = sync_subscription.query_arguments();
85 /// let args = maybe_args.expect("expected query arguments");
86 /// let args_json = serde_json::to_value(&args)?;
87 ///
88 /// assert_eq!(args_json, serde_json::json!({
89 /// "color": "red",
90 /// }));
91 ///
92 /// # Ok(())
93 /// # }
94 /// ```
95 pub fn query_arguments(&self) -> Option<serde_cbor::Value> {
96 let buffer: c_slice::Box<u8> =
97 ffi_sdk::dittoffi_sync_subscription_query_arguments(&self.handle)?;
98
99 let cbor = serde_cbor::from_slice(buffer.as_slice())
100 .unwrap_or_else(|error| panic!("bug: failed to deserialize CBOR from FFI: {error}"));
101 Some(cbor)
102 }
103
104 /// Cancels this [`SyncSubscription`], so that changes matching the query are no longer
105 /// synced from other peers to this one.
106 ///
107 /// # Example
108 ///
109 /// ```
110 /// use dittolive_ditto::Ditto;
111 ///
112 /// # fn example(ditto: &Ditto) -> anyhow::Result<()> {
113 /// let subscription = ditto
114 /// .sync()
115 /// .register_subscription("SELECT * FROM cars", None)?;
116 /// assert!(!subscription.is_cancelled());
117 ///
118 /// subscription.cancel();
119 /// assert!(subscription.is_cancelled());
120 /// # Ok(())
121 /// # }
122 /// ```
123 pub fn cancel(&self) {
124 ffi_sdk::dittoffi_sync_subscription_cancel(&self.handle);
125 }
126
127 /// Returns `true` if this [`SyncSubscription`] has been cancelled, `false` otherwise.
128 ///
129 /// # Example
130 ///
131 /// ```
132 /// use dittolive_ditto::Ditto;
133 ///
134 /// # fn example(ditto: &Ditto) -> anyhow::Result<()> {
135 /// let subscription = ditto
136 /// .sync()
137 /// .register_subscription("SELECT * FROM cars", None)?;
138 /// assert!(!subscription.is_cancelled());
139 ///
140 /// subscription.cancel();
141 /// assert!(subscription.is_cancelled());
142 /// # Ok(())
143 /// # }
144 /// ```
145 pub fn is_cancelled(&self) -> bool {
146 ffi_sdk::dittoffi_sync_subscription_is_cancelled(&self.handle)
147 }
148
149 /// Intentionally left non-public as the ID representation is an implementation detail
150 ///
151 /// The only reason this is here is to power the Ord/Hash/Debug impls
152 fn id(&self) -> impl '_ + Ord + Hash + core::fmt::Debug {
153 ffi_sdk::dittoffi_sync_subscription_id(&self.handle)
154 }
155}
156
157impl std::fmt::Debug for SyncSubscription {
158 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
159 f.debug_struct("SyncSubscription")
160 .field("id", &self.id())
161 .finish_non_exhaustive()
162 }
163}
164
165impl std::fmt::Display for SyncSubscription {
166 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
167 std::fmt::Debug::fmt(self, f)
168 }
169}
170
171impl Ord for SyncSubscription {
172 fn cmp(&self, other: &Self) -> Ordering {
173 Ord::cmp(&self.id(), &other.id())
174 }
175}
176
177impl PartialOrd for SyncSubscription {
178 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
179 Some(Ord::cmp(self, other))
180 }
181}
182
183impl Eq for SyncSubscription {}
184impl PartialEq for SyncSubscription {
185 fn eq(&self, other: &Self) -> bool {
186 self.id() == other.id()
187 }
188}
189
190impl Hash for SyncSubscription {
191 fn hash<H: hash::Hasher>(&self, h: &mut H) {
192 self.id().hash(h)
193 }
194}