1pub(crate) use crate::application::json_input::{
2 json_bool_field, json_f32_field, json_string_field, json_usize_field,
3};
4pub(crate) use crate::application::{
5 AdminUseCases, CatalogUseCases, CreateEdgeInput, CreateEntityOutput, CreateNodeGraphLinkInput,
6 CreateNodeInput, CreateNodeTableLinkInput, CreateRowInput, CreateVectorInput,
7 DeleteEntityInput, EntityUseCases, ExecuteQueryInput, ExplainQueryInput, GraphCentralityInput,
8 GraphClusteringInput, GraphCommunitiesInput, GraphComponentsInput, GraphCyclesInput,
9 GraphHitsInput, GraphNeighborhoodInput, GraphPersonalizedPageRankInput, GraphShortestPathInput,
10 GraphTopologicalSortInput, GraphTraversalInput, GraphUseCases, InspectNativeArtifactInput,
11 NativeUseCases, PatchEntityInput, QueryUseCases, SearchHybridInput, SearchIvfInput,
12 SearchSimilarInput, SearchTextInput,
13};
14use std::collections::BTreeMap;
15use std::pin::Pin;
16use std::sync::Arc;
17use std::time::{SystemTime, UNIX_EPOCH};
18
19use crate::api::{RedDBOptions, RedDBResult};
20use crate::auth::middleware::{check_permission, AuthResult};
21use crate::auth::store::AuthStore;
22use crate::auth::Role;
23use crate::health::{HealthProvider, HealthState};
24use crate::json::{
25 from_str as json_from_str, to_string as json_to_string, Map, Value as JsonValue,
26};
27use crate::runtime::{
28 RedDBRuntime, RuntimeFilter, RuntimeFilterValue, RuntimeGraphCentralityAlgorithm,
29 RuntimeGraphCentralityResult, RuntimeGraphClusteringResult, RuntimeGraphCommunityAlgorithm,
30 RuntimeGraphCommunityResult, RuntimeGraphComponentsMode, RuntimeGraphComponentsResult,
31 RuntimeGraphCyclesResult, RuntimeGraphDirection, RuntimeGraphHitsResult,
32 RuntimeGraphNeighborhoodResult, RuntimeGraphPathAlgorithm, RuntimeGraphPathResult,
33 RuntimeGraphPattern, RuntimeGraphProjection, RuntimeGraphTopologicalSortResult,
34 RuntimeGraphTraversalResult, RuntimeGraphTraversalStrategy, RuntimeIvfSearchResult,
35 RuntimeQueryResult, RuntimeQueryWeights, RuntimeStats, ScanPage,
36};
37use crate::storage::schema::Value;
38use crate::storage::unified::devx::refs::{NodeRef, TableRef};
39use crate::storage::unified::{Metadata, MetadataValue};
40use crate::storage::{EntityData, EntityId, UnifiedEntity};
41use tokio_stream::wrappers::TcpListenerStream;
42use tonic::metadata::MetadataMap;
43use tonic::{Request, Response, Status};
44
45pub use reddb_grpc_proto as proto;
51
52use proto::red_db_server::{RedDb, RedDbServer};
53use proto::{
54 ask_stream_event, AskAnswerToken, AskReply, AskRequest, AskSources, AskStreamEvent,
55 BatchQueryReply, BatchQueryRequest, BulkEntityReply, Citation, CollectionRequest,
56 CollectionsReply, DeleteEntityRequest, DeploymentProfileRequest, Empty, EntityReply,
57 ExecutePreparedRequest, ExportRequest, GraphProjectionUpsertRequest, HealthReply,
58 IndexNameRequest, IndexToggleRequest, JsonBulkCreateRequest, JsonCreateRequest,
59 JsonPayloadRequest, KvWatchEvent, KvWatchRequest, ManifestRequest, OperationReply,
60 PayloadReply, PrepareQueryReply, PrepareQueryRequest, QueryReply, QueryRequest, QueryValue,
61 ScanEntity, ScanReply, ScanRequest, StatsReply, TopologyReply, TopologyRequest,
62 UpdateEntityRequest, Validation, ValidationItem,
63};
64
65mod control_support;
66mod entity_ops;
67mod input_support;
68pub(crate) mod scan_json;
69
70use self::control_support::*;
71use self::entity_ops::*;
72use self::input_support::*;
73use self::scan_json::*;
74
75#[derive(Debug, Clone)]
76pub struct GrpcServerOptions {
77 pub bind_addr: String,
78 pub tls: Option<GrpcTlsOptions>,
83}
84
/// PEM-encoded TLS material for the gRPC listener.
#[derive(Debug, Clone)]
pub struct GrpcTlsOptions {
    /// Certificate PEM bytes; used as the certificate half of the server identity.
    pub cert_pem: Vec<u8>,
    /// Private-key PEM bytes; used as the key half of the server identity.
    pub key_pem: Vec<u8>,
    /// Optional client CA PEM bytes. When present they are installed as the
    /// client CA root, and the TLS identity log line reports `mtls = true`.
    pub client_ca_pem: Option<Vec<u8>>,
}
102
103impl GrpcTlsOptions {
104 pub fn to_tonic_config(
108 &self,
109 ) -> Result<tonic::transport::ServerTlsConfig, Box<dyn std::error::Error>> {
110 let identity = tonic::transport::Identity::from_pem(&self.cert_pem, &self.key_pem);
111 let mut cfg = tonic::transport::ServerTlsConfig::new().identity(identity);
112 if let Some(ca_pem) = &self.client_ca_pem {
113 cfg = cfg.client_ca_root(tonic::transport::Certificate::from_pem(ca_pem));
114 }
115 Ok(cfg)
116 }
117}
118
119impl Default for GrpcServerOptions {
120 fn default() -> Self {
121 Self {
122 bind_addr: "127.0.0.1:5555".to_string(),
123 tls: None,
124 }
125 }
126}
127
128#[derive(Clone)]
129pub struct RedDBGrpcServer {
130 runtime: RedDBRuntime,
131 options: GrpcServerOptions,
132 auth_store: Arc<AuthStore>,
133 oauth_validator: Option<Arc<crate::auth::OAuthValidator>>,
139}
140
141impl RedDBGrpcServer {
142 pub fn new(runtime: RedDBRuntime) -> Self {
143 let auth_config = crate::auth::AuthConfig::default();
144 let auth_store = Arc::new(AuthStore::new(auth_config));
145 Self::with_options(runtime, GrpcServerOptions::default(), auth_store)
146 }
147
148 pub fn from_database_options(
149 db_options: RedDBOptions,
150 options: GrpcServerOptions,
151 ) -> RedDBResult<Self> {
152 let runtime = RedDBRuntime::with_options(db_options.clone())?;
154
155 let auth_store = if db_options.auth.vault_enabled {
156 let pager = runtime.db().store().pager().cloned().ok_or_else(|| {
160 crate::api::RedDBError::Internal(
161 "vault requires a paged database (persistent mode)".into(),
162 )
163 })?;
164 let store = AuthStore::with_vault(db_options.auth.clone(), pager, None)
165 .map_err(|e| crate::api::RedDBError::Internal(e.to_string()))?;
166 Arc::new(store)
167 } else {
168 Arc::new(AuthStore::new(db_options.auth.clone()))
169 };
170 auth_store.bootstrap_from_env();
171 Ok(Self::with_options(runtime, options, auth_store))
172 }
173
174 pub fn with_options(
175 runtime: RedDBRuntime,
176 options: GrpcServerOptions,
177 auth_store: Arc<AuthStore>,
178 ) -> Self {
179 runtime.set_auth_store(Arc::clone(&auth_store));
182 Self {
183 runtime,
184 options,
185 auth_store,
186 oauth_validator: None,
187 }
188 }
189
190 pub fn with_oauth_validator(mut self, validator: Arc<crate::auth::OAuthValidator>) -> Self {
196 self.oauth_validator = Some(validator);
197 self
198 }
199
200 pub fn oauth_validator(&self) -> Option<&Arc<crate::auth::OAuthValidator>> {
202 self.oauth_validator.as_ref()
203 }
204
205 pub fn runtime(&self) -> &RedDBRuntime {
206 &self.runtime
207 }
208
209 pub fn options(&self) -> &GrpcServerOptions {
210 &self.options
211 }
212
213 pub fn auth_store(&self) -> &Arc<AuthStore> {
214 &self.auth_store
215 }
216
217 fn grpc_runtime(&self) -> GrpcRuntime {
218 GrpcRuntime {
219 runtime: self.runtime.clone(),
220 auth_store: self.auth_store.clone(),
221 prepared_registry: PreparedStatementRegistry::new(),
222 oauth_validator: self.oauth_validator.clone(),
223 }
224 }
225
226 pub async fn serve(&self) -> Result<(), Box<dyn std::error::Error>> {
227 let addr = self.options.bind_addr.parse()?;
228 let mut builder = tonic::transport::Server::builder();
229 if let Some(tls) = &self.options.tls {
230 log_grpc_tls_identity(tls);
233 builder = builder.tls_config(tls.to_tonic_config()?)?;
234 }
235 builder
236 .add_service(Self::configured_service(self.grpc_runtime()))
237 .serve(addr)
238 .await?;
239 Ok(())
240 }
241
242 pub async fn serve_on(
243 &self,
244 listener: std::net::TcpListener,
245 ) -> Result<(), Box<dyn std::error::Error>> {
246 listener.set_nonblocking(true)?;
247 let listener = tokio::net::TcpListener::from_std(listener)?;
248 let incoming = TcpListenerStream::new(listener);
249 let mut builder = tonic::transport::Server::builder();
250 if let Some(tls) = &self.options.tls {
251 log_grpc_tls_identity(tls);
252 builder = builder.tls_config(tls.to_tonic_config()?)?;
253 }
254 builder
255 .add_service(Self::configured_service(self.grpc_runtime()))
256 .serve_with_incoming(incoming)
257 .await?;
258 Ok(())
259 }
260
261 fn configured_service(runtime: GrpcRuntime) -> RedDbServer<GrpcRuntime> {
262 use tonic::codec::CompressionEncoding;
266 RedDbServer::new(runtime)
267 .max_decoding_message_size(256 * 1024 * 1024)
268 .max_encoding_message_size(256 * 1024 * 1024)
269 .accept_compressed(CompressionEncoding::Zstd)
270 .accept_compressed(CompressionEncoding::Gzip)
271 .send_compressed(CompressionEncoding::Zstd)
272 }
273}
274
275struct GrpcPreparedStatement {
277 shape: std::sync::Arc<crate::storage::query::ast::QueryExpr>,
278 parameter_count: usize,
279 created_at: std::time::Instant,
280}
281
282struct PreparedStatementRegistry {
285 map: parking_lot::RwLock<std::collections::HashMap<u64, GrpcPreparedStatement>>,
287 next_id: std::sync::atomic::AtomicU64,
288 get_count: std::sync::atomic::AtomicU64,
289}
290
291impl PreparedStatementRegistry {
292 fn new() -> Arc<Self> {
293 Arc::new(Self {
294 map: parking_lot::RwLock::new(std::collections::HashMap::new()),
295 next_id: std::sync::atomic::AtomicU64::new(1),
296 get_count: std::sync::atomic::AtomicU64::new(0),
297 })
298 }
299
300 fn prepare(&self, shape: crate::storage::query::ast::QueryExpr, parameter_count: usize) -> u64 {
301 use std::sync::atomic::Ordering;
302 let id = self.next_id.fetch_add(1, Ordering::Relaxed);
303 let mut map = self.map.write();
304 self.evict_old_locked(&mut map);
305 map.insert(
306 id,
307 GrpcPreparedStatement {
308 shape: std::sync::Arc::new(shape),
310 parameter_count,
311 created_at: std::time::Instant::now(),
312 },
313 );
314 id
315 }
316
317 fn get_shape_and_count(
318 &self,
319 id: u64,
320 ) -> Option<(std::sync::Arc<crate::storage::query::ast::QueryExpr>, usize)> {
321 let get_count = self
324 .get_count
325 .fetch_add(1, std::sync::atomic::Ordering::Relaxed)
326 + 1;
327 if get_count.is_multiple_of(256) {
328 let mut map = self.map.write();
329 self.evict_old_locked(&mut map);
330 }
331 let map = self.map.read();
332 map.get(&id)
333 .map(|s| (std::sync::Arc::clone(&s.shape), s.parameter_count))
334 }
335
336 fn evict_old_locked(&self, map: &mut std::collections::HashMap<u64, GrpcPreparedStatement>) {
337 let threshold = std::time::Duration::from_secs(3600);
338 map.retain(|_, v| v.created_at.elapsed() < threshold);
339 }
340}
341
342#[derive(Clone)]
343struct GrpcRuntime {
344 runtime: RedDBRuntime,
345 auth_store: Arc<AuthStore>,
346 prepared_registry: Arc<PreparedStatementRegistry>,
347 oauth_validator: Option<Arc<crate::auth::OAuthValidator>>,
351}
352
353impl GrpcRuntime {
354 fn admin_use_cases(&self) -> AdminUseCases<'_, RedDBRuntime> {
355 AdminUseCases::new(&self.runtime)
356 }
357
358 fn catalog_use_cases(&self) -> CatalogUseCases<'_, RedDBRuntime> {
359 CatalogUseCases::new(&self.runtime)
360 }
361
362 fn query_use_cases(&self) -> QueryUseCases<'_, RedDBRuntime> {
363 QueryUseCases::new(&self.runtime)
364 }
365
366 fn entity_use_cases(&self) -> EntityUseCases<'_, RedDBRuntime> {
367 EntityUseCases::new(&self.runtime)
368 }
369
370 fn graph_use_cases(&self) -> GraphUseCases<'_, RedDBRuntime> {
371 GraphUseCases::new(&self.runtime)
372 }
373
374 fn native_use_cases(&self) -> NativeUseCases<'_, RedDBRuntime> {
375 NativeUseCases::new(&self.runtime)
376 }
377}
378
379fn grpc_query_value_to_schema_value(value: QueryValue) -> Result<Value, Status> {
380 use proto::query_value::Kind;
381
382 match value
383 .kind
384 .ok_or_else(|| Status::invalid_argument("missing query param value"))?
385 {
386 Kind::NullValue(_) => Ok(Value::Null),
387 Kind::BoolValue(value) => Ok(Value::Boolean(value)),
388 Kind::IntValue(value) => Ok(Value::Integer(value)),
389 Kind::FloatValue(value) => Ok(Value::Float(value)),
390 Kind::TextValue(value) => Ok(Value::Text(std::sync::Arc::from(value))),
391 Kind::BytesValue(value) => Ok(Value::Blob(value)),
392 Kind::VectorValue(value) => Ok(Value::Vector(value.values)),
393 Kind::JsonValue(value) => {
394 let parsed = json_from_str::<JsonValue>(&value)
395 .map_err(|e| Status::invalid_argument(format!("json param parse error: {e}")))?;
396 let encoded = json_to_string(&parsed)
397 .map_err(|e| Status::invalid_argument(format!("json param encode error: {e}")))?;
398 Ok(Value::Json(encoded.into_bytes()))
399 }
400 Kind::TimestampValue(value) => Ok(Value::Timestamp(value)),
401 Kind::UuidValue(value) => {
402 let bytes: [u8; 16] = value.try_into().map_err(|value: Vec<u8>| {
403 Status::invalid_argument(format!(
404 "uuid param must be 16 bytes, got {}",
405 value.len()
406 ))
407 })?;
408 Ok(Value::Uuid(bytes))
409 }
410 }
411}
412
413fn execute_grpc_query_with_optional_params(
414 runtime: &RedDBRuntime,
415 query: String,
416 params: Vec<QueryValue>,
417) -> Result<RuntimeQueryResult, Status> {
418 if params.is_empty() {
419 return runtime.execute_query(&query).map_err(to_status);
420 }
421
422 let binds = params
423 .into_iter()
424 .map(grpc_query_value_to_schema_value)
425 .collect::<Result<Vec<_>, _>>()?;
426 let parsed = crate::storage::query::modes::parse_multi(&query)
427 .map_err(|e| Status::invalid_argument(format!("parse error: {e}")))?;
428 let bound = crate::storage::query::user_params::bind(&parsed, &binds)
429 .map_err(|e| Status::invalid_argument(format!("bind error: {e}")))?;
430 runtime.execute_query_expr(bound).map_err(to_status)
431}
432
#[cfg(test)]
mod grpc_query_value_tests {
    use super::*;
    use proto::query_value::Kind;

