noxu_rep/
master_transfer.rs1use noxu_sync::Mutex;
7use std::time::{Duration, Instant};
8
9use crate::error::{RepError, Result};
10
/// Settings for a single master-transfer request.
#[derive(Clone, Debug)]
pub struct MasterTransferConfig {
    /// Name of the node the transfer is aimed at.
    pub target_node: String,
    /// Time budget for the transfer; compared against the time elapsed since
    /// the transfer was started.
    pub timeout: Duration,
    /// Caller-supplied force flag, `false` unless set via [`Self::with_force`].
    /// NOTE(review): nothing in this file reads the flag — confirm its exact
    /// semantics with whichever component consumes the configuration.
    pub force: bool,
}

impl MasterTransferConfig {
    /// Creates a configuration for `target_node` with the given `timeout`.
    ///
    /// The `force` flag starts out as `false`.
    pub fn new(target_node: String, timeout: Duration) -> Self {
        let force = false;
        Self {
            target_node,
            timeout,
            force,
        }
    }

    /// Consumes the configuration and returns it with `force` replaced by the
    /// supplied value; all other fields are carried over unchanged.
    pub fn with_force(self, force: bool) -> Self {
        Self { force, ..self }
    }
}
34
/// Lifecycle phases a master transfer can be in.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum TransferState {
    /// Initial phase: the transfer exists but has not been started.
    NotStarted,
    /// The transfer has been started and has not yet finished.
    InProgress,
    /// The transfer finished successfully.
    Completed,
    /// The transfer was reported as failed.
    Failed,
    /// The transfer ran out of time.
    /// NOTE(review): no code in this file ever assigns this variant —
    /// confirm whether a caller elsewhere is expected to set it.
    TimedOut,
}
49
50pub struct MasterTransfer {
52 config: MasterTransferConfig,
53 state: Mutex<TransferState>,
54 start_time: Mutex<Option<Instant>>,
55}
56
57impl MasterTransfer {
58 pub fn new(config: MasterTransferConfig) -> Self {
60 Self {
61 config,
62 state: Mutex::new(TransferState::NotStarted),
63 start_time: Mutex::new(None),
64 }
65 }
66
67 pub fn get_state(&self) -> TransferState {
69 *self.state.lock()
70 }
71
72 pub fn get_target(&self) -> &str {
74 &self.config.target_node
75 }
76
77 pub fn get_config(&self) -> &MasterTransferConfig {
79 &self.config
80 }
81
82 pub fn start(&self) -> Result<()> {
84 let mut state = self.state.lock();
85 if *state != TransferState::NotStarted {
86 return Err(RepError::StateError(
87 "Transfer already started".to_string(),
88 ));
89 }
90 *state = TransferState::InProgress;
91 *self.start_time.lock() = Some(Instant::now());
92 Ok(())
93 }
94
95 pub fn complete(&self) -> Result<()> {
97 let mut state = self.state.lock();
98 if *state != TransferState::InProgress {
99 return Err(RepError::StateError(
100 "Transfer not in progress".to_string(),
101 ));
102 }
103 *state = TransferState::Completed;
104 Ok(())
105 }
106
107 pub fn fail(&self, _reason: &str) -> Result<()> {
109 let mut state = self.state.lock();
110 if *state != TransferState::InProgress {
111 return Err(RepError::StateError(
112 "Transfer not in progress".to_string(),
113 ));
114 }
115 *state = TransferState::Failed;
116 Ok(())
117 }
118
119 pub fn is_timed_out(&self) -> bool {
121 if let Some(start) = *self.start_time.lock() {
122 start.elapsed() > self.config.timeout
123 } else {
124 false
125 }
126 }
127
128 pub fn elapsed(&self) -> Option<Duration> {
130 self.start_time.lock().map(|t| t.elapsed())
131 }
132}
133
#[cfg(test)]
mod tests {
    //! Unit tests for the transfer configuration and state machine, including
    //! the rejection paths for transitions requested from the wrong state.

    use super::*;

    /// Configuration targeting `node2` with the given timeout.
    fn config_with_timeout(timeout: Duration) -> MasterTransferConfig {
        MasterTransferConfig::new("node2".to_string(), timeout)
    }

