1pub(crate) use crate::application::json_input::{
2 json_bool_field, json_f32_field, json_string_field, json_usize_field,
3};
4pub(crate) use crate::application::{
5 AdminUseCases, CatalogUseCases, CreateEdgeInput, CreateEntityOutput, CreateNodeGraphLinkInput,
6 CreateNodeInput, CreateNodeTableLinkInput, CreateRowInput, CreateVectorInput,
7 DeleteEntityInput, EntityUseCases, ExecuteQueryInput, ExplainQueryInput, GraphCentralityInput,
8 GraphClusteringInput, GraphCommunitiesInput, GraphComponentsInput, GraphCyclesInput,
9 GraphHitsInput, GraphNeighborhoodInput, GraphPersonalizedPageRankInput, GraphShortestPathInput,
10 GraphTopologicalSortInput, GraphTraversalInput, GraphUseCases, InspectNativeArtifactInput,
11 NativeUseCases, PatchEntityInput, QueryUseCases, SearchHybridInput, SearchIvfInput,
12 SearchSimilarInput, SearchTextInput,
13};
14use std::collections::BTreeMap;
15use std::pin::Pin;
16use std::sync::Arc;
17use std::time::{SystemTime, UNIX_EPOCH};
18
19use crate::api::{RedDBOptions, RedDBResult};
20use crate::auth::middleware::{check_permission, AuthResult, AuthSource};
21use crate::auth::store::AuthStore;
22use crate::auth::Role;
23use crate::health::{HealthProvider, HealthState};
24use crate::json::{
25 from_str as json_from_str, to_string as json_to_string, Map, Value as JsonValue,
26};
27use crate::runtime::{
28 RedDBRuntime, RuntimeFilter, RuntimeFilterValue, RuntimeGraphCentralityAlgorithm,
29 RuntimeGraphCentralityResult, RuntimeGraphClusteringResult, RuntimeGraphCommunityAlgorithm,
30 RuntimeGraphCommunityResult, RuntimeGraphComponentsMode, RuntimeGraphComponentsResult,
31 RuntimeGraphCyclesResult, RuntimeGraphDirection, RuntimeGraphHitsResult,
32 RuntimeGraphNeighborhoodResult, RuntimeGraphPathAlgorithm, RuntimeGraphPathResult,
33 RuntimeGraphPattern, RuntimeGraphProjection, RuntimeGraphTopologicalSortResult,
34 RuntimeGraphTraversalResult, RuntimeGraphTraversalStrategy, RuntimeIvfSearchResult,
35 RuntimeQueryResult, RuntimeQueryWeights, RuntimeStats, ScanPage,
36};
37use crate::storage::schema::Value;
38use crate::storage::unified::devx::refs::{NodeRef, TableRef};
39use crate::storage::unified::{Metadata, MetadataValue};
40use crate::storage::{EntityData, EntityId, UnifiedEntity};
41use tokio_stream::wrappers::TcpListenerStream;
42use tonic::metadata::MetadataMap;
43use tonic::{Request, Response, Status};
44
45pub use reddb_grpc_proto as proto;
51
52use proto::red_db_server::{RedDb, RedDbServer};
53use proto::{
54 ask_stream_event, AskAnswerToken, AskReply, AskRequest, AskSources, AskStreamEvent,
55 BatchInsertChunk, BatchInsertReply, BatchQueryReply, BatchQueryRequest, BulkEntityReply,
56 Citation, CollectionRequest, CollectionsReply, DeleteEntityRequest, DeploymentProfileRequest,
57 Empty, EntityReply, ExecutePreparedRequest, ExportRequest, GraphProjectionUpsertRequest,
58 HealthReply, IndexNameRequest, IndexToggleRequest, JsonBulkCreateRequest, JsonCreateRequest,
59 JsonPayloadRequest, KvWatchEvent, KvWatchRequest, ManifestRequest, OperationReply,
60 PayloadReply, PrepareQueryReply, PrepareQueryRequest, QueryReply, QueryRequest, QueryValue,
61 ScanEntity, ScanReply, ScanRequest, StatsReply, TopologyReply, TopologyRequest,
62 UpdateEntityRequest, Validation, ValidationItem,
63};
64
65mod control_support;
66mod entity_ops;
67mod input_support;
68pub(crate) mod scan_json;
69
70use self::control_support::*;
71use self::entity_ops::*;
72use self::input_support::*;
73use self::scan_json::*;
74
75#[derive(Debug, Clone)]
76pub struct GrpcServerOptions {
77 pub bind_addr: String,
78 pub tls: Option<GrpcTlsOptions>,
83}
84
85#[derive(Debug, Clone)]
91pub struct GrpcTlsOptions {
92 pub cert_pem: Vec<u8>,
94 pub key_pem: Vec<u8>,
96 pub client_ca_pem: Option<Vec<u8>>,
101}
102
103impl GrpcTlsOptions {
104 pub fn to_tonic_config(
108 &self,
109 ) -> Result<tonic::transport::ServerTlsConfig, Box<dyn std::error::Error>> {
110 let identity = tonic::transport::Identity::from_pem(&self.cert_pem, &self.key_pem);
111 let mut cfg = tonic::transport::ServerTlsConfig::new().identity(identity);
112 if let Some(ca_pem) = &self.client_ca_pem {
113 cfg = cfg.client_ca_root(tonic::transport::Certificate::from_pem(ca_pem));
114 }
115 Ok(cfg)
116 }
117}
118
119impl Default for GrpcServerOptions {
120 fn default() -> Self {
121 Self {
122 bind_addr: "127.0.0.1:55055".to_string(),
123 tls: None,
124 }
125 }
126}
127
128#[derive(Clone)]
129pub struct RedDBGrpcServer {
130 runtime: RedDBRuntime,
131 options: GrpcServerOptions,
132 auth_store: Arc<AuthStore>,
133 oauth_validator: Option<Arc<crate::auth::OAuthValidator>>,
139}
140
141impl RedDBGrpcServer {
142 pub fn new(runtime: RedDBRuntime) -> Self {
143 let auth_config = crate::auth::AuthConfig::default();
144 let auth_store = Arc::new(AuthStore::new(auth_config));
145 Self::with_options(runtime, GrpcServerOptions::default(), auth_store)
146 }
147
148 pub fn from_database_options(
149 db_options: RedDBOptions,
150 options: GrpcServerOptions,
151 ) -> RedDBResult<Self> {
152 let runtime = RedDBRuntime::with_options(db_options.clone())?;
154
155 let auth_store = if db_options.auth.vault_enabled {
156 let pager = runtime.db().store().pager().cloned().ok_or_else(|| {
160 crate::api::RedDBError::Internal(
161 "vault requires a paged database (persistent mode)".into(),
162 )
163 })?;
164 let store = AuthStore::with_vault(db_options.auth.clone(), pager)
165 .map_err(|e| crate::api::RedDBError::Internal(e.to_string()))?;
166 Arc::new(store)
167 } else {
168 Arc::new(AuthStore::new(db_options.auth.clone()))
169 };
170 auth_store.bootstrap_from_env();
171 Ok(Self::with_options(runtime, options, auth_store))
172 }
173
174 pub fn with_options(
175 runtime: RedDBRuntime,
176 options: GrpcServerOptions,
177 auth_store: Arc<AuthStore>,
178 ) -> Self {
179 runtime.set_auth_store(Arc::clone(&auth_store));
182 Self {
183 runtime,
184 options,
185 auth_store,
186 oauth_validator: None,
187 }
188 }
189
190 pub fn with_oauth_validator(mut self, validator: Arc<crate::auth::OAuthValidator>) -> Self {
196 self.oauth_validator = Some(validator);
197 self
198 }
199
200 pub fn oauth_validator(&self) -> Option<&Arc<crate::auth::OAuthValidator>> {
202 self.oauth_validator.as_ref()
203 }
204
205 pub fn runtime(&self) -> &RedDBRuntime {
206 &self.runtime
207 }
208
209 pub fn options(&self) -> &GrpcServerOptions {
210 &self.options
211 }
212
213 pub fn auth_store(&self) -> &Arc<AuthStore> {
214 &self.auth_store
215 }
216
217 fn grpc_runtime(&self) -> GrpcRuntime {
218 GrpcRuntime {
219 runtime: self.runtime.clone(),
220 auth_store: self.auth_store.clone(),
221 prepared_registry: PreparedStatementRegistry::new(),
222 oauth_validator: self.oauth_validator.clone(),
223 }
224 }
225
226 pub async fn serve(&self) -> Result<(), Box<dyn std::error::Error>> {
227 let addr = self.options.bind_addr.parse()?;
228 let mut builder = tonic::transport::Server::builder();
229 if let Some(tls) = &self.options.tls {
230 log_grpc_tls_identity(tls);
233 builder = builder.tls_config(tls.to_tonic_config()?)?;
234 }
235 builder
236 .add_service(Self::configured_service(self.grpc_runtime()))
237 .serve(addr)
238 .await?;
239 Ok(())
240 }
241
242 pub async fn serve_on(
243 &self,
244 listener: std::net::TcpListener,
245 ) -> Result<(), Box<dyn std::error::Error>> {
246 listener.set_nonblocking(true)?;
247 let listener = tokio::net::TcpListener::from_std(listener)?;
248 let incoming = TcpListenerStream::new(listener);
249 let mut builder = tonic::transport::Server::builder();
250 if let Some(tls) = &self.options.tls {
251 log_grpc_tls_identity(tls);
252 builder = builder.tls_config(tls.to_tonic_config()?)?;
253 }
254 builder
255 .add_service(Self::configured_service(self.grpc_runtime()))
256 .serve_with_incoming(incoming)
257 .await?;
258 Ok(())
259 }
260
261 pub(crate) async fn serve_router_demux(
268 &self,
269 rx: tokio::sync::mpsc::Receiver<tokio::net::TcpStream>,
270 ) -> Result<(), Box<dyn std::error::Error>> {
271 use tokio_stream::StreamExt;
272 let incoming = tokio_stream::wrappers::ReceiverStream::new(rx).map(Ok::<_, std::io::Error>);
273 let mut builder = tonic::transport::Server::builder();
274 if let Some(tls) = &self.options.tls {
275 log_grpc_tls_identity(tls);
276 builder = builder.tls_config(tls.to_tonic_config()?)?;
277 }
278 builder
279 .add_service(Self::configured_service(self.grpc_runtime()))
280 .serve_with_incoming(incoming)
281 .await?;
282 Ok(())
283 }
284
285 fn configured_service(runtime: GrpcRuntime) -> RedDbServer<GrpcRuntime> {
286 use tonic::codec::CompressionEncoding;
290 RedDbServer::new(runtime)
291 .max_decoding_message_size(256 * 1024 * 1024)
292 .max_encoding_message_size(256 * 1024 * 1024)
293 .accept_compressed(CompressionEncoding::Zstd)
294 .accept_compressed(CompressionEncoding::Gzip)
295 .send_compressed(CompressionEncoding::Zstd)
296 }
297}
298
299struct GrpcPreparedStatement {
301 shape: std::sync::Arc<crate::storage::query::ast::QueryExpr>,
302 parameter_count: usize,
303 created_at: std::time::Instant,
304}
305
306struct PreparedStatementRegistry {
309 map: parking_lot::RwLock<std::collections::HashMap<u64, GrpcPreparedStatement>>,
311 next_id: std::sync::atomic::AtomicU64,
312 get_count: std::sync::atomic::AtomicU64,
313}
314
315impl PreparedStatementRegistry {
316 fn new() -> Arc<Self> {
317 Arc::new(Self {
318 map: parking_lot::RwLock::new(std::collections::HashMap::new()),
319 next_id: std::sync::atomic::AtomicU64::new(1),
320 get_count: std::sync::atomic::AtomicU64::new(0),
321 })
322 }
323
324 fn prepare(&self, shape: crate::storage::query::ast::QueryExpr, parameter_count: usize) -> u64 {
325 use std::sync::atomic::Ordering;
326 let id = self.next_id.fetch_add(1, Ordering::Relaxed);
327 let mut map = self.map.write();
328 self.evict_old_locked(&mut map);
329 map.insert(
330 id,
331 GrpcPreparedStatement {
332 shape: std::sync::Arc::new(shape),
334 parameter_count,
335 created_at: std::time::Instant::now(),
336 },
337 );
338 id
339 }
340
341 fn get_shape_and_count(
342 &self,
343 id: u64,
344 ) -> Option<(std::sync::Arc<crate::storage::query::ast::QueryExpr>, usize)> {
345 let get_count = self
348 .get_count
349 .fetch_add(1, std::sync::atomic::Ordering::Relaxed)
350 + 1;
351 if get_count.is_multiple_of(256) {
352 let mut map = self.map.write();
353 self.evict_old_locked(&mut map);
354 }
355 let map = self.map.read();
356 map.get(&id)
357 .map(|s| (std::sync::Arc::clone(&s.shape), s.parameter_count))
358 }
359
360 fn evict_old_locked(&self, map: &mut std::collections::HashMap<u64, GrpcPreparedStatement>) {
361 let threshold = std::time::Duration::from_secs(3600);
362 map.retain(|_, v| v.created_at.elapsed() < threshold);
363 }
364}
365
366#[derive(Clone)]
367struct GrpcRuntime {
368 runtime: RedDBRuntime,
369 auth_store: Arc<AuthStore>,
370 prepared_registry: Arc<PreparedStatementRegistry>,
371 oauth_validator: Option<Arc<crate::auth::OAuthValidator>>,
375}
376
377impl GrpcRuntime {
378 fn admin_use_cases(&self) -> AdminUseCases<'_, RedDBRuntime> {
379 AdminUseCases::new(&self.runtime)
380 }
381
382 fn catalog_use_cases(&self) -> CatalogUseCases<'_, RedDBRuntime> {
383 CatalogUseCases::new(&self.runtime)
384 }
385
386 fn query_use_cases(&self) -> QueryUseCases<'_, RedDBRuntime> {
387 QueryUseCases::new(&self.runtime)
388 }
389
390 fn entity_use_cases(&self) -> EntityUseCases<'_, RedDBRuntime> {
391 EntityUseCases::new(&self.runtime)
392 }
393
394 fn graph_use_cases(&self) -> GraphUseCases<'_, RedDBRuntime> {
395 GraphUseCases::new(&self.runtime)
396 }
397
398 fn native_use_cases(&self) -> NativeUseCases<'_, RedDBRuntime> {
399 NativeUseCases::new(&self.runtime)
400 }
401}
402
403fn grpc_query_value_to_schema_value(value: QueryValue) -> Result<Value, Status> {
404 use proto::query_value::Kind;
405
406 match value
407 .kind
408 .ok_or_else(|| Status::invalid_argument("missing query param value"))?
409 {
410 Kind::NullValue(_) => Ok(Value::Null),
411 Kind::BoolValue(value) => Ok(Value::Boolean(value)),
412 Kind::IntValue(value) => Ok(Value::Integer(value)),
413 Kind::FloatValue(value) => Ok(Value::Float(value)),
414 Kind::TextValue(value) => Ok(Value::Text(std::sync::Arc::from(value))),
415 Kind::BytesValue(value) => Ok(Value::Blob(value)),
416 Kind::VectorValue(value) => Ok(Value::Vector(value.values)),
417 Kind::JsonValue(value) => {
418 let parsed = json_from_str::<JsonValue>(&value)
419 .map_err(|e| Status::invalid_argument(format!("json param parse error: {e}")))?;
420 let encoded = json_to_string(&parsed)
421 .map_err(|e| Status::invalid_argument(format!("json param encode error: {e}")))?;
422 Ok(Value::Json(encoded.into_bytes()))
423 }
424 Kind::TimestampValue(value) => Ok(Value::Timestamp(value)),
425 Kind::UuidValue(value) => {
426 let bytes: [u8; 16] = value.try_into().map_err(|value: Vec<u8>| {
427 Status::invalid_argument(format!(
428 "uuid param must be 16 bytes, got {}",
429 value.len()
430 ))
431 })?;
432 Ok(Value::Uuid(bytes))
433 }
434 }
435}
436
437fn execute_grpc_query_with_optional_params(
438 runtime: &RedDBRuntime,
439 query: String,
440 params: Vec<QueryValue>,
441) -> Result<RuntimeQueryResult, Status> {
442 if query.trim().is_empty() {
443 return Err(Status::invalid_argument("query field cannot be empty"));
444 }
445
446 if params.is_empty() {
447 let result = runtime.execute_query(&query).map_err(to_status)?;
448 enforce_grpc_commit_policy_after_query_result(runtime, &result)?;
449 return Ok(result);
450 }
451
452 let binds = params
453 .into_iter()
454 .map(grpc_query_value_to_schema_value)
455 .collect::<Result<Vec<_>, _>>()?;
456 let result = runtime
457 .execute_query_with_params(&query, &binds)
458 .map_err(to_status)?;
459 enforce_grpc_commit_policy_after_query_result(runtime, &result)?;
460 Ok(result)
461}
462
463fn enforce_grpc_commit_policy_after_query_result(
464 runtime: &RedDBRuntime,
465 result: &RuntimeQueryResult,
466) -> Result<(), Status> {
467 let is_mutation = matches!(result.statement_type, "insert" | "update" | "delete");
468 if !is_mutation {
469 return Ok(());
470 }
471 let post_lsn = runtime.cdc_current_lsn();
472 runtime
473 .enforce_commit_policy(post_lsn)
474 .map(|_| ())
475 .map_err(|err| Status::deadline_exceeded(err.to_string()))
476}
477
478#[cfg(test)]
479mod grpc_query_value_tests {
480 use super::*;
481 use proto::query_value::Kind;
482
483 #[test]
484 fn grpc_query_value_maps_to_schema_value_variants() {
485 let cases = vec![
486 (
487 QueryValue {
488 kind: Some(Kind::NullValue(proto::QueryNull {})),
489 },
490 Value::Null,
491 ),
492 (
493 QueryValue {
494 kind: Some(Kind::BoolValue(true)),
495 },
496 Value::Boolean(true),
497 ),
498 (
499 QueryValue {
500 kind: Some(Kind::IntValue(42)),
501 },
502 Value::Integer(42),
503 ),
504 (
505 QueryValue {
506 kind: Some(Kind::FloatValue(1.5)),
507 },
508 Value::Float(1.5),
509 ),
510 (
511 QueryValue {
512 kind: Some(Kind::BytesValue(vec![0, 1, 2])),
513 },
514 Value::Blob(vec![0, 1, 2]),
515 ),
516 (
517 QueryValue {
518 kind: Some(Kind::VectorValue(proto::QueryVector {
519 values: vec![0.25, 0.5],
520 })),
521 },
522 Value::Vector(vec![0.25, 0.5]),
523 ),
524 (
525 QueryValue {
526 kind: Some(Kind::TimestampValue(1_779_999_000)),
527 },
528 Value::Timestamp(1_779_999_000),
529 ),
530 (
531 QueryValue {
532 kind: Some(Kind::UuidValue(vec![0x11; 16])),
533 },
534 Value::Uuid([0x11; 16]),
535 ),
536 ];
537
538 for (input, expected) in cases {
539 assert_eq!(grpc_query_value_to_schema_value(input).unwrap(), expected);
540 }
541
542 assert_eq!(
543 grpc_query_value_to_schema_value(QueryValue {
544 kind: Some(Kind::TextValue("alice".into())),
545 })
546 .unwrap(),
547 Value::Text(std::sync::Arc::from("alice"))
548 );
549 assert_eq!(
550 grpc_query_value_to_schema_value(QueryValue {
551 kind: Some(Kind::JsonValue("{\"role\":\"admin\"}".into())),
552 })
553 .unwrap(),
554 Value::Json(b"{\"role\":\"admin\"}".to_vec())
555 );
556 }
557
558 #[test]
559 fn grpc_query_value_rejects_missing_kind_and_bad_uuid() {
560 assert!(grpc_query_value_to_schema_value(QueryValue { kind: None }).is_err());
561 assert!(grpc_query_value_to_schema_value(QueryValue {
562 kind: Some(Kind::UuidValue(vec![0; 15])),
563 })
564 .is_err());
565 }
566
567 #[test]
568 fn grpc_query_rejects_empty_query_before_runtime_parse() {
569 let runtime =
570 RedDBRuntime::with_options(crate::api::RedDBOptions::in_memory()).expect("runtime");
571 let err = execute_grpc_query_with_optional_params(&runtime, " ".to_string(), Vec::new())
572 .expect_err("empty query should fail");
573
574 assert_eq!(err.code(), tonic::Code::InvalidArgument);
575 assert_eq!(err.message(), "query field cannot be empty");
576 }
577
578 #[test]
579 fn grpc_query_params_are_bound_before_execution() {
580 let runtime =
581 RedDBRuntime::with_options(crate::api::RedDBOptions::in_memory()).expect("runtime");
582 seed_grpc_param_table(&runtime);
583
584 let result = execute_grpc_query_with_optional_params(
585 &runtime,
586 "SELECT id, name FROM p WHERE id = $1 AND name = $2".to_string(),
587 grpc_param_values(),
588 )
589 .expect("parameterized query");
590
591 assert_eq!(result.result.records.len(), 1);
592 }
593
594 #[test]
595 fn grpc_query_enforces_ack_n_commit_policy_fail_closed() {
596 let _env_lock = env_lock().lock().expect("env lock");
597 let _env = EnvGuard::set(&[
598 ("RED_PRIMARY_COMMIT_POLICY", "ack_n=1"),
599 ("RED_REPLICATION_ACK_TIMEOUT_MS", "20"),
600 ("RED_COMMIT_FAIL_ON_TIMEOUT", "true"),
601 ]);
602 let data_path = temp_data_path("grpc_ack_n_timeout");
603 cleanup(&data_path);
604
605 let runtime = RedDBRuntime::with_options(
606 crate::api::RedDBOptions::persistent(&data_path)
607 .with_replication(crate::replication::ReplicationConfig::primary()),
608 )
609 .expect("runtime");
610
611 let err = execute_grpc_query_with_optional_params(
612 &runtime,
613 "INSERT INTO grpc_ack_items (id, name) VALUES (1, 'alpha')".to_string(),
614 Vec::new(),
615 )
616 .expect_err("ack_n without replica ack must fail closed");
617
618 assert_eq!(err.code(), tonic::Code::DeadlineExceeded);
619 assert!(
620 err.message().contains("commit policy timed out")
621 && err.message().contains("RED_COMMIT_FAIL_ON_TIMEOUT"),
622 "error should identify commit policy timeout, got {err:?}"
623 );
624 assert!(
625 runtime.cdc_current_lsn() > 0,
626 "local mutation should advance CDC before gRPC response fails"
627 );
628
629 cleanup(&data_path);
630 }
631
632 #[tokio::test]
633 async fn grpc_query_rpc_binds_query_request_params() {
634 let runtime =
635 RedDBRuntime::with_options(crate::api::RedDBOptions::in_memory()).expect("runtime");
636 seed_grpc_param_table(&runtime);
637 let service = GrpcRuntime {
638 runtime,
639 auth_store: Arc::new(AuthStore::new(crate::auth::AuthConfig::default())),
640 prepared_registry: PreparedStatementRegistry::new(),
641 oauth_validator: None,
642 };
643
644 let reply = RedDb::query(
645 &service,
646 Request::new(QueryRequest {
647 query: "SELECT id, name FROM p WHERE id = $1 AND name = $2".to_string(),
648 entity_types: Vec::new(),
649 capabilities: Vec::new(),
650 params: grpc_param_values(),
651 }),
652 )
653 .await
654 .expect("query rpc")
655 .into_inner();
656
657 assert_eq!(reply.record_count, 1);
658 assert!(reply.result_json.contains("Alice"), "{}", reply.result_json);
659 assert!(!reply.result_json.contains("Bob"), "{}", reply.result_json);
660 }
661
662 #[tokio::test]
663 async fn pull_wal_records_rejects_stale_term_on_grpc_path() {
664 let runtime = RedDBRuntime::with_options(
665 crate::api::RedDBOptions::in_memory()
666 .with_replication(crate::replication::ReplicationConfig::primary().with_term(6)),
667 )
668 .expect("runtime");
669 let auth_store = Arc::new(AuthStore::new(crate::auth::AuthConfig {
670 enabled: true,
671 require_auth: true,
672 ..crate::auth::AuthConfig::default()
673 }));
674 let bootstrap = auth_store
675 .bootstrap("replica", "secret")
676 .expect("bootstrap");
677 let policy = crate::auth::policies::Policy::from_json_str(
678 r#"{
679 "id": "replication-stream",
680 "version": 1,
681 "statements": [{
682 "effect": "allow",
683 "actions": ["cluster:replication:stream"],
684 "resources": ["cluster:replication"]
685 }]
686 }"#,
687 )
688 .expect("policy");
689 auth_store.put_policy(policy).expect("install policy");
690 auth_store
691 .attach_policy(
692 crate::auth::store::PrincipalRef::User(crate::auth::UserId::platform("replica")),
693 "replication-stream",
694 )
695 .expect("attach policy");
696 let service = GrpcRuntime {
697 runtime,
698 auth_store,
699 prepared_registry: PreparedStatementRegistry::new(),
700 oauth_validator: None,
701 };
702
703 let open = reddb_wire::replication::WalStreamOpen {
704 since_lsn: 0,
705 max_count: 1,
706 replica_id: Some("replica-a".to_string()),
707 term: 5,
708 await_data: false,
709 await_timeout_ms: 1,
710 };
711 let mut request = Request::new(JsonPayloadRequest {
712 payload_json: String::from_utf8(open.encode_json()).expect("json"),
713 });
714 request.metadata_mut().insert(
715 "authorization",
716 format!("Bearer {}", bootstrap.api_key.key)
717 .parse()
718 .expect("metadata"),
719 );
720
721 let err = RedDb::pull_wal_records(&service, request)
722 .await
723 .expect_err("stale term should be fenced");
724
725 assert_eq!(err.code(), tonic::Code::FailedPrecondition);
726 assert!(
727 err.message().contains("stale")
728 || err.message().contains("fenced")
729 || err.message().contains("current term"),
730 "unexpected stale-term error: {err:?}"
731 );
732 }
733
734 fn seed_grpc_param_table(runtime: &RedDBRuntime) {
735 runtime
736 .execute_query("CREATE TABLE p (id INTEGER, name TEXT)")
737 .expect("create table");
738 runtime
739 .execute_query("INSERT INTO p (id, name) VALUES (1, 'Alice')")
740 .expect("insert alice");
741 runtime
742 .execute_query("INSERT INTO p (id, name) VALUES (2, 'Bob')")
743 .expect("insert bob");
744 }
745
746 fn grpc_param_values() -> Vec<QueryValue> {
747 vec![
748 QueryValue {
749 kind: Some(Kind::IntValue(1)),
750 },
751 QueryValue {
752 kind: Some(Kind::TextValue("Alice".to_string())),
753 },
754 ]
755 }
756
757 fn env_lock() -> &'static std::sync::Mutex<()> {
758 static LOCK: std::sync::OnceLock<std::sync::Mutex<()>> = std::sync::OnceLock::new();
759 LOCK.get_or_init(|| std::sync::Mutex::new(()))
760 }
761
762 struct EnvGuard {
763 previous: Vec<(&'static str, Option<String>)>,
764 }
765
766 impl EnvGuard {
767 fn set(vars: &[(&'static str, &'static str)]) -> Self {
768 let previous = vars
769 .iter()
770 .map(|(key, _)| (*key, std::env::var(key).ok()))
771 .collect();
772 for (key, value) in vars {
773 std::env::set_var(key, value);
774 }
775 Self { previous }
776 }
777 }
778
779 impl Drop for EnvGuard {
780 fn drop(&mut self) {
781 for (key, value) in self.previous.iter().rev() {
782 match value {
783 Some(value) => std::env::set_var(key, value),
784 None => std::env::remove_var(key),
785 }
786 }
787 }
788 }
789
790 fn temp_data_path(name: &str) -> std::path::PathBuf {
791 let suffix = SystemTime::now()
792 .duration_since(UNIX_EPOCH)
793 .unwrap()
794 .as_nanos();
795 std::env::temp_dir().join(format!("reddb_{name}_{suffix}.rdb"))
796 }
797
798 fn cleanup(data_path: &std::path::Path) {
799 let _ = std::fs::remove_file(data_path);
800 let _ = std::fs::remove_file(
801 crate::replication::primary::PrimaryReplication::slot_path_for(data_path),
802 );
803 let _ = std::fs::remove_file(crate::replication::primary::LogicalWalSpool::path_for(
804 data_path,
805 ));
806 let _ = std::fs::remove_dir_all(
807 crate::replication::primary::PrimaryReplication::primary_replica_root_for(data_path),
808 );
809 reddb_file::cleanup_rebootstrap_artifacts(data_path);
810 }
811}
812
813#[cfg(test)]
814mod grpc_ask_query_reply_tests {
815 use super::*;
816 use crate::storage::query::modes::QueryMode;
817 use crate::storage::query::unified::{UnifiedRecord, UnifiedResult};
818 use crate::storage::schema::Value as SchemaValue;
819
820 fn ask_runtime_result() -> RuntimeQueryResult {
821 let mut result = UnifiedResult::with_columns(vec![
822 "answer".into(),
823 "provider".into(),
824 "model".into(),
825 "mode".into(),
826 "retry_count".into(),
827 "prompt_tokens".into(),
828 "completion_tokens".into(),
829 "sources_flat".into(),
830 "citations".into(),
831 "validation".into(),
832 ]);
833 let mut record = UnifiedRecord::new();
834 record.set("answer", SchemaValue::text("Deploy failed [^1]."));
835 record.set("provider", SchemaValue::text("openai"));
836 record.set("model", SchemaValue::text("gpt-4o-mini"));
837 record.set("mode", SchemaValue::text("strict"));
838 record.set("retry_count", SchemaValue::Integer(0));
839 record.set("prompt_tokens", SchemaValue::Integer(11));
840 record.set("completion_tokens", SchemaValue::Integer(7));
841 record.set(
842 "sources_flat",
843 SchemaValue::Json(
844 br#"[{"urn":"urn:reddb:row:deployments:1","kind":"row","collection":"deployments","id":"1"}]"#.to_vec(),
845 ),
846 );
847 record.set(
848 "citations",
849 SchemaValue::Json(br#"[{"marker":1,"urn":"urn:reddb:row:deployments:1"}]"#.to_vec()),
850 );
851 record.set(
852 "validation",
853 SchemaValue::Json(br#"{"ok":true,"warnings":[],"errors":[]}"#.to_vec()),
854 );
855 result.push(record);
856
857 RuntimeQueryResult {
858 query: "ASK 'why did deploy fail?'".to_string(),
859 mode: QueryMode::Sql,
860 statement: "ask",
861 engine: "runtime-ai",
862 result,
863 affected_rows: 0,
864 statement_type: "select",
865 bookmark: None,
866 }
867 }
868
869 #[test]
870 fn query_reply_ask_result_json_uses_full_canonical_schema() {
871 let reply = query_reply(ask_runtime_result(), &None, &None);
872 let json: crate::json::Value =
873 crate::json::from_str(&reply.result_json).expect("valid ask json");
874
875 assert_eq!(
876 json.get("answer").and_then(crate::json::Value::as_str),
877 Some("Deploy failed [^1].")
878 );
879 assert_eq!(
880 json.get("cache_hit").and_then(crate::json::Value::as_bool),
881 Some(false)
882 );
883 assert_eq!(
884 json.get("cost_usd").and_then(crate::json::Value::as_f64),
885 Some(0.0)
886 );
887 assert_eq!(
888 json.get("mode").and_then(crate::json::Value::as_str),
889 Some("strict")
890 );
891 assert_eq!(
892 json.get("retry_count").and_then(crate::json::Value::as_u64),
893 Some(0)
894 );
895 assert!(
896 json.get("records").is_none(),
897 "ASK must not be row-wrapped: {}",
898 reply.result_json
899 );
900 assert!(
901 json.get("sources_flat")
902 .and_then(crate::json::Value::as_array)
903 .is_some_and(|sources| sources.len() == 1
904 && sources[0]
905 .get("payload")
906 .and_then(crate::json::Value::as_str)
907 .is_some()),
908 "sources_flat must be parsed with payload fallback: {}",
909 reply.result_json
910 );
911 assert!(
912 json.get("citations")
913 .and_then(crate::json::Value::as_array)
914 .is_some_and(|citations| citations.len() == 1),
915 "citations must be parsed: {}",
916 reply.result_json
917 );
918 assert_eq!(
919 json.get("validation")
920 .and_then(|v| v.get("ok"))
921 .and_then(crate::json::Value::as_bool),
922 Some(true)
923 );
924 }
925
926 #[test]
927 fn query_reply_non_ask_answer_column_keeps_row_shape() {
928 let mut result = UnifiedResult::with_columns(vec!["answer".into()]);
929 let mut record = UnifiedRecord::new();
930 record.set("answer", SchemaValue::text("plain select"));
931 result.push(record);
932
933 let reply = query_reply(
934 RuntimeQueryResult {
935 query: "SELECT 'plain select' AS answer".to_string(),
936 mode: QueryMode::Sql,
937 statement: "select",
938 engine: "runtime-sql",
939 result,
940 affected_rows: 0,
941 statement_type: "select",
942 bookmark: None,
943 },
944 &None,
945 &None,
946 );
947 let json: crate::json::Value =
948 crate::json::from_str(&reply.result_json).expect("valid query json");
949
950 assert!(
951 json.get("records").is_some(),
952 "non-ASK must stay row-wrapped"
953 );
954 assert!(
955 json.get("answer").is_none(),
956 "non-ASK must not use ASK envelope"
957 );
958 }
959
960 #[test]
961 fn query_reply_non_ask_json_column_preserves_object() {
962 let mut result = UnifiedResult::with_columns(vec!["value".into()]);
963 let mut record = UnifiedRecord::new();
964 record.set(
965 "value",
966 SchemaValue::Json(br#"{"alpha":"A","nested":{"leaf":12}}"#.to_vec()),
967 );
968 result.push(record);
969
970 let reply = query_reply(
971 RuntimeQueryResult {
972 query: "LIST KV proj AS JSON".to_string(),
973 mode: QueryMode::Sql,
974 statement: "kv_list_json",
975 engine: "kv",
976 result,
977 affected_rows: 0,
978 statement_type: "select",
979 bookmark: None,
980 },
981 &None,
982 &None,
983 );
984 let json: crate::json::Value =
985 crate::json::from_str(&reply.result_json).expect("valid query json");
986 let value = json
987 .get("records")
988 .and_then(crate::json::Value::as_array)
989 .and_then(|records| records.first())
990 .and_then(|record| record.get("value"))
991 .expect("value column");
992
993 assert_eq!(
994 value.get("alpha").and_then(crate::json::Value::as_str),
995 Some("A")
996 );
997 assert_eq!(
998 value
999 .get("nested")
1000 .and_then(|nested| nested.get("leaf"))
1001 .and_then(crate::json::Value::as_f64),
1002 Some(12.0)
1003 );
1004 }
1005}
1006
1007fn log_grpc_tls_identity(tls: &GrpcTlsOptions) {
1010 use sha2::{Digest, Sha256};
1011 let cert_fp = {
1012 let mut h = Sha256::new();
1013 h.update(&tls.cert_pem);
1014 let digest = h.finalize();
1015 let mut buf = String::with_capacity(64);
1018 for b in digest.iter() {
1019 buf.push_str(&format!("{b:02x}"));
1020 }
1021 buf
1022 };
1023 tracing::info!(
1024 target: "reddb::security",
1025 transport = "grpc",
1026 cert_sha256 = %cert_fp,
1027 mtls = tls.client_ca_pem.is_some(),
1028 "gRPC TLS identity loaded"
1029 );
1030}
1031
1032include!("grpc/service_impl.rs");