// newton-core 0.4.16

// newton protocol core sdk
//! Database connection management with deadpool-postgres
//!
//! This module provides a robust database connection pool manager using deadpool-postgres
//! for efficient connection pooling, automatic reconnection, and tight tokio integration.

/// API key management and authentication
pub mod api_keys;
/// Encrypted data references for privacy-preserving policy evaluation, identity data, and secrets
pub mod encrypted_data_refs;
/// Permission definitions and enforcement
pub mod permissions;

use deadpool_postgres::{Config, ManagerConfig, Pool, RecyclingMethod, Runtime};
use once_cell::sync::OnceCell;
use sqlx::{
    postgres::{PgConnectOptions, PgPoolOptions, PgSslMode},
    PgPool,
};
use std::{str::FromStr, time::Duration};
use thiserror::Error;
use tracing::{info, warn};

// Re-export commonly used types
pub use api_keys::{ApiKeyRecord, ApiKeyRepository, ApiKeyUpdate};
pub use encrypted_data_refs::{DataType, EncryptedDataRefRecord, EncryptedDataRefRepository};
pub use permissions::ApiPermission;

/// Configuration for database connection pooling.
///
/// `Debug` is implemented by hand (instead of derived) so that the password embedded in
/// [`url`](Self::url) is masked as `***`; all other fields are printed unchanged. This
/// keeps credentials out of logs that format the configuration with `{:?}`.
#[derive(Clone)]
pub struct DatabaseConfig {
    /// Database connection URL (e.g., postgres://user:pass@host:port/db)
    pub url: String,
    /// Maximum number of connections in the pool.
    ///
    /// `DatabaseManager::new` applies this limit to both the deadpool and the SQLx pool.
    pub max_connections: u32,
    /// Minimum number of idle connections in the pool.
    ///
    /// `DatabaseManager::new` applies this to the SQLx pool only.
    pub min_connections: u32,
    /// Connection timeout duration.
    ///
    /// `DatabaseManager::new` passes this to the SQLx pool as its acquire timeout.
    pub connect_timeout: Duration,
}

impl std::fmt::Debug for DatabaseConfig {
    /// Formats every field like the derived implementation would, except that the
    /// password inside `url` is replaced by `***`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("DatabaseConfig")
            .field("url", &redact_url_password(&self.url))
            .field("max_connections", &self.max_connections)
            .field("min_connections", &self.min_connections)
            .field("connect_timeout", &self.connect_timeout)
            .finish()
    }
}

/// Returns `url` with the password component of its userinfo replaced by `***`.
///
/// Only the authority section (between `://` and the first `/`, `?` or `#`) is inspected,
/// so an `@` or `:` appearing later in the path or query never triggers masking. Strings
/// without a `scheme://user:password@` prefix are returned unchanged.
fn redact_url_password(url: &str) -> String {
    // Byte range of the password inside `url`, if the URL carries one. All delimiters are
    // ASCII, so the computed offsets always fall on character boundaries.
    let password_span = || -> Option<(usize, usize)> {
        let authority_start = url.find("://")? + "://".len();
        let tail = &url[authority_start..];
        let authority_len = tail
            .find(|c: char| matches!(c, '/' | '?' | '#'))
            .unwrap_or(tail.len());
        let authority = &tail[..authority_len];
        // The last `@` separates userinfo from the host; the first `:` inside the
        // userinfo separates user from password.
        let at = authority.rfind('@')?;
        let colon = authority[..at].find(':')?;
        Some((authority_start + colon + 1, authority_start + at))
    };

    match password_span() {
        Some((start, end)) => format!("{}***{}", &url[..start], &url[end..]),
        None => url.to_string(),
    }
}

impl Default for DatabaseConfig {
    /// Builds the default configuration: a `newton_gateway` database on
    /// `localhost:5432`, a pool of 5 to 20 connections, and a 30 second timeout.
    fn default() -> Self {
        let url = String::from("postgres://newton:newton@localhost:5432/newton_gateway");
        let connect_timeout = Duration::from_secs(30);

        Self {
            url,
            max_connections: 20,
            min_connections: 5,
            connect_timeout,
        }
    }
}

