Skip to main content

noxu_rep/
subscription.rs

//! Replication subscription for receiving replicated entries from a feeder.
//!
//! A `Subscription` connects to a feeder node and receives a stream of
//! replicated log entries starting from a given VLSN. It is intended for
//! subscribers that want to consume the replication stream without being
//! full replica members of the group.
8
9use std::net::TcpStream;
10use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
11
12use noxu_sync::Mutex;
13
14use crate::error::{RepError, Result};
15
/// Configuration for a replication subscription.
///
/// Specifies the subscriber identity, the replication group to subscribe
/// to, the feeder endpoint (host and port) to connect to, and the VLSN at
/// which streaming should begin.
#[derive(Debug, Clone)]
pub struct SubscriptionConfig {
    /// Name of the subscriber node.
    pub subscriber_name: String,
    /// Name of the replication group.
    pub group_name: String,
    /// Hostname (or IP address literal) of the feeder to connect to.
    pub feeder_host: String,
    /// TCP port of the feeder to connect to.
    pub feeder_port: u16,
    /// VLSN to start streaming from; also the initial value reported as the
    /// subscription's current VLSN.
    pub start_vlsn: u64,
}
34
/// Callback for receiving replicated entries.
///
/// Implementations process each replicated entry as it arrives, handle
/// errors, and are notified when the subscriber catches up to the master's
/// current position. Implementors must be `Send + Sync`, and every method
/// takes `&self`, so any mutable state an implementation keeps needs
/// interior mutability.
pub trait SubscriptionCallback: Send + Sync {
    /// Called when a new replicated entry is received.
    ///
    /// # Arguments
    /// * `vlsn` - The VLSN of this entry.
    /// * `entry_type` - The log entry type identifier.
    /// * `data` - The raw entry payload.
    fn on_entry(&self, vlsn: u64, entry_type: u8, data: &[u8]);

    /// Called when an error occurs during subscription processing.
    ///
    /// # Arguments
    /// * `error` - The error that was encountered.
    fn on_error(&self, error: &RepError);

