noxu_rep/net/inmem.rs
1//! First-class in-memory replication transport (Wave 11-D).
2//!
//! noxu-rep ships four wire-level transports out of the box:
4//!
5//! | Transport | Module | Use case |
6//! |-----------|--------|----------|
7//! | TCP | [`crate::net::TcpChannel`] | Plain replication LAN/WAN |
8//! | TLS | `crate::net::TlsTcpChannel` | Encrypted WAN (rustls / native-tls) |
9//! | QUIC | `crate::net::QuicChannel` | Multiplexed UDP (feature `quic`) |
10//! | **In-memory** | [`InMemoryTransport`] (this module) | In-process clusters, embedded use cases, tests |
11//!
12//! The in-memory transport originated as a wire-level fixture for
13//! protocol unit tests ([`crate::net::LocalChannel`]). Wave 11-D
14//! promotes that wiring into a first-class **production** transport
15//! so users can compose multi-node replication groups inside a single
16//! process — useful for embedded deployments, integration testing,
17//! and Stateright-driven property tests that need real
18//! `ReplicatedEnvironment` instances but no real network.
19//!
20//! # Topologies
21//!
22//! Two topologies are supported out of the box:
23//!
24//! * **Pair** — back-to-back endpoints, suitable for a 2-node
25//! master/replica pair: [`InMemoryTransport::new_pair`].
26//! * **Group** — `N`-node fully-connected mesh suitable for any
27//! election quorum: [`InMemoryTransport::new_group`].
28//!
29//! The mesh maintains exactly `N · (N - 1)` directional channels —
30//! one per ordered `(from, to)` pair — and routes each `send` to the
31//! corresponding peer's receive queue, mirroring the semantics of a
32//! real point-to-point socket cluster.
33//!
34//! # Crash injection
35//!
36//! Production cluster tests need to exercise crash recovery without
37//! tearing down the entire process. [`InMemoryGroup::simulate_crash`]
38//! closes every channel that originates from or terminates at the
39//! crashed node, so subsequent `send` / `receive` calls on those
40//! channels return [`crate::error::RepError::ChannelClosed`] — exactly
41//! what a real socket disconnect would produce.
42//!
43//! Once a node has been crashed, [`InMemoryGroup::reconnect`] rewires
44//! a fresh set of channels into the same slot, simulating a node
45//! restart or a network partition heal.
46//!
47//! # Wire compatibility
48//!
49//! `InMemoryEndpoint` is a thin wrapper around [`LocalChannel`] and
50//! implements the same [`Channel`] trait as the TCP, TLS, and QUIC
51//! transports. Higher layers
52//! ([`crate::stream::feeder::FeederRunner`],
53//! [`crate::stream::replica_stream::ReplicaStream`],
54//! [`crate::elections`]) consume `dyn Channel` so they work
55//! identically over any of the four transports without modification.
56//!
57//! # Usage
58//!
59//! ```no_run
60//! use noxu_rep::net::{Channel, InMemoryTransport};
61//! use std::time::Duration;
62//!
63//! // 1. Single back-to-back pair (e.g., master + 1 replica).
64//! let (a, b) = InMemoryTransport::new_pair();
65//! a.send(b"hello").unwrap();
66//! let msg = b.receive(Duration::from_millis(50)).unwrap();
67//! assert_eq!(msg, Some(b"hello".to_vec()));
68//!
69//! // 2. 3-node fully-connected mesh.
70//! let group = InMemoryTransport::new_group(3);
71//! group.channel(0, 1).send(b"ping").unwrap();
72//! let recv = group.channel(1, 0).receive(Duration::from_millis(50)).unwrap();
73//! assert_eq!(recv, Some(b"ping".to_vec()));
74//! ```
75
76use std::sync::Arc;
77use std::time::Duration;
78
79use noxu_sync::Mutex;
80
81use crate::error::Result;
82use crate::net::channel::{Channel, LocalChannel, LocalChannelPair};
83
84// ---------------------------------------------------------------------------
85// InMemoryEndpoint
86// ---------------------------------------------------------------------------
87
/// One end of an in-memory replication channel.
///
/// Implements [`Channel`] just like [`crate::net::TcpChannel`] and
/// `crate::net::TlsTcpChannel`, by wrapping a shared [`LocalChannel`].
///
/// Endpoints are constructed via [`InMemoryTransport::new_pair`] or
/// [`InMemoryTransport::new_group`]. Direct construction is intentionally
/// not exposed — pairing two endpoints requires the cross-connected
/// queues set up by the transport factory.
pub struct InMemoryEndpoint {
    /// The underlying [`LocalChannel`]. Held in an `Arc` so cheap
    /// `Arc<dyn Channel>` clones can be handed to higher layers while
    /// the endpoint (and the `InMemoryGroup` owning it) keeps its own
    /// reference.
    inner: Arc<LocalChannel>,
}
104
105impl InMemoryEndpoint {
106 fn new(inner: LocalChannel) -> Self {
107 Self { inner: Arc::new(inner) }
108 }
109
110 /// Return a cheap shareable handle to this endpoint's underlying
111 /// channel. Useful when the protocol layer wants
112 /// `Arc<dyn Channel>` (e.g., when spawning a reader thread that
113 /// outlives the borrow of the group).
114 pub fn channel_handle(&self) -> Arc<dyn Channel> {
115 Arc::clone(&self.inner) as Arc<dyn Channel>
116 }
117}
118
119impl Channel for InMemoryEndpoint {
120 fn send(&self, data: &[u8]) -> Result<()> {
121 self.inner.send(data)
122 }
123
124 fn receive(&self, timeout: Duration) -> Result<Option<Vec<u8>>> {
125 self.inner.receive(timeout)
126 }
127
128 fn close(&self) -> Result<()> {
129 self.inner.close()
130 }
131
132 fn is_open(&self) -> bool {
133 self.inner.is_open()
134 }
135}
136
137// ---------------------------------------------------------------------------
138// InMemoryTransport (factory)
139// ---------------------------------------------------------------------------
140
/// Factory namespace for in-memory replication transports.
///
/// `InMemoryTransport` is a zero-sized type whose associated functions
/// build [`InMemoryEndpoint`] / [`InMemoryGroup`] instances. See the
/// [module-level docs](crate::net::inmem) for the full topology table
/// and design rationale.
///
/// The type carries no state, so the common traits are derived: it can
/// be debug-printed, copied freely and default-constructed like any
/// other unit struct.
#[derive(Debug, Clone, Copy, Default)]
pub struct InMemoryTransport;
148
149impl InMemoryTransport {
150 /// Create a single bidirectional pair of cross-connected
151 /// in-memory endpoints.
152 ///
153 /// Sends on `a` arrive at `b`'s receive queue and vice versa.
154 /// Equivalent to [`LocalChannelPair::new`] but returned as
155 /// production-named [`InMemoryEndpoint`] handles.
156 pub fn new_pair() -> (InMemoryEndpoint, InMemoryEndpoint) {
157 let pair = LocalChannelPair::new();
158 (
159 InMemoryEndpoint::new(pair.channel_a),
160 InMemoryEndpoint::new(pair.channel_b),
161 )
162 }
163
164 /// Create an `n`-node fully-connected in-memory group.
165 ///
166 /// The returned [`InMemoryGroup`] owns `n · (n - 1)` directional
167 /// channels arranged so that `group.channel(i, j).send(msg)` is
168 /// observed by `group.channel(j, i).receive(...)`.
169 ///
170 /// # Panics
171 ///
172 /// Panics if `n == 0`. A 1-node "group" is supported (degenerate)
173 /// but a zero-node group is meaningless and almost certainly a
174 /// caller bug.
175 pub fn new_group(n: usize) -> InMemoryGroup {
176 InMemoryGroup::new(n)
177 }
178}
179
180// ---------------------------------------------------------------------------
181// InMemoryGroup
182// ---------------------------------------------------------------------------
183
184/// An `n`-node fully-connected in-memory replication mesh.
185///
186/// `InMemoryGroup` owns one [`InMemoryEndpoint`] per ordered
187/// `(from, to)` peer pair (with `from != to`). The endpoint at
188/// `(from, to)` is `from`'s view of its socket to `to`; sending on
189/// that endpoint is observed by the endpoint at `(to, from)`.
190///
191/// Higher layers typically consume the group by handing each node
192/// the row of channels `[group.channel(my_id, peer)] for peer in 0..n`.
193///
194/// # Crash and recovery
195///
196/// [`InMemoryGroup::simulate_crash`] closes every channel touching a
197/// node, modelling a hard crash or partition. After a crash,
198/// [`InMemoryGroup::reconnect`] rewires that node's row of channels
199/// (paired against the other live nodes) to model a node restart or
200/// healed partition. A crashed node may be reconnected at most once
201/// per crash; the implementation tolerates repeated calls.
202pub struct InMemoryGroup {
203 n: usize,
204 /// `endpoints[i][j]` (with `i != j`) is node `i`'s endpoint to
205 /// node `j`. Diagonal slots (`i == j`) are kept as `None` so the
206 /// caller can index by `(from, to)` without arithmetic.
207 ///
208 /// Wrapped in `Mutex<Option<_>>` so [`Self::reconnect`] can replace
209 /// individual endpoints under the lock without invalidating any
210 /// outstanding `Arc<dyn Channel>` clones held elsewhere.
211 endpoints: Vec<Vec<Mutex<Option<InMemoryEndpoint>>>>,
212}
213
214impl InMemoryGroup {
215 fn new(n: usize) -> Self {
216 assert!(n > 0, "InMemoryGroup requires at least one node");
217
218 // Build n×n matrix; diagonal stays None.
219 let endpoints: Vec<Vec<Mutex<Option<InMemoryEndpoint>>>> = (0..n)
220 .map(|_| (0..n).map(|_| Mutex::new(None)).collect())
221 .collect();
222
223 // Cross-connect every (i, j) with i < j: one LocalChannelPair
224 // gives us both `i → j` (channel_a) and `j → i` (channel_b).
225 #[allow(clippy::needless_range_loop)]
226 for i in 0..n {
227 for j in (i + 1)..n {
228 let pair = LocalChannelPair::new();
229 *endpoints[i][j].lock() =
230 Some(InMemoryEndpoint::new(pair.channel_a));
231 *endpoints[j][i].lock() =
232 Some(InMemoryEndpoint::new(pair.channel_b));
233 }
234 }
235
236 Self { n, endpoints }
237 }
238
239 /// Number of nodes in the mesh.
240 pub fn size(&self) -> usize {
241 self.n
242 }
243
244 /// Return a cheap [`Arc<dyn Channel>`] handle to the directed
245 /// channel from `from` to `to`.
246 ///
247 /// # Panics
248 ///
249 /// Panics if `from` or `to` is out of range, if `from == to`, or
250 /// if the channel has been removed by [`Self::simulate_crash`]
251 /// without a subsequent [`Self::reconnect`].
252 pub fn channel(&self, from: usize, to: usize) -> Arc<dyn Channel> {
253 assert!(from < self.n, "from index {from} out of range (n={})", self.n);
254 assert!(to < self.n, "to index {to} out of range (n={})", self.n);
255 assert!(from != to, "in-memory mesh has no self-loop channel");
256 let slot = self.endpoints[from][to].lock();
257 slot.as_ref()
258 .unwrap_or_else(|| {
259 panic!(
260 "in-memory channel {from}→{to} is closed; \
261 call reconnect({from}) before reuse"
262 )
263 })
264 .channel_handle()
265 }
266
267 /// Try to acquire the directed channel from `from` to `to`,
268 /// returning `None` if the channel has been crashed.
269 ///
270 /// # Panics
271 ///
272 /// Panics on out-of-range indices or `from == to`.
273 pub fn try_channel(
274 &self,
275 from: usize,
276 to: usize,
277 ) -> Option<Arc<dyn Channel>> {
278 assert!(from < self.n, "from index {from} out of range (n={})", self.n);
279 assert!(to < self.n, "to index {to} out of range (n={})", self.n);
280 assert!(from != to, "in-memory mesh has no self-loop channel");
281 let slot = self.endpoints[from][to].lock();
282 slot.as_ref().map(|e| e.channel_handle())
283 }
284
285 /// Simulate a hard crash of `node`: every channel originating
286 /// from or terminating at `node` is `close`d and dropped from the
287 /// mesh. Subsequent `send` / `receive` on any handle that was
288 /// previously cloned out of those channels returns
289 /// [`crate::error::RepError::ChannelClosed`], matching a real
290 /// socket disconnect.
291 ///
292 /// Idempotent: calling `simulate_crash` on an already-crashed
293 /// node is a no-op.
294 ///
295 /// # Panics
296 ///
297 /// Panics if `node` is out of range.
298 pub fn simulate_crash(&self, node: usize) {
299 assert!(node < self.n, "node index {node} out of range (n={})", self.n);
300 for peer in 0..self.n {
301 if peer == node {
302 continue;
303 }
304 // Close and drop both directions independently so a half-
305 // crashed mesh (one direction reconnected, the other not)
306 // is still expressible by the caller.
307 let mut out = self.endpoints[node][peer].lock();
308 if let Some(ep) = out.take() {
309 let _ = ep.inner.close();
310 }
311 drop(out);
312
313 let mut inn = self.endpoints[peer][node].lock();
314 if let Some(ep) = inn.take() {
315 let _ = ep.inner.close();
316 }
317 }
318 }
319
320 /// Rewire `node`'s row of channels against every peer that is
321 /// still live (i.e., still has a channel slot to `node`). Models
322 /// a node restart or a healed partition.
323 ///
324 /// Channels whose remote end is itself currently crashed are left
325 /// disconnected; the caller should reconnect them in a separate
326 /// pass once the remote node has come back.
327 ///
328 /// # Panics
329 ///
330 /// Panics if `node` is out of range.
331 pub fn reconnect(&self, node: usize) {
332 assert!(node < self.n, "node index {node} out of range (n={})", self.n);
333 for peer in 0..self.n {
334 if peer == node {
335 continue;
336 }
337 // Lock both ordered pairs, smallest index first to keep a
338 // global lock order (deadlock-free).
339 let (lo, hi) =
340 if node < peer { (node, peer) } else { (peer, node) };
341 let mut a = self.endpoints[lo][hi].lock();
342 let mut b = self.endpoints[hi][lo].lock();
343
344 // Only reconnect if both directions are currently empty.
345 // If one side is live and the other isn't, the caller has
346 // a half-open mesh and we leave it as-is.
347 if a.is_some() || b.is_some() {
348 continue;
349 }
350 let pair = LocalChannelPair::new();
351 *a = Some(InMemoryEndpoint::new(pair.channel_a));
352 *b = Some(InMemoryEndpoint::new(pair.channel_b));
353 }
354 }
355
356 /// Return `true` iff every directed channel touching `node` is
357 /// currently open.
358 pub fn is_node_live(&self, node: usize) -> bool {
359 if node >= self.n {
360 return false;
361 }
362 for peer in 0..self.n {
363 if peer == node {
364 continue;
365 }
366 if self.endpoints[node][peer].lock().is_none() {
367 return false;
368 }
369 if self.endpoints[peer][node].lock().is_none() {
370 return false;
371 }
372 }
373 true
374 }
375}
376
377// ---------------------------------------------------------------------------
378// Tests
379// ---------------------------------------------------------------------------
380
#[cfg(test)]
mod tests {
    use super::*;

