1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
use crate::{
    doc_enumerator::{DocEnumerator, DocEnumeratorFlags},
    document::{C4DocumentOwner, Document},
    error::{c4error_init, Error, Result},
    ffi::{
        c4db_createIndex, c4db_getDoc, c4db_getDocumentCount, c4db_getIndexesInfo, c4db_getName,
        c4db_getSharedFleeceEncoder, c4db_openNamed, c4db_release, kC4DB_Create, kC4DB_NoUpgrade,
        kC4DB_NonObservable, kC4DB_ReadOnly, C4Database, C4DatabaseConfig2, C4DocContentLevel,
        C4EncryptionAlgorithm, C4EncryptionKey, C4ErrorCode, C4ErrorDomain, C4IndexOptions,
        C4IndexType,
    },
    index::{DbIndexesListIterator, IndexInfo, IndexOptions, IndexType},
    log_reroute::c4log_to_log_init,
    observer::{DatabaseObserver, ObserverdChangesIter},
    query::Query,
    transaction::Transaction,
    QueryLanguage,
};
use bitflags::bitflags;
use fallible_streaming_iterator::FallibleStreamingIterator;
use log::{debug, error, trace};
use serde_fleece::FlEncoderSession;
use std::{
    collections::HashSet,
    ffi::CString,
    marker::PhantomData,
    path::Path,
    ptr::{self, NonNull},
    sync::{Arc, Mutex, Once},
};

/// Database configuration, used during open.
///
/// Built with [`DatabaseConfig::new`]. A failure to build the configuration
/// is stored inside and only reported later by [`Database::open_named`].
pub struct DatabaseConfig<'a> {
    /// Either the ready-to-use C configuration, or the error hit while building it
    inner: Result<C4DatabaseConfig2>,
    /// Ties this value to the lifetime of the parent directory path
    /// that `inner` was built from
    phantom: PhantomData<&'a Path>,
}

bitflags! {
    /// Flags that control how a database is opened, see [`DatabaseConfig::new`].
    /// Each flag has the value of the corresponding `kC4DB_*` constant.
    #[repr(transparent)]
    pub struct DatabaseFlags: u32 {
        /// Create the file if it doesn't exist
        const CREATE = kC4DB_Create;
        /// Open file read-only
        const READ_ONLY = kC4DB_ReadOnly;
        /// Disable upgrading an older-version database
        const NO_UPGRADE = kC4DB_NoUpgrade;
        /// Disable database/collection observers, for slightly faster writes
        const NON_OBSERVABLE = kC4DB_NonObservable;
    }
}

impl<'a> DatabaseConfig<'a> {
    pub fn new(parent_directory: &'a Path, flags: DatabaseFlags) -> Self {
        let os_path_utf8 = match parent_directory.to_str() {
            Some(x) => x,
            None => {
                return Self {
                    inner: Err(Error::InvalidUtf8),
                    phantom: PhantomData,
                }
            }
        };
        Self {
            inner: Ok(C4DatabaseConfig2 {
                parentDirectory: os_path_utf8.into(),
                flags: flags.bits(),
                encryptionKey: C4EncryptionKey {
                    algorithm: C4EncryptionAlgorithm::kC4EncryptionNone,
                    bytes: [0; 32],
                },
            }),
            phantom: PhantomData,
        }
    }
}

/// A connection to a couchbase-lite database.
pub struct Database {
    /// Owned handle to the underlying `C4Database`
    pub(crate) inner: DbInner,
    /// Observer handles (cast to `usize`) recorded by the callbacks that
    /// `register_observer` installs; shared with those callbacks
    pub(crate) db_events: Arc<Mutex<HashSet<usize>>>,
    /// Observers created by `register_observer`, kept until cleared or dropped
    pub(crate) db_observers: Vec<DatabaseObserver>,
}

/// Owning wrapper around the raw `C4Database` pointer.
pub(crate) struct DbInner(pub NonNull<C4Database>);
/// According to
/// <https://github.com/couchbase/couchbase-lite-core/wiki/Thread-Safety>
/// it is possible to call from any thread, but not concurrently,
/// hence only `Send` is implemented here.
unsafe impl Send for DbInner {}