/// Errors that can occur during database operations
///
/// The tuple variants carry an already-formatted `String` description rather than the
/// underlying driver error. Within this module `PoolCreation`, `Connection` and `Query`
/// are produced by `DatabaseManager`, and `SingletonUrlMismatch` by
/// `initialize_database`; `Transaction` is not constructed in this module's own code
/// (presumably it is used by the repository submodules — confirm against callers).
#[derive(Error, Debug)]
pub enum DatabaseError {
    /// Failed to create the database connection pool
    #[error("Failed to create database pool: {0}")]
    PoolCreation(String),
    /// Failed to establish a database connection
    ///
    /// Also used when the connection URL cannot be parsed.
    #[error("Failed to connect to database: {0}")]
    Connection(String),
    /// Database query execution error
    #[error("Database query error: {0}")]
    Query(String),
    /// Database transaction error
    #[error("Database transaction error: {0}")]
    Transaction(String),
    /// A prior caller already initialized the singleton with a different URL.
    /// Indicates two components in the same process disagree on which DB to use —
    /// typically a test harness where operator and gateway build independent configs
    /// and one was not overridden. In production each binary owns its process and
    /// this never fires.
    ///
    /// Both fields are interpolated verbatim into the `Display` message, so whatever
    /// they contain (including any credentials) ends up in the rendered error.
    #[error(
        "Database singleton already initialized with a different URL. existing={existing}, requested={requested}. \
         Both callers in this process must agree on the connection URL."
    )]
    SingletonUrlMismatch {
        /// URL the singleton was first initialized with
        existing: String,
        /// URL the current caller is trying to initialize with
        requested: String,
    },
}

/// Database manager providing robust connection pooling with deadpool-postgres
///
/// This manager wraps both deadpool-postgres (for connection management) and sqlx
/// (for query execution), providing a unified interface for database operations.
///
/// The two pools are built separately from the same connection URL and are not
/// coordinated with each other; each has its own connection limit.
///
/// `Clone` is derived, so cloning copies the `url` string and clones both pool handles
/// (presumably the clones share the same underlying pools — confirm against the
/// deadpool and sqlx documentation). `Debug` is implemented by hand elsewhere in this
/// file and prints pool counters only.
#[derive(Clone)]
pub struct DatabaseManager {
    /// Connection URL the manager was initialized with. Used by `initialize_database`
    /// to detect singleton URL mismatches across in-process callers.
    url: String,
    /// Deadpool connection pool for robust connection management
    deadpool: Pool,
    /// SQLx connection pool for query execution
    sqlx_pool: PgPool,
}