    /// Called when the subscription has caught up with the master.
    ///
    /// # Arguments
    /// * `vlsn` - NOTE(review): presumably the VLSN at which the subscriber
    ///   caught up; confirm against the caller.
    fn on_caught_up(&self, vlsn: u64);
}
55
/// The current state of a subscription.
///
/// A subscription begins in `Idle`. `Subscription::start` moves it through
/// `Connecting` to either `Active` or `Error`; `Shutdown` is terminal.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SubscriptionState {
    /// Initial state, not yet started.
    Idle,
    /// Connecting to the feeder.
    Connecting,
    /// Actively receiving entries.
    Active,
    /// Caught up with the master's current VLSN (entered from `Active` via
    /// `Subscription::mark_caught_up`).
    CaughtUp,
    /// An error has occurred (a failed connect, or an explicit
    /// `Subscription::mark_error`).
    Error,
    /// The subscription has been shut down.
    Shutdown,
}
72
/// A subscription to a replication stream.
///
/// Manages the lifecycle of subscribing to a feeder's replication stream:
/// connecting, receiving entries, tracking progress, and shutting down.
/// All mutable state sits behind mutexes or atomics, so every method takes
/// `&self`.
pub struct Subscription {
    /// Configuration for this subscription.
    config: SubscriptionConfig,
    /// Current subscription state.
    state: Mutex<SubscriptionState>,
    /// The most recently processed VLSN (initialised to
    /// `config.start_vlsn`).
    current_vlsn: Mutex<u64>,
    /// Total number of entries received.
    entries_received: AtomicU64,
    /// Whether shutdown has been requested.
    shutdown: AtomicBool,
    /// The live TCP connection to the feeder node.
    ///
    /// `None` until `start()` succeeds in opening a socket to the feeder,
    /// at which point it is set to `Some`. `shutdown()` takes the stream
    /// back out and closes it, leaving `None`.
    connection: Mutex<Option<TcpStream>>,
}
96
97impl Subscription {
98    /// Create a new subscription with the given configuration.
99    pub fn new(config: SubscriptionConfig) -> Self {
100        let start_vlsn = config.start_vlsn;
101        Self {
102            config,
103            state: Mutex::new(SubscriptionState::Idle),
104            current_vlsn: Mutex::new(start_vlsn),
105            entries_received: AtomicU64::new(0),
106            shutdown: AtomicBool::new(false),
107            connection: Mutex::new(None),
108        }
109    }
110
111    /// Get the current subscription state.
112    pub fn get_state(&self) -> SubscriptionState {
113        *self.state.lock()
114    }
115
116    /// Get the most recently processed VLSN.
117    pub fn get_current_vlsn(&self) -> u64 {
118        *self.current_vlsn.lock()
119    }
120
121    /// Get the total number of entries received.
122    pub fn get_entries_received(&self) -> u64 {
123        self.entries_received.load(Ordering::Relaxed)
124    }
125
126    /// Get the subscription configuration.
127    pub fn get_config(&self) -> &SubscriptionConfig {
128        &self.config
129    }
130
131    /// Start the subscription by connecting to the feeder.
132    ///
133    /// Which calls
134    /// `SubscriptionThread.start()`, which in turn invokes
135    /// `RepUtils.openSocket(feederAddr)` to establish a TCP connection to the
136    /// feeder node.
137    ///
138    /// Transitions: `Idle` → `Connecting` → `Active` on success, or
139    /// `Idle` → `Connecting` → `Error` if the connection attempt fails.
140    pub fn start(&self) -> Result<()> {
141        let mut state = self.state.lock();
142        match *state {
143            SubscriptionState::Idle => {
144                *state = SubscriptionState::Connecting;
145
146                // Resolve the feeder address and open a TCP connection.
147                // equivalent: RepUtils.openSocket(InetSocketAddress(host, port))
148                let addr_str = format!(
149                    "{}:{}",
150                    self.config.feeder_host, self.config.feeder_port
151                );
152                match TcpStream::connect(&addr_str) {
153                    Ok(stream) => {
154                        *self.connection.lock() = Some(stream);
155                        *state = SubscriptionState::Active;
156                        Ok(())
157                    }
158                    Err(e) => {
159                        *state = SubscriptionState::Error;
160                        Err(RepError::SubscriptionError(format!(
161                            "failed to connect to feeder at {}: {}",
162                            addr_str, e
163                        )))
164                    }
165                }
166            }
167            SubscriptionState::Shutdown => Err(RepError::SubscriptionError(
168                "cannot start a shutdown subscription".into(),
169            )),
170            other => Err(RepError::SubscriptionError(format!(
171                "cannot start from state {:?}",
172                other
173            ))),
174        }
175    }
176
177    /// Get the live TCP connection to the feeder, if connected.
178    ///
179    /// Returns a cloned handle to the underlying `TcpStream`. Callers use
180    /// this to send/receive replication protocol messages.
181    pub fn get_connection(&self) -> Option<TcpStream> {
182        self.connection.lock().as_ref().and_then(|s| s.try_clone().ok())
183    }
184
185    /// Process an incoming replicated entry.
186    ///
187    /// Updates the current VLSN and entry count. In the full implementation,
188    /// this would also invoke the subscription callback.
189    pub fn process_entry(&self, vlsn: u64, _entry_type: u8, _data: Vec<u8>) {
190        if self.shutdown.load(Ordering::SeqCst) {
191            return;
192        }
193        *self.current_vlsn.lock() = vlsn;
194        self.entries_received.fetch_add(1, Ordering::Relaxed);
195    }
196
197    /// Mark the subscription as caught up with the master.
198    pub fn mark_caught_up(&self) {
199        let mut state = self.state.lock();
200        if *state == SubscriptionState::Active {
201            *state = SubscriptionState::CaughtUp;
202        }
203    }
204
205    /// Transition the subscription to the error state.
206    pub fn mark_error(&self) {
207        let mut state = self.state.lock();
208        if *state != SubscriptionState::Shutdown {
209            *state = SubscriptionState::Error;
210        }
211    }
212
213    /// Shutdown the subscription.
214    ///
215    /// Closes the TCP connection to the feeder (if open) and marks the
216    /// subscription as shut down.
217    /// which stops the `SubscriptionThread` and closes the feeder socket.
218    pub fn shutdown(&self) {
219        self.shutdown.store(true, Ordering::SeqCst);
220        *self.state.lock() = SubscriptionState::Shutdown;
221        // Close the TCP connection if one was established.
222        if let Some(stream) = self.connection.lock().take() {
223            let _ = stream.shutdown(std::net::Shutdown::Both);
224        }
225    }
226
227    /// Whether shutdown has been requested.
228    pub fn is_shutdown(&self) -> bool {
229        self.shutdown.load(Ordering::SeqCst)
230    }
231}
232
// Unit tests for `Subscription`. Tests that need `start()` to succeed bind a
// real loopback listener on an ephemeral port; the rest use a config that is
// never expected to connect.
#[cfg(test)]
mod tests {
    use super::*;
    use std::net::TcpListener;

