couchbase_lite/
replicator.rs

1#[cfg(feature = "use-tokio-websocket")]
2mod tokio_socket;
3
4use crate::{
5    error::{c4error_init, Error, Result},
6    ffi::{
7        c4address_fromURL, c4repl_free, c4repl_getStatus, c4repl_new, c4repl_retry, c4repl_start,
8        c4repl_stop, kC4DefaultCollectionSpec, C4Address, C4CollectionSpec, C4DocumentEnded,
9        C4Progress, C4ReplicationCollection, C4Replicator, C4ReplicatorActivityLevel,
10        C4ReplicatorDocumentsEndedCallback, C4ReplicatorMode, C4ReplicatorParameters,
11        C4ReplicatorStatus, C4ReplicatorStatusChangedCallback, C4ReplicatorValidationFunction,
12        C4RevisionFlags, C4String, FLDict, FLSliceResult,
13    },
14    Database,
15};
16use log::{error, info, trace};
17use std::{
18    mem::{self, MaybeUninit},
19    os::raw::c_void,
20    panic::catch_unwind,
21    process::abort,
22    ptr,
23    ptr::NonNull,
24    slice, str,
25    sync::Once,
26};
27
/// Replicator of database.
///
/// Wraps a raw `C4Replicator` handle together with the type-erased user
/// callbacks registered for it. Both are released when the value is dropped.
pub struct Replicator {
    /// Raw replicator handle obtained from `c4repl_new`.
    inner: NonNull<C4Replicator>,
    /// C trampoline installed as the pull filter; stored so `restart` can reuse it.
    validation: C4ReplicatorValidationFunction,
    /// C trampoline installed as `onStatusChanged`; stored so `restart` can reuse it.
    c_callback_on_status_changed: C4ReplicatorStatusChangedCallback,
    /// C trampoline installed as `onDocumentsEnded`; stored so `restart` can reuse it.
    c_callback_on_documents_ended: C4ReplicatorDocumentsEndedCallback,
    /// Function that frees `boxed_callback_f` with its original concrete type.
    free_callback_f: unsafe fn(_: *mut c_void),
    /// Type-erased pointer to the boxed callback context handed to the C callbacks.
    boxed_callback_f: NonNull<c_void>,
    /// Push/pull modes the replicator was created with.
    mode: ReplicatorMode,
}
38
/// Parameters describing a replication, used when creating `Replicator`.
///
/// The three type parameters are the concrete types of the user callbacks;
/// the `with_*` builder methods replace one callback (and its type) at a time.
pub struct ReplicatorParameters<StateCallback, DocumentsEndedCallback, ValidationF> {
    /// Callback that may reject incoming revisions (installed as the pull filter).
    validation_cb: ValidationF,
    /// Callback invoked when the replicator status changes.
    state_changed_callback: StateCallback,
    /// Callback invoked when documents finish replicating.
    documents_ended_callback: DocumentsEndedCallback,
    /// Authentication sent to the remote endpoint.
    auth: ReplicatorAuthentication,
    /// Push and pull modes of the replication.
    mode: ReplicatorMode,
}
47
/// Pair of replication modes, one per direction.
#[derive(Clone, Copy)]
struct ReplicatorMode {
    /// Mode used for the push direction.
    push: C4ReplicatorMode,
    /// Mode used for the pull direction.
    pull: C4ReplicatorMode,
}
53
54impl<SC, DEC, V> ReplicatorParameters<SC, DEC, V> {
55    #[inline]
56    pub fn with_auth(self, auth: ReplicatorAuthentication) -> Self {
57        Self { auth, ..self }
58    }
59    /// Set callback that can reject incoming revisions.
60    /// Arguments: collection_name, doc_id, rev_id, rev_flags, doc_body.
61    /// It should return false to reject document.
62    #[inline]
63    pub fn with_validation_func<ValidationF>(
64        self,
65        validation_cb: ValidationF,
66    ) -> ReplicatorParameters<SC, DEC, ValidationF>
67    where
68        ValidationF: ReplicatorValidationFunction,
69    {
70        ReplicatorParameters {
71            validation_cb,
72            state_changed_callback: self.state_changed_callback,
73            documents_ended_callback: self.documents_ended_callback,
74            auth: self.auth,
75            mode: self.mode,
76        }
77    }
78    /// Set callback to reports back change of replicator state
79    #[inline]
80    pub fn with_state_changed_callback<StateCallback>(
81        self,
82        state_changed_callback: StateCallback,
83    ) -> ReplicatorParameters<StateCallback, DEC, V>
84    where
85        StateCallback: ReplicatorStatusChangedCallback,
86    {
87        ReplicatorParameters {
88            validation_cb: self.validation_cb,
89            state_changed_callback,
90            documents_ended_callback: self.documents_ended_callback,
91            auth: self.auth,
92            mode: self.mode,
93        }
94    }
95    /// Set callback to reports about the replication status of documents
96    #[inline]
97    pub fn with_documents_ended_callback<DocumentsEndedCallback>(
98        self,
99        documents_ended_callback: DocumentsEndedCallback,
100    ) -> ReplicatorParameters<SC, DocumentsEndedCallback, V>
101    where
102        DocumentsEndedCallback: ReplicatorDocumentsEndedCallback,
103    {
104        ReplicatorParameters {
105            validation_cb: self.validation_cb,
106            state_changed_callback: self.state_changed_callback,
107            documents_ended_callback,
108            auth: self.auth,
109            mode: self.mode,
110        }
111    }
112    /// Set push mode (from db to remote/other db)
113    #[inline]
114    pub fn with_push_mode(self, push: C4ReplicatorMode) -> Self {
115        Self {
116            mode: ReplicatorMode {
117                push,
118                pull: self.mode.pull,
119            },
120            ..self
121        }
122    }
123    /// Set pull mode (from db to remote/other db)
124    #[inline]
125    pub fn with_pull_mode(self, pull: C4ReplicatorMode) -> Self {
126        Self {
127            mode: ReplicatorMode {
128                pull,
129                push: self.mode.push,
130            },
131            ..self
132        }
133    }
134}
135
136impl Default
137    for ReplicatorParameters<
138        fn(ReplicatorState),
139        fn(bool, &mut dyn Iterator<Item = &C4DocumentEnded>),
140        fn(C4String, C4String, C4String, C4RevisionFlags, FLDict) -> bool,
141    >
142{
143    fn default() -> Self {
144        Self {
145            validation_cb: |_coll_name, _doc_id, _rev_id, _rev_flags, _body| true,
146            state_changed_callback: |_repl_state| {},
147            documents_ended_callback: |_pushing, _doc_iter| {},
148            auth: ReplicatorAuthentication::None,
149            mode: ReplicatorMode {
150                push: C4ReplicatorMode::kC4Continuous,
151                pull: C4ReplicatorMode::kC4Continuous,
152            },
153        }
154    }
155}
156
/// Bundle of the three user callbacks.
///
/// `Replicator::new` boxes one of these and passes its address to the C API
/// as the callback context; the C trampolines cast the context back to this type.
struct CallbackContext<
    ValidationCb: ReplicatorValidationFunction,
    StateCb: ReplicatorStatusChangedCallback,
    DocumentsEndedCb: ReplicatorDocumentsEndedCallback,
