athena_rs 3.18.0

Hyper performant polyglot Database driver
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
//! Athena public Rust surface for gateway-routed queries, direct backend access,
//! provisioning helpers, and worker utilities.
//!
//! Most downstream callers start with [`client::AthenaClient`]. The client can
//! run in two modes:
//!
//! - Gateway-routed mode: set a logical client name and Athena forwards
//!   requests through the gateway using scoped API-key auth.
//! - Direct mode: omit the client name and Athena connects directly to
//!   PostgreSQL, Supabase, or Scylla depending on [`client::backend::BackendType`].
//!
//! # Quick start
//!
//! ```no_run
//! use athena_rs::client::AthenaClient;
//!
//! #[tokio::main]
//! async fn main() -> Result<(), athena_rs::client::backend::BackendError> {
//!     let client = AthenaClient::new(
//!         "http://localhost:8080",
//!         "ath_public.secret",
//!         "reporting",
//!     )
//!     .await?;
//!
//!     let rows = client
//!         .fetch("users")
//!         .where_eq("status", "active")
//!         .limit(25)
//!         .execute()
//!         .await?;
//!
//!     let _ = rows;
//!     Ok(())
//! }
//! ```
//!
//! # Backend behavior
//!
//! When `client_name` is configured, Athena always uses the gateway transport
//! and `backend_type` selects translation behavior:
//!
//! - `Native`, `PostgreSQL`, `Neon`: SQL goes to the canonical SQL gateway
//!   route.
//! - `Supabase`, `Postgrest`: CRUD operations are translated into
//!   PostgREST-style gateway requests.
//! - `Scylla`: CQL is sent through the gateway using the Athena/Scylla driver.
//! - `D1`: flat CRUD and SQLite-safe raw SQL are translated into gateway SQL
//!   requests that Athena forwards to the dedicated Cloudflare D1 Worker proxy.
//!
//! Without `client_name`, Athena switches to direct backends:
//!
//! - `PostgreSQL`, `Native`, `Neon`, `Postgrest`: direct `sqlx` PostgreSQL.
//! - `Supabase`: direct PostgREST GET support only.
//! - `Scylla`: direct Scylla CQL.
//! - `D1`: direct mode is intentionally unsupported in Athena V1; D1 always
//!   resolves through catalog metadata plus the Worker proxy.
//!
//! # Security guarantees
//!
//! - Builder-based CRUD APIs validate and quote SQL identifiers through
//!   `parser::query_builder::sanitize_identifier` and
//!   `parser::query_builder::sanitize_qualified_table_identifier`. Invalid
//!   identifiers fail before Athena synthesizes SQL.
//! - Builder filter values are bound or encoded as typed values instead of
//!   interpolated into generated SQL strings.
//! - Raw SQL APIs such as [`client::AthenaClient::sql`],
//!   [`client::AthenaClient::execute_sql`], and
//!   [`client::GatewaySqlRequest`] forward operator-authored SQL verbatim. They
//!   are outside the identifier-sanitization and value-binding guarantees and
//!   should not be built from untrusted input.
//! - Update and delete builders reject unfiltered mutations unless the caller
//!   explicitly opts in with `unsafe_unfiltered()`.
//! - Gateway mode always sends `X-Athena-Client` plus API-key credentials; the
//!   Rust SDK does not invent tenant boundaries locally. Tenant isolation still
//!   lives on the server through scoped keys, resolved client context, and auth
//!   policy.
//! - Public `Debug` output on client configuration surfaces redacts connection
//!   secrets.
//!
//! # Safe defaults
//!
//! - [`client::config::ConnectionConfig::new`] enables TLS by default.
//! - [`client::config::HealthConfig::default`] enables health checks and
//!   circuit-breaker tracking.
//! - [`AppState::default`] starts gateway auth and admission policy in
//!   `fail_closed` mode and keeps direct JDBC access to private hosts disabled.

