1use crate::explain::ExplainResult;
4use crate::metrics::MetricsResult;
5use crate::mutation::{Mutation, MutationBatch};
6use crate::query::{AggregateQuery, GraphQuery};
7use crate::replication::{ReplicationStatus, StreamChangesRequest, StreamChangesResponse};
8use crate::result::{AggregateResult, MutationResult, QueryResult};
9use rkyv::{Archive, Deserialize, Serialize};
10use serde::{Deserialize as SerdeDeserialize, Serialize as SerdeSerialize};
11
12#[derive(Debug, Clone, PartialEq, Archive, Serialize, Deserialize, SerdeSerialize, SerdeDeserialize)]
14pub struct Request {
15 pub id: u64,
17 pub schema_version: u64,
19 pub operation: Operation,
21}
22
23#[derive(Debug, Clone, PartialEq, Archive, Serialize, Deserialize, SerdeSerialize, SerdeDeserialize)]
25pub enum Operation {
26 Query(GraphQuery),
28 Mutate(Mutation),
30 MutateBatch(MutationBatch),
32 GetSchema,
34 Ping,
36 Subscribe(Subscription),
38 Unsubscribe {
40 subscription_id: u64,
42 },
43 Explain(GraphQuery),
45 GetMetrics,
47 Aggregate(AggregateQuery),
49 StreamChanges(StreamChangesRequest),
51 GetReplicationStatus,
53}
54
55#[derive(Debug, Clone, PartialEq, Archive, Serialize, Deserialize, SerdeSerialize, SerdeDeserialize)]
57pub struct Subscription {
58 pub entity: String,
60 pub filter: Option<crate::query::Filter>,
62 pub fields: Option<Vec<String>>,
64 pub include_relations: bool,
66}
67
68impl Request {
69 pub fn query(id: u64, schema_version: u64, query: GraphQuery) -> Self {
71 Self {
72 id,
73 schema_version,
74 operation: Operation::Query(query),
75 }
76 }
77
78 pub fn mutate(id: u64, schema_version: u64, mutation: Mutation) -> Self {
80 Self {
81 id,
82 schema_version,
83 operation: Operation::Mutate(mutation),
84 }
85 }
86
87 pub fn mutate_batch(id: u64, schema_version: u64, batch: MutationBatch) -> Self {
89 Self {
90 id,
91 schema_version,
92 operation: Operation::MutateBatch(batch),
93 }
94 }
95
96 pub fn get_schema(id: u64) -> Self {
98 Self {
99 id,
100 schema_version: 0, operation: Operation::GetSchema,
102 }
103 }
104
105 pub fn ping(id: u64) -> Self {
107 Self {
108 id,
109 schema_version: 0,
110 operation: Operation::Ping,
111 }
112 }
113
114 pub fn subscribe(id: u64, schema_version: u64, subscription: Subscription) -> Self {
116 Self {
117 id,
118 schema_version,
119 operation: Operation::Subscribe(subscription),
120 }
121 }
122
123 pub fn unsubscribe(id: u64, subscription_id: u64) -> Self {
125 Self {
126 id,
127 schema_version: 0,
128 operation: Operation::Unsubscribe { subscription_id },
129 }
130 }
131
132 pub fn explain(id: u64, schema_version: u64, query: GraphQuery) -> Self {
134 Self {
135 id,
136 schema_version,
137 operation: Operation::Explain(query),
138 }
139 }
140
141 pub fn get_metrics(id: u64) -> Self {
143 Self {
144 id,
145 schema_version: 0,
146 operation: Operation::GetMetrics,
147 }
148 }
149
150 pub fn aggregate(id: u64, schema_version: u64, query: AggregateQuery) -> Self {
152 Self {
153 id,
154 schema_version,
155 operation: Operation::Aggregate(query),
156 }
157 }
158
159 pub fn stream_changes(id: u64, request: StreamChangesRequest) -> Self {
161 Self {
162 id,
163 schema_version: 0,
164 operation: Operation::StreamChanges(request),
165 }
166 }
167
168 pub fn get_replication_status(id: u64) -> Self {
170 Self {
171 id,
172 schema_version: 0,
173 operation: Operation::GetReplicationStatus,
174 }
175 }
176}
177
178impl Subscription {
179 pub fn new(entity: impl Into<String>) -> Self {
181 Self {
182 entity: entity.into(),
183 filter: None,
184 fields: None,
185 include_relations: false,
186 }
187 }
188
189 pub fn with_filter(mut self, filter: crate::query::Filter) -> Self {
191 self.filter = Some(filter);
192 self
193 }
194
195 pub fn with_fields(mut self, fields: Vec<String>) -> Self {
197 self.fields = Some(fields);
198 self
199 }
200
201 pub fn with_relations(mut self) -> Self {
203 self.include_relations = true;
204 self
205 }
206}
207
208#[derive(Debug, Clone, PartialEq, Archive, Serialize, Deserialize, SerdeSerialize, SerdeDeserialize)]
210pub struct Response {
211 pub id: u64,
213 pub status: Status,
215 pub payload: ResponsePayload,
217}
218
219#[derive(Debug, Clone, PartialEq, Archive, Serialize, Deserialize, SerdeSerialize, SerdeDeserialize)]
221pub enum Status {
222 Ok,
224 Error {
226 code: u32,
228 message: String,
230 },
231}
232
233impl Status {
234 pub fn ok() -> Self {
236 Status::Ok
237 }
238
239 pub fn error(code: u32, message: impl Into<String>) -> Self {
241 Status::Error {
242 code,
243 message: message.into(),
244 }
245 }
246
247 pub fn is_ok(&self) -> bool {
249 matches!(self, Status::Ok)
250 }
251
252 pub fn is_error(&self) -> bool {
254 matches!(self, Status::Error { .. })
255 }
256}
257
258#[derive(Debug, Clone, PartialEq, Archive, Serialize, Deserialize, SerdeSerialize, SerdeDeserialize)]
260pub enum ResponsePayload {
261 Query(QueryResult),
263 Mutation(MutationResult),
265 Schema {
267 version: u64,
269 data: Vec<u8>,
271 },
272 Pong,
274 SubscriptionConfirmed {
276 subscription_id: u64,
278 },
279 Unsubscribed,
281 Empty,
283 Explain(ExplainResult),
285 Metrics(MetricsResult),
287 Aggregate(AggregateResult),
289 StreamChanges(StreamChangesResponse),
291 ReplicationStatus(ReplicationStatus),
293}
294
295#[derive(Debug, Clone, PartialEq, Archive, Serialize, Deserialize, SerdeSerialize, SerdeDeserialize)]
297pub struct ChangeEvent {
298 pub subscription_id: u64,
300 pub change_type: ChangeType,
302 pub entity: String,
304 pub entity_id: [u8; 16],
306 pub changed_fields: Vec<String>,
308 pub schema_version: u64,
310}
311
312#[derive(Debug, Clone, Copy, PartialEq, Eq, Archive, Serialize, Deserialize, SerdeSerialize, SerdeDeserialize)]
314pub enum ChangeType {
315 Insert,
317 Update,
319 Delete,
321}
322
323impl Response {
324 pub fn query_ok(id: u64, result: QueryResult) -> Self {
326 Self {
327 id,
328 status: Status::ok(),
329 payload: ResponsePayload::Query(result),
330 }
331 }
332
333 pub fn mutation_ok(id: u64, result: MutationResult) -> Self {
335 Self {
336 id,
337 status: Status::ok(),
338 payload: ResponsePayload::Mutation(result),
339 }
340 }
341
342 pub fn schema_ok(id: u64, version: u64, data: Vec<u8>) -> Self {
344 Self {
345 id,
346 status: Status::ok(),
347 payload: ResponsePayload::Schema { version, data },
348 }
349 }
350
351 pub fn pong(id: u64) -> Self {
353 Self {
354 id,
355 status: Status::ok(),
356 payload: ResponsePayload::Pong,
357 }
358 }
359
360 pub fn error(id: u64, code: u32, message: impl Into<String>) -> Self {
362 Self {
363 id,
364 status: Status::error(code, message),
365 payload: ResponsePayload::Empty,
366 }
367 }
368
369 pub fn subscription_confirmed(id: u64, subscription_id: u64) -> Self {
371 Self {
372 id,
373 status: Status::ok(),
374 payload: ResponsePayload::SubscriptionConfirmed { subscription_id },
375 }
376 }
377
378 pub fn unsubscribed(id: u64) -> Self {
380 Self {
381 id,
382 status: Status::ok(),
383 payload: ResponsePayload::Unsubscribed,
384 }
385 }
386
387 pub fn explain_ok(id: u64, result: ExplainResult) -> Self {
389 Self {
390 id,
391 status: Status::ok(),
392 payload: ResponsePayload::Explain(result),
393 }
394 }
395
396 pub fn metrics_ok(id: u64, result: MetricsResult) -> Self {
398 Self {
399 id,
400 status: Status::ok(),
401 payload: ResponsePayload::Metrics(result),
402 }
403 }
404
405 pub fn aggregate_ok(id: u64, result: AggregateResult) -> Self {
407 Self {
408 id,
409 status: Status::ok(),
410 payload: ResponsePayload::Aggregate(result),
411 }
412 }
413
414 pub fn stream_changes_ok(id: u64, result: StreamChangesResponse) -> Self {
416 Self {
417 id,
418 status: Status::ok(),
419 payload: ResponsePayload::StreamChanges(result),
420 }
421 }
422
423 pub fn replication_status_ok(id: u64, status: ReplicationStatus) -> Self {
425 Self {
426 id,
427 status: Status::ok(),
428 payload: ResponsePayload::ReplicationStatus(status),
429 }
430 }
431}
432
433pub mod error_codes {
435 pub const INTERNAL: u32 = 1;
437 pub const INVALID_REQUEST: u32 = 2;
439 pub const NOT_FOUND: u32 = 3;
441 pub const CONSTRAINT_VIOLATION: u32 = 4;
443 pub const SCHEMA_MISMATCH: u32 = 5;
445 pub const PERMISSION_DENIED: u32 = 6;
447 pub const CONFLICT: u32 = 7;
449 pub const BUDGET_EXCEEDED: u32 = 8;
451 pub const TIMEOUT: u32 = 9;
453 pub const READ_ONLY_REPLICA: u32 = 10;
455 pub const INVALID_LSN: u32 = 11;
457}
458
459#[cfg(test)]
460mod tests {
461 use super::*;
462 use crate::mutation::FieldValue;
463 use crate::query::FilterExpr;
464 use crate::result::{ColumnData, EntityBlock};
465
466 #[test]
467 fn test_query_request() {
468 let request = Request::query(
469 1,
470 5,
471 GraphQuery::new("User").with_fields(vec!["id".into(), "name".into()]),
472 );
473
474 assert_eq!(request.id, 1);
475 assert_eq!(request.schema_version, 5);
476 if let Operation::Query(query) = &request.operation {
477 assert_eq!(query.root_entity, "User");
478 } else {
479 panic!("Expected Query operation");
480 }
481 }
482
483 #[test]
484 fn test_mutation_request() {
485 let request = Request::mutate(
486 2,
487 5,
488 Mutation::insert("User", vec![FieldValue::new("name", "Alice")]),
489 );
490
491 assert_eq!(request.id, 2);
492 if let Operation::Mutate(mutation) = &request.operation {
493 assert_eq!(mutation.entity(), "User");
494 } else {
495 panic!("Expected Mutate operation");
496 }
497 }
498
499 #[test]
500 fn test_query_response() {
501 let result = QueryResult::new(
502 vec![EntityBlock::with_data(
503 "User",
504 vec![[1u8; 16]],
505 vec![ColumnData::new("name", vec!["Alice".into()])],
506 )],
507 vec![],
508 false,
509 );
510
511 let response = Response::query_ok(1, result);
512 assert_eq!(response.id, 1);
513 assert!(response.status.is_ok());
514
515 if let ResponsePayload::Query(result) = &response.payload {
516 assert_eq!(result.total_entities(), 1);
517 } else {
518 panic!("Expected Query payload");
519 }
520 }
521
522 #[test]
523 fn test_error_response() {
524 let response = Response::error(42, error_codes::NOT_FOUND, "User not found");
525
526 assert_eq!(response.id, 42);
527 assert!(response.status.is_error());
528
529 if let Status::Error { code, message } = &response.status {
530 assert_eq!(*code, error_codes::NOT_FOUND);
531 assert_eq!(message, "User not found");
532 }
533 }
534
535 #[test]
536 fn test_message_serialization_roundtrip() {
537 let request = Request::query(
538 100,
539 1,
540 GraphQuery::new("Post")
541 .with_filter(FilterExpr::eq("published", true).into()),
542 );
543
544 let bytes = rkyv::to_bytes::<rkyv::rancor::Error>(&request).unwrap();
545 let archived = rkyv::access::<ArchivedRequest, rkyv::rancor::Error>(&bytes).unwrap();
546 let deserialized: Request =
547 rkyv::deserialize::<Request, rkyv::rancor::Error>(archived).unwrap();
548
549 assert_eq!(request, deserialized);
550
551 let response = Response::error(100, error_codes::INVALID_REQUEST, "Bad query");
553
554 let bytes = rkyv::to_bytes::<rkyv::rancor::Error>(&response).unwrap();
555 let archived = rkyv::access::<ArchivedResponse, rkyv::rancor::Error>(&bytes).unwrap();
556 let deserialized: Response =
557 rkyv::deserialize::<Response, rkyv::rancor::Error>(archived).unwrap();
558
559 assert_eq!(response, deserialized);
560 }
561}