use crate::{
doc_enumerator::{DocEnumerator, DocEnumeratorFlags},
document::{C4DocumentOwner, Document},
error::{c4error_init, Error, Result},
ffi::{
c4db_createIndex, c4db_getDoc, c4db_getDocumentCount, c4db_getIndexesInfo, c4db_getName,
c4db_getSharedFleeceEncoder, c4db_openNamed, c4db_release, C4Database, C4DatabaseConfig2,
C4DatabaseFlags, C4DocContentLevel, C4EncryptionAlgorithm, C4EncryptionKey, C4ErrorCode,
C4ErrorDomain, C4IndexOptions, C4IndexType,
},
index::{DbIndexesListIterator, IndexInfo, IndexOptions, IndexType},
log_reroute::c4log_to_log_init,
observer::{DatabaseObserver, ObserverdChangesIter},
query::Query,
transaction::Transaction,
QueryLanguage,
};
use bitflags::bitflags;
use fallible_streaming_iterator::FallibleStreamingIterator;
use log::{debug, error, trace};
use serde_fleece::FlEncoderSession;
use std::{
collections::HashSet,
ffi::CString,
marker::PhantomData,
path::Path,
ptr::{self, NonNull},
sync::{Arc, Mutex, Once},
};
/// Settings used to open a [`Database`] by name.
///
/// Built with [`DatabaseConfig::new`]. A failure while building the config
/// (a parent directory that is not valid UTF-8) is stored inside the value and
/// only surfaces when the config is handed to [`Database::open_named`].
pub struct DatabaseConfig<'a> {
// Either the raw LiteCore config or the error hit while building it.
inner: Result<C4DatabaseConfig2>,
// Ties the config to the lifetime of the `Path` passed to `new`.
// NOTE(review): presumably because `parentDirectory` borrows the path's bytes — confirm.
phantom: PhantomData<&'a Path>,
}
bitflags! {
/// Bit set of options for opening a database; each constant reuses the raw
/// value of the corresponding `C4DatabaseFlags` constant.
#[repr(transparent)]
pub struct DatabaseFlags: u32 {
/// Same bit as `C4DatabaseFlags::kC4DB_Create`.
const CREATE = C4DatabaseFlags::kC4DB_Create.0;
/// Same bit as `C4DatabaseFlags::kC4DB_ReadOnly`.
const READ_ONLY = C4DatabaseFlags::kC4DB_ReadOnly.0;
/// Same bit as `C4DatabaseFlags::kC4DB_NoUpgrade`.
const NO_UPGRADE = C4DatabaseFlags::kC4DB_NoUpgrade.0;
/// Same bit as `C4DatabaseFlags::kC4DB_NonObservable`.
const NON_OBSERVABLE = C4DatabaseFlags::kC4DB_NonObservable.0;
}
}
impl<'a> DatabaseConfig<'a> {
pub fn new(parent_directory: &'a Path, flags: DatabaseFlags) -> Self {
let os_path_utf8 = match parent_directory.to_str() {
Some(x) => x,
None => {
return Self {
inner: Err(Error::InvalidUtf8),
phantom: PhantomData,
}
}
};
Self {
inner: Ok(C4DatabaseConfig2 {
parentDirectory: os_path_utf8.into(),
flags: C4DatabaseFlags(flags.bits()),
encryptionKey: C4EncryptionKey {
algorithm: C4EncryptionAlgorithm::kC4EncryptionNone,
bytes: [0; 32],
},
}),
phantom: PhantomData,
}
}
}
/// Handle to an opened database.
pub struct Database {
// Owning wrapper around the raw `C4Database` pointer.
pub(crate) inner: DbInner,
// Set shared with the observer callbacks: the closure installed by
// `register_observer` inserts its `obs` argument (cast to `usize`) here
// every time it is invoked.
pub(crate) db_events: Arc<Mutex<HashSet<usize>>>,
// Observers created by `register_observer`; emptied by `clear_observers` and on drop.
pub(crate) db_observers: Vec<DatabaseObserver>,
}
/// Owning wrapper around a non-null `C4Database` pointer; its `Drop` impl
/// passes the pointer to `c4db_release`.
pub(crate) struct DbInner(pub NonNull<C4Database>);
// NOTE(review): no SAFETY justification is visible in this file — presumably a
// `C4Database` handle may be moved to another thread; confirm against LiteCore's
// threading rules.
unsafe impl Send for DbInner {}
impl Drop for DbInner {
    /// Logs the raw handle at trace level, then passes it to `c4db_release`.
    fn drop(&mut self) {
        let raw_db = self.0.as_ptr();
        trace!("release db {:?}", raw_db);
        unsafe { c4db_release(raw_db) };
    }
}
// Drops every registered observer before the struct's fields are dropped.
// `Drop::drop` runs ahead of the automatic field drops, and `inner` is declared
// before `db_observers`, so without this the database handle would be released
// first.
// NOTE(review): presumably observers must not outlive the database handle — confirm.
impl Drop for Database {
#[inline]
fn drop(&mut self) {
self.db_observers.clear();
}
}
impl Database {
    /// Opens the database called `name` using the settings in `cfg`.
    ///
    /// The first call also installs the rerouting of LiteCore logging into the
    /// `log` crate; `DB_LOG_HANDLER` makes sure that happens only once.
    ///
    /// # Errors
    ///
    /// Returns the error stored in `cfg` (e.g. [`Error::InvalidUtf8`] for a
    /// non-UTF-8 parent directory), or the error filled in by `c4db_openNamed`
    /// when it returns a null pointer.
    pub fn open_named(name: &str, cfg: DatabaseConfig) -> Result<Self> {
        DB_LOG_HANDLER.call_once(|| {
            debug!("init couchbase log to rust log rerouting");
            c4log_to_log_init();
        });
        let cfg = cfg.inner?;
        let mut error = c4error_init();
        let db_ptr = unsafe { c4db_openNamed(name.into(), &cfg, &mut error) };
        NonNull::new(db_ptr)
            .map(|inner| Database {
                inner: DbInner(inner),
                db_events: Arc::new(Mutex::new(HashSet::new())),
                db_observers: Vec::new(),
            })
            .ok_or_else(|| error.into())
    }

