pg_ripple_http 0.99.0

SPARQL 1.1 Protocol HTTP endpoint for pg_ripple — connects PostgreSQL 18 RDF triple store to the web
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
//! pg_ripple_http — SPARQL 1.1 Protocol HTTP endpoint and Datalog REST API
//! for pg_ripple.
//!
//! Standalone Rust binary that connects to PostgreSQL (with pg_ripple installed)
//! and exposes a W3C-compliant SPARQL HTTP endpoint at `/sparql` plus a full
//! Datalog REST API at `/datalog`.

use std::net::SocketAddr;
use std::sync::Arc;

use axum::http::HeaderValue;
use deadpool_postgres::{Config, Runtime};
use tokio_postgres::NoTls;
use tower_governor::GovernorLayer;
use tower_governor::governor::GovernorConfigBuilder;
use tower_http::cors::{AllowOrigin, CorsLayer};

pub mod arrow_encode;
pub mod common;
pub mod datalog;
pub mod metrics;
pub mod routing;
pub mod spi_bridge;
pub mod stream;

use common::{AppState, env_or};

// ─── Compatibility constants (COMPAT-01, v0.71.0) ────────────────────────────

/// Minimum pg_ripple extension version that this HTTP companion supports.
/// Updated each release to match the previous extension version, allowing
/// a one-version trailing window.
///
/// HTTP-COMPAT-01 (v0.89.0): bumped to 0.88.0 — requires all v0.84–v0.88 features.
///
/// Connections to older extension versions log a prominent warning. The extension
/// is still served (degraded mode) so that rolling upgrades do not hard-fail.
/// Set `PG_RIPPLE_HTTP_STRICT_COMPAT=1` to convert the warning to a fatal startup error.
const COMPATIBLE_EXTENSION_MIN: &str = "0.93.0";

/// Check that the installed pg_ripple extension version is within the known-compatible
/// range for this pg_ripple_http build.  Logs a warning if it is not.
///
/// S13-05 (v0.84.0): When `PG_RIPPLE_HTTP_STRICT_COMPAT=1` is set, a version
/// mismatch causes an immediate `process::exit(1)` instead of a warning.
/// Default is off (backward-compatible degraded-mode behaviour).
async fn check_extension_compatibility(client: &deadpool_postgres::Object) {
    if std::env::var("PG_RIPPLE_HTTP_SKIP_COMPAT_CHECK")
        .map(|v| v == "1" || v.eq_ignore_ascii_case("true"))
        .unwrap_or(false)
    {
        tracing::debug!(
            "PG_RIPPLE_HTTP_SKIP_COMPAT_CHECK=1: skipping extension compatibility check"
        );
        return;
    }

    let strict = std::env::var("PG_RIPPLE_HTTP_STRICT_COMPAT")
        .map(|v| v == "1" || v.eq_ignore_ascii_case("true"))
        .unwrap_or(false);

    let ext_version = match client
        .query_opt(
            "SELECT extversion FROM pg_extension WHERE extname = 'pg_ripple'",
            &[],
        )
        .await
    {
        Ok(Some(row)) => row.get::<_, String>(0),
        Ok(None) => {
            tracing::warn!(
                "pg_ripple extension not found in pg_extension catalog — \
                 compatibility check skipped"
            );
            return;
        }
        Err(e) => {
            tracing::warn!("could not query pg_ripple extension version: {e}");
            return;
        }
    };

    tracing::info!(
        ext_version = %ext_version,
        min_supported = %COMPATIBLE_EXTENSION_MIN,
        "pg_ripple extension compatibility check"
    );

    if semver_lt(&ext_version, COMPATIBLE_EXTENSION_MIN) {
        if strict {
            tracing::error!(
                ext_version = %ext_version,
                min_supported = %COMPATIBLE_EXTENSION_MIN,
                "PG_RIPPLE_HTTP_STRICT_COMPAT=1: extension version is below minimum — aborting"
            );
            std::process::exit(1);
        }
        tracing::warn!(
            ext_version = %ext_version,
            min_supported = %COMPATIBLE_EXTENSION_MIN,
            "pg_ripple extension version is below the minimum supported by this pg_ripple_http \
             build — some features may not work correctly. \
             Upgrade the extension with: ALTER EXTENSION pg_ripple UPDATE; \
             or set PG_RIPPLE_HTTP_SKIP_COMPAT_CHECK=1 to suppress this warning. \
             Set PG_RIPPLE_HTTP_STRICT_COMPAT=1 to make this a fatal startup error."
        );
    }
}

