Skip to main content

oxirs_cluster/tcp_cluster/
codec.rs

//! Length-prefixed message framing for the TCP cluster harness.
//!
//! Wire format: `[u32 BE length][JSON body]`
//!
//! This is the same framing pattern used by the existing cluster transport in
//! `server/oxirs-fuseki/src/clustering/node.rs`, so operators already
//! familiar with the production code can reason about the format without
//! additional documentation.
9
10use std::io;
11
12use serde::{Deserialize, Serialize};
13use tokio::io::{AsyncReadExt, AsyncWriteExt};
14
15// ─────────────────────────────────────────────────────────────────────────────
16// Message vocabulary
17// ─────────────────────────────────────────────────────────────────────────────
18
19/// Messages exchanged between TCP cluster nodes.
20///
21/// Each variant maps to a distinct cluster primitive:
22/// - **Gossip** — epidemic-protocol key-value propagation
23/// - **Ping** / **Pong** — heartbeat pair for liveness checking
24/// - **Replicate** / **ReplicateAck** — log replication stubs that prove
25///   the replication path works over real sockets without full Raft state
26#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
27pub enum ClusterMessage {
28    /// Gossip: propagate a key-value state entry (last-write-wins).
29    Gossip {
30        /// Node ID of the sender.
31        sender_id: String,
32        /// Key being gossiped.
33        key: String,
34        /// Opaque u64 value.
35        value: u64,
36        /// Monotonically-increasing version; higher wins on conflict.
37        version: u64,
38    },
39    /// Heartbeat ping — the receiver should reply with a matching `Pong`.
40    Ping {
41        /// Node ID of the sender.
42        sender_id: String,
43        /// Sequence number echoed back in the matching `Pong`.
44        seq: u64,
45    },
46    /// Heartbeat pong — reply to `Ping`.
47    Pong {
48        /// Node ID of the replying node.
49        sender_id: String,
50        /// Echoed sequence number from the matching `Ping`.
51        seq: u64,
52    },
53    /// Replication stub: leader proposes a log entry.
54    Replicate {
55        /// Node ID of the leader.
56        leader_id: String,
57        /// Log index (1-based).
58        index: u64,
59        /// Raft term of this entry.
60        term: u64,
61        /// CRC32 checksum of the (simulated) entry payload.
62        checksum: u64,
63    },
64    /// Acknowledgment from a follower for a `Replicate` RPC.
65    ReplicateAck {
66        /// Node ID of the acknowledging follower.
67        follower_id: String,
68        /// Log index being acknowledged.
69        index: u64,
70        /// `true` if the entry was accepted, `false` if rejected.
71        success: bool,
72    },
73}
74
75// ─────────────────────────────────────────────────────────────────────────────
76// Codec
77// ─────────────────────────────────────────────────────────────────────────────
78
/// Zero-size type providing stateless read/write helpers for [`ClusterMessage`].
///
/// Wire format: `[u32 big-endian byte count][serde_json UTF-8 body]`
///
/// The type carries no data, so it derives the common marker traits; this
/// lets it be embedded in structs that themselves derive `Debug`, `Clone`
/// or `Default`.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct MessageCodec;
83
84impl MessageCodec {
85    /// Write one [`ClusterMessage`] to `writer`.
86    ///
87    /// The bytes on the wire are:
88    /// 1. 4-byte big-endian unsigned length of the JSON body
89    /// 2. the JSON body (UTF-8)
90    ///
91    /// # Errors
92    ///
93    /// Returns `Err` if serialization fails or the underlying write fails.
94    pub async fn write<W>(writer: &mut W, msg: &ClusterMessage) -> io::Result<()>
95    where
96        W: AsyncWriteExt + Unpin,
97    {
98        let body =
99            serde_json::to_vec(msg).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
100
101        let len = u32::try_from(body.len()).map_err(|_| {
102            io::Error::new(
103                io::ErrorKind::InvalidData,
104                "message body exceeds u32::MAX bytes",
105            )
106        })?;
107
108        writer.write_all(&len.to_be_bytes()).await?;
109        writer.write_all(&body).await?;
110        writer.flush().await?;
111        Ok(())
112    }
113
114    /// Read one [`ClusterMessage`] from `reader`.
115    ///
116    /// Reads the 4-byte length prefix, allocates a buffer of that size, reads
117    /// the body, then deserialises.
118    ///
119    /// # Errors
120    ///
121    /// Returns `Err` if the read fails or the body is not valid JSON.
122    pub async fn read<R>(reader: &mut R) -> io::Result<ClusterMessage>
123    where
124        R: AsyncReadExt + Unpin,
125    {
126        let mut len_buf = [0u8; 4];
127        reader.read_exact(&mut len_buf).await?;
128        let len = u32::from_be_bytes(len_buf) as usize;
129
130        let mut body = vec![0u8; len];
131        reader.read_exact(&mut body).await?;
132
133        serde_json::from_slice(&body).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))
134    }
135}
136
137// ─────────────────────────────────────────────────────────────────────────────
138// Tests
139// ─────────────────────────────────────────────────────────────────────────────
140
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::ErrorKind;
    use tokio::io::{duplex, AsyncWriteExt};

    /// Sends `msg` through an in-memory duplex pipe and asserts that the
    /// message decoded on the other end equals the one that was written.
    async fn assert_roundtrip(msg: ClusterMessage) {
        let (mut client, mut server) = duplex(1024);
        MessageCodec::write(&mut client, &msg).await.expect("write");
        let received = MessageCodec::read(&mut server).await.expect("read");
        assert_eq!(msg, received);
    }

    #[tokio::test]
    async fn test_roundtrip_gossip() {
        assert_roundtrip(ClusterMessage::Gossip {
            sender_id: "node-1".to_owned(),
            key: "alpha".to_owned(),
            value: 99,
            version: 7,
        })
        .await;
    }

    #[tokio::test]
    async fn test_roundtrip_ping() {
        assert_roundtrip(ClusterMessage::Ping {
            sender_id: "node-2".to_owned(),
            seq: 42,
        })
        .await;
    }

    #[tokio::test]
    async fn test_roundtrip_pong() {
        assert_roundtrip(ClusterMessage::Pong {
            sender_id: "node-3".to_owned(),
            seq: 42,
        })
        .await;
    }

    #[tokio::test]
    async fn test_roundtrip_replicate() {
        assert_roundtrip(ClusterMessage::Replicate {
            leader_id: "leader".to_owned(),
            index: 100,
            term: 3,
            checksum: 0xDEAD_BEEF,
        })
        .await;
    }

    #[tokio::test]
    async fn test_roundtrip_replicate_ack() {
        assert_roundtrip(ClusterMessage::ReplicateAck {
            follower_id: "follower".to_owned(),
            index: 100,
            success: false,
        })
        .await;
    }

    #[tokio::test]
    async fn test_multiple_messages_in_sequence() {
        let msgs = vec![
            ClusterMessage::Ping {
                sender_id: "a".to_owned(),
                seq: 1,
            },
            ClusterMessage::Pong {
                sender_id: "b".to_owned(),
                seq: 1,
            },
            ClusterMessage::ReplicateAck {
                follower_id: "c".to_owned(),
                index: 5,
                success: true,
            },
        ];
        let (mut client, mut server) = duplex(4096);
        for msg in &msgs {
            MessageCodec::write(&mut client, msg).await.expect("write");
        }
        // All three frames fit in the 4096-byte duplex buffer, so they can be
        // written up front and then read back one by one, in order, while the
        // client end is still open.
        for expected in &msgs {
            let received = MessageCodec::read(&mut server).await.expect("read");
            assert_eq!(expected, &received);
        }
    }

    #[tokio::test]
    async fn test_read_on_closed_stream_is_unexpected_eof() {
        let (client, mut server) = duplex(64);
        // Closing the peer before any byte is sent leaves no length prefix.
        drop(client);
        let err = MessageCodec::read(&mut server)
            .await
            .expect_err("empty stream must not decode");
        assert_eq!(err.kind(), ErrorKind::UnexpectedEof);
    }

    #[tokio::test]
    async fn test_read_truncated_body_is_unexpected_eof() {
        let (mut client, mut server) = duplex(64);
        // Announce a 10-byte body but deliver only one byte before closing.
        client
            .write_all(&10u32.to_be_bytes())
            .await
            .expect("write length");
        client.write_all(b"{").await.expect("write partial body");
        drop(client);
        let err = MessageCodec::read(&mut server)
            .await
            .expect_err("truncated frame must not decode");
        assert_eq!(err.kind(), ErrorKind::UnexpectedEof);
    }

    #[tokio::test]
    async fn test_read_invalid_json_is_invalid_data() {
        let payload = b"not json";
        let len = u32::try_from(payload.len()).expect("payload fits in u32");
        let (mut client, mut server) = duplex(64);
        client
            .write_all(&len.to_be_bytes())
            .await
            .expect("write length");
        client.write_all(payload).await.expect("write body");
        let err = MessageCodec::read(&mut server)
            .await
            .expect_err("garbage body must not decode");
        assert_eq!(err.kind(), ErrorKind::InvalidData);
    }
}