1pub(crate) use crate::application::json_input::{
2 json_bool_field, json_f32_field, json_string_field, json_usize_field,
3};
4pub(crate) use crate::application::{
5 AdminUseCases, CatalogUseCases, CreateEdgeInput, CreateEntityOutput, CreateNodeGraphLinkInput,
6 CreateNodeInput, CreateNodeTableLinkInput, CreateRowInput, CreateVectorInput,
7 DeleteEntityInput, EntityUseCases, ExecuteQueryInput, ExplainQueryInput, GraphCentralityInput,
8 GraphClusteringInput, GraphCommunitiesInput, GraphComponentsInput, GraphCyclesInput,
9 GraphHitsInput, GraphNeighborhoodInput, GraphPersonalizedPageRankInput, GraphShortestPathInput,
10 GraphTopologicalSortInput, GraphTraversalInput, GraphUseCases, InspectNativeArtifactInput,
11 NativeUseCases, PatchEntityInput, QueryUseCases, SearchHybridInput, SearchIvfInput,
12 SearchSimilarInput, SearchTextInput,
13};
14use std::collections::BTreeMap;
15use std::pin::Pin;
16use std::sync::Arc;
17use std::time::{SystemTime, UNIX_EPOCH};
18
19use crate::api::{RedDBOptions, RedDBResult};
20use crate::auth::middleware::{check_permission, AuthResult};
21use crate::auth::store::AuthStore;
22use crate::auth::Role;
23use crate::health::{HealthProvider, HealthState};
24use crate::json::{
25 from_str as json_from_str, to_string as json_to_string, Map, Value as JsonValue,
26};
27use crate::runtime::{
28 RedDBRuntime, RuntimeFilter, RuntimeFilterValue, RuntimeGraphCentralityAlgorithm,
29 RuntimeGraphCentralityResult, RuntimeGraphClusteringResult, RuntimeGraphCommunityAlgorithm,
30 RuntimeGraphCommunityResult, RuntimeGraphComponentsMode, RuntimeGraphComponentsResult,
31 RuntimeGraphCyclesResult, RuntimeGraphDirection, RuntimeGraphHitsResult,
32 RuntimeGraphNeighborhoodResult, RuntimeGraphPathAlgorithm, RuntimeGraphPathResult,
33 RuntimeGraphPattern, RuntimeGraphProjection, RuntimeGraphTopologicalSortResult,
34 RuntimeGraphTraversalResult, RuntimeGraphTraversalStrategy, RuntimeIvfSearchResult,
35 RuntimeQueryResult, RuntimeQueryWeights, RuntimeStats, ScanPage,
36};
37use crate::storage::schema::Value;
38use crate::storage::unified::devx::refs::{NodeRef, TableRef};
39use crate::storage::unified::{Metadata, MetadataValue};
40use crate::storage::{EntityData, EntityId, UnifiedEntity};
41use tokio_stream::wrappers::TcpListenerStream;
42use tonic::metadata::MetadataMap;
43use tonic::{Request, Response, Status};
44
45pub use reddb_grpc_proto as proto;
51
52use proto::red_db_server::{RedDb, RedDbServer};
53use proto::{
54 ask_stream_event, AskAnswerToken, AskReply, AskRequest, AskSources, AskStreamEvent,
55 BatchInsertChunk, BatchInsertReply, BatchQueryReply, BatchQueryRequest, BulkEntityReply,
56 Citation, CollectionRequest,
57 CollectionsReply, DeleteEntityRequest, DeploymentProfileRequest, Empty, EntityReply,
58 ExecutePreparedRequest, ExportRequest, GraphProjectionUpsertRequest, HealthReply,
59 IndexNameRequest, IndexToggleRequest, JsonBulkCreateRequest, JsonCreateRequest,
60 JsonPayloadRequest, KvWatchEvent, KvWatchRequest, ManifestRequest, OperationReply,
61 PayloadReply, PrepareQueryReply, PrepareQueryRequest, QueryReply, QueryRequest, QueryValue,
62 ScanEntity, ScanReply, ScanRequest, StatsReply, TopologyReply, TopologyRequest,
63 UpdateEntityRequest, Validation, ValidationItem,
64};
65
66mod control_support;
67mod entity_ops;
68mod input_support;
69pub(crate) mod scan_json;
70
71use self::control_support::*;
72use self::entity_ops::*;
73use self::input_support::*;
74use self::scan_json::*;
75
76#[derive(Debug, Clone)]
77pub struct GrpcServerOptions {
78 pub bind_addr: String,
79 pub tls: Option<GrpcTlsOptions>,
84}
85
86#[derive(Debug, Clone)]
92pub struct GrpcTlsOptions {
93 pub cert_pem: Vec<u8>,
95 pub key_pem: Vec<u8>,
97 pub client_ca_pem: Option<Vec<u8>>,
102}
103
104impl GrpcTlsOptions {
105 pub fn to_tonic_config(
109 &self,
110 ) -> Result<tonic::transport::ServerTlsConfig, Box<dyn std::error::Error>> {
111 let identity = tonic::transport::Identity::from_pem(&self.cert_pem, &self.key_pem);
112 let mut cfg = tonic::transport::ServerTlsConfig::new().identity(identity);
113 if let Some(ca_pem) = &self.client_ca_pem {
114 cfg = cfg.client_ca_root(tonic::transport::Certificate::from_pem(ca_pem));
115 }
116 Ok(cfg)
117 }
118}
119
120impl Default for GrpcServerOptions {
121 fn default() -> Self {
122 Self {
123 bind_addr: "127.0.0.1:5555".to_string(),
124 tls: None,
125 }
126 }
127}
128
129#[derive(Clone)]
130pub struct RedDBGrpcServer {
131 runtime: RedDBRuntime,
132 options: GrpcServerOptions,
133 auth_store: Arc<AuthStore>,
134 oauth_validator: Option<Arc<crate::auth::OAuthValidator>>,
140}
141
142impl RedDBGrpcServer {
143 pub fn new(runtime: RedDBRuntime) -> Self {
144 let auth_config = crate::auth::AuthConfig::default();
145 let auth_store = Arc::new(AuthStore::new(auth_config));
146 Self::with_options(runtime, GrpcServerOptions::default(), auth_store)
147 }
148
149 pub fn from_database_options(
150 db_options: RedDBOptions,
151 options: GrpcServerOptions,
152 ) -> RedDBResult<Self> {
153 let runtime = RedDBRuntime::with_options(db_options.clone())?;
155
156 let auth_store = if db_options.auth.vault_enabled {
157 let pager = runtime.db().store().pager().cloned().ok_or_else(|| {
161 crate::api::RedDBError::Internal(
162 "vault requires a paged database (persistent mode)".into(),
163 )
164 })?;
165 let store = AuthStore::with_vault(db_options.auth.clone(), pager, None)
166 .map_err(|e| crate::api::RedDBError::Internal(e.to_string()))?;
167 Arc::new(store)
168 } else {
169 Arc::new(AuthStore::new(db_options.auth.clone()))
170 };
171 auth_store.bootstrap_from_env();
172 Ok(Self::with_options(runtime, options, auth_store))
173 }
174
175 pub fn with_options(
176 runtime: RedDBRuntime,
177 options: GrpcServerOptions,
178 auth_store: Arc<AuthStore>,
179 ) -> Self {
180 runtime.set_auth_store(Arc::clone(&auth_store));
183 Self {
184 runtime,
185 options,
186 auth_store,
187 oauth_validator: None,
188 }
189 }
190
191 pub fn with_oauth_validator(mut self, validator: Arc<crate::auth::OAuthValidator>) -> Self {
197 self.oauth_validator = Some(validator);
198 self
199 }
200
201 pub fn oauth_validator(&self) -> Option<&Arc<crate::auth::OAuthValidator>> {
203 self.oauth_validator.as_ref()
204 }
205
206 pub fn runtime(&self) -> &RedDBRuntime {
207 &self.runtime
208 }
209
210 pub fn options(&self) -> &GrpcServerOptions {
211 &self.options
212 }
213
214 pub fn auth_store(&self) -> &Arc<AuthStore> {
215 &self.auth_store
216 }
217
218 fn grpc_runtime(&self) -> GrpcRuntime {
219 GrpcRuntime {
220 runtime: self.runtime.clone(),
221 auth_store: self.auth_store.clone(),
222 prepared_registry: PreparedStatementRegistry::new(),
223 oauth_validator: self.oauth_validator.clone(),
224 }
225 }
226
227 pub async fn serve(&self) -> Result<(), Box<dyn std::error::Error>> {
228 let addr = self.options.bind_addr.parse()?;
229 let mut builder = tonic::transport::Server::builder();
230 if let Some(tls) = &self.options.tls {
231 log_grpc_tls_identity(tls);
234 builder = builder.tls_config(tls.to_tonic_config()?)?;
235 }
236 builder
237 .add_service(Self::configured_service(self.grpc_runtime()))
238 .serve(addr)
239 .await?;
240 Ok(())
241 }
242
243 pub async fn serve_on(
244 &self,
245 listener: std::net::TcpListener,
246 ) -> Result<(), Box<dyn std::error::Error>> {
247 listener.set_nonblocking(true)?;
248 let listener = tokio::net::TcpListener::from_std(listener)?;
249 let incoming = TcpListenerStream::new(listener);
250 let mut builder = tonic::transport::Server::builder();
251 if let Some(tls) = &self.options.tls {
252 log_grpc_tls_identity(tls);
253 builder = builder.tls_config(tls.to_tonic_config()?)?;
254 }
255 builder
256 .add_service(Self::configured_service(self.grpc_runtime()))
257 .serve_with_incoming(incoming)
258 .await?;
259 Ok(())
260 }
261
262 fn configured_service(runtime: GrpcRuntime) -> RedDbServer<GrpcRuntime> {
263 use tonic::codec::CompressionEncoding;
267 RedDbServer::new(runtime)
268 .max_decoding_message_size(256 * 1024 * 1024)
269 .max_encoding_message_size(256 * 1024 * 1024)
270 .accept_compressed(CompressionEncoding::Zstd)
271 .accept_compressed(CompressionEncoding::Gzip)
272 .send_compressed(CompressionEncoding::Zstd)
273 }
274}
275
276struct GrpcPreparedStatement {
278 shape: std::sync::Arc<crate::storage::query::ast::QueryExpr>,
279 parameter_count: usize,
280 created_at: std::time::Instant,
281}
282
283struct PreparedStatementRegistry {
286 map: parking_lot::RwLock<std::collections::HashMap<u64, GrpcPreparedStatement>>,
288 next_id: std::sync::atomic::AtomicU64,
289 get_count: std::sync::atomic::AtomicU64,
290}
291
292impl PreparedStatementRegistry {
293 fn new() -> Arc<Self> {
294 Arc::new(Self {
295 map: parking_lot::RwLock::new(std::collections::HashMap::new()),
296 next_id: std::sync::atomic::AtomicU64::new(1),
297 get_count: std::sync::atomic::AtomicU64::new(0),
298 })
299 }
300
301 fn prepare(&self, shape: crate::storage::query::ast::QueryExpr, parameter_count: usize) -> u64 {
302 use std::sync::atomic::Ordering;
303 let id = self.next_id.fetch_add(1, Ordering::Relaxed);
304 let mut map = self.map.write();
305 self.evict_old_locked(&mut map);
306 map.insert(
307 id,
308 GrpcPreparedStatement {
309 shape: std::sync::Arc::new(shape),
311 parameter_count,
312 created_at: std::time::Instant::now(),
313 },
314 );
315 id
316 }
317
318 fn get_shape_and_count(
319 &self,
320 id: u64,
321 ) -> Option<(std::sync::Arc<crate::storage::query::ast::QueryExpr>, usize)> {
322 let get_count = self
325 .get_count
326 .fetch_add(1, std::sync::atomic::Ordering::Relaxed)
327 + 1;
328 if get_count.is_multiple_of(256) {
329 let mut map = self.map.write();
330 self.evict_old_locked(&mut map);
331 }
332 let map = self.map.read();
333 map.get(&id)
334 .map(|s| (std::sync::Arc::clone(&s.shape), s.parameter_count))
335 }
336
337 fn evict_old_locked(&self, map: &mut std::collections::HashMap<u64, GrpcPreparedStatement>) {
338 let threshold = std::time::Duration::from_secs(3600);
339 map.retain(|_, v| v.created_at.elapsed() < threshold);
340 }
341}
342
343#[derive(Clone)]
344struct GrpcRuntime {
345 runtime: RedDBRuntime,
346 auth_store: Arc<AuthStore>,
347 prepared_registry: Arc<PreparedStatementRegistry>,
348 oauth_validator: Option<Arc<crate::auth::OAuthValidator>>,
352}
353
354impl GrpcRuntime {
355 fn admin_use_cases(&self) -> AdminUseCases<'_, RedDBRuntime> {
356 AdminUseCases::new(&self.runtime)
357 }
358
359 fn catalog_use_cases(&self) -> CatalogUseCases<'_, RedDBRuntime> {
360 CatalogUseCases::new(&self.runtime)
361 }
362
363 fn query_use_cases(&self) -> QueryUseCases<'_, RedDBRuntime> {
364 QueryUseCases::new(&self.runtime)
365 }
366
367 fn entity_use_cases(&self) -> EntityUseCases<'_, RedDBRuntime> {
368 EntityUseCases::new(&self.runtime)
369 }
370
371 fn graph_use_cases(&self) -> GraphUseCases<'_, RedDBRuntime> {
372 GraphUseCases::new(&self.runtime)
373 }
374
375 fn native_use_cases(&self) -> NativeUseCases<'_, RedDBRuntime> {
376 NativeUseCases::new(&self.runtime)
377 }
378}
379
380fn grpc_query_value_to_schema_value(value: QueryValue) -> Result<Value, Status> {
381 use proto::query_value::Kind;
382
383 match value
384 .kind
385 .ok_or_else(|| Status::invalid_argument("missing query param value"))?
386 {
387 Kind::NullValue(_) => Ok(Value::Null),
388 Kind::BoolValue(value) => Ok(Value::Boolean(value)),
389 Kind::IntValue(value) => Ok(Value::Integer(value)),
390 Kind::FloatValue(value) => Ok(Value::Float(value)),
391 Kind::TextValue(value) => Ok(Value::Text(std::sync::Arc::from(value))),
392 Kind::BytesValue(value) => Ok(Value::Blob(value)),
393 Kind::VectorValue(value) => Ok(Value::Vector(value.values)),
394 Kind::JsonValue(value) => {
395 let parsed = json_from_str::<JsonValue>(&value)
396 .map_err(|e| Status::invalid_argument(format!("json param parse error: {e}")))?;
397 let encoded = json_to_string(&parsed)
398 .map_err(|e| Status::invalid_argument(format!("json param encode error: {e}")))?;
399 Ok(Value::Json(encoded.into_bytes()))
400 }
401 Kind::TimestampValue(value) => Ok(Value::Timestamp(value)),
402 Kind::UuidValue(value) => {
403 let bytes: [u8; 16] = value.try_into().map_err(|value: Vec<u8>| {
404 Status::invalid_argument(format!(
405 "uuid param must be 16 bytes, got {}",
406 value.len()
407 ))
408 })?;
409 Ok(Value::Uuid(bytes))
410 }
411 }
412}
413
414fn execute_grpc_query_with_optional_params(
415 runtime: &RedDBRuntime,
416 query: String,
417 params: Vec<QueryValue>,
418) -> Result<RuntimeQueryResult, Status> {
419 if params.is_empty() {
420 return runtime.execute_query(&query).map_err(to_status);
421 }
422
423 let binds = params
424 .into_iter()
425 .map(grpc_query_value_to_schema_value)
426 .collect::<Result<Vec<_>, _>>()?;
427 let parsed = crate::storage::query::modes::parse_multi(&query)
428 .map_err(|e| Status::invalid_argument(format!("parse error: {e}")))?;
429 let bound = crate::storage::query::user_params::bind(&parsed, &binds)
430 .map_err(|e| Status::invalid_argument(format!("bind error: {e}")))?;
431 runtime.execute_query_expr(bound).map_err(to_status)
432}
433
434#[cfg(test)]
435mod grpc_query_value_tests {
436 use super::*;
437 use proto::query_value::Kind;
438
439 #[test]
440 fn grpc_query_value_maps_to_schema_value_variants() {
441 let cases = vec![
442 (
443 QueryValue {
444 kind: Some(Kind::NullValue(proto::QueryNull {})),
445 },
446 Value::Null,
447 ),
448 (
449 QueryValue {
450 kind: Some(Kind::BoolValue(true)),
451 },
452 Value::Boolean(true),
453 ),
454 (
455 QueryValue {
456 kind: Some(Kind::IntValue(42)),
457 },
458 Value::Integer(42),
459 ),
460 (
461 QueryValue {
462 kind: Some(Kind::FloatValue(1.5)),
463 },
464 Value::Float(1.5),
465 ),
466 (
467 QueryValue {
468 kind: Some(Kind::BytesValue(vec![0, 1, 2])),
469 },
470 Value::Blob(vec![0, 1, 2]),
471 ),
472 (
473 QueryValue {
474 kind: Some(Kind::VectorValue(proto::QueryVector {
475 values: vec![0.25, 0.5],
476 })),
477 },
478 Value::Vector(vec![0.25, 0.5]),
479 ),
480 (
481 QueryValue {
482 kind: Some(Kind::TimestampValue(1_779_999_000)),
483 },
484 Value::Timestamp(1_779_999_000),
485 ),
486 (
487 QueryValue {
488 kind: Some(Kind::UuidValue(vec![0x11; 16])),
489 },
490 Value::Uuid([0x11; 16]),
491 ),
492 ];
493
494 for (input, expected) in cases {
495 assert_eq!(grpc_query_value_to_schema_value(input).unwrap(), expected);
496 }
497
498 assert_eq!(
499 grpc_query_value_to_schema_value(QueryValue {
500 kind: Some(Kind::TextValue("alice".into())),
501 })
502 .unwrap(),
503 Value::Text(std::sync::Arc::from("alice"))
504 );
505 assert_eq!(
506 grpc_query_value_to_schema_value(QueryValue {
507 kind: Some(Kind::JsonValue("{\"role\":\"admin\"}".into())),
508 })
509 .unwrap(),
510 Value::Json(b"{\"role\":\"admin\"}".to_vec())
511 );
512 }
513
514 #[test]
515 fn grpc_query_value_rejects_missing_kind_and_bad_uuid() {
516 assert!(grpc_query_value_to_schema_value(QueryValue { kind: None }).is_err());
517 assert!(grpc_query_value_to_schema_value(QueryValue {
518 kind: Some(Kind::UuidValue(vec![0; 15])),
519 })
520 .is_err());
521 }
522}
523
524#[cfg(test)]
525mod grpc_ask_query_reply_tests {
526 use super::*;
527 use crate::storage::query::modes::QueryMode;
528 use crate::storage::query::unified::{UnifiedRecord, UnifiedResult};
529 use crate::storage::schema::Value as SchemaValue;
530
531 fn ask_runtime_result() -> RuntimeQueryResult {
532 let mut result = UnifiedResult::with_columns(vec![
533 "answer".into(),
534 "provider".into(),
535 "model".into(),
536 "mode".into(),
537 "retry_count".into(),
538 "prompt_tokens".into(),
539 "completion_tokens".into(),
540 "sources_flat".into(),
541 "citations".into(),
542 "validation".into(),
543 ]);
544 let mut record = UnifiedRecord::new();
545 record.set("answer", SchemaValue::text("Deploy failed [^1]."));
546 record.set("provider", SchemaValue::text("openai"));
547 record.set("model", SchemaValue::text("gpt-4o-mini"));
548 record.set("mode", SchemaValue::text("strict"));
549 record.set("retry_count", SchemaValue::Integer(0));
550 record.set("prompt_tokens", SchemaValue::Integer(11));
551 record.set("completion_tokens", SchemaValue::Integer(7));
552 record.set(
553 "sources_flat",
554 SchemaValue::Json(
555 br#"[{"urn":"urn:reddb:row:deployments:1","kind":"row","collection":"deployments","id":"1"}]"#.to_vec(),
556 ),
557 );
558 record.set(
559 "citations",
560 SchemaValue::Json(br#"[{"marker":1,"urn":"urn:reddb:row:deployments:1"}]"#.to_vec()),
561 );
562 record.set(
563 "validation",
564 SchemaValue::Json(br#"{"ok":true,"warnings":[],"errors":[]}"#.to_vec()),
565 );
566 result.push(record);
567
568 RuntimeQueryResult {
569 query: "ASK 'why did deploy fail?'".to_string(),
570 mode: QueryMode::Sql,
571 statement: "ask",
572 engine: "runtime-ai",
573 result,
574 affected_rows: 0,
575 statement_type: "select",
576 }
577 }
578
579 #[test]
580 fn query_reply_ask_result_json_uses_full_canonical_schema() {
581 let reply = query_reply(ask_runtime_result(), &None, &None);
582 let json: crate::json::Value =
583 crate::json::from_str(&reply.result_json).expect("valid ask json");
584
585 assert_eq!(
586 json.get("answer").and_then(crate::json::Value::as_str),
587 Some("Deploy failed [^1].")
588 );
589 assert_eq!(
590 json.get("cache_hit").and_then(crate::json::Value::as_bool),
591 Some(false)
592 );
593 assert_eq!(
594 json.get("cost_usd").and_then(crate::json::Value::as_f64),
595 Some(0.0)
596 );
597 assert_eq!(
598 json.get("mode").and_then(crate::json::Value::as_str),
599 Some("strict")
600 );
601 assert_eq!(
602 json.get("retry_count").and_then(crate::json::Value::as_u64),
603 Some(0)
604 );
605 assert!(
606 json.get("records").is_none(),
607 "ASK must not be row-wrapped: {}",
608 reply.result_json
609 );
610 assert!(
611 json.get("sources_flat")
612 .and_then(crate::json::Value::as_array)
613 .is_some_and(|sources| sources.len() == 1
614 && sources[0]
615 .get("payload")
616 .and_then(crate::json::Value::as_str)
617 .is_some()),
618 "sources_flat must be parsed with payload fallback: {}",
619 reply.result_json
620 );
621 assert!(
622 json.get("citations")
623 .and_then(crate::json::Value::as_array)
624 .is_some_and(|citations| citations.len() == 1),
625 "citations must be parsed: {}",
626 reply.result_json
627 );
628 assert_eq!(
629 json.get("validation")
630 .and_then(|v| v.get("ok"))
631 .and_then(crate::json::Value::as_bool),
632 Some(true)
633 );
634 }
635
636 #[test]
637 fn query_reply_non_ask_answer_column_keeps_row_shape() {
638 let mut result = UnifiedResult::with_columns(vec!["answer".into()]);
639 let mut record = UnifiedRecord::new();
640 record.set("answer", SchemaValue::text("plain select"));
641 result.push(record);
642
643 let reply = query_reply(
644 RuntimeQueryResult {
645 query: "SELECT 'plain select' AS answer".to_string(),
646 mode: QueryMode::Sql,
647 statement: "select",
648 engine: "runtime-sql",
649 result,
650 affected_rows: 0,
651 statement_type: "select",
652 },
653 &None,
654 &None,
655 );
656 let json: crate::json::Value =
657 crate::json::from_str(&reply.result_json).expect("valid query json");
658
659 assert!(
660 json.get("records").is_some(),
661 "non-ASK must stay row-wrapped"
662 );
663 assert!(
664 json.get("answer").is_none(),
665 "non-ASK must not use ASK envelope"
666 );
667 }
668}
669
670fn log_grpc_tls_identity(tls: &GrpcTlsOptions) {
673 use sha2::{Digest, Sha256};
674 let cert_fp = {
675 let mut h = Sha256::new();
676 h.update(&tls.cert_pem);
677 let digest = h.finalize();
678 let mut buf = String::with_capacity(64);
681 for b in digest.iter() {
682 buf.push_str(&format!("{b:02x}"));
683 }
684 buf
685 };
686 tracing::info!(
687 target: "reddb::security",
688 transport = "grpc",
689 cert_sha256 = %cert_fp,
690 mtls = tls.client_ca_pem.is_some(),
691 "gRPC TLS identity loaded"
692 );
693}
694
695include!("grpc/service_impl.rs");