Skip to main content

ferry_core/
lib.rs

1pub mod error;
2
3use std::collections::{BTreeMap, BTreeSet};
4use std::fs::Metadata;
5use std::io::SeekFrom;
6use std::net::SocketAddr;
7use std::path::{Component, Path, PathBuf};
8use std::sync::atomic::{AtomicBool, Ordering};
9use std::sync::{Arc, Mutex};
10use std::time::{Duration, SystemTime, UNIX_EPOCH};
11
12use directories::BaseDirs;
13use mdns_sd::{ResolvedService, ServiceDaemon, ServiceEvent, ServiceInfo};
14use quinn::crypto::rustls::QuicClientConfig;
15use rcgen::{CertifiedKey, generate_simple_self_signed};
16use rustls::client::danger::{HandshakeSignatureValid, ServerCertVerified, ServerCertVerifier};
17use rustls::crypto::{CryptoProvider, verify_tls12_signature, verify_tls13_signature};
18use rustls::pki_types::{CertificateDer, PrivateKeyDer, PrivatePkcs8KeyDer, ServerName, UnixTime};
19use rustls::{DigitallySignedStruct, SignatureScheme};
20use serde::{Deserialize, Serialize};
21use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt};
22use uuid::Uuid;
23
24pub use error::{Error, Result};
25
/// Protocol version advertised by this build (sent as the `pv` TXT property).
pub const NATIVE_PROTOCOL_VERSION: u16 = 1;
/// Oldest protocol version advertised as acceptable (the `minpv` TXT property).
pub const MIN_NATIVE_PROTOCOL_VERSION: u16 = 1;
/// mDNS service type under which native peers are registered and browsed.
pub const NATIVE_SERVICE_TYPE: &str = "_ferry._udp.local.";

// NOTE(review): the consumers of the private constants below are outside the
// visible part of this file; the descriptions are inferred from the names only.

/// Eight-byte marker, presumably written at the start of a direct session.
const DIRECT_MAGIC: &[u8; 8] = b"FERRY01\n";
/// 16 MiB; presumably the upper bound for a single protocol frame.
const MAX_FRAME_LEN: usize = 16 * 1024 * 1024;
/// 64 KiB; presumably the size of the scratch buffer used for file I/O.
const IO_BUFFER_SIZE: usize = 64 * 1024;
/// Presumably the value written to `TransferManifest::version`.
const MANIFEST_VERSION: u16 = 1;
/// 1 MiB; presumably the default for `TransferManifest::chunk_size`.
const DEFAULT_CHUNK_SIZE: u64 = 1024 * 1024;
/// Presumably the number of files processed in parallel while building a manifest.
const DEFAULT_MANIFEST_CONCURRENCY: usize = 8;
/// Presumably the directory name used beneath the platform configuration root.
const CONFIG_DIR_NAME: &str = "ferry";
/// Presumably a domain-separation string for the pre-shared-key proof.
const PSK_PROOF_CONTEXT: &[u8] = b"fileferry-direct-psk-v1";
38
39#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
40pub struct DirectPeer {
41    address: SocketAddr,
42    expected_fingerprint: Option<String>,
43}
44
45impl DirectPeer {
46    pub fn parse(value: &str) -> Result<Self> {
47        Ok(Self {
48            address: value.parse()?,
49            expected_fingerprint: None,
50        })
51    }
52
53    pub const fn address(&self) -> SocketAddr {
54        self.address
55    }
56
57    pub const fn from_address(address: SocketAddr) -> Self {
58        Self {
59            address,
60            expected_fingerprint: None,
61        }
62    }
63
64    pub fn with_expected_fingerprint(mut self, fingerprint: impl Into<String>) -> Result<Self> {
65        self.expected_fingerprint = Some(normalized_fingerprint(fingerprint.into())?);
66        Ok(self)
67    }
68
69    pub fn expected_fingerprint(&self) -> Option<&str> {
70        self.expected_fingerprint.as_deref()
71    }
72}
73
74#[derive(Clone, PartialEq, Eq, Deserialize)]
75pub struct PskSecret(String);
76
77impl PskSecret {
78    pub fn new(psk: impl Into<String>) -> Result<Self> {
79        validate_psk(psk.into()).map(Self)
80    }
81
82    fn expose_secret(&self) -> &str {
83        &self.0
84    }
85}
86
87impl std::fmt::Debug for PskSecret {
88    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
89        formatter.write_str("PskSecret(<redacted>)")
90    }
91}
92
93impl Serialize for PskSecret {
94    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
95    where
96        S: serde::Serializer,
97    {
98        serializer.serialize_str("<redacted>")
99    }
100}
101
102#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
103pub struct SendRequest {
104    peer: DirectPeer,
105    paths: Vec<PathBuf>,
106    #[serde(default, skip_serializing)]
107    psk: Option<PskSecret>,
108}
109
110impl SendRequest {
111    pub fn new(peer: DirectPeer, paths: Vec<PathBuf>) -> Result<Self> {
112        if paths.is_empty() {
113            return Err(Error::InvalidInput(
114                "at least one file path is required".to_string(),
115            ));
116        }
117
118        Ok(Self {
119            peer,
120            paths,
121            psk: None,
122        })
123    }
124
125    pub const fn peer(&self) -> &DirectPeer {
126        &self.peer
127    }
128
129    pub fn paths(&self) -> &[PathBuf] {
130        &self.paths
131    }
132
133    pub fn with_psk(mut self, psk: impl Into<String>) -> Result<Self> {
134        self.psk = Some(PskSecret::new(psk)?);
135        Ok(self)
136    }
137
138    pub fn psk(&self) -> Option<&str> {
139        self.psk.as_ref().map(PskSecret::expose_secret)
140    }
141}
142
143#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
144pub struct ReceiveRequest {
145    listen: SocketAddr,
146    #[serde(default, skip_serializing)]
147    psk: Option<PskSecret>,
148}
149
150impl ReceiveRequest {
151    pub fn new(listen: SocketAddr) -> Self {
152        Self { listen, psk: None }
153    }
154
155    pub const fn listen(&self) -> SocketAddr {
156        self.listen
157    }
158
159    pub fn with_psk(mut self, psk: impl Into<String>) -> Result<Self> {
160        self.psk = Some(PskSecret::new(psk)?);
161        Ok(self)
162    }
163
164    pub fn psk(&self) -> Option<&str> {
165        self.psk.as_ref().map(PskSecret::expose_secret)
166    }
167}
168
169#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
170pub struct AppConfig {
171    pub alias: String,
172    pub listen_port: u16,
173    pub quic_port: u16,
174    pub download_dir: String,
175    pub auto_accept_known: bool,
176    pub auto_accept_unknown: bool,
177    pub discovery: Vec<String>,
178    pub max_concurrent_files: usize,
179    pub trust: TrustConfig,
180}
181
182impl Default for AppConfig {
183    fn default() -> Self {
184        Self {
185            alias: default_alias(),
186            listen_port: 53317,
187            quic_port: 53318,
188            download_dir: "~/Downloads/ferry".to_string(),
189            auto_accept_known: true,
190            auto_accept_unknown: false,
191            discovery: vec!["native".to_string()],
192            max_concurrent_files: 8,
193            trust: TrustConfig::default(),
194        }
195    }
196}
197
198impl AppConfig {
199    pub fn load_or_default() -> Result<Self> {
200        Self::load_or_default_from(config_dir()?.join("config.toml"))
201    }
202
203    pub fn load_or_default_from(path: impl AsRef<Path>) -> Result<Self> {
204        let path = path.as_ref();
205        match std::fs::read_to_string(path) {
206            Ok(contents) => {
207                toml::from_str(&contents).map_err(|error| Error::ConfigParse(error.to_string()))
208            }
209            Err(error) if error.kind() == std::io::ErrorKind::NotFound => Ok(Self::default()),
210            Err(source) => Err(Error::IoPath {
211                path: path.to_path_buf(),
212                source,
213            }),
214        }
215    }
216
217    pub fn save_default_path(&self) -> Result<()> {
218        self.save_to_path(config_dir()?.join("config.toml"))
219    }
220
221    pub fn save_to_path(&self, path: impl AsRef<Path>) -> Result<()> {
222        write_toml_file(path.as_ref(), self)
223    }
224
225    pub fn to_toml_string(&self) -> Result<String> {
226        toml::to_string_pretty(self).map_err(|error| Error::ConfigSerialize(error.to_string()))
227    }
228
229    pub fn redacted(&self) -> Self {
230        let mut config = self.clone();
231        config.trust = config.trust.redacted();
232        config
233    }
234
235    pub fn to_redacted_toml_string(&self) -> Result<String> {
236        self.redacted().to_toml_string()
237    }
238}
239
240#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
241pub struct DaemonConfig {
242    pub listen: SocketAddr,
243    pub destination: PathBuf,
244}
245
246impl DaemonConfig {
247    pub fn from_app_config(config: &AppConfig) -> Result<Self> {
248        Ok(Self {
249            listen: SocketAddr::from(([0, 0, 0, 0], config.quic_port)),
250            destination: expand_home_path(&config.download_dir)?,
251        })
252    }
253
254    pub fn load_or_default() -> Result<Self> {
255        let app_config = AppConfig::load_or_default()?;
256        Self::load_or_default_from(config_dir()?.join("daemon.toml"), &app_config)
257    }
258
259    pub fn load_or_default_from(path: impl AsRef<Path>, app_config: &AppConfig) -> Result<Self> {
260        let path = path.as_ref();
261        match std::fs::read_to_string(path) {
262            Ok(contents) => {
263                toml::from_str(&contents).map_err(|error| Error::ConfigParse(error.to_string()))
264            }
265            Err(error) if error.kind() == std::io::ErrorKind::NotFound => {
266                Self::from_app_config(app_config)
267            }
268            Err(source) => Err(Error::IoPath {
269                path: path.to_path_buf(),
270                source,
271            }),
272        }
273    }
274
275    pub fn save_default_path(&self) -> Result<()> {
276        self.save_to_path(config_dir()?.join("daemon.toml"))
277    }
278
279    pub fn save_to_path(&self, path: impl AsRef<Path>) -> Result<()> {
280        write_toml_file(path.as_ref(), self)
281    }
282
283    pub fn to_toml_string(&self) -> Result<String> {
284        toml::to_string_pretty(self).map_err(|error| Error::ConfigSerialize(error.to_string()))
285    }
286}
287
288#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
289pub struct TrustConfig {
290    pub require_fingerprint: bool,
291    pub psk: String,
292}
293
294impl Default for TrustConfig {
295    fn default() -> Self {
296        Self {
297            require_fingerprint: true,
298            psk: String::new(),
299        }
300    }
301}
302
303impl TrustConfig {
304    pub fn redacted(&self) -> Self {
305        let psk = if self.psk.is_empty() {
306            String::new()
307        } else {
308            "<redacted>".to_string()
309        };
310        Self {
311            require_fingerprint: self.require_fingerprint,
312            psk,
313        }
314    }
315}
316
317#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
318pub struct NativeIdentity {
319    cert_der: Vec<u8>,
320    key_der: Vec<u8>,
321    fingerprint: String,
322}
323
324impl NativeIdentity {
325    pub fn load_or_generate() -> Result<Self> {
326        Self::load_or_generate_in(config_dir()?)
327    }
328
329    pub fn load_or_generate_in(config_dir: impl AsRef<Path>) -> Result<Self> {
330        let config_dir = config_dir.as_ref();
331        let cert_path = config_dir.join("identity.cert.der");
332        let key_path = config_dir.join("identity.key.der");
333
334        if cert_path.exists() && key_path.exists() {
335            let cert_der = std::fs::read(&cert_path).map_err(|source| Error::IoPath {
336                path: cert_path,
337                source,
338            })?;
339            let key_der = std::fs::read(&key_path).map_err(|source| Error::IoPath {
340                path: key_path,
341                source,
342            })?;
343
344            return Ok(Self::from_parts(cert_der, key_der));
345        }
346
347        std::fs::create_dir_all(config_dir).map_err(|source| Error::IoPath {
348            path: config_dir.to_path_buf(),
349            source,
350        })?;
351
352        let identity = Self::generate()?;
353        write_private_file(&cert_path, &identity.cert_der)?;
354        write_private_file(&key_path, &identity.key_der)?;
355        Ok(identity)
356    }
357
358    pub fn generate() -> Result<Self> {
359        let CertifiedKey { cert, signing_key } =
360            generate_simple_self_signed(vec!["localhost".to_string()])?;
361        Ok(Self::from_parts(
362            cert.der().to_vec(),
363            signing_key.serialize_der(),
364        ))
365    }
366
367    pub fn fingerprint(&self) -> &str {
368        &self.fingerprint
369    }
370
371    fn cert_chain(&self) -> Vec<CertificateDer<'static>> {
372        vec![CertificateDer::from(self.cert_der.clone())]
373    }
374
375    fn private_key(&self) -> PrivateKeyDer<'static> {
376        PrivateKeyDer::from(PrivatePkcs8KeyDer::from(self.key_der.clone()))
377    }
378
379    fn from_parts(cert_der: Vec<u8>, key_der: Vec<u8>) -> Self {
380        let fingerprint = blake3::hash(&cert_der).to_hex().to_string();
381        Self {
382            cert_der,
383            key_der,
384            fingerprint,
385        }
386    }
387}
388
389#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
390#[serde(rename_all = "snake_case")]
391pub enum TransferDirection {
392    Send,
393    Receive,
394}
395
396#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
397pub struct TransferProgress {
398    pub direction: TransferDirection,
399    pub file_name: String,
400    pub bytes_done: u64,
401    pub bytes_total: u64,
402}
403
404#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
405#[serde(tag = "event", rename_all = "snake_case")]
406pub enum TransferEvent {
407    SessionStarted {
408        direction: TransferDirection,
409        session_id: Uuid,
410        files_total: usize,
411        bytes_total: u64,
412    },
413    FileStarted {
414        direction: TransferDirection,
415        file_name: String,
416        bytes_total: u64,
417        resume_offset: u64,
418    },
419    Progress {
420        direction: TransferDirection,
421        file_name: String,
422        bytes_done: u64,
423        bytes_total: u64,
424    },
425    FileFinished {
426        direction: TransferDirection,
427        file_name: String,
428        bytes: u64,
429        blake3: String,
430        status: TransferFileStatus,
431    },
432    SessionFinished {
433        direction: TransferDirection,
434        files_total: usize,
435        bytes_total: u64,
436        blake3: String,
437    },
438    SessionCancelled {
439        direction: TransferDirection,
440        session_id: Uuid,
441    },
442}
443
444impl TransferEvent {
445    pub fn progress(&self) -> Option<TransferProgress> {
446        match self {
447            Self::Progress {
448                direction,
449                file_name,
450                bytes_done,
451                bytes_total,
452            } => Some(TransferProgress {
453                direction: *direction,
454                file_name: file_name.clone(),
455                bytes_done: *bytes_done,
456                bytes_total: *bytes_total,
457            }),
458            _ => None,
459        }
460    }
461}
462
463#[derive(Debug, Clone, Default)]
464pub struct TransferControl {
465    cancelled: Arc<AtomicBool>,
466}
467
468impl TransferControl {
469    pub fn new() -> Self {
470        Self::default()
471    }
472
473    pub fn cancel(&self) {
474        self.cancelled.store(true, Ordering::SeqCst);
475    }
476
477    pub fn is_cancelled(&self) -> bool {
478        self.cancelled.load(Ordering::SeqCst)
479    }
480
481    fn check_cancelled(&self) -> Result<()> {
482        if self.is_cancelled() {
483            Err(Error::TransferCancelled)
484        } else {
485            Ok(())
486        }
487    }
488}
489
490#[derive(Debug, Clone, PartialEq, Eq)]
491pub struct TransferSummary {
492    pub path: PathBuf,
493    pub file_name: String,
494    pub bytes: u64,
495    pub blake3: String,
496    pub files: Vec<TransferFileSummary>,
497}
498
499#[derive(Debug, Clone, PartialEq, Eq)]
500pub struct TransferFileSummary {
501    pub path: PathBuf,
502    pub relative_path: PathBuf,
503    pub bytes: u64,
504    pub blake3: String,
505    pub status: TransferFileStatus,
506}
507
508#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
509#[serde(rename_all = "snake_case")]
510pub enum TransferFileStatus {
511    Sent,
512    Received,
513    Skipped,
514    Resumed,
515}
516
517#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
518pub struct TransferManifest {
519    pub version: u16,
520    pub session_id: Uuid,
521    pub chunk_size: u64,
522    pub entries: Vec<ManifestEntry>,
523}
524
525impl TransferManifest {
526    pub fn file_entries(&self) -> impl Iterator<Item = &ManifestEntry> {
527        self.entries
528            .iter()
529            .filter(|entry| entry.kind == ManifestEntryKind::File)
530    }
531
532    pub fn total_file_bytes(&self) -> u64 {
533        self.file_entries().map(|entry| entry.size).sum()
534    }
535
536    pub fn digest(&self) -> Result<String> {
537        let bytes = serde_json::to_vec(self).map_err(|error| Error::Protocol(error.to_string()))?;
538        Ok(blake3::hash(&bytes).to_hex().to_string())
539    }
540}
541
542#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
543pub struct ManifestEntry {
544    pub relative_path: PathBuf,
545    pub kind: ManifestEntryKind,
546    pub size: u64,
547    pub blake3: Option<String>,
548    pub chunks: Vec<String>,
549    pub unix_mode: Option<u32>,
550    pub modified_unix_ms: Option<i128>,
551}
552
553#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
554#[serde(rename_all = "snake_case")]
555pub enum ManifestEntryKind {
556    File,
557    Directory,
558}
559
/// Outcome of deciding how to treat a destination file.
// NOTE(review): the code producing these values is outside the visible part
// of the file; the variant descriptions are inferred from the names.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ResumeDecision {
    /// Presumably: write the file from scratch.
    Create,
    /// Presumably: leave the existing file alone.
    Skip,
    /// Carries a `u64`, presumably the byte offset to continue from.
    ResumeFrom(u64),
    /// Carries a message, presumably explaining why the file cannot be reconciled.
    Conflict(String),
}
567
/// A single sighting of a peer, built up with the `with_*` methods.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PeerObservation {
    pub fingerprint: String,
    pub alias: Option<String>,
    pub hostname: Option<String>,
    pub address: SocketAddr,
    /// Names of the transports the peer was seen offering.
    pub transports: BTreeSet<String>,
    /// Time of the sighting in milliseconds relative to the Unix epoch.
    pub seen_unix_ms: i128,
}

impl PeerObservation {
    /// Creates an observation with no alias, hostname or transports.
    pub fn new(fingerprint: impl Into<String>, address: SocketAddr, seen_unix_ms: i128) -> Self {
        let fingerprint = fingerprint.into();
        Self {
            fingerprint,
            address,
            seen_unix_ms,
            alias: None,
            hostname: None,
            transports: BTreeSet::new(),
        }
    }

    /// Sets the alias, replacing any previous one.
    pub fn with_alias(self, alias: impl Into<String>) -> Self {
        Self {
            alias: Some(alias.into()),
            ..self
        }
    }

    /// Sets the hostname, replacing any previous one.
    pub fn with_hostname(self, hostname: impl Into<String>) -> Self {
        Self {
            hostname: Some(hostname.into()),
            ..self
        }
    }

