1use bytes::Bytes;
6use kimberlite_types::{DataClass, Offset, Placement, StreamId, TenantId};
7use serde::{Deserialize, Serialize};
8
9use crate::error::{WireError, WireResult};
10use crate::frame::Frame;
11
/// Identifier attached to a request, implemented as a newtype over `u64`.
///
/// The same type is used for [`Request::id`] and [`Response::request_id`].
/// NOTE(review): presumably a response carries the id of the request it
/// answers so the two can be paired up — confirm against the client/server code.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct RequestId(pub u64);
15
16impl RequestId {
17 pub fn new(id: u64) -> Self {
19 Self(id)
20 }
21}
22
/// A request message: an id, a tenant id and the operation payload.
///
/// [`Request::to_frame`] serializes the whole struct with postcard. Postcard
/// writes struct fields positionally, so the declaration order of the fields
/// below is part of the wire format.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Request {
    /// Identifier of this request.
    pub id: RequestId,
    /// Tenant identifier attached to the request.
    pub tenant_id: TenantId,
    /// The operation being requested.
    pub payload: RequestPayload,
}
37
38impl Request {
39 pub fn new(id: RequestId, tenant_id: TenantId, payload: RequestPayload) -> Self {
41 Self {
42 id,
43 tenant_id,
44 payload,
45 }
46 }
47
48 pub fn to_frame(&self) -> WireResult<Frame> {
50 let payload =
51 postcard::to_allocvec(self).map_err(|e| WireError::Serialization(e.to_string()))?;
52 Ok(Frame::new(Bytes::from(payload)))
53 }
54
55 pub fn from_frame(frame: &Frame) -> WireResult<Self> {
57 postcard::from_bytes(&frame.payload).map_err(WireError::from)
58 }
59}
60
/// The operation carried by a [`Request`], one variant per request type.
///
/// Postcard identifies an enum variant by its declaration index, so the order
/// of the variants below is part of the wire format; new variants should be
/// appended at the end.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum RequestPayload {
    /// Handshake; see [`HandshakeRequest`].
    Handshake(HandshakeRequest),
    /// Stream creation; see [`CreateStreamRequest`].
    CreateStream(CreateStreamRequest),
    /// Appending events to a stream; see [`AppendEventsRequest`].
    AppendEvents(AppendEventsRequest),
    /// SQL query; see [`QueryRequest`].
    Query(QueryRequest),
    /// SQL query with an explicit position; see [`QueryAtRequest`].
    QueryAt(QueryAtRequest),
    /// Reading events from a stream; see [`ReadEventsRequest`].
    ReadEvents(ReadEventsRequest),
    /// Sync; see [`SyncRequest`].
    Sync(SyncRequest),
}
79
/// Payload of [`RequestPayload::Handshake`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HandshakeRequest {
    /// Version number reported by the client.
    pub client_version: u16,
    /// Optional authentication token; `None` when the client supplies none.
    /// How the token is validated is not visible in this file.
    pub auth_token: Option<String>,
}
88
/// Payload of [`RequestPayload::CreateStream`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CreateStreamRequest {
    /// Name of the stream.
    pub name: String,
    /// Data classification, a `DataClass` from `kimberlite_types`.
    pub data_class: DataClass,
    /// Placement, a `Placement` from `kimberlite_types`.
    pub placement: Placement,
}
99
/// Payload of [`RequestPayload::AppendEvents`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AppendEventsRequest {
    /// Stream the events are addressed to.
    pub stream_id: StreamId,
    /// Event payloads as raw bytes, one inner `Vec<u8>` per event.
    pub events: Vec<Vec<u8>>,
}
108
/// Payload of [`RequestPayload::Query`]: SQL text plus parameter values.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueryRequest {
    /// SQL text of the query.
    pub sql: String,
    /// Parameter values for `sql`.
    /// NOTE(review): the test in this file uses a `$1` placeholder, so these
    /// are presumably bound positionally — confirm against the query engine.
    pub params: Vec<QueryParam>,
}
117
/// Payload of [`RequestPayload::QueryAt`]: the fields of a [`QueryRequest`]
/// plus a `position`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueryAtRequest {
    /// SQL text of the query.
    pub sql: String,
    /// Parameter values for `sql`.
    pub params: Vec<QueryParam>,
    /// Offset the query refers to.
    /// NOTE(review): presumably the log position whose state the query should
    /// observe; exact semantics are not visible in this file — confirm.
    pub position: Offset,
}
128
/// A single parameter value sent with a query.
///
/// Postcard identifies a variant by its declaration index, so the order of
/// the variants below is part of the wire format.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum QueryParam {
    /// No value.
    Null,
    /// 64-bit signed integer.
    BigInt(i64),
    /// Owned string.
    Text(String),
    /// Boolean.
    Boolean(bool),
    /// Timestamp stored as an `i64`.
    /// NOTE(review): unit and epoch are not specified in this file — confirm.
    Timestamp(i64),
}
143
/// Payload of [`RequestPayload::ReadEvents`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReadEventsRequest {
    /// Stream to read from.
    pub stream_id: StreamId,
    /// Offset at which the read starts.
    pub from_offset: Offset,
    /// Byte limit for the read.
    /// NOTE(review): presumably caps the total size of the returned events;
    /// how the limit is applied is not visible in this file — confirm.
    pub max_bytes: u64,
}
154
/// Payload of [`RequestPayload::Sync`]; it has no fields.
///
/// NOTE(review): what a sync does on the server is not visible in this file.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SyncRequest {}
158
/// A response message: a request id and the result payload.
///
/// [`Response::to_frame`] serializes the whole struct with postcard. Postcard
/// writes struct fields positionally, so the declaration order of the fields
/// below is part of the wire format.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Response {
    /// Request id carried by this response.
    /// NOTE(review): presumably the id of the request being answered — confirm.
    pub request_id: RequestId,
    /// Result of the operation, or an error.
    pub payload: ResponsePayload,
}
171
172impl Response {
173 pub fn new(request_id: RequestId, payload: ResponsePayload) -> Self {
175 Self {
176 request_id,
177 payload,
178 }
179 }
180
181 pub fn error(request_id: RequestId, code: ErrorCode, message: String) -> Self {
183 Self {
184 request_id,
185 payload: ResponsePayload::Error(ErrorResponse { code, message }),
186 }
187 }
188
189 pub fn to_frame(&self) -> WireResult<Frame> {
191 let payload =
192 postcard::to_allocvec(self).map_err(|e| WireError::Serialization(e.to_string()))?;
193 Ok(Frame::new(Bytes::from(payload)))
194 }
195
196 pub fn from_frame(frame: &Frame) -> WireResult<Self> {
198 postcard::from_bytes(&frame.payload).map_err(WireError::from)
199 }
200}
201
/// The result carried by a [`Response`]: an error, or one variant per
/// request type.
///
/// Postcard identifies an enum variant by its declaration index, so the order
/// of the variants below is part of the wire format; new variants should be
/// appended at the end.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ResponsePayload {
    /// Failure; see [`ErrorResponse`].
    Error(ErrorResponse),
    /// Handshake result; see [`HandshakeResponse`].
    Handshake(HandshakeResponse),
    /// Stream creation result; see [`CreateStreamResponse`].
    CreateStream(CreateStreamResponse),
    /// Append result; see [`AppendEventsResponse`].
    AppendEvents(AppendEventsResponse),
    /// Query result; see [`QueryResponse`].
    Query(QueryResponse),
    /// Result of a query with an explicit position; see [`QueryAtResponse`].
    QueryAt(QueryAtResponse),
    /// Read result; see [`ReadEventsResponse`].
    ReadEvents(ReadEventsResponse),
    /// Sync result; see [`SyncResponse`].
    Sync(SyncResponse),
}
222
/// Payload of [`ResponsePayload::Error`]; built by [`Response::error`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ErrorResponse {
    /// Machine-readable error category.
    pub code: ErrorCode,
    /// Free-form error text.
    pub message: String,
}
231
/// Machine-readable category of a failure, carried in [`ErrorResponse::code`].
///
/// The enum is `#[repr(u16)]` with explicit discriminants `0..=15`.
/// NOTE(review): the derived serde impls identify a variant by its
/// declaration index rather than by the `repr` discriminant. The two coincide
/// here only while variants stay in ascending order without gaps, so new
/// codes should be appended at the end.
///
/// The per-variant descriptions below are read off the variant names; where
/// each code is produced is not visible in this file.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[repr(u16)]
pub enum ErrorCode {
    /// Unclassified error.
    Unknown = 0,
    /// Internal error.
    InternalError = 1,
    /// The request was invalid.
    InvalidRequest = 2,
    /// Authentication failed.
    AuthenticationFailed = 3,
    /// Tenant not found.
    TenantNotFound = 4,
    /// Stream not found.
    StreamNotFound = 5,
    /// Table not found.
    TableNotFound = 6,
    /// The query could not be parsed.
    QueryParseError = 7,
    /// The query failed during execution.
    QueryExecutionError = 8,
    /// NOTE(review): presumably a requested position lies ahead of what is
    /// available — confirm.
    PositionAhead = 9,
    /// Stream already exists.
    StreamAlreadyExists = 10,
    /// Invalid offset.
    InvalidOffset = 11,
    /// Storage error.
    StorageError = 12,
    /// NOTE(review): presumably a projection has not caught up yet — confirm.
    ProjectionLag = 13,
    /// Rate limited.
    RateLimited = 14,
    /// NOTE(review): presumably the contacted node is not the leader — confirm.
    NotLeader = 15,
}
273
/// Payload of [`ResponsePayload::Handshake`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HandshakeResponse {
    /// Version number reported by the server.
    pub server_version: u16,
    /// Authentication flag.
    /// NOTE(review): presumably `true` when the handshake's credentials were
    /// accepted — confirm against the server.
    pub authenticated: bool,
    /// Capability names as strings; the set of recognised values is not
    /// defined in this file.
    pub capabilities: Vec<String>,
}
284
/// Payload of [`ResponsePayload::CreateStream`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CreateStreamResponse {
    /// Identifier of the stream.
    /// NOTE(review): presumably the id assigned to the newly created stream
    /// — confirm.
    pub stream_id: StreamId,
}
291
/// Payload of [`ResponsePayload::AppendEvents`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AppendEventsResponse {
    /// NOTE(review): presumably the offset given to the first appended event
    /// — confirm.
    pub first_offset: Offset,
    /// NOTE(review): presumably the number of events appended — confirm.
    pub count: u32,
}
300
/// Payload of [`ResponsePayload::Query`]: column names plus rows of values.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueryResponse {
    /// Column names.
    pub columns: Vec<String>,
    /// Result rows, each a list of [`QueryValue`]s.
    /// NOTE(review): presumably every row has one value per entry in
    /// `columns`, in the same order — nothing in this file enforces it.
    pub rows: Vec<Vec<QueryValue>>,
}
309
/// Payload of [`ResponsePayload::QueryAt`]; an alias because it has exactly
/// the shape of [`QueryResponse`].
pub type QueryAtResponse = QueryResponse;
312
/// A single cell value in a [`QueryResponse`] row.
///
/// It has the same variants as [`QueryParam`]. Postcard identifies a variant
/// by its declaration index, so the order of the variants below is part of
/// the wire format.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum QueryValue {
    /// No value.
    Null,
    /// 64-bit signed integer.
    BigInt(i64),
    /// Owned string.
    Text(String),
    /// Boolean.
    Boolean(bool),
    /// Timestamp stored as an `i64`.
    /// NOTE(review): unit and epoch are not specified in this file — confirm.
    Timestamp(i64),
}
327
/// Payload of [`ResponsePayload::ReadEvents`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReadEventsResponse {
    /// Event payloads as raw bytes, one inner `Vec<u8>` per event.
    pub events: Vec<Vec<u8>>,
    /// Optional follow-up offset.
    /// NOTE(review): presumably the offset to pass as `from_offset` in the
    /// next read, with `None` meaning there is nothing further — confirm.
    pub next_offset: Option<Offset>,
}
336
/// Payload of [`ResponsePayload::Sync`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SyncResponse {
    /// Outcome flag; what counts as success is decided outside this file.
    pub success: bool,
}
343
#[cfg(test)]
mod message_tests {
    use super::*;

    /// A request keeps its id, tenant and payload contents across an
    /// encode/decode cycle.
    #[test]
    fn test_request_roundtrip() {
        let request = Request::new(
            RequestId::new(1),
            TenantId::new(42),
            RequestPayload::CreateStream(CreateStreamRequest {
                name: "test-stream".to_string(),
                data_class: DataClass::NonPHI,
                placement: Placement::Global,
            }),
        );

        let frame = request.to_frame().unwrap();
        let decoded = Request::from_frame(&frame).unwrap();

        assert_eq!(decoded.id, request.id);
        assert_eq!(u64::from(decoded.tenant_id), 42);

        // Checking the envelope alone would not catch a mis-tagged or
        // corrupted payload, so inspect the payload as well.
        match decoded.payload {
            RequestPayload::CreateStream(create) => assert_eq!(create.name, "test-stream"),
            other => panic!("expected create-stream payload, got {:?}", other),
        }
    }

    /// A response keeps its request id and payload variant across an
    /// encode/decode cycle.
    #[test]
    fn test_response_roundtrip() {
        let response = Response::new(
            RequestId::new(1),
            ResponsePayload::CreateStream(CreateStreamResponse {
                stream_id: StreamId::new(100),
            }),
        );

        let frame = response.to_frame().unwrap();
        let decoded = Response::from_frame(&frame).unwrap();

        assert_eq!(decoded.request_id, response.request_id);
        assert!(
            matches!(decoded.payload, ResponsePayload::CreateStream(_)),
            "expected create-stream payload, got {:?}",
            decoded.payload
        );
    }

    /// `Response::error` produces an error payload whose code and message
    /// survive an encode/decode cycle.
    #[test]
    fn test_error_response() {
        let response = Response::error(
            RequestId::new(1),
            ErrorCode::StreamNotFound,
            "stream 123 not found".to_string(),
        );

        let frame = response.to_frame().unwrap();
        let decoded = Response::from_frame(&frame).unwrap();

        assert_eq!(decoded.request_id, RequestId::new(1));
        match decoded.payload {
            ResponsePayload::Error(err) => {
                assert_eq!(err.code, ErrorCode::StreamNotFound);
                assert_eq!(err.message, "stream 123 not found");
            }
            other => panic!("expected error payload, got {:?}", other),
        }
    }

    /// Every `QueryParam` variant keeps its tag and value, in order, across
    /// an encode/decode cycle.
    #[test]
    fn test_query_params() {
        let request = Request::new(
            RequestId::new(2),
            TenantId::new(1),
            RequestPayload::Query(QueryRequest {
                sql: "SELECT * FROM events WHERE id = $1".to_string(),
                params: vec![
                    QueryParam::BigInt(42),
                    QueryParam::Text("hello".to_string()),
                    QueryParam::Boolean(true),
                    QueryParam::Null,
                    QueryParam::Timestamp(1_700_000_000),
                ],
            }),
        );

        let frame = request.to_frame().unwrap();
        let decoded = Request::from_frame(&frame).unwrap();

        let query = match decoded.payload {
            RequestPayload::Query(q) => q,
            other => panic!("expected query payload, got {:?}", other),
        };

        assert_eq!(query.sql, "SELECT * FROM events WHERE id = $1");
        // A length check alone would pass even if variant tags or values were
        // scrambled, so verify each parameter individually.
        assert_eq!(query.params.len(), 5);
        assert!(matches!(query.params[0], QueryParam::BigInt(42)));
        assert!(matches!(&query.params[1], QueryParam::Text(s) if s.as_str() == "hello"));
        assert!(matches!(query.params[2], QueryParam::Boolean(true)));
        assert!(matches!(query.params[3], QueryParam::Null));
        assert!(matches!(
            query.params[4],
            QueryParam::Timestamp(1_700_000_000)
        ));
    }
}