    /// Wraps a oneof variant into the wire-level parameter message.
    fn param(kind: Kind) -> QueryValue {
        QueryValue { kind: Some(kind) }
    }

    /// Converts a variant and panics if the conversion is rejected.
    fn convert(kind: Kind) -> Value {
        grpc_query_value_to_schema_value(param(kind)).unwrap()
    }

    #[test]
    fn grpc_query_value_maps_to_schema_value_variants() {
        assert_eq!(convert(Kind::NullValue(proto::QueryNull {})), Value::Null);
        assert_eq!(convert(Kind::BoolValue(true)), Value::Boolean(true));
        assert_eq!(convert(Kind::IntValue(42)), Value::Integer(42));
        assert_eq!(convert(Kind::FloatValue(1.5)), Value::Float(1.5));
        assert_eq!(
            convert(Kind::BytesValue(vec![0, 1, 2])),
            Value::Blob(vec![0, 1, 2])
        );
        assert_eq!(
            convert(Kind::VectorValue(proto::QueryVector {
                values: vec![0.25, 0.5],
            })),
            Value::Vector(vec![0.25, 0.5])
        );
        assert_eq!(
            convert(Kind::TimestampValue(1_779_999_000)),
            Value::Timestamp(1_779_999_000)
        );
        assert_eq!(
            convert(Kind::UuidValue(vec![0x11; 16])),
            Value::Uuid([0x11; 16])
        );
        assert_eq!(
            convert(Kind::TextValue("alice".into())),
            Value::Text(std::sync::Arc::from("alice"))
        );
        assert_eq!(
            convert(Kind::JsonValue("{\"role\":\"admin\"}".into())),
            Value::Json(b"{\"role\":\"admin\"}".to_vec())
        );
    }

