Skip to main content

noxu_rep/net/
inmem.rs

1//! First-class in-memory replication transport (Wave 11-D).
2//!
3//! noxu-rep ships four wire-level transports out of the box:
4//!
5//! | Transport | Module | Use case |
6//! |-----------|--------|----------|
7//! | TCP       | [`crate::net::TcpChannel`]      | Plain replication LAN/WAN |
8//! | TLS       | `crate::net::TlsTcpChannel`     | Encrypted WAN (rustls / native-tls) |
9//! | QUIC      | `crate::net::QuicChannel`       | Multiplexed UDP (feature `quic`) |
10//! | **In-memory** | [`InMemoryTransport`] (this module) | In-process clusters, embedded use cases, tests |
11//!
12//! The in-memory transport originated as a wire-level fixture for
13//! protocol unit tests ([`crate::net::LocalChannel`]).  Wave 11-D
14//! promotes that wiring into a first-class **production** transport
15//! so users can compose multi-node replication groups inside a single
16//! process — useful for embedded deployments, integration testing,
17//! and Stateright-driven property tests that need real
18//! `ReplicatedEnvironment` instances but no real network.
19//!
20//! # Topologies
21//!
22//! Two topologies are supported out of the box:
23//!
24//! * **Pair** — back-to-back endpoints, suitable for a 2-node
25//!   master/replica pair: [`InMemoryTransport::new_pair`].
26//! * **Group** — `N`-node fully-connected mesh suitable for any
27//!   election quorum: [`InMemoryTransport::new_group`].
28//!
29//! The mesh maintains exactly `N · (N - 1)` directional channels —
30//! one per ordered `(from, to)` pair — and routes each `send` to the
31//! corresponding peer's receive queue, mirroring the semantics of a
32//! real point-to-point socket cluster.
33//!
34//! # Crash injection
35//!
36//! Production cluster tests need to exercise crash recovery without
37//! tearing down the entire process.  [`InMemoryGroup::simulate_crash`]
38//! closes every channel that originates from or terminates at the
39//! crashed node, so subsequent `send` / `receive` calls on those
40//! channels return [`crate::error::RepError::ChannelClosed`] — exactly
41//! what a real socket disconnect would produce.
42//!
43//! Once a node has been crashed, [`InMemoryGroup::reconnect`] rewires
44//! a fresh set of channels into the same slot, simulating a node
45//! restart or a network partition heal.
46//!
47//! # Wire compatibility
48//!
49//! `InMemoryEndpoint` is a thin wrapper around [`LocalChannel`] and
50//! implements the same [`Channel`] trait as the TCP, TLS, and QUIC
51//! transports.  Higher layers
52//! ([`crate::stream::feeder::FeederRunner`],
53//! [`crate::stream::replica_stream::ReplicaStream`],
54//! [`crate::elections`]) consume `dyn Channel` so they work
55//! identically over any of the four transports without modification.
56//!
57//! # Usage
58//!
59//! ```no_run
60//! use noxu_rep::net::{Channel, InMemoryTransport};
61//! use std::time::Duration;
62//!
63//! // 1. Single back-to-back pair (e.g., master + 1 replica).
64//! let (a, b) = InMemoryTransport::new_pair();
65//! a.send(b"hello").unwrap();
66//! let msg = b.receive(Duration::from_millis(50)).unwrap();
67//! assert_eq!(msg, Some(b"hello".to_vec()));
68//!
69//! // 2. 3-node fully-connected mesh.
70//! let group = InMemoryTransport::new_group(3);
71//! group.channel(0, 1).send(b"ping").unwrap();
72//! let recv = group.channel(1, 0).receive(Duration::from_millis(50)).unwrap();
73//! assert_eq!(recv, Some(b"ping".to_vec()));
74//! ```
75
76use std::sync::Arc;
77use std::time::Duration;
78
79use noxu_sync::Mutex;
80
81use crate::error::Result;
82use crate::net::channel::{Channel, LocalChannel, LocalChannelPair};
83
84// ---------------------------------------------------------------------------
85// InMemoryEndpoint
86// ---------------------------------------------------------------------------
87
88/// One end of an in-memory replication channel.
89///
90/// Implements [`Channel`] identically to [`crate::net::TcpChannel`] and
91/// `crate::net::TlsTcpChannel`.  Internally backed by
92/// [`LocalChannel`] queues and a `noxu_sync::Mutex`.
93///
94/// Endpoints are constructed via [`InMemoryTransport::new_pair`] or
95/// [`InMemoryTransport::new_group`].  Direct construction is intentionally
96/// not exposed — pairing two endpoints requires the cross-connected
97/// queues set up by the transport factory.
98pub struct InMemoryEndpoint {
99    /// The underlying [`LocalChannel`].  Held in an `Arc` so the
100    /// owning `InMemoryGroup` can hand out cheap clones to higher
101    /// layers (`Arc<dyn Channel>`-style) without giving up ownership.
102    inner: Arc<LocalChannel>,
103}
104
105impl InMemoryEndpoint {
106    fn new(inner: LocalChannel) -> Self {
107        Self { inner: Arc::new(inner) }
108    }
109
110    /// Return a cheap shareable handle to this endpoint's underlying
111    /// channel.  Useful when the protocol layer wants
112    /// `Arc<dyn Channel>` (e.g., when spawning a reader thread that
113    /// outlives the borrow of the group).
114    pub fn channel_handle(&self) -> Arc<dyn Channel> {
115        Arc::clone(&self.inner) as Arc<dyn Channel>
116    }
117}
118
119impl Channel for InMemoryEndpoint {
120    fn send(&self, data: &[u8]) -> Result<()> {
121        self.inner.send(data)
122    }
123
124    fn receive(&self, timeout: Duration) -> Result<Option<Vec<u8>>> {
125        self.inner.receive(timeout)
126    }
127
128    fn close(&self) -> Result<()> {
129        self.inner.close()
130    }
131
132    fn is_open(&self) -> bool {
133        self.inner.is_open()
134    }
135}
136
137// ---------------------------------------------------------------------------
138// InMemoryTransport (factory)
139// ---------------------------------------------------------------------------
140
/// Entry point for building in-memory replication transports.
///
/// This unit struct carries no state; it exists only to namespace the
/// constructors [`InMemoryTransport::new_pair`] and
/// [`InMemoryTransport::new_group`], which produce [`InMemoryEndpoint`]
/// and [`InMemoryGroup`] values respectively.  The
/// [module-level docs](crate::net::inmem) describe the supported
/// topologies and the motivation for this transport.
pub struct InMemoryTransport;
148
149impl InMemoryTransport {
150    /// Create a single bidirectional pair of cross-connected
151    /// in-memory endpoints.
152    ///
153    /// Sends on `a` arrive at `b`'s receive queue and vice versa.
154    /// Equivalent to [`LocalChannelPair::new`] but returned as
155    /// production-named [`InMemoryEndpoint`] handles.
156    pub fn new_pair() -> (InMemoryEndpoint, InMemoryEndpoint) {
157        let pair = LocalChannelPair::new();
158        (
159            InMemoryEndpoint::new(pair.channel_a),
160            InMemoryEndpoint::new(pair.channel_b),
161        )
162    }
163
164    /// Create an `n`-node fully-connected in-memory group.
165    ///
166    /// The returned [`InMemoryGroup`] owns `n · (n - 1)` directional
167    /// channels arranged so that `group.channel(i, j).send(msg)` is
168    /// observed by `group.channel(j, i).receive(...)`.
169    ///
170    /// # Panics
171    ///
172    /// Panics if `n == 0`.  A 1-node "group" is supported (degenerate)
173    /// but a zero-node group is meaningless and almost certainly a
174    /// caller bug.
175    pub fn new_group(n: usize) -> InMemoryGroup {
176        InMemoryGroup::new(n)
177    }
178}
179
180// ---------------------------------------------------------------------------
181// InMemoryGroup
182// ---------------------------------------------------------------------------
183
184/// An `n`-node fully-connected in-memory replication mesh.
185///
186/// `InMemoryGroup` owns one [`InMemoryEndpoint`] per ordered
187/// `(from, to)` peer pair (with `from != to`).  The endpoint at
188/// `(from, to)` is `from`'s view of its socket to `to`; sending on
189/// that endpoint is observed by the endpoint at `(to, from)`.
190///
191/// Higher layers typically consume the group by handing each node
192/// the row of channels `[group.channel(my_id, peer)] for peer in 0..n`.
193///
194/// # Crash and recovery
195///
196/// [`InMemoryGroup::simulate_crash`] closes every channel touching a
197/// node, modelling a hard crash or partition.  After a crash,
198/// [`InMemoryGroup::reconnect`] rewires that node's row of channels
199/// (paired against the other live nodes) to model a node restart or
200/// healed partition.  A crashed node may be reconnected at most once
201/// per crash; the implementation tolerates repeated calls.
202pub struct InMemoryGroup {
203    n: usize,
204    /// `endpoints[i][j]` (with `i != j`) is node `i`'s endpoint to
205    /// node `j`.  Diagonal slots (`i == j`) are kept as `None` so the
206    /// caller can index by `(from, to)` without arithmetic.
207    ///
208    /// Wrapped in `Mutex<Option<_>>` so [`Self::reconnect`] can replace
209    /// individual endpoints under the lock without invalidating any
210    /// outstanding `Arc<dyn Channel>` clones held elsewhere.
211    endpoints: Vec<Vec<Mutex<Option<InMemoryEndpoint>>>>,
212}
213
214impl InMemoryGroup {
215    fn new(n: usize) -> Self {
216        assert!(n > 0, "InMemoryGroup requires at least one node");
217
218        // Build n×n matrix; diagonal stays None.
219        let endpoints: Vec<Vec<Mutex<Option<InMemoryEndpoint>>>> = (0..n)
220            .map(|_| (0..n).map(|_| Mutex::new(None)).collect())
221            .collect();
222
223        // Cross-connect every (i, j) with i < j: one LocalChannelPair
224        // gives us both `i → j` (channel_a) and `j → i` (channel_b).
225        #[allow(clippy::needless_range_loop)]
226        for i in 0..n {
227            for j in (i + 1)..n {
228                let pair = LocalChannelPair::new();
229                *endpoints[i][j].lock() =
230                    Some(InMemoryEndpoint::new(pair.channel_a));
231                *endpoints[j][i].lock() =
232                    Some(InMemoryEndpoint::new(pair.channel_b));
233            }
234        }
235
236        Self { n, endpoints }
237    }
238
239    /// Number of nodes in the mesh.
240    pub fn size(&self) -> usize {
241        self.n
242    }
243
244    /// Return a cheap [`Arc<dyn Channel>`] handle to the directed
245    /// channel from `from` to `to`.
246    ///
247    /// # Panics
248    ///
249    /// Panics if `from` or `to` is out of range, if `from == to`, or
250    /// if the channel has been removed by [`Self::simulate_crash`]
251    /// without a subsequent [`Self::reconnect`].
252    pub fn channel(&self, from: usize, to: usize) -> Arc<dyn Channel> {
253        assert!(from < self.n, "from index {from} out of range (n={})", self.n);
254        assert!(to < self.n, "to index {to} out of range (n={})", self.n);
255        assert!(from != to, "in-memory mesh has no self-loop channel");
256        let slot = self.endpoints[from][to].lock();
257        slot.as_ref()
258            .unwrap_or_else(|| {
259                panic!(
260                    "in-memory channel {from}→{to} is closed; \
261                     call reconnect({from}) before reuse"
262                )
263            })
264            .channel_handle()
265    }
266
267    /// Try to acquire the directed channel from `from` to `to`,
268    /// returning `None` if the channel has been crashed.
269    ///
270    /// # Panics
271    ///
272    /// Panics on out-of-range indices or `from == to`.
273    pub fn try_channel(
274        &self,
275        from: usize,
276        to: usize,
277    ) -> Option<Arc<dyn Channel>> {
278        assert!(from < self.n, "from index {from} out of range (n={})", self.n);
279        assert!(to < self.n, "to index {to} out of range (n={})", self.n);
280        assert!(from != to, "in-memory mesh has no self-loop channel");
281        let slot = self.endpoints[from][to].lock();
282        slot.as_ref().map(|e| e.channel_handle())
283    }
284
285    /// Simulate a hard crash of `node`: every channel originating
286    /// from or terminating at `node` is `close`d and dropped from the
287    /// mesh.  Subsequent `send` / `receive` on any handle that was
288    /// previously cloned out of those channels returns
289    /// [`crate::error::RepError::ChannelClosed`], matching a real
290    /// socket disconnect.
291    ///
292    /// Idempotent: calling `simulate_crash` on an already-crashed
293    /// node is a no-op.
294    ///
295    /// # Panics
296    ///
297    /// Panics if `node` is out of range.
298    pub fn simulate_crash(&self, node: usize) {
299        assert!(node < self.n, "node index {node} out of range (n={})", self.n);
300        for peer in 0..self.n {
301            if peer == node {
302                continue;
303            }
304            // Close and drop both directions independently so a half-
305            // crashed mesh (one direction reconnected, the other not)
306            // is still expressible by the caller.
307            let mut out = self.endpoints[node][peer].lock();
308            if let Some(ep) = out.take() {
309                let _ = ep.inner.close();
310            }
311            drop(out);
312
313            let mut inn = self.endpoints[peer][node].lock();
314            if let Some(ep) = inn.take() {
315                let _ = ep.inner.close();
316            }
317        }
318    }
319
320    /// Rewire `node`'s row of channels against every peer that is
321    /// still live (i.e., still has a channel slot to `node`).  Models
322    /// a node restart or a healed partition.
323    ///
324    /// Channels whose remote end is itself currently crashed are left
325    /// disconnected; the caller should reconnect them in a separate
326    /// pass once the remote node has come back.
327    ///
328    /// # Panics
329    ///
330    /// Panics if `node` is out of range.
331    pub fn reconnect(&self, node: usize) {
332        assert!(node < self.n, "node index {node} out of range (n={})", self.n);
333        for peer in 0..self.n {
334            if peer == node {
335                continue;
336            }
337            // Lock both ordered pairs, smallest index first to keep a
338            // global lock order (deadlock-free).
339            let (lo, hi) =
340                if node < peer { (node, peer) } else { (peer, node) };
341            let mut a = self.endpoints[lo][hi].lock();
342            let mut b = self.endpoints[hi][lo].lock();
343
344            // Only reconnect if both directions are currently empty.
345            // If one side is live and the other isn't, the caller has
346            // a half-open mesh and we leave it as-is.
347            if a.is_some() || b.is_some() {
348                continue;
349            }
350            let pair = LocalChannelPair::new();
351            *a = Some(InMemoryEndpoint::new(pair.channel_a));
352            *b = Some(InMemoryEndpoint::new(pair.channel_b));
353        }
354    }
355
356    /// Return `true` iff every directed channel touching `node` is
357    /// currently open.
358    pub fn is_node_live(&self, node: usize) -> bool {
359        if node >= self.n {
360            return false;
361        }
362        for peer in 0..self.n {
363            if peer == node {
364                continue;
365            }
366            if self.endpoints[node][peer].lock().is_none() {
367                return false;
368            }
369            if self.endpoints[peer][node].lock().is_none() {
370                return false;
371            }
372        }
373        true
374    }
375}
376
377// ---------------------------------------------------------------------------
378// Tests
379// ---------------------------------------------------------------------------
380
#[cfg(test)]
mod tests {
    use super::*;

