oxirs_cluster/tcp_cluster/codec.rs
1//! Length-prefixed message framing for the TCP cluster harness.
2//!
3//! Wire format: `[u32 BE length][JSON body]`
4//!
//! This is the same framing pattern used by the existing cluster transport
//! in `server/oxirs-fuseki/src/clustering/node.rs`, so operators already
//! familiar with the production code can reason about the format without
//! additional documentation.
9
10use std::io;
11
12use serde::{Deserialize, Serialize};
13use tokio::io::{AsyncReadExt, AsyncWriteExt};
14
15// ─────────────────────────────────────────────────────────────────────────────
16// Message vocabulary
17// ─────────────────────────────────────────────────────────────────────────────
18
19/// Messages exchanged between TCP cluster nodes.
20///
21/// Each variant maps to a distinct cluster primitive:
22/// - **Gossip** — epidemic-protocol key-value propagation
23/// - **Ping** / **Pong** — heartbeat pair for liveness checking
24/// - **Replicate** / **ReplicateAck** — log replication stubs that prove
25/// the replication path works over real sockets without full Raft state
26#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
27pub enum ClusterMessage {
28 /// Gossip: propagate a key-value state entry (last-write-wins).
29 Gossip {
30 /// Node ID of the sender.
31 sender_id: String,
32 /// Key being gossiped.
33 key: String,
34 /// Opaque u64 value.
35 value: u64,
36 /// Monotonically-increasing version; higher wins on conflict.
37 version: u64,
38 },
39 /// Heartbeat ping — the receiver should reply with a matching `Pong`.
40 Ping {
41 /// Node ID of the sender.
42 sender_id: String,
43 /// Sequence number echoed back in the matching `Pong`.
44 seq: u64,
45 },
46 /// Heartbeat pong — reply to `Ping`.
47 Pong {
48 /// Node ID of the replying node.
49 sender_id: String,
50 /// Echoed sequence number from the matching `Ping`.
51 seq: u64,
52 },
53 /// Replication stub: leader proposes a log entry.
54 Replicate {
55 /// Node ID of the leader.
56 leader_id: String,
57 /// Log index (1-based).
58 index: u64,
59 /// Raft term of this entry.
60 term: u64,
61 /// CRC32 checksum of the (simulated) entry payload.
62 checksum: u64,
63 },
64 /// Acknowledgment from a follower for a `Replicate` RPC.
65 ReplicateAck {
66 /// Node ID of the acknowledging follower.
67 follower_id: String,
68 /// Log index being acknowledged.
69 index: u64,
70 /// `true` if the entry was accepted, `false` if rejected.
71 success: bool,
72 },
73}
74
75// ─────────────────────────────────────────────────────────────────────────────
76// Codec
77// ─────────────────────────────────────────────────────────────────────────────
78
/// Zero-size type providing stateless read/write helpers for [`ClusterMessage`].
///
/// Wire format: `[u32 big-endian byte count][serde_json UTF-8 body]`
///
/// The type holds no data, so it derives the standard traits that cost
/// nothing: it can be printed in diagnostics, copied freely, and embedded in
/// structs that themselves derive `Debug`, `Clone` or `Default`.
#[derive(Debug, Clone, Copy, Default)]
pub struct MessageCodec;
83
84impl MessageCodec {
85 /// Write one [`ClusterMessage`] to `writer`.
86 ///
87 /// The bytes on the wire are:
88 /// 1. 4-byte big-endian unsigned length of the JSON body
89 /// 2. the JSON body (UTF-8)
90 ///
91 /// # Errors
92 ///
93 /// Returns `Err` if serialization fails or the underlying write fails.
94 pub async fn write<W>(writer: &mut W, msg: &ClusterMessage) -> io::Result<()>
95 where
96 W: AsyncWriteExt + Unpin,
97 {
98 let body =
99 serde_json::to_vec(msg).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
100
101 let len = u32::try_from(body.len()).map_err(|_| {
102 io::Error::new(
103 io::ErrorKind::InvalidData,
104 "message body exceeds u32::MAX bytes",
105 )
106 })?;
107
108 writer.write_all(&len.to_be_bytes()).await?;
109 writer.write_all(&body).await?;
110 writer.flush().await?;
111 Ok(())
112 }
113
114 /// Read one [`ClusterMessage`] from `reader`.
115 ///
116 /// Reads the 4-byte length prefix, allocates a buffer of that size, reads
117 /// the body, then deserialises.
118 ///
119 /// # Errors
120 ///
121 /// Returns `Err` if the read fails or the body is not valid JSON.
122 pub async fn read<R>(reader: &mut R) -> io::Result<ClusterMessage>
123 where
124 R: AsyncReadExt + Unpin,
125 {
126 let mut len_buf = [0u8; 4];
127 reader.read_exact(&mut len_buf).await?;
128 let len = u32::from_be_bytes(len_buf) as usize;
129
130 let mut body = vec![0u8; len];
131 reader.read_exact(&mut body).await?;
132
133 serde_json::from_slice(&body).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))
134 }
135}
136
137// ─────────────────────────────────────────────────────────────────────────────
138// Tests
139// ─────────────────────────────────────────────────────────────────────────────
140
#[cfg(test)]
mod tests {
    use super::*;
    use tokio::io::duplex;

    /// Pushes `msg` through an in-memory duplex pipe with a 1024-byte buffer
    /// and returns what the codec decodes on the far side.
    async fn send_and_receive(msg: &ClusterMessage) -> ClusterMessage {
        let (mut tx, mut rx) = duplex(1024);
        MessageCodec::write(&mut tx, msg).await.expect("write");
        MessageCodec::read(&mut rx).await.expect("read")
    }

    #[tokio::test]
    async fn test_roundtrip_gossip() {
        let sent = ClusterMessage::Gossip {
            sender_id: String::from("node-1"),
            key: String::from("alpha"),
            value: 99,
            version: 7,
        };
        let got = send_and_receive(&sent).await;
        assert_eq!(sent, got);
    }

    #[tokio::test]
    async fn test_roundtrip_ping() {
        let sent = ClusterMessage::Ping {
            sender_id: String::from("node-2"),
            seq: 42,
        };
        let got = send_and_receive(&sent).await;
        assert_eq!(sent, got);
    }

    #[tokio::test]
    async fn test_roundtrip_replicate() {
        let sent = ClusterMessage::Replicate {
            leader_id: String::from("leader"),
            index: 100,
            term: 3,
            checksum: 0xDEAD_BEEF,
        };
        let got = send_and_receive(&sent).await;
        assert_eq!(sent, got);
    }

    #[tokio::test]
    async fn test_multiple_messages_in_sequence() {
        let sent = [
            ClusterMessage::Ping {
                sender_id: String::from("a"),
                seq: 1,
            },
            ClusterMessage::Pong {
                sender_id: String::from("b"),
                seq: 1,
            },
            ClusterMessage::ReplicateAck {
                follower_id: String::from("c"),
                index: 5,
                success: true,
            },
        ];
        let (mut tx, mut rx) = duplex(4096);

        // Queue every frame before reading any of them back.
        for msg in sent.iter() {
            MessageCodec::write(&mut tx, msg).await.expect("write");
        }

        // Both halves stay open: the length prefixes alone delimit the
        // frames, so they must come back one by one, in order.
        for expected in sent.iter() {
            let got = MessageCodec::read(&mut rx).await.expect("read");
            assert_eq!(expected, &got);
        }
    }
}