impl Drop for DbInner {
    /// Release the database handle held by this wrapper (logged at trace level).
    fn drop(&mut self) {
        let db_ptr = self.0.as_ptr();
        trace!("release db {:?}", db_ptr);
        unsafe { c4db_release(db_ptr) };
    }
}

impl Drop for Database {
    /// Drops the registered observers explicitly, before the fields
    /// (including the database handle in `inner`) are dropped.
    fn drop(&mut self) {
        // Fields are dropped in declaration order, so without this call the
        // observers would be dropped only after `inner`.
        // NOTE(review): presumably observers must not outlive the database handle - confirm
        self.db_observers.clear();
    }
}

impl Database {
    /// Open the database called `name` using the configuration `cfg`.
    ///
    /// The first call also sets up rerouting of the couchbase log
    /// to the rust `log` crate; this happens only once per process.
    ///
    /// # Errors
    /// Returns the error stored in `cfg` (if building it failed), or the
    /// error reported by `c4db_openNamed` when it returns a null pointer.
    pub fn open_named(name: &str, cfg: DatabaseConfig) -> Result<Self> {
        DB_LOG_HANDLER.call_once(|| {
            debug!("init couchbase log to rust log rerouting");
            c4log_to_log_init();
        });
        let c4_cfg = cfg.inner?;
        let mut c4err = c4error_init();
        let raw_db = unsafe { c4db_openNamed(name.into(), &c4_cfg, &mut c4err) };
        match NonNull::new(raw_db) {
            Some(db) => Ok(Database {
                inner: DbInner(db),
                db_events: Arc::new(Mutex::new(HashSet::new())),
                db_observers: Vec::new(),
            }),
            None => Err(c4err.into()),
        }
    }
    pub fn open_with_flags(path: &Path, flags: DatabaseFlags) -> Result<Self> {
        let parent_path = path
            .parent()
            .ok_or_else(|| Error::LogicError(format!("path {:?} has no parent diretory", path)))?;
        let cfg = DatabaseConfig::new(parent_path, flags);
        let db_name = path
            .file_name()
            .ok_or_else(|| Error::LogicError(format!("path {:?} has no last part", path)))?
            .to_str()
            .ok_or(Error::InvalidUtf8)?
            .strip_suffix(".cblite2")
            .ok_or_else(|| {
                Error::LogicError(format!(
                    "path {:?} should have last part with .cblite2 suffix",
                    path
                ))
            })?;

        Database::open_named(db_name, cfg)
    }
    /// Begin a new transaction. The transaction defaults to rolling back
    /// when it is dropped; if you want the transaction to commit,
    /// you must call `Transaction::commit`.
    ///
    /// # Errors
    /// Returns the error reported by `Transaction::new`.
    pub fn transaction(&mut self) -> Result<Transaction> {
        Transaction::new(self)
    }
    /// Returns the number of (undeleted) documents in the database,
    /// as reported by `c4db_getDocumentCount`.
    pub fn document_count(&self) -> u64 {
        let db_ptr = self.inner.0.as_ptr();
        unsafe { c4db_getDocumentCount(db_ptr) }
    }
    /// Return the existing document with id `doc_id` from the database.
    ///
    /// # Errors
    /// Returns the error of the underlying lookup, which is performed
    /// with `must_exists = true`.
    pub fn get_existing(&self, doc_id: &str) -> Result<Document> {
        let c4doc = self.internal_get(doc_id, true)?;
        Ok(Document::new_internal(c4doc, doc_id))
    }
    /// Compiles a query from an expression given as JSON.
    /// The expression is a predicate that describes which documents should be returned.
    /// A separate, optional sort expression describes the ordering of the results.
    ///
    /// # Errors
    /// Returns the error reported by `Query::new`.
    pub fn query(&self, query_json: &str) -> Result<Query> {
        let language = QueryLanguage::kC4JSONQuery;
        Query::new(self, language, query_json)
    }
    /// Compiles a query from an expression given as N1QL.
    ///
    /// # Errors
    /// Returns the error reported by `Query::new`.
    pub fn n1ql_query(&self, query: &str) -> Result<Query> {
        let language = QueryLanguage::kC4N1QLQuery;
        Query::new(self, language, query)
    }
    /// Creates an enumerator ordered by docID.
    ///
    /// `flags` is passed unchanged to `DocEnumerator::enumerate_all_docs`,
    /// and its result (including any error) is returned as is.
    pub fn enumerate_all_docs(&self, flags: DocEnumeratorFlags) -> Result<DocEnumerator> {
        DocEnumerator::enumerate_all_docs(self, flags)
    }

