Skip to main content

noxu_rep/
network_restore_server.rs

//! Server-side network restore: stream log files to a requesting node.
//!
//! The restore server accepts TCP connections from nodes running
//! `NetworkRestore::execute()`. On each connection it:
//!
//! 1. Reads the 4-byte restore magic: the `u32` value `0x4E52_5354` (ASCII
//!    `NRST` when read most-significant byte first), encoded little-endian
//!    like every other integer in the protocol.
//! 2. Lists all `.ndb` files in `env_home`, sorted by name.
//! 3. Writes `[file_count: u32 LE]`.
//! 4. For each file: writes `[name_len: u16 LE][name bytes][file_size: u64
//!    LE][file bytes]`, reading the file in 64 KiB chunks.
//!
//! Two modes are available:
//!
//! - **Standalone**: call `NetworkRestoreServer::start(addr)` to bind a
//!   dedicated `TcpListener` and serve all incoming connections in the
//!   background.
//! - **Dispatcher-integrated**: register `NetworkRestoreServer` as a
//!   `ServiceHandler` named `"RESTORE"` with a `TcpServiceDispatcher`.
//!   The service dispatcher handles TCP negotiation; the handler receives a
//!   pre-opened channel through which the RESTORE protocol runs.
21
22use std::io::{Read as IoRead, Write as IoWrite};
23use std::net::{SocketAddr, TcpListener};
24use std::path::PathBuf;
25use std::sync::Arc;
26use std::sync::atomic::{AtomicBool, Ordering};
27use std::thread;
28
29use crate::error::{RepError, Result};
30use crate::net::channel::Channel;
31use crate::net::service_dispatcher::ServiceHandler;
32
/// Handshake tag a client must send first: the `u32` value `0x4E52_5354`.
///
/// Read most-significant byte first, the value spells the ASCII tag `NRST`.
/// It is serialised little-endian on the wire, so the bytes actually
/// transmitted are `54 53 52 4E`.
const RESTORE_MAGIC: u32 = u32::from_be_bytes(*b"NRST");