// Private imports used by `AppState` below; the (intentionally minimal) public re-exports live at the bottom of this file.
use athena_pipelines::PipelineDefinition;
use deadpool_postgres::Pool;
use moka::future::Cache;
use reqwest::Client;
use serde_json::Value;
use sqlx::postgres::PgPool;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::{OnceCell, Semaphore};

use crate::api::metrics::MetricsState;

#[cfg(feature = "deadpool_experimental")]
use crate::drivers::postgresql::deadpool_registry::DeadpoolPostgresRegistry;
use crate::drivers::postgresql::sqlx_driver::PostgresClientRegistry;

use crate::api::gateway::insert::{InsertWindowCoordinator, InsertWindowSettings};
use crate::deferred_write::{DeferredWriteConfig, WalManager, WriteBuffer};
use crate::utils::client_stats_batcher::ClientStatsBatcher;
use crate::utils::linux_gateway_file_log::LinuxGatewayFileLog;
use crate::utils::request_logging::GatewayLogBatcher;

/// REST API surface, endpoints and request handlers.
pub mod api;
/// Athena-owned contracts and backend resolution helpers.
pub mod athena;
// NOTE(review): the summary below describes a reverse proxy, which does not
// obviously match a module named `bootstrap` — confirm the doc comment is
// attached to the right module.
/// Reverse proxy that forwards requests to the configured target services.
pub mod bootstrap;
// NOTE(review): `cli` is public but has no summary here — add one after
// confirming its contents.
pub mod cli;
/// Athena client SDK with unified query helpers.
pub mod client;
/// Configuration utilities.
pub mod config;
// NOTE(review): `config_validation` and `daemon` are public but have no
// summaries here — add them after confirming their contents.
pub mod config_validation;
pub mod daemon;
/// Data access helpers and Supabase clients used by endpoints.
pub mod data;
/// Database driver integrations (Scylla/Athena, PostgreSQL, Supabase).
pub mod drivers;
/// Error processing and sanitization utilities.
pub mod error;
/// Parsers and helpers related to SQL processing.
pub mod parser;
/// Provisioning helpers (listed as part of the public surface in the crate docs).
pub mod provisioning;
// NOTE(review): `runtime` is public but has no summary here.
pub mod runtime;

// NOTE(review): the summary below talks about HTTP routing and target URL
// determination, which does not obviously describe `webhooks` — confirm it is
// not left over from a different module.
/// HTTP routing helpers and target URL determination.
pub mod webhooks;
/// Background worker tasks (outbox relay, etc.).
pub mod workers;
// NOTE(review): `wss` is public but has no summary here (presumably
// WebSocket support — confirm).
pub mod wss;

// NOTE(review): `features` is public but has no summary here.
pub mod features;

/// Deferred write-through cache: buffer mutations, flush to DB asynchronously.
pub mod deferred_write;

// Only compiled when the `cdc` Cargo feature is enabled.
#[cfg(feature = "cdc")]
pub mod cdc;