    /// Adds a transport name; repeated names are stored once.
    pub fn with_transport(mut self, transport: impl Into<String>) -> Self {
        let name: String = transport.into();
        self.transports.insert(name);
        self
    }
}
605
606#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
607pub struct PeerRecord {
608    pub fingerprint: String,
609    pub aliases: BTreeSet<String>,
610    pub hostnames: BTreeSet<String>,
611    pub addresses: BTreeSet<SocketAddr>,
612    pub transports: BTreeSet<String>,
613    pub first_seen_unix_ms: i128,
614    pub last_seen_unix_ms: i128,
615}
616
617impl PeerRecord {
618    fn from_observation(observation: PeerObservation) -> Self {
619        let mut aliases = BTreeSet::new();
620        if let Some(alias) = observation.alias {
621            aliases.insert(alias);
622        }
623
624        let mut hostnames = BTreeSet::new();
625        if let Some(hostname) = observation.hostname {
626            hostnames.insert(hostname);
627        }
628
629        let mut addresses = BTreeSet::new();
630        addresses.insert(observation.address);
631
632        Self {
633            fingerprint: observation.fingerprint,
634            aliases,
635            hostnames,
636            addresses,
637            transports: observation.transports,
638            first_seen_unix_ms: observation.seen_unix_ms,
639            last_seen_unix_ms: observation.seen_unix_ms,
640        }
641    }
642
643    fn merge(&mut self, observation: PeerObservation) {
644        if let Some(alias) = observation.alias {
645            self.aliases.insert(alias);
646        }
647        if let Some(hostname) = observation.hostname {
648            self.hostnames.insert(hostname);
649        }
650        self.addresses.insert(observation.address);
651        self.transports.extend(observation.transports);
652        self.first_seen_unix_ms = self.first_seen_unix_ms.min(observation.seen_unix_ms);
653        self.last_seen_unix_ms = self.last_seen_unix_ms.max(observation.seen_unix_ms);
654    }
655
656    pub fn preferred_quic_address(&self) -> Option<SocketAddr> {
657        if !self.transports.contains("quic") {
658            return None;
659        }
660
661        self.addresses.iter().copied().next()
662    }
663}
664
665#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
666#[serde(rename_all = "snake_case")]
667pub enum TrustState {
668    Trusted,
669    Blocked,
670}
671
672#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
673pub struct KnownPeerEntry {
674    pub fingerprint: String,
675    #[serde(default)]
676    pub aliases: BTreeSet<String>,
677    #[serde(default)]
678    pub hostnames: BTreeSet<String>,
679    #[serde(default)]
680    pub addresses: BTreeSet<SocketAddr>,
681    #[serde(default)]
682    pub transports: BTreeSet<String>,
683    pub trust_state: TrustState,
684    pub first_seen_unix_ms: Option<i128>,
685    pub last_seen_unix_ms: Option<i128>,
686}
687
688impl KnownPeerEntry {
689    pub fn trusted(fingerprint: impl Into<String>) -> Result<Self> {
690        let fingerprint = normalized_fingerprint(fingerprint.into())?;
691        Ok(Self {
692            fingerprint,
693            aliases: BTreeSet::new(),
694            hostnames: BTreeSet::new(),
695            addresses: BTreeSet::new(),
696            transports: BTreeSet::new(),
697            trust_state: TrustState::Trusted,
698            first_seen_unix_ms: None,
699            last_seen_unix_ms: None,
700        })
701    }
702
703    pub fn from_record(record: &PeerRecord, trust_state: TrustState) -> Result<Self> {
704        let fingerprint = normalized_fingerprint(record.fingerprint.clone())?;
705        Ok(Self {
706            fingerprint,
707            aliases: record.aliases.clone(),
708            hostnames: record.hostnames.clone(),
709            addresses: record.addresses.clone(),
710            transports: record.transports.clone(),
711            trust_state,
712            first_seen_unix_ms: Some(record.first_seen_unix_ms),
713            last_seen_unix_ms: Some(record.last_seen_unix_ms),
714        })
715    }
716}
717
718#[derive(Debug, Default, Clone, PartialEq, Eq, Serialize, Deserialize)]
719pub struct TrustStore {
720    #[serde(default)]
721    pub peers: BTreeMap<String, KnownPeerEntry>,
722}
723
724impl TrustStore {
725    pub fn load_or_default() -> Result<Self> {
726        Self::load_or_default_from(config_dir()?.join("known_peers.toml"))
727    }
728
729    pub fn load_or_default_from(path: impl AsRef<Path>) -> Result<Self> {
730        let path = path.as_ref();
731        match std::fs::read_to_string(path) {
732            Ok(contents) => {
733                toml::from_str(&contents).map_err(|error| Error::ConfigParse(error.to_string()))
734            }
735            Err(error) if error.kind() == std::io::ErrorKind::NotFound => Ok(Self::default()),
736            Err(source) => Err(Error::IoPath {
737                path: path.to_path_buf(),
738                source,
739            }),
740        }
741    }
742
743    pub fn save_default_path(&self) -> Result<()> {
744        self.save_to_path(config_dir()?.join("known_peers.toml"))
745    }
746
747    pub fn save_to_path(&self, path: impl AsRef<Path>) -> Result<()> {
748        write_toml_file(path.as_ref(), self)
749    }
750
751    pub fn trust_fingerprint(&mut self, fingerprint: impl Into<String>) -> Result<&KnownPeerEntry> {
752        let entry = KnownPeerEntry::trusted(fingerprint)?;
753        let fingerprint = entry.fingerprint.clone();
754        self.peers
755            .entry(fingerprint.clone())
756            .and_modify(|existing| existing.trust_state = TrustState::Trusted)
757            .or_insert(entry);
758        Ok(self
759            .peers
760            .get(&fingerprint)
761            .expect("trusted peer entry should exist"))
762    }
763
764    pub fn trust_record(&mut self, record: &PeerRecord) -> Result<&KnownPeerEntry> {
765        let entry = KnownPeerEntry::from_record(record, TrustState::Trusted)?;
766        let fingerprint = entry.fingerprint.clone();
767        self.peers
768            .entry(fingerprint.clone())
769            .and_modify(|existing| {
770                existing.aliases.extend(record.aliases.clone());
771                existing.hostnames.extend(record.hostnames.clone());
772                existing.addresses.extend(record.addresses.clone());
773                existing.transports.extend(record.transports.clone());
774                existing.first_seen_unix_ms = Some(
775                    existing
776                        .first_seen_unix_ms
777                        .unwrap_or(record.first_seen_unix_ms)
778                        .min(record.first_seen_unix_ms),
779                );
780                existing.last_seen_unix_ms = Some(
781                    existing
782                        .last_seen_unix_ms
783                        .unwrap_or(record.last_seen_unix_ms)
784                        .max(record.last_seen_unix_ms),
785                );
786                existing.trust_state = TrustState::Trusted;
787            })
788            .or_insert(entry);
789        Ok(self
790            .peers
791            .get(&fingerprint)
792            .expect("trusted peer entry should exist"))
793    }
794
795    pub fn trust_direct_address(
796        &mut self,
797        fingerprint: impl Into<String>,
798        address: SocketAddr,
799    ) -> Result<&KnownPeerEntry> {
800        let mut entry = KnownPeerEntry::trusted(fingerprint)?;
801        let seen = system_time_unix_ms(SystemTime::now()).unwrap_or_default();
802        entry.addresses.insert(address);
803        entry.transports.insert("quic".to_string());
804        entry.first_seen_unix_ms = Some(seen);
805        entry.last_seen_unix_ms = Some(seen);
806
807        let fingerprint = entry.fingerprint.clone();
808        self.peers
809            .entry(fingerprint.clone())
810            .and_modify(|existing| {
811                existing.addresses.insert(address);
812                existing.transports.insert("quic".to_string());
813                existing.first_seen_unix_ms =
814                    Some(existing.first_seen_unix_ms.unwrap_or(seen).min(seen));
815                existing.last_seen_unix_ms =
816                    Some(existing.last_seen_unix_ms.unwrap_or(seen).max(seen));
817                existing.trust_state = TrustState::Trusted;
818            })
819            .or_insert(entry);
820        Ok(self
821            .peers
822            .get(&fingerprint)
823            .expect("trusted peer entry should exist"))
824    }
825
826    pub fn is_trusted_fingerprint(&self, fingerprint: &str) -> bool {
827        self.peers
828            .get(fingerprint)
829            .is_some_and(|entry| entry.trust_state == TrustState::Trusted)
830    }
831
832    pub fn trusted_fingerprint_for_address(&self, address: SocketAddr) -> Option<&str> {
833        self.peers
834            .values()
835            .find(|entry| {
836                entry.trust_state == TrustState::Trusted && entry.addresses.contains(&address)
837            })
838            .map(|entry| entry.fingerprint.as_str())
839    }
840
841    pub fn forget(&mut self, fingerprint: &str) -> bool {
842        self.peers.remove(fingerprint).is_some()
843    }
844
845    pub fn records(&self) -> impl Iterator<Item = &KnownPeerEntry> {
846        self.peers.values()
847    }
848
849    pub fn to_toml_string(&self) -> Result<String> {
850        toml::to_string_pretty(self).map_err(|error| Error::ConfigSerialize(error.to_string()))
851    }
852}
853
854#[derive(Debug, Default, Clone, PartialEq, Eq)]
855pub struct PeerRegistry {
856    peers: BTreeMap<String, PeerRecord>,
857}
858
859#[derive(Debug, Clone, PartialEq, Eq)]
860pub enum PeerLookup<'a> {
861    Found(&'a PeerRecord),
862    Ambiguous(Vec<&'a PeerRecord>),
863    Missing,
864}
865
866impl PeerRegistry {
867    pub fn new() -> Self {
868        Self::default()
869    }
870
871    pub fn observe(&mut self, observation: PeerObservation) -> Result<&PeerRecord> {
872        if observation.fingerprint.trim().is_empty() {
873            return Err(Error::InvalidInput(
874                "peer fingerprint must not be empty".to_string(),
875            ));
876        }
877
878        let fingerprint = observation.fingerprint.clone();
879        if let Some(record) = self.peers.get_mut(&fingerprint) {
880            record.merge(observation);
881        } else {
882            self.peers.insert(
883                fingerprint.clone(),
884                PeerRecord::from_observation(observation),
885            );
886        }
887
888        Ok(self
889            .peers
890            .get(&fingerprint)
891            .expect("observed peer record should exist"))
892    }
893
894    pub fn get_by_fingerprint(&self, fingerprint: &str) -> Option<&PeerRecord> {
895        self.peers.get(fingerprint)
896    }
897
898    pub fn lookup(&self, query: &str) -> Option<&PeerRecord> {
899        match self.lookup_detail(query) {
900            PeerLookup::Found(record) => Some(record),
901            PeerLookup::Ambiguous(_) | PeerLookup::Missing => None,
902        }
903    }
904
905    pub fn lookup_detail(&self, query: &str) -> PeerLookup<'_> {
906        if let Some(record) = self.peers.get(query) {
907            return PeerLookup::Found(record);
908        }
909
910        let matches = self
911            .peers
912            .values()
913            .filter(|record| {
914                record.fingerprint.starts_with(query)
915                    || record.aliases.iter().any(|alias| alias == query)
916                    || record.hostnames.iter().any(|hostname| hostname == query)
917            })
918            .collect::<Vec<_>>();
919
920        match matches.as_slice() {
921            [] => PeerLookup::Missing,
922            [record] => PeerLookup::Found(record),
923            _ => PeerLookup::Ambiguous(matches),
924        }
925    }
926
927    pub fn records(&self) -> impl Iterator<Item = &PeerRecord> {
928        self.peers.values()
929    }
930}
931
932#[derive(Debug, Clone, PartialEq, Eq)]
933pub struct NativeAnnouncement {
934    pub alias: String,
935    pub fingerprint: String,
936    pub quic_port: u16,
937    pub listen_port: Option<u16>,
938}
939
940impl NativeAnnouncement {
941    pub fn from_config(config: &AppConfig, identity: &NativeIdentity) -> Self {
942        Self {
943            alias: config.alias.clone(),
944            fingerprint: identity.fingerprint().to_string(),
945            quic_port: config.quic_port,
946            listen_port: Some(config.listen_port),
947        }
948    }
949
950    fn service_info(&self) -> Result<ServiceInfo> {
951        let alias = sanitize_dns_label(&self.alias);
952        let fingerprint_prefix = self
953            .fingerprint
954            .get(..12)
955            .unwrap_or(self.fingerprint.as_str());
956        let instance = format!("{alias}-{fingerprint_prefix}");
957        let hostname = format!("{alias}.local.");
958        let properties = [
959            ("pv", NATIVE_PROTOCOL_VERSION.to_string()),
960            ("minpv", MIN_NATIVE_PROTOCOL_VERSION.to_string()),
961            ("fp", self.fingerprint.clone()),
962            ("alias", self.alias.clone()),
963            ("transports", "quic".to_string()),
964            ("quic_port", self.quic_port.to_string()),
965            (
966                "listen_port",
967                self.listen_port
968                    .map(|port| port.to_string())
969                    .unwrap_or_default(),
970            ),
971        ];
972
973        ServiceInfo::new(
974            NATIVE_SERVICE_TYPE,
975            &instance,
976            &hostname,
977            "",
978            self.quic_port,
979            &properties[..],
980        )
981        .map(ServiceInfo::enable_addr_auto)
982        .map_err(|error| Error::Discovery(error.to_string()))
983    }
984}
985
986pub struct NativeAnnouncer {
987    daemon: ServiceDaemon,
988    fullname: String,
989}
990
991impl NativeAnnouncer {
992    pub fn start(announcement: &NativeAnnouncement) -> Result<Self> {
993        let daemon = ServiceDaemon::new().map_err(|error| Error::Discovery(error.to_string()))?;
994        let service = announcement.service_info()?;
995        let fullname = service.get_fullname().to_string();
996        daemon
997            .register(service)
998            .map_err(|error| Error::Discovery(error.to_string()))?;
999        Ok(Self { daemon, fullname })
1000    }
1001}
1002
1003impl Drop for NativeAnnouncer {
1004    fn drop(&mut self) {
1005        let _ = self.daemon.unregister(&self.fullname);
1006        let _ = self.daemon.shutdown();
1007    }
1008}
1009
1010pub async fn discover_native_peers(timeout: Duration) -> Result<PeerRegistry> {
1011    let daemon = ServiceDaemon::new().map_err(|error| Error::Discovery(error.to_string()))?;
1012    let receiver = daemon
1013        .browse(NATIVE_SERVICE_TYPE)
1014        .map_err(|error| Error::Discovery(error.to_string()))?;
1015    let deadline = tokio::time::Instant::now() + timeout;
1016    let mut registry = PeerRegistry::new();
1017
1018    loop {
1019        let now = tokio::time::Instant::now();
1020        if now >= deadline {
1021            break;
1022        }
1023
1024        let wait = deadline - now;
1025        match tokio::time::timeout(wait, receiver.recv_async()).await {
1026            Ok(Ok(ServiceEvent::ServiceResolved(service))) => {
1027                observe_resolved_service(&mut registry, &service)?;
1028            }
1029            Ok(Ok(_)) => {}
1030            Ok(Err(error)) => {
1031                return Err(Error::Discovery(error.to_string()));
1032            }
1033            Err(_) => break,
1034        }
1035    }
1036
1037    daemon
1038        .stop_browse(NATIVE_SERVICE_TYPE)
1039        .map_err(|error| Error::Discovery(error.to_string()))?;
1040    let _ = daemon.shutdown();
1041    Ok(registry)
1042}
1043
1044fn observe_resolved_service(registry: &mut PeerRegistry, service: &ResolvedService) -> Result<()> {
1045    let Some(fingerprint) = service.get_property_val_str("fp") else {
1046        return Ok(());
1047    };
1048    let fingerprint = match normalized_fingerprint(fingerprint.to_string()) {
1049        Ok(fingerprint) => fingerprint,
1050        Err(_) => return Ok(()),
1051    };
1052    let Some(highest_version) = parse_txt_u16(service, "pv") else {
1053        return Ok(());
1054    };
1055    let Some(minimum_version) = parse_txt_u16(service, "minpv") else {
1056        return Ok(());
1057    };
1058    if minimum_version > highest_version
1059        || minimum_version > NATIVE_PROTOCOL_VERSION
1060        || highest_version < MIN_NATIVE_PROTOCOL_VERSION
1061    {
1062        return Ok(());
1063    }
1064
1065    let Some(quic_port) = parse_txt_u16(service, "quic_port") else {
1066        return Ok(());
1067    };
1068    let transports = service
1069        .get_property_val_str("transports")
1070        .unwrap_or_default()
1071        .split(',')
1072        .map(str::trim)
1073        .filter(|value| !value.is_empty())
1074        .map(ToOwned::to_owned)
1075        .collect::<Vec<_>>();
1076    if !transports.iter().any(|transport| transport == "quic") {
1077        return Ok(());
1078    }
1079
1080    let alias = service.get_property_val_str("alias").map(ToOwned::to_owned);
1081    let hostname = Some(service.get_hostname().trim_end_matches('.').to_string());
1082    let seen_unix_ms = system_time_unix_ms(SystemTime::now()).unwrap_or_default();
1083
1084    for address in service.get_addresses_v4() {
1085        let mut observation = PeerObservation::new(
1086            fingerprint.clone(),
1087            SocketAddr::from((address, quic_port)),
1088            seen_unix_ms,
1089        );
1090        if let Some(alias) = &alias {
1091            observation = observation.with_alias(alias.clone());
1092        }
1093        if let Some(hostname) = &hostname {
1094            observation = observation.with_hostname(hostname.clone());
1095        }
1096        for transport in &transports {
1097            observation = observation.with_transport(transport.clone());
1098        }
1099        registry.observe(observation)?;
1100    }
1101
1102    Ok(())
1103}
1104
1105fn parse_txt_u16(service: &ResolvedService, key: &str) -> Option<u16> {
1106    service.get_property_val_str(key)?.parse().ok()
1107}
1108
/// Turns an arbitrary alias into a single DNS label.
///
/// ASCII letters and digits are kept (lower-cased). Every run of other
/// characters collapses into one `-`, and leading and trailing dashes are
/// removed. The result is capped at 63 bytes, the maximum length of a DNS
/// label. When nothing usable remains, `"fileferry-device"` is returned.
fn sanitize_dns_label(value: &str) -> String {
    // RFC 1035 limits a single label to 63 octets.
    const MAX_DNS_LABEL_LEN: usize = 63;

    let mut label = String::with_capacity(value.len().min(MAX_DNS_LABEL_LEN));
    for ch in value.chars() {
        if label.len() == MAX_DNS_LABEL_LEN {
            break;
        }
        if ch.is_ascii_alphanumeric() {
            label.push(ch.to_ascii_lowercase());
        } else if !label.is_empty() && !label.ends_with('-') {
            // One dash per run of separators, and never a leading dash.
            label.push('-');
        }
    }

    // A separator run at the end (or at the truncation point) leaves one dash.
    let trimmed_len = label.trim_end_matches('-').len();
    label.truncate(trimmed_len);

    if label.is_empty() {
        "fileferry-device".to_string()
    } else {
        label
    }
}
1130
1131#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
1132struct DirectReceivePlan {
1133    files: Vec<DirectFilePlan>,
1134}
1135
1136#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
1137struct DirectProtocolHello {
1138    protocol_version: u16,
1139    min_protocol_version: u16,
1140    client_fingerprint: String,
1141    #[serde(default)]
1142    psk: bool,
1143}
1144
1145impl DirectProtocolHello {
1146    fn new(identity: &NativeIdentity, psk: Option<&str>) -> Self {
1147        Self {
1148            protocol_version: NATIVE_PROTOCOL_VERSION,
1149            min_protocol_version: MIN_NATIVE_PROTOCOL_VERSION,
1150            client_fingerprint: identity.fingerprint().to_string(),
1151            psk: psk.is_some(),
1152        }
1153    }
1154}
1155
1156#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
1157struct DirectProtocolResponse {
1158    accepted: bool,
1159    protocol_version: Option<u16>,
1160    #[serde(default)]
1161    psk_challenge: Option<String>,
1162    error: Option<String>,
1163}
1164
1165impl DirectProtocolResponse {
1166    fn accept(protocol_version: u16) -> Self {
1167        Self {
1168            accepted: true,
1169            protocol_version: Some(protocol_version),
1170            psk_challenge: None,
1171            error: None,
1172        }
1173    }
1174
1175    fn accept_with_psk(protocol_version: u16, challenge: impl Into<String>) -> Self {
1176        Self {
1177            accepted: true,
1178            protocol_version: Some(protocol_version),
1179            psk_challenge: Some(challenge.into()),
1180            error: None,
1181        }
1182    }
1183
1184    fn reject(error: impl Into<String>) -> Self {
1185        Self {
1186            accepted: false,
1187            protocol_version: None,
1188            psk_challenge: None,
1189            error: Some(error.into()),
1190        }
1191    }
1192}
1193
1194#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
1195struct DirectPskProof {
1196    proof: String,
1197}
1198
1199#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
1200struct DirectPskResult {
1201    accepted: bool,
1202    error: Option<String>,
1203}
1204
1205impl DirectPskResult {
1206    fn accept() -> Self {
1207        Self {
1208            accepted: true,
1209            error: None,
1210        }
1211    }
1212
1213    fn reject(error: impl Into<String>) -> Self {
1214        Self {
1215            accepted: false,
1216            error: Some(error.into()),
1217        }
1218    }
1219}
1220
1221#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
1222struct DirectFilePlan {
1223    relative_path: PathBuf,
1224    action: DirectFileAction,
1225    offset: u64,
1226}
1227
1228#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
1229#[serde(rename_all = "snake_case")]
1230enum DirectFileAction {
1231    Receive,
1232    Skip,
1233}
1234
1235#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
1236struct DirectPayloadHeader {
1237    relative_path: PathBuf,
1238    offset: u64,
1239    bytes: u64,
1240}
1241
1242#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
1243struct DirectAck {
1244    ok: bool,
1245    error: Option<String>,
1246}
1247
1248pub fn validate_send_paths(paths: &[PathBuf]) -> Result<()> {
1249    if paths.is_empty() {
1250        return Err(Error::InvalidInput(
1251            "at least one file path is required".to_string(),
1252        ));
1253    }
1254
1255    for path in paths {
1256        if path.as_os_str().is_empty() {
1257            return Err(Error::InvalidInput("empty file path".to_string()));
1258        }
1259    }
1260
1261    Ok(())
1262}
1263
/// Renders a path as an owned string for messages and events, replacing any
/// non-UTF-8 sequences lossily.
pub fn display_path(path: &Path) -> String {
    String::from(path.to_string_lossy())
}
1267
1268pub async fn build_manifest(paths: &[PathBuf]) -> Result<TransferManifest> {
1269    validate_send_paths(paths)?;
1270
1271    let sources = collect_manifest_sources(paths)?;
1272    let mut seen = BTreeSet::new();
1273    for source in &sources {
1274        if !seen.insert(source.relative_path.clone()) {
1275            return Err(Error::InvalidInput(format!(
1276                "duplicate transfer path: {}",
1277                display_path(&source.relative_path)
1278            )));
1279        }
1280    }
1281
1282    Ok(TransferManifest {
1283        version: MANIFEST_VERSION,
1284        session_id: Uuid::new_v4(),
1285        chunk_size: DEFAULT_CHUNK_SIZE,
1286        entries: build_manifest_entries(sources).await?,
1287    })
1288}
1289
1290async fn build_manifest_entries(sources: Vec<ManifestSource>) -> Result<Vec<ManifestEntry>> {
1291    if sources.len() <= 1 {
1292        let mut entries = Vec::with_capacity(sources.len());
1293        for source in sources {
1294            entries.push(build_manifest_entry(source).await?);
1295        }
1296        return Ok(entries);
1297    }
1298
1299    let len = sources.len();
1300    let concurrency = std::thread::available_parallelism()
1301        .map(|parallelism| parallelism.get())
1302        .unwrap_or(1)
1303        .clamp(1, DEFAULT_MANIFEST_CONCURRENCY)
1304        .min(len);
1305    let mut sources = sources.into_iter().enumerate();
1306    let mut tasks = tokio::task::JoinSet::new();
1307    let mut entries = vec![None; len];
1308
1309    for _ in 0..concurrency {
1310        if let Some((index, source)) = sources.next() {
1311            tasks.spawn(async move {
1312                build_manifest_entry(source)
1313                    .await
1314                    .map(|entry| (index, entry))
1315            });
1316        }
1317    }
1318
1319    while let Some(result) = tasks.join_next().await {
1320        let (index, entry) =
1321            result.map_err(|error| Error::Protocol(format!("manifest task failed: {error}")))??;
1322        entries[index] = Some(entry);
1323
1324        if let Some((index, source)) = sources.next() {
1325            tasks.spawn(async move {
1326                build_manifest_entry(source)
1327                    .await
1328                    .map(|entry| (index, entry))
1329            });
1330        }
1331    }
1332
1333    Ok(entries
1334        .into_iter()
1335        .map(|entry| entry.expect("manifest task should fill every entry"))
1336        .collect())
1337}
1338
1339pub fn safe_destination_path(destination: &Path, relative_path: &Path) -> Result<PathBuf> {
1340    validate_relative_transfer_path(relative_path)?;
1341    Ok(destination.join(relative_path))
1342}
1343
1344pub async fn decide_resume(
1345    destination: &Path,
1346    entry: &ManifestEntry,
1347    chunk_size: u64,
1348) -> Result<ResumeDecision> {
1349    if entry.kind != ManifestEntryKind::File {
1350        return Ok(ResumeDecision::Create);
1351    }
1352
1353    let path = safe_destination_path(destination, &entry.relative_path)?;
1354    let metadata = match tokio::fs::metadata(&path).await {
1355        Ok(metadata) => metadata,
1356        Err(error) if error.kind() == std::io::ErrorKind::NotFound => {
1357            return Ok(ResumeDecision::Create);
1358        }
1359        Err(source) => {
1360            return Err(Error::IoPath { path, source });
1361        }
1362    };
1363
1364    if !metadata.is_file() {
1365        return Ok(ResumeDecision::Conflict(format!(
1366            "{} already exists and is not a regular file",
1367            display_path(&path)
1368        )));
1369    }
1370
1371    if metadata.len() == entry.size {
1372        let expected = entry
1373            .blake3
1374            .as_deref()
1375            .ok_or_else(|| Error::Protocol("file manifest entry is missing BLAKE3".to_string()))?;
1376        let actual = hash_file(&path).await?;
1377        return if actual == expected {
1378            Ok(ResumeDecision::Skip)
1379        } else {
1380            Ok(ResumeDecision::Conflict(format!(
1381                "{} already exists with different contents",
1382                display_path(&path)
1383            )))
1384        };
1385    }
1386
1387    if metadata.len() > entry.size {
1388        return Ok(ResumeDecision::Conflict(format!(
1389            "{} is larger than incoming file",
1390            display_path(&path)
1391        )));
1392    }
1393
1394    let verified_offset = verified_prefix_offset(&path, entry, metadata.len(), chunk_size).await?;
1395    if verified_offset > 0 {
1396        Ok(ResumeDecision::ResumeFrom(verified_offset))
1397    } else {
1398        Ok(ResumeDecision::Conflict(format!(
1399            "{} already exists and does not match a verified prefix",
1400            display_path(&path)
1401        )))
1402    }
1403}
1404
1405pub async fn send_direct_file(
1406    request: &SendRequest,
1407    identity: &NativeIdentity,
1408    mut events: impl FnMut(TransferEvent),
1409) -> Result<TransferSummary> {
1410    send_direct_file_with_control(request, identity, TransferControl::new(), &mut events).await
1411}
1412
1413pub async fn send_direct_file_with_control(
1414    request: &SendRequest,
1415    identity: &NativeIdentity,
1416    control: TransferControl,
1417    mut events: impl FnMut(TransferEvent),
1418) -> Result<TransferSummary> {
1419    send_direct_file_with_control_and_trust(request, identity, control, &mut events, |_| Ok(()))
1420        .await
1421}
1422
1423pub async fn send_direct_file_with_control_and_trust(
1424    request: &SendRequest,
1425    identity: &NativeIdentity,
1426    control: TransferControl,
1427    mut events: impl FnMut(TransferEvent),
1428    mut confirm_unpinned_receiver: impl FnMut(&str) -> Result<()>,
1429) -> Result<TransferSummary> {
1430    ensure_rustls_provider();
1431
1432    control.check_cancelled()?;
1433    let manifest = build_manifest(request.paths()).await?;
1434    let sources = manifest_source_map(request.paths())?;
1435    events(TransferEvent::SessionStarted {
1436        direction: TransferDirection::Send,
1437        session_id: manifest.session_id,
1438        files_total: manifest.file_entries().count(),
1439        bytes_total: manifest.total_file_bytes(),
1440    });
1441    control.check_cancelled()?;
1442
1443    let (client_config, server_fingerprint) = client_config_capturing_server_fingerprint()?;
1444    let mut endpoint = quinn::Endpoint::client(SocketAddr::from(([0, 0, 0, 0], 0)))?;
1445    endpoint.set_default_client_config(client_config);
1446    let connection = endpoint
1447        .connect(request.peer().address(), "localhost")?
1448        .await?;
1449    let actual = captured_server_fingerprint(&server_fingerprint)?;
1450    match request.peer().expected_fingerprint() {
1451        Some(expected) if actual != expected => {
1452            connection.close(1u32.into(), b"fingerprint mismatch");
1453            return Err(Error::PeerFingerprintMismatch {
1454                expected: expected.to_string(),
1455                actual,
1456            });
1457        }
1458        Some(_) => {}
1459        None => {
1460            if let Err(error) = confirm_unpinned_receiver(&actual) {
1461                connection.close(1u32.into(), b"receiver fingerprint not trusted");
1462                return Err(error);
1463            }
1464        }
1465    }
1466    let (mut send, mut recv) = connection.open_bi().await?;
1467
1468    send.write_all(DIRECT_MAGIC).await?;
1469    write_json_frame(
1470        &mut send,
1471        &DirectProtocolHello::new(identity, request.psk()),
1472    )
1473    .await?;
1474    let protocol: DirectProtocolResponse = read_json_frame(&mut recv).await?;
1475    if !protocol.accepted {
1476        return Err(Error::Protocol(protocol.error.unwrap_or_else(|| {
1477            "receiver rejected protocol negotiation".to_string()
1478        })));
1479    }
1480    if protocol.protocol_version != Some(NATIVE_PROTOCOL_VERSION) {
1481        return Err(Error::Protocol(format!(
1482            "receiver selected unsupported protocol version {:?}",
1483            protocol.protocol_version
1484        )));
1485    }
1486    match (request.psk(), protocol.psk_challenge.as_deref()) {
1487        (Some(psk), Some(challenge)) => {
1488            let proof = DirectPskProof {
1489                proof: psk_proof(
1490                    psk,
1491                    challenge,
1492                    identity.fingerprint(),
1493                    NATIVE_PROTOCOL_VERSION,
1494                )?,
1495            };
1496            write_json_frame(&mut send, &proof).await?;
1497            let auth: DirectPskResult = read_json_frame(&mut recv).await?;
1498            if !auth.accepted {
1499                return Err(Error::Authentication(
1500                    auth.error
1501                        .unwrap_or_else(|| "PSK authentication rejected".to_string()),
1502                ));
1503            }
1504        }
1505        (Some(_), None) => {
1506            return Err(Error::Authentication(
1507                "receiver is not configured for PSK mode".to_string(),
1508            ));
1509        }
1510        (None, Some(_)) => {
1511            return Err(Error::Authentication(
1512                "receiver requires PSK authentication".to_string(),
1513            ));
1514        }
1515        (None, None) => {}
1516    }
1517
1518    write_json_frame(&mut send, &manifest).await?;
1519
1520    let plan: DirectReceivePlan = read_json_frame(&mut recv).await?;
1521    let mut summaries = Vec::new();
1522    for file_plan in plan.files {
1523        let Some(entry) = manifest
1524            .file_entries()
1525            .find(|entry| entry.relative_path == file_plan.relative_path)
1526        else {
1527            return Err(Error::Protocol(format!(
1528                "receiver requested unknown file: {}",
1529                display_path(&file_plan.relative_path)
1530            )));
1531        };
1532
1533        let Some(source_path) = sources.get(&file_plan.relative_path) else {
1534            return Err(Error::Protocol(format!(
1535                "missing source for manifest file: {}",
1536                display_path(&file_plan.relative_path)
1537            )));
1538        };
1539
1540        if file_plan.action == DirectFileAction::Skip {
1541            let blake3 = entry.blake3.clone().unwrap_or_default();
1542            events(TransferEvent::FileFinished {
1543                direction: TransferDirection::Send,
1544                file_name: display_path(&entry.relative_path),
1545                bytes: entry.size,
1546                blake3: blake3.clone(),
1547                status: TransferFileStatus::Skipped,
1548            });
1549            summaries.push(TransferFileSummary {
1550                path: source_path.clone(),
1551                relative_path: entry.relative_path.clone(),
1552                bytes: entry.size,
1553                blake3,
1554                status: TransferFileStatus::Skipped,
1555            });
1556            continue;
1557        }
1558
1559        match send_payload_file(
1560            &mut send,
1561            source_path,
1562            entry,
1563            file_plan.offset,
1564            &control,
1565            &mut events,
1566        )
1567        .await
1568        {
1569            Ok(()) => {}
1570            Err(Error::TransferCancelled) => {
1571                events(TransferEvent::SessionCancelled {
1572                    direction: TransferDirection::Send,
1573                    session_id: manifest.session_id,
1574                });
1575                connection.close(1u32.into(), b"cancelled");
1576                return Err(Error::TransferCancelled);
1577            }
1578            Err(error) => return Err(error),
1579        }
1580        let status = if file_plan.offset > 0 {
1581            TransferFileStatus::Resumed
1582        } else {
1583            TransferFileStatus::Sent
1584        };
1585        let blake3 = entry.blake3.clone().unwrap_or_default();
1586        events(TransferEvent::FileFinished {
1587            direction: TransferDirection::Send,
1588            file_name: display_path(&entry.relative_path),
1589            bytes: entry.size,
1590            blake3: blake3.clone(),
1591            status,
1592        });
1593        summaries.push(TransferFileSummary {
1594            path: source_path.clone(),
1595            relative_path: entry.relative_path.clone(),
1596            bytes: entry.size,
1597            blake3,
1598            status,
1599        });
1600    }
1601    send.finish()?;
1602
1603    let ack: DirectAck = read_json_frame(&mut recv).await?;
1604    if !ack.ok {
1605        return Err(Error::Transfer(
1606            ack.error
1607                .unwrap_or_else(|| "receiver rejected transfer".to_string()),
1608        ));
1609    }
1610
1611    connection.close(0u32.into(), b"done");
1612    endpoint.wait_idle().await;
1613
1614    let summary = summary_from_files(&manifest, summaries)?;
1615    events(TransferEvent::SessionFinished {
1616        direction: TransferDirection::Send,
1617        files_total: summary.files.len(),
1618        bytes_total: summary.bytes,
1619        blake3: summary.blake3.clone(),
1620    });
1621    Ok(summary)
1622}
1623
1624pub async fn receive_direct_file(
1625    request: &ReceiveRequest,
1626    destination: impl AsRef<Path>,
1627    identity: &NativeIdentity,
1628    events: impl FnMut(TransferEvent),
1629) -> Result<TransferSummary> {
1630    receive_direct_file_with_control(
1631        request,
1632        destination,
1633        identity,
1634        TransferControl::new(),
1635        events,
1636    )
1637    .await
1638}
1639
1640pub async fn receive_direct_file_with_control(
1641    request: &ReceiveRequest,
1642    destination: impl AsRef<Path>,
1643    identity: &NativeIdentity,
1644    control: TransferControl,
1645    mut events: impl FnMut(TransferEvent),
1646) -> Result<TransferSummary> {
1647    ensure_rustls_provider();
1648
1649    control.check_cancelled()?;
1650    let destination = destination.as_ref();
1651    tokio::fs::create_dir_all(destination)
1652        .await
1653        .map_err(|source| Error::IoPath {
1654            path: destination.to_path_buf(),
1655            source,
1656        })?;
1657
1658    let server_config =
1659        quinn::ServerConfig::with_single_cert(identity.cert_chain(), identity.private_key())?;
1660    let endpoint = quinn::Endpoint::server(server_config, request.listen())?;
1661    let incoming = endpoint.accept().await.ok_or(Error::NoIncomingConnection)?;
1662    let connection = incoming.await?;
1663    let (mut send, mut recv) = connection.accept_bi().await?;
1664
1665    let result = receive_stream_file(
1666        destination,
1667        request.psk(),
1668        &mut recv,
1669        &mut send,
1670        &control,
1671        &mut events,
1672    )
1673    .await;
1674    let ack = match &result {
1675        Ok(_) => DirectAck {
1676            ok: true,
1677            error: None,
1678        },
1679        Err(error) => DirectAck {
1680            ok: false,
1681            error: Some(error.to_string()),
1682        },
1683    };
1684    write_json_frame(&mut send, &ack).await?;
1685    send.finish()?;
1686    let _ = tokio::time::timeout(Duration::from_secs(1), connection.closed()).await;
1687    endpoint.close(0u32.into(), b"done");
1688    endpoint.wait_idle().await;
1689
1690    result
1691}
1692
1693async fn receive_stream_file(
1694    destination: &Path,
1695    psk: Option<&str>,
1696    recv: &mut quinn::RecvStream,
1697    send: &mut quinn::SendStream,
1698    control: &TransferControl,
1699    events: &mut impl FnMut(TransferEvent),
1700) -> Result<TransferSummary> {
1701    let mut magic = [0; DIRECT_MAGIC.len()];
1702    recv.read_exact(&mut magic).await?;
1703    if &magic != DIRECT_MAGIC {
1704        return Err(Error::Protocol("bad direct-transfer magic".to_string()));
1705    }
1706
1707    let hello: DirectProtocolHello = read_json_frame(recv).await?;
1708    let protocol = negotiate_direct_protocol(&hello, psk);
1709    write_json_frame(send, &protocol).await?;
1710    if !protocol.accepted {
1711        return Err(Error::Protocol(protocol.error.unwrap_or_else(|| {
1712            "unsupported direct protocol version".to_string()
1713        })));
1714    }
1715    if let (Some(psk), Some(challenge)) = (psk, protocol.psk_challenge.as_deref()) {
1716        let proof: DirectPskProof = read_json_frame(recv).await?;
1717        let expected = psk_proof(
1718            psk,
1719            challenge,
1720            &hello.client_fingerprint,
1721            protocol.protocol_version.unwrap_or(NATIVE_PROTOCOL_VERSION),
1722        )?;
1723        if proof.proof != expected {
1724            write_json_frame(send, &DirectPskResult::reject("PSK proof mismatch")).await?;
1725            return Err(Error::Authentication("PSK proof mismatch".to_string()));
1726        }
1727        write_json_frame(send, &DirectPskResult::accept()).await?;
1728    }
1729
1730    let manifest: TransferManifest = read_json_frame(recv).await?;
1731    validate_manifest(&manifest)?;
1732    events(TransferEvent::SessionStarted {
1733        direction: TransferDirection::Receive,
1734        session_id: manifest.session_id,
1735        files_total: manifest.file_entries().count(),
1736        bytes_total: manifest.total_file_bytes(),
1737    });
1738    control.check_cancelled()?;
1739
1740    for entry in &manifest.entries {
1741        if entry.kind == ManifestEntryKind::Directory {
1742            let path = safe_destination_path(destination, &entry.relative_path)?;
1743            tokio::fs::create_dir_all(&path)
1744                .await
1745                .map_err(|source| Error::IoPath { path, source })?;
1746        }
1747    }
1748
1749    let mut plan = DirectReceivePlan { files: Vec::new() };
1750    for entry in manifest.file_entries() {
1751        let decision = decide_resume(destination, entry, manifest.chunk_size).await?;
1752        let (action, offset) = match decision {
1753            ResumeDecision::Create => (DirectFileAction::Receive, 0),
1754            ResumeDecision::Skip => (DirectFileAction::Skip, entry.size),
1755            ResumeDecision::ResumeFrom(offset) => (DirectFileAction::Receive, offset),
1756            ResumeDecision::Conflict(message) => return Err(Error::InvalidInput(message)),
1757        };
1758        plan.files.push(DirectFilePlan {
1759            relative_path: entry.relative_path.clone(),
1760            action,
1761            offset,
1762        });
1763    }
1764    write_json_frame(send, &plan).await?;
1765
1766    let mut summaries = Vec::new();
1767    for file_plan in &plan.files {
1768        let Some(entry) = manifest
1769            .file_entries()
1770            .find(|entry| entry.relative_path == file_plan.relative_path)
1771        else {
1772            return Err(Error::Protocol(format!(
1773                "planned file missing from manifest: {}",
1774                display_path(&file_plan.relative_path)
1775            )));
1776        };
1777
1778        let final_path = safe_destination_path(destination, &entry.relative_path)?;
1779        if file_plan.action == DirectFileAction::Skip {
1780            let blake3 = entry.blake3.clone().unwrap_or_default();
1781            events(TransferEvent::FileFinished {
1782                direction: TransferDirection::Receive,
1783                file_name: display_path(&entry.relative_path),
1784                bytes: entry.size,
1785                blake3: blake3.clone(),
1786                status: TransferFileStatus::Skipped,
1787            });
1788            summaries.push(TransferFileSummary {
1789                path: final_path,
1790                relative_path: entry.relative_path.clone(),
1791                bytes: entry.size,
1792                blake3,
1793                status: TransferFileStatus::Skipped,
1794            });
1795            continue;
1796        }
1797
1798        match receive_payload_file(recv, destination, entry, file_plan.offset, control, events)
1799            .await
1800        {
1801            Ok(()) => {}
1802            Err(Error::TransferCancelled) => {
1803                events(TransferEvent::SessionCancelled {
1804                    direction: TransferDirection::Receive,
1805                    session_id: manifest.session_id,
1806                });
1807                return Err(Error::TransferCancelled);
1808            }
1809            Err(error) => return Err(error),
1810        }
1811        let actual_hash = hash_file(&final_path).await?;
1812        let expected_hash = entry
1813            .blake3
1814            .as_deref()
1815            .ok_or_else(|| Error::Protocol("file manifest entry is missing BLAKE3".to_string()))?;
1816        if actual_hash != expected_hash {
1817            return Err(Error::Transfer(format!(
1818                "BLAKE3 hash mismatch for {}",
1819                display_path(&entry.relative_path)
1820            )));
1821        }
1822
1823        let status = if file_plan.offset > 0 {
1824            TransferFileStatus::Resumed
1825        } else {
1826            TransferFileStatus::Received
1827        };
1828        events(TransferEvent::FileFinished {
1829            direction: TransferDirection::Receive,
1830            file_name: display_path(&entry.relative_path),
1831            bytes: entry.size,
1832            blake3: actual_hash.clone(),
1833            status,
1834        });
1835        summaries.push(TransferFileSummary {
1836            path: final_path,
1837            relative_path: entry.relative_path.clone(),
1838            bytes: entry.size,
1839            blake3: actual_hash,
1840            status,
1841        });
1842    }
1843
1844    let summary = summary_from_files(&manifest, summaries)?;
1845    events(TransferEvent::SessionFinished {
1846        direction: TransferDirection::Receive,
1847        files_total: summary.files.len(),
1848        bytes_total: summary.bytes,
1849        blake3: summary.blake3.clone(),
1850    });
1851    Ok(summary)
1852}
1853
1854async fn send_payload_file(
1855    send: &mut quinn::SendStream,
1856    source_path: &Path,
1857    entry: &ManifestEntry,
1858    offset: u64,
1859    control: &TransferControl,
1860    events: &mut impl FnMut(TransferEvent),
1861) -> Result<()> {
1862    if offset > entry.size {
1863        return Err(Error::Protocol(format!(
1864            "resume offset exceeds file size for {}",
1865            display_path(&entry.relative_path)
1866        )));
1867    }
1868
1869    let header = DirectPayloadHeader {
1870        relative_path: entry.relative_path.clone(),
1871        offset,
1872        bytes: entry.size - offset,
1873    };
1874    write_json_frame(send, &header).await?;
1875    events(TransferEvent::FileStarted {
1876        direction: TransferDirection::Send,
1877        file_name: display_path(&entry.relative_path),
1878        bytes_total: entry.size,
1879        resume_offset: offset,
1880    });
1881
1882    let mut file = tokio::fs::File::open(source_path)
1883        .await
1884        .map_err(|source| Error::IoPath {
1885            path: source_path.to_path_buf(),
1886            source,
1887        })?;
1888    file.seek(SeekFrom::Start(offset))
1889        .await
1890        .map_err(|source| Error::IoPath {
1891            path: source_path.to_path_buf(),
1892            source,
1893        })?;
1894
1895    let mut buffer = vec![0; IO_BUFFER_SIZE];
1896    let mut bytes_done = offset;
1897    while bytes_done < entry.size {
1898        control.check_cancelled()?;
1899        let remaining = entry.size - bytes_done;
1900        let read_size = buffer.len().min(remaining as usize);
1901        let read = file
1902            .read(&mut buffer[..read_size])
1903            .await
1904            .map_err(|source| Error::IoPath {
1905                path: source_path.to_path_buf(),
1906                source,
1907            })?;
1908        if read == 0 {
1909            return Err(Error::Protocol(format!(
1910                "source file ended early: {}",
1911                display_path(source_path)
1912            )));
1913        }
1914        send.write_all(&buffer[..read]).await?;
1915        bytes_done += read as u64;
1916        events(TransferEvent::Progress {
1917            direction: TransferDirection::Send,
1918            file_name: display_path(&entry.relative_path),
1919            bytes_done,
1920            bytes_total: entry.size,
1921        });
1922        control.check_cancelled()?;
1923    }
1924
1925    Ok(())
1926}
1927
1928async fn receive_payload_file(
1929    recv: &mut quinn::RecvStream,
1930    destination: &Path,
1931    entry: &ManifestEntry,
1932    offset: u64,
1933    control: &TransferControl,
1934    events: &mut impl FnMut(TransferEvent),
1935) -> Result<()> {
1936    let header: DirectPayloadHeader = read_json_frame(recv).await?;
1937    if header.relative_path != entry.relative_path
1938        || header.offset != offset
1939        || header.bytes != entry.size - offset
1940    {
1941        return Err(Error::Protocol(format!(
1942            "payload header does not match receive plan for {}",
1943            display_path(&entry.relative_path)
1944        )));
1945    }
1946    events(TransferEvent::FileStarted {
1947        direction: TransferDirection::Receive,
1948        file_name: display_path(&entry.relative_path),
1949        bytes_total: entry.size,
1950        resume_offset: offset,
1951    });
1952
1953    let final_path = safe_destination_path(destination, &entry.relative_path)?;
1954    if let Some(parent) = final_path.parent() {
1955        tokio::fs::create_dir_all(parent)
1956            .await
1957            .map_err(|source| Error::IoPath {
1958                path: parent.to_path_buf(),
1959                source,
1960            })?;
1961    }
1962
1963    let write_path = if offset == 0 {
1964        temp_path_for(&final_path)
1965    } else {
1966        final_path.clone()
1967    };
1968
1969    let mut file = if offset == 0 {
1970        tokio::fs::File::create(&write_path)
1971            .await
1972            .map_err(|source| Error::IoPath {
1973                path: write_path.clone(),
1974                source,
1975            })?
1976    } else {
1977        let mut file = tokio::fs::OpenOptions::new()
1978            .write(true)
1979            .open(&write_path)
1980            .await
1981            .map_err(|source| Error::IoPath {
1982                path: write_path.clone(),
1983                source,
1984            })?;
1985        file.set_len(offset).await.map_err(|source| Error::IoPath {
1986            path: write_path.clone(),
1987            source,
1988        })?;
1989        file.seek(SeekFrom::Start(offset))
1990            .await
1991            .map_err(|source| Error::IoPath {
1992                path: write_path.clone(),
1993                source,
1994            })?;
1995        file
1996    };
1997
1998    let mut bytes_done = offset;
1999    let mut bytes_remaining = header.bytes;
2000    let mut buffer = vec![0; IO_BUFFER_SIZE];
2001    while bytes_remaining > 0 {
2002        if control.is_cancelled() {
2003            if offset == 0 {
2004                let _ = tokio::fs::remove_file(&write_path).await;
2005            }
2006            return Err(Error::TransferCancelled);
2007        }
2008        let read_size = buffer.len().min(bytes_remaining as usize);
2009        let read = match recv.read(&mut buffer[..read_size]).await {
2010            Ok(Some(read)) => read,
2011            Ok(None) => {
2012                if offset == 0 {
2013                    let _ = tokio::fs::remove_file(&write_path).await;
2014                }
2015                return Err(Error::Protocol(
2016                    "stream ended before file payload".to_string(),
2017                ));
2018            }
2019            Err(error) => {
2020                if offset == 0 {
2021                    let _ = tokio::fs::remove_file(&write_path).await;
2022                }
2023                return Err(Error::Read(error));
2024            }
2025        };
2026
2027        file.write_all(&buffer[..read])
2028            .await
2029            .map_err(|source| Error::IoPath {
2030                path: write_path.clone(),
2031                source,
2032            })?;
2033        bytes_done += read as u64;
2034        bytes_remaining -= read as u64;
2035        events(TransferEvent::Progress {
2036            direction: TransferDirection::Receive,
2037            file_name: display_path(&entry.relative_path),
2038            bytes_done,
2039            bytes_total: entry.size,
2040        });
2041        if control.is_cancelled() {
2042            if offset == 0 {
2043                let _ = tokio::fs::remove_file(&write_path).await;
2044            }
2045            return Err(Error::TransferCancelled);
2046        }
2047    }
2048
2049    file.flush().await.map_err(|source| Error::IoPath {
2050        path: write_path.clone(),
2051        source,
2052    })?;
2053    drop(file);
2054
2055    if offset == 0 {
2056        tokio::fs::rename(&write_path, &final_path)
2057            .await
2058            .map_err(|source| Error::IoPath {
2059                path: final_path,
2060                source,
2061            })?;
2062    }
2063
2064    Ok(())
2065}
2066
2067async fn write_json_frame<T: Serialize>(send: &mut quinn::SendStream, value: &T) -> Result<()> {
2068    let bytes = serde_json::to_vec(value).map_err(|error| Error::Protocol(error.to_string()))?;
2069    if bytes.len() > MAX_FRAME_LEN {
2070        return Err(Error::Protocol("frame exceeds maximum length".to_string()));
2071    }
2072    send.write_all(&(bytes.len() as u32).to_be_bytes()).await?;
2073    send.write_all(&bytes).await?;
2074    Ok(())
2075}
2076
2077async fn read_json_frame<T: for<'de> Deserialize<'de>>(recv: &mut quinn::RecvStream) -> Result<T> {
2078    let mut len = [0; 4];
2079    recv.read_exact(&mut len).await?;
2080    let len = u32::from_be_bytes(len) as usize;
2081    if len > MAX_FRAME_LEN {
2082        return Err(Error::Protocol("frame exceeds maximum length".to_string()));
2083    }
2084
2085    let mut bytes = vec![0; len];
2086    recv.read_exact(&mut bytes).await?;
2087    serde_json::from_slice(&bytes).map_err(|error| Error::Protocol(error.to_string()))
2088}
2089
2090fn negotiate_direct_protocol(
2091    hello: &DirectProtocolHello,
2092    psk: Option<&str>,
2093) -> DirectProtocolResponse {
2094    if hello.min_protocol_version > NATIVE_PROTOCOL_VERSION {
2095        return DirectProtocolResponse::reject(format!(
2096            "peer requires protocol version {}, local max is {}",
2097            hello.min_protocol_version, NATIVE_PROTOCOL_VERSION
2098        ));
2099    }
2100
2101    if hello.protocol_version < MIN_NATIVE_PROTOCOL_VERSION {
2102        return DirectProtocolResponse::reject(format!(
2103            "peer max protocol version {} is below local minimum {}",
2104            hello.protocol_version, MIN_NATIVE_PROTOCOL_VERSION
2105        ));
2106    }
2107
2108    let selected_version = NATIVE_PROTOCOL_VERSION.min(hello.protocol_version);
2109    match (psk, hello.psk) {
2110        (Some(_), false) => DirectProtocolResponse::reject("receiver requires PSK authentication"),
2111        (Some(_), true) => {
2112            DirectProtocolResponse::accept_with_psk(selected_version, Uuid::new_v4().to_string())
2113        }
2114        (None, true) => DirectProtocolResponse::reject("receiver is not configured for PSK mode"),
2115        (None, false) => DirectProtocolResponse::accept(selected_version),
2116    }
2117}
2118
/// A filesystem entry discovered while walking the paths selected for sending,
/// paired with the path it will be announced under in the manifest.
#[derive(Debug, Clone)]
struct ManifestSource {
    /// Location of the file or directory on the local filesystem.
    source_path: PathBuf,
    /// Path relative to the transfer root: the selected root's file name,
    /// followed by the names of any nested directories and the entry itself.
    relative_path: PathBuf,
}
2124
2125fn collect_manifest_sources(paths: &[PathBuf]) -> Result<Vec<ManifestSource>> {
2126    let mut sources = Vec::new();
2127    for path in paths {
2128        let metadata = std::fs::symlink_metadata(path).map_err(|source| Error::IoPath {
2129            path: path.clone(),
2130            source,
2131        })?;
2132        if metadata.file_type().is_symlink() {
2133            return Err(Error::InvalidInput(format!(
2134                "{} is a symlink; symlink transfers are not supported",
2135                display_path(path)
2136            )));
2137        }
2138
2139        let root_name = path
2140            .file_name()
2141            .ok_or_else(|| Error::InvalidInput(format!("{} has no file name", display_path(path))))?
2142            .to_os_string();
2143        let relative_path = PathBuf::from(root_name);
2144
2145        if metadata.is_dir() {
2146            sources.push(ManifestSource {
2147                source_path: path.clone(),
2148                relative_path: relative_path.clone(),
2149            });
2150            collect_dir_sources(path, &relative_path, &mut sources)?;
2151        } else if metadata.is_file() {
2152            sources.push(ManifestSource {
2153                source_path: path.clone(),
2154                relative_path,
2155            });
2156        } else {
2157            return Err(Error::InvalidInput(format!(
2158                "{} is not a regular file or directory",
2159                display_path(path)
2160            )));
2161        }
2162    }
2163
2164    Ok(sources)
2165}
2166
2167fn collect_dir_sources(
2168    dir: &Path,
2169    relative_dir: &Path,
2170    sources: &mut Vec<ManifestSource>,
2171) -> Result<()> {
2172    let mut entries = std::fs::read_dir(dir)
2173        .map_err(|source| Error::IoPath {
2174            path: dir.to_path_buf(),
2175            source,
2176        })?
2177        .collect::<std::result::Result<Vec<_>, _>>()
2178        .map_err(|source| Error::IoPath {
2179            path: dir.to_path_buf(),
2180            source,
2181        })?;
2182    entries.sort_by_key(|entry| entry.file_name());
2183
2184    for entry in entries {
2185        let path = entry.path();
2186        let metadata = std::fs::symlink_metadata(&path).map_err(|source| Error::IoPath {
2187            path: path.clone(),
2188            source,
2189        })?;
2190        if metadata.file_type().is_symlink() {
2191            return Err(Error::InvalidInput(format!(
2192                "{} is a symlink; symlink transfers are not supported",
2193                display_path(&path)
2194            )));
2195        }
2196
2197        let relative_path = relative_dir.join(entry.file_name());
2198        if metadata.is_dir() {
2199            sources.push(ManifestSource {
2200                source_path: path.clone(),
2201                relative_path: relative_path.clone(),
2202            });
2203            collect_dir_sources(&path, &relative_path, sources)?;
2204        } else if metadata.is_file() {
2205            sources.push(ManifestSource {
2206                source_path: path,
2207                relative_path,
2208            });
2209        }
2210    }
2211
2212    Ok(())
2213}
2214
2215fn manifest_source_map(paths: &[PathBuf]) -> Result<BTreeMap<PathBuf, PathBuf>> {
2216    Ok(collect_manifest_sources(paths)?
2217        .into_iter()
2218        .filter_map(|source| {
2219            let metadata = std::fs::metadata(&source.source_path).ok()?;
2220            metadata
2221                .is_file()
2222                .then_some((source.relative_path, source.source_path))
2223        })
2224        .collect())
2225}
2226
2227async fn build_manifest_entry(source: ManifestSource) -> Result<ManifestEntry> {
2228    validate_relative_transfer_path(&source.relative_path)?;
2229    let metadata = tokio::fs::metadata(&source.source_path)
2230        .await
2231        .map_err(|source_error| Error::IoPath {
2232            path: source.source_path.clone(),
2233            source: source_error,
2234        })?;
2235    let unix_mode = unix_mode(&metadata);
2236    let modified_unix_ms = modified_unix_ms(&metadata);
2237
2238    if metadata.is_dir() {
2239        return Ok(ManifestEntry {
2240            relative_path: source.relative_path,
2241            kind: ManifestEntryKind::Directory,
2242            size: 0,
2243            blake3: None,
2244            chunks: Vec::new(),
2245            unix_mode,
2246            modified_unix_ms,
2247        });
2248    }
2249
2250    let (blake3, chunks) = hash_file_with_chunks(&source.source_path, DEFAULT_CHUNK_SIZE).await?;
2251    Ok(ManifestEntry {
2252        relative_path: source.relative_path,
2253        kind: ManifestEntryKind::File,
2254        size: metadata.len(),
2255        blake3: Some(blake3),
2256        chunks,
2257        unix_mode,
2258        modified_unix_ms,
2259    })
2260}
2261
2262fn validate_manifest(manifest: &TransferManifest) -> Result<()> {
2263    if manifest.version != MANIFEST_VERSION {
2264        return Err(Error::Protocol(format!(
2265            "unsupported manifest version {}",
2266            manifest.version
2267        )));
2268    }
2269    if manifest.chunk_size == 0 {
2270        return Err(Error::Protocol("manifest chunk size is zero".to_string()));
2271    }
2272
2273    let mut seen = BTreeSet::new();
2274    for entry in &manifest.entries {
2275        validate_relative_transfer_path(&entry.relative_path)?;
2276        if !seen.insert(entry.relative_path.clone()) {
2277            return Err(Error::Protocol(format!(
2278                "duplicate manifest path: {}",
2279                display_path(&entry.relative_path)
2280            )));
2281        }
2282        if entry.kind == ManifestEntryKind::File && entry.blake3.is_none() {
2283            return Err(Error::Protocol(format!(
2284                "file manifest entry is missing BLAKE3: {}",
2285                display_path(&entry.relative_path)
2286            )));
2287        }
2288    }
2289
2290    Ok(())
2291}
2292
2293fn validate_relative_transfer_path(path: &Path) -> Result<()> {
2294    if path.as_os_str().is_empty() || path.is_absolute() {
2295        return Err(Error::InvalidInput(format!(
2296            "unsafe transfer path: {}",
2297            display_path(path)
2298        )));
2299    }
2300
2301    let mut normal_components = 0usize;
2302    for component in path.components() {
2303        match component {
2304            Component::Normal(value) => {
2305                let Some(name) = value.to_str() else {
2306                    return Err(Error::InvalidInput(format!(
2307                        "transfer path must be UTF-8: {}",
2308                        display_path(path)
2309                    )));
2310                };
2311                validate_path_component(name, path)?;
2312                normal_components += 1;
2313            }
2314            Component::CurDir
2315            | Component::ParentDir
2316            | Component::RootDir
2317            | Component::Prefix(_) => {
2318                return Err(Error::InvalidInput(format!(
2319                    "unsafe transfer path: {}",
2320                    display_path(path)
2321                )));
2322            }
2323        }
2324    }
2325
2326    if normal_components == 0 {
2327        return Err(Error::InvalidInput(format!(
2328            "unsafe transfer path: {}",
2329            display_path(path)
2330        )));
2331    }
2332
2333    Ok(())
2334}
2335
2336fn validate_path_component(component: &str, full_path: &Path) -> Result<()> {
2337    if component.is_empty()
2338        || component.contains('\\')
2339        || component.contains('/')
2340        || component.contains(':')
2341        || component.ends_with(' ')
2342        || component.ends_with('.')
2343        || is_windows_reserved_name(component)
2344    {
2345        return Err(Error::InvalidInput(format!(
2346            "unsafe transfer path: {}",
2347            display_path(full_path)
2348        )));
2349    }
2350
2351    Ok(())
2352}
2353
/// Reports whether `component` is a reserved Windows device name.
///
/// Only the part before the first `.` is considered, with trailing spaces
/// removed, and the comparison ignores ASCII case, so `nul.txt` and `Aux .log`
/// are both reserved. Recognized names are `CON`, `PRN`, `AUX`, `NUL`,
/// `CONIN$`, `CONOUT$`, and `COM`/`LPT` followed by exactly one digit `0`-`9`
/// or one of the superscript digits `¹`, `²`, `³`.
fn is_windows_reserved_name(component: &str) -> bool {
    const DEVICE_NAMES: [&str; 6] = ["CON", "PRN", "AUX", "NUL", "CONIN$", "CONOUT$"];

    let stem = component
        .split('.')
        .next()
        .unwrap_or(component)
        .trim_end_matches([' ', '.']);

    if DEVICE_NAMES
        .iter()
        .any(|device| stem.eq_ignore_ascii_case(device))
    {
        return true;
    }

    // `get` returns `None` when the stem is shorter than three bytes or when
    // byte 3 is not a character boundary, so the slice below cannot panic.
    let Some(prefix) = stem.get(..3) else {
        return false;
    };
    if !(prefix.eq_ignore_ascii_case("COM") || prefix.eq_ignore_ascii_case("LPT")) {
        return false;
    }

    // Exactly one port character must follow the prefix (`COM10` is a normal name).
    let mut port = stem[3..].chars();
    matches!(
        (port.next(), port.next()),
        (Some('0'..='9' | '¹' | '²' | '³'), None)
    )
}
2387
2388async fn hash_file(path: &Path) -> Result<String> {
2389    hash_file_with_chunks(path, DEFAULT_CHUNK_SIZE)
2390        .await
2391        .map(|(hash, _)| hash)
2392}
2393
2394async fn hash_file_with_chunks(path: &Path, chunk_size: u64) -> Result<(String, Vec<String>)> {
2395    let mut file = tokio::fs::File::open(path)
2396        .await
2397        .map_err(|source| Error::IoPath {
2398            path: path.to_path_buf(),
2399            source,
2400        })?;
2401    let mut hasher = blake3::Hasher::new();
2402    let mut chunk_hasher = blake3::Hasher::new();
2403    let mut chunk_bytes = 0u64;
2404    let mut chunks = Vec::new();
2405    let mut buffer = vec![0; IO_BUFFER_SIZE];
2406
2407    loop {
2408        let read = file
2409            .read(&mut buffer)
2410            .await
2411            .map_err(|source| Error::IoPath {
2412                path: path.to_path_buf(),
2413                source,
2414            })?;
2415        if read == 0 {
2416            break;
2417        }
2418        hasher.update(&buffer[..read]);
2419
2420        let mut remaining = &buffer[..read];
2421        while !remaining.is_empty() {
2422            let take = remaining.len().min((chunk_size - chunk_bytes) as usize);
2423            chunk_hasher.update(&remaining[..take]);
2424            chunk_bytes += take as u64;
2425            remaining = &remaining[take..];
2426            if chunk_bytes == chunk_size {
2427                chunks.push(chunk_hasher.finalize().to_hex().to_string());
2428                chunk_hasher = blake3::Hasher::new();
2429                chunk_bytes = 0;
2430            }
2431        }
2432    }
2433
2434    if chunk_bytes > 0 {
2435        chunks.push(chunk_hasher.finalize().to_hex().to_string());
2436    }
2437
2438    Ok((hasher.finalize().to_hex().to_string(), chunks))
2439}
2440
2441async fn verified_prefix_offset(
2442    path: &Path,
2443    entry: &ManifestEntry,
2444    existing_len: u64,
2445    chunk_size: u64,
2446) -> Result<u64> {
2447    if existing_len == 0 || entry.chunks.is_empty() {
2448        return Ok(0);
2449    }
2450
2451    let chunks_to_check = (existing_len / chunk_size).min(entry.chunks.len() as u64) as usize;
2452    if chunks_to_check == 0 {
2453        return Ok(0);
2454    }
2455
2456    let mut file = tokio::fs::File::open(path)
2457        .await
2458        .map_err(|source| Error::IoPath {
2459            path: path.to_path_buf(),
2460            source,
2461        })?;
2462    let mut buffer = vec![0; IO_BUFFER_SIZE];
2463    let mut verified_chunks = 0usize;
2464
2465    for expected in entry.chunks.iter().take(chunks_to_check) {
2466        let mut remaining = chunk_size;
2467        let mut hasher = blake3::Hasher::new();
2468        while remaining > 0 {
2469            let read_size = buffer.len().min(remaining as usize);
2470            file.read_exact(&mut buffer[..read_size])
2471                .await
2472                .map_err(|source| Error::IoPath {
2473                    path: path.to_path_buf(),
2474                    source,
2475                })?;
2476            hasher.update(&buffer[..read_size]);
2477            remaining -= read_size as u64;
2478        }
2479
2480        if hasher.finalize().to_hex().as_str() != expected {
2481            break;
2482        }
2483        verified_chunks += 1;
2484    }
2485
2486    Ok(verified_chunks as u64 * chunk_size)
2487}
2488
2489fn summary_from_files(
2490    manifest: &TransferManifest,
2491    files: Vec<TransferFileSummary>,
2492) -> Result<TransferSummary> {
2493    let bytes = manifest.total_file_bytes();
2494    let blake3 = if files.len() == 1 {
2495        files[0].blake3.clone()
2496    } else {
2497        manifest.digest()?
2498    };
2499    let file_name = if files.len() == 1 {
2500        display_path(&files[0].relative_path)
2501    } else {
2502        format!("{} files", files.len())
2503    };
2504    let path = files
2505        .first()
2506        .map(|file| file.path.clone())
2507        .unwrap_or_default();
2508
2509    Ok(TransferSummary {
2510        path,
2511        file_name,
2512        bytes,
2513        blake3,
2514        files,
2515    })
2516}
2517
2518fn temp_path_for(final_path: &Path) -> PathBuf {
2519    let file_name = final_path
2520        .file_name()
2521        .and_then(|name| name.to_str())
2522        .unwrap_or("transfer");
2523    final_path.with_file_name(format!(".{file_name}.{}.ferry-part", Uuid::new_v4()))
2524}
2525
2526fn modified_unix_ms(metadata: &Metadata) -> Option<i128> {
2527    system_time_unix_ms(metadata.modified().ok()?)
2528}
2529
/// Converts `time` to whole milliseconds relative to the Unix epoch.
///
/// Times before the epoch produce a negative value. The result is always
/// `Some`.
fn system_time_unix_ms(time: SystemTime) -> Option<i128> {
    let millis = time.duration_since(UNIX_EPOCH).map_or_else(
        |before_epoch| -(before_epoch.duration().as_millis() as i128),
        |since_epoch| since_epoch.as_millis() as i128,
    );
    Some(millis)
}
2536
/// Returns the Unix permission mode bits recorded in `metadata`.
#[cfg(unix)]
fn unix_mode(metadata: &Metadata) -> Option<u32> {
    let mode = std::os::unix::fs::PermissionsExt::mode(&metadata.permissions());
    Some(mode)
}

