Skip to main content

noxu_rep/
network_restore_server.rs

1//! Server-side network restore: stream log files to a requesting node.
2//!
3//! The restore server accepts TCP connections from nodes running
4//! `NetworkRestore::execute()`. On each connection it:
5//!
6//! 1. Reads the 4-byte restore magic (`NRST` = `0x4E52_5354`).
7//! 2. Lists all `.ndb` files in `env_home`, sorted by name.
8//! 3. Writes `[file_count: u32 LE]`.
//! 4. For each file: writes `[name_len: u16 LE][name bytes][file_size: u64
//!    LE][file bytes][crc32: u32 LE]`, reading the file in 64 KiB chunks.
11//!
12//! Two modes are available:
13//!
14//! - **Standalone**: call `NetworkRestoreServer::start(addr)` to bind a
15//!   dedicated `TcpListener` and serve all incoming connections in the
16//!   background.
17//! - **Dispatcher-integrated**: register `NetworkRestoreServer` as a
18//!   `ServiceHandler` named `"RESTORE"` with a `TcpServiceDispatcher`.
19//!   The service dispatcher handles TCP negotiation; the handler receives a
20//!   pre-opened channel through which the RESTORE protocol runs.
21
22use std::io::{Read as IoRead, Write as IoWrite};
23use std::net::{SocketAddr, TcpListener};
24use std::path::PathBuf;
25use std::sync::Arc;
26use std::sync::atomic::{AtomicBool, Ordering};
27use std::thread;
28
29use crate::error::{RepError, Result};
30use crate::net::channel::Channel;
31use crate::net::service_dispatcher::ServiceHandler;
32
/// Restore handshake magic. The value `0x4E52_5354` spells `NRST` when its
/// bytes are read most-significant first; it is decoded from the connection
/// as a little-endian `u32`, so the bytes on the wire are `54 53 52 4E`.
const RESTORE_MAGIC: u32 = 0x4E52_5354;

