Skip to main content

couchbase_lite/
database.rs

1use crate::{
2    doc_enumerator::{DocEnumerator, DocEnumeratorFlags},
3    document::{C4DocumentOwner, Document},
4    error::{c4error_init, Error, Result},
5    ffi::{
6        c4db_createIndex, c4db_getDoc, c4db_getDocumentCount, c4db_getIndexesInfo, c4db_getName,
7        c4db_getSharedFleeceEncoder, c4db_getUUIDs, c4db_openNamed, c4db_release, C4Database,
8        C4DatabaseConfig2, C4DatabaseFlags, C4DocContentLevel, C4EncryptionAlgorithm,
9        C4EncryptionKey, C4ErrorCode, C4ErrorDomain, C4IndexOptions, C4IndexType, C4UUID,
10    },
11    index::{DbIndexesListIterator, IndexInfo, IndexOptions, IndexType},
12    log_reroute::c4log_to_log_init,
13    observer::{DatabaseObserver, ObserverdChangesIter},
14    query::Query,
15    transaction::Transaction,
16    QueryLanguage,
17};
18use bitflags::bitflags;
19use fallible_streaming_iterator::FallibleStreamingIterator;
20use log::{debug, error, trace};
21use serde_fleece::FlEncoderSession;
22use std::{
23    collections::HashSet,
24    ffi::CString,
25    marker::PhantomData,
26    mem::MaybeUninit,
27    path::Path,
28    ptr::{self, NonNull},
29    sync::{Arc, Mutex, Once},
30};
31use uuid::Uuid;
32
33/// Database configuration, used during open
34pub struct DatabaseConfig<'a> {
35    inner: Result<C4DatabaseConfig2>,
36    phantom: PhantomData<&'a Path>,
37}
38
39bitflags! {
40    #[repr(transparent)]
41    #[derive(Debug)]
42    pub struct DatabaseFlags: u32 {
43        /// Create the file if it doesn't exist
44        const CREATE = C4DatabaseFlags::kC4DB_Create.0;
45        /// Open file read-only
46        const READ_ONLY = C4DatabaseFlags::kC4DB_ReadOnly.0;
47        /// Disable upgrading an older-version database
48        const NO_UPGRADE = C4DatabaseFlags::kC4DB_NoUpgrade.0;
49        /// Disable database/collection observers, for slightly faster writes
50        const NON_OBSERVABLE = C4DatabaseFlags::kC4DB_NonObservable.0;
51    }
52}
53
54impl<'a> DatabaseConfig<'a> {
55    pub fn new(parent_directory: &'a Path, flags: DatabaseFlags) -> Self {
56        let os_path_utf8 = match parent_directory.to_str() {
57            Some(x) => x,
58            None => {
59                return Self {
60                    inner: Err(Error::InvalidUtf8),
61                    phantom: PhantomData,
62                }
63            }
64        };
65        Self {
66            inner: Ok(C4DatabaseConfig2 {
67                parentDirectory: os_path_utf8.into(),
68                flags: C4DatabaseFlags(flags.bits()),
69                encryptionKey: C4EncryptionKey {
70                    algorithm: C4EncryptionAlgorithm::kC4EncryptionNone,
71                    bytes: [0; 32],
72                },
73            }),
74            phantom: PhantomData,
75        }
76    }
77}
78
79/// A connection to a couchbase-lite database.
80pub struct Database {
81    pub(crate) inner: DbInner,
82    pub(crate) db_events: Arc<Mutex<HashSet<usize>>>,
83    pub(crate) db_observers: Vec<DatabaseObserver>,
84}
85
86pub(crate) struct DbInner(pub NonNull<C4Database>);
87/// According to
88/// https://github.com/couchbase/couchbase-lite-core/wiki/Thread-Safety
89/// it is possible to call from any thread, but not concurrently
90unsafe impl Send for DbInner {}
91
92impl Drop for DbInner {
93    #[inline]
94    fn drop(&mut self) {
95        trace!("release db {:?}", self.0.as_ptr());
96        unsafe { c4db_release(self.0.as_ptr()) };
97    }
98}
99
100impl Drop for Database {
101    #[inline]
102    fn drop(&mut self) {
103        self.db_observers.clear();
104    }
105}
106
107impl Database {
108    pub fn open_named(name: &str, cfg: DatabaseConfig) -> Result<Self> {
109        DB_LOG_HANDLER.call_once(|| {
110            debug!("init couchbase log to rust log rerouting");
111            c4log_to_log_init();
112        });
113        let cfg = cfg.inner?;
114        let mut error = c4error_init();
115        let db_ptr = unsafe { c4db_openNamed(name.into(), &cfg, &mut error) };
116        NonNull::new(db_ptr)
117            .map(|inner| Database {
118                inner: DbInner(inner),
119                db_events: Arc::new(Mutex::new(HashSet::new())),
120                db_observers: Vec::new(),
121            })
122            .ok_or_else(|| error.into())
123    }
124    pub fn open_with_flags(path: &Path, flags: DatabaseFlags) -> Result<Self> {
125        let parent_path = path.parent().ok_or_else(|| {
126            Error::LogicError(format!("path {path:?} has no parent diretory").into())
127        })?;
128        let cfg = DatabaseConfig::new(parent_path, flags);
129        let db_name = path
130            .file_name()
131            .ok_or_else(|| Error::LogicError(format!("path {path:?} has no last part").into()))?
132            .to_str()
133            .ok_or(Error::InvalidUtf8)?
134            .strip_suffix(".cblite2")
135            .ok_or_else(|| {
136                Error::LogicError(
137                    format!("path {path:?} should have last part with .cblite2 suffix").into(),
138                )
139            })?;
140
141        Database::open_named(db_name, cfg)
142    }
143    /// Begin a new transaction, the transaction defaults to rolling back
144    /// when it is dropped. If you want the transaction to commit,
145    /// you must call `Transaction::commit`
146    #[inline]
147    pub fn transaction<'a>(&'a mut self) -> Result<Transaction<'a>> {
148        Transaction::new(self)
149    }
150    /// Returns the number of (undeleted) documents in the database
151    #[inline]
152    pub fn document_count(&self) -> u64 {
153        unsafe { c4db_getDocumentCount(self.inner.0.as_ptr()) }
154    }
155    /// Return existing document from database
156    #[inline]
157    pub fn get_existing(&self, doc_id: &str) -> Result<Document> {
158        self.internal_get(doc_id, true)
159            .map(|x| Document::new_internal(x, doc_id))
160    }
161    /// Compiles a query from an expression given as JSON.
162    /// The expression is a predicate that describes which documents should be returned.
163    /// A separate, optional sort expression describes the ordering of the results.
164    #[inline]
165    pub fn query<'a>(&'a self, query_json: &str) -> Result<Query<'a>> {
166        Query::new(self, QueryLanguage::kC4JSONQuery, query_json)
167    }
168    /// Compiles a query from an expression given as N1QL.
169    #[inline]
170    pub fn n1ql_query<'a>(&'a self, query: &str) -> Result<Query<'a>> {
171        Query::new(self, QueryLanguage::kC4N1QLQuery, query)
172    }
173    /// Creates an enumerator ordered by docID.
174    #[inline]
175    pub fn enumerate_all_docs<'a>(
176        &'a self,
177        flags: DocEnumeratorFlags,
178    ) -> Result<DocEnumerator<'a>> {
179        DocEnumerator::enumerate_all_docs(self, flags)
180    }
181
182    /// Register a database observer, with a callback that will be invoked after the database
183    /// changes. The callback will be called _once_, after the first change. After that it won't
184    /// be called again until all of the changes have been read by calling `Database::observed_changes`.
185    pub fn register_observer<F>(&mut self, mut callback_f: F) -> Result<()>
186    where
187        F: FnMut() + Send + 'static,
188    {
189        let db_events = self.db_events.clone();
190        let obs = DatabaseObserver::new(self, move |obs| {
191            {
192                match db_events.lock() {
193                    Ok(mut db_events) => {
194                        db_events.insert(obs as usize);
195                    }
196                    Err(err) => {
197                        error!(
198                            "register_observer::DatabaseObserver::lambda db_events lock failed: {}",
199                            err
200                        );
201                    }
202                }
203            }
204            callback_f();
205        })?;
206        self.db_observers.push(obs);
207        Ok(())
208    }
209
210    /// Remove all database observers
211    #[inline]
212    pub fn clear_observers(&mut self) {
213        self.db_observers.clear();
214    }
215
216    /// Get observed changes for this database
217    #[inline]
218    pub fn observed_changes<'a>(&'a mut self) -> ObserverdChangesIter<'a> {
219        ObserverdChangesIter {
220            db: self,
221            obs_it: None,
222        }
223    }
224
225    /// Intialize socket implementation for replication
226    /// (builtin couchbase-lite websocket library)
227    #[cfg(feature = "use-couchbase-lite-websocket")]
228    #[inline]
229    pub fn init_socket_impl() {
230        crate::replicator::init_builtin_socket_impl();
231    }
232
233    /// Intialize socket implementation for replication
234    /// (builtin couchbase-lite websocket library)
235    #[cfg(feature = "use-tokio-websocket")]
236    #[inline]
237    pub fn init_socket_impl(handle: tokio::runtime::Handle) {
238        crate::replicator::init_tokio_socket_impl(handle);
239    }
240
241    /// Get shared "fleece" encoder, `&mut self` to make possible
242    /// exists only one session
243    pub fn shared_encoder_session(&mut self) -> Result<FlEncoderSession> {
244        let enc = unsafe { c4db_getSharedFleeceEncoder(self.inner.0.as_ptr()) };
245        NonNull::new(enc)
246            .ok_or_else(|| {
247                Error::LogicError("c4db_getSharedFleeceEncoder return null.into()".into())
248            })
249            .map(FlEncoderSession::new)
250    }
251
252    /// Returns the names of all indexes in the database
253    pub fn get_indexes(
254        &self,
255    ) -> Result<impl FallibleStreamingIterator<Item = IndexInfo, Error = Error>> {
256        let mut c4err = c4error_init();
257        let enc_data = unsafe { c4db_getIndexesInfo(self.inner.0.as_ptr(), &mut c4err) };
258        if enc_data.buf.is_null() {
259            return Err(c4err.into());
260        }
261
262        let indexes_list = DbIndexesListIterator::new(enc_data)?;
263        Ok(indexes_list)
264    }
265
266    /// Creates a database index, of the values of specific expressions across
267    /// all documents. The name is used to identify the index for later updating
268    /// or deletion; if an index with the same name already exists, it will be
269    /// replaced unless it has the exact same expressions.
270    /// Note: If some documents are missing the values to be indexed,
271    /// those documents will just be omitted from the index. It's not an error.
272    pub fn create_index(
273        &mut self,
274        index_name: &str,
275        expression_json: &str,
276        index_type: IndexType,
277        index_options: Option<IndexOptions>,
278    ) -> Result<()> {
279        use IndexType::*;
280        let index_type = match index_type {
281            ValueIndex => C4IndexType::kC4ValueIndex,
282            FullTextIndex => C4IndexType::kC4FullTextIndex,
283            ArrayIndex => C4IndexType::kC4ArrayIndex,
284            PredictiveIndex => C4IndexType::kC4PredictiveIndex,
285        };
286        let mut c4err = c4error_init();
287        let result = if let Some(index_options) = index_options {
288            let language = CString::new(index_options.language)?;
289            let stop_words: Option<CString> = if let Some(stop_words) = index_options.stop_words {
290                let mut list = String::with_capacity(stop_words.len() * 5);
291                for word in stop_words {
292                    if !list.is_empty() {
293                        list.push(' ');
294                    }
295                    list.push_str(word);
296                }
297                Some(CString::new(list)?)
298            } else {
299                None
300            };
301
302            let opts = C4IndexOptions {
303                language: language.as_ptr(),
304                disableStemming: index_options.disable_stemming,
305                ignoreDiacritics: index_options.ignore_diacritics,
306                stopWords: stop_words.map_or(ptr::null(), |x| x.as_ptr()),
307            };
308            unsafe {
309                c4db_createIndex(
310                    self.inner.0.as_ptr(),
311                    index_name.into(),
312                    expression_json.into(),
313                    index_type,
314                    &opts,
315                    &mut c4err,
316                )
317            }
318        } else {
319            unsafe {
320                c4db_createIndex(
321                    self.inner.0.as_ptr(),
322                    index_name.into(),
323                    expression_json.into(),
324                    index_type,
325                    ptr::null(),
326                    &mut c4err,
327                )
328            }
329        };
330        if result {
331            Ok(())
332        } else {
333            Err(c4err.into())
334        }
335    }
336
337    /// Returns the name of the database, as given to `c4db_openNamed`.
338    /// This is the filename _without_ the ".cblite2" extension.
339    #[inline]
340    pub fn name(&self) -> Result<&str> {
341        unsafe { c4db_getName(self.inner.0.as_ptr()) }
342            .try_into()
343            .map_err(|_| Error::InvalidUtf8)
344    }
345
346    /// Returns the database's public and private UUIDs
347    #[inline]
348    pub fn uuids(&self) -> Result<(Uuid, Uuid)> {
349        let mut public = MaybeUninit::<C4UUID>::uninit();
350        let mut private = MaybeUninit::<C4UUID>::uninit();
351        let mut c4err = c4error_init();
352        if unsafe {
353            c4db_getUUIDs(
354                self.inner.0.as_ptr(),
355                public.as_mut_ptr(),
356                private.as_mut_ptr(),
357                &mut c4err,
358            )
359        } {
360            let public = Uuid::from_bytes(unsafe { public.assume_init() }.bytes);
361            let private = Uuid::from_bytes(unsafe { private.assume_init() }.bytes);
362            Ok((public, private))
363        } else {
364            Err(c4err.into())
365        }
366    }
367
368    pub(crate) fn do_internal_get(
369        &self,
370        doc_id: &str,
371        must_exists: bool,
372        content_level: C4DocContentLevel,
373    ) -> Result<C4DocumentOwner> {
374        let mut c4err = c4error_init();
375        let c4doc = unsafe {
376            c4db_getDoc(
377                self.inner.0.as_ptr(),
378                doc_id.as_bytes().into(),
379                must_exists,
380                content_level,
381                &mut c4err,
382            )
383        };
384        NonNull::new(c4doc)
385            .ok_or_else(|| c4err.into())
386            .map(C4DocumentOwner)
387    }
388
389    pub(crate) fn do_internal_get_opt(
390        &self,
391        doc_id: &str,
392        must_exists: bool,
393        content_level: C4DocContentLevel,
394    ) -> Result<Option<C4DocumentOwner>> {
395        match self.do_internal_get(doc_id, must_exists, content_level) {
396            Ok(x) => Ok(Some(x)),
397            Err(Error::C4Error(err))
398                if err.domain == C4ErrorDomain::LiteCoreDomain
399                    && err.code == C4ErrorCode::kC4ErrorNotFound.0 =>
400            {
401                Ok(None)
402            }
403            Err(err) => Err(err),
404        }
405    }
406
407    pub(crate) fn internal_get(&self, doc_id: &str, must_exists: bool) -> Result<C4DocumentOwner> {
408        self.do_internal_get(doc_id, must_exists, C4DocContentLevel::kDocGetCurrentRev)
409    }
410}
411
412static DB_LOG_HANDLER: Once = Once::new();