    /// Receive timeout used wherever a message is expected (or where
    /// an empty queue must be observed).
    const WAIT: Duration = Duration::from_millis(50);

    /// Send `payload` over `group.channel(from, to)` and assert that
    /// exactly that payload is then received on `group.channel(to, from)`.
    fn assert_delivers(
        group: &InMemoryGroup,
        from: usize,
        to: usize,
        payload: &[u8],
    ) {
        group.channel(from, to).send(payload).unwrap();
        let received = group.channel(to, from).receive(WAIT).unwrap();
        assert_eq!(received, Some(payload.to_vec()));
    }

    #[test]
    fn pair_round_trip() {
        let (a, b) = InMemoryTransport::new_pair();

        // a → b
        a.send(b"hello").unwrap();
        assert_eq!(b.receive(WAIT).unwrap(), Some(b"hello".to_vec()));

        // b → a
        b.send(b"world").unwrap();
        assert_eq!(a.receive(WAIT).unwrap(), Some(b"world".to_vec()));
    }

    #[test]
    fn group_3node_mesh_is_fully_connected() {
        let group = InMemoryTransport::new_group(3);
        assert_eq!(group.size(), 3);

        // `channel` panics on an empty slot, so merely fetching every
        // off-diagonal pair proves the mesh is fully populated.
        for from in 0..3 {
            for to in (0..3).filter(|&to| to != from) {
                let _ = group.channel(from, to);
            }
        }

        // What goes in at (0 → 1) comes out at (1 → 0): the two are
        // opposite ends of the same link.
        assert_delivers(&group, 0, 1, b"01");
    }

