use crate::collection::Document;
use crate::common::persistent_collection::PersistentCollection;
use crate::errors::NitriteResult;
use dashmap::DashMap;
use std::sync::Arc;
pub trait ProcessorProvider: Send + Sync {
fn name(&self) -> String;
fn process_before_write(&self, doc: Document) -> NitriteResult<Document>;
fn process_after_read(&self, doc: Document) -> NitriteResult<Document>;
}
#[derive(Clone)]
pub struct Processor {
inner: Arc<dyn ProcessorProvider>,
}
impl Processor {
pub fn new<T: ProcessorProvider + 'static>(inner: T) -> Self {
Processor { inner: Arc::new(inner) }
}
pub fn name(&self) -> String {
self.inner.name()
}
pub fn process_before_write(&self, doc: Document) -> NitriteResult<Document> {
self.inner.process_before_write(doc)
}
pub fn process_after_read(&self, doc: Document) -> NitriteResult<Document> {
self.inner.process_after_read(doc)
}
}
#[derive(Clone)]
pub struct ProcessorChain {
inner: Arc<ProcessorChainInner>,
}
impl Default for ProcessorChain {
fn default() -> Self {
Self::new()
}
}
impl ProcessorChain {
pub fn new() -> Self {
ProcessorChain {
inner: Arc::new(ProcessorChainInner::new()),
}
}
pub fn add_processor(&self, processor: Processor) {
self.inner.add_processor(processor);
}
pub fn remove_processor(&self, processor_name: &str) {
self.inner.remove_processor(processor_name);
}
pub fn process_before_write(&self, doc: Document) -> NitriteResult<Document> {
self.inner.process_before_write(doc)
}
pub fn process_after_read(&self, doc: Document) -> NitriteResult<Document> {
self.inner.process_after_read(doc)
}
}
impl ProcessorProvider for ProcessorChain {
fn name(&self) -> String {
"ProcessorChain".to_string()
}
fn process_before_write(&self, doc: Document) -> NitriteResult<Document> {
self.inner.process_before_write(doc)
}
fn process_after_read(&self, doc: Document) -> NitriteResult<Document> {
self.inner.process_after_read(doc)
}
}
pub(crate) struct ProcessorChainInner {
processors: DashMap<String, Processor>,
}
impl ProcessorChainInner {
fn new() -> Self {
Self {
processors: DashMap::new(),
}
}
#[inline]
fn add_processor(&self, processor: Processor) {
self.processors.insert(processor.name(), processor);
}
#[inline]
fn remove_processor(&self, processor_name: &str) {
self.processors.remove(processor_name);
}
#[inline]
fn name(&self) -> String {
"ProcessorChain".to_string()
}
#[inline]
fn process_before_write(&self, doc: Document) -> NitriteResult<Document> {
if self.processors.is_empty() {
return Ok(doc);
}
let mut processed_doc = doc.clone();
for processor in self.processors.iter() {
processed_doc = processor.process_before_write(processed_doc)?;
}
Ok(processed_doc)
}
#[inline]
fn process_after_read(&self, doc: Document) -> NitriteResult<Document> {
if self.processors.is_empty() {
return Ok(doc);
}
let mut processed_doc = doc.clone();
for processor in self.processors.iter() {
processed_doc = processor.process_after_read(processed_doc)?;
}
Ok(processed_doc)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::collection::Document;
use crate::errors::{ErrorKind, NitriteError};
struct MockProcessor;
impl ProcessorProvider for MockProcessor {
fn name(&self) -> String {
"MockProcessor".to_string()
}
fn process_before_write(&self, doc: Document) -> NitriteResult<Document> {
let mut new_doc = doc.clone();
new_doc.put("processed", "before_write")?;
Ok(new_doc)
}
fn process_after_read(&self, doc: Document) -> NitriteResult<Document> {
let mut new_doc = doc.clone();
new_doc.put("processed", "after_read")?;
Ok(new_doc)
}
}
#[test]
fn test_processor_new() {
let processor = Processor::new(MockProcessor);
assert_eq!(processor.name(), "MockProcessor");
}
#[test]
fn test_processor_process_before_write() {
let processor = Processor::new(MockProcessor);
let mut doc = Document::new();
doc.put("key", "value").unwrap();
let processed_doc = processor.process_before_write(doc).unwrap();
assert_eq!(processed_doc.get("processed").unwrap(), "before_write".into());
}
#[test]
fn test_processor_process_after_read() {
let processor = Processor::new(MockProcessor);
let mut doc = Document::new();
doc.put("key", "value").unwrap();
let processed_doc = processor.process_after_read(doc).unwrap();
assert_eq!(processed_doc.get("processed").unwrap(), "after_read".into());
}
#[test]
fn test_processor_chain_new() {
let processor_chain = ProcessorChain::new();
assert_eq!(processor_chain.name(), "ProcessorChain");
}
#[test]
fn test_processor_chain_add_processor() {
let processor_chain = ProcessorChain::new();
let processor = Processor::new(MockProcessor);
processor_chain.add_processor(processor.clone());
assert_eq!(processor_chain.inner.processors.len(), 1);
}
#[test]
fn test_processor_chain_remove_processor() {
let processor_chain = ProcessorChain::new();
let processor = Processor::new(MockProcessor);
processor_chain.add_processor(processor.clone());
processor_chain.remove_processor("MockProcessor");
assert_eq!(processor_chain.inner.processors.len(), 0);
}
#[test]
fn test_processor_chain_process_before_write() {
let processor_chain = ProcessorChain::new();
let processor = Processor::new(MockProcessor);
processor_chain.add_processor(processor.clone());
let mut doc = Document::new();
doc.put("key", "value").unwrap();
let processed_doc = processor_chain.process_before_write(doc).unwrap();
assert_eq!(processed_doc.get("processed").unwrap(), "before_write".into());
}
#[test]
fn test_processor_chain_process_after_read() {
let processor_chain = ProcessorChain::new();
let processor = Processor::new(MockProcessor);
processor_chain.add_processor(processor.clone());
let mut doc = Document::new();
doc.put("key", "value").unwrap();
let processed_doc = processor_chain.process_after_read(doc).unwrap();
assert_eq!(processed_doc.get("processed").unwrap(), "after_read".into());
}
#[test]
fn test_processor_chain_process_before_write_no_processors() {
let processor_chain = ProcessorChain::new();
let mut doc = Document::new();
doc.put("key", "value").unwrap();
let processed_doc = processor_chain.process_before_write(doc.clone()).unwrap();
assert_eq!(processed_doc, doc);
}
#[test]
fn test_processor_chain_process_after_read_no_processors() {
let processor_chain = ProcessorChain::new();
let mut doc = Document::new();
doc.put("key", "value").unwrap();
let processed_doc = processor_chain.process_after_read(doc.clone()).unwrap();
assert_eq!(processed_doc, doc);
}
#[test]
fn test_processor_chain_process_before_write_error() {
struct ErrorProcessor;
impl ProcessorProvider for ErrorProcessor {
fn name(&self) -> String {
"ErrorProcessor".to_string()
}
fn process_before_write(&self, _doc: Document) -> NitriteResult<Document> {
Err(NitriteError::new("Error in process_before_write", ErrorKind::IOError))
}
fn process_after_read(&self, _doc: Document) -> NitriteResult<Document> {
Ok(Document::new())
}
}
let processor_chain = ProcessorChain::new();
let processor = Processor::new(ErrorProcessor);
processor_chain.add_processor(processor.clone());
let mut doc = Document::new();
doc.put("key", "value").unwrap();
let result = processor_chain.process_before_write(doc);
assert!(result.is_err());
}
#[test]
fn test_processor_chain_process_after_read_error() {
struct ErrorProcessor;
impl ProcessorProvider for ErrorProcessor {
fn name(&self) -> String {
"ErrorProcessor".to_string()
}
fn process_before_write(&self, _doc: Document) -> NitriteResult<Document> {
Ok(Document::new())
}
fn process_after_read(&self, _doc: Document) -> NitriteResult<Document> {
Err(NitriteError::new("Error in process_after_read", ErrorKind::IOError))
}
}
let processor_chain = ProcessorChain::new();
let processor = Processor::new(ErrorProcessor);
processor_chain.add_processor(processor.clone());
let mut doc = Document::new();
doc.put("key", "value").unwrap();
let result = processor_chain.process_after_read(doc);
assert!(result.is_err());
}
#[test]
fn test_processor_chain_add_duplicate_processor() {
let processor_chain = ProcessorChain::new();
let processor = Processor::new(MockProcessor);
processor_chain.add_processor(processor.clone());
processor_chain.add_processor(processor.clone());
assert_eq!(processor_chain.inner.processors.len(), 1);
}
#[test]
fn test_processor_chain_remove_nonexistent_processor() {
let processor_chain = ProcessorChain::new();
processor_chain.remove_processor("NonExistentProcessor");
assert_eq!(processor_chain.inner.processors.len(), 0);
}
#[test]
fn bench_processor_chain_creation() {
for _ in 0..1000 {
let _ = ProcessorChain::new();
}
}
#[test]
fn bench_processor_chain_add_remove() {
let chain = ProcessorChain::new();
let processor = Processor::new(MockProcessor);
for _ in 0..100 {
chain.add_processor(processor.clone());
chain.remove_processor("MockProcessor");
}
}
#[test]
fn bench_processor_chain_process_no_processors() {
let chain = ProcessorChain::new();
let mut doc = Document::new();
doc.put("key", "value").unwrap();
for _ in 0..500 {
let _ = chain.process_before_write(doc.clone());
let _ = chain.process_after_read(doc.clone());
}
}
}