    /// Default configuration used by most tests: `node2`, 30 second timeout.
    fn test_config() -> MasterTransferConfig {
        config_with_timeout(Duration::from_secs(30))
    }

    /// A transfer that has already been started successfully.
    fn started_transfer() -> MasterTransfer {
        let transfer = MasterTransfer::new(test_config());
        transfer.start().unwrap();
        transfer
    }

    #[test]
    fn test_config_builder() {
        let config = test_config().with_force(true);
        assert_eq!(config.target_node, "node2");
        assert_eq!(config.timeout, Duration::from_secs(30));
        assert!(config.force);
    }

    #[test]
    fn test_config_force_defaults_to_false() {
        assert!(!test_config().force);
        assert!(!test_config().with_force(true).with_force(false).force);
    }

    #[test]
    fn test_initial_state() {
        let transfer = MasterTransfer::new(test_config());
        assert_eq!(transfer.get_state(), TransferState::NotStarted);
        assert_eq!(transfer.get_target(), "node2");
    }

    #[test]
    fn test_get_config() {
        let transfer = MasterTransfer::new(test_config().with_force(true));
        let config = transfer.get_config();
        assert_eq!(config.target_node, "node2");
        assert_eq!(config.timeout, Duration::from_secs(30));
        assert!(config.force);
    }

    #[test]
    fn test_start() {
        let transfer = MasterTransfer::new(test_config());
        assert!(transfer.start().is_ok());
        assert_eq!(transfer.get_state(), TransferState::InProgress);
        assert!(transfer.elapsed().is_some());
    }

    #[test]
    fn test_start_twice_fails() {
        let transfer = started_transfer();
        assert!(transfer.start().is_err());
        // The rejected call must not disturb the in-progress transfer.
        assert_eq!(transfer.get_state(), TransferState::InProgress);
    }

    #[test]
    fn test_complete() {
        let transfer = started_transfer();
        assert!(transfer.complete().is_ok());
        assert_eq!(transfer.get_state(), TransferState::Completed);
    }

    #[test]
    fn test_complete_without_start_fails() {
        let transfer = MasterTransfer::new(test_config());
        assert!(transfer.complete().is_err());
        assert_eq!(transfer.get_state(), TransferState::NotStarted);
    }

    #[test]
    fn test_fail() {
        let transfer = started_transfer();
        assert!(transfer.fail("test reason").is_ok());
        assert_eq!(transfer.get_state(), TransferState::Failed);
    }

    #[test]
    fn test_fail_without_start_fails() {
        let transfer = MasterTransfer::new(test_config());
        assert!(transfer.fail("never started").is_err());
        assert_eq!(transfer.get_state(), TransferState::NotStarted);
    }

    #[test]
    fn test_completed_rejects_further_transitions() {
        let transfer = started_transfer();
        transfer.complete().unwrap();
        assert!(transfer.complete().is_err());
        assert!(transfer.fail("too late").is_err());
        assert!(transfer.start().is_err());
        assert_eq!(transfer.get_state(), TransferState::Completed);
    }

    #[test]
    fn test_failed_rejects_further_transitions() {
        let transfer = started_transfer();
        transfer.fail("boom").unwrap();
        assert!(transfer.fail("again").is_err());
        assert!(transfer.complete().is_err());
        assert!(transfer.start().is_err());
        assert_eq!(transfer.get_state(), TransferState::Failed);
    }

    #[test]
    fn test_elapsed_none_before_start() {
        let transfer = MasterTransfer::new(test_config());
        assert!(transfer.elapsed().is_none());
    }

    #[test]
    fn test_timeout_not_started() {
        let transfer = MasterTransfer::new(test_config());
        assert!(!transfer.is_timed_out());
    }

    #[test]
    fn test_timeout_short() {
        let transfer = MasterTransfer::new(config_with_timeout(Duration::from_millis(1)));
        transfer.start().unwrap();
        // Sleeping well past the 1 ms budget guarantees the deadline has passed.
        std::thread::sleep(Duration::from_millis(5));
        assert!(transfer.is_timed_out());
    }

    #[test]
    fn test_timeout_long() {
        let transfer = MasterTransfer::new(config_with_timeout(Duration::from_secs(60)));
        transfer.start().unwrap();
        assert!(!transfer.is_timed_out());
    }
}