    /// Register a database observer, with a callback that will be invoked after the database
    /// changes. The callback will be called _once_, after the first change. After that it won't
    /// be called again until all of the changes have been read by calling `Database::observed_changes`.
    pub fn register_observer<F>(&mut self, mut callback_f: F) -> Result<()>
    where
        F: FnMut() + Send + 'static,
    {
        let db_events = self.db_events.clone();
        let obs = DatabaseObserver::new(self, move |obs| {
            {
                match db_events.lock() {
                    Ok(mut db_events) => {
                        db_events.insert(obs as usize);
                    }
                    Err(err) => {
                        error!(
                            "register_observer::DatabaseObserver::lambda db_events lock failed: {}",
                            err
                        );
                    }
                }
            }
            callback_f();
        })?;
        self.db_observers.push(obs);
        Ok(())
    }

    /// Remove all database observers that were added with `register_observer`
    pub fn clear_observers(&mut self) {
        self.db_observers.clear();
    }

    /// Get observed changes for this database.
    ///
    /// The returned iterator borrows the database mutably and starts
    /// with no observer iterator selected (`obs_it` is `None`).
    pub fn observed_changes(&mut self) -> ObserverdChangesIter {
        ObserverdChangesIter {
            db: self,
            obs_it: None,
        }
    }

    /// Initialize socket implementation for replication
    /// (builtin couchbase-lite websocket library).
    /// Only available with the `use-couchbase-lite-websocket` feature.
    #[cfg(feature = "use-couchbase-lite-websocket")]
    pub fn init_socket_impl() {
        crate::replicator::init_builtin_socket_impl();
    }

    /// Initialize socket implementation for replication
    /// (tokio based websocket implementation).
    /// Only available with the `use-tokio-websocket` feature;
    /// `handle` is forwarded to `init_tokio_socket_impl`.
    #[cfg(feature = "use-tokio-websocket")]
    pub fn init_socket_impl(handle: tokio::runtime::Handle) {
        crate::replicator::init_tokio_socket_impl(handle);
    }

    /// Get shared "fleece" encoder, `&mut self` to make possible
    /// exists only one session
    pub fn shared_encoder_session(&mut self) -> Result<FlEncoderSession> {
        let enc = unsafe { c4db_getSharedFleeceEncoder(self.inner.0.as_ptr()) };
        NonNull::new(enc)
            .ok_or_else(|| {
                Error::LogicError("c4db_getSharedFleeceEncoder return null.into()".into())
            })
            .map(FlEncoderSession::new)
    }

    /// Returns the names of all indexes in the database
    pub fn get_indexes(
        &self,
    ) -> Result<impl FallibleStreamingIterator<Item = IndexInfo, Error = Error>> {
        let mut c4err = c4error_init();
        let enc_data = unsafe { c4db_getIndexesInfo(self.inner.0.as_ptr(), &mut c4err) };
        if enc_data.buf.is_null() {
            return Err(c4err.into());
        }

        let indexes_list = DbIndexesListIterator::new(enc_data)?;
        Ok(indexes_list)
    }