    /// Receive timeout for messages that are (or would be) already queued.
    const WAIT: Duration = Duration::from_millis(50);

    /// Shape a payload like a successful `receive` result.
    fn delivered(payload: &[u8]) -> Option<Vec<u8>> {
        Some(payload.to_vec())
    }

    /// A pair carries traffic in both directions.
    #[test]
    fn pair_round_trip() {
        let (left, right) = InMemoryTransport::new_pair();

        left.send(b"hello").unwrap();
        assert_eq!(right.receive(WAIT).unwrap(), delivered(b"hello"));

        right.send(b"world").unwrap();
        assert_eq!(left.receive(WAIT).unwrap(), delivered(b"world"));
    }

    /// A 3-node group exposes a channel for every ordered pair and
    /// delivers `(i → j)` traffic at the `(j → i)` endpoint.
    #[test]
    fn group_3node_mesh_is_fully_connected() {
        let mesh = InMemoryTransport::new_group(3);
        assert_eq!(mesh.size(), 3);

        // `channel` panics on a missing slot, so merely resolving every
        // ordered pair proves the mesh is complete.
        for from in 0..3 {
            for to in (0..3).filter(|&to| to != from) {
                let _ = mesh.channel(from, to);
            }
        }

        // Same underlying queue pair, opposite end.
        mesh.channel(0, 1).send(b"01").unwrap();
        let got = mesh.channel(1, 0).receive(WAIT).unwrap();
        assert_eq!(got, delivered(b"01"));
    }