> {
    /// Called by the validation (pull filter) trampoline.
    validation_cb: ValidationCb,
    /// Called by the status-changed trampoline.
    state_cb: StateCb,
    /// Called by the documents-ended trampoline.
    docs_ended_cb: DocumentsEndedCb,
}
166
/// Authentication settings encoded into the replicator options dictionary.
#[derive(Clone)]
pub enum ReplicatorAuthentication {
    /// Session authentication: the token is sent as `kC4ReplicatorAuthToken`
    /// with auth type `kC4AuthTypeSession`.
    SessionToken(String),
    /// Basic authentication with user name and password (`kC4AuthTypeBasic`).
    Basic { username: String, password: String },
    /// No authentication: an empty options dictionary is used.
    None,
}
173
/// It should be safe to call the replicator API from any thread
/// according to <https://github.com/couchbase/couchbase-lite-core/wiki/Thread-Safety>.
/// NOTE(review): the user callbacks stored behind `boxed_callback_f` are
/// required to be `Send` by the trait aliases used in `Replicator::new`.
unsafe impl Send for Replicator {}
177
178impl Drop for Replicator {
179    #[inline]
180    fn drop(&mut self) {
181        trace!("repl drop {:?}", self.inner.as_ptr());
182        unsafe {
183            c4repl_free(self.inner.as_ptr());
184            (self.free_callback_f)(self.boxed_callback_f.as_ptr());
185        }
186    }
187}
188
// Emulates a trait alias: declares the public trait `$alias` with the given
// bounds as supertraits, plus a blanket impl so that every type satisfying
// those bounds implements `$alias` automatically.
macro_rules! define_trait_alias {
    ($alias:ident, $($tt:tt)+) => {
        pub trait $alias: $($tt)+ {}
        impl<T> $alias for T where T: $($tt)+ {}
    };
}
195
// Validation callback: receives collection spec, doc id, rev id, revision
// flags and document body; its `bool` result is returned to the C pull filter.
define_trait_alias!(ReplicatorValidationFunction, FnMut(C4CollectionSpec, C4String, C4String, C4RevisionFlags, FLDict) -> bool + Send + 'static);
// Status callback: receives the new replicator state.
define_trait_alias!(
    ReplicatorStatusChangedCallback,
    FnMut(ReplicatorState) + Send + 'static
);
// Documents-ended callback: receives the `pushing` flag and an iterator over
// the documents reported by the replicator.
define_trait_alias!(
    ReplicatorDocumentsEndedCallback,
    FnMut(bool, &mut dyn Iterator<Item = &C4DocumentEnded>) + Send + 'static
);
205
206impl Replicator {
207    /// # Arguments
208    /// * `url` - should be something like "ws://192.168.1.132:4984/demo/"
209    /// * `params` - parameters of replicator
210    pub fn new<StateCallback, DocumentsEndedCallback, ValidationF>(
211        db: &Database,
212        url: &str,
213        params: ReplicatorParameters<StateCallback, DocumentsEndedCallback, ValidationF>,
214    ) -> Result<Self>
215    where
216        ValidationF: ReplicatorValidationFunction,
217        StateCallback: ReplicatorStatusChangedCallback,
218        DocumentsEndedCallback: ReplicatorDocumentsEndedCallback,
219    {
220        unsafe extern "C" fn call_validation<F, F2, F3>(
221            coll_spec: C4CollectionSpec,
222            doc_id: C4String,
223            rev_id: C4String,
224            flags: C4RevisionFlags,
225            body: FLDict,
226            ctx: *mut c_void,
227        ) -> bool
228        where
229            F: ReplicatorValidationFunction,
230            F2: ReplicatorStatusChangedCallback,
231            F3: ReplicatorDocumentsEndedCallback,
232        {
233            let r = catch_unwind(|| {
234                let ctx = ctx as *mut CallbackContext<F, F2, F3>;
235                assert!(
236                    !ctx.is_null(),
237                    "Replicator::call_validation: Internal error - null function pointer"
238                );
239                ((*ctx).validation_cb)(coll_spec, doc_id, rev_id, flags, body)
240            });
241            r.unwrap_or_else(|_| {
242                error!("Replicator::call_validation: catch panic aborting");
243                abort();
244            })
245        }
246
247        unsafe extern "C" fn call_on_status_changed<F1, F, F3>(
248            c4_repl: *mut C4Replicator,
249            status: C4ReplicatorStatus,
250            ctx: *mut c_void,
251        ) where
252            F1: ReplicatorValidationFunction,
253            F: ReplicatorStatusChangedCallback,
254            F3: ReplicatorDocumentsEndedCallback,
255        {
256            info!("on_status_changed: repl {c4_repl:?}, status {status:?}");
257            let r = catch_unwind(|| {
258                let ctx = ctx as *mut CallbackContext<F1, F, F3>;
259                assert!(
260                    !ctx.is_null(),
261                    "Replicator::call_on_status_changed: Internal error - null function pointer"
262                );
263                ((*ctx).state_cb)(ReplicatorState::from(status));
264            });
265            if r.is_err() {
266                error!("Replicator::call_on_status_changed: catch panic aborting");
267                abort();
268            }
269        }
270
271        unsafe extern "C" fn call_on_documents_ended<F1, F2, F>(
272            c4_repl: *mut C4Replicator,
273            pushing: bool,
274            num_docs: usize,
275            docs: *mut *const C4DocumentEnded,
276            ctx: *mut ::std::os::raw::c_void,
277        ) where
278            F1: ReplicatorValidationFunction,
279            F2: ReplicatorStatusChangedCallback,
280            F: ReplicatorDocumentsEndedCallback,
281        {
282            trace!(
283                "on_documents_ended: repl {:?} pushing {}, num_docs {}",
284                c4_repl,
285                pushing,
286                num_docs
287            );
288            let r = catch_unwind(|| {
289                let ctx = ctx as *mut CallbackContext<F1, F2, F>;
290                assert!(
291                    !ctx.is_null(),
292                    "Replicator::call_on_documents_ended: Internal error - null function pointer"
293                );
294                let docs: &[*const C4DocumentEnded] = slice::from_raw_parts(docs, num_docs);
295                let mut it = docs.iter().map(|x| &**x);
296                ((*ctx).docs_ended_cb)(pushing, &mut it);
297            });
298            if r.is_err() {
299                error!("Replicator::call_on_documents_ended: catch panic aborting");
300                abort();
301            }
302        }
303
304        let ctx = Box::new(CallbackContext {
305            validation_cb: params.validation_cb,
306            state_cb: params.state_changed_callback,
307            docs_ended_cb: params.documents_ended_callback,
308        });
309        let ctx_p = Box::into_raw(ctx);
310        Replicator::do_new(
311            db,
312            url,
313            &params.auth,
314            free_boxed_value::<CallbackContext<ValidationF, StateCallback, DocumentsEndedCallback>>,
315            unsafe { NonNull::new_unchecked(ctx_p as *mut c_void) },
316            Some(call_validation::<ValidationF, StateCallback, DocumentsEndedCallback>),
317            Some(call_on_status_changed::<ValidationF, StateCallback, DocumentsEndedCallback>),
318            Some(call_on_documents_ended::<ValidationF, StateCallback, DocumentsEndedCallback>),
319            params.mode,
320        )
321    }
322
323    /// starts database replication
324    /// * `reset` - If true, the replicator will reset its checkpoint
325    ///             and start replication from the beginning.
326    pub fn start(&mut self, reset: bool) -> Result<()> {
327        unsafe { c4repl_start(self.inner.as_ptr(), reset) };
328        let status: ReplicatorState = self.status().into();
329        if let ReplicatorState::Stopped(err) = status {
330            Err(err)
331        } else {
332            Ok(())
333        }
334    }
335
336    /// Full recreation of database replicator except callbacks,
337    ///
338    /// * `url`   - new url
339    /// * `auth`  - new auth information
340    /// * `reset` - If true, the replicator will reset its checkpoint
341    ///             and start replication from the beginning.
342    pub fn restart(
343        self,
344        db: &Database,
345        url: &str,
346        auth: &ReplicatorAuthentication,
347        reset: bool,
348    ) -> Result<Self> {
349        let Replicator {
350            inner: prev_inner,
351            free_callback_f,
352            boxed_callback_f,
353            validation,
354            c_callback_on_status_changed,
355            c_callback_on_documents_ended,
356            mode,
357        } = self;
358        mem::forget(self);
359        unsafe {
360            c4repl_stop(prev_inner.as_ptr());
361            c4repl_free(prev_inner.as_ptr());
362        }
363        let mut repl = Replicator::do_new(
364            db,
365            url,
366            auth,
367            free_callback_f,
368            boxed_callback_f,
369            validation,
370            c_callback_on_status_changed,
371            c_callback_on_documents_ended,
372            mode,
373        )?;
374        repl.start(reset)?;
375        Ok(repl)
376    }
377
378    /// Tells a replicator that's in the offline state to reconnect immediately.
379    /// return `true` if the replicator will reconnect, `false` if it won't.
380    pub fn retry(&mut self) -> Result<bool> {
381        trace!("repl retry {:?}", self.inner.as_ptr());
382        let mut c4err = c4error_init();
383        let will_reconnect = unsafe { c4repl_retry(self.inner.as_ptr(), &mut c4err) };
384        if c4err.code == 0 {
385            Ok(will_reconnect)
386        } else {
387            Err(c4err.into())
388        }
389    }
390
391    fn do_new(
392        db: &Database,
393        url: &str,
394        auth: &ReplicatorAuthentication,
395        free_callback_f: unsafe fn(_: *mut c_void),
396        boxed_callback_f: NonNull<c_void>,
397        validation: C4ReplicatorValidationFunction,
398        call_on_status_changed: C4ReplicatorStatusChangedCallback,
399        call_on_documents_ended: C4ReplicatorDocumentsEndedCallback,
400        mode: ReplicatorMode,
401    ) -> Result<Self> {
402        use consts::*;
403
404        let mut remote_addr = MaybeUninit::<C4Address>::uninit();
405        let mut db_name = C4String::default();
406        if !unsafe { c4address_fromURL(url.into(), remote_addr.as_mut_ptr(), &mut db_name) } {
407            return Err(Error::LogicError(format!("Can not parse URL {url}").into()));
408        }
409        let remote_addr = unsafe { remote_addr.assume_init() };
410
411        let options_dict: FLSliceResult = match auth {
412            ReplicatorAuthentication::SessionToken(token) => serde_fleece::fleece!({
413                kC4ReplicatorOptionAuthentication: {
414                    kC4ReplicatorAuthType: kC4AuthTypeSession,
415                    kC4ReplicatorAuthToken: token.as_str(),
416                }
417            }),
418            ReplicatorAuthentication::Basic { username, password } => {
419                serde_fleece::fleece!({
420                    kC4ReplicatorOptionAuthentication: {
421                        kC4ReplicatorAuthType: kC4AuthTypeBasic,
422                        kC4ReplicatorAuthUserName: username.as_str(),
423                        kC4ReplicatorAuthPassword: password.as_str()
424                    }
425                })
426            }
427            ReplicatorAuthentication::None => serde_fleece::fleece!({}),
428        }?;
429
430        let mut collect_opt = C4ReplicationCollection {
431            collection: kC4DefaultCollectionSpec,
432            push: mode.push,
433            pull: mode.pull,
434            optionsDictFleece: Default::default(),
435            pushFilter: None,
436            pullFilter: validation,
437            callbackContext: boxed_callback_f.as_ptr() as *mut c_void,
438        };
439
440        let repl_params = C4ReplicatorParameters {
441            onStatusChanged: call_on_status_changed,
442            onDocumentsEnded: call_on_documents_ended,
443            onBlobProgress: None,
444            propertyEncryptor: ptr::null_mut(),
445            propertyDecryptor: ptr::null_mut(),
446            callbackContext: boxed_callback_f.as_ptr() as *mut c_void,
447            socketFactory: ptr::null_mut(),
448            optionsDictFleece: options_dict.as_fl_slice(),
449            collections: &mut collect_opt,
450            collectionCount: 1,
451        };
452        let mut c4err = c4error_init();
453        let repl = unsafe {
454            c4repl_new(
455                db.inner.0.as_ptr(),
456                remote_addr,
457                db_name,
458                repl_params,
459                &mut c4err,
460            )
461        };
462        trace!("repl new result {repl:?}");
463        NonNull::new(repl)
464            .map(|inner| Replicator {
465                inner,
466                free_callback_f,
467                boxed_callback_f,
468                validation,
469                c_callback_on_status_changed: call_on_status_changed,
470                c_callback_on_documents_ended: call_on_documents_ended,
471                mode,
472            })
473            .ok_or_else(|| {
474                unsafe { free_callback_f(boxed_callback_f.as_ptr()) };
475                c4err.into()
476            })
477    }
478    #[inline]
479    pub fn stop(&mut self) {
480        trace!("repl stop {:?}", self.inner.as_ptr());
481        unsafe { c4repl_stop(self.inner.as_ptr()) };
482    }
483    #[inline]
484    pub fn state(&self) -> ReplicatorState {
485        self.status().into()
486    }
487    pub(crate) fn status(&self) -> C4ReplicatorStatus {
488        unsafe { c4repl_getStatus(self.inner.as_ptr()) }
489    }
490}
491
/// Represents the current progress of a replicator (alias of `C4Progress`).
/// The `units` fields should not be used directly, but divided (`unitsCompleted`/`unitsTotal`)
/// to give a _very_ approximate progress fraction.
pub type ReplicatorProgress = C4Progress;
496
/// The possible states of a replicator.
#[derive(Debug)]
pub enum ReplicatorState {
    /// Finished, or got a fatal error.
    Stopped(Error),
    /// Offline, replication does not work.
    Offline,
    /// Connection is in progress.
    Connecting,
    /// Continuous replicator has caught up and is waiting for changes.
    Idle,
    /// Connected and actively working.
    /// The C activity level `kC4Stopping` is also reported as this state.
    Busy(ReplicatorProgress),
}
511
/// Reclaims and drops a value of type `T` that was previously leaked as a raw pointer.
///
/// # Safety
/// `p` must have been produced by `Box::into_raw` for a `Box<T>` of exactly
/// this `T`, and must not be used or freed again afterwards.
unsafe fn free_boxed_value<T>(p: *mut c_void) {
    let owned: Box<T> = Box::from_raw(p.cast());
    drop(owned);
}
515
516impl From<C4ReplicatorStatus> for ReplicatorState {
517    fn from(status: C4ReplicatorStatus) -> Self {
518        match status.level {
519            C4ReplicatorActivityLevel::kC4Stopped => ReplicatorState::Stopped(status.error.into()),
520            C4ReplicatorActivityLevel::kC4Offline => ReplicatorState::Offline,
521            C4ReplicatorActivityLevel::kC4Connecting => ReplicatorState::Connecting,
522            C4ReplicatorActivityLevel::kC4Idle => ReplicatorState::Idle,
523            C4ReplicatorActivityLevel::kC4Busy => ReplicatorState::Busy(status.progress),
524            C4ReplicatorActivityLevel::kC4Stopping => ReplicatorState::Busy(status.progress),
525        }
526    }
527}
528
529#[allow(non_upper_case_globals)]
530pub(crate) mod consts {
531    macro_rules! define_const_str {
532	($($name:ident,)+) => {
533	    $(pub(crate) const $name: &'static str = match ($crate::ffi::$name).to_str() {
534                Ok(x) => x,
535                Err(_) => panic!("Invalid utf-8 constant"),
536            };)*
537	};
538    }
539
540    define_const_str!(
541        kC4AuthTypeBasic,
542        kC4AuthTypeSession,
543        kC4ReplicatorAuthPassword,
544        kC4ReplicatorAuthToken,
545        kC4ReplicatorAuthType,
546        kC4ReplicatorAuthUserName,
547        kC4ReplicatorOptionAuthentication,
548    );
549
550    #[cfg(feature = "use-tokio-websocket")]
551    define_const_str!(
552        kC4ReplicatorOptionExtraHeaders,
553        kC4ReplicatorOptionCookies,
554        kC4SocketOptionWSProtocols,
555    );
556}
557
/// Guard shared by the `init_*_socket_impl` functions in this file so that a
/// websocket implementation is registered at most once per process.
static WEBSOCKET_IMPL: Once = Once::new();
559
/// Registers the built-in websocket implementation.
///
/// Guarded by `WEBSOCKET_IMPL`, so only the first call has any effect.
#[cfg(feature = "use-couchbase-lite-websocket")]
pub(crate) fn init_builtin_socket_impl() {
    let register = || {
        unsafe { crate::ffi::C4RegisterBuiltInWebSocket() };
    };
    WEBSOCKET_IMPL.call_once(register);
}
566
/// Registers the tokio-based websocket implementation, passing it `handle`.
///
/// Guarded by `WEBSOCKET_IMPL`, so only the first call has any effect; on
/// later calls `handle` is simply dropped.
#[cfg(feature = "use-tokio-websocket")]
pub(crate) fn init_tokio_socket_impl(handle: tokio::runtime::Handle) {
    let install = move || {
        tokio_socket::c4socket_init(handle);
    };
    WEBSOCKET_IMPL.call_once(install);
}