    #[test]
    fn grpc_query_value_rejects_missing_kind_and_bad_uuid() {
        let missing_kind = QueryValue { kind: None };
        assert!(grpc_query_value_to_schema_value(missing_kind).is_err());

        let short_uuid = param(Kind::UuidValue(vec![0; 15]));
        assert!(grpc_query_value_to_schema_value(short_uuid).is_err());
    }
}
522
#[cfg(test)]
mod grpc_ask_query_reply_tests {
    use super::*;
    use crate::json;
    use crate::storage::query::modes::QueryMode;
    use crate::storage::query::unified::{UnifiedRecord, UnifiedResult};
    use crate::storage::schema::Value as SchemaValue;

    /// Builds a single-row runtime result shaped like an ASK statement's output.
    fn ask_runtime_result() -> RuntimeQueryResult {
        let mut result = UnifiedResult::with_columns(vec![
            "answer".into(),
            "provider".into(),
            "model".into(),
            "mode".into(),
            "retry_count".into(),
            "prompt_tokens".into(),
            "completion_tokens".into(),
            "sources_flat".into(),
            "citations".into(),
            "validation".into(),
        ]);

        let text_columns = [
            ("answer", "Deploy failed [^1]."),
            ("provider", "openai"),
            ("model", "gpt-4o-mini"),
            ("mode", "strict"),
        ];
        let integer_columns = [
            ("retry_count", 0),
            ("prompt_tokens", 11),
            ("completion_tokens", 7),
        ];
        let json_columns: [(&str, &[u8]); 3] = [
            (
                "sources_flat",
                br#"[{"urn":"urn:reddb:row:deployments:1","kind":"row","collection":"deployments","id":"1"}]"#,
            ),
            (
                "citations",
                br#"[{"marker":1,"urn":"urn:reddb:row:deployments:1"}]"#,
            ),
            (
                "validation",
                br#"{"ok":true,"warnings":[],"errors":[]}"#,
            ),
        ];

        let mut record = UnifiedRecord::new();
        for (column, text) in text_columns {
            record.set(column, SchemaValue::text(text));
        }
        for (column, number) in integer_columns {
            record.set(column, SchemaValue::Integer(number));
        }
        for (column, raw) in json_columns {
            record.set(column, SchemaValue::Json(raw.to_vec()));
        }
        result.push(record);

        RuntimeQueryResult {
            query: "ASK 'why did deploy fail?'".to_string(),
            mode: QueryMode::Sql,
            statement: "ask",
            engine: "runtime-ai",
            result,
            affected_rows: 0,
            statement_type: "select",
        }
    }

