Skip to main content

noxu_rep/
master_transfer.rs

//! Master transfer support.
//!
//! Allows transferring master status from the current master to a
//! designated target node.
5
6use noxu_sync::Mutex;
7use std::time::{Duration, Instant};
8
9use crate::error::{RepError, Result};
10
/// Configuration for transferring master status to another node.
///
/// Create one with [`MasterTransferConfig::new`] and optionally chain
/// [`MasterTransferConfig::with_force`] to enable a forced transfer.
#[derive(Debug, Clone)]
pub struct MasterTransferConfig {
    /// Name of the target node to become master.
    pub target_node: String,
    /// Maximum time to wait for the transfer to complete.
    pub timeout: Duration,
    /// Whether to force the transfer even if the target is behind.
    pub force: bool,
}

impl MasterTransferConfig {
    /// Creates a new master transfer configuration.
    ///
    /// `force` is initialised to `false`; use [`Self::with_force`] to change it.
    pub fn new(target_node: String, timeout: Duration) -> Self {
        Self { target_node, timeout, force: false }
    }

    /// Sets whether to force the transfer and returns the updated configuration.
    ///
    /// This consumes `self` so it can be chained builder-style. The returned
    /// value must be used: calling this and discarding the result changes
    /// nothing.
    #[must_use]
    pub fn with_force(mut self, force: bool) -> Self {
        self.force = force;
        self
    }
}
34
/// Lifecycle state of a master transfer.
///
/// The type is `Copy`, so it can be read out of a lock and compared freely.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TransferState {
    /// Transfer has not been started.
    NotStarted,
    /// Transfer is in progress.
    InProgress,
    /// Transfer completed successfully.
    Completed,
    /// Transfer failed.
    Failed,
    /// Transfer timed out.
    TimedOut,
}
49
50/// Manages a master transfer operation.
51pub struct MasterTransfer {
52    config: MasterTransferConfig,
53    state: Mutex<TransferState>,
54    start_time: Mutex<Option<Instant>>,
55}
56
57impl MasterTransfer {
58    /// Creates a new master transfer.
59    pub fn new(config: MasterTransferConfig) -> Self {
60        Self {
61            config,
62            state: Mutex::new(TransferState::NotStarted),
63            start_time: Mutex::new(None),
64        }
65    }
66
67    /// Get the current transfer state.
68    pub fn get_state(&self) -> TransferState {
69        *self.state.lock()
70    }
71
72    /// Get the target node name.
73    pub fn get_target(&self) -> &str {
74        &self.config.target_node
75    }
76
77    /// Get the transfer configuration.
78    pub fn get_config(&self) -> &MasterTransferConfig {
79        &self.config
80    }
81
82    /// Start the transfer.
83    pub fn start(&self) -> Result<()> {
84        let mut state = self.state.lock();
85        if *state != TransferState::NotStarted {
86            return Err(RepError::StateError(
87                "Transfer already started".to_string(),
88            ));
89        }
90        *state = TransferState::InProgress;
91        *self.start_time.lock() = Some(Instant::now());
92        Ok(())
93    }
94
95    /// Mark the transfer as completed.
96    pub fn complete(&self) -> Result<()> {
97        let mut state = self.state.lock();
98        if *state != TransferState::InProgress {
99            return Err(RepError::StateError(
100                "Transfer not in progress".to_string(),
101            ));
102        }
103        *state = TransferState::Completed;
104        Ok(())
105    }
106
107    /// Mark the transfer as failed.
108    pub fn fail(&self, _reason: &str) -> Result<()> {
109        let mut state = self.state.lock();
110        if *state != TransferState::InProgress {
111            return Err(RepError::StateError(
112                "Transfer not in progress".to_string(),
113            ));
114        }
115        *state = TransferState::Failed;
116        Ok(())
117    }
118
119    /// Check if the transfer has timed out.
120    pub fn is_timed_out(&self) -> bool {
121        if let Some(start) = *self.start_time.lock() {
122            start.elapsed() > self.config.timeout
123        } else {
124            false
125        }
126    }
127
128    /// Get the elapsed time since the transfer started.
129    pub fn elapsed(&self) -> Option<Duration> {
130        self.start_time.lock().map(|t| t.elapsed())
131    }
132}
133
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a configuration targeting `node2` with a 30-second timeout.
    fn test_config() -> MasterTransferConfig {
        MasterTransferConfig::new("node2".to_string(), Duration::from_secs(30))
    }

    #[test]
    fn test_config_builder() {
        let config = MasterTransferConfig::new(
            "node2".to_string(),
            Duration::from_secs(30),
        )
        .with_force(true);
        assert_eq!(config.target_node, "node2");
        assert_eq!(config.timeout, Duration::from_secs(30));
        assert!(config.force);
    }

    #[test]
    fn test_config_force_defaults_to_false() {
        assert!(!test_config().force);
    }

    #[test]
    fn test_initial_state() {
        let transfer = MasterTransfer::new(test_config());
        assert_eq!(transfer.get_state(), TransferState::NotStarted);
        assert_eq!(transfer.get_target(), "node2");
    }

    #[test]
    fn test_get_config() {
        let transfer = MasterTransfer::new(test_config());
        assert_eq!(transfer.get_config().target_node, "node2");
        assert_eq!(transfer.get_config().timeout, Duration::from_secs(30));
    }

    #[test]
    fn test_start() {
        let transfer = MasterTransfer::new(test_config());
        assert!(transfer.start().is_ok());
        assert_eq!(transfer.get_state(), TransferState::InProgress);
        assert!(transfer.elapsed().is_some());
    }

    #[test]
    fn test_start_twice_fails() {
        let transfer = MasterTransfer::new(test_config());
        transfer.start().unwrap();
        assert!(transfer.start().is_err());
    }

    #[test]
    fn test_complete() {
        let transfer = MasterTransfer::new(test_config());
        transfer.start().unwrap();
        assert!(transfer.complete().is_ok());
        assert_eq!(transfer.get_state(), TransferState::Completed);
    }

    #[test]
    fn test_complete_without_start_fails() {
        let transfer = MasterTransfer::new(test_config());
        assert!(transfer.complete().is_err());
    }

    #[test]
    fn test_complete_twice_fails() {
        let transfer = MasterTransfer::new(test_config());
        transfer.start().unwrap();
        transfer.complete().unwrap();
        assert!(transfer.complete().is_err());
        assert_eq!(transfer.get_state(), TransferState::Completed);
    }

    #[test]
    fn test_fail() {
        let transfer = MasterTransfer::new(test_config());
        transfer.start().unwrap();
        assert!(transfer.fail("test reason").is_ok());
        assert_eq!(transfer.get_state(), TransferState::Failed);
    }

    #[test]
    fn test_fail_without_start_fails() {
        let transfer = MasterTransfer::new(test_config());
        assert!(transfer.fail("never started").is_err());
        assert_eq!(transfer.get_state(), TransferState::NotStarted);
    }

    #[test]
    fn test_fail_after_complete_fails() {
        let transfer = MasterTransfer::new(test_config());
        transfer.start().unwrap();
        transfer.complete().unwrap();
        assert!(transfer.fail("too late").is_err());
        assert_eq!(transfer.get_state(), TransferState::Completed);
    }

    #[test]
    fn test_elapsed_not_started() {
        let transfer = MasterTransfer::new(test_config());
        assert!(transfer.elapsed().is_none());
    }

    #[test]
    fn test_timeout_not_started() {
        let transfer = MasterTransfer::new(test_config());
        assert!(!transfer.is_timed_out());
    }

    #[test]
    fn test_timeout_short() {
        let config = MasterTransferConfig::new(
            "node2".to_string(),
            Duration::from_millis(1),
        );
        let transfer = MasterTransfer::new(config);
        transfer.start().unwrap();
        // Sleep comfortably past the 1 ms timeout so the check is not racy.
        std::thread::sleep(Duration::from_millis(5));
        assert!(transfer.is_timed_out());
    }

    #[test]
    fn test_timeout_long() {
        let config = MasterTransferConfig::new(
            "node2".to_string(),
            Duration::from_secs(60),
        );
        let transfer = MasterTransfer::new(config);
        transfer.start().unwrap();
        assert!(!transfer.is_timed_out());
    }
}