Skip to main content

reddb_server/
grpc.rs

1pub(crate) use crate::application::json_input::{
2    json_bool_field, json_f32_field, json_string_field, json_usize_field,
3};
4pub(crate) use crate::application::{
5    AdminUseCases, CatalogUseCases, CreateEdgeInput, CreateEntityOutput, CreateNodeGraphLinkInput,
6    CreateNodeInput, CreateNodeTableLinkInput, CreateRowInput, CreateVectorInput,
7    DeleteEntityInput, EntityUseCases, ExecuteQueryInput, ExplainQueryInput, GraphCentralityInput,
8    GraphClusteringInput, GraphCommunitiesInput, GraphComponentsInput, GraphCyclesInput,
9    GraphHitsInput, GraphNeighborhoodInput, GraphPersonalizedPageRankInput, GraphShortestPathInput,
10    GraphTopologicalSortInput, GraphTraversalInput, GraphUseCases, InspectNativeArtifactInput,
11    NativeUseCases, PatchEntityInput, QueryUseCases, SearchHybridInput, SearchIvfInput,
12    SearchSimilarInput, SearchTextInput,
13};
14use std::collections::BTreeMap;
15use std::pin::Pin;
16use std::sync::Arc;
17use std::time::{SystemTime, UNIX_EPOCH};
18
19use crate::api::{RedDBOptions, RedDBResult};
20use crate::auth::middleware::{check_permission, AuthResult};
21use crate::auth::store::AuthStore;
22use crate::auth::Role;
23use crate::health::{HealthProvider, HealthState};
24use crate::json::{
25    from_str as json_from_str, to_string as json_to_string, Map, Value as JsonValue,
26};
27use crate::runtime::{
28    RedDBRuntime, RuntimeFilter, RuntimeFilterValue, RuntimeGraphCentralityAlgorithm,
29    RuntimeGraphCentralityResult, RuntimeGraphClusteringResult, RuntimeGraphCommunityAlgorithm,
30    RuntimeGraphCommunityResult, RuntimeGraphComponentsMode, RuntimeGraphComponentsResult,
31    RuntimeGraphCyclesResult, RuntimeGraphDirection, RuntimeGraphHitsResult,
32    RuntimeGraphNeighborhoodResult, RuntimeGraphPathAlgorithm, RuntimeGraphPathResult,
33    RuntimeGraphPattern, RuntimeGraphProjection, RuntimeGraphTopologicalSortResult,
34    RuntimeGraphTraversalResult, RuntimeGraphTraversalStrategy, RuntimeIvfSearchResult,
35    RuntimeQueryResult, RuntimeQueryWeights, RuntimeStats, ScanPage,
36};
37use crate::storage::schema::Value;
38use crate::storage::unified::devx::refs::{NodeRef, TableRef};
39use crate::storage::unified::{Metadata, MetadataValue};
40use crate::storage::{EntityData, EntityId, UnifiedEntity};
41use tokio_stream::wrappers::TcpListenerStream;
42use tonic::metadata::MetadataMap;
43use tonic::{Request, Response, Status};
44
45// gRPC protobuf types and tonic stubs live in the standalone
46// `reddb-grpc-proto` crate so `reddb-server` and `reddb-client`
47// can both consume them without a dependency cycle. We expose
48// them under the legacy `proto` module path so existing
49// `crate::grpc::proto::…` imports keep resolving.
50pub use reddb_grpc_proto as proto;
51
52use proto::red_db_server::{RedDb, RedDbServer};
53use proto::{
54    ask_stream_event, AskAnswerToken, AskReply, AskRequest, AskSources, AskStreamEvent,
55    BatchInsertChunk, BatchInsertReply, BatchQueryReply, BatchQueryRequest, BulkEntityReply,
56    Citation, CollectionRequest,
57    CollectionsReply, DeleteEntityRequest, DeploymentProfileRequest, Empty, EntityReply,
58    ExecutePreparedRequest, ExportRequest, GraphProjectionUpsertRequest, HealthReply,
59    IndexNameRequest, IndexToggleRequest, JsonBulkCreateRequest, JsonCreateRequest,
60    JsonPayloadRequest, KvWatchEvent, KvWatchRequest, ManifestRequest, OperationReply,
61    PayloadReply, PrepareQueryReply, PrepareQueryRequest, QueryReply, QueryRequest, QueryValue,
62    ScanEntity, ScanReply, ScanRequest, StatsReply, TopologyReply, TopologyRequest,
63    UpdateEntityRequest, Validation, ValidationItem,
64};
65
66mod control_support;
67mod entity_ops;
68mod input_support;
69pub(crate) mod scan_json;
70
71use self::control_support::*;
72use self::entity_ops::*;
73use self::input_support::*;
74use self::scan_json::*;
75
/// Listener configuration for the gRPC server: where to bind and whether
/// inbound traffic is TLS-terminated by this process.
#[derive(Debug, Clone)]
pub struct GrpcServerOptions {
    /// Address the server listens on, kept as text and parsed when
    /// `RedDBGrpcServer::serve` starts. The `Default` impl uses
    /// `"127.0.0.1:5555"`.
    pub bind_addr: String,
    /// Optional TLS configuration. When set the server terminates
    /// TLS for inbound gRPC traffic via `tonic::transport::ServerTlsConfig`.
    /// When `None`, the listener stays plaintext (back-compat for
    /// loopback / sidecar deployments where a sidecar terminates TLS).
    pub tls: Option<GrpcTlsOptions>,
}
85
/// PEM-encoded TLS material for gRPC's tonic-rustls server.
///
/// The server identity is required (cert + key); the optional
/// client-CA enables mTLS — when present, tonic verifies and
/// requires a client cert chain that anchors at this CA bundle.
///
/// The bytes are stored as given; `to_tonic_config` hands them to tonic
/// without inspecting them here.
#[derive(Debug, Clone)]
pub struct GrpcTlsOptions {
    /// PEM bytes for the server certificate chain (leaf first).
    /// These are also the bytes hashed for the TLS identity log line.
    pub cert_pem: Vec<u8>,
    /// PEM bytes for the server private key (PKCS#8 / SEC1 / RSA).
    pub key_pem: Vec<u8>,
    /// Optional PEM bytes for the trust anchor used to verify
    /// client certificates. When `Some(_)`, the server requires
    /// every client to present a cert that chains to this CA;
    /// when `None`, the server runs one-way TLS only.
    pub client_ca_pem: Option<Vec<u8>>,
}
103
104impl GrpcTlsOptions {
105    /// Build a `tonic` `ServerTlsConfig` from PEM bytes, applying
106    /// rustls defaults (TLS 1.2 + 1.3 — older versions are not
107    /// negotiable on tokio-rustls 0.26).
108    pub fn to_tonic_config(
109        &self,
110    ) -> Result<tonic::transport::ServerTlsConfig, Box<dyn std::error::Error>> {
111        let identity = tonic::transport::Identity::from_pem(&self.cert_pem, &self.key_pem);
112        let mut cfg = tonic::transport::ServerTlsConfig::new().identity(identity);
113        if let Some(ca_pem) = &self.client_ca_pem {
114            cfg = cfg.client_ca_root(tonic::transport::Certificate::from_pem(ca_pem));
115        }
116        Ok(cfg)
117    }
118}
119
120impl Default for GrpcServerOptions {
121    fn default() -> Self {
122        Self {
123            bind_addr: "127.0.0.1:5555".to_string(),
124            tls: None,
125        }
126    }
127}
128
/// gRPC front end for a `RedDBRuntime`: bundles the runtime, the listener
/// options, the auth store, and an optional OAuth/OIDC validator.
#[derive(Clone)]
pub struct RedDBGrpcServer {
    /// Database runtime that serves every request.
    runtime: RedDBRuntime,
    /// Bind address and optional TLS material used by `serve` / `serve_on`.
    options: GrpcServerOptions,
    /// Shared auth store; `with_options` also injects it into the runtime.
    auth_store: Arc<AuthStore>,
    /// Optional OAuth/OIDC JWT validator. When set, the gRPC
    /// interceptor validates JWT-shaped bearers against the issuer's
    /// JWKS *before* attempting `AuthStore` session/api-key lookups.
    /// Build externally via `crate::auth::OAuthValidator::with_verifier`
    /// and attach with [`Self::with_oauth_validator`].
    oauth_validator: Option<Arc<crate::auth::OAuthValidator>>,
}
141
142impl RedDBGrpcServer {
143    pub fn new(runtime: RedDBRuntime) -> Self {
144        let auth_config = crate::auth::AuthConfig::default();
145        let auth_store = Arc::new(AuthStore::new(auth_config));
146        Self::with_options(runtime, GrpcServerOptions::default(), auth_store)
147    }
148
149    pub fn from_database_options(
150        db_options: RedDBOptions,
151        options: GrpcServerOptions,
152    ) -> RedDBResult<Self> {
153        // Create runtime first so we can access the pager for vault pages.
154        let runtime = RedDBRuntime::with_options(db_options.clone())?;
155
156        let auth_store = if db_options.auth.vault_enabled {
157            // The vault stores its encrypted state in reserved pages inside
158            // the main .rdb file.  Extract the pager reference from the
159            // runtime's underlying store.
160            let pager = runtime.db().store().pager().cloned().ok_or_else(|| {
161                crate::api::RedDBError::Internal(
162                    "vault requires a paged database (persistent mode)".into(),
163                )
164            })?;
165            let store = AuthStore::with_vault(db_options.auth.clone(), pager, None)
166                .map_err(|e| crate::api::RedDBError::Internal(e.to_string()))?;
167            Arc::new(store)
168        } else {
169            Arc::new(AuthStore::new(db_options.auth.clone()))
170        };
171        auth_store.bootstrap_from_env();
172        Ok(Self::with_options(runtime, options, auth_store))
173    }
174
175    pub fn with_options(
176        runtime: RedDBRuntime,
177        options: GrpcServerOptions,
178        auth_store: Arc<AuthStore>,
179    ) -> Self {
180        // Inject the auth store into the runtime so that Value::Secret
181        // auto-encrypt/decrypt can read the vault AES key.
182        runtime.set_auth_store(Arc::clone(&auth_store));
183        Self {
184            runtime,
185            options,
186            auth_store,
187            oauth_validator: None,
188        }
189    }
190
191    /// Attach an externally-constructed OAuth/OIDC JWT validator. Once
192    /// set, JWT-shaped bearer tokens (3-segment) on the
193    /// `authorization` metadata are validated against the issuer's
194    /// JWKS, expiry, audience, etc. Non-JWT bearers fall back to the
195    /// `AuthStore` session/API-key path.
196    pub fn with_oauth_validator(mut self, validator: Arc<crate::auth::OAuthValidator>) -> Self {
197        self.oauth_validator = Some(validator);
198        self
199    }
200
201    /// Inspect the active OAuth validator, when one is configured.
202    pub fn oauth_validator(&self) -> Option<&Arc<crate::auth::OAuthValidator>> {
203        self.oauth_validator.as_ref()
204    }
205
206    pub fn runtime(&self) -> &RedDBRuntime {
207        &self.runtime
208    }
209
210    pub fn options(&self) -> &GrpcServerOptions {
211        &self.options
212    }
213
214    pub fn auth_store(&self) -> &Arc<AuthStore> {
215        &self.auth_store
216    }
217
218    fn grpc_runtime(&self) -> GrpcRuntime {
219        GrpcRuntime {
220            runtime: self.runtime.clone(),
221            auth_store: self.auth_store.clone(),
222            prepared_registry: PreparedStatementRegistry::new(),
223            oauth_validator: self.oauth_validator.clone(),
224        }
225    }
226
227    pub async fn serve(&self) -> Result<(), Box<dyn std::error::Error>> {
228        let addr = self.options.bind_addr.parse()?;
229        let mut builder = tonic::transport::Server::builder();
230        if let Some(tls) = &self.options.tls {
231            // Constant-time SHA256 fingerprint logged for ops triage —
232            // never the bytes of cert/key themselves.
233            log_grpc_tls_identity(tls);
234            builder = builder.tls_config(tls.to_tonic_config()?)?;
235        }
236        builder
237            .add_service(Self::configured_service(self.grpc_runtime()))
238            .serve(addr)
239            .await?;
240        Ok(())
241    }
242
243    pub async fn serve_on(
244        &self,
245        listener: std::net::TcpListener,
246    ) -> Result<(), Box<dyn std::error::Error>> {
247        listener.set_nonblocking(true)?;
248        let listener = tokio::net::TcpListener::from_std(listener)?;
249        let incoming = TcpListenerStream::new(listener);
250        let mut builder = tonic::transport::Server::builder();
251        if let Some(tls) = &self.options.tls {
252            log_grpc_tls_identity(tls);
253            builder = builder.tls_config(tls.to_tonic_config()?)?;
254        }
255        builder
256            .add_service(Self::configured_service(self.grpc_runtime()))
257            .serve_with_incoming(incoming)
258            .await?;
259        Ok(())
260    }
261
262    fn configured_service(runtime: GrpcRuntime) -> RedDbServer<GrpcRuntime> {
263        // Advertise zstd + gzip so clients can opt in. Server compresses
264        // outbound replies with zstd; sticking to a single send codec keeps
265        // CPU predictable while still accepting either on inbound.
266        use tonic::codec::CompressionEncoding;
267        RedDbServer::new(runtime)
268            .max_decoding_message_size(256 * 1024 * 1024)
269            .max_encoding_message_size(256 * 1024 * 1024)
270            .accept_compressed(CompressionEncoding::Zstd)
271            .accept_compressed(CompressionEncoding::Gzip)
272            .send_compressed(CompressionEncoding::Zstd)
273    }
274}
275
/// Server-side prepared statement — parsed + parameterized once, executed N times.
struct GrpcPreparedStatement {
    /// Parsed query expression, shared behind an `Arc` so handing it out on
    /// lookup does not clone the AST.
    shape: std::sync::Arc<crate::storage::query::ast::QueryExpr>,
    /// Parameter count recorded at prepare time and returned with the shape
    /// on lookup (presumably checked against the binds at execute — verify
    /// in the service implementation).
    parameter_count: usize,
    /// Insertion time; entries older than the registry's one-hour threshold
    /// are dropped during eviction.
    created_at: std::time::Instant,
}
282
/// Registry of prepared statements for one server instance.
/// Session-independent: any connection can execute any prepared statement by ID.
struct PreparedStatementRegistry {
    // parking_lot::RwLock never poisons on panic — safe to use without unwrap().
    /// Prepared statements keyed by the ID returned from `prepare`.
    map: parking_lot::RwLock<std::collections::HashMap<u64, GrpcPreparedStatement>>,
    /// Next statement ID to hand out; starts at 1 and only increases.
    next_id: std::sync::atomic::AtomicU64,
    /// Number of lookups served so far; every 256th lookup triggers an
    /// eviction pass.
    get_count: std::sync::atomic::AtomicU64,
}
291
292impl PreparedStatementRegistry {
293    fn new() -> Arc<Self> {
294        Arc::new(Self {
295            map: parking_lot::RwLock::new(std::collections::HashMap::new()),
296            next_id: std::sync::atomic::AtomicU64::new(1),
297            get_count: std::sync::atomic::AtomicU64::new(0),
298        })
299    }
300
301    fn prepare(&self, shape: crate::storage::query::ast::QueryExpr, parameter_count: usize) -> u64 {
302        use std::sync::atomic::Ordering;
303        let id = self.next_id.fetch_add(1, Ordering::Relaxed);
304        let mut map = self.map.write();
305        self.evict_old_locked(&mut map);
306        map.insert(
307            id,
308            GrpcPreparedStatement {
309                // Store as Arc to avoid cloning the full AST on every execute.
310                shape: std::sync::Arc::new(shape),
311                parameter_count,
312                created_at: std::time::Instant::now(),
313            },
314        );
315        id
316    }
317
318    fn get_shape_and_count(
319        &self,
320        id: u64,
321    ) -> Option<(std::sync::Arc<crate::storage::query::ast::QueryExpr>, usize)> {
322        // Periodic eviction on execute/get traffic so long-lived servers that
323        // prepare once and execute many times still age out stale statements.
324        let get_count = self
325            .get_count
326            .fetch_add(1, std::sync::atomic::Ordering::Relaxed)
327            + 1;
328        if get_count.is_multiple_of(256) {
329            let mut map = self.map.write();
330            self.evict_old_locked(&mut map);
331        }
332        let map = self.map.read();
333        map.get(&id)
334            .map(|s| (std::sync::Arc::clone(&s.shape), s.parameter_count))
335    }
336
337    fn evict_old_locked(&self, map: &mut std::collections::HashMap<u64, GrpcPreparedStatement>) {
338        let threshold = std::time::Duration::from_secs(3600);
339        map.retain(|_, v| v.created_at.elapsed() < threshold);
340    }
341}
342
/// Per-listener service state handed to the generated tonic server.
#[derive(Clone)]
struct GrpcRuntime {
    /// Database runtime the use-case facades borrow from.
    runtime: RedDBRuntime,
    /// Shared auth store, cloned from the owning `RedDBGrpcServer`.
    auth_store: Arc<AuthStore>,
    /// Prepared-statement registry, created when this value is built.
    prepared_registry: Arc<PreparedStatementRegistry>,
    /// OAuth/OIDC JWT validator cloned from the owning `RedDBGrpcServer`,
    /// where it is attached via `with_oauth_validator`. `None` means JWT
    /// bearers fall back to the AuthStore lookup path.
    oauth_validator: Option<Arc<crate::auth::OAuthValidator>>,
}
353
354impl GrpcRuntime {
355    fn admin_use_cases(&self) -> AdminUseCases<'_, RedDBRuntime> {
356        AdminUseCases::new(&self.runtime)
357    }
358
359    fn catalog_use_cases(&self) -> CatalogUseCases<'_, RedDBRuntime> {
360        CatalogUseCases::new(&self.runtime)
361    }
362
363    fn query_use_cases(&self) -> QueryUseCases<'_, RedDBRuntime> {
364        QueryUseCases::new(&self.runtime)
365    }
366
367    fn entity_use_cases(&self) -> EntityUseCases<'_, RedDBRuntime> {
368        EntityUseCases::new(&self.runtime)
369    }
370
371    fn graph_use_cases(&self) -> GraphUseCases<'_, RedDBRuntime> {
372        GraphUseCases::new(&self.runtime)
373    }
374
375    fn native_use_cases(&self) -> NativeUseCases<'_, RedDBRuntime> {
376        NativeUseCases::new(&self.runtime)
377    }
378}
379
380fn grpc_query_value_to_schema_value(value: QueryValue) -> Result<Value, Status> {
381    use proto::query_value::Kind;
382
383    match value
384        .kind
385        .ok_or_else(|| Status::invalid_argument("missing query param value"))?
386    {
387        Kind::NullValue(_) => Ok(Value::Null),
388        Kind::BoolValue(value) => Ok(Value::Boolean(value)),
389        Kind::IntValue(value) => Ok(Value::Integer(value)),
390        Kind::FloatValue(value) => Ok(Value::Float(value)),
391        Kind::TextValue(value) => Ok(Value::Text(std::sync::Arc::from(value))),
392        Kind::BytesValue(value) => Ok(Value::Blob(value)),
393        Kind::VectorValue(value) => Ok(Value::Vector(value.values)),
394        Kind::JsonValue(value) => {
395            let parsed = json_from_str::<JsonValue>(&value)
396                .map_err(|e| Status::invalid_argument(format!("json param parse error: {e}")))?;
397            let encoded = json_to_string(&parsed)
398                .map_err(|e| Status::invalid_argument(format!("json param encode error: {e}")))?;
399            Ok(Value::Json(encoded.into_bytes()))
400        }
401        Kind::TimestampValue(value) => Ok(Value::Timestamp(value)),
402        Kind::UuidValue(value) => {
403            let bytes: [u8; 16] = value.try_into().map_err(|value: Vec<u8>| {
404                Status::invalid_argument(format!(
405                    "uuid param must be 16 bytes, got {}",
406                    value.len()
407                ))
408            })?;
409            Ok(Value::Uuid(bytes))
410        }
411    }
412}
413
414fn execute_grpc_query_with_optional_params(
415    runtime: &RedDBRuntime,
416    query: String,
417    params: Vec<QueryValue>,
418) -> Result<RuntimeQueryResult, Status> {
419    if params.is_empty() {
420        return runtime.execute_query(&query).map_err(to_status);
421    }
422
423    let binds = params
424        .into_iter()
425        .map(grpc_query_value_to_schema_value)
426        .collect::<Result<Vec<_>, _>>()?;
427    let parsed = crate::storage::query::modes::parse_multi(&query)
428        .map_err(|e| Status::invalid_argument(format!("parse error: {e}")))?;
429    let bound = crate::storage::query::user_params::bind(&parsed, &binds)
430        .map_err(|e| Status::invalid_argument(format!("bind error: {e}")))?;
431    runtime.execute_query_expr(bound).map_err(to_status)
432}
433
#[cfg(test)]
mod grpc_query_value_tests {
    use super::*;
    use proto::query_value::Kind;

