remote/lib.rs
1//! Remote copy protocol and networking for distributed file operations
2//!
3//! This crate provides the networking layer and protocol definitions for remote file copying
4//! in the RCP tools suite. It enables efficient distributed copying between remote hosts using
5//! SSH for orchestration and QUIC for high-performance data transfer.
6//!
7//! # Overview
8//!
9//! The remote copy system uses a three-node architecture:
10//!
11//! ```text
12//! Master (rcp)
13//! ├── SSH → Source Host (rcpd)
14//! │ └── QUIC → Master (control)
15//! │ └── QUIC Server (waits for Destination)
16//! └── SSH → Destination Host (rcpd)
17//! └── QUIC → Master (control)
18//! └── QUIC Client → Source (data transfer)
19//! ```
20//!
21//! ## Connection Flow
22//!
23//! 1. **Initialization**: Master starts `rcpd` processes on source and destination via SSH
24//! 2. **Control Connections**: Both `rcpd` processes connect back to Master via QUIC
25//! 3. **Address Exchange**: Source starts QUIC server and sends its address to Master
26//! 4. **Direct Connection**: Master forwards address to Destination, which connects to Source
27//! 5. **Data Transfer**: Files flow directly from Source to Destination (not through Master)
28//!
29//! This design ensures efficient data transfer while allowing the Master to coordinate
30//! operations and monitor progress.
31//!
32//! # Key Components
33//!
34//! ## SSH Session Management
35//!
36//! The [`SshSession`] type represents an SSH connection to a remote host and is used to:
37//! - Launch `rcpd` daemons on remote hosts
38//! - Configure connection parameters (user, host, port)
39//!
40//! ## QUIC Networking
41//!
42//! QUIC protocol provides:
43//! - Multiplexed streams over a single connection
44//! - Built-in encryption and authentication
45//! - Efficient data transfer with congestion control
46//!
47//! Key functions:
48//! - [`get_server_with_port_ranges`] - Create QUIC server endpoint with optional port restrictions
49//! - [`get_client_with_port_ranges_and_pinning`] - Create secure QUIC client with certificate pinning
50//! - [`get_endpoint_addr`] - Get the local address of an endpoint
51//!
52//! ## Port Range Configuration
53//!
54//! The [`port_ranges`] module allows restricting QUIC to specific port ranges, useful for
55//! firewall-restricted environments:
56//!
57//! ```rust,no_run
58//! # use remote::get_server_with_port_ranges;
59//! // Bind to ports in the 8000-8999 range
60//! let endpoint = get_server_with_port_ranges(Some("8000-8999"))?;
61//! # Ok::<(), anyhow::Error>(())
62//! ```
63//!
64//! ## Protocol Messages
65//!
66//! The [`protocol`] module defines the message types exchanged between nodes:
67//! - `MasterHello` - Master → rcpd configuration
68//! - `SourceMasterHello` - Source → Master address information
69//! - `RcpdResult` - rcpd → Master operation results
70//! - `TracingHello` - rcpd → Master tracing initialization
71//!
72//! ## Stream Communication
73//!
74//! The [`streams`] module provides high-level abstractions over QUIC streams:
75//! - Bidirectional streams for request/response communication
76//! - Unidirectional streams for tracing and logging
77//! - Object serialization/deserialization using bincode
78//!
79//! ## Remote Tracing
80//!
81//! The [`tracelog`] module enables distributed logging and progress tracking:
82//! - Forward tracing events from remote `rcpd` processes to Master
83//! - Aggregate progress information across multiple remote operations
84//! - Display unified progress for distributed operations
85//!
86//! # Security Model
87//!
88//! The remote copy system implements a defense-in-depth security model using SSH for authentication
89//! and certificate pinning for QUIC connection integrity. This provides protection against
90//! man-in-the-middle (MITM) attacks while maintaining ease of deployment.
91//!
92//! ## Authentication & Authorization
93//!
94//! **SSH is the security perimeter**: All remote operations begin with SSH authentication.
95//! - Initial access control is handled entirely by SSH
96//! - Users must be authenticated and authorized via SSH before any QUIC connections are established
97//! - SSH configuration (keys, permissions, etc.) determines who can initiate remote copies
98//!
99//! ## Transport Encryption & Integrity
100//!
101//! **QUIC with TLS 1.3**: All data transfer uses QUIC protocol built on TLS 1.3
102//! - Provides encryption for data confidentiality
103//! - Ensures data integrity through cryptographic authentication
104//! - Built-in protection against replay attacks
105//!
106//! ## Trust Bootstrap via Certificate Pinning
107//!
108//! **Two secured QUIC connections** in every remote copy operation:
109//!
110//! ### 1. Master ← rcpd (Control Connection)
111//! ```text
112//! Master (rcp) Remote Host (rcpd)
113//! | |
114//! | 1. SSH connection established |
115//! |<--------------------------------->|
116//! | 2. Master generates self-signed |
117//! | cert, computes SHA-256 |
118//! | fingerprint |
119//! | |
120//! | 3. Launch rcpd via SSH with |
121//! | fingerprint as argument |
122//! |---------------------------------->|
123//! | |
124//! | 4. rcpd validates Master's cert |
125//! | against received fingerprint |
126//! |<---(QUIC + cert pinning)----------|
127//! ```
128//!
129//! - Master generates ephemeral self-signed certificate at startup
130//! - Certificate fingerprint (SHA-256) is passed to rcpd via SSH command-line arguments
131//! - rcpd validates Master's certificate by computing its fingerprint and comparing
132//! - Connection fails if fingerprints don't match (MITM protection)
133//!
134//! ### 2. Source → Destination (Data Transfer Connection)
135//! ```text
136//! Source (rcpd) Destination (rcpd)
137//! | |
138//! | 1. Source generates self-signed |
139//! | cert, computes SHA-256 |
140//! | fingerprint |
141//! | |
142//! | 2. Send fingerprint + address |
143//! | to Master via secure channel |
144//! |---------------------------------->|
145//! | Master |
146//! | | |
147//! | 3. Master forwards | |
148//! | to Destination | |
149//! | |----------->|
150//! | |
151//! | 4. Destination validates Source's |
152//! | cert against received |
153//! | fingerprint |
154//! |<---(QUIC + cert pinning)----------|
155//! ```
156//!
157//! - Source generates ephemeral self-signed certificate
158//! - Fingerprint is sent to Master over already-secured Master←Source connection
159//! - Master forwards fingerprint to Destination over already-secured Master←Destination connection
160//! - Destination validates Source's certificate against fingerprint
161//! - Direct Source→Destination connection established only after successful validation
162//!
163//! ## SSH as Secure Out-of-Band Channel
164//!
165//! **Key insight**: SSH provides a secure, authenticated channel for bootstrapping QUIC trust
166//!
167//! - Certificate fingerprints are transmitted through SSH (Master→rcpd command-line arguments)
168//! - SSH connection is already authenticated and encrypted
169//! - This creates a "chain of trust":
170//! 1. User trusts SSH (proven by successful authentication)
171//! 2. SSH carries the certificate fingerprint securely
172//! 3. QUIC connection validates against that fingerprint
173//! 4. Therefore, QUIC connection is trustworthy
174//!
175//! ## Attack Resistance
176//!
177//! ### ✅ Protected Against
178//!
179//! - **Man-in-the-Middle (MITM)**: Certificate pinning prevents attackers from impersonating endpoints
180//! - **Replay Attacks**: TLS 1.3 in QUIC provides built-in replay protection
181//! - **Eavesdropping**: All data encrypted with TLS 1.3
182//! - **Tampering**: Cryptographic integrity checks prevent data modification
183//! - **Unauthorized Access**: SSH authentication is required before any operations
184//!
185//! ### ⚠️ Threat Model Assumptions
186//!
187//! - **SSH is secure**: The security model depends on SSH being properly configured and uncompromised
188//! - **Certificate fingerprints are short-lived**: Ephemeral certificates are generated per-session
189//! - **Trusted network for Master**: The machine running Master (rcp) should be trusted
190//!
191//! ## Best Practices
192//!
193//! 1. **Secure SSH Configuration**: Use key-based authentication, disable password auth
194//! 2. **Keep Systems Updated**: Ensure SSH, TLS libraries, and QUIC implementations are current
195//! 3. **Network Segmentation**: Run remote copies on trusted network segments when possible
196//! 4. **Monitor Logs**: Certificate validation failures indicate potential security issues
197//!
198//! # Network Troubleshooting
199//!
200//! Common failure scenarios and their handling:
201//!
202//! ## SSH Connection Fails
203//! - **Cause**: Host unreachable, authentication failure
204//! - **Timeout**: ~30s (SSH default)
205//! - **Error**: Standard SSH error messages
206//!
207//! ## rcpd Cannot Connect to Master
208//! - **Cause**: Firewall blocks QUIC, network routing issue
209//! - **Timeout**: Configurable via `--remote-copy-conn-timeout-sec` (default: 15s)
210//! - **Solution**: Check firewall rules for QUIC ports
211//!
212//! ## Destination Cannot Connect to Source
213//! - **Cause**: Firewall blocks direct connection between hosts
214//! - **Timeout**: Configurable (default: 15s)
215//! - **Solution**: Use `--quic-port-ranges` to specify allowed ports, configure firewall
216//!
217//! For detailed troubleshooting, see the repository's `docs/network_connectivity.md`.
218//!
219//! # Examples
220//!
221//! ## Starting a Remote Copy Daemon
222//!
223//! ```rust,no_run
224//! use remote::{SshSession, protocol::RcpdConfig, start_rcpd};
225//! use std::net::SocketAddr;
226//!
227//! # async fn example() -> anyhow::Result<()> {
228//! let session = SshSession {
229//! user: Some("user".to_string()),
230//! host: "example.com".to_string(),
231//! port: None,
232//! };
233//!
234//! let config = RcpdConfig {
235//! verbose: 0,
236//! fail_early: false,
237//! max_workers: 4,
238//! max_blocking_threads: 512,
239//! max_open_files: None,
240//! ops_throttle: 0,
241//! iops_throttle: 0,
242//! chunk_size: 1024 * 1024,
243//! dereference: false,
244//! overwrite: false,
245//! overwrite_compare: String::new(),
246//! debug_log_prefix: None,
247//! quic_port_ranges: None,
248//! progress: false,
249//! progress_delay: None,
250//! remote_copy_conn_timeout_sec: 15,
251//! master_cert_fingerprint: Vec::new(),
252//! };
253//! let master_addr: SocketAddr = "192.168.1.100:5000".parse()?;
254//! let server_name = "master-server";
255//!
256//! let process = start_rcpd(&config, &session, &master_addr, server_name).await?;
257//! # Ok(())
258//! # }
259//! ```
260//!
261//! ## Creating a QUIC Server with Port Ranges
262//!
263//! ```rust,no_run
264//! use remote::{get_server_with_port_ranges, get_endpoint_addr};
265//!
266//! # fn example() -> anyhow::Result<()> {
267//! // Create server restricted to ports 8000-8999
268//! let (endpoint, _cert_fingerprint) = get_server_with_port_ranges(Some("8000-8999"))?;
269//! let addr = get_endpoint_addr(&endpoint)?;
270//! println!("Server listening on: {}", addr);
271//! # Ok(())
272//! # }
273//! ```
274//!
275//! # Module Organization
276//!
277//! - [`port_ranges`] - Port range parsing and UDP socket binding
278//! - [`protocol`] - Protocol message definitions and serialization
279//! - [`streams`] - QUIC stream wrappers with typed message passing
280//! - [`tracelog`] - Remote tracing and progress aggregation
281
282use anyhow::{anyhow, Context};
283use rand::Rng;
284use tracing::instrument;
285
286pub mod port_ranges;
287pub mod protocol;
288pub mod streams;
289pub mod tracelog;
290
291#[derive(Debug, PartialEq)]
292pub struct SshSession {
293 pub user: Option<String>,
294 pub host: String,
295 pub port: Option<u16>,
296}
297
298impl SshSession {
299 pub fn local() -> Self {
300 Self {
301 user: None,
302 host: "localhost".to_string(),
303 port: None,
304 }
305 }
306}
307
308async fn setup_ssh_session(
309 session: &SshSession,
310) -> anyhow::Result<std::sync::Arc<openssh::Session>> {
311 let host = session.host.as_str();
312 let destination = match (session.user.as_deref(), session.port) {
313 (Some(user), Some(port)) => format!("ssh://{user}@{host}:{port}"),
314 (None, Some(port)) => format!("ssh://{}:{}", session.host, port),
315 (Some(user), None) => format!("ssh://{user}@{host}"),
316 (None, None) => format!("ssh://{host}"),
317 };
318 tracing::debug!("Connecting to SSH destination: {}", destination);
319 let session = std::sync::Arc::new(
320 openssh::Session::connect(destination, openssh::KnownHosts::Accept)
321 .await
322 .context("Failed to establish SSH connection")?,
323 );
324 Ok(session)
325}
326
327#[instrument]
328pub async fn wait_for_rcpd_process(
329 process: openssh::Child<std::sync::Arc<openssh::Session>>,
330) -> anyhow::Result<()> {
331 tracing::info!("Waiting on rcpd server on: {:?}", process);
332 // wait for process to exit with a timeout and capture output
333 let output = tokio::time::timeout(
334 std::time::Duration::from_secs(10),
335 process.wait_with_output(),
336 )
337 .await
338 .context("Timeout waiting for rcpd process to exit")?
339 .context("Failed to wait for rcpd process")?;
340 if !output.status.success() {
341 let stdout = String::from_utf8_lossy(&output.stdout);
342 let stderr = String::from_utf8_lossy(&output.stderr);
343 tracing::error!(
344 "rcpd command failed on remote host, status code: {:?}\nstdout:\n{}\nstderr:\n{}",
345 output.status.code(),
346 stdout,
347 stderr
348 );
349 return Err(anyhow!(
350 "rcpd command failed on remote host, status code: {:?}",
351 output.status.code(),
352 ));
353 }
354 // log stderr even on success if there's any output (might contain warnings)
355 if !output.stderr.is_empty() {
356 let stderr = String::from_utf8_lossy(&output.stderr);
357 tracing::debug!("rcpd stderr output:\n{}", stderr);
358 }
359 Ok(())
360}
361
362#[instrument]
363pub async fn start_rcpd(
364 rcpd_config: &protocol::RcpdConfig,
365 session: &SshSession,
366 master_addr: &std::net::SocketAddr,
367 master_server_name: &str,
368) -> anyhow::Result<openssh::Child<std::sync::Arc<openssh::Session>>> {
369 tracing::info!("Starting rcpd server on: {:?}", session);
370 let session = setup_ssh_session(session).await?;
371 // Run rcpd command remotely
372 let current_exe = std::env::current_exe().context("Failed to get current executable path")?;
373 let bin_dir = current_exe
374 .parent()
375 .context("Failed to get parent directory of current executable")?;
376 tracing::debug!("Running rcpd from: {:?}", bin_dir);
377 let rcpd_args = rcpd_config.to_args();
378 tracing::debug!("rcpd arguments: {:?}", rcpd_args);
379 let mut cmd = session.arc_command(format!("{}/rcpd", bin_dir.display()));
380 cmd.arg("--master-addr")
381 .arg(master_addr.to_string())
382 .arg("--server-name")
383 .arg(master_server_name)
384 .args(rcpd_args);
385 // capture stdout and stderr so we can read them later
386 cmd.stdout(openssh::Stdio::piped());
387 cmd.stderr(openssh::Stdio::piped());
388 tracing::info!("Will run remotely: {cmd:?}");
389 cmd.spawn().await.context("Failed to spawn rcpd command")
390}
391
392/// Compute SHA-256 fingerprint of a DER-encoded certificate
393fn compute_cert_fingerprint(cert_der: &[u8]) -> ring::digest::Digest {
394 ring::digest::digest(&ring::digest::SHA256, cert_der)
395}
396
397/// Configure QUIC server with a self-signed certificate
398/// Returns the server config and the SHA-256 fingerprint of the certificate
399fn configure_server() -> anyhow::Result<(quinn::ServerConfig, Vec<u8>)> {
400 tracing::info!("Configuring QUIC server");
401 let cert = rcgen::generate_simple_self_signed(vec!["localhost".into()])?;
402 let key_der = cert.serialize_private_key_der();
403 let cert_der = cert.serialize_der()?;
404 let fingerprint = compute_cert_fingerprint(&cert_der);
405 let fingerprint_vec = fingerprint.as_ref().to_vec();
406 tracing::debug!(
407 "Generated certificate with fingerprint: {}",
408 hex::encode(&fingerprint_vec)
409 );
410 let key = rustls::PrivateKey(key_der);
411 let cert = rustls::Certificate(cert_der);
412 let server_config = quinn::ServerConfig::with_single_cert(vec![cert], key)
413 .context("Failed to create server config")?;
414 Ok((server_config, fingerprint_vec))
415}
416
417#[instrument]
418pub fn get_server_with_port_ranges(
419 port_ranges: Option<&str>,
420) -> anyhow::Result<(quinn::Endpoint, Vec<u8>)> {
421 let (server_config, cert_fingerprint) = configure_server()?;
422 let socket = if let Some(ranges_str) = port_ranges {
423 let ranges = port_ranges::PortRanges::parse(ranges_str)?;
424 ranges.bind_udp_socket(std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED))?
425 } else {
426 // default behavior: bind to any available port
427 std::net::UdpSocket::bind("0.0.0.0:0")?
428 };
429 let endpoint = quinn::Endpoint::new(
430 quinn::EndpointConfig::default(),
431 Some(server_config),
432 socket,
433 std::sync::Arc::new(quinn::TokioRuntime),
434 )
435 .context("Failed to create QUIC endpoint")?;
436 Ok((endpoint, cert_fingerprint))
437}
438
439// certificate verifier that validates against a pinned certificate fingerprint
440// This prevents MITM attacks by ensuring we're connecting to the expected server
441struct PinnedCertVerifier {
442 expected_fingerprint: Vec<u8>,
443}
444
445impl PinnedCertVerifier {
446 fn new(expected_fingerprint: Vec<u8>) -> Self {
447 Self {
448 expected_fingerprint,
449 }
450 }
451}
452
453impl rustls::client::ServerCertVerifier for PinnedCertVerifier {
454 fn verify_server_cert(
455 &self,
456 end_entity: &rustls::Certificate,
457 _intermediates: &[rustls::Certificate],
458 _server_name: &rustls::ServerName,
459 _scts: &mut dyn Iterator<Item = &[u8]>,
460 _ocsp_response: &[u8],
461 _now: std::time::SystemTime,
462 ) -> Result<rustls::client::ServerCertVerified, rustls::Error> {
463 let received_fingerprint = compute_cert_fingerprint(&end_entity.0);
464 if received_fingerprint.as_ref() == self.expected_fingerprint.as_slice() {
465 tracing::debug!(
466 "Certificate fingerprint validated successfully: {}",
467 hex::encode(&self.expected_fingerprint)
468 );
469 Ok(rustls::client::ServerCertVerified::assertion())
470 } else {
471 tracing::error!(
472 "Certificate fingerprint mismatch! Expected: {}, Got: {}",
473 hex::encode(&self.expected_fingerprint),
474 hex::encode(received_fingerprint)
475 );
476 Err(rustls::Error::InvalidCertificate(
477 rustls::CertificateError::Other(std::sync::Arc::new(std::io::Error::new(
478 std::io::ErrorKind::InvalidData,
479 format!(
480 "Certificate fingerprint mismatch (expected {}, got {})",
481 hex::encode(&self.expected_fingerprint),
482 hex::encode(received_fingerprint)
483 ),
484 ))),
485 ))
486 }
487 }
488}
489
490fn get_local_ip() -> anyhow::Result<std::net::IpAddr> {
491 let socket = std::net::UdpSocket::bind("0.0.0.0:0")?;
492 socket.connect("8.8.8.8:80")?;
493 Ok(socket.local_addr()?.ip())
494}
495
496#[instrument]
497pub fn get_endpoint_addr(endpoint: &quinn::Endpoint) -> anyhow::Result<std::net::SocketAddr> {
498 // endpoint is bound to 0.0.0.0 so we need to get the local IP address
499 let local_ip = get_local_ip().context("Failed to get local IP address")?;
500 let endpoint_addr = endpoint.local_addr()?;
501 Ok(std::net::SocketAddr::new(local_ip, endpoint_addr.port()))
502}
503
504#[instrument]
505pub fn get_random_server_name() -> String {
506 rand::thread_rng()
507 .sample_iter(&rand::distributions::Alphanumeric)
508 .take(20)
509 .map(char::from)
510 .collect()
511}
512
513#[instrument]
514pub fn get_client_with_cert_pinning(cert_fingerprint: Vec<u8>) -> anyhow::Result<quinn::Endpoint> {
515 get_client_with_port_ranges_and_pinning(None, cert_fingerprint)
516}
517
518#[instrument]
519pub fn get_client_with_port_ranges_and_pinning(
520 port_ranges: Option<&str>,
521 cert_fingerprint: Vec<u8>,
522) -> anyhow::Result<quinn::Endpoint> {
523 tracing::info!(
524 "Creating QUIC client with certificate pinning (fingerprint: {})",
525 hex::encode(&cert_fingerprint)
526 );
527 // create a crypto backend with certificate pinning
528 let crypto = rustls::ClientConfig::builder()
529 .with_safe_defaults()
530 .with_custom_certificate_verifier(std::sync::Arc::new(PinnedCertVerifier::new(
531 cert_fingerprint,
532 )))
533 .with_no_client_auth();
534 create_client_endpoint(port_ranges, crypto)
535}
536
537// helper function to create client endpoint with given crypto config
538fn create_client_endpoint(
539 port_ranges: Option<&str>,
540 crypto: rustls::ClientConfig,
541) -> anyhow::Result<quinn::Endpoint> {
542 // create QUIC client config
543 let client_config = quinn::ClientConfig::new(std::sync::Arc::new(crypto));
544 let socket = if let Some(ranges_str) = port_ranges {
545 let ranges = port_ranges::PortRanges::parse(ranges_str)?;
546 ranges.bind_udp_socket(std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED))?
547 } else {
548 // default behavior: bind to any available port
549 std::net::UdpSocket::bind("0.0.0.0:0")?
550 };
551 // create and configure endpoint
552 let mut endpoint = quinn::Endpoint::new(
553 quinn::EndpointConfig::default(),
554 None, // No server config for client
555 socket,
556 std::sync::Arc::new(quinn::TokioRuntime),
557 )
558 .context("Failed to create QUIC endpoint")?;
559 endpoint.set_default_client_config(client_config);
560 Ok(endpoint)
561}