use std::collections::HashMap;
use std::path::Path;
use std::sync::Arc;
use tokio::sync::RwLock;
pub use rouchdb_core::adapter::Adapter;
pub use rouchdb_core::document::*;
pub use rouchdb_core::error::{Result, RouchError};
pub use rouchdb_core::merge::{is_deleted, winning_rev};
pub use rouchdb_adapter_http::HttpAdapter;
pub use rouchdb_adapter_http::auth::{AuthClient, Session, UserContext};
pub use rouchdb_adapter_memory::MemoryAdapter;
pub use rouchdb_adapter_redb::RedbAdapter;
pub use rouchdb_changes::{
ChangeReceiver, ChangeSender, ChangesEvent, ChangesFilter, ChangesHandle, ChangesStreamOptions,
LiveChangesStream, live_changes, live_changes_events,
};
pub use rouchdb_query::{
BuiltIndex, CreateIndexResponse, ExplainIndex, ExplainResponse, FindOptions, FindResponse,
IndexDefinition, IndexFields, IndexInfo, ReduceFn, SortField, StaleOption, ViewQueryOptions,
ViewResult, build_index, find, matches_selector, query_view,
};
pub use rouchdb_views::{DesignDocument, PersistentViewIndex, ViewDef, ViewEngine};
pub use rouchdb_replication::{
ReplicationEvent, ReplicationFilter, ReplicationHandle, ReplicationOptions, ReplicationResult,
replicate, replicate_live, replicate_with_events,
};
/// Hooks that a [`Database`] invokes around bulk writes and destruction.
///
/// Only `name` must be implemented; every other hook defaults to a no-op
/// that returns `Ok(())`.
#[async_trait::async_trait]
pub trait Plugin: Send + Sync {
/// Returns the plugin's name. (Nothing in this file reads it.)
fn name(&self) -> &str;
/// Called by `Database::bulk_docs` with the pending documents before they
/// are handed to the adapter. The vector may be modified in place.
/// Returning an error aborts the write.
async fn before_write(&self, _docs: &mut Vec<Document>) -> Result<()> {
Ok(())
}
/// Called by `Database::bulk_docs` with the adapter's results after the
/// write has been performed. An error returned here is propagated to the
/// caller even though the adapter call already completed.
async fn after_write(&self, _results: &[DocResult]) -> Result<()> {
Ok(())
}
/// Called by `Database::destroy` before the adapter is destroyed.
/// Returning an error stops the destroy before the adapter is touched.
async fn on_destroy(&self) -> Result<()> {
Ok(())
}
}
/// High-level database handle that wraps a storage adapter and adds
/// in-memory query indexes and write plugins on top of it.
pub struct Database {
// Storage backend that every operation delegates to.
adapter: Arc<dyn Adapter>,
// Built indexes kept in memory, keyed by index name.
indexes: Arc<RwLock<HashMap<String, BuiltIndex>>>,
// Registered plugins, invoked in the order they were added.
plugins: Vec<Arc<dyn Plugin>>,
}
impl Database {
pub fn memory(name: &str) -> Self {
Self {
adapter: Arc::new(MemoryAdapter::new(name)),
indexes: Arc::new(RwLock::new(HashMap::new())),
plugins: Vec::new(),
}
}
pub fn open(path: impl AsRef<Path>, name: &str) -> Result<Self> {
let adapter = RedbAdapter::open(path, name)?;
Ok(Self {
adapter: Arc::new(adapter),
indexes: Arc::new(RwLock::new(HashMap::new())),
plugins: Vec::new(),
})
}
pub fn http(url: &str) -> Self {
Self {
adapter: Arc::new(HttpAdapter::new(url)),
indexes: Arc::new(RwLock::new(HashMap::new())),
plugins: Vec::new(),
}
}
pub fn http_with_auth(url: &str, auth: &AuthClient) -> Self {
Self {
adapter: Arc::new(HttpAdapter::with_auth_client(url, auth)),
indexes: Arc::new(RwLock::new(HashMap::new())),
plugins: Vec::new(),
}
}
/// Wraps an existing adapter in a [`Database`].
///
/// The handle starts with an empty index map and an empty plugin list.
pub fn from_adapter(adapter: Arc<dyn Adapter>) -> Self {
    let indexes = Arc::new(RwLock::new(HashMap::new()));
    let plugins = Vec::new();
    Self {
        adapter,
        indexes,
        plugins,
    }
}
/// Registers `plugin` and returns the database, builder-style.
///
/// Plugins are appended, so they run in registration order.
pub fn with_plugin(self, plugin: Arc<dyn Plugin>) -> Self {
    let mut db = self;
    db.plugins.push(plugin);
    db
}
/// Borrows the underlying storage adapter.
pub fn adapter(&self) -> &dyn Adapter {
    &*self.adapter
}
/// Returns database information as reported by the adapter's `info`.
pub async fn info(&self) -> Result<DbInfo> {
    let backend = self.adapter.as_ref();
    backend.info().await
}
/// Fetches the document `id` using default [`GetOptions`].
pub async fn get(&self, id: &str) -> Result<Document> {
    self.get_with_opts(id, GetOptions::default()).await
}
/// Fetches the document `id`, forwarding `opts` to the adapter's `get`.
pub async fn get_with_opts(&self, id: &str, opts: GetOptions) -> Result<Document> {
    let backend = self.adapter.as_ref();
    backend.get(id, opts).await
}
/// Stores `data` under a freshly generated v4 UUID and returns the write
/// result produced by [`Database::put`].
pub async fn post(&self, data: serde_json::Value) -> Result<DocResult> {
    let generated = uuid::Uuid::new_v4();
    self.put(&generated.to_string(), data).await
}
pub async fn put(&self, id: &str, data: serde_json::Value) -> Result<DocResult> {
if id.is_empty() {
return Err(RouchError::MissingId);
}
let doc = Document {
id: id.to_string(),
rev: None,
deleted: false,
data,
attachments: HashMap::new(),
};
let results = self.bulk_docs(vec![doc], BulkDocsOptions::new()).await?;
first_result(results)
}
pub async fn update(&self, id: &str, rev: &str, data: serde_json::Value) -> Result<DocResult> {
if id.is_empty() {
return Err(RouchError::MissingId);
}
let revision: Revision = rev.parse()?;
let doc = Document {
id: id.to_string(),
rev: Some(revision),
deleted: false,
data,
attachments: HashMap::new(),
};
let results = self.bulk_docs(vec![doc], BulkDocsOptions::new()).await?;
first_result(results)
}
pub async fn remove(&self, id: &str, rev: &str) -> Result<DocResult> {
if id.is_empty() {
return Err(RouchError::MissingId);
}
let revision: Revision = rev.parse()?;
let doc = Document {
id: id.to_string(),
rev: Some(revision),
deleted: true,
data: serde_json::json!({}),
attachments: HashMap::new(),
};
let results = self.bulk_docs(vec![doc], BulkDocsOptions::new()).await?;
first_result(results)
}
/// Writes a batch of documents through the adapter, running plugin hooks.
///
/// Order of operations:
/// 1. every plugin's `before_write` (may mutate the batch),
/// 2. the adapter's `bulk_docs`,
/// 3. every plugin's `after_write` with the adapter's results.
///
/// # Errors
/// The first error from any step is returned immediately. An `after_write`
/// error is returned even though the adapter call has already completed.
pub async fn bulk_docs(
    &self,
    docs: Vec<Document>,
    opts: BulkDocsOptions,
) -> Result<Vec<DocResult>> {
    let mut pending = docs;
    for hook in self.plugins.iter() {
        hook.before_write(&mut pending).await?;
    }

    let written = self.adapter.bulk_docs(pending, opts).await?;

    for hook in self.plugins.iter() {
        hook.after_write(&written).await?;
    }
    Ok(written)
}
/// Lists documents by forwarding `opts` to the adapter's `all_docs`.
pub async fn all_docs(&self, opts: AllDocsOptions) -> Result<AllDocsResponse> {
    let backend = self.adapter.as_ref();
    backend.all_docs(opts).await
}
/// Returns the changes feed, optionally filtered by `opts.selector`.
///
/// Without a selector the call is forwarded to the adapter unchanged.
/// With a selector, the adapter is asked for the feed *with* documents and
/// without the selector; the results are then filtered here with
/// `matches_selector`. Events that carry no document are dropped. If the
/// caller did not ask for documents they are stripped again before
/// returning.
///
/// Filtering happens after the adapter has produced its results, so every
/// other option in `opts` is applied by the adapter before the selector.
pub async fn changes(&self, mut opts: ChangesOptions) -> Result<ChangesResponse> {
    let Some(selector) = opts.selector.take() else {
        return self.adapter.changes(opts).await;
    };

    // Documents are needed to evaluate the selector, whatever the caller asked for.
    let strip_docs = !opts.include_docs;
    opts.include_docs = true;

    let mut response = self.adapter.changes(opts).await?;
    response.results.retain(|change| match change.doc.as_ref() {
        Some(doc) => matches_selector(doc, &selector),
        None => false,
    });

    if strip_docs {
        response
            .results
            .iter_mut()
            .for_each(|change| change.doc = None);
    }
    Ok(response)
}
/// Starts a live changes stream, optionally filtered by `opts.selector`.
///
/// Without a selector this is a direct call to the free function
/// `live_changes`. With a selector, the stream is started with documents
/// enabled and a filter that first applies the caller's own filter (if any)
/// and then requires the event's document to match the selector; events
/// without a document never pass.
///
/// If the caller did not request documents, a forwarding task is spawned
/// that clears `doc` on each event and relays it through a 64-slot channel.
/// The task stops when either side of the relay is closed.
pub fn live_changes(
    &self,
    opts: ChangesStreamOptions,
) -> (tokio::sync::mpsc::Receiver<ChangeEvent>, ChangesHandle) {
    let Some(selector) = opts.selector.clone() else {
        return live_changes(self.adapter.clone(), opts);
    };

    let strip_docs = !opts.include_docs;
    let user_filter = opts.filter.clone();
    let combined: ChangesFilter = Arc::new(move |change: &ChangeEvent| {
        if user_filter.as_ref().is_some_and(|keep| !keep(change)) {
            return false;
        }
        match change.doc.as_ref() {
            Some(doc) => matches_selector(doc, &selector),
            None => false,
        }
    });

    let inner_opts = ChangesStreamOptions {
        include_docs: true,
        selector: None,
        filter: Some(combined),
        ..opts
    };
    let (mut inner_rx, handle) = live_changes(self.adapter.clone(), inner_opts);
    if !strip_docs {
        return (inner_rx, handle);
    }

    // The caller did not ask for documents: relay events with `doc` removed.
    let (tx, rx) = tokio::sync::mpsc::channel(64);
    tokio::spawn(async move {
        while let Some(mut change) = inner_rx.recv().await {
            change.doc = None;
            if tx.send(change).await.is_err() {
                break;
            }
        }
    });
    (rx, handle)
}
/// Starts a live stream of [`ChangesEvent`]s, optionally filtered by
/// `opts.selector`.
///
/// Mirrors [`Database::live_changes`]: without a selector it calls the free
/// function `live_changes_events` directly; with one it enables documents,
/// installs a filter combining the caller's filter with the selector, and —
/// when the caller did not request documents — relays events through a
/// 64-slot channel, clearing `doc` on `ChangesEvent::Change` and passing
/// every other variant through untouched.
pub fn live_changes_events(
    &self,
    opts: ChangesStreamOptions,
) -> (tokio::sync::mpsc::Receiver<ChangesEvent>, ChangesHandle) {
    let Some(selector) = opts.selector.clone() else {
        return live_changes_events(self.adapter.clone(), opts);
    };

    let strip_docs = !opts.include_docs;
    let user_filter = opts.filter.clone();
    let combined: ChangesFilter = Arc::new(move |change: &ChangeEvent| {
        if user_filter.as_ref().is_some_and(|keep| !keep(change)) {
            return false;
        }
        match change.doc.as_ref() {
            Some(doc) => matches_selector(doc, &selector),
            None => false,
        }
    });

    let inner_opts = ChangesStreamOptions {
        include_docs: true,
        selector: None,
        filter: Some(combined),
        ..opts
    };
    let (mut inner_rx, handle) = live_changes_events(self.adapter.clone(), inner_opts);
    if !strip_docs {
        return (inner_rx, handle);
    }

    // The caller did not ask for documents: relay events with `doc` removed.
    let (tx, rx) = tokio::sync::mpsc::channel(64);
    tokio::spawn(async move {
        while let Some(incoming) = inner_rx.recv().await {
            let outgoing = match incoming {
                ChangesEvent::Change(mut change) => {
                    change.doc = None;
                    ChangesEvent::Change(change)
                }
                passthrough => passthrough,
            };
            if tx.send(outgoing).await.is_err() {
                break;
            }
        }
    });
    (rx, handle)
}
/// Stores an attachment by forwarding all arguments to the adapter's
/// `put_attachment`. Plugins are not invoked on this path.
pub async fn put_attachment(
    &self,
    doc_id: &str,
    att_id: &str,
    rev: &str,
    data: Vec<u8>,
    content_type: &str,
) -> Result<DocResult> {
    let backend = self.adapter.as_ref();
    backend
        .put_attachment(doc_id, att_id, rev, data, content_type)
        .await
}
/// Reads the bytes of attachment `att_id` on document `doc_id` using
/// default [`GetAttachmentOptions`].
pub async fn get_attachment(&self, doc_id: &str, att_id: &str) -> Result<Vec<u8>> {
    self.get_attachment_with_opts(doc_id, att_id, GetAttachmentOptions::default())
        .await
}
/// Reads the bytes of attachment `att_id` on document `doc_id`, forwarding
/// `opts` to the adapter's `get_attachment`.
pub async fn get_attachment_with_opts(
    &self,
    doc_id: &str,
    att_id: &str,
    opts: GetAttachmentOptions,
) -> Result<Vec<u8>> {
    let backend = self.adapter.as_ref();
    backend.get_attachment(doc_id, att_id, opts).await
}
/// Removes attachment `att_id` from document `doc_id` at revision `rev` by
/// forwarding to the adapter's `remove_attachment`.
pub async fn remove_attachment(
    &self,
    doc_id: &str,
    att_id: &str,
    rev: &str,
) -> Result<DocResult> {
    let backend = self.adapter.as_ref();
    backend.remove_attachment(doc_id, att_id, rev).await
}
/// Runs a selector query, using an in-memory index when one applies.
///
/// An index is usable when its first field appears as a top-level key of
/// `opts.selector`. If none is usable, the query is delegated to the free
/// function `find`.
///
/// When an index is used it is rebuilt from the adapter first (while the
/// index map's write lock is held), then:
/// 1. candidate ids come from the index's `find_matching`,
/// 2. candidates are loaded via `all_docs` and re-checked with
///    `matches_selector`,
/// 3. `sort`, `skip`, `limit` and `fields` are applied in that order.
///
/// Field projection only looks at top-level keys and always keeps `_id`
/// when the document has one.
pub async fn find(&self, opts: FindOptions) -> Result<FindResponse> {
    let mut indexes = self.indexes.write().await;

    // Pick any index whose leading field is mentioned at the selector's top level.
    let usable_name = indexes.iter().find_map(|(name, idx)| {
        let leading = idx.def.fields.first()?;
        let (first_field, _) = leading.field_and_direction();
        opts.selector.get(first_field).map(|_| name.clone())
    });
    let Some(name) = usable_name else {
        drop(indexes);
        return find(self.adapter.as_ref(), opts).await;
    };

    // Rebuild the index so it reflects the adapter's current contents.
    let def = indexes[&name].def.clone();
    let rebuilt = build_index(self.adapter.as_ref(), &def).await?;
    let candidate_ids = rebuilt.find_matching(&opts.selector);
    indexes.insert(name, rebuilt);
    drop(indexes);

    let fetched = self
        .adapter
        .all_docs(AllDocsOptions {
            include_docs: true,
            keys: Some(candidate_ids),
            ..AllDocsOptions::new()
        })
        .await?;

    // The index only narrows candidates; the full selector decides.
    let mut matched: Vec<serde_json::Value> = fetched
        .rows
        .iter()
        .filter_map(|row| row.doc.as_ref())
        .filter(|doc| matches_selector(doc, &opts.selector))
        .cloned()
        .collect();

    if let Some(sort_fields) = opts.sort.as_ref() {
        use rouchdb_core::collation::collate;
        use rouchdb_query::SortDirection;

        matched.sort_by(|a, b| {
            sort_fields
                .iter()
                .map(|sf| {
                    let (field, direction) = sf.field_and_direction();
                    let va = rouchdb_query::get_nested_field(a, field)
                        .unwrap_or(&serde_json::Value::Null);
                    let vb = rouchdb_query::get_nested_field(b, field)
                        .unwrap_or(&serde_json::Value::Null);
                    let ordering = collate(va, vb);
                    if direction == SortDirection::Desc {
                        ordering.reverse()
                    } else {
                        ordering
                    }
                })
                // First sort key that distinguishes the two documents wins.
                .find(|ordering| *ordering != std::cmp::Ordering::Equal)
                .unwrap_or(std::cmp::Ordering::Equal)
        });
    }

    if let Some(skip) = opts.skip {
        let skipped = (skip as usize).min(matched.len());
        matched.drain(..skipped);
    }
    if let Some(limit) = opts.limit {
        matched.truncate(limit as usize);
    }

    if let Some(fields) = opts.fields.as_ref() {
        let project = |doc: serde_json::Value| {
            let mut projected = serde_json::Map::new();
            if let serde_json::Value::Object(source) = &doc {
                for field in fields {
                    if let Some(value) = source.get(field) {
                        projected.insert(field.clone(), value.clone());
                    }
                }
                if let Some(id) = source.get("_id") {
                    projected
                        .entry("_id".to_string())
                        .or_insert_with(|| id.clone());
                }
            }
            serde_json::Value::Object(projected)
        };
        matched = matched.into_iter().map(project).collect();
    }

    Ok(FindResponse { docs: matched })
}
/// Builds an in-memory index and registers it under its name.
///
/// If `def.name` is empty the name becomes `idx-` followed by the field
/// names joined with `-`. When an index with that name already exists the
/// call returns `result: "exists"` without rebuilding; otherwise the index
/// is built from the adapter and `result: "created"` is returned.
///
/// The index map's write lock is held while the index is being built.
///
/// # Errors
/// Propagates any error from `build_index`.
pub async fn create_index(&self, def: IndexDefinition) -> Result<CreateIndexResponse> {
    let IndexDefinition { name, fields, ddoc } = def;
    let name = if name.is_empty() {
        let joined = fields
            .iter()
            .map(|sf| sf.field_and_direction().0)
            .collect::<Vec<&str>>()
            .join("-");
        format!("idx-{joined}")
    } else {
        name
    };

    let mut indexes = self.indexes.write().await;
    if indexes.contains_key(&name) {
        return Ok(CreateIndexResponse {
            result: "exists".to_string(),
            name,
        });
    }

    let index_def = IndexDefinition {
        name: name.clone(),
        fields,
        ddoc,
    };
    let built = build_index(self.adapter.as_ref(), &index_def).await?;
    indexes.insert(name.clone(), built);
    Ok(CreateIndexResponse {
        result: "created".to_string(),
        name,
    })
}
/// Lists the registered in-memory indexes, sorted by name.
pub async fn get_indexes(&self) -> Vec<IndexInfo> {
    let registered = self.indexes.read().await;
    let mut infos: Vec<IndexInfo> = Vec::with_capacity(registered.len());
    for built in registered.values() {
        infos.push(IndexInfo {
            name: built.def.name.clone(),
            ddoc: built.def.ddoc.clone(),
            def: IndexFields {
                fields: built.def.fields.clone(),
            },
        });
    }
    infos.sort_by(|left, right| left.name.cmp(&right.name));
    infos
}
/// Reports which index a query with `opts` would be matched against.
///
/// An index is chosen when its first field is a top-level key of the
/// selector; it is reported with type `"json"`. Otherwise the response
/// names the `"_all_docs"` index of type `"special"` with no fields.
///
/// `dbname` comes from [`Database::info`]; if that call fails the default
/// (empty) value is used instead of returning an error.
pub async fn explain(&self, opts: FindOptions) -> ExplainResponse {
    let indexes = self.indexes.read().await;
    let usable = indexes.values().find(|idx| {
        idx.def.fields.first().is_some_and(|leading| {
            let (first_field, _) = leading.field_and_direction();
            opts.selector.get(first_field).is_some()
        })
    });
    let dbname = self.info().await.map(|i| i.db_name).unwrap_or_default();

    let index = match usable {
        Some(found) => ExplainIndex {
            ddoc: found.def.ddoc.clone(),
            name: found.def.name.clone(),
            index_type: "json".into(),
            def: IndexFields {
                fields: found.def.fields.clone(),
            },
        },
        None => ExplainIndex {
            ddoc: None,
            name: "_all_docs".into(),
            index_type: "special".into(),
            def: IndexFields { fields: vec![] },
        },
    };

    ExplainResponse {
        dbname,
        index,
        selector: opts.selector,
        fields: opts.fields,
    }
}
/// Removes the in-memory index called `name`.
///
/// # Errors
/// `RouchError::NotFound` when no index with that name is registered.
pub async fn delete_index(&self, name: &str) -> Result<()> {
    match self.indexes.write().await.remove(name) {
        Some(_) => Ok(()),
        None => Err(RouchError::NotFound(format!("index {}", name))),
    }
}
pub async fn put_design(&self, ddoc: DesignDocument) -> Result<DocResult> {
let json = ddoc.to_json();
let doc = Document::from_json(json)?;
let mut results = self.bulk_docs(vec![doc], BulkDocsOptions::new()).await?;
Ok(results.remove(0))
}
/// Loads a design document.
///
/// `name` may be given with or without the `_design/` prefix; the prefix
/// is added when missing.
///
/// # Errors
/// Propagates errors from the adapter's `get` and from
/// `DesignDocument::from_json`.
pub async fn get_design(&self, name: &str) -> Result<DesignDocument> {
    let id = match name.strip_prefix("_design/") {
        Some(_) => name.to_string(),
        None => format!("_design/{}", name),
    };
    let stored = self.adapter.get(&id, GetOptions::default()).await?;
    DesignDocument::from_json(stored.to_json())
}
/// Deletes a design document at revision `rev` via [`Database::remove`].
///
/// `name` may be given with or without the `_design/` prefix; the prefix
/// is added when missing.
pub async fn delete_design(&self, name: &str, rev: &str) -> Result<DocResult> {
    let id = if !name.starts_with("_design/") {
        format!("_design/{}", name)
    } else {
        name.to_string()
    };
    self.remove(&id, rev).await
}
/// Placeholder for view index cleanup.
///
/// Currently performs no work and always returns `Ok(())`.
pub async fn view_cleanup(&self) -> Result<()> {
Ok(())
}
/// Replicates this database into `target` with default
/// [`ReplicationOptions`].
pub async fn replicate_to(&self, target: &Database) -> Result<ReplicationResult> {
    self.replicate_to_with_opts(target, ReplicationOptions::default())
        .await
}
/// Replicates `source` into this database with default
/// [`ReplicationOptions`].
pub async fn replicate_from(&self, source: &Database) -> Result<ReplicationResult> {
    let from = source.adapter.as_ref();
    let into = self.adapter.as_ref();
    replicate(from, into, ReplicationOptions::default()).await
}
/// Replicates this database into `target` using the supplied options.
pub async fn replicate_to_with_opts(
    &self,
    target: &Database,
    opts: ReplicationOptions,
) -> Result<ReplicationResult> {
    let from = self.adapter.as_ref();
    let into = target.adapter.as_ref();
    replicate(from, into, opts).await
}
/// Replicates this database into `target` and hands back the events that
/// were emitted along the way.
///
/// The receiver is returned only after replication has finished, so the
/// events are read after the fact from a channel with capacity 64.
///
/// NOTE(review): nothing drains the channel while replication runs. If
/// `replicate_with_events` awaits on a full channel, a run that emits more
/// than 64 events could stall — confirm against its implementation.
pub async fn replicate_to_with_events(
    &self,
    target: &Database,
    opts: ReplicationOptions,
) -> Result<(
    ReplicationResult,
    tokio::sync::mpsc::Receiver<ReplicationEvent>,
)> {
    let (events_tx, events_rx) = tokio::sync::mpsc::channel(64);
    let from = self.adapter.as_ref();
    let into = target.adapter.as_ref();
    let outcome = replicate_with_events(from, into, opts, events_tx).await?;
    Ok((outcome, events_rx))
}
pub fn replicate_to_live(
&self,
target: &Database,
opts: ReplicationOptions,
) -> (
tokio::sync::mpsc::Receiver<ReplicationEvent>,
ReplicationHandle,
) {
replicate_live(self.adapter.clone(), target.adapter.clone(), opts)
}
/// Two-way replication: pushes to `other`, then pulls from it.
///
/// Returns `(push, pull)`. If the push fails, the pull is not attempted.
pub async fn sync(&self, other: &Database) -> Result<(ReplicationResult, ReplicationResult)> {
    Ok((
        self.replicate_to(other).await?,
        self.replicate_from(other).await?,
    ))
}
/// Closes the database by calling the adapter's `close`.
pub async fn close(&self) -> Result<()> {
    let backend = self.adapter.as_ref();
    backend.close().await
}
/// Compacts the database by calling the adapter's `compact`.
pub async fn compact(&self) -> Result<()> {
    let backend = self.adapter.as_ref();
    backend.compact().await
}
/// Destroys the database.
///
/// Every plugin's `on_destroy` runs first, in registration order; the first
/// error aborts the operation before the adapter is touched. The in-memory
/// index map is left as it is.
pub async fn destroy(&self) -> Result<()> {
    for hook in self.plugins.iter() {
        hook.on_destroy().await?;
    }
    let backend = self.adapter.as_ref();
    backend.destroy().await
}
/// Purges the given revisions of a single document.
///
/// Builds a one-entry `doc_id -> revs` map and passes it to the adapter's
/// `purge`.
pub async fn purge(&self, doc_id: &str, revs: Vec<String>) -> Result<PurgeResponse> {
    let request = HashMap::from([(doc_id.to_string(), revs)]);
    self.adapter.purge(request).await
}
/// Reads the security document through the adapter's `get_security`.
pub async fn get_security(&self) -> Result<SecurityDocument> {
    let backend = self.adapter.as_ref();
    backend.get_security().await
}
/// Writes the security document through the adapter's `put_security`.
pub async fn put_security(&self, doc: SecurityDocument) -> Result<()> {
    let backend = self.adapter.as_ref();
    backend.put_security(doc).await
}
}
/// View of a [`Database`] restricted to documents whose id starts with
/// `"<name>:"`.
pub struct Partition<'a> {
// Database the partition operations are forwarded to.
db: &'a Database,
// Partition name; document ids are prefixed with this name and a colon.
name: String,
}
impl Database {
    /// Returns a [`Partition`] view named `name` that borrows this database.
    pub fn partition(&self, name: &str) -> Partition<'_> {
        let name = name.to_owned();
        Partition { db: self, name }
    }
}
/// Extracts the result for a single-document write.
///
/// Returns the first element of `results`, or a `DatabaseError` when the
/// list is empty.
fn first_result(results: Vec<DocResult>) -> Result<DocResult> {
    match results.into_iter().next() {
        Some(first) => Ok(first),
        None => Err(RouchError::DatabaseError(
            "bulk_docs returned no result for the written document".into(),
        )),
    }
}
/// Escapes regex metacharacters in `s` by prefixing each with a backslash.
///
/// The escaped set is `. + * ? ( ) [ ] { } \ | ^ $`; every other character
/// is copied unchanged.
fn regex_escape(s: &str) -> String {
    // Characters that receive a leading backslash.
    const SPECIAL: &str = r".+*?()[]{}\|^$";

    s.chars()
        .fold(String::with_capacity(s.len() * 2), |mut out, ch| {
            if SPECIAL.contains(ch) {
                out.push('\\');
            }
            out.push(ch);
            out
        })
}
impl Partition<'_> {
    /// Prefix shared by every document id in this partition: `"<name>:"`.
    fn id_prefix(&self) -> String {
        format!("{}:", self.name)
    }

    /// Returns `id` unchanged when it already carries the partition prefix,
    /// otherwise prepends the prefix.
    fn qualify(&self, id: &str) -> String {
        let prefix = self.id_prefix();
        if id.starts_with(&prefix) {
            id.to_string()
        } else {
            format!("{prefix}{id}")
        }
    }

    /// Lists documents in the partition.
    ///
    /// `start_key` defaults to `"<name>:"` and `end_key` to
    /// `"<name>:\u{ffff}"`. Bounds supplied by the caller are kept as given.
    pub async fn all_docs(&self, mut opts: AllDocsOptions) -> Result<AllDocsResponse> {
        opts.start_key = opts.start_key.take().or_else(|| Some(self.id_prefix()));
        opts.end_key = opts
            .end_key
            .take()
            .or_else(|| Some(format!("{}:\u{ffff}", self.name)));
        self.db.all_docs(opts).await
    }

    /// Runs a selector query restricted to the partition.
    ///
    /// The caller's selector is combined, via `$and`, with a `$regex`
    /// condition requiring `_id` to start with the (escaped) partition prefix.
    pub async fn find(&self, mut opts: FindOptions) -> Result<FindResponse> {
        let pattern = format!("^{}:", regex_escape(&self.name));
        let partition_filter = serde_json::json!({"_id": {"$regex": pattern}});
        let user_selector = std::mem::take(&mut opts.selector);
        opts.selector = serde_json::json!({"$and": [user_selector, partition_filter]});
        self.db.find(opts).await
    }

    /// Fetches a document from the partition; `id` may be given with or
    /// without the partition prefix.
    pub async fn get(&self, id: &str) -> Result<Document> {
        let full_id = self.qualify(id);
        self.db.get(&full_id).await
    }

    /// Writes a document into the partition; `id` may be given with or
    /// without the partition prefix.
    pub async fn put(&self, id: &str, data: serde_json::Value) -> Result<DocResult> {
        let full_id = self.qualify(id);
        self.db.put(&full_id, data).await
    }
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn database_put_and_get() {
let db = Database::memory("test");
let result = db
.put("doc1", serde_json::json!({"name": "Alice"}))
.await
.unwrap();
assert!(result.ok);
assert_eq!(result.id, "doc1");
let doc = db.get("doc1").await.unwrap();
assert_eq!(doc.data["name"], "Alice");
}
#[tokio::test]
async fn database_update() {
let db = Database::memory("test");
let r1 = db.put("doc1", serde_json::json!({"v": 1})).await.unwrap();
let rev = r1.rev.unwrap();
let r2 = db
.update("doc1", &rev, serde_json::json!({"v": 2}))
.await
.unwrap();
assert!(r2.ok);
let doc = db.get("doc1").await.unwrap();
assert_eq!(doc.data["v"], 2);
}
#[tokio::test]
async fn database_remove() {
let db = Database::memory("test");
let r1 = db.put("doc1", serde_json::json!({"v": 1})).await.unwrap();
let rev = r1.rev.unwrap();
let r2 = db.remove("doc1", &rev).await.unwrap();
assert!(r2.ok);
let err = db.get("doc1").await;
assert!(err.is_err());
}
#[tokio::test]
async fn database_find() {
let db = Database::memory("test");
db.put("alice", serde_json::json!({"name": "Alice", "age": 30}))
.await
.unwrap();
db.put("bob", serde_json::json!({"name": "Bob", "age": 25}))
.await
.unwrap();
let result = db
.find(FindOptions {
selector: serde_json::json!({"age": {"$gte": 28}}),
..Default::default()
})
.await
.unwrap();
assert_eq!(result.docs.len(), 1);
assert_eq!(result.docs[0]["name"], "Alice");
}
#[tokio::test]
async fn database_sync() {
let local = Database::memory("local");
let remote = Database::memory("remote");
local
.put("doc1", serde_json::json!({"from": "local"}))
.await
.unwrap();
remote
.put("doc2", serde_json::json!({"from": "remote"}))
.await
.unwrap();
let (push, pull) = local.sync(&remote).await.unwrap();
assert!(push.ok);
assert!(pull.ok);
let local_info = local.info().await.unwrap();
let remote_info = remote.info().await.unwrap();
assert_eq!(local_info.doc_count, 2);
assert_eq!(remote_info.doc_count, 2);
}
#[tokio::test]
async fn database_info() {
let db = Database::memory("test");
db.put("a", serde_json::json!({})).await.unwrap();
db.put("b", serde_json::json!({})).await.unwrap();
let info = db.info().await.unwrap();
assert_eq!(info.doc_count, 2);
assert_eq!(info.db_name, "test");
}
#[tokio::test]
async fn database_open_redb() {
let dir = tempfile::tempdir().unwrap();
let path = dir.path().join("test.redb");
let db = Database::open(&path, "test_redb").unwrap();
db.put("doc1", serde_json::json!({"x": 1})).await.unwrap();
let doc = db.get("doc1").await.unwrap();
assert_eq!(doc.data["x"], 1);
}
#[tokio::test]
async fn database_from_adapter_and_accessor() {
let adapter = Arc::new(MemoryAdapter::new("custom"));
let db = Database::from_adapter(adapter);
let _adapter_ref = db.adapter();
db.put("doc1", serde_json::json!({})).await.unwrap();
let info = db.info().await.unwrap();
assert_eq!(info.doc_count, 1);
}
#[tokio::test]
async fn database_get_with_opts() {
let db = Database::memory("test");
let r1 = db.put("doc1", serde_json::json!({"v": 1})).await.unwrap();
let rev = r1.rev.unwrap();
let doc = db
.get_with_opts(
"doc1",
GetOptions {
rev: Some(rev),
..Default::default()
},
)
.await
.unwrap();
assert_eq!(doc.data["v"], 1);
}
#[tokio::test]
async fn database_bulk_docs() {
let db = Database::memory("test");
let docs = vec![
Document {
id: "a".into(),
rev: None,
deleted: false,
data: serde_json::json!({"x": 1}),
attachments: std::collections::HashMap::new(),
},
Document {
id: "b".into(),
rev: None,
deleted: false,
data: serde_json::json!({"x": 2}),
attachments: std::collections::HashMap::new(),
},
];
let results = db.bulk_docs(docs, BulkDocsOptions::new()).await.unwrap();
assert_eq!(results.len(), 2);
assert!(results[0].ok);
assert!(results[1].ok);
}
#[tokio::test]
async fn database_all_docs() {
let db = Database::memory("test");
db.put("a", serde_json::json!({})).await.unwrap();
db.put("b", serde_json::json!({})).await.unwrap();
let result = db.all_docs(AllDocsOptions::new()).await.unwrap();
assert_eq!(result.rows.len(), 2);
}
#[tokio::test]
async fn database_changes() {
let db = Database::memory("test");
db.put("a", serde_json::json!({})).await.unwrap();
db.put("b", serde_json::json!({})).await.unwrap();
let changes = db.changes(ChangesOptions::default()).await.unwrap();
assert_eq!(changes.results.len(), 2);
}
#[tokio::test]
async fn database_replicate_to_with_opts() {
let local = Database::memory("local");
let remote = Database::memory("remote");
local
.put("doc1", serde_json::json!({"v": 1}))
.await
.unwrap();
let result = local
.replicate_to_with_opts(
&remote,
ReplicationOptions {
batch_size: 1,
batches_limit: 10,
..Default::default()
},
)
.await
.unwrap();
assert!(result.ok);
let doc = remote.get("doc1").await.unwrap();
assert_eq!(doc.data["v"], 1);
}
#[tokio::test]
async fn database_post() {
let db = Database::memory("test");
let r1 = db.post(serde_json::json!({"name": "Alice"})).await.unwrap();
assert!(r1.ok);
assert!(!r1.id.is_empty());
let r2 = db.post(serde_json::json!({"name": "Bob"})).await.unwrap();
assert!(r2.ok);
assert_ne!(r1.id, r2.id);
let doc = db.get(&r1.id).await.unwrap();
assert_eq!(doc.data["name"], "Alice");
let info = db.info().await.unwrap();
assert_eq!(info.doc_count, 2);
}
#[tokio::test]
async fn database_remove_attachment() {
let db = Database::memory("test");
let r1 = db.put("doc1", serde_json::json!({"v": 1})).await.unwrap();
let rev = r1.rev.unwrap();
let r2 = db
.remove_attachment("doc1", "photo.jpg", &rev)
.await
.unwrap();
assert!(r2.ok);
assert!(r2.rev.is_some());
assert_ne!(r2.rev.as_deref().unwrap(), rev);
}
#[tokio::test]
async fn database_create_and_use_index() {
let db = Database::memory("test");
db.put("alice", serde_json::json!({"name": "Alice", "age": 30}))
.await
.unwrap();
db.put("bob", serde_json::json!({"name": "Bob", "age": 25}))
.await
.unwrap();
db.put("charlie", serde_json::json!({"name": "Charlie", "age": 35}))
.await
.unwrap();
let result = db
.create_index(IndexDefinition {
name: String::new(),
fields: vec![SortField::Simple("age".into())],
ddoc: None,
})
.await
.unwrap();
assert_eq!(result.result, "created");
assert_eq!(result.name, "idx-age");
let result = db
.create_index(IndexDefinition {
name: "idx-age".into(),
fields: vec![SortField::Simple("age".into())],
ddoc: None,
})
.await
.unwrap();
assert_eq!(result.result, "exists");
let found = db
.find(FindOptions {
selector: serde_json::json!({"age": {"$gte": 30}}),
..Default::default()
})
.await
.unwrap();
assert_eq!(found.docs.len(), 2);
let indexes = db.get_indexes().await;
assert_eq!(indexes.len(), 1);
assert_eq!(indexes[0].name, "idx-age");
db.delete_index("idx-age").await.unwrap();
assert!(db.delete_index("nonexistent").await.is_err());
let indexes = db.get_indexes().await;
assert!(indexes.is_empty());
}
#[tokio::test]
async fn database_replicate_with_events() {
let local = Database::memory("local");
let remote = Database::memory("remote");
local
.put("doc1", serde_json::json!({"v": 1}))
.await
.unwrap();
local
.put("doc2", serde_json::json!({"v": 2}))
.await
.unwrap();
let (result, mut rx) = local
.replicate_to_with_events(&remote, ReplicationOptions::default())
.await
.unwrap();
assert!(result.ok);
assert_eq!(result.docs_written, 2);
let mut events = Vec::new();
while let Ok(event) = rx.try_recv() {
events.push(event);
}
assert!(events.iter().any(|e| matches!(e, ReplicationEvent::Active)));
assert!(
events
.iter()
.any(|e| matches!(e, ReplicationEvent::Complete(_)))
);
}
// Live replication: the document written before the replication started must
// reach the remote within two seconds.
#[tokio::test]
async fn database_live_replication() {
let local = Database::memory("local");
let remote = Database::memory("remote");
local
.put("doc1", serde_json::json!({"v": 1}))
.await
.unwrap();
let (mut rx, handle) = local.replicate_to_live(
&remote,
ReplicationOptions {
poll_interval: std::time::Duration::from_millis(50),
live: true,
..Default::default()
},
);
let mut got_complete = false;
// Upper bound on how long the test waits for replication events.
let timeout = tokio::time::sleep(std::time::Duration::from_secs(2));
tokio::pin!(timeout);
loop {
tokio::select! {
event = rx.recv() => {
match event {
// A completed cycle only counts once it actually wrote something.
Some(ReplicationEvent::Complete(r)) => {
if r.docs_written > 0 {
got_complete = true;
break;
}
}
// A pause also counts as success if the document is already on the remote.
Some(ReplicationEvent::Paused) if remote.get("doc1").await.is_ok() => {
got_complete = true;
break;
}
// Channel closed: stop waiting.
None => break,
_ => {}
}
}
_ = &mut timeout => break,
}
}
handle.cancel();
// Accept either an observed completion or the document being present.
assert!(got_complete || remote.get("doc1").await.is_ok());
}
#[tokio::test]
async fn database_changes_with_selector() {
let db = Database::memory("test");
db.put("alice", serde_json::json!({"type": "user", "age": 30}))
.await
.unwrap();
db.put(
"inv1",
serde_json::json!({"type": "invoice", "amount": 100}),
)
.await
.unwrap();
db.put("bob", serde_json::json!({"type": "user", "age": 25}))
.await
.unwrap();
let changes = db
.changes(ChangesOptions {
selector: Some(serde_json::json!({"type": "user"})),
..Default::default()
})
.await
.unwrap();
assert_eq!(changes.results.len(), 2);
assert!(
changes
.results
.iter()
.all(|c| c.id == "alice" || c.id == "bob")
);
assert!(changes.results[0].doc.is_none());
}
#[tokio::test]
async fn database_changes_with_selector_and_include_docs() {
let db = Database::memory("test");
db.put("a", serde_json::json!({"score": 10})).await.unwrap();
db.put("b", serde_json::json!({"score": 50})).await.unwrap();
db.put("c", serde_json::json!({"score": 90})).await.unwrap();
let changes = db
.changes(ChangesOptions {
selector: Some(serde_json::json!({"score": {"$gte": 50}})),
include_docs: true,
..Default::default()
})
.await
.unwrap();
assert_eq!(changes.results.len(), 2);
assert!(changes.results[0].doc.is_some());
}
#[tokio::test]
async fn database_live_changes_basic() {
let db = Database::memory("test");
db.put("a", serde_json::json!({"v": 1})).await.unwrap();
let (mut rx, handle) = db.live_changes(ChangesStreamOptions {
poll_interval: std::time::Duration::from_millis(50),
..Default::default()
});
let event = tokio::time::timeout(std::time::Duration::from_secs(2), rx.recv())
.await
.unwrap()
.unwrap();
assert_eq!(event.id, "a");
db.put("b", serde_json::json!({"v": 2})).await.unwrap();
let event = tokio::time::timeout(std::time::Duration::from_secs(2), rx.recv())
.await
.unwrap()
.unwrap();
assert_eq!(event.id, "b");
handle.cancel();
}
#[tokio::test]
async fn database_live_changes_with_selector() {
let db = Database::memory("test");
db.put(
"alice",
serde_json::json!({"type": "user", "name": "Alice"}),
)
.await
.unwrap();
db.put(
"inv1",
serde_json::json!({"type": "invoice", "amount": 100}),
)
.await
.unwrap();
db.put("bob", serde_json::json!({"type": "user", "name": "Bob"}))
.await
.unwrap();
let (mut rx, handle) = db.live_changes(ChangesStreamOptions {
selector: Some(serde_json::json!({"type": "user"})),
poll_interval: std::time::Duration::from_millis(50),
..Default::default()
});
let e1 = tokio::time::timeout(std::time::Duration::from_secs(2), rx.recv())
.await
.unwrap()
.unwrap();
assert!(e1.id == "alice" || e1.id == "bob");
assert!(e1.doc.is_none());
let e2 = tokio::time::timeout(std::time::Duration::from_secs(2), rx.recv())
.await
.unwrap()
.unwrap();
assert!(e2.id == "alice" || e2.id == "bob");
assert_ne!(e1.id, e2.id);
handle.cancel();
}
/// `compact` succeeds on an empty in-memory database.
#[tokio::test]
async fn database_compact() {
    let empty_db = Database::memory("test");
    let outcome = empty_db.compact().await;
    outcome.unwrap();
}
#[tokio::test]
async fn database_destroy() {
let db = Database::memory("test");
db.put("doc1", serde_json::json!({})).await.unwrap();
db.destroy().await.unwrap();
let info = db.info().await.unwrap();
assert_eq!(info.doc_count, 0);
}
/// Test plugin that empties every write batch before it reaches the adapter.
struct DropAllPlugin;