    /// Traffic addressed to one peer is not visible to any other peer.
    #[test]
    fn group_independent_pairs_do_not_cross_talk() {
        let mesh = InMemoryTransport::new_group(4);
        mesh.channel(0, 1).send(b"to-1").unwrap();
        mesh.channel(0, 2).send(b"to-2").unwrap();

        // What each peer sees on its link back to node 0.
        let seen_by =
            |peer: usize| mesh.channel(peer, 0).receive(WAIT).unwrap();
        let g10 = seen_by(1);
        let g20 = seen_by(2);
        let g30 = seen_by(3);

        assert_eq!(g10, delivered(b"to-1"));
        assert_eq!(g20, delivered(b"to-2"));
        assert_eq!(g30, None, "node 3 must not see node 1's traffic");
    }

    /// Crashing a node closes its links (including handles taken
    /// earlier) while unrelated links keep working.
    #[test]
    fn simulate_crash_closes_all_channels_for_node() {
        let mesh = InMemoryTransport::new_group(3);
        // Grab handles up front; the crash must be visible through them.
        let zero_to_one = mesh.channel(0, 1);
        let one_to_zero = mesh.channel(1, 0);

        mesh.simulate_crash(0);

        // Both directions of the crashed link now report an error.
        assert!(zero_to_one.send(b"after-crash").is_err());
        let outcome = one_to_zero.receive(Duration::from_millis(20));
        assert!(outcome.is_err(), "post-crash receive must surface error");

        // The group no longer hands out the crashed slots.
        assert!(mesh.try_channel(0, 1).is_none());
        assert!(mesh.try_channel(1, 0).is_none());

        // The 1 ↔ 2 link never involved node 0 and is unaffected.
        assert!(mesh.try_channel(1, 2).is_some());
        mesh.channel(1, 2).send(b"alive").unwrap();
        let got = mesh.channel(2, 1).receive(WAIT).unwrap();
        assert_eq!(got, delivered(b"alive"));
    }