    #[test]
    fn query_reply_ask_result_json_uses_full_canonical_schema() {
        let reply = query_reply(ask_runtime_result(), &None, &None);
        let body: json::Value = json::from_str(&reply.result_json).expect("valid ask json");

        let answer = body.get("answer").and_then(json::Value::as_str);
        assert_eq!(answer, Some("Deploy failed [^1]."));

        let cache_hit = body.get("cache_hit").and_then(json::Value::as_bool);
        assert_eq!(cache_hit, Some(false));

        let cost_usd = body.get("cost_usd").and_then(json::Value::as_f64);
        assert_eq!(cost_usd, Some(0.0));

        let mode = body.get("mode").and_then(json::Value::as_str);
        assert_eq!(mode, Some("strict"));

        let retry_count = body.get("retry_count").and_then(json::Value::as_u64);
        assert_eq!(retry_count, Some(0));

        assert!(
            body.get("records").is_none(),
            "ASK must not be row-wrapped: {}",
            reply.result_json
        );

        let sources_ok = match body.get("sources_flat").and_then(json::Value::as_array) {
            Some(sources) => {
                sources.len() == 1
                    && sources[0]
                        .get("payload")
                        .and_then(json::Value::as_str)
                        .is_some()
            }
            None => false,
        };
        assert!(
            sources_ok,
            "sources_flat must be parsed with payload fallback: {}",
            reply.result_json
        );

        let citations_ok = match body.get("citations").and_then(json::Value::as_array) {
            Some(citations) => citations.len() == 1,
            None => false,
        };
        assert!(
            citations_ok,
            "citations must be parsed: {}",
            reply.result_json
        );

        let validation_ok = body
            .get("validation")
            .and_then(|v| v.get("ok"))
            .and_then(json::Value::as_bool);
        assert_eq!(validation_ok, Some(true));
    }