#[async_trait::async_trait]
impl Plugin for DropAllPlugin {
    fn name(&self) -> &str {
        "drop-all"
    }

    async fn before_write(&self, docs: &mut Vec<Document>) -> Result<()> {
        docs.truncate(0);
        Ok(())
    }
}
#[tokio::test]
async fn put_returns_error_when_plugin_drops_document() {
let db = Database::memory("test").with_plugin(Arc::new(DropAllPlugin));
let result = db.put("doc1", serde_json::json!({"v": 1})).await;
assert!(result.is_err());
}
#[tokio::test]
async fn indexed_find_sorts_on_nested_field() {
let db = Database::memory("test");
db.put(
"a",
serde_json::json!({"category": "x", "address": {"city": "Zurich"}}),
)
.await
.unwrap();
db.put(
"b",
serde_json::json!({"category": "x", "address": {"city": "Amsterdam"}}),
)
.await
.unwrap();
db.put(
"c",
serde_json::json!({"category": "x", "address": {"city": "Madrid"}}),
)
.await
.unwrap();
db.create_index(IndexDefinition {
name: String::new(),
fields: vec![SortField::Simple("category".into())],
ddoc: None,
})
.await
.unwrap();
let result = db
.find(FindOptions {
selector: serde_json::json!({"category": "x"}),
sort: Some(vec![SortField::Simple("address.city".into())]),
..Default::default()
})
.await
.unwrap();
let cities: Vec<&str> = result
.docs
.iter()
.map(|d| d["address"]["city"].as_str().unwrap())
.collect();
assert_eq!(cities, vec!["Amsterdam", "Madrid", "Zurich"]);
}
}