1use std::io::{Read as IoRead, Write as IoWrite};
23use std::net::{SocketAddr, TcpListener};
24use std::path::PathBuf;
25use std::sync::Arc;
26use std::sync::atomic::{AtomicBool, Ordering};
27use std::thread;
28
29use crate::error::{RepError, Result};
30use crate::net::channel::Channel;
31use crate::net::service_dispatcher::ServiceHandler;
32
33const RESTORE_MAGIC: u32 = 0x4E52_5354;
35
36pub const RESTORE_SERVICE_NAME: &str = "RESTORE";
38
39pub struct NetworkRestoreServer {
44 env_home: PathBuf,
46 running: Arc<AtomicBool>,
48}
49
50impl NetworkRestoreServer {
51 pub fn new(env_home: impl Into<PathBuf>) -> Self {
53 Self {
54 env_home: env_home.into(),
55 running: Arc::new(AtomicBool::new(false)),
56 }
57 }
58
59 pub fn into_arc(self) -> Arc<Self> {
62 Arc::new(self)
63 }
64
65 pub fn stop(&self) {
67 self.running.store(false, Ordering::SeqCst);
68 }
69
70 pub fn is_running(&self) -> bool {
72 self.running.load(Ordering::SeqCst)
73 }
74
75 pub fn start(self: &Arc<Self>, addr: SocketAddr) -> Result<SocketAddr> {
80 let listener = TcpListener::bind(addr)
81 .map_err(|e| RepError::NetworkError(e.to_string()))?;
82 let bound = listener
83 .local_addr()
84 .map_err(|e| RepError::NetworkError(e.to_string()))?;
85
86 self.running.store(true, Ordering::SeqCst);
87
88 let server = Arc::clone(self);
89 thread::spawn(move || {
90 while server.running.load(Ordering::SeqCst) {
91 match listener.accept() {
92 Ok((stream, _peer)) => {
93 let srv = Arc::clone(&server);
94 thread::spawn(move || {
95 let _ = srv.serve_raw(stream);
96 });
97 }
98 Err(_) => break,
99 }
100 }
101 server.running.store(false, Ordering::SeqCst);
102 });
103
104 Ok(bound)
105 }
106
107 fn serve_raw(&self, mut stream: std::net::TcpStream) -> Result<()> {
111 let mut magic_buf = [0u8; 4];
113 stream.read_exact(&mut magic_buf).map_err(|e| {
114 RepError::NetworkRestoreError(format!("reading magic: {}", e))
115 })?;
116 let magic = u32::from_le_bytes(magic_buf);
117 if magic != RESTORE_MAGIC {
118 return Err(RepError::NetworkRestoreError(format!(
119 "bad restore magic: 0x{:08X}",
120 magic
121 )));
122 }
123
124 self.send_files_to(&mut stream)
125 }
126
127 fn send_files_to<W: IoRead + IoWrite>(&self, out: &mut W) -> Result<()> {
132 let mut files: Vec<(String, PathBuf)> =
134 std::fs::read_dir(&self.env_home)
135 .map_err(|e| {
136 RepError::NetworkRestoreError(format!(
137 "cannot read env_home {}: {}",
138 self.env_home.display(),
139 e
140 ))
141 })?
142 .filter_map(|entry| {
143 let entry = entry.ok()?;
144 let path = entry.path();
145 if path.extension()?.to_str()? == "ndb" {
146 let name = path.file_name()?.to_str()?.to_string();
147 Some((name, path))
148 } else {
149 None
150 }
151 })
152 .collect();
153 files.sort_by(|a, b| a.0.cmp(&b.0));
154
155 let count = files.len() as u32;
157 out.write_all(&count.to_le_bytes()).map_err(|e| {
158 RepError::NetworkRestoreError(format!("writing file count: {}", e))
159 })?;
160
161 let mut chunk = vec![0u8; 65536];
162
163 for (name, path) in &files {
164 let name_bytes = name.as_bytes();
166 if name_bytes.len() > u16::MAX as usize {
167 return Err(RepError::NetworkRestoreError(format!(
168 "filename too long: {}",
169 name
170 )));
171 }
172
173 let name_len = name_bytes.len() as u16;
174 out.write_all(&name_len.to_le_bytes()).map_err(|e| {
175 RepError::NetworkRestoreError(format!(
176 "writing name_len for '{}': {}",
177 name, e
178 ))
179 })?;
180 out.write_all(name_bytes).map_err(|e| {
181 RepError::NetworkRestoreError(format!(
182 "writing filename '{}': {}",
183 name, e
184 ))
185 })?;
186
187 let metadata = std::fs::metadata(path).map_err(|e| {
189 RepError::NetworkRestoreError(format!(
190 "stat '{}': {}",
191 path.display(),
192 e
193 ))
194 })?;
195 let file_size = metadata.len();
196 out.write_all(&file_size.to_le_bytes()).map_err(|e| {
197 RepError::NetworkRestoreError(format!(
198 "writing size for '{}': {}",
199 name, e
200 ))
201 })?;
202
203 let mut file = std::fs::File::open(path).map_err(|e| {
205 RepError::NetworkRestoreError(format!(
206 "open '{}': {}",
207 path.display(),
208 e
209 ))
210 })?;
211 let mut remaining = file_size as usize;
212 let mut digest = crc32fast::Hasher::new();
217 while remaining > 0 {
218 let to_read = remaining.min(chunk.len());
219 let n = file.read(&mut chunk[..to_read]).map_err(|e| {
220 RepError::NetworkRestoreError(format!(
221 "reading '{}': {}",
222 path.display(),
223 e
224 ))
225 })?;
226 if n == 0 {
227 break; }
229 digest.update(&chunk[..n]);
230 out.write_all(&chunk[..n]).map_err(|e| {
231 RepError::NetworkRestoreError(format!(
232 "sending data for '{}': {}",
233 name, e
234 ))
235 })?;
236 remaining -= n;
237 }
238 out.write_all(&digest.finalize().to_le_bytes()).map_err(|e| {
240 RepError::NetworkRestoreError(format!(
241 "sending digest for '{}': {}",
242 name, e
243 ))
244 })?;
245
246 log::debug!(
247 "NetworkRestoreServer: sent '{}' ({} bytes)",
248 name,
249 file_size
250 );
251 }
252
253 out.flush().map_err(|e| {
254 RepError::NetworkRestoreError(format!("flushing output: {}", e))
255 })?;
256
257 Ok(())
258 }
259}
260
261impl ServiceHandler for NetworkRestoreServer {
270 fn service_name(&self) -> &str {
271 RESTORE_SERVICE_NAME
272 }
273
274 fn handle(&self, channel: Box<dyn Channel>) -> Result<()> {
275 use std::time::Duration;
277
278 let magic_bytes =
279 channel.receive(Duration::from_secs(30))?.ok_or_else(|| {
280 RepError::NetworkRestoreError(
281 "no magic bytes received on RESTORE channel".into(),
282 )
283 })?;
284 if magic_bytes.len() < 4 {
285 return Err(RepError::NetworkRestoreError(format!(
286 "short magic: {} bytes",
287 magic_bytes.len()
288 )));
289 }
290 let magic = u32::from_le_bytes([
291 magic_bytes[0],
292 magic_bytes[1],
293 magic_bytes[2],
294 magic_bytes[3],
295 ]);
296 if magic != RESTORE_MAGIC {
297 return Err(RepError::NetworkRestoreError(format!(
298 "bad restore magic: 0x{:08X}",
299 magic
300 )));
301 }
302
303 let mut files: Vec<(String, PathBuf)> =
305 std::fs::read_dir(&self.env_home)
306 .map_err(|e| {
307 RepError::NetworkRestoreError(format!(
308 "read_dir {}: {}",
309 self.env_home.display(),
310 e
311 ))
312 })?
313 .filter_map(|entry| {
314 let entry = entry.ok()?;
315 let path = entry.path();
316 if path.extension()?.to_str()? == "ndb" {
317 let name = path.file_name()?.to_str()?.to_string();
318 Some((name, path))
319 } else {
320 None
321 }
322 })
323 .collect();
324 files.sort_by(|a, b| a.0.cmp(&b.0));
325
326 let mut payload: Vec<u8> = Vec::new();
330 let count = files.len() as u32;
331 payload.extend_from_slice(&count.to_le_bytes());
332
333 let mut chunk = vec![0u8; 65536];
334 for (name, path) in &files {
335 let name_bytes = name.as_bytes();
336 let name_len = name_bytes.len() as u16;
337 payload.extend_from_slice(&name_len.to_le_bytes());
338 payload.extend_from_slice(name_bytes);
339
340 let metadata = std::fs::metadata(path).map_err(|e| {
341 RepError::NetworkRestoreError(format!(
342 "stat '{}': {}",
343 path.display(),
344 e
345 ))
346 })?;
347 let file_size = metadata.len();
348 payload.extend_from_slice(&file_size.to_le_bytes());
349
350 let mut file = std::fs::File::open(path).map_err(|e| {
351 RepError::NetworkRestoreError(format!(
352 "open '{}': {}",
353 path.display(),
354 e
355 ))
356 })?;
357 let mut remaining = file_size as usize;
358 let mut digest = crc32fast::Hasher::new();
362 while remaining > 0 {
363 let to_read = remaining.min(chunk.len());
364 let n = file.read(&mut chunk[..to_read]).map_err(|e| {
365 RepError::NetworkRestoreError(format!(
366 "reading '{}': {}",
367 path.display(),
368 e
369 ))
370 })?;
371 if n == 0 {
372 break;
373 }
374 digest.update(&chunk[..n]);
375 payload.extend_from_slice(&chunk[..n]);
376 remaining -= n;
377 }
378 payload.extend_from_slice(&digest.finalize().to_le_bytes());
379 }
380
381 channel.send(&payload)?;
382 Ok(())
383 }
384}
385
386#[cfg(test)]
387mod tests {
388 use super::*;
389 use std::io::Write;
390 use std::time::Duration;
391 use tempfile::TempDir;
392
393 use crate::network_restore::{NetworkRestore, NetworkRestoreConfig};
394
395 fn make_env_home(files: &[(&str, &[u8])]) -> TempDir {
397 let dir = tempfile::tempdir().expect("temp dir");
398 for (name, data) in files {
399 let mut f =
400 std::fs::File::create(dir.path().join(name)).expect("create");
401 f.write_all(data).expect("write");
402 }
403 dir
404 }
405
406 #[test]
411 fn test_server_starts_and_stops() {
412 let dir = make_env_home(&[]);
413 let server = Arc::new(NetworkRestoreServer::new(dir.path()));
414 let _addr = server.start("127.0.0.1:0".parse().unwrap()).unwrap();
415 assert!(server.is_running());
416 server.stop();
417 std::thread::sleep(Duration::from_millis(50));
418 assert!(!server.is_running());
419 }
420
421 #[test]
422 fn test_restore_empty_env_home() {
423 let dir = make_env_home(&[]);
424 let server = Arc::new(NetworkRestoreServer::new(dir.path()));
425 let bound = server.start("127.0.0.1:0".parse().unwrap()).unwrap();
426 std::thread::sleep(Duration::from_millis(20));
427
428 let restore_dir = tempfile::tempdir().expect("restore dir");
429 let config = NetworkRestoreConfig {
430 source_node: "test".to_string(),
431 source_host: "127.0.0.1".to_string(),
432 source_port: bound.port(),
433 retain_log_files: false,
434 };
435 let restore =
436 NetworkRestore::new(config).with_local_dir(restore_dir.path());
437 restore.execute().expect("empty restore should succeed");
438
439 let received: Vec<_> = std::fs::read_dir(restore_dir.path())
440 .unwrap()
441 .filter_map(|e| e.ok())
442 .collect();
443 assert_eq!(received.len(), 0);
444 server.stop();
445 }
446
447 #[test]
448 fn test_restore_digest_detects_corruption() {
449 use std::io::Cursor;
454 let content = b"the quick brown fox jumps over the lazy dog";
455 let dir = make_env_home(&[("00000001.ndb", content)]);
456 let server = NetworkRestoreServer::new(dir.path());
457 let mut buf = Cursor::new(Vec::new());
458 server.send_files_to(&mut buf).expect("send into buffer");
459 let mut wire = buf.into_inner();
460
461 let mut off = 4usize;
464 let name_len =
465 u16::from_le_bytes(wire[off..off + 2].try_into().unwrap()) as usize;
466 off += 2 + name_len;
467 let file_size =
468 u64::from_le_bytes(wire[off..off + 8].try_into().unwrap()) as usize;
469 off += 8;
470 let body_start = off;
471 let trailer_start = body_start + file_size;
472
473 let want = u32::from_le_bytes(
475 wire[trailer_start..trailer_start + 4].try_into().unwrap(),
476 );
477 assert_eq!(want, crc32fast::hash(&wire[body_start..trailer_start]));
478
479 wire[body_start] ^= 0xFF;
481 let got = crc32fast::hash(&wire[body_start..trailer_start]);
482 assert_ne!(
483 want, got,
484 "D10: corrupted body must fail the CRC32 trailer check"
485 );
486 }
487
488 #[test]
489 fn test_restore_single_file() {
490 let content = b"log file content for testing";
491 let dir = make_env_home(&[("00000001.ndb", content)]);
492 let server = Arc::new(NetworkRestoreServer::new(dir.path()));
493 let bound = server.start("127.0.0.1:0".parse().unwrap()).unwrap();
494 std::thread::sleep(Duration::from_millis(20));
495
496 let restore_dir = tempfile::tempdir().expect("restore dir");
497 let config = NetworkRestoreConfig {
498 source_node: "node1".to_string(),
499 source_host: "127.0.0.1".to_string(),
500 source_port: bound.port(),
501 retain_log_files: false,
502 };
503 let restore =
504 NetworkRestore::new(config).with_local_dir(restore_dir.path());
505 restore.execute().expect("single-file restore");
506
507 let received = std::fs::read(restore_dir.path().join("00000001.ndb"))
508 .expect("received file");
509 assert_eq!(&received, content);
510 server.stop();
511 }
512
513 #[test]
514 fn test_restore_multiple_files() {
515 let file_data: Vec<(&str, Vec<u8>)> = (0u32..5)
516 .map(|i| {
517 let name: &'static str =
518 Box::leak(format!("{:08}.ndb", i).into_boxed_str());
519 let data = vec![(i & 0xFF) as u8; 1024 * (i as usize + 1)];
520 (name, data)
521 })
522 .collect();
523
524 let file_refs: Vec<(&str, &[u8])> =
525 file_data.iter().map(|(n, d)| (*n, d.as_slice())).collect();
526 let dir = make_env_home(&file_refs);
527 let server = Arc::new(NetworkRestoreServer::new(dir.path()));
528 let bound = server.start("127.0.0.1:0".parse().unwrap()).unwrap();
529 std::thread::sleep(Duration::from_millis(20));
530
531 let restore_dir = tempfile::tempdir().expect("restore dir");
532 let config = NetworkRestoreConfig {
533 source_node: "node1".to_string(),
534 source_host: "127.0.0.1".to_string(),
535 source_port: bound.port(),
536 retain_log_files: false,
537 };
538 let restore =
539 NetworkRestore::new(config).with_local_dir(restore_dir.path());
540 restore.execute().expect("multi-file restore");
541
542 for (name, expected) in &file_data {
543 let got = std::fs::read(restore_dir.path().join(name)).expect(name);
544 assert_eq!(&got, expected, "file {} mismatch", name);
545 }
546 server.stop();
547 }
548
549 #[test]
550 fn test_restore_non_ndb_files_not_sent() {
551 let dir = make_env_home(&[
553 ("00000001.ndb", b"log data"),
554 ("noxu.config.csv", b"config"),
555 ("README.txt", b"readme"),
556 ]);
557 let server = Arc::new(NetworkRestoreServer::new(dir.path()));
558 let bound = server.start("127.0.0.1:0".parse().unwrap()).unwrap();
559 std::thread::sleep(Duration::from_millis(20));
560
561 let restore_dir = tempfile::tempdir().expect("restore dir");
562 let config = NetworkRestoreConfig {
563 source_node: "node1".to_string(),
564 source_host: "127.0.0.1".to_string(),
565 source_port: bound.port(),
566 retain_log_files: false,
567 };
568 let restore =
569 NetworkRestore::new(config).with_local_dir(restore_dir.path());
570 restore.execute().expect("restore");
571
572 let mut names: Vec<String> = std::fs::read_dir(restore_dir.path())
574 .unwrap()
575 .filter_map(|e| e.ok())
576 .map(|e| e.file_name().to_string_lossy().to_string())
577 .collect();
578 names.sort();
579 assert_eq!(names, vec!["00000001.ndb"]);
580 server.stop();
581 }
582
583 #[test]
584 fn test_restore_retain_log_files() {
585 let original = b"original content";
586 let updated = b"new content from restore";
587
588 let src_dir = make_env_home(&[("00000001.ndb", updated)]);
589 let server = Arc::new(NetworkRestoreServer::new(src_dir.path()));
590 let bound = server.start("127.0.0.1:0".parse().unwrap()).unwrap();
591 std::thread::sleep(Duration::from_millis(20));
592
593 let restore_dir = tempfile::tempdir().expect("restore dir");
595 std::fs::write(restore_dir.path().join("00000001.ndb"), original)
596 .expect("pre-populate");
597
598 let config = NetworkRestoreConfig {
599 source_node: "node1".to_string(),
600 source_host: "127.0.0.1".to_string(),
601 source_port: bound.port(),
602 retain_log_files: true,
603 };
604 let restore =
605 NetworkRestore::new(config).with_local_dir(restore_dir.path());
606 restore.execute().expect("restore with retain");
607
608 let got =
610 std::fs::read(restore_dir.path().join("00000001.ndb")).unwrap();
611 assert_eq!(&got, updated);
612
613 let bak =
615 std::fs::read(restore_dir.path().join("00000001.ndb.bak")).unwrap();
616 assert_eq!(&bak, original);
617 server.stop();
618 }
619
620 #[test]
621 fn test_restore_large_file() {
622 let large = vec![0xABu8; 200 * 1024];
624 let dir = make_env_home(&[("large.ndb", &large)]);
625 let server = Arc::new(NetworkRestoreServer::new(dir.path()));
626 let bound = server.start("127.0.0.1:0".parse().unwrap()).unwrap();
627 std::thread::sleep(Duration::from_millis(20));
628
629 let restore_dir = tempfile::tempdir().expect("restore dir");
630 let config = NetworkRestoreConfig {
631 source_node: "node1".to_string(),
632 source_host: "127.0.0.1".to_string(),
633 source_port: bound.port(),
634 retain_log_files: false,
635 };
636 let restore =
637 NetworkRestore::new(config).with_local_dir(restore_dir.path());
638 restore.execute().expect("large file restore");
639
640 let got = std::fs::read(restore_dir.path().join("large.ndb")).unwrap();
641 assert_eq!(got.len(), large.len());
642 assert_eq!(&got, &large);
643 server.stop();
644 }
645
646 #[test]
647 fn test_server_service_name() {
648 let dir = make_env_home(&[]);
649 let server = NetworkRestoreServer::new(dir.path());
650 assert_eq!(server.service_name(), RESTORE_SERVICE_NAME);
651 assert_eq!(server.service_name(), "RESTORE");
652 }
653
654 #[test]
655 fn test_restore_progress_tracking() {
656 let content = b"progress test data";
657 let dir = make_env_home(&[("00000001.ndb", content)]);
658 let server = Arc::new(NetworkRestoreServer::new(dir.path()));
659 let bound = server.start("127.0.0.1:0".parse().unwrap()).unwrap();
660 std::thread::sleep(Duration::from_millis(20));
661
662 let restore_dir = tempfile::tempdir().expect("restore dir");
663 let config = NetworkRestoreConfig {
664 source_node: "node1".to_string(),
665 source_host: "127.0.0.1".to_string(),
666 source_port: bound.port(),
667 retain_log_files: false,
668 };
669 let restore =
670 NetworkRestore::new(config).with_local_dir(restore_dir.path());
671 restore.execute().expect("restore");
672
673 let progress = restore.get_progress();
674 assert_eq!(progress.files_transferred, 1);
675 assert_eq!(progress.bytes_transferred, content.len() as u64);
676 server.stop();
677 }
678
679 #[test]
684 fn test_into_arc_wraps_self() {
685 let dir = make_env_home(&[]);
686 let server = NetworkRestoreServer::new(dir.path());
687 let arc = server.into_arc();
688 assert!(!arc.is_running());
691 assert_eq!(Arc::strong_count(&arc), 1);
692 }
693
694 #[test]
695 fn test_serve_raw_rejects_bad_magic() {
696 let dir = make_env_home(&[]);
700 let server = Arc::new(NetworkRestoreServer::new(dir.path()));
701 let bound = server.start("127.0.0.1:0".parse().unwrap()).unwrap();
702 std::thread::sleep(Duration::from_millis(20));
703
704 let mut stream = std::net::TcpStream::connect(bound).unwrap();
705 stream.write_all(&[0xDE, 0xAD, 0xBE, 0xEF]).unwrap();
706
707 stream.set_read_timeout(Some(Duration::from_millis(500))).unwrap();
711 let mut buf = [0u8; 4];
712 let r = std::io::Read::read(&mut stream, &mut buf);
713 match r {
714 Ok(0) => {} Ok(_n) => panic!("server replied to bad magic instead of closing"),
716 Err(_) => {} }
718 server.stop();
719 }
720
721 #[test]
722 fn test_serve_raw_short_read_on_magic() {
723 let dir = make_env_home(&[]);
727 let server = Arc::new(NetworkRestoreServer::new(dir.path()));
728 let bound = server.start("127.0.0.1:0".parse().unwrap()).unwrap();
729 std::thread::sleep(Duration::from_millis(20));
730
731 {
733 let _ = std::net::TcpStream::connect(bound).unwrap();
734 }
735 std::thread::sleep(Duration::from_millis(20));
738 assert!(server.is_running());
739 server.stop();
740 }
741
742 #[test]
743 fn test_serve_raw_real_handshake_streams_files() {
744 let content = b"hello world";
751 let dir = make_env_home(&[("00000000.ndb", content)]);
752 let server = Arc::new(NetworkRestoreServer::new(dir.path()));
753 let bound = server.start("127.0.0.1:0".parse().unwrap()).unwrap();
754 std::thread::sleep(Duration::from_millis(20));
755
756 let mut stream = std::net::TcpStream::connect(bound).unwrap();
757 stream.write_all(&RESTORE_MAGIC.to_le_bytes()).unwrap();
758
759 let mut count_buf = [0u8; 4];
761 std::io::Read::read_exact(&mut stream, &mut count_buf).unwrap();
762 let count = u32::from_le_bytes(count_buf);
763 assert_eq!(count, 1);
764
765 let mut name_len_buf = [0u8; 2];
767 std::io::Read::read_exact(&mut stream, &mut name_len_buf).unwrap();
768 let name_len = u16::from_le_bytes(name_len_buf) as usize;
769 let mut name_buf = vec![0u8; name_len];
770 std::io::Read::read_exact(&mut stream, &mut name_buf).unwrap();
771 assert_eq!(&name_buf, b"00000000.ndb");
772
773 let mut size_buf = [0u8; 8];
775 std::io::Read::read_exact(&mut stream, &mut size_buf).unwrap();
776 let size = u64::from_le_bytes(size_buf);
777 assert_eq!(size as usize, content.len());
778
779 let mut payload = vec![0u8; size as usize];
780 std::io::Read::read_exact(&mut stream, &mut payload).unwrap();
781 assert_eq!(&payload, content);
782
783 server.stop();
784 }
785
786 #[test]
787 fn test_start_returns_error_for_unbindable_addr() {
788 let dir = make_env_home(&[]);
789 let server = Arc::new(NetworkRestoreServer::new(dir.path()));
790 let r = server.start("192.0.2.1:9999".parse().unwrap());
795 assert!(
796 r.is_err(),
797 "binding to a non-local address should fail on all platforms"
798 );
799 }
800
801 #[test]
802 fn test_stop_is_idempotent() {
803 let dir = make_env_home(&[]);
804 let server = Arc::new(NetworkRestoreServer::new(dir.path()));
805 let _ = server.start("127.0.0.1:0".parse().unwrap()).unwrap();
806 server.stop();
807 server.stop();
808 std::thread::sleep(Duration::from_millis(20));
809 assert!(!server.is_running());
810 }
811
812 #[test]
817 fn test_service_handler_handle_streams_via_channel() {
818 use crate::net::channel::LocalChannelPair;
819
820 let content = b"abcdef";
821 let dir = make_env_home(&[("00000005.ndb", content)]);
822 let server = NetworkRestoreServer::new(dir.path());
823
824 let pair = LocalChannelPair::new();
825 let server_channel: Box<dyn crate::net::channel::Channel> =
826 Box::new(pair.channel_a);
827 let client_channel = pair.channel_b;
828
829 client_channel.send(&RESTORE_MAGIC.to_le_bytes()).unwrap();
831
832 let r = server.handle(server_channel);
834 assert!(r.is_ok(), "handle returned Err: {:?}", r.err());
835
836 use crate::net::channel::Channel;
838 let payload = client_channel
839 .receive(Duration::from_secs(5))
840 .unwrap()
841 .expect("payload");
842
843 let count = u32::from_le_bytes([
845 payload[0], payload[1], payload[2], payload[3],
846 ]);
847 assert_eq!(count, 1);
848
849 let name_len = u16::from_le_bytes([payload[4], payload[5]]) as usize;
850 assert_eq!(name_len, b"00000005.ndb".len());
851 let name = &payload[6..6 + name_len];
852 assert_eq!(name, b"00000005.ndb");
853
854 let size_off = 6 + name_len;
855 let mut size_bytes = [0u8; 8];
856 size_bytes.copy_from_slice(&payload[size_off..size_off + 8]);
857 let size = u64::from_le_bytes(size_bytes) as usize;
858 assert_eq!(size, content.len());
859
860 let data_off = size_off + 8;
861 assert_eq!(&payload[data_off..data_off + size], content);
862 }
863
864 #[test]
865 fn test_service_handler_handle_rejects_bad_magic() {
866 use crate::net::channel::LocalChannelPair;
867
868 let dir = make_env_home(&[]);
869 let server = NetworkRestoreServer::new(dir.path());
870
871 let pair = LocalChannelPair::new();
872 let server_channel: Box<dyn crate::net::channel::Channel> =
873 Box::new(pair.channel_a);
874 let client_channel = pair.channel_b;
875
876 client_channel.send(&[0xDE, 0xAD, 0xBE, 0xEF]).unwrap();
877 let r = server.handle(server_channel);
878 assert!(r.is_err(), "handle on bad magic must error");
879 let msg = format!("{}", r.err().unwrap());
880 assert!(
881 msg.contains("bad restore magic"),
882 "expected 'bad restore magic' in error, got: {msg}"
883 );
884 }
885
886 #[test]
887 fn test_service_handler_handle_rejects_short_magic() {
888 use crate::net::channel::LocalChannelPair;
889
890 let dir = make_env_home(&[]);
891 let server = NetworkRestoreServer::new(dir.path());
892
893 let pair = LocalChannelPair::new();
894 let server_channel: Box<dyn crate::net::channel::Channel> =
895 Box::new(pair.channel_a);
896 let client_channel = pair.channel_b;
897
898 client_channel.send(&[0xDE]).unwrap();
899 let r = server.handle(server_channel);
900 assert!(r.is_err(), "handle on short magic must error");
901 }
902
903 #[test]
904 fn test_service_handler_handle_no_magic_received() {
905 use crate::net::channel::LocalChannelPair;
906
907 let dir = make_env_home(&[]);
908 let server = NetworkRestoreServer::new(dir.path());
909
910 let pair = LocalChannelPair::new();
911 let server_channel: Box<dyn crate::net::channel::Channel> =
912 Box::new(pair.channel_a);
913 drop(pair.channel_b);
916 let r = server.handle(server_channel);
917 assert!(r.is_err(), "handle without magic must error");
918 }
919
920 #[test]
921 fn test_service_handler_handle_with_unreadable_env_home() {
922 use crate::net::channel::LocalChannelPair;
923
924 let server = NetworkRestoreServer::new("/nonexistent/path/xxx");
927
928 let pair = LocalChannelPair::new();
929 let server_channel: Box<dyn crate::net::channel::Channel> =
930 Box::new(pair.channel_a);
931 let client_channel = pair.channel_b;
932
933 client_channel.send(&RESTORE_MAGIC.to_le_bytes()).unwrap();
934 let r = server.handle(server_channel);
935 assert!(r.is_err(), "unreadable env_home must error");
936 }
937}