impl DatabaseManager {
    /// Creates a new database manager with the given configuration.
    ///
    /// Two pools are built from `config.url`:
    ///
    /// * a deadpool-postgres pool, configured from the first host and port, the user,
    ///   password and database name parsed out of the URL, capped at
    ///   `config.max_connections`;
    /// * an SQLx pool bounded by `config.min_connections` / `config.max_connections`
    ///   with `config.connect_timeout` as its acquire timeout. `connect_with` is awaited
    ///   here, so a failure to connect surfaces from this function.
    ///
    /// # TLS
    ///
    /// An `sslmode` (or `ssl-mode`) query parameter in the URL is honoured for the SQLx
    /// pool. `PgSslMode::Prefer` is applied only when the URL does not specify a mode, so
    /// a URL asking for `require`/`verify-*` is no longer silently downgraded and
    /// `disable` is no longer ignored.
    ///
    /// # Arguments
    ///
    /// * `config` - Database configuration
    ///
    /// # Errors
    ///
    /// * [`DatabaseError::Connection`] if the URL cannot be parsed by either driver or
    ///   the SQLx pool fails to connect.
    /// * [`DatabaseError::PoolCreation`] if the deadpool pool cannot be created.
    pub async fn new(config: DatabaseConfig) -> Result<Self, DatabaseError> {
        info!(
            "Initializing database manager (max_connections: {}, min_connections: {})",
            config.max_connections, config.min_connections
        );

        // Parse connection URL for deadpool configuration
        let pg_config = config
            .url
            .parse::<deadpool_postgres::tokio_postgres::Config>()
            .map_err(|e| DatabaseError::Connection(format!("Invalid connection URL: {}", e)))?;

        // Configure deadpool manager
        let manager_config = ManagerConfig {
            recycling_method: RecyclingMethod::Fast,
        };

        // Create deadpool configuration.
        // Only the first host/port pair is carried over, and a non-TCP (e.g. Unix socket)
        // host is dropped, leaving `host` unset.
        let mut deadpool_config = Config::new();
        deadpool_config.host = pg_config.get_hosts().first().and_then(|h| match h {
            deadpool_postgres::tokio_postgres::config::Host::Tcp(host) => Some(host.clone()),
            _ => None,
        });
        deadpool_config.port = pg_config.get_ports().first().copied();
        deadpool_config.user = pg_config.get_user().map(String::from);
        // The password is raw bytes in tokio_postgres; invalid UTF-8 is replaced lossily.
        deadpool_config.password = pg_config
            .get_password()
            .map(|p: &[u8]| String::from_utf8_lossy(p).to_string());
        deadpool_config.dbname = pg_config.get_dbname().map(String::from);
        deadpool_config.manager = Some(manager_config);
        // NOTE(review): no deadpool timeouts are configured here, so `connect_timeout`
        // bounds the SQLx pool only.
        deadpool_config.pool = Some(deadpool_postgres::PoolConfig::new(config.max_connections as usize));

        // Create deadpool.
        // NOTE(review): this pool is created with `NoTls` and the URL's `sslmode` is not
        // copied into `deadpool_config`, so the TLS handling below covers SQLx only.
        let deadpool = deadpool_config
            .create_pool(Some(Runtime::Tokio1), deadpool_postgres::tokio_postgres::NoTls)
            .map_err(|e| DatabaseError::PoolCreation(format!("Failed to create deadpool: {}", e)))?;

        // Create SQLx pool for query execution
        let parsed_options = PgConnectOptions::from_str(&config.url)
            .map_err(|e| DatabaseError::Connection(format!("Invalid SQLx connection URL: {}", e)))?;

        // `from_str` has already applied any `sslmode` given in the URL. Forcing `Prefer`
        // on top of that would override an explicit, possibly stricter, choice, so the
        // default is applied only when the URL leaves the mode unspecified.
        let pg_connect_options = if Self::url_sets_ssl_mode(&config.url) {
            parsed_options
        } else {
            parsed_options.ssl_mode(PgSslMode::Prefer)
        };

        let sqlx_pool = PgPoolOptions::new()
            .max_connections(config.max_connections)
            .min_connections(config.min_connections)
            .acquire_timeout(config.connect_timeout)
            .connect_with(pg_connect_options)
            .await
            .map_err(|e| DatabaseError::Connection(format!("Failed to connect with SQLx: {}", e)))?;

        info!("Database manager initialized successfully");

        Ok(Self {
            url: config.url,
            deadpool,
            sqlx_pool,
        })
    }

    /// Returns the connection URL this manager was initialized with.
    ///
    /// The string is returned exactly as supplied, including any embedded credentials.
    pub fn url(&self) -> &str {
        &self.url
    }

    /// Gets a connection from the deadpool
    ///
    /// This is useful for operations that need direct access to tokio_postgres
    ///
    /// # Errors
    ///
    /// Returns [`DatabaseError::Connection`] if the pool cannot hand out a client.
    pub async fn get_connection(&self) -> Result<deadpool_postgres::Client, DatabaseError> {
        self.deadpool
            .get()
            .await
            .map_err(|e| DatabaseError::Connection(format!("Failed to get connection: {}", e)))
    }

