1use std::sync::Arc;
4use std::time::Duration;
5
6use tracing::{debug, instrument, warn};
7
8use ormdb_core::metrics::SharedMetricsRegistry;
9use ormdb_core::query::{AggregateExecutor, ExplainService};
10use ormdb_proto::{
11 error_codes, AggregateQuery, CacheMetrics, EntityCount, EntityQueryCount, MetricsResult,
12 MutationMetrics, Operation, QueryMetrics, ReplicationRole, ReplicationStatus, Request,
13 Response, StorageMetrics, StreamChangesRequest, StreamChangesResponse, TransportMetrics,
14};
15
16use crate::database::Database;
17use crate::error::Error;
18use crate::mutation::MutationExecutor;
19
20const STATS_REFRESH_INTERVAL: Duration = Duration::from_secs(60);
21
22pub struct RequestHandler {
24 database: Arc<Database>,
25 metrics: Option<SharedMetricsRegistry>,
26}
27
28impl RequestHandler {
29 pub fn new(database: Arc<Database>) -> Self {
31 Self {
32 database,
33 metrics: None,
34 }
35 }
36
37 pub fn with_metrics(database: Arc<Database>, metrics: SharedMetricsRegistry) -> Self {
39 Self {
40 database,
41 metrics: Some(metrics),
42 }
43 }
44
45 #[instrument(skip(self, request), fields(request_id = request.id, op = ?std::mem::discriminant(&request.operation)))]
47 pub fn handle(&self, request: &Request) -> Response {
48 let start = std::time::Instant::now();
49 let result = self.handle_inner(request);
50
51 let response = match result {
52 Ok(response) => response,
53 Err(e) => self.error_response(request.id, e),
54 };
55
56 debug!(duration_us = start.elapsed().as_micros() as u64, success = response.status.is_ok(), "request handled");
57 response
58 }
59
60 fn handle_inner(&self, request: &Request) -> Result<Response, Error> {
62 if matches!(
64 request.operation,
65 Operation::Query(_) | Operation::Mutate(_) | Operation::MutateBatch(_)
66 ) {
67 let server_version = self.database.schema_version();
68 if request.schema_version != 0 && request.schema_version != server_version {
69 return Ok(Response::error(
70 request.id,
71 error_codes::SCHEMA_MISMATCH,
72 format!(
73 "schema version mismatch: client has {}, server has {}",
74 request.schema_version, server_version
75 ),
76 ));
77 }
78 }
79
80 match &request.operation {
81 Operation::Query(query) => self.handle_query(request.id, query),
82 Operation::Mutate(mutation) => self.handle_mutate(request.id, mutation),
83 Operation::MutateBatch(batch) => self.handle_batch(request.id, batch),
84 Operation::GetSchema => self.handle_get_schema(request.id),
85 Operation::Ping => Ok(Response::pong(request.id)),
86 Operation::Explain(query) => self.handle_explain(request.id, query),
87 Operation::GetMetrics => self.handle_get_metrics(request.id),
88 Operation::Aggregate(query) => self.handle_aggregate(request.id, query),
89 Operation::Subscribe(_) | Operation::Unsubscribe { .. } => {
90 Ok(Response::error(
92 request.id,
93 error_codes::INVALID_REQUEST,
94 "pub-sub operations not yet available on this handler",
95 ))
96 }
97 Operation::StreamChanges(req) => self.handle_stream_changes(request.id, req),
98 Operation::GetReplicationStatus => self.handle_replication_status(request.id),
99 }
100 }
101
102 #[instrument(skip(self, query), fields(entity = %query.root_entity))]
104 fn handle_query(
105 &self,
106 request_id: u64,
107 query: &ormdb_proto::GraphQuery,
108 ) -> Result<Response, Error> {
109 if let Err(e) = self
110 .database
111 .refresh_statistics_if_stale(STATS_REFRESH_INTERVAL)
112 {
113 warn!(error = %e, "Failed to refresh statistics");
114 }
115
116 let executor = if let Some(metrics) = &self.metrics {
117 self.database.executor_with_metrics(metrics.clone())
118 } else {
119 self.database.executor()
120 };
121 let statistics = self.database.statistics();
122 let cache = self.database.plan_cache();
123 let result = executor
124 .execute_with_cache(query, cache, Some(statistics))
125 .map_err(|e| Error::Database(format!("query execution failed: {}", e)))?;
126
127 debug!(entities_returned = result.entities.get(0).map(|e| e.len()).unwrap_or(0), "query completed");
128 Ok(Response::query_ok(request_id, result))
129 }
130
131 #[instrument(skip(self, query), fields(entity = %query.root_entity))]
133 fn handle_aggregate(
134 &self,
135 request_id: u64,
136 query: &AggregateQuery,
137 ) -> Result<Response, Error> {
138 let executor = AggregateExecutor::new(
139 self.database.storage(),
140 self.database.columnar(),
141 );
142 let result = executor
143 .execute(query)
144 .map_err(|e| Error::Database(format!("aggregate query failed: {}", e)))?;
145
146 debug!(entity = %query.root_entity, aggregations = query.aggregations.len(), "aggregate query completed");
147 Ok(Response::aggregate_ok(request_id, result))
148 }
149
150 #[instrument(skip(self, mutation), fields(entity = %mutation.entity(), mutation_type = ?std::mem::discriminant(mutation)))]
152 fn handle_mutate(
153 &self,
154 request_id: u64,
155 mutation: &ormdb_proto::Mutation,
156 ) -> Result<Response, Error> {
157 let executor = MutationExecutor::new(&self.database);
158 let result = executor.execute(mutation)?;
159
160 debug!(affected = result.affected, "mutation completed");
161 Ok(Response::mutation_ok(request_id, result))
162 }
163
164 fn handle_batch(
166 &self,
167 request_id: u64,
168 batch: &ormdb_proto::MutationBatch,
169 ) -> Result<Response, Error> {
170 let executor = MutationExecutor::new(&self.database);
171 let result = executor.execute_batch(batch)?;
172
173 Ok(Response::mutation_ok(request_id, result))
174 }
175
176 fn handle_get_schema(&self, request_id: u64) -> Result<Response, Error> {
178 let version = self.database.schema_version();
179
180 let data = if version == 0 {
181 Vec::new()
183 } else {
184 let schema = self
186 .database
187 .catalog()
188 .current_schema()
189 .map_err(|e| Error::Database(format!("failed to get schema: {}", e)))?
190 .ok_or_else(|| {
191 Error::Database("schema version is non-zero but no schema found".to_string())
192 })?;
193
194 schema
195 .to_bytes()
196 .map_err(|e| Error::Database(format!("failed to serialize schema: {}", e)))?
197 };
198
199 Ok(Response::schema_ok(request_id, version, data))
200 }
201
202 fn handle_explain(
204 &self,
205 request_id: u64,
206 query: &ormdb_proto::GraphQuery,
207 ) -> Result<Response, Error> {
208 if let Err(e) = self
209 .database
210 .refresh_statistics_if_stale(STATS_REFRESH_INTERVAL)
211 {
212 warn!(error = %e, "Failed to refresh statistics");
213 }
214
215 let catalog = self.database.catalog();
216 let statistics = self.database.statistics();
217 let cache = self.database.plan_cache();
218
219 let service = ExplainService::new(catalog)
220 .with_statistics(statistics)
221 .with_cache(cache);
222
223 let result = service
224 .explain(query)
225 .map_err(|e| Error::Database(format!("explain failed: {}", e)))?;
226
227 Ok(Response::explain_ok(request_id, result))
228 }
229
230 fn handle_get_metrics(&self, request_id: u64) -> Result<Response, Error> {
232 let result = self.collect_metrics();
233 Ok(Response::metrics_ok(request_id, result))
234 }
235
236 fn collect_metrics(&self) -> MetricsResult {
238 let (uptime_secs, query_metrics, mutations, cache) = if let Some(ref registry) = self.metrics {
240 let queries_by_entity: Vec<EntityQueryCount> = registry
241 .queries_by_entity()
242 .into_iter()
243 .map(|(entity, count)| EntityQueryCount { entity, count })
244 .collect();
245
246 (
247 registry.uptime_secs(),
248 QueryMetrics {
249 total_count: registry.query_count(),
250 avg_duration_us: registry.avg_query_latency_us(),
251 p50_duration_us: registry.p50_query_latency_us(),
252 p99_duration_us: registry.p99_query_latency_us(),
253 max_duration_us: registry.max_query_latency_us(),
254 by_entity: queries_by_entity,
255 },
256 MutationMetrics {
257 total_count: registry.mutation_count(),
258 inserts: registry.insert_count(),
259 updates: registry.update_count(),
260 deletes: registry.delete_count(),
261 upserts: registry.upsert_count(),
262 rows_affected: registry.rows_affected(),
263 },
264 CacheMetrics {
265 hits: registry.cache_hits(),
266 misses: registry.cache_misses(),
267 hit_rate: registry.cache_hit_rate(),
268 size: self.database.plan_cache().len() as u64,
269 capacity: 1000, evictions: registry.cache_evictions(),
271 },
272 )
273 } else {
274 (
276 0,
277 QueryMetrics::default(),
278 MutationMetrics::default(),
279 CacheMetrics::default(),
280 )
281 };
282
283 let statistics = self.database.statistics();
285 let entity_counts: Vec<EntityCount> = statistics
286 .snapshot()
287 .into_iter()
288 .map(|(entity, count)| EntityCount { entity, count })
289 .collect();
290
291 let total_entities: u64 = entity_counts.iter().map(|e| e.count).sum();
292
293 MetricsResult::new(
294 uptime_secs,
295 query_metrics,
296 mutations,
297 cache,
298 StorageMetrics {
299 entity_counts,
300 total_entities,
301 size_bytes: None,
302 active_transactions: 0,
303 },
304 TransportMetrics::default(),
305 )
306 }
307
308 fn handle_stream_changes(
310 &self,
311 request_id: u64,
312 req: &StreamChangesRequest,
313 ) -> Result<Response, Error> {
314 let changelog = self.database.changelog();
315
316 let (entries, has_more) = if let Some(ref filter) = req.entity_filter {
318 changelog.scan_filtered(req.from_lsn, req.batch_size as usize, Some(filter))
319 } else {
320 changelog.scan_batch(req.from_lsn, req.batch_size as usize)
321 }
322 .map_err(|e| Error::Database(format!("failed to scan changelog: {}", e)))?;
323
324 let next_lsn = entries.last().map(|e| e.lsn + 1).unwrap_or(req.from_lsn);
326
327 let response = StreamChangesResponse::new(entries, next_lsn, has_more);
328 Ok(Response::stream_changes_ok(request_id, response))
329 }
330
331 fn handle_replication_status(&self, request_id: u64) -> Result<Response, Error> {
333 let changelog = self.database.changelog();
334 let current_lsn = changelog.current_lsn();
335
336 let status = ReplicationStatus::new(ReplicationRole::Standalone, current_lsn);
338
339 Ok(Response::replication_status_ok(request_id, status))
340 }
341
342 fn error_response(&self, request_id: u64, error: Error) -> Response {
344 let (code, message) = match &error {
345 Error::Database(msg) => {
346 if msg.contains("not found") {
347 (error_codes::NOT_FOUND, msg.clone())
348 } else {
349 (error_codes::INTERNAL, msg.clone())
350 }
351 }
352 Error::Storage(e) => (error_codes::INTERNAL, e.to_string()),
353 Error::Protocol(e) => (error_codes::INVALID_REQUEST, e.to_string()),
354 Error::Transport(msg) => (error_codes::INTERNAL, msg.clone()),
355 Error::Config(msg) => (error_codes::INTERNAL, msg.clone()),
356 Error::Io(e) => (error_codes::INTERNAL, e.to_string()),
357 };
358
359 Response::error(request_id, code, message)
360 }
361}
362
363#[cfg(test)]
364mod tests {
365 use super::*;
366 use ormdb_core::catalog::{EntityDef, FieldDef, FieldType, ScalarType, SchemaBundle};
367 use ormdb_proto::{FieldValue, GraphQuery, Mutation, MutationBatch, ResponsePayload, Status};
368
369 fn setup_test_db() -> (tempfile::TempDir, Arc<Database>) {
370 let dir = tempfile::tempdir().unwrap();
371 let db = Database::open(dir.path()).unwrap();
372
373 let schema = SchemaBundle::new(1).with_entity(
375 EntityDef::new("User", "id")
376 .with_field(FieldDef::new("id", FieldType::Scalar(ScalarType::Uuid)))
377 .with_field(FieldDef::new("name", FieldType::Scalar(ScalarType::String)))
378 .with_field(FieldDef::new("age", FieldType::Scalar(ScalarType::Int32))),
379 );
380 db.catalog().apply_schema(schema).unwrap();
381
382 (dir, Arc::new(db))
383 }
384
385 #[test]
386 fn test_ping() {
387 let (_dir, db) = setup_test_db();
388 let handler = RequestHandler::new(db);
389
390 let request = Request::ping(1);
391 let response = handler.handle(&request);
392
393 assert_eq!(response.id, 1);
394 assert!(response.status.is_ok());
395 assert!(matches!(response.payload, ResponsePayload::Pong));
396 }
397
398 #[test]
399 fn test_get_schema() {
400 let (_dir, db) = setup_test_db();
401 let handler = RequestHandler::new(db);
402
403 let request = Request::get_schema(2);
404 let response = handler.handle(&request);
405
406 assert_eq!(response.id, 2);
407 assert!(response.status.is_ok());
408
409 if let ResponsePayload::Schema { version, data } = &response.payload {
410 assert_eq!(*version, 1);
411 assert!(!data.is_empty());
412 } else {
413 panic!("Expected Schema payload");
414 }
415 }
416
417 #[test]
418 fn test_query_empty() {
419 let (_dir, db) = setup_test_db();
420 let handler = RequestHandler::new(db);
421
422 let request = Request::query(3, 1, GraphQuery::new("User"));
423 let response = handler.handle(&request);
424
425 assert_eq!(response.id, 3);
426 assert!(response.status.is_ok());
427
428 if let ResponsePayload::Query(result) = &response.payload {
429 assert_eq!(result.entities.len(), 1);
430 assert!(result.entities[0].is_empty());
431 } else {
432 panic!("Expected Query payload");
433 }
434 }
435
436 #[test]
437 fn test_mutation_insert() {
438 let (_dir, db) = setup_test_db();
439 let handler = RequestHandler::new(db);
440
441 let mutation = Mutation::insert(
442 "User",
443 vec![
444 FieldValue::new("name", "Alice"),
445 FieldValue::new("age", 30i32),
446 ],
447 );
448 let request = Request::mutate(4, 1, mutation);
449 let response = handler.handle(&request);
450
451 assert_eq!(response.id, 4);
452 assert!(response.status.is_ok());
453
454 if let ResponsePayload::Mutation(result) = &response.payload {
455 assert_eq!(result.affected, 1);
456 assert_eq!(result.inserted_ids.len(), 1);
457 } else {
458 panic!("Expected Mutation payload");
459 }
460 }
461
462 #[test]
463 fn test_mutation_batch() {
464 let (_dir, db) = setup_test_db();
465 let handler = RequestHandler::new(db);
466
467 let batch = MutationBatch::from_mutations(vec![
468 Mutation::insert("User", vec![FieldValue::new("name", "User1")]),
469 Mutation::insert("User", vec![FieldValue::new("name", "User2")]),
470 ]);
471 let request = Request::mutate_batch(5, 1, batch);
472 let response = handler.handle(&request);
473
474 assert_eq!(response.id, 5);
475 assert!(response.status.is_ok());
476
477 if let ResponsePayload::Mutation(result) = &response.payload {
478 assert_eq!(result.affected, 2);
479 assert_eq!(result.inserted_ids.len(), 2);
480 } else {
481 panic!("Expected Mutation payload");
482 }
483 }
484
485 #[test]
486 fn test_schema_mismatch() {
487 let (_dir, db) = setup_test_db();
488 let handler = RequestHandler::new(db);
489
490 let request = Request::query(6, 99, GraphQuery::new("User"));
492 let response = handler.handle(&request);
493
494 assert_eq!(response.id, 6);
495 assert!(response.status.is_error());
496
497 if let Status::Error { code, message } = &response.status {
498 assert_eq!(*code, error_codes::SCHEMA_MISMATCH);
499 assert!(message.contains("mismatch"));
500 } else {
501 panic!("Expected error status");
502 }
503 }
504
505 #[test]
506 fn test_insert_and_query() {
507 let (_dir, db) = setup_test_db();
508 let handler = RequestHandler::new(db);
509
510 let mutation = Mutation::insert(
512 "User",
513 vec![
514 FieldValue::new("name", "Bob"),
515 FieldValue::new("age", 25i32),
516 ],
517 );
518 let insert_request = Request::mutate(7, 1, mutation);
519 let insert_response = handler.handle(&insert_request);
520 assert!(insert_response.status.is_ok());
521
522 let query_request = Request::query(8, 1, GraphQuery::new("User"));
524 let query_response = handler.handle(&query_request);
525
526 assert!(query_response.status.is_ok());
527 if let ResponsePayload::Query(result) = &query_response.payload {
528 assert_eq!(result.entities[0].len(), 1);
529 } else {
530 panic!("Expected Query payload");
531 }
532 }
533}