couchbase_lite/
database.rs

1use crate::{
2    doc_enumerator::{DocEnumerator, DocEnumeratorFlags},
3    document::{C4DocumentOwner, Document},
4    error::{c4error_init, Error, Result},
5    ffi::{
6        c4db_createIndex, c4db_getDoc, c4db_getDocumentCount, c4db_getIndexesInfo, c4db_getName,
7        c4db_getSharedFleeceEncoder, c4db_openNamed, c4db_release, C4Database, C4DatabaseConfig2,
8        C4DatabaseFlags, C4DocContentLevel, C4EncryptionAlgorithm, C4EncryptionKey, C4ErrorCode,
9        C4ErrorDomain, C4IndexOptions, C4IndexType,
10    },
11    index::{DbIndexesListIterator, IndexInfo, IndexOptions, IndexType},
12    log_reroute::c4log_to_log_init,
13    observer::{DatabaseObserver, ObserverdChangesIter},
14    query::Query,
15    transaction::Transaction,
16    QueryLanguage,
17};
18use bitflags::bitflags;
19use fallible_streaming_iterator::FallibleStreamingIterator;
20use log::{debug, error, trace};
21use serde_fleece::FlEncoderSession;
22use std::{
23    collections::HashSet,
24    ffi::CString,
25    marker::PhantomData,
26    path::Path,
27    ptr::{self, NonNull},
28    sync::{Arc, Mutex, Once},
29};
30
31/// Database configuration, used during open
32pub struct DatabaseConfig<'a> {
33    inner: Result<C4DatabaseConfig2>,
34    phantom: PhantomData<&'a Path>,
35}
36
37bitflags! {
38    #[repr(transparent)]
39    #[derive(Debug)]
40    pub struct DatabaseFlags: u32 {
41        /// Create the file if it doesn't exist
42        const CREATE = C4DatabaseFlags::kC4DB_Create.0;
43        /// Open file read-only
44        const READ_ONLY = C4DatabaseFlags::kC4DB_ReadOnly.0;
45        /// Disable upgrading an older-version database
46        const NO_UPGRADE = C4DatabaseFlags::kC4DB_NoUpgrade.0;
47        /// Disable database/collection observers, for slightly faster writes
48        const NON_OBSERVABLE = C4DatabaseFlags::kC4DB_NonObservable.0;
49    }
50}
51
52impl<'a> DatabaseConfig<'a> {
53    pub fn new(parent_directory: &'a Path, flags: DatabaseFlags) -> Self {
54        let os_path_utf8 = match parent_directory.to_str() {
55            Some(x) => x,
56            None => {
57                return Self {
58                    inner: Err(Error::InvalidUtf8),
59                    phantom: PhantomData,
60                }
61            }
62        };
63        Self {
64            inner: Ok(C4DatabaseConfig2 {
65                parentDirectory: os_path_utf8.into(),
66                flags: C4DatabaseFlags(flags.bits()),
67                encryptionKey: C4EncryptionKey {
68                    algorithm: C4EncryptionAlgorithm::kC4EncryptionNone,
69                    bytes: [0; 32],
70                },
71            }),
72            phantom: PhantomData,
73        }
74    }
75}
76
77/// A connection to a couchbase-lite database.
78pub struct Database {
79    pub(crate) inner: DbInner,
80    pub(crate) db_events: Arc<Mutex<HashSet<usize>>>,
81    pub(crate) db_observers: Vec<DatabaseObserver>,
82}
83
84pub(crate) struct DbInner(pub NonNull<C4Database>);
85/// According to
86/// https://github.com/couchbase/couchbase-lite-core/wiki/Thread-Safety
87/// it is possible to call from any thread, but not concurrently
88unsafe impl Send for DbInner {}
89
90impl Drop for DbInner {
91    fn drop(&mut self) {
92        trace!("release db {:?}", self.0.as_ptr());
93        unsafe { c4db_release(self.0.as_ptr()) };
94    }
95}
96
97impl Drop for Database {
98    #[inline]
99    fn drop(&mut self) {
100        self.db_observers.clear();
101    }
102}
103
104impl Database {
105    pub fn open_named(name: &str, cfg: DatabaseConfig) -> Result<Self> {
106        DB_LOG_HANDLER.call_once(|| {
107            debug!("init couchbase log to rust log rerouting");
108            c4log_to_log_init();
109        });
110        let cfg = cfg.inner?;
111        let mut error = c4error_init();
112        let db_ptr = unsafe { c4db_openNamed(name.into(), &cfg, &mut error) };
113        NonNull::new(db_ptr)
114            .map(|inner| Database {
115                inner: DbInner(inner),
116                db_events: Arc::new(Mutex::new(HashSet::new())),
117                db_observers: Vec::new(),
118            })
119            .ok_or_else(|| error.into())
120    }
121    pub fn open_with_flags(path: &Path, flags: DatabaseFlags) -> Result<Self> {
122        let parent_path = path.parent().ok_or_else(|| {
123            Error::LogicError(format!("path {path:?} has no parent diretory").into())
124        })?;
125        let cfg = DatabaseConfig::new(parent_path, flags);
126        let db_name = path
127            .file_name()
128            .ok_or_else(|| Error::LogicError(format!("path {path:?} has no last part").into()))?
129            .to_str()
130            .ok_or(Error::InvalidUtf8)?
131            .strip_suffix(".cblite2")
132            .ok_or_else(|| {
133                Error::LogicError(
134                    format!("path {path:?} should have last part with .cblite2 suffix").into(),
135                )
136            })?;
137
138        Database::open_named(db_name, cfg)
139    }
140    /// Begin a new transaction, the transaction defaults to rolling back
141    /// when it is dropped. If you want the transaction to commit,
142    /// you must call `Transaction::commit`
143    #[inline]
144    pub fn transaction<'a>(&'a mut self) -> Result<Transaction<'a>> {
145        Transaction::new(self)
146    }
147    /// Returns the number of (undeleted) documents in the database
148    #[inline]
149    pub fn document_count(&self) -> u64 {
150        unsafe { c4db_getDocumentCount(self.inner.0.as_ptr()) }
151    }
152    /// Return existing document from database
153    #[inline]
154    pub fn get_existing(&self, doc_id: &str) -> Result<Document> {
155        self.internal_get(doc_id, true)
156            .map(|x| Document::new_internal(x, doc_id))
157    }
158    /// Compiles a query from an expression given as JSON.
159    /// The expression is a predicate that describes which documents should be returned.
160    /// A separate, optional sort expression describes the ordering of the results.
161    #[inline]
162    pub fn query<'a>(&'a self, query_json: &str) -> Result<Query<'a>> {
163        Query::new(self, QueryLanguage::kC4JSONQuery, query_json)
164    }
165    /// Compiles a query from an expression given as N1QL.
166    #[inline]
167    pub fn n1ql_query<'a>(&'a self, query: &str) -> Result<Query<'a>> {
168        Query::new(self, QueryLanguage::kC4N1QLQuery, query)
169    }
170    /// Creates an enumerator ordered by docID.
171    #[inline]
172    pub fn enumerate_all_docs<'a>(
173        &'a self,
174        flags: DocEnumeratorFlags,
175    ) -> Result<DocEnumerator<'a>> {
176        DocEnumerator::enumerate_all_docs(self, flags)
177    }
178
179    /// Register a database observer, with a callback that will be invoked after the database
180    /// changes. The callback will be called _once_, after the first change. After that it won't
181    /// be called again until all of the changes have been read by calling `Database::observed_changes`.
182    pub fn register_observer<F>(&mut self, mut callback_f: F) -> Result<()>
183    where
184        F: FnMut() + Send + 'static,
185    {
186        let db_events = self.db_events.clone();
187        let obs = DatabaseObserver::new(self, move |obs| {
188            {
189                match db_events.lock() {
190                    Ok(mut db_events) => {
191                        db_events.insert(obs as usize);
192                    }
193                    Err(err) => {
194                        error!(
195                            "register_observer::DatabaseObserver::lambda db_events lock failed: {}",
196                            err
197                        );
198                    }
199                }
200            }
201            callback_f();
202        })?;
203        self.db_observers.push(obs);
204        Ok(())
205    }
206
207    /// Remove all database observers
208    #[inline]
209    pub fn clear_observers(&mut self) {
210        self.db_observers.clear();
211    }
212
213    /// Get observed changes for this database
214    #[inline]
215    pub fn observed_changes<'a>(&'a mut self) -> ObserverdChangesIter<'a> {
216        ObserverdChangesIter {
217            db: self,
218            obs_it: None,
219        }
220    }
221
222    /// Intialize socket implementation for replication
223    /// (builtin couchbase-lite websocket library)
224    #[cfg(feature = "use-couchbase-lite-websocket")]
225    #[inline]
226    pub fn init_socket_impl() {
227        crate::replicator::init_builtin_socket_impl();
228    }
229
230    /// Intialize socket implementation for replication
231    /// (builtin couchbase-lite websocket library)
232    #[cfg(feature = "use-tokio-websocket")]
233    #[inline]
234    pub fn init_socket_impl(handle: tokio::runtime::Handle) {
235        crate::replicator::init_tokio_socket_impl(handle);
236    }
237
238    /// Get shared "fleece" encoder, `&mut self` to make possible
239    /// exists only one session
240    pub fn shared_encoder_session(&mut self) -> Result<FlEncoderSession> {
241        let enc = unsafe { c4db_getSharedFleeceEncoder(self.inner.0.as_ptr()) };
242        NonNull::new(enc)
243            .ok_or_else(|| {
244                Error::LogicError("c4db_getSharedFleeceEncoder return null.into()".into())
245            })
246            .map(FlEncoderSession::new)
247    }
248
249    /// Returns the names of all indexes in the database
250    pub fn get_indexes(
251        &self,
252    ) -> Result<impl FallibleStreamingIterator<Item = IndexInfo, Error = Error>> {
253        let mut c4err = c4error_init();
254        let enc_data = unsafe { c4db_getIndexesInfo(self.inner.0.as_ptr(), &mut c4err) };
255        if enc_data.buf.is_null() {
256            return Err(c4err.into());
257        }
258
259        let indexes_list = DbIndexesListIterator::new(enc_data)?;
260        Ok(indexes_list)
261    }
262
263    /// Creates a database index, of the values of specific expressions across
264    /// all documents. The name is used to identify the index for later updating
265    /// or deletion; if an index with the same name already exists, it will be
266    /// replaced unless it has the exact same expressions.
267    /// Note: If some documents are missing the values to be indexed,
268    /// those documents will just be omitted from the index. It's not an error.
269    pub fn create_index(
270        &mut self,
271        index_name: &str,
272        expression_json: &str,
273        index_type: IndexType,
274        index_options: Option<IndexOptions>,
275    ) -> Result<()> {
276        use IndexType::*;
277        let index_type = match index_type {
278            ValueIndex => C4IndexType::kC4ValueIndex,
279            FullTextIndex => C4IndexType::kC4FullTextIndex,
280            ArrayIndex => C4IndexType::kC4ArrayIndex,
281            PredictiveIndex => C4IndexType::kC4PredictiveIndex,
282        };
283        let mut c4err = c4error_init();
284        let result = if let Some(index_options) = index_options {
285            let language = CString::new(index_options.language)?;
286            let stop_words: Option<CString> = if let Some(stop_words) = index_options.stop_words {
287                let mut list = String::with_capacity(stop_words.len() * 5);
288                for word in stop_words {
289                    if !list.is_empty() {
290                        list.push(' ');
291                    }
292                    list.push_str(word);
293                }
294                Some(CString::new(list)?)
295            } else {
296                None
297            };
298
299            let opts = C4IndexOptions {
300                language: language.as_ptr(),
301                disableStemming: index_options.disable_stemming,
302                ignoreDiacritics: index_options.ignore_diacritics,
303                stopWords: stop_words.map_or(ptr::null(), |x| x.as_ptr()),
304            };
305            unsafe {
306                c4db_createIndex(
307                    self.inner.0.as_ptr(),
308                    index_name.into(),
309                    expression_json.into(),
310                    index_type,
311                    &opts,
312                    &mut c4err,
313                )
314            }
315        } else {
316            unsafe {
317                c4db_createIndex(
318                    self.inner.0.as_ptr(),
319                    index_name.into(),
320                    expression_json.into(),
321                    index_type,
322                    ptr::null(),
323                    &mut c4err,
324                )
325            }
326        };
327        if result {
328            Ok(())
329        } else {
330            Err(c4err.into())
331        }
332    }
333
334    /// Returns the name of the database, as given to `c4db_openNamed`.
335    /// This is the filename _without_ the ".cblite2" extension.
336    #[inline]
337    pub fn name(&self) -> Result<&str> {
338        unsafe { c4db_getName(self.inner.0.as_ptr()) }
339            .try_into()
340            .map_err(|_| Error::InvalidUtf8)
341    }
342
343    pub(crate) fn do_internal_get(
344        &self,
345        doc_id: &str,
346        must_exists: bool,
347        content_level: C4DocContentLevel,
348    ) -> Result<C4DocumentOwner> {
349        let mut c4err = c4error_init();
350        let c4doc = unsafe {
351            c4db_getDoc(
352                self.inner.0.as_ptr(),
353                doc_id.as_bytes().into(),
354                must_exists,
355                content_level,
356                &mut c4err,
357            )
358        };
359        NonNull::new(c4doc)
360            .ok_or_else(|| c4err.into())
361            .map(C4DocumentOwner)
362    }
363
364    pub(crate) fn do_internal_get_opt(
365        &self,
366        doc_id: &str,
367        must_exists: bool,
368        content_level: C4DocContentLevel,
369    ) -> Result<Option<C4DocumentOwner>> {
370        match self.do_internal_get(doc_id, must_exists, content_level) {
371            Ok(x) => Ok(Some(x)),
372            Err(Error::C4Error(err))
373                if err.domain == C4ErrorDomain::LiteCoreDomain
374                    && err.code == C4ErrorCode::kC4ErrorNotFound.0 =>
375            {
376                Ok(None)
377            }
378            Err(err) => Err(err),
379        }
380    }
381
382    pub(crate) fn internal_get(&self, doc_id: &str, must_exists: bool) -> Result<C4DocumentOwner> {
383        self.do_internal_get(doc_id, must_exists, C4DocContentLevel::kDocGetCurrentRev)
384    }
385}
386
/// Guards the one-time setup that reroutes couchbase log output to the `log`
/// crate; triggered from `Database::open_named`.
static DB_LOG_HANDLER: Once = Once::new();