1use crate::{
2 doc_enumerator::{DocEnumerator, DocEnumeratorFlags},
3 document::{C4DocumentOwner, Document},
4 error::{c4error_init, Error, Result},
5 ffi::{
6 c4db_createIndex, c4db_getDoc, c4db_getDocumentCount, c4db_getIndexesInfo, c4db_getName,
7 c4db_getSharedFleeceEncoder, c4db_openNamed, c4db_release, C4Database, C4DatabaseConfig2,
8 C4DatabaseFlags, C4DocContentLevel, C4EncryptionAlgorithm, C4EncryptionKey, C4ErrorCode,
9 C4ErrorDomain, C4IndexOptions, C4IndexType,
10 },
11 index::{DbIndexesListIterator, IndexInfo, IndexOptions, IndexType},
12 log_reroute::c4log_to_log_init,
13 observer::{DatabaseObserver, ObserverdChangesIter},
14 query::Query,
15 transaction::Transaction,
16 QueryLanguage,
17};
18use bitflags::bitflags;
19use fallible_streaming_iterator::FallibleStreamingIterator;
20use log::{debug, error, trace};
21use serde_fleece::FlEncoderSession;
22use std::{
23 collections::HashSet,
24 ffi::CString,
25 marker::PhantomData,
26 path::Path,
27 ptr::{self, NonNull},
28 sync::{Arc, Mutex, Once},
29};
30
31pub struct DatabaseConfig<'a> {
33 inner: Result<C4DatabaseConfig2>,
34 phantom: PhantomData<&'a Path>,
35}
36
37bitflags! {
38 #[repr(transparent)]
39 #[derive(Debug)]
40 pub struct DatabaseFlags: u32 {
41 const CREATE = C4DatabaseFlags::kC4DB_Create.0;
43 const READ_ONLY = C4DatabaseFlags::kC4DB_ReadOnly.0;
45 const NO_UPGRADE = C4DatabaseFlags::kC4DB_NoUpgrade.0;
47 const NON_OBSERVABLE = C4DatabaseFlags::kC4DB_NonObservable.0;
49 }
50}
51
52impl<'a> DatabaseConfig<'a> {
53 pub fn new(parent_directory: &'a Path, flags: DatabaseFlags) -> Self {
54 let os_path_utf8 = match parent_directory.to_str() {
55 Some(x) => x,
56 None => {
57 return Self {
58 inner: Err(Error::InvalidUtf8),
59 phantom: PhantomData,
60 }
61 }
62 };
63 Self {
64 inner: Ok(C4DatabaseConfig2 {
65 parentDirectory: os_path_utf8.into(),
66 flags: C4DatabaseFlags(flags.bits()),
67 encryptionKey: C4EncryptionKey {
68 algorithm: C4EncryptionAlgorithm::kC4EncryptionNone,
69 bytes: [0; 32],
70 },
71 }),
72 phantom: PhantomData,
73 }
74 }
75}
76
77pub struct Database {
79 pub(crate) inner: DbInner,
80 pub(crate) db_events: Arc<Mutex<HashSet<usize>>>,
81 pub(crate) db_observers: Vec<DatabaseObserver>,
82}
83
84pub(crate) struct DbInner(pub NonNull<C4Database>);
85unsafe impl Send for DbInner {}
89
90impl Drop for DbInner {
91 fn drop(&mut self) {
92 trace!("release db {:?}", self.0.as_ptr());
93 unsafe { c4db_release(self.0.as_ptr()) };
94 }
95}
96
97impl Drop for Database {
98 #[inline]
99 fn drop(&mut self) {
100 self.db_observers.clear();
101 }
102}
103
104impl Database {
105 pub fn open_named(name: &str, cfg: DatabaseConfig) -> Result<Self> {
106 DB_LOG_HANDLER.call_once(|| {
107 debug!("init couchbase log to rust log rerouting");
108 c4log_to_log_init();
109 });
110 let cfg = cfg.inner?;
111 let mut error = c4error_init();
112 let db_ptr = unsafe { c4db_openNamed(name.into(), &cfg, &mut error) };
113 NonNull::new(db_ptr)
114 .map(|inner| Database {
115 inner: DbInner(inner),
116 db_events: Arc::new(Mutex::new(HashSet::new())),
117 db_observers: Vec::new(),
118 })
119 .ok_or_else(|| error.into())
120 }
121 pub fn open_with_flags(path: &Path, flags: DatabaseFlags) -> Result<Self> {
122 let parent_path = path.parent().ok_or_else(|| {
123 Error::LogicError(format!("path {path:?} has no parent diretory").into())
124 })?;
125 let cfg = DatabaseConfig::new(parent_path, flags);
126 let db_name = path
127 .file_name()
128 .ok_or_else(|| Error::LogicError(format!("path {path:?} has no last part").into()))?
129 .to_str()
130 .ok_or(Error::InvalidUtf8)?
131 .strip_suffix(".cblite2")
132 .ok_or_else(|| {
133 Error::LogicError(
134 format!("path {path:?} should have last part with .cblite2 suffix").into(),
135 )
136 })?;
137
138 Database::open_named(db_name, cfg)
139 }
140 #[inline]
144 pub fn transaction<'a>(&'a mut self) -> Result<Transaction<'a>> {
145 Transaction::new(self)
146 }
147 #[inline]
149 pub fn document_count(&self) -> u64 {
150 unsafe { c4db_getDocumentCount(self.inner.0.as_ptr()) }
151 }
152 #[inline]
154 pub fn get_existing(&self, doc_id: &str) -> Result<Document> {
155 self.internal_get(doc_id, true)
156 .map(|x| Document::new_internal(x, doc_id))
157 }
158 #[inline]
162 pub fn query<'a>(&'a self, query_json: &str) -> Result<Query<'a>> {
163 Query::new(self, QueryLanguage::kC4JSONQuery, query_json)
164 }
165 #[inline]
167 pub fn n1ql_query<'a>(&'a self, query: &str) -> Result<Query<'a>> {
168 Query::new(self, QueryLanguage::kC4N1QLQuery, query)
169 }
170 #[inline]
172 pub fn enumerate_all_docs<'a>(
173 &'a self,
174 flags: DocEnumeratorFlags,
175 ) -> Result<DocEnumerator<'a>> {
176 DocEnumerator::enumerate_all_docs(self, flags)
177 }
178
179 pub fn register_observer<F>(&mut self, mut callback_f: F) -> Result<()>
183 where
184 F: FnMut() + Send + 'static,
185 {
186 let db_events = self.db_events.clone();
187 let obs = DatabaseObserver::new(self, move |obs| {
188 {
189 match db_events.lock() {
190 Ok(mut db_events) => {
191 db_events.insert(obs as usize);
192 }
193 Err(err) => {
194 error!(
195 "register_observer::DatabaseObserver::lambda db_events lock failed: {}",
196 err
197 );
198 }
199 }
200 }
201 callback_f();
202 })?;
203 self.db_observers.push(obs);
204 Ok(())
205 }
206
207 #[inline]
209 pub fn clear_observers(&mut self) {
210 self.db_observers.clear();
211 }
212
213 #[inline]
215 pub fn observed_changes<'a>(&'a mut self) -> ObserverdChangesIter<'a> {
216 ObserverdChangesIter {
217 db: self,
218 obs_it: None,
219 }
220 }
221
222 #[cfg(feature = "use-couchbase-lite-websocket")]
225 #[inline]
226 pub fn init_socket_impl() {
227 crate::replicator::init_builtin_socket_impl();
228 }
229
230 #[cfg(feature = "use-tokio-websocket")]
233 #[inline]
234 pub fn init_socket_impl(handle: tokio::runtime::Handle) {
235 crate::replicator::init_tokio_socket_impl(handle);
236 }
237
238 pub fn shared_encoder_session(&mut self) -> Result<FlEncoderSession> {
241 let enc = unsafe { c4db_getSharedFleeceEncoder(self.inner.0.as_ptr()) };
242 NonNull::new(enc)
243 .ok_or_else(|| {
244 Error::LogicError("c4db_getSharedFleeceEncoder return null.into()".into())
245 })
246 .map(FlEncoderSession::new)
247 }
248
249 pub fn get_indexes(
251 &self,
252 ) -> Result<impl FallibleStreamingIterator<Item = IndexInfo, Error = Error>> {
253 let mut c4err = c4error_init();
254 let enc_data = unsafe { c4db_getIndexesInfo(self.inner.0.as_ptr(), &mut c4err) };
255 if enc_data.buf.is_null() {
256 return Err(c4err.into());
257 }
258
259 let indexes_list = DbIndexesListIterator::new(enc_data)?;
260 Ok(indexes_list)
261 }
262
263 pub fn create_index(
270 &mut self,
271 index_name: &str,
272 expression_json: &str,
273 index_type: IndexType,
274 index_options: Option<IndexOptions>,
275 ) -> Result<()> {
276 use IndexType::*;
277 let index_type = match index_type {
278 ValueIndex => C4IndexType::kC4ValueIndex,
279 FullTextIndex => C4IndexType::kC4FullTextIndex,
280 ArrayIndex => C4IndexType::kC4ArrayIndex,
281 PredictiveIndex => C4IndexType::kC4PredictiveIndex,
282 };
283 let mut c4err = c4error_init();
284 let result = if let Some(index_options) = index_options {
285 let language = CString::new(index_options.language)?;
286 let stop_words: Option<CString> = if let Some(stop_words) = index_options.stop_words {
287 let mut list = String::with_capacity(stop_words.len() * 5);
288 for word in stop_words {
289 if !list.is_empty() {
290 list.push(' ');
291 }
292 list.push_str(word);
293 }
294 Some(CString::new(list)?)
295 } else {
296 None
297 };
298
299 let opts = C4IndexOptions {
300 language: language.as_ptr(),
301 disableStemming: index_options.disable_stemming,
302 ignoreDiacritics: index_options.ignore_diacritics,
303 stopWords: stop_words.map_or(ptr::null(), |x| x.as_ptr()),
304 };
305 unsafe {
306 c4db_createIndex(
307 self.inner.0.as_ptr(),
308 index_name.into(),
309 expression_json.into(),
310 index_type,
311 &opts,
312 &mut c4err,
313 )
314 }
315 } else {
316 unsafe {
317 c4db_createIndex(
318 self.inner.0.as_ptr(),
319 index_name.into(),
320 expression_json.into(),
321 index_type,
322 ptr::null(),
323 &mut c4err,
324 )
325 }
326 };
327 if result {
328 Ok(())
329 } else {
330 Err(c4err.into())
331 }
332 }
333
334 #[inline]
337 pub fn name(&self) -> Result<&str> {
338 unsafe { c4db_getName(self.inner.0.as_ptr()) }
339 .try_into()
340 .map_err(|_| Error::InvalidUtf8)
341 }
342
343 pub(crate) fn do_internal_get(
344 &self,
345 doc_id: &str,
346 must_exists: bool,
347 content_level: C4DocContentLevel,
348 ) -> Result<C4DocumentOwner> {
349 let mut c4err = c4error_init();
350 let c4doc = unsafe {
351 c4db_getDoc(
352 self.inner.0.as_ptr(),
353 doc_id.as_bytes().into(),
354 must_exists,
355 content_level,
356 &mut c4err,
357 )
358 };
359 NonNull::new(c4doc)
360 .ok_or_else(|| c4err.into())
361 .map(C4DocumentOwner)
362 }
363
364 pub(crate) fn do_internal_get_opt(
365 &self,
366 doc_id: &str,
367 must_exists: bool,
368 content_level: C4DocContentLevel,
369 ) -> Result<Option<C4DocumentOwner>> {
370 match self.do_internal_get(doc_id, must_exists, content_level) {
371 Ok(x) => Ok(Some(x)),
372 Err(Error::C4Error(err))
373 if err.domain == C4ErrorDomain::LiteCoreDomain
374 && err.code == C4ErrorCode::kC4ErrorNotFound.0 =>
375 {
376 Ok(None)
377 }
378 Err(err) => Err(err),
379 }
380 }
381
382 pub(crate) fn internal_get(&self, doc_id: &str, must_exists: bool) -> Result<C4DocumentOwner> {
383 self.do_internal_get(doc_id, must_exists, C4DocContentLevel::kDocGetCurrentRev)
384 }
385}
386
387static DB_LOG_HANDLER: Once = Once::new();