Skip to main content

rstmdb_protocol/
message.rs

1//! JSON message types for RCP requests and responses.
2
3use crate::error::ErrorCode;
4use chrono::{DateTime, Utc};
5use serde::{Deserialize, Serialize};
6use serde_json::Value;
7use std::collections::HashMap;
8
/// RCP operation types.
///
/// Every variant is serialized and deserialized under its
/// `SCREAMING_SNAKE_CASE` name (e.g. `Ping` <-> `"PING"`,
/// `PutMachine` <-> `"PUT_MACHINE"`), as configured by the `rename_all`
/// attribute on this enum.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
pub enum Operation {
    // Session management
    /// `HELLO` — see [`HelloParams`] and [`HelloResult`].
    Hello,
    /// `AUTH` — see [`AuthParams`].
    Auth,
    /// `PING`.
    Ping,
    /// `BYE`.
    Bye,

    // Server info
    /// `INFO`.
    Info,

    // Machine definition management
    /// `PUT_MACHINE` — see [`PutMachineParams`] and [`PutMachineResult`].
    PutMachine,
    /// `GET_MACHINE` — see [`GetMachineParams`] and [`GetMachineResult`].
    GetMachine,
    /// `LIST_MACHINES`.
    ListMachines,

    // Instance lifecycle
    /// `CREATE_INSTANCE` — see [`CreateInstanceParams`] and [`CreateInstanceResult`].
    CreateInstance,
    /// `GET_INSTANCE` — see [`GetInstanceParams`] and [`GetInstanceResult`].
    GetInstance,
    /// `LIST_INSTANCES` — see [`ListInstancesParams`] and [`ListInstancesResult`].
    ListInstances,
    /// `DELETE_INSTANCE`.
    DeleteInstance,

    // Events
    /// `APPLY_EVENT` — see [`ApplyEventParams`] and [`ApplyEventResult`].
    ApplyEvent,
    /// `BATCH`.
    Batch,

    // Snapshots and WAL
    /// `SNAPSHOT_INSTANCE`.
    SnapshotInstance,
    /// `WAL_READ`.
    WalRead,
    /// `WAL_STATS`.
    WalStats,
    /// `COMPACT` — see [`CompactParams`] and [`CompactResult`].
    Compact,

    // Subscriptions
    /// `WATCH_INSTANCE` — see [`WatchInstanceParams`] and [`WatchInstanceResult`].
    WatchInstance,
    /// `WATCH_ALL` — see [`WatchAllParams`] and [`WatchAllResult`].
    WatchAll,
    /// `UNWATCH` — see [`UnwatchParams`] and [`UnwatchResult`].
    Unwatch,