    #[test]
    fn group_independent_pairs_do_not_cross_talk() {
        let group = InMemoryTransport::new_group(4);
        group.channel(0, 1).send(b"to-1").unwrap();
        group.channel(0, 2).send(b"to-2").unwrap();

        let at_node_1 = group.channel(1, 0).receive(WAIT).unwrap();
        let at_node_2 = group.channel(2, 0).receive(WAIT).unwrap();
        let at_node_3 = group.channel(3, 0).receive(WAIT).unwrap();

        assert_eq!(at_node_1, Some(b"to-1".to_vec()));
        assert_eq!(at_node_2, Some(b"to-2".to_vec()));
        assert_eq!(at_node_3, None, "node 3 must not see node 1's traffic");
    }

    #[test]
    fn simulate_crash_closes_all_channels_for_node() {
        let group = InMemoryTransport::new_group(3);

        // Handles obtained before the crash must notice it afterwards.
        let zero_to_one = group.channel(0, 1);
        let one_to_zero = group.channel(1, 0);

        group.simulate_crash(0);

        // Both pre-crash handles now fail.
        assert!(zero_to_one.send(b"after-crash").is_err());
        let outcome = one_to_zero.receive(Duration::from_millis(20));
        assert!(outcome.is_err(), "post-crash receive must surface error");

        // The crashed link is gone from the mesh in both directions.
        assert!(group.try_channel(0, 1).is_none());
        assert!(group.try_channel(1, 0).is_none());

        // A link that does not involve node 0 is unaffected.
        assert!(group.try_channel(1, 2).is_some());
        assert_delivers(&group, 1, 2, b"alive");
    }