    /// Create a config that points at a non-listening address (port 1).
    /// Use for tests that never call `start()`, or that expect `start()` to
    /// fail. Assumes nothing is listening on 127.0.0.1:1 on the test machine.
    fn test_config_no_connect() -> SubscriptionConfig {
        SubscriptionConfig {
            subscriber_name: "sub1".into(),
            group_name: "group1".into(),
            feeder_host: "127.0.0.1".into(),
            feeder_port: 1, // nothing listening here
            start_vlsn: 0,
        }
    }

    /// Bind a listener on an ephemeral port and return a config + the listener.
    /// Tests that call `start()` and expect success must use this, and must
    /// keep the returned listener alive for as long as they need to connect.
    fn test_config_with_listener() -> (SubscriptionConfig, TcpListener) {
        let listener = TcpListener::bind("127.0.0.1:0").unwrap();
        let port = listener.local_addr().unwrap().port();
        let config = SubscriptionConfig {
            subscriber_name: "sub1".into(),
            group_name: "group1".into(),
            feeder_host: "127.0.0.1".into(),
            feeder_port: port,
            start_vlsn: 0,
        };
        (config, listener)
    }

    // A freshly constructed subscription is idle, at the start VLSN, with no
    // entries counted and no shutdown requested.
    #[test]
    fn test_initial_state() {
        let sub = Subscription::new(test_config_no_connect());
        assert_eq!(sub.get_state(), SubscriptionState::Idle);
        assert_eq!(sub.get_current_vlsn(), 0);
        assert_eq!(sub.get_entries_received(), 0);
        assert!(!sub.is_shutdown());
    }

    // A successful start() ends in Active with a stored connection.
    #[test]
    fn test_start() {
        let (config, _listener) = test_config_with_listener();
        let sub = Subscription::new(config);
        sub.start().unwrap();
        assert_eq!(sub.get_state(), SubscriptionState::Active);
        // A connection must have been established.
        assert!(sub.get_connection().is_some());
    }

    #[test]
    fn test_start_fails_when_no_listener() {
        // Port 1 is not listening — start() must transition to Error and
        // return Err.
        let sub = Subscription::new(test_config_no_connect());
        let result = sub.start();
        assert!(result.is_err());
        assert_eq!(sub.get_state(), SubscriptionState::Error);
    }

    // start() is only legal from Idle; a second call must be rejected.
    #[test]
    fn test_start_from_active_fails() {
        let (config, _listener) = test_config_with_listener();
        let sub = Subscription::new(config);
        sub.start().unwrap();
        let result = sub.start();
        assert!(result.is_err());
    }

    // A shut-down subscription cannot be started; no connect is attempted
    // because the state check fails first.
    #[test]
    fn test_start_after_shutdown_fails() {
        let sub = Subscription::new(test_config_no_connect());
        sub.shutdown();
        let result = sub.start();
        assert!(result.is_err());
    }

    // Each processed entry updates the current VLSN and the entry counter.
    #[test]
    fn test_process_entries() {
        let (config, _listener) = test_config_with_listener();
        let sub = Subscription::new(config);
        sub.start().unwrap();

        sub.process_entry(1, 1, vec![0x01]);
        sub.process_entry(2, 1, vec![0x02]);
        sub.process_entry(3, 2, vec![0x03]);

        assert_eq!(sub.get_current_vlsn(), 3);
        assert_eq!(sub.get_entries_received(), 3);
    }

