Skip to main content

couchbase_lite/
replicator.rs

1#[cfg(feature = "use-tokio-websocket")]
2mod tokio_socket;
3
4use crate::{
5    error::{c4error_init, Error, Result},
6    ffi::{
7        c4address_fromURL, c4repl_free, c4repl_getStatus, c4repl_new, c4repl_retry, c4repl_start,
8        c4repl_stop, kC4DefaultCollectionSpec, C4Address, C4CollectionSpec, C4DocumentEnded,
9        C4Progress, C4ReplicationCollection, C4Replicator, C4ReplicatorActivityLevel,
10        C4ReplicatorDocumentsEndedCallback, C4ReplicatorMode, C4ReplicatorParameters,
11        C4ReplicatorStatus, C4ReplicatorStatusChangedCallback, C4ReplicatorValidationFunction,
12        C4RevisionFlags, C4String, FLDict, FLSliceResult,
13    },
14    Database,
15};
16use log::{info, trace};
17use std::{
18    mem::{self, MaybeUninit},
19    os::raw::c_void,
20    ptr,
21    ptr::NonNull,
22    slice, str,
23    sync::Once,
24};
25
26/// Replicator of database
27pub struct Replicator {
28    inner: NonNull<C4Replicator>,
29    validation: C4ReplicatorValidationFunction,
30    c_callback_on_status_changed: C4ReplicatorStatusChangedCallback,
31    c_callback_on_documents_ended: C4ReplicatorDocumentsEndedCallback,
32    free_callback_f: unsafe fn(_: *mut c_void),
33    boxed_callback_f: NonNull<c_void>,
34    mode: ReplicatorMode,
35}
36
37/// Parameters describing a replication, used when creating `Replicator`
38pub struct ReplicatorParameters<StateCallback, DocumentsEndedCallback, ValidationF> {
39    validation_cb: ValidationF,
40    state_changed_callback: StateCallback,
41    documents_ended_callback: DocumentsEndedCallback,
42    auth: ReplicatorAuthentication,
43    mode: ReplicatorMode,
44}
45
46#[derive(Clone, Copy)]
47struct ReplicatorMode {
48    push: C4ReplicatorMode,
49    pull: C4ReplicatorMode,
50}
51
52impl<SC, DEC, V> ReplicatorParameters<SC, DEC, V> {
53    #[inline]
54    pub fn with_auth(self, auth: ReplicatorAuthentication) -> Self {
55        Self { auth, ..self }
56    }
57    /// Set callback that can reject incoming revisions.
58    /// Arguments: collection_name, doc_id, rev_id, rev_flags, doc_body.
59    /// It should return false to reject document.
60    #[inline]
61    pub fn with_validation_func<ValidationF>(
62        self,
63        validation_cb: ValidationF,
64    ) -> ReplicatorParameters<SC, DEC, ValidationF>
65    where
66        ValidationF: ReplicatorValidationFunction,
67    {
68        ReplicatorParameters {
69            validation_cb,
70            state_changed_callback: self.state_changed_callback,
71            documents_ended_callback: self.documents_ended_callback,
72            auth: self.auth,
73            mode: self.mode,
74        }
75    }
76    /// Set callback to reports back change of replicator state
77    #[inline]
78    pub fn with_state_changed_callback<StateCallback>(
79        self,
80        state_changed_callback: StateCallback,
81    ) -> ReplicatorParameters<StateCallback, DEC, V>
82    where
83        StateCallback: ReplicatorStatusChangedCallback,
84    {
85        ReplicatorParameters {
86            validation_cb: self.validation_cb,
87            state_changed_callback,
88            documents_ended_callback: self.documents_ended_callback,
89            auth: self.auth,
90            mode: self.mode,
91        }
92    }
93    /// Set callback to reports about the replication status of documents
94    #[inline]
95    pub fn with_documents_ended_callback<DocumentsEndedCallback>(
96        self,
97        documents_ended_callback: DocumentsEndedCallback,
98    ) -> ReplicatorParameters<SC, DocumentsEndedCallback, V>
99    where
100        DocumentsEndedCallback: ReplicatorDocumentsEndedCallback,
101    {
102        ReplicatorParameters {
103            validation_cb: self.validation_cb,
104            state_changed_callback: self.state_changed_callback,
105            documents_ended_callback,
106            auth: self.auth,
107            mode: self.mode,
108        }
109    }
110    /// Set push mode (from db to remote/other db)
111    #[inline]
112    pub fn with_push_mode(self, push: C4ReplicatorMode) -> Self {
113        Self {
114            mode: ReplicatorMode {
115                push,
116                pull: self.mode.pull,
117            },
118            ..self
119        }
120    }
121    /// Set pull mode (from db to remote/other db)
122    #[inline]
123    pub fn with_pull_mode(self, pull: C4ReplicatorMode) -> Self {
124        Self {
125            mode: ReplicatorMode {
126                pull,
127                push: self.mode.push,
128            },
129            ..self
130        }
131    }
132}
133
134impl Default
135    for ReplicatorParameters<
136        fn(ReplicatorState),
137        fn(bool, &mut dyn Iterator<Item = &C4DocumentEnded>),
138        fn(C4String, C4String, C4String, C4RevisionFlags, FLDict) -> bool,
139    >
140{
141    fn default() -> Self {
142        Self {
143            validation_cb: |_coll_name, _doc_id, _rev_id, _rev_flags, _body| true,
144            state_changed_callback: |_repl_state| {},
145            documents_ended_callback: |_pushing, _doc_iter| {},
146            auth: ReplicatorAuthentication::None,
147            mode: ReplicatorMode {
148                push: C4ReplicatorMode::kC4Continuous,
149                pull: C4ReplicatorMode::kC4Continuous,
150            },
151        }
152    }
153}
154
155struct CallbackContext<
156    ValidationCb: ReplicatorValidationFunction,
157    StateCb: ReplicatorStatusChangedCallback,
158    DocumentsEndedCb: ReplicatorDocumentsEndedCallback,
159> {
160    validation_cb: ValidationCb,
161    state_cb: StateCb,
162    docs_ended_cb: DocumentsEndedCb,
163}
164
165#[derive(Clone)]
166pub enum ReplicatorAuthentication {
167    SessionToken(Box<str>),
168    Basic {
169        username: Box<str>,
170        password: Box<str>,
171    },
172    None,
173}
174
175/// it should be safe to call replicator API from any thread
176/// according to <https://github.com/couchbase/couchbase-lite-core/wiki/Thread-Safety>
177unsafe impl Send for Replicator {}
178
179impl Drop for Replicator {
180    #[inline]
181    fn drop(&mut self) {
182        trace!("repl drop {:?}", self.inner.as_ptr());
183        unsafe {
184            c4repl_free(self.inner.as_ptr());
185            (self.free_callback_f)(self.boxed_callback_f.as_ptr());
186        }
187    }
188}
189
190macro_rules! define_trait_alias {
191    ($alias:ident, $($tt:tt)+) => {
192        pub trait $alias: $($tt)+ {}
193        impl<T> $alias for T where T: $($tt)+ {}
194    };
195}
196
197define_trait_alias!(ReplicatorValidationFunction, FnMut(C4CollectionSpec, C4String, C4String, C4RevisionFlags, FLDict) -> bool + Send + 'static);
198define_trait_alias!(
199    ReplicatorStatusChangedCallback,
200    FnMut(ReplicatorState) + Send + 'static
201);
202define_trait_alias!(
203    ReplicatorDocumentsEndedCallback,
204    FnMut(bool, &mut dyn Iterator<Item = &C4DocumentEnded>) + Send + 'static
205);
206
207impl Replicator {
208    /// # Arguments
209    /// * `url` - should be something like "ws://192.168.1.132:4984/demo/"
210    /// * `params` - parameters of replicator
211    pub fn new<StateCallback, DocumentsEndedCallback, ValidationF>(
212        db: &Database,
213        url: &str,
214        params: ReplicatorParameters<StateCallback, DocumentsEndedCallback, ValidationF>,
215    ) -> Result<Self>
216    where
217        ValidationF: ReplicatorValidationFunction,
218        StateCallback: ReplicatorStatusChangedCallback,
219        DocumentsEndedCallback: ReplicatorDocumentsEndedCallback,
220    {
221        unsafe extern "C" fn call_validation<F, F2, F3>(
222            coll_spec: C4CollectionSpec,
223            doc_id: C4String,
224            rev_id: C4String,
225            flags: C4RevisionFlags,
226            body: FLDict,
227            ctx: *mut c_void,
228        ) -> bool
229        where
230            F: ReplicatorValidationFunction,
231            F2: ReplicatorStatusChangedCallback,
232            F3: ReplicatorDocumentsEndedCallback,
233        {
234            let ctx = ctx as *mut CallbackContext<F, F2, F3>;
235            assert!(
236                !ctx.is_null(),
237                "Replicator::call_validation: Internal error - null function pointer"
238            );
239            ((*ctx).validation_cb)(coll_spec, doc_id, rev_id, flags, body)
240        }
241
242        unsafe extern "C" fn call_on_status_changed<F1, F, F3>(
243            c4_repl: *mut C4Replicator,
244            status: C4ReplicatorStatus,
245            ctx: *mut c_void,
246        ) where
247            F1: ReplicatorValidationFunction,
248            F: ReplicatorStatusChangedCallback,
249            F3: ReplicatorDocumentsEndedCallback,
250        {
251            info!("on_status_changed: repl {c4_repl:?}, status {status:?}");
252
253            let ctx = ctx as *mut CallbackContext<F1, F, F3>;
254            assert!(
255                !ctx.is_null(),
256                "Replicator::call_on_status_changed: Internal error - null function pointer"
257            );
258            ((*ctx).state_cb)(ReplicatorState::from(status));
259        }
260
261        unsafe extern "C" fn call_on_documents_ended<F1, F2, F>(
262            c4_repl: *mut C4Replicator,
263            pushing: bool,
264            num_docs: usize,
265            docs: *mut *const C4DocumentEnded,
266            ctx: *mut ::std::os::raw::c_void,
267        ) where
268            F1: ReplicatorValidationFunction,
269            F2: ReplicatorStatusChangedCallback,
270            F: ReplicatorDocumentsEndedCallback,
271        {
272            trace!("on_documents_ended: repl {c4_repl:?} pushing {pushing}, num_docs {num_docs}");
273
274            let ctx = ctx as *mut CallbackContext<F1, F2, F>;
275            assert!(
276                !ctx.is_null(),
277                "Replicator::call_on_documents_ended: Internal error - null function pointer"
278            );
279            let docs: &[*const C4DocumentEnded] = slice::from_raw_parts(docs, num_docs);
280            let mut it = docs.iter().map(|x| &**x);
281            ((*ctx).docs_ended_cb)(pushing, &mut it);
282        }
283
284        let ctx = Box::new(CallbackContext {
285            validation_cb: params.validation_cb,
286            state_cb: params.state_changed_callback,
287            docs_ended_cb: params.documents_ended_callback,
288        });
289        let ctx_p = Box::into_raw(ctx);
290        Replicator::do_new(
291            db,
292            url,
293            &params.auth,
294            free_boxed_value::<CallbackContext<ValidationF, StateCallback, DocumentsEndedCallback>>,
295            unsafe { NonNull::new_unchecked(ctx_p as *mut c_void) },
296            Some(call_validation::<ValidationF, StateCallback, DocumentsEndedCallback>),
297            Some(call_on_status_changed::<ValidationF, StateCallback, DocumentsEndedCallback>),
298            Some(call_on_documents_ended::<ValidationF, StateCallback, DocumentsEndedCallback>),
299            params.mode,
300        )
301    }
302
303    /// starts database replication
304    /// * `reset` - If true, the replicator will reset its checkpoint and start replication from the beginning.
305    pub fn start(&mut self, reset: bool) -> Result<()> {
306        unsafe { c4repl_start(self.inner.as_ptr(), reset) };
307        let status: ReplicatorState = self.status().into();
308        if let ReplicatorState::Stopped(err) = status {
309            Err(err)
310        } else {
311            Ok(())
312        }
313    }
314
315    /// Full recreation of database replicator except callbacks,
316    ///
317    /// * `url`   - new url
318    /// * `auth`  - new auth information
319    /// * `reset` - If true, the replicator will reset its checkpoint and start replication from the beginning.
320    pub fn restart(
321        self,
322        db: &Database,
323        url: &str,
324        auth: &ReplicatorAuthentication,
325        reset: bool,
326    ) -> Result<Self> {
327        let Replicator {
328            inner: prev_inner,
329            free_callback_f,
330            boxed_callback_f,
331            validation,
332            c_callback_on_status_changed,
333            c_callback_on_documents_ended,
334            mode,
335        } = self;
336        mem::forget(self);
337        unsafe {
338            c4repl_stop(prev_inner.as_ptr());
339            c4repl_free(prev_inner.as_ptr());
340        }
341        let mut repl = Replicator::do_new(
342            db,
343            url,
344            auth,
345            free_callback_f,
346            boxed_callback_f,
347            validation,
348            c_callback_on_status_changed,
349            c_callback_on_documents_ended,
350            mode,
351        )?;
352        repl.start(reset)?;
353        Ok(repl)
354    }
355
356    /// Tells a replicator that's in the offline state to reconnect immediately.
357    /// return `true` if the replicator will reconnect, `false` if it won't.
358    pub fn retry(&mut self) -> Result<bool> {
359        trace!("repl retry {:?}", self.inner.as_ptr());
360        let mut c4err = c4error_init();
361        let will_reconnect = unsafe { c4repl_retry(self.inner.as_ptr(), &mut c4err) };
362        if c4err.code == 0 {
363            Ok(will_reconnect)
364        } else {
365            Err(c4err.into())
366        }
367    }
368
369    fn do_new(
370        db: &Database,
371        url: &str,
372        auth: &ReplicatorAuthentication,
373        free_callback_f: unsafe fn(_: *mut c_void),
374        boxed_callback_f: NonNull<c_void>,
375        validation: C4ReplicatorValidationFunction,
376        call_on_status_changed: C4ReplicatorStatusChangedCallback,
377        call_on_documents_ended: C4ReplicatorDocumentsEndedCallback,
378        mode: ReplicatorMode,
379    ) -> Result<Self> {
380        use consts::*;
381
382        let mut remote_addr = MaybeUninit::<C4Address>::uninit();
383        let mut db_name = C4String::default();
384        if !unsafe { c4address_fromURL(url.into(), remote_addr.as_mut_ptr(), &mut db_name) } {
385            return Err(Error::LogicError(format!("Can not parse URL {url}").into()));
386        }
387        let remote_addr = unsafe { remote_addr.assume_init() };
388
389        let options_dict: FLSliceResult = match auth {
390            ReplicatorAuthentication::SessionToken(token) => serde_fleece::fleece!({
391                kC4ReplicatorOptionAuthentication: {
392                    kC4ReplicatorAuthType: kC4AuthTypeSession,
393                    kC4ReplicatorAuthToken: *token,
394                }
395            }),
396            ReplicatorAuthentication::Basic { username, password } => {
397                serde_fleece::fleece!({
398                    kC4ReplicatorOptionAuthentication: {
399                        kC4ReplicatorAuthType: kC4AuthTypeBasic,
400                        kC4ReplicatorAuthUserName: *username,
401                        kC4ReplicatorAuthPassword: *password
402                    }
403                })
404            }
405            ReplicatorAuthentication::None => serde_fleece::fleece!({}),
406        }?;
407
408        let mut collect_opt = C4ReplicationCollection {
409            collection: kC4DefaultCollectionSpec,
410            push: mode.push,
411            pull: mode.pull,
412            optionsDictFleece: Default::default(),
413            pushFilter: None,
414            pullFilter: validation,
415            callbackContext: boxed_callback_f.as_ptr() as *mut c_void,
416        };
417
418        let repl_params = C4ReplicatorParameters {
419            onStatusChanged: call_on_status_changed,
420            onDocumentsEnded: call_on_documents_ended,
421            onBlobProgress: None,
422            propertyEncryptor: ptr::null_mut(),
423            propertyDecryptor: ptr::null_mut(),
424            callbackContext: boxed_callback_f.as_ptr() as *mut c_void,
425            socketFactory: ptr::null_mut(),
426            optionsDictFleece: options_dict.as_fl_slice(),
427            collections: &mut collect_opt,
428            collectionCount: 1,
429        };
430        let mut c4err = c4error_init();
431        let repl = unsafe {
432            c4repl_new(
433                db.inner.0.as_ptr(),
434                remote_addr,
435                db_name,
436                repl_params,
437                &mut c4err,
438            )
439        };
440        trace!("repl new result {repl:?}");
441        NonNull::new(repl)
442            .map(|inner| Replicator {
443                inner,
444                free_callback_f,
445                boxed_callback_f,
446                validation,
447                c_callback_on_status_changed: call_on_status_changed,
448                c_callback_on_documents_ended: call_on_documents_ended,
449                mode,
450            })
451            .ok_or_else(|| {
452                unsafe { free_callback_f(boxed_callback_f.as_ptr()) };
453                c4err.into()
454            })
455    }
456    #[inline]
457    pub fn stop(&mut self) {
458        trace!("repl stop {:?}", self.inner.as_ptr());
459        unsafe { c4repl_stop(self.inner.as_ptr()) };
460    }
461    #[inline]
462    pub fn state(&self) -> ReplicatorState {
463        self.status().into()
464    }
465    pub(crate) fn status(&self) -> C4ReplicatorStatus {
466        unsafe { c4repl_getStatus(self.inner.as_ptr()) }
467    }
468}
469
470/// Represents the current progress of a replicator.
471/// The `units` fields should not be used directly, but divided (`unitsCompleted`/`unitsTotal`)
472/// to give a _very_ approximate progress fraction.
473pub type ReplicatorProgress = C4Progress;
474
475/// The possible states of a replicator
476#[derive(Debug)]
477pub enum ReplicatorState {
478    /// Finished, or got a fatal error.
479    Stopped(Error),
480    /// Offline, replication doesn't not work
481    Offline,
482    /// Connection is in progress.
483    Connecting,
484    /// Continuous replicator has caught up and is waiting for changes.
485    Idle,
486    ///< Connected and actively working.
487    Busy(ReplicatorProgress),
488}
489
490unsafe fn free_boxed_value<T>(p: *mut c_void) {
491    drop(Box::from_raw(p as *mut T));
492}
493
494impl From<C4ReplicatorStatus> for ReplicatorState {
495    #[inline]
496    fn from(status: C4ReplicatorStatus) -> Self {
497        match status.level {
498            C4ReplicatorActivityLevel::kC4Stopped => ReplicatorState::Stopped(status.error.into()),
499            C4ReplicatorActivityLevel::kC4Offline => ReplicatorState::Offline,
500            C4ReplicatorActivityLevel::kC4Connecting => ReplicatorState::Connecting,
501            C4ReplicatorActivityLevel::kC4Idle => ReplicatorState::Idle,
502            C4ReplicatorActivityLevel::kC4Busy => ReplicatorState::Busy(status.progress),
503            C4ReplicatorActivityLevel::kC4Stopping => ReplicatorState::Busy(status.progress),
504        }
505    }
506}
507
508#[allow(non_upper_case_globals)]
509pub(crate) mod consts {
510    macro_rules! define_const_str {
511	($($name:ident,)+) => {
512	    $(pub(crate) const $name: &'static str = match ($crate::ffi::$name).to_str() {
513                Ok(x) => x,
514                Err(_) => panic!("Invalid utf-8 constant"),
515            };)*
516	};
517    }
518
519    define_const_str!(
520        kC4AuthTypeBasic,
521        kC4AuthTypeSession,
522        kC4ReplicatorAuthPassword,
523        kC4ReplicatorAuthToken,
524        kC4ReplicatorAuthType,
525        kC4ReplicatorAuthUserName,
526        kC4ReplicatorOptionAuthentication,
527    );
528
529    #[cfg(feature = "use-tokio-websocket")]
530    define_const_str!(
531        kC4ReplicatorOptionExtraHeaders,
532        kC4ReplicatorOptionCookies,
533        kC4SocketOptionWSProtocols,
534    );
535}
536
537static WEBSOCKET_IMPL: Once = Once::new();
538
539#[cfg(feature = "use-couchbase-lite-websocket")]
540pub(crate) fn init_builtin_socket_impl() {
541    WEBSOCKET_IMPL.call_once(|| {
542        unsafe { crate::ffi::C4RegisterBuiltInWebSocket() };
543    });
544}
545
546#[cfg(feature = "use-tokio-websocket")]
547pub(crate) fn init_tokio_socket_impl(handle: tokio::runtime::Handle) {
548    WEBSOCKET_IMPL.call_once(|| {
549        tokio_socket::c4socket_init(handle);
550    });
551}