1use crate::{
2 doc_enumerator::{DocEnumerator, DocEnumeratorFlags},
3 document::{C4DocumentOwner, Document},
4 error::{c4error_init, Error, Result},
5 ffi::{
6 c4db_createIndex, c4db_getDoc, c4db_getDocumentCount, c4db_getIndexesInfo, c4db_getName,
7 c4db_getSharedFleeceEncoder, c4db_getUUIDs, c4db_openNamed, c4db_release, C4Database,
8 C4DatabaseConfig2, C4DatabaseFlags, C4DocContentLevel, C4EncryptionAlgorithm,
9 C4EncryptionKey, C4ErrorCode, C4ErrorDomain, C4IndexOptions, C4IndexType, C4UUID,
10 },
11 index::{DbIndexesListIterator, IndexInfo, IndexOptions, IndexType},
12 log_reroute::c4log_to_log_init,
13 observer::{DatabaseObserver, ObserverdChangesIter},
14 query::Query,
15 transaction::Transaction,
16 QueryLanguage,
17};
18use bitflags::bitflags;
19use fallible_streaming_iterator::FallibleStreamingIterator;
20use log::{debug, error, trace};
21use serde_fleece::FlEncoderSession;
22use std::{
23 collections::HashSet,
24 ffi::CString,
25 marker::PhantomData,
26 mem::MaybeUninit,
27 path::Path,
28 ptr::{self, NonNull},
29 sync::{Arc, Mutex, Once},
30};
31use uuid::Uuid;
32
33pub struct DatabaseConfig<'a> {
35 inner: Result<C4DatabaseConfig2>,
36 phantom: PhantomData<&'a Path>,
37}
38
39bitflags! {
40 #[repr(transparent)]
41 #[derive(Debug)]
42 pub struct DatabaseFlags: u32 {
43 const CREATE = C4DatabaseFlags::kC4DB_Create.0;
45 const READ_ONLY = C4DatabaseFlags::kC4DB_ReadOnly.0;
47 const NO_UPGRADE = C4DatabaseFlags::kC4DB_NoUpgrade.0;
49 const NON_OBSERVABLE = C4DatabaseFlags::kC4DB_NonObservable.0;
51 }
52}
53
54impl<'a> DatabaseConfig<'a> {
55 pub fn new(parent_directory: &'a Path, flags: DatabaseFlags) -> Self {
56 let os_path_utf8 = match parent_directory.to_str() {
57 Some(x) => x,
58 None => {
59 return Self {
60 inner: Err(Error::InvalidUtf8),
61 phantom: PhantomData,
62 }
63 }
64 };
65 Self {
66 inner: Ok(C4DatabaseConfig2 {
67 parentDirectory: os_path_utf8.into(),
68 flags: C4DatabaseFlags(flags.bits()),
69 encryptionKey: C4EncryptionKey {
70 algorithm: C4EncryptionAlgorithm::kC4EncryptionNone,
71 bytes: [0; 32],
72 },
73 }),
74 phantom: PhantomData,
75 }
76 }
77}
78
79pub struct Database {
81 pub(crate) inner: DbInner,
82 pub(crate) db_events: Arc<Mutex<HashSet<usize>>>,
83 pub(crate) db_observers: Vec<DatabaseObserver>,
84}
85
86pub(crate) struct DbInner(pub NonNull<C4Database>);
87unsafe impl Send for DbInner {}
91
92impl Drop for DbInner {
93 #[inline]
94 fn drop(&mut self) {
95 trace!("release db {:?}", self.0.as_ptr());
96 unsafe { c4db_release(self.0.as_ptr()) };
97 }
98}
99
100impl Drop for Database {
101 #[inline]
102 fn drop(&mut self) {
103 self.db_observers.clear();
104 }
105}
106
107impl Database {
108 pub fn open_named(name: &str, cfg: DatabaseConfig) -> Result<Self> {
109 DB_LOG_HANDLER.call_once(|| {
110 debug!("init couchbase log to rust log rerouting");
111 c4log_to_log_init();
112 });
113 let cfg = cfg.inner?;
114 let mut error = c4error_init();
115 let db_ptr = unsafe { c4db_openNamed(name.into(), &cfg, &mut error) };
116 NonNull::new(db_ptr)
117 .map(|inner| Database {
118 inner: DbInner(inner),
119 db_events: Arc::new(Mutex::new(HashSet::new())),
120 db_observers: Vec::new(),
121 })
122 .ok_or_else(|| error.into())
123 }
124 pub fn open_with_flags(path: &Path, flags: DatabaseFlags) -> Result<Self> {
125 let parent_path = path.parent().ok_or_else(|| {
126 Error::LogicError(format!("path {path:?} has no parent diretory").into())
127 })?;
128 let cfg = DatabaseConfig::new(parent_path, flags);
129 let db_name = path
130 .file_name()
131 .ok_or_else(|| Error::LogicError(format!("path {path:?} has no last part").into()))?
132 .to_str()
133 .ok_or(Error::InvalidUtf8)?
134 .strip_suffix(".cblite2")
135 .ok_or_else(|| {
136 Error::LogicError(
137 format!("path {path:?} should have last part with .cblite2 suffix").into(),
138 )
139 })?;
140
141 Database::open_named(db_name, cfg)
142 }
143 #[inline]
147 pub fn transaction<'a>(&'a mut self) -> Result<Transaction<'a>> {
148 Transaction::new(self)
149 }
150 #[inline]
152 pub fn document_count(&self) -> u64 {
153 unsafe { c4db_getDocumentCount(self.inner.0.as_ptr()) }
154 }
155 #[inline]
157 pub fn get_existing(&self, doc_id: &str) -> Result<Document> {
158 self.internal_get(doc_id, true)
159 .map(|x| Document::new_internal(x, doc_id))
160 }
161 #[inline]
165 pub fn query<'a>(&'a self, query_json: &str) -> Result<Query<'a>> {
166 Query::new(self, QueryLanguage::kC4JSONQuery, query_json)
167 }
168 #[inline]
170 pub fn n1ql_query<'a>(&'a self, query: &str) -> Result<Query<'a>> {
171 Query::new(self, QueryLanguage::kC4N1QLQuery, query)
172 }
173 #[inline]
175 pub fn enumerate_all_docs<'a>(
176 &'a self,
177 flags: DocEnumeratorFlags,
178 ) -> Result<DocEnumerator<'a>> {
179 DocEnumerator::enumerate_all_docs(self, flags)
180 }
181
182 pub fn register_observer<F>(&mut self, mut callback_f: F) -> Result<()>
186 where
187 F: FnMut() + Send + 'static,
188 {
189 let db_events = self.db_events.clone();
190 let obs = DatabaseObserver::new(self, move |obs| {
191 {
192 match db_events.lock() {
193 Ok(mut db_events) => {
194 db_events.insert(obs as usize);
195 }
196 Err(err) => {
197 error!(
198 "register_observer::DatabaseObserver::lambda db_events lock failed: {}",
199 err
200 );
201 }
202 }
203 }
204 callback_f();
205 })?;
206 self.db_observers.push(obs);
207 Ok(())
208 }
209
210 #[inline]
212 pub fn clear_observers(&mut self) {
213 self.db_observers.clear();
214 }
215
216 #[inline]
218 pub fn observed_changes<'a>(&'a mut self) -> ObserverdChangesIter<'a> {
219 ObserverdChangesIter {
220 db: self,
221 obs_it: None,
222 }
223 }
224
225 #[cfg(feature = "use-couchbase-lite-websocket")]
228 #[inline]
229 pub fn init_socket_impl() {
230 crate::replicator::init_builtin_socket_impl();
231 }
232
233 #[cfg(feature = "use-tokio-websocket")]
236 #[inline]
237 pub fn init_socket_impl(handle: tokio::runtime::Handle) {
238 crate::replicator::init_tokio_socket_impl(handle);
239 }
240
241 pub fn shared_encoder_session(&mut self) -> Result<FlEncoderSession> {
244 let enc = unsafe { c4db_getSharedFleeceEncoder(self.inner.0.as_ptr()) };
245 NonNull::new(enc)
246 .ok_or_else(|| {
247 Error::LogicError("c4db_getSharedFleeceEncoder return null.into()".into())
248 })
249 .map(FlEncoderSession::new)
250 }
251
252 pub fn get_indexes(
254 &self,
255 ) -> Result<impl FallibleStreamingIterator<Item = IndexInfo, Error = Error>> {
256 let mut c4err = c4error_init();
257 let enc_data = unsafe { c4db_getIndexesInfo(self.inner.0.as_ptr(), &mut c4err) };
258 if enc_data.buf.is_null() {
259 return Err(c4err.into());
260 }
261
262 let indexes_list = DbIndexesListIterator::new(enc_data)?;
263 Ok(indexes_list)
264 }
265
266 pub fn create_index(
273 &mut self,
274 index_name: &str,
275 expression_json: &str,
276 index_type: IndexType,
277 index_options: Option<IndexOptions>,
278 ) -> Result<()> {
279 use IndexType::*;
280 let index_type = match index_type {
281 ValueIndex => C4IndexType::kC4ValueIndex,
282 FullTextIndex => C4IndexType::kC4FullTextIndex,
283 ArrayIndex => C4IndexType::kC4ArrayIndex,
284 PredictiveIndex => C4IndexType::kC4PredictiveIndex,
285 };
286 let mut c4err = c4error_init();
287 let result = if let Some(index_options) = index_options {
288 let language = CString::new(index_options.language)?;
289 let stop_words: Option<CString> = if let Some(stop_words) = index_options.stop_words {
290 let mut list = String::with_capacity(stop_words.len() * 5);
291 for word in stop_words {
292 if !list.is_empty() {
293 list.push(' ');
294 }
295 list.push_str(word);
296 }
297 Some(CString::new(list)?)
298 } else {
299 None
300 };
301
302 let opts = C4IndexOptions {
303 language: language.as_ptr(),
304 disableStemming: index_options.disable_stemming,
305 ignoreDiacritics: index_options.ignore_diacritics,
306 stopWords: stop_words.map_or(ptr::null(), |x| x.as_ptr()),
307 };
308 unsafe {
309 c4db_createIndex(
310 self.inner.0.as_ptr(),
311 index_name.into(),
312 expression_json.into(),
313 index_type,
314 &opts,
315 &mut c4err,
316 )
317 }
318 } else {
319 unsafe {
320 c4db_createIndex(
321 self.inner.0.as_ptr(),
322 index_name.into(),
323 expression_json.into(),
324 index_type,
325 ptr::null(),
326 &mut c4err,
327 )
328 }
329 };
330 if result {
331 Ok(())
332 } else {
333 Err(c4err.into())
334 }
335 }
336
337 #[inline]
340 pub fn name(&self) -> Result<&str> {
341 unsafe { c4db_getName(self.inner.0.as_ptr()) }
342 .try_into()
343 .map_err(|_| Error::InvalidUtf8)
344 }
345
346 #[inline]
348 pub fn uuids(&self) -> Result<(Uuid, Uuid)> {
349 let mut public = MaybeUninit::<C4UUID>::uninit();
350 let mut private = MaybeUninit::<C4UUID>::uninit();
351 let mut c4err = c4error_init();
352 if unsafe {
353 c4db_getUUIDs(
354 self.inner.0.as_ptr(),
355 public.as_mut_ptr(),
356 private.as_mut_ptr(),
357 &mut c4err,
358 )
359 } {
360 let public = Uuid::from_bytes(unsafe { public.assume_init() }.bytes);
361 let private = Uuid::from_bytes(unsafe { private.assume_init() }.bytes);
362 Ok((public, private))
363 } else {
364 Err(c4err.into())
365 }
366 }
367
368 pub(crate) fn do_internal_get(
369 &self,
370 doc_id: &str,
371 must_exists: bool,
372 content_level: C4DocContentLevel,
373 ) -> Result<C4DocumentOwner> {
374 let mut c4err = c4error_init();
375 let c4doc = unsafe {
376 c4db_getDoc(
377 self.inner.0.as_ptr(),
378 doc_id.as_bytes().into(),
379 must_exists,
380 content_level,
381 &mut c4err,
382 )
383 };
384 NonNull::new(c4doc)
385 .ok_or_else(|| c4err.into())
386 .map(C4DocumentOwner)
387 }
388
389 pub(crate) fn do_internal_get_opt(
390 &self,
391 doc_id: &str,
392 must_exists: bool,
393 content_level: C4DocContentLevel,
394 ) -> Result<Option<C4DocumentOwner>> {
395 match self.do_internal_get(doc_id, must_exists, content_level) {
396 Ok(x) => Ok(Some(x)),
397 Err(Error::C4Error(err))
398 if err.domain == C4ErrorDomain::LiteCoreDomain
399 && err.code == C4ErrorCode::kC4ErrorNotFound.0 =>
400 {
401 Ok(None)
402 }
403 Err(err) => Err(err),
404 }
405 }
406
407 pub(crate) fn internal_get(&self, doc_id: &str, must_exists: bool) -> Result<C4DocumentOwner> {
408 self.do_internal_get(doc_id, must_exists, C4DocContentLevel::kDocGetCurrentRev)
409 }
410}
411
412static DB_LOG_HANDLER: Once = Once::new();