    /// Creates a database index, of the values of specific expressions across
    /// all documents. The name is used to identify the index for later updating
    /// or deletion; if an index with the same name already exists, it will be
    /// replaced unless it has the exact same expressions.
    /// Note: If some documents are missing the values to be indexed,
    /// those documents will just be omitted from the index. It's not an error.
    ///
    /// When `index_options` is given, its stop words (if any) are joined with
    /// a single space and passed to the C API together with the language and
    /// the stemming/diacritics switches; otherwise no options are passed.
    ///
    /// # Errors
    /// * an error if the language or the stop words cannot be converted to a
    ///   C string (`CString::new` rejects interior NUL bytes)
    /// * the error reported by `c4db_createIndex` when it returns `false`
    pub fn create_index(
        &mut self,
        index_name: &str,
        expression_json: &str,
        index_type: IndexType,
        index_options: Option<IndexOptions>,
    ) -> Result<()> {
        use IndexType::*;
        let index_type = match index_type {
            ValueIndex => C4IndexType::kC4ValueIndex,
            FullTextIndex => C4IndexType::kC4FullTextIndex,
            ArrayIndex => C4IndexType::kC4ArrayIndex,
            PredictiveIndex => C4IndexType::kC4PredictiveIndex,
        };
        let mut c4err = c4error_init();
        let result = if let Some(index_options) = index_options {
            // `language` and `stop_words` own the C strings that `opts` points
            // to, so both must stay alive until `c4db_createIndex` has returned.
            let language = CString::new(index_options.language)?;
            let stop_words: Option<CString> = if let Some(stop_words) = index_options.stop_words {
                let mut list = String::with_capacity(stop_words.len() * 5);
                for word in stop_words {
                    if !list.is_empty() {
                        list.push(' ');
                    }
                    list.push_str(word);
                }
                Some(CString::new(list)?)
            } else {
                None
            };

            let opts = C4IndexOptions {
                language: language.as_ptr(),
                disableStemming: index_options.disable_stemming,
                ignoreDiacritics: index_options.ignore_diacritics,
                // Borrow via `as_ref`: consuming the `Option` here would drop
                // the `CString` right away and leave a dangling pointer.
                stopWords: stop_words.as_ref().map_or(ptr::null(), |x| x.as_ptr()),
            };
            unsafe {
                c4db_createIndex(
                    self.inner.0.as_ptr(),
                    index_name.into(),
                    expression_json.into(),
                    index_type,
                    &opts,
                    &mut c4err,
                )
            }
        } else {
            unsafe {
                c4db_createIndex(
                    self.inner.0.as_ptr(),
                    index_name.into(),
                    expression_json.into(),
                    index_type,
                    ptr::null(),
                    &mut c4err,
                )
            }
        };
        if result {
            Ok(())
        } else {
            Err(c4err.into())
        }
    }

    /// Returns the name of the database, as given to `c4db_openNamed`.
    /// This is the filename _without_ the ".cblite2" extension.
    #[inline]
    pub fn name(&self) -> Result<&str> {
        unsafe { c4db_getName(self.inner.0.as_ptr()) }
            .try_into()
            .map_err(|_| Error::InvalidUtf8)
    }

    pub(crate) fn do_internal_get(
        &self,
        doc_id: &str,
        must_exists: bool,
        content_level: C4DocContentLevel,
    ) -> Result<C4DocumentOwner> {
        let mut c4err = c4error_init();
        let c4doc = unsafe {
            c4db_getDoc(
                self.inner.0.as_ptr(),
                doc_id.as_bytes().into(),
                must_exists,
                content_level,
                &mut c4err,
            )
        };
        NonNull::new(c4doc)
            .ok_or_else(|| c4err.into())
            .map(C4DocumentOwner)
    }

    pub(crate) fn do_internal_get_opt(
        &self,
        doc_id: &str,
        must_exists: bool,
        content_level: C4DocContentLevel,
    ) -> Result<Option<C4DocumentOwner>> {
        match self.do_internal_get(doc_id, must_exists, content_level) {
            Ok(x) => Ok(Some(x)),
            Err(Error::C4Error(err))
                if err.domain == C4ErrorDomain::LiteCoreDomain
                    && err.code == C4ErrorCode::kC4ErrorNotFound.0 =>
            {
                Ok(None)
            }
            Err(err) => Err(err),
        }
    }

    /// Shortcut for `do_internal_get` with the `kDocGetCurrentRev` content level.
    #[inline]
    pub(crate) fn internal_get(&self, doc_id: &str, must_exists: bool) -> Result<C4DocumentOwner> {
        let content_level = C4DocContentLevel::kDocGetCurrentRev;
        self.do_internal_get(doc_id, must_exists, content_level)
    }
}

/// Makes sure that rerouting of the couchbase log to the rust `log` crate
/// is initialized only once, see `Database::open_named`.
static DB_LOG_HANDLER: Once = Once::new();