pub mod error;
use std::collections::{BTreeMap, BTreeSet};
use std::fs::Metadata;
use std::io::SeekFrom;
use std::net::SocketAddr;
use std::path::{Component, Path, PathBuf};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use directories::BaseDirs;
use mdns_sd::{ResolvedService, ServiceDaemon, ServiceEvent, ServiceInfo};
use quinn::crypto::rustls::QuicClientConfig;
use rcgen::{CertifiedKey, generate_simple_self_signed};
use rustls::client::danger::{HandshakeSignatureValid, ServerCertVerified, ServerCertVerifier};
use rustls::crypto::{CryptoProvider, verify_tls12_signature, verify_tls13_signature};
use rustls::pki_types::{CertificateDer, PrivateKeyDer, PrivatePkcs8KeyDer, ServerName, UnixTime};
use rustls::{DigitallySignedStruct, SignatureScheme};
use serde::{Deserialize, Serialize};
use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt};
use uuid::Uuid;
pub use error::{Error, Result};
pub const NATIVE_PROTOCOL_VERSION: u16 = 1;
pub const MIN_NATIVE_PROTOCOL_VERSION: u16 = 1;
pub const NATIVE_SERVICE_TYPE: &str = "_ferry._udp.local.";
const DIRECT_MAGIC: &[u8; 8] = b"FERRY01\n";
const MAX_FRAME_LEN: usize = 16 * 1024 * 1024;
const IO_BUFFER_SIZE: usize = 64 * 1024;
const MANIFEST_VERSION: u16 = 1;
const DEFAULT_CHUNK_SIZE: u64 = 1024 * 1024;
const CONFIG_DIR_NAME: &str = "ferry";
const PSK_PROOF_CONTEXT: &[u8] = b"fileferry-direct-psk-v1";
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct DirectPeer {
address: SocketAddr,
expected_fingerprint: Option<String>,
}
impl DirectPeer {
pub fn parse(value: &str) -> Result<Self> {
Ok(Self {
address: value.parse()?,
expected_fingerprint: None,
})
}
pub const fn address(&self) -> SocketAddr {
self.address
}
pub const fn from_address(address: SocketAddr) -> Self {
Self {
address,
expected_fingerprint: None,
}
}
pub fn with_expected_fingerprint(mut self, fingerprint: impl Into<String>) -> Result<Self> {
self.expected_fingerprint = Some(normalized_fingerprint(fingerprint.into())?);
Ok(self)
}
pub fn expected_fingerprint(&self) -> Option<&str> {
self.expected_fingerprint.as_deref()
}
}
#[derive(Clone, PartialEq, Eq, Deserialize)]
pub struct PskSecret(String);
impl PskSecret {
pub fn new(psk: impl Into<String>) -> Result<Self> {
validate_psk(psk.into()).map(Self)
}
fn expose_secret(&self) -> &str {
&self.0
}
}
impl std::fmt::Debug for PskSecret {
fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
formatter.write_str("PskSecret(<redacted>)")
}
}
impl Serialize for PskSecret {
fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
serializer.serialize_str("<redacted>")
}
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct SendRequest {
peer: DirectPeer,
paths: Vec<PathBuf>,
#[serde(default, skip_serializing)]
psk: Option<PskSecret>,
}
impl SendRequest {
pub fn new(peer: DirectPeer, paths: Vec<PathBuf>) -> Result<Self> {
if paths.is_empty() {
return Err(Error::InvalidInput(
"at least one file path is required".to_string(),
));
}
Ok(Self {
peer,
paths,
psk: None,
})
}
pub const fn peer(&self) -> &DirectPeer {
&self.peer
}
pub fn paths(&self) -> &[PathBuf] {
&self.paths
}
pub fn with_psk(mut self, psk: impl Into<String>) -> Result<Self> {
self.psk = Some(PskSecret::new(psk)?);
Ok(self)
}
pub fn psk(&self) -> Option<&str> {
self.psk.as_ref().map(PskSecret::expose_secret)
}
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ReceiveRequest {
listen: SocketAddr,
#[serde(default, skip_serializing)]
psk: Option<PskSecret>,
}
impl ReceiveRequest {
pub fn new(listen: SocketAddr) -> Self {
Self { listen, psk: None }
}
pub const fn listen(&self) -> SocketAddr {
self.listen
}
pub fn with_psk(mut self, psk: impl Into<String>) -> Result<Self> {
self.psk = Some(PskSecret::new(psk)?);
Ok(self)
}
pub fn psk(&self) -> Option<&str> {
self.psk.as_ref().map(PskSecret::expose_secret)
}
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct AppConfig {
pub alias: String,
pub listen_port: u16,
pub quic_port: u16,
pub download_dir: String,
pub auto_accept_known: bool,
pub auto_accept_unknown: bool,
pub discovery: Vec<String>,
pub max_concurrent_files: usize,
pub trust: TrustConfig,
}
impl Default for AppConfig {
fn default() -> Self {
Self {
alias: default_alias(),
listen_port: 53317,
quic_port: 53318,
download_dir: "~/Downloads/ferry".to_string(),
auto_accept_known: true,
auto_accept_unknown: false,
discovery: vec!["native".to_string()],
max_concurrent_files: 8,
trust: TrustConfig::default(),
}
}
}
impl AppConfig {
pub fn load_or_default() -> Result<Self> {
Self::load_or_default_from(config_dir()?.join("config.toml"))
}
pub fn load_or_default_from(path: impl AsRef<Path>) -> Result<Self> {
let path = path.as_ref();
match std::fs::read_to_string(path) {
Ok(contents) => {
toml::from_str(&contents).map_err(|error| Error::ConfigParse(error.to_string()))
}
Err(error) if error.kind() == std::io::ErrorKind::NotFound => Ok(Self::default()),
Err(source) => Err(Error::IoPath {
path: path.to_path_buf(),
source,
}),
}
}
pub fn save_default_path(&self) -> Result<()> {
self.save_to_path(config_dir()?.join("config.toml"))
}
pub fn save_to_path(&self, path: impl AsRef<Path>) -> Result<()> {
write_toml_file(path.as_ref(), self)
}
pub fn to_toml_string(&self) -> Result<String> {
toml::to_string_pretty(self).map_err(|error| Error::ConfigSerialize(error.to_string()))
}
pub fn redacted(&self) -> Self {
let mut config = self.clone();
config.trust = config.trust.redacted();
config
}
pub fn to_redacted_toml_string(&self) -> Result<String> {
self.redacted().to_toml_string()
}
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct DaemonConfig {
pub listen: SocketAddr,
pub destination: PathBuf,
}
impl DaemonConfig {
pub fn from_app_config(config: &AppConfig) -> Result<Self> {
Ok(Self {
listen: SocketAddr::from(([0, 0, 0, 0], config.quic_port)),
destination: expand_home_path(&config.download_dir)?,
})
}
pub fn load_or_default() -> Result<Self> {
let app_config = AppConfig::load_or_default()?;
Self::load_or_default_from(config_dir()?.join("daemon.toml"), &app_config)
}
pub fn load_or_default_from(path: impl AsRef<Path>, app_config: &AppConfig) -> Result<Self> {
let path = path.as_ref();
match std::fs::read_to_string(path) {
Ok(contents) => {
toml::from_str(&contents).map_err(|error| Error::ConfigParse(error.to_string()))
}
Err(error) if error.kind() == std::io::ErrorKind::NotFound => {
Self::from_app_config(app_config)
}
Err(source) => Err(Error::IoPath {
path: path.to_path_buf(),
source,
}),
}
}
pub fn save_default_path(&self) -> Result<()> {
self.save_to_path(config_dir()?.join("daemon.toml"))
}
pub fn save_to_path(&self, path: impl AsRef<Path>) -> Result<()> {
write_toml_file(path.as_ref(), self)
}
pub fn to_toml_string(&self) -> Result<String> {
toml::to_string_pretty(self).map_err(|error| Error::ConfigSerialize(error.to_string()))
}
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct TrustConfig {
pub require_fingerprint: bool,
pub psk: String,
}
impl Default for TrustConfig {
fn default() -> Self {
Self {
require_fingerprint: true,
psk: String::new(),
}
}
}
impl TrustConfig {
pub fn redacted(&self) -> Self {
let psk = if self.psk.is_empty() {
String::new()
} else {
"<redacted>".to_string()
};
Self {
require_fingerprint: self.require_fingerprint,
psk,
}
}
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct NativeIdentity {
cert_der: Vec<u8>,
key_der: Vec<u8>,
fingerprint: String,
}
impl NativeIdentity {
pub fn load_or_generate() -> Result<Self> {
Self::load_or_generate_in(config_dir()?)
}
pub fn load_or_generate_in(config_dir: impl AsRef<Path>) -> Result<Self> {
let config_dir = config_dir.as_ref();
let cert_path = config_dir.join("identity.cert.der");
let key_path = config_dir.join("identity.key.der");
if cert_path.exists() && key_path.exists() {
let cert_der = std::fs::read(&cert_path).map_err(|source| Error::IoPath {
path: cert_path,
source,
})?;
let key_der = std::fs::read(&key_path).map_err(|source| Error::IoPath {
path: key_path,
source,
})?;
return Ok(Self::from_parts(cert_der, key_der));
}
std::fs::create_dir_all(config_dir).map_err(|source| Error::IoPath {
path: config_dir.to_path_buf(),
source,
})?;
let identity = Self::generate()?;
write_private_file(&cert_path, &identity.cert_der)?;
write_private_file(&key_path, &identity.key_der)?;
Ok(identity)
}
pub fn generate() -> Result<Self> {
let CertifiedKey { cert, signing_key } =
generate_simple_self_signed(vec!["localhost".to_string()])?;
Ok(Self::from_parts(
cert.der().to_vec(),
signing_key.serialize_der(),
))
}
pub fn fingerprint(&self) -> &str {
&self.fingerprint
}
fn cert_chain(&self) -> Vec<CertificateDer<'static>> {
vec![CertificateDer::from(self.cert_der.clone())]
}
fn private_key(&self) -> PrivateKeyDer<'static> {
PrivateKeyDer::from(PrivatePkcs8KeyDer::from(self.key_der.clone()))
}
fn from_parts(cert_der: Vec<u8>, key_der: Vec<u8>) -> Self {
let fingerprint = blake3::hash(&cert_der).to_hex().to_string();
Self {
cert_der,
key_der,
fingerprint,
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum TransferDirection {
Send,
Receive,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct TransferProgress {
pub direction: TransferDirection,
pub file_name: String,
pub bytes_done: u64,
pub bytes_total: u64,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "event", rename_all = "snake_case")]
pub enum TransferEvent {
SessionStarted {
direction: TransferDirection,
session_id: Uuid,
files_total: usize,
bytes_total: u64,
},
FileStarted {
direction: TransferDirection,
file_name: String,
bytes_total: u64,
resume_offset: u64,
},
Progress {
direction: TransferDirection,
file_name: String,
bytes_done: u64,
bytes_total: u64,
},
FileFinished {
direction: TransferDirection,
file_name: String,
bytes: u64,
blake3: String,
status: TransferFileStatus,
},
SessionFinished {
direction: TransferDirection,
files_total: usize,
bytes_total: u64,
blake3: String,
},
SessionCancelled {
direction: TransferDirection,
session_id: Uuid,
},
}
impl TransferEvent {
pub fn progress(&self) -> Option<TransferProgress> {
match self {
Self::Progress {
direction,
file_name,
bytes_done,
bytes_total,
} => Some(TransferProgress {
direction: *direction,
file_name: file_name.clone(),
bytes_done: *bytes_done,
bytes_total: *bytes_total,
}),
_ => None,
}
}
}
#[derive(Debug, Clone, Default)]
pub struct TransferControl {
cancelled: Arc<AtomicBool>,
}
impl TransferControl {
pub fn new() -> Self {
Self::default()
}
pub fn cancel(&self) {
self.cancelled.store(true, Ordering::SeqCst);
}
pub fn is_cancelled(&self) -> bool {
self.cancelled.load(Ordering::SeqCst)
}
fn check_cancelled(&self) -> Result<()> {
if self.is_cancelled() {
Err(Error::TransferCancelled)
} else {
Ok(())
}
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TransferSummary {
pub path: PathBuf,
pub file_name: String,
pub bytes: u64,
pub blake3: String,
pub files: Vec<TransferFileSummary>,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TransferFileSummary {
pub path: PathBuf,
pub relative_path: PathBuf,
pub bytes: u64,
pub blake3: String,
pub status: TransferFileStatus,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum TransferFileStatus {
Sent,
Received,
Skipped,
Resumed,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct TransferManifest {
pub version: u16,
pub session_id: Uuid,
pub chunk_size: u64,
pub entries: Vec<ManifestEntry>,
}
impl TransferManifest {
pub fn file_entries(&self) -> impl Iterator<Item = &ManifestEntry> {
self.entries
.iter()
.filter(|entry| entry.kind == ManifestEntryKind::File)
}
pub fn total_file_bytes(&self) -> u64 {
self.file_entries().map(|entry| entry.size).sum()
}
pub fn digest(&self) -> Result<String> {
let bytes = serde_json::to_vec(self).map_err(|error| Error::Protocol(error.to_string()))?;
Ok(blake3::hash(&bytes).to_hex().to_string())
}
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ManifestEntry {
pub relative_path: PathBuf,
pub kind: ManifestEntryKind,
pub size: u64,
pub blake3: Option<String>,
pub chunks: Vec<String>,
pub unix_mode: Option<u32>,
pub modified_unix_ms: Option<i128>,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ManifestEntryKind {
File,
Directory,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ResumeDecision {
Create,
Skip,
ResumeFrom(u64),
Conflict(String),
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PeerObservation {
pub fingerprint: String,
pub alias: Option<String>,
pub hostname: Option<String>,
pub address: SocketAddr,
pub transports: BTreeSet<String>,
pub seen_unix_ms: i128,
}
impl PeerObservation {
pub fn new(fingerprint: impl Into<String>, address: SocketAddr, seen_unix_ms: i128) -> Self {
Self {
fingerprint: fingerprint.into(),
alias: None,
hostname: None,
address,
transports: BTreeSet::new(),
seen_unix_ms,
}
}
pub fn with_alias(mut self, alias: impl Into<String>) -> Self {
self.alias = Some(alias.into());
self
}
pub fn with_hostname(mut self, hostname: impl Into<String>) -> Self {
self.hostname = Some(hostname.into());
self
}
pub fn with_transport(mut self, transport: impl Into<String>) -> Self {
self.transports.insert(transport.into());
self
}
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct PeerRecord {
pub fingerprint: String,
pub aliases: BTreeSet<String>,
pub hostnames: BTreeSet<String>,
pub addresses: BTreeSet<SocketAddr>,
pub transports: BTreeSet<String>,
pub first_seen_unix_ms: i128,
pub last_seen_unix_ms: i128,
}
impl PeerRecord {
fn from_observation(observation: PeerObservation) -> Self {
let mut aliases = BTreeSet::new();
if let Some(alias) = observation.alias {
aliases.insert(alias);
}
let mut hostnames = BTreeSet::new();
if let Some(hostname) = observation.hostname {
hostnames.insert(hostname);
}
let mut addresses = BTreeSet::new();
addresses.insert(observation.address);
Self {
fingerprint: observation.fingerprint,
aliases,
hostnames,
addresses,
transports: observation.transports,
first_seen_unix_ms: observation.seen_unix_ms,
last_seen_unix_ms: observation.seen_unix_ms,
}
}
fn merge(&mut self, observation: PeerObservation) {
if let Some(alias) = observation.alias {
self.aliases.insert(alias);
}
if let Some(hostname) = observation.hostname {
self.hostnames.insert(hostname);
}
self.addresses.insert(observation.address);
self.transports.extend(observation.transports);
self.first_seen_unix_ms = self.first_seen_unix_ms.min(observation.seen_unix_ms);
self.last_seen_unix_ms = self.last_seen_unix_ms.max(observation.seen_unix_ms);
}
pub fn preferred_quic_address(&self) -> Option<SocketAddr> {
if !self.transports.contains("quic") {
return None;
}
self.addresses.iter().copied().next()
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum TrustState {
Trusted,
Blocked,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct KnownPeerEntry {
pub fingerprint: String,
#[serde(default)]
pub aliases: BTreeSet<String>,
#[serde(default)]
pub hostnames: BTreeSet<String>,
#[serde(default)]
pub addresses: BTreeSet<SocketAddr>,
#[serde(default)]
pub transports: BTreeSet<String>,
pub trust_state: TrustState,
pub first_seen_unix_ms: Option<i128>,
pub last_seen_unix_ms: Option<i128>,
}
impl KnownPeerEntry {
pub fn trusted(fingerprint: impl Into<String>) -> Result<Self> {
let fingerprint = normalized_fingerprint(fingerprint.into())?;
Ok(Self {
fingerprint,
aliases: BTreeSet::new(),
hostnames: BTreeSet::new(),
addresses: BTreeSet::new(),
transports: BTreeSet::new(),
trust_state: TrustState::Trusted,
first_seen_unix_ms: None,
last_seen_unix_ms: None,
})
}
pub fn from_record(record: &PeerRecord, trust_state: TrustState) -> Result<Self> {
let fingerprint = normalized_fingerprint(record.fingerprint.clone())?;
Ok(Self {
fingerprint,
aliases: record.aliases.clone(),
hostnames: record.hostnames.clone(),
addresses: record.addresses.clone(),
transports: record.transports.clone(),
trust_state,
first_seen_unix_ms: Some(record.first_seen_unix_ms),
last_seen_unix_ms: Some(record.last_seen_unix_ms),
})
}
}
#[derive(Debug, Default, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct TrustStore {
#[serde(default)]
pub peers: BTreeMap<String, KnownPeerEntry>,
}
impl TrustStore {
pub fn load_or_default() -> Result<Self> {
Self::load_or_default_from(config_dir()?.join("known_peers.toml"))
}
pub fn load_or_default_from(path: impl AsRef<Path>) -> Result<Self> {
let path = path.as_ref();
match std::fs::read_to_string(path) {
Ok(contents) => {
toml::from_str(&contents).map_err(|error| Error::ConfigParse(error.to_string()))
}
Err(error) if error.kind() == std::io::ErrorKind::NotFound => Ok(Self::default()),
Err(source) => Err(Error::IoPath {
path: path.to_path_buf(),
source,
}),
}
}
pub fn save_default_path(&self) -> Result<()> {
self.save_to_path(config_dir()?.join("known_peers.toml"))
}
pub fn save_to_path(&self, path: impl AsRef<Path>) -> Result<()> {
write_toml_file(path.as_ref(), self)
}
pub fn trust_fingerprint(&mut self, fingerprint: impl Into<String>) -> Result<&KnownPeerEntry> {
let entry = KnownPeerEntry::trusted(fingerprint)?;
let fingerprint = entry.fingerprint.clone();
self.peers
.entry(fingerprint.clone())
.and_modify(|existing| existing.trust_state = TrustState::Trusted)
.or_insert(entry);
Ok(self
.peers
.get(&fingerprint)
.expect("trusted peer entry should exist"))
}
pub fn trust_record(&mut self, record: &PeerRecord) -> Result<&KnownPeerEntry> {
let entry = KnownPeerEntry::from_record(record, TrustState::Trusted)?;
let fingerprint = entry.fingerprint.clone();
self.peers
.entry(fingerprint.clone())
.and_modify(|existing| {
existing.aliases.extend(record.aliases.clone());
existing.hostnames.extend(record.hostnames.clone());
existing.addresses.extend(record.addresses.clone());
existing.transports.extend(record.transports.clone());
existing.first_seen_unix_ms = Some(
existing
.first_seen_unix_ms
.unwrap_or(record.first_seen_unix_ms)
.min(record.first_seen_unix_ms),
);
existing.last_seen_unix_ms = Some(
existing
.last_seen_unix_ms
.unwrap_or(record.last_seen_unix_ms)
.max(record.last_seen_unix_ms),
);
existing.trust_state = TrustState::Trusted;
})
.or_insert(entry);
Ok(self
.peers
.get(&fingerprint)
.expect("trusted peer entry should exist"))
}
pub fn trust_direct_address(
&mut self,
fingerprint: impl Into<String>,
address: SocketAddr,
) -> Result<&KnownPeerEntry> {
let mut entry = KnownPeerEntry::trusted(fingerprint)?;
let seen = system_time_unix_ms(SystemTime::now()).unwrap_or_default();
entry.addresses.insert(address);
entry.transports.insert("quic".to_string());
entry.first_seen_unix_ms = Some(seen);
entry.last_seen_unix_ms = Some(seen);
let fingerprint = entry.fingerprint.clone();
self.peers
.entry(fingerprint.clone())
.and_modify(|existing| {
existing.addresses.insert(address);
existing.transports.insert("quic".to_string());
existing.first_seen_unix_ms =
Some(existing.first_seen_unix_ms.unwrap_or(seen).min(seen));
existing.last_seen_unix_ms =
Some(existing.last_seen_unix_ms.unwrap_or(seen).max(seen));
existing.trust_state = TrustState::Trusted;
})
.or_insert(entry);
Ok(self
.peers
.get(&fingerprint)
.expect("trusted peer entry should exist"))
}
pub fn is_trusted_fingerprint(&self, fingerprint: &str) -> bool {
self.peers
.get(fingerprint)
.is_some_and(|entry| entry.trust_state == TrustState::Trusted)
}
pub fn trusted_fingerprint_for_address(&self, address: SocketAddr) -> Option<&str> {
self.peers
.values()
.find(|entry| {
entry.trust_state == TrustState::Trusted && entry.addresses.contains(&address)
})
.map(|entry| entry.fingerprint.as_str())
}
pub fn forget(&mut self, fingerprint: &str) -> bool {
self.peers.remove(fingerprint).is_some()
}
pub fn records(&self) -> impl Iterator<Item = &KnownPeerEntry> {
self.peers.values()
}
pub fn to_toml_string(&self) -> Result<String> {
toml::to_string_pretty(self).map_err(|error| Error::ConfigSerialize(error.to_string()))
}
}
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct PeerRegistry {
peers: BTreeMap<String, PeerRecord>,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PeerLookup<'a> {
Found(&'a PeerRecord),
Ambiguous(Vec<&'a PeerRecord>),
Missing,
}
impl PeerRegistry {
pub fn new() -> Self {
Self::default()
}
pub fn observe(&mut self, observation: PeerObservation) -> Result<&PeerRecord> {
if observation.fingerprint.trim().is_empty() {
return Err(Error::InvalidInput(
"peer fingerprint must not be empty".to_string(),
));
}
let fingerprint = observation.fingerprint.clone();
if let Some(record) = self.peers.get_mut(&fingerprint) {
record.merge(observation);
} else {
self.peers.insert(
fingerprint.clone(),
PeerRecord::from_observation(observation),
);
}
Ok(self
.peers
.get(&fingerprint)
.expect("observed peer record should exist"))
}
pub fn get_by_fingerprint(&self, fingerprint: &str) -> Option<&PeerRecord> {
self.peers.get(fingerprint)
}
pub fn lookup(&self, query: &str) -> Option<&PeerRecord> {
match self.lookup_detail(query) {
PeerLookup::Found(record) => Some(record),
PeerLookup::Ambiguous(_) | PeerLookup::Missing => None,
}
}
pub fn lookup_detail(&self, query: &str) -> PeerLookup<'_> {
if let Some(record) = self.peers.get(query) {
return PeerLookup::Found(record);
}
let matches = self
.peers
.values()
.filter(|record| {
record.fingerprint.starts_with(query)
|| record.aliases.iter().any(|alias| alias == query)
|| record.hostnames.iter().any(|hostname| hostname == query)
})
.collect::<Vec<_>>();
match matches.as_slice() {
[] => PeerLookup::Missing,
[record] => PeerLookup::Found(record),
_ => PeerLookup::Ambiguous(matches),
}
}
pub fn records(&self) -> impl Iterator<Item = &PeerRecord> {
self.peers.values()
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NativeAnnouncement {
pub alias: String,
pub fingerprint: String,
pub quic_port: u16,
pub listen_port: Option<u16>,
}
impl NativeAnnouncement {
pub fn from_config(config: &AppConfig, identity: &NativeIdentity) -> Self {
Self {
alias: config.alias.clone(),
fingerprint: identity.fingerprint().to_string(),
quic_port: config.quic_port,
listen_port: Some(config.listen_port),
}
}
fn service_info(&self) -> Result<ServiceInfo> {
let alias = sanitize_dns_label(&self.alias);
let fingerprint_prefix = self
.fingerprint
.get(..12)
.unwrap_or(self.fingerprint.as_str());
let instance = format!("{alias}-{fingerprint_prefix}");
let hostname = format!("{alias}.local.");
let properties = [
("pv", NATIVE_PROTOCOL_VERSION.to_string()),
("minpv", MIN_NATIVE_PROTOCOL_VERSION.to_string()),
("fp", self.fingerprint.clone()),
("alias", self.alias.clone()),
("transports", "quic".to_string()),
("quic_port", self.quic_port.to_string()),
(
"listen_port",
self.listen_port
.map(|port| port.to_string())
.unwrap_or_default(),
),
];
ServiceInfo::new(
NATIVE_SERVICE_TYPE,
&instance,
&hostname,
"",
self.quic_port,
&properties[..],
)
.map(ServiceInfo::enable_addr_auto)
.map_err(|error| Error::Discovery(error.to_string()))
}
}
pub struct NativeAnnouncer {
daemon: ServiceDaemon,
fullname: String,
}
impl NativeAnnouncer {
pub fn start(announcement: &NativeAnnouncement) -> Result<Self> {
let daemon = ServiceDaemon::new().map_err(|error| Error::Discovery(error.to_string()))?;
let service = announcement.service_info()?;
let fullname = service.get_fullname().to_string();
daemon
.register(service)
.map_err(|error| Error::Discovery(error.to_string()))?;
Ok(Self { daemon, fullname })
}
}
impl Drop for NativeAnnouncer {
fn drop(&mut self) {
let _ = self.daemon.unregister(&self.fullname);
let _ = self.daemon.shutdown();
}
}
pub async fn discover_native_peers(timeout: Duration) -> Result<PeerRegistry> {
let daemon = ServiceDaemon::new().map_err(|error| Error::Discovery(error.to_string()))?;
let receiver = daemon
.browse(NATIVE_SERVICE_TYPE)
.map_err(|error| Error::Discovery(error.to_string()))?;
let deadline = tokio::time::Instant::now() + timeout;
let mut registry = PeerRegistry::new();
loop {
let now = tokio::time::Instant::now();
if now >= deadline {
break;
}
let wait = deadline - now;
match tokio::time::timeout(wait, receiver.recv_async()).await {
Ok(Ok(ServiceEvent::ServiceResolved(service))) => {
observe_resolved_service(&mut registry, &service)?;
}
Ok(Ok(_)) => {}
Ok(Err(error)) => {
return Err(Error::Discovery(error.to_string()));
}
Err(_) => break,
}
}
daemon
.stop_browse(NATIVE_SERVICE_TYPE)
.map_err(|error| Error::Discovery(error.to_string()))?;
let _ = daemon.shutdown();
Ok(registry)
}
fn observe_resolved_service(registry: &mut PeerRegistry, service: &ResolvedService) -> Result<()> {
let Some(fingerprint) = service.get_property_val_str("fp") else {
return Ok(());
};
let fingerprint = match normalized_fingerprint(fingerprint.to_string()) {
Ok(fingerprint) => fingerprint,
Err(_) => return Ok(()),
};
let Some(highest_version) = parse_txt_u16(service, "pv") else {
return Ok(());
};
let Some(minimum_version) = parse_txt_u16(service, "minpv") else {
return Ok(());
};
if minimum_version > highest_version
|| minimum_version > NATIVE_PROTOCOL_VERSION
|| highest_version < MIN_NATIVE_PROTOCOL_VERSION
{
return Ok(());
}
let Some(quic_port) = parse_txt_u16(service, "quic_port") else {
return Ok(());
};
let transports = service
.get_property_val_str("transports")
.unwrap_or_default()
.split(',')
.map(str::trim)
.filter(|value| !value.is_empty())
.map(ToOwned::to_owned)
.collect::<Vec<_>>();
if !transports.iter().any(|transport| transport == "quic") {
return Ok(());
}
let alias = service.get_property_val_str("alias").map(ToOwned::to_owned);
let hostname = Some(service.get_hostname().trim_end_matches('.').to_string());
let seen_unix_ms = system_time_unix_ms(SystemTime::now()).unwrap_or_default();
for address in service.get_addresses_v4() {
let mut observation = PeerObservation::new(
fingerprint.clone(),
SocketAddr::from((address, quic_port)),
seen_unix_ms,
);
if let Some(alias) = &alias {
observation = observation.with_alias(alias.clone());
}
if let Some(hostname) = &hostname {
observation = observation.with_hostname(hostname.clone());
}
for transport in &transports {
observation = observation.with_transport(transport.clone());
}
registry.observe(observation)?;
}
Ok(())
}
fn parse_txt_u16(service: &ResolvedService, key: &str) -> Option<u16> {
service.get_property_val_str(key)?.parse().ok()
}
fn sanitize_dns_label(value: &str) -> String {
let mut label = value
.chars()
.map(|ch| {
if ch.is_ascii_alphanumeric() || ch == '-' {
ch.to_ascii_lowercase()
} else {
'-'
}
})
.collect::<String>();
while label.contains("--") {
label = label.replace("--", "-");
}
let label = label.trim_matches('-').to_string();
if label.is_empty() {
"fileferry-device".to_string()
} else {
label
}
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
struct DirectReceivePlan {
files: Vec<DirectFilePlan>,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
struct DirectProtocolHello {
protocol_version: u16,
min_protocol_version: u16,
client_fingerprint: String,
#[serde(default)]
psk: bool,
}
impl DirectProtocolHello {
fn new(identity: &NativeIdentity, psk: Option<&str>) -> Self {
Self {
protocol_version: NATIVE_PROTOCOL_VERSION,
min_protocol_version: MIN_NATIVE_PROTOCOL_VERSION,
client_fingerprint: identity.fingerprint().to_string(),
psk: psk.is_some(),
}
}
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
struct DirectProtocolResponse {
accepted: bool,
protocol_version: Option<u16>,
#[serde(default)]
psk_challenge: Option<String>,
error: Option<String>,
}
impl DirectProtocolResponse {
fn accept(protocol_version: u16) -> Self {
Self {
accepted: true,
protocol_version: Some(protocol_version),
psk_challenge: None,
error: None,
}
}
fn accept_with_psk(protocol_version: u16, challenge: impl Into<String>) -> Self {
Self {
accepted: true,
protocol_version: Some(protocol_version),
psk_challenge: Some(challenge.into()),
error: None,
}
}
fn reject(error: impl Into<String>) -> Self {
Self {
accepted: false,
protocol_version: None,
psk_challenge: None,
error: Some(error.into()),
}
}
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
struct DirectPskProof {
proof: String,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
struct DirectPskResult {
accepted: bool,
error: Option<String>,
}
impl DirectPskResult {
fn accept() -> Self {
Self {
accepted: true,
error: None,
}
}
fn reject(error: impl Into<String>) -> Self {
Self {
accepted: false,
error: Some(error.into()),
}
}
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
struct DirectFilePlan {
relative_path: PathBuf,
action: DirectFileAction,
offset: u64,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
enum DirectFileAction {
Receive,
Skip,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
struct DirectPayloadHeader {
relative_path: PathBuf,
offset: u64,
bytes: u64,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
struct DirectAck {
ok: bool,
error: Option<String>,
}
pub fn validate_send_paths(paths: &[PathBuf]) -> Result<()> {
if paths.is_empty() {
return Err(Error::InvalidInput(
"at least one file path is required".to_string(),
));
}
for path in paths {
if path.as_os_str().is_empty() {
return Err(Error::InvalidInput("empty file path".to_string()));
}
}
Ok(())
}
pub fn display_path(path: &Path) -> String {
path.to_string_lossy().into_owned()
}
pub async fn build_manifest(paths: &[PathBuf]) -> Result<TransferManifest> {
validate_send_paths(paths)?;
let sources = collect_manifest_sources(paths)?;
let mut entries = Vec::with_capacity(sources.len());
let mut seen = BTreeSet::new();
for source in sources {
if !seen.insert(source.relative_path.clone()) {
return Err(Error::InvalidInput(format!(
"duplicate transfer path: {}",
display_path(&source.relative_path)
)));
}
entries.push(build_manifest_entry(source).await?);
}
Ok(TransferManifest {
version: MANIFEST_VERSION,
session_id: Uuid::new_v4(),
chunk_size: DEFAULT_CHUNK_SIZE,
entries,
})
}
pub fn safe_destination_path(destination: &Path, relative_path: &Path) -> Result<PathBuf> {
validate_relative_transfer_path(relative_path)?;
Ok(destination.join(relative_path))
}
pub async fn decide_resume(
destination: &Path,
entry: &ManifestEntry,
chunk_size: u64,
) -> Result<ResumeDecision> {
if entry.kind != ManifestEntryKind::File {
return Ok(ResumeDecision::Create);
}
let path = safe_destination_path(destination, &entry.relative_path)?;
let metadata = match tokio::fs::metadata(&path).await {
Ok(metadata) => metadata,
Err(error) if error.kind() == std::io::ErrorKind::NotFound => {
return Ok(ResumeDecision::Create);
}
Err(source) => {
return Err(Error::IoPath { path, source });
}
};
if !metadata.is_file() {
return Ok(ResumeDecision::Conflict(format!(
"{} already exists and is not a regular file",
display_path(&path)
)));
}
if metadata.len() == entry.size {
let expected = entry
.blake3
.as_deref()
.ok_or_else(|| Error::Protocol("file manifest entry is missing BLAKE3".to_string()))?;
let actual = hash_file(&path).await?;
return if actual == expected {
Ok(ResumeDecision::Skip)
} else {
Ok(ResumeDecision::Conflict(format!(
"{} already exists with different contents",
display_path(&path)
)))
};
}
if metadata.len() > entry.size {
return Ok(ResumeDecision::Conflict(format!(
"{} is larger than incoming file",
display_path(&path)
)));
}
let verified_offset = verified_prefix_offset(&path, entry, metadata.len(), chunk_size).await?;
if verified_offset > 0 {
Ok(ResumeDecision::ResumeFrom(verified_offset))
} else {
Ok(ResumeDecision::Conflict(format!(
"{} already exists and does not match a verified prefix",
display_path(&path)
)))
}
}
pub async fn send_direct_file(
request: &SendRequest,
identity: &NativeIdentity,
mut events: impl FnMut(TransferEvent),
) -> Result<TransferSummary> {
send_direct_file_with_control(request, identity, TransferControl::new(), &mut events).await
}
pub async fn send_direct_file_with_control(
request: &SendRequest,
identity: &NativeIdentity,
control: TransferControl,
mut events: impl FnMut(TransferEvent),
) -> Result<TransferSummary> {
send_direct_file_with_control_and_trust(request, identity, control, &mut events, |_| Ok(()))
.await
}
pub async fn send_direct_file_with_control_and_trust(
request: &SendRequest,
identity: &NativeIdentity,
control: TransferControl,
mut events: impl FnMut(TransferEvent),
mut confirm_unpinned_receiver: impl FnMut(&str) -> Result<()>,
) -> Result<TransferSummary> {
ensure_rustls_provider();
control.check_cancelled()?;
let manifest = build_manifest(request.paths()).await?;
let sources = manifest_source_map(request.paths())?;
events(TransferEvent::SessionStarted {
direction: TransferDirection::Send,
session_id: manifest.session_id,
files_total: manifest.file_entries().count(),
bytes_total: manifest.total_file_bytes(),
});
control.check_cancelled()?;
let (client_config, server_fingerprint) = client_config_capturing_server_fingerprint()?;
let mut endpoint = quinn::Endpoint::client(SocketAddr::from(([0, 0, 0, 0], 0)))?;
endpoint.set_default_client_config(client_config);
let connection = endpoint
.connect(request.peer().address(), "localhost")?
.await?;
let actual = captured_server_fingerprint(&server_fingerprint)?;
match request.peer().expected_fingerprint() {
Some(expected) if actual != expected => {
connection.close(1u32.into(), b"fingerprint mismatch");
return Err(Error::PeerFingerprintMismatch {
expected: expected.to_string(),
actual,
});
}
Some(_) => {}
None => {
if let Err(error) = confirm_unpinned_receiver(&actual) {
connection.close(1u32.into(), b"receiver fingerprint not trusted");
return Err(error);
}
}
}
let (mut send, mut recv) = connection.open_bi().await?;
send.write_all(DIRECT_MAGIC).await?;
write_json_frame(
&mut send,
&DirectProtocolHello::new(identity, request.psk()),
)
.await?;
let protocol: DirectProtocolResponse = read_json_frame(&mut recv).await?;
if !protocol.accepted {
return Err(Error::Protocol(protocol.error.unwrap_or_else(|| {
"receiver rejected protocol negotiation".to_string()
})));
}
if protocol.protocol_version != Some(NATIVE_PROTOCOL_VERSION) {
return Err(Error::Protocol(format!(
"receiver selected unsupported protocol version {:?}",
protocol.protocol_version
)));
}
match (request.psk(), protocol.psk_challenge.as_deref()) {
(Some(psk), Some(challenge)) => {
let proof = DirectPskProof {
proof: psk_proof(
psk,
challenge,
identity.fingerprint(),
NATIVE_PROTOCOL_VERSION,
)?,
};
write_json_frame(&mut send, &proof).await?;
let auth: DirectPskResult = read_json_frame(&mut recv).await?;
if !auth.accepted {
return Err(Error::Authentication(
auth.error
.unwrap_or_else(|| "PSK authentication rejected".to_string()),
));
}
}
(Some(_), None) => {
return Err(Error::Authentication(
"receiver is not configured for PSK mode".to_string(),
));
}
(None, Some(_)) => {
return Err(Error::Authentication(
"receiver requires PSK authentication".to_string(),
));
}
(None, None) => {}
}
write_json_frame(&mut send, &manifest).await?;
let plan: DirectReceivePlan = read_json_frame(&mut recv).await?;
let mut summaries = Vec::new();
for file_plan in plan.files {
let Some(entry) = manifest
.file_entries()
.find(|entry| entry.relative_path == file_plan.relative_path)
else {
return Err(Error::Protocol(format!(
"receiver requested unknown file: {}",
display_path(&file_plan.relative_path)
)));
};
let Some(source_path) = sources.get(&file_plan.relative_path) else {
return Err(Error::Protocol(format!(
"missing source for manifest file: {}",
display_path(&file_plan.relative_path)
)));
};
if file_plan.action == DirectFileAction::Skip {
let blake3 = entry.blake3.clone().unwrap_or_default();
events(TransferEvent::FileFinished {
direction: TransferDirection::Send,
file_name: display_path(&entry.relative_path),
bytes: entry.size,
blake3: blake3.clone(),
status: TransferFileStatus::Skipped,
});
summaries.push(TransferFileSummary {
path: source_path.clone(),
relative_path: entry.relative_path.clone(),
bytes: entry.size,
blake3,
status: TransferFileStatus::Skipped,
});
continue;
}
match send_payload_file(
&mut send,
source_path,
entry,
file_plan.offset,
&control,
&mut events,
)
.await
{
Ok(()) => {}
Err(Error::TransferCancelled) => {
events(TransferEvent::SessionCancelled {
direction: TransferDirection::Send,
session_id: manifest.session_id,
});
connection.close(1u32.into(), b"cancelled");
return Err(Error::TransferCancelled);
}
Err(error) => return Err(error),
}
let status = if file_plan.offset > 0 {
TransferFileStatus::Resumed
} else {
TransferFileStatus::Sent
};
let blake3 = entry.blake3.clone().unwrap_or_default();
events(TransferEvent::FileFinished {
direction: TransferDirection::Send,
file_name: display_path(&entry.relative_path),
bytes: entry.size,
blake3: blake3.clone(),
status,
});
summaries.push(TransferFileSummary {
path: source_path.clone(),
relative_path: entry.relative_path.clone(),
bytes: entry.size,
blake3,
status,
});
}
send.finish()?;
let ack: DirectAck = read_json_frame(&mut recv).await?;
if !ack.ok {
return Err(Error::Transfer(
ack.error
.unwrap_or_else(|| "receiver rejected transfer".to_string()),
));
}
connection.close(0u32.into(), b"done");
endpoint.wait_idle().await;
let summary = summary_from_files(&manifest, summaries)?;
events(TransferEvent::SessionFinished {
direction: TransferDirection::Send,
files_total: summary.files.len(),
bytes_total: summary.bytes,
blake3: summary.blake3.clone(),
});
Ok(summary)
}
pub async fn receive_direct_file(
request: &ReceiveRequest,
destination: impl AsRef<Path>,
identity: &NativeIdentity,
events: impl FnMut(TransferEvent),
) -> Result<TransferSummary> {
receive_direct_file_with_control(
request,
destination,
identity,
TransferControl::new(),
events,
)
.await
}
pub async fn receive_direct_file_with_control(
request: &ReceiveRequest,
destination: impl AsRef<Path>,
identity: &NativeIdentity,
control: TransferControl,
mut events: impl FnMut(TransferEvent),
) -> Result<TransferSummary> {
ensure_rustls_provider();
control.check_cancelled()?;
let destination = destination.as_ref();
tokio::fs::create_dir_all(destination)
.await
.map_err(|source| Error::IoPath {
path: destination.to_path_buf(),
source,
})?;
let server_config =
quinn::ServerConfig::with_single_cert(identity.cert_chain(), identity.private_key())?;
let endpoint = quinn::Endpoint::server(server_config, request.listen())?;
let incoming = endpoint.accept().await.ok_or(Error::NoIncomingConnection)?;
let connection = incoming.await?;
let (mut send, mut recv) = connection.accept_bi().await?;
let result = receive_stream_file(
destination,
request.psk(),
&mut recv,
&mut send,
&control,
&mut events,
)
.await;
let ack = match &result {
Ok(_) => DirectAck {
ok: true,
error: None,
},
Err(error) => DirectAck {
ok: false,
error: Some(error.to_string()),
},
};
write_json_frame(&mut send, &ack).await?;
send.finish()?;
let _ = tokio::time::timeout(Duration::from_secs(1), connection.closed()).await;
endpoint.close(0u32.into(), b"done");
endpoint.wait_idle().await;
result
}
async fn receive_stream_file(
destination: &Path,
psk: Option<&str>,
recv: &mut quinn::RecvStream,
send: &mut quinn::SendStream,
control: &TransferControl,
events: &mut impl FnMut(TransferEvent),
) -> Result<TransferSummary> {
let mut magic = [0; DIRECT_MAGIC.len()];
recv.read_exact(&mut magic).await?;
if &magic != DIRECT_MAGIC {
return Err(Error::Protocol("bad direct-transfer magic".to_string()));
}
let hello: DirectProtocolHello = read_json_frame(recv).await?;
let protocol = negotiate_direct_protocol(&hello, psk);
write_json_frame(send, &protocol).await?;
if !protocol.accepted {
return Err(Error::Protocol(protocol.error.unwrap_or_else(|| {
"unsupported direct protocol version".to_string()
})));
}
if let (Some(psk), Some(challenge)) = (psk, protocol.psk_challenge.as_deref()) {
let proof: DirectPskProof = read_json_frame(recv).await?;
let expected = psk_proof(
psk,
challenge,
&hello.client_fingerprint,
protocol.protocol_version.unwrap_or(NATIVE_PROTOCOL_VERSION),
)?;
if proof.proof != expected {
write_json_frame(send, &DirectPskResult::reject("PSK proof mismatch")).await?;
return Err(Error::Authentication("PSK proof mismatch".to_string()));
}
write_json_frame(send, &DirectPskResult::accept()).await?;
}
let manifest: TransferManifest = read_json_frame(recv).await?;
validate_manifest(&manifest)?;
events(TransferEvent::SessionStarted {
direction: TransferDirection::Receive,
session_id: manifest.session_id,
files_total: manifest.file_entries().count(),
bytes_total: manifest.total_file_bytes(),
});
control.check_cancelled()?;
for entry in &manifest.entries {
if entry.kind == ManifestEntryKind::Directory {
let path = safe_destination_path(destination, &entry.relative_path)?;
tokio::fs::create_dir_all(&path)
.await
.map_err(|source| Error::IoPath { path, source })?;
}
}
let mut plan = DirectReceivePlan { files: Vec::new() };
for entry in manifest.file_entries() {
let decision = decide_resume(destination, entry, manifest.chunk_size).await?;
let (action, offset) = match decision {
ResumeDecision::Create => (DirectFileAction::Receive, 0),
ResumeDecision::Skip => (DirectFileAction::Skip, entry.size),
ResumeDecision::ResumeFrom(offset) => (DirectFileAction::Receive, offset),
ResumeDecision::Conflict(message) => return Err(Error::InvalidInput(message)),
};
plan.files.push(DirectFilePlan {
relative_path: entry.relative_path.clone(),
action,
offset,
});
}
write_json_frame(send, &plan).await?;
let mut summaries = Vec::new();
for file_plan in &plan.files {
let Some(entry) = manifest
.file_entries()
.find(|entry| entry.relative_path == file_plan.relative_path)
else {
return Err(Error::Protocol(format!(
"planned file missing from manifest: {}",
display_path(&file_plan.relative_path)
)));
};
let final_path = safe_destination_path(destination, &entry.relative_path)?;
if file_plan.action == DirectFileAction::Skip {
let blake3 = entry.blake3.clone().unwrap_or_default();
events(TransferEvent::FileFinished {
direction: TransferDirection::Receive,
file_name: display_path(&entry.relative_path),
bytes: entry.size,
blake3: blake3.clone(),
status: TransferFileStatus::Skipped,
});
summaries.push(TransferFileSummary {
path: final_path,
relative_path: entry.relative_path.clone(),
bytes: entry.size,
blake3,
status: TransferFileStatus::Skipped,
});
continue;
}
match receive_payload_file(recv, destination, entry, file_plan.offset, control, events)
.await
{
Ok(()) => {}
Err(Error::TransferCancelled) => {
events(TransferEvent::SessionCancelled {
direction: TransferDirection::Receive,
session_id: manifest.session_id,
});
return Err(Error::TransferCancelled);
}
Err(error) => return Err(error),
}
let actual_hash = hash_file(&final_path).await?;
let expected_hash = entry
.blake3
.as_deref()
.ok_or_else(|| Error::Protocol("file manifest entry is missing BLAKE3".to_string()))?;
if actual_hash != expected_hash {
return Err(Error::Transfer(format!(
"BLAKE3 hash mismatch for {}",
display_path(&entry.relative_path)
)));
}
let status = if file_plan.offset > 0 {
TransferFileStatus::Resumed
} else {
TransferFileStatus::Received
};
events(TransferEvent::FileFinished {
direction: TransferDirection::Receive,
file_name: display_path(&entry.relative_path),
bytes: entry.size,
blake3: actual_hash.clone(),
status,
});
summaries.push(TransferFileSummary {
path: final_path,
relative_path: entry.relative_path.clone(),
bytes: entry.size,
blake3: actual_hash,
status,
});
}
let summary = summary_from_files(&manifest, summaries)?;
events(TransferEvent::SessionFinished {
direction: TransferDirection::Receive,
files_total: summary.files.len(),
bytes_total: summary.bytes,
blake3: summary.blake3.clone(),
});
Ok(summary)
}
async fn send_payload_file(
send: &mut quinn::SendStream,
source_path: &Path,
entry: &ManifestEntry,
offset: u64,
control: &TransferControl,
events: &mut impl FnMut(TransferEvent),
) -> Result<()> {
if offset > entry.size {
return Err(Error::Protocol(format!(
"resume offset exceeds file size for {}",
display_path(&entry.relative_path)
)));
}
let header = DirectPayloadHeader {
relative_path: entry.relative_path.clone(),
offset,
bytes: entry.size - offset,
};
write_json_frame(send, &header).await?;
events(TransferEvent::FileStarted {
direction: TransferDirection::Send,
file_name: display_path(&entry.relative_path),
bytes_total: entry.size,
resume_offset: offset,
});
let mut file = tokio::fs::File::open(source_path)
.await
.map_err(|source| Error::IoPath {
path: source_path.to_path_buf(),
source,
})?;
file.seek(SeekFrom::Start(offset))
.await
.map_err(|source| Error::IoPath {
path: source_path.to_path_buf(),
source,
})?;
let mut buffer = vec![0; IO_BUFFER_SIZE];
let mut bytes_done = offset;
while bytes_done < entry.size {
control.check_cancelled()?;
let remaining = entry.size - bytes_done;
let read_size = buffer.len().min(remaining as usize);
let read = file
.read(&mut buffer[..read_size])
.await
.map_err(|source| Error::IoPath {
path: source_path.to_path_buf(),
source,
})?;
if read == 0 {
return Err(Error::Protocol(format!(
"source file ended early: {}",
display_path(source_path)
)));
}
send.write_all(&buffer[..read]).await?;
bytes_done += read as u64;
events(TransferEvent::Progress {
direction: TransferDirection::Send,
file_name: display_path(&entry.relative_path),
bytes_done,
bytes_total: entry.size,
});
control.check_cancelled()?;
}
Ok(())
}
async fn receive_payload_file(
recv: &mut quinn::RecvStream,
destination: &Path,
entry: &ManifestEntry,
offset: u64,
control: &TransferControl,
events: &mut impl FnMut(TransferEvent),
) -> Result<()> {
let header: DirectPayloadHeader = read_json_frame(recv).await?;
if header.relative_path != entry.relative_path
|| header.offset != offset
|| header.bytes != entry.size - offset
{
return Err(Error::Protocol(format!(
"payload header does not match receive plan for {}",
display_path(&entry.relative_path)
)));
}
events(TransferEvent::FileStarted {
direction: TransferDirection::Receive,
file_name: display_path(&entry.relative_path),
bytes_total: entry.size,
resume_offset: offset,
});
let final_path = safe_destination_path(destination, &entry.relative_path)?;
if let Some(parent) = final_path.parent() {
tokio::fs::create_dir_all(parent)
.await
.map_err(|source| Error::IoPath {
path: parent.to_path_buf(),
source,
})?;
}
let write_path = if offset == 0 {
temp_path_for(&final_path)
} else {
final_path.clone()
};
let mut file = if offset == 0 {
tokio::fs::File::create(&write_path)
.await
.map_err(|source| Error::IoPath {
path: write_path.clone(),
source,
})?
} else {
let mut file = tokio::fs::OpenOptions::new()
.write(true)
.open(&write_path)
.await
.map_err(|source| Error::IoPath {
path: write_path.clone(),
source,
})?;
file.set_len(offset).await.map_err(|source| Error::IoPath {
path: write_path.clone(),
source,
})?;
file.seek(SeekFrom::Start(offset))
.await
.map_err(|source| Error::IoPath {
path: write_path.clone(),
source,
})?;
file
};
let mut bytes_done = offset;
let mut bytes_remaining = header.bytes;
let mut buffer = vec![0; IO_BUFFER_SIZE];
while bytes_remaining > 0 {
if control.is_cancelled() {
if offset == 0 {
let _ = tokio::fs::remove_file(&write_path).await;
}
return Err(Error::TransferCancelled);
}
let read_size = buffer.len().min(bytes_remaining as usize);
let read = match recv.read(&mut buffer[..read_size]).await {
Ok(Some(read)) => read,
Ok(None) => {
if offset == 0 {
let _ = tokio::fs::remove_file(&write_path).await;
}
return Err(Error::Protocol(
"stream ended before file payload".to_string(),
));
}
Err(error) => {
if offset == 0 {
let _ = tokio::fs::remove_file(&write_path).await;
}
return Err(Error::Read(error));
}
};
file.write_all(&buffer[..read])
.await
.map_err(|source| Error::IoPath {
path: write_path.clone(),
source,
})?;
bytes_done += read as u64;
bytes_remaining -= read as u64;
events(TransferEvent::Progress {
direction: TransferDirection::Receive,
file_name: display_path(&entry.relative_path),
bytes_done,
bytes_total: entry.size,
});
if control.is_cancelled() {
if offset == 0 {
let _ = tokio::fs::remove_file(&write_path).await;
}
return Err(Error::TransferCancelled);
}
}
file.flush().await.map_err(|source| Error::IoPath {
path: write_path.clone(),
source,
})?;
drop(file);
if offset == 0 {
tokio::fs::rename(&write_path, &final_path)
.await
.map_err(|source| Error::IoPath {
path: final_path,
source,
})?;
}
Ok(())
}
async fn write_json_frame<T: Serialize>(send: &mut quinn::SendStream, value: &T) -> Result<()> {
let bytes = serde_json::to_vec(value).map_err(|error| Error::Protocol(error.to_string()))?;
if bytes.len() > MAX_FRAME_LEN {
return Err(Error::Protocol("frame exceeds maximum length".to_string()));
}
send.write_all(&(bytes.len() as u32).to_be_bytes()).await?;
send.write_all(&bytes).await?;
Ok(())
}
async fn read_json_frame<T: for<'de> Deserialize<'de>>(recv: &mut quinn::RecvStream) -> Result<T> {
let mut len = [0; 4];
recv.read_exact(&mut len).await?;
let len = u32::from_be_bytes(len) as usize;
if len > MAX_FRAME_LEN {
return Err(Error::Protocol("frame exceeds maximum length".to_string()));
}
let mut bytes = vec![0; len];
recv.read_exact(&mut bytes).await?;
serde_json::from_slice(&bytes).map_err(|error| Error::Protocol(error.to_string()))
}
fn negotiate_direct_protocol(
hello: &DirectProtocolHello,
psk: Option<&str>,
) -> DirectProtocolResponse {
if hello.min_protocol_version > NATIVE_PROTOCOL_VERSION {
return DirectProtocolResponse::reject(format!(
"peer requires protocol version {}, local max is {}",
hello.min_protocol_version, NATIVE_PROTOCOL_VERSION
));
}
if hello.protocol_version < MIN_NATIVE_PROTOCOL_VERSION {
return DirectProtocolResponse::reject(format!(
"peer max protocol version {} is below local minimum {}",
hello.protocol_version, MIN_NATIVE_PROTOCOL_VERSION
));
}
let selected_version = NATIVE_PROTOCOL_VERSION.min(hello.protocol_version);
match (psk, hello.psk) {
(Some(_), false) => DirectProtocolResponse::reject("receiver requires PSK authentication"),
(Some(_), true) => {
DirectProtocolResponse::accept_with_psk(selected_version, Uuid::new_v4().to_string())
}
(None, true) => DirectProtocolResponse::reject("receiver is not configured for PSK mode"),
(None, false) => DirectProtocolResponse::accept(selected_version),
}
}
#[derive(Debug, Clone)]
struct ManifestSource {
source_path: PathBuf,
relative_path: PathBuf,
}
fn collect_manifest_sources(paths: &[PathBuf]) -> Result<Vec<ManifestSource>> {
let mut sources = Vec::new();
for path in paths {
let metadata = std::fs::symlink_metadata(path).map_err(|source| Error::IoPath {
path: path.clone(),
source,
})?;
if metadata.file_type().is_symlink() {
return Err(Error::InvalidInput(format!(
"{} is a symlink; symlink transfers are not supported",
display_path(path)
)));
}
let root_name = path
.file_name()
.ok_or_else(|| Error::InvalidInput(format!("{} has no file name", display_path(path))))?
.to_os_string();
let relative_path = PathBuf::from(root_name);
if metadata.is_dir() {
sources.push(ManifestSource {
source_path: path.clone(),
relative_path: relative_path.clone(),
});
collect_dir_sources(path, &relative_path, &mut sources)?;
} else if metadata.is_file() {
sources.push(ManifestSource {
source_path: path.clone(),
relative_path,
});
} else {
return Err(Error::InvalidInput(format!(
"{} is not a regular file or directory",
display_path(path)
)));
}
}
Ok(sources)
}
fn collect_dir_sources(
dir: &Path,
relative_dir: &Path,
sources: &mut Vec<ManifestSource>,
) -> Result<()> {
for entry in std::fs::read_dir(dir).map_err(|source| Error::IoPath {
path: dir.to_path_buf(),
source,
})? {
let entry = entry.map_err(|source| Error::IoPath {
path: dir.to_path_buf(),
source,
})?;
let path = entry.path();
let metadata = std::fs::symlink_metadata(&path).map_err(|source| Error::IoPath {
path: path.clone(),
source,
})?;
if metadata.file_type().is_symlink() {
return Err(Error::InvalidInput(format!(
"{} is a symlink; symlink transfers are not supported",
display_path(&path)
)));
}
let relative_path = relative_dir.join(entry.file_name());
if metadata.is_dir() {
sources.push(ManifestSource {
source_path: path.clone(),
relative_path: relative_path.clone(),
});
collect_dir_sources(&path, &relative_path, sources)?;
} else if metadata.is_file() {
sources.push(ManifestSource {
source_path: path,
relative_path,
});
}
}
Ok(())
}
fn manifest_source_map(paths: &[PathBuf]) -> Result<BTreeMap<PathBuf, PathBuf>> {
Ok(collect_manifest_sources(paths)?
.into_iter()
.filter_map(|source| {
let metadata = std::fs::metadata(&source.source_path).ok()?;
metadata
.is_file()
.then_some((source.relative_path, source.source_path))
})
.collect())
}
async fn build_manifest_entry(source: ManifestSource) -> Result<ManifestEntry> {
validate_relative_transfer_path(&source.relative_path)?;
let metadata = tokio::fs::metadata(&source.source_path)
.await
.map_err(|source_error| Error::IoPath {
path: source.source_path.clone(),
source: source_error,
})?;
let unix_mode = unix_mode(&metadata);
let modified_unix_ms = modified_unix_ms(&metadata);
if metadata.is_dir() {
return Ok(ManifestEntry {
relative_path: source.relative_path,
kind: ManifestEntryKind::Directory,
size: 0,
blake3: None,
chunks: Vec::new(),
unix_mode,
modified_unix_ms,
});
}
let (blake3, chunks) = hash_file_with_chunks(&source.source_path, DEFAULT_CHUNK_SIZE).await?;
Ok(ManifestEntry {
relative_path: source.relative_path,
kind: ManifestEntryKind::File,
size: metadata.len(),
blake3: Some(blake3),
chunks,
unix_mode,
modified_unix_ms,
})
}
fn validate_manifest(manifest: &TransferManifest) -> Result<()> {
if manifest.version != MANIFEST_VERSION {
return Err(Error::Protocol(format!(
"unsupported manifest version {}",
manifest.version
)));
}
if manifest.chunk_size == 0 {
return Err(Error::Protocol("manifest chunk size is zero".to_string()));
}
let mut seen = BTreeSet::new();
for entry in &manifest.entries {
validate_relative_transfer_path(&entry.relative_path)?;
if !seen.insert(entry.relative_path.clone()) {
return Err(Error::Protocol(format!(
"duplicate manifest path: {}",
display_path(&entry.relative_path)
)));
}
if entry.kind == ManifestEntryKind::File && entry.blake3.is_none() {
return Err(Error::Protocol(format!(
"file manifest entry is missing BLAKE3: {}",
display_path(&entry.relative_path)
)));
}
}
Ok(())
}
fn validate_relative_transfer_path(path: &Path) -> Result<()> {
if path.as_os_str().is_empty() || path.is_absolute() {
return Err(Error::InvalidInput(format!(
"unsafe transfer path: {}",
display_path(path)
)));
}
let mut normal_components = 0usize;
for component in path.components() {
match component {
Component::Normal(value) => {
let Some(name) = value.to_str() else {
return Err(Error::InvalidInput(format!(
"transfer path must be UTF-8: {}",
display_path(path)
)));
};
validate_path_component(name, path)?;
normal_components += 1;
}
Component::CurDir
| Component::ParentDir
| Component::RootDir
| Component::Prefix(_) => {
return Err(Error::InvalidInput(format!(
"unsafe transfer path: {}",
display_path(path)
)));
}
}
}
if normal_components == 0 {
return Err(Error::InvalidInput(format!(
"unsafe transfer path: {}",
display_path(path)
)));
}
Ok(())
}
fn validate_path_component(component: &str, full_path: &Path) -> Result<()> {
if component.is_empty()
|| component.contains('\\')
|| component.contains('/')
|| component.contains(':')
|| component.ends_with(' ')
|| component.ends_with('.')
|| is_windows_reserved_name(component)
{
return Err(Error::InvalidInput(format!(
"unsafe transfer path: {}",
display_path(full_path)
)));
}
Ok(())
}
fn is_windows_reserved_name(component: &str) -> bool {
let stem = component
.split('.')
.next()
.unwrap_or(component)
.trim_end_matches([' ', '.'])
.to_ascii_uppercase();
matches!(
stem.as_str(),
"CON"
| "PRN"
| "AUX"
| "NUL"
| "COM1"
| "COM2"
| "COM3"
| "COM4"
| "COM5"
| "COM6"
| "COM7"
| "COM8"
| "COM9"
| "LPT1"
| "LPT2"
| "LPT3"
| "LPT4"
| "LPT5"
| "LPT6"
| "LPT7"
| "LPT8"
| "LPT9"
)
}
async fn hash_file(path: &Path) -> Result<String> {
hash_file_with_chunks(path, DEFAULT_CHUNK_SIZE)
.await
.map(|(hash, _)| hash)
}
async fn hash_file_with_chunks(path: &Path, chunk_size: u64) -> Result<(String, Vec<String>)> {
let mut file = tokio::fs::File::open(path)
.await
.map_err(|source| Error::IoPath {
path: path.to_path_buf(),
source,
})?;
let mut hasher = blake3::Hasher::new();
let mut chunk_hasher = blake3::Hasher::new();
let mut chunk_bytes = 0u64;
let mut chunks = Vec::new();
let mut buffer = vec![0; IO_BUFFER_SIZE];
loop {
let read = file
.read(&mut buffer)
.await
.map_err(|source| Error::IoPath {
path: path.to_path_buf(),
source,
})?;
if read == 0 {
break;
}
hasher.update(&buffer[..read]);
let mut remaining = &buffer[..read];
while !remaining.is_empty() {
let take = remaining.len().min((chunk_size - chunk_bytes) as usize);
chunk_hasher.update(&remaining[..take]);
chunk_bytes += take as u64;
remaining = &remaining[take..];
if chunk_bytes == chunk_size {
chunks.push(chunk_hasher.finalize().to_hex().to_string());
chunk_hasher = blake3::Hasher::new();
chunk_bytes = 0;
}
}
}
if chunk_bytes > 0 {
chunks.push(chunk_hasher.finalize().to_hex().to_string());
}
Ok((hasher.finalize().to_hex().to_string(), chunks))
}
async fn verified_prefix_offset(
path: &Path,
entry: &ManifestEntry,
existing_len: u64,
chunk_size: u64,
) -> Result<u64> {
if existing_len == 0 || entry.chunks.is_empty() {
return Ok(0);
}
let chunks_to_check = (existing_len / chunk_size).min(entry.chunks.len() as u64) as usize;
if chunks_to_check == 0 {
return Ok(0);
}
let mut file = tokio::fs::File::open(path)
.await
.map_err(|source| Error::IoPath {
path: path.to_path_buf(),
source,
})?;
let mut buffer = vec![0; IO_BUFFER_SIZE];
let mut verified_chunks = 0usize;
for expected in entry.chunks.iter().take(chunks_to_check) {
let mut remaining = chunk_size;
let mut hasher = blake3::Hasher::new();
while remaining > 0 {
let read_size = buffer.len().min(remaining as usize);
file.read_exact(&mut buffer[..read_size])
.await
.map_err(|source| Error::IoPath {
path: path.to_path_buf(),
source,
})?;
hasher.update(&buffer[..read_size]);
remaining -= read_size as u64;
}
if hasher.finalize().to_hex().as_str() != expected {
break;
}
verified_chunks += 1;
}
Ok(verified_chunks as u64 * chunk_size)
}
fn summary_from_files(
manifest: &TransferManifest,
files: Vec<TransferFileSummary>,
) -> Result<TransferSummary> {
let bytes = manifest.total_file_bytes();
let blake3 = if files.len() == 1 {
files[0].blake3.clone()
} else {
manifest.digest()?
};
let file_name = if files.len() == 1 {
display_path(&files[0].relative_path)
} else {
format!("{} files", files.len())
};
let path = files
.first()
.map(|file| file.path.clone())
.unwrap_or_default();
Ok(TransferSummary {
path,
file_name,
bytes,
blake3,
files,
})
}
fn temp_path_for(final_path: &Path) -> PathBuf {
let file_name = final_path
.file_name()
.and_then(|name| name.to_str())
.unwrap_or("transfer");
final_path.with_file_name(format!(".{file_name}.{}.ferry-part", Uuid::new_v4()))
}
fn modified_unix_ms(metadata: &Metadata) -> Option<i128> {
system_time_unix_ms(metadata.modified().ok()?)
}
fn system_time_unix_ms(time: SystemTime) -> Option<i128> {
match time.duration_since(UNIX_EPOCH) {
Ok(duration) => Some(duration.as_millis() as i128),
Err(error) => Some(-(error.duration().as_millis() as i128)),
}
}
#[cfg(unix)]
fn unix_mode(metadata: &Metadata) -> Option<u32> {
use std::os::unix::fs::PermissionsExt;
Some(metadata.permissions().mode())
}
#[cfg(not(unix))]
fn unix_mode(_metadata: &Metadata) -> Option<u32> {
None
}
fn client_config_capturing_server_fingerprint()
-> Result<(quinn::ClientConfig, Arc<Mutex<Option<String>>>)> {
let server_fingerprint = Arc::new(Mutex::new(None));
let crypto = rustls::ClientConfig::builder()
.dangerous()
.with_custom_certificate_verifier(ServerFingerprintVerifier::new(
server_fingerprint.clone(),
))
.with_no_client_auth();
let config = quinn::ClientConfig::new(Arc::new(
QuicClientConfig::try_from(crypto)
.map_err(|error| Error::CryptoConfig(error.to_string()))?,
));
Ok((config, server_fingerprint))
}
fn captured_server_fingerprint(fingerprint: &Arc<Mutex<Option<String>>>) -> Result<String> {
fingerprint
.lock()
.map_err(|_| Error::CryptoConfig("server fingerprint capture lock poisoned".to_string()))?
.clone()
.ok_or_else(|| {
Error::CryptoConfig("server certificate fingerprint was not captured".to_string())
})
}
#[derive(Debug)]
struct ServerFingerprintVerifier {
provider: Arc<CryptoProvider>,
server_fingerprint: Arc<Mutex<Option<String>>>,
}
impl ServerFingerprintVerifier {
fn new(server_fingerprint: Arc<Mutex<Option<String>>>) -> Arc<Self> {
Arc::new(Self {
provider: Arc::new(rustls::crypto::ring::default_provider()),
server_fingerprint,
})
}
}
impl ServerCertVerifier for ServerFingerprintVerifier {
fn verify_server_cert(
&self,
end_entity: &CertificateDer<'_>,
_intermediates: &[CertificateDer<'_>],
_server_name: &ServerName<'_>,
_ocsp: &[u8],
_now: UnixTime,
) -> std::result::Result<ServerCertVerified, rustls::Error> {
let fingerprint = blake3::hash(end_entity.as_ref()).to_hex().to_string();
*self.server_fingerprint.lock().map_err(|_| {
rustls::Error::General("server fingerprint capture lock poisoned".to_string())
})? = Some(fingerprint);
Ok(ServerCertVerified::assertion())
}
fn verify_tls12_signature(
&self,
message: &[u8],
cert: &CertificateDer<'_>,
dss: &DigitallySignedStruct,
) -> std::result::Result<HandshakeSignatureValid, rustls::Error> {
verify_tls12_signature(
message,
cert,
dss,
&self.provider.signature_verification_algorithms,
)
}
fn verify_tls13_signature(
&self,
message: &[u8],
cert: &CertificateDer<'_>,
dss: &DigitallySignedStruct,
) -> std::result::Result<HandshakeSignatureValid, rustls::Error> {
verify_tls13_signature(
message,
cert,
dss,
&self.provider.signature_verification_algorithms,
)
}
fn supported_verify_schemes(&self) -> Vec<SignatureScheme> {
self.provider
.signature_verification_algorithms
.supported_schemes()
}
}
fn ensure_rustls_provider() {
let _ = rustls::crypto::ring::default_provider().install_default();
}
pub fn config_dir() -> Result<PathBuf> {
let base_dirs = BaseDirs::new().ok_or(Error::ConfigDirUnavailable)?;
Ok(base_dirs.config_dir().join(CONFIG_DIR_NAME))
}
fn default_alias() -> String {
std::env::var("HOSTNAME")
.or_else(|_| std::env::var("COMPUTERNAME"))
.ok()
.filter(|value| !value.trim().is_empty())
.unwrap_or_else(|| "fileferry-device".to_string())
}
fn expand_home_path(path: &str) -> Result<PathBuf> {
if path == "~" {
return Ok(BaseDirs::new()
.ok_or(Error::ConfigDirUnavailable)?
.home_dir()
.to_path_buf());
}
if let Some(rest) = path.strip_prefix("~/") {
return Ok(BaseDirs::new()
.ok_or(Error::ConfigDirUnavailable)?
.home_dir()
.join(rest));
}
Ok(PathBuf::from(path))
}
fn normalized_fingerprint(fingerprint: String) -> Result<String> {
let fingerprint = fingerprint.trim().to_ascii_lowercase();
if fingerprint.len() != 64 || !fingerprint.chars().all(|char| char.is_ascii_hexdigit()) {
return Err(Error::InvalidInput(
"peer fingerprint must be a full 64-character hex BLAKE3 fingerprint".to_string(),
));
}
Ok(fingerprint)
}
fn validate_psk(psk: String) -> Result<String> {
if psk.is_empty() {
return Err(Error::InvalidInput("PSK must not be empty".to_string()));
}
Ok(psk)
}
fn psk_proof(
psk: &str,
challenge: &str,
client_fingerprint: &str,
protocol_version: u16,
) -> Result<String> {
validate_psk(psk.to_string())?;
let key = blake3::hash(psk.as_bytes());
let mut message = Vec::new();
message.extend_from_slice(PSK_PROOF_CONTEXT);
message.push(0);
message.extend_from_slice(challenge.as_bytes());
message.push(0);
message.extend_from_slice(client_fingerprint.as_bytes());
message.push(0);
message.extend_from_slice(protocol_version.to_string().as_bytes());
Ok(blake3::keyed_hash(key.as_bytes(), &message)
.to_hex()
.to_string())
}
fn write_toml_file<T: Serialize>(path: &Path, value: &T) -> Result<()> {
let bytes =
toml::to_string_pretty(value).map_err(|error| Error::ConfigSerialize(error.to_string()))?;
if let Some(parent) = path.parent() {
std::fs::create_dir_all(parent).map_err(|source| Error::IoPath {
path: parent.to_path_buf(),
source,
})?;
}
let temp_path = path.with_extension(format!("tmp-{}", Uuid::new_v4()));
std::fs::write(&temp_path, bytes).map_err(|source| Error::IoPath {
path: temp_path.clone(),
source,
})?;
std::fs::rename(&temp_path, path).map_err(|source| Error::IoPath {
path: path.to_path_buf(),
source,
})?;
Ok(())
}
fn write_private_file(path: &Path, bytes: &[u8]) -> Result<()> {
std::fs::write(path, bytes).map_err(|source| Error::IoPath {
path: path.to_path_buf(),
source,
})?;
#[cfg(unix)]
{
use std::os::unix::fs::PermissionsExt;
let mut permissions = std::fs::metadata(path)
.map_err(|source| Error::IoPath {
path: path.to_path_buf(),
source,
})?
.permissions();
permissions.set_mode(0o600);
std::fs::set_permissions(path, permissions).map_err(|source| Error::IoPath {
path: path.to_path_buf(),
source,
})?;
}
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
use std::sync::{Arc, Mutex};
#[test]
fn direct_peer_parses_socket_address() {
let peer = DirectPeer::parse("127.0.0.1:53318").expect("address parses");
assert_eq!(peer.address().to_string(), "127.0.0.1:53318");
assert_eq!(peer.expected_fingerprint(), None);
}
#[test]
fn direct_peer_accepts_expected_fingerprint() {
let fingerprint = "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef";
let peer = DirectPeer::parse("127.0.0.1:53318")
.expect("address parses")
.with_expected_fingerprint(fingerprint)
.expect("fingerprint parses");
assert_eq!(peer.expected_fingerprint(), Some(fingerprint));
}
#[test]
fn direct_peer_rejects_missing_port() {
assert!(DirectPeer::parse("127.0.0.1").is_err());
}
#[test]
fn send_request_requires_at_least_one_path() {
let peer = DirectPeer::parse("127.0.0.1:53318").expect("address parses");
assert!(SendRequest::new(peer, Vec::new()).is_err());
}
#[test]
fn psk_secret_is_redacted_in_debug_and_serialization() {
let secret = PskSecret::new("do-not-print").expect("psk accepted");
assert_eq!(format!("{secret:?}"), "PskSecret(<redacted>)");
assert_eq!(
serde_json::to_string(&secret).expect("secret serializes"),
r#""<redacted>""#
);
let peer = DirectPeer::parse("127.0.0.1:53318").expect("address parses");
let request = SendRequest::new(peer, vec![PathBuf::from("file.txt")])
.expect("send request")
.with_psk("do-not-print")
.expect("psk accepted");
assert!(!format!("{request:?}").contains("do-not-print"));
}
#[test]
fn validate_send_paths_rejects_empty_slice() {
assert!(validate_send_paths(&[]).is_err());
}
#[tokio::test]
async fn manifest_walks_directory_and_serializes_entries() {
let temp = tempfile::tempdir().expect("tempdir");
let root = temp.path().join("bundle");
let nested = root.join("nested");
tokio::fs::create_dir_all(&nested).await.expect("dirs");
tokio::fs::write(root.join("a.txt"), b"alpha")
.await
.expect("file a");
tokio::fs::write(nested.join("b.txt"), b"beta")
.await
.expect("file b");
let manifest = build_manifest(&[root]).await.expect("manifest");
let json = serde_json::to_string(&manifest).expect("manifest serializes");
let decoded: TransferManifest = serde_json::from_str(&json).expect("manifest decodes");
assert_eq!(decoded.version, MANIFEST_VERSION);
assert!(decoded.entries.iter().any(|entry| {
entry.kind == ManifestEntryKind::Directory
&& entry.relative_path == Path::new("bundle/nested")
}));
assert!(decoded.entries.iter().any(|entry| {
entry.kind == ManifestEntryKind::File
&& entry.relative_path == Path::new("bundle/nested/b.txt")
&& entry.size == 4
&& entry.blake3.is_some()
}));
}
#[test]
fn safe_destination_path_rejects_unsafe_paths() {
let dest = Path::new("/tmp/ferry-dest");
assert!(safe_destination_path(dest, Path::new("../escape.txt")).is_err());
assert!(safe_destination_path(dest, Path::new("/absolute.txt")).is_err());
assert!(safe_destination_path(dest, Path::new("nested/CON")).is_err());
assert!(safe_destination_path(dest, Path::new("nested\\escape.txt")).is_err());
assert!(safe_destination_path(dest, Path::new("nested/ok.txt")).is_ok());
}
#[test]
fn safe_destination_path_rejects_generated_escape_and_reserved_patterns() {
let dest = Path::new("/tmp/ferry-dest");
let unsafe_components = [
"..",
".",
"CON",
"con.txt",
"NUL",
"COM1",
"LPT9",
"trailingspace ",
"trailingdot.",
"has:colon",
"has\\backslash",
];
for component in unsafe_components {
assert!(
safe_destination_path(dest, Path::new(component)).is_err(),
"{component} should be rejected as a top-level path"
);
if component != "." {
assert!(
safe_destination_path(dest, Path::new("safe").join(component).as_path())
.is_err(),
"{component} should be rejected as a nested path"
);
}
}
for component in ["alpha", "alpha-1", "alpha_1", "report.final.txt"] {
let path = safe_destination_path(dest, Path::new("safe").join(component).as_path())
.expect("safe generated path accepted");
assert!(path.starts_with(dest));
}
}
#[tokio::test]
async fn resume_decision_skips_existing_matching_file() {
let temp = tempfile::tempdir().expect("tempdir");
let source = temp.path().join("source.txt");
let dest = temp.path().join("dest");
tokio::fs::create_dir_all(&dest).await.expect("dest dir");
tokio::fs::write(&source, b"same contents")
.await
.expect("source");
tokio::fs::write(dest.join("source.txt"), b"same contents")
.await
.expect("dest file");
let manifest = build_manifest(&[source]).await.expect("manifest");
let entry = manifest.file_entries().next().expect("file entry");
let decision = decide_resume(&dest, entry, manifest.chunk_size)
.await
.expect("decision");
assert_eq!(decision, ResumeDecision::Skip);
}
#[tokio::test]
async fn resume_decision_returns_verified_chunk_offset() {
let temp = tempfile::tempdir().expect("tempdir");
let source = temp.path().join("large.bin");
let dest = temp.path().join("dest");
tokio::fs::create_dir_all(&dest).await.expect("dest dir");
let bytes = vec![42; (DEFAULT_CHUNK_SIZE as usize * 2) + 128];
tokio::fs::write(&source, &bytes).await.expect("source");
tokio::fs::write(
dest.join("large.bin"),
&bytes[..DEFAULT_CHUNK_SIZE as usize + 99],
)
.await
.expect("partial");
let manifest = build_manifest(&[source]).await.expect("manifest");
let entry = manifest.file_entries().next().expect("file entry");
let decision = decide_resume(&dest, entry, manifest.chunk_size)
.await
.expect("decision");
assert_eq!(decision, ResumeDecision::ResumeFrom(DEFAULT_CHUNK_SIZE));
}
#[tokio::test]
async fn resume_decision_rejects_existing_file_with_mismatched_contents() {
let temp = tempfile::tempdir().expect("tempdir");
let source = temp.path().join("source.txt");
let dest = temp.path().join("dest");
tokio::fs::create_dir_all(&dest).await.expect("dest dir");
tokio::fs::write(&source, b"expected contents")
.await
.expect("source");
tokio::fs::write(dest.join("source.txt"), b"different contents")
.await
.expect("conflicting dest file");
let manifest = build_manifest(&[source]).await.expect("manifest");
let entry = manifest.file_entries().next().expect("file entry");
let decision = decide_resume(&dest, entry, manifest.chunk_size)
.await
.expect("decision");
assert!(matches!(decision, ResumeDecision::Conflict(_)));
}
#[tokio::test]
async fn resume_decision_rejects_existing_file_larger_than_manifest_entry() {
let temp = tempfile::tempdir().expect("tempdir");
let source = temp.path().join("source.txt");
let dest = temp.path().join("dest");
tokio::fs::create_dir_all(&dest).await.expect("dest dir");
tokio::fs::write(&source, b"small").await.expect("source");
tokio::fs::write(dest.join("source.txt"), b"larger destination")
.await
.expect("larger dest file");
let manifest = build_manifest(&[source]).await.expect("manifest");
let entry = manifest.file_entries().next().expect("file entry");
let decision = decide_resume(&dest, entry, manifest.chunk_size)
.await
.expect("decision");
assert!(matches!(decision, ResumeDecision::Conflict(_)));
}
#[test]
fn identity_is_loaded_after_generation() {
let temp = tempfile::tempdir().expect("tempdir");
let generated =
NativeIdentity::load_or_generate_in(temp.path()).expect("identity generated");
let loaded = NativeIdentity::load_or_generate_in(temp.path()).expect("identity loaded");
assert_eq!(generated.fingerprint(), loaded.fingerprint());
}
#[test]
fn app_config_round_trips_toml() {
let temp = tempfile::tempdir().expect("tempdir");
let path = temp.path().join("config.toml");
let config = AppConfig {
alias: "desk".to_string(),
..AppConfig::default()
};
config.save_to_path(&path).expect("config saved");
let loaded = AppConfig::load_or_default_from(&path).expect("config loaded");
assert_eq!(loaded.alias, "desk");
assert_eq!(loaded.listen_port, 53317);
assert!(loaded.trust.require_fingerprint);
}
#[test]
fn app_config_redacts_psk_for_display_output() {
let config = AppConfig {
trust: TrustConfig {
psk: "super-secret-psk".to_string(),
..TrustConfig::default()
},
..AppConfig::default()
};
let redacted = config.to_redacted_toml_string().expect("config serializes");
assert!(!redacted.contains("super-secret-psk"));
assert!(redacted.contains(r#"psk = "<redacted>""#));
}
#[test]
fn daemon_config_defaults_from_app_config_and_round_trips_toml() {
let temp = tempfile::tempdir().expect("tempdir");
let path = temp.path().join("daemon.toml");
let app_config = AppConfig {
quic_port: 54444,
download_dir: temp.path().join("downloads").display().to_string(),
..AppConfig::default()
};
let defaulted =
DaemonConfig::load_or_default_from(&path, &app_config).expect("daemon config");
assert_eq!(
defaulted.listen,
SocketAddr::from(([0, 0, 0, 0], app_config.quic_port))
);
assert_eq!(defaulted.destination, temp.path().join("downloads"));
let config = DaemonConfig {
listen: SocketAddr::from(([127, 0, 0, 1], 53318)),
destination: temp.path().join("daemon-dest"),
};
config.save_to_path(&path).expect("daemon config saved");
let loaded =
DaemonConfig::load_or_default_from(&path, &app_config).expect("daemon config loaded");
assert_eq!(loaded, config);
}
#[test]
fn trust_store_load_save_and_forget_round_trip() {
let temp = tempfile::tempdir().expect("tempdir");
let path = temp.path().join("known_peers.toml");
let fingerprint = "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef";
let mut store = TrustStore::default();
store
.trust_fingerprint(fingerprint)
.expect("fingerprint trusted");
store.save_to_path(&path).expect("trust store saved");
let mut loaded = TrustStore::load_or_default_from(&path).expect("trust store loaded");
assert_eq!(loaded.records().count(), 1);
assert_eq!(loaded.peers[fingerprint].trust_state, TrustState::Trusted);
assert!(loaded.forget(fingerprint));
assert_eq!(loaded.records().count(), 0);
}
#[test]
fn trust_store_rejects_short_fingerprint_without_discovery_lookup() {
let mut store = TrustStore::default();
assert!(store.trust_fingerprint("9a4f2c").is_err());
}
#[test]
fn trust_store_pins_direct_address_to_fingerprint() {
let fingerprint = "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef";
let address = SocketAddr::from(([192, 168, 1, 42], 53318));
let mut store = TrustStore::default();
store
.trust_direct_address(fingerprint, address)
.expect("direct address trusted");
assert!(store.is_trusted_fingerprint(fingerprint));
assert_eq!(
store.trusted_fingerprint_for_address(address),
Some(fingerprint)
);
assert!(store.peers[fingerprint].addresses.contains(&address));
assert!(store.peers[fingerprint].transports.contains("quic"));
assert_eq!(store.peers[fingerprint].trust_state, TrustState::Trusted);
}
#[test]
fn transfer_events_serialize_with_stable_event_names() {
let event = TransferEvent::Progress {
direction: TransferDirection::Send,
file_name: "bundle/a.txt".to_string(),
bytes_done: 5,
bytes_total: 9,
};
let json = serde_json::to_value(&event).expect("event serializes");
assert_eq!(json["event"], "progress");
assert_eq!(json["direction"], "send");
assert_eq!(json["file_name"], "bundle/a.txt");
assert_eq!(json["bytes_done"], 5);
assert_eq!(json["bytes_total"], 9);
}
#[test]
fn direct_protocol_hello_has_stable_golden_json() {
let hello = DirectProtocolHello {
protocol_version: 1,
min_protocol_version: 1,
client_fingerprint: "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"
.to_string(),
psk: false,
};
let json = serde_json::to_string(&hello).expect("hello serializes");
assert_eq!(
json,
r#"{"protocol_version":1,"min_protocol_version":1,"client_fingerprint":"0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef","psk":false}"#
);
}
#[test]
fn direct_protocol_response_has_stable_golden_json() {
let response = DirectProtocolResponse::accept(1);
let json = serde_json::to_string(&response).expect("response serializes");
assert_eq!(
json,
r#"{"accepted":true,"protocol_version":1,"psk_challenge":null,"error":null}"#
);
}
#[test]
fn direct_protocol_decodes_pre_psk_negotiation_frames() {
let hello: DirectProtocolHello = serde_json::from_str(
r#"{"protocol_version":1,"min_protocol_version":1,"client_fingerprint":"0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"}"#,
)
.expect("old hello decodes");
assert!(!hello.psk);
let response: DirectProtocolResponse =
serde_json::from_str(r#"{"accepted":true,"protocol_version":1,"error":null}"#)
.expect("old response decodes");
assert_eq!(response, DirectProtocolResponse::accept(1));
}
#[test]
fn direct_protocol_negotiates_supported_overlap() {
let hello = DirectProtocolHello {
protocol_version: 3,
min_protocol_version: 1,
client_fingerprint: "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"
.to_string(),
psk: false,
};
let response = negotiate_direct_protocol(&hello, None);
assert_eq!(response, DirectProtocolResponse::accept(1));
}
#[test]
fn direct_protocol_rejects_incompatible_versions() {
let too_new = DirectProtocolHello {
protocol_version: 3,
min_protocol_version: 2,
client_fingerprint: "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"
.to_string(),
psk: false,
};
let too_old = DirectProtocolHello {
protocol_version: 0,
min_protocol_version: 0,
client_fingerprint: "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"
.to_string(),
psk: false,
};
assert!(!negotiate_direct_protocol(&too_new, None).accepted);
assert!(!negotiate_direct_protocol(&too_old, None).accepted);
}
#[test]
fn direct_protocol_requires_matching_psk_mode() {
let no_psk = DirectProtocolHello {
protocol_version: 1,
min_protocol_version: 1,
client_fingerprint: "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"
.to_string(),
psk: false,
};
let with_psk = DirectProtocolHello {
psk: true,
..no_psk.clone()
};
assert!(!negotiate_direct_protocol(&no_psk, Some("secret")).accepted);
assert!(!negotiate_direct_protocol(&with_psk, None).accepted);
let accepted = negotiate_direct_protocol(&with_psk, Some("secret"));
assert!(accepted.accepted);
assert!(accepted.psk_challenge.is_some());
}
#[test]
fn transfer_manifest_has_stable_golden_json() {
let manifest = TransferManifest {
version: MANIFEST_VERSION,
session_id: Uuid::parse_str("00000000-0000-0000-0000-000000000001")
.expect("uuid parses"),
chunk_size: DEFAULT_CHUNK_SIZE,
entries: vec![ManifestEntry {
relative_path: PathBuf::from("bundle/a.txt"),
kind: ManifestEntryKind::File,
size: 5,
blake3: Some(
"0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef".to_string(),
),
chunks: vec![
"abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789".to_string(),
],
unix_mode: Some(0o100644),
modified_unix_ms: Some(1_700_000_000_000),
}],
};
let json = serde_json::to_string(&manifest).expect("manifest serializes");
assert_eq!(
json,
r#"{"version":1,"session_id":"00000000-0000-0000-0000-000000000001","chunk_size":1048576,"entries":[{"relative_path":"bundle/a.txt","kind":"file","size":5,"blake3":"0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef","chunks":["abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789"],"unix_mode":33188,"modified_unix_ms":1700000000000}]}"#
);
}
#[test]
fn peer_registry_merges_observations_by_fingerprint() {
let mut registry = PeerRegistry::new();
let fingerprint = "abcdef1234567890";
let first = PeerObservation::new(
fingerprint,
SocketAddr::from(([192, 168, 1, 10], 53318)),
100,
)
.with_alias("desk")
.with_hostname("desk.local")
.with_transport("quic");
let second = PeerObservation::new(
fingerprint,
SocketAddr::from(([192, 168, 1, 11], 53318)),
200,
)
.with_alias("workstation")
.with_transport("quic");
registry.observe(first).expect("first observation");
registry.observe(second).expect("second observation");
let record = registry
.get_by_fingerprint(fingerprint)
.expect("record by fingerprint");
assert_eq!(registry.records().count(), 1);
assert!(record.aliases.contains("desk"));
assert!(record.aliases.contains("workstation"));
assert!(record.hostnames.contains("desk.local"));
assert_eq!(record.addresses.len(), 2);
assert_eq!(record.first_seen_unix_ms, 100);
assert_eq!(record.last_seen_unix_ms, 200);
}
#[test]
fn peer_registry_looks_up_unique_fingerprint_prefix_alias_and_hostname() {
let mut registry = PeerRegistry::new();
registry
.observe(
PeerObservation::new(
"abcdef1234567890",
SocketAddr::from(([192, 168, 1, 10], 53318)),
100,
)
.with_alias("desk")
.with_hostname("desk.local"),
)
.expect("first observation");
registry
.observe(
PeerObservation::new(
"123456abcdef7890",
SocketAddr::from(([192, 168, 1, 20], 53318)),
100,
)
.with_alias("laptop"),
)
.expect("second observation");
assert_eq!(
registry
.lookup("abcdef")
.map(|record| record.fingerprint.as_str()),
Some("abcdef1234567890")
);
assert_eq!(
registry
.lookup("desk")
.map(|record| record.fingerprint.as_str()),
Some("abcdef1234567890")
);
assert_eq!(
registry
.lookup("desk.local")
.map(|record| record.fingerprint.as_str()),
Some("abcdef1234567890")
);
assert!(registry.lookup("missing").is_none());
}
#[test]
fn peer_registry_rejects_ambiguous_lookup_hints() {
let mut registry = PeerRegistry::new();
registry
.observe(
PeerObservation::new(
"abcdef1234567890",
SocketAddr::from(([192, 168, 1, 10], 53318)),
100,
)
.with_alias("desk"),
)
.expect("first observation");
registry
.observe(
PeerObservation::new(
"abcdef9999999999",
SocketAddr::from(([192, 168, 1, 11], 53318)),
100,
)
.with_alias("desk"),
)
.expect("second observation");
assert!(registry.lookup("abcdef").is_none());
assert!(registry.lookup("desk").is_none());
assert_eq!(
registry
.lookup("abcdef1234567890")
.map(|record| record.fingerprint.as_str()),
Some("abcdef1234567890")
);
match registry.lookup_detail("desk") {
PeerLookup::Ambiguous(records) => assert_eq!(records.len(), 2),
other => panic!("expected ambiguous duplicate-alias lookup, got {other:?}"),
}
assert!(matches!(
registry.lookup_detail("missing"),
PeerLookup::Missing
));
}
#[test]
fn native_announcement_builds_expected_txt_properties() {
let announcement = NativeAnnouncement {
alias: "Stephen MBP".to_string(),
fingerprint: "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"
.to_string(),
quic_port: 53318,
listen_port: Some(53317),
};
let service = announcement.service_info().expect("service info");
assert_eq!(service.get_type(), NATIVE_SERVICE_TYPE);
assert!(
service
.get_fullname()
.starts_with("stephen-mbp-0123456789ab.")
);
assert_eq!(service.get_property_val_str("pv"), Some("1"));
assert_eq!(service.get_property_val_str("minpv"), Some("1"));
assert_eq!(
service.get_property_val_str("fp"),
Some("0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef")
);
assert_eq!(service.get_property_val_str("alias"), Some("Stephen MBP"));
assert_eq!(service.get_property_val_str("transports"), Some("quic"));
assert_eq!(service.get_property_val_str("quic_port"), Some("53318"));
assert_eq!(service.get_property_val_str("listen_port"), Some("53317"));
}
#[test]
fn resolved_native_service_merges_into_registry() {
let properties = [
("pv", "1"),
("minpv", "1"),
(
"fp",
"0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef",
),
("alias", "desk"),
("transports", "quic"),
("quic_port", "53318"),
];
let service = ServiceInfo::new(
NATIVE_SERVICE_TYPE,
"desk-0123456789ab",
"desk.local.",
"192.168.1.42",
53318,
&properties[..],
)
.expect("service info")
.as_resolved_service();
let mut registry = PeerRegistry::new();
observe_resolved_service(&mut registry, &service).expect("observation");
let record = registry.lookup("desk").expect("record by alias");
assert_eq!(
record.fingerprint,
"0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"
);
assert_eq!(
record.preferred_quic_address(),
Some(SocketAddr::from(([192, 168, 1, 42], 53318)))
);
}
#[test]
fn resolved_native_service_ignores_incompatible_protocol_ranges() {
let properties = [
("pv", "3"),
("minpv", "2"),
(
"fp",
"0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef",
),
("alias", "desk"),
("transports", "quic"),
("quic_port", "53318"),
];
let service = ServiceInfo::new(
NATIVE_SERVICE_TYPE,
"desk-0123456789ab",
"desk.local.",
"192.168.1.42",
53318,
&properties[..],
)
.expect("service info")
.as_resolved_service();
let mut registry = PeerRegistry::new();
observe_resolved_service(&mut registry, &service).expect("observation");
assert_eq!(registry.records().count(), 0);
}
#[tokio::test]
async fn direct_quic_loopback_sends_one_file() {
let source_dir = tempfile::tempdir().expect("source tempdir");
let dest_dir = tempfile::tempdir().expect("dest tempdir");
let identity_dir = tempfile::tempdir().expect("identity tempdir");
let file_path = source_dir.path().join("hello.txt");
tokio::fs::write(&file_path, b"hello from ferry")
.await
.expect("source file written");
let identity =
NativeIdentity::load_or_generate_in(identity_dir.path()).expect("identity generated");
let recv_identity = identity.clone();
let reserved_socket =
std::net::UdpSocket::bind(SocketAddr::from(([127, 0, 0, 1], 0))).expect("udp bind");
let recv_addr = reserved_socket.local_addr().expect("local addr");
drop(reserved_socket);
let receive_progress = Arc::new(Mutex::new(Vec::new()));
let receive_progress_log = receive_progress.clone();
let dest_path = dest_dir.path().to_path_buf();
let server = tokio::spawn(async move {
receive_direct_file(
&ReceiveRequest::new(recv_addr),
dest_path,
&recv_identity,
|event| {
receive_progress_log
.lock()
.expect("progress lock")
.push(event);
},
)
.await
});
tokio::time::sleep(Duration::from_millis(100)).await;
let peer = DirectPeer::parse(&recv_addr.to_string()).expect("peer parses");
let send_request = SendRequest::new(peer, vec![file_path]).expect("send request is valid");
let send_progress = Arc::new(Mutex::new(Vec::new()));
let send_progress_log = send_progress.clone();
let send_summary = send_direct_file(&send_request, &identity, |event| {
send_progress_log.lock().expect("progress lock").push(event);
})
.await
.expect("file sent");
let receive_summary = server.await.expect("server joined").expect("file received");
let received = tokio::fs::read(dest_dir.path().join("hello.txt"))
.await
.expect("received file readable");
assert_eq!(received, b"hello from ferry");
assert_eq!(send_summary.bytes, receive_summary.bytes);
assert_eq!(send_summary.blake3, receive_summary.blake3);
let send_progress = send_progress.lock().expect("progress lock");
let receive_progress = receive_progress.lock().expect("progress lock");
assert!(matches!(
send_progress.first(),
Some(TransferEvent::SessionStarted {
direction: TransferDirection::Send,
..
})
));
assert!(
send_progress
.iter()
.any(|event| matches!(event, TransferEvent::Progress { .. }))
);
assert!(
send_progress
.iter()
.any(|event| matches!(event, TransferEvent::SessionFinished { .. }))
);
assert!(matches!(
receive_progress.first(),
Some(TransferEvent::SessionStarted {
direction: TransferDirection::Receive,
..
})
));
assert!(
receive_progress
.iter()
.any(|event| matches!(event, TransferEvent::Progress { .. }))
);
assert!(
receive_progress
.iter()
.any(|event| matches!(event, TransferEvent::SessionFinished { .. }))
);
}
#[tokio::test]
async fn direct_transfer_accepts_expected_receiver_fingerprint() {
let source_dir = tempfile::tempdir().expect("source tempdir");
let dest_dir = tempfile::tempdir().expect("dest tempdir");
let sender_identity_dir = tempfile::tempdir().expect("sender identity tempdir");
let receiver_identity_dir = tempfile::tempdir().expect("receiver identity tempdir");
let file_path = source_dir.path().join("hello.txt");
tokio::fs::write(&file_path, b"hello from ferry")
.await
.expect("source file written");
let sender_identity = NativeIdentity::load_or_generate_in(sender_identity_dir.path())
.expect("sender identity");
let receiver_identity = NativeIdentity::load_or_generate_in(receiver_identity_dir.path())
.expect("receiver identity");
let expected_fingerprint = receiver_identity.fingerprint().to_string();
let reserved_socket =
std::net::UdpSocket::bind(SocketAddr::from(([127, 0, 0, 1], 0))).expect("udp bind");
let recv_addr = reserved_socket.local_addr().expect("local addr");
drop(reserved_socket);
let dest_path = dest_dir.path().to_path_buf();
let server = tokio::spawn(async move {
receive_direct_file(
&ReceiveRequest::new(recv_addr),
dest_path,
&receiver_identity,
|_| {},
)
.await
});
tokio::time::sleep(Duration::from_millis(100)).await;
let peer = DirectPeer::from_address(recv_addr)
.with_expected_fingerprint(expected_fingerprint)
.expect("expected fingerprint accepted");
let send_request = SendRequest::new(peer, vec![file_path]).expect("send request is valid");
let send_summary = send_direct_file(&send_request, &sender_identity, |_| {})
.await
.expect("file sent");
let receive_summary = server.await.expect("server joined").expect("file received");
let received = tokio::fs::read(dest_dir.path().join("hello.txt"))
.await
.expect("received file readable");
assert_eq!(received, b"hello from ferry");
assert_eq!(send_summary.bytes, receive_summary.bytes);
assert_eq!(send_summary.blake3, receive_summary.blake3);
}
#[tokio::test]
async fn direct_transfer_confirms_unpinned_receiver_before_payload() {
let source_dir = tempfile::tempdir().expect("source tempdir");
let dest_dir = tempfile::tempdir().expect("dest tempdir");
let sender_identity_dir = tempfile::tempdir().expect("sender identity tempdir");
let receiver_identity_dir = tempfile::tempdir().expect("receiver identity tempdir");
let file_path = source_dir.path().join("hello.txt");
tokio::fs::write(&file_path, b"hello from ferry")
.await
.expect("source file written");
let sender_identity = NativeIdentity::load_or_generate_in(sender_identity_dir.path())
.expect("sender identity");
let receiver_identity = NativeIdentity::load_or_generate_in(receiver_identity_dir.path())
.expect("receiver identity");
let expected_fingerprint = receiver_identity.fingerprint().to_string();
let reserved_socket =
std::net::UdpSocket::bind(SocketAddr::from(([127, 0, 0, 1], 0))).expect("udp bind");
let recv_addr = reserved_socket.local_addr().expect("local addr");
drop(reserved_socket);
let dest_path = dest_dir.path().to_path_buf();
let server = tokio::spawn(async move {
receive_direct_file(
&ReceiveRequest::new(recv_addr),
dest_path,
&receiver_identity,
|_| {},
)
.await
});
tokio::time::sleep(Duration::from_millis(100)).await;
let peer = DirectPeer::from_address(recv_addr);
let send_request = SendRequest::new(peer, vec![file_path]).expect("send request is valid");
let callback_seen = Arc::new(AtomicBool::new(false));
let callback_seen_send = callback_seen.clone();
let send_summary = send_direct_file_with_control_and_trust(
&send_request,
&sender_identity,
TransferControl::new(),
|_| {},
|fingerprint| {
callback_seen_send.store(true, Ordering::SeqCst);
assert_eq!(fingerprint, expected_fingerprint);
Ok(())
},
)
.await
.expect("file sent after trust callback");
let receive_summary = server.await.expect("server joined").expect("file received");
assert!(callback_seen.load(Ordering::SeqCst));
assert_eq!(send_summary.bytes, receive_summary.bytes);
assert_eq!(send_summary.blake3, receive_summary.blake3);
}
#[tokio::test]
async fn direct_transfer_rejects_unexpected_receiver_fingerprint_before_payload() {
let source_dir = tempfile::tempdir().expect("source tempdir");
let dest_dir = tempfile::tempdir().expect("dest tempdir");
let sender_identity_dir = tempfile::tempdir().expect("sender identity tempdir");
let receiver_identity_dir = tempfile::tempdir().expect("receiver identity tempdir");
let other_identity_dir = tempfile::tempdir().expect("other identity tempdir");
let file_path = source_dir.path().join("hello.txt");
tokio::fs::write(&file_path, b"hello from ferry")
.await
.expect("source file written");
let sender_identity = NativeIdentity::load_or_generate_in(sender_identity_dir.path())
.expect("sender identity");
let receiver_identity = NativeIdentity::load_or_generate_in(receiver_identity_dir.path())
.expect("receiver identity");
let other_identity =
NativeIdentity::load_or_generate_in(other_identity_dir.path()).expect("other identity");
let reserved_socket =
std::net::UdpSocket::bind(SocketAddr::from(([127, 0, 0, 1], 0))).expect("udp bind");
let recv_addr = reserved_socket.local_addr().expect("local addr");
drop(reserved_socket);
let dest_path = dest_dir.path().to_path_buf();
let server = tokio::spawn(async move {
receive_direct_file(
&ReceiveRequest::new(recv_addr),
dest_path,
&receiver_identity,
|_| {},
)
.await
});
tokio::time::sleep(Duration::from_millis(100)).await;
let peer = DirectPeer::from_address(recv_addr)
.with_expected_fingerprint(other_identity.fingerprint().to_string())
.expect("expected fingerprint accepted");
let send_request = SendRequest::new(peer, vec![file_path]).expect("send request is valid");
let error = send_direct_file(&send_request, &sender_identity, |_| {})
.await
.expect_err("fingerprint mismatch should reject send");
assert!(matches!(error, Error::PeerFingerprintMismatch { .. }));
let server_error = server
.await
.expect("server joined")
.expect_err("server closed");
assert!(matches!(
server_error,
Error::Connection(_) | Error::NoIncomingConnection
));
assert!(
!tokio::fs::try_exists(dest_dir.path().join("hello.txt"))
.await
.expect("destination checked")
);
}
#[tokio::test]
async fn direct_transfer_accepts_matching_psk_before_payload() {
let source_dir = tempfile::tempdir().expect("source tempdir");
let dest_dir = tempfile::tempdir().expect("dest tempdir");
let identity_dir = tempfile::tempdir().expect("identity tempdir");
let file_path = source_dir.path().join("hello.txt");
tokio::fs::write(&file_path, b"hello from ferry")
.await
.expect("source file written");
let identity =
NativeIdentity::load_or_generate_in(identity_dir.path()).expect("identity generated");
let recv_identity = identity.clone();
let reserved_socket =
std::net::UdpSocket::bind(SocketAddr::from(([127, 0, 0, 1], 0))).expect("udp bind");
let recv_addr = reserved_socket.local_addr().expect("local addr");
drop(reserved_socket);
let dest_path = dest_dir.path().to_path_buf();
let receive_request = ReceiveRequest::new(recv_addr)
.with_psk("correct horse battery staple")
.expect("receiver psk accepted");
let server = tokio::spawn(async move {
receive_direct_file(&receive_request, dest_path, &recv_identity, |_| {}).await
});
tokio::time::sleep(Duration::from_millis(100)).await;
let peer = DirectPeer::parse(&recv_addr.to_string()).expect("peer parses");
let send_request = SendRequest::new(peer, vec![file_path])
.expect("send request is valid")
.with_psk("correct horse battery staple")
.expect("sender psk accepted");
let send_summary = send_direct_file(&send_request, &identity, |_| {})
.await
.expect("file sent");
let receive_summary = server.await.expect("server joined").expect("file received");
let received = tokio::fs::read(dest_dir.path().join("hello.txt"))
.await
.expect("received file readable");
assert_eq!(received, b"hello from ferry");
assert_eq!(send_summary.bytes, receive_summary.bytes);
}
#[tokio::test]
async fn direct_transfer_rejects_missing_psk_before_payload() {
let source_dir = tempfile::tempdir().expect("source tempdir");
let dest_dir = tempfile::tempdir().expect("dest tempdir");
let identity_dir = tempfile::tempdir().expect("identity tempdir");
let file_path = source_dir.path().join("hello.txt");
tokio::fs::write(&file_path, b"hello from ferry")
.await
.expect("source file written");
let identity =
NativeIdentity::load_or_generate_in(identity_dir.path()).expect("identity generated");
let recv_identity = identity.clone();
let reserved_socket =
std::net::UdpSocket::bind(SocketAddr::from(([127, 0, 0, 1], 0))).expect("udp bind");
let recv_addr = reserved_socket.local_addr().expect("local addr");
drop(reserved_socket);
let dest_path = dest_dir.path().to_path_buf();
let receive_request = ReceiveRequest::new(recv_addr)
.with_psk("receiver secret")
.expect("receiver psk accepted");
let server = tokio::spawn(async move {
receive_direct_file(&receive_request, dest_path, &recv_identity, |_| {}).await
});
tokio::time::sleep(Duration::from_millis(100)).await;
let peer = DirectPeer::parse(&recv_addr.to_string()).expect("peer parses");
let send_request = SendRequest::new(peer, vec![file_path]).expect("send request is valid");
let error = send_direct_file(&send_request, &identity, |_| {})
.await
.expect_err("missing psk should reject send");
assert!(matches!(error, Error::Protocol(_)));
let _ = server.await.expect("server joined");
assert!(
!tokio::fs::try_exists(dest_dir.path().join("hello.txt"))
.await
.expect("destination checked")
);
}
#[tokio::test]
async fn direct_transfer_rejects_wrong_psk_before_payload() {
let source_dir = tempfile::tempdir().expect("source tempdir");
let dest_dir = tempfile::tempdir().expect("dest tempdir");
let identity_dir = tempfile::tempdir().expect("identity tempdir");
let file_path = source_dir.path().join("hello.txt");
tokio::fs::write(&file_path, b"hello from ferry")
.await
.expect("source file written");
let identity =
NativeIdentity::load_or_generate_in(identity_dir.path()).expect("identity generated");
let recv_identity = identity.clone();
let reserved_socket =
std::net::UdpSocket::bind(SocketAddr::from(([127, 0, 0, 1], 0))).expect("udp bind");
let recv_addr = reserved_socket.local_addr().expect("local addr");
drop(reserved_socket);
let dest_path = dest_dir.path().to_path_buf();
let receive_request = ReceiveRequest::new(recv_addr)
.with_psk("receiver secret")
.expect("receiver psk accepted");
let server = tokio::spawn(async move {
receive_direct_file(&receive_request, dest_path, &recv_identity, |_| {}).await
});
tokio::time::sleep(Duration::from_millis(100)).await;
let peer = DirectPeer::parse(&recv_addr.to_string()).expect("peer parses");
let send_request = SendRequest::new(peer, vec![file_path])
.expect("send request is valid")
.with_psk("wrong secret")
.expect("sender psk accepted");
let error = send_direct_file(&send_request, &identity, |_| {})
.await
.expect_err("wrong psk should reject send");
assert!(matches!(error, Error::Authentication(_)));
assert!(!error.to_string().contains("wrong secret"));
assert!(!error.to_string().contains("receiver secret"));
let server_error = server
.await
.expect("server joined")
.expect_err("server should reject wrong psk");
assert!(matches!(
server_error,
Error::Authentication(_) | Error::Write(_)
));
assert!(
!tokio::fs::try_exists(dest_dir.path().join("hello.txt"))
.await
.expect("destination checked")
);
}
#[tokio::test]
async fn direct_quic_loopback_sends_directory_and_resumes_partial_file() {
let source_dir = tempfile::tempdir().expect("source tempdir");
let dest_dir = tempfile::tempdir().expect("dest tempdir");
let identity_dir = tempfile::tempdir().expect("identity tempdir");
let bundle = source_dir.path().join("bundle");
let nested = bundle.join("nested");
tokio::fs::create_dir_all(&nested)
.await
.expect("source dirs");
tokio::fs::write(nested.join("small.txt"), b"small")
.await
.expect("small file");
let large_bytes = vec![7; (DEFAULT_CHUNK_SIZE as usize * 2) + 17];
tokio::fs::write(bundle.join("large.bin"), &large_bytes)
.await
.expect("large file");
let dest_bundle = dest_dir.path().join("bundle");
tokio::fs::create_dir_all(&dest_bundle)
.await
.expect("dest bundle");
tokio::fs::write(
dest_bundle.join("large.bin"),
&large_bytes[..DEFAULT_CHUNK_SIZE as usize + 5],
)
.await
.expect("partial large");
let identity =
NativeIdentity::load_or_generate_in(identity_dir.path()).expect("identity generated");
let recv_identity = identity.clone();
let reserved_socket =
std::net::UdpSocket::bind(SocketAddr::from(([127, 0, 0, 1], 0))).expect("udp bind");
let recv_addr = reserved_socket.local_addr().expect("local addr");
drop(reserved_socket);
let dest_path = dest_dir.path().to_path_buf();
let server = tokio::spawn(async move {
receive_direct_file(
&ReceiveRequest::new(recv_addr),
dest_path,
&recv_identity,
|_| {},
)
.await
});
tokio::time::sleep(Duration::from_millis(100)).await;
let peer = DirectPeer::parse(&recv_addr.to_string()).expect("peer parses");
let send_request = SendRequest::new(peer, vec![bundle]).expect("send request is valid");
let send_summary = send_direct_file(&send_request, &identity, |_| {})
.await
.expect("directory sent");
let receive_summary = server
.await
.expect("server joined")
.expect("directory received");
let received_large = tokio::fs::read(dest_bundle.join("large.bin"))
.await
.expect("large file readable");
let received_small = tokio::fs::read(dest_bundle.join("nested/small.txt"))
.await
.expect("small file readable");
assert_eq!(received_large, large_bytes);
assert_eq!(received_small, b"small");
assert_eq!(send_summary.bytes, receive_summary.bytes);
assert!(
receive_summary
.files
.iter()
.any(|file| file.status == TransferFileStatus::Resumed)
);
}
#[tokio::test]
async fn direct_quic_loopback_cancels_send_and_does_not_finalize_partial_file() {
let source_dir = tempfile::tempdir().expect("source tempdir");
let dest_dir = tempfile::tempdir().expect("dest tempdir");
let identity_dir = tempfile::tempdir().expect("identity tempdir");
let file_path = source_dir.path().join("payload.bin");
tokio::fs::write(&file_path, vec![3; IO_BUFFER_SIZE * 64])
.await
.expect("source file written");
let identity =
NativeIdentity::load_or_generate_in(identity_dir.path()).expect("identity generated");
let recv_identity = identity.clone();
let reserved_socket =
std::net::UdpSocket::bind(SocketAddr::from(([127, 0, 0, 1], 0))).expect("udp bind");
let recv_addr = reserved_socket.local_addr().expect("local addr");
drop(reserved_socket);
let dest_path = dest_dir.path().to_path_buf();
let server = tokio::spawn(async move {
receive_direct_file(
&ReceiveRequest::new(recv_addr),
dest_path,
&recv_identity,
|_| {},
)
.await
});
tokio::time::sleep(Duration::from_millis(100)).await;
let peer = DirectPeer::parse(&recv_addr.to_string()).expect("peer parses");
let send_request =
SendRequest::new(peer, vec![file_path.clone()]).expect("send request is valid");
let send_control = TransferControl::new();
let cancel_on_progress = send_control.clone();
let send_events = Arc::new(Mutex::new(Vec::new()));
let send_events_log = send_events.clone();
let send_result =
send_direct_file_with_control(&send_request, &identity, send_control, |event| {
if matches!(event, TransferEvent::Progress { .. }) {
cancel_on_progress.cancel();
}
send_events_log.lock().expect("events lock").push(event);
})
.await;
assert!(matches!(send_result, Err(Error::TransferCancelled)));
let receive_result = tokio::time::timeout(Duration::from_secs(5), server)
.await
.expect("receiver should finish after sender cancellation")
.expect("server joined");
assert!(receive_result.is_err());
assert!(!dest_dir.path().join("payload.bin").exists());
assert!(
send_events
.lock()
.expect("events lock")
.iter()
.any(|event| matches!(event, TransferEvent::SessionCancelled { .. }))
);
}
}