    /// Gets the SQLx pool for query execution
    ///
    /// This is the primary interface for executing queries using sqlx
    pub fn pool(&self) -> &PgPool {
        &self.sqlx_pool
    }

    /// Gets the deadpool for advanced connection management
    pub fn deadpool(&self) -> &Pool {
        &self.deadpool
    }

    /// Returns the current pool size statistics
    ///
    /// All three values are read from the SQLx pool; the deadpool pool is not reflected.
    pub fn pool_stats(&self) -> PoolStats {
        PoolStats {
            size: self.sqlx_pool.size(),
            idle: self.sqlx_pool.num_idle(),
            max_size: self.sqlx_pool.options().get_max_connections(),
        }
    }

    /// Performs a health check by executing a simple query
    ///
    /// Runs `SELECT 1` on the SQLx pool; the deadpool pool is not probed.
    ///
    /// # Errors
    ///
    /// Returns [`DatabaseError::Query`] if the query fails.
    pub async fn health_check(&self) -> Result<(), DatabaseError> {
        sqlx::query("SELECT 1")
            .execute(&self.sqlx_pool)
            .await
            .map_err(|e| DatabaseError::Query(format!("Health check failed: {}", e)))?;
        Ok(())
    }

    /// Reports whether the query string of `url` contains an `sslmode` or `ssl-mode` key.
    ///
    /// Only the part between the first `?` and an optional `#` fragment is examined, and
    /// keys are compared exactly, so the same text inside the path or inside a parameter
    /// value does not count.
    fn url_sets_ssl_mode(url: &str) -> bool {
        let query = match url.split_once('?') {
            Some((_, rest)) => rest.split('#').next().unwrap_or(rest),
            None => return false,
        };

        query
            .split('&')
            .filter_map(|pair| pair.split('=').next())
            .any(|key| key == "sslmode" || key == "ssl-mode")
    }
}

/// Statistics about the database connection pool
///
/// A point-in-time snapshot returned by `DatabaseManager::pool_stats`, which fills every
/// field from the SQLx pool.
#[derive(Debug, Clone, Copy)]
pub struct PoolStats {
    /// Current number of connections in the pool
    pub size: u32,
    /// Number of idle connections
    pub idle: usize,
    /// Maximum pool size
    pub max_size: u32,
}

impl std::fmt::Debug for DatabaseManager {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let stats = self.pool_stats();
        f.debug_struct("DatabaseManager")
            .field("pool_size", &stats.size)
            .field("idle_connections", &stats.idle)
            .field("max_connections", &stats.max_size)
            .finish()
    }
}

/// Global database singleton instance
///
/// Populated by `initialize_database` through `set` and read by `get_database` and
/// `try_get_database`. Nothing in this file replaces or clears the value once it is set.
static DATABASE: OnceCell<DatabaseManager> = OnceCell::new();