    /// Opens the database stored at `path`, which must have the form
    /// `<parent directory>/<name>.cblite2`.
    ///
    /// The parent directory becomes the config's directory and the file name
    /// without the `.cblite2` suffix becomes the database name passed to
    /// [`Database::open_named`].
    ///
    /// # Errors
    ///
    /// * [`Error::LogicError`] if `path` has no parent, no final component, or
    ///   the final component does not end in `.cblite2`.
    /// * [`Error::InvalidUtf8`] if the final component is not valid UTF-8.
    /// * Any error produced by [`Database::open_named`].
    pub fn open_with_flags(path: &Path, flags: DatabaseFlags) -> Result<Self> {
        let parent_path = path
            .parent()
            .ok_or_else(|| Error::LogicError(format!("path {path:?} has no parent diretory")))?;
        let cfg = DatabaseConfig::new(parent_path, flags);
        let db_name = path
            .file_name()
            .ok_or_else(|| Error::LogicError(format!("path {path:?} has no last part")))?
            .to_str()
            .ok_or(Error::InvalidUtf8)?
            .strip_suffix(".cblite2")
            .ok_or_else(|| {
                Error::LogicError(format!(
                    "path {path:?} should have last part with .cblite2 suffix"
                ))
            })?;
        Database::open_named(db_name, cfg)
    }

    /// Starts a new [`Transaction`] on this database.
    #[inline]
    pub fn transaction(&mut self) -> Result<Transaction> {
        Transaction::new(self)
    }

    /// Returns the document count reported by `c4db_getDocumentCount`.
    #[inline]
    pub fn document_count(&self) -> u64 {
        unsafe { c4db_getDocumentCount(self.inner.0.as_ptr()) }
    }

    /// Loads the document with id `doc_id`.
    ///
    /// The lookup is made with `must_exists = true`, so presumably a missing
    /// document is reported as an error rather than an empty document.
    #[inline]
    pub fn get_existing(&self, doc_id: &str) -> Result<Document> {
        self.internal_get(doc_id, true)
            .map(|x| Document::new_internal(x, doc_id))
    }

    /// Compiles `query_json` as a JSON-syntax query (`kC4JSONQuery`).
    #[inline]
    pub fn query(&self, query_json: &str) -> Result<Query> {
        Query::new(self, QueryLanguage::kC4JSONQuery, query_json)
    }

    /// Compiles `query` as a N1QL query (`kC4N1QLQuery`).
    #[inline]
    pub fn n1ql_query(&self, query: &str) -> Result<Query> {
        Query::new(self, QueryLanguage::kC4N1QLQuery, query)
    }