/// Shared Actix application state.
///
/// Bundles the caches, connection registries, gateway policy flags, logging
/// sinks, rate limiters, and worker settings that are declared as fields
/// below. Baseline values for every field are provided by the
/// `impl Default for AppState` in this file.
pub struct AppState {
    /// Short-lived response cache for JSON payloads.
    pub cache: Arc<Cache<String, Value>>,
    /// Non-expiring cache for values that should persist across requests.
    pub immortal_cache: Arc<Cache<String, Value>>,
    /// Shared Reqwest client used by the proxy and endpoints.
    pub client: Client,
    /// Unix timestamp when the process started.
    pub process_start_time_seconds: i64,
    /// Monotonic clock used for uptime calculations.
    pub process_started_at: Instant,
    /// Resolved config path used during startup, when known.
    pub runtime_config_path: Option<String>,
    /// Human-readable label describing how the startup config path was chosen.
    pub runtime_config_source_label: Option<String>,
    /// Whether startup had to seed a default config file before loading.
    pub runtime_config_seeded_default: bool,
    /// Redacted startup config snapshot for privileged operator debug routes.
    pub runtime_debug_config_snapshot: Value,
    /// Effective CORS allow-any-origin flag resolved at startup.
    pub cors_allow_any_origin: bool,
    /// Effective CORS allowlist resolved at startup.
    pub cors_allowed_origins: Vec<String>,
    /// Registry of Postgres connections keyed by logical client name.
    pub pg_registry: Arc<PostgresClientRegistry>,
    /// Cache of Postgres pools created from X-JDBC-URL for direct connections.
    pub jdbc_pool_cache: Arc<Cache<String, PgPool>>,
    /// Short-lived relation-kind cache used by insert view guards (`client:schema.table` -> `relkind`).
    pub gateway_insert_relation_kind_cache: Arc<Cache<String, Option<String>>>,
    /// Local in-process cache of presigned file URLs after authorization passes.
    pub storage_file_url_cache: Arc<Cache<String, athena_s3::PresignedUrlCacheRecord>>,
    /// Experimental deadpool-backed registry of Postgres pools keyed by logical client name.
    #[cfg(feature = "deadpool_experimental")]
    pub deadpool_registry: Arc<DeadpoolPostgresRegistry>,
    /// Experimental deadpool-backed cache for direct JDBC pools (singleflight via OnceCell).
    #[cfg(feature = "deadpool_experimental")]
    pub jdbc_deadpool_cache: Arc<Cache<String, Arc<OnceCell<Pool>>>>,
    /// When true, gateway fetch routes force camelCase column names to snake_case.
    pub gateway_force_camel_case_to_snake_case: bool,
    /// When true, UUID-like filter values are explicitly cast to text when the
    /// generated SQL also casts the compared column to text.
    pub gateway_auto_cast_uuid_filter_values_to_text: bool,
    /// When true, `public.table` table references are normalized to `table`
    /// for information_schema lookups.
    pub gateway_allow_schema_names_prefixed_as_table_name: bool,
    /// Optional registry of prebuilt pipeline definitions by name.
    pub pipeline_registry: Option<Arc<HashMap<String, PipelineDefinition>>>,
    /// Optional Postgres client (by name) that receives gateway logs.
    pub logging_client_name: Option<String>,
    /// Optional Postgres client (by name) that stores API keys and auth configuration.
    pub gateway_auth_client_name: Option<String>,
    /// Logical Postgres client name used for gateway benchmark tooling (`cargo bench` E2E, etc.).
    ///
    /// Configured under `gateway.benchmark_client` and surfaced for health/metrics alongside
    /// logging and auth client configuration.
    pub gateway_benchmark_client_name: Option<String>,
    /// Whether Athena may resolve Postgres clients dynamically from `athena_clients`.
    pub gateway_database_backed_client_loading_enabled: bool,
    /// Gateway auth fail mode: `fail_closed` or `fail_open`.
    pub gateway_api_key_fail_mode: String,
    /// Whether direct JDBC connections to private/local hosts are allowed.
    pub gateway_jdbc_allow_private_hosts: bool,
    /// Optional host allowlist for direct JDBC connections.
    pub gateway_jdbc_allowed_hosts: Vec<String>,
    /// Gateway resilience: operation timeout in seconds.
    pub gateway_resilience_timeout_secs: u64,
    /// Gateway resilience: max retries for read operations on transient failures.
    pub gateway_resilience_read_max_retries: u32,
    /// Gateway resilience: initial backoff between retries in milliseconds.
    pub gateway_resilience_initial_backoff_ms: u64,
    /// Admission store backend: `memory` or `redis`.
    pub gateway_admission_store_backend: String,
    /// Admission store fail mode: `fail_closed` or `fail_open`.
    pub gateway_admission_store_fail_mode: String,
    /// Flag that enables the Prometheus exporter route when true.
    pub prometheus_metrics_enabled: bool,
    /// Shared in-process metrics registry.
    pub metrics_state: Arc<MetricsState>,
    /// Default insert execution window in ms (`0` = off unless header enables).
    pub gateway_insert_execution_window_ms: u64,