    /// Wrap a `Kind` in the `QueryValue` envelope used on the wire.
    fn param(kind: Kind) -> QueryValue {
        QueryValue { kind: Some(kind) }
    }

    /// Every supported wire kind maps onto the matching storage variant.
    #[test]
    fn grpc_query_value_maps_to_schema_value_variants() {
        let cases = vec![
            (Kind::NullValue(proto::QueryNull {}), Value::Null),
            (Kind::BoolValue(true), Value::Boolean(true)),
            (Kind::IntValue(42), Value::Integer(42)),
            (Kind::FloatValue(1.5), Value::Float(1.5)),
            (Kind::BytesValue(vec![0, 1, 2]), Value::Blob(vec![0, 1, 2])),
            (
                Kind::VectorValue(proto::QueryVector {
                    values: vec![0.25, 0.5],
                }),
                Value::Vector(vec![0.25, 0.5]),
            ),
            (
                Kind::TimestampValue(1_779_999_000),
                Value::Timestamp(1_779_999_000),
            ),
            (Kind::UuidValue(vec![0x11; 16]), Value::Uuid([0x11; 16])),
            (
                Kind::TextValue("alice".into()),
                Value::Text(std::sync::Arc::from("alice")),
            ),
            (
                Kind::JsonValue("{\"role\":\"admin\"}".into()),
                Value::Json(b"{\"role\":\"admin\"}".to_vec()),
            ),
        ];

        for (kind, expected) in cases {
            let actual = grpc_query_value_to_schema_value(param(kind)).unwrap();
            assert_eq!(actual, expected);
        }
    }