    #[test]
    fn query_reply_non_ask_answer_column_keeps_row_shape() {
        let mut result = UnifiedResult::with_columns(vec!["answer".into()]);
        let mut record = UnifiedRecord::new();
        record.set("answer", SchemaValue::text("plain select"));
        result.push(record);

        let select_result = RuntimeQueryResult {
            query: "SELECT 'plain select' AS answer".to_string(),
            mode: QueryMode::Sql,
            statement: "select",
            engine: "runtime-sql",
            result,
            affected_rows: 0,
            statement_type: "select",
        };
        let reply = query_reply(select_result, &None, &None);
        let body: json::Value = json::from_str(&reply.result_json).expect("valid query json");

        assert!(
            body.get("records").is_some(),
            "non-ASK must stay row-wrapped"
        );
        assert!(
            body.get("answer").is_none(),
            "non-ASK must not use ASK envelope"
        );
    }
}
668
669fn log_grpc_tls_identity(tls: &GrpcTlsOptions) {
672 use sha2::{Digest, Sha256};
673 let cert_fp = {
674 let mut h = Sha256::new();
675 h.update(&tls.cert_pem);
676 let digest = h.finalize();
677 let mut buf = String::with_capacity(64);
680 for b in digest.iter() {
681 buf.push_str(&format!("{b:02x}"));
682 }
683 buf
684 };
685 tracing::info!(
686 target: "reddb::security",
687 transport = "grpc",
688 cert_sha256 = %cert_fp,
689 mtls = tls.client_ca_pem.is_some(),
690 "gRPC TLS identity loaded"
691 );
692}
693
694include!("grpc/service_impl.rs");