1#[cfg(feature = "use-tokio-websocket")]
2mod tokio_socket;
3
4use crate::{
5 error::{c4error_init, Error, Result},
6 ffi::{
7 c4address_fromURL, c4repl_free, c4repl_getStatus, c4repl_new, c4repl_retry, c4repl_start,
8 c4repl_stop, kC4DefaultCollectionSpec, C4Address, C4CollectionSpec, C4DocumentEnded,
9 C4Progress, C4ReplicationCollection, C4Replicator, C4ReplicatorActivityLevel,
10 C4ReplicatorDocumentsEndedCallback, C4ReplicatorMode, C4ReplicatorParameters,
11 C4ReplicatorStatus, C4ReplicatorStatusChangedCallback, C4ReplicatorValidationFunction,
12 C4RevisionFlags, C4String, FLDict, FLSliceResult,
13 },
14 Database,
15};
16use log::{error, info, trace};
17use std::{
18 mem::{self, MaybeUninit},
19 os::raw::c_void,
20 panic::catch_unwind,
21 process::abort,
22 ptr,
23 ptr::NonNull,
24 slice, str,
25 sync::Once,
26};
27
28pub struct Replicator {
30 inner: NonNull<C4Replicator>,
31 validation: C4ReplicatorValidationFunction,
32 c_callback_on_status_changed: C4ReplicatorStatusChangedCallback,
33 c_callback_on_documents_ended: C4ReplicatorDocumentsEndedCallback,
34 free_callback_f: unsafe fn(_: *mut c_void),
35 boxed_callback_f: NonNull<c_void>,
36 mode: ReplicatorMode,
37}
38
39pub struct ReplicatorParameters<StateCallback, DocumentsEndedCallback, ValidationF> {
41 validation_cb: ValidationF,
42 state_changed_callback: StateCallback,
43 documents_ended_callback: DocumentsEndedCallback,
44 auth: ReplicatorAuthentication,
45 mode: ReplicatorMode,
46}
47
48#[derive(Clone, Copy)]
49struct ReplicatorMode {
50 push: C4ReplicatorMode,
51 pull: C4ReplicatorMode,
52}
53
54impl<SC, DEC, V> ReplicatorParameters<SC, DEC, V> {
55 #[inline]
56 pub fn with_auth(self, auth: ReplicatorAuthentication) -> Self {
57 Self { auth, ..self }
58 }
59 #[inline]
63 pub fn with_validation_func<ValidationF>(
64 self,
65 validation_cb: ValidationF,
66 ) -> ReplicatorParameters<SC, DEC, ValidationF>
67 where
68 ValidationF: ReplicatorValidationFunction,
69 {
70 ReplicatorParameters {
71 validation_cb,
72 state_changed_callback: self.state_changed_callback,
73 documents_ended_callback: self.documents_ended_callback,
74 auth: self.auth,
75 mode: self.mode,
76 }
77 }
78 #[inline]
80 pub fn with_state_changed_callback<StateCallback>(
81 self,
82 state_changed_callback: StateCallback,
83 ) -> ReplicatorParameters<StateCallback, DEC, V>
84 where
85 StateCallback: ReplicatorStatusChangedCallback,
86 {
87 ReplicatorParameters {
88 validation_cb: self.validation_cb,
89 state_changed_callback,
90 documents_ended_callback: self.documents_ended_callback,
91 auth: self.auth,
92 mode: self.mode,
93 }
94 }
95 #[inline]
97 pub fn with_documents_ended_callback<DocumentsEndedCallback>(
98 self,
99 documents_ended_callback: DocumentsEndedCallback,
100 ) -> ReplicatorParameters<SC, DocumentsEndedCallback, V>
101 where
102 DocumentsEndedCallback: ReplicatorDocumentsEndedCallback,
103 {
104 ReplicatorParameters {
105 validation_cb: self.validation_cb,
106 state_changed_callback: self.state_changed_callback,
107 documents_ended_callback,
108 auth: self.auth,
109 mode: self.mode,
110 }
111 }
112 #[inline]
114 pub fn with_push_mode(self, push: C4ReplicatorMode) -> Self {
115 Self {
116 mode: ReplicatorMode {
117 push,
118 pull: self.mode.pull,
119 },
120 ..self
121 }
122 }
123 #[inline]
125 pub fn with_pull_mode(self, pull: C4ReplicatorMode) -> Self {
126 Self {
127 mode: ReplicatorMode {
128 pull,
129 push: self.mode.push,
130 },
131 ..self
132 }
133 }
134}
135
136impl Default
137 for ReplicatorParameters<
138 fn(ReplicatorState),
139 fn(bool, &mut dyn Iterator<Item = &C4DocumentEnded>),
140 fn(C4String, C4String, C4String, C4RevisionFlags, FLDict) -> bool,
141 >
142{
143 fn default() -> Self {
144 Self {
145 validation_cb: |_coll_name, _doc_id, _rev_id, _rev_flags, _body| true,
146 state_changed_callback: |_repl_state| {},
147 documents_ended_callback: |_pushing, _doc_iter| {},
148 auth: ReplicatorAuthentication::None,
149 mode: ReplicatorMode {
150 push: C4ReplicatorMode::kC4Continuous,
151 pull: C4ReplicatorMode::kC4Continuous,
152 },
153 }
154 }
155}
156
157struct CallbackContext<
158 ValidationCb: ReplicatorValidationFunction,
159 StateCb: ReplicatorStatusChangedCallback,
160 DocumentsEndedCb: ReplicatorDocumentsEndedCallback,
161> {
162 validation_cb: ValidationCb,
163 state_cb: StateCb,
164 docs_ended_cb: DocumentsEndedCb,
165}
166
/// Credentials the replicator presents to the remote.
///
/// Each variant is translated into the authentication section of the
/// replicator options dictionary when the native replicator is created.
#[derive(Clone)]
pub enum ReplicatorAuthentication {
    /// Authenticate with a session token.
    SessionToken(String),
    /// Authenticate with a user name and password.
    Basic {
        /// User name.
        username: String,
        /// Password belonging to `username`.
        password: String,
    },
    /// Send no authentication options at all.
    None,
}
173
174unsafe impl Send for Replicator {}
177
178impl Drop for Replicator {
179 #[inline]
180 fn drop(&mut self) {
181 trace!("repl drop {:?}", self.inner.as_ptr());
182 unsafe {
183 c4repl_free(self.inner.as_ptr());
184 (self.free_callback_f)(self.boxed_callback_f.as_ptr());
185 }
186 }
187}
188
/// Emulates a trait alias: declares the public trait `$name` with the given
/// bounds as supertraits and blanket-implements it for every type that
/// satisfies those bounds.
macro_rules! define_trait_alias {
    ($name:ident, $($bound:tt)+) => {
        pub trait $name: $($bound)+ {}
        impl<T: $($bound)+> $name for T {}
    };
}
195
196define_trait_alias!(ReplicatorValidationFunction, FnMut(C4CollectionSpec, C4String, C4String, C4RevisionFlags, FLDict) -> bool + Send + 'static);
197define_trait_alias!(
198 ReplicatorStatusChangedCallback,
199 FnMut(ReplicatorState) + Send + 'static
200);
201define_trait_alias!(
202 ReplicatorDocumentsEndedCallback,
203 FnMut(bool, &mut dyn Iterator<Item = &C4DocumentEnded>) + Send + 'static
204);
205
206impl Replicator {
207 pub fn new<StateCallback, DocumentsEndedCallback, ValidationF>(
211 db: &Database,
212 url: &str,
213 params: ReplicatorParameters<StateCallback, DocumentsEndedCallback, ValidationF>,
214 ) -> Result<Self>
215 where
216 ValidationF: ReplicatorValidationFunction,
217 StateCallback: ReplicatorStatusChangedCallback,
218 DocumentsEndedCallback: ReplicatorDocumentsEndedCallback,
219 {
220 unsafe extern "C" fn call_validation<F, F2, F3>(
221 coll_spec: C4CollectionSpec,
222 doc_id: C4String,
223 rev_id: C4String,
224 flags: C4RevisionFlags,
225 body: FLDict,
226 ctx: *mut c_void,
227 ) -> bool
228 where
229 F: ReplicatorValidationFunction,
230 F2: ReplicatorStatusChangedCallback,
231 F3: ReplicatorDocumentsEndedCallback,
232 {
233 let r = catch_unwind(|| {
234 let ctx = ctx as *mut CallbackContext<F, F2, F3>;
235 assert!(
236 !ctx.is_null(),
237 "Replicator::call_validation: Internal error - null function pointer"
238 );
239 ((*ctx).validation_cb)(coll_spec, doc_id, rev_id, flags, body)
240 });
241 r.unwrap_or_else(|_| {
242 error!("Replicator::call_validation: catch panic aborting");
243 abort();
244 })
245 }
246
247 unsafe extern "C" fn call_on_status_changed<F1, F, F3>(
248 c4_repl: *mut C4Replicator,
249 status: C4ReplicatorStatus,
250 ctx: *mut c_void,
251 ) where
252 F1: ReplicatorValidationFunction,
253 F: ReplicatorStatusChangedCallback,
254 F3: ReplicatorDocumentsEndedCallback,
255 {
256 info!("on_status_changed: repl {c4_repl:?}, status {status:?}");
257 let r = catch_unwind(|| {
258 let ctx = ctx as *mut CallbackContext<F1, F, F3>;
259 assert!(
260 !ctx.is_null(),
261 "Replicator::call_on_status_changed: Internal error - null function pointer"
262 );
263 ((*ctx).state_cb)(ReplicatorState::from(status));
264 });
265 if r.is_err() {
266 error!("Replicator::call_on_status_changed: catch panic aborting");
267 abort();
268 }
269 }
270
271 unsafe extern "C" fn call_on_documents_ended<F1, F2, F>(
272 c4_repl: *mut C4Replicator,
273 pushing: bool,
274 num_docs: usize,
275 docs: *mut *const C4DocumentEnded,
276 ctx: *mut ::std::os::raw::c_void,
277 ) where
278 F1: ReplicatorValidationFunction,
279 F2: ReplicatorStatusChangedCallback,
280 F: ReplicatorDocumentsEndedCallback,
281 {
282 trace!(
283 "on_documents_ended: repl {:?} pushing {}, num_docs {}",
284 c4_repl,
285 pushing,
286 num_docs
287 );
288 let r = catch_unwind(|| {
289 let ctx = ctx as *mut CallbackContext<F1, F2, F>;
290 assert!(
291 !ctx.is_null(),
292 "Replicator::call_on_documents_ended: Internal error - null function pointer"
293 );
294 let docs: &[*const C4DocumentEnded] = slice::from_raw_parts(docs, num_docs);
295 let mut it = docs.iter().map(|x| &**x);
296 ((*ctx).docs_ended_cb)(pushing, &mut it);
297 });
298 if r.is_err() {
299 error!("Replicator::call_on_documents_ended: catch panic aborting");
300 abort();
301 }
302 }
303
304 let ctx = Box::new(CallbackContext {
305 validation_cb: params.validation_cb,
306 state_cb: params.state_changed_callback,
307 docs_ended_cb: params.documents_ended_callback,
308 });
309 let ctx_p = Box::into_raw(ctx);
310 Replicator::do_new(
311 db,
312 url,
313 ¶ms.auth,
314 free_boxed_value::<CallbackContext<ValidationF, StateCallback, DocumentsEndedCallback>>,
315 unsafe { NonNull::new_unchecked(ctx_p as *mut c_void) },
316 Some(call_validation::<ValidationF, StateCallback, DocumentsEndedCallback>),
317 Some(call_on_status_changed::<ValidationF, StateCallback, DocumentsEndedCallback>),
318 Some(call_on_documents_ended::<ValidationF, StateCallback, DocumentsEndedCallback>),
319 params.mode,
320 )
321 }
322
323 pub fn start(&mut self, reset: bool) -> Result<()> {
327 unsafe { c4repl_start(self.inner.as_ptr(), reset) };
328 let status: ReplicatorState = self.status().into();
329 if let ReplicatorState::Stopped(err) = status {
330 Err(err)
331 } else {
332 Ok(())
333 }
334 }
335
336 pub fn restart(
343 self,
344 db: &Database,
345 url: &str,
346 auth: &ReplicatorAuthentication,
347 reset: bool,
348 ) -> Result<Self> {
349 let Replicator {
350 inner: prev_inner,
351 free_callback_f,
352 boxed_callback_f,
353 validation,
354 c_callback_on_status_changed,
355 c_callback_on_documents_ended,
356 mode,
357 } = self;
358 mem::forget(self);
359 unsafe {
360 c4repl_stop(prev_inner.as_ptr());
361 c4repl_free(prev_inner.as_ptr());
362 }
363 let mut repl = Replicator::do_new(
364 db,
365 url,
366 auth,
367 free_callback_f,
368 boxed_callback_f,
369 validation,
370 c_callback_on_status_changed,
371 c_callback_on_documents_ended,
372 mode,
373 )?;
374 repl.start(reset)?;
375 Ok(repl)
376 }
377
378 pub fn retry(&mut self) -> Result<bool> {
381 trace!("repl retry {:?}", self.inner.as_ptr());
382 let mut c4err = c4error_init();
383 let will_reconnect = unsafe { c4repl_retry(self.inner.as_ptr(), &mut c4err) };
384 if c4err.code == 0 {
385 Ok(will_reconnect)
386 } else {
387 Err(c4err.into())
388 }
389 }
390
391 fn do_new(
392 db: &Database,
393 url: &str,
394 auth: &ReplicatorAuthentication,
395 free_callback_f: unsafe fn(_: *mut c_void),
396 boxed_callback_f: NonNull<c_void>,
397 validation: C4ReplicatorValidationFunction,
398 call_on_status_changed: C4ReplicatorStatusChangedCallback,
399 call_on_documents_ended: C4ReplicatorDocumentsEndedCallback,
400 mode: ReplicatorMode,
401 ) -> Result<Self> {
402 use consts::*;
403
404 let mut remote_addr = MaybeUninit::<C4Address>::uninit();
405 let mut db_name = C4String::default();
406 if !unsafe { c4address_fromURL(url.into(), remote_addr.as_mut_ptr(), &mut db_name) } {
407 return Err(Error::LogicError(format!("Can not parse URL {url}").into()));
408 }
409 let remote_addr = unsafe { remote_addr.assume_init() };
410
411 let options_dict: FLSliceResult = match auth {
412 ReplicatorAuthentication::SessionToken(token) => serde_fleece::fleece!({
413 kC4ReplicatorOptionAuthentication: {
414 kC4ReplicatorAuthType: kC4AuthTypeSession,
415 kC4ReplicatorAuthToken: token.as_str(),
416 }
417 }),
418 ReplicatorAuthentication::Basic { username, password } => {
419 serde_fleece::fleece!({
420 kC4ReplicatorOptionAuthentication: {
421 kC4ReplicatorAuthType: kC4AuthTypeBasic,
422 kC4ReplicatorAuthUserName: username.as_str(),
423 kC4ReplicatorAuthPassword: password.as_str()
424 }
425 })
426 }
427 ReplicatorAuthentication::None => serde_fleece::fleece!({}),
428 }?;
429
430 let mut collect_opt = C4ReplicationCollection {
431 collection: kC4DefaultCollectionSpec,
432 push: mode.push,
433 pull: mode.pull,
434 optionsDictFleece: Default::default(),
435 pushFilter: None,
436 pullFilter: validation,
437 callbackContext: boxed_callback_f.as_ptr() as *mut c_void,
438 };
439
440 let repl_params = C4ReplicatorParameters {
441 onStatusChanged: call_on_status_changed,
442 onDocumentsEnded: call_on_documents_ended,
443 onBlobProgress: None,
444 propertyEncryptor: ptr::null_mut(),
445 propertyDecryptor: ptr::null_mut(),
446 callbackContext: boxed_callback_f.as_ptr() as *mut c_void,
447 socketFactory: ptr::null_mut(),
448 optionsDictFleece: options_dict.as_fl_slice(),
449 collections: &mut collect_opt,
450 collectionCount: 1,
451 };
452 let mut c4err = c4error_init();
453 let repl = unsafe {
454 c4repl_new(
455 db.inner.0.as_ptr(),
456 remote_addr,
457 db_name,
458 repl_params,
459 &mut c4err,
460 )
461 };
462 trace!("repl new result {repl:?}");
463 NonNull::new(repl)
464 .map(|inner| Replicator {
465 inner,
466 free_callback_f,
467 boxed_callback_f,
468 validation,
469 c_callback_on_status_changed: call_on_status_changed,
470 c_callback_on_documents_ended: call_on_documents_ended,
471 mode,
472 })
473 .ok_or_else(|| {
474 unsafe { free_callback_f(boxed_callback_f.as_ptr()) };
475 c4err.into()
476 })
477 }
478 #[inline]
479 pub fn stop(&mut self) {
480 trace!("repl stop {:?}", self.inner.as_ptr());
481 unsafe { c4repl_stop(self.inner.as_ptr()) };
482 }
483 #[inline]
484 pub fn state(&self) -> ReplicatorState {
485 self.status().into()
486 }
487 pub(crate) fn status(&self) -> C4ReplicatorStatus {
488 unsafe { c4repl_getStatus(self.inner.as_ptr()) }
489 }
490}
491
492pub type ReplicatorProgress = C4Progress;
496
497#[derive(Debug)]
499pub enum ReplicatorState {
500 Stopped(Error),
502 Offline,
504 Connecting,
506 Idle,
508 Busy(ReplicatorProgress),
510}
511
/// Reclaims and drops a `Box<T>` that was previously turned into an untyped
/// raw pointer.
///
/// # Safety
///
/// `p` must come from `Box::<T>::into_raw` for exactly this `T`, and must not
/// be used or freed again after this call.
unsafe fn free_boxed_value<T>(p: *mut c_void) {
    let typed: *mut T = p.cast();
    // SAFETY: guaranteed by the caller, see the function-level contract.
    let owned = unsafe { Box::from_raw(typed) };
    drop(owned);
}
515
516impl From<C4ReplicatorStatus> for ReplicatorState {
517 fn from(status: C4ReplicatorStatus) -> Self {
518 match status.level {
519 C4ReplicatorActivityLevel::kC4Stopped => ReplicatorState::Stopped(status.error.into()),
520 C4ReplicatorActivityLevel::kC4Offline => ReplicatorState::Offline,
521 C4ReplicatorActivityLevel::kC4Connecting => ReplicatorState::Connecting,
522 C4ReplicatorActivityLevel::kC4Idle => ReplicatorState::Idle,
523 C4ReplicatorActivityLevel::kC4Busy => ReplicatorState::Busy(status.progress),
524 C4ReplicatorActivityLevel::kC4Stopping => ReplicatorState::Busy(status.progress),
525 }
526 }
527}
528
529#[allow(non_upper_case_globals)]
530pub(crate) mod consts {
531 macro_rules! define_const_str {
532 ($($name:ident,)+) => {
533 $(pub(crate) const $name: &'static str = match ($crate::ffi::$name).to_str() {
534 Ok(x) => x,
535 Err(_) => panic!("Invalid utf-8 constant"),
536 };)*
537 };
538 }
539
540 define_const_str!(
541 kC4AuthTypeBasic,
542 kC4AuthTypeSession,
543 kC4ReplicatorAuthPassword,
544 kC4ReplicatorAuthToken,
545 kC4ReplicatorAuthType,
546 kC4ReplicatorAuthUserName,
547 kC4ReplicatorOptionAuthentication,
548 );
549
550 #[cfg(feature = "use-tokio-websocket")]
551 define_const_str!(
552 kC4ReplicatorOptionExtraHeaders,
553 kC4ReplicatorOptionCookies,
554 kC4SocketOptionWSProtocols,
555 );
556}
557
/// Guard making sure a websocket implementation is registered at most once
/// per process, whichever `init_*_socket_impl` function is used.
static WEBSOCKET_IMPL: Once = Once::new();
559
/// Registers LiteCore's built-in websocket implementation; only the first
/// call guarded by `WEBSOCKET_IMPL` has an effect.
#[cfg(feature = "use-couchbase-lite-websocket")]
pub(crate) fn init_builtin_socket_impl() {
    WEBSOCKET_IMPL.call_once(|| unsafe {
        crate::ffi::C4RegisterBuiltInWebSocket();
    });
}
566
/// Initialises the tokio based websocket implementation with the given runtime
/// `handle`; only the first call guarded by `WEBSOCKET_IMPL` has an effect.
#[cfg(feature = "use-tokio-websocket")]
pub(crate) fn init_tokio_socket_impl(handle: tokio::runtime::Handle) {
    WEBSOCKET_IMPL.call_once(move || {
        tokio_socket::c4socket_init(handle);
    });
}