Skip to main content

reddb_server/
grpc.rs

1pub(crate) use crate::application::json_input::{
2    json_bool_field, json_f32_field, json_string_field, json_usize_field,
3};
4pub(crate) use crate::application::{
5    AdminUseCases, CatalogUseCases, CreateEdgeInput, CreateEntityOutput, CreateNodeGraphLinkInput,
6    CreateNodeInput, CreateNodeTableLinkInput, CreateRowInput, CreateVectorInput,
7    DeleteEntityInput, EntityUseCases, ExecuteQueryInput, ExplainQueryInput, GraphCentralityInput,
8    GraphClusteringInput, GraphCommunitiesInput, GraphComponentsInput, GraphCyclesInput,
9    GraphHitsInput, GraphNeighborhoodInput, GraphPersonalizedPageRankInput, GraphShortestPathInput,
10    GraphTopologicalSortInput, GraphTraversalInput, GraphUseCases, InspectNativeArtifactInput,
11    NativeUseCases, PatchEntityInput, QueryUseCases, SearchHybridInput, SearchIvfInput,
12    SearchSimilarInput, SearchTextInput,
13};
14use std::collections::BTreeMap;
15use std::pin::Pin;
16use std::sync::Arc;
17use std::time::{SystemTime, UNIX_EPOCH};
18
19use crate::api::{RedDBOptions, RedDBResult};
20use crate::auth::middleware::{check_permission, AuthResult};
21use crate::auth::store::AuthStore;
22use crate::auth::Role;
23use crate::health::{HealthProvider, HealthState};
24use crate::json::{
25    from_str as json_from_str, to_string as json_to_string, Map, Value as JsonValue,
26};
27use crate::runtime::{
28    RedDBRuntime, RuntimeFilter, RuntimeFilterValue, RuntimeGraphCentralityAlgorithm,
29    RuntimeGraphCentralityResult, RuntimeGraphClusteringResult, RuntimeGraphCommunityAlgorithm,
30    RuntimeGraphCommunityResult, RuntimeGraphComponentsMode, RuntimeGraphComponentsResult,
31    RuntimeGraphCyclesResult, RuntimeGraphDirection, RuntimeGraphHitsResult,
32    RuntimeGraphNeighborhoodResult, RuntimeGraphPathAlgorithm, RuntimeGraphPathResult,
33    RuntimeGraphPattern, RuntimeGraphProjection, RuntimeGraphTopologicalSortResult,
34    RuntimeGraphTraversalResult, RuntimeGraphTraversalStrategy, RuntimeIvfSearchResult,
35    RuntimeQueryResult, RuntimeQueryWeights, RuntimeStats, ScanPage,
36};
37use crate::storage::schema::Value;
38use crate::storage::unified::devx::refs::{NodeRef, TableRef};
39use crate::storage::unified::{Metadata, MetadataValue};
40use crate::storage::{EntityData, EntityId, UnifiedEntity};
41use tokio_stream::wrappers::TcpListenerStream;
42use tonic::metadata::MetadataMap;
43use tonic::{Request, Response, Status};
44
45// gRPC protobuf types and tonic stubs live in the standalone
46// `reddb-grpc-proto` crate so `reddb-server` and `reddb-client`
47// can both consume them without a dependency cycle. We expose
48// them under the legacy `proto` module path so existing
49// `crate::grpc::proto::…` imports keep resolving.
50pub use reddb_grpc_proto as proto;
51
52use proto::red_db_server::{RedDb, RedDbServer};
53use proto::{
54    BatchQueryReply, BatchQueryRequest, BulkEntityReply, CollectionRequest, CollectionsReply,
55    DeleteEntityRequest, DeploymentProfileRequest, Empty, EntityReply, ExecutePreparedRequest,
56    ExportRequest, GraphProjectionUpsertRequest, HealthReply, IndexNameRequest, IndexToggleRequest,
57    JsonBulkCreateRequest, JsonCreateRequest, JsonPayloadRequest, KvWatchEvent, KvWatchRequest,
58    ManifestRequest, OperationReply, PayloadReply, PrepareQueryReply, PrepareQueryRequest,
59    QueryReply, QueryRequest, ScanEntity, ScanReply, ScanRequest, StatsReply, TopologyReply,
60    TopologyRequest, UpdateEntityRequest,
61};
62
63mod control_support;
64mod entity_ops;
65mod input_support;
66pub(crate) mod scan_json;
67
68use self::control_support::*;
69use self::entity_ops::*;
70use self::input_support::*;
71use self::scan_json::*;
72
73#[derive(Debug, Clone)]
74pub struct GrpcServerOptions {
75    pub bind_addr: String,
76    /// Optional TLS configuration. When set the server terminates
77    /// TLS for inbound gRPC traffic via `tonic::transport::ServerTlsConfig`.
78    /// When `None`, the listener stays plaintext (back-compat for
79    /// loopback / sidecar deployments where a sidecar terminates TLS).
80    pub tls: Option<GrpcTlsOptions>,
81}
82
/// PEM-encoded TLS material for gRPC's tonic-rustls server.
///
/// The server identity is required (cert + key); the optional
/// client-CA enables mTLS — when present, tonic verifies and
/// requires a client cert chain that anchors at this CA bundle.
///
/// `Debug` is implemented by hand rather than derived: a derived impl
/// would print the raw bytes of `key_pem` whenever this value (or a
/// struct embedding it) is formatted with `{:?}`. The manual impl
/// reports sizes only and never emits key material.
#[derive(Clone)]
pub struct GrpcTlsOptions {
    /// PEM bytes for the server certificate chain (leaf first).
    pub cert_pem: Vec<u8>,
    /// PEM bytes for the server private key (PKCS#8 / SEC1 / RSA).
    pub key_pem: Vec<u8>,
    /// Optional PEM bytes for the trust anchor used to verify
    /// client certificates. When `Some(_)`, the server requires
    /// every client to present a cert that chains to this CA;
    /// when `None`, the server runs one-way TLS only.
    pub client_ca_pem: Option<Vec<u8>>,
}

