db_library/database.rs
1//! # Database Module
2//!
3//! This module provides an abstraction for listening to database changes across multiple database types.
4//! It supports PostgreSQL, MongoDB, MySQL, and SQLite through a unified interface.
5//!
6//! ## Supported Databases
7//! - **PostgreSQL**: Uses LISTEN/NOTIFY for real-time notifications.
8//! - **MongoDB**: Uses Change Streams to track document updates.
9//! - **MySQL**: Planned to support in future.
10//! - **SQLite**: Planned support in future.
11
12pub mod mongodb;
13pub mod mysql;
14pub mod postgres;
15pub mod sqlite;
16
17use crate::config::DBListenerError;
18use async_trait::async_trait;
19use mongodb::MongoDocumentListener;
20use postgres::PostgresTableListener;
21use serde_json::Value;
22use std::sync::Arc;
23use tokio::{
24 signal,
25 sync::{mpsc::Receiver, Mutex},
26 task::JoinHandle,
27};
28
29/// Represents the type of database events that can be monitored.
30#[derive(Debug, Clone, PartialEq)]
31pub enum EventType {
32 /// Triggered when a new row/document is inserted.
33 INSERT,
34 /// Triggered when an existing row/document is updated.
35 UPDATE,
36 /// Triggered when a row/document is deleted.
37 DELETE,
38}
39
40/// Configuration options for different database types.
41#[derive(Debug, Clone)]
42pub enum DBConfig {
43 /// Configuration for PostgreSQL listener.
44 Postgres {
45 /// Database connection URL.
46 url: String,
47 /// Table name to monitor.
48 table_name: String,
49 /// List of columns to track for updates.
50 columns: Vec<String>,
51 /// Unique identifier for the table.
52 table_identifier: String,
53 },
54 /// Configuration for MongoDB listener.
55 MongoDB {
56 /// Database connection URL.
57 url: String,
58 /// Database name.
59 database: String,
60 /// Collection name to monitor.
61 collection: String,
62 },
63 /// Configuration for MySQL listener (planned feature).
64 MySQL {
65 /// Database connection URL.
66 url: String,
67 /// Table name to monitor.
68 table_name: String,
69 /// List of columns to track for updates.
70 columns: Vec<String>,
71 /// Unique identifier for the table.
72 table_identifier: String,
73 },
74}
75/// Core struct that manages database listeners.
76pub struct DBListener {
77 /// Generic trait object representing a database listener.
78 pub listener: Arc<Box<dyn DBListenerTrait + Send + Sync>>,
79}
80
81impl DBListener {
82 /// Creates a new database listener instance based on the provided configuration.
83 ///
84 /// # Arguments
85 /// * `config` - The `DBConfig` specifying the database type and connection details.
86 /// * `events` - A vector of `EventType` values indicating which events to track.
87 ///
88 /// # Returns
89 /// A `Result` containing a `DBListener` instance or an error.
90 pub async fn new(config: DBConfig, events: Vec<EventType>) -> Result<Self, DBListenerError> {
91 match &config {
92 DBConfig::Postgres {
93 url,
94 table_name,
95 columns,
96 table_identifier,
97 } => {
98 let postgres_table_listener = PostgresTableListener::new(
99 url,
100 &table_name,
101 columns.clone(),
102 &table_identifier,
103 events,
104 )
105 .await?;
106
107 Ok(DBListener {
108 listener: Arc::new(Box::new(postgres_table_listener)),
109 })
110 // Box::new(postgres_table_listener)
111 }
112 DBConfig::MongoDB {
113 url,
114 database,
115 collection,
116 } => {
117 let mongo_document_listener =
118 MongoDocumentListener::new(url, database, collection, events).await?;
119
120 Ok(DBListener {
121 listener: Arc::new(Box::new(mongo_document_listener)),
122 })
123 }
124 _ => unimplemented!(),
125 }
126 }
127
128 /// Starts listening for database changes and executes a callback function when an event occurs.
129 ///
130 /// # Arguments
131 /// * `callback` - A function to be called when a database event is received.
132 ///
133 /// # Returns
134 /// A `Result` indicating success or failure.
135 ///
136 pub async fn listen<F>(&self, callback: F) -> Result<(), DBListenerError>
137 where
138 F: Fn(Value) + Send + Sync + 'static,
139 {
140 let rx = self.listener.start().await?;
141
142 let (rx, handle) = rx;
143
144 tokio::spawn(async move {
145 let mut rx = rx.lock().await;
146
147 while let Some(payload) = rx.recv().await {
148 callback(payload); // Pass the received event payload to the callback function
149 }
150 });
151
152 if let Err(err) = signal::ctrl_c().await {
153 return Err(DBListenerError::Other(format!(
154 "Graceful shutdown error: {:#?}",
155 err
156 )));
157 }
158
159 self.listener.stop().await?;
160 handle.abort();
161 Ok(())
162 }
163}
164
165/// Trait defining a generic database listener.
166#[async_trait]
167pub trait DBListenerTrait {
168 /// Starts the listener and returns a receiver channel for event notifications.
169 ///
170 /// # Returns
171 /// A `Result` containing a tuple of an `Arc<Mutex<Receiver<Value>>>` and a `JoinHandle<()>`,
172 /// or an error if the listener fails to start.
173 async fn start(&self)
174 -> Result<(Arc<Mutex<Receiver<Value>>>, JoinHandle<()>), DBListenerError>;
175
176 /// Stops the database listener gracefully.
177 ///
178 /// # Returns
179 /// A `Result` indicating success or failure.
180 async fn stop(&self) -> Result<(), DBListenerError>;
181}