/// The service name used when registered with `TcpServiceDispatcher`;
/// this is the value returned by `ServiceHandler::service_name`.
pub const RESTORE_SERVICE_NAME: &str = "RESTORE";
38
/// Serves log files to nodes performing a network restore.
///
/// Implements both standalone TCP serving and the `ServiceHandler` trait so
/// it can be plugged into a `TcpServiceDispatcher`.
///
/// Both fields are private; build an instance with `NetworkRestoreServer::new`.
pub struct NetworkRestoreServer {
    /// Directory containing `.ndb` log files to serve. The directory is
    /// listed again for every restore request; nothing is cached.
    env_home: PathBuf,
    /// Running flag for the standalone accept loop: set by `start()`,
    /// cleared by `stop()`, and reported by `is_running()`. The
    /// `ServiceHandler::handle` path does not consult it.
    running: Arc<AtomicBool>,
}
49
50impl NetworkRestoreServer {
51    /// Create a new restore server that will serve files from `env_home`.
52    pub fn new(env_home: impl Into<PathBuf>) -> Self {
53        Self {
54            env_home: env_home.into(),
55            running: Arc::new(AtomicBool::new(false)),
56        }
57    }
58
59    /// Wrap in `Arc` so the same instance can be shared between the accept
60    /// loop thread and the `ServiceHandler` registration.
61    pub fn into_arc(self) -> Arc<Self> {
62        Arc::new(self)
63    }
64
65    /// Stop the standalone accept loop (if one was started).
66    pub fn stop(&self) {
67        self.running.store(false, Ordering::SeqCst);
68    }
69
70    /// Whether the standalone accept loop is running.
71    pub fn is_running(&self) -> bool {
72        self.running.load(Ordering::SeqCst)
73    }
74
75    /// Start a dedicated TCP accept loop on `addr`.
76    ///
77    /// Returns the actual bound address (useful when `addr` has port 0).
78    /// Connections are handled in per-connection threads.
79    pub fn start(self: &Arc<Self>, addr: SocketAddr) -> Result<SocketAddr> {
80        let listener = TcpListener::bind(addr)
81            .map_err(|e| RepError::NetworkError(e.to_string()))?;
82        let bound = listener
83            .local_addr()
84            .map_err(|e| RepError::NetworkError(e.to_string()))?;
85
86        self.running.store(true, Ordering::SeqCst);
87
88        let server = Arc::clone(self);
89        thread::spawn(move || {
90            while server.running.load(Ordering::SeqCst) {
91                match listener.accept() {
92                    Ok((stream, _peer)) => {
93                        let srv = Arc::clone(&server);
94                        thread::spawn(move || {
95                            let _ = srv.serve_raw(stream);
96                        });
97                    }
98                    Err(_) => break,
99                }
100            }
101            server.running.store(false, Ordering::SeqCst);
102        });
103
104        Ok(bound)
105    }
106
107    /// Serve a single raw `TcpStream` connection using the RESTORE protocol.
108    ///
109    /// Called by the standalone accept loop.
110    fn serve_raw(&self, mut stream: std::net::TcpStream) -> Result<()> {
111        // Read and validate magic.
112        let mut magic_buf = [0u8; 4];
113        stream.read_exact(&mut magic_buf).map_err(|e| {
114            RepError::NetworkRestoreError(format!("reading magic: {}", e))
115        })?;
116        let magic = u32::from_le_bytes(magic_buf);
117        if magic != RESTORE_MAGIC {
118            return Err(RepError::NetworkRestoreError(format!(
119                "bad restore magic: 0x{:08X}",
120                magic
121            )));
122        }
123
124        self.send_files_to(&mut stream)
125    }
126
127    /// Core file-transfer logic: enumerate `.ndb` files, send count, then
128    /// stream each file's name + size + bytes to `out`.
129    ///
130    /// Used by both `serve_raw` and the `ServiceHandler::handle` path.
131    fn send_files_to<W: IoRead + IoWrite>(&self, out: &mut W) -> Result<()> {
132        // Enumerate all .ndb files in env_home, sorted by name.
133        let mut files: Vec<(String, PathBuf)> =
134            std::fs::read_dir(&self.env_home)
135                .map_err(|e| {
136                    RepError::NetworkRestoreError(format!(
137                        "cannot read env_home {}: {}",
138                        self.env_home.display(),
139                        e
140                    ))
141                })?
142                .filter_map(|entry| {
143                    let entry = entry.ok()?;
144                    let path = entry.path();
145                    if path.extension()?.to_str()? == "ndb" {
146                        let name = path.file_name()?.to_str()?.to_string();
147                        Some((name, path))
148                    } else {
149                        None
150                    }
151                })
152                .collect();
153        files.sort_by(|a, b| a.0.cmp(&b.0));
154
155        // Send file count.
156        let count = files.len() as u32;
157        out.write_all(&count.to_le_bytes()).map_err(|e| {
158            RepError::NetworkRestoreError(format!("writing file count: {}", e))
159        })?;
160
161        let mut chunk = vec![0u8; 65536];
162
163        for (name, path) in &files {
164            // Verify name fits in a u16 length prefix.
165            let name_bytes = name.as_bytes();
166            if name_bytes.len() > u16::MAX as usize {
167                return Err(RepError::NetworkRestoreError(format!(
168                    "filename too long: {}",
169                    name
170                )));
171            }
172
173            let name_len = name_bytes.len() as u16;
174            out.write_all(&name_len.to_le_bytes()).map_err(|e| {
175                RepError::NetworkRestoreError(format!(
176                    "writing name_len for '{}': {}",
177                    name, e
178                ))
179            })?;
180            out.write_all(name_bytes).map_err(|e| {
181                RepError::NetworkRestoreError(format!(
182                    "writing filename '{}': {}",
183                    name, e
184                ))
185            })?;
186
187            // File size.
188            let metadata = std::fs::metadata(path).map_err(|e| {
189                RepError::NetworkRestoreError(format!(
190                    "stat '{}': {}",
191                    path.display(),
192                    e
193                ))
194            })?;
195            let file_size = metadata.len();
196            out.write_all(&file_size.to_le_bytes()).map_err(|e| {
197                RepError::NetworkRestoreError(format!(
198                    "writing size for '{}': {}",
199                    name, e
200                ))
201            })?;
202
203            // Stream file data in 64 KiB chunks.
204            let mut file = std::fs::File::open(path).map_err(|e| {
205                RepError::NetworkRestoreError(format!(
206                    "open '{}': {}",
207                    path.display(),
208                    e
209                ))
210            })?;
211            let mut remaining = file_size as usize;
212            // D10: compute a CRC32 over the file bytes and send it as a 4-byte
213            // trailer after the data, so the client can detect truncation or
214            // corruption in transit (JE NetworkBackup sends a MessageDigest
215            // with FileEnd; we use the project-wide CRC32 from crc32fast).
216            let mut digest = crc32fast::Hasher::new();
217            while remaining > 0 {
218                let to_read = remaining.min(chunk.len());
219                let n = file.read(&mut chunk[..to_read]).map_err(|e| {
220                    RepError::NetworkRestoreError(format!(
221                        "reading '{}': {}",
222                        path.display(),
223                        e
224                    ))
225                })?;
226                if n == 0 {
227                    break; // Unexpected EOF — file may have been truncated.
228                }
229                digest.update(&chunk[..n]);
230                out.write_all(&chunk[..n]).map_err(|e| {
231                    RepError::NetworkRestoreError(format!(
232                        "sending data for '{}': {}",
233                        name, e
234                    ))
235                })?;
236                remaining -= n;
237            }
238            // Send the CRC32 trailer.
239            out.write_all(&digest.finalize().to_le_bytes()).map_err(|e| {
240                RepError::NetworkRestoreError(format!(
241                    "sending digest for '{}': {}",
242                    name, e
243                ))
244            })?;
245
246            log::debug!(
247                "NetworkRestoreServer: sent '{}' ({} bytes)",
248                name,
249                file_size
250            );
251        }
252
253        out.flush().map_err(|e| {
254            RepError::NetworkRestoreError(format!("flushing output: {}", e))
255        })?;
256
257        Ok(())
258    }
259}
260
261// ---------------------------------------------------------------------------
262// ServiceHandler implementation
263// ---------------------------------------------------------------------------
264
265/// `NetworkRestoreServer` can be registered with `TcpServiceDispatcher` under
266/// the `"RESTORE"` service name. The service dispatcher reads the service name
267/// from each new connection before calling `handle()`; the channel passed here
268/// is ready for the RESTORE protocol (magic bytes onward).
269impl ServiceHandler for NetworkRestoreServer {
270    fn service_name(&self) -> &str {
271        RESTORE_SERVICE_NAME
272    }
273
274    fn handle(&self, channel: Box<dyn Channel>) -> Result<()> {
275        // Read the RESTORE magic through the channel.
276        use std::time::Duration;
277
278        let magic_bytes =
279            channel.receive(Duration::from_secs(30))?.ok_or_else(|| {
280                RepError::NetworkRestoreError(
281                    "no magic bytes received on RESTORE channel".into(),
282                )
283            })?;
284        if magic_bytes.len() < 4 {
285            return Err(RepError::NetworkRestoreError(format!(
286                "short magic: {} bytes",
287                magic_bytes.len()
288            )));
289        }
290        let magic = u32::from_le_bytes([
291            magic_bytes[0],
292            magic_bytes[1],
293            magic_bytes[2],
294            magic_bytes[3],
295        ]);
296        if magic != RESTORE_MAGIC {
297            return Err(RepError::NetworkRestoreError(format!(
298                "bad restore magic: 0x{:08X}",
299                magic
300            )));
301        }
302
303        // Build file list and send via the channel's framing.
304        let mut files: Vec<(String, PathBuf)> =
305            std::fs::read_dir(&self.env_home)
306                .map_err(|e| {
307                    RepError::NetworkRestoreError(format!(
308                        "read_dir {}: {}",
309                        self.env_home.display(),
310                        e
311                    ))
312                })?
313                .filter_map(|entry| {
314                    let entry = entry.ok()?;
315                    let path = entry.path();
316                    if path.extension()?.to_str()? == "ndb" {
317                        let name = path.file_name()?.to_str()?.to_string();
318                        Some((name, path))
319                    } else {
320                        None
321                    }
322                })
323                .collect();
324        files.sort_by(|a, b| a.0.cmp(&b.0));
325
326        // Send a single framed message containing the entire restore payload.
327        // The payload uses the same wire layout as the raw-TCP path so the
328        // client's `execute()` can work regardless of transport.
329        let mut payload: Vec<u8> = Vec::new();
330        let count = files.len() as u32;
331        payload.extend_from_slice(&count.to_le_bytes());
332
333        let mut chunk = vec![0u8; 65536];
334        for (name, path) in &files {
335            let name_bytes = name.as_bytes();
336            let name_len = name_bytes.len() as u16;
337            payload.extend_from_slice(&name_len.to_le_bytes());
338            payload.extend_from_slice(name_bytes);
339
340            let metadata = std::fs::metadata(path).map_err(|e| {
341                RepError::NetworkRestoreError(format!(
342                    "stat '{}': {}",
343                    path.display(),
344                    e
345                ))
346            })?;
347            let file_size = metadata.len();
348            payload.extend_from_slice(&file_size.to_le_bytes());
349
350            let mut file = std::fs::File::open(path).map_err(|e| {
351                RepError::NetworkRestoreError(format!(
352                    "open '{}': {}",
353                    path.display(),
354                    e
355                ))
356            })?;
357            let mut remaining = file_size as usize;
358            // D10: append a CRC32 trailer per file (same layout as the
359            // raw-TCP send_files_to path) so execute_via_dispatcher can verify
360            // integrity before accepting the file.
361            let mut digest = crc32fast::Hasher::new();
362            while remaining > 0 {
363                let to_read = remaining.min(chunk.len());
364                let n = file.read(&mut chunk[..to_read]).map_err(|e| {
365                    RepError::NetworkRestoreError(format!(
366                        "reading '{}': {}",
367                        path.display(),
368                        e
369                    ))
370                })?;
371                if n == 0 {
372                    break;
373                }
374                digest.update(&chunk[..n]);
375                payload.extend_from_slice(&chunk[..n]);
376                remaining -= n;
377            }
378            payload.extend_from_slice(&digest.finalize().to_le_bytes());
379        }
380
381        channel.send(&payload)?;
382        Ok(())
383    }
384}
385
386#[cfg(test)]
387mod tests {
388    use super::*;
389    use std::io::Write;
390    use std::time::Duration;
391    use tempfile::TempDir;
392
393    use crate::network_restore::{NetworkRestore, NetworkRestoreConfig};
394
395    /// Create a temp env_home with some synthetic .ndb files.
396    fn make_env_home(files: &[(&str, &[u8])]) -> TempDir {
397        let dir = tempfile::tempdir().expect("temp dir");
398        for (name, data) in files {
399            let mut f =
400                std::fs::File::create(dir.path().join(name)).expect("create");
401            f.write_all(data).expect("write");
402        }
403        dir
404    }
405
406    // -----------------------------------------------------------------------
407    // Standalone TCP server tests
408    // -----------------------------------------------------------------------
409
410    #[test]
411    fn test_server_starts_and_stops() {
412        let dir = make_env_home(&[]);
413        let server = Arc::new(NetworkRestoreServer::new(dir.path()));
414        let _addr = server.start("127.0.0.1:0".parse().unwrap()).unwrap();
415        assert!(server.is_running());
416        server.stop();
417        std::thread::sleep(Duration::from_millis(50));
418        assert!(!server.is_running());
419    }
420
421    #[test]
422    fn test_restore_empty_env_home() {
423        let dir = make_env_home(&[]);
424        let server = Arc::new(NetworkRestoreServer::new(dir.path()));
425        let bound = server.start("127.0.0.1:0".parse().unwrap()).unwrap();
426        std::thread::sleep(Duration::from_millis(20));
427
428        let restore_dir = tempfile::tempdir().expect("restore dir");
429        let config = NetworkRestoreConfig {
430            source_node: "test".to_string(),
431            source_host: "127.0.0.1".to_string(),
432            source_port: bound.port(),
433            retain_log_files: false,
434        };
435        let restore =
436            NetworkRestore::new(config).with_local_dir(restore_dir.path());
437        restore.execute().expect("empty restore should succeed");
438
439        let received: Vec<_> = std::fs::read_dir(restore_dir.path())
440            .unwrap()
441            .filter_map(|e| e.ok())
442            .collect();
443        assert_eq!(received.len(), 0);
444        server.stop();
445    }
446
447    #[test]
448    fn test_restore_digest_detects_corruption() {
449        // D10: send a file into an in-memory buffer via send_files_to, which
450        // appends a CRC32 trailer; flip one data byte; confirm the recomputed
451        // CRC over the corrupted body no longer matches the trailer (the exact
452        // condition the client verify rejects on).
453        use std::io::Cursor;
454        let content = b"the quick brown fox jumps over the lazy dog";
455        let dir = make_env_home(&[("00000001.ndb", content)]);
456        let server = NetworkRestoreServer::new(dir.path());
457        let mut buf = Cursor::new(Vec::new());
458        server.send_files_to(&mut buf).expect("send into buffer");
459        let mut wire = buf.into_inner();
460
461        // Locate the file body: skip count(4) + name_len(4) + name + size(8).
462        // count
463        let mut off = 4usize;
464        let name_len =
465            u16::from_le_bytes(wire[off..off + 2].try_into().unwrap()) as usize;
466        off += 2 + name_len;
467        let file_size =
468            u64::from_le_bytes(wire[off..off + 8].try_into().unwrap()) as usize;
469        off += 8;
470        let body_start = off;
471        let trailer_start = body_start + file_size;
472
473        // The trailer must match the clean body.
474        let want = u32::from_le_bytes(
475            wire[trailer_start..trailer_start + 4].try_into().unwrap(),
476        );
477        assert_eq!(want, crc32fast::hash(&wire[body_start..trailer_start]));
478
479        // Flip one body byte; the CRC must now mismatch (client rejects).
480        wire[body_start] ^= 0xFF;
481        let got = crc32fast::hash(&wire[body_start..trailer_start]);
482        assert_ne!(
483            want, got,
484            "D10: corrupted body must fail the CRC32 trailer check"
485        );
486    }
487
488    #[test]
489    fn test_restore_single_file() {
490        let content = b"log file content for testing";
491        let dir = make_env_home(&[("00000001.ndb", content)]);
492        let server = Arc::new(NetworkRestoreServer::new(dir.path()));
493        let bound = server.start("127.0.0.1:0".parse().unwrap()).unwrap();
494        std::thread::sleep(Duration::from_millis(20));
495
496        let restore_dir = tempfile::tempdir().expect("restore dir");
497        let config = NetworkRestoreConfig {
498            source_node: "node1".to_string(),
499            source_host: "127.0.0.1".to_string(),
500            source_port: bound.port(),
501            retain_log_files: false,
502        };
503        let restore =
504            NetworkRestore::new(config).with_local_dir(restore_dir.path());
505        restore.execute().expect("single-file restore");
506
507        let received = std::fs::read(restore_dir.path().join("00000001.ndb"))
508            .expect("received file");
509        assert_eq!(&received, content);
510        server.stop();
511    }
512
513    #[test]
514    fn test_restore_multiple_files() {
515        let file_data: Vec<(&str, Vec<u8>)> = (0u32..5)
516            .map(|i| {
517                let name: &'static str =
518                    Box::leak(format!("{:08}.ndb", i).into_boxed_str());
519                let data = vec![(i & 0xFF) as u8; 1024 * (i as usize + 1)];
520                (name, data)
521            })
522            .collect();
523
524        let file_refs: Vec<(&str, &[u8])> =
525            file_data.iter().map(|(n, d)| (*n, d.as_slice())).collect();
526        let dir = make_env_home(&file_refs);
527        let server = Arc::new(NetworkRestoreServer::new(dir.path()));
528        let bound = server.start("127.0.0.1:0".parse().unwrap()).unwrap();
529        std::thread::sleep(Duration::from_millis(20));
530
531        let restore_dir = tempfile::tempdir().expect("restore dir");
532        let config = NetworkRestoreConfig {
533            source_node: "node1".to_string(),
534            source_host: "127.0.0.1".to_string(),
535            source_port: bound.port(),
536            retain_log_files: false,
537        };
538        let restore =
539            NetworkRestore::new(config).with_local_dir(restore_dir.path());
540        restore.execute().expect("multi-file restore");
541
542        for (name, expected) in &file_data {
543            let got = std::fs::read(restore_dir.path().join(name)).expect(name);
544            assert_eq!(&got, expected, "file {} mismatch", name);
545        }
546        server.stop();
547    }
548
549    #[test]
550    fn test_restore_non_ndb_files_not_sent() {
551        // Only .ndb files should be transferred.
552        let dir = make_env_home(&[
553            ("00000001.ndb", b"log data"),
554            ("noxu.config.csv", b"config"),
555            ("README.txt", b"readme"),
556        ]);
557        let server = Arc::new(NetworkRestoreServer::new(dir.path()));
558        let bound = server.start("127.0.0.1:0".parse().unwrap()).unwrap();
559        std::thread::sleep(Duration::from_millis(20));
560
561        let restore_dir = tempfile::tempdir().expect("restore dir");
562        let config = NetworkRestoreConfig {
563            source_node: "node1".to_string(),
564            source_host: "127.0.0.1".to_string(),
565            source_port: bound.port(),
566            retain_log_files: false,
567        };
568        let restore =
569            NetworkRestore::new(config).with_local_dir(restore_dir.path());
570        restore.execute().expect("restore");
571
572        // Only the .ndb file should appear.
573        let mut names: Vec<String> = std::fs::read_dir(restore_dir.path())
574            .unwrap()
575            .filter_map(|e| e.ok())
576            .map(|e| e.file_name().to_string_lossy().to_string())
577            .collect();
578        names.sort();
579        assert_eq!(names, vec!["00000001.ndb"]);
580        server.stop();
581    }
582
583    #[test]
584    fn test_restore_retain_log_files() {
585        let original = b"original content";
586        let updated = b"new content from restore";
587
588        let src_dir = make_env_home(&[("00000001.ndb", updated)]);
589        let server = Arc::new(NetworkRestoreServer::new(src_dir.path()));
590        let bound = server.start("127.0.0.1:0".parse().unwrap()).unwrap();
591        std::thread::sleep(Duration::from_millis(20));
592
593        // Pre-populate the destination with the original file.
594        let restore_dir = tempfile::tempdir().expect("restore dir");
595        std::fs::write(restore_dir.path().join("00000001.ndb"), original)
596            .expect("pre-populate");
597
598        let config = NetworkRestoreConfig {
599            source_node: "node1".to_string(),
600            source_host: "127.0.0.1".to_string(),
601            source_port: bound.port(),
602            retain_log_files: true,
603        };
604        let restore =
605            NetworkRestore::new(config).with_local_dir(restore_dir.path());
606        restore.execute().expect("restore with retain");
607
608        // The restored file should contain the new data.
609        let got =
610            std::fs::read(restore_dir.path().join("00000001.ndb")).unwrap();
611        assert_eq!(&got, updated);
612
613        // The backup file should still contain the original.
614        let bak =
615            std::fs::read(restore_dir.path().join("00000001.ndb.bak")).unwrap();
616        assert_eq!(&bak, original);
617        server.stop();
618    }
619
620    #[test]
621    fn test_restore_large_file() {
622        // 200 KiB — ensures chunking through the 64 KiB buffer.
623        let large = vec![0xABu8; 200 * 1024];
624        let dir = make_env_home(&[("large.ndb", &large)]);
625        let server = Arc::new(NetworkRestoreServer::new(dir.path()));
626        let bound = server.start("127.0.0.1:0".parse().unwrap()).unwrap();
627        std::thread::sleep(Duration::from_millis(20));
628
629        let restore_dir = tempfile::tempdir().expect("restore dir");
630        let config = NetworkRestoreConfig {
631            source_node: "node1".to_string(),
632            source_host: "127.0.0.1".to_string(),
633            source_port: bound.port(),
634            retain_log_files: false,
635        };
636        let restore =
637            NetworkRestore::new(config).with_local_dir(restore_dir.path());
638        restore.execute().expect("large file restore");
639
640        let got = std::fs::read(restore_dir.path().join("large.ndb")).unwrap();
641        assert_eq!(got.len(), large.len());
642        assert_eq!(&got, &large);
643        server.stop();
644    }
645
646    #[test]
647    fn test_server_service_name() {
648        let dir = make_env_home(&[]);
649        let server = NetworkRestoreServer::new(dir.path());
650        assert_eq!(server.service_name(), RESTORE_SERVICE_NAME);
651        assert_eq!(server.service_name(), "RESTORE");
652    }
653
654    #[test]
655    fn test_restore_progress_tracking() {
656        let content = b"progress test data";
657        let dir = make_env_home(&[("00000001.ndb", content)]);
658        let server = Arc::new(NetworkRestoreServer::new(dir.path()));
659        let bound = server.start("127.0.0.1:0".parse().unwrap()).unwrap();
660        std::thread::sleep(Duration::from_millis(20));
661
662        let restore_dir = tempfile::tempdir().expect("restore dir");
663        let config = NetworkRestoreConfig {
664            source_node: "node1".to_string(),
665            source_host: "127.0.0.1".to_string(),
666            source_port: bound.port(),
667            retain_log_files: false,
668        };
669        let restore =
670            NetworkRestore::new(config).with_local_dir(restore_dir.path());
671        restore.execute().expect("restore");
672
673        let progress = restore.get_progress();
674        assert_eq!(progress.files_transferred, 1);
675        assert_eq!(progress.bytes_transferred, content.len() as u64);
676        server.stop();
677    }
678
679    // -----------------------------------------------------------------------
680    // Wire-protocol error-path coverage
681    // -----------------------------------------------------------------------
682
683    #[test]
684    fn test_into_arc_wraps_self() {
685        let dir = make_env_home(&[]);
686        let server = NetworkRestoreServer::new(dir.path());
687        let arc = server.into_arc();
688        // Arc::strong_count is 1 right after wrapping; verify the
689        // running flag is reachable and false.
690        assert!(!arc.is_running());
691        assert_eq!(Arc::strong_count(&arc), 1);
692    }
693
694    #[test]
695    fn test_serve_raw_rejects_bad_magic() {
696        // Connect to the server and send 4 bytes of garbage. The
697        // server should close the connection with an Err on its
698        // side; on the client we observe EOF / unexpected close.
699        let dir = make_env_home(&[]);
700        let server = Arc::new(NetworkRestoreServer::new(dir.path()));
701        let bound = server.start("127.0.0.1:0".parse().unwrap()).unwrap();
702        std::thread::sleep(Duration::from_millis(20));
703
704        let mut stream = std::net::TcpStream::connect(bound).unwrap();
705        stream.write_all(&[0xDE, 0xAD, 0xBE, 0xEF]).unwrap();
706
707        // Server should close the stream rather than keep talking.
708        // Read with a short timeout — expect 0 bytes (EOF) or an
709        // error.
710        stream.set_read_timeout(Some(Duration::from_millis(500))).unwrap();
711        let mut buf = [0u8; 4];
712        let r = std::io::Read::read(&mut stream, &mut buf);
713        match r {
714            Ok(0) => {} // clean EOF — server hung up
715            Ok(_n) => panic!("server replied to bad magic instead of closing"),
716            Err(_) => {} // timeout or reset — also acceptable
717        }
718        server.stop();
719    }
720
721    #[test]
722    fn test_serve_raw_short_read_on_magic() {
723        // Connect and immediately close (send no bytes). The server
724        // should fail its read_exact with a short-read error and
725        // not panic. The accept thread continues.
726        let dir = make_env_home(&[]);
727        let server = Arc::new(NetworkRestoreServer::new(dir.path()));
728        let bound = server.start("127.0.0.1:0".parse().unwrap()).unwrap();
729        std::thread::sleep(Duration::from_millis(20));
730
731        // Connect and drop immediately.
732        {
733            let _ = std::net::TcpStream::connect(bound).unwrap();
734        }
735        // Subsequent connection should still work — accept loop
736        // didn't crash.
737        std::thread::sleep(Duration::from_millis(20));
738        assert!(server.is_running());
739        server.stop();
740    }
741
742    #[test]
743    fn test_serve_raw_real_handshake_streams_files() {
744        // End-to-end: use the standalone server (start + serve_raw)
745        // to transfer one file. The existing test_restore_single_file
746        // also exercises this, but via the higher-level
747        // NetworkRestore client; here we open a raw socket and
748        // walk the wire protocol manually so the read_exact /
749        // u32::from_le_bytes paths are exercised.
750        let content = b"hello world";
751        let dir = make_env_home(&[("00000000.ndb", content)]);
752        let server = Arc::new(NetworkRestoreServer::new(dir.path()));
753        let bound = server.start("127.0.0.1:0".parse().unwrap()).unwrap();
754        std::thread::sleep(Duration::from_millis(20));
755
756        let mut stream = std::net::TcpStream::connect(bound).unwrap();
757        stream.write_all(&RESTORE_MAGIC.to_le_bytes()).unwrap();
758
759        // Read file count (u32, little-endian).
760        let mut count_buf = [0u8; 4];
761        std::io::Read::read_exact(&mut stream, &mut count_buf).unwrap();
762        let count = u32::from_le_bytes(count_buf);
763        assert_eq!(count, 1);
764
765        // Read filename length (u16) + name bytes.
766        let mut name_len_buf = [0u8; 2];
767        std::io::Read::read_exact(&mut stream, &mut name_len_buf).unwrap();
768        let name_len = u16::from_le_bytes(name_len_buf) as usize;
769        let mut name_buf = vec![0u8; name_len];
770        std::io::Read::read_exact(&mut stream, &mut name_buf).unwrap();
771        assert_eq!(&name_buf, b"00000000.ndb");
772
773        // Read file size (u64) + file bytes.
774        let mut size_buf = [0u8; 8];
775        std::io::Read::read_exact(&mut stream, &mut size_buf).unwrap();
776        let size = u64::from_le_bytes(size_buf);
777        assert_eq!(size as usize, content.len());
778
779        let mut payload = vec![0u8; size as usize];
780        std::io::Read::read_exact(&mut stream, &mut payload).unwrap();
781        assert_eq!(&payload, content);
782
783        server.stop();
784    }
785
786    #[test]
787    fn test_start_returns_error_for_unbindable_addr() {
788        let dir = make_env_home(&[]);
789        let server = Arc::new(NetworkRestoreServer::new(dir.path()));
790        // 192.0.2.1 is RFC 5737 TEST-NET-1 — guaranteed not assigned to any
791        // local interface, so bind() fails with EADDRNOTAVAIL on both Unix
792        // and Windows. (Port 1 is unreliable cross-platform: privileged on
793        // Unix, but freely bindable by unprivileged users on Windows.)
794        let r = server.start("192.0.2.1:9999".parse().unwrap());
795        assert!(
796            r.is_err(),
797            "binding to a non-local address should fail on all platforms"
798        );
799    }
800
801    #[test]
802    fn test_stop_is_idempotent() {
803        let dir = make_env_home(&[]);
804        let server = Arc::new(NetworkRestoreServer::new(dir.path()));
805        let _ = server.start("127.0.0.1:0".parse().unwrap()).unwrap();
806        server.stop();
807        server.stop();
808        std::thread::sleep(Duration::from_millis(20));
809        assert!(!server.is_running());
810    }
811
812    // -----------------------------------------------------------------------
813    // ServiceHandler::handle path (multiplexed-channel transport)
814    // -----------------------------------------------------------------------
815
816    #[test]
817    fn test_service_handler_handle_streams_via_channel() {
818        use crate::net::channel::LocalChannelPair;
819
820        let content = b"abcdef";
821        let dir = make_env_home(&[("00000005.ndb", content)]);
822        let server = NetworkRestoreServer::new(dir.path());
823
824        let pair = LocalChannelPair::new();
825        let server_channel: Box<dyn crate::net::channel::Channel> =
826            Box::new(pair.channel_a);
827        let client_channel = pair.channel_b;
828
829        // Client sends magic.
830        client_channel.send(&RESTORE_MAGIC.to_le_bytes()).unwrap();
831
832        // Server runs handle().
833        let r = server.handle(server_channel);
834        assert!(r.is_ok(), "handle returned Err: {:?}", r.err());
835
836        // Client receives the framed payload.
837        use crate::net::channel::Channel;
838        let payload = client_channel
839            .receive(Duration::from_secs(5))
840            .unwrap()
841            .expect("payload");
842
843        // Expected wire format: u32 count + (u16 name_len + name + u64 size + bytes).
844        let count = u32::from_le_bytes([
845            payload[0], payload[1], payload[2], payload[3],
846        ]);
847        assert_eq!(count, 1);
848
849        let name_len = u16::from_le_bytes([payload[4], payload[5]]) as usize;
850        assert_eq!(name_len, b"00000005.ndb".len());
851        let name = &payload[6..6 + name_len];
852        assert_eq!(name, b"00000005.ndb");
853
854        let size_off = 6 + name_len;
855        let mut size_bytes = [0u8; 8];
856        size_bytes.copy_from_slice(&payload[size_off..size_off + 8]);
857        let size = u64::from_le_bytes(size_bytes) as usize;
858        assert_eq!(size, content.len());
859
860        let data_off = size_off + 8;
861        assert_eq!(&payload[data_off..data_off + size], content);
862    }
863
864    #[test]
865    fn test_service_handler_handle_rejects_bad_magic() {
866        use crate::net::channel::LocalChannelPair;
867
868        let dir = make_env_home(&[]);
869        let server = NetworkRestoreServer::new(dir.path());
870
871        let pair = LocalChannelPair::new();
872        let server_channel: Box<dyn crate::net::channel::Channel> =
873            Box::new(pair.channel_a);
874        let client_channel = pair.channel_b;
875
876        client_channel.send(&[0xDE, 0xAD, 0xBE, 0xEF]).unwrap();
877        let r = server.handle(server_channel);
878        assert!(r.is_err(), "handle on bad magic must error");
879        let msg = format!("{}", r.err().unwrap());
880        assert!(
881            msg.contains("bad restore magic"),
882            "expected 'bad restore magic' in error, got: {msg}"
883        );
884    }
885
886    #[test]
887    fn test_service_handler_handle_rejects_short_magic() {
888        use crate::net::channel::LocalChannelPair;
889
890        let dir = make_env_home(&[]);
891        let server = NetworkRestoreServer::new(dir.path());
892
893        let pair = LocalChannelPair::new();
894        let server_channel: Box<dyn crate::net::channel::Channel> =
895            Box::new(pair.channel_a);
896        let client_channel = pair.channel_b;
897
898        client_channel.send(&[0xDE]).unwrap();
899        let r = server.handle(server_channel);
900        assert!(r.is_err(), "handle on short magic must error");
901    }
902
903    #[test]
904    fn test_service_handler_handle_no_magic_received() {
905        use crate::net::channel::LocalChannelPair;
906
907        let dir = make_env_home(&[]);
908        let server = NetworkRestoreServer::new(dir.path());
909
910        let pair = LocalChannelPair::new();
911        let server_channel: Box<dyn crate::net::channel::Channel> =
912            Box::new(pair.channel_a);
913        // Drop the client side without sending — server should fail
914        // with "no magic bytes received".
915        drop(pair.channel_b);
916        let r = server.handle(server_channel);
917        assert!(r.is_err(), "handle without magic must error");
918    }
919
920    #[test]
921    fn test_service_handler_handle_with_unreadable_env_home() {
922        use crate::net::channel::LocalChannelPair;
923
924        // Point env_home at a path that doesn't exist — read_dir
925        // fails inside handle() and we get a NetworkRestoreError.
926        let server = NetworkRestoreServer::new("/nonexistent/path/xxx");
927
928        let pair = LocalChannelPair::new();
929        let server_channel: Box<dyn crate::net::channel::Channel> =
930            Box::new(pair.channel_a);
931        let client_channel = pair.channel_b;
932
933        client_channel.send(&RESTORE_MAGIC.to_le_bytes()).unwrap();
934        let r = server.handle(server_channel);
935        assert!(r.is_err(), "unreadable env_home must error");
936    }
937}