1pub mod mongodb;
2pub mod mysql;
3pub mod postgres;
4pub mod sqlite;
5
6use crate::config::DBListenerError;
7use async_trait::async_trait;
8use postgres::PostgresTableListener;
9use serde_json::Value;
10use sqlx::PgPool;
11use std::sync::Arc;
12use tokio::{
13 sync::{mpsc::Receiver, Mutex},
14 task::JoinHandle,
15};
16
17#[derive(Debug, Clone, PartialEq)]
18pub enum EventType {
19 INSERT,
20 UPDATE,
21 DELETE,
22}
23impl EventType {
24 pub fn get_events_list(events: &[EventType], columns_list: &str) -> String {
25 events
26 .iter()
27 .map(|event| match event {
28 EventType::INSERT => "INSERT".to_string(),
29 EventType::UPDATE => format!("UPDATE OF {}", columns_list),
30 EventType::DELETE => "DELETE".to_string(),
31 })
32 .collect::<Vec<_>>()
33 .join(" OR ")
34 }
35}
36
37#[derive(Debug, Clone)]
38pub enum DBConfig {
39 Postgres {
40 pool: Arc<PgPool>,
41 table_name: String,
42 columns: Vec<String>,
43 table_identifier: String,
44 },
45 MongoDB {
46 url: String,
47 collection: String,
48 },
49 MySQL {
50 url: String,
51 table: String,
52 id_column: String,
53 },
54}
55
56pub struct DBListener {
57 pub listener: Box<dyn DBListenerTrait>,
58}
59
60impl DBListener {
61 pub async fn new(config: DBConfig, events: Vec<EventType>) -> Result<Self, DBListenerError> {
62 let listener: Box<dyn DBListenerTrait> = match &config {
63 DBConfig::Postgres {
64 pool,
65 table_name,
66 columns,
67 table_identifier,
68 } => {
69 let postgres_table_listener = PostgresTableListener::new(
70 Arc::clone(&pool),
71 &table_name,
72 columns.clone(),
73 &table_identifier,
74 events,
75 )
76 .await?;
77
78 Box::new(postgres_table_listener)
79 }
80 _ => unimplemented!(),
81 };
82
83 Ok(DBListener { listener })
84 }
85}
86
87#[async_trait]
88pub trait DBListenerTrait {
89 async fn start(&self)
90 -> Result<(Arc<Mutex<Receiver<Value>>>, JoinHandle<()>), DBListenerError>;
91
92 async fn stop(&self) -> Result<(), DBListenerError>;
93}