    /// Maximum batch size for the insert execution window.
    ///
    /// NOTE(review): presumably the upper bound on inserts merged into one
    /// windowed batch — confirm against `InsertWindowCoordinator`.
    pub gateway_insert_window_max_batch: usize,
    /// Maximum number of queued entries for the insert execution window.
    ///
    /// NOTE(review): presumably the backlog cap before new work is rejected
    /// or bypasses the window — confirm against `InsertWindowCoordinator`.
    pub gateway_insert_window_max_queued: usize,
    /// Table names on the insert-merge deny list.
    ///
    /// NOTE(review): presumably inserts targeting these tables are never
    /// merged by the insert window — confirm the exact key format.
    pub gateway_insert_merge_deny_tables: HashSet<String>,
    /// Background coordinator for optional Postgres `/gateway/insert` windowing.
    pub insert_window_coordinator: Arc<InsertWindowCoordinator>,
    /// Batched `client_statistics` / `client_table_statistics` / `last_seen` writes for the logging pool.
    pub client_stats_batcher: Option<Arc<ClientStatsBatcher>>,
    /// Batched multi-row inserts for gateway request/operation logs.
    pub gateway_log_batcher: Option<Arc<GatewayLogBatcher>>,
    /// Linux-only NDJSON append sink for gateway and API-key audit logs (see `linux_file_logging_*` config).
    pub linux_gateway_file_log: Option<Arc<LinuxGatewayFileLog>>,
    /// Limits concurrent best-effort background logging work that targets the shared logging pool.
    pub logging_task_limiter: Option<Arc<Semaphore>>,
    /// Runtime configuration for the deferred write subsystem.
    pub deferred_write_config: DeferredWriteConfig,
    /// In-memory write buffer for pending deferred mutations.
    ///
    /// `None` when `deferred_write_config.enabled` is `false`.
    pub write_buffer: Option<Arc<WriteBuffer>>,
    /// WAL manager for crash-safe deferred write recovery.
    ///
    /// `None` when WAL is disabled or deferred mode is off.
    pub wal_manager: Option<Arc<WalManager>>,
    /// Optional per-IP rate limit for `/storage/*` (after auth).
    pub inbound_rate_limit_storage: Option<Arc<crate::api::rate_limit::KeyedStringRateLimiter>>,
    /// Optional inbound rate limit for schema routes.
    ///
    /// NOTE(review): presumably keyed and applied like
    /// `inbound_rate_limit_storage` — confirm the route prefix.
    pub inbound_rate_limit_schema: Option<Arc<crate::api::rate_limit::KeyedStringRateLimiter>>,
    /// Optional inbound rate limit for raw SQL routes.
    ///
    /// NOTE(review): presumably keyed and applied like
    /// `inbound_rate_limit_storage` — confirm the route prefix.
    pub inbound_rate_limit_raw_sql: Option<Arc<crate::api::rate_limit::KeyedStringRateLimiter>>,
    /// Optional inbound rate limit for backup administration routes.
    ///
    /// NOTE(review): presumably keyed and applied like
    /// `inbound_rate_limit_storage` — confirm the route prefix.
    pub inbound_rate_limit_backup_admin:
        Option<Arc<crate::api::rate_limit::KeyedStringRateLimiter>>,
    /// When true, first hop of `X-Forwarded-For` is used as the rate-limit key.
    pub inbound_rate_limit_trust_x_forwarded_for: bool,
    /// When true, `X-Forwarded-For` may be used when resolving client IP for logs and audit.
    pub logging_trust_x_forwarded_for: bool,
    /// Optional global cap on outbound Supabase HTTP requests per second (best-effort).
    pub outbound_rate_limit_supabase: Option<Arc<crate::api::rate_limit::KeyedStringRateLimiter>>,
    /// Allows HTTP (non-TLS) Typesense URLs for local development.
    pub typesense_allow_http: bool,
    /// Enables the optional background Typesense sync worker.
    pub typesense_sync_worker_enabled: bool,
    /// Poll interval for the background Typesense sync worker.
    pub typesense_sync_worker_poll_ms: u64,
    /// Max attempts for retryable Typesense import/backup/rollback requests.
    pub typesense_import_max_attempts: usize,
    /// Base backoff delay in milliseconds for Typesense retry logic.
    pub typesense_import_retry_base_ms: u64,
    /// Enables saga backup snapshots + compensation rollback for sync runs.
    pub typesense_sync_saga_backup_enabled: bool,
    /// Enables the backup schedule/enqueue/execution workers.
    pub backup_worker_enabled: bool,
    /// Poll interval for queued backup/restore execution.
    pub backup_execution_worker_poll_ms: u64,
    /// Poll interval for due backup schedules.
    pub backup_schedule_worker_poll_ms: u64,
    /// Max retry attempts for queued backup/restore jobs.
    pub backup_worker_max_attempts: i32,
    /// Lease TTL in minutes for backup/restore job ownership.
    pub backup_worker_lease_ttl_minutes: i32,
}