    /// Creates an enumerator over all documents, configured by `flags`.
    #[inline]
    pub fn enumerate_all_docs(&self, flags: DocEnumeratorFlags) -> Result<DocEnumerator> {
        DocEnumerator::enumerate_all_docs(self, flags)
    }

    /// Registers `callback_f` as a database observer.
    ///
    /// Each time the observer fires, the `obs` value passed to the internal
    /// closure is first recorded in `db_events` (as `usize`) and then
    /// `callback_f` is invoked. If the `db_events` mutex is poisoned the
    /// failure is logged and `callback_f` is still called.
    ///
    /// The observer stays registered until [`Database::clear_observers`] is
    /// called or the database is dropped.
    pub fn register_observer<F>(&mut self, mut callback_f: F) -> Result<()>
    where
        F: FnMut() + Send + 'static,
    {
        let db_events = self.db_events.clone();
        let obs = DatabaseObserver::new(self, move |obs| {
            {
                match db_events.lock() {
                    Ok(mut db_events) => {
                        db_events.insert(obs as usize);
                    }
                    Err(err) => {
                        error!(
                            "register_observer::DatabaseObserver::lambda db_events lock failed: {}",
                            err
                        );
                    }
                }
            }
            callback_f();
        })?;
        self.db_observers.push(obs);
        Ok(())
    }

    /// Drops every observer added through [`Database::register_observer`].
    #[inline]
    pub fn clear_observers(&mut self) {
        self.db_observers.clear();
    }

    /// Returns an iterator over the changes seen by the registered observers.
    #[inline]
    pub fn observed_changes(&mut self) -> ObserverdChangesIter {
        ObserverdChangesIter {
            db: self,
            obs_it: None,
        }
    }

    /// Installs the built-in socket implementation for the replicator.
    #[cfg(feature = "use-couchbase-lite-websocket")]
    #[inline]
    pub fn init_socket_impl() {
        crate::replicator::init_builtin_socket_impl();
    }

    /// Installs the tokio-based socket implementation for the replicator,
    /// using `handle` as its runtime handle.
    #[cfg(feature = "use-tokio-websocket")]
    #[inline]
    pub fn init_socket_impl(handle: tokio::runtime::Handle) {
        crate::replicator::init_tokio_socket_impl(handle);
    }

    /// Wraps the encoder returned by `c4db_getSharedFleeceEncoder` in a
    /// [`FlEncoderSession`].
    ///
    /// # Errors
    ///
    /// Returns [`Error::LogicError`] if LiteCore hands back a null encoder.
    pub fn shared_encoder_session(&mut self) -> Result<FlEncoderSession> {
        let enc = unsafe { c4db_getSharedFleeceEncoder(self.inner.0.as_ptr()) };
        NonNull::new(enc)
            .ok_or_else(|| {
                Error::LogicError("c4db_getSharedFleeceEncoder return null.into()".into())
            })
            .map(FlEncoderSession::new)
    }

    /// Returns a streaming iterator over the indexes of this database.
    ///
    /// # Errors
    ///
    /// Returns the LiteCore error if `c4db_getIndexesInfo` yields a null
    /// buffer, or the error from building the iterator over the returned data.
    pub fn get_indexes(
        &self,
    ) -> Result<impl FallibleStreamingIterator<Item = IndexInfo, Error = Error>> {
        let mut c4err = c4error_init();
        let enc_data = unsafe { c4db_getIndexesInfo(self.inner.0.as_ptr(), &mut c4err) };
        if enc_data.buf.is_null() {
            return Err(c4err.into());
        }
        let indexes_list = DbIndexesListIterator::new(enc_data)?;
        Ok(indexes_list)
    }

