Skip to main content

ormdb_proto/
message.rs

1//! Request and response message types.
2
3use crate::explain::ExplainResult;
4use crate::metrics::MetricsResult;
5use crate::mutation::{Mutation, MutationBatch};
6use crate::query::{AggregateQuery, GraphQuery};
7use crate::replication::{ReplicationStatus, StreamChangesRequest, StreamChangesResponse};
8use crate::result::{AggregateResult, MutationResult, QueryResult};
9use rkyv::{Archive, Deserialize, Serialize};
10use serde::{Deserialize as SerdeDeserialize, Serialize as SerdeSerialize};
11
12/// A request from client to server.
13#[derive(Debug, Clone, PartialEq, Archive, Serialize, Deserialize, SerdeSerialize, SerdeDeserialize)]
14pub struct Request {
15    /// Unique request identifier for correlation.
16    pub id: u64,
17    /// Schema version the client expects.
18    pub schema_version: u64,
19    /// The operation to perform.
20    pub operation: Operation,
21}
22
23/// Operations that can be requested.
24#[derive(Debug, Clone, PartialEq, Archive, Serialize, Deserialize, SerdeSerialize, SerdeDeserialize)]
25pub enum Operation {
26    /// Execute a graph query.
27    Query(GraphQuery),
28    /// Execute a single mutation.
29    Mutate(Mutation),
30    /// Execute a batch of mutations atomically.
31    MutateBatch(MutationBatch),
32    /// Get the current schema.
33    GetSchema,
34    /// Ping the server (for health checks).
35    Ping,
36    /// Subscribe to changes on an entity or relation.
37    Subscribe(Subscription),
38    /// Unsubscribe from a previous subscription.
39    Unsubscribe {
40        /// The subscription ID to cancel.
41        subscription_id: u64,
42    },
43    /// Explain a query plan without executing it.
44    Explain(GraphQuery),
45    /// Get server metrics.
46    GetMetrics,
47    /// Execute an aggregate query.
48    Aggregate(AggregateQuery),
49    /// Stream changes from the changelog (CDC/replication).
50    StreamChanges(StreamChangesRequest),
51    /// Get replication status.
52    GetReplicationStatus,
53}
54
55/// A subscription request for change notifications.
56#[derive(Debug, Clone, PartialEq, Archive, Serialize, Deserialize, SerdeSerialize, SerdeDeserialize)]
57pub struct Subscription {
58    /// The entity type to watch for changes.
59    pub entity: String,
60    /// Optional filter to limit which changes are sent.
61    pub filter: Option<crate::query::Filter>,
62    /// Optional list of fields to include in change notifications.
63    pub fields: Option<Vec<String>>,
64    /// Include related entity changes.
65    pub include_relations: bool,
66}
67
68impl Request {
69    /// Create a query request.
70    pub fn query(id: u64, schema_version: u64, query: GraphQuery) -> Self {
71        Self {
72            id,
73            schema_version,
74            operation: Operation::Query(query),
75        }
76    }
77
78    /// Create a mutation request.
79    pub fn mutate(id: u64, schema_version: u64, mutation: Mutation) -> Self {
80        Self {
81            id,
82            schema_version,
83            operation: Operation::Mutate(mutation),
84        }
85    }
86
87    /// Create a batch mutation request.
88    pub fn mutate_batch(id: u64, schema_version: u64, batch: MutationBatch) -> Self {
89        Self {
90            id,
91            schema_version,
92            operation: Operation::MutateBatch(batch),
93        }
94    }
95
96    /// Create a get schema request.
97    pub fn get_schema(id: u64) -> Self {
98        Self {
99            id,
100            schema_version: 0, // Not relevant for schema fetch
101            operation: Operation::GetSchema,
102        }
103    }
104
105    /// Create a ping request.
106    pub fn ping(id: u64) -> Self {
107        Self {
108            id,
109            schema_version: 0,
110            operation: Operation::Ping,
111        }
112    }
113
114    /// Create a subscribe request.
115    pub fn subscribe(id: u64, schema_version: u64, subscription: Subscription) -> Self {
116        Self {
117            id,
118            schema_version,
119            operation: Operation::Subscribe(subscription),
120        }
121    }
122
123    /// Create an unsubscribe request.
124    pub fn unsubscribe(id: u64, subscription_id: u64) -> Self {
125        Self {
126            id,
127            schema_version: 0,
128            operation: Operation::Unsubscribe { subscription_id },
129        }
130    }
131
132    /// Create an explain request.
133    pub fn explain(id: u64, schema_version: u64, query: GraphQuery) -> Self {
134        Self {
135            id,
136            schema_version,
137            operation: Operation::Explain(query),
138        }
139    }
140
141    /// Create a get metrics request.
142    pub fn get_metrics(id: u64) -> Self {
143        Self {
144            id,
145            schema_version: 0,
146            operation: Operation::GetMetrics,
147        }
148    }
149
150    /// Create an aggregate query request.
151    pub fn aggregate(id: u64, schema_version: u64, query: AggregateQuery) -> Self {
152        Self {
153            id,
154            schema_version,
155            operation: Operation::Aggregate(query),
156        }
157    }
158
159    /// Create a stream changes request (CDC/replication).
160    pub fn stream_changes(id: u64, request: StreamChangesRequest) -> Self {
161        Self {
162            id,
163            schema_version: 0,
164            operation: Operation::StreamChanges(request),
165        }
166    }
167
168    /// Create a get replication status request.
169    pub fn get_replication_status(id: u64) -> Self {
170        Self {
171            id,
172            schema_version: 0,
173            operation: Operation::GetReplicationStatus,
174        }
175    }
176}
177
178impl Subscription {
179    /// Create a new subscription for an entity.
180    pub fn new(entity: impl Into<String>) -> Self {
181        Self {
182            entity: entity.into(),
183            filter: None,
184            fields: None,
185            include_relations: false,
186        }
187    }
188
189    /// Add a filter to the subscription.
190    pub fn with_filter(mut self, filter: crate::query::Filter) -> Self {
191        self.filter = Some(filter);
192        self
193    }
194
195    /// Specify which fields to include in change notifications.
196    pub fn with_fields(mut self, fields: Vec<String>) -> Self {
197        self.fields = Some(fields);
198        self
199    }
200
201    /// Include related entity changes.
202    pub fn with_relations(mut self) -> Self {
203        self.include_relations = true;
204        self
205    }
206}
207
208/// A response from server to client.
209#[derive(Debug, Clone, PartialEq, Archive, Serialize, Deserialize, SerdeSerialize, SerdeDeserialize)]
210pub struct Response {
211    /// Request ID this response correlates to.
212    pub id: u64,
213    /// Response status.
214    pub status: Status,
215    /// Response payload.
216    pub payload: ResponsePayload,
217}
218
219/// Response status.
220#[derive(Debug, Clone, PartialEq, Archive, Serialize, Deserialize, SerdeSerialize, SerdeDeserialize)]
221pub enum Status {
222    /// Request succeeded.
223    Ok,
224    /// Request failed with an error.
225    Error {
226        /// Error code for programmatic handling.
227        code: u32,
228        /// Human-readable error message.
229        message: String,
230    },
231}
232
233impl Status {
234    /// Create a success status.
235    pub fn ok() -> Self {
236        Status::Ok
237    }
238
239    /// Create an error status.
240    pub fn error(code: u32, message: impl Into<String>) -> Self {
241        Status::Error {
242            code,
243            message: message.into(),
244        }
245    }
246
247    /// Check if this is a success status.
248    pub fn is_ok(&self) -> bool {
249        matches!(self, Status::Ok)
250    }
251
252    /// Check if this is an error status.
253    pub fn is_error(&self) -> bool {
254        matches!(self, Status::Error { .. })
255    }
256}
257
258/// Response payload variants.
259#[derive(Debug, Clone, PartialEq, Archive, Serialize, Deserialize, SerdeSerialize, SerdeDeserialize)]
260pub enum ResponsePayload {
261    /// Query result.
262    Query(QueryResult),
263    /// Mutation result.
264    Mutation(MutationResult),
265    /// Schema data (serialized SchemaBundle).
266    Schema {
267        /// Schema version.
268        version: u64,
269        /// Serialized schema data.
270        data: Vec<u8>,
271    },
272    /// Pong response to ping.
273    Pong,
274    /// Subscription confirmed.
275    SubscriptionConfirmed {
276        /// The assigned subscription ID.
277        subscription_id: u64,
278    },
279    /// Subscription cancelled.
280    Unsubscribed,
281    /// Empty payload (for errors).
282    Empty,
283    /// Query explanation result.
284    Explain(ExplainResult),
285    /// Server metrics result.
286    Metrics(MetricsResult),
287    /// Aggregate query result.
288    Aggregate(AggregateResult),
289    /// Stream changes response (CDC/replication).
290    StreamChanges(StreamChangesResponse),
291    /// Replication status response.
292    ReplicationStatus(ReplicationStatus),
293}
294
295/// A change event for pub-sub notifications.
296#[derive(Debug, Clone, PartialEq, Archive, Serialize, Deserialize, SerdeSerialize, SerdeDeserialize)]
297pub struct ChangeEvent {
298    /// The subscription ID this event relates to.
299    pub subscription_id: u64,
300    /// The type of change.
301    pub change_type: ChangeType,
302    /// The entity type that changed.
303    pub entity: String,
304    /// The ID of the changed entity.
305    pub entity_id: [u8; 16],
306    /// The fields that changed (if available).
307    pub changed_fields: Vec<String>,
308    /// Schema version when the change occurred.
309    pub schema_version: u64,
310}
311
312/// Types of changes that can occur.
313#[derive(Debug, Clone, Copy, PartialEq, Eq, Archive, Serialize, Deserialize, SerdeSerialize, SerdeDeserialize)]
314pub enum ChangeType {
315    /// A new entity was inserted.
316    Insert,
317    /// An existing entity was updated.
318    Update,
319    /// An entity was deleted.
320    Delete,
321}
322
323impl Response {
324    /// Create a successful query response.
325    pub fn query_ok(id: u64, result: QueryResult) -> Self {
326        Self {
327            id,
328            status: Status::ok(),
329            payload: ResponsePayload::Query(result),
330        }
331    }
332
333    /// Create a successful mutation response.
334    pub fn mutation_ok(id: u64, result: MutationResult) -> Self {
335        Self {
336            id,
337            status: Status::ok(),
338            payload: ResponsePayload::Mutation(result),
339        }
340    }
341
342    /// Create a schema response.
343    pub fn schema_ok(id: u64, version: u64, data: Vec<u8>) -> Self {
344        Self {
345            id,
346            status: Status::ok(),
347            payload: ResponsePayload::Schema { version, data },
348        }
349    }
350
351    /// Create a pong response.
352    pub fn pong(id: u64) -> Self {
353        Self {
354            id,
355            status: Status::ok(),
356            payload: ResponsePayload::Pong,
357        }
358    }
359
360    /// Create an error response.
361    pub fn error(id: u64, code: u32, message: impl Into<String>) -> Self {
362        Self {
363            id,
364            status: Status::error(code, message),
365            payload: ResponsePayload::Empty,
366        }
367    }
368
369    /// Create a subscription confirmed response.
370    pub fn subscription_confirmed(id: u64, subscription_id: u64) -> Self {
371        Self {
372            id,
373            status: Status::ok(),
374            payload: ResponsePayload::SubscriptionConfirmed { subscription_id },
375        }
376    }
377
378    /// Create an unsubscribed response.
379    pub fn unsubscribed(id: u64) -> Self {
380        Self {
381            id,
382            status: Status::ok(),
383            payload: ResponsePayload::Unsubscribed,
384        }
385    }
386
387    /// Create a successful explain response.
388    pub fn explain_ok(id: u64, result: ExplainResult) -> Self {
389        Self {
390            id,
391            status: Status::ok(),
392            payload: ResponsePayload::Explain(result),
393        }
394    }
395
396    /// Create a successful metrics response.
397    pub fn metrics_ok(id: u64, result: MetricsResult) -> Self {
398        Self {
399            id,
400            status: Status::ok(),
401            payload: ResponsePayload::Metrics(result),
402        }
403    }
404
405    /// Create a successful aggregate query response.
406    pub fn aggregate_ok(id: u64, result: AggregateResult) -> Self {
407        Self {
408            id,
409            status: Status::ok(),
410            payload: ResponsePayload::Aggregate(result),
411        }
412    }
413
414    /// Create a successful stream changes response.
415    pub fn stream_changes_ok(id: u64, result: StreamChangesResponse) -> Self {
416        Self {
417            id,
418            status: Status::ok(),
419            payload: ResponsePayload::StreamChanges(result),
420        }
421    }
422
423    /// Create a successful replication status response.
424    pub fn replication_status_ok(id: u64, status: ReplicationStatus) -> Self {
425        Self {
426            id,
427            status: Status::ok(),
428            payload: ResponsePayload::ReplicationStatus(status),
429        }
430    }
431}
432
433/// Standard error codes.
434pub mod error_codes {
435    /// Unknown/internal error.
436    pub const INTERNAL: u32 = 1;
437    /// Invalid request format.
438    pub const INVALID_REQUEST: u32 = 2;
439    /// Entity not found.
440    pub const NOT_FOUND: u32 = 3;
441    /// Constraint violation.
442    pub const CONSTRAINT_VIOLATION: u32 = 4;
443    /// Schema version mismatch.
444    pub const SCHEMA_MISMATCH: u32 = 5;
445    /// Permission denied.
446    pub const PERMISSION_DENIED: u32 = 6;
447    /// Transaction conflict.
448    pub const CONFLICT: u32 = 7;
449    /// Query budget exceeded.
450    pub const BUDGET_EXCEEDED: u32 = 8;
451    /// Request timeout.
452    pub const TIMEOUT: u32 = 9;
453    /// Write rejected because server is a read-only replica.
454    pub const READ_ONLY_REPLICA: u32 = 10;
455    /// Invalid LSN in replication request.
456    pub const INVALID_LSN: u32 = 11;
457}
458
459#[cfg(test)]
460mod tests {
461    use super::*;
462    use crate::mutation::FieldValue;
463    use crate::query::FilterExpr;
464    use crate::result::{ColumnData, EntityBlock};
465
466    #[test]
467    fn test_query_request() {
468        let request = Request::query(
469            1,
470            5,
471            GraphQuery::new("User").with_fields(vec!["id".into(), "name".into()]),
472        );
473
474        assert_eq!(request.id, 1);
475        assert_eq!(request.schema_version, 5);
476        if let Operation::Query(query) = &request.operation {
477            assert_eq!(query.root_entity, "User");
478        } else {
479            panic!("Expected Query operation");
480        }
481    }
482
483    #[test]
484    fn test_mutation_request() {
485        let request = Request::mutate(
486            2,
487            5,
488            Mutation::insert("User", vec![FieldValue::new("name", "Alice")]),
489        );
490
491        assert_eq!(request.id, 2);
492        if let Operation::Mutate(mutation) = &request.operation {
493            assert_eq!(mutation.entity(), "User");
494        } else {
495            panic!("Expected Mutate operation");
496        }
497    }
498
499    #[test]
500    fn test_query_response() {
501        let result = QueryResult::new(
502            vec![EntityBlock::with_data(
503                "User",
504                vec![[1u8; 16]],
505                vec![ColumnData::new("name", vec!["Alice".into()])],
506            )],
507            vec![],
508            false,
509        );
510
511        let response = Response::query_ok(1, result);
512        assert_eq!(response.id, 1);
513        assert!(response.status.is_ok());
514
515        if let ResponsePayload::Query(result) = &response.payload {
516            assert_eq!(result.total_entities(), 1);
517        } else {
518            panic!("Expected Query payload");
519        }
520    }
521
522    #[test]
523    fn test_error_response() {
524        let response = Response::error(42, error_codes::NOT_FOUND, "User not found");
525
526        assert_eq!(response.id, 42);
527        assert!(response.status.is_error());
528
529        if let Status::Error { code, message } = &response.status {
530            assert_eq!(*code, error_codes::NOT_FOUND);
531            assert_eq!(message, "User not found");
532        }
533    }
534
535    #[test]
536    fn test_message_serialization_roundtrip() {
537        let request = Request::query(
538            100,
539            1,
540            GraphQuery::new("Post")
541                .with_filter(FilterExpr::eq("published", true).into()),
542        );
543
544        let bytes = rkyv::to_bytes::<rkyv::rancor::Error>(&request).unwrap();
545        let archived = rkyv::access::<ArchivedRequest, rkyv::rancor::Error>(&bytes).unwrap();
546        let deserialized: Request =
547            rkyv::deserialize::<Request, rkyv::rancor::Error>(archived).unwrap();
548
549        assert_eq!(request, deserialized);
550
551        // Test response
552        let response = Response::error(100, error_codes::INVALID_REQUEST, "Bad query");
553
554        let bytes = rkyv::to_bytes::<rkyv::rancor::Error>(&response).unwrap();
555        let archived = rkyv::access::<ArchivedResponse, rkyv::rancor::Error>(&bytes).unwrap();
556        let deserialized: Response =
557            rkyv::deserialize::<Response, rkyv::rancor::Error>(archived).unwrap();
558
559        assert_eq!(response, deserialized);
560    }
561}