    // Administrative
    /// `FLUSH_ALL`.
    FlushAll,
}
51
/// Request message envelope.
///
/// Field names on the wire match the Rust field names, except `msg_type`,
/// which is carried as `type`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Request {
    /// Message type, always "request".
    ///
    /// Stored as a plain string; this type performs no validation of the
    /// value during deserialization.
    #[serde(rename = "type")]
    pub msg_type: String,

    /// Unique request ID for correlation.
    pub id: String,

    /// Operation to perform.
    pub op: Operation,

    /// Operation-specific parameters.
    ///
    /// If the field is absent from the input, `#[serde(default)]` fills in
    /// `Value::default()`.
    // NOTE(review): `Value::default()` is JSON `null` per serde_json, while
    // `Request::new` starts from an empty object — confirm consumers accept both.
    #[serde(default)]
    pub params: Value,
}
69
70impl Request {
71    /// Creates a new request with empty params.
72    ///
73    /// # Examples
74    ///
75    /// ```
76    /// use rstmdb_protocol::{Request, Operation};
77    /// use serde_json::json;
78    ///
79    /// let req = Request::new("1", Operation::Ping);
80    /// assert_eq!(req.op, Operation::Ping);
81    ///
82    /// let req = Request::new("2", Operation::GetInstance)
83    ///     .with_params(json!({"instance_id": "order-001"}));
84    /// ```
85    pub fn new(id: impl Into<String>, op: Operation) -> Self {
86        Self {
87            msg_type: "request".to_string(),
88            id: id.into(),
89            op,
90            params: Value::Object(Default::default()),
91        }
92    }
93
94    pub fn with_params(mut self, params: Value) -> Self {
95        self.params = params;
96        self
97    }
98}
99
/// Response status.
///
/// Serialized in lowercase (`"ok"` / `"error"`) via the `rename_all`
/// attribute on this enum.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum ResponseStatus {
    /// The request succeeded; used by [`Response::ok`].
    Ok,
    /// The request failed; used by [`Response::error`].
    Error,
}
107
/// Error details in a response.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ResponseError {
    /// Stable error code.
    pub code: ErrorCode,

    /// Human-readable error message.
    pub message: String,

    /// Whether this error is retryable.
    ///
    /// [`ResponseError::new`] derives this from `code.is_retryable()`.
    pub retryable: bool,

    /// Additional error details.
    ///
    /// Defaults to an empty map when absent from the input and is omitted
    /// from the serialized output when empty.
    #[serde(default, skip_serializing_if = "HashMap::is_empty")]
    pub details: HashMap<String, Value>,
}
124
125impl ResponseError {
126    pub fn new(code: ErrorCode, message: impl Into<String>) -> Self {
127        Self {
128            retryable: code.is_retryable(),
129            code,
130            message: message.into(),
131            details: HashMap::new(),
132        }
133    }
134
135    pub fn with_detail(mut self, key: impl Into<String>, value: impl Into<Value>) -> Self {
136        self.details.insert(key.into(), value.into());
137        self
138    }
139}
140
/// Response metadata.
///
/// All named fields are optional and are left out of the serialized output
/// when `None`. `Default` yields a value with every field unset and an empty
/// `extra` map.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct ResponseMeta {
    /// Server timestamp.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub server_time: Option<DateTime<Utc>>,

    /// Whether this server is the leader (for cluster mode).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub leader: Option<bool>,

    /// WAL offset after write operations.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub wal_offset: Option<u64>,

    /// Trace ID for distributed tracing.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub trace_id: Option<String>,

    /// Additional metadata fields (for forward compatibility).
    ///
    /// Flattened: entries are written as sibling keys of the named fields
    /// rather than nested under an `extra` key.
    #[serde(flatten)]
    pub extra: HashMap<String, Value>,
}
164
/// Response message envelope.
///
/// Built by [`Response::ok`] (sets `result`, leaves `error` unset) or
/// [`Response::error`] (sets `error`, leaves `result` unset).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Response {
    /// Message type, always "response".
    ///
    /// Carried on the wire as `type`.
    #[serde(rename = "type")]
    pub msg_type: String,

    /// Request ID this response correlates to.
    pub id: String,

    /// Response status.
    pub status: ResponseStatus,

    /// Result payload (for successful responses).
    ///
    /// Omitted from the serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub result: Option<Value>,

    /// Error details (for error responses).
    ///
    /// Omitted from the serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub error: Option<ResponseError>,

    /// Response metadata.
    ///
    /// Defaults to `ResponseMeta::default()` when absent from the input and
    /// is omitted from the output when `is_meta_empty` reports it as empty.
    #[serde(default, skip_serializing_if = "is_meta_empty")]
    pub meta: ResponseMeta,
}
190
191fn is_meta_empty(meta: &ResponseMeta) -> bool {
192    meta.server_time.is_none()
193        && meta.leader.is_none()
194        && meta.wal_offset.is_none()
195        && meta.trace_id.is_none()
196        && meta.extra.is_empty()
197}
198
199impl Response {
200    /// Creates a successful response.
201    ///
202    /// # Examples
203    ///
204    /// ```
205    /// use rstmdb_protocol::{Response, ResponseError};
206    /// use rstmdb_protocol::ErrorCode;
207    /// use serde_json::json;
208    ///
209    /// let ok = Response::ok("1", json!({"pong": true}));
210    /// assert!(ok.is_ok());
211    ///
212    /// let err = Response::error("2", ResponseError::new(
213    ///     ErrorCode::NotFound, "not found",
214    /// ));
215    /// assert!(err.is_error());
216    /// ```
217    pub fn ok(id: impl Into<String>, result: Value) -> Self {
218        Self {
219            msg_type: "response".to_string(),
220            id: id.into(),
221            status: ResponseStatus::Ok,
222            result: Some(result),
223            error: None,
224            meta: ResponseMeta::default(),
225        }
226    }
227
228    pub fn error(id: impl Into<String>, error: ResponseError) -> Self {
229        Self {
230            msg_type: "response".to_string(),
231            id: id.into(),
232            status: ResponseStatus::Error,
233            result: None,
234            error: Some(error),
235            meta: ResponseMeta::default(),
236        }
237    }
238
239    pub fn with_meta(mut self, meta: ResponseMeta) -> Self {
240        self.meta = meta;
241        self
242    }
243
244    pub fn is_ok(&self) -> bool {
245        self.status == ResponseStatus::Ok
246    }
247
248    pub fn is_error(&self) -> bool {
249        self.status == ResponseStatus::Error
250    }
251}
252
/// Streaming event message (for WATCH_INSTANCE, WATCH_ALL, etc.).
///
/// `payload` and `ctx` are left out of the serialized output when `None`;
/// every other field is always present.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StreamEvent {
    /// Message type, always "event".
    ///
    /// Carried on the wire as `type`.
    #[serde(rename = "type")]
    pub msg_type: String,

    /// Subscription ID.
    pub subscription_id: String,

    /// Instance ID.
    pub instance_id: String,

    /// Machine name.
    pub machine: String,

    /// Machine version.
    pub version: u32,

    /// WAL offset of this event.
    pub wal_offset: u64,

    /// State before transition.
    pub from_state: String,

    /// State after transition.
    pub to_state: String,

    /// Event that triggered the transition.
    pub event: String,

    /// Event payload (optional).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub payload: Option<Value>,

    /// Instance context (optional, if requested).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub ctx: Option<Value>,
}
292
293// ============================================================================
294// Operation-specific parameter types
295// ============================================================================
296
/// Parameters for HELLO request.
///
/// Only `protocol_version` is required on input; the remaining fields fall
/// back to their `Default` values (`None` / empty list) when absent.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HelloParams {
    /// Protocol version sent by the client.
    pub protocol_version: u16,
    /// Optional client name. There is no `skip_serializing_if` here, so a
    /// `None` value is still written out (as `null` in JSON).
    #[serde(default)]
    pub client_name: Option<String>,
    /// Wire modes listed by the client.
    // NOTE(review): presumably the modes the client supports, from which the
    // server picks `HelloResult::wire_mode` — confirm against the handshake code.
    #[serde(default)]
    pub wire_modes: Vec<String>,
    /// Feature names listed by the client.
    #[serde(default)]
    pub features: Vec<String>,
}
308
/// Result for HELLO response.
///
/// All fields are required on input and always serialized.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HelloResult {
    /// Protocol version reported by the server.
    pub protocol_version: u16,
    /// Single wire mode reported by the server.
    pub wire_mode: String,
    /// Server name.
    pub server_name: String,
    /// Server version string.
    pub server_version: String,
    /// Feature names reported by the server.
    pub features: Vec<String>,
}
318
319/// Parameters for AUTH request.
320#[derive(Debug, Clone, Serialize, Deserialize)]
321pub struct AuthParams {
322    pub method: String,
323    pub token: String,
324}
325
/// Parameters for PUT_MACHINE request.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PutMachineParams {
    /// Machine name.
    pub machine: String,
    /// Machine version.
    pub version: u32,
    /// Machine definition as an arbitrary JSON value; its schema is not
    /// described by this type.
    pub definition: Value,
    /// Optional checksum; omitted from the serialized output when `None`.
    // NOTE(review): presumably a client-computed checksum of `definition`
    // that the server verifies — confirm against the handler.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub checksum: Option<String>,
}
335
/// Result for PUT_MACHINE response.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PutMachineResult {
    /// Machine name.
    pub machine: String,
    /// Machine version.
    pub version: u32,
    /// Checksum as stored by the server.
    pub stored_checksum: String,
    /// Creation flag.
    // NOTE(review): presumably `true` when the definition was newly stored and
    // `false` when it already existed — confirm against the handler.
    pub created: bool,
}
344
/// Parameters for GET_MACHINE request.
///
/// Both fields are required.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GetMachineParams {
    /// Machine name.
    pub machine: String,
    /// Machine version.
    pub version: u32,
}
351
/// Result for GET_MACHINE response.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GetMachineResult {
    /// Machine definition as an arbitrary JSON value.
    pub definition: Value,
    /// Checksum string returned alongside the definition.
    pub checksum: String,
}
358
/// Parameters for CREATE_INSTANCE request.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CreateInstanceParams {
    /// Optional instance ID; omitted from the serialized output when `None`.
    // NOTE(review): presumably the server generates an ID when this is absent
    // — confirm against the handler.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub instance_id: Option<String>,
    /// Machine name.
    pub machine: String,
    /// Machine version.
    pub version: u32,
    /// Initial instance context; `Value::default()` is used when the field is
    /// absent from the input.
    #[serde(default)]
    pub initial_ctx: Value,
    /// Optional idempotency key; omitted from the serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub idempotency_key: Option<String>,
}
371
/// Result for CREATE_INSTANCE response.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CreateInstanceResult {
    /// Instance ID.
    pub instance_id: String,
    /// State name reported for the instance.
    pub state: String,
    /// WAL offset reported with the result.
    pub wal_offset: u64,
}
379
/// Parameters for GET_INSTANCE request.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GetInstanceParams {
    /// ID of the instance to look up (required).
    pub instance_id: String,
}
385
/// Result for GET_INSTANCE response.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GetInstanceResult {
    /// Machine name.
    pub machine: String,
    /// Machine version.
    pub version: u32,
    /// State name reported for the instance.
    pub state: String,
    /// Instance context as an arbitrary JSON value.
    pub ctx: Value,
    /// ID of the last event, if any; omitted from the serialized output when
    /// `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub last_event_id: Option<String>,
    /// Last WAL offset reported for the instance.
    pub last_wal_offset: u64,
}
397
/// Parameters for LIST_INSTANCES request.
///
/// Every field is optional and omitted from the serialized output when
/// `None`; `Default` produces a value with all fields unset.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct ListInstancesParams {
    /// Filter by machine name.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub machine: Option<String>,
    /// Filter by current state.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub state: Option<String>,
    /// Maximum number of instances to return.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub limit: Option<u32>,
    /// Number of instances to skip (for pagination).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub offset: Option<u32>,
}
414
/// Instance summary for list responses (excludes ctx for efficiency).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct InstanceSummary {
    /// Instance ID.
    pub id: String,
    /// Machine name.
    pub machine: String,
    /// Machine version.
    pub version: u32,
    /// State name reported for the instance.
    pub state: String,
    /// Creation timestamp as an integer.
    // NOTE(review): unit and epoch are not visible in this file — confirm
    // (seconds vs. milliseconds since the Unix epoch).
    pub created_at: i64,
    /// Last-update timestamp as an integer (same caveat as `created_at`).
    pub updated_at: i64,
    /// Last WAL offset reported for the instance.
    pub last_wal_offset: u64,
}
426
/// Result for LIST_INSTANCES response.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ListInstancesResult {
    /// Instance summaries included in this response.
    pub instances: Vec<InstanceSummary>,
    /// Total count reported by the server.
    // NOTE(review): presumably the number of matching instances before
    // `limit`/`offset` are applied — confirm against the handler.
    pub total: u64,
    /// Flag indicating that more instances are available.
    pub has_more: bool,
}
434
/// Parameters for APPLY_EVENT request.
///
/// `instance_id` and `event` are required. `payload` falls back to
/// `Value::default()` when absent, and each `Option` field is omitted from
/// the serialized output when `None`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ApplyEventParams {
    /// Instance ID.
    pub instance_id: String,
    /// Event name.
    pub event: String,
    /// Event payload as an arbitrary JSON value.
    #[serde(default)]
    pub payload: Value,
    /// Optional expected state.
    // NOTE(review): presumably a precondition on the instance's current state
    // — confirm against the handler.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub expected_state: Option<String>,
    /// Optional expected WAL offset (presumably a precondition as well —
    /// TODO confirm).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub expected_wal_offset: Option<u64>,
    /// Optional event ID.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub event_id: Option<String>,
    /// Optional idempotency key.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub idempotency_key: Option<String>,
}
451
/// Result for APPLY_EVENT response.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ApplyEventResult {
    /// State before the transition.
    pub from_state: String,
    /// State after the transition.
    pub to_state: String,
    /// Instance context, if included; omitted from the serialized output when
    /// `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub ctx: Option<Value>,
    /// WAL offset reported with the result.
    pub wal_offset: u64,
    /// Applied flag.
    // NOTE(review): presumably `false` when the event was recognised as a
    // duplicate and not re-applied — confirm against the handler.
    pub applied: bool,
    /// Event ID, if any; omitted from the serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub event_id: Option<String>,
}
464
/// Parameters for COMPACT request.
///
/// `Default` and a missing `force_snapshot` field both yield `false`.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct CompactParams {
    /// Force snapshot of all instances before compaction.
    #[serde(default)]
    pub force_snapshot: bool,
}
472
/// Result for COMPACT response.
///
/// All fields are required on input and always serialized.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CompactResult {
    /// Number of snapshots created.
    pub snapshots_created: usize,
    /// Number of WAL segments deleted.
    pub segments_deleted: usize,
    /// Bytes reclaimed from deleted segments.
    pub bytes_reclaimed: u64,
}
483
484// ============================================================================
485// Watch/Streaming parameter and result types
486// ============================================================================
487
/// Serde default provider that returns `true`.
///
/// Referenced through `#[serde(default = "default_true")]` by the
/// `include_ctx` fields of the watch parameter types, so that a missing
/// field deserializes as `true` rather than `bool::default()`.
fn default_true() -> bool {
    true
}
491
/// Parameters for WATCH_INSTANCE request.
///
/// Only `instance_id` is required on input.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WatchInstanceParams {
    /// ID of the instance to watch.
    pub instance_id: String,
    /// Include context in stream events (default: true).
    ///
    /// A missing field deserializes as `true` via `default_true`.
    #[serde(default = "default_true")]
    pub include_ctx: bool,
    /// Start streaming from this WAL offset (optional, for replay).
    ///
    /// Omitted from the serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub from_offset: Option<u64>,
}
503
504/// Parameters for WATCH_ALL request.
505#[derive(Debug, Clone, Default, Serialize, Deserialize)]
506pub struct WatchAllParams {
507    /// Include context in stream events (default: true).
508    #[serde(default = "default_true")]
509    pub include_ctx: bool,
510    /// Start streaming from this WAL offset (optional, for replay).
511    #[serde(skip_serializing_if = "Option::is_none")]
512    pub from_offset: Option<u64>,
513    /// Filter: only these machine types (empty = all).
514    #[serde(default, skip_serializing_if = "Vec::is_empty")]
515    pub machines: Vec<String>,
516    /// Filter: only events FROM these states (empty = all).
517    #[serde(default, skip_serializing_if = "Vec::is_empty")]
518    pub from_states: Vec<String>,
519    /// Filter: only events TO these states (empty = all).
520    #[serde(default, skip_serializing_if = "Vec::is_empty")]
521    pub to_states: Vec<String>,
522    /// Filter: only these event types (empty = all).
523    #[serde(default, skip_serializing_if = "Vec::is_empty")]
524    pub events: Vec<String>,
525}
526
/// Result for WATCH_INSTANCE response.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WatchInstanceResult {
    /// Subscription ID.
    pub subscription_id: String,
    /// Instance ID.
    pub instance_id: String,
    /// Current state name reported for the instance.
    pub current_state: String,
    /// Current WAL offset reported for the instance.
    pub current_wal_offset: u64,
}
535
/// Result for WATCH_ALL response.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WatchAllResult {
    /// Subscription ID.
    pub subscription_id: String,
    /// Current WAL head offset (events after this will be streamed).
    pub wal_offset: u64,
}
543
/// Parameters for UNWATCH request.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct UnwatchParams {
    /// ID of the subscription to cancel (required).
    pub subscription_id: String,
}
549
/// Result for UNWATCH response.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct UnwatchResult {
    /// Subscription ID.
    pub subscription_id: String,
    /// Removal flag.
    // NOTE(review): presumably `false` when no such subscription existed —
    // confirm against the handler.
    pub removed: bool,
}
556
#[cfg(test)]
mod tests {
    use super::*;

    /// A fresh PING request serializes its op in upper case and is tagged as
    /// a request.
    #[test]
    fn test_request_serialization() {
        let encoded = serde_json::to_string(&Request::new("1", Operation::Ping)).unwrap();
        assert!(encoded.contains(r#""op":"PING""#));
        assert!(encoded.contains(r#""type":"request""#));
    }

    /// A successful response carries the lowercase status and the result
    /// payload.
    #[test]
    fn test_response_ok_serialization() {
        let payload = serde_json::json!({"pong": true});
        let encoded = serde_json::to_string(&Response::ok("1", payload)).unwrap();
        assert!(encoded.contains(r#""status":"ok""#));
        assert!(encoded.contains(r#""pong":true"#));
    }

    /// An error response exposes the error code and its retryable flag.
    #[test]
    fn test_response_error_serialization() {
        let failure = Response::error(
            "1",
            ResponseError::new(ErrorCode::NotFound, "Instance not found")
                .with_detail("instance_id", "i-123"),
        );
        let encoded = serde_json::to_string(&failure).unwrap();
        assert!(encoded.contains(r#""code":"NOT_FOUND""#));
        assert!(encoded.contains(r#""retryable":false"#));
    }
}
586}