    /// A value without a kind, or a UUID of the wrong length, is rejected.
    #[test]
    fn grpc_query_value_rejects_missing_kind_and_bad_uuid() {
        let missing_kind = QueryValue { kind: None };
        assert!(grpc_query_value_to_schema_value(missing_kind).is_err());

        let short_uuid = param(Kind::UuidValue(vec![0; 15]));
        assert!(grpc_query_value_to_schema_value(short_uuid).is_err());
    }
}
523
#[cfg(test)]
mod grpc_ask_query_reply_tests {
    use super::*;
    use crate::storage::query::modes::QueryMode;
    use crate::storage::query::unified::{UnifiedRecord, UnifiedResult};
    use crate::storage::schema::Value as SchemaValue;

    /// JSON value type used to inspect `result_json`.
    type Json = crate::json::Value;

    /// Parse a reply's `result_json`, panicking with `context` on failure.
    fn parse_result_json(result_json: &str, context: &str) -> Json {
        crate::json::from_str(result_json).expect(context)
    }

    /// Top-level field lookup on the parsed reply.
    fn field<'a>(json: &'a Json, key: &str) -> Option<&'a Json> {
        json.get(key)
    }

    /// Build a one-row ASK result carrying every column of the ASK envelope.
    fn ask_runtime_result() -> RuntimeQueryResult {
        let mut result = UnifiedResult::with_columns(vec![
            "answer".into(),
            "provider".into(),
            "model".into(),
            "mode".into(),
            "retry_count".into(),
            "prompt_tokens".into(),
            "completion_tokens".into(),
            "sources_flat".into(),
            "citations".into(),
            "validation".into(),
        ]);

        let mut record = UnifiedRecord::new();
        for (column, text) in [
            ("answer", "Deploy failed [^1]."),
            ("provider", "openai"),
            ("model", "gpt-4o-mini"),
            ("mode", "strict"),
        ] {
            record.set(column, SchemaValue::text(text));
        }
        for (column, count) in [
            ("retry_count", 0),
            ("prompt_tokens", 11),
            ("completion_tokens", 7),
        ] {
            record.set(column, SchemaValue::Integer(count));
        }

        let sources_flat =
            br#"[{"urn":"urn:reddb:row:deployments:1","kind":"row","collection":"deployments","id":"1"}]"#
                .to_vec();
        let citations = br#"[{"marker":1,"urn":"urn:reddb:row:deployments:1"}]"#.to_vec();
        let validation = br#"{"ok":true,"warnings":[],"errors":[]}"#.to_vec();
        record.set("sources_flat", SchemaValue::Json(sources_flat));
        record.set("citations", SchemaValue::Json(citations));
        record.set("validation", SchemaValue::Json(validation));
        result.push(record);

        RuntimeQueryResult {
            query: "ASK 'why did deploy fail?'".to_string(),
            mode: QueryMode::Sql,
            statement: "ask",
            engine: "runtime-ai",
            result,
            affected_rows: 0,
            statement_type: "select",
        }
    }

    /// An ASK result is rendered as the flat ASK envelope, not as rows.
    #[test]
    fn query_reply_ask_result_json_uses_full_canonical_schema() {
        let reply = query_reply(ask_runtime_result(), &None, &None);
        let json = parse_result_json(&reply.result_json, "valid ask json");

        assert_eq!(
            field(&json, "answer").and_then(Json::as_str),
            Some("Deploy failed [^1].")
        );
        assert_eq!(
            field(&json, "cache_hit").and_then(Json::as_bool),
            Some(false)
        );
        assert_eq!(field(&json, "cost_usd").and_then(Json::as_f64), Some(0.0));
        assert_eq!(field(&json, "mode").and_then(Json::as_str), Some("strict"));
        assert_eq!(field(&json, "retry_count").and_then(Json::as_u64), Some(0));

        let row_wrapped = field(&json, "records").is_some();
        assert!(
            !row_wrapped,
            "ASK must not be row-wrapped: {}",
            reply.result_json
        );

        let sources_ok = field(&json, "sources_flat")
            .and_then(Json::as_array)
            .is_some_and(|sources| {
                sources.len() == 1
                    && sources[0]
                        .get("payload")
                        .and_then(Json::as_str)
                        .is_some()
            });
        assert!(
            sources_ok,
            "sources_flat must be parsed with payload fallback: {}",
            reply.result_json
        );

        let citations_ok = field(&json, "citations")
            .and_then(Json::as_array)
            .is_some_and(|citations| citations.len() == 1);
        assert!(
            citations_ok,
            "citations must be parsed: {}",
            reply.result_json
        );

        let validation_ok = field(&json, "validation")
            .and_then(|validation| validation.get("ok"))
            .and_then(Json::as_bool);
        assert_eq!(validation_ok, Some(true));
    }

    /// A plain SELECT that happens to expose an `answer` column keeps the
    /// row-wrapped shape.
    #[test]
    fn query_reply_non_ask_answer_column_keeps_row_shape() {
        let mut result = UnifiedResult::with_columns(vec!["answer".into()]);
        let mut record = UnifiedRecord::new();
        record.set("answer", SchemaValue::text("plain select"));
        result.push(record);

        let select_result = RuntimeQueryResult {
            query: "SELECT 'plain select' AS answer".to_string(),
            mode: QueryMode::Sql,
            statement: "select",
            engine: "runtime-sql",
            result,
            affected_rows: 0,
            statement_type: "select",
        };
        let reply = query_reply(select_result, &None, &None);
        let json = parse_result_json(&reply.result_json, "valid query json");

        let has_records = field(&json, "records").is_some();
        assert!(has_records, "non-ASK must stay row-wrapped");

        let has_answer = field(&json, "answer").is_some();
        assert!(!has_answer, "non-ASK must not use ASK envelope");
    }
}
669
670/// Emit a single info-level event with the SHA-256 fingerprint of the
671/// active gRPC server cert + an mTLS flag. Never logs PEM bytes.
672fn log_grpc_tls_identity(tls: &GrpcTlsOptions) {
673    use sha2::{Digest, Sha256};
674    let cert_fp = {
675        let mut h = Sha256::new();
676        h.update(&tls.cert_pem);
677        let digest = h.finalize();
678        // First 16 hex chars are enough for human cross-check; the full
679        // SHA-256 lives in audit logs only.
680        let mut buf = String::with_capacity(64);
681        for b in digest.iter() {
682            buf.push_str(&format!("{b:02x}"));
683        }
684        buf
685    };
686    tracing::info!(
687        target: "reddb::security",
688        transport = "grpc",
689        cert_sha256 = %cert_fp,
690        mtls = tls.client_ca_pem.is_some(),
691        "gRPC TLS identity loaded"
692    );
693}
694
// Textually splice `grpc/service_impl.rs` into this module so its items share
// this file's imports and private helpers (presumably the `RedDb` trait
// implementation for `GrpcRuntime` — confirm in that file).
include!("grpc/service_impl.rs");