Skip to main content

dittolive_ditto/sync/
sync_subscription.rs

1use std::{
2    cmp::Ordering,
3    hash::{self, Hash},
4};
5
6use ffi_sdk::{
7    ffi_utils::{c_slice, char_p, repr_c},
8    FfiSyncSubscription,
9};
10
11use crate::{
12    ditto::Ditto,
13    error::DittoError,
14    utils::{extension_traits::FfiResultIntoRustResult, zstr::zstr},
15};
16
17/// Use [`ditto.sync().register_subscription(...)`] to create a `SyncSubscription`
18///
19/// The subscription will remain active until either:
20///
21/// - the `SyncSubscription` is explicitly canceled via [`.cancel()`], or
22/// - the owning [`Ditto`] object goes out of scope
23///
24/// [See the `sync` module documentation for more details][0].
25///
26/// [`ditto.sync().register_subscription(...)`]: crate::sync::Sync::register_subscription
27/// [`.cancel()`]: Self::cancel
28/// [0]: crate::sync
29pub struct SyncSubscription {
30    pub(crate) handle: repr_c::Box<FfiSyncSubscription>,
31}
32
33impl SyncSubscription {
34    pub(crate) fn new(
35        ditto: &Ditto,
36        query: &zstr,
37        query_args: Option<&[u8]>,
38    ) -> Result<Self, DittoError> {
39        let handle = ffi_sdk::dittoffi_sync_register_subscription_throws(
40            &ditto.ditto,
41            query.into(),
42            query_args.map(|a| a.into()),
43        )
44        .into_rust_result()?;
45
46        Ok(Self { handle })
47    }
48
49    /// Returns the DQL query string that this [`SyncSubscription`] is subscribed to.
50    ///
51    /// # Example
52    ///
53    /// ```
54    /// use dittolive_ditto::prelude::*;
55    ///
56    /// # fn example(ditto: &Ditto) -> anyhow::Result<()> {
57    /// let sync_subscription = ditto
58    ///     .sync()
59    ///     .register_subscription("SELECT * FROM cars", None)?;
60    ///
61    /// assert_eq!(sync_subscription.query_string(), "SELECT * FROM cars");
62    /// # Ok(())
63    /// # }
64    /// ```
65    pub fn query_string(&self) -> String {
66        let cbox: char_p::Box = ffi_sdk::dittoffi_sync_subscription_query_string(&self.handle);
67        cbox.into_string()
68    }
69
70    /// Returns the DQL query arguments that this [`SyncSubscription`] is subscribed to.
71    ///
72    /// # Example
73    ///
74    /// ```
75    /// use dittolive_ditto::prelude::*;
76    ///
77    /// # fn example(ditto: &Ditto) -> anyhow::Result<()> {
78    /// let sync_subscription = ditto
79    ///     .sync()
80    ///     .register_subscription("SELECT * FROM cars", Some(serde_json::json!({
81    ///         "color": "red",
82    ///     }).into()))?;
83    ///
84    /// let maybe_args = sync_subscription.query_arguments();
85    /// let args = maybe_args.expect("expected query arguments");
86    /// let args_json = serde_json::to_value(&args)?;
87    ///
88    /// assert_eq!(args_json, serde_json::json!({
89    ///    "color": "red",
90    /// }));
91    ///
92    /// # Ok(())
93    /// # }
94    /// ```
95    pub fn query_arguments(&self) -> Option<serde_cbor::Value> {
96        let buffer: c_slice::Box<u8> =
97            ffi_sdk::dittoffi_sync_subscription_query_arguments(&self.handle)?;
98
99        let cbor = serde_cbor::from_slice(buffer.as_slice())
100            .unwrap_or_else(|error| panic!("bug: failed to deserialize CBOR from FFI: {error}"));
101        Some(cbor)
102    }
103
104    /// Cancels this [`SyncSubscription`], so that changes matching the query are no longer
105    /// synced from other peers to this one.
106    ///
107    /// # Example
108    ///
109    /// ```
110    /// use dittolive_ditto::Ditto;
111    ///
112    /// # fn example(ditto: &Ditto) -> anyhow::Result<()> {
113    /// let subscription = ditto
114    ///     .sync()
115    ///     .register_subscription("SELECT * FROM cars", None)?;
116    /// assert!(!subscription.is_cancelled());
117    ///
118    /// subscription.cancel();
119    /// assert!(subscription.is_cancelled());
120    /// # Ok(())
121    /// # }
122    /// ```
123    pub fn cancel(&self) {
124        ffi_sdk::dittoffi_sync_subscription_cancel(&self.handle);
125    }
126
127    /// Returns `true` if this [`SyncSubscription`] has been cancelled, `false` otherwise.
128    ///
129    /// # Example
130    ///
131    /// ```
132    /// use dittolive_ditto::Ditto;
133    ///
134    /// # fn example(ditto: &Ditto) -> anyhow::Result<()> {
135    /// let subscription = ditto
136    ///     .sync()
137    ///     .register_subscription("SELECT * FROM cars", None)?;
138    /// assert!(!subscription.is_cancelled());
139    ///
140    /// subscription.cancel();
141    /// assert!(subscription.is_cancelled());
142    /// # Ok(())
143    /// # }
144    /// ```
145    pub fn is_cancelled(&self) -> bool {
146        ffi_sdk::dittoffi_sync_subscription_is_cancelled(&self.handle)
147    }
148
149    /// Intentionally left non-public as the ID representation is an implementation detail
150    ///
151    /// The only reason this is here is to power the Ord/Hash/Debug impls
152    fn id(&self) -> impl '_ + Ord + Hash + core::fmt::Debug {
153        ffi_sdk::dittoffi_sync_subscription_id(&self.handle)
154    }
155}
156
157impl std::fmt::Debug for SyncSubscription {
158    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
159        f.debug_struct("SyncSubscription")
160            .field("id", &self.id())
161            .finish_non_exhaustive()
162    }
163}
164
165impl std::fmt::Display for SyncSubscription {
166    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
167        std::fmt::Debug::fmt(self, f)
168    }
169}
170
171impl Ord for SyncSubscription {
172    fn cmp(&self, other: &Self) -> Ordering {
173        Ord::cmp(&self.id(), &other.id())
174    }
175}
176
177impl PartialOrd for SyncSubscription {
178    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
179        Some(Ord::cmp(self, other))
180    }
181}
182
183impl Eq for SyncSubscription {}
184impl PartialEq for SyncSubscription {
185    fn eq(&self, other: &Self) -> bool {
186        self.id() == other.id()
187    }
188}
189
190impl Hash for SyncSubscription {
191    fn hash<H: hash::Hasher>(&self, h: &mut H) {
192        self.id().hash(h)
193    }
194}