remote/
lib.rs

//! Remote copy protocol and networking for distributed file operations
//!
//! This crate provides the networking layer and protocol definitions for remote file copying
//! in the RCP tools suite. It enables efficient distributed copying between remote hosts using
//! SSH for orchestration and QUIC for high-performance data transfer.
//!
//! # Overview
//!
//! The remote copy system uses a three-node architecture:
//!
//! ```text
//! Master (rcp)
//! ├── SSH → Source Host (rcpd)
//! │   ├── QUIC → Master (control)
//! │   └── QUIC Server (waits for Destination)
//! └── SSH → Destination Host (rcpd)
//!     ├── QUIC → Master (control)
//!     └── QUIC Client → Source (data transfer)
//! ```
//!
//! ## Connection Flow
//!
//! 1. **Initialization**: Master starts `rcpd` processes on source and destination via SSH
//! 2. **Control Connections**: Both `rcpd` processes connect back to Master via QUIC
//! 3. **Address Exchange**: Source starts QUIC server and sends its address to Master
//! 4. **Direct Connection**: Master forwards address to Destination, which connects to Source
//! 5. **Data Transfer**: Files flow directly from Source to Destination (not through Master)
//!
//! Because data flows directly between Source and Destination, the Master does not carry the
//! bulk transfer, yet it can still coordinate operations and monitor progress.
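//!
//! The address exchange in steps 3 and 4 can be sketched with plain in-process channels standing
//! in for the QUIC control connections. This is only an illustration: `SourceHello`,
//! `relay_source_hello`, and the field names are hypothetical and are not the crate's actual
//! protocol types.
//!
//! ```rust
//! use std::net::SocketAddr;
//! use std::sync::mpsc;
//!
//! /// Hypothetical stand-in for the message Source sends to Master.
//! #[derive(Debug, Clone, PartialEq)]
//! struct SourceHello {
//!     source_addr: SocketAddr,
//!     cert_fingerprint: Vec<u8>,
//! }
//!
//! /// Master's role in steps 3 and 4: receive from Source, forward unchanged to Destination.
//! fn relay_source_hello(
//!     from_source: &mpsc::Receiver<SourceHello>,
//!     to_destination: &mpsc::Sender<SourceHello>,
//! ) -> Result<(), String> {
//!     let hello = from_source.recv().map_err(|e| e.to_string())?;
//!     to_destination.send(hello).map_err(|e| e.to_string())
//! }
//! ```
//!
//! The Master only relays the message; the file data itself never passes through it.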
//!
//! # Key Components
//!
//! ## SSH Session Management
//!
//! The [`SshSession`] type represents an SSH connection to a remote host and is used to:
//! - Launch `rcpd` daemons on remote hosts
//! - Configure connection parameters (user, host, port)
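//!
//! As a rough sketch of how those parameters combine, the function below builds an `ssh://`
//! destination string from the same three fields. The free function `ssh_destination` is
//! hypothetical and shown only for illustration; the crate performs this step internally when
//! it opens the session.
//!
//! ```rust
//! /// Hypothetical helper: format user, host, and port as an `ssh://` destination.
//! fn ssh_destination(user: Option<&str>, host: &str, port: Option<u16>) -> String {
//!     match (user, port) {
//!         (Some(user), Some(port)) => format!("ssh://{user}@{host}:{port}"),
//!         (None, Some(port)) => format!("ssh://{host}:{port}"),
//!         (Some(user), None) => format!("ssh://{user}@{host}"),
//!         (None, None) => format!("ssh://{host}"),
//!     }
//! }
//! ```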
//!
//! ## QUIC Networking
//!
//! The QUIC protocol provides:
//! - Multiplexed streams over a single connection
//! - Built-in encryption and authentication
//! - Efficient data transfer with congestion control
//!
//! Key functions:
//! - [`get_server_with_port_ranges`] - Create QUIC server endpoint with optional port restrictions
//! - [`get_client_with_port_ranges_and_pinning`] - Create secure QUIC client with certificate pinning
//! - [`get_endpoint_addr`] - Get the local address of an endpoint
//!
//! ## Port Range Configuration
//!
//! The [`port_ranges`] module allows restricting QUIC to specific port ranges, useful for
//! firewall-restricted environments:
//!
//! ```rust,no_run
//! # use remote::get_server_with_port_ranges;
//! // Bind to ports in the 8000-8999 range; the call returns the endpoint and the
//! // SHA-256 fingerprint of its self-signed certificate
//! let (endpoint, _cert_fingerprint) = get_server_with_port_ranges(Some("8000-8999"))?;
//! # Ok::<(), anyhow::Error>(())
//! ```
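//!
//! For intuition, restricting a socket to a port range can be approximated with the standard
//! library alone. The sketch below is an assumption about the general technique, not the
//! [`port_ranges`] implementation: `parse_port_range` and `bind_udp_in_range` are hypothetical
//! names, and only a single `start-end` range is handled.
//!
//! ```rust
//! use std::net::UdpSocket;
//! use std::ops::RangeInclusive;
//!
//! /// Hypothetical parser for a single "start-end" range such as "8000-8999".
//! fn parse_port_range(s: &str) -> Option<RangeInclusive<u16>> {
//!     let (start, end) = s.split_once('-')?;
//!     let start: u16 = start.trim().parse().ok()?;
//!     let end: u16 = end.trim().parse().ok()?;
//!     (start <= end).then(|| start..=end)
//! }
//!
//! /// Try each port in order and return the first UDP socket that binds.
//! fn bind_udp_in_range(range: RangeInclusive<u16>) -> std::io::Result<UdpSocket> {
//!     let mut last_err =
//!         std::io::Error::new(std::io::ErrorKind::InvalidInput, "empty port range");
//!     for port in range {
//!         match UdpSocket::bind(("0.0.0.0", port)) {
//!             Ok(socket) => return Ok(socket),
//!             // port busy or not permitted: remember the error and try the next port
//!             Err(e) => last_err = e,
//!         }
//!     }
//!     Err(last_err)
//! }
//! ```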
//!
//! ## Protocol Messages
//!
//! The [`protocol`] module defines the message types exchanged between nodes:
//! - `MasterHello` - Master → rcpd configuration
//! - `SourceMasterHello` - Source → Master address information
//! - `RcpdResult` - rcpd → Master operation results
//! - `TracingHello` - rcpd → Master tracing initialization
//!
//! ## Stream Communication
//!
//! The [`streams`] module provides high-level abstractions over QUIC streams:
//! - Bidirectional streams for request/response communication
//! - Unidirectional streams for tracing and logging
//! - Object serialization/deserialization using bincode
//!
//! ## Remote Tracing
//!
//! The [`tracelog`] module enables distributed logging and progress tracking:
//! - Forward tracing events from remote `rcpd` processes to Master
//! - Aggregate progress information across multiple remote operations
//! - Display unified progress for distributed operations
//!
//! # Security Model
//!
//! The remote copy system implements a defense-in-depth security model using SSH for authentication
//! and certificate pinning for QUIC connection integrity. This provides protection against
//! man-in-the-middle (MITM) attacks while maintaining ease of deployment.
//!
//! ## Authentication & Authorization
//!
//! **SSH is the security perimeter**: All remote operations begin with SSH authentication.
//! - Initial access control is handled entirely by SSH
//! - Users must be authenticated and authorized via SSH before any QUIC connections are established
//! - SSH configuration (keys, permissions, etc.) determines who can initiate remote copies
//!
//! ## Transport Encryption & Integrity
//!
//! **QUIC with TLS 1.3**: All data transfer uses QUIC protocol built on TLS 1.3
//! - Provides encryption for data confidentiality
//! - Ensures data integrity through cryptographic authentication
//! - Built-in protection against replay attacks
//!
//! ## Trust Bootstrap via Certificate Pinning
//!
//! **Two secured QUIC connections** in every remote copy operation:
//!
//! ### 1. Master ← rcpd (Control Connection)
//! ```text
//! Master (rcp)                    Remote Host (rcpd)
//!    |                                   |
//!    | 1. SSH connection established     |
//!    |<--------------------------------->|
//!    | 2. Master generates self-signed   |
//!    |    cert, computes SHA-256         |
//!    |    fingerprint                    |
//!    |                                   |
//!    | 3. Launch rcpd via SSH with       |
//!    |    fingerprint as argument        |
//!    |---------------------------------->|
//!    |                                   |
//!    | 4. rcpd validates Master's cert   |
//!    |    against received fingerprint   |
//!    |<---(QUIC + cert pinning)----------|
//! ```
//!
//! - Master generates ephemeral self-signed certificate at startup
//! - Certificate fingerprint (SHA-256) is passed to rcpd via SSH command-line arguments
//! - rcpd validates Master's certificate by computing its fingerprint and comparing
//! - Connection fails if fingerprints don't match (MITM protection)
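//!
//! The comparison step can be sketched with the standard library alone. The sketch assumes the
//! fingerprint arrives as a hex string, which is an assumption made for illustration only;
//! `parse_hex_fingerprint` and `fingerprints_match` are hypothetical names, and the SHA-256
//! digest of the presented certificate is taken as already computed.
//!
//! ```rust
//! /// Hypothetical helper: decode a 64-character hex string into a 32-byte SHA-256 digest.
//! fn parse_hex_fingerprint(hex: &str) -> Option<[u8; 32]> {
//!     // reject wrong lengths and any non-hex character (this also guarantees ASCII,
//!     // so the byte-index slicing below cannot split a character)
//!     if hex.len() != 64 || !hex.bytes().all(|b| b.is_ascii_hexdigit()) {
//!         return None;
//!     }
//!     let mut out = [0u8; 32];
//!     for (i, byte) in out.iter_mut().enumerate() {
//!         *byte = u8::from_str_radix(&hex[2 * i..2 * i + 2], 16).ok()?;
//!     }
//!     Some(out)
//! }
//!
//! /// Accept the peer only if its certificate digest equals the pinned one.
//! fn fingerprints_match(expected: &[u8; 32], received: &[u8]) -> bool {
//!     expected.as_slice() == received
//! }
//! ```
//!
//! A digest of the wrong length never matches, so a truncated fingerprint is rejected as well.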
//!
//! ### 2. Source → Destination (Data Transfer Connection)
//! ```text
//! Source (rcpd)                   Destination (rcpd)
//!    |                                   |
//!    | 1. Source generates self-signed   |
//!    |    cert, computes SHA-256         |
//!    |    fingerprint                    |
//!    |                                   |
//!    | 2. Send fingerprint + address     |
//!    |    to Master via secure channel   |
//!    |---------------------------------->|
//!    |                    Master         |
//!    |                      |            |
//!    | 3. Master forwards   |            |
//!    |    to Destination    |            |
//!    |                      |----------->|
//!    |                                   |
//!    | 4. Destination validates Source's |
//!    |    cert against received          |
//!    |    fingerprint                    |
//!    |<---(QUIC + cert pinning)----------|
//! ```
//!
//! - Source generates ephemeral self-signed certificate
//! - Fingerprint is sent to Master over already-secured Master←Source connection
//! - Master forwards fingerprint to Destination over already-secured Master←Destination connection
//! - Destination validates Source's certificate against fingerprint
//! - Direct Source→Destination connection established only after successful validation
//!
//! ## SSH as Secure Out-of-Band Channel
//!
//! **Key insight**: SSH provides a secure, authenticated channel for bootstrapping QUIC trust
//!
//! - Certificate fingerprints are transmitted through SSH (Master→rcpd command-line arguments)
//! - SSH connection is already authenticated and encrypted
//! - This creates a "chain of trust":
//!   1. User trusts SSH (proven by successful authentication)
//!   2. SSH carries the certificate fingerprint securely
//!   3. QUIC connection validates against that fingerprint
//!   4. Therefore, QUIC connection is trustworthy
//!
//! ## Attack Resistance
//!
//! ### ✅ Protected Against
//!
//! - **Man-in-the-Middle (MITM)**: Certificate pinning prevents attackers from impersonating endpoints
//! - **Replay Attacks**: TLS 1.3 in QUIC provides built-in replay protection
//! - **Eavesdropping**: All data encrypted with TLS 1.3
//! - **Tampering**: Cryptographic integrity checks prevent data modification
//! - **Unauthorized Access**: SSH authentication is required before any operations
//!
//! ### ⚠️ Threat Model Assumptions
//!
//! - **SSH is secure**: The security model depends on SSH being properly configured and uncompromised
//! - **Certificate fingerprints are short-lived**: Ephemeral certificates are generated per-session
//! - **Trusted Master host**: The machine running Master (rcp) should be trusted
//!
//! ## Best Practices
//!
//! 1. **Secure SSH Configuration**: Use key-based authentication, disable password auth
//! 2. **Keep Systems Updated**: Ensure SSH, TLS libraries, and QUIC implementations are current
//! 3. **Network Segmentation**: Run remote copies on trusted network segments when possible
//! 4. **Monitor Logs**: Certificate validation failures indicate potential security issues
//!
//! # Network Troubleshooting
//!
//! Common failure scenarios and their handling:
//!
//! ## SSH Connection Fails
//! - **Cause**: Host unreachable, authentication failure
//! - **Timeout**: ~30s (SSH default)
//! - **Error**: Standard SSH error messages
//!
//! ## rcpd Cannot Connect to Master
//! - **Cause**: Firewall blocks QUIC, network routing issue
//! - **Timeout**: Configurable via `--remote-copy-conn-timeout-sec` (default: 15s)
//! - **Solution**: Check firewall rules for QUIC ports
//!
//! ## Destination Cannot Connect to Source
//! - **Cause**: Firewall blocks direct connection between hosts
//! - **Timeout**: Configurable (default: 15s)
//! - **Solution**: Use `--quic-port-ranges` to specify allowed ports, configure firewall
//!
//! For detailed troubleshooting, see the repository's `docs/network_connectivity.md`.
//!
//! # Examples
//!
//! ## Starting a Remote Copy Daemon
//!
//! ```rust,no_run
//! use remote::{SshSession, protocol::RcpdConfig, start_rcpd};
//! use std::net::SocketAddr;
//!
//! # async fn example() -> anyhow::Result<()> {
//! let session = SshSession {
//!     user: Some("user".to_string()),
//!     host: "example.com".to_string(),
//!     port: None,
//! };
//!
//! let config = RcpdConfig {
//!     verbose: 0,
//!     fail_early: false,
//!     max_workers: 4,
//!     max_blocking_threads: 512,
//!     max_open_files: None,
//!     ops_throttle: 0,
//!     iops_throttle: 0,
//!     chunk_size: 1024 * 1024,
//!     dereference: false,
//!     overwrite: false,
//!     overwrite_compare: String::new(),
//!     debug_log_prefix: None,
//!     quic_port_ranges: None,
//!     progress: false,
//!     progress_delay: None,
//!     remote_copy_conn_timeout_sec: 15,
//!     master_cert_fingerprint: Vec::new(),
//! };
//! let master_addr: SocketAddr = "192.168.1.100:5000".parse()?;
//! let server_name = "master-server";
//!
//! let process = start_rcpd(&config, &session, &master_addr, server_name).await?;
//! # Ok(())
//! # }
//! ```
//!
//! ## Creating a QUIC Server with Port Ranges
//!
//! ```rust,no_run
//! use remote::{get_server_with_port_ranges, get_endpoint_addr};
//!
//! # fn example() -> anyhow::Result<()> {
//! // Create server restricted to ports 8000-8999
//! let (endpoint, _cert_fingerprint) = get_server_with_port_ranges(Some("8000-8999"))?;
//! let addr = get_endpoint_addr(&endpoint)?;
//! println!("Server listening on: {}", addr);
//! # Ok(())
//! # }
//! ```
//!
//! # Module Organization
//!
//! - [`port_ranges`] - Port range parsing and UDP socket binding
//! - [`protocol`] - Protocol message definitions and serialization
//! - [`streams`] - QUIC stream wrappers with typed message passing
//! - [`tracelog`] - Remote tracing and progress aggregation

use anyhow::{anyhow, Context};
use rand::Rng;
use tracing::instrument;

pub mod port_ranges;
pub mod protocol;
pub mod streams;
pub mod tracelog;

/// Connection parameters for an SSH session to a remote host.
#[derive(Debug, PartialEq)]
pub struct SshSession {
    /// Remote user name; `None` leaves the user out of the SSH destination.
    pub user: Option<String>,
    /// Remote host name or address.
    pub host: String,
    /// Remote SSH port; `None` leaves the port out of the SSH destination.
    pub port: Option<u16>,
}

impl SshSession {
    /// Session parameters for `localhost` with no explicit user or port.
    pub fn local() -> Self {
        Self {
            user: None,
            host: "localhost".to_string(),
            port: None,
        }
    }
}

/// Open an SSH connection described by `session`.
async fn setup_ssh_session(
    session: &SshSession,
) -> anyhow::Result<std::sync::Arc<openssh::Session>> {
    let host = session.host.as_str();
    let destination = match (session.user.as_deref(), session.port) {
        (Some(user), Some(port)) => format!("ssh://{user}@{host}:{port}"),
        (None, Some(port)) => format!("ssh://{host}:{port}"),
        (Some(user), None) => format!("ssh://{user}@{host}"),
        (None, None) => format!("ssh://{host}"),
    };
    tracing::debug!("Connecting to SSH destination: {}", destination);
    // NOTE: `KnownHosts::Accept` does not enforce known_hosts verification of the remote host
    // key; `KnownHosts::Strict` or `KnownHosts::Add` are the stricter alternatives
    let session = std::sync::Arc::new(
        openssh::Session::connect(destination, openssh::KnownHosts::Accept)
            .await
            .context("Failed to establish SSH connection")?,
    );
    Ok(session)
}

/// Wait for a remote `rcpd` process to exit and report its outcome.
///
/// Gives up after 10 seconds. On a non-zero exit status, logs the captured stdout and stderr
/// and returns an error.
#[instrument]
pub async fn wait_for_rcpd_process(
    process: openssh::Child<std::sync::Arc<openssh::Session>>,
) -> anyhow::Result<()> {
    tracing::info!("Waiting on rcpd server on: {:?}", process);
    // wait for process to exit with a timeout and capture output
    let output = tokio::time::timeout(
        std::time::Duration::from_secs(10),
        process.wait_with_output(),
    )
    .await
    .context("Timeout waiting for rcpd process to exit")?
    .context("Failed to wait for rcpd process")?;
    if !output.status.success() {
        let stdout = String::from_utf8_lossy(&output.stdout);
        let stderr = String::from_utf8_lossy(&output.stderr);
        tracing::error!(
            "rcpd command failed on remote host, status code: {:?}\nstdout:\n{}\nstderr:\n{}",
            output.status.code(),
            stdout,
            stderr
        );
        return Err(anyhow!(
            "rcpd command failed on remote host, status code: {:?}",
            output.status.code(),
        ));
    }
    // log stderr even on success if there's any output (might contain warnings)
    if !output.stderr.is_empty() {
        let stderr = String::from_utf8_lossy(&output.stderr);
        tracing::debug!("rcpd stderr output:\n{}", stderr);
    }
    Ok(())
}

/// Launch `rcpd` on the remote host described by `session`.
///
/// The remote `rcpd` binary is looked up in the same directory path as the local executable,
/// so it must be installed at that path on the remote host.
#[instrument]
pub async fn start_rcpd(
    rcpd_config: &protocol::RcpdConfig,
    session: &SshSession,
    master_addr: &std::net::SocketAddr,
    master_server_name: &str,
) -> anyhow::Result<openssh::Child<std::sync::Arc<openssh::Session>>> {
    tracing::info!("Starting rcpd server on: {:?}", session);
    let session = setup_ssh_session(session).await?;
    // Run rcpd command remotely
    let current_exe = std::env::current_exe().context("Failed to get current executable path")?;
    let bin_dir = current_exe
        .parent()
        .context("Failed to get parent directory of current executable")?;
    tracing::debug!("Running rcpd from: {:?}", bin_dir);
    let rcpd_args = rcpd_config.to_args();
    tracing::debug!("rcpd arguments: {:?}", rcpd_args);
    let mut cmd = session.arc_command(format!("{}/rcpd", bin_dir.display()));
    cmd.arg("--master-addr")
        .arg(master_addr.to_string())
        .arg("--server-name")
        .arg(master_server_name)
        .args(rcpd_args);
    // capture stdout and stderr so we can read them later
    cmd.stdout(openssh::Stdio::piped());
    cmd.stderr(openssh::Stdio::piped());
    tracing::info!("Will run remotely: {cmd:?}");
    cmd.spawn().await.context("Failed to spawn rcpd command")
}

/// Compute SHA-256 fingerprint of a DER-encoded certificate
fn compute_cert_fingerprint(cert_der: &[u8]) -> ring::digest::Digest {
    ring::digest::digest(&ring::digest::SHA256, cert_der)
}

/// Configure QUIC server with a self-signed certificate.
///
/// Returns the server config and the SHA-256 fingerprint of the certificate.
fn configure_server() -> anyhow::Result<(quinn::ServerConfig, Vec<u8>)> {
    tracing::info!("Configuring QUIC server");
    let cert = rcgen::generate_simple_self_signed(vec!["localhost".into()])?;
    let key_der = cert.serialize_private_key_der();
    let cert_der = cert.serialize_der()?;
    let fingerprint = compute_cert_fingerprint(&cert_der);
    let fingerprint_vec = fingerprint.as_ref().to_vec();
    tracing::debug!(
        "Generated certificate with fingerprint: {}",
        hex::encode(&fingerprint_vec)
    );
    let key = rustls::PrivateKey(key_der);
    let cert = rustls::Certificate(cert_der);
    let server_config = quinn::ServerConfig::with_single_cert(vec![cert], key)
        .context("Failed to create server config")?;
    Ok((server_config, fingerprint_vec))
}

/// Create a QUIC server endpoint, optionally restricted to the given port ranges.
///
/// Returns the endpoint and the SHA-256 fingerprint of its self-signed certificate.
#[instrument]
pub fn get_server_with_port_ranges(
    port_ranges: Option<&str>,
) -> anyhow::Result<(quinn::Endpoint, Vec<u8>)> {
    let (server_config, cert_fingerprint) = configure_server()?;
    let socket = if let Some(ranges_str) = port_ranges {
        let ranges = port_ranges::PortRanges::parse(ranges_str)?;
        ranges.bind_udp_socket(std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED))?
    } else {
        // default behavior: bind to any available port
        std::net::UdpSocket::bind("0.0.0.0:0")?
    };
    let endpoint = quinn::Endpoint::new(
        quinn::EndpointConfig::default(),
        Some(server_config),
        socket,
        std::sync::Arc::new(quinn::TokioRuntime),
    )
    .context("Failed to create QUIC endpoint")?;
    Ok((endpoint, cert_fingerprint))
}

// certificate verifier that validates against a pinned certificate fingerprint
// This prevents MITM attacks by ensuring we're connecting to the expected server
struct PinnedCertVerifier {
    expected_fingerprint: Vec<u8>,
}

impl PinnedCertVerifier {
    fn new(expected_fingerprint: Vec<u8>) -> Self {
        Self {
            expected_fingerprint,
        }
    }
}

impl rustls::client::ServerCertVerifier for PinnedCertVerifier {
    fn verify_server_cert(
        &self,
        end_entity: &rustls::Certificate,
        _intermediates: &[rustls::Certificate],
        _server_name: &rustls::ServerName,
        _scts: &mut dyn Iterator<Item = &[u8]>,
        _ocsp_response: &[u8],
        _now: std::time::SystemTime,
    ) -> Result<rustls::client::ServerCertVerified, rustls::Error> {
        let received_fingerprint = compute_cert_fingerprint(&end_entity.0);
        if received_fingerprint.as_ref() == self.expected_fingerprint.as_slice() {
            tracing::debug!(
                "Certificate fingerprint validated successfully: {}",
                hex::encode(&self.expected_fingerprint)
            );
            Ok(rustls::client::ServerCertVerified::assertion())
        } else {
            tracing::error!(
                "Certificate fingerprint mismatch! Expected: {}, Got: {}",
                hex::encode(&self.expected_fingerprint),
                hex::encode(received_fingerprint)
            );
            Err(rustls::Error::InvalidCertificate(
                rustls::CertificateError::Other(std::sync::Arc::new(std::io::Error::new(
                    std::io::ErrorKind::InvalidData,
                    format!(
                        "Certificate fingerprint mismatch (expected {}, got {})",
                        hex::encode(&self.expected_fingerprint),
                        hex::encode(received_fingerprint)
                    ),
                ))),
            ))
        }
    }
}

/// Determine the local IP address used for outbound traffic.
///
/// Connecting a UDP socket only selects a route and a local address; no packet is sent to
/// 8.8.8.8. The result therefore depends on the host's routing table.
fn get_local_ip() -> anyhow::Result<std::net::IpAddr> {
    let socket = std::net::UdpSocket::bind("0.0.0.0:0")?;
    socket.connect("8.8.8.8:80")?;
    Ok(socket.local_addr()?.ip())
}

/// Return the address peers should use to reach `endpoint`: the local outbound IP address
/// combined with the port the endpoint is bound to.
#[instrument]
pub fn get_endpoint_addr(endpoint: &quinn::Endpoint) -> anyhow::Result<std::net::SocketAddr> {
    // endpoint is bound to 0.0.0.0 so we need to get the local IP address
    let local_ip = get_local_ip().context("Failed to get local IP address")?;
    let endpoint_addr = endpoint.local_addr()?;
    Ok(std::net::SocketAddr::new(local_ip, endpoint_addr.port()))
}

/// Generate a random 20-character alphanumeric server name.
#[instrument]
pub fn get_random_server_name() -> String {
    rand::thread_rng()
        .sample_iter(&rand::distributions::Alphanumeric)
        .take(20)
        .map(char::from)
        .collect()
}

/// Create a QUIC client endpoint on any available port that only accepts a server certificate
/// matching `cert_fingerprint`.
#[instrument]
pub fn get_client_with_cert_pinning(cert_fingerprint: Vec<u8>) -> anyhow::Result<quinn::Endpoint> {
    get_client_with_port_ranges_and_pinning(None, cert_fingerprint)
}

/// Create a QUIC client endpoint, optionally restricted to the given port ranges, that only
/// accepts a server certificate matching `cert_fingerprint`.
#[instrument]
pub fn get_client_with_port_ranges_and_pinning(
    port_ranges: Option<&str>,
    cert_fingerprint: Vec<u8>,
) -> anyhow::Result<quinn::Endpoint> {
    tracing::info!(
        "Creating QUIC client with certificate pinning (fingerprint: {})",
        hex::encode(&cert_fingerprint)
    );
    // create a crypto backend with certificate pinning
    let crypto = rustls::ClientConfig::builder()
        .with_safe_defaults()
        .with_custom_certificate_verifier(std::sync::Arc::new(PinnedCertVerifier::new(
            cert_fingerprint,
        )))
        .with_no_client_auth();
    create_client_endpoint(port_ranges, crypto)
}

// helper function to create client endpoint with given crypto config
fn create_client_endpoint(
    port_ranges: Option<&str>,
    crypto: rustls::ClientConfig,
) -> anyhow::Result<quinn::Endpoint> {
    // create QUIC client config
    let client_config = quinn::ClientConfig::new(std::sync::Arc::new(crypto));
    let socket = if let Some(ranges_str) = port_ranges {
        let ranges = port_ranges::PortRanges::parse(ranges_str)?;
        ranges.bind_udp_socket(std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED))?
    } else {
        // default behavior: bind to any available port
        std::net::UdpSocket::bind("0.0.0.0:0")?
    };
    // create and configure endpoint
    let mut endpoint = quinn::Endpoint::new(
        quinn::EndpointConfig::default(),
        None, // No server config for client
        socket,
        std::sync::Arc::new(quinn::TokioRuntime),
    )
    .context("Failed to create QUIC endpoint")?;
    endpoint.set_default_client_config(client_config);
    Ok(endpoint)
}