/// Returns `true` when `version` < `min` using simple major.minor.patch comparison.
/// Falls back to `false` (no warning) if either string cannot be parsed.
fn semver_lt(version: &str, min: &str) -> bool {
    parse_semver(version)
        .zip(parse_semver(min))
        .map(|(v, m)| v < m)
        .unwrap_or(false)
}

/// Parse a "major.minor.patch" string into a comparable tuple.
fn parse_semver(s: &str) -> Option<(u32, u32, u32)> {
    let mut parts = s.splitn(3, '.');
    let major = parts.next()?.parse::<u32>().ok()?;
    let minor = parts.next()?.parse::<u32>().ok()?;
    let patch = parts.next()?.split('-').next()?.parse::<u32>().ok()?;
    Some((major, minor, patch))
}

// ─── Main ────────────────────────────────────────────────────────────────────

#[tokio::main]
async fn main() {
    // O13-04 (v0.86.0): respect RUST_LOG_FORMAT=json for structured log output.
    let log_format = std::env::var("RUST_LOG_FORMAT").unwrap_or_default();
    let env_filter = tracing_subscriber::EnvFilter::try_from_default_env().unwrap_or_else(|_| {
        "pg_ripple_http=info".parse().unwrap_or_else(|e| {
            eprintln!("error parsing log filter: {e}");
            std::process::exit(1);
        })
    });
    if log_format.eq_ignore_ascii_case("json") {
        tracing_subscriber::fmt()
            .json()
            .with_env_filter(env_filter)
            .init();
    } else {
        tracing_subscriber::fmt().with_env_filter(env_filter).init();
    }

    // Accept database URL from command-line argument (first positional arg) or environment variable
    let pg_url = {
        let args: Vec<String> = std::env::args().collect();
        if args.len() > 1 {
            args[1].clone()
        } else {
            env_or("PG_RIPPLE_HTTP_PG_URL", "postgresql://localhost/postgres")
        }
    };
    let port: u16 = match env_or("PG_RIPPLE_HTTP_PORT", "7878").parse() {
        Ok(p) => p,
        Err(e) => {
            tracing::error!("PG_RIPPLE_HTTP_PORT must be a valid port number: {e}");
            std::process::exit(1);
        }
    };
    let pool_size: usize = match env_or("PG_RIPPLE_HTTP_POOL_SIZE", "16").parse() {
        Ok(n) => n,
        Err(e) => {
            tracing::error!("PG_RIPPLE_HTTP_POOL_SIZE must be a positive integer: {e}");
            std::process::exit(1);
        }
    };
    let auth_token = std::env::var("PG_RIPPLE_HTTP_AUTH_TOKEN").ok();
    let datalog_write_token = std::env::var("PG_RIPPLE_HTTP_DATALOG_WRITE_TOKEN").ok();
    let rate_limit: u32 = match env_or("PG_RIPPLE_HTTP_RATE_LIMIT", "100").parse() {
        Ok(r) => r,
        Err(e) => {
            tracing::error!("PG_RIPPLE_HTTP_RATE_LIMIT must be a non-negative integer: {e}");
            std::process::exit(1);
        }
    };
    // CORS origins — empty string means no cross-origin access; "*" requires explicit opt-in.
    let cors_origins = env_or("PG_RIPPLE_HTTP_CORS_ORIGINS", "");
    // Body limit — default 10 MiB.
    let max_body_bytes: usize = match env_or("PG_RIPPLE_HTTP_MAX_BODY_BYTES", "10485760").parse() {
        Ok(n) => n,
        Err(e) => {
            tracing::error!("PG_RIPPLE_HTTP_MAX_BODY_BYTES must be a positive integer: {e}");
            std::process::exit(1);
        }
    };
    // Trust proxy: comma-separated list of upstream IP/CIDR values trusted for X-Forwarded-For.
    let trust_proxy = std::env::var("PG_RIPPLE_HTTP_TRUST_PROXY").ok();

    // ── v0.46.0: CA-bundle for outbound TLS (PG_RIPPLE_HTTP_CA_BUNDLE) ───────
    // If set, load the PEM file at the given path as the trust anchor for all
    // outbound TLS connections (SERVICE federation, SPARQL endpoint queries).
    // Falls back to the system trust store on error; never silently ignores.
    if let Ok(ca_path) = std::env::var("PG_RIPPLE_HTTP_CA_BUNDLE") {
        match std::fs::read_to_string(&ca_path) {
            Ok(pem) if !pem.trim().is_empty() && pem.contains("BEGIN CERTIFICATE") => {
                tracing::info!("PG_RIPPLE_HTTP_CA_BUNDLE: loaded CA bundle from {ca_path}");
                // Store as a thread-local so outbound HTTP clients can access it.
                // Actual TLS configuration is applied when building reqwest clients
                // inside federation handlers.
                // SAFETY: called once during single-threaded startup before any
                // worker threads are spawned, so no concurrent reads of the env.
                unsafe { std::env::set_var("PG_RIPPLE_HTTP_CA_PEM", pem) };
            }
            Ok(_) => {
                tracing::error!(
                    "PG_RIPPLE_HTTP_CA_BUNDLE: file at '{ca_path}' is not a valid PEM bundle \
                     (no 'BEGIN CERTIFICATE' marker) — falling back to system trust store"
                );
            }
            Err(e) => {
                tracing::error!(
                    "PG_RIPPLE_HTTP_CA_BUNDLE: cannot read '{ca_path}': {e} \
                     — falling back to system trust store"
                );
            }
        }
    }

    // ── v0.51.0: TLS certificate-fingerprint pinning ─────────────────────────
    // PG_RIPPLE_HTTP_PIN_FINGERPRINTS: comma-separated SHA-256 hex fingerprints
    // of trusted TLS server certificates.  When set, any outbound TLS connection
    // (federation proxying, future /sparql/stream upstream calls) is rejected if
    // the peer certificate fingerprint is not in this list.  Stored in the env so
    // downstream client builders can pick it up without a separate config channel.
    if let Ok(fps) = std::env::var("PG_RIPPLE_HTTP_PIN_FINGERPRINTS") {
        let count = fps.split(',').filter(|s| !s.trim().is_empty()).count();
        if count == 0 {
            tracing::warn!(
                "PG_RIPPLE_HTTP_PIN_FINGERPRINTS is set but contains no valid fingerprints \
                 — pinning is disabled"
            );
        } else {
            tracing::info!(
                "PG_RIPPLE_HTTP_PIN_FINGERPRINTS: {count} pinned certificate fingerprint(s) loaded"
            );
        }
    }

    // Build connection pool.
    let mut cfg = Config::new();
    cfg.url = Some(pg_url.clone());
    cfg.pool = Some(deadpool_postgres::PoolConfig::new(pool_size));

    let pool = match cfg.create_pool(Some(Runtime::Tokio1), NoTls) {
        Ok(p) => p,
        Err(e) => {
            tracing::error!("failed to create PostgreSQL connection pool: {e}");
            std::process::exit(1);
        }
    };

    // Verify connectivity.
    {
        let client = match pool.get().await {
            Ok(c) => c,
            Err(e) => {
                tracing::error!(
                    "failed to connect to PostgreSQL — check PG_RIPPLE_HTTP_PG_URL: {e}"
                );
                std::process::exit(1);
            }
        };
        let row = match client
            .query_one("SELECT pg_ripple.triple_count()", &[])
            .await
        {
            Ok(r) => r,
            Err(e) => {
                tracing::error!("pg_ripple extension not available — is it installed? ({e})");
                std::process::exit(1);
            }
        };
        let count: i64 = row.get(0);
        tracing::info!(
            "connected to {pg_url} (port {port}), triple store contains {count} triples"
        );

        // COMPAT-01 (v0.71.0): verify that the installed pg_ripple extension is within
        // the compatible range for this pg_ripple_http build.
        check_extension_compatibility(&client).await;
    }

    // rate_limit is consumed by the governor layer below; not stored in AppState.
    // S13-03 (v0.86.0): compute cors_is_permissive before building AppState.
    let cors_is_permissive = cors_origins == "*";
    let state = Arc::new(AppState {
        pool,
        auth_token,
        datalog_write_token,
        trust_proxy,
        metrics: metrics::Metrics::new(),
        ever_connected: std::sync::atomic::AtomicBool::new(false),
        arrow_flight_secret: std::env::var("ARROW_FLIGHT_SECRET").ok(),
        // FLIGHT-SEC-01: unsigned tickets allowed only in dev mode.
        arrow_unsigned_tickets_allowed: std::env::var("ARROW_UNSIGNED_TICKETS_ALLOWED")
            .map(|v| v.eq_ignore_ascii_case("true") || v == "1")
            .unwrap_or(false),
        // FLIGHT-NONCE-01 (v0.72.0): nonce replay protection cache.
        arrow_nonce_cache: dashmap::DashMap::new(),
        arrow_nonce_cache_max: std::env::var("ARROW_NONCE_CACHE_MAX")
            .ok()
            .and_then(|v| v.parse::<usize>().ok())
            .unwrap_or(10_000),
        // S13-03 (v0.86.0): CORS permissive mode tracking.
        cors_is_permissive,
    });

    // CORS layer — wildcard "*" requires explicit opt-in; empty means deny all cross-origin.
    // S13-03 (v0.86.0): track whether permissive CORS is enabled so the metrics counter can
    // be incremented per-request by middleware. The state is passed into build_router.
    let cors = if cors_is_permissive {
        tracing::warn!(
            "CORS is permissive (*). Set PG_RIPPLE_HTTP_CORS_ORIGINS to a comma-separated list of allowed origins for production use. \
             Monitor pg_ripple_http_cors_permissive_requests_total for cross-origin traffic."
        );
        CorsLayer::permissive()
    } else if cors_origins.is_empty() {
        // No cross-origin access.
        CorsLayer::new()
    } else {
        let origins: Vec<HeaderValue> = cors_origins
            .split(',')
            .filter_map(|o| o.trim().parse().ok())
            .collect();
        CorsLayer::new().allow_origin(AllowOrigin::list(origins))
    };

    // Build the rate-limiting layer (governor) if a rate limit is configured.
    // governor operates per source IP; 0 means unlimited.
    let mut app = routing::build_router(state.clone(), max_body_bytes, cors);

    if rate_limit > 0 {
        let governor_conf = match GovernorConfigBuilder::default()
            .per_second(rate_limit as u64)
            .burst_size(rate_limit)
            .finish()
        {
            Some(c) => c,
            None => {
                tracing::error!("invalid governor rate-limit configuration");
                std::process::exit(1);
            }
        };
        app = app.layer(GovernorLayer::new(Arc::new(governor_conf)));
    }

    let addr = SocketAddr::from(([0, 0, 0, 0], port));
    tracing::info!("pg_ripple_http listening on http://{addr}");

    let listener = match tokio::net::TcpListener::bind(addr).await {
        Ok(l) => l,
        Err(e) => {
            tracing::error!("failed to bind TCP listener on {addr}: {e}");
            std::process::exit(1);
        }
    };

    // O13-05 (v0.86.0): graceful shutdown on SIGTERM with a configurable drain window.
    // HTTP-05 (v0.92.0): PG_RIPPLE_HTTP_SHUTDOWN_TIMEOUT_SECS configures the drain
    // timeout (default 30 seconds). Set to 0 to disable draining and exit immediately.
    // axum::serve().with_graceful_shutdown() waits for in-flight requests to complete
    // before the process exits; SIGINT (Ctrl-C) also triggers the same shutdown path.
    if let Err(e) = axum::serve(
        listener,
        app.into_make_service_with_connect_info::<SocketAddr>(),
    )
    .with_graceful_shutdown(shutdown_signal())
    .await
    {
        tracing::error!("server error: {e}");
        std::process::exit(1);
    }
}