/// Initialize the global database singleton
///
/// Intended to be called at application startup. Calling it again is allowed: if the
/// singleton already exists and was created with the same URL, the call is a no-op that
/// returns `Ok(())`. Only a call that asks for a *different* URL is rejected.
///
/// URLs are compared as raw strings. In log output and in the returned
/// [`DatabaseError::SingletonUrlMismatch`] the password part of each URL is replaced by
/// `***`, so two URLs that differ only in their password render identically there.
///
/// # Arguments
///
/// * `config` - Database configuration
///
/// # Errors
///
/// Returns an error if:
/// - The database connection cannot be established (any error from `DatabaseManager::new`)
/// - The singleton has already been initialized with a different URL
///   ([`DatabaseError::SingletonUrlMismatch`])
///
/// # Example
///
/// ```no_run
/// use newton_core::database::{initialize_database, DatabaseConfig};
///
/// #[tokio::main]
/// async fn main() {
///     let config = DatabaseConfig::default();
///     initialize_database(config)
///         .await
///         .expect("Failed to initialize database");
/// }
/// ```
pub async fn initialize_database(config: DatabaseConfig) -> Result<(), DatabaseError> {
    /// Returns `url` with the password component of its userinfo replaced by `***`.
    ///
    /// Only the authority section (between `://` and the first `/`, `?` or `#`) is
    /// inspected; strings without a `user:password@` prefix are returned unchanged.
    fn mask_password(url: &str) -> String {
        // Byte range of the password inside `url`, if any. All delimiters are ASCII, so
        // the offsets always fall on character boundaries.
        let password_span = || -> Option<(usize, usize)> {
            let authority_start = url.find("://")? + "://".len();
            let tail = &url[authority_start..];
            let authority_len = tail
                .find(|c: char| matches!(c, '/' | '?' | '#'))
                .unwrap_or(tail.len());
            let authority = &tail[..authority_len];
            let at = authority.rfind('@')?;
            let colon = authority[..at].find(':')?;
            Some((authority_start + colon + 1, authority_start + at))
        };

        match password_span() {
            Some((start, end)) => format!("{}***{}", &url[..start], &url[end..]),
            None => url.to_string(),
        }
    }

    // Fast path: singleton already populated. Validate URL agreement BEFORE opening a
    // second connection pool — saves the cost of the wasted handshake and, more
    // importantly, surfaces misconfiguration deterministically without requiring the
    // new URL to even be reachable.
    if let Some(existing) = DATABASE.get() {
        if existing.url() != config.url {
            // Credentials must not reach logs or error messages; mask them once and
            // reuse the masked form for both.
            let existing_url = mask_password(existing.url());
            let requested_url = mask_password(&config.url);
            warn!(
                existing_url = %existing_url,
                requested_url = %requested_url,
                "Database singleton URL mismatch — rejecting. In-process callers must agree on the DB URL."
            );
            return Err(DatabaseError::SingletonUrlMismatch {
                existing: existing_url,
                requested: requested_url,
            });
        }
        info!("Global database singleton already initialized with matching URL; reusing");
        return Ok(());
    }

    let manager = DatabaseManager::new(config).await?;
    match DATABASE.set(manager) {
        Ok(()) => {
            info!("Global database singleton initialized");
            Ok(())
        }
        Err(manager) => {
            // Race: another caller won between our `get()` above and `set()`.
            // Validate URL agreement against the winner; if they match, the
            // post-condition holds and we can treat this as success. The manager we
            // built is handed back by `set` and dropped when this arm ends.
            let existing = DATABASE.get().expect("cell was just populated");
            if existing.url() != manager.url() {
                let existing_url = mask_password(existing.url());
                let requested_url = mask_password(manager.url());
                warn!(
                    existing_url = %existing_url,
                    requested_url = %requested_url,
                    "Database singleton URL mismatch detected on race — rejecting."
                );
                return Err(DatabaseError::SingletonUrlMismatch {
                    existing: existing_url,
                    requested: requested_url,
                });
            }
            info!("Global database singleton raced; both callers agreed on URL");
            Ok(())
        }
    }
}

/// Get the global database singleton
///
/// # Panics
///
/// Panics if the database has not been initialized via `initialize_database()`
///
/// # Example
///
/// ```no_run
/// use newton_core::database::get_database;
///
/// let db = get_database();
/// let pool = db.pool();
/// ```
pub fn get_database() -> &'static DatabaseManager {
    DATABASE
        .get()
        .expect("Database not initialized. Call initialize_database() first.")
}

/// Try to get the global database singleton without panicking
///
/// Returns None if the database has not been initialized.
///
/// This is a plain read of the singleton cell: it never initializes the database and
/// never waits for an initialization that is still in progress elsewhere.
///
/// # Example
///
/// ```no_run
/// use newton_core::database::try_get_database;
///
/// if let Some(db) = try_get_database() {
///     let pool = db.pool();
///     // use pool
/// }
/// ```
pub fn try_get_database() -> Option<&'static DatabaseManager> {
    DATABASE.get()
}