    #[test]
    fn simulate_crash_is_idempotent() {
        let group = InMemoryTransport::new_group(3);
        group.simulate_crash(2);
        group.simulate_crash(2);

        // After two crashes node 2 is simply down: all its slots are
        // empty.
        assert!(!group.is_node_live(2));

        // The 0 ↔ 1 link never involved node 2 and is still present...
        assert!(group.try_channel(0, 1).is_some());
        assert!(group.try_channel(1, 0).is_some());

        // ...and still carries traffic.
        assert_delivers(&group, 0, 1, b"alive");
    }

    #[test]
    fn reconnect_after_crash_restores_traffic() {
        let group = InMemoryTransport::new_group(3);

        group.simulate_crash(0);
        assert!(!group.is_node_live(0));

        group.reconnect(0);
        assert!(group.is_node_live(0));

        // Freshly fetched handles carry traffic end-to-end again.
        assert_delivers(&group, 0, 1, b"reborn");
    }

    #[test]
    #[should_panic(expected = "out of range")]
    fn channel_out_of_range_panics() {
        let group = InMemoryTransport::new_group(2);
        let _ = group.channel(5, 0);
    }

    #[test]
    #[should_panic(expected = "no self-loop")]
    fn channel_self_loop_panics() {
        let group = InMemoryTransport::new_group(2);
        let _ = group.channel(0, 0);
    }

    #[test]
    #[should_panic(expected = "at least one node")]
    fn empty_group_panics() {
        let _ = InMemoryTransport::new_group(0);
    }

    #[test]
    fn one_node_group_has_no_channels() {
        let group = InMemoryTransport::new_group(1);
        assert_eq!(group.size(), 1);
        // With no peers there is no slot that could be empty.
        assert!(group.is_node_live(0));
    }

    #[test]
    fn channel_handle_outlives_borrow_of_group() {
        // The group is dropped at the end of the inner scope; only the
        // cloned-out handle escapes.
        let handle: Arc<dyn Channel> = {
            let group = InMemoryTransport::new_group(2);
            group.channel(0, 1)
        };
        // The handle is an owned `Arc`, so it stays valid without the
        // group.  This is a smoke test only: it checks that querying
        // `is_open` on the surviving handle does not panic, and makes
        // no claim about the value returned.
        let _ = handle.is_open();
    }
}
536}