db-library 0.1.2

A Rust library for listening to database changes and notifying connected backend services.
Documentation
//! # Database Module
//!
//! This module provides an abstraction for listening to database changes across multiple database types.
//! It supports PostgreSQL, MongoDB, MySQL, and SQLite through a unified interface.
//!
//! ## Supported Databases
//! - **PostgreSQL**: Uses LISTEN/NOTIFY for real-time notifications.
//! - **MongoDB**: Uses Change Streams to track document updates.
//! - **MySQL**: Planned to support in future.
//! - **SQLite**: Planned support in future.

pub mod mongodb;
pub mod mysql;
pub mod postgres;
pub mod sqlite;

use crate::config::DBListenerError;
use async_trait::async_trait;
use mongodb::MongoDocumentListener;
use postgres::PostgresTableListener;
use serde_json::Value;
use std::sync::Arc;
use tokio::{
    signal,
    sync::{mpsc::Receiver, Mutex},
    task::JoinHandle,
};

/// Represents the type of database events that can be monitored.
#[derive(Debug, Clone, PartialEq)]
pub enum EventType {
    /// Triggered when a new row/document is inserted.
    INSERT,
    /// Triggered when an existing row/document is updated.
    UPDATE,
    /// Triggered when a row/document is deleted.
    DELETE,
}

/// Configuration options for different database types.
#[derive(Debug, Clone)]
pub enum DBConfig {
    /// Configuration for PostgreSQL listener.
    Postgres {
        /// Database connection URL.
        url: String,
        /// Table name to monitor.
        table_name: String,
        /// List of columns to track for updates.
        columns: Vec<String>,
        /// Unique identifier for the table.
        table_identifier: String,
    },
    /// Configuration for MongoDB listener.
    MongoDB {
        /// Database connection URL.
        url: String,
        /// Database name.
        database: String,
        /// Collection name to monitor.
        collection: String,
    },
    /// Configuration for MySQL listener (planned feature).
    MySQL {
        /// Database connection URL.
        url: String,
        /// Table name to monitor.
        table_name: String,
        /// List of columns to track for updates.
        columns: Vec<String>,
        /// Unique identifier for the table.
        table_identifier: String,
    },
}
/// Core struct that manages database listeners.
pub struct DBListener {
    /// Generic trait object representing a database listener.
    pub listener: Arc<Box<dyn DBListenerTrait + Send + Sync>>,
}

impl DBListener {
    /// Creates a new database listener instance based on the provided configuration.
    ///
    /// # Arguments
    /// * `config` - The `DBConfig` specifying the database type and connection details.
    /// * `events` - A vector of `EventType` values indicating which events to track.
    ///
    /// # Returns
    /// A `Result` containing a `DBListener` instance or an error.
    pub async fn new(config: DBConfig, events: Vec<EventType>) -> Result<Self, DBListenerError> {
        match &config {
            DBConfig::Postgres {
                url,
                table_name,
                columns,
                table_identifier,
            } => {
                let postgres_table_listener = PostgresTableListener::new(
                    url,
                    &table_name,
                    columns.clone(),
                    &table_identifier,
                    events,
                )
                .await?;

                Ok(DBListener {
                    listener: Arc::new(Box::new(postgres_table_listener)),
                })
                // Box::new(postgres_table_listener)
            }
            DBConfig::MongoDB {
                url,
                database,
                collection,
            } => {
                let mongo_document_listener =
                    MongoDocumentListener::new(url, database, collection, events).await?;

                Ok(DBListener {
                    listener: Arc::new(Box::new(mongo_document_listener)),
                })
            }
            _ => unimplemented!(),
        }
    }

    /// Starts listening for database changes and executes a callback function when an event occurs.
    ///
    /// # Arguments
    /// * `callback` - A function to be called when a database event is received.
    ///
    /// # Returns
    /// A `Result` indicating success or failure.
    ///
    pub async fn listen<F>(&self, callback: F) -> Result<(), DBListenerError>
    where
        F: Fn(Value) + Send + Sync + 'static,
    {
        let rx = self.listener.start().await?;

        let (rx, handle) = rx;

        tokio::spawn(async move {
            let mut rx = rx.lock().await;

            while let Some(payload) = rx.recv().await {
                callback(payload); // Pass the received event payload to the callback function
            }
        });

        if let Err(err) = signal::ctrl_c().await {
            return Err(DBListenerError::Other(format!(
                "Graceful shutdown error: {:#?}",
                err
            )));
        }

        self.listener.stop().await?;
        handle.abort();
        Ok(())
    }
}

/// Trait defining a generic database listener.
#[async_trait]
pub trait DBListenerTrait {
    /// Starts the listener and returns a receiver channel for event notifications.
    ///
    /// # Returns
    /// A `Result` containing a tuple of an `Arc<Mutex<Receiver<Value>>>` and a `JoinHandle<()>`,
    /// or an error if the listener fails to start.
    async fn start(&self)
        -> Result<(Arc<Mutex<Receiver<Value>>>, JoinHandle<()>), DBListenerError>;

    /// Stops the database listener gracefully.
    ///
    /// # Returns
    /// A `Result` indicating success or failure.
    async fn stop(&self) -> Result<(), DBListenerError>;
}