impl Default for AppState {
    fn default() -> Self {
        let insert_window_coordinator: Arc<InsertWindowCoordinator> =
            InsertWindowCoordinator::new(InsertWindowSettings {
                max_batch: 100,
                max_queued: 10_000,
                deny_tables: HashSet::new(),
            });
        Self {
            cache: Arc::new(Cache::builder().support_invalidation_closures().build()),
            immortal_cache: Arc::new(Cache::builder().build()),
            client: Client::new(),
            process_start_time_seconds: 0,
            process_started_at: Instant::now(),
            runtime_config_path: None,
            runtime_config_source_label: None,
            runtime_config_seeded_default: false,
            runtime_debug_config_snapshot: Value::Object(Default::default()),
            cors_allow_any_origin: false,
            cors_allowed_origins: Vec::new(),
            pg_registry: Arc::new(PostgresClientRegistry::empty()),
            jdbc_pool_cache: Arc::new(Cache::builder().max_capacity(64).build()),
            gateway_insert_relation_kind_cache: Arc::new(
                Cache::builder()
                    .max_capacity(20_000)
                    .time_to_live(Duration::from_secs(60))
                    .build(),
            ),
            storage_file_url_cache: Arc::new(Cache::builder().max_capacity(20_000).build()),
            #[cfg(feature = "deadpool_experimental")]
            deadpool_registry: Arc::new(DeadpoolPostgresRegistry::empty()),
            #[cfg(feature = "deadpool_experimental")]
            jdbc_deadpool_cache: Arc::new(Cache::builder().max_capacity(4).build()),
            gateway_force_camel_case_to_snake_case: false,
            gateway_auto_cast_uuid_filter_values_to_text: true,
            gateway_allow_schema_names_prefixed_as_table_name: true,
            pipeline_registry: None,
            logging_client_name: None,
            gateway_auth_client_name: None,
            gateway_benchmark_client_name: None,
            gateway_database_backed_client_loading_enabled: true,
            gateway_api_key_fail_mode: "fail_closed".to_string(),
            gateway_jdbc_allow_private_hosts: false,
            gateway_jdbc_allowed_hosts: Vec::new(),
            gateway_resilience_timeout_secs: 30,
            gateway_resilience_read_max_retries: 1,
            gateway_resilience_initial_backoff_ms: 100,
            gateway_admission_store_backend: "redis".to_string(),
            gateway_admission_store_fail_mode: "fail_closed".to_string(),
            prometheus_metrics_enabled: false,
            metrics_state: Arc::new(MetricsState::new()),
            gateway_insert_execution_window_ms: 0,
            gateway_insert_window_max_batch: 100,
            gateway_insert_window_max_queued: 10_000,
            gateway_insert_merge_deny_tables: HashSet::new(),
            insert_window_coordinator,
            client_stats_batcher: None,
            gateway_log_batcher: None,
            linux_gateway_file_log: None,
            logging_task_limiter: None,
            deferred_write_config: DeferredWriteConfig::default(),
            write_buffer: None,
            wal_manager: None,
            inbound_rate_limit_storage: None,
            inbound_rate_limit_schema: None,
            inbound_rate_limit_raw_sql: None,
            inbound_rate_limit_backup_admin: None,
            inbound_rate_limit_trust_x_forwarded_for: false,
            logging_trust_x_forwarded_for: true,
            outbound_rate_limit_supabase: None,
            typesense_allow_http: false,
            typesense_sync_worker_enabled: true,
            typesense_sync_worker_poll_ms: 30_000,
            typesense_import_max_attempts: 3,
            typesense_import_retry_base_ms: 400,
            typesense_sync_saga_backup_enabled: true,
            backup_worker_enabled: true,
            backup_execution_worker_poll_ms: 1_500,
            backup_schedule_worker_poll_ms: 30_000,
            backup_worker_max_attempts: 3,
            backup_worker_lease_ttl_minutes: 15,
        }
    }
}