    /// Crashing the same node twice behaves like crashing it once.
    #[test]
    fn simulate_crash_is_idempotent() {
        let mesh = InMemoryTransport::new_group(3);
        mesh.simulate_crash(2);
        mesh.simulate_crash(2);

        // Every channel touching node 2 is gone.
        assert!(!mesh.is_node_live(2));

        // The surviving 0 ↔ 1 link is present in both directions ...
        assert!(mesh.try_channel(0, 1).is_some());
        assert!(mesh.try_channel(1, 0).is_some());

        // ... and still carries traffic.
        mesh.channel(0, 1).send(b"alive").unwrap();
        let got = mesh.channel(1, 0).receive(WAIT).unwrap();
        assert_eq!(got, delivered(b"alive"));
    }

    /// After crash + reconnect the node is live again and fresh
    /// handles carry traffic.
    #[test]
    fn reconnect_after_crash_restores_traffic() {
        let mesh = InMemoryTransport::new_group(3);

        mesh.simulate_crash(0);
        assert!(!mesh.is_node_live(0));

        mesh.reconnect(0);
        assert!(mesh.is_node_live(0));

        mesh.channel(0, 1).send(b"reborn").unwrap();
        let got = mesh.channel(1, 0).receive(WAIT).unwrap();
        assert_eq!(got, delivered(b"reborn"));
    }