    /// Creates an index named `index_name` over `expression_json`.
    ///
    /// * `index_type` selects the kind of index to build.
    /// * `index_options`, when present, supplies the language, stemming and
    ///   diacritics settings plus an optional stop-word list, which is passed
    ///   to LiteCore as a single space-separated string. When `None`, a null
    ///   options pointer is passed.
    ///
    /// # Errors
    ///
    /// Fails if the language or the joined stop-word list contains an interior
    /// NUL byte, or if `c4db_createIndex` reports failure.
    pub fn create_index(
        &mut self,
        index_name: &str,
        expression_json: &str,
        index_type: IndexType,
        index_options: Option<IndexOptions>,
    ) -> Result<()> {
        use IndexType::*;
        let index_type = match index_type {
            ValueIndex => C4IndexType::kC4ValueIndex,
            FullTextIndex => C4IndexType::kC4FullTextIndex,
            ArrayIndex => C4IndexType::kC4ArrayIndex,
            PredictiveIndex => C4IndexType::kC4PredictiveIndex,
        };
        let mut c4err = c4error_init();
        let result = if let Some(index_options) = index_options {
            let language = CString::new(index_options.language)?;
            let stop_words: Option<CString> = if let Some(stop_words) = index_options.stop_words {
                let mut list = String::with_capacity(stop_words.len() * 5);
                for word in stop_words {
                    if !list.is_empty() {
                        list.push(' ');
                    }
                    list.push_str(word);
                }
                Some(CString::new(list)?)
            } else {
                None
            };
            let opts = C4IndexOptions {
                language: language.as_ptr(),
                disableStemming: index_options.disable_stemming,
                ignoreDiacritics: index_options.ignore_diacritics,
                // Borrow the `CString` rather than consuming the `Option`:
                // calling `map_or` on the owned value would drop the string as
                // soon as the closure returns and leave `stopWords` dangling
                // for the FFI call below.
                stopWords: stop_words.as_ref().map_or(ptr::null(), |x| x.as_ptr()),
            };
            // `language` and `stop_words` are still alive here, so the raw
            // pointers stored in `opts` remain valid for the whole call.
            unsafe {
                c4db_createIndex(
                    self.inner.0.as_ptr(),
                    index_name.into(),
                    expression_json.into(),
                    index_type,
                    &opts,
                    &mut c4err,
                )
            }
        } else {
            unsafe {
                c4db_createIndex(
                    self.inner.0.as_ptr(),
                    index_name.into(),
                    expression_json.into(),
                    index_type,
                    ptr::null(),
                    &mut c4err,
                )
            }
        };
        if result {
            Ok(())
        } else {
            Err(c4err.into())
        }
    }

    /// Returns the database name as reported by `c4db_getName`.
    ///
    /// # Errors
    ///
    /// Returns [`Error::InvalidUtf8`] if the name cannot be converted to `&str`.
    #[inline]
    pub fn name(&self) -> Result<&str> {
        unsafe { c4db_getName(self.inner.0.as_ptr()) }
            .try_into()
            .map_err(|_| Error::InvalidUtf8)
    }

    /// Calls `c4db_getDoc` for `doc_id` and wraps a non-null result in a
    /// [`C4DocumentOwner`].
    ///
    /// `must_exists` and `content_level` are forwarded unchanged. A null
    /// result is turned into the error filled in by LiteCore.
    pub(crate) fn do_internal_get(
        &self,
        doc_id: &str,
        must_exists: bool,
        content_level: C4DocContentLevel,
    ) -> Result<C4DocumentOwner> {
        let mut c4err = c4error_init();
        let c4doc = unsafe {
            c4db_getDoc(
                self.inner.0.as_ptr(),
                doc_id.as_bytes().into(),
                must_exists,
                content_level,
                &mut c4err,
            )
        };
        NonNull::new(c4doc)
            .ok_or_else(|| c4err.into())
            .map(C4DocumentOwner)
    }

    /// Same as [`Database::do_internal_get`], except that a
    /// `LiteCoreDomain` / `kC4ErrorNotFound` error is mapped to `Ok(None)`.
    /// Every other error is passed through.
    pub(crate) fn do_internal_get_opt(
        &self,
        doc_id: &str,
        must_exists: bool,
        content_level: C4DocContentLevel,
    ) -> Result<Option<C4DocumentOwner>> {
        match self.do_internal_get(doc_id, must_exists, content_level) {
            Ok(x) => Ok(Some(x)),
            Err(Error::C4Error(err))
                if err.domain == C4ErrorDomain::LiteCoreDomain
                    && err.code == C4ErrorCode::kC4ErrorNotFound.0 =>
            {
                Ok(None)
            }
            Err(err) => Err(err),
        }
    }

    /// Shorthand for [`Database::do_internal_get`] with the content level
    /// fixed to `kDocGetCurrentRev`.
    pub(crate) fn internal_get(&self, doc_id: &str, must_exists: bool) -> Result<C4DocumentOwner> {
        self.do_internal_get(doc_id, must_exists, C4DocContentLevel::kDocGetCurrentRev)
    }
}
/// Guards the one-time installation of the LiteCore-to-`log` rerouting that
/// `Database::open_named` performs through `call_once`.
static DB_LOG_HANDLER: Once = Once::new();