impl std::fmt::Debug for GrpcTlsOptions {
    /// Redacting formatter: byte lengths for the public material, a
    /// fixed placeholder for the private key.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("GrpcTlsOptions")
            .field("cert_pem_len", &self.cert_pem.len())
            .field("key_pem", &format_args!("<redacted>"))
            .field(
                "client_ca_pem_len",
                &self.client_ca_pem.as_ref().map(|ca| ca.len()),
            )
            .finish()
    }
}
100
101impl GrpcTlsOptions {
102    /// Build a `tonic` `ServerTlsConfig` from PEM bytes, applying
103    /// rustls defaults (TLS 1.2 + 1.3 — older versions are not
104    /// negotiable on tokio-rustls 0.26).
105    pub fn to_tonic_config(
106        &self,
107    ) -> Result<tonic::transport::ServerTlsConfig, Box<dyn std::error::Error>> {
108        let identity = tonic::transport::Identity::from_pem(&self.cert_pem, &self.key_pem);
109        let mut cfg = tonic::transport::ServerTlsConfig::new().identity(identity);
110        if let Some(ca_pem) = &self.client_ca_pem {
111            cfg = cfg.client_ca_root(tonic::transport::Certificate::from_pem(ca_pem));
112        }
113        Ok(cfg)
114    }
115}
116
117impl Default for GrpcServerOptions {
118    fn default() -> Self {
119        Self {
120            bind_addr: "127.0.0.1:5555".to_string(),
121            tls: None,
122        }
123    }
124}
125
126#[derive(Clone)]
127pub struct RedDBGrpcServer {
128    runtime: RedDBRuntime,
129    options: GrpcServerOptions,
130    auth_store: Arc<AuthStore>,
131    /// Optional OAuth/OIDC JWT validator. When set, the gRPC
132    /// interceptor validates JWT-shaped bearers against the issuer's
133    /// JWKS *before* attempting `AuthStore` session/api-key lookups.
134    /// Build externally via `crate::auth::OAuthValidator::with_verifier`
135    /// and attach with [`Self::with_oauth_validator`].
136    oauth_validator: Option<Arc<crate::auth::OAuthValidator>>,
137}
138
139impl RedDBGrpcServer {
140    pub fn new(runtime: RedDBRuntime) -> Self {
141        let auth_config = crate::auth::AuthConfig::default();
142        let auth_store = Arc::new(AuthStore::new(auth_config));
143        Self::with_options(runtime, GrpcServerOptions::default(), auth_store)
144    }
145
146    pub fn from_database_options(
147        db_options: RedDBOptions,
148        options: GrpcServerOptions,
149    ) -> RedDBResult<Self> {
150        // Create runtime first so we can access the pager for vault pages.
151        let runtime = RedDBRuntime::with_options(db_options.clone())?;
152
153        let auth_store = if db_options.auth.vault_enabled {
154            // The vault stores its encrypted state in reserved pages inside
155            // the main .rdb file.  Extract the pager reference from the
156            // runtime's underlying store.
157            let pager = runtime.db().store().pager().cloned().ok_or_else(|| {
158                crate::api::RedDBError::Internal(
159                    "vault requires a paged database (persistent mode)".into(),
160                )
161            })?;
162            let store = AuthStore::with_vault(db_options.auth.clone(), pager, None)
163                .map_err(|e| crate::api::RedDBError::Internal(e.to_string()))?;
164            Arc::new(store)
165        } else {
166            Arc::new(AuthStore::new(db_options.auth.clone()))
167        };
168        auth_store.bootstrap_from_env();
169        Ok(Self::with_options(runtime, options, auth_store))
170    }
171
172    pub fn with_options(
173        runtime: RedDBRuntime,
174        options: GrpcServerOptions,
175        auth_store: Arc<AuthStore>,
176    ) -> Self {
177        // Inject the auth store into the runtime so that Value::Secret
178        // auto-encrypt/decrypt can read the vault AES key.
179        runtime.set_auth_store(Arc::clone(&auth_store));
180        Self {
181            runtime,
182            options,
183            auth_store,
184            oauth_validator: None,
185        }
186    }
187
188    /// Attach an externally-constructed OAuth/OIDC JWT validator. Once
189    /// set, JWT-shaped bearer tokens (3-segment) on the
190    /// `authorization` metadata are validated against the issuer's
191    /// JWKS, expiry, audience, etc. Non-JWT bearers fall back to the
192    /// `AuthStore` session/API-key path.
193    pub fn with_oauth_validator(mut self, validator: Arc<crate::auth::OAuthValidator>) -> Self {
194        self.oauth_validator = Some(validator);
195        self
196    }
197
198    /// Inspect the active OAuth validator, when one is configured.
199    pub fn oauth_validator(&self) -> Option<&Arc<crate::auth::OAuthValidator>> {
200        self.oauth_validator.as_ref()
201    }
202
203    pub fn runtime(&self) -> &RedDBRuntime {
204        &self.runtime
205    }
206
207    pub fn options(&self) -> &GrpcServerOptions {
208        &self.options
209    }
210
211    pub fn auth_store(&self) -> &Arc<AuthStore> {
212        &self.auth_store
213    }
214
215    fn grpc_runtime(&self) -> GrpcRuntime {
216        GrpcRuntime {
217            runtime: self.runtime.clone(),
218            auth_store: self.auth_store.clone(),
219            prepared_registry: PreparedStatementRegistry::new(),
220            oauth_validator: self.oauth_validator.clone(),
221        }
222    }
223
224    pub async fn serve(&self) -> Result<(), Box<dyn std::error::Error>> {
225        let addr = self.options.bind_addr.parse()?;
226        let mut builder = tonic::transport::Server::builder();
227        if let Some(tls) = &self.options.tls {
228            // Constant-time SHA256 fingerprint logged for ops triage —
229            // never the bytes of cert/key themselves.
230            log_grpc_tls_identity(tls);
231            builder = builder.tls_config(tls.to_tonic_config()?)?;
232        }
233        builder
234            .add_service(Self::configured_service(self.grpc_runtime()))
235            .serve(addr)
236            .await?;
237        Ok(())
238    }
239
240    pub async fn serve_on(
241        &self,
242        listener: std::net::TcpListener,
243    ) -> Result<(), Box<dyn std::error::Error>> {
244        listener.set_nonblocking(true)?;
245        let listener = tokio::net::TcpListener::from_std(listener)?;
246        let incoming = TcpListenerStream::new(listener);
247        let mut builder = tonic::transport::Server::builder();
248        if let Some(tls) = &self.options.tls {
249            log_grpc_tls_identity(tls);
250            builder = builder.tls_config(tls.to_tonic_config()?)?;
251        }
252        builder
253            .add_service(Self::configured_service(self.grpc_runtime()))
254            .serve_with_incoming(incoming)
255            .await?;
256        Ok(())
257    }
258
259    fn configured_service(runtime: GrpcRuntime) -> RedDbServer<GrpcRuntime> {
260        // Advertise zstd + gzip so clients can opt in. Server compresses
261        // outbound replies with zstd; sticking to a single send codec keeps
262        // CPU predictable while still accepting either on inbound.
263        use tonic::codec::CompressionEncoding;
264        RedDbServer::new(runtime)
265            .max_decoding_message_size(256 * 1024 * 1024)
266            .max_encoding_message_size(256 * 1024 * 1024)
267            .accept_compressed(CompressionEncoding::Zstd)
268            .accept_compressed(CompressionEncoding::Gzip)
269            .send_compressed(CompressionEncoding::Zstd)
270    }
271}
272
273/// Server-side prepared statement — parsed + parameterized once, executed N times.
274struct GrpcPreparedStatement {
275    shape: std::sync::Arc<crate::storage::query::ast::QueryExpr>,
276    parameter_count: usize,
277    created_at: std::time::Instant,
278}
279
280/// Registry of prepared statements for one server instance.
281/// Session-independent: any connection can execute any prepared statement by ID.
282struct PreparedStatementRegistry {
283    // parking_lot::RwLock never poisons on panic — safe to use without unwrap().
284    map: parking_lot::RwLock<std::collections::HashMap<u64, GrpcPreparedStatement>>,
285    next_id: std::sync::atomic::AtomicU64,
286    get_count: std::sync::atomic::AtomicU64,
287}
288
289impl PreparedStatementRegistry {
290    fn new() -> Arc<Self> {
291        Arc::new(Self {
292            map: parking_lot::RwLock::new(std::collections::HashMap::new()),
293            next_id: std::sync::atomic::AtomicU64::new(1),
294            get_count: std::sync::atomic::AtomicU64::new(0),
295        })
296    }
297
298    fn prepare(&self, shape: crate::storage::query::ast::QueryExpr, parameter_count: usize) -> u64 {
299        use std::sync::atomic::Ordering;
300        let id = self.next_id.fetch_add(1, Ordering::Relaxed);
301        let mut map = self.map.write();
302        self.evict_old_locked(&mut map);
303        map.insert(
304            id,
305            GrpcPreparedStatement {
306                // Store as Arc to avoid cloning the full AST on every execute.
307                shape: std::sync::Arc::new(shape),
308                parameter_count,
309                created_at: std::time::Instant::now(),
310            },
311        );
312        id
313    }
314
315    fn get_shape_and_count(
316        &self,
317        id: u64,
318    ) -> Option<(std::sync::Arc<crate::storage::query::ast::QueryExpr>, usize)> {
319        // Periodic eviction on execute/get traffic so long-lived servers that
320        // prepare once and execute many times still age out stale statements.
321        let get_count = self
322            .get_count
323            .fetch_add(1, std::sync::atomic::Ordering::Relaxed)
324            + 1;
325        if get_count.is_multiple_of(256) {
326            let mut map = self.map.write();
327            self.evict_old_locked(&mut map);
328        }
329        let map = self.map.read();
330        map.get(&id)
331            .map(|s| (std::sync::Arc::clone(&s.shape), s.parameter_count))
332    }
333
334    fn evict_old_locked(&self, map: &mut std::collections::HashMap<u64, GrpcPreparedStatement>) {
335        let threshold = std::time::Duration::from_secs(3600);
336        map.retain(|_, v| v.created_at.elapsed() < threshold);
337    }
338}
339
340#[derive(Clone)]
341struct GrpcRuntime {
342    runtime: RedDBRuntime,
343    auth_store: Arc<AuthStore>,
344    prepared_registry: Arc<PreparedStatementRegistry>,
345    /// OAuth/OIDC JWT validator built once from `auth_store.config().oauth`
346    /// when the operator enables OAuth. `None` means JWT bearers fall
347    /// back to the AuthStore lookup path.
348    oauth_validator: Option<Arc<crate::auth::OAuthValidator>>,
349}
350
351impl GrpcRuntime {
352    fn admin_use_cases(&self) -> AdminUseCases<'_, RedDBRuntime> {
353        AdminUseCases::new(&self.runtime)
354    }
355
356    fn catalog_use_cases(&self) -> CatalogUseCases<'_, RedDBRuntime> {
357        CatalogUseCases::new(&self.runtime)
358    }
359
360    fn query_use_cases(&self) -> QueryUseCases<'_, RedDBRuntime> {
361        QueryUseCases::new(&self.runtime)
362    }
363
364    fn entity_use_cases(&self) -> EntityUseCases<'_, RedDBRuntime> {
365        EntityUseCases::new(&self.runtime)
366    }
367
368    fn graph_use_cases(&self) -> GraphUseCases<'_, RedDBRuntime> {
369        GraphUseCases::new(&self.runtime)
370    }
371
372    fn native_use_cases(&self) -> NativeUseCases<'_, RedDBRuntime> {
373        NativeUseCases::new(&self.runtime)
374    }
375}
376
377/// Emit a single info-level event with the SHA-256 fingerprint of the
378/// active gRPC server cert + an mTLS flag. Never logs PEM bytes.
379fn log_grpc_tls_identity(tls: &GrpcTlsOptions) {
380    use sha2::{Digest, Sha256};
381    let cert_fp = {
382        let mut h = Sha256::new();
383        h.update(&tls.cert_pem);
384        let digest = h.finalize();
385        // First 16 hex chars are enough for human cross-check; the full
386        // SHA-256 lives in audit logs only.
387        let mut buf = String::with_capacity(64);
388        for b in digest.iter() {
389            buf.push_str(&format!("{b:02x}"));
390        }
391        buf
392    };
393    tracing::info!(
394        target: "reddb::security",
395        transport = "grpc",
396        cert_sha256 = %cert_fp,
397        mtls = tls.client_ca_pem.is_some(),
398        "gRPC TLS identity loaded"
399    );
400}
401
402include!("grpc/service_impl.rs");