    /// An index beyond the group size is rejected.
    #[test]
    #[should_panic(expected = "out of range")]
    fn channel_out_of_range_panics() {
        let mesh = InMemoryTransport::new_group(2);
        let _ = mesh.channel(5, 0);
    }

    /// A node has no channel to itself.
    #[test]
    #[should_panic(expected = "no self-loop")]
    fn channel_self_loop_panics() {
        let mesh = InMemoryTransport::new_group(2);
        let _ = mesh.channel(0, 0);
    }

    /// A zero-node group cannot be created.
    #[test]
    #[should_panic(expected = "at least one node")]
    fn empty_group_panics() {
        let _ = InMemoryTransport::new_group(0);
    }

    /// The degenerate single-node group is valid and trivially live.
    #[test]
    fn one_node_group_has_no_channels() {
        let mesh = InMemoryTransport::new_group(1);
        assert_eq!(mesh.size(), 1);
        assert!(mesh.is_node_live(0));
    }

    /// A handle obtained from a group can be used after the group
    /// itself has been dropped.
    #[test]
    fn channel_handle_outlives_borrow_of_group() {
        let orphan: Arc<dyn Channel> = {
            let mesh = InMemoryTransport::new_group(2);
            mesh.channel(0, 1)
        };
        // The group is gone; only the handle's own reference keeps the
        // channel alive. This is a smoke test: querying the handle must
        // simply not panic, whatever state it reports.
        let _ = orphan.is_open();
    }
}