use super::{
index_writer::DocumentIndexWriter, read_operations::ReadOperations, write_result::WriteResult,
};
use crate::{
collection::{
CollectionEventInfo, CollectionEventListener, CollectionEvents, Document, FindOptions, NitriteId, UpdateOptions
}, common::get_current_time_or_zero, errors::{ErrorKind, NitriteError, NitriteResult}, filter::Filter, get_current_time, store::{NitriteMap, NitriteMapProvider}, Key, NitriteEventBus, ProcessorChain, ProcessorProvider, Value, DOC_ID, DOC_MODIFIED, DOC_REVISION, DOC_SOURCE, REPLICATOR
};
use std::sync::Arc;
#[derive(Clone)]
pub(crate) struct WriteOperations {
inner: Arc<WriteOperationsInner>,
}
impl WriteOperations {
pub fn new(
document_index_writer: DocumentIndexWriter,
read_operations: ReadOperations,
event_bus: NitriteEventBus<CollectionEventInfo, CollectionEventListener>,
nitrite_map: NitriteMap,
processor_chain: ProcessorChain,
) -> Self {
let inner = WriteOperationsInner::new(
document_index_writer,
read_operations,
event_bus,
nitrite_map,
processor_chain,
);
Self {
inner: Arc::new(inner),
}
}
fn document_index_writer(&self) -> DocumentIndexWriter {
self.inner.document_index_writer.clone()
}
fn read_operations(&self) -> ReadOperations {
self.inner.read_operations.clone()
}
fn event_bus(&self) -> NitriteEventBus<CollectionEventInfo, CollectionEventListener> {
self.inner.event_bus.clone()
}
fn nitrite_map(&self) -> NitriteMap {
self.inner.nitrite_map.clone()
}
fn processor_chain(&self) -> ProcessorChain {
self.inner.processor_chain.clone()
}
fn with_atomic<T, F>(&self, op: F) -> NitriteResult<T>
where
F: FnOnce() -> NitriteResult<T>,
{
self.inner.nitrite_map.get_store()?.with_atomic(op)
}
pub fn insert(&self, document: Document) -> NitriteResult<WriteResult> {
self.with_atomic(move || self.inner.insert(document))
}
pub fn insert_batch(&self, documents: Vec<Document>) -> NitriteResult<WriteResult> {
self.with_atomic(move || self.inner.insert_batch(documents))
}
pub fn update(
&self,
filter: Filter,
update: &Document,
update_options: &UpdateOptions,
) -> NitriteResult<WriteResult> {
self.with_atomic(move || self.inner.update(filter, update, update_options))
}
pub fn update_by_id(
&self,
id: &NitriteId,
update: &Document,
insert_if_absent: bool,
) -> NitriteResult<WriteResult> {
self.with_atomic(move || self.inner.update_by_id(id, update, insert_if_absent))
}
pub fn remove(&self, filter: Filter, just_once: bool) -> NitriteResult<WriteResult> {
self.with_atomic(move || self.inner.remove(filter, just_once))
}
pub fn remove_document(&self, document: &Document) -> NitriteResult<WriteResult> {
self.with_atomic(|| self.inner.remove_document(document))
}
}
struct WriteOperationsInner {
document_index_writer: DocumentIndexWriter,
read_operations: ReadOperations,
event_bus: NitriteEventBus<CollectionEventInfo, CollectionEventListener>,
nitrite_map: NitriteMap,
processor_chain: ProcessorChain,
}
impl WriteOperationsInner {
fn new(
document_index_writer: DocumentIndexWriter,
read_operations: ReadOperations,
event_bus: NitriteEventBus<CollectionEventInfo, CollectionEventListener>,
nitrite_map: NitriteMap,
processor_chain: ProcessorChain,
) -> Self {
Self {
document_index_writer,
read_operations,
event_bus,
nitrite_map,
processor_chain,
}
}
pub fn insert(&self, document: Document) -> NitriteResult<WriteResult> {
self.insert_batch(vec![document])
}
pub fn insert_batch(&self, documents: Vec<Document>) -> NitriteResult<WriteResult> {
if documents.is_empty() {
return Ok(WriteResult::new(vec![]));
}
if documents.len() <= 10 {
return self.insert_batch_sequential(documents);
}
self.insert_batch_optimized(documents)
}
fn insert_batch_sequential(&self, documents: Vec<Document>) -> NitriteResult<WriteResult> {
let mut nitrite_ids = Vec::with_capacity(documents.len());
for document in documents {
let id = self.process_insert(document)?;
nitrite_ids.push(id);
}
Ok(WriteResult::new(nitrite_ids))
}
fn insert_batch_optimized(&self, documents: Vec<Document>) -> NitriteResult<WriteResult> {
let batch_size = documents.len();
let prepared: Vec<(NitriteId, Document, Document, String)> = documents
.into_iter()
.map(|doc| self.prepare_document_for_insert(doc))
.collect::<NitriteResult<Vec<_>>>()?;
let keys: Vec<_> = prepared.iter()
.map(|(id, _, _, _)| Value::NitriteId(*id))
.collect();
self.validate_no_duplicates(&keys)?;
let all_ids: Vec<NitriteId> = prepared.iter()
.map(|(id, _, _, _)| *id)
.collect();
let entries: Vec<(Key, Value)> = prepared.iter()
.map(|(id, processed_doc, _, _)| {
(Value::NitriteId(*id), Value::Document(processed_doc.clone()))
})
.collect();
self.nitrite_map.put_all(entries)
.map_err(|e| NitriteError::new(
&format!("Failed to batch insert documents: {}", e),
e.kind().clone(),
))?;
let mut nitrite_ids = Vec::with_capacity(batch_size);
let mut indexed_docs: Vec<Document> = Vec::with_capacity(batch_size);
for (id, mut processed_doc, original_doc, source) in prepared {
if let Err(e) = self.document_index_writer.write_index_entry(&mut processed_doc) {
self.rollback_batch_indexes(&indexed_docs);
self.rollback_batch_insert(&all_ids);
return Err(NitriteError::new(
&format!("Failed to write index entries during batch insert (unique constraint violation?): {}", e),
e.kind().clone(),
));
}
indexed_docs.push(processed_doc);
let value = Value::Document(original_doc);
let event = CollectionEventInfo::new(Some(value), CollectionEvents::Insert, source);
if let Err(e) = self.event_bus.publish(event) {
log::warn!("Failed to publish insert event for {}: {}", id, e);
}
nitrite_ids.push(id);
}
Ok(WriteResult::new(nitrite_ids))
}
fn prepare_document_for_insert(&self, document: Document) -> NitriteResult<(NitriteId, Document, Document, String)> {
let mut new_doc = document;
let nitrite_id = new_doc.id()
.map_err(|e| NitriteError::new(
&format!("Failed to retrieve document ID during insert: {}", e),
e.kind().clone(),
))?;
let source = new_doc.source()
.map_err(|e| NitriteError::new(
&format!("Failed to retrieve document source during insert: {}", e),
e.kind().clone(),
))?;
let time = get_current_time_or_zero();
if REPLICATOR.ne(&source) {
new_doc.remove(DOC_SOURCE)
.map_err(|e| NitriteError::new(
&format!("Failed to remove document source field during insert: {}", e),
e.kind().clone(),
))?;
new_doc.put(DOC_REVISION, Value::I32(1))
.map_err(|e| NitriteError::new(
&format!("Failed to set document revision during insert: {}", e),
e.kind().clone(),
))?;
new_doc.put(DOC_MODIFIED, Value::U128(time))
.map_err(|e| NitriteError::new(
&format!("Failed to set document modification time during insert: {}", e),
e.kind().clone(),
))?;
} else {
new_doc.remove(DOC_SOURCE)
.map_err(|e| NitriteError::new(
&format!("Failed to remove document source field during replication insert: {}", e),
e.kind().clone(),
))?;
}
let processed = self.processor_chain.process_before_write(new_doc.clone())
.map_err(|e| NitriteError::new(
&format!("Failed to process document before write during insert: {}", e),
e.kind().clone(),
))?;
Ok((nitrite_id, processed, new_doc, source))
}
fn validate_no_duplicates(&self, keys: &[Value]) -> NitriteResult<()> {
if keys.len() > 50 {
let num_threads = std::thread::available_parallelism()
.map(|p| p.get())
.unwrap_or(4)
.min(keys.len());
let chunk_size = keys.len().div_ceil(num_threads);
let chunks: Vec<_> = keys.chunks(chunk_size).collect();
let results: Vec<NitriteResult<Option<NitriteId>>> = std::thread::scope(|s| {
let handles: Vec<_> = chunks
.into_iter()
.map(|chunk| {
s.spawn(move || {
for key in chunk {
if self.nitrite_map.contains_key(key)? {
if let Value::NitriteId(id) = key {
return Ok(Some(*id));
}
}
}
Ok(None)
})
})
.collect();
handles.into_iter()
.map(|h| h.join().unwrap())
.collect()
});
for result in results {
if let Some(duplicate_id) = result? {
log::error!("Document already exists with id {}", duplicate_id);
return Err(NitriteError::new(
&format!("Document already exists with id {}", duplicate_id),
ErrorKind::UniqueConstraintViolation,
));
}
}
} else {
for key in keys {
if self.nitrite_map.contains_key(key)? {
if let Value::NitriteId(id) = key {
log::error!("Document already exists with id {}", id);
return Err(NitriteError::new(
&format!("Document already exists with id {}", id),
ErrorKind::UniqueConstraintViolation,
));
}
}
}
}
Ok(())
}
fn rollback_batch_insert(&self, ids: &[NitriteId]) {
for id in ids {
if let Err(e) = self.nitrite_map.remove(&Value::NitriteId(*id)) {
log::error!("Failed to rollback document {} during batch insert: {}", id, e);
}
}
}
fn rollback_batch_indexes(&self, indexed_docs: &[Document]) {
for doc in indexed_docs {
let mut doc_clone = doc.clone();
if let Err(e) = self.document_index_writer.remove_index_entry(&mut doc_clone) {
log::error!("Failed to rollback index entries during batch insert: {}", e);
}
}
}
fn process_insert(&self, document: Document) -> NitriteResult<NitriteId> {
let mut new_doc = document;
let nitrite_id = new_doc.id()
.map_err(|e| NitriteError::new(&format!("Failed to retrieve document ID during insert: {}", e), e.kind().clone()))?;
let source = new_doc.source()
.map_err(|e| NitriteError::new(&format!("Failed to retrieve document source during insert: {}", e), e.kind().clone()))?;
let time = get_current_time_or_zero();
if REPLICATOR.ne(&source) {
new_doc.remove(DOC_SOURCE)
.map_err(|e| NitriteError::new(&format!("Failed to remove document source field during insert: {}", e), e.kind().clone()))?;
new_doc.put(DOC_REVISION, Value::I32(1))
.map_err(|e| NitriteError::new(&format!("Failed to set document revision during insert: {}", e), e.kind().clone()))?;
new_doc.put(DOC_MODIFIED, Value::U128(time))
.map_err(|e| NitriteError::new(&format!("Failed to set document modification time during insert: {}", e), e.kind().clone()))?;
} else {
new_doc.remove(DOC_SOURCE)
.map_err(|e| NitriteError::new(&format!("Failed to remove document source field during replication insert: {}", e), e.kind().clone()))?;
}
let mut processed = self.processor_chain.process_before_write(new_doc.clone())
.map_err(|e| NitriteError::new(&format!("Failed to process document before write during insert: {}", e), e.kind().clone()))?;
let existing = self.nitrite_map.put_if_absent(
Value::NitriteId(nitrite_id),
Value::Document(processed.clone()),
).map_err(|e| NitriteError::new(&format!("Failed to store document in map during insert: {}", e), e.kind().clone()))?;
if existing.is_some() {
log::error!("Document already exists with id {}", nitrite_id.clone());
return Err(NitriteError::new(
&format!("Document already exists with id {}", nitrite_id.clone()),
ErrorKind::UniqueConstraintViolation,
));
} else {
let result = self.document_index_writer.write_index_entry(&mut processed);
if let Err(e) = result {
self.nitrite_map.remove(&Value::NitriteId(nitrite_id))
.map_err(|remove_err| NitriteError::new(&format!("Failed to rollback document storage after index write failure: {}", remove_err), remove_err.kind().clone()))?;
return Err(NitriteError::new(&format!("Failed to write index entries during insert: {}", e), e.kind().clone()));
}
}
let value = Value::Document(new_doc);
let event = CollectionEventInfo::new(Some(value), CollectionEvents::Insert, source);
self.event_bus.publish(event)
.map_err(|e| NitriteError::new(&format!("Failed to publish insert event: {}", e), e.kind().clone()))?;
Ok(nitrite_id)
}
pub fn update(
&self,
filter: Filter,
update: &Document,
update_options: &UpdateOptions,
) -> NitriteResult<WriteResult> {
let cursor = self.read_operations.find(filter, &FindOptions::new())?;
let mut nitrite_ids = Vec::new();
let mut document = update.clone();
document.remove(DOC_ID)?;
if REPLICATOR.ne(&document.source()?) {
document.remove(DOC_REVISION)?;
}
if document.is_empty() {
return Ok(WriteResult::new(nitrite_ids));
}
let mut count = 0usize;
let mut batch_size = 10;
let mut docs = Vec::with_capacity(batch_size);
for doc_result in cursor {
let doc = doc_result?;
count += 1;
docs.push(doc);
if count >= 1 && update_options.is_just_once() {
break;
}
if docs.len() >= batch_size {
self.process_update_batch(&document, &mut nitrite_ids, docs)?;
docs = Vec::with_capacity(batch_size);
if count > 100 {
batch_size = 50;
} else if count > 1000 {
batch_size = 200;
}
}
}
if !docs.is_empty() {
self.process_update_batch(&document, &mut nitrite_ids, docs)?;
}
if count == 0 && update_options.is_insert_if_absent() {
return self.insert(update.clone());
}
Ok(WriteResult::new(nitrite_ids))
}
fn process_update_batch(
&self,
update_doc: &Document,
nitrite_ids: &mut Vec<NitriteId>,
docs: Vec<Document>
) -> NitriteResult<()> {
if docs.len() <= 10 {
for doc in docs {
if let Some(id) = self.process_single_update(doc, update_doc)? {
nitrite_ids.push(id);
}
}
return Ok(());
}
self.process_update_batch_optimized(update_doc, nitrite_ids, docs)
}
fn process_update_batch_optimized(
&self,
update_doc: &Document,
nitrite_ids: &mut Vec<NitriteId>,
docs: Vec<Document>,
) -> NitriteResult<()> {
let source = update_doc.source()?;
let time = get_current_time_or_zero();
let mut prepared: Vec<(NitriteId, Document, Document, Document)> = Vec::with_capacity(docs.len());
for doc in docs {
let mut new_doc = doc.clone();
let old_doc = doc;
let nitrite_id = new_doc.id()?;
if REPLICATOR.ne(&source) {
new_doc.merge(update_doc)?;
let revision = new_doc.revision()?;
new_doc.put(DOC_REVISION, Value::I32(revision + 1))?;
new_doc.put(DOC_MODIFIED, Value::U128(time))?;
} else {
new_doc.merge(update_doc)?;
}
let processed = self.processor_chain.process_before_write(new_doc.clone())?;
prepared.push((nitrite_id, old_doc, new_doc, processed));
}
let entries: Vec<(Key, Value)> = prepared.iter()
.map(|(id, _, _, processed)| {
(Value::NitriteId(*id), Value::Document(processed.clone()))
})
.collect();
self.nitrite_map.put_all(entries)?;
let mut updated_indexes: Vec<(NitriteId, Document, Document)> = Vec::with_capacity(prepared.len());
for (id, mut old_doc, new_doc, mut processed) in prepared {
let result = self.document_index_writer.update_index_entry(
&mut old_doc,
&mut processed,
update_doc,
);
if let Err(e) = result {
self.rollback_batch_update(&updated_indexes, &id, &old_doc, update_doc)?;
return Err(e);
}
updated_indexes.push((id, old_doc, processed.clone()));
let value = Value::Document(new_doc);
let event = CollectionEventInfo::new(Some(value), CollectionEvents::Update, source.clone());
if let Err(e) = self.event_bus.publish(event) {
log::warn!("Failed to publish update event for {}: {}", id, e);
}
if update_doc.size() > 0 {
nitrite_ids.push(id);
}
}
Ok(())
}
fn rollback_batch_update(
&self,
updated_indexes: &[(NitriteId, Document, Document)],
failed_id: &NitriteId,
failed_old_doc: &Document,
update_doc: &Document,
) -> NitriteResult<()> {
self.nitrite_map.put(
Value::NitriteId(*failed_id),
Value::Document(failed_old_doc.clone()),
)?;
let mut restore_entries: Vec<(Key, Value)> = Vec::with_capacity(updated_indexes.len());
for (id, old_doc, processed) in updated_indexes {
restore_entries.push((
Value::NitriteId(*id),
Value::Document(old_doc.clone()),
));
let mut old_doc_clone = old_doc.clone();
let mut processed_clone = processed.clone();
if let Err(e) = self.document_index_writer.update_index_entry(
&mut processed_clone,
&mut old_doc_clone,
update_doc,
) {
log::error!("Failed to rollback index entry for {}: {}", id, e);
}
}
if !restore_entries.is_empty() {
if let Err(e) = self.nitrite_map.put_all(restore_entries) {
log::error!("Failed to batch restore documents during rollback: {}", e);
}
}
Ok(())
}
fn process_single_update(&self, doc: Document, update_doc: &Document) -> NitriteResult<Option<NitriteId>> {
let mut new_doc = doc.clone();
let mut old_doc = doc;
let source = update_doc.source()?;
let time = get_current_time_or_zero();
let nitrite_id = new_doc.id()?;
if REPLICATOR.ne(&source) {
new_doc.merge(update_doc)?;
let revision = new_doc.revision()?;
new_doc.put(DOC_REVISION, Value::I32(revision + 1))?;
new_doc.put(DOC_MODIFIED, Value::U128(time))?;
} else {
new_doc.merge(update_doc)?;
}
let mut processed = self.processor_chain.process_before_write(new_doc.clone())?;
self.nitrite_map.put(
Value::NitriteId(nitrite_id),
Value::Document(processed.clone()),
)?;
let result = self.document_index_writer.update_index_entry(
&mut old_doc,
&mut processed,
update_doc,
);
if let Err(e) = result {
self.nitrite_map.put(
Value::NitriteId(nitrite_id),
Value::Document(old_doc.clone()),
)?;
self.document_index_writer.update_index_entry(
&mut processed,
&mut old_doc,
update_doc,
)?;
return Err(e);
}
let value = Value::Document(new_doc.clone());
let event = CollectionEventInfo::new(Some(value), CollectionEvents::Update, source);
self.event_bus.publish(event)?;
if update_doc.size() > 0 {
Ok(Some(nitrite_id))
} else {
Ok(None)
}
}
pub fn update_by_id(
&self,
id: &NitriteId,
update: &Document,
insert_if_absent: bool,
) -> NitriteResult<WriteResult> {
let existing = self.nitrite_map.get(&Value::NitriteId(*id))?;
match existing {
Some(value) => {
let doc = match value.as_document() {
Some(d) => d.clone(),
None => {
log::error!("Expected Document value in collection store for ID {:?}", id);
return Err(NitriteError::new(
"Invalid value type in collection store",
ErrorKind::ValidationError,
));
}
};
let mut nitrite_ids = Vec::new();
if let Some(updated_id) = self.process_single_update(doc, update)? {
nitrite_ids.push(updated_id);
}
Ok(WriteResult::new(nitrite_ids))
}
None => {
if insert_if_absent {
let mut new_doc = update.clone();
new_doc.put(DOC_ID, Value::NitriteId(*id))?;
self.insert(new_doc)
} else {
Ok(WriteResult::new(vec![]))
}
}
}
}
pub fn remove(&self, filter: Filter, just_once: bool) -> NitriteResult<WriteResult> {
let cursor = self.read_operations.find(filter, &FindOptions::new())?;
let mut nitrite_ids = Vec::new();
for doc_result in cursor {
let doc = doc_result?;
let processed = self.processor_chain.process_before_write(doc.clone())?;
let event = self.remove_internal(processed.clone(), &mut nitrite_ids)?;
if let Some(event) = event {
self.event_bus.publish(event)?;
}
if just_once {
break;
}
}
Ok(WriteResult::new(nitrite_ids))
}
pub fn remove_document(&self, document: &Document) -> NitriteResult<WriteResult> {
let mut nitrite_ids = Vec::new();
let event = self.remove_internal(document.clone(), &mut nitrite_ids)?;
if let Some(event) = event {
event.set_originator(document.source()?);
self.event_bus.publish(event)?;
}
Ok(WriteResult::new(nitrite_ids))
}
fn remove_internal(
&self,
mut document: Document,
nitrite_ids: &mut Vec<NitriteId>,
) -> NitriteResult<Option<CollectionEventInfo>> {
let nitrite_id = document.id()?;
let document = self
.nitrite_map
.remove(&Key::NitriteId(nitrite_id))?;
let mut document = match document {
Some(Value::Document(doc)) => doc,
Some(other_value) => {
log::error!("Data corruption: Expected Document in collection store, found {:?}", other_value);
return Err(NitriteError::new(
"Expected Document value in collection store, found corrupted type",
ErrorKind::IndexingError,
));
}
None => return Ok(None),
};
let remove_at = get_current_time_or_zero();
self.document_index_writer
.remove_index_entry(&mut document)?;
nitrite_ids.push(nitrite_id);
let revision = document.revision()? + 1;
document.put(DOC_REVISION, Value::I32(revision))?;
document.put(DOC_MODIFIED, Value::U128(remove_at))?;
let value = Value::Document(document.clone());
let event = CollectionEventInfo::new(Some(value), CollectionEvents::Remove, document.source()?);
Ok(Some(event))
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::collection::operation::find_optimizer::FindOptimizer;
use crate::collection::operation::index_operations::IndexOperations;
use crate::collection::operation::index_writer::DocumentIndexWriter;
use crate::collection::operation::read_operations::ReadOperations;
use crate::collection::{
Document, UpdateOptions,
};
use crate::filter::{all, field};
use crate::nitrite_config::NitriteConfig;
use crate::store::{NitriteMapProvider, NitriteStoreProvider};
use crate::{
NitriteEventBus, ProcessorChain, Value
,
};
fn setup_write_operations() -> WriteOperations {
let collection_name = "test_collection".to_string();
let nitrite_config = NitriteConfig::default();
nitrite_config
.auto_configure()
.expect("Failed to auto configure");
nitrite_config.initialize().expect("Failed to initialize");
let store = nitrite_config.nitrite_store().expect("Failed to get store");
let nitrite_map = store
.open_map(&collection_name.clone())
.expect("Failed to open map");
let event_bus = NitriteEventBus::new();
let find_optimizer = FindOptimizer::new();
let index_operations = IndexOperations::new(
collection_name.clone(),
nitrite_config.clone(),
nitrite_map.clone(),
find_optimizer.clone(),
event_bus.clone(),
)
.unwrap();
let document_index_writer =
DocumentIndexWriter::new(nitrite_config.clone(), index_operations.clone());
let find_optimizer = FindOptimizer::new();
let processor_chain = ProcessorChain::new();
let read_operations = ReadOperations::new(
collection_name,
index_operations,
nitrite_config.clone(),
nitrite_map.clone(),
find_optimizer,
processor_chain.clone(),
);
WriteOperations::new(
document_index_writer,
read_operations,
event_bus,
nitrite_map,
processor_chain,
)
}
#[test]
fn test_insert() {
let write_operations = setup_write_operations();
let document = Document::new();
let result = write_operations.insert(document);
assert!(result.is_ok());
}
#[test]
fn test_update() {
let write_operations = setup_write_operations();
let filter = all();
let document = Document::new();
let update_options = UpdateOptions::default();
let result = write_operations.update(filter, &document, &update_options);
assert!(result.is_ok());
}
#[test]
fn test_remove() {
let write_operations = setup_write_operations();
let filter = all();
let result = write_operations.remove(filter, false);
assert!(result.is_ok());
}
#[test]
fn test_remove_document() {
let write_operations = setup_write_operations();
let document = Document::new();
let result = write_operations.remove_document(&document);
assert!(result.is_ok());
}
#[test]
fn test_insert_duplicate_document() {
let write_operations = setup_write_operations();
let mut document = Document::new();
let _ = document.id();
let _ = write_operations.insert(document.clone());
let result = write_operations.insert(document);
assert!(result.is_err());
}
#[test]
fn test_update_nonexistent_document() {
let write_operations = setup_write_operations();
let filter = all();
let document = Document::new();
let update_options = UpdateOptions::default();
let result = write_operations.update(filter, &document, &update_options);
assert!(result.is_ok());
}
#[test]
fn test_remove_nonexistent_document() {
let write_operations = setup_write_operations();
let filter = all();
let result = write_operations.remove(filter, false);
assert!(result.is_ok());
}
#[test]
fn test_remove_internal_with_valid_document() {
let write_operations = setup_write_operations();
let mut document = Document::new();
let _ = document.id();
let insert_result = write_operations.insert(document.clone());
assert!(insert_result.is_ok());
let remove_result = write_operations.remove_document(&document);
assert!(remove_result.is_ok());
}
#[test]
fn test_remove_internal_handles_non_existent() {
let write_operations = setup_write_operations();
let document = Document::new();
let result = write_operations.remove_document(&document);
assert!(result.is_ok());
}
#[test]
fn test_remove_document_preserves_document_metadata() {
let write_operations = setup_write_operations();
let mut document = Document::new();
let _ = document.id();
let insert_result = write_operations.insert(document.clone());
assert!(insert_result.is_ok());
let remove_result = write_operations.remove_document(&document);
assert!(remove_result.is_ok());
let write_result = remove_result.unwrap();
let affected_ids = write_result.affected_nitrite_ids();
assert!(!affected_ids.is_empty());
}
#[test]
fn test_process_update_batch_small_batch() {
let write_operations = setup_write_operations();
let inner = write_operations.inner.clone();
let mut update_doc = Document::new();
update_doc.put("field", Value::from("updated")).unwrap();
let mut docs = Vec::new();
for _ in 0..10 {
let mut doc = Document::new();
doc.put("field", Value::from("original")).unwrap();
docs.push(doc);
}
let mut ids = Vec::new();
let result = inner.process_update_batch(&update_doc, &mut ids, docs);
assert!(result.is_ok());
}
#[test]
fn test_process_update_batch_vec_extend_optimization() {
let write_operations = setup_write_operations();
let inner = write_operations.inner.clone();
let mut update_doc = Document::new();
update_doc.put("field", Value::from("updated")).unwrap();
let mut docs = Vec::new();
for i in 0..5 {
let mut doc = Document::new();
doc.put("id", Value::from(i)).unwrap();
doc.put("field", Value::from("original")).unwrap();
docs.push(doc);
}
let mut ids = Vec::with_capacity(5);
let initial_capacity = ids.capacity();
let result = inner.process_update_batch(&update_doc, &mut ids, docs);
assert!(result.is_ok());
assert!(ids.capacity() >= initial_capacity);
}
#[test]
fn test_remove_operation_optimized_no_redundant_checks() {
let write_operations = setup_write_operations();
let mut doc = Document::new();
let _ = doc.id();
let insert_result = write_operations.insert(doc.clone());
assert!(insert_result.is_ok());
let filter = all();
let result = write_operations.remove(filter, true);
assert!(result.is_ok());
let write_result = result.unwrap();
let ids = write_result.affected_nitrite_ids();
assert!(!ids.is_empty());
}
#[test]
fn test_remove_operation_multiple_documents() {
let write_operations = setup_write_operations();
for i in 0..5 {
let mut doc = Document::new();
doc.put("index", Value::from(i)).unwrap();
let result = write_operations.insert(doc);
assert!(result.is_ok());
}
let filter = all();
let result = write_operations.remove(filter, false);
assert!(result.is_ok());
let write_result = result.unwrap();
let ids = write_result.affected_nitrite_ids();
assert_eq!(ids.len(), 5);
}
#[test]
fn test_insert_batch_sequential_vs_parallel() {
let write_operations = setup_write_operations();
let inner = write_operations.inner.clone();
let mut small_docs = Vec::new();
for i in 0..10 {
let mut doc = Document::new();
doc.put("batch", Value::from("small")).unwrap();
doc.put("index", Value::from(i)).unwrap();
small_docs.push(doc);
}
let result = inner.insert_batch(small_docs);
assert!(result.is_ok());
let write_result = result.unwrap();
assert_eq!(write_result.affected_nitrite_ids().len(), 10);
}
#[test]
fn test_insert_batch_large_parallel() {
let write_operations = setup_write_operations();
let inner = write_operations.inner.clone();
let mut large_docs = Vec::new();
for i in 0..50 {
let mut doc = Document::new();
doc.put("batch", Value::from("large")).unwrap();
doc.put("index", Value::from(i)).unwrap();
large_docs.push(doc);
}
let result = inner.insert_batch(large_docs);
assert!(result.is_ok());
let write_result = result.unwrap();
assert_eq!(write_result.affected_nitrite_ids().len(), 50);
}
#[test]
fn test_insert_batch_empty() {
let write_operations = setup_write_operations();
let inner = write_operations.inner.clone();
let result = inner.insert_batch(vec![]);
assert!(result.is_ok());
assert_eq!(result.unwrap().affected_nitrite_ids().len(), 0);
}
#[test]
fn test_insert_batch_sequential_threshold() {
let write_operations = setup_write_operations();
let inner = write_operations.inner.clone();
let docs: Vec<Document> = (0..10).map(|i| {
let mut doc = Document::new();
doc.put("index", Value::from(i)).unwrap();
doc
}).collect();
let result = inner.insert_batch(docs);
assert!(result.is_ok());
assert_eq!(result.unwrap().affected_nitrite_ids().len(), 10);
}
#[test]
fn test_insert_batch_optimized_threshold() {
let write_operations = setup_write_operations();
let inner = write_operations.inner.clone();
let docs: Vec<Document> = (0..11).map(|i| {
let mut doc = Document::new();
doc.put("batch_test", Value::from("optimized")).unwrap();
doc.put("index", Value::from(i)).unwrap();
doc
}).collect();
let result = inner.insert_batch(docs);
assert!(result.is_ok());
assert_eq!(result.unwrap().affected_nitrite_ids().len(), 11);
}
#[test]
fn test_insert_batch_duplicate_detection() {
let write_operations = setup_write_operations();
let inner = write_operations.inner.clone();
let mut doc = Document::new();
let id = doc.id().unwrap();
doc.put("field", Value::from("original")).unwrap();
inner.insert(doc.clone()).unwrap();
let mut docs: Vec<Document> = (0..15).map(|i| {
let mut d = Document::new();
d.put("index", Value::from(i)).unwrap();
d
}).collect();
let mut duplicate = Document::new();
duplicate.put(DOC_ID, Value::NitriteId(id)).unwrap();
duplicate.put("field", Value::from("duplicate")).unwrap();
docs.insert(5, duplicate);
let result = inner.insert_batch(docs);
assert!(result.is_err());
let err = result.unwrap_err();
assert!(matches!(err.kind(), ErrorKind::UniqueConstraintViolation));
}
#[test]
fn test_insert_batch_all_unique_ids() {
let write_operations = setup_write_operations();
let inner = write_operations.inner.clone();
let docs: Vec<Document> = (0..20).map(|i| {
let mut doc = Document::new();
doc.put("unique_test", Value::from(i)).unwrap();
doc.put("data", Value::from(format!("document_{}", i))).unwrap();
doc
}).collect();
let result = inner.insert_batch(docs);
assert!(result.is_ok());
let write_result = result.unwrap();
let ids = write_result.affected_nitrite_ids();
assert_eq!(ids.len(), 20);
let unique_ids: std::collections::HashSet<_> = ids.iter().collect();
assert_eq!(unique_ids.len(), 20);
}
#[test]
fn test_validate_no_duplicates_small_batch() {
let write_operations = setup_write_operations();
let inner = write_operations.inner.clone();
for i in 0..5 {
let mut doc = Document::new();
doc.put("index", Value::from(i)).unwrap();
inner.insert(doc).unwrap();
}
let new_keys: Vec<Value> = (100..110).map(|_| {
Value::NitriteId(NitriteId::new())
}).collect();
let result = inner.validate_no_duplicates(&new_keys);
assert!(result.is_ok());
}
#[test]
fn test_prepare_document_for_insert() {
let write_operations = setup_write_operations();
let inner = write_operations.inner.clone();
let mut doc = Document::new();
doc.put("test_field", Value::from("test_value")).unwrap();
let result = inner.prepare_document_for_insert(doc);
assert!(result.is_ok());
let (id, processed, original, source) = result.unwrap();
assert!(!id.to_string().is_empty());
let revision = processed.get(DOC_REVISION).unwrap();
assert!(!matches!(revision, Value::Null));
let test_field = original.get("test_field").unwrap();
assert!(!matches!(test_field, Value::Null));
assert_ne!(source, REPLICATOR);
}
#[test]
fn test_rollback_batch_insert() {
let write_operations = setup_write_operations();
let inner = write_operations.inner.clone();
let mut ids: Vec<NitriteId> = Vec::new();
for i in 0..5 {
let mut doc = Document::new();
doc.put("rollback_test", Value::from(i)).unwrap();
let result = inner.insert(doc).unwrap();
ids.extend(result.affected_nitrite_ids());
}
for id in &ids {
assert!(inner.nitrite_map.contains_key(&Value::NitriteId(*id)).unwrap());
}
inner.rollback_batch_insert(&ids);
for id in &ids {
assert!(!inner.nitrite_map.contains_key(&Value::NitriteId(*id)).unwrap());
}
}
#[test]
fn test_rollback_batch_indexes() {
let write_operations = setup_write_operations();
let inner = write_operations.inner.clone();
let mut docs: Vec<Document> = Vec::new();
for i in 0..3 {
let mut doc = Document::new();
doc.put("index_rollback_test", Value::from(i)).unwrap();
let _ = doc.id(); docs.push(doc);
}
let mut indexed_docs: Vec<Document> = Vec::new();
for doc in docs {
let (_, mut processed, _, _) = inner.prepare_document_for_insert(doc).unwrap();
inner.document_index_writer.write_index_entry(&mut processed).unwrap();
indexed_docs.push(processed);
}
inner.rollback_batch_indexes(&indexed_docs);
}
#[test]
fn test_update_batch_small_sequential() {
let write_operations = setup_write_operations();
let inner = write_operations.inner.clone();
for i in 0..5 {
let mut doc = Document::new();
doc.put("value", Value::from(i)).unwrap();
inner.insert(doc).unwrap();
}
let mut update_doc = Document::new();
update_doc.put("updated", Value::from(true)).unwrap();
let filter = all();
let update_options = UpdateOptions::default();
let result = inner.update(filter, &update_doc, &update_options);
assert!(result.is_ok());
let write_result = result.unwrap();
assert_eq!(write_result.affected_nitrite_ids().len(), 5);
}
#[test]
fn test_update_batch_optimized_threshold() {
let write_operations = setup_write_operations();
let inner = write_operations.inner.clone();
for i in 0..15 {
let mut doc = Document::new();
doc.put("batch_update_test", Value::from("original")).unwrap();
doc.put("index", Value::from(i)).unwrap();
inner.insert(doc).unwrap();
}
let mut update_doc = Document::new();
update_doc.put("batch_update_test", Value::from("updated")).unwrap();
let filter = all();
let update_options = UpdateOptions::default();
let result = inner.update(filter, &update_doc, &update_options);
assert!(result.is_ok());
let write_result = result.unwrap();
assert_eq!(write_result.affected_nitrite_ids().len(), 15);
}
#[test]
fn test_update_batch_preserves_document_data() {
let write_operations = setup_write_operations();
let inner = write_operations.inner.clone();
let mut inserted_ids: Vec<NitriteId> = Vec::new();
for i in 0..12 {
let mut doc = Document::new();
doc.put("original_field", Value::from(format!("value_{}", i))).unwrap();
doc.put("index", Value::from(i)).unwrap();
let result = inner.insert(doc).unwrap();
inserted_ids.extend(result.affected_nitrite_ids());
}
let mut update_doc = Document::new();
update_doc.put("new_field", Value::from("added")).unwrap();
let filter = all();
let update_options = UpdateOptions::default();
let result = inner.update(filter, &update_doc, &update_options);
assert!(result.is_ok());
for id in &inserted_ids {
let stored = inner.nitrite_map.get(&Value::NitriteId(*id)).unwrap();
assert!(stored.is_some());
let doc = stored.unwrap();
let document = doc.as_document().unwrap();
let original = document.get("original_field").unwrap();
assert!(!matches!(original, Value::Null));
let new_field = document.get("new_field").unwrap();
assert!(!matches!(new_field, Value::Null));
}
}
#[test]
fn test_update_batch_revision_increment() {
let write_operations = setup_write_operations();
let inner = write_operations.inner.clone();
for i in 0..12 {
let mut doc = Document::new();
doc.put("rev_test", Value::from(i)).unwrap();
inner.insert(doc).unwrap();
}
let mut update_doc = Document::new();
update_doc.put("updated", Value::from(true)).unwrap();
let filter = all();
let update_options = UpdateOptions::default();
inner.update(filter.clone(), &update_doc, &update_options).unwrap();
let mut update_doc2 = Document::new();
update_doc2.put("updated_again", Value::from(true)).unwrap();
inner.update(filter, &update_doc2, &update_options).unwrap();
}
#[test]
fn test_process_update_batch_optimized_directly() {
let write_operations = setup_write_operations();
let inner = write_operations.inner.clone();
let mut docs = Vec::new();
for i in 0..15 {
let mut doc = Document::new();
doc.put("direct_test", Value::from(i)).unwrap();
let result = inner.insert(doc.clone()).unwrap();
let id = *result.affected_nitrite_ids().first().unwrap();
let stored = inner.nitrite_map.get(&Value::NitriteId(id)).unwrap().unwrap();
docs.push(stored.as_document().unwrap().clone());
}
let mut update_doc = Document::new();
update_doc.put("batch_processed", Value::from(true)).unwrap();
let mut nitrite_ids = Vec::new();
let result = inner.process_update_batch_optimized(&update_doc, &mut nitrite_ids, docs);
assert!(result.is_ok());
assert_eq!(nitrite_ids.len(), 15);
}
#[test]
fn test_batch_insert_exactly_at_threshold() {
let write_operations = setup_write_operations();
let inner = write_operations.inner.clone();
let docs: Vec<Document> = (0..10).map(|i| {
let mut doc = Document::new();
doc.put("threshold_test", Value::from(i)).unwrap();
doc
}).collect();
let result = inner.insert_batch(docs);
assert!(result.is_ok());
assert_eq!(result.unwrap().affected_nitrite_ids().len(), 10);
}
#[test]
fn test_batch_insert_just_above_threshold() {
let write_operations = setup_write_operations();
let inner = write_operations.inner.clone();
let docs: Vec<Document> = (0..11).map(|i| {
let mut doc = Document::new();
doc.put("above_threshold", Value::from(i)).unwrap();
doc
}).collect();
let result = inner.insert_batch(docs);
assert!(result.is_ok());
assert_eq!(result.unwrap().affected_nitrite_ids().len(), 11);
}
#[test]
fn test_batch_insert_with_complex_nested_documents() {
let write_operations = setup_write_operations();
let inner = write_operations.inner.clone();
let docs: Vec<Document> = (0..15).map(|i| {
let mut doc = Document::new();
doc.put("level1.level2.level3.value", Value::from(i)).unwrap();
doc.put("array_field", Value::Array(vec![
Value::from(i),
Value::from(i * 2),
Value::from(i * 3),
])).unwrap();
doc
}).collect();
let result = inner.insert_batch(docs);
assert!(result.is_ok());
assert_eq!(result.unwrap().affected_nitrite_ids().len(), 15);
}
#[test]
fn test_batch_insert_single_document() {
let write_operations = setup_write_operations();
let inner = write_operations.inner.clone();
let mut doc = Document::new();
doc.put("single", Value::from("test")).unwrap();
let result = inner.insert_batch(vec![doc]);
assert!(result.is_ok());
assert_eq!(result.unwrap().affected_nitrite_ids().len(), 1);
}
#[test]
fn test_batch_insert_returns_correct_ids() {
let write_operations = setup_write_operations();
let inner = write_operations.inner.clone();
let docs: Vec<Document> = (0..20).map(|i| {
let mut doc = Document::new();
doc.put("verify_id", Value::from(i)).unwrap();
doc
}).collect();
let result = inner.insert_batch(docs).unwrap();
let ids = result.affected_nitrite_ids();
for id in ids {
let stored = inner.nitrite_map.get(&Value::NitriteId(*id)).unwrap();
assert!(stored.is_some(), "Document with ID {} should exist", id);
}
}
#[test]
fn test_batch_insert_duplicate_within_batch_same_id() {
let write_operations = setup_write_operations();
let inner = write_operations.inner.clone();
let mut doc1 = Document::new();
let id = doc1.id().unwrap();
doc1.put("field", Value::from("first")).unwrap();
let mut doc2 = Document::new();
doc2.put(DOC_ID, Value::NitriteId(id)).unwrap();
doc2.put("field", Value::from("second")).unwrap();
let result = inner.insert_batch(vec![doc1, doc2]);
assert!(result.is_err());
let err = result.unwrap_err();
assert!(matches!(err.kind(), ErrorKind::UniqueConstraintViolation));
}
#[test]
fn test_batch_insert_duplicate_within_large_batch() {
let write_operations = setup_write_operations();
let inner = write_operations.inner.clone();
let mut existing = Document::new();
let existing_id = existing.id().unwrap();
existing.put("existing", Value::from(true)).unwrap();
inner.insert(existing).unwrap();
let mut docs: Vec<Document> = (0..15).map(|i| {
let mut doc = Document::new();
doc.put("new_doc", Value::from(i)).unwrap();
doc
}).collect();
let mut duplicate = Document::new();
duplicate.put(DOC_ID, Value::NitriteId(existing_id)).unwrap();
duplicate.put("duplicate", Value::from(true)).unwrap();
docs.push(duplicate);
let result = inner.insert_batch(docs);
assert!(result.is_err());
let err = result.unwrap_err();
assert!(matches!(err.kind(), ErrorKind::UniqueConstraintViolation));
}
#[test]
fn test_batch_insert_fails_on_existing_document() {
let write_operations = setup_write_operations();
let inner = write_operations.inner.clone();
let mut existing = Document::new();
let existing_id = existing.id().unwrap();
existing.put("existing", Value::from(true)).unwrap();
inner.insert(existing).unwrap();
let mut docs: Vec<Document> = (0..15).map(|i| {
let mut doc = Document::new();
doc.put("new_doc", Value::from(i)).unwrap();
doc
}).collect();
let mut duplicate = Document::new();
duplicate.put(DOC_ID, Value::NitriteId(existing_id)).unwrap();
duplicate.put("duplicate", Value::from(true)).unwrap();
docs.push(duplicate);
let result = inner.insert_batch(docs);
assert!(result.is_err());
let err = result.unwrap_err();
assert!(matches!(err.kind(), ErrorKind::UniqueConstraintViolation));
}
#[test]
fn test_batch_insert_empty_documents() {
let write_operations = setup_write_operations();
let inner = write_operations.inner.clone();
let docs: Vec<Document> = (0..15).map(|_| Document::new()).collect();
let result = inner.insert_batch(docs);
assert!(result.is_ok());
assert_eq!(result.unwrap().affected_nitrite_ids().len(), 15);
}
#[test]
fn test_batch_insert_large_batch() {
let write_operations = setup_write_operations();
let inner = write_operations.inner.clone();
let docs: Vec<Document> = (0..100).map(|i| {
let mut doc = Document::new();
doc.put("large_batch", Value::from(i)).unwrap();
doc.put("data", Value::from(format!("data_{}", i))).unwrap();
doc
}).collect();
let result = inner.insert_batch(docs);
assert!(result.is_ok());
assert_eq!(result.unwrap().affected_nitrite_ids().len(), 100);
}
#[test]
fn test_batch_insert_with_null_values() {
let write_operations = setup_write_operations();
let inner = write_operations.inner.clone();
let docs: Vec<Document> = (0..15).map(|i| {
let mut doc = Document::new();
doc.put("index", Value::from(i)).unwrap();
doc.put("nullable", Value::Null).unwrap();
doc
}).collect();
let result = inner.insert_batch(docs);
assert!(result.is_ok());
assert_eq!(result.unwrap().affected_nitrite_ids().len(), 15);
}
#[test]
fn test_batch_insert_preserves_document_order() {
let write_operations = setup_write_operations();
let inner = write_operations.inner.clone();
let mut docs = Vec::new();
let mut expected_order = Vec::new();
for i in 0..20 {
let mut doc = Document::new();
let id = doc.id().unwrap();
doc.put("order_test", Value::from(i)).unwrap();
expected_order.push(id);
docs.push(doc);
}
let result = inner.insert_batch(docs).unwrap();
let returned_ids = result.affected_nitrite_ids();
assert_eq!(returned_ids.len(), expected_order.len());
for (expected, actual) in expected_order.iter().zip(returned_ids.iter()) {
assert_eq!(expected, actual, "ID order should be preserved");
}
}
#[test]
fn test_batch_update_at_threshold() {
let write_operations = setup_write_operations();
let inner = write_operations.inner.clone();
for i in 0..10 {
let mut doc = Document::new();
doc.put("update_threshold", Value::from(i)).unwrap();
inner.insert(doc).unwrap();
}
let mut update_doc = Document::new();
update_doc.put("updated_at_threshold", Value::from(true)).unwrap();
let result = inner.update(all(), &update_doc, &UpdateOptions::default());
assert!(result.is_ok());
assert_eq!(result.unwrap().affected_nitrite_ids().len(), 10);
}
#[test]
fn test_batch_update_above_threshold() {
let write_operations = setup_write_operations();
let inner = write_operations.inner.clone();
for i in 0..25 {
let mut doc = Document::new();
doc.put("batch_update", Value::from(i)).unwrap();
inner.insert(doc).unwrap();
}
let mut update_doc = Document::new();
update_doc.put("batch_updated", Value::from(true)).unwrap();
let result = inner.update(all(), &update_doc, &UpdateOptions::default());
assert!(result.is_ok());
assert_eq!(result.unwrap().affected_nitrite_ids().len(), 25);
}
#[test]
fn test_batch_update_partial_match() {
let write_operations = setup_write_operations();
let inner = write_operations.inner.clone();
for i in 0..30 {
let mut doc = Document::new();
doc.put("category", Value::from(if i % 2 == 0 { "even" } else { "odd" })).unwrap();
doc.put("index", Value::from(i)).unwrap();
inner.insert(doc).unwrap();
}
let mut update_doc = Document::new();
update_doc.put("processed", Value::from(true)).unwrap();
let filter = field("category").eq("even");
let result = inner.update(filter, &update_doc, &UpdateOptions::default());
assert!(result.is_ok());
assert_eq!(result.unwrap().affected_nitrite_ids().len(), 15); }
#[test]
fn test_batch_update_merges_fields_correctly() {
let write_operations = setup_write_operations();
let inner = write_operations.inner.clone();
let mut inserted_ids: Vec<NitriteId> = Vec::new();
for i in 0..20 {
let mut doc = Document::new();
doc.put("original", Value::from(format!("original_{}", i))).unwrap();
doc.put("keep_this", Value::from(i * 10)).unwrap();
let result = inner.insert(doc).unwrap();
inserted_ids.extend(result.affected_nitrite_ids());
}
let mut update_doc = Document::new();
update_doc.put("new_field", Value::from("added")).unwrap();
update_doc.put("original", Value::from("overwritten")).unwrap();
inner.update(all(), &update_doc, &UpdateOptions::default()).unwrap();
for id in &inserted_ids {
let stored = inner.nitrite_map.get(&Value::NitriteId(*id)).unwrap().unwrap();
let doc = stored.as_document().unwrap();
let original = doc.get("original").unwrap();
assert_eq!(original.as_string().unwrap(), "overwritten");
let keep = doc.get("keep_this").unwrap();
assert!(!matches!(keep, Value::Null));
let new_field = doc.get("new_field").unwrap();
assert_eq!(new_field.as_string().unwrap(), "added");
}
}
#[test]
fn test_batch_update_no_matching_documents() {
let write_operations = setup_write_operations();
let inner = write_operations.inner.clone();
for i in 0..15 {
let mut doc = Document::new();
doc.put("status", Value::from("active")).unwrap();
doc.put("index", Value::from(i)).unwrap();
inner.insert(doc).unwrap();
}
let mut update_doc = Document::new();
update_doc.put("updated", Value::from(true)).unwrap();
let filter = field("status").eq("nonexistent");
let result = inner.update(filter, &update_doc, &UpdateOptions::default());
assert!(result.is_ok());
assert_eq!(result.unwrap().affected_nitrite_ids().len(), 0);
}
#[test]
fn test_batch_update_with_empty_update_document() {
let write_operations = setup_write_operations();
let inner = write_operations.inner.clone();
for i in 0..15 {
let mut doc = Document::new();
doc.put("data", Value::from(i)).unwrap();
inner.insert(doc).unwrap();
}
let update_doc = Document::new(); let result = inner.update(all(), &update_doc, &UpdateOptions::default());
assert!(result.is_ok());
assert_eq!(result.unwrap().affected_nitrite_ids().len(), 0);
}
#[test]
fn test_batch_update_just_once_option() {
let write_operations = setup_write_operations();
let inner = write_operations.inner.clone();
for i in 0..20 {
let mut doc = Document::new();
doc.put("just_once_test", Value::from(i)).unwrap();
inner.insert(doc).unwrap();
}
let mut update_doc = Document::new();
update_doc.put("updated", Value::from(true)).unwrap();
let options = UpdateOptions::new(false, true);
let result = inner.update(all(), &update_doc, &options);
assert!(result.is_ok());
assert_eq!(result.unwrap().affected_nitrite_ids().len(), 1);
}
#[test]
fn test_batch_update_insert_if_absent() {
let write_operations = setup_write_operations();
let inner = write_operations.inner.clone();
let mut update_doc = Document::new();
update_doc.put("upserted", Value::from(true)).unwrap();
let options = UpdateOptions::new(true, false);
let filter = field("nonexistent").eq(true);
let result = inner.update(filter, &update_doc, &options);
assert!(result.is_ok());
assert_eq!(result.unwrap().affected_nitrite_ids().len(), 1);
}
#[test]
fn test_batch_update_increments_revision() {
let write_operations = setup_write_operations();
let inner = write_operations.inner.clone();
let mut inserted_ids: Vec<NitriteId> = Vec::new();
for i in 0..15 {
let mut doc = Document::new();
doc.put("rev_check", Value::from(i)).unwrap();
let result = inner.insert(doc).unwrap();
inserted_ids.extend(result.affected_nitrite_ids());
}
let mut update1 = Document::new();
update1.put("update1", Value::from(true)).unwrap();
inner.update(all(), &update1, &UpdateOptions::default()).unwrap();
let mut update2 = Document::new();
update2.put("update2", Value::from(true)).unwrap();
inner.update(all(), &update2, &UpdateOptions::default()).unwrap();
for id in &inserted_ids {
let stored = inner.nitrite_map.get(&Value::NitriteId(*id)).unwrap().unwrap();
let doc = stored.as_document().unwrap();
let revision = doc.revision().unwrap();
assert_eq!(revision, 3, "Revision should be 3 after insert + 2 updates");
}
}
#[test]
fn test_rollback_batch_update_restores_old_state() {
let write_operations = setup_write_operations();
let inner = write_operations.inner.clone();
let mut ids: Vec<NitriteId> = Vec::new();
for i in 0..5 {
let mut doc = Document::new();
doc.put("original_value", Value::from(format!("original_{}", i))).unwrap();
let result = inner.insert(doc).unwrap();
ids.extend(result.affected_nitrite_ids());
}
let mut update_doc = Document::new();
update_doc.put("new_value", Value::from("changed")).unwrap();
let updated_indexes: Vec<(NitriteId, Document, Document)> = Vec::new();
let failed_id = ids[0];
let failed_old_doc = inner.nitrite_map
.get(&Value::NitriteId(failed_id))
.unwrap()
.unwrap()
.as_document()
.unwrap()
.clone();
let result = inner.rollback_batch_update(&updated_indexes, &failed_id, &failed_old_doc, &update_doc);
assert!(result.is_ok());
}
}