pub mod mongodb;
pub mod mysql;
pub mod postgres;
pub mod sqlite;
use crate::config::DBListenerError;
use async_trait::async_trait;
use mongodb::MongoDocumentListener;
use postgres::PostgresTableListener;
use serde_json::Value;
use std::sync::Arc;
use tokio::{
signal,
sync::{mpsc::Receiver, Mutex},
task::JoinHandle,
};
/// Kind of change a listener can be asked to report.
///
/// A list of these is handed to [`DBListener::new`], which forwards it to the
/// backend-specific listener constructor.
///
/// The enum is fieldless, so it is `Copy` and can be used directly as a
/// `HashSet`/`HashMap` key.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum EventType {
    /// An insert event.
    INSERT,
    /// An update event.
    UPDATE,
    /// A delete event.
    DELETE,
}
/// Backend selection plus the settings needed to construct a listener for it.
///
/// Consumed by `DBListener::new`, which builds a listener for the `Postgres`
/// and `MongoDB` variants.
#[derive(Debug, Clone)]
pub enum DBConfig {
/// Settings forwarded to `PostgresTableListener::new`.
Postgres {
/// Connection string handed to the listener constructor.
url: String,
/// Name of the table to listen on (presumably; confirm against `postgres` module).
table_name: String,
/// Column names forwarded to the listener.
/// NOTE(review): presumably the columns included in each payload; confirm.
columns: Vec<String>,
/// Identifier forwarded to the listener.
/// NOTE(review): exact meaning is not visible here; confirm against `postgres` module.
table_identifier: String,
},
/// Settings forwarded to `MongoDocumentListener::new`.
MongoDB {
/// Connection string handed to the listener constructor.
url: String,
/// Database name forwarded to the listener.
database: String,
/// Collection name forwarded to the listener.
collection: String,
},
/// MySQL settings; same field set as `Postgres`.
///
/// `DBListener::new` in this module does not construct a listener for this variant.
MySQL {
/// Connection string.
url: String,
/// Table name (presumably the table to listen on; confirm).
table_name: String,
/// Column names. NOTE(review): presumably mirrors the `Postgres` field; confirm.
columns: Vec<String>,
/// Table identifier. NOTE(review): presumably mirrors the `Postgres` field; confirm.
table_identifier: String,
},
}
/// Backend-agnostic handle around a concrete listener implementation.
///
/// Constructed with `DBListener::new` and driven with `DBListener::listen`.
pub struct DBListener {
/// The backend listener, stored as a trait object.
///
/// NOTE(review): `Arc<Box<dyn ...>>` adds a second pointer hop; `Arc<dyn ...>`
/// would be enough, but this field is public so changing its type is a breaking change.
pub listener: Arc<Box<dyn DBListenerTrait + Send + Sync>>,
}
impl DBListener {
pub async fn new(config: DBConfig, events: Vec<EventType>) -> Result<Self, DBListenerError> {
match &config {
DBConfig::Postgres {
url,
table_name,
columns,
table_identifier,
} => {
let postgres_table_listener = PostgresTableListener::new(
url,
&table_name,
columns.clone(),
&table_identifier,
events,
)
.await?;
Ok(DBListener {
listener: Arc::new(Box::new(postgres_table_listener)),
})
}
DBConfig::MongoDB {
url,
database,
collection,
} => {
let mongo_document_listener =
MongoDocumentListener::new(url, database, collection, events).await?;
Ok(DBListener {
listener: Arc::new(Box::new(mongo_document_listener)),
})
}
_ => unimplemented!(),
}
}
pub async fn listen<F>(&self, callback: F) -> Result<(), DBListenerError>
where
F: Fn(Value) + Send + Sync + 'static,
{
let rx = self.listener.start().await?;
let (rx, handle) = rx;
tokio::spawn(async move {
let mut rx = rx.lock().await;
while let Some(payload) = rx.recv().await {
callback(payload); }
});
if let Err(err) = signal::ctrl_c().await {
return Err(DBListenerError::Other(format!(
"Graceful shutdown error: {:#?}",
err
)));
}
self.listener.stop().await?;
handle.abort();
Ok(())
}
}
/// Contract every backend listener implements so `DBListener` can drive it
/// through a trait object.
#[async_trait]
pub trait DBListenerTrait {
/// Starts listening.
///
/// On success returns a shared receiver of JSON payloads together with a
/// task handle. `DBListener::listen` reads payloads from the receiver and
/// aborts the handle once `stop` has been called.
/// NOTE(review): the handle is presumably the background task feeding the
/// channel; confirm against the implementations.
async fn start(&self)
-> Result<(Arc<Mutex<Receiver<Value>>>, JoinHandle<()>), DBListenerError>;
/// Stops listening. Called by `DBListener::listen` after the Ctrl-C wait completes.
async fn stop(&self) -> Result<(), DBListenerError>;
}