use anyhow::Result;
use std::fmt;
use std::sync::Arc;
use tokio::sync::Mutex;
use crate::types::S3syncObject;
use crate::types::filter_callback::FilterCallback;
/// Holder for an optional, user-supplied filter callback.
///
/// The callback is stored as a boxed trait object behind `Arc<Mutex<..>>`
/// (the async `tokio::sync::Mutex`), so cloning a `FilterManager` produces a
/// handle that shares the same underlying callback rather than copying it.
#[derive(Clone)]
pub struct FilterManager {
/// The registered callback, or `None` when no callback has been registered.
pub preprocess_callback: Option<Arc<Mutex<Box<dyn FilterCallback + Send + Sync>>>>,
}
impl Default for FilterManager {
fn default() -> Self {
Self::new()
}
}
impl FilterManager {
    /// Creates a manager that has no filter callback registered.
    pub fn new() -> Self {
        let preprocess_callback = None;
        Self {
            preprocess_callback,
        }
    }

    /// Stores `callback` as the filter callback, replacing any callback that
    /// was registered before.
    ///
    /// The callback is boxed as a trait object and wrapped in `Arc<Mutex<..>>`
    /// so that clones of this manager share it.
    pub fn register_callback<T: FilterCallback + Send + Sync + 'static>(&mut self, callback: T) {
        let boxed: Box<dyn FilterCallback + Send + Sync> = Box::new(callback);
        self.preprocess_callback = Some(Arc::new(Mutex::new(boxed)));
    }

    /// Returns `true` when a filter callback is currently registered.
    pub fn is_callback_registered(&self) -> bool {
        self.preprocess_callback.is_some()
    }

    /// Locks the registered callback and runs its `filter` method on `source`,
    /// returning the callback's result unchanged.
    ///
    /// # Errors
    ///
    /// Propagates whatever error the callback's `filter` returns.
    ///
    /// # Panics
    ///
    /// Panics if no callback has been registered; callers can check
    /// `is_callback_registered` first.
    pub async fn execute_filter(&mut self, source: &S3syncObject) -> Result<bool> {
        match self.preprocess_callback.as_ref() {
            // The mutex guard is held for the duration of the callback's future.
            Some(callback) => callback.lock().await.filter(source).await,
            None => panic!("Filter callback is not registered"),
        }
    }
}
impl fmt::Debug for FilterManager {
    /// Writes the manager as a `FilterManager` struct with a single
    /// `filter_callback` boolean field.
    ///
    /// Only whether a callback is present is shown; the callback itself is
    /// never formatted.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let has_callback = self.preprocess_callback.is_some();
        let mut builder = f.debug_struct("FilterManager");
        builder.field("filter_callback", &has_callback);
        builder.finish()
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use aws_sdk_s3::types::ObjectVersion;

    /// A manager built with either `new()` or `default()` starts without a
    /// registered callback, and its `Debug` output can be rendered.
    #[tokio::test]
    async fn create_preprocess_manager() {
        let filter_manager = FilterManager::new();
        assert!(!filter_manager.is_callback_registered());
        println!("{:?}", filter_manager);

        let filter_manager = FilterManager::default();
        assert!(!filter_manager.is_callback_registered());
    }

    /// Running the filter without a registered callback must panic.
    ///
    /// The expected message is pinned so the test cannot pass by accident,
    /// e.g. through the trailing `unwrap()` panicking on an `Err` instead.
    #[tokio::test]
    #[should_panic(expected = "Filter callback is not registered")]
    async fn execute_filter_test_panic() {
        let mut filter_manager = FilterManager::default();
        let source = S3syncObject::Versioning(ObjectVersion::builder().build());

        filter_manager.execute_filter(&source).await.unwrap();
    }
}