/// Name under which this service is registered with `TcpServiceDispatcher`.
pub const RESTORE_SERVICE_NAME: &str = "RESTORE";
38
/// Serves log files to nodes performing a network restore.
///
/// Supports two transports: a standalone TCP accept loop, and the
/// `ServiceHandler` trait so the server can be plugged into a
/// `TcpServiceDispatcher`.
#[derive(Debug)]
pub struct NetworkRestoreServer {
    /// Directory whose `.ndb` log files are offered to restoring nodes.
    env_home: PathBuf,
    /// `true` while the standalone accept loop should keep running; shared
    /// with the accept thread so `stop()` can signal it.
    running: Arc<AtomicBool>,
}
49
50impl NetworkRestoreServer {
51    /// Create a new restore server that will serve files from `env_home`.
52    pub fn new(env_home: impl Into<PathBuf>) -> Self {
53        Self {
54            env_home: env_home.into(),
55            running: Arc::new(AtomicBool::new(false)),
56        }
57    }
58
59    /// Wrap in `Arc` so the same instance can be shared between the accept
60    /// loop thread and the `ServiceHandler` registration.
61    pub fn into_arc(self) -> Arc<Self> {
62        Arc::new(self)
63    }
64
65    /// Stop the standalone accept loop (if one was started).
66    pub fn stop(&self) {
67        self.running.store(false, Ordering::SeqCst);
68    }
69
70    /// Whether the standalone accept loop is running.
71    pub fn is_running(&self) -> bool {
72        self.running.load(Ordering::SeqCst)
73    }
74
75    /// Start a dedicated TCP accept loop on `addr`.
76    ///
77    /// Returns the actual bound address (useful when `addr` has port 0).
78    /// Connections are handled in per-connection threads.
79    pub fn start(self: &Arc<Self>, addr: SocketAddr) -> Result<SocketAddr> {
80        let listener = TcpListener::bind(addr)
81            .map_err(|e| RepError::NetworkError(e.to_string()))?;
82        let bound = listener
83            .local_addr()
84            .map_err(|e| RepError::NetworkError(e.to_string()))?;
85
86        self.running.store(true, Ordering::SeqCst);
87
88        let server = Arc::clone(self);
89        thread::spawn(move || {
90            while server.running.load(Ordering::SeqCst) {
91                match listener.accept() {
92                    Ok((stream, _peer)) => {
93                        let srv = Arc::clone(&server);
94                        thread::spawn(move || {
95                            let _ = srv.serve_raw(stream);
96                        });
97                    }
98                    Err(_) => break,
99                }
100            }
101            server.running.store(false, Ordering::SeqCst);
102        });
103
104        Ok(bound)
105    }
106
107    /// Serve a single raw `TcpStream` connection using the RESTORE protocol.
108    ///
109    /// Called by the standalone accept loop.
110    fn serve_raw(&self, mut stream: std::net::TcpStream) -> Result<()> {
111        // Read and validate magic.
112        let mut magic_buf = [0u8; 4];
113        stream.read_exact(&mut magic_buf).map_err(|e| {
114            RepError::NetworkRestoreError(format!("reading magic: {}", e))
115        })?;
116        let magic = u32::from_le_bytes(magic_buf);
117        if magic != RESTORE_MAGIC {
118            return Err(RepError::NetworkRestoreError(format!(
119                "bad restore magic: 0x{:08X}",
120                magic
121            )));
122        }
123
124        self.send_files_to(&mut stream)
125    }
126
127    /// Core file-transfer logic: enumerate `.ndb` files, send count, then
128    /// stream each file's name + size + bytes to `out`.
129    ///
130    /// Used by both `serve_raw` and the `ServiceHandler::handle` path.
131    fn send_files_to<W: IoRead + IoWrite>(&self, out: &mut W) -> Result<()> {
132        // Enumerate all .ndb files in env_home, sorted by name.
133        let mut files: Vec<(String, PathBuf)> =
134            std::fs::read_dir(&self.env_home)
135                .map_err(|e| {
136                    RepError::NetworkRestoreError(format!(
137                        "cannot read env_home {}: {}",
138                        self.env_home.display(),
139                        e
140                    ))
141                })?
142                .filter_map(|entry| {
143                    let entry = entry.ok()?;
144                    let path = entry.path();
145                    if path.extension()?.to_str()? == "ndb" {
146                        let name = path.file_name()?.to_str()?.to_string();
147                        Some((name, path))
148                    } else {
149                        None
150                    }
151                })
152                .collect();
153        files.sort_by(|a, b| a.0.cmp(&b.0));
154
155        // Send file count.
156        let count = files.len() as u32;
157        out.write_all(&count.to_le_bytes()).map_err(|e| {
158            RepError::NetworkRestoreError(format!("writing file count: {}", e))
159        })?;
160
161        let mut chunk = vec![0u8; 65536];
162
163        for (name, path) in &files {
164            // Verify name fits in a u16 length prefix.
165            let name_bytes = name.as_bytes();
166            if name_bytes.len() > u16::MAX as usize {
167                return Err(RepError::NetworkRestoreError(format!(
168                    "filename too long: {}",
169                    name
170                )));
171            }
172
173            let name_len = name_bytes.len() as u16;
174            out.write_all(&name_len.to_le_bytes()).map_err(|e| {
175                RepError::NetworkRestoreError(format!(
176                    "writing name_len for '{}': {}",
177                    name, e
178                ))
179            })?;
180            out.write_all(name_bytes).map_err(|e| {
181                RepError::NetworkRestoreError(format!(
182                    "writing filename '{}': {}",
183                    name, e
184                ))
185            })?;
186
187            // File size.
188            let metadata = std::fs::metadata(path).map_err(|e| {
189                RepError::NetworkRestoreError(format!(
190                    "stat '{}': {}",
191                    path.display(),
192                    e
193                ))
194            })?;
195            let file_size = metadata.len();
196            out.write_all(&file_size.to_le_bytes()).map_err(|e| {
197                RepError::NetworkRestoreError(format!(
198                    "writing size for '{}': {}",
199                    name, e
200                ))
201            })?;
202
203            // Stream file data in 64 KiB chunks.
204            let mut file = std::fs::File::open(path).map_err(|e| {
205                RepError::NetworkRestoreError(format!(
206                    "open '{}': {}",
207                    path.display(),
208                    e
209                ))
210            })?;
211            let mut remaining = file_size as usize;
212            while remaining > 0 {
213                let to_read = remaining.min(chunk.len());
214                let n = file.read(&mut chunk[..to_read]).map_err(|e| {
215                    RepError::NetworkRestoreError(format!(
216                        "reading '{}': {}",
217                        path.display(),
218                        e
219                    ))
220                })?;
221                if n == 0 {
222                    break; // Unexpected EOF — file may have been truncated.
223                }
224                out.write_all(&chunk[..n]).map_err(|e| {
225                    RepError::NetworkRestoreError(format!(
226                        "sending data for '{}': {}",
227                        name, e
228                    ))
229                })?;
230                remaining -= n;
231            }
232
233            log::debug!(
234                "NetworkRestoreServer: sent '{}' ({} bytes)",
235                name,
236                file_size
237            );
238        }
239
240        out.flush().map_err(|e| {
241            RepError::NetworkRestoreError(format!("flushing output: {}", e))
242        })?;
243
244        Ok(())
245    }
246}
247
248// ---------------------------------------------------------------------------
249// ServiceHandler implementation
250// ---------------------------------------------------------------------------
251
252/// `NetworkRestoreServer` can be registered with `TcpServiceDispatcher` under
253/// the `"RESTORE"` service name. The service dispatcher reads the service name
254/// from each new connection before calling `handle()`; the channel passed here
255/// is ready for the RESTORE protocol (magic bytes onward).
256impl ServiceHandler for NetworkRestoreServer {
257    fn service_name(&self) -> &str {
258        RESTORE_SERVICE_NAME
259    }
260
261    fn handle(&self, channel: Box<dyn Channel>) -> Result<()> {
262        // Read the RESTORE magic through the channel.
263        use std::time::Duration;
264
265        let magic_bytes =
266            channel.receive(Duration::from_secs(30))?.ok_or_else(|| {
267                RepError::NetworkRestoreError(
268                    "no magic bytes received on RESTORE channel".into(),
269                )
270            })?;
271        if magic_bytes.len() < 4 {
272            return Err(RepError::NetworkRestoreError(format!(
273                "short magic: {} bytes",
274                magic_bytes.len()
275            )));
276        }
277        let magic = u32::from_le_bytes([
278            magic_bytes[0],
279            magic_bytes[1],
280            magic_bytes[2],
281            magic_bytes[3],
282        ]);
283        if magic != RESTORE_MAGIC {
284            return Err(RepError::NetworkRestoreError(format!(
285                "bad restore magic: 0x{:08X}",
286                magic
287            )));
288        }
289
290        // Build file list and send via the channel's framing.
291        let mut files: Vec<(String, PathBuf)> =
292            std::fs::read_dir(&self.env_home)
293                .map_err(|e| {
294                    RepError::NetworkRestoreError(format!(
295                        "read_dir {}: {}",
296                        self.env_home.display(),
297                        e
298                    ))
299                })?
300                .filter_map(|entry| {
301                    let entry = entry.ok()?;
302                    let path = entry.path();
303                    if path.extension()?.to_str()? == "ndb" {
304                        let name = path.file_name()?.to_str()?.to_string();
305                        Some((name, path))
306                    } else {
307                        None
308                    }
309                })
310                .collect();
311        files.sort_by(|a, b| a.0.cmp(&b.0));
312
313        // Send a single framed message containing the entire restore payload.
314        // The payload uses the same wire layout as the raw-TCP path so the
315        // client's `execute()` can work regardless of transport.
316        let mut payload: Vec<u8> = Vec::new();
317        let count = files.len() as u32;
318        payload.extend_from_slice(&count.to_le_bytes());
319
320        let mut chunk = vec![0u8; 65536];
321        for (name, path) in &files {
322            let name_bytes = name.as_bytes();
323            let name_len = name_bytes.len() as u16;
324            payload.extend_from_slice(&name_len.to_le_bytes());
325            payload.extend_from_slice(name_bytes);
326
327            let metadata = std::fs::metadata(path).map_err(|e| {
328                RepError::NetworkRestoreError(format!(
329                    "stat '{}': {}",
330                    path.display(),
331                    e
332                ))
333            })?;
334            let file_size = metadata.len();
335            payload.extend_from_slice(&file_size.to_le_bytes());
336
337            let mut file = std::fs::File::open(path).map_err(|e| {
338                RepError::NetworkRestoreError(format!(
339                    "open '{}': {}",
340                    path.display(),
341                    e
342                ))
343            })?;
344            let mut remaining = file_size as usize;
345            while remaining > 0 {
346                let to_read = remaining.min(chunk.len());
347                let n = file.read(&mut chunk[..to_read]).map_err(|e| {
348                    RepError::NetworkRestoreError(format!(
349                        "reading '{}': {}",
350                        path.display(),
351                        e
352                    ))
353                })?;
354                if n == 0 {
355                    break;
356                }
357                payload.extend_from_slice(&chunk[..n]);
358                remaining -= n;
359            }
360        }
361
362        channel.send(&payload)?;
363        Ok(())
364    }
365}
366
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::{Read, Write};
    use std::net::TcpStream;
    use std::path::Path;
    use std::time::Duration;
    use tempfile::TempDir;

    use crate::net::channel::{Channel, LocalChannelPair};
    use crate::network_restore::{NetworkRestore, NetworkRestoreConfig};

    /// Create a temp env_home populated with the given `(name, contents)`
    /// files.
    fn make_env_home(files: &[(&str, &[u8])]) -> TempDir {
        let dir = tempfile::tempdir().expect("temp dir");
        for (name, data) in files {
            let mut f =
                std::fs::File::create(dir.path().join(name)).expect("create");
            f.write_all(data).expect("write");
        }
        dir
    }

    /// Start a standalone server for `env_home` on an ephemeral loopback
    /// port. The listener is bound before `start` returns, so clients may
    /// connect immediately without sleeping.
    fn start_server(
        env_home: &Path,
    ) -> (Arc<NetworkRestoreServer>, std::net::SocketAddr) {
        let server = NetworkRestoreServer::new(env_home).into_arc();
        let bound = server
            .start("127.0.0.1:0".parse().unwrap())
            .expect("start restore server");
        (server, bound)
    }

    /// Client configuration pointing at a local server on `port`.
    fn restore_config(port: u16, retain_log_files: bool) -> NetworkRestoreConfig {
        NetworkRestoreConfig {
            source_node: "node1".to_string(),
            source_host: "127.0.0.1".to_string(),
            source_port: port,
            retain_log_files,
        }
    }

    // -----------------------------------------------------------------------
    // Standalone TCP server tests
    // -----------------------------------------------------------------------

    #[test]
    fn test_server_starts_and_stops() {
        let dir = make_env_home(&[]);
        let (server, _addr) = start_server(dir.path());
        assert!(server.is_running());
        server.stop();
        assert!(!server.is_running());
    }

    #[test]
    fn test_restore_empty_env_home() {
        let dir = make_env_home(&[]);
        let (server, bound) = start_server(dir.path());

        let restore_dir = tempfile::tempdir().expect("restore dir");
        let restore = NetworkRestore::new(restore_config(bound.port(), false))
            .with_local_dir(restore_dir.path());
        restore.execute().expect("empty restore should succeed");

        let received = std::fs::read_dir(restore_dir.path())
            .unwrap()
            .filter_map(|e| e.ok())
            .count();
        assert_eq!(received, 0);
        server.stop();
    }

    #[test]
    fn test_restore_single_file() {
        let content = b"log file content for testing";
        let dir = make_env_home(&[("00000001.ndb", content)]);
        let (server, bound) = start_server(dir.path());

        let restore_dir = tempfile::tempdir().expect("restore dir");
        let restore = NetworkRestore::new(restore_config(bound.port(), false))
            .with_local_dir(restore_dir.path());
        restore.execute().expect("single-file restore");

        let received = std::fs::read(restore_dir.path().join("00000001.ndb"))
            .expect("received file");
        assert_eq!(&received, content);
        server.stop();
    }

    #[test]
    fn test_restore_multiple_files() {
        // Five files of increasing size, each filled with its own index.
        let file_data: Vec<(String, Vec<u8>)> = (0u8..5)
            .map(|i| {
                let name = format!("{:08}.ndb", i);
                let data = vec![i; 1024 * (usize::from(i) + 1)];
                (name, data)
            })
            .collect();
        let file_refs: Vec<(&str, &[u8])> = file_data
            .iter()
            .map(|(n, d)| (n.as_str(), d.as_slice()))
            .collect();

        let dir = make_env_home(&file_refs);
        let (server, bound) = start_server(dir.path());

        let restore_dir = tempfile::tempdir().expect("restore dir");
        let restore = NetworkRestore::new(restore_config(bound.port(), false))
            .with_local_dir(restore_dir.path());
        restore.execute().expect("multi-file restore");

        for (name, expected) in &file_data {
            let got = std::fs::read(restore_dir.path().join(name)).expect(name);
            assert_eq!(&got, expected, "file {} mismatch", name);
        }
        server.stop();
    }

    #[test]
    fn test_restore_non_ndb_files_not_sent() {
        // Only .ndb files should be transferred.
        let dir = make_env_home(&[
            ("00000001.ndb", b"log data"),
            ("noxu.config.csv", b"config"),
            ("README.txt", b"readme"),
        ]);
        let (server, bound) = start_server(dir.path());

        let restore_dir = tempfile::tempdir().expect("restore dir");
        let restore = NetworkRestore::new(restore_config(bound.port(), false))
            .with_local_dir(restore_dir.path());
        restore.execute().expect("restore");

        // Only the .ndb file should appear.
        let mut names: Vec<String> = std::fs::read_dir(restore_dir.path())
            .unwrap()
            .filter_map(|e| e.ok())
            .map(|e| e.file_name().to_string_lossy().to_string())
            .collect();
        names.sort();
        assert_eq!(names, vec!["00000001.ndb"]);
        server.stop();
    }

    #[test]
    fn test_restore_retain_log_files() {
        let original = b"original content";
        let updated = b"new content from restore";

        let src_dir = make_env_home(&[("00000001.ndb", updated)]);
        let (server, bound) = start_server(src_dir.path());

        // Pre-populate the destination with the original file.
        let restore_dir = tempfile::tempdir().expect("restore dir");
        std::fs::write(restore_dir.path().join("00000001.ndb"), original)
            .expect("pre-populate");

        let restore = NetworkRestore::new(restore_config(bound.port(), true))
            .with_local_dir(restore_dir.path());
        restore.execute().expect("restore with retain");

        // The restored file should contain the new data.
        let got =
            std::fs::read(restore_dir.path().join("00000001.ndb")).unwrap();
        assert_eq!(&got, updated);

        // The backup file should still contain the original.
        let bak =
            std::fs::read(restore_dir.path().join("00000001.ndb.bak")).unwrap();
        assert_eq!(&bak, original);
        server.stop();
    }

    #[test]
    fn test_restore_large_file() {
        // 200 KiB — ensures chunking through the 64 KiB buffer.
        let large = vec![0xABu8; 200 * 1024];
        let dir = make_env_home(&[("large.ndb", large.as_slice())]);
        let (server, bound) = start_server(dir.path());

        let restore_dir = tempfile::tempdir().expect("restore dir");
        let restore = NetworkRestore::new(restore_config(bound.port(), false))
            .with_local_dir(restore_dir.path());
        restore.execute().expect("large file restore");

        let got = std::fs::read(restore_dir.path().join("large.ndb")).unwrap();
        assert_eq!(got.len(), large.len());
        assert_eq!(&got, &large);
        server.stop();
    }

    #[test]
    fn test_server_service_name() {
        let dir = make_env_home(&[]);
        let server = NetworkRestoreServer::new(dir.path());
        assert_eq!(server.service_name(), RESTORE_SERVICE_NAME);
        assert_eq!(server.service_name(), "RESTORE");
    }

    #[test]
    fn test_restore_progress_tracking() {
        let content = b"progress test data";
        let dir = make_env_home(&[("00000001.ndb", content)]);
        let (server, bound) = start_server(dir.path());

        let restore_dir = tempfile::tempdir().expect("restore dir");
        let restore = NetworkRestore::new(restore_config(bound.port(), false))
            .with_local_dir(restore_dir.path());
        restore.execute().expect("restore");

        let progress = restore.get_progress();
        assert_eq!(progress.files_transferred, 1);
        assert_eq!(progress.bytes_transferred, content.len() as u64);
        server.stop();
    }

    // -----------------------------------------------------------------------
    // Wire-protocol error-path coverage
    // -----------------------------------------------------------------------

    #[test]
    fn test_into_arc_wraps_self() {
        let dir = make_env_home(&[]);
        let arc = NetworkRestoreServer::new(dir.path()).into_arc();
        // Freshly wrapped: a single owner and the accept loop not running.
        assert!(!arc.is_running());
        assert_eq!(Arc::strong_count(&arc), 1);
    }

    #[test]
    fn test_serve_raw_rejects_bad_magic() {
        // Send 4 bytes of garbage. The server should drop the connection
        // without writing anything back.
        let dir = make_env_home(&[]);
        let (server, bound) = start_server(dir.path());

        let mut stream = TcpStream::connect(bound).unwrap();
        stream.write_all(&[0xDE, 0xAD, 0xBE, 0xEF]).unwrap();

        // Expect EOF, a reset, or a timeout — anything but reply bytes.
        stream.set_read_timeout(Some(Duration::from_millis(500))).unwrap();
        let mut buf = [0u8; 4];
        match stream.read(&mut buf) {
            Ok(0) | Err(_) => {}
            Ok(_n) => panic!("server replied to bad magic instead of closing"),
        }
        server.stop();
    }

    #[test]
    fn test_serve_raw_short_read_on_magic() {
        // Connect and immediately close without sending anything. The
        // server's handshake read fails on its per-connection thread; the
        // accept loop must survive and serve the next client.
        let dir = make_env_home(&[]);
        let (server, bound) = start_server(dir.path());

        drop(TcpStream::connect(bound).unwrap());

        // A well-formed follow-up connection still gets a valid response.
        let mut stream = TcpStream::connect(bound).unwrap();
        stream.set_read_timeout(Some(Duration::from_secs(5))).unwrap();
        stream.write_all(&RESTORE_MAGIC.to_le_bytes()).unwrap();
        let mut count_buf = [0u8; 4];
        stream.read_exact(&mut count_buf).unwrap();
        assert_eq!(u32::from_le_bytes(count_buf), 0);

        assert!(server.is_running());
        server.stop();
    }

    #[test]
    fn test_serve_raw_real_handshake_streams_files() {
        // Walk the wire protocol by hand over a raw socket rather than via
        // the NetworkRestore client, so every field of the format is
        // checked explicitly.
        let content = b"hello world";
        let dir = make_env_home(&[("00000000.ndb", content)]);
        let (server, bound) = start_server(dir.path());

        let mut stream = TcpStream::connect(bound).unwrap();
        stream.write_all(&RESTORE_MAGIC.to_le_bytes()).unwrap();

        // File count (u32, little-endian).
        let mut count_buf = [0u8; 4];
        stream.read_exact(&mut count_buf).unwrap();
        assert_eq!(u32::from_le_bytes(count_buf), 1);

        // Filename length (u16) + name bytes.
        let mut name_len_buf = [0u8; 2];
        stream.read_exact(&mut name_len_buf).unwrap();
        let mut name_buf =
            vec![0u8; usize::from(u16::from_le_bytes(name_len_buf))];
        stream.read_exact(&mut name_buf).unwrap();
        assert_eq!(&name_buf, b"00000000.ndb");

        // File size (u64) + file bytes.
        let mut size_buf = [0u8; 8];
        stream.read_exact(&mut size_buf).unwrap();
        let size = u64::from_le_bytes(size_buf);
        assert_eq!(size as usize, content.len());

        let mut payload = vec![0u8; size as usize];
        stream.read_exact(&mut payload).unwrap();
        assert_eq!(&payload, content);

        server.stop();
    }

    #[test]
    fn test_start_returns_error_for_unbindable_addr() {
        // Occupy a port with a live listener, then ask the server to bind
        // the very same address. Unlike a privileged port, this fails
        // regardless of the user the tests run as.
        let occupied = std::net::TcpListener::bind("127.0.0.1:0")
            .expect("bind placeholder listener");
        let addr = occupied.local_addr().unwrap();

        let dir = make_env_home(&[]);
        let server = NetworkRestoreServer::new(dir.path()).into_arc();
        let r = server.start(addr);
        assert!(r.is_err(), "binding an in-use address should fail");
        assert!(!server.is_running());
    }

    #[test]
    fn test_stop_is_idempotent() {
        let dir = make_env_home(&[]);
        let (server, _addr) = start_server(dir.path());
        server.stop();
        server.stop();
        assert!(!server.is_running());
    }

    // -----------------------------------------------------------------------
    // ServiceHandler::handle path (multiplexed-channel transport)
    // -----------------------------------------------------------------------

    #[test]
    fn test_service_handler_handle_streams_via_channel() {
        let content = b"abcdef";
        let dir = make_env_home(&[("00000005.ndb", content)]);
        let server = NetworkRestoreServer::new(dir.path());

        let pair = LocalChannelPair::new();
        let server_channel: Box<dyn Channel> = Box::new(pair.channel_a);
        let client_channel = pair.channel_b;

        // Client sends magic, then the server runs handle().
        client_channel.send(&RESTORE_MAGIC.to_le_bytes()).unwrap();
        let r = server.handle(server_channel);
        assert!(r.is_ok(), "handle returned Err: {:?}", r.err());

        // Client receives the framed payload.
        let payload = client_channel
            .receive(Duration::from_secs(5))
            .unwrap()
            .expect("payload");

        // Expected wire format: u32 count + (u16 name_len + name + u64 size + bytes).
        let count = u32::from_le_bytes([
            payload[0], payload[1], payload[2], payload[3],
        ]);
        assert_eq!(count, 1);

        let name_len = u16::from_le_bytes([payload[4], payload[5]]) as usize;
        assert_eq!(name_len, b"00000005.ndb".len());
        assert_eq!(&payload[6..6 + name_len], b"00000005.ndb");

        let size_off = 6 + name_len;
        let mut size_bytes = [0u8; 8];
        size_bytes.copy_from_slice(&payload[size_off..size_off + 8]);
        let size = u64::from_le_bytes(size_bytes) as usize;
        assert_eq!(size, content.len());

        let data_off = size_off + 8;
        assert_eq!(&payload[data_off..data_off + size], content);
    }

    #[test]
    fn test_service_handler_handle_rejects_bad_magic() {
        let dir = make_env_home(&[]);
        let server = NetworkRestoreServer::new(dir.path());

        let pair = LocalChannelPair::new();
        let server_channel: Box<dyn Channel> = Box::new(pair.channel_a);
        let client_channel = pair.channel_b;

        client_channel.send(&[0xDE, 0xAD, 0xBE, 0xEF]).unwrap();
        let r = server.handle(server_channel);
        assert!(r.is_err(), "handle on bad magic must error");
        let msg = format!("{}", r.err().unwrap());
        assert!(
            msg.contains("bad restore magic"),
            "expected 'bad restore magic' in error, got: {msg}"
        );
    }

    #[test]
    fn test_service_handler_handle_rejects_short_magic() {
        let dir = make_env_home(&[]);
        let server = NetworkRestoreServer::new(dir.path());

        let pair = LocalChannelPair::new();
        let server_channel: Box<dyn Channel> = Box::new(pair.channel_a);
        let client_channel = pair.channel_b;

        client_channel.send(&[0xDE]).unwrap();
        let r = server.handle(server_channel);
        assert!(r.is_err(), "handle on short magic must error");
    }

    #[test]
    fn test_service_handler_handle_no_magic_received() {
        let dir = make_env_home(&[]);
        let server = NetworkRestoreServer::new(dir.path());

        let pair = LocalChannelPair::new();
        let server_channel: Box<dyn Channel> = Box::new(pair.channel_a);
        // Drop the client side without sending anything.
        drop(pair.channel_b);
        let r = server.handle(server_channel);
        assert!(r.is_err(), "handle without magic must error");
    }

    #[test]
    fn test_service_handler_handle_with_unreadable_env_home() {
        // env_home does not exist, so enumerating it inside handle() fails.
        let server = NetworkRestoreServer::new("/nonexistent/path/xxx");

        let pair = LocalChannelPair::new();
        let server_channel: Box<dyn Channel> = Box::new(pair.channel_a);
        let client_channel = pair.channel_b;

        client_channel.send(&RESTORE_MAGIC.to_le_bytes()).unwrap();
        let r = server.handle(server_channel);
        assert!(r.is_err(), "unreadable env_home must error");
    }
}