/// Unix permission modes do not exist on this platform; always `None`.
#[cfg(not(unix))]
fn unix_mode(_metadata: &Metadata) -> Option<u32> {
    Option::None
}
2548
2549fn client_config_capturing_server_fingerprint()
2550-> Result<(quinn::ClientConfig, Arc<Mutex<Option<String>>>)> {
2551    let server_fingerprint = Arc::new(Mutex::new(None));
2552    let crypto = rustls::ClientConfig::builder()
2553        .dangerous()
2554        .with_custom_certificate_verifier(ServerFingerprintVerifier::new(
2555            server_fingerprint.clone(),
2556        ))
2557        .with_no_client_auth();
2558
2559    let config = quinn::ClientConfig::new(Arc::new(
2560        QuicClientConfig::try_from(crypto)
2561            .map_err(|error| Error::CryptoConfig(error.to_string()))?,
2562    ));
2563    Ok((config, server_fingerprint))
2564}
2565
2566fn captured_server_fingerprint(fingerprint: &Arc<Mutex<Option<String>>>) -> Result<String> {
2567    fingerprint
2568        .lock()
2569        .map_err(|_| Error::CryptoConfig("server fingerprint capture lock poisoned".to_string()))?
2570        .clone()
2571        .ok_or_else(|| {
2572            Error::CryptoConfig("server certificate fingerprint was not captured".to_string())
2573        })
2574}
2575
/// Certificate verifier that records the BLAKE3 fingerprint of the server's
/// end-entity certificate in a shared slot.
#[derive(Debug)]
struct ServerFingerprintVerifier {
    /// Crypto provider whose signature verification algorithms are used for
    /// handshake signature checks.
    provider: Arc<CryptoProvider>,
    /// Shared slot that receives the hex fingerprint of the presented
    /// certificate.
    server_fingerprint: Arc<Mutex<Option<String>>>,
}
2581
2582impl ServerFingerprintVerifier {
2583    fn new(server_fingerprint: Arc<Mutex<Option<String>>>) -> Arc<Self> {
2584        Arc::new(Self {
2585            provider: Arc::new(rustls::crypto::ring::default_provider()),
2586            server_fingerprint,
2587        })
2588    }
2589}
2590
/// Fingerprint-capturing implementation of the rustls server certificate
/// verifier.
///
/// The certificate itself is always accepted; only the handshake signatures
/// are actually verified.
// NOTE(review): this provides no authentication on its own. Callers are
// presumably expected to compare the captured fingerprint with a pinned value
// after the handshake — confirm against the connect path.
impl ServerCertVerifier for ServerFingerprintVerifier {
    /// Records the BLAKE3 hex digest of the end-entity certificate's DER bytes
    /// in the shared slot and accepts the certificate.
    ///
    /// Intermediates, server name, OCSP response and current time are ignored.
    /// Fails only when the capture lock is poisoned.
    fn verify_server_cert(
        &self,
        end_entity: &CertificateDer<'_>,
        _intermediates: &[CertificateDer<'_>],
        _server_name: &ServerName<'_>,
        _ocsp: &[u8],
        _now: UnixTime,
    ) -> std::result::Result<ServerCertVerified, rustls::Error> {
        let fingerprint = blake3::hash(end_entity.as_ref()).to_hex().to_string();
        // Overwrites any previously captured value.
        *self.server_fingerprint.lock().map_err(|_| {
            rustls::Error::General("server fingerprint capture lock poisoned".to_string())
        })? = Some(fingerprint);
        Ok(ServerCertVerified::assertion())
    }

    /// Verifies a TLS 1.2 handshake signature with the provider's algorithms.
    fn verify_tls12_signature(
        &self,
        message: &[u8],
        cert: &CertificateDer<'_>,
        dss: &DigitallySignedStruct,
    ) -> std::result::Result<HandshakeSignatureValid, rustls::Error> {
        // Resolves to the free function imported from `rustls::crypto`, not to
        // this method.
        verify_tls12_signature(
            message,
            cert,
            dss,
            &self.provider.signature_verification_algorithms,
        )
    }

    /// Verifies a TLS 1.3 handshake signature with the provider's algorithms.
    fn verify_tls13_signature(
        &self,
        message: &[u8],
        cert: &CertificateDer<'_>,
        dss: &DigitallySignedStruct,
    ) -> std::result::Result<HandshakeSignatureValid, rustls::Error> {
        // Resolves to the free function imported from `rustls::crypto`, not to
        // this method.
        verify_tls13_signature(
            message,
            cert,
            dss,
            &self.provider.signature_verification_algorithms,
        )
    }

    /// Lists the signature schemes supported by the provider's verification
    /// algorithms.
    fn supported_verify_schemes(&self) -> Vec<SignatureScheme> {
        self.provider
            .signature_verification_algorithms
            .supported_schemes()
    }
}
2641
2642fn ensure_rustls_provider() {
2643    let _ = rustls::crypto::ring::default_provider().install_default();
2644}
2645
2646pub fn config_dir() -> Result<PathBuf> {
2647    let base_dirs = BaseDirs::new().ok_or(Error::ConfigDirUnavailable)?;
2648    Ok(base_dirs.config_dir().join(CONFIG_DIR_NAME))
2649}
2650
/// Picks a default device alias from the environment.
///
/// `HOSTNAME` is consulted first, then `COMPUTERNAME`. A variable that is
/// unset, not valid Unicode, or blank (empty or whitespace only) is skipped,
/// so a blank `HOSTNAME` no longer hides a usable `COMPUTERNAME`. When neither
/// variable qualifies, `fileferry-device` is returned. The chosen value is
/// returned as-is, without trimming.
fn default_alias() -> String {
    ["HOSTNAME", "COMPUTERNAME"]
        .iter()
        .filter_map(|key| std::env::var(key).ok())
        .find(|value| !value.trim().is_empty())
        .unwrap_or_else(|| "fileferry-device".to_string())
}
2658
2659fn expand_home_path(path: &str) -> Result<PathBuf> {
2660    if path == "~" {
2661        return Ok(BaseDirs::new()
2662            .ok_or(Error::ConfigDirUnavailable)?
2663            .home_dir()
2664            .to_path_buf());
2665    }
2666
2667    if let Some(rest) = path.strip_prefix("~/") {
2668        return Ok(BaseDirs::new()
2669            .ok_or(Error::ConfigDirUnavailable)?
2670            .home_dir()
2671            .join(rest));
2672    }
2673
2674    Ok(PathBuf::from(path))
2675}
2676
2677fn normalized_fingerprint(fingerprint: String) -> Result<String> {
2678    let fingerprint = fingerprint.trim().to_ascii_lowercase();
2679    if fingerprint.len() != 64 || !fingerprint.chars().all(|char| char.is_ascii_hexdigit()) {
2680        return Err(Error::InvalidInput(
2681            "peer fingerprint must be a full 64-character hex BLAKE3 fingerprint".to_string(),
2682        ));
2683    }
2684
2685    Ok(fingerprint)
2686}
2687
2688fn validate_psk(psk: String) -> Result<String> {
2689    if psk.is_empty() {
2690        return Err(Error::InvalidInput("PSK must not be empty".to_string()));
2691    }
2692
2693    Ok(psk)
2694}
2695
2696fn psk_proof(
2697    psk: &str,
2698    challenge: &str,
2699    client_fingerprint: &str,
2700    protocol_version: u16,
2701) -> Result<String> {
2702    validate_psk(psk.to_string())?;
2703    let key = blake3::hash(psk.as_bytes());
2704    let mut message = Vec::new();
2705    message.extend_from_slice(PSK_PROOF_CONTEXT);
2706    message.push(0);
2707    message.extend_from_slice(challenge.as_bytes());
2708    message.push(0);
2709    message.extend_from_slice(client_fingerprint.as_bytes());
2710    message.push(0);
2711    message.extend_from_slice(protocol_version.to_string().as_bytes());
2712    Ok(blake3::keyed_hash(key.as_bytes(), &message)
2713        .to_hex()
2714        .to_string())
2715}
2716
2717fn write_toml_file<T: Serialize>(path: &Path, value: &T) -> Result<()> {
2718    let bytes =
2719        toml::to_string_pretty(value).map_err(|error| Error::ConfigSerialize(error.to_string()))?;
2720    if let Some(parent) = path.parent() {
2721        std::fs::create_dir_all(parent).map_err(|source| Error::IoPath {
2722            path: parent.to_path_buf(),
2723            source,
2724        })?;
2725    }
2726
2727    let temp_path = path.with_extension(format!("tmp-{}", Uuid::new_v4()));
2728    std::fs::write(&temp_path, bytes).map_err(|source| Error::IoPath {
2729        path: temp_path.clone(),
2730        source,
2731    })?;
2732    std::fs::rename(&temp_path, path).map_err(|source| Error::IoPath {
2733        path: path.to_path_buf(),
2734        source,
2735    })?;
2736    Ok(())
2737}
2738
2739fn write_private_file(path: &Path, bytes: &[u8]) -> Result<()> {
2740    std::fs::write(path, bytes).map_err(|source| Error::IoPath {
2741        path: path.to_path_buf(),
2742        source,
2743    })?;
2744
2745    #[cfg(unix)]
2746    {
2747        use std::os::unix::fs::PermissionsExt;
2748
2749        let mut permissions = std::fs::metadata(path)
2750            .map_err(|source| Error::IoPath {
2751                path: path.to_path_buf(),
2752                source,
2753            })?
2754            .permissions();
2755        permissions.set_mode(0o600);
2756        std::fs::set_permissions(path, permissions).map_err(|source| Error::IoPath {
2757            path: path.to_path_buf(),
2758            source,
2759        })?;
2760    }
2761
2762    Ok(())
2763}
2764
2765#[cfg(test)]
2766mod tests {
2767    use super::*;
2768    use std::sync::{Arc, Mutex};
2769
2770    #[test]
2771    fn direct_peer_parses_socket_address() {
2772        let peer = DirectPeer::parse("127.0.0.1:53318").expect("address parses");
2773
2774        assert_eq!(peer.address().to_string(), "127.0.0.1:53318");
2775        assert_eq!(peer.expected_fingerprint(), None);
2776    }
2777
2778    #[test]
2779    fn direct_peer_accepts_expected_fingerprint() {
2780        let fingerprint = "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef";
2781        let peer = DirectPeer::parse("127.0.0.1:53318")
2782            .expect("address parses")
2783            .with_expected_fingerprint(fingerprint)
2784            .expect("fingerprint parses");
2785
2786        assert_eq!(peer.expected_fingerprint(), Some(fingerprint));
2787    }
2788
2789    #[test]
2790    fn direct_peer_rejects_missing_port() {
2791        assert!(DirectPeer::parse("127.0.0.1").is_err());
2792    }
2793
2794    #[test]
2795    fn send_request_requires_at_least_one_path() {
2796        let peer = DirectPeer::parse("127.0.0.1:53318").expect("address parses");
2797
2798        assert!(SendRequest::new(peer, Vec::new()).is_err());
2799    }
2800
2801    #[test]
2802    fn psk_secret_is_redacted_in_debug_and_serialization() {
2803        let secret = PskSecret::new("do-not-print").expect("psk accepted");
2804
2805        assert_eq!(format!("{secret:?}"), "PskSecret(<redacted>)");
2806        assert_eq!(
2807            serde_json::to_string(&secret).expect("secret serializes"),
2808            r#""<redacted>""#
2809        );
2810
2811        let peer = DirectPeer::parse("127.0.0.1:53318").expect("address parses");
2812        let request = SendRequest::new(peer, vec![PathBuf::from("file.txt")])
2813            .expect("send request")
2814            .with_psk("do-not-print")
2815            .expect("psk accepted");
2816        assert!(!format!("{request:?}").contains("do-not-print"));
2817    }
2818
2819    #[test]
2820    fn validate_send_paths_rejects_empty_slice() {
2821        assert!(validate_send_paths(&[]).is_err());
2822    }
2823
2824    #[tokio::test]
2825    async fn manifest_walks_directory_and_serializes_entries() {
2826        let temp = tempfile::tempdir().expect("tempdir");
2827        let root = temp.path().join("bundle");
2828        let nested = root.join("nested");
2829        let empty = nested.join("empty");
2830        tokio::fs::create_dir_all(&empty).await.expect("dirs");
2831        tokio::fs::write(root.join("a.txt"), b"alpha")
2832            .await
2833            .expect("file a");
2834        tokio::fs::write(root.join("zero.bin"), [])
2835            .await
2836            .expect("zero file");
2837        tokio::fs::write(nested.join("b.txt"), b"beta")
2838            .await
2839            .expect("file b");
2840        tokio::fs::write(nested.join("large.bin"), vec![9; IO_BUFFER_SIZE + 17])
2841            .await
2842            .expect("large file");
2843
2844        let manifest = build_manifest(&[root]).await.expect("manifest");
2845        let json = serde_json::to_string(&manifest).expect("manifest serializes");
2846        let decoded: TransferManifest = serde_json::from_str(&json).expect("manifest decodes");
2847        let relative_paths = decoded
2848            .entries
2849            .iter()
2850            .map(|entry| entry.relative_path.clone())
2851            .collect::<Vec<_>>();
2852
2853        assert_eq!(decoded.version, MANIFEST_VERSION);
2854        assert_eq!(
2855            relative_paths,
2856            vec![
2857                PathBuf::from("bundle"),
2858                PathBuf::from("bundle/a.txt"),
2859                PathBuf::from("bundle/nested"),
2860                PathBuf::from("bundle/nested/b.txt"),
2861                PathBuf::from("bundle/nested/empty"),
2862                PathBuf::from("bundle/nested/large.bin"),
2863                PathBuf::from("bundle/zero.bin"),
2864            ]
2865        );
2866        assert!(decoded.entries.iter().any(|entry| {
2867            entry.kind == ManifestEntryKind::Directory
2868                && entry.relative_path == Path::new("bundle/nested/empty")
2869        }));
2870        assert!(decoded.entries.iter().any(|entry| {
2871            entry.kind == ManifestEntryKind::File
2872                && entry.relative_path == Path::new("bundle/nested/b.txt")
2873                && entry.size == 4
2874                && entry.blake3.is_some()
2875        }));
2876        assert!(decoded.entries.iter().any(|entry| {
2877            entry.kind == ManifestEntryKind::File
2878                && entry.relative_path == Path::new("bundle/zero.bin")
2879                && entry.size == 0
2880                && entry.blake3.is_some()
2881        }));
2882        assert!(decoded.entries.iter().any(|entry| {
2883            entry.kind == ManifestEntryKind::File
2884                && entry.relative_path == Path::new("bundle/nested/large.bin")
2885                && entry.size == IO_BUFFER_SIZE as u64 + 17
2886                && entry.blake3.is_some()
2887        }));
2888    }
2889
2890    #[test]
2891    fn safe_destination_path_rejects_unsafe_paths() {
2892        let dest = Path::new("/tmp/ferry-dest");
2893
2894        assert!(safe_destination_path(dest, Path::new("../escape.txt")).is_err());
2895        assert!(safe_destination_path(dest, Path::new("/absolute.txt")).is_err());
2896        assert!(safe_destination_path(dest, Path::new("nested/CON")).is_err());
2897        assert!(safe_destination_path(dest, Path::new("nested\\escape.txt")).is_err());
2898        assert!(safe_destination_path(dest, Path::new("nested/ok.txt")).is_ok());
2899    }
2900
2901    #[test]
2902    fn safe_destination_path_rejects_generated_escape_and_reserved_patterns() {
2903        let dest = Path::new("/tmp/ferry-dest");
2904        let unsafe_components = [
2905            "..",
2906            ".",
2907            "CON",
2908            "con.txt",
2909            "NUL",
2910            "COM1",
2911            "LPT9",
2912            "trailingspace ",
2913            "trailingdot.",
2914            "has:colon",
2915            "has\\backslash",
2916        ];
2917
2918        for component in unsafe_components {
2919            assert!(
2920                safe_destination_path(dest, Path::new(component)).is_err(),
2921                "{component} should be rejected as a top-level path"
2922            );
2923            if component != "." {
2924                assert!(
2925                    safe_destination_path(dest, Path::new("safe").join(component).as_path())
2926                        .is_err(),
2927                    "{component} should be rejected as a nested path"
2928                );
2929            }
2930        }
2931
2932        for component in ["alpha", "alpha-1", "alpha_1", "report.final.txt"] {
2933            let path = safe_destination_path(dest, Path::new("safe").join(component).as_path())
2934                .expect("safe generated path accepted");
2935            assert!(path.starts_with(dest));
2936        }
2937    }
2938
2939    #[tokio::test]
2940    async fn resume_decision_skips_existing_matching_file() {
2941        let temp = tempfile::tempdir().expect("tempdir");
2942        let source = temp.path().join("source.txt");
2943        let dest = temp.path().join("dest");
2944        tokio::fs::create_dir_all(&dest).await.expect("dest dir");
2945        tokio::fs::write(&source, b"same contents")
2946            .await
2947            .expect("source");
2948        tokio::fs::write(dest.join("source.txt"), b"same contents")
2949            .await
2950            .expect("dest file");
2951        let manifest = build_manifest(&[source]).await.expect("manifest");
2952        let entry = manifest.file_entries().next().expect("file entry");
2953
2954        let decision = decide_resume(&dest, entry, manifest.chunk_size)
2955            .await
2956            .expect("decision");
2957
2958        assert_eq!(decision, ResumeDecision::Skip);
2959    }
2960
2961    #[tokio::test]
2962    async fn resume_decision_returns_verified_chunk_offset() {
2963        let temp = tempfile::tempdir().expect("tempdir");
2964        let source = temp.path().join("large.bin");
2965        let dest = temp.path().join("dest");
2966        tokio::fs::create_dir_all(&dest).await.expect("dest dir");
2967        let bytes = vec![42; (DEFAULT_CHUNK_SIZE as usize * 2) + 128];
2968        tokio::fs::write(&source, &bytes).await.expect("source");
2969        tokio::fs::write(
2970            dest.join("large.bin"),
2971            &bytes[..DEFAULT_CHUNK_SIZE as usize + 99],
2972        )
2973        .await
2974        .expect("partial");
2975        let manifest = build_manifest(&[source]).await.expect("manifest");
2976        let entry = manifest.file_entries().next().expect("file entry");
2977
2978        let decision = decide_resume(&dest, entry, manifest.chunk_size)
2979            .await
2980            .expect("decision");
2981
2982        assert_eq!(decision, ResumeDecision::ResumeFrom(DEFAULT_CHUNK_SIZE));
2983    }
2984
2985    #[tokio::test]
2986    async fn resume_decision_rejects_existing_file_with_mismatched_contents() {
2987        let temp = tempfile::tempdir().expect("tempdir");
2988        let source = temp.path().join("source.txt");
2989        let dest = temp.path().join("dest");
2990        tokio::fs::create_dir_all(&dest).await.expect("dest dir");
2991        tokio::fs::write(&source, b"expected contents")
2992            .await
2993            .expect("source");
2994        tokio::fs::write(dest.join("source.txt"), b"different contents")
2995            .await
2996            .expect("conflicting dest file");
2997        let manifest = build_manifest(&[source]).await.expect("manifest");
2998        let entry = manifest.file_entries().next().expect("file entry");
2999
3000        let decision = decide_resume(&dest, entry, manifest.chunk_size)
3001            .await
3002            .expect("decision");
3003
3004        assert!(matches!(decision, ResumeDecision::Conflict(_)));
3005    }
3006
3007    #[tokio::test]
3008    async fn resume_decision_rejects_existing_file_larger_than_manifest_entry() {
3009        let temp = tempfile::tempdir().expect("tempdir");
3010        let source = temp.path().join("source.txt");
3011        let dest = temp.path().join("dest");
3012        tokio::fs::create_dir_all(&dest).await.expect("dest dir");
3013        tokio::fs::write(&source, b"small").await.expect("source");
3014        tokio::fs::write(dest.join("source.txt"), b"larger destination")
3015            .await
3016            .expect("larger dest file");
3017        let manifest = build_manifest(&[source]).await.expect("manifest");
3018        let entry = manifest.file_entries().next().expect("file entry");
3019
3020        let decision = decide_resume(&dest, entry, manifest.chunk_size)
3021            .await
3022            .expect("decision");
3023
3024        assert!(matches!(decision, ResumeDecision::Conflict(_)));
3025    }
3026
3027    #[tokio::test]
3028    async fn resume_decision_handles_nested_partial_files_safely() {
3029        let temp = tempfile::tempdir().expect("tempdir");
3030        let source_root = temp.path().join("bundle");
3031        let source_nested = source_root.join("nested");
3032        let dest = temp.path().join("dest");
3033        let dest_nested = dest.join("bundle/nested");
3034        tokio::fs::create_dir_all(&source_nested)
3035            .await
3036            .expect("source nested dir");
3037        tokio::fs::create_dir_all(&dest_nested)
3038            .await
3039            .expect("dest nested dir");
3040        let bytes = (0..((DEFAULT_CHUNK_SIZE as usize * 2) + 777))
3041            .map(|index| (index % 251) as u8)
3042            .collect::<Vec<_>>();
3043        let source_file = source_nested.join("large.bin");
3044        tokio::fs::write(&source_file, &bytes)
3045            .await
3046            .expect("source");
3047        let manifest = build_manifest(&[source_root]).await.expect("manifest");
3048        let entry = manifest
3049            .file_entries()
3050            .find(|entry| entry.relative_path == Path::new("bundle/nested/large.bin"))
3051            .expect("nested file entry");
3052
3053        tokio::fs::write(
3054            dest_nested.join("large.bin"),
3055            &bytes[..DEFAULT_CHUNK_SIZE as usize + 99],
3056        )
3057        .await
3058        .expect("matching partial");
3059        let decision = decide_resume(&dest, entry, manifest.chunk_size)
3060            .await
3061            .expect("matching partial decision");
3062        assert_eq!(decision, ResumeDecision::ResumeFrom(DEFAULT_CHUNK_SIZE));
3063
3064        let mut mismatched = bytes[..DEFAULT_CHUNK_SIZE as usize + 99].to_vec();
3065        mismatched[0] ^= 0xff;
3066        tokio::fs::write(dest_nested.join("large.bin"), mismatched)
3067            .await
3068            .expect("mismatched partial");
3069        let decision = decide_resume(&dest, entry, manifest.chunk_size)
3070            .await
3071            .expect("mismatched partial decision");
3072        assert!(matches!(decision, ResumeDecision::Conflict(_)));
3073    }
3074
3075    #[tokio::test]
3076    async fn resume_decision_rejects_same_name_file_directory_conflicts() {
3077        let temp = tempfile::tempdir().expect("tempdir");
3078        let source = temp.path().join("source.txt");
3079        let dest = temp.path().join("dest");
3080        tokio::fs::create_dir_all(&dest).await.expect("dest dir");
3081        tokio::fs::write(&source, b"incoming file")
3082            .await
3083            .expect("source");
3084        tokio::fs::create_dir(dest.join("source.txt"))
3085            .await
3086            .expect("conflicting dest dir");
3087        let manifest = build_manifest(&[source]).await.expect("manifest");
3088        let entry = manifest.file_entries().next().expect("file entry");
3089
3090        let decision = decide_resume(&dest, entry, manifest.chunk_size)
3091            .await
3092            .expect("decision");
3093
3094        assert!(matches!(decision, ResumeDecision::Conflict(_)));
3095    }
3096
3097    #[test]
3098    fn identity_is_loaded_after_generation() {
3099        let temp = tempfile::tempdir().expect("tempdir");
3100        let generated =
3101            NativeIdentity::load_or_generate_in(temp.path()).expect("identity generated");
3102        let loaded = NativeIdentity::load_or_generate_in(temp.path()).expect("identity loaded");
3103
3104        assert_eq!(generated.fingerprint(), loaded.fingerprint());
3105    }
3106
3107    #[test]
3108    fn app_config_round_trips_toml() {
3109        let temp = tempfile::tempdir().expect("tempdir");
3110        let path = temp.path().join("config.toml");
3111        let config = AppConfig {
3112            alias: "desk".to_string(),
3113            ..AppConfig::default()
3114        };
3115
3116        config.save_to_path(&path).expect("config saved");
3117        let loaded = AppConfig::load_or_default_from(&path).expect("config loaded");
3118
3119        assert_eq!(loaded.alias, "desk");
3120        assert_eq!(loaded.listen_port, 53317);
3121        assert!(loaded.trust.require_fingerprint);
3122    }
3123
3124    #[test]
3125    fn app_config_redacts_psk_for_display_output() {
3126        let config = AppConfig {
3127            trust: TrustConfig {
3128                psk: "super-secret-psk".to_string(),
3129                ..TrustConfig::default()
3130            },
3131            ..AppConfig::default()
3132        };
3133
3134        let redacted = config.to_redacted_toml_string().expect("config serializes");
3135
3136        assert!(!redacted.contains("super-secret-psk"));
3137        assert!(redacted.contains(r#"psk = "<redacted>""#));
3138    }
3139
3140    #[test]
3141    fn daemon_config_defaults_from_app_config_and_round_trips_toml() {
3142        let temp = tempfile::tempdir().expect("tempdir");
3143        let path = temp.path().join("daemon.toml");
3144        let app_config = AppConfig {
3145            quic_port: 54444,
3146            download_dir: temp.path().join("downloads").display().to_string(),
3147            ..AppConfig::default()
3148        };
3149
3150        let defaulted =
3151            DaemonConfig::load_or_default_from(&path, &app_config).expect("daemon config");
3152        assert_eq!(
3153            defaulted.listen,
3154            SocketAddr::from(([0, 0, 0, 0], app_config.quic_port))
3155        );
3156        assert_eq!(defaulted.destination, temp.path().join("downloads"));
3157
3158        let config = DaemonConfig {
3159            listen: SocketAddr::from(([127, 0, 0, 1], 53318)),
3160            destination: temp.path().join("daemon-dest"),
3161        };
3162        config.save_to_path(&path).expect("daemon config saved");
3163        let loaded =
3164            DaemonConfig::load_or_default_from(&path, &app_config).expect("daemon config loaded");
3165
3166        assert_eq!(loaded, config);
3167    }
3168
3169    #[test]
3170    fn trust_store_load_save_and_forget_round_trip() {
3171        let temp = tempfile::tempdir().expect("tempdir");
3172        let path = temp.path().join("known_peers.toml");
3173        let fingerprint = "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef";
3174        let mut store = TrustStore::default();
3175
3176        store
3177            .trust_fingerprint(fingerprint)
3178            .expect("fingerprint trusted");
3179        store.save_to_path(&path).expect("trust store saved");
3180        let mut loaded = TrustStore::load_or_default_from(&path).expect("trust store loaded");
3181
3182        assert_eq!(loaded.records().count(), 1);
3183        assert_eq!(loaded.peers[fingerprint].trust_state, TrustState::Trusted);
3184        assert!(loaded.forget(fingerprint));
3185        assert_eq!(loaded.records().count(), 0);
3186    }
3187
3188    #[test]
3189    fn trust_store_rejects_short_fingerprint_without_discovery_lookup() {
3190        let mut store = TrustStore::default();
3191
3192        assert!(store.trust_fingerprint("9a4f2c").is_err());
3193    }
3194
3195    #[test]
3196    fn trust_store_pins_direct_address_to_fingerprint() {
3197        let fingerprint = "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef";
3198        let address = SocketAddr::from(([192, 168, 1, 42], 53318));
3199        let mut store = TrustStore::default();
3200
3201        store
3202            .trust_direct_address(fingerprint, address)
3203            .expect("direct address trusted");
3204
3205        assert!(store.is_trusted_fingerprint(fingerprint));
3206        assert_eq!(
3207            store.trusted_fingerprint_for_address(address),
3208            Some(fingerprint)
3209        );
3210        assert!(store.peers[fingerprint].addresses.contains(&address));
3211        assert!(store.peers[fingerprint].transports.contains("quic"));
3212        assert_eq!(store.peers[fingerprint].trust_state, TrustState::Trusted);
3213    }
3214
3215    #[test]
3216    fn transfer_events_serialize_with_stable_event_names() {
3217        let event = TransferEvent::Progress {
3218            direction: TransferDirection::Send,
3219            file_name: "bundle/a.txt".to_string(),
3220            bytes_done: 5,
3221            bytes_total: 9,
3222        };
3223
3224        let json = serde_json::to_value(&event).expect("event serializes");
3225
3226        assert_eq!(json["event"], "progress");
3227        assert_eq!(json["direction"], "send");
3228        assert_eq!(json["file_name"], "bundle/a.txt");
3229        assert_eq!(json["bytes_done"], 5);
3230        assert_eq!(json["bytes_total"], 9);
3231    }
3232
3233    #[test]
3234    fn direct_protocol_hello_has_stable_golden_json() {
3235        let hello = DirectProtocolHello {
3236            protocol_version: 1,
3237            min_protocol_version: 1,
3238            client_fingerprint: "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"
3239                .to_string(),
3240            psk: false,
3241        };
3242
3243        let json = serde_json::to_string(&hello).expect("hello serializes");
3244
3245        assert_eq!(
3246            json,
3247            r#"{"protocol_version":1,"min_protocol_version":1,"client_fingerprint":"0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef","psk":false}"#
3248        );
3249    }
3250
3251    #[test]
3252    fn direct_protocol_response_has_stable_golden_json() {
3253        let response = DirectProtocolResponse::accept(1);
3254
3255        let json = serde_json::to_string(&response).expect("response serializes");
3256
3257        assert_eq!(
3258            json,
3259            r#"{"accepted":true,"protocol_version":1,"psk_challenge":null,"error":null}"#
3260        );
3261    }
3262
3263    #[test]
3264    fn direct_protocol_decodes_pre_psk_negotiation_frames() {
3265        let hello: DirectProtocolHello = serde_json::from_str(
3266            r#"{"protocol_version":1,"min_protocol_version":1,"client_fingerprint":"0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"}"#,
3267        )
3268        .expect("old hello decodes");
3269        assert!(!hello.psk);
3270
3271        let response: DirectProtocolResponse =
3272            serde_json::from_str(r#"{"accepted":true,"protocol_version":1,"error":null}"#)
3273                .expect("old response decodes");
3274        assert_eq!(response, DirectProtocolResponse::accept(1));
3275    }
3276
3277    #[test]
3278    fn direct_protocol_negotiates_supported_overlap() {
3279        let hello = DirectProtocolHello {
3280            protocol_version: 3,
3281            min_protocol_version: 1,
3282            client_fingerprint: "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"
3283                .to_string(),
3284            psk: false,
3285        };
3286
3287        let response = negotiate_direct_protocol(&hello, None);
3288
3289        assert_eq!(response, DirectProtocolResponse::accept(1));
3290    }
3291
3292    #[test]
3293    fn direct_protocol_rejects_incompatible_versions() {
3294        let too_new = DirectProtocolHello {
3295            protocol_version: 3,
3296            min_protocol_version: 2,
3297            client_fingerprint: "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"
3298                .to_string(),
3299            psk: false,
3300        };
3301        let too_old = DirectProtocolHello {
3302            protocol_version: 0,
3303            min_protocol_version: 0,
3304            client_fingerprint: "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"
3305                .to_string(),
3306            psk: false,
3307        };
3308
3309        assert!(!negotiate_direct_protocol(&too_new, None).accepted);
3310        assert!(!negotiate_direct_protocol(&too_old, None).accepted);
3311    }
3312
3313    #[test]
3314    fn direct_protocol_requires_matching_psk_mode() {
3315        let no_psk = DirectProtocolHello {
3316            protocol_version: 1,
3317            min_protocol_version: 1,
3318            client_fingerprint: "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"
3319                .to_string(),
3320            psk: false,
3321        };
3322        let with_psk = DirectProtocolHello {
3323            psk: true,
3324            ..no_psk.clone()
3325        };
3326
3327        assert!(!negotiate_direct_protocol(&no_psk, Some("secret")).accepted);
3328        assert!(!negotiate_direct_protocol(&with_psk, None).accepted);
3329
3330        let accepted = negotiate_direct_protocol(&with_psk, Some("secret"));
3331        assert!(accepted.accepted);
3332        assert!(accepted.psk_challenge.is_some());
3333    }
3334
3335    #[test]
3336    fn transfer_manifest_has_stable_golden_json() {
3337        let manifest = TransferManifest {
3338            version: MANIFEST_VERSION,
3339            session_id: Uuid::parse_str("00000000-0000-0000-0000-000000000001")
3340                .expect("uuid parses"),
3341            chunk_size: DEFAULT_CHUNK_SIZE,
3342            entries: vec![ManifestEntry {
3343                relative_path: PathBuf::from("bundle/a.txt"),
3344                kind: ManifestEntryKind::File,
3345                size: 5,
3346                blake3: Some(
3347                    "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef".to_string(),
3348                ),
3349                chunks: vec![
3350                    "abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789".to_string(),
3351                ],
3352                unix_mode: Some(0o100644),
3353                modified_unix_ms: Some(1_700_000_000_000),
3354            }],
3355        };
3356
3357        let json = serde_json::to_string(&manifest).expect("manifest serializes");
3358
3359        assert_eq!(
3360            json,
3361            r#"{"version":1,"session_id":"00000000-0000-0000-0000-000000000001","chunk_size":1048576,"entries":[{"relative_path":"bundle/a.txt","kind":"file","size":5,"blake3":"0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef","chunks":["abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789"],"unix_mode":33188,"modified_unix_ms":1700000000000}]}"#
3362        );
3363    }
3364
3365    #[test]
3366    fn peer_registry_merges_observations_by_fingerprint() {
3367        let mut registry = PeerRegistry::new();
3368        let fingerprint = "abcdef1234567890";
3369        let first = PeerObservation::new(
3370            fingerprint,
3371            SocketAddr::from(([192, 168, 1, 10], 53318)),
3372            100,
3373        )
3374        .with_alias("desk")
3375        .with_hostname("desk.local")
3376        .with_transport("quic");
3377        let second = PeerObservation::new(
3378            fingerprint,
3379            SocketAddr::from(([192, 168, 1, 11], 53318)),
3380            200,
3381        )
3382        .with_alias("workstation")
3383        .with_transport("quic");
3384
3385        registry.observe(first).expect("first observation");
3386        registry.observe(second).expect("second observation");
3387        let record = registry
3388            .get_by_fingerprint(fingerprint)
3389            .expect("record by fingerprint");
3390
3391        assert_eq!(registry.records().count(), 1);
3392        assert!(record.aliases.contains("desk"));
3393        assert!(record.aliases.contains("workstation"));
3394        assert!(record.hostnames.contains("desk.local"));
3395        assert_eq!(record.addresses.len(), 2);
3396        assert_eq!(record.first_seen_unix_ms, 100);
3397        assert_eq!(record.last_seen_unix_ms, 200);
3398    }
3399
3400    #[test]
3401    fn peer_registry_looks_up_unique_fingerprint_prefix_alias_and_hostname() {
3402        let mut registry = PeerRegistry::new();
3403        registry
3404            .observe(
3405                PeerObservation::new(
3406                    "abcdef1234567890",
3407                    SocketAddr::from(([192, 168, 1, 10], 53318)),
3408                    100,
3409                )
3410                .with_alias("desk")
3411                .with_hostname("desk.local"),
3412            )
3413            .expect("first observation");
3414        registry
3415            .observe(
3416                PeerObservation::new(
3417                    "123456abcdef7890",
3418                    SocketAddr::from(([192, 168, 1, 20], 53318)),
3419                    100,
3420                )
3421                .with_alias("laptop"),
3422            )
3423            .expect("second observation");
3424
3425        assert_eq!(
3426            registry
3427                .lookup("abcdef")
3428                .map(|record| record.fingerprint.as_str()),
3429            Some("abcdef1234567890")
3430        );
3431        assert_eq!(
3432            registry
3433                .lookup("desk")
3434                .map(|record| record.fingerprint.as_str()),
3435            Some("abcdef1234567890")
3436        );
3437        assert_eq!(
3438            registry
3439                .lookup("desk.local")
3440                .map(|record| record.fingerprint.as_str()),
3441            Some("abcdef1234567890")
3442        );
3443        assert!(registry.lookup("missing").is_none());
3444    }
3445
3446    #[test]
3447    fn peer_registry_rejects_ambiguous_lookup_hints() {
3448        let mut registry = PeerRegistry::new();
3449        registry
3450            .observe(
3451                PeerObservation::new(
3452                    "abcdef1234567890",
3453                    SocketAddr::from(([192, 168, 1, 10], 53318)),
3454                    100,
3455                )
3456                .with_alias("desk"),
3457            )
3458            .expect("first observation");
3459        registry
3460            .observe(
3461                PeerObservation::new(
3462                    "abcdef9999999999",
3463                    SocketAddr::from(([192, 168, 1, 11], 53318)),
3464                    100,
3465                )
3466                .with_alias("desk"),
3467            )
3468            .expect("second observation");
3469
3470        assert!(registry.lookup("abcdef").is_none());
3471        assert!(registry.lookup("desk").is_none());
3472        assert_eq!(
3473            registry
3474                .lookup("abcdef1234567890")
3475                .map(|record| record.fingerprint.as_str()),
3476            Some("abcdef1234567890")
3477        );
3478        match registry.lookup_detail("desk") {
3479            PeerLookup::Ambiguous(records) => assert_eq!(records.len(), 2),
3480            other => panic!("expected ambiguous duplicate-alias lookup, got {other:?}"),
3481        }
3482        assert!(matches!(
3483            registry.lookup_detail("missing"),
3484            PeerLookup::Missing
3485        ));
3486    }
3487
3488    #[test]
3489    fn native_announcement_builds_expected_txt_properties() {
3490        let announcement = NativeAnnouncement {
3491            alias: "Stephen MBP".to_string(),
3492            fingerprint: "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"
3493                .to_string(),
3494            quic_port: 53318,
3495            listen_port: Some(53317),
3496        };
3497
3498        let service = announcement.service_info().expect("service info");
3499
3500        assert_eq!(service.get_type(), NATIVE_SERVICE_TYPE);
3501        assert!(
3502            service
3503                .get_fullname()
3504                .starts_with("stephen-mbp-0123456789ab.")
3505        );
3506        assert_eq!(service.get_property_val_str("pv"), Some("1"));
3507        assert_eq!(service.get_property_val_str("minpv"), Some("1"));
3508        assert_eq!(
3509            service.get_property_val_str("fp"),
3510            Some("0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef")
3511        );
3512        assert_eq!(service.get_property_val_str("alias"), Some("Stephen MBP"));
3513        assert_eq!(service.get_property_val_str("transports"), Some("quic"));
3514        assert_eq!(service.get_property_val_str("quic_port"), Some("53318"));
3515        assert_eq!(service.get_property_val_str("listen_port"), Some("53317"));
3516    }
3517
3518    #[test]
3519    fn resolved_native_service_merges_into_registry() {
3520        let properties = [
3521            ("pv", "1"),
3522            ("minpv", "1"),
3523            (
3524                "fp",
3525                "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef",
3526            ),
3527            ("alias", "desk"),
3528            ("transports", "quic"),
3529            ("quic_port", "53318"),
3530        ];
3531        let service = ServiceInfo::new(
3532            NATIVE_SERVICE_TYPE,
3533            "desk-0123456789ab",
3534            "desk.local.",
3535            "192.168.1.42",
3536            53318,
3537            &properties[..],
3538        )
3539        .expect("service info")
3540        .as_resolved_service();
3541        let mut registry = PeerRegistry::new();
3542
3543        observe_resolved_service(&mut registry, &service).expect("observation");
3544
3545        let record = registry.lookup("desk").expect("record by alias");
3546        assert_eq!(
3547            record.fingerprint,
3548            "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"
3549        );
3550        assert_eq!(
3551            record.preferred_quic_address(),
3552            Some(SocketAddr::from(([192, 168, 1, 42], 53318)))
3553        );
3554    }
3555
3556    #[test]
3557    fn resolved_native_service_ignores_incompatible_protocol_ranges() {
3558        let properties = [
3559            ("pv", "3"),
3560            ("minpv", "2"),
3561            (
3562                "fp",
3563                "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef",
3564            ),
3565            ("alias", "desk"),
3566            ("transports", "quic"),
3567            ("quic_port", "53318"),
3568        ];
3569        let service = ServiceInfo::new(
3570            NATIVE_SERVICE_TYPE,
3571            "desk-0123456789ab",
3572            "desk.local.",
3573            "192.168.1.42",
3574            53318,
3575            &properties[..],
3576        )
3577        .expect("service info")
3578        .as_resolved_service();
3579        let mut registry = PeerRegistry::new();
3580
3581        observe_resolved_service(&mut registry, &service).expect("observation");
3582
3583        assert_eq!(registry.records().count(), 0);
3584    }
3585
3586    #[tokio::test]
3587    async fn direct_quic_loopback_sends_one_file() {
3588        let source_dir = tempfile::tempdir().expect("source tempdir");
3589        let dest_dir = tempfile::tempdir().expect("dest tempdir");
3590        let identity_dir = tempfile::tempdir().expect("identity tempdir");
3591        let file_path = source_dir.path().join("hello.txt");
3592        tokio::fs::write(&file_path, b"hello from ferry")
3593            .await
3594            .expect("source file written");
3595
3596        let identity =
3597            NativeIdentity::load_or_generate_in(identity_dir.path()).expect("identity generated");
3598        let recv_identity = identity.clone();
3599        let reserved_socket =
3600            std::net::UdpSocket::bind(SocketAddr::from(([127, 0, 0, 1], 0))).expect("udp bind");
3601        let recv_addr = reserved_socket.local_addr().expect("local addr");
3602        drop(reserved_socket);
3603        let receive_progress = Arc::new(Mutex::new(Vec::new()));
3604        let receive_progress_log = receive_progress.clone();
3605        let dest_path = dest_dir.path().to_path_buf();
3606        let server = tokio::spawn(async move {
3607            receive_direct_file(
3608                &ReceiveRequest::new(recv_addr),
3609                dest_path,
3610                &recv_identity,
3611                |event| {
3612                    receive_progress_log
3613                        .lock()
3614                        .expect("progress lock")
3615                        .push(event);
3616                },
3617            )
3618            .await
3619        });
3620
3621        tokio::time::sleep(Duration::from_millis(100)).await;
3622
3623        let peer = DirectPeer::parse(&recv_addr.to_string()).expect("peer parses");
3624        let send_request = SendRequest::new(peer, vec![file_path]).expect("send request is valid");
3625        let send_progress = Arc::new(Mutex::new(Vec::new()));
3626        let send_progress_log = send_progress.clone();
3627        let send_summary = send_direct_file(&send_request, &identity, |event| {
3628            send_progress_log.lock().expect("progress lock").push(event);
3629        })
3630        .await
3631        .expect("file sent");
3632        let receive_summary = server.await.expect("server joined").expect("file received");
3633
3634        let received = tokio::fs::read(dest_dir.path().join("hello.txt"))
3635            .await
3636            .expect("received file readable");
3637        assert_eq!(received, b"hello from ferry");
3638        assert_eq!(send_summary.bytes, receive_summary.bytes);
3639        assert_eq!(send_summary.blake3, receive_summary.blake3);
3640        let send_progress = send_progress.lock().expect("progress lock");
3641        let receive_progress = receive_progress.lock().expect("progress lock");
3642        assert!(matches!(
3643            send_progress.first(),
3644            Some(TransferEvent::SessionStarted {
3645                direction: TransferDirection::Send,
3646                ..
3647            })
3648        ));
3649        assert!(
3650            send_progress
3651                .iter()
3652                .any(|event| matches!(event, TransferEvent::Progress { .. }))
3653        );
3654        assert!(
3655            send_progress
3656                .iter()
3657                .any(|event| matches!(event, TransferEvent::SessionFinished { .. }))
3658        );
3659        assert!(matches!(
3660            receive_progress.first(),
3661            Some(TransferEvent::SessionStarted {
3662                direction: TransferDirection::Receive,
3663                ..
3664            })
3665        ));
3666        assert!(
3667            receive_progress
3668                .iter()
3669                .any(|event| matches!(event, TransferEvent::Progress { .. }))
3670        );
3671        assert!(
3672            receive_progress
3673                .iter()
3674                .any(|event| matches!(event, TransferEvent::SessionFinished { .. }))
3675        );
3676    }
3677
3678    #[tokio::test]
3679    async fn direct_transfer_accepts_expected_receiver_fingerprint() {
3680        let source_dir = tempfile::tempdir().expect("source tempdir");
3681        let dest_dir = tempfile::tempdir().expect("dest tempdir");
3682        let sender_identity_dir = tempfile::tempdir().expect("sender identity tempdir");
3683        let receiver_identity_dir = tempfile::tempdir().expect("receiver identity tempdir");
3684        let file_path = source_dir.path().join("hello.txt");
3685        tokio::fs::write(&file_path, b"hello from ferry")
3686            .await
3687            .expect("source file written");
3688
3689        let sender_identity = NativeIdentity::load_or_generate_in(sender_identity_dir.path())
3690            .expect("sender identity");
3691        let receiver_identity = NativeIdentity::load_or_generate_in(receiver_identity_dir.path())
3692            .expect("receiver identity");
3693        let expected_fingerprint = receiver_identity.fingerprint().to_string();
3694        let reserved_socket =
3695            std::net::UdpSocket::bind(SocketAddr::from(([127, 0, 0, 1], 0))).expect("udp bind");
3696        let recv_addr = reserved_socket.local_addr().expect("local addr");
3697        drop(reserved_socket);
3698        let dest_path = dest_dir.path().to_path_buf();
3699        let server = tokio::spawn(async move {
3700            receive_direct_file(
3701                &ReceiveRequest::new(recv_addr),
3702                dest_path,
3703                &receiver_identity,
3704                |_| {},
3705            )
3706            .await
3707        });
3708
3709        tokio::time::sleep(Duration::from_millis(100)).await;
3710
3711        let peer = DirectPeer::from_address(recv_addr)
3712            .with_expected_fingerprint(expected_fingerprint)
3713            .expect("expected fingerprint accepted");
3714        let send_request = SendRequest::new(peer, vec![file_path]).expect("send request is valid");
3715        let send_summary = send_direct_file(&send_request, &sender_identity, |_| {})
3716            .await
3717            .expect("file sent");
3718        let receive_summary = server.await.expect("server joined").expect("file received");
3719
3720        let received = tokio::fs::read(dest_dir.path().join("hello.txt"))
3721            .await
3722            .expect("received file readable");
3723        assert_eq!(received, b"hello from ferry");
3724        assert_eq!(send_summary.bytes, receive_summary.bytes);
3725        assert_eq!(send_summary.blake3, receive_summary.blake3);
3726    }
3727
3728    #[tokio::test]
3729    async fn direct_transfer_confirms_unpinned_receiver_before_payload() {
3730        let source_dir = tempfile::tempdir().expect("source tempdir");
3731        let dest_dir = tempfile::tempdir().expect("dest tempdir");
3732        let sender_identity_dir = tempfile::tempdir().expect("sender identity tempdir");
3733        let receiver_identity_dir = tempfile::tempdir().expect("receiver identity tempdir");
3734        let file_path = source_dir.path().join("hello.txt");
3735        tokio::fs::write(&file_path, b"hello from ferry")
3736            .await
3737            .expect("source file written");
3738
3739        let sender_identity = NativeIdentity::load_or_generate_in(sender_identity_dir.path())
3740            .expect("sender identity");
3741        let receiver_identity = NativeIdentity::load_or_generate_in(receiver_identity_dir.path())
3742            .expect("receiver identity");
3743        let expected_fingerprint = receiver_identity.fingerprint().to_string();
3744        let reserved_socket =
3745            std::net::UdpSocket::bind(SocketAddr::from(([127, 0, 0, 1], 0))).expect("udp bind");
3746        let recv_addr = reserved_socket.local_addr().expect("local addr");
3747        drop(reserved_socket);
3748        let dest_path = dest_dir.path().to_path_buf();
3749        let server = tokio::spawn(async move {
3750            receive_direct_file(
3751                &ReceiveRequest::new(recv_addr),
3752                dest_path,
3753                &receiver_identity,
3754                |_| {},
3755            )
3756            .await
3757        });
3758
3759        tokio::time::sleep(Duration::from_millis(100)).await;
3760
3761        let peer = DirectPeer::from_address(recv_addr);
3762        let send_request = SendRequest::new(peer, vec![file_path]).expect("send request is valid");
3763        let callback_seen = Arc::new(AtomicBool::new(false));
3764        let callback_seen_send = callback_seen.clone();
3765        let send_summary = send_direct_file_with_control_and_trust(
3766            &send_request,
3767            &sender_identity,
3768            TransferControl::new(),
3769            |_| {},
3770            |fingerprint| {
3771                callback_seen_send.store(true, Ordering::SeqCst);
3772                assert_eq!(fingerprint, expected_fingerprint);
3773                Ok(())
3774            },
3775        )
3776        .await
3777        .expect("file sent after trust callback");
3778        let receive_summary = server.await.expect("server joined").expect("file received");
3779
3780        assert!(callback_seen.load(Ordering::SeqCst));
3781        assert_eq!(send_summary.bytes, receive_summary.bytes);
3782        assert_eq!(send_summary.blake3, receive_summary.blake3);
3783    }
3784
3785    #[tokio::test]
3786    async fn direct_transfer_rejects_unexpected_receiver_fingerprint_before_payload() {
3787        let source_dir = tempfile::tempdir().expect("source tempdir");
3788        let dest_dir = tempfile::tempdir().expect("dest tempdir");
3789        let sender_identity_dir = tempfile::tempdir().expect("sender identity tempdir");
3790        let receiver_identity_dir = tempfile::tempdir().expect("receiver identity tempdir");
3791        let other_identity_dir = tempfile::tempdir().expect("other identity tempdir");
3792        let file_path = source_dir.path().join("hello.txt");
3793        tokio::fs::write(&file_path, b"hello from ferry")
3794            .await
3795            .expect("source file written");
3796
3797        let sender_identity = NativeIdentity::load_or_generate_in(sender_identity_dir.path())
3798            .expect("sender identity");
3799        let receiver_identity = NativeIdentity::load_or_generate_in(receiver_identity_dir.path())
3800            .expect("receiver identity");
3801        let other_identity =
3802            NativeIdentity::load_or_generate_in(other_identity_dir.path()).expect("other identity");
3803        let reserved_socket =
3804            std::net::UdpSocket::bind(SocketAddr::from(([127, 0, 0, 1], 0))).expect("udp bind");
3805        let recv_addr = reserved_socket.local_addr().expect("local addr");
3806        drop(reserved_socket);
3807        let dest_path = dest_dir.path().to_path_buf();
3808        let server = tokio::spawn(async move {
3809            receive_direct_file(
3810                &ReceiveRequest::new(recv_addr),
3811                dest_path,
3812                &receiver_identity,
3813                |_| {},
3814            )
3815            .await
3816        });
3817
3818        tokio::time::sleep(Duration::from_millis(100)).await;
3819
3820        let peer = DirectPeer::from_address(recv_addr)
3821            .with_expected_fingerprint(other_identity.fingerprint().to_string())
3822            .expect("expected fingerprint accepted");
3823        let send_request = SendRequest::new(peer, vec![file_path]).expect("send request is valid");
3824        let error = send_direct_file(&send_request, &sender_identity, |_| {})
3825            .await
3826            .expect_err("fingerprint mismatch should reject send");
3827
3828        assert!(matches!(error, Error::PeerFingerprintMismatch { .. }));
3829        let server_error = server
3830            .await
3831            .expect("server joined")
3832            .expect_err("server closed");
3833        assert!(matches!(
3834            server_error,
3835            Error::Connection(_) | Error::NoIncomingConnection
3836        ));
3837        assert!(
3838            !tokio::fs::try_exists(dest_dir.path().join("hello.txt"))
3839                .await
3840                .expect("destination checked")
3841        );
3842    }
3843
3844    #[tokio::test]
3845    async fn direct_transfer_accepts_matching_psk_before_payload() {
3846        let source_dir = tempfile::tempdir().expect("source tempdir");
3847        let dest_dir = tempfile::tempdir().expect("dest tempdir");
3848        let identity_dir = tempfile::tempdir().expect("identity tempdir");
3849        let file_path = source_dir.path().join("hello.txt");
3850        tokio::fs::write(&file_path, b"hello from ferry")
3851            .await
3852            .expect("source file written");
3853
3854        let identity =
3855            NativeIdentity::load_or_generate_in(identity_dir.path()).expect("identity generated");
3856        let recv_identity = identity.clone();
3857        let reserved_socket =
3858            std::net::UdpSocket::bind(SocketAddr::from(([127, 0, 0, 1], 0))).expect("udp bind");
3859        let recv_addr = reserved_socket.local_addr().expect("local addr");
3860        drop(reserved_socket);
3861        let dest_path = dest_dir.path().to_path_buf();
3862        let receive_request = ReceiveRequest::new(recv_addr)
3863            .with_psk("correct horse battery staple")
3864            .expect("receiver psk accepted");
3865        let server = tokio::spawn(async move {
3866            receive_direct_file(&receive_request, dest_path, &recv_identity, |_| {}).await
3867        });
3868
3869        tokio::time::sleep(Duration::from_millis(100)).await;
3870
3871        let peer = DirectPeer::parse(&recv_addr.to_string()).expect("peer parses");
3872        let send_request = SendRequest::new(peer, vec![file_path])
3873            .expect("send request is valid")
3874            .with_psk("correct horse battery staple")
3875            .expect("sender psk accepted");
3876        let send_summary = send_direct_file(&send_request, &identity, |_| {})
3877            .await
3878            .expect("file sent");
3879        let receive_summary = server.await.expect("server joined").expect("file received");
3880
3881        let received = tokio::fs::read(dest_dir.path().join("hello.txt"))
3882            .await
3883            .expect("received file readable");
3884        assert_eq!(received, b"hello from ferry");
3885        assert_eq!(send_summary.bytes, receive_summary.bytes);
3886    }
3887
3888    #[tokio::test]
3889    async fn direct_transfer_rejects_missing_psk_before_payload() {
3890        let source_dir = tempfile::tempdir().expect("source tempdir");
3891        let dest_dir = tempfile::tempdir().expect("dest tempdir");
3892        let identity_dir = tempfile::tempdir().expect("identity tempdir");
3893        let file_path = source_dir.path().join("hello.txt");
3894        tokio::fs::write(&file_path, b"hello from ferry")
3895            .await
3896            .expect("source file written");
3897
3898        let identity =
3899            NativeIdentity::load_or_generate_in(identity_dir.path()).expect("identity generated");
3900        let recv_identity = identity.clone();
3901        let reserved_socket =
3902            std::net::UdpSocket::bind(SocketAddr::from(([127, 0, 0, 1], 0))).expect("udp bind");
3903        let recv_addr = reserved_socket.local_addr().expect("local addr");
3904        drop(reserved_socket);
3905        let dest_path = dest_dir.path().to_path_buf();
3906        let receive_request = ReceiveRequest::new(recv_addr)
3907            .with_psk("receiver secret")
3908            .expect("receiver psk accepted");
3909        let server = tokio::spawn(async move {
3910            receive_direct_file(&receive_request, dest_path, &recv_identity, |_| {}).await
3911        });
3912
3913        tokio::time::sleep(Duration::from_millis(100)).await;
3914
3915        let peer = DirectPeer::parse(&recv_addr.to_string()).expect("peer parses");
3916        let send_request = SendRequest::new(peer, vec![file_path]).expect("send request is valid");
3917        let error = send_direct_file(&send_request, &identity, |_| {})
3918            .await
3919            .expect_err("missing psk should reject send");
3920
3921        assert!(matches!(error, Error::Protocol(_)));
3922        let _ = server.await.expect("server joined");
3923        assert!(
3924            !tokio::fs::try_exists(dest_dir.path().join("hello.txt"))
3925                .await
3926                .expect("destination checked")
3927        );
3928    }
3929
3930    #[tokio::test]
3931    async fn direct_transfer_rejects_wrong_psk_before_payload() {
3932        let source_dir = tempfile::tempdir().expect("source tempdir");
3933        let dest_dir = tempfile::tempdir().expect("dest tempdir");
3934        let identity_dir = tempfile::tempdir().expect("identity tempdir");
3935        let file_path = source_dir.path().join("hello.txt");
3936        tokio::fs::write(&file_path, b"hello from ferry")
3937            .await
3938            .expect("source file written");
3939
3940        let identity =
3941            NativeIdentity::load_or_generate_in(identity_dir.path()).expect("identity generated");
3942        let recv_identity = identity.clone();
3943        let reserved_socket =
3944            std::net::UdpSocket::bind(SocketAddr::from(([127, 0, 0, 1], 0))).expect("udp bind");
3945        let recv_addr = reserved_socket.local_addr().expect("local addr");
3946        drop(reserved_socket);
3947        let dest_path = dest_dir.path().to_path_buf();
3948        let receive_request = ReceiveRequest::new(recv_addr)
3949            .with_psk("receiver secret")
3950            .expect("receiver psk accepted");
3951        let server = tokio::spawn(async move {
3952            receive_direct_file(&receive_request, dest_path, &recv_identity, |_| {}).await
3953        });
3954
3955        tokio::time::sleep(Duration::from_millis(100)).await;
3956
3957        let peer = DirectPeer::parse(&recv_addr.to_string()).expect("peer parses");
3958        let send_request = SendRequest::new(peer, vec![file_path])
3959            .expect("send request is valid")
3960            .with_psk("wrong secret")
3961            .expect("sender psk accepted");
3962        let error = send_direct_file(&send_request, &identity, |_| {})
3963            .await
3964            .expect_err("wrong psk should reject send");
3965
3966        assert!(matches!(error, Error::Authentication(_)));
3967        assert!(!error.to_string().contains("wrong secret"));
3968        assert!(!error.to_string().contains("receiver secret"));
3969        let server_error = server
3970            .await
3971            .expect("server joined")
3972            .expect_err("server should reject wrong psk");
3973        assert!(matches!(
3974            server_error,
3975            Error::Authentication(_) | Error::Write(_)
3976        ));
3977        assert!(
3978            !tokio::fs::try_exists(dest_dir.path().join("hello.txt"))
3979                .await
3980                .expect("destination checked")
3981        );
3982    }
3983
3984    #[tokio::test]
3985    async fn direct_quic_loopback_sends_directory_and_resumes_partial_file() {
3986        let source_dir = tempfile::tempdir().expect("source tempdir");
3987        let dest_dir = tempfile::tempdir().expect("dest tempdir");
3988        let identity_dir = tempfile::tempdir().expect("identity tempdir");
3989        let bundle = source_dir.path().join("bundle");
3990        let nested = bundle.join("nested");
3991        tokio::fs::create_dir_all(&nested)
3992            .await
3993            .expect("source dirs");
3994        tokio::fs::write(nested.join("small.txt"), b"small")
3995            .await
3996            .expect("small file");
3997        let large_bytes = vec![7; (DEFAULT_CHUNK_SIZE as usize * 2) + 17];
3998        tokio::fs::write(bundle.join("large.bin"), &large_bytes)
3999            .await
4000            .expect("large file");
4001
4002        let dest_bundle = dest_dir.path().join("bundle");
4003        tokio::fs::create_dir_all(&dest_bundle)
4004            .await
4005            .expect("dest bundle");
4006        tokio::fs::write(
4007            dest_bundle.join("large.bin"),
4008            &large_bytes[..DEFAULT_CHUNK_SIZE as usize + 5],
4009        )
4010        .await
4011        .expect("partial large");
4012
4013        let identity =
4014            NativeIdentity::load_or_generate_in(identity_dir.path()).expect("identity generated");
4015        let recv_identity = identity.clone();
4016        let reserved_socket =
4017            std::net::UdpSocket::bind(SocketAddr::from(([127, 0, 0, 1], 0))).expect("udp bind");
4018        let recv_addr = reserved_socket.local_addr().expect("local addr");
4019        drop(reserved_socket);
4020        let dest_path = dest_dir.path().to_path_buf();
4021        let server = tokio::spawn(async move {
4022            receive_direct_file(
4023                &ReceiveRequest::new(recv_addr),
4024                dest_path,
4025                &recv_identity,
4026                |_| {},
4027            )
4028            .await
4029        });
4030
4031        tokio::time::sleep(Duration::from_millis(100)).await;
4032
4033        let peer = DirectPeer::parse(&recv_addr.to_string()).expect("peer parses");
4034        let send_request = SendRequest::new(peer, vec![bundle]).expect("send request is valid");
4035        let send_summary = send_direct_file(&send_request, &identity, |_| {})
4036            .await
4037            .expect("directory sent");
4038        let receive_summary = server
4039            .await
4040            .expect("server joined")
4041            .expect("directory received");
4042
4043        let received_large = tokio::fs::read(dest_bundle.join("large.bin"))
4044            .await
4045            .expect("large file readable");
4046        let received_small = tokio::fs::read(dest_bundle.join("nested/small.txt"))
4047            .await
4048            .expect("small file readable");
4049        assert_eq!(received_large, large_bytes);
4050        assert_eq!(received_small, b"small");
4051        assert_eq!(send_summary.bytes, receive_summary.bytes);
4052        assert!(
4053            receive_summary
4054                .files
4055                .iter()
4056                .any(|file| file.status == TransferFileStatus::Resumed)
4057        );
4058    }
4059
4060    #[tokio::test]
4061    async fn direct_quic_loopback_rejects_file_blocking_incoming_directory() {
4062        let source_dir = tempfile::tempdir().expect("source tempdir");
4063        let dest_dir = tempfile::tempdir().expect("dest tempdir");
4064        let identity_dir = tempfile::tempdir().expect("identity tempdir");
4065        let bundle = source_dir.path().join("bundle");
4066        let nested = bundle.join("nested");
4067        tokio::fs::create_dir_all(&nested)
4068            .await
4069            .expect("source dirs");
4070        tokio::fs::write(nested.join("payload.txt"), b"payload")
4071            .await
4072            .expect("source file");
4073
4074        tokio::fs::create_dir_all(dest_dir.path().join("bundle"))
4075            .await
4076            .expect("dest bundle dir");
4077        let conflicting_path = dest_dir.path().join("bundle/nested");
4078        tokio::fs::write(&conflicting_path, b"do not overwrite")
4079            .await
4080            .expect("conflicting dest file");
4081
4082        let identity =
4083            NativeIdentity::load_or_generate_in(identity_dir.path()).expect("identity generated");
4084        let recv_identity = identity.clone();
4085        let reserved_socket =
4086            std::net::UdpSocket::bind(SocketAddr::from(([127, 0, 0, 1], 0))).expect("udp bind");
4087        let recv_addr = reserved_socket.local_addr().expect("local addr");
4088        drop(reserved_socket);
4089        let dest_path = dest_dir.path().to_path_buf();
4090        let server = tokio::spawn(async move {
4091            receive_direct_file(
4092                &ReceiveRequest::new(recv_addr),
4093                dest_path,
4094                &recv_identity,
4095                |_| {},
4096            )
4097            .await
4098        });
4099
4100        tokio::time::sleep(Duration::from_millis(100)).await;
4101
4102        let peer = DirectPeer::parse(&recv_addr.to_string()).expect("peer parses");
4103        let send_request = SendRequest::new(peer, vec![bundle]).expect("send request is valid");
4104        let send_result = send_direct_file(&send_request, &identity, |_| {}).await;
4105        let receive_result = server.await.expect("server joined");
4106
4107        assert!(send_result.is_err());
4108        assert!(receive_result.is_err());
4109        let conflict = tokio::fs::read(&conflicting_path)
4110            .await
4111            .expect("conflicting dest file remains readable");
4112        assert_eq!(conflict, b"do not overwrite");
4113        assert!(!conflicting_path.is_dir());
4114    }
4115
4116    #[tokio::test]
4117    async fn direct_quic_loopback_cancels_send_and_does_not_finalize_partial_file() {
4118        let source_dir = tempfile::tempdir().expect("source tempdir");
4119        let dest_dir = tempfile::tempdir().expect("dest tempdir");
4120        let identity_dir = tempfile::tempdir().expect("identity tempdir");
4121        let file_path = source_dir.path().join("payload.bin");
4122        tokio::fs::write(&file_path, vec![3; IO_BUFFER_SIZE * 64])
4123            .await
4124            .expect("source file written");
4125
4126        let identity =
4127            NativeIdentity::load_or_generate_in(identity_dir.path()).expect("identity generated");
4128        let recv_identity = identity.clone();
4129        let reserved_socket =
4130            std::net::UdpSocket::bind(SocketAddr::from(([127, 0, 0, 1], 0))).expect("udp bind");
4131        let recv_addr = reserved_socket.local_addr().expect("local addr");
4132        drop(reserved_socket);
4133
4134        let dest_path = dest_dir.path().to_path_buf();
4135        let server = tokio::spawn(async move {
4136            receive_direct_file(
4137                &ReceiveRequest::new(recv_addr),
4138                dest_path,
4139                &recv_identity,
4140                |_| {},
4141            )
4142            .await
4143        });
4144
4145        tokio::time::sleep(Duration::from_millis(100)).await;
4146
4147        let peer = DirectPeer::parse(&recv_addr.to_string()).expect("peer parses");
4148        let send_request =
4149            SendRequest::new(peer, vec![file_path.clone()]).expect("send request is valid");
4150        let send_control = TransferControl::new();
4151        let cancel_on_progress = send_control.clone();
4152        let send_events = Arc::new(Mutex::new(Vec::new()));
4153        let send_events_log = send_events.clone();
4154        let send_result =
4155            send_direct_file_with_control(&send_request, &identity, send_control, |event| {
4156                if matches!(event, TransferEvent::Progress { .. }) {
4157                    cancel_on_progress.cancel();
4158                }
4159                send_events_log.lock().expect("events lock").push(event);
4160            })
4161            .await;
4162
4163        assert!(matches!(send_result, Err(Error::TransferCancelled)));
4164
4165        let receive_result = tokio::time::timeout(Duration::from_secs(5), server)
4166            .await
4167            .expect("receiver should finish after sender cancellation")
4168            .expect("server joined");
4169        assert!(receive_result.is_err());
4170
4171        assert!(!dest_dir.path().join("payload.bin").exists());
4172        assert!(
4173            send_events
4174                .lock()
4175                .expect("events lock")
4176                .iter()
4177                .any(|event| matches!(event, TransferEvent::SessionCancelled { .. }))
4178        );
4179    }
4180
4181    #[tokio::test]
4182    async fn direct_quic_loopback_cleans_partial_file_when_directory_receive_is_cancelled() {
4183        let source_dir = tempfile::tempdir().expect("source tempdir");
4184        let dest_dir = tempfile::tempdir().expect("dest tempdir");
4185        let identity_dir = tempfile::tempdir().expect("identity tempdir");
4186        let bundle = source_dir.path().join("bundle");
4187        let nested = bundle.join("nested");
4188        tokio::fs::create_dir_all(&nested)
4189            .await
4190            .expect("source dirs");
4191        tokio::fs::write(bundle.join("small.txt"), b"small")
4192            .await
4193            .expect("small file");
4194        tokio::fs::write(nested.join("payload.bin"), vec![5; IO_BUFFER_SIZE * 128])
4195            .await
4196            .expect("large source file");
4197
4198        let identity =
4199            NativeIdentity::load_or_generate_in(identity_dir.path()).expect("identity generated");
4200        let recv_identity = identity.clone();
4201        let reserved_socket =
4202            std::net::UdpSocket::bind(SocketAddr::from(([127, 0, 0, 1], 0))).expect("udp bind");
4203        let recv_addr = reserved_socket.local_addr().expect("local addr");
4204        drop(reserved_socket);
4205
4206        let receive_control = TransferControl::new();
4207        let cancel_receive = receive_control.clone();
4208        let receive_events = Arc::new(Mutex::new(Vec::new()));
4209        let receive_events_log = receive_events.clone();
4210        let dest_path = dest_dir.path().to_path_buf();
4211        let server = tokio::spawn(async move {
4212            receive_direct_file_with_control(
4213                &ReceiveRequest::new(recv_addr),
4214                dest_path,
4215                &recv_identity,
4216                receive_control,
4217                |event| {
4218                    if let TransferEvent::Progress { file_name, .. } = &event
4219                        && file_name == "bundle/nested/payload.bin"
4220                    {
4221                        cancel_receive.cancel();
4222                    }
4223                    receive_events_log.lock().expect("events lock").push(event);
4224                },
4225            )
4226            .await
4227        });
4228
4229        tokio::time::sleep(Duration::from_millis(100)).await;
4230
4231        let peer = DirectPeer::parse(&recv_addr.to_string()).expect("peer parses");
4232        let send_request = SendRequest::new(peer, vec![bundle]).expect("send request is valid");
4233        let send_result = send_direct_file(&send_request, &identity, |_| {}).await;
4234        let receive_result = tokio::time::timeout(Duration::from_secs(5), server)
4235            .await
4236            .expect("receiver should finish after cancellation")
4237            .expect("server joined");
4238
4239        assert!(send_result.is_err());
4240        assert!(matches!(receive_result, Err(Error::TransferCancelled)));
4241        assert!(
4242            receive_events
4243                .lock()
4244                .expect("events lock")
4245                .iter()
4246                .any(|event| matches!(event, TransferEvent::SessionCancelled { .. }))
4247        );
4248        assert!(
4249            !tokio::fs::try_exists(dest_dir.path().join("bundle/nested/payload.bin"))
4250                .await
4251                .expect("payload path checked")
4252        );
4253        let part_files = std::fs::read_dir(dest_dir.path().join("bundle/nested"))
4254            .expect("nested dest dir exists")
4255            .filter_map(|entry| entry.ok())
4256            .filter(|entry| entry.file_name().to_string_lossy().contains(".ferry-part"))
4257            .count();
4258        assert_eq!(part_files, 0);
4259    }
4260}