Skip to main content

ormdb_server/
handler.rs

1//! Request handler for processing client requests.
2
3use std::sync::Arc;
4use std::time::Duration;
5
6use tracing::{debug, instrument, warn};
7
8use ormdb_core::metrics::SharedMetricsRegistry;
9use ormdb_core::query::{AggregateExecutor, ExplainService};
10use ormdb_proto::{
11    error_codes, AggregateQuery, CacheMetrics, EntityCount, EntityQueryCount, MetricsResult,
12    MutationMetrics, Operation, QueryMetrics, ReplicationRole, ReplicationStatus, Request,
13    Response, StorageMetrics, StreamChangesRequest, StreamChangesResponse, TransportMetrics,
14};
15
16use crate::database::Database;
17use crate::error::Error;
18use crate::mutation::MutationExecutor;
19
20const STATS_REFRESH_INTERVAL: Duration = Duration::from_secs(60);
21
22/// Handles incoming requests and dispatches to appropriate handlers.
23pub struct RequestHandler {
24    database: Arc<Database>,
25    metrics: Option<SharedMetricsRegistry>,
26}
27
28impl RequestHandler {
29    /// Create a new request handler with the given database.
30    pub fn new(database: Arc<Database>) -> Self {
31        Self {
32            database,
33            metrics: None,
34        }
35    }
36
37    /// Create a new request handler with metrics support.
38    pub fn with_metrics(database: Arc<Database>, metrics: SharedMetricsRegistry) -> Self {
39        Self {
40            database,
41            metrics: Some(metrics),
42        }
43    }
44
45    /// Handle a request and return a response.
46    #[instrument(skip(self, request), fields(request_id = request.id, op = ?std::mem::discriminant(&request.operation)))]
47    pub fn handle(&self, request: &Request) -> Response {
48        let start = std::time::Instant::now();
49        let result = self.handle_inner(request);
50
51        let response = match result {
52            Ok(response) => response,
53            Err(e) => self.error_response(request.id, e),
54        };
55
56        debug!(duration_us = start.elapsed().as_micros() as u64, success = response.status.is_ok(), "request handled");
57        response
58    }
59
60    /// Internal handler that can return errors.
61    fn handle_inner(&self, request: &Request) -> Result<Response, Error> {
62        // Check schema version for operations that require it
63        if matches!(
64            request.operation,
65            Operation::Query(_) | Operation::Mutate(_) | Operation::MutateBatch(_)
66        ) {
67            let server_version = self.database.schema_version();
68            if request.schema_version != 0 && request.schema_version != server_version {
69                return Ok(Response::error(
70                    request.id,
71                    error_codes::SCHEMA_MISMATCH,
72                    format!(
73                        "schema version mismatch: client has {}, server has {}",
74                        request.schema_version, server_version
75                    ),
76                ));
77            }
78        }
79
80        match &request.operation {
81            Operation::Query(query) => self.handle_query(request.id, query),
82            Operation::Mutate(mutation) => self.handle_mutate(request.id, mutation),
83            Operation::MutateBatch(batch) => self.handle_batch(request.id, batch),
84            Operation::GetSchema => self.handle_get_schema(request.id),
85            Operation::Ping => Ok(Response::pong(request.id)),
86            Operation::Explain(query) => self.handle_explain(request.id, query),
87            Operation::GetMetrics => self.handle_get_metrics(request.id),
88            Operation::Aggregate(query) => self.handle_aggregate(request.id, query),
89            Operation::Subscribe(_) | Operation::Unsubscribe { .. } => {
90                // Pub-sub operations require async handler integration (Phase 6)
91                Ok(Response::error(
92                    request.id,
93                    error_codes::INVALID_REQUEST,
94                    "pub-sub operations not yet available on this handler",
95                ))
96            }
97            Operation::StreamChanges(req) => self.handle_stream_changes(request.id, req),
98            Operation::GetReplicationStatus => self.handle_replication_status(request.id),
99        }
100    }
101
102    /// Handle a query operation.
103    #[instrument(skip(self, query), fields(entity = %query.root_entity))]
104    fn handle_query(
105        &self,
106        request_id: u64,
107        query: &ormdb_proto::GraphQuery,
108    ) -> Result<Response, Error> {
109        if let Err(e) = self
110            .database
111            .refresh_statistics_if_stale(STATS_REFRESH_INTERVAL)
112        {
113            warn!(error = %e, "Failed to refresh statistics");
114        }
115
116        let executor = if let Some(metrics) = &self.metrics {
117            self.database.executor_with_metrics(metrics.clone())
118        } else {
119            self.database.executor()
120        };
121        let statistics = self.database.statistics();
122        let cache = self.database.plan_cache();
123        let result = executor
124            .execute_with_cache(query, cache, Some(statistics))
125            .map_err(|e| Error::Database(format!("query execution failed: {}", e)))?;
126
127        debug!(entities_returned = result.entities.get(0).map(|e| e.len()).unwrap_or(0), "query completed");
128        Ok(Response::query_ok(request_id, result))
129    }
130
131    /// Handle an aggregate query operation.
132    #[instrument(skip(self, query), fields(entity = %query.root_entity))]
133    fn handle_aggregate(
134        &self,
135        request_id: u64,
136        query: &AggregateQuery,
137    ) -> Result<Response, Error> {
138        let executor = AggregateExecutor::new(
139            self.database.storage(),
140            self.database.columnar(),
141        );
142        let result = executor
143            .execute(query)
144            .map_err(|e| Error::Database(format!("aggregate query failed: {}", e)))?;
145
146        debug!(entity = %query.root_entity, aggregations = query.aggregations.len(), "aggregate query completed");
147        Ok(Response::aggregate_ok(request_id, result))
148    }
149
150    /// Handle a single mutation operation.
151    #[instrument(skip(self, mutation), fields(entity = %mutation.entity(), mutation_type = ?std::mem::discriminant(mutation)))]
152    fn handle_mutate(
153        &self,
154        request_id: u64,
155        mutation: &ormdb_proto::Mutation,
156    ) -> Result<Response, Error> {
157        let executor = MutationExecutor::new(&self.database);
158        let result = executor.execute(mutation)?;
159
160        debug!(affected = result.affected, "mutation completed");
161        Ok(Response::mutation_ok(request_id, result))
162    }
163
164    /// Handle a batch mutation operation.
165    fn handle_batch(
166        &self,
167        request_id: u64,
168        batch: &ormdb_proto::MutationBatch,
169    ) -> Result<Response, Error> {
170        let executor = MutationExecutor::new(&self.database);
171        let result = executor.execute_batch(batch)?;
172
173        Ok(Response::mutation_ok(request_id, result))
174    }
175
176    /// Handle a get schema request.
177    fn handle_get_schema(&self, request_id: u64) -> Result<Response, Error> {
178        let version = self.database.schema_version();
179
180        let data = if version == 0 {
181            // No schema applied yet
182            Vec::new()
183        } else {
184            // Get the current schema and serialize it
185            let schema = self
186                .database
187                .catalog()
188                .current_schema()
189                .map_err(|e| Error::Database(format!("failed to get schema: {}", e)))?
190                .ok_or_else(|| {
191                    Error::Database("schema version is non-zero but no schema found".to_string())
192                })?;
193
194            schema
195                .to_bytes()
196                .map_err(|e| Error::Database(format!("failed to serialize schema: {}", e)))?
197        };
198
199        Ok(Response::schema_ok(request_id, version, data))
200    }
201
202    /// Handle an explain request.
203    fn handle_explain(
204        &self,
205        request_id: u64,
206        query: &ormdb_proto::GraphQuery,
207    ) -> Result<Response, Error> {
208        if let Err(e) = self
209            .database
210            .refresh_statistics_if_stale(STATS_REFRESH_INTERVAL)
211        {
212            warn!(error = %e, "Failed to refresh statistics");
213        }
214
215        let catalog = self.database.catalog();
216        let statistics = self.database.statistics();
217        let cache = self.database.plan_cache();
218
219        let service = ExplainService::new(catalog)
220            .with_statistics(statistics)
221            .with_cache(cache);
222
223        let result = service
224            .explain(query)
225            .map_err(|e| Error::Database(format!("explain failed: {}", e)))?;
226
227        Ok(Response::explain_ok(request_id, result))
228    }
229
230    /// Handle a get metrics request.
231    fn handle_get_metrics(&self, request_id: u64) -> Result<Response, Error> {
232        let result = self.collect_metrics();
233        Ok(Response::metrics_ok(request_id, result))
234    }
235
236    /// Collect current server metrics.
237    fn collect_metrics(&self) -> MetricsResult {
238        // Get metrics from registry if available
239        let (uptime_secs, query_metrics, mutations, cache) = if let Some(ref registry) = self.metrics {
240            let queries_by_entity: Vec<EntityQueryCount> = registry
241                .queries_by_entity()
242                .into_iter()
243                .map(|(entity, count)| EntityQueryCount { entity, count })
244                .collect();
245
246            (
247                registry.uptime_secs(),
248                QueryMetrics {
249                    total_count: registry.query_count(),
250                    avg_duration_us: registry.avg_query_latency_us(),
251                    p50_duration_us: registry.p50_query_latency_us(),
252                    p99_duration_us: registry.p99_query_latency_us(),
253                    max_duration_us: registry.max_query_latency_us(),
254                    by_entity: queries_by_entity,
255                },
256                MutationMetrics {
257                    total_count: registry.mutation_count(),
258                    inserts: registry.insert_count(),
259                    updates: registry.update_count(),
260                    deletes: registry.delete_count(),
261                    upserts: registry.upsert_count(),
262                    rows_affected: registry.rows_affected(),
263                },
264                CacheMetrics {
265                    hits: registry.cache_hits(),
266                    misses: registry.cache_misses(),
267                    hit_rate: registry.cache_hit_rate(),
268                    size: self.database.plan_cache().len() as u64,
269                    capacity: 1000, // Default capacity
270                    evictions: registry.cache_evictions(),
271                },
272            )
273        } else {
274            // No metrics registry, return defaults
275            (
276                0,
277                QueryMetrics::default(),
278                MutationMetrics::default(),
279                CacheMetrics::default(),
280            )
281        };
282
283        // Get storage metrics from statistics
284        let statistics = self.database.statistics();
285        let entity_counts: Vec<EntityCount> = statistics
286            .snapshot()
287            .into_iter()
288            .map(|(entity, count)| EntityCount { entity, count })
289            .collect();
290
291        let total_entities: u64 = entity_counts.iter().map(|e| e.count).sum();
292
293        MetricsResult::new(
294            uptime_secs,
295            query_metrics,
296            mutations,
297            cache,
298            StorageMetrics {
299                entity_counts,
300                total_entities,
301                size_bytes: None,
302                active_transactions: 0,
303            },
304            TransportMetrics::default(),
305        )
306    }
307
308    /// Handle a stream changes request (CDC/replication).
309    fn handle_stream_changes(
310        &self,
311        request_id: u64,
312        req: &StreamChangesRequest,
313    ) -> Result<Response, Error> {
314        let changelog = self.database.changelog();
315
316        // Scan entries from the changelog
317        let (entries, has_more) = if let Some(ref filter) = req.entity_filter {
318            changelog.scan_filtered(req.from_lsn, req.batch_size as usize, Some(filter))
319        } else {
320            changelog.scan_batch(req.from_lsn, req.batch_size as usize)
321        }
322        .map_err(|e| Error::Database(format!("failed to scan changelog: {}", e)))?;
323
324        // Calculate next LSN
325        let next_lsn = entries.last().map(|e| e.lsn + 1).unwrap_or(req.from_lsn);
326
327        let response = StreamChangesResponse::new(entries, next_lsn, has_more);
328        Ok(Response::stream_changes_ok(request_id, response))
329    }
330
331    /// Handle a get replication status request.
332    fn handle_replication_status(&self, request_id: u64) -> Result<Response, Error> {
333        let changelog = self.database.changelog();
334        let current_lsn = changelog.current_lsn();
335
336        // For now, all servers are standalone (full replication manager comes later)
337        let status = ReplicationStatus::new(ReplicationRole::Standalone, current_lsn);
338
339        Ok(Response::replication_status_ok(request_id, status))
340    }
341
342    /// Convert an error to an error response.
343    fn error_response(&self, request_id: u64, error: Error) -> Response {
344        let (code, message) = match &error {
345            Error::Database(msg) => {
346                if msg.contains("not found") {
347                    (error_codes::NOT_FOUND, msg.clone())
348                } else {
349                    (error_codes::INTERNAL, msg.clone())
350                }
351            }
352            Error::Storage(e) => (error_codes::INTERNAL, e.to_string()),
353            Error::Protocol(e) => (error_codes::INVALID_REQUEST, e.to_string()),
354            Error::Transport(msg) => (error_codes::INTERNAL, msg.clone()),
355            Error::Config(msg) => (error_codes::INTERNAL, msg.clone()),
356            Error::Io(e) => (error_codes::INTERNAL, e.to_string()),
357        };
358
359        Response::error(request_id, code, message)
360    }
361}
362
363#[cfg(test)]
364mod tests {
365    use super::*;
366    use ormdb_core::catalog::{EntityDef, FieldDef, FieldType, ScalarType, SchemaBundle};
367    use ormdb_proto::{FieldValue, GraphQuery, Mutation, MutationBatch, ResponsePayload, Status};
368
369    fn setup_test_db() -> (tempfile::TempDir, Arc<Database>) {
370        let dir = tempfile::tempdir().unwrap();
371        let db = Database::open(dir.path()).unwrap();
372
373        // Create schema
374        let schema = SchemaBundle::new(1).with_entity(
375            EntityDef::new("User", "id")
376                .with_field(FieldDef::new("id", FieldType::Scalar(ScalarType::Uuid)))
377                .with_field(FieldDef::new("name", FieldType::Scalar(ScalarType::String)))
378                .with_field(FieldDef::new("age", FieldType::Scalar(ScalarType::Int32))),
379        );
380        db.catalog().apply_schema(schema).unwrap();
381
382        (dir, Arc::new(db))
383    }
384
385    #[test]
386    fn test_ping() {
387        let (_dir, db) = setup_test_db();
388        let handler = RequestHandler::new(db);
389
390        let request = Request::ping(1);
391        let response = handler.handle(&request);
392
393        assert_eq!(response.id, 1);
394        assert!(response.status.is_ok());
395        assert!(matches!(response.payload, ResponsePayload::Pong));
396    }
397
398    #[test]
399    fn test_get_schema() {
400        let (_dir, db) = setup_test_db();
401        let handler = RequestHandler::new(db);
402
403        let request = Request::get_schema(2);
404        let response = handler.handle(&request);
405
406        assert_eq!(response.id, 2);
407        assert!(response.status.is_ok());
408
409        if let ResponsePayload::Schema { version, data } = &response.payload {
410            assert_eq!(*version, 1);
411            assert!(!data.is_empty());
412        } else {
413            panic!("Expected Schema payload");
414        }
415    }
416
417    #[test]
418    fn test_query_empty() {
419        let (_dir, db) = setup_test_db();
420        let handler = RequestHandler::new(db);
421
422        let request = Request::query(3, 1, GraphQuery::new("User"));
423        let response = handler.handle(&request);
424
425        assert_eq!(response.id, 3);
426        assert!(response.status.is_ok());
427
428        if let ResponsePayload::Query(result) = &response.payload {
429            assert_eq!(result.entities.len(), 1);
430            assert!(result.entities[0].is_empty());
431        } else {
432            panic!("Expected Query payload");
433        }
434    }
435
436    #[test]
437    fn test_mutation_insert() {
438        let (_dir, db) = setup_test_db();
439        let handler = RequestHandler::new(db);
440
441        let mutation = Mutation::insert(
442            "User",
443            vec![
444                FieldValue::new("name", "Alice"),
445                FieldValue::new("age", 30i32),
446            ],
447        );
448        let request = Request::mutate(4, 1, mutation);
449        let response = handler.handle(&request);
450
451        assert_eq!(response.id, 4);
452        assert!(response.status.is_ok());
453
454        if let ResponsePayload::Mutation(result) = &response.payload {
455            assert_eq!(result.affected, 1);
456            assert_eq!(result.inserted_ids.len(), 1);
457        } else {
458            panic!("Expected Mutation payload");
459        }
460    }
461
462    #[test]
463    fn test_mutation_batch() {
464        let (_dir, db) = setup_test_db();
465        let handler = RequestHandler::new(db);
466
467        let batch = MutationBatch::from_mutations(vec![
468            Mutation::insert("User", vec![FieldValue::new("name", "User1")]),
469            Mutation::insert("User", vec![FieldValue::new("name", "User2")]),
470        ]);
471        let request = Request::mutate_batch(5, 1, batch);
472        let response = handler.handle(&request);
473
474        assert_eq!(response.id, 5);
475        assert!(response.status.is_ok());
476
477        if let ResponsePayload::Mutation(result) = &response.payload {
478            assert_eq!(result.affected, 2);
479            assert_eq!(result.inserted_ids.len(), 2);
480        } else {
481            panic!("Expected Mutation payload");
482        }
483    }
484
485    #[test]
486    fn test_schema_mismatch() {
487        let (_dir, db) = setup_test_db();
488        let handler = RequestHandler::new(db);
489
490        // Client has wrong schema version
491        let request = Request::query(6, 99, GraphQuery::new("User"));
492        let response = handler.handle(&request);
493
494        assert_eq!(response.id, 6);
495        assert!(response.status.is_error());
496
497        if let Status::Error { code, message } = &response.status {
498            assert_eq!(*code, error_codes::SCHEMA_MISMATCH);
499            assert!(message.contains("mismatch"));
500        } else {
501            panic!("Expected error status");
502        }
503    }
504
505    #[test]
506    fn test_insert_and_query() {
507        let (_dir, db) = setup_test_db();
508        let handler = RequestHandler::new(db);
509
510        // Insert a user
511        let mutation = Mutation::insert(
512            "User",
513            vec![
514                FieldValue::new("name", "Bob"),
515                FieldValue::new("age", 25i32),
516            ],
517        );
518        let insert_request = Request::mutate(7, 1, mutation);
519        let insert_response = handler.handle(&insert_request);
520        assert!(insert_response.status.is_ok());
521
522        // Query users
523        let query_request = Request::query(8, 1, GraphQuery::new("User"));
524        let query_response = handler.handle(&query_request);
525
526        assert!(query_response.status.is_ok());
527        if let ResponsePayload::Query(result) = &query_response.payload {
528            assert_eq!(result.entities[0].len(), 1);
529        } else {
530            panic!("Expected Query payload");
531        }
532    }
533}