1#[cfg(feature = "use-tokio-websocket")]
2mod tokio_socket;
3
4use crate::{
5 error::{c4error_init, Error, Result},
6 ffi::{
7 c4address_fromURL, c4repl_free, c4repl_getStatus, c4repl_new, c4repl_retry, c4repl_start,
8 c4repl_stop, kC4DefaultCollectionSpec, C4Address, C4CollectionSpec, C4DocumentEnded,
9 C4Progress, C4ReplicationCollection, C4Replicator, C4ReplicatorActivityLevel,
10 C4ReplicatorDocumentsEndedCallback, C4ReplicatorMode, C4ReplicatorParameters,
11 C4ReplicatorStatus, C4ReplicatorStatusChangedCallback, C4ReplicatorValidationFunction,
12 C4RevisionFlags, C4String, FLDict, FLSliceResult,
13 },
14 Database,
15};
16use log::{info, trace};
17use std::{
18 mem::{self, MaybeUninit},
19 os::raw::c_void,
20 ptr,
21 ptr::NonNull,
22 slice, str,
23 sync::Once,
24};
25
26pub struct Replicator {
28 inner: NonNull<C4Replicator>,
29 validation: C4ReplicatorValidationFunction,
30 c_callback_on_status_changed: C4ReplicatorStatusChangedCallback,
31 c_callback_on_documents_ended: C4ReplicatorDocumentsEndedCallback,
32 free_callback_f: unsafe fn(_: *mut c_void),
33 boxed_callback_f: NonNull<c_void>,
34 mode: ReplicatorMode,
35}
36
37pub struct ReplicatorParameters<StateCallback, DocumentsEndedCallback, ValidationF> {
39 validation_cb: ValidationF,
40 state_changed_callback: StateCallback,
41 documents_ended_callback: DocumentsEndedCallback,
42 auth: ReplicatorAuthentication,
43 mode: ReplicatorMode,
44}
45
46#[derive(Clone, Copy)]
47struct ReplicatorMode {
48 push: C4ReplicatorMode,
49 pull: C4ReplicatorMode,
50}
51
52impl<SC, DEC, V> ReplicatorParameters<SC, DEC, V> {
53 #[inline]
54 pub fn with_auth(self, auth: ReplicatorAuthentication) -> Self {
55 Self { auth, ..self }
56 }
57 #[inline]
61 pub fn with_validation_func<ValidationF>(
62 self,
63 validation_cb: ValidationF,
64 ) -> ReplicatorParameters<SC, DEC, ValidationF>
65 where
66 ValidationF: ReplicatorValidationFunction,
67 {
68 ReplicatorParameters {
69 validation_cb,
70 state_changed_callback: self.state_changed_callback,
71 documents_ended_callback: self.documents_ended_callback,
72 auth: self.auth,
73 mode: self.mode,
74 }
75 }
76 #[inline]
78 pub fn with_state_changed_callback<StateCallback>(
79 self,
80 state_changed_callback: StateCallback,
81 ) -> ReplicatorParameters<StateCallback, DEC, V>
82 where
83 StateCallback: ReplicatorStatusChangedCallback,
84 {
85 ReplicatorParameters {
86 validation_cb: self.validation_cb,
87 state_changed_callback,
88 documents_ended_callback: self.documents_ended_callback,
89 auth: self.auth,
90 mode: self.mode,
91 }
92 }
93 #[inline]
95 pub fn with_documents_ended_callback<DocumentsEndedCallback>(
96 self,
97 documents_ended_callback: DocumentsEndedCallback,
98 ) -> ReplicatorParameters<SC, DocumentsEndedCallback, V>
99 where
100 DocumentsEndedCallback: ReplicatorDocumentsEndedCallback,
101 {
102 ReplicatorParameters {
103 validation_cb: self.validation_cb,
104 state_changed_callback: self.state_changed_callback,
105 documents_ended_callback,
106 auth: self.auth,
107 mode: self.mode,
108 }
109 }
110 #[inline]
112 pub fn with_push_mode(self, push: C4ReplicatorMode) -> Self {
113 Self {
114 mode: ReplicatorMode {
115 push,
116 pull: self.mode.pull,
117 },
118 ..self
119 }
120 }
121 #[inline]
123 pub fn with_pull_mode(self, pull: C4ReplicatorMode) -> Self {
124 Self {
125 mode: ReplicatorMode {
126 pull,
127 push: self.mode.push,
128 },
129 ..self
130 }
131 }
132}
133
134impl Default
135 for ReplicatorParameters<
136 fn(ReplicatorState),
137 fn(bool, &mut dyn Iterator<Item = &C4DocumentEnded>),
138 fn(C4String, C4String, C4String, C4RevisionFlags, FLDict) -> bool,
139 >
140{
141 fn default() -> Self {
142 Self {
143 validation_cb: |_coll_name, _doc_id, _rev_id, _rev_flags, _body| true,
144 state_changed_callback: |_repl_state| {},
145 documents_ended_callback: |_pushing, _doc_iter| {},
146 auth: ReplicatorAuthentication::None,
147 mode: ReplicatorMode {
148 push: C4ReplicatorMode::kC4Continuous,
149 pull: C4ReplicatorMode::kC4Continuous,
150 },
151 }
152 }
153}
154
155struct CallbackContext<
156 ValidationCb: ReplicatorValidationFunction,
157 StateCb: ReplicatorStatusChangedCallback,
158 DocumentsEndedCb: ReplicatorDocumentsEndedCallback,
159> {
160 validation_cb: ValidationCb,
161 state_cb: StateCb,
162 docs_ended_cb: DocumentsEndedCb,
163}
164
165#[derive(Clone)]
166pub enum ReplicatorAuthentication {
167 SessionToken(Box<str>),
168 Basic {
169 username: Box<str>,
170 password: Box<str>,
171 },
172 None,
173}
174
175unsafe impl Send for Replicator {}
178
179impl Drop for Replicator {
180 #[inline]
181 fn drop(&mut self) {
182 trace!("repl drop {:?}", self.inner.as_ptr());
183 unsafe {
184 c4repl_free(self.inner.as_ptr());
185 (self.free_callback_f)(self.boxed_callback_f.as_ptr());
186 }
187 }
188}
189
190macro_rules! define_trait_alias {
191 ($alias:ident, $($tt:tt)+) => {
192 pub trait $alias: $($tt)+ {}
193 impl<T> $alias for T where T: $($tt)+ {}
194 };
195}
196
197define_trait_alias!(ReplicatorValidationFunction, FnMut(C4CollectionSpec, C4String, C4String, C4RevisionFlags, FLDict) -> bool + Send + 'static);
198define_trait_alias!(
199 ReplicatorStatusChangedCallback,
200 FnMut(ReplicatorState) + Send + 'static
201);
202define_trait_alias!(
203 ReplicatorDocumentsEndedCallback,
204 FnMut(bool, &mut dyn Iterator<Item = &C4DocumentEnded>) + Send + 'static
205);
206
207impl Replicator {
208 pub fn new<StateCallback, DocumentsEndedCallback, ValidationF>(
212 db: &Database,
213 url: &str,
214 params: ReplicatorParameters<StateCallback, DocumentsEndedCallback, ValidationF>,
215 ) -> Result<Self>
216 where
217 ValidationF: ReplicatorValidationFunction,
218 StateCallback: ReplicatorStatusChangedCallback,
219 DocumentsEndedCallback: ReplicatorDocumentsEndedCallback,
220 {
221 unsafe extern "C" fn call_validation<F, F2, F3>(
222 coll_spec: C4CollectionSpec,
223 doc_id: C4String,
224 rev_id: C4String,
225 flags: C4RevisionFlags,
226 body: FLDict,
227 ctx: *mut c_void,
228 ) -> bool
229 where
230 F: ReplicatorValidationFunction,
231 F2: ReplicatorStatusChangedCallback,
232 F3: ReplicatorDocumentsEndedCallback,
233 {
234 let ctx = ctx as *mut CallbackContext<F, F2, F3>;
235 assert!(
236 !ctx.is_null(),
237 "Replicator::call_validation: Internal error - null function pointer"
238 );
239 ((*ctx).validation_cb)(coll_spec, doc_id, rev_id, flags, body)
240 }
241
242 unsafe extern "C" fn call_on_status_changed<F1, F, F3>(
243 c4_repl: *mut C4Replicator,
244 status: C4ReplicatorStatus,
245 ctx: *mut c_void,
246 ) where
247 F1: ReplicatorValidationFunction,
248 F: ReplicatorStatusChangedCallback,
249 F3: ReplicatorDocumentsEndedCallback,
250 {
251 info!("on_status_changed: repl {c4_repl:?}, status {status:?}");
252
253 let ctx = ctx as *mut CallbackContext<F1, F, F3>;
254 assert!(
255 !ctx.is_null(),
256 "Replicator::call_on_status_changed: Internal error - null function pointer"
257 );
258 ((*ctx).state_cb)(ReplicatorState::from(status));
259 }
260
261 unsafe extern "C" fn call_on_documents_ended<F1, F2, F>(
262 c4_repl: *mut C4Replicator,
263 pushing: bool,
264 num_docs: usize,
265 docs: *mut *const C4DocumentEnded,
266 ctx: *mut ::std::os::raw::c_void,
267 ) where
268 F1: ReplicatorValidationFunction,
269 F2: ReplicatorStatusChangedCallback,
270 F: ReplicatorDocumentsEndedCallback,
271 {
272 trace!("on_documents_ended: repl {c4_repl:?} pushing {pushing}, num_docs {num_docs}");
273
274 let ctx = ctx as *mut CallbackContext<F1, F2, F>;
275 assert!(
276 !ctx.is_null(),
277 "Replicator::call_on_documents_ended: Internal error - null function pointer"
278 );
279 let docs: &[*const C4DocumentEnded] = slice::from_raw_parts(docs, num_docs);
280 let mut it = docs.iter().map(|x| &**x);
281 ((*ctx).docs_ended_cb)(pushing, &mut it);
282 }
283
284 let ctx = Box::new(CallbackContext {
285 validation_cb: params.validation_cb,
286 state_cb: params.state_changed_callback,
287 docs_ended_cb: params.documents_ended_callback,
288 });
289 let ctx_p = Box::into_raw(ctx);
290 Replicator::do_new(
291 db,
292 url,
293 ¶ms.auth,
294 free_boxed_value::<CallbackContext<ValidationF, StateCallback, DocumentsEndedCallback>>,
295 unsafe { NonNull::new_unchecked(ctx_p as *mut c_void) },
296 Some(call_validation::<ValidationF, StateCallback, DocumentsEndedCallback>),
297 Some(call_on_status_changed::<ValidationF, StateCallback, DocumentsEndedCallback>),
298 Some(call_on_documents_ended::<ValidationF, StateCallback, DocumentsEndedCallback>),
299 params.mode,
300 )
301 }
302
303 pub fn start(&mut self, reset: bool) -> Result<()> {
306 unsafe { c4repl_start(self.inner.as_ptr(), reset) };
307 let status: ReplicatorState = self.status().into();
308 if let ReplicatorState::Stopped(err) = status {
309 Err(err)
310 } else {
311 Ok(())
312 }
313 }
314
315 pub fn restart(
321 self,
322 db: &Database,
323 url: &str,
324 auth: &ReplicatorAuthentication,
325 reset: bool,
326 ) -> Result<Self> {
327 let Replicator {
328 inner: prev_inner,
329 free_callback_f,
330 boxed_callback_f,
331 validation,
332 c_callback_on_status_changed,
333 c_callback_on_documents_ended,
334 mode,
335 } = self;
336 mem::forget(self);
337 unsafe {
338 c4repl_stop(prev_inner.as_ptr());
339 c4repl_free(prev_inner.as_ptr());
340 }
341 let mut repl = Replicator::do_new(
342 db,
343 url,
344 auth,
345 free_callback_f,
346 boxed_callback_f,
347 validation,
348 c_callback_on_status_changed,
349 c_callback_on_documents_ended,
350 mode,
351 )?;
352 repl.start(reset)?;
353 Ok(repl)
354 }
355
356 pub fn retry(&mut self) -> Result<bool> {
359 trace!("repl retry {:?}", self.inner.as_ptr());
360 let mut c4err = c4error_init();
361 let will_reconnect = unsafe { c4repl_retry(self.inner.as_ptr(), &mut c4err) };
362 if c4err.code == 0 {
363 Ok(will_reconnect)
364 } else {
365 Err(c4err.into())
366 }
367 }
368
369 fn do_new(
370 db: &Database,
371 url: &str,
372 auth: &ReplicatorAuthentication,
373 free_callback_f: unsafe fn(_: *mut c_void),
374 boxed_callback_f: NonNull<c_void>,
375 validation: C4ReplicatorValidationFunction,
376 call_on_status_changed: C4ReplicatorStatusChangedCallback,
377 call_on_documents_ended: C4ReplicatorDocumentsEndedCallback,
378 mode: ReplicatorMode,
379 ) -> Result<Self> {
380 use consts::*;
381
382 let mut remote_addr = MaybeUninit::<C4Address>::uninit();
383 let mut db_name = C4String::default();
384 if !unsafe { c4address_fromURL(url.into(), remote_addr.as_mut_ptr(), &mut db_name) } {
385 return Err(Error::LogicError(format!("Can not parse URL {url}").into()));
386 }
387 let remote_addr = unsafe { remote_addr.assume_init() };
388
389 let options_dict: FLSliceResult = match auth {
390 ReplicatorAuthentication::SessionToken(token) => serde_fleece::fleece!({
391 kC4ReplicatorOptionAuthentication: {
392 kC4ReplicatorAuthType: kC4AuthTypeSession,
393 kC4ReplicatorAuthToken: *token,
394 }
395 }),
396 ReplicatorAuthentication::Basic { username, password } => {
397 serde_fleece::fleece!({
398 kC4ReplicatorOptionAuthentication: {
399 kC4ReplicatorAuthType: kC4AuthTypeBasic,
400 kC4ReplicatorAuthUserName: *username,
401 kC4ReplicatorAuthPassword: *password
402 }
403 })
404 }
405 ReplicatorAuthentication::None => serde_fleece::fleece!({}),
406 }?;
407
408 let mut collect_opt = C4ReplicationCollection {
409 collection: kC4DefaultCollectionSpec,
410 push: mode.push,
411 pull: mode.pull,
412 optionsDictFleece: Default::default(),
413 pushFilter: None,
414 pullFilter: validation,
415 callbackContext: boxed_callback_f.as_ptr() as *mut c_void,
416 };
417
418 let repl_params = C4ReplicatorParameters {
419 onStatusChanged: call_on_status_changed,
420 onDocumentsEnded: call_on_documents_ended,
421 onBlobProgress: None,
422 propertyEncryptor: ptr::null_mut(),
423 propertyDecryptor: ptr::null_mut(),
424 callbackContext: boxed_callback_f.as_ptr() as *mut c_void,
425 socketFactory: ptr::null_mut(),
426 optionsDictFleece: options_dict.as_fl_slice(),
427 collections: &mut collect_opt,
428 collectionCount: 1,
429 };
430 let mut c4err = c4error_init();
431 let repl = unsafe {
432 c4repl_new(
433 db.inner.0.as_ptr(),
434 remote_addr,
435 db_name,
436 repl_params,
437 &mut c4err,
438 )
439 };
440 trace!("repl new result {repl:?}");
441 NonNull::new(repl)
442 .map(|inner| Replicator {
443 inner,
444 free_callback_f,
445 boxed_callback_f,
446 validation,
447 c_callback_on_status_changed: call_on_status_changed,
448 c_callback_on_documents_ended: call_on_documents_ended,
449 mode,
450 })
451 .ok_or_else(|| {
452 unsafe { free_callback_f(boxed_callback_f.as_ptr()) };
453 c4err.into()
454 })
455 }
456 #[inline]
457 pub fn stop(&mut self) {
458 trace!("repl stop {:?}", self.inner.as_ptr());
459 unsafe { c4repl_stop(self.inner.as_ptr()) };
460 }
461 #[inline]
462 pub fn state(&self) -> ReplicatorState {
463 self.status().into()
464 }
465 pub(crate) fn status(&self) -> C4ReplicatorStatus {
466 unsafe { c4repl_getStatus(self.inner.as_ptr()) }
467 }
468}
469
470pub type ReplicatorProgress = C4Progress;
474
475#[derive(Debug)]
477pub enum ReplicatorState {
478 Stopped(Error),
480 Offline,
482 Connecting,
484 Idle,
486 Busy(ReplicatorProgress),
488}
489
490unsafe fn free_boxed_value<T>(p: *mut c_void) {
491 drop(Box::from_raw(p as *mut T));
492}
493
494impl From<C4ReplicatorStatus> for ReplicatorState {
495 #[inline]
496 fn from(status: C4ReplicatorStatus) -> Self {
497 match status.level {
498 C4ReplicatorActivityLevel::kC4Stopped => ReplicatorState::Stopped(status.error.into()),
499 C4ReplicatorActivityLevel::kC4Offline => ReplicatorState::Offline,
500 C4ReplicatorActivityLevel::kC4Connecting => ReplicatorState::Connecting,
501 C4ReplicatorActivityLevel::kC4Idle => ReplicatorState::Idle,
502 C4ReplicatorActivityLevel::kC4Busy => ReplicatorState::Busy(status.progress),
503 C4ReplicatorActivityLevel::kC4Stopping => ReplicatorState::Busy(status.progress),
504 }
505 }
506}
507
508#[allow(non_upper_case_globals)]
509pub(crate) mod consts {
510 macro_rules! define_const_str {
511 ($($name:ident,)+) => {
512 $(pub(crate) const $name: &'static str = match ($crate::ffi::$name).to_str() {
513 Ok(x) => x,
514 Err(_) => panic!("Invalid utf-8 constant"),
515 };)*
516 };
517 }
518
519 define_const_str!(
520 kC4AuthTypeBasic,
521 kC4AuthTypeSession,
522 kC4ReplicatorAuthPassword,
523 kC4ReplicatorAuthToken,
524 kC4ReplicatorAuthType,
525 kC4ReplicatorAuthUserName,
526 kC4ReplicatorOptionAuthentication,
527 );
528
529 #[cfg(feature = "use-tokio-websocket")]
530 define_const_str!(
531 kC4ReplicatorOptionExtraHeaders,
532 kC4ReplicatorOptionCookies,
533 kC4SocketOptionWSProtocols,
534 );
535}
536
537static WEBSOCKET_IMPL: Once = Once::new();
538
539#[cfg(feature = "use-couchbase-lite-websocket")]
540pub(crate) fn init_builtin_socket_impl() {
541 WEBSOCKET_IMPL.call_once(|| {
542 unsafe { crate::ffi::C4RegisterBuiltInWebSocket() };
543 });
544}
545
546#[cfg(feature = "use-tokio-websocket")]
547pub(crate) fn init_tokio_socket_impl(handle: tokio::runtime::Handle) {
548 WEBSOCKET_IMPL.call_once(|| {
549 tokio_socket::c4socket_init(handle);
550 });
551}