newton-core 0.4.17

newton protocol core sdk
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
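//! Database connection management with deadpool-postgres and SQLx
//!
//! This module provides a robust database connection pool manager using deadpool-postgres
//! for efficient connection pooling, automatic reconnection, and tight tokio integration.
//! The same manager also owns an SQLx pool used for query execution, and the module
//! exposes a process-wide singleton so in-process components can share one manager.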

/// API key management and authentication
pub mod api_keys;
/// Encrypted data references for privacy-preserving policy evaluation, identity data, and secrets
pub mod encrypted_data_refs;
/// Permission definitions and enforcement
pub mod permissions;

use deadpool_postgres::{Config, ManagerConfig, Pool, RecyclingMethod, Runtime};
use once_cell::sync::OnceCell;
use sqlx::{
    postgres::{PgConnectOptions, PgPoolOptions, PgSslMode},
    PgPool,
};
use std::{str::FromStr, time::Duration};
use thiserror::Error;
use tracing::{info, warn};

// Re-export commonly used types
pub use api_keys::{redact_bearer, ApiKeyRecord, ApiKeyRepository, ApiKeyUpdate};
pub use encrypted_data_refs::{DataType, EncryptedDataRefRecord, EncryptedDataRefRepository};
pub use permissions::ApiPermission;

/// Transport-security policy applied to database connections.
///
/// The default is [`SslMode::Require`].
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum SslMode {
    /// TLS is mandatory; connecting fails when the server doesn't support it.
    #[default]
    Require,
    /// TLS is attempted first, with a cleartext fallback (for test containers without certs).
    Prefer,
}

/// Configuration for database connection pooling.
///
/// `Debug` is implemented by hand (rather than derived) so that a password embedded in
/// [`DatabaseConfig::url`] is masked whenever the configuration is formatted, e.g. in
/// log lines or panic messages. The stored `url` itself is never modified.
#[derive(Clone)]
pub struct DatabaseConfig {
    /// Database connection URL (e.g., postgres://user:pass@host:port/db)
    pub url: String,
    /// Maximum number of connections in the pool
    pub max_connections: u32,
    /// Minimum number of idle connections in the pool
    pub min_connections: u32,
    /// Connection timeout duration
    pub connect_timeout: Duration,
    /// TLS mode for the connection
    pub ssl_mode: SslMode,
}

impl DatabaseConfig {
    /// Returns a copy of `url` that is safe to print: the password in the
    /// `user:password@` part and the value of any `password=` query parameter are
    /// replaced with `***`.
    ///
    /// The userinfo section is taken to end at the last `@` before the query string,
    /// which errs on the side of masking too much rather than too little.
    fn redacted_url(&self) -> String {
        const MASK: &str = "***";

        let (base, query) = match self.url.split_once('?') {
            Some((base, query)) => (base, Some(query)),
            None => (self.url.as_str(), None),
        };

        let mut out = String::with_capacity(self.url.len());

        // Locate `scheme://user:password@...` and mask only the password part.
        match (base.find("://"), base.rfind('@')) {
            (Some(scheme_end), Some(at)) if at >= scheme_end + 3 => {
                let userinfo_start = scheme_end + 3;
                match base[userinfo_start..at].split_once(':') {
                    Some((user, _password)) => {
                        out.push_str(&base[..userinfo_start]);
                        out.push_str(user);
                        out.push(':');
                        out.push_str(MASK);
                        out.push_str(&base[at..]);
                    }
                    // `user@host` without a password: nothing to hide.
                    None => out.push_str(base),
                }
            }
            _ => out.push_str(base),
        }

        // Connection strings may also carry the password as a query parameter.
        if let Some(query) = query {
            for (idx, pair) in query.split('&').enumerate() {
                out.push(if idx == 0 { '?' } else { '&' });
                match pair.split_once('=') {
                    Some((key, _)) if key.eq_ignore_ascii_case("password") => {
                        out.push_str(key);
                        out.push('=');
                        out.push_str(MASK);
                    }
                    _ => out.push_str(pair),
                }
            }
        }

        out
    }
}

impl std::fmt::Debug for DatabaseConfig {
    /// Formats like the derived `Debug` would, except that `url` is shown with its
    /// password masked.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("DatabaseConfig")
            .field("url", &self.redacted_url())
            .field("max_connections", &self.max_connections)
            .field("min_connections", &self.min_connections)
            .field("connect_timeout", &self.connect_timeout)
            .field("ssl_mode", &self.ssl_mode)
            .finish()
    }
}

impl Default for DatabaseConfig {
    fn default() -> Self {
        Self {
            url: "postgres://newton:newton@localhost:5432/newton_gateway".to_string(),
            max_connections: 20,
            min_connections: 5,
            connect_timeout: Duration::from_secs(30),
            ssl_mode: SslMode::default(),
        }
    }
}

/// Errors that can occur during database operations.
///
/// Every variant except [`DatabaseError::SingletonUrlMismatch`] carries a single
/// human-readable message string that is appended to the variant's fixed prefix when
/// the error is displayed.
#[derive(Error, Debug)]
pub enum DatabaseError {
    /// Failed to create the database connection pool
    #[error("Failed to create database pool: {0}")]
    PoolCreation(String),
    /// Failed to establish a database connection (also used for invalid or rejected
    /// connection URLs)
    #[error("Failed to connect to database: {0}")]
    Connection(String),
    /// Database query execution error
    #[error("Database query error: {0}")]
    Query(String),
    /// Database transaction error
    #[error("Database transaction error: {0}")]
    Transaction(String),
    /// A prior caller already initialized the singleton with a different URL.
    /// Indicates two components in the same process disagree on which DB to use —
    /// typically a test harness where operator and gateway build independent configs
    /// and one was not overridden. In production each binary owns its process and
    /// this never fires.
    ///
    /// Both fields are interpolated into the displayed message, so whatever is stored
    /// here shows up wherever the error is printed or logged.
    #[error(
        "Database singleton already initialized with a different URL. existing={existing}, requested={requested}. \
         Both callers in this process must agree on the connection URL."
    )]
    SingletonUrlMismatch {
        /// URL the singleton was first initialized with
        existing: String,
        /// URL the current caller is trying to initialize with
        requested: String,
    },
}

/// Database manager providing robust connection pooling with deadpool-postgres
///
/// This manager wraps both deadpool-postgres (for connection management) and sqlx
/// (for query execution), providing a unified interface for database operations.
///
/// The two pools are independent of each other; both are built from the same
/// connection URL, which is retained in `url`.
#[derive(Clone)]
pub struct DatabaseManager {
    /// Connection URL the manager was initialized with. Used by `initialize_database`
    /// to detect singleton URL mismatches across in-process callers.
    url: String,
    /// Deadpool connection pool for robust connection management
    /// (handed out via `get_connection` / `deadpool`)
    deadpool: Pool,
    /// SQLx connection pool for query execution
    /// (exposed via `pool`; also the source of `pool_stats` and `health_check`)
    sqlx_pool: PgPool,
}

impl DatabaseManager {
    /// Creates a new database manager with the given configuration
    ///
    /// Builds a deadpool-postgres pool and an SQLx pool from `config.url`. The
    /// deadpool pool is constructed synchronously; the SQLx pool is connected with
    /// `connect_with(..).await` before this function returns.
    ///
    /// # Arguments
    ///
    /// * `config` - Database configuration
    ///
    /// # Errors
    ///
    /// * [`DatabaseError::Connection`] if `config.ssl_mode` is [`SslMode::Require`] and
    ///   the URL explicitly asks for a weaker `sslmode`, if the URL cannot be parsed,
    ///   or if the SQLx pool fails to connect.
    /// * [`DatabaseError::PoolCreation`] if the deadpool pool cannot be built.
    pub async fn new(config: DatabaseConfig) -> Result<Self, DatabaseError> {
        info!(
            "Initializing database manager (max_connections: {}, min_connections: {})",
            config.max_connections, config.min_connections
        );

        // When configured for Require, reject URLs that explicitly downgrade TLS.
        // This runs first so the failure is deterministic and surfaces before any URL
        // parsing or pool construction.
        if config.ssl_mode == SslMode::Require {
            reject_downgraded_sslmode(&config.url)?;
        }

        // Parse connection URL for deadpool configuration
        let pg_config = config
            .url
            .parse::<deadpool_postgres::tokio_postgres::Config>()
            .map_err(|e| DatabaseError::Connection(format!("Invalid connection URL: {}", e)))?;

        // Configure deadpool manager
        let manager_config = ManagerConfig {
            recycling_method: RecyclingMethod::Fast,
        };

        // Create deadpool configuration. Only the first host/port of the parsed URL is
        // carried over, and non-TCP hosts (e.g. Unix sockets) leave `host` unset.
        let mut deadpool_config = Config::new();
        deadpool_config.host = pg_config.get_hosts().first().and_then(|h| match h {
            deadpool_postgres::tokio_postgres::config::Host::Tcp(host) => Some(host.clone()),
            _ => None,
        });
        deadpool_config.port = pg_config.get_ports().first().copied();
        deadpool_config.user = pg_config.get_user().map(String::from);
        // The parsed password is raw bytes; invalid UTF-8 is replaced lossily.
        deadpool_config.password = pg_config
            .get_password()
            .map(|p: &[u8]| String::from_utf8_lossy(p).into_owned());
        deadpool_config.dbname = pg_config.get_dbname().map(String::from);
        deadpool_config.manager = Some(manager_config);
        deadpool_config.pool = Some(deadpool_postgres::PoolConfig::new(config.max_connections as usize));

        // Create deadpool
        // NOTE(review): this pool is always built with `NoTls`, independent of
        // `config.ssl_mode`, so connections handed out by `get_connection` presumably do
        // not negotiate TLS even under `SslMode::Require` — confirm this is intended.
        let deadpool = deadpool_config
            .create_pool(Some(Runtime::Tokio1), deadpool_postgres::tokio_postgres::NoTls)
            .map_err(|e| DatabaseError::PoolCreation(format!("Failed to create deadpool: {}", e)))?;

        let pg_ssl_mode = match config.ssl_mode {
            SslMode::Require => PgSslMode::Require,
            SslMode::Prefer => PgSslMode::Prefer,
        };

        // `ssl_mode(..)` is applied after the URL is parsed, so the configured mode
        // replaces any `sslmode` carried by the URL.
        let pg_connect_options = PgConnectOptions::from_str(&config.url)
            .map_err(|e| DatabaseError::Connection(format!("Invalid SQLx connection URL: {}", e)))?
            .ssl_mode(pg_ssl_mode);
        info!("Database TLS: sslmode={:?}", config.ssl_mode);

        let sqlx_pool = PgPoolOptions::new()
            .max_connections(config.max_connections)
            .min_connections(config.min_connections)
            .acquire_timeout(config.connect_timeout)
            .connect_with(pg_connect_options)
            .await
            .map_err(|e| DatabaseError::Connection(format!("Failed to connect with SQLx: {}", e)))?;

        info!("Database manager initialized successfully");

        Ok(Self {
            url: config.url,
            deadpool,
            sqlx_pool,
        })
    }

    /// Returns the connection URL this manager was initialized with.
    ///
    /// This is the URL exactly as supplied in the configuration, so it may contain
    /// credentials; avoid logging it verbatim.
    pub fn url(&self) -> &str {
        &self.url
    }

    /// Gets a connection from the deadpool
    ///
    /// This is useful for operations that need direct access to tokio_postgres
    ///
    /// # Errors
    ///
    /// Returns [`DatabaseError::Connection`] if the pool cannot provide a connection.
    pub async fn get_connection(&self) -> Result<deadpool_postgres::Client, DatabaseError> {
        self.deadpool
            .get()
            .await
            .map_err(|e| DatabaseError::Connection(format!("Failed to get connection: {}", e)))
    }

    /// Gets the SQLx pool for query execution
    ///
    /// This is the primary interface for executing queries using sqlx
    pub fn pool(&self) -> &PgPool {
        &self.sqlx_pool
    }

    /// Gets the deadpool for advanced connection management
    pub fn deadpool(&self) -> &Pool {
        &self.deadpool
    }

    /// Returns the current pool size statistics
    ///
    /// All three figures are read from the SQLx pool; the deadpool pool is not
    /// reflected here.
    pub fn pool_stats(&self) -> PoolStats {
        PoolStats {
            size: self.sqlx_pool.size(),
            idle: self.sqlx_pool.num_idle(),
            max_size: self.sqlx_pool.options().get_max_connections(),
        }
    }

    /// Performs a health check by executing a simple query
    ///
    /// Runs `SELECT 1` on the SQLx pool.
    ///
    /// # Errors
    ///
    /// Returns [`DatabaseError::Query`] if the query fails.
    pub async fn health_check(&self) -> Result<(), DatabaseError> {
        sqlx::query("SELECT 1")
            .execute(&self.sqlx_pool)
            .await
            .map_err(|e| DatabaseError::Query(format!("Health check failed: {}", e)))?;
        Ok(())
    }
}

/// Fails when a database URL explicitly asks for a TLS-downgrading `sslmode`.
///
/// Only the query string (everything after the first `?`) is inspected. Parameter
/// names are compared case-insensitively and values are trimmed and lowercased before
/// being checked against `disable`, `allow` and `prefer`. A URL without an `sslmode`
/// parameter passes, because the caller overrides the mode to Require explicitly.
/// The URL is parsed properly by `PgConnectOptions` downstream; this check exists to
/// give a deterministic fail-fast.
///
/// # Errors
///
/// Returns [`DatabaseError::Connection`] naming the offending mode.
fn reject_downgraded_sslmode(url: &str) -> Result<(), DatabaseError> {
    const DOWNGRADES: [&str; 3] = ["disable", "allow", "prefer"];

    let query = url.split_once('?').map_or("", |(_, rest)| rest);

    // First `sslmode` parameter (if any) whose normalized value weakens TLS.
    let offending = query
        .split('&')
        .map(|param| param.split_once('=').unwrap_or((param, "")))
        .filter(|(key, _)| key.eq_ignore_ascii_case("sslmode"))
        .map(|(_, value)| value.trim().to_ascii_lowercase())
        .find(|candidate| DOWNGRADES.contains(&candidate.as_str()));

    match offending {
        Some(mode) => Err(DatabaseError::Connection(format!(
            "DATABASE_URL has sslmode={mode} which downgrades TLS; \
                 use 'require', 'verify-ca', or 'verify-full'"
        ))),
        None => Ok(()),
    }
}

#[cfg(test)]
mod sslmode_tests {
    use super::*;

    /// A URL with no query string carries no `sslmode` and must pass.
    #[test]
    fn accepts_no_sslmode() {
        assert!(reject_downgraded_sslmode("postgres://u:p@h/db").is_ok());
    }

    /// Modes that keep TLS mandatory pass, regardless of value casing or position.
    #[test]
    fn accepts_require() {
        let urls = [
            "postgres://u:p@h/db?sslmode=require",
            "postgres://u:p@h/db?sslmode=verify-full",
            "postgres://u:p@h/db?foo=bar&sslmode=Require",
        ];
        for url in urls {
            assert!(reject_downgraded_sslmode(url).is_ok(), "should accept {url}");
        }
    }

    /// Every downgrading mode is rejected, including a mixed-case spelling.
    #[test]
    fn rejects_downgrade() {
        ["disable", "allow", "prefer", "Prefer"].iter().for_each(|mode| {
            let url = format!("postgres://u:p@h/db?sslmode={mode}");
            let outcome = reject_downgraded_sslmode(&url);
            assert!(outcome.is_err(), "should reject {mode}");
        });
    }
}

/// Point-in-time snapshot of connection pool usage.
#[derive(Clone, Copy, Debug)]
pub struct PoolStats {
    /// Number of connections currently held by the pool
    pub size: u32,
    /// How many of those connections are idle
    pub idle: usize,
    /// Upper bound on the pool size
    pub max_size: u32,
}

impl std::fmt::Debug for DatabaseManager {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let stats = self.pool_stats();
        f.debug_struct("DatabaseManager")
            .field("pool_size", &stats.size)
            .field("idle_connections", &stats.idle)
            .field("max_connections", &stats.max_size)
            .finish()
    }
}

/// Global database singleton instance
///
/// Populated at most once by `initialize_database` and read through `get_database` /
/// `try_get_database`. The cell is never cleared in this module, so references handed
/// out are `'static`.
static DATABASE: OnceCell<DatabaseManager> = OnceCell::new();

/// Initialize the global database singleton
///
/// This should be called at application startup. The call is idempotent for a matching
/// URL: if the singleton is already populated and was created from the same connection
/// URL, the existing manager is reused and `Ok(())` is returned. A call with a
/// different URL is rejected.
///
/// # Arguments
///
/// * `config` - Database configuration
///
/// # Errors
///
/// Returns an error if:
/// - The database connection cannot be established
/// - The singleton has already been initialized with a different URL
///   ([`DatabaseError::SingletonUrlMismatch`]). The URLs reported in that error and in
///   the accompanying log line have their passwords masked, so two URLs that differ
///   only in password will look identical there.
///
/// # Example
///
/// ```no_run
/// use newton_core::database::{initialize_database, DatabaseConfig};
///
/// #[tokio::main]
/// async fn main() {
///     let config = DatabaseConfig::default();
///     initialize_database(config)
///         .await
///         .expect("Failed to initialize database");
/// }
/// ```
pub async fn initialize_database(config: DatabaseConfig) -> Result<(), DatabaseError> {
    // Fast path: singleton already populated. Validate URL agreement BEFORE opening a
    // second connection pool — saves the cost of the wasted handshake and, more
    // importantly, surfaces misconfiguration deterministically without requiring the
    // new URL to even be reachable.
    if let Some(existing) = DATABASE.get() {
        // The comparison uses the raw URLs; only the reported copies are masked.
        if existing.url() != config.url {
            let existing_url = redact_url_password(existing.url());
            let requested_url = redact_url_password(&config.url);
            warn!(
                existing_url = %existing_url,
                requested_url = %requested_url,
                "Database singleton URL mismatch — rejecting. In-process callers must agree on the DB URL."
            );
            return Err(DatabaseError::SingletonUrlMismatch {
                existing: existing_url,
                requested: requested_url,
            });
        }
        info!("Global database singleton already initialized with matching URL; reusing");
        return Ok(());
    }

    let manager = DatabaseManager::new(config).await?;
    match DATABASE.set(manager) {
        Ok(()) => {
            info!("Global database singleton initialized");
            Ok(())
        }
        Err(manager) => {
            // Race: another caller won between our `get()` above and `set()`.
            // Validate URL agreement against the winner; if they match, the
            // post-condition holds and we can treat this as success. The manager we
            // built is dropped when this arm ends.
            let existing = DATABASE.get().expect("cell was just populated");
            if existing.url() != manager.url() {
                let existing_url = redact_url_password(existing.url());
                let requested_url = redact_url_password(manager.url());
                warn!(
                    existing_url = %existing_url,
                    requested_url = %requested_url,
                    "Database singleton URL mismatch detected on race — rejecting."
                );
                return Err(DatabaseError::SingletonUrlMismatch {
                    existing: existing_url,
                    requested: requested_url,
                });
            }
            info!("Global database singleton raced; both callers agreed on URL");
            Ok(())
        }
    }
}

/// Returns a copy of `url` that is safe to log: the password in the `user:password@`
/// part and the value of any `password=` query parameter are replaced with `***`.
///
/// The userinfo section is taken to end at the last `@` before the query string,
/// which errs on the side of masking too much rather than too little. Strings that do
/// not look like URLs are returned unchanged.
fn redact_url_password(url: &str) -> String {
    const MASK: &str = "***";

    let (base, query) = match url.split_once('?') {
        Some((base, query)) => (base, Some(query)),
        None => (url, None),
    };

    let mut out = String::with_capacity(url.len());

    // Locate `scheme://user:password@...` and mask only the password part.
    match (base.find("://"), base.rfind('@')) {
        (Some(scheme_end), Some(at)) if at >= scheme_end + 3 => {
            let userinfo_start = scheme_end + 3;
            match base[userinfo_start..at].split_once(':') {
                Some((user, _password)) => {
                    out.push_str(&base[..userinfo_start]);
                    out.push_str(user);
                    out.push(':');
                    out.push_str(MASK);
                    out.push_str(&base[at..]);
                }
                // `user@host` without a password: nothing to hide.
                None => out.push_str(base),
            }
        }
        _ => out.push_str(base),
    }

    // Connection strings may also carry the password as a query parameter.
    if let Some(query) = query {
        for (idx, pair) in query.split('&').enumerate() {
            out.push(if idx == 0 { '?' } else { '&' });
            match pair.split_once('=') {
                Some((key, _)) if key.eq_ignore_ascii_case("password") => {
                    out.push_str(key);
                    out.push('=');
                    out.push_str(MASK);
                }
                _ => out.push_str(pair),
            }
        }
    }

    out
}

/// Get the global database singleton
///
/// # Panics
///
/// Panics if the database has not been initialized via `initialize_database()`
///
/// # Example
///
/// ```no_run
/// use newton_core::database::get_database;
///
/// let db = get_database();
/// let pool = db.pool();
/// ```
pub fn get_database() -> &'static DatabaseManager {
    DATABASE
        .get()
        .expect("Database not initialized. Call initialize_database() first.")
}

/// Try to get the global database singleton without panicking
///
/// Returns None if the database has not been initialized.
///
/// # Example
///
/// ```no_run
/// use newton_core::database::try_get_database;
///
/// if let Some(db) = try_get_database() {
///     let pool = db.pool();
///     // use pool
/// }
/// ```
pub fn try_get_database() -> Option<&'static DatabaseManager> {
    DATABASE.get()
}