db_library/
database.rs

//! # Database Module
//!
//! This module provides an abstraction for listening to database changes across multiple database types.
//! It currently supports PostgreSQL and MongoDB through a unified interface; MySQL and SQLite support is planned.
//!
//! ## Supported Databases
//! - **PostgreSQL**: Uses LISTEN/NOTIFY for real-time notifications.
//! - **MongoDB**: Uses Change Streams to track document updates.
//! - **MySQL**: Support planned for a future release.
//! - **SQLite**: Support planned for a future release.
11
12pub mod mongodb;
13pub mod mysql;
14pub mod postgres;
15pub mod sqlite;
16
17use crate::config::DBListenerError;
18use async_trait::async_trait;
19use mongodb::MongoDocumentListener;
20use postgres::PostgresTableListener;
21use serde_json::Value;
22use std::sync::Arc;
23use tokio::{
24    signal,
25    sync::{mpsc::Receiver, Mutex},
26    task::JoinHandle,
27};
28
/// Kind of database change a listener can be asked to report.
///
/// The enum carries no data, so it is `Copy` and can be used directly as a
/// `HashSet`/`HashMap` key or compared with `==`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum EventType {
    /// A new row/document was inserted.
    INSERT,
    /// An existing row/document was updated.
    UPDATE,
    /// A row/document was deleted.
    DELETE,
}
39
/// Connection and target settings for each database backend.
///
/// Values can be cloned and compared for equality, which makes it easy to
/// check configurations in tests or detect duplicates.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum DBConfig {
    /// Settings for a PostgreSQL listener.
    Postgres {
        /// Database connection URL.
        url: String,
        /// Name of the table to monitor.
        table_name: String,
        /// Columns to track for updates.
        columns: Vec<String>,
        /// Unique identifier for the table.
        table_identifier: String,
    },
    /// Settings for a MongoDB listener.
    MongoDB {
        /// Database connection URL.
        url: String,
        /// Name of the database.
        database: String,
        /// Name of the collection to monitor.
        collection: String,
    },
    /// Settings for a MySQL listener (planned feature).
    MySQL {
        /// Database connection URL.
        url: String,
        /// Name of the table to monitor.
        table_name: String,
        /// Columns to track for updates.
        columns: Vec<String>,
        /// Unique identifier for the table.
        table_identifier: String,
    },
}
75/// Core struct that manages database listeners.
76pub struct DBListener {
77    /// Generic trait object representing a database listener.
78    pub listener: Arc<Box<dyn DBListenerTrait + Send + Sync>>,
79}
80
81impl DBListener {
82    /// Creates a new database listener instance based on the provided configuration.
83    ///
84    /// # Arguments
85    /// * `config` - The `DBConfig` specifying the database type and connection details.
86    /// * `events` - A vector of `EventType` values indicating which events to track.
87    ///
88    /// # Returns
89    /// A `Result` containing a `DBListener` instance or an error.
90    pub async fn new(config: DBConfig, events: Vec<EventType>) -> Result<Self, DBListenerError> {
91        match &config {
92            DBConfig::Postgres {
93                url,
94                table_name,
95                columns,
96                table_identifier,
97            } => {
98                let postgres_table_listener = PostgresTableListener::new(
99                    url,
100                    &table_name,
101                    columns.clone(),
102                    &table_identifier,
103                    events,
104                )
105                .await?;
106
107                Ok(DBListener {
108                    listener: Arc::new(Box::new(postgres_table_listener)),
109                })
110                // Box::new(postgres_table_listener)
111            }
112            DBConfig::MongoDB {
113                url,
114                database,
115                collection,
116            } => {
117                let mongo_document_listener =
118                    MongoDocumentListener::new(url, database, collection, events).await?;
119
120                Ok(DBListener {
121                    listener: Arc::new(Box::new(mongo_document_listener)),
122                })
123            }
124            _ => unimplemented!(),
125        }
126    }
127
128    /// Starts listening for database changes and executes a callback function when an event occurs.
129    ///
130    /// # Arguments
131    /// * `callback` - A function to be called when a database event is received.
132    ///
133    /// # Returns
134    /// A `Result` indicating success or failure.
135    ///
136    pub async fn listen<F>(&self, callback: F) -> Result<(), DBListenerError>
137    where
138        F: Fn(Value) + Send + Sync + 'static,
139    {
140        let rx = self.listener.start().await?;
141
142        let (rx, handle) = rx;
143
144        tokio::spawn(async move {
145            let mut rx = rx.lock().await;
146
147            while let Some(payload) = rx.recv().await {
148                callback(payload); // Pass the received event payload to the callback function
149            }
150        });
151
152        if let Err(err) = signal::ctrl_c().await {
153            return Err(DBListenerError::Other(format!(
154                "Graceful shutdown error: {:#?}",
155                err
156            )));
157        }
158
159        self.listener.stop().await?;
160        handle.abort();
161        Ok(())
162    }
163}
164
165/// Trait defining a generic database listener.
166#[async_trait]
167pub trait DBListenerTrait {
168    /// Starts the listener and returns a receiver channel for event notifications.
169    ///
170    /// # Returns
171    /// A `Result` containing a tuple of an `Arc<Mutex<Receiver<Value>>>` and a `JoinHandle<()>`,
172    /// or an error if the listener fails to start.
173    async fn start(&self)
174        -> Result<(Arc<Mutex<Receiver<Value>>>, JoinHandle<()>), DBListenerError>;
175
176    /// Stops the database listener gracefully.
177    ///
178    /// # Returns
179    /// A `Result` indicating success or failure.
180    async fn stop(&self) -> Result<(), DBListenerError>;
181}