/// Wrapper for the non-expiring cache used by some endpoints.
///
/// The wrapped handle has the same type as `AppState::immortal_cache`
/// (`Arc<Cache<String, Value>>`). Whether both point at the same cache
/// instance depends on how the wrapper is constructed by the caller.
pub struct ImmortalCache {
    /// Shared handle to the non-expiring cache instance.
    pub cache: Arc<Cache<String, Value>>,
}

/// Miscellaneous utilities (logging, helpers).
///
/// This file pulls `client_stats_batcher`, `linux_gateway_file_log`, and
/// `request_logging` from here for use in [`AppState`].
pub mod utils;

/// Helpers for integration tests in `tests/*.rs` (holds `ATHENA_KEY_12` lock).
// Public so integration tests can reach it, but hidden from rendered docs via
// `#[doc(hidden)]`.
#[doc(hidden)]
pub mod test_support;

pub use client::AthenaClient;
pub use client::backend::BackendType;
pub use client::backend::QueryResult;
pub use client::builder::AthenaClientBuilder;
pub use client::{Condition, ConditionOperator, OrderDirection};
pub use client::{
    Gateway, GatewayDeleteRequest, GatewayDriverRequest, GatewayFetchRequest, GatewayInsertRequest,
    GatewayOperation, GatewayPath, GatewayQueryResult, GatewayRequest, GatewayRequestFactory,
    GatewayRequestPayload, GatewayRoutes, GatewayRpcFilter, GatewayRpcFilterOperator,
    GatewayRpcRequest, GatewaySqlRequest, GatewayUpdateRequest, RpcBuilder, build_gateway_endpoint,
    request,
};

// Crate-root route constants kept for backward compatibility. Each one is
// defined directly in terms of the matching `Gateway` associated constant,
// so its value always tracks that constant.

/// Backward-compatible route alias. Prefer `Gateway::FETCH_PATH`.
pub const GATEWAY_FETCH_PATH: &str = Gateway::FETCH_PATH;
/// Backward-compatible route alias. Prefer `Gateway::INSERT_PATH`.
pub const GATEWAY_INSERT_PATH: &str = Gateway::INSERT_PATH;
/// Backward-compatible route alias. Prefer `Gateway::UPDATE_PATH`.
pub const GATEWAY_UPDATE_PATH: &str = Gateway::UPDATE_PATH;
/// Backward-compatible route alias. Prefer `Gateway::DELETE_PATH`.
pub const GATEWAY_DELETE_PATH: &str = Gateway::DELETE_PATH;
/// Backward-compatible route alias. Prefer `Gateway::QUERY_PATH`.
pub const GATEWAY_QUERY_PATH: &str = Gateway::QUERY_PATH;
/// Backward-compatible route alias. Prefer `Gateway::SQL_PATH`.
pub const GATEWAY_SQL_PATH: &str = Gateway::SQL_PATH;
/// Backward-compatible route alias. Prefer `Gateway::RPC_PATH`.
pub const GATEWAY_RPC_PATH: &str = Gateway::RPC_PATH;
/// Backward-compatible route alias. Prefer `Gateway::LEGACY_SQL_PATH`.
///
/// Unlike the other aliases, this name carries no `GATEWAY_` prefix.
pub const LEGACY_SQL_PATH: &str = Gateway::LEGACY_SQL_PATH;