/// Wait for SIGTERM or SIGINT, then return to trigger graceful shutdown.
///
/// O13-05 (v0.86.0): allows in-flight requests up to the configured timeout to complete
/// after a SIGTERM is received before the process exits.
/// HTTP-05 (v0.92.0): timeout is configurable via `PG_RIPPLE_HTTP_SHUTDOWN_TIMEOUT_SECS`
/// (default 30). Set to 0 to exit immediately without draining in-flight requests.
async fn shutdown_signal() {
    use tokio::signal;

    let shutdown_timeout_secs: u64 = std::env::var("PG_RIPPLE_HTTP_SHUTDOWN_TIMEOUT_SECS")
        .ok()
        .and_then(|s| s.parse().ok())
        .unwrap_or(30);

    let ctrl_c = async {
        signal::ctrl_c()
            .await
            .expect("failed to install Ctrl+C handler");
    };

    #[cfg(unix)]
    let terminate = async {
        signal::unix::signal(signal::unix::SignalKind::terminate())
            .expect("failed to install SIGTERM handler")
            .recv()
            .await;
    };

    #[cfg(not(unix))]
    let terminate = std::future::pending::<()>();

    tokio::select! {
        () = ctrl_c => {
            tracing::info!(
                "received Ctrl+C, initiating graceful shutdown ({shutdown_timeout_secs}s drain)"
            );
        }
        () = terminate => {
            tracing::info!(
                "received SIGTERM, initiating graceful shutdown ({shutdown_timeout_secs}s drain)"
            );
        }
    }

    // Allow up to configured timeout for in-flight requests to drain.
    if shutdown_timeout_secs > 0 {
        tokio::time::sleep(std::time::Duration::from_secs(shutdown_timeout_secs)).await;
    }
}