1pub(crate) use crate::application::json_input::{
2 json_bool_field, json_f32_field, json_string_field, json_usize_field,
3};
4pub(crate) use crate::application::{
5 AdminUseCases, CatalogUseCases, CreateEdgeInput, CreateEntityOutput, CreateNodeGraphLinkInput,
6 CreateNodeInput, CreateNodeTableLinkInput, CreateRowInput, CreateVectorInput,
7 DeleteEntityInput, EntityUseCases, ExecuteQueryInput, ExplainQueryInput, GraphCentralityInput,
8 GraphClusteringInput, GraphCommunitiesInput, GraphComponentsInput, GraphCyclesInput,
9 GraphHitsInput, GraphNeighborhoodInput, GraphPersonalizedPageRankInput, GraphShortestPathInput,
10 GraphTopologicalSortInput, GraphTraversalInput, GraphUseCases, InspectNativeArtifactInput,
11 NativeUseCases, PatchEntityInput, QueryUseCases, SearchHybridInput, SearchIvfInput,
12 SearchSimilarInput, SearchTextInput,
13};
14use std::collections::BTreeMap;
15use std::pin::Pin;
16use std::sync::Arc;
17use std::time::{SystemTime, UNIX_EPOCH};
18
19use crate::api::{RedDBOptions, RedDBResult};
20use crate::auth::middleware::{check_permission, AuthResult};
21use crate::auth::store::AuthStore;
22use crate::auth::Role;
23use crate::health::{HealthProvider, HealthState};
24use crate::json::{
25 from_str as json_from_str, to_string as json_to_string, Map, Value as JsonValue,
26};
27use crate::runtime::{
28 RedDBRuntime, RuntimeFilter, RuntimeFilterValue, RuntimeGraphCentralityAlgorithm,
29 RuntimeGraphCentralityResult, RuntimeGraphClusteringResult, RuntimeGraphCommunityAlgorithm,
30 RuntimeGraphCommunityResult, RuntimeGraphComponentsMode, RuntimeGraphComponentsResult,
31 RuntimeGraphCyclesResult, RuntimeGraphDirection, RuntimeGraphHitsResult,
32 RuntimeGraphNeighborhoodResult, RuntimeGraphPathAlgorithm, RuntimeGraphPathResult,
33 RuntimeGraphPattern, RuntimeGraphProjection, RuntimeGraphTopologicalSortResult,
34 RuntimeGraphTraversalResult, RuntimeGraphTraversalStrategy, RuntimeIvfSearchResult,
35 RuntimeQueryResult, RuntimeQueryWeights, RuntimeStats, ScanPage,
36};
37use crate::storage::schema::Value;
38use crate::storage::unified::devx::refs::{NodeRef, TableRef};
39use crate::storage::unified::{Metadata, MetadataValue};
40use crate::storage::{EntityData, EntityId, UnifiedEntity};
41use tokio_stream::wrappers::TcpListenerStream;
42use tonic::metadata::MetadataMap;
43use tonic::{Request, Response, Status};
44
45pub use reddb_grpc_proto as proto;
51
52use proto::red_db_server::{RedDb, RedDbServer};
53use proto::{
54 BatchQueryReply, BatchQueryRequest, BulkEntityReply, CollectionRequest, CollectionsReply,
55 DeleteEntityRequest, DeploymentProfileRequest, Empty, EntityReply, ExecutePreparedRequest,
56 ExportRequest, GraphProjectionUpsertRequest, HealthReply, IndexNameRequest, IndexToggleRequest,
57 JsonBulkCreateRequest, JsonCreateRequest, JsonPayloadRequest, KvWatchEvent, KvWatchRequest,
58 ManifestRequest, OperationReply, PayloadReply, PrepareQueryReply, PrepareQueryRequest,
59 QueryReply, QueryRequest, ScanEntity, ScanReply, ScanRequest, StatsReply, TopologyReply,
60 TopologyRequest, UpdateEntityRequest,
61};
62
63mod control_support;
64mod entity_ops;
65mod input_support;
66pub(crate) mod scan_json;
67
68use self::control_support::*;
69use self::entity_ops::*;
70use self::input_support::*;
71use self::scan_json::*;
72
/// Listener configuration for the gRPC server.
#[derive(Debug, Clone)]
pub struct GrpcServerOptions {
    /// Address to bind, kept as text; `RedDBGrpcServer::serve` parses it when the
    /// server starts. The `Default` impl uses `127.0.0.1:5555`.
    pub bind_addr: String,
    /// TLS material. The serve methods only configure TLS when this is `Some`.
    pub tls: Option<GrpcTlsOptions>,
}
82
/// PEM-encoded TLS material for the gRPC listener.
#[derive(Debug, Clone)]
pub struct GrpcTlsOptions {
    /// Server certificate, PEM bytes; paired with `key_pem` to form the server identity.
    pub cert_pem: Vec<u8>,
    /// Private key for `cert_pem`, PEM bytes.
    pub key_pem: Vec<u8>,
    /// Optional CA bundle (PEM bytes) installed as the client CA root. When present,
    /// the TLS identity log line reports `mtls = true`.
    pub client_ca_pem: Option<Vec<u8>>,
}
100
101impl GrpcTlsOptions {
102 pub fn to_tonic_config(
106 &self,
107 ) -> Result<tonic::transport::ServerTlsConfig, Box<dyn std::error::Error>> {
108 let identity = tonic::transport::Identity::from_pem(&self.cert_pem, &self.key_pem);
109 let mut cfg = tonic::transport::ServerTlsConfig::new().identity(identity);
110 if let Some(ca_pem) = &self.client_ca_pem {
111 cfg = cfg.client_ca_root(tonic::transport::Certificate::from_pem(ca_pem));
112 }
113 Ok(cfg)
114 }
115}
116
117impl Default for GrpcServerOptions {
118 fn default() -> Self {
119 Self {
120 bind_addr: "127.0.0.1:5555".to_string(),
121 tls: None,
122 }
123 }
124}
125
/// gRPC front end for a `RedDBRuntime`.
#[derive(Clone)]
pub struct RedDBGrpcServer {
    /// Runtime handed to the gRPC service each time the server starts serving.
    runtime: RedDBRuntime,
    /// Bind address and optional TLS settings.
    options: GrpcServerOptions,
    /// Auth store shared with the runtime (registered through `set_auth_store`
    /// in `with_options`).
    auth_store: Arc<AuthStore>,
    /// Optional OAuth validator; `None` until `with_oauth_validator` is called.
    oauth_validator: Option<Arc<crate::auth::OAuthValidator>>,
}
138
139impl RedDBGrpcServer {
140 pub fn new(runtime: RedDBRuntime) -> Self {
141 let auth_config = crate::auth::AuthConfig::default();
142 let auth_store = Arc::new(AuthStore::new(auth_config));
143 Self::with_options(runtime, GrpcServerOptions::default(), auth_store)
144 }
145
146 pub fn from_database_options(
147 db_options: RedDBOptions,
148 options: GrpcServerOptions,
149 ) -> RedDBResult<Self> {
150 let runtime = RedDBRuntime::with_options(db_options.clone())?;
152
153 let auth_store = if db_options.auth.vault_enabled {
154 let pager = runtime.db().store().pager().cloned().ok_or_else(|| {
158 crate::api::RedDBError::Internal(
159 "vault requires a paged database (persistent mode)".into(),
160 )
161 })?;
162 let store = AuthStore::with_vault(db_options.auth.clone(), pager, None)
163 .map_err(|e| crate::api::RedDBError::Internal(e.to_string()))?;
164 Arc::new(store)
165 } else {
166 Arc::new(AuthStore::new(db_options.auth.clone()))
167 };
168 auth_store.bootstrap_from_env();
169 Ok(Self::with_options(runtime, options, auth_store))
170 }
171
172 pub fn with_options(
173 runtime: RedDBRuntime,
174 options: GrpcServerOptions,
175 auth_store: Arc<AuthStore>,
176 ) -> Self {
177 runtime.set_auth_store(Arc::clone(&auth_store));
180 Self {
181 runtime,
182 options,
183 auth_store,
184 oauth_validator: None,
185 }
186 }
187
188 pub fn with_oauth_validator(mut self, validator: Arc<crate::auth::OAuthValidator>) -> Self {
194 self.oauth_validator = Some(validator);
195 self
196 }
197
198 pub fn oauth_validator(&self) -> Option<&Arc<crate::auth::OAuthValidator>> {
200 self.oauth_validator.as_ref()
201 }
202
203 pub fn runtime(&self) -> &RedDBRuntime {
204 &self.runtime
205 }
206
207 pub fn options(&self) -> &GrpcServerOptions {
208 &self.options
209 }
210
211 pub fn auth_store(&self) -> &Arc<AuthStore> {
212 &self.auth_store
213 }
214
215 fn grpc_runtime(&self) -> GrpcRuntime {
216 GrpcRuntime {
217 runtime: self.runtime.clone(),
218 auth_store: self.auth_store.clone(),
219 prepared_registry: PreparedStatementRegistry::new(),
220 oauth_validator: self.oauth_validator.clone(),
221 }
222 }
223
224 pub async fn serve(&self) -> Result<(), Box<dyn std::error::Error>> {
225 let addr = self.options.bind_addr.parse()?;
226 let mut builder = tonic::transport::Server::builder();
227 if let Some(tls) = &self.options.tls {
228 log_grpc_tls_identity(tls);
231 builder = builder.tls_config(tls.to_tonic_config()?)?;
232 }
233 builder
234 .add_service(Self::configured_service(self.grpc_runtime()))
235 .serve(addr)
236 .await?;
237 Ok(())
238 }
239
240 pub async fn serve_on(
241 &self,
242 listener: std::net::TcpListener,
243 ) -> Result<(), Box<dyn std::error::Error>> {
244 listener.set_nonblocking(true)?;
245 let listener = tokio::net::TcpListener::from_std(listener)?;
246 let incoming = TcpListenerStream::new(listener);
247 let mut builder = tonic::transport::Server::builder();
248 if let Some(tls) = &self.options.tls {
249 log_grpc_tls_identity(tls);
250 builder = builder.tls_config(tls.to_tonic_config()?)?;
251 }
252 builder
253 .add_service(Self::configured_service(self.grpc_runtime()))
254 .serve_with_incoming(incoming)
255 .await?;
256 Ok(())
257 }
258
259 fn configured_service(runtime: GrpcRuntime) -> RedDbServer<GrpcRuntime> {
260 use tonic::codec::CompressionEncoding;
264 RedDbServer::new(runtime)
265 .max_decoding_message_size(256 * 1024 * 1024)
266 .max_encoding_message_size(256 * 1024 * 1024)
267 .accept_compressed(CompressionEncoding::Zstd)
268 .accept_compressed(CompressionEncoding::Gzip)
269 .send_compressed(CompressionEncoding::Zstd)
270 }
271}
272
/// One entry in the prepared-statement registry.
struct GrpcPreparedStatement {
    /// Query expression stored at prepare time; shared via `Arc` so lookups can
    /// hand it out without copying the expression.
    shape: std::sync::Arc<crate::storage::query::ast::QueryExpr>,
    /// Parameter count recorded when the statement was prepared.
    parameter_count: usize,
    /// Creation time; eviction drops entries once this is an hour or more old.
    created_at: std::time::Instant,
}
279
/// Registry of prepared statements keyed by a numeric id.
struct PreparedStatementRegistry {
    /// Prepared statements by id, guarded by a read/write lock.
    map: parking_lot::RwLock<std::collections::HashMap<u64, GrpcPreparedStatement>>,
    /// Source of statement ids; starts at 1 and is incremented on every `prepare`.
    next_id: std::sync::atomic::AtomicU64,
    /// Number of lookups so far; every 256th lookup triggers an eviction sweep.
    get_count: std::sync::atomic::AtomicU64,
}
288
289impl PreparedStatementRegistry {
290 fn new() -> Arc<Self> {
291 Arc::new(Self {
292 map: parking_lot::RwLock::new(std::collections::HashMap::new()),
293 next_id: std::sync::atomic::AtomicU64::new(1),
294 get_count: std::sync::atomic::AtomicU64::new(0),
295 })
296 }
297
298 fn prepare(&self, shape: crate::storage::query::ast::QueryExpr, parameter_count: usize) -> u64 {
299 use std::sync::atomic::Ordering;
300 let id = self.next_id.fetch_add(1, Ordering::Relaxed);
301 let mut map = self.map.write();
302 self.evict_old_locked(&mut map);
303 map.insert(
304 id,
305 GrpcPreparedStatement {
306 shape: std::sync::Arc::new(shape),
308 parameter_count,
309 created_at: std::time::Instant::now(),
310 },
311 );
312 id
313 }
314
315 fn get_shape_and_count(
316 &self,
317 id: u64,
318 ) -> Option<(std::sync::Arc<crate::storage::query::ast::QueryExpr>, usize)> {
319 let get_count = self
322 .get_count
323 .fetch_add(1, std::sync::atomic::Ordering::Relaxed)
324 + 1;
325 if get_count.is_multiple_of(256) {
326 let mut map = self.map.write();
327 self.evict_old_locked(&mut map);
328 }
329 let map = self.map.read();
330 map.get(&id)
331 .map(|s| (std::sync::Arc::clone(&s.shape), s.parameter_count))
332 }
333
334 fn evict_old_locked(&self, map: &mut std::collections::HashMap<u64, GrpcPreparedStatement>) {
335 let threshold = std::time::Duration::from_secs(3600);
336 map.retain(|_, v| v.created_at.elapsed() < threshold);
337 }
338}
339
/// State handed to the generated `RedDbServer` (see `configured_service`).
#[derive(Clone)]
struct GrpcRuntime {
    /// Runtime that the use-case helpers on this type borrow.
    runtime: RedDBRuntime,
    /// Auth store shared with the owning `RedDBGrpcServer`.
    auth_store: Arc<AuthStore>,
    /// Prepared-statement registry; clones of this value share the same registry.
    prepared_registry: Arc<PreparedStatementRegistry>,
    /// Optional OAuth validator copied from the owning server.
    oauth_validator: Option<Arc<crate::auth::OAuthValidator>>,
}
350
// Each accessor below constructs a use-case value that borrows `self.runtime`.
impl GrpcRuntime {
    /// Returns `AdminUseCases` borrowing this service's runtime.
    fn admin_use_cases(&self) -> AdminUseCases<'_, RedDBRuntime> {
        AdminUseCases::new(&self.runtime)
    }

    /// Returns `CatalogUseCases` borrowing this service's runtime.
    fn catalog_use_cases(&self) -> CatalogUseCases<'_, RedDBRuntime> {
        CatalogUseCases::new(&self.runtime)
    }

    /// Returns `QueryUseCases` borrowing this service's runtime.
    fn query_use_cases(&self) -> QueryUseCases<'_, RedDBRuntime> {
        QueryUseCases::new(&self.runtime)
    }

    /// Returns `EntityUseCases` borrowing this service's runtime.
    fn entity_use_cases(&self) -> EntityUseCases<'_, RedDBRuntime> {
        EntityUseCases::new(&self.runtime)
    }

    /// Returns `GraphUseCases` borrowing this service's runtime.
    fn graph_use_cases(&self) -> GraphUseCases<'_, RedDBRuntime> {
        GraphUseCases::new(&self.runtime)
    }

    /// Returns `NativeUseCases` borrowing this service's runtime.
    fn native_use_cases(&self) -> NativeUseCases<'_, RedDBRuntime> {
        NativeUseCases::new(&self.runtime)
    }
}
376
377fn log_grpc_tls_identity(tls: &GrpcTlsOptions) {
380 use sha2::{Digest, Sha256};
381 let cert_fp = {
382 let mut h = Sha256::new();
383 h.update(&tls.cert_pem);
384 let digest = h.finalize();
385 let mut buf = String::with_capacity(64);
388 for b in digest.iter() {
389 buf.push_str(&format!("{b:02x}"));
390 }
391 buf
392 };
393 tracing::info!(
394 target: "reddb::security",
395 transport = "grpc",
396 cert_sha256 = %cert_fp,
397 mtls = tls.client_ca_pem.is_some(),
398 "gRPC TLS identity loaded"
399 );
400}
401
402include!("grpc/service_impl.rs");