    #[test]
    fn test_process_entry_after_shutdown_ignored() {
        let (config, _listener) = test_config_with_listener();
        let sub = Subscription::new(config);
        sub.start().unwrap();
        sub.process_entry(1, 1, vec![0x01]);

        sub.shutdown();
        sub.process_entry(2, 1, vec![0x02]);

        // VLSN should not advance after shutdown.
        assert_eq!(sub.get_current_vlsn(), 1);
        // Only entry 1 was counted; the entry sent after shutdown is dropped
        // before the counter is touched.
        assert_eq!(sub.get_entries_received(), 1);
    }

    // mark_caught_up() moves Active -> CaughtUp.
    #[test]
    fn test_mark_caught_up() {
        let (config, _listener) = test_config_with_listener();
        let sub = Subscription::new(config);
        sub.start().unwrap();
        assert_eq!(sub.get_state(), SubscriptionState::Active);

        sub.mark_caught_up();
        assert_eq!(sub.get_state(), SubscriptionState::CaughtUp);
    }

    #[test]
    fn test_mark_caught_up_from_idle_no_change() {
        let sub = Subscription::new(test_config_no_connect());
        sub.mark_caught_up();
        // Should still be Idle since mark_caught_up only works from Active.
        assert_eq!(sub.get_state(), SubscriptionState::Idle);
    }

    // mark_error() moves a running subscription to Error.
    #[test]
    fn test_mark_error() {
        let (config, _listener) = test_config_with_listener();
        let sub = Subscription::new(config);
        sub.start().unwrap();
        sub.mark_error();
        assert_eq!(sub.get_state(), SubscriptionState::Error);
    }

    #[test]
    fn test_mark_error_after_shutdown_no_change() {
        let sub = Subscription::new(test_config_no_connect());
        sub.shutdown();
        sub.mark_error();
        // Shutdown is terminal, should not change to Error.
        assert_eq!(sub.get_state(), SubscriptionState::Shutdown);
    }

    // shutdown() sets the flag, the state, and releases the connection.
    #[test]
    fn test_shutdown() {
        let (config, _listener) = test_config_with_listener();
        let sub = Subscription::new(config);
        sub.start().unwrap();
        assert!(!sub.is_shutdown());

        sub.shutdown();
        assert!(sub.is_shutdown());
        assert_eq!(sub.get_state(), SubscriptionState::Shutdown);
        // Connection must have been closed.
        assert!(sub.get_connection().is_none());
    }

    // get_config() exposes the configuration passed to new() unchanged.
    #[test]
    fn test_config_accessor() {
        let config = test_config_no_connect();
        let sub = Subscription::new(config);
        assert_eq!(sub.get_config().subscriber_name, "sub1");
        assert_eq!(sub.get_config().group_name, "group1");
        assert_eq!(sub.get_config().feeder_host, "127.0.0.1");
        assert_eq!(sub.get_config().feeder_port, 1);
    }

    // The initial current VLSN is taken from config.start_vlsn.
    #[test]
    fn test_start_vlsn_nonzero() {
        let mut config = test_config_no_connect();
        config.start_vlsn = 42;
        let sub = Subscription::new(config);
        assert_eq!(sub.get_current_vlsn(), 42);
    }

    // End-to-end walk through the happy path: start, receive, catch up,
    // shut down.
    #[test]
    fn test_full_lifecycle() {
        let (config, _listener) = test_config_with_listener();
        let sub = Subscription::new(config);

        // Idle -> Active (via real TCP connect)
        assert_eq!(sub.get_state(), SubscriptionState::Idle);
        sub.start().unwrap();
        assert_eq!(sub.get_state(), SubscriptionState::Active);
        assert!(sub.get_connection().is_some());

        // Process entries
        for i in 1..=10 {
            sub.process_entry(i, 1, vec![i as u8]);
        }
        assert_eq!(sub.get_current_vlsn(), 10);
        assert_eq!(sub.get_entries_received(), 10);

        // Caught up
        sub.mark_caught_up();
        assert_eq!(sub.get_state(), SubscriptionState::CaughtUp);

        // Shutdown — also closes the TCP connection
        sub.shutdown();
        assert_eq!(sub.get_state(), SubscriptionState::Shutdown);
        assert!(sub.is_shutdown());
        assert!(sub.get_connection().is_none());
    }
}