db_library/
database.rs

1pub mod mongodb;
2pub mod mysql;
3pub mod postgres;
4pub mod sqlite;
5
6use crate::config::DBListenerError;
7use async_trait::async_trait;
8use postgres::PostgresTableListener;
9use serde_json::Value;
10use sqlx::PgPool;
11use std::sync::Arc;
12use tokio::{
13    sync::{mpsc::Receiver, Mutex},
14    task::JoinHandle,
15};
16
17#[derive(Debug, Clone, PartialEq)]
18pub enum EventType {
19    INSERT,
20    UPDATE,
21    DELETE,
22}
23impl EventType {
24    pub fn get_events_list(events: &[EventType], columns_list: &str) -> String {
25        events
26            .iter()
27            .map(|event| match event {
28                EventType::INSERT => "INSERT".to_string(),
29                EventType::UPDATE => format!("UPDATE OF {}", columns_list),
30                EventType::DELETE => "DELETE".to_string(),
31            })
32            .collect::<Vec<_>>()
33            .join(" OR ")
34    }
35}
36
37#[derive(Debug, Clone)]
38pub enum DBConfig {
39    Postgres {
40        pool: Arc<PgPool>,
41        table_name: String,
42        columns: Vec<String>,
43        table_identifier: String,
44    },
45    MongoDB {
46        url: String,
47        collection: String,
48    },
49    MySQL {
50        url: String,
51        table: String,
52        id_column: String,
53    },
54}
55
56pub struct DBListener {
57    pub listener: Box<dyn DBListenerTrait>,
58}
59
60impl DBListener {
61    pub async fn new(config: DBConfig, events: Vec<EventType>) -> Result<Self, DBListenerError> {
62        let listener: Box<dyn DBListenerTrait> = match &config {
63            DBConfig::Postgres {
64                pool,
65                table_name,
66                columns,
67                table_identifier,
68            } => {
69                let postgres_table_listener = PostgresTableListener::new(
70                    Arc::clone(&pool),
71                    &table_name,
72                    columns.clone(),
73                    &table_identifier,
74                    events,
75                )
76                .await?;
77
78                Box::new(postgres_table_listener)
79            }
80            _ => unimplemented!(),
81        };
82
83        Ok(DBListener { listener })
84    }
85}
86
87#[async_trait]
88pub trait DBListenerTrait {
89    async fn start(&self)
90        -> Result<(Arc<Mutex<Receiver<Value>>>, JoinHandle<()>), DBListenerError>;
91
92    async fn stop(&self) -> Result<(), DBListenerError>;
93}