Skip to main content

kimberlite_wire/
message.rs

1//! Request and response message types for the wire protocol.
2//!
3//! Messages are serialized using postcard for efficient binary encoding.
4
5use bytes::Bytes;
6use kimberlite_types::{DataClass, Offset, Placement, StreamId, TenantId};
7use serde::{Deserialize, Serialize};
8
9use crate::error::{WireError, WireResult};
10use crate::frame::Frame;
11
12/// Unique identifier for a request, used to match responses.
13#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
14pub struct RequestId(pub u64);
15
16impl RequestId {
17    /// Creates a new request ID.
18    pub fn new(id: u64) -> Self {
19        Self(id)
20    }
21}
22
23// ============================================================================
24// Request Types
25// ============================================================================
26
27/// A client request to the server.
28#[derive(Debug, Clone, Serialize, Deserialize)]
29pub struct Request {
30    /// Unique request identifier.
31    pub id: RequestId,
32    /// Tenant context for the request.
33    pub tenant_id: TenantId,
34    /// The request payload.
35    pub payload: RequestPayload,
36}
37
38impl Request {
39    /// Creates a new request.
40    pub fn new(id: RequestId, tenant_id: TenantId, payload: RequestPayload) -> Self {
41        Self {
42            id,
43            tenant_id,
44            payload,
45        }
46    }
47
48    /// Encodes the request to a frame.
49    pub fn to_frame(&self) -> WireResult<Frame> {
50        let payload =
51            postcard::to_allocvec(self).map_err(|e| WireError::Serialization(e.to_string()))?;
52        Ok(Frame::new(Bytes::from(payload)))
53    }
54
55    /// Decodes a request from a frame.
56    pub fn from_frame(frame: &Frame) -> WireResult<Self> {
57        postcard::from_bytes(&frame.payload).map_err(WireError::from)
58    }
59}
60
61/// Request payload variants.
62#[derive(Debug, Clone, Serialize, Deserialize)]
63pub enum RequestPayload {
64    /// Handshake to establish connection.
65    Handshake(HandshakeRequest),
66    /// Create a new stream.
67    CreateStream(CreateStreamRequest),
68    /// Append events to a stream.
69    AppendEvents(AppendEventsRequest),
70    /// Execute a SQL query.
71    Query(QueryRequest),
72    /// Execute a SQL query at a specific position.
73    QueryAt(QueryAtRequest),
74    /// Read events from a stream.
75    ReadEvents(ReadEventsRequest),
76    /// Sync all data to disk.
77    Sync(SyncRequest),
78}
79
80/// Handshake request to establish connection.
81#[derive(Debug, Clone, Serialize, Deserialize)]
82pub struct HandshakeRequest {
83    /// Client protocol version.
84    pub client_version: u16,
85    /// Optional authentication token.
86    pub auth_token: Option<String>,
87}
88
89/// Create stream request.
90#[derive(Debug, Clone, Serialize, Deserialize)]
91pub struct CreateStreamRequest {
92    /// Stream name.
93    pub name: String,
94    /// Data classification.
95    pub data_class: DataClass,
96    /// Placement policy.
97    pub placement: Placement,
98}
99
100/// Append events request.
101#[derive(Debug, Clone, Serialize, Deserialize)]
102pub struct AppendEventsRequest {
103    /// Target stream.
104    pub stream_id: StreamId,
105    /// Events to append.
106    pub events: Vec<Vec<u8>>,
107}
108
109/// SQL query request.
110#[derive(Debug, Clone, Serialize, Deserialize)]
111pub struct QueryRequest {
112    /// SQL query string.
113    pub sql: String,
114    /// Query parameters.
115    pub params: Vec<QueryParam>,
116}
117
118/// SQL query at specific position request.
119#[derive(Debug, Clone, Serialize, Deserialize)]
120pub struct QueryAtRequest {
121    /// SQL query string.
122    pub sql: String,
123    /// Query parameters.
124    pub params: Vec<QueryParam>,
125    /// Log position to query at.
126    pub position: Offset,
127}
128
129/// Query parameter value.
130#[derive(Debug, Clone, Serialize, Deserialize)]
131pub enum QueryParam {
132    /// Null value.
133    Null,
134    /// 64-bit integer.
135    BigInt(i64),
136    /// Text string.
137    Text(String),
138    /// Boolean.
139    Boolean(bool),
140    /// Timestamp (nanoseconds since epoch).
141    Timestamp(i64),
142}
143
144/// Read events request.
145#[derive(Debug, Clone, Serialize, Deserialize)]
146pub struct ReadEventsRequest {
147    /// Source stream.
148    pub stream_id: StreamId,
149    /// Starting offset (inclusive).
150    pub from_offset: Offset,
151    /// Maximum bytes to read.
152    pub max_bytes: u64,
153}
154
155/// Sync request.
156#[derive(Debug, Clone, Serialize, Deserialize)]
157pub struct SyncRequest {}
158
159// ============================================================================
160// Response Types
161// ============================================================================
162
163/// A server response to a client request.
164#[derive(Debug, Clone, Serialize, Deserialize)]
165pub struct Response {
166    /// Request ID this is responding to.
167    pub request_id: RequestId,
168    /// The response payload.
169    pub payload: ResponsePayload,
170}
171
172impl Response {
173    /// Creates a new response.
174    pub fn new(request_id: RequestId, payload: ResponsePayload) -> Self {
175        Self {
176            request_id,
177            payload,
178        }
179    }
180
181    /// Creates an error response.
182    pub fn error(request_id: RequestId, code: ErrorCode, message: String) -> Self {
183        Self {
184            request_id,
185            payload: ResponsePayload::Error(ErrorResponse { code, message }),
186        }
187    }
188
189    /// Encodes the response to a frame.
190    pub fn to_frame(&self) -> WireResult<Frame> {
191        let payload =
192            postcard::to_allocvec(self).map_err(|e| WireError::Serialization(e.to_string()))?;
193        Ok(Frame::new(Bytes::from(payload)))
194    }
195
196    /// Decodes a response from a frame.
197    pub fn from_frame(frame: &Frame) -> WireResult<Self> {
198        postcard::from_bytes(&frame.payload).map_err(WireError::from)
199    }
200}
201
202/// Response payload variants.
203#[derive(Debug, Clone, Serialize, Deserialize)]
204pub enum ResponsePayload {
205    /// Error response.
206    Error(ErrorResponse),
207    /// Handshake response.
208    Handshake(HandshakeResponse),
209    /// Create stream response.
210    CreateStream(CreateStreamResponse),
211    /// Append events response.
212    AppendEvents(AppendEventsResponse),
213    /// Query response.
214    Query(QueryResponse),
215    /// Query at response.
216    QueryAt(QueryAtResponse),
217    /// Read events response.
218    ReadEvents(ReadEventsResponse),
219    /// Sync response.
220    Sync(SyncResponse),
221}
222
223/// Error response.
224#[derive(Debug, Clone, Serialize, Deserialize)]
225pub struct ErrorResponse {
226    /// Error code.
227    pub code: ErrorCode,
228    /// Human-readable error message.
229    pub message: String,
230}
231
232/// Error codes for wire protocol errors.
233#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
234#[repr(u16)]
235pub enum ErrorCode {
236    /// Unknown error.
237    Unknown = 0,
238    /// Internal server error.
239    InternalError = 1,
240    /// Invalid request format.
241    InvalidRequest = 2,
242    /// Authentication failed.
243    AuthenticationFailed = 3,
244    /// Tenant not found.
245    TenantNotFound = 4,
246    /// Stream not found.
247    StreamNotFound = 5,
248    /// Table not found.
249    TableNotFound = 6,
250    /// Query parse error.
251    QueryParseError = 7,
252    /// Query execution error.
253    QueryExecutionError = 8,
254    /// Position ahead of current.
255    PositionAhead = 9,
256    /// Stream already exists.
257    StreamAlreadyExists = 10,
258    /// Invalid stream offset.
259    InvalidOffset = 11,
260    /// Storage error.
261    StorageError = 12,
262    /// Projection lag.
263    ProjectionLag = 13,
264    /// Rate limit exceeded.
265    RateLimited = 14,
266    /// Not the leader - client should retry on another node.
267    ///
268    /// This error is returned in cluster mode when a write request
269    /// is sent to a follower replica. The error message may include
270    /// a leader hint to help the client redirect.
271    NotLeader = 15,
272}
273
274/// Handshake response.
275#[derive(Debug, Clone, Serialize, Deserialize)]
276pub struct HandshakeResponse {
277    /// Server protocol version.
278    pub server_version: u16,
279    /// Whether authentication succeeded.
280    pub authenticated: bool,
281    /// Server capabilities.
282    pub capabilities: Vec<String>,
283}
284
285/// Create stream response.
286#[derive(Debug, Clone, Serialize, Deserialize)]
287pub struct CreateStreamResponse {
288    /// The created stream ID.
289    pub stream_id: StreamId,
290}
291
292/// Append events response.
293#[derive(Debug, Clone, Serialize, Deserialize)]
294pub struct AppendEventsResponse {
295    /// Offset of the first appended event.
296    pub first_offset: Offset,
297    /// Number of events appended.
298    pub count: u32,
299}
300
301/// Query response.
302#[derive(Debug, Clone, Serialize, Deserialize)]
303pub struct QueryResponse {
304    /// Column names.
305    pub columns: Vec<String>,
306    /// Rows of data.
307    pub rows: Vec<Vec<QueryValue>>,
308}
309
310/// Query at response (same as Query).
311pub type QueryAtResponse = QueryResponse;
312
313/// Query result value.
314#[derive(Debug, Clone, Serialize, Deserialize)]
315pub enum QueryValue {
316    /// Null value.
317    Null,
318    /// 64-bit integer.
319    BigInt(i64),
320    /// Text string.
321    Text(String),
322    /// Boolean.
323    Boolean(bool),
324    /// Timestamp (nanoseconds since epoch).
325    Timestamp(i64),
326}
327
328/// Read events response.
329#[derive(Debug, Clone, Serialize, Deserialize)]
330pub struct ReadEventsResponse {
331    /// The events.
332    pub events: Vec<Vec<u8>>,
333    /// Next offset to read from (for pagination).
334    pub next_offset: Option<Offset>,
335}
336
337/// Sync response.
338#[derive(Debug, Clone, Serialize, Deserialize)]
339pub struct SyncResponse {
340    /// Whether sync completed successfully.
341    pub success: bool,
342}
343
344#[cfg(test)]
345mod message_tests {
346    use super::*;
347
348    #[test]
349    fn test_request_roundtrip() {
350        let request = Request::new(
351            RequestId::new(1),
352            TenantId::new(42),
353            RequestPayload::CreateStream(CreateStreamRequest {
354                name: "test-stream".to_string(),
355                data_class: DataClass::NonPHI,
356                placement: Placement::Global,
357            }),
358        );
359
360        // Encode to frame
361        let frame = request.to_frame().unwrap();
362
363        // Decode from frame
364        let decoded = Request::from_frame(&frame).unwrap();
365
366        assert_eq!(decoded.id, request.id);
367        assert_eq!(u64::from(decoded.tenant_id), 42);
368    }
369
370    #[test]
371    fn test_response_roundtrip() {
372        let response = Response::new(
373            RequestId::new(1),
374            ResponsePayload::CreateStream(CreateStreamResponse {
375                stream_id: StreamId::new(100),
376            }),
377        );
378
379        // Encode to frame
380        let frame = response.to_frame().unwrap();
381
382        // Decode from frame
383        let decoded = Response::from_frame(&frame).unwrap();
384
385        assert_eq!(decoded.request_id, response.request_id);
386    }
387
388    #[test]
389    fn test_error_response() {
390        let response = Response::error(
391            RequestId::new(1),
392            ErrorCode::StreamNotFound,
393            "stream 123 not found".to_string(),
394        );
395
396        let frame = response.to_frame().unwrap();
397        let decoded = Response::from_frame(&frame).unwrap();
398
399        if let ResponsePayload::Error(err) = decoded.payload {
400            assert_eq!(err.code, ErrorCode::StreamNotFound);
401            assert_eq!(err.message, "stream 123 not found");
402        } else {
403            panic!("expected error payload");
404        }
405    }
406
407    #[test]
408    fn test_query_params() {
409        let request = Request::new(
410            RequestId::new(2),
411            TenantId::new(1),
412            RequestPayload::Query(QueryRequest {
413                sql: "SELECT * FROM events WHERE id = $1".to_string(),
414                params: vec![
415                    QueryParam::BigInt(42),
416                    QueryParam::Text("hello".to_string()),
417                    QueryParam::Boolean(true),
418                    QueryParam::Null,
419                ],
420            }),
421        );
422
423        let frame = request.to_frame().unwrap();
424        let decoded = Request::from_frame(&frame).unwrap();
425
426        if let RequestPayload::Query(q) = decoded.payload {
427            assert_eq!(q.params.len(), 4);
428        } else {
429            panic!("expected query payload");
430        }
431    }
432}