Skip to main content

reddb_server/
grpc.rs

1pub(crate) use crate::application::json_input::{
2    json_bool_field, json_f32_field, json_string_field, json_usize_field,
3};
4pub(crate) use crate::application::{
5    AdminUseCases, CatalogUseCases, CreateEdgeInput, CreateEntityOutput, CreateNodeGraphLinkInput,
6    CreateNodeInput, CreateNodeTableLinkInput, CreateRowInput, CreateVectorInput,
7    DeleteEntityInput, EntityUseCases, ExecuteQueryInput, ExplainQueryInput, GraphCentralityInput,
8    GraphClusteringInput, GraphCommunitiesInput, GraphComponentsInput, GraphCyclesInput,
9    GraphHitsInput, GraphNeighborhoodInput, GraphPersonalizedPageRankInput, GraphShortestPathInput,
10    GraphTopologicalSortInput, GraphTraversalInput, GraphUseCases, InspectNativeArtifactInput,
11    NativeUseCases, PatchEntityInput, QueryUseCases, SearchHybridInput, SearchIvfInput,
12    SearchSimilarInput, SearchTextInput,
13};
14use std::collections::BTreeMap;
15use std::pin::Pin;
16use std::sync::Arc;
17use std::time::{SystemTime, UNIX_EPOCH};
18
19use crate::api::{RedDBOptions, RedDBResult};
20use crate::auth::middleware::{check_permission, AuthResult, AuthSource};
21use crate::auth::store::AuthStore;
22use crate::auth::Role;
23use crate::health::{HealthProvider, HealthState};
24use crate::json::{
25    from_str as json_from_str, to_string as json_to_string, Map, Value as JsonValue,
26};
27use crate::runtime::{
28    RedDBRuntime, RuntimeFilter, RuntimeFilterValue, RuntimeGraphCentralityAlgorithm,
29    RuntimeGraphCentralityResult, RuntimeGraphClusteringResult, RuntimeGraphCommunityAlgorithm,
30    RuntimeGraphCommunityResult, RuntimeGraphComponentsMode, RuntimeGraphComponentsResult,
31    RuntimeGraphCyclesResult, RuntimeGraphDirection, RuntimeGraphHitsResult,
32    RuntimeGraphNeighborhoodResult, RuntimeGraphPathAlgorithm, RuntimeGraphPathResult,
33    RuntimeGraphPattern, RuntimeGraphProjection, RuntimeGraphTopologicalSortResult,
34    RuntimeGraphTraversalResult, RuntimeGraphTraversalStrategy, RuntimeIvfSearchResult,
35    RuntimeQueryResult, RuntimeQueryWeights, RuntimeStats, ScanPage,
36};
37use crate::storage::schema::Value;
38use crate::storage::unified::devx::refs::{NodeRef, TableRef};
39use crate::storage::unified::{Metadata, MetadataValue};
40use crate::storage::{EntityData, EntityId, UnifiedEntity};
41use tokio_stream::wrappers::TcpListenerStream;
42use tonic::metadata::MetadataMap;
43use tonic::{Request, Response, Status};
44
45// gRPC protobuf types and tonic stubs live in the standalone
46// `reddb-grpc-proto` crate so `reddb-server` and `reddb-client`
47// can both consume them without a dependency cycle. We expose
48// them under the legacy `proto` module path so existing
49// `crate::grpc::proto::…` imports keep resolving.
50pub use reddb_grpc_proto as proto;
51
52use proto::red_db_server::{RedDb, RedDbServer};
53use proto::{
54    ask_stream_event, AskAnswerToken, AskReply, AskRequest, AskSources, AskStreamEvent,
55    BatchInsertChunk, BatchInsertReply, BatchQueryReply, BatchQueryRequest, BulkEntityReply,
56    Citation, CollectionRequest, CollectionsReply, DeleteEntityRequest, DeploymentProfileRequest,
57    Empty, EntityReply, ExecutePreparedRequest, ExportRequest, GraphProjectionUpsertRequest,
58    HealthReply, IndexNameRequest, IndexToggleRequest, JsonBulkCreateRequest, JsonCreateRequest,
59    JsonPayloadRequest, KvWatchEvent, KvWatchRequest, ManifestRequest, OperationReply,
60    PayloadReply, PrepareQueryReply, PrepareQueryRequest, QueryReply, QueryRequest, QueryValue,
61    ScanEntity, ScanReply, ScanRequest, StatsReply, TopologyReply, TopologyRequest,
62    UpdateEntityRequest, Validation, ValidationItem,
63};
64
65mod control_support;
66mod entity_ops;
67mod input_support;
68pub(crate) mod scan_json;
69
70use self::control_support::*;
71use self::entity_ops::*;
72use self::input_support::*;
73use self::scan_json::*;
74
75#[derive(Debug, Clone)]
76pub struct GrpcServerOptions {
77    pub bind_addr: String,
78    /// Optional TLS configuration. When set the server terminates
79    /// TLS for inbound gRPC traffic via `tonic::transport::ServerTlsConfig`.
80    /// When `None`, the listener stays plaintext (back-compat for
81    /// loopback / sidecar deployments where a sidecar terminates TLS).
82    pub tls: Option<GrpcTlsOptions>,
83}
84
85/// PEM-encoded TLS material for gRPC's tonic-rustls server.
86///
87/// The server identity is required (cert + key); the optional
88/// client-CA enables mTLS — when present, tonic verifies and
89/// requires a client cert chain that anchors at this CA bundle.
90#[derive(Debug, Clone)]
91pub struct GrpcTlsOptions {
92    /// PEM bytes for the server certificate chain (leaf first).
93    pub cert_pem: Vec<u8>,
94    /// PEM bytes for the server private key (PKCS#8 / SEC1 / RSA).
95    pub key_pem: Vec<u8>,
96    /// Optional PEM bytes for the trust anchor used to verify
97    /// client certificates. When `Some(_)`, the server requires
98    /// every client to present a cert that chains to this CA;
99    /// when `None`, the server runs one-way TLS only.
100    pub client_ca_pem: Option<Vec<u8>>,
101}
102
103impl GrpcTlsOptions {
104    /// Build a `tonic` `ServerTlsConfig` from PEM bytes, applying
105    /// rustls defaults (TLS 1.2 + 1.3 — older versions are not
106    /// negotiable on tokio-rustls 0.26).
107    pub fn to_tonic_config(
108        &self,
109    ) -> Result<tonic::transport::ServerTlsConfig, Box<dyn std::error::Error>> {
110        let identity = tonic::transport::Identity::from_pem(&self.cert_pem, &self.key_pem);
111        let mut cfg = tonic::transport::ServerTlsConfig::new().identity(identity);
112        if let Some(ca_pem) = &self.client_ca_pem {
113            cfg = cfg.client_ca_root(tonic::transport::Certificate::from_pem(ca_pem));
114        }
115        Ok(cfg)
116    }
117}
118
119impl Default for GrpcServerOptions {
120    fn default() -> Self {
121        Self {
122            bind_addr: "127.0.0.1:55055".to_string(),
123            tls: None,
124        }
125    }
126}
127
128#[derive(Clone)]
129pub struct RedDBGrpcServer {
130    runtime: RedDBRuntime,
131    options: GrpcServerOptions,
132    auth_store: Arc<AuthStore>,
133    /// Optional OAuth/OIDC JWT validator. When set, the gRPC
134    /// interceptor validates JWT-shaped bearers against the issuer's
135    /// JWKS *before* attempting `AuthStore` session/api-key lookups.
136    /// Build externally via `crate::auth::OAuthValidator::with_verifier`
137    /// and attach with [`Self::with_oauth_validator`].
138    oauth_validator: Option<Arc<crate::auth::OAuthValidator>>,
139}
140
141impl RedDBGrpcServer {
142    pub fn new(runtime: RedDBRuntime) -> Self {
143        let auth_config = crate::auth::AuthConfig::default();
144        let auth_store = Arc::new(AuthStore::new(auth_config));
145        Self::with_options(runtime, GrpcServerOptions::default(), auth_store)
146    }
147
148    pub fn from_database_options(
149        db_options: RedDBOptions,
150        options: GrpcServerOptions,
151    ) -> RedDBResult<Self> {
152        // Create runtime first so we can access the pager for vault pages.
153        let runtime = RedDBRuntime::with_options(db_options.clone())?;
154
155        let auth_store = if db_options.auth.vault_enabled {
156            // The vault stores its encrypted state in reserved pages inside
157            // the main .rdb file.  Extract the pager reference from the
158            // runtime's underlying store.
159            let pager = runtime.db().store().pager().cloned().ok_or_else(|| {
160                crate::api::RedDBError::Internal(
161                    "vault requires a paged database (persistent mode)".into(),
162                )
163            })?;
164            let store = AuthStore::with_vault(db_options.auth.clone(), pager)
165                .map_err(|e| crate::api::RedDBError::Internal(e.to_string()))?;
166            Arc::new(store)
167        } else {
168            Arc::new(AuthStore::new(db_options.auth.clone()))
169        };
170        auth_store.bootstrap_from_env();
171        Ok(Self::with_options(runtime, options, auth_store))
172    }
173
174    pub fn with_options(
175        runtime: RedDBRuntime,
176        options: GrpcServerOptions,
177        auth_store: Arc<AuthStore>,
178    ) -> Self {
179        // Inject the auth store into the runtime so that Value::Secret
180        // auto-encrypt/decrypt can read the vault AES key.
181        runtime.set_auth_store(Arc::clone(&auth_store));
182        Self {
183            runtime,
184            options,
185            auth_store,
186            oauth_validator: None,
187        }
188    }
189
190    /// Attach an externally-constructed OAuth/OIDC JWT validator. Once
191    /// set, JWT-shaped bearer tokens (3-segment) on the
192    /// `authorization` metadata are validated against the issuer's
193    /// JWKS, expiry, audience, etc. Non-JWT bearers fall back to the
194    /// `AuthStore` session/API-key path.
195    pub fn with_oauth_validator(mut self, validator: Arc<crate::auth::OAuthValidator>) -> Self {
196        self.oauth_validator = Some(validator);
197        self
198    }
199
200    /// Inspect the active OAuth validator, when one is configured.
201    pub fn oauth_validator(&self) -> Option<&Arc<crate::auth::OAuthValidator>> {
202        self.oauth_validator.as_ref()
203    }
204
205    pub fn runtime(&self) -> &RedDBRuntime {
206        &self.runtime
207    }
208
209    pub fn options(&self) -> &GrpcServerOptions {
210        &self.options
211    }
212
213    pub fn auth_store(&self) -> &Arc<AuthStore> {
214        &self.auth_store
215    }
216
217    fn grpc_runtime(&self) -> GrpcRuntime {
218        GrpcRuntime {
219            runtime: self.runtime.clone(),
220            auth_store: self.auth_store.clone(),
221            prepared_registry: PreparedStatementRegistry::new(),
222            oauth_validator: self.oauth_validator.clone(),
223        }
224    }
225
226    pub async fn serve(&self) -> Result<(), Box<dyn std::error::Error>> {
227        let addr = self.options.bind_addr.parse()?;
228        let mut builder = tonic::transport::Server::builder();
229        if let Some(tls) = &self.options.tls {
230            // Constant-time SHA256 fingerprint logged for ops triage —
231            // never the bytes of cert/key themselves.
232            log_grpc_tls_identity(tls);
233            builder = builder.tls_config(tls.to_tonic_config()?)?;
234        }
235        builder
236            .add_service(Self::configured_service(self.grpc_runtime()))
237            .serve(addr)
238            .await?;
239        Ok(())
240    }
241
242    pub async fn serve_on(
243        &self,
244        listener: std::net::TcpListener,
245    ) -> Result<(), Box<dyn std::error::Error>> {
246        listener.set_nonblocking(true)?;
247        let listener = tokio::net::TcpListener::from_std(listener)?;
248        let incoming = TcpListenerStream::new(listener);
249        let mut builder = tonic::transport::Server::builder();
250        if let Some(tls) = &self.options.tls {
251            log_grpc_tls_identity(tls);
252            builder = builder.tls_config(tls.to_tonic_config()?)?;
253        }
254        builder
255            .add_service(Self::configured_service(self.grpc_runtime()))
256            .serve_with_incoming(incoming)
257            .await?;
258        Ok(())
259    }
260
261    /// Serve gRPC over a stream of already-accepted connections fed by the
262    /// in-process protocol demux (issue #933). The demux classifies each
263    /// inbound connection on the shared port and hands the HTTP/2 ones
264    /// straight in through `rx` — there is no loopback socket and no
265    /// `copy_bidirectional` hop. The server runs until `rx` closes (the
266    /// demux acceptor dropped its sender) and all in-flight RPCs drain.
267    pub(crate) async fn serve_router_demux(
268        &self,
269        rx: tokio::sync::mpsc::Receiver<tokio::net::TcpStream>,
270    ) -> Result<(), Box<dyn std::error::Error>> {
271        use tokio_stream::StreamExt;
272        let incoming = tokio_stream::wrappers::ReceiverStream::new(rx).map(Ok::<_, std::io::Error>);
273        let mut builder = tonic::transport::Server::builder();
274        if let Some(tls) = &self.options.tls {
275            log_grpc_tls_identity(tls);
276            builder = builder.tls_config(tls.to_tonic_config()?)?;
277        }
278        builder
279            .add_service(Self::configured_service(self.grpc_runtime()))
280            .serve_with_incoming(incoming)
281            .await?;
282        Ok(())
283    }
284
285    fn configured_service(runtime: GrpcRuntime) -> RedDbServer<GrpcRuntime> {
286        // Advertise zstd + gzip so clients can opt in. Server compresses
287        // outbound replies with zstd; sticking to a single send codec keeps
288        // CPU predictable while still accepting either on inbound.
289        use tonic::codec::CompressionEncoding;
290        RedDbServer::new(runtime)
291            .max_decoding_message_size(256 * 1024 * 1024)
292            .max_encoding_message_size(256 * 1024 * 1024)
293            .accept_compressed(CompressionEncoding::Zstd)
294            .accept_compressed(CompressionEncoding::Gzip)
295            .send_compressed(CompressionEncoding::Zstd)
296    }
297}
298
299/// Server-side prepared statement — parsed + parameterized once, executed N times.
300struct GrpcPreparedStatement {
301    shape: std::sync::Arc<crate::storage::query::ast::QueryExpr>,
302    parameter_count: usize,
303    created_at: std::time::Instant,
304}
305
306/// Registry of prepared statements for one server instance.
307/// Session-independent: any connection can execute any prepared statement by ID.
308struct PreparedStatementRegistry {
309    // parking_lot::RwLock never poisons on panic — safe to use without unwrap().
310    map: parking_lot::RwLock<std::collections::HashMap<u64, GrpcPreparedStatement>>,
311    next_id: std::sync::atomic::AtomicU64,
312    get_count: std::sync::atomic::AtomicU64,
313}
314
315impl PreparedStatementRegistry {
316    fn new() -> Arc<Self> {
317        Arc::new(Self {
318            map: parking_lot::RwLock::new(std::collections::HashMap::new()),
319            next_id: std::sync::atomic::AtomicU64::new(1),
320            get_count: std::sync::atomic::AtomicU64::new(0),
321        })
322    }
323
324    fn prepare(&self, shape: crate::storage::query::ast::QueryExpr, parameter_count: usize) -> u64 {
325        use std::sync::atomic::Ordering;
326        let id = self.next_id.fetch_add(1, Ordering::Relaxed);
327        let mut map = self.map.write();
328        self.evict_old_locked(&mut map);
329        map.insert(
330            id,
331            GrpcPreparedStatement {
332                // Store as Arc to avoid cloning the full AST on every execute.
333                shape: std::sync::Arc::new(shape),
334                parameter_count,
335                created_at: std::time::Instant::now(),
336            },
337        );
338        id
339    }
340
341    fn get_shape_and_count(
342        &self,
343        id: u64,
344    ) -> Option<(std::sync::Arc<crate::storage::query::ast::QueryExpr>, usize)> {
345        // Periodic eviction on execute/get traffic so long-lived servers that
346        // prepare once and execute many times still age out stale statements.
347        let get_count = self
348            .get_count
349            .fetch_add(1, std::sync::atomic::Ordering::Relaxed)
350            + 1;
351        if get_count.is_multiple_of(256) {
352            let mut map = self.map.write();
353            self.evict_old_locked(&mut map);
354        }
355        let map = self.map.read();
356        map.get(&id)
357            .map(|s| (std::sync::Arc::clone(&s.shape), s.parameter_count))
358    }
359
360    fn evict_old_locked(&self, map: &mut std::collections::HashMap<u64, GrpcPreparedStatement>) {
361        let threshold = std::time::Duration::from_secs(3600);
362        map.retain(|_, v| v.created_at.elapsed() < threshold);
363    }
364}
365
366#[derive(Clone)]
367struct GrpcRuntime {
368    runtime: RedDBRuntime,
369    auth_store: Arc<AuthStore>,
370    prepared_registry: Arc<PreparedStatementRegistry>,
371    /// OAuth/OIDC JWT validator built once from `auth_store.config().oauth`
372    /// when the operator enables OAuth. `None` means JWT bearers fall
373    /// back to the AuthStore lookup path.
374    oauth_validator: Option<Arc<crate::auth::OAuthValidator>>,
375}
376
377impl GrpcRuntime {
378    fn admin_use_cases(&self) -> AdminUseCases<'_, RedDBRuntime> {
379        AdminUseCases::new(&self.runtime)
380    }
381
382    fn catalog_use_cases(&self) -> CatalogUseCases<'_, RedDBRuntime> {
383        CatalogUseCases::new(&self.runtime)
384    }
385
386    fn query_use_cases(&self) -> QueryUseCases<'_, RedDBRuntime> {
387        QueryUseCases::new(&self.runtime)
388    }
389
390    fn entity_use_cases(&self) -> EntityUseCases<'_, RedDBRuntime> {
391        EntityUseCases::new(&self.runtime)
392    }
393
394    fn graph_use_cases(&self) -> GraphUseCases<'_, RedDBRuntime> {
395        GraphUseCases::new(&self.runtime)
396    }
397
398    fn native_use_cases(&self) -> NativeUseCases<'_, RedDBRuntime> {
399        NativeUseCases::new(&self.runtime)
400    }
401}
402
403fn grpc_query_value_to_schema_value(value: QueryValue) -> Result<Value, Status> {
404    use proto::query_value::Kind;
405
406    match value
407        .kind
408        .ok_or_else(|| Status::invalid_argument("missing query param value"))?
409    {
410        Kind::NullValue(_) => Ok(Value::Null),
411        Kind::BoolValue(value) => Ok(Value::Boolean(value)),
412        Kind::IntValue(value) => Ok(Value::Integer(value)),
413        Kind::FloatValue(value) => Ok(Value::Float(value)),
414        Kind::TextValue(value) => Ok(Value::Text(std::sync::Arc::from(value))),
415        Kind::BytesValue(value) => Ok(Value::Blob(value)),
416        Kind::VectorValue(value) => Ok(Value::Vector(value.values)),
417        Kind::JsonValue(value) => {
418            let parsed = json_from_str::<JsonValue>(&value)
419                .map_err(|e| Status::invalid_argument(format!("json param parse error: {e}")))?;
420            let encoded = json_to_string(&parsed)
421                .map_err(|e| Status::invalid_argument(format!("json param encode error: {e}")))?;
422            Ok(Value::Json(encoded.into_bytes()))
423        }
424        Kind::TimestampValue(value) => Ok(Value::Timestamp(value)),
425        Kind::UuidValue(value) => {
426            let bytes: [u8; 16] = value.try_into().map_err(|value: Vec<u8>| {
427                Status::invalid_argument(format!(
428                    "uuid param must be 16 bytes, got {}",
429                    value.len()
430                ))
431            })?;
432            Ok(Value::Uuid(bytes))
433        }
434    }
435}
436
437fn execute_grpc_query_with_optional_params(
438    runtime: &RedDBRuntime,
439    query: String,
440    params: Vec<QueryValue>,
441) -> Result<RuntimeQueryResult, Status> {
442    if query.trim().is_empty() {
443        return Err(Status::invalid_argument("query field cannot be empty"));
444    }
445
446    if params.is_empty() {
447        let result = runtime.execute_query(&query).map_err(to_status)?;
448        enforce_grpc_commit_policy_after_query_result(runtime, &result)?;
449        return Ok(result);
450    }
451
452    let binds = params
453        .into_iter()
454        .map(grpc_query_value_to_schema_value)
455        .collect::<Result<Vec<_>, _>>()?;
456    let result = runtime
457        .execute_query_with_params(&query, &binds)
458        .map_err(to_status)?;
459    enforce_grpc_commit_policy_after_query_result(runtime, &result)?;
460    Ok(result)
461}
462
463fn enforce_grpc_commit_policy_after_query_result(
464    runtime: &RedDBRuntime,
465    result: &RuntimeQueryResult,
466) -> Result<(), Status> {
467    let is_mutation = matches!(result.statement_type, "insert" | "update" | "delete");
468    if !is_mutation {
469        return Ok(());
470    }
471    let post_lsn = runtime.cdc_current_lsn();
472    runtime
473        .enforce_commit_policy(post_lsn)
474        .map(|_| ())
475        .map_err(|err| Status::deadline_exceeded(err.to_string()))
476}
477
478#[cfg(test)]
479mod grpc_query_value_tests {
480    use super::*;
481    use proto::query_value::Kind;
482
483    #[test]
484    fn grpc_query_value_maps_to_schema_value_variants() {
485        let cases = vec![
486            (
487                QueryValue {
488                    kind: Some(Kind::NullValue(proto::QueryNull {})),
489                },
490                Value::Null,
491            ),
492            (
493                QueryValue {
494                    kind: Some(Kind::BoolValue(true)),
495                },
496                Value::Boolean(true),
497            ),
498            (
499                QueryValue {
500                    kind: Some(Kind::IntValue(42)),
501                },
502                Value::Integer(42),
503            ),
504            (
505                QueryValue {
506                    kind: Some(Kind::FloatValue(1.5)),
507                },
508                Value::Float(1.5),
509            ),
510            (
511                QueryValue {
512                    kind: Some(Kind::BytesValue(vec![0, 1, 2])),
513                },
514                Value::Blob(vec![0, 1, 2]),
515            ),
516            (
517                QueryValue {
518                    kind: Some(Kind::VectorValue(proto::QueryVector {
519                        values: vec![0.25, 0.5],
520                    })),
521                },
522                Value::Vector(vec![0.25, 0.5]),
523            ),
524            (
525                QueryValue {
526                    kind: Some(Kind::TimestampValue(1_779_999_000)),
527                },
528                Value::Timestamp(1_779_999_000),
529            ),
530            (
531                QueryValue {
532                    kind: Some(Kind::UuidValue(vec![0x11; 16])),
533                },
534                Value::Uuid([0x11; 16]),
535            ),
536        ];
537
538        for (input, expected) in cases {
539            assert_eq!(grpc_query_value_to_schema_value(input).unwrap(), expected);
540        }
541
542        assert_eq!(
543            grpc_query_value_to_schema_value(QueryValue {
544                kind: Some(Kind::TextValue("alice".into())),
545            })
546            .unwrap(),
547            Value::Text(std::sync::Arc::from("alice"))
548        );
549        assert_eq!(
550            grpc_query_value_to_schema_value(QueryValue {
551                kind: Some(Kind::JsonValue("{\"role\":\"admin\"}".into())),
552            })
553            .unwrap(),
554            Value::Json(b"{\"role\":\"admin\"}".to_vec())
555        );
556    }
557
558    #[test]
559    fn grpc_query_value_rejects_missing_kind_and_bad_uuid() {
560        assert!(grpc_query_value_to_schema_value(QueryValue { kind: None }).is_err());
561        assert!(grpc_query_value_to_schema_value(QueryValue {
562            kind: Some(Kind::UuidValue(vec![0; 15])),
563        })
564        .is_err());
565    }
566
567    #[test]
568    fn grpc_query_rejects_empty_query_before_runtime_parse() {
569        let runtime =
570            RedDBRuntime::with_options(crate::api::RedDBOptions::in_memory()).expect("runtime");
571        let err = execute_grpc_query_with_optional_params(&runtime, "  ".to_string(), Vec::new())
572            .expect_err("empty query should fail");
573
574        assert_eq!(err.code(), tonic::Code::InvalidArgument);
575        assert_eq!(err.message(), "query field cannot be empty");
576    }
577
578    #[test]
579    fn grpc_query_params_are_bound_before_execution() {
580        let runtime =
581            RedDBRuntime::with_options(crate::api::RedDBOptions::in_memory()).expect("runtime");
582        seed_grpc_param_table(&runtime);
583
584        let result = execute_grpc_query_with_optional_params(
585            &runtime,
586            "SELECT id, name FROM p WHERE id = $1 AND name = $2".to_string(),
587            grpc_param_values(),
588        )
589        .expect("parameterized query");
590
591        assert_eq!(result.result.records.len(), 1);
592    }
593
594    #[test]
595    fn grpc_query_enforces_ack_n_commit_policy_fail_closed() {
596        let _env_lock = env_lock().lock().expect("env lock");
597        let _env = EnvGuard::set(&[
598            ("RED_PRIMARY_COMMIT_POLICY", "ack_n=1"),
599            ("RED_REPLICATION_ACK_TIMEOUT_MS", "20"),
600            ("RED_COMMIT_FAIL_ON_TIMEOUT", "true"),
601        ]);
602        let data_path = temp_data_path("grpc_ack_n_timeout");
603        cleanup(&data_path);
604
605        let runtime = RedDBRuntime::with_options(
606            crate::api::RedDBOptions::persistent(&data_path)
607                .with_replication(crate::replication::ReplicationConfig::primary()),
608        )
609        .expect("runtime");
610
611        let err = execute_grpc_query_with_optional_params(
612            &runtime,
613            "INSERT INTO grpc_ack_items (id, name) VALUES (1, 'alpha')".to_string(),
614            Vec::new(),
615        )
616        .expect_err("ack_n without replica ack must fail closed");
617
618        assert_eq!(err.code(), tonic::Code::DeadlineExceeded);
619        assert!(
620            err.message().contains("commit policy timed out")
621                && err.message().contains("RED_COMMIT_FAIL_ON_TIMEOUT"),
622            "error should identify commit policy timeout, got {err:?}"
623        );
624        assert!(
625            runtime.cdc_current_lsn() > 0,
626            "local mutation should advance CDC before gRPC response fails"
627        );
628
629        cleanup(&data_path);
630    }
631
632    #[tokio::test]
633    async fn grpc_query_rpc_binds_query_request_params() {
634        let runtime =
635            RedDBRuntime::with_options(crate::api::RedDBOptions::in_memory()).expect("runtime");
636        seed_grpc_param_table(&runtime);
637        let service = GrpcRuntime {
638            runtime,
639            auth_store: Arc::new(AuthStore::new(crate::auth::AuthConfig::default())),
640            prepared_registry: PreparedStatementRegistry::new(),
641            oauth_validator: None,
642        };
643
644        let reply = RedDb::query(
645            &service,
646            Request::new(QueryRequest {
647                query: "SELECT id, name FROM p WHERE id = $1 AND name = $2".to_string(),
648                entity_types: Vec::new(),
649                capabilities: Vec::new(),
650                params: grpc_param_values(),
651            }),
652        )
653        .await
654        .expect("query rpc")
655        .into_inner();
656
657        assert_eq!(reply.record_count, 1);
658        assert!(reply.result_json.contains("Alice"), "{}", reply.result_json);
659        assert!(!reply.result_json.contains("Bob"), "{}", reply.result_json);
660    }
661
662    #[tokio::test]
663    async fn pull_wal_records_rejects_stale_term_on_grpc_path() {
664        let runtime = RedDBRuntime::with_options(
665            crate::api::RedDBOptions::in_memory()
666                .with_replication(crate::replication::ReplicationConfig::primary().with_term(6)),
667        )
668        .expect("runtime");
669        let auth_store = Arc::new(AuthStore::new(crate::auth::AuthConfig {
670            enabled: true,
671            require_auth: true,
672            ..crate::auth::AuthConfig::default()
673        }));
674        let bootstrap = auth_store
675            .bootstrap("replica", "secret")
676            .expect("bootstrap");
677        let policy = crate::auth::policies::Policy::from_json_str(
678            r#"{
679                "id": "replication-stream",
680                "version": 1,
681                "statements": [{
682                    "effect": "allow",
683                    "actions": ["cluster:replication:stream"],
684                    "resources": ["cluster:replication"]
685                }]
686            }"#,
687        )
688        .expect("policy");
689        auth_store.put_policy(policy).expect("install policy");
690        auth_store
691            .attach_policy(
692                crate::auth::store::PrincipalRef::User(crate::auth::UserId::platform("replica")),
693                "replication-stream",
694            )
695            .expect("attach policy");
696        let service = GrpcRuntime {
697            runtime,
698            auth_store,
699            prepared_registry: PreparedStatementRegistry::new(),
700            oauth_validator: None,
701        };
702
703        let open = reddb_wire::replication::WalStreamOpen {
704            since_lsn: 0,
705            max_count: 1,
706            replica_id: Some("replica-a".to_string()),
707            term: 5,
708            await_data: false,
709            await_timeout_ms: 1,
710        };
711        let mut request = Request::new(JsonPayloadRequest {
712            payload_json: String::from_utf8(open.encode_json()).expect("json"),
713        });
714        request.metadata_mut().insert(
715            "authorization",
716            format!("Bearer {}", bootstrap.api_key.key)
717                .parse()
718                .expect("metadata"),
719        );
720
721        let err = RedDb::pull_wal_records(&service, request)
722            .await
723            .expect_err("stale term should be fenced");
724
725        assert_eq!(err.code(), tonic::Code::FailedPrecondition);
726        assert!(
727            err.message().contains("stale")
728                || err.message().contains("fenced")
729                || err.message().contains("current term"),
730            "unexpected stale-term error: {err:?}"
731        );
732    }
733
734    fn seed_grpc_param_table(runtime: &RedDBRuntime) {
735        runtime
736            .execute_query("CREATE TABLE p (id INTEGER, name TEXT)")
737            .expect("create table");
738        runtime
739            .execute_query("INSERT INTO p (id, name) VALUES (1, 'Alice')")
740            .expect("insert alice");
741        runtime
742            .execute_query("INSERT INTO p (id, name) VALUES (2, 'Bob')")
743            .expect("insert bob");
744    }
745
746    fn grpc_param_values() -> Vec<QueryValue> {
747        vec![
748            QueryValue {
749                kind: Some(Kind::IntValue(1)),
750            },
751            QueryValue {
752                kind: Some(Kind::TextValue("Alice".to_string())),
753            },
754        ]
755    }
756
757    fn env_lock() -> &'static std::sync::Mutex<()> {
758        static LOCK: std::sync::OnceLock<std::sync::Mutex<()>> = std::sync::OnceLock::new();
759        LOCK.get_or_init(|| std::sync::Mutex::new(()))
760    }
761
762    struct EnvGuard {
763        previous: Vec<(&'static str, Option<String>)>,
764    }
765
766    impl EnvGuard {
767        fn set(vars: &[(&'static str, &'static str)]) -> Self {
768            let previous = vars
769                .iter()
770                .map(|(key, _)| (*key, std::env::var(key).ok()))
771                .collect();
772            for (key, value) in vars {
773                std::env::set_var(key, value);
774            }
775            Self { previous }
776        }
777    }
778
779    impl Drop for EnvGuard {
780        fn drop(&mut self) {
781            for (key, value) in self.previous.iter().rev() {
782                match value {
783                    Some(value) => std::env::set_var(key, value),
784                    None => std::env::remove_var(key),
785                }
786            }
787        }
788    }
789
790    fn temp_data_path(name: &str) -> std::path::PathBuf {
791        let suffix = SystemTime::now()
792            .duration_since(UNIX_EPOCH)
793            .unwrap()
794            .as_nanos();
795        std::env::temp_dir().join(format!("reddb_{name}_{suffix}.rdb"))
796    }
797
798    fn cleanup(data_path: &std::path::Path) {
799        let _ = std::fs::remove_file(data_path);
800        let _ = std::fs::remove_file(
801            crate::replication::primary::PrimaryReplication::slot_path_for(data_path),
802        );
803        let _ = std::fs::remove_file(crate::replication::primary::LogicalWalSpool::path_for(
804            data_path,
805        ));
806        let _ = std::fs::remove_dir_all(
807            crate::replication::primary::PrimaryReplication::primary_replica_root_for(data_path),
808        );
809        reddb_file::cleanup_rebootstrap_artifacts(data_path);
810    }
811}
812
813#[cfg(test)]
814mod grpc_ask_query_reply_tests {
815    use super::*;
816    use crate::storage::query::modes::QueryMode;
817    use crate::storage::query::unified::{UnifiedRecord, UnifiedResult};
818    use crate::storage::schema::Value as SchemaValue;
819
820    fn ask_runtime_result() -> RuntimeQueryResult {
821        let mut result = UnifiedResult::with_columns(vec![
822            "answer".into(),
823            "provider".into(),
824            "model".into(),
825            "mode".into(),
826            "retry_count".into(),
827            "prompt_tokens".into(),
828            "completion_tokens".into(),
829            "sources_flat".into(),
830            "citations".into(),
831            "validation".into(),
832        ]);
833        let mut record = UnifiedRecord::new();
834        record.set("answer", SchemaValue::text("Deploy failed [^1]."));
835        record.set("provider", SchemaValue::text("openai"));
836        record.set("model", SchemaValue::text("gpt-4o-mini"));
837        record.set("mode", SchemaValue::text("strict"));
838        record.set("retry_count", SchemaValue::Integer(0));
839        record.set("prompt_tokens", SchemaValue::Integer(11));
840        record.set("completion_tokens", SchemaValue::Integer(7));
841        record.set(
842            "sources_flat",
843            SchemaValue::Json(
844                br#"[{"urn":"urn:reddb:row:deployments:1","kind":"row","collection":"deployments","id":"1"}]"#.to_vec(),
845            ),
846        );
847        record.set(
848            "citations",
849            SchemaValue::Json(br#"[{"marker":1,"urn":"urn:reddb:row:deployments:1"}]"#.to_vec()),
850        );
851        record.set(
852            "validation",
853            SchemaValue::Json(br#"{"ok":true,"warnings":[],"errors":[]}"#.to_vec()),
854        );
855        result.push(record);
856
857        RuntimeQueryResult {
858            query: "ASK 'why did deploy fail?'".to_string(),
859            mode: QueryMode::Sql,
860            statement: "ask",
861            engine: "runtime-ai",
862            result,
863            affected_rows: 0,
864            statement_type: "select",
865            bookmark: None,
866        }
867    }
868
869    #[test]
870    fn query_reply_ask_result_json_uses_full_canonical_schema() {
871        let reply = query_reply(ask_runtime_result(), &None, &None);
872        let json: crate::json::Value =
873            crate::json::from_str(&reply.result_json).expect("valid ask json");
874
875        assert_eq!(
876            json.get("answer").and_then(crate::json::Value::as_str),
877            Some("Deploy failed [^1].")
878        );
879        assert_eq!(
880            json.get("cache_hit").and_then(crate::json::Value::as_bool),
881            Some(false)
882        );
883        assert_eq!(
884            json.get("cost_usd").and_then(crate::json::Value::as_f64),
885            Some(0.0)
886        );
887        assert_eq!(
888            json.get("mode").and_then(crate::json::Value::as_str),
889            Some("strict")
890        );
891        assert_eq!(
892            json.get("retry_count").and_then(crate::json::Value::as_u64),
893            Some(0)
894        );
895        assert!(
896            json.get("records").is_none(),
897            "ASK must not be row-wrapped: {}",
898            reply.result_json
899        );
900        assert!(
901            json.get("sources_flat")
902                .and_then(crate::json::Value::as_array)
903                .is_some_and(|sources| sources.len() == 1
904                    && sources[0]
905                        .get("payload")
906                        .and_then(crate::json::Value::as_str)
907                        .is_some()),
908            "sources_flat must be parsed with payload fallback: {}",
909            reply.result_json
910        );
911        assert!(
912            json.get("citations")
913                .and_then(crate::json::Value::as_array)
914                .is_some_and(|citations| citations.len() == 1),
915            "citations must be parsed: {}",
916            reply.result_json
917        );
918        assert_eq!(
919            json.get("validation")
920                .and_then(|v| v.get("ok"))
921                .and_then(crate::json::Value::as_bool),
922            Some(true)
923        );
924    }
925
926    #[test]
927    fn query_reply_non_ask_answer_column_keeps_row_shape() {
928        let mut result = UnifiedResult::with_columns(vec!["answer".into()]);
929        let mut record = UnifiedRecord::new();
930        record.set("answer", SchemaValue::text("plain select"));
931        result.push(record);
932
933        let reply = query_reply(
934            RuntimeQueryResult {
935                query: "SELECT 'plain select' AS answer".to_string(),
936                mode: QueryMode::Sql,
937                statement: "select",
938                engine: "runtime-sql",
939                result,
940                affected_rows: 0,
941                statement_type: "select",
942                bookmark: None,
943            },
944            &None,
945            &None,
946        );
947        let json: crate::json::Value =
948            crate::json::from_str(&reply.result_json).expect("valid query json");
949
950        assert!(
951            json.get("records").is_some(),
952            "non-ASK must stay row-wrapped"
953        );
954        assert!(
955            json.get("answer").is_none(),
956            "non-ASK must not use ASK envelope"
957        );
958    }
959
960    #[test]
961    fn query_reply_non_ask_json_column_preserves_object() {
962        let mut result = UnifiedResult::with_columns(vec!["value".into()]);
963        let mut record = UnifiedRecord::new();
964        record.set(
965            "value",
966            SchemaValue::Json(br#"{"alpha":"A","nested":{"leaf":12}}"#.to_vec()),
967        );
968        result.push(record);
969
970        let reply = query_reply(
971            RuntimeQueryResult {
972                query: "LIST KV proj AS JSON".to_string(),
973                mode: QueryMode::Sql,
974                statement: "kv_list_json",
975                engine: "kv",
976                result,
977                affected_rows: 0,
978                statement_type: "select",
979                bookmark: None,
980            },
981            &None,
982            &None,
983        );
984        let json: crate::json::Value =
985            crate::json::from_str(&reply.result_json).expect("valid query json");
986        let value = json
987            .get("records")
988            .and_then(crate::json::Value::as_array)
989            .and_then(|records| records.first())
990            .and_then(|record| record.get("value"))
991            .expect("value column");
992
993        assert_eq!(
994            value.get("alpha").and_then(crate::json::Value::as_str),
995            Some("A")
996        );
997        assert_eq!(
998            value
999                .get("nested")
1000                .and_then(|nested| nested.get("leaf"))
1001                .and_then(crate::json::Value::as_f64),
1002            Some(12.0)
1003        );
1004    }
1005}
1006
1007/// Emit a single info-level event with the SHA-256 fingerprint of the
1008/// active gRPC server cert + an mTLS flag. Never logs PEM bytes.
1009fn log_grpc_tls_identity(tls: &GrpcTlsOptions) {
1010    use sha2::{Digest, Sha256};
1011    let cert_fp = {
1012        let mut h = Sha256::new();
1013        h.update(&tls.cert_pem);
1014        let digest = h.finalize();
1015        // First 16 hex chars are enough for human cross-check; the full
1016        // SHA-256 lives in audit logs only.
1017        let mut buf = String::with_capacity(64);
1018        for b in digest.iter() {
1019            buf.push_str(&format!("{b:02x}"));
1020        }
1021        buf
1022    };
1023    tracing::info!(
1024        target: "reddb::security",
1025        transport = "grpc",
1026        cert_sha256 = %cert_fp,
1027        mtls = tls.client_ca_pem.is_some(),
1028        "gRPC TLS identity loaded"
1029    );
1030}
1031
1032include!("grpc/service_impl.rs");