Skip to main content

reddb_server/
grpc.rs

1pub(crate) use crate::application::json_input::{
2    json_bool_field, json_f32_field, json_string_field, json_usize_field,
3};
4pub(crate) use crate::application::{
5    AdminUseCases, CatalogUseCases, CreateEdgeInput, CreateEntityOutput, CreateNodeGraphLinkInput,
6    CreateNodeInput, CreateNodeTableLinkInput, CreateRowInput, CreateVectorInput,
7    DeleteEntityInput, EntityUseCases, ExecuteQueryInput, ExplainQueryInput, GraphCentralityInput,
8    GraphClusteringInput, GraphCommunitiesInput, GraphComponentsInput, GraphCyclesInput,
9    GraphHitsInput, GraphNeighborhoodInput, GraphPersonalizedPageRankInput, GraphShortestPathInput,
10    GraphTopologicalSortInput, GraphTraversalInput, GraphUseCases, InspectNativeArtifactInput,
11    NativeUseCases, PatchEntityInput, QueryUseCases, SearchHybridInput, SearchIvfInput,
12    SearchSimilarInput, SearchTextInput,
13};
14use std::collections::BTreeMap;
15use std::pin::Pin;
16use std::sync::Arc;
17use std::time::{SystemTime, UNIX_EPOCH};
18
19use crate::api::{RedDBOptions, RedDBResult};
20use crate::auth::middleware::{check_permission, AuthResult};
21use crate::auth::store::AuthStore;
22use crate::auth::Role;
23use crate::health::{HealthProvider, HealthState};
24use crate::json::{
25    from_str as json_from_str, to_string as json_to_string, Map, Value as JsonValue,
26};
27use crate::runtime::{
28    RedDBRuntime, RuntimeFilter, RuntimeFilterValue, RuntimeGraphCentralityAlgorithm,
29    RuntimeGraphCentralityResult, RuntimeGraphClusteringResult, RuntimeGraphCommunityAlgorithm,
30    RuntimeGraphCommunityResult, RuntimeGraphComponentsMode, RuntimeGraphComponentsResult,
31    RuntimeGraphCyclesResult, RuntimeGraphDirection, RuntimeGraphHitsResult,
32    RuntimeGraphNeighborhoodResult, RuntimeGraphPathAlgorithm, RuntimeGraphPathResult,
33    RuntimeGraphPattern, RuntimeGraphProjection, RuntimeGraphTopologicalSortResult,
34    RuntimeGraphTraversalResult, RuntimeGraphTraversalStrategy, RuntimeIvfSearchResult,
35    RuntimeQueryResult, RuntimeQueryWeights, RuntimeStats, ScanPage,
36};
37use crate::storage::schema::Value;
38use crate::storage::unified::devx::refs::{NodeRef, TableRef};
39use crate::storage::unified::{Metadata, MetadataValue};
40use crate::storage::{EntityData, EntityId, UnifiedEntity};
41use tokio_stream::wrappers::TcpListenerStream;
42use tonic::metadata::MetadataMap;
43use tonic::{Request, Response, Status};
44
45// gRPC protobuf types and tonic stubs live in the standalone
46// `reddb-grpc-proto` crate so `reddb-server` and `reddb-client`
47// can both consume them without a dependency cycle. We expose
48// them under the legacy `proto` module path so existing
49// `crate::grpc::proto::…` imports keep resolving.
50pub use reddb_grpc_proto as proto;
51
52use proto::red_db_server::{RedDb, RedDbServer};
53use proto::{
54    ask_stream_event, AskAnswerToken, AskReply, AskRequest, AskSources, AskStreamEvent,
55    BatchQueryReply, BatchQueryRequest, BulkEntityReply, Citation,
56    CollectionRequest, CollectionsReply, DeleteEntityRequest, DeploymentProfileRequest, Empty,
57    EntityReply, ExecutePreparedRequest, ExportRequest, GraphProjectionUpsertRequest, HealthReply,
58    IndexNameRequest, IndexToggleRequest, JsonBulkCreateRequest, JsonCreateRequest,
59    JsonPayloadRequest, KvWatchEvent, KvWatchRequest, ManifestRequest, OperationReply,
60    PayloadReply, PrepareQueryReply, PrepareQueryRequest, QueryReply, QueryRequest, QueryValue,
61    ScanEntity, ScanReply, ScanRequest, StatsReply, TopologyReply, TopologyRequest,
62    UpdateEntityRequest, Validation, ValidationItem,
63};
64
65mod control_support;
66mod entity_ops;
67mod input_support;
68pub(crate) mod scan_json;
69
70use self::control_support::*;
71use self::entity_ops::*;
72use self::input_support::*;
73use self::scan_json::*;
74
75#[derive(Debug, Clone)]
76pub struct GrpcServerOptions {
77    pub bind_addr: String,
78    /// Optional TLS configuration. When set the server terminates
79    /// TLS for inbound gRPC traffic via `tonic::transport::ServerTlsConfig`.
80    /// When `None`, the listener stays plaintext (back-compat for
81    /// loopback / sidecar deployments where a sidecar terminates TLS).
82    pub tls: Option<GrpcTlsOptions>,
83}
84
85/// PEM-encoded TLS material for gRPC's tonic-rustls server.
86///
87/// The server identity is required (cert + key); the optional
88/// client-CA enables mTLS — when present, tonic verifies and
89/// requires a client cert chain that anchors at this CA bundle.
90#[derive(Debug, Clone)]
91pub struct GrpcTlsOptions {
92    /// PEM bytes for the server certificate chain (leaf first).
93    pub cert_pem: Vec<u8>,
94    /// PEM bytes for the server private key (PKCS#8 / SEC1 / RSA).
95    pub key_pem: Vec<u8>,
96    /// Optional PEM bytes for the trust anchor used to verify
97    /// client certificates. When `Some(_)`, the server requires
98    /// every client to present a cert that chains to this CA;
99    /// when `None`, the server runs one-way TLS only.
100    pub client_ca_pem: Option<Vec<u8>>,
101}
102
103impl GrpcTlsOptions {
104    /// Build a `tonic` `ServerTlsConfig` from PEM bytes, applying
105    /// rustls defaults (TLS 1.2 + 1.3 — older versions are not
106    /// negotiable on tokio-rustls 0.26).
107    pub fn to_tonic_config(
108        &self,
109    ) -> Result<tonic::transport::ServerTlsConfig, Box<dyn std::error::Error>> {
110        let identity = tonic::transport::Identity::from_pem(&self.cert_pem, &self.key_pem);
111        let mut cfg = tonic::transport::ServerTlsConfig::new().identity(identity);
112        if let Some(ca_pem) = &self.client_ca_pem {
113            cfg = cfg.client_ca_root(tonic::transport::Certificate::from_pem(ca_pem));
114        }
115        Ok(cfg)
116    }
117}
118
119impl Default for GrpcServerOptions {
120    fn default() -> Self {
121        Self {
122            bind_addr: "127.0.0.1:5555".to_string(),
123            tls: None,
124        }
125    }
126}
127
128#[derive(Clone)]
129pub struct RedDBGrpcServer {
130    runtime: RedDBRuntime,
131    options: GrpcServerOptions,
132    auth_store: Arc<AuthStore>,
133    /// Optional OAuth/OIDC JWT validator. When set, the gRPC
134    /// interceptor validates JWT-shaped bearers against the issuer's
135    /// JWKS *before* attempting `AuthStore` session/api-key lookups.
136    /// Build externally via `crate::auth::OAuthValidator::with_verifier`
137    /// and attach with [`Self::with_oauth_validator`].
138    oauth_validator: Option<Arc<crate::auth::OAuthValidator>>,
139}
140
141impl RedDBGrpcServer {
142    pub fn new(runtime: RedDBRuntime) -> Self {
143        let auth_config = crate::auth::AuthConfig::default();
144        let auth_store = Arc::new(AuthStore::new(auth_config));
145        Self::with_options(runtime, GrpcServerOptions::default(), auth_store)
146    }
147
148    pub fn from_database_options(
149        db_options: RedDBOptions,
150        options: GrpcServerOptions,
151    ) -> RedDBResult<Self> {
152        // Create runtime first so we can access the pager for vault pages.
153        let runtime = RedDBRuntime::with_options(db_options.clone())?;
154
155        let auth_store = if db_options.auth.vault_enabled {
156            // The vault stores its encrypted state in reserved pages inside
157            // the main .rdb file.  Extract the pager reference from the
158            // runtime's underlying store.
159            let pager = runtime.db().store().pager().cloned().ok_or_else(|| {
160                crate::api::RedDBError::Internal(
161                    "vault requires a paged database (persistent mode)".into(),
162                )
163            })?;
164            let store = AuthStore::with_vault(db_options.auth.clone(), pager, None)
165                .map_err(|e| crate::api::RedDBError::Internal(e.to_string()))?;
166            Arc::new(store)
167        } else {
168            Arc::new(AuthStore::new(db_options.auth.clone()))
169        };
170        auth_store.bootstrap_from_env();
171        Ok(Self::with_options(runtime, options, auth_store))
172    }
173
174    pub fn with_options(
175        runtime: RedDBRuntime,
176        options: GrpcServerOptions,
177        auth_store: Arc<AuthStore>,
178    ) -> Self {
179        // Inject the auth store into the runtime so that Value::Secret
180        // auto-encrypt/decrypt can read the vault AES key.
181        runtime.set_auth_store(Arc::clone(&auth_store));
182        Self {
183            runtime,
184            options,
185            auth_store,
186            oauth_validator: None,
187        }
188    }
189
190    /// Attach an externally-constructed OAuth/OIDC JWT validator. Once
191    /// set, JWT-shaped bearer tokens (3-segment) on the
192    /// `authorization` metadata are validated against the issuer's
193    /// JWKS, expiry, audience, etc. Non-JWT bearers fall back to the
194    /// `AuthStore` session/API-key path.
195    pub fn with_oauth_validator(mut self, validator: Arc<crate::auth::OAuthValidator>) -> Self {
196        self.oauth_validator = Some(validator);
197        self
198    }
199
200    /// Inspect the active OAuth validator, when one is configured.
201    pub fn oauth_validator(&self) -> Option<&Arc<crate::auth::OAuthValidator>> {
202        self.oauth_validator.as_ref()
203    }
204
205    pub fn runtime(&self) -> &RedDBRuntime {
206        &self.runtime
207    }
208
209    pub fn options(&self) -> &GrpcServerOptions {
210        &self.options
211    }
212
213    pub fn auth_store(&self) -> &Arc<AuthStore> {
214        &self.auth_store
215    }
216
217    fn grpc_runtime(&self) -> GrpcRuntime {
218        GrpcRuntime {
219            runtime: self.runtime.clone(),
220            auth_store: self.auth_store.clone(),
221            prepared_registry: PreparedStatementRegistry::new(),
222            oauth_validator: self.oauth_validator.clone(),
223        }
224    }
225
226    pub async fn serve(&self) -> Result<(), Box<dyn std::error::Error>> {
227        let addr = self.options.bind_addr.parse()?;
228        let mut builder = tonic::transport::Server::builder();
229        if let Some(tls) = &self.options.tls {
230            // Constant-time SHA256 fingerprint logged for ops triage —
231            // never the bytes of cert/key themselves.
232            log_grpc_tls_identity(tls);
233            builder = builder.tls_config(tls.to_tonic_config()?)?;
234        }
235        builder
236            .add_service(Self::configured_service(self.grpc_runtime()))
237            .serve(addr)
238            .await?;
239        Ok(())
240    }
241
242    pub async fn serve_on(
243        &self,
244        listener: std::net::TcpListener,
245    ) -> Result<(), Box<dyn std::error::Error>> {
246        listener.set_nonblocking(true)?;
247        let listener = tokio::net::TcpListener::from_std(listener)?;
248        let incoming = TcpListenerStream::new(listener);
249        let mut builder = tonic::transport::Server::builder();
250        if let Some(tls) = &self.options.tls {
251            log_grpc_tls_identity(tls);
252            builder = builder.tls_config(tls.to_tonic_config()?)?;
253        }
254        builder
255            .add_service(Self::configured_service(self.grpc_runtime()))
256            .serve_with_incoming(incoming)
257            .await?;
258        Ok(())
259    }
260
261    fn configured_service(runtime: GrpcRuntime) -> RedDbServer<GrpcRuntime> {
262        // Advertise zstd + gzip so clients can opt in. Server compresses
263        // outbound replies with zstd; sticking to a single send codec keeps
264        // CPU predictable while still accepting either on inbound.
265        use tonic::codec::CompressionEncoding;
266        RedDbServer::new(runtime)
267            .max_decoding_message_size(256 * 1024 * 1024)
268            .max_encoding_message_size(256 * 1024 * 1024)
269            .accept_compressed(CompressionEncoding::Zstd)
270            .accept_compressed(CompressionEncoding::Gzip)
271            .send_compressed(CompressionEncoding::Zstd)
272    }
273}
274
275/// Server-side prepared statement — parsed + parameterized once, executed N times.
276struct GrpcPreparedStatement {
277    shape: std::sync::Arc<crate::storage::query::ast::QueryExpr>,
278    parameter_count: usize,
279    created_at: std::time::Instant,
280}
281
282/// Registry of prepared statements for one server instance.
283/// Session-independent: any connection can execute any prepared statement by ID.
284struct PreparedStatementRegistry {
285    // parking_lot::RwLock never poisons on panic — safe to use without unwrap().
286    map: parking_lot::RwLock<std::collections::HashMap<u64, GrpcPreparedStatement>>,
287    next_id: std::sync::atomic::AtomicU64,
288    get_count: std::sync::atomic::AtomicU64,
289}
290
291impl PreparedStatementRegistry {
292    fn new() -> Arc<Self> {
293        Arc::new(Self {
294            map: parking_lot::RwLock::new(std::collections::HashMap::new()),
295            next_id: std::sync::atomic::AtomicU64::new(1),
296            get_count: std::sync::atomic::AtomicU64::new(0),
297        })
298    }
299
300    fn prepare(&self, shape: crate::storage::query::ast::QueryExpr, parameter_count: usize) -> u64 {
301        use std::sync::atomic::Ordering;
302        let id = self.next_id.fetch_add(1, Ordering::Relaxed);
303        let mut map = self.map.write();
304        self.evict_old_locked(&mut map);
305        map.insert(
306            id,
307            GrpcPreparedStatement {
308                // Store as Arc to avoid cloning the full AST on every execute.
309                shape: std::sync::Arc::new(shape),
310                parameter_count,
311                created_at: std::time::Instant::now(),
312            },
313        );
314        id
315    }
316
317    fn get_shape_and_count(
318        &self,
319        id: u64,
320    ) -> Option<(std::sync::Arc<crate::storage::query::ast::QueryExpr>, usize)> {
321        // Periodic eviction on execute/get traffic so long-lived servers that
322        // prepare once and execute many times still age out stale statements.
323        let get_count = self
324            .get_count
325            .fetch_add(1, std::sync::atomic::Ordering::Relaxed)
326            + 1;
327        if get_count.is_multiple_of(256) {
328            let mut map = self.map.write();
329            self.evict_old_locked(&mut map);
330        }
331        let map = self.map.read();
332        map.get(&id)
333            .map(|s| (std::sync::Arc::clone(&s.shape), s.parameter_count))
334    }
335
336    fn evict_old_locked(&self, map: &mut std::collections::HashMap<u64, GrpcPreparedStatement>) {
337        let threshold = std::time::Duration::from_secs(3600);
338        map.retain(|_, v| v.created_at.elapsed() < threshold);
339    }
340}
341
342#[derive(Clone)]
343struct GrpcRuntime {
344    runtime: RedDBRuntime,
345    auth_store: Arc<AuthStore>,
346    prepared_registry: Arc<PreparedStatementRegistry>,
347    /// OAuth/OIDC JWT validator built once from `auth_store.config().oauth`
348    /// when the operator enables OAuth. `None` means JWT bearers fall
349    /// back to the AuthStore lookup path.
350    oauth_validator: Option<Arc<crate::auth::OAuthValidator>>,
351}
352
353impl GrpcRuntime {
354    fn admin_use_cases(&self) -> AdminUseCases<'_, RedDBRuntime> {
355        AdminUseCases::new(&self.runtime)
356    }
357
358    fn catalog_use_cases(&self) -> CatalogUseCases<'_, RedDBRuntime> {
359        CatalogUseCases::new(&self.runtime)
360    }
361
362    fn query_use_cases(&self) -> QueryUseCases<'_, RedDBRuntime> {
363        QueryUseCases::new(&self.runtime)
364    }
365
366    fn entity_use_cases(&self) -> EntityUseCases<'_, RedDBRuntime> {
367        EntityUseCases::new(&self.runtime)
368    }
369
370    fn graph_use_cases(&self) -> GraphUseCases<'_, RedDBRuntime> {
371        GraphUseCases::new(&self.runtime)
372    }
373
374    fn native_use_cases(&self) -> NativeUseCases<'_, RedDBRuntime> {
375        NativeUseCases::new(&self.runtime)
376    }
377}
378
379fn grpc_query_value_to_schema_value(value: QueryValue) -> Result<Value, Status> {
380    use proto::query_value::Kind;
381
382    match value
383        .kind
384        .ok_or_else(|| Status::invalid_argument("missing query param value"))?
385    {
386        Kind::NullValue(_) => Ok(Value::Null),
387        Kind::BoolValue(value) => Ok(Value::Boolean(value)),
388        Kind::IntValue(value) => Ok(Value::Integer(value)),
389        Kind::FloatValue(value) => Ok(Value::Float(value)),
390        Kind::TextValue(value) => Ok(Value::Text(std::sync::Arc::from(value))),
391        Kind::BytesValue(value) => Ok(Value::Blob(value)),
392        Kind::VectorValue(value) => Ok(Value::Vector(value.values)),
393        Kind::JsonValue(value) => {
394            let parsed = json_from_str::<JsonValue>(&value)
395                .map_err(|e| Status::invalid_argument(format!("json param parse error: {e}")))?;
396            let encoded = json_to_string(&parsed)
397                .map_err(|e| Status::invalid_argument(format!("json param encode error: {e}")))?;
398            Ok(Value::Json(encoded.into_bytes()))
399        }
400        Kind::TimestampValue(value) => Ok(Value::Timestamp(value)),
401        Kind::UuidValue(value) => {
402            let bytes: [u8; 16] = value.try_into().map_err(|value: Vec<u8>| {
403                Status::invalid_argument(format!(
404                    "uuid param must be 16 bytes, got {}",
405                    value.len()
406                ))
407            })?;
408            Ok(Value::Uuid(bytes))
409        }
410    }
411}
412
413fn execute_grpc_query_with_optional_params(
414    runtime: &RedDBRuntime,
415    query: String,
416    params: Vec<QueryValue>,
417) -> Result<RuntimeQueryResult, Status> {
418    if params.is_empty() {
419        return runtime.execute_query(&query).map_err(to_status);
420    }
421
422    let binds = params
423        .into_iter()
424        .map(grpc_query_value_to_schema_value)
425        .collect::<Result<Vec<_>, _>>()?;
426    let parsed = crate::storage::query::modes::parse_multi(&query)
427        .map_err(|e| Status::invalid_argument(format!("parse error: {e}")))?;
428    let bound = crate::storage::query::user_params::bind(&parsed, &binds)
429        .map_err(|e| Status::invalid_argument(format!("bind error: {e}")))?;
430    runtime.execute_query_expr(bound).map_err(to_status)
431}
432
433#[cfg(test)]
434mod grpc_query_value_tests {
435    use super::*;
436    use proto::query_value::Kind;
437
438    #[test]
439    fn grpc_query_value_maps_to_schema_value_variants() {
440        let cases = vec![
441            (
442                QueryValue {
443                    kind: Some(Kind::NullValue(proto::QueryNull {})),
444                },
445                Value::Null,
446            ),
447            (
448                QueryValue {
449                    kind: Some(Kind::BoolValue(true)),
450                },
451                Value::Boolean(true),
452            ),
453            (
454                QueryValue {
455                    kind: Some(Kind::IntValue(42)),
456                },
457                Value::Integer(42),
458            ),
459            (
460                QueryValue {
461                    kind: Some(Kind::FloatValue(1.5)),
462                },
463                Value::Float(1.5),
464            ),
465            (
466                QueryValue {
467                    kind: Some(Kind::BytesValue(vec![0, 1, 2])),
468                },
469                Value::Blob(vec![0, 1, 2]),
470            ),
471            (
472                QueryValue {
473                    kind: Some(Kind::VectorValue(proto::QueryVector {
474                        values: vec![0.25, 0.5],
475                    })),
476                },
477                Value::Vector(vec![0.25, 0.5]),
478            ),
479            (
480                QueryValue {
481                    kind: Some(Kind::TimestampValue(1_779_999_000)),
482                },
483                Value::Timestamp(1_779_999_000),
484            ),
485            (
486                QueryValue {
487                    kind: Some(Kind::UuidValue(vec![0x11; 16])),
488                },
489                Value::Uuid([0x11; 16]),
490            ),
491        ];
492
493        for (input, expected) in cases {
494            assert_eq!(grpc_query_value_to_schema_value(input).unwrap(), expected);
495        }
496
497        assert_eq!(
498            grpc_query_value_to_schema_value(QueryValue {
499                kind: Some(Kind::TextValue("alice".into())),
500            })
501            .unwrap(),
502            Value::Text(std::sync::Arc::from("alice"))
503        );
504        assert_eq!(
505            grpc_query_value_to_schema_value(QueryValue {
506                kind: Some(Kind::JsonValue("{\"role\":\"admin\"}".into())),
507            })
508            .unwrap(),
509            Value::Json(b"{\"role\":\"admin\"}".to_vec())
510        );
511    }
512
513    #[test]
514    fn grpc_query_value_rejects_missing_kind_and_bad_uuid() {
515        assert!(grpc_query_value_to_schema_value(QueryValue { kind: None }).is_err());
516        assert!(grpc_query_value_to_schema_value(QueryValue {
517            kind: Some(Kind::UuidValue(vec![0; 15])),
518        })
519        .is_err());
520    }
521}
522
523#[cfg(test)]
524mod grpc_ask_query_reply_tests {
525    use super::*;
526    use crate::storage::query::modes::QueryMode;
527    use crate::storage::query::unified::{UnifiedRecord, UnifiedResult};
528    use crate::storage::schema::Value as SchemaValue;
529
530    fn ask_runtime_result() -> RuntimeQueryResult {
531        let mut result = UnifiedResult::with_columns(vec![
532            "answer".into(),
533            "provider".into(),
534            "model".into(),
535            "mode".into(),
536            "retry_count".into(),
537            "prompt_tokens".into(),
538            "completion_tokens".into(),
539            "sources_flat".into(),
540            "citations".into(),
541            "validation".into(),
542        ]);
543        let mut record = UnifiedRecord::new();
544        record.set("answer", SchemaValue::text("Deploy failed [^1]."));
545        record.set("provider", SchemaValue::text("openai"));
546        record.set("model", SchemaValue::text("gpt-4o-mini"));
547        record.set("mode", SchemaValue::text("strict"));
548        record.set("retry_count", SchemaValue::Integer(0));
549        record.set("prompt_tokens", SchemaValue::Integer(11));
550        record.set("completion_tokens", SchemaValue::Integer(7));
551        record.set(
552            "sources_flat",
553            SchemaValue::Json(
554                br#"[{"urn":"urn:reddb:row:deployments:1","kind":"row","collection":"deployments","id":"1"}]"#.to_vec(),
555            ),
556        );
557        record.set(
558            "citations",
559            SchemaValue::Json(br#"[{"marker":1,"urn":"urn:reddb:row:deployments:1"}]"#.to_vec()),
560        );
561        record.set(
562            "validation",
563            SchemaValue::Json(br#"{"ok":true,"warnings":[],"errors":[]}"#.to_vec()),
564        );
565        result.push(record);
566
567        RuntimeQueryResult {
568            query: "ASK 'why did deploy fail?'".to_string(),
569            mode: QueryMode::Sql,
570            statement: "ask",
571            engine: "runtime-ai",
572            result,
573            affected_rows: 0,
574            statement_type: "select",
575        }
576    }
577
578    #[test]
579    fn query_reply_ask_result_json_uses_full_canonical_schema() {
580        let reply = query_reply(ask_runtime_result(), &None, &None);
581        let json: crate::json::Value =
582            crate::json::from_str(&reply.result_json).expect("valid ask json");
583
584        assert_eq!(
585            json.get("answer").and_then(crate::json::Value::as_str),
586            Some("Deploy failed [^1].")
587        );
588        assert_eq!(
589            json.get("cache_hit").and_then(crate::json::Value::as_bool),
590            Some(false)
591        );
592        assert_eq!(
593            json.get("cost_usd").and_then(crate::json::Value::as_f64),
594            Some(0.0)
595        );
596        assert_eq!(
597            json.get("mode").and_then(crate::json::Value::as_str),
598            Some("strict")
599        );
600        assert_eq!(
601            json.get("retry_count").and_then(crate::json::Value::as_u64),
602            Some(0)
603        );
604        assert!(
605            json.get("records").is_none(),
606            "ASK must not be row-wrapped: {}",
607            reply.result_json
608        );
609        assert!(
610            json.get("sources_flat")
611                .and_then(crate::json::Value::as_array)
612                .is_some_and(|sources| sources.len() == 1
613                    && sources[0]
614                        .get("payload")
615                        .and_then(crate::json::Value::as_str)
616                        .is_some()),
617            "sources_flat must be parsed with payload fallback: {}",
618            reply.result_json
619        );
620        assert!(
621            json.get("citations")
622                .and_then(crate::json::Value::as_array)
623                .is_some_and(|citations| citations.len() == 1),
624            "citations must be parsed: {}",
625            reply.result_json
626        );
627        assert_eq!(
628            json.get("validation")
629                .and_then(|v| v.get("ok"))
630                .and_then(crate::json::Value::as_bool),
631            Some(true)
632        );
633    }
634
635    #[test]
636    fn query_reply_non_ask_answer_column_keeps_row_shape() {
637        let mut result = UnifiedResult::with_columns(vec!["answer".into()]);
638        let mut record = UnifiedRecord::new();
639        record.set("answer", SchemaValue::text("plain select"));
640        result.push(record);
641
642        let reply = query_reply(
643            RuntimeQueryResult {
644                query: "SELECT 'plain select' AS answer".to_string(),
645                mode: QueryMode::Sql,
646                statement: "select",
647                engine: "runtime-sql",
648                result,
649                affected_rows: 0,
650                statement_type: "select",
651            },
652            &None,
653            &None,
654        );
655        let json: crate::json::Value =
656            crate::json::from_str(&reply.result_json).expect("valid query json");
657
658        assert!(
659            json.get("records").is_some(),
660            "non-ASK must stay row-wrapped"
661        );
662        assert!(
663            json.get("answer").is_none(),
664            "non-ASK must not use ASK envelope"
665        );
666    }
667}
668
669/// Emit a single info-level event with the SHA-256 fingerprint of the
670/// active gRPC server cert + an mTLS flag. Never logs PEM bytes.
671fn log_grpc_tls_identity(tls: &GrpcTlsOptions) {
672    use sha2::{Digest, Sha256};
673    let cert_fp = {
674        let mut h = Sha256::new();
675        h.update(&tls.cert_pem);
676        let digest = h.finalize();
677        // First 16 hex chars are enough for human cross-check; the full
678        // SHA-256 lives in audit logs only.
679        let mut buf = String::with_capacity(64);
680        for b in digest.iter() {
681            buf.push_str(&format!("{b:02x}"));
682        }
683        buf
684    };
685    tracing::info!(
686        target: "reddb::security",
687        transport = "grpc",
688        cert_sha256 = %cert_fp,
689        mtls = tls.client_ca_pem.is_some(),
690        "gRPC TLS identity loaded"
691    );
692}
693
694include!("grpc/service_impl.rs");