// ferry_core/lib.rs

1pub mod error;
2
3use std::collections::{BTreeMap, BTreeSet};
4use std::fs::Metadata;
5use std::io::SeekFrom;
6use std::net::SocketAddr;
7use std::path::{Component, Path, PathBuf};
8use std::sync::atomic::{AtomicBool, Ordering};
9use std::sync::{Arc, Mutex};
10use std::time::{Duration, SystemTime, UNIX_EPOCH};
11
12use directories::BaseDirs;
13use mdns_sd::{ResolvedService, ServiceDaemon, ServiceEvent, ServiceInfo};
14use quinn::crypto::rustls::QuicClientConfig;
15use rcgen::{CertifiedKey, generate_simple_self_signed};
16use rustls::client::danger::{HandshakeSignatureValid, ServerCertVerified, ServerCertVerifier};
17use rustls::crypto::{CryptoProvider, verify_tls12_signature, verify_tls13_signature};
18use rustls::pki_types::{CertificateDer, PrivateKeyDer, PrivatePkcs8KeyDer, ServerName, UnixTime};
19use rustls::{DigitallySignedStruct, SignatureScheme};
20use serde::{Deserialize, Serialize};
21use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt};
22use uuid::Uuid;
23
24pub use error::{Error, Result};
25
/// Native protocol version advertised in the `pv` TXT property of the mDNS announcement.
pub const NATIVE_PROTOCOL_VERSION: u16 = 1;
/// Oldest native protocol version advertised in the `minpv` TXT property.
pub const MIN_NATIVE_PROTOCOL_VERSION: u16 = 1;
/// mDNS service type used both to announce this device and to browse for peers.
pub const NATIVE_SERVICE_TYPE: &str = "_ferry._udp.local.";

// NOTE(review): the constants below are not referenced in the visible part of this file;
// the descriptions are inferred from their names and values — confirm against their users.
/// Eight-byte magic marker (presumably the preamble of the direct protocol).
const DIRECT_MAGIC: &[u8; 8] = b"FERRY01\n";
/// 16 MiB; presumably the upper bound for a single protocol frame.
const MAX_FRAME_LEN: usize = 16 * 1024 * 1024;
/// 64 KiB; presumably the buffer size for file/stream I/O.
const IO_BUFFER_SIZE: usize = 64 * 1024;
/// Presumably the value written to `TransferManifest::version`.
const MANIFEST_VERSION: u16 = 1;
/// 1 MiB; presumably the default for `TransferManifest::chunk_size`.
const DEFAULT_CHUNK_SIZE: u64 = 1024 * 1024;
/// Presumably the directory name used under the platform config root.
const CONFIG_DIR_NAME: &str = "ferry";
36
37#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
38pub struct DirectPeer {
39    address: SocketAddr,
40    expected_fingerprint: Option<String>,
41}
42
43impl DirectPeer {
44    pub fn parse(value: &str) -> Result<Self> {
45        Ok(Self {
46            address: value.parse()?,
47            expected_fingerprint: None,
48        })
49    }
50
51    pub const fn address(&self) -> SocketAddr {
52        self.address
53    }
54
55    pub const fn from_address(address: SocketAddr) -> Self {
56        Self {
57            address,
58            expected_fingerprint: None,
59        }
60    }
61
62    pub fn with_expected_fingerprint(mut self, fingerprint: impl Into<String>) -> Result<Self> {
63        self.expected_fingerprint = Some(normalized_fingerprint(fingerprint.into())?);
64        Ok(self)
65    }
66
67    pub fn expected_fingerprint(&self) -> Option<&str> {
68        self.expected_fingerprint.as_deref()
69    }
70}
71
72#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
73pub struct SendRequest {
74    peer: DirectPeer,
75    paths: Vec<PathBuf>,
76}
77
78impl SendRequest {
79    pub fn new(peer: DirectPeer, paths: Vec<PathBuf>) -> Result<Self> {
80        if paths.is_empty() {
81            return Err(Error::InvalidInput(
82                "at least one file path is required".to_string(),
83            ));
84        }
85
86        Ok(Self { peer, paths })
87    }
88
89    pub const fn peer(&self) -> &DirectPeer {
90        &self.peer
91    }
92
93    pub fn paths(&self) -> &[PathBuf] {
94        &self.paths
95    }
96}
97
98#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
99pub struct ReceiveRequest {
100    listen: SocketAddr,
101}
102
103impl ReceiveRequest {
104    pub fn new(listen: SocketAddr) -> Self {
105        Self { listen }
106    }
107
108    pub const fn listen(&self) -> SocketAddr {
109        self.listen
110    }
111}
112
113#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
114pub struct AppConfig {
115    pub alias: String,
116    pub listen_port: u16,
117    pub quic_port: u16,
118    pub download_dir: String,
119    pub auto_accept_known: bool,
120    pub auto_accept_unknown: bool,
121    pub discovery: Vec<String>,
122    pub max_concurrent_files: usize,
123    pub trust: TrustConfig,
124}
125
126impl Default for AppConfig {
127    fn default() -> Self {
128        Self {
129            alias: default_alias(),
130            listen_port: 53317,
131            quic_port: 53318,
132            download_dir: "~/Downloads/ferry".to_string(),
133            auto_accept_known: true,
134            auto_accept_unknown: false,
135            discovery: vec!["native".to_string()],
136            max_concurrent_files: 8,
137            trust: TrustConfig::default(),
138        }
139    }
140}
141
142impl AppConfig {
143    pub fn load_or_default() -> Result<Self> {
144        Self::load_or_default_from(config_dir()?.join("config.toml"))
145    }
146
147    pub fn load_or_default_from(path: impl AsRef<Path>) -> Result<Self> {
148        let path = path.as_ref();
149        match std::fs::read_to_string(path) {
150            Ok(contents) => {
151                toml::from_str(&contents).map_err(|error| Error::ConfigParse(error.to_string()))
152            }
153            Err(error) if error.kind() == std::io::ErrorKind::NotFound => Ok(Self::default()),
154            Err(source) => Err(Error::IoPath {
155                path: path.to_path_buf(),
156                source,
157            }),
158        }
159    }
160
161    pub fn save_default_path(&self) -> Result<()> {
162        self.save_to_path(config_dir()?.join("config.toml"))
163    }
164
165    pub fn save_to_path(&self, path: impl AsRef<Path>) -> Result<()> {
166        write_toml_file(path.as_ref(), self)
167    }
168
169    pub fn to_toml_string(&self) -> Result<String> {
170        toml::to_string_pretty(self).map_err(|error| Error::ConfigSerialize(error.to_string()))
171    }
172
173    pub fn redacted(&self) -> Self {
174        let mut config = self.clone();
175        config.trust = config.trust.redacted();
176        config
177    }
178
179    pub fn to_redacted_toml_string(&self) -> Result<String> {
180        self.redacted().to_toml_string()
181    }
182}
183
184#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
185pub struct DaemonConfig {
186    pub listen: SocketAddr,
187    pub destination: PathBuf,
188}
189
190impl DaemonConfig {
191    pub fn from_app_config(config: &AppConfig) -> Result<Self> {
192        Ok(Self {
193            listen: SocketAddr::from(([0, 0, 0, 0], config.quic_port)),
194            destination: expand_home_path(&config.download_dir)?,
195        })
196    }
197
198    pub fn load_or_default() -> Result<Self> {
199        let app_config = AppConfig::load_or_default()?;
200        Self::load_or_default_from(config_dir()?.join("daemon.toml"), &app_config)
201    }
202
203    pub fn load_or_default_from(path: impl AsRef<Path>, app_config: &AppConfig) -> Result<Self> {
204        let path = path.as_ref();
205        match std::fs::read_to_string(path) {
206            Ok(contents) => {
207                toml::from_str(&contents).map_err(|error| Error::ConfigParse(error.to_string()))
208            }
209            Err(error) if error.kind() == std::io::ErrorKind::NotFound => {
210                Self::from_app_config(app_config)
211            }
212            Err(source) => Err(Error::IoPath {
213                path: path.to_path_buf(),
214                source,
215            }),
216        }
217    }
218
219    pub fn save_default_path(&self) -> Result<()> {
220        self.save_to_path(config_dir()?.join("daemon.toml"))
221    }
222
223    pub fn save_to_path(&self, path: impl AsRef<Path>) -> Result<()> {
224        write_toml_file(path.as_ref(), self)
225    }
226
227    pub fn to_toml_string(&self) -> Result<String> {
228        toml::to_string_pretty(self).map_err(|error| Error::ConfigSerialize(error.to_string()))
229    }
230}
231
232#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
233pub struct TrustConfig {
234    pub require_fingerprint: bool,
235    pub psk: String,
236}
237
238impl Default for TrustConfig {
239    fn default() -> Self {
240        Self {
241            require_fingerprint: true,
242            psk: String::new(),
243        }
244    }
245}
246
247impl TrustConfig {
248    pub fn redacted(&self) -> Self {
249        let psk = if self.psk.is_empty() {
250            String::new()
251        } else {
252            "<redacted>".to_string()
253        };
254        Self {
255            require_fingerprint: self.require_fingerprint,
256            psk,
257        }
258    }
259}
260
261#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
262pub struct NativeIdentity {
263    cert_der: Vec<u8>,
264    key_der: Vec<u8>,
265    fingerprint: String,
266}
267
268impl NativeIdentity {
269    pub fn load_or_generate() -> Result<Self> {
270        Self::load_or_generate_in(config_dir()?)
271    }
272
273    pub fn load_or_generate_in(config_dir: impl AsRef<Path>) -> Result<Self> {
274        let config_dir = config_dir.as_ref();
275        let cert_path = config_dir.join("identity.cert.der");
276        let key_path = config_dir.join("identity.key.der");
277
278        if cert_path.exists() && key_path.exists() {
279            let cert_der = std::fs::read(&cert_path).map_err(|source| Error::IoPath {
280                path: cert_path,
281                source,
282            })?;
283            let key_der = std::fs::read(&key_path).map_err(|source| Error::IoPath {
284                path: key_path,
285                source,
286            })?;
287
288            return Ok(Self::from_parts(cert_der, key_der));
289        }
290
291        std::fs::create_dir_all(config_dir).map_err(|source| Error::IoPath {
292            path: config_dir.to_path_buf(),
293            source,
294        })?;
295
296        let identity = Self::generate()?;
297        write_private_file(&cert_path, &identity.cert_der)?;
298        write_private_file(&key_path, &identity.key_der)?;
299        Ok(identity)
300    }
301
302    pub fn generate() -> Result<Self> {
303        let CertifiedKey { cert, signing_key } =
304            generate_simple_self_signed(vec!["localhost".to_string()])?;
305        Ok(Self::from_parts(
306            cert.der().to_vec(),
307            signing_key.serialize_der(),
308        ))
309    }
310
311    pub fn fingerprint(&self) -> &str {
312        &self.fingerprint
313    }
314
315    fn cert_chain(&self) -> Vec<CertificateDer<'static>> {
316        vec![CertificateDer::from(self.cert_der.clone())]
317    }
318
319    fn private_key(&self) -> PrivateKeyDer<'static> {
320        PrivateKeyDer::from(PrivatePkcs8KeyDer::from(self.key_der.clone()))
321    }
322
323    fn from_parts(cert_der: Vec<u8>, key_der: Vec<u8>) -> Self {
324        let fingerprint = blake3::hash(&cert_der).to_hex().to_string();
325        Self {
326            cert_der,
327            key_der,
328            fingerprint,
329        }
330    }
331}
332
333#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
334#[serde(rename_all = "snake_case")]
335pub enum TransferDirection {
336    Send,
337    Receive,
338}
339
340#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
341pub struct TransferProgress {
342    pub direction: TransferDirection,
343    pub file_name: String,
344    pub bytes_done: u64,
345    pub bytes_total: u64,
346}
347
348#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
349#[serde(tag = "event", rename_all = "snake_case")]
350pub enum TransferEvent {
351    SessionStarted {
352        direction: TransferDirection,
353        session_id: Uuid,
354        files_total: usize,
355        bytes_total: u64,
356    },
357    FileStarted {
358        direction: TransferDirection,
359        file_name: String,
360        bytes_total: u64,
361        resume_offset: u64,
362    },
363    Progress {
364        direction: TransferDirection,
365        file_name: String,
366        bytes_done: u64,
367        bytes_total: u64,
368    },
369    FileFinished {
370        direction: TransferDirection,
371        file_name: String,
372        bytes: u64,
373        blake3: String,
374        status: TransferFileStatus,
375    },
376    SessionFinished {
377        direction: TransferDirection,
378        files_total: usize,
379        bytes_total: u64,
380        blake3: String,
381    },
382    SessionCancelled {
383        direction: TransferDirection,
384        session_id: Uuid,
385    },
386}
387
388impl TransferEvent {
389    pub fn progress(&self) -> Option<TransferProgress> {
390        match self {
391            Self::Progress {
392                direction,
393                file_name,
394                bytes_done,
395                bytes_total,
396            } => Some(TransferProgress {
397                direction: *direction,
398                file_name: file_name.clone(),
399                bytes_done: *bytes_done,
400                bytes_total: *bytes_total,
401            }),
402            _ => None,
403        }
404    }
405}
406
407#[derive(Debug, Clone, Default)]
408pub struct TransferControl {
409    cancelled: Arc<AtomicBool>,
410}
411
412impl TransferControl {
413    pub fn new() -> Self {
414        Self::default()
415    }
416
417    pub fn cancel(&self) {
418        self.cancelled.store(true, Ordering::SeqCst);
419    }
420
421    pub fn is_cancelled(&self) -> bool {
422        self.cancelled.load(Ordering::SeqCst)
423    }
424
425    fn check_cancelled(&self) -> Result<()> {
426        if self.is_cancelled() {
427            Err(Error::TransferCancelled)
428        } else {
429            Ok(())
430        }
431    }
432}
433
434#[derive(Debug, Clone, PartialEq, Eq)]
435pub struct TransferSummary {
436    pub path: PathBuf,
437    pub file_name: String,
438    pub bytes: u64,
439    pub blake3: String,
440    pub files: Vec<TransferFileSummary>,
441}
442
443#[derive(Debug, Clone, PartialEq, Eq)]
444pub struct TransferFileSummary {
445    pub path: PathBuf,
446    pub relative_path: PathBuf,
447    pub bytes: u64,
448    pub blake3: String,
449    pub status: TransferFileStatus,
450}
451
452#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
453#[serde(rename_all = "snake_case")]
454pub enum TransferFileStatus {
455    Sent,
456    Received,
457    Skipped,
458    Resumed,
459}
460
461#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
462pub struct TransferManifest {
463    pub version: u16,
464    pub session_id: Uuid,
465    pub chunk_size: u64,
466    pub entries: Vec<ManifestEntry>,
467}
468
469impl TransferManifest {
470    pub fn file_entries(&self) -> impl Iterator<Item = &ManifestEntry> {
471        self.entries
472            .iter()
473            .filter(|entry| entry.kind == ManifestEntryKind::File)
474    }
475
476    pub fn total_file_bytes(&self) -> u64 {
477        self.file_entries().map(|entry| entry.size).sum()
478    }
479
480    pub fn digest(&self) -> Result<String> {
481        let bytes = serde_json::to_vec(self).map_err(|error| Error::Protocol(error.to_string()))?;
482        Ok(blake3::hash(&bytes).to_hex().to_string())
483    }
484}
485
486#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
487pub struct ManifestEntry {
488    pub relative_path: PathBuf,
489    pub kind: ManifestEntryKind,
490    pub size: u64,
491    pub blake3: Option<String>,
492    pub chunks: Vec<String>,
493    pub unix_mode: Option<u32>,
494    pub modified_unix_ms: Option<i128>,
495}
496
497#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
498#[serde(rename_all = "snake_case")]
499pub enum ManifestEntryKind {
500    File,
501    Directory,
502}
503
/// Decision about how to treat a destination path when receiving a file.
// NOTE(review): the producers of this enum are outside the visible part of the file;
// the payload meanings (byte offset, conflict description) are inferred — confirm.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ResumeDecision {
    Create,
    Skip,
    ResumeFrom(u64),
    Conflict(String),
}
511
/// A single sighting of a peer at one address, fed into `PeerRegistry::observe`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PeerObservation {
    /// Fingerprint identifying the peer; used as the registry key.
    pub fingerprint: String,
    pub alias: Option<String>,
    pub hostname: Option<String>,
    /// Address the peer was seen at.
    pub address: SocketAddr,
    /// Transport names reported for the peer.
    pub transports: BTreeSet<String>,
    /// Time of the sighting, in milliseconds since the Unix epoch.
    pub seen_unix_ms: i128,
}

impl PeerObservation {
    /// Creates an observation with no alias, hostname or transports.
    pub fn new(fingerprint: impl Into<String>, address: SocketAddr, seen_unix_ms: i128) -> Self {
        Self {
            fingerprint: fingerprint.into(),
            alias: None,
            hostname: None,
            address,
            transports: BTreeSet::new(),
            seen_unix_ms,
        }
    }

    /// Sets the alias, replacing any previous value.
    pub fn with_alias(self, alias: impl Into<String>) -> Self {
        Self {
            alias: Some(alias.into()),
            ..self
        }
    }

    /// Sets the hostname, replacing any previous value.
    pub fn with_hostname(self, hostname: impl Into<String>) -> Self {
        Self {
            hostname: Some(hostname.into()),
            ..self
        }
    }

    /// Adds a transport name; duplicates are ignored because the names live in a set.
    pub fn with_transport(mut self, transport: impl Into<String>) -> Self {
        self.transports.insert(transport.into());
        self
    }
}
549
550#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
551pub struct PeerRecord {
552    pub fingerprint: String,
553    pub aliases: BTreeSet<String>,
554    pub hostnames: BTreeSet<String>,
555    pub addresses: BTreeSet<SocketAddr>,
556    pub transports: BTreeSet<String>,
557    pub first_seen_unix_ms: i128,
558    pub last_seen_unix_ms: i128,
559}
560
561impl PeerRecord {
562    fn from_observation(observation: PeerObservation) -> Self {
563        let mut aliases = BTreeSet::new();
564        if let Some(alias) = observation.alias {
565            aliases.insert(alias);
566        }
567
568        let mut hostnames = BTreeSet::new();
569        if let Some(hostname) = observation.hostname {
570            hostnames.insert(hostname);
571        }
572
573        let mut addresses = BTreeSet::new();
574        addresses.insert(observation.address);
575
576        Self {
577            fingerprint: observation.fingerprint,
578            aliases,
579            hostnames,
580            addresses,
581            transports: observation.transports,
582            first_seen_unix_ms: observation.seen_unix_ms,
583            last_seen_unix_ms: observation.seen_unix_ms,
584        }
585    }
586
587    fn merge(&mut self, observation: PeerObservation) {
588        if let Some(alias) = observation.alias {
589            self.aliases.insert(alias);
590        }
591        if let Some(hostname) = observation.hostname {
592            self.hostnames.insert(hostname);
593        }
594        self.addresses.insert(observation.address);
595        self.transports.extend(observation.transports);
596        self.first_seen_unix_ms = self.first_seen_unix_ms.min(observation.seen_unix_ms);
597        self.last_seen_unix_ms = self.last_seen_unix_ms.max(observation.seen_unix_ms);
598    }
599
600    pub fn preferred_quic_address(&self) -> Option<SocketAddr> {
601        if !self.transports.contains("quic") {
602            return None;
603        }
604
605        self.addresses.iter().copied().next()
606    }
607}
608
609#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
610#[serde(rename_all = "snake_case")]
611pub enum TrustState {
612    Trusted,
613    Blocked,
614}
615
616#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
617pub struct KnownPeerEntry {
618    pub fingerprint: String,
619    #[serde(default)]
620    pub aliases: BTreeSet<String>,
621    #[serde(default)]
622    pub hostnames: BTreeSet<String>,
623    #[serde(default)]
624    pub addresses: BTreeSet<SocketAddr>,
625    #[serde(default)]
626    pub transports: BTreeSet<String>,
627    pub trust_state: TrustState,
628    pub first_seen_unix_ms: Option<i128>,
629    pub last_seen_unix_ms: Option<i128>,
630}
631
632impl KnownPeerEntry {
633    pub fn trusted(fingerprint: impl Into<String>) -> Result<Self> {
634        let fingerprint = normalized_fingerprint(fingerprint.into())?;
635        Ok(Self {
636            fingerprint,
637            aliases: BTreeSet::new(),
638            hostnames: BTreeSet::new(),
639            addresses: BTreeSet::new(),
640            transports: BTreeSet::new(),
641            trust_state: TrustState::Trusted,
642            first_seen_unix_ms: None,
643            last_seen_unix_ms: None,
644        })
645    }
646
647    pub fn from_record(record: &PeerRecord, trust_state: TrustState) -> Result<Self> {
648        let fingerprint = normalized_fingerprint(record.fingerprint.clone())?;
649        Ok(Self {
650            fingerprint,
651            aliases: record.aliases.clone(),
652            hostnames: record.hostnames.clone(),
653            addresses: record.addresses.clone(),
654            transports: record.transports.clone(),
655            trust_state,
656            first_seen_unix_ms: Some(record.first_seen_unix_ms),
657            last_seen_unix_ms: Some(record.last_seen_unix_ms),
658        })
659    }
660}
661
662#[derive(Debug, Default, Clone, PartialEq, Eq, Serialize, Deserialize)]
663pub struct TrustStore {
664    #[serde(default)]
665    pub peers: BTreeMap<String, KnownPeerEntry>,
666}
667
668impl TrustStore {
669    pub fn load_or_default() -> Result<Self> {
670        Self::load_or_default_from(config_dir()?.join("known_peers.toml"))
671    }
672
673    pub fn load_or_default_from(path: impl AsRef<Path>) -> Result<Self> {
674        let path = path.as_ref();
675        match std::fs::read_to_string(path) {
676            Ok(contents) => {
677                toml::from_str(&contents).map_err(|error| Error::ConfigParse(error.to_string()))
678            }
679            Err(error) if error.kind() == std::io::ErrorKind::NotFound => Ok(Self::default()),
680            Err(source) => Err(Error::IoPath {
681                path: path.to_path_buf(),
682                source,
683            }),
684        }
685    }
686
687    pub fn save_default_path(&self) -> Result<()> {
688        self.save_to_path(config_dir()?.join("known_peers.toml"))
689    }
690
691    pub fn save_to_path(&self, path: impl AsRef<Path>) -> Result<()> {
692        write_toml_file(path.as_ref(), self)
693    }
694
695    pub fn trust_fingerprint(&mut self, fingerprint: impl Into<String>) -> Result<&KnownPeerEntry> {
696        let entry = KnownPeerEntry::trusted(fingerprint)?;
697        let fingerprint = entry.fingerprint.clone();
698        self.peers
699            .entry(fingerprint.clone())
700            .and_modify(|existing| existing.trust_state = TrustState::Trusted)
701            .or_insert(entry);
702        Ok(self
703            .peers
704            .get(&fingerprint)
705            .expect("trusted peer entry should exist"))
706    }
707
708    pub fn trust_record(&mut self, record: &PeerRecord) -> Result<&KnownPeerEntry> {
709        let entry = KnownPeerEntry::from_record(record, TrustState::Trusted)?;
710        let fingerprint = entry.fingerprint.clone();
711        self.peers
712            .entry(fingerprint.clone())
713            .and_modify(|existing| {
714                existing.aliases.extend(record.aliases.clone());
715                existing.hostnames.extend(record.hostnames.clone());
716                existing.addresses.extend(record.addresses.clone());
717                existing.transports.extend(record.transports.clone());
718                existing.first_seen_unix_ms = Some(
719                    existing
720                        .first_seen_unix_ms
721                        .unwrap_or(record.first_seen_unix_ms)
722                        .min(record.first_seen_unix_ms),
723                );
724                existing.last_seen_unix_ms = Some(
725                    existing
726                        .last_seen_unix_ms
727                        .unwrap_or(record.last_seen_unix_ms)
728                        .max(record.last_seen_unix_ms),
729                );
730                existing.trust_state = TrustState::Trusted;
731            })
732            .or_insert(entry);
733        Ok(self
734            .peers
735            .get(&fingerprint)
736            .expect("trusted peer entry should exist"))
737    }
738
739    pub fn forget(&mut self, fingerprint: &str) -> bool {
740        self.peers.remove(fingerprint).is_some()
741    }
742
743    pub fn records(&self) -> impl Iterator<Item = &KnownPeerEntry> {
744        self.peers.values()
745    }
746
747    pub fn to_toml_string(&self) -> Result<String> {
748        toml::to_string_pretty(self).map_err(|error| Error::ConfigSerialize(error.to_string()))
749    }
750}
751
752#[derive(Debug, Default, Clone, PartialEq, Eq)]
753pub struct PeerRegistry {
754    peers: BTreeMap<String, PeerRecord>,
755}
756
757#[derive(Debug, Clone, PartialEq, Eq)]
758pub enum PeerLookup<'a> {
759    Found(&'a PeerRecord),
760    Ambiguous(Vec<&'a PeerRecord>),
761    Missing,
762}
763
764impl PeerRegistry {
765    pub fn new() -> Self {
766        Self::default()
767    }
768
769    pub fn observe(&mut self, observation: PeerObservation) -> Result<&PeerRecord> {
770        if observation.fingerprint.trim().is_empty() {
771            return Err(Error::InvalidInput(
772                "peer fingerprint must not be empty".to_string(),
773            ));
774        }
775
776        let fingerprint = observation.fingerprint.clone();
777        if let Some(record) = self.peers.get_mut(&fingerprint) {
778            record.merge(observation);
779        } else {
780            self.peers.insert(
781                fingerprint.clone(),
782                PeerRecord::from_observation(observation),
783            );
784        }
785
786        Ok(self
787            .peers
788            .get(&fingerprint)
789            .expect("observed peer record should exist"))
790    }
791
792    pub fn get_by_fingerprint(&self, fingerprint: &str) -> Option<&PeerRecord> {
793        self.peers.get(fingerprint)
794    }
795
796    pub fn lookup(&self, query: &str) -> Option<&PeerRecord> {
797        match self.lookup_detail(query) {
798            PeerLookup::Found(record) => Some(record),
799            PeerLookup::Ambiguous(_) | PeerLookup::Missing => None,
800        }
801    }
802
803    pub fn lookup_detail(&self, query: &str) -> PeerLookup<'_> {
804        if let Some(record) = self.peers.get(query) {
805            return PeerLookup::Found(record);
806        }
807
808        let matches = self
809            .peers
810            .values()
811            .filter(|record| {
812                record.fingerprint.starts_with(query)
813                    || record.aliases.iter().any(|alias| alias == query)
814                    || record.hostnames.iter().any(|hostname| hostname == query)
815            })
816            .collect::<Vec<_>>();
817
818        match matches.as_slice() {
819            [] => PeerLookup::Missing,
820            [record] => PeerLookup::Found(record),
821            _ => PeerLookup::Ambiguous(matches),
822        }
823    }
824
825    pub fn records(&self) -> impl Iterator<Item = &PeerRecord> {
826        self.peers.values()
827    }
828}
829
830#[derive(Debug, Clone, PartialEq, Eq)]
831pub struct NativeAnnouncement {
832    pub alias: String,
833    pub fingerprint: String,
834    pub quic_port: u16,
835    pub listen_port: Option<u16>,
836}
837
838impl NativeAnnouncement {
839    pub fn from_config(config: &AppConfig, identity: &NativeIdentity) -> Self {
840        Self {
841            alias: config.alias.clone(),
842            fingerprint: identity.fingerprint().to_string(),
843            quic_port: config.quic_port,
844            listen_port: Some(config.listen_port),
845        }
846    }
847
848    fn service_info(&self) -> Result<ServiceInfo> {
849        let alias = sanitize_dns_label(&self.alias);
850        let fingerprint_prefix = self
851            .fingerprint
852            .get(..12)
853            .unwrap_or(self.fingerprint.as_str());
854        let instance = format!("{alias}-{fingerprint_prefix}");
855        let hostname = format!("{alias}.local.");
856        let properties = [
857            ("pv", NATIVE_PROTOCOL_VERSION.to_string()),
858            ("minpv", MIN_NATIVE_PROTOCOL_VERSION.to_string()),
859            ("fp", self.fingerprint.clone()),
860            ("alias", self.alias.clone()),
861            ("transports", "quic".to_string()),
862            ("quic_port", self.quic_port.to_string()),
863            (
864                "listen_port",
865                self.listen_port
866                    .map(|port| port.to_string())
867                    .unwrap_or_default(),
868            ),
869        ];
870
871        ServiceInfo::new(
872            NATIVE_SERVICE_TYPE,
873            &instance,
874            &hostname,
875            "",
876            self.quic_port,
877            &properties[..],
878        )
879        .map(ServiceInfo::enable_addr_auto)
880        .map_err(|error| Error::Discovery(error.to_string()))
881    }
882}
883
884pub struct NativeAnnouncer {
885    daemon: ServiceDaemon,
886    fullname: String,
887}
888
889impl NativeAnnouncer {
890    pub fn start(announcement: &NativeAnnouncement) -> Result<Self> {
891        let daemon = ServiceDaemon::new().map_err(|error| Error::Discovery(error.to_string()))?;
892        let service = announcement.service_info()?;
893        let fullname = service.get_fullname().to_string();
894        daemon
895            .register(service)
896            .map_err(|error| Error::Discovery(error.to_string()))?;
897        Ok(Self { daemon, fullname })
898    }
899}
900
901impl Drop for NativeAnnouncer {
902    fn drop(&mut self) {
903        let _ = self.daemon.unregister(&self.fullname);
904        let _ = self.daemon.shutdown();
905    }
906}
907
908pub async fn discover_native_peers(timeout: Duration) -> Result<PeerRegistry> {
909    let daemon = ServiceDaemon::new().map_err(|error| Error::Discovery(error.to_string()))?;
910    let receiver = daemon
911        .browse(NATIVE_SERVICE_TYPE)
912        .map_err(|error| Error::Discovery(error.to_string()))?;
913    let deadline = tokio::time::Instant::now() + timeout;
914    let mut registry = PeerRegistry::new();
915
916    loop {
917        let now = tokio::time::Instant::now();
918        if now >= deadline {
919            break;
920        }
921
922        let wait = deadline - now;
923        match tokio::time::timeout(wait, receiver.recv_async()).await {
924            Ok(Ok(ServiceEvent::ServiceResolved(service))) => {
925                observe_resolved_service(&mut registry, &service)?;
926            }
927            Ok(Ok(_)) => {}
928            Ok(Err(error)) => {
929                return Err(Error::Discovery(error.to_string()));
930            }
931            Err(_) => break,
932        }
933    }
934
935    daemon
936        .stop_browse(NATIVE_SERVICE_TYPE)
937        .map_err(|error| Error::Discovery(error.to_string()))?;
938    let _ = daemon.shutdown();
939    Ok(registry)
940}
941
942fn observe_resolved_service(registry: &mut PeerRegistry, service: &ResolvedService) -> Result<()> {
943    let Some(fingerprint) = service.get_property_val_str("fp") else {
944        return Ok(());
945    };
946    let fingerprint = match normalized_fingerprint(fingerprint.to_string()) {
947        Ok(fingerprint) => fingerprint,
948        Err(_) => return Ok(()),
949    };
950    let Some(highest_version) = parse_txt_u16(service, "pv") else {
951        return Ok(());
952    };
953    let Some(minimum_version) = parse_txt_u16(service, "minpv") else {
954        return Ok(());
955    };
956    if minimum_version > highest_version
957        || minimum_version > NATIVE_PROTOCOL_VERSION
958        || highest_version < MIN_NATIVE_PROTOCOL_VERSION
959    {
960        return Ok(());
961    }
962
963    let Some(quic_port) = parse_txt_u16(service, "quic_port") else {
964        return Ok(());
965    };
966    let transports = service
967        .get_property_val_str("transports")
968        .unwrap_or_default()
969        .split(',')
970        .map(str::trim)
971        .filter(|value| !value.is_empty())
972        .map(ToOwned::to_owned)
973        .collect::<Vec<_>>();
974    if !transports.iter().any(|transport| transport == "quic") {
975        return Ok(());
976    }
977
978    let alias = service.get_property_val_str("alias").map(ToOwned::to_owned);
979    let hostname = Some(service.get_hostname().trim_end_matches('.').to_string());
980    let seen_unix_ms = system_time_unix_ms(SystemTime::now()).unwrap_or_default();
981
982    for address in service.get_addresses_v4() {
983        let mut observation = PeerObservation::new(
984            fingerprint.clone(),
985            SocketAddr::from((address, quic_port)),
986            seen_unix_ms,
987        );
988        if let Some(alias) = &alias {
989            observation = observation.with_alias(alias.clone());
990        }
991        if let Some(hostname) = &hostname {
992            observation = observation.with_hostname(hostname.clone());
993        }
994        for transport in &transports {
995            observation = observation.with_transport(transport.clone());
996        }
997        registry.observe(observation)?;
998    }
999
1000    Ok(())
1001}
1002
1003fn parse_txt_u16(service: &ResolvedService, key: &str) -> Option<u16> {
1004    service.get_property_val_str(key)?.parse().ok()
1005}
1006
/// Turns an arbitrary string into a lowercase DNS label.
///
/// ASCII letters and digits are kept (lowercased); every other character,
/// including non-ASCII ones, acts as a separator. Runs of separators collapse
/// into a single `-`, and leading/trailing `-` are removed. The result is capped
/// at 63 bytes, the maximum length of a single DNS label, and never ends in `-`
/// even after truncation. If nothing usable remains, the fallback label
/// `fileferry-device` is returned.
fn sanitize_dns_label(value: &str) -> String {
    /// Maximum length of one DNS label in octets.
    const MAX_DNS_LABEL_LEN: usize = 63;

    let mut label = String::with_capacity(value.len().min(MAX_DNS_LABEL_LEN));
    for ch in value.chars() {
        if label.len() == MAX_DNS_LABEL_LEN {
            break;
        }
        if ch.is_ascii_alphanumeric() {
            label.push(ch.to_ascii_lowercase());
        } else if !label.is_empty() && !label.ends_with('-') {
            // First separator after real content; leading and repeated
            // separators are dropped by the two conditions above.
            label.push('-');
        }
    }

    // Separators are collapsed as we go, so at most one trailing '-' can exist
    // (either from the input's tail or from cutting at the length limit).
    if label.ends_with('-') {
        label.pop();
    }

    if label.is_empty() {
        "fileferry-device".to_string()
    } else {
        label
    }
}
1028
1029#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
1030struct DirectReceivePlan {
1031    files: Vec<DirectFilePlan>,
1032}
1033
1034#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
1035struct DirectProtocolHello {
1036    protocol_version: u16,
1037    min_protocol_version: u16,
1038    client_fingerprint: String,
1039}
1040
1041impl DirectProtocolHello {
1042    fn new(identity: &NativeIdentity) -> Self {
1043        Self {
1044            protocol_version: NATIVE_PROTOCOL_VERSION,
1045            min_protocol_version: MIN_NATIVE_PROTOCOL_VERSION,
1046            client_fingerprint: identity.fingerprint().to_string(),
1047        }
1048    }
1049}
1050
1051#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
1052struct DirectProtocolResponse {
1053    accepted: bool,
1054    protocol_version: Option<u16>,
1055    error: Option<String>,
1056}
1057
1058impl DirectProtocolResponse {
1059    fn accept(protocol_version: u16) -> Self {
1060        Self {
1061            accepted: true,
1062            protocol_version: Some(protocol_version),
1063            error: None,
1064        }
1065    }
1066
1067    fn reject(error: impl Into<String>) -> Self {
1068        Self {
1069            accepted: false,
1070            protocol_version: None,
1071            error: Some(error.into()),
1072        }
1073    }
1074}
1075
1076#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
1077struct DirectFilePlan {
1078    relative_path: PathBuf,
1079    action: DirectFileAction,
1080    offset: u64,
1081}
1082
1083#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
1084#[serde(rename_all = "snake_case")]
1085enum DirectFileAction {
1086    Receive,
1087    Skip,
1088}
1089
1090#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
1091struct DirectPayloadHeader {
1092    relative_path: PathBuf,
1093    offset: u64,
1094    bytes: u64,
1095}
1096
1097#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
1098struct DirectAck {
1099    ok: bool,
1100    error: Option<String>,
1101}
1102
1103pub fn validate_send_paths(paths: &[PathBuf]) -> Result<()> {
1104    if paths.is_empty() {
1105        return Err(Error::InvalidInput(
1106            "at least one file path is required".to_string(),
1107        ));
1108    }
1109
1110    for path in paths {
1111        if path.as_os_str().is_empty() {
1112            return Err(Error::InvalidInput("empty file path".to_string()));
1113        }
1114    }
1115
1116    Ok(())
1117}
1118
/// Renders `path` as an owned `String`, replacing any non-UTF-8 sequences
/// lossily.
pub fn display_path(path: &Path) -> String {
    String::from(path.to_string_lossy())
}
1122
1123pub async fn build_manifest(paths: &[PathBuf]) -> Result<TransferManifest> {
1124    validate_send_paths(paths)?;
1125
1126    let sources = collect_manifest_sources(paths)?;
1127    let mut entries = Vec::with_capacity(sources.len());
1128    let mut seen = BTreeSet::new();
1129
1130    for source in sources {
1131        if !seen.insert(source.relative_path.clone()) {
1132            return Err(Error::InvalidInput(format!(
1133                "duplicate transfer path: {}",
1134                display_path(&source.relative_path)
1135            )));
1136        }
1137
1138        entries.push(build_manifest_entry(source).await?);
1139    }
1140
1141    Ok(TransferManifest {
1142        version: MANIFEST_VERSION,
1143        session_id: Uuid::new_v4(),
1144        chunk_size: DEFAULT_CHUNK_SIZE,
1145        entries,
1146    })
1147}
1148
1149pub fn safe_destination_path(destination: &Path, relative_path: &Path) -> Result<PathBuf> {
1150    validate_relative_transfer_path(relative_path)?;
1151    Ok(destination.join(relative_path))
1152}
1153
1154pub async fn decide_resume(
1155    destination: &Path,
1156    entry: &ManifestEntry,
1157    chunk_size: u64,
1158) -> Result<ResumeDecision> {
1159    if entry.kind != ManifestEntryKind::File {
1160        return Ok(ResumeDecision::Create);
1161    }
1162
1163    let path = safe_destination_path(destination, &entry.relative_path)?;
1164    let metadata = match tokio::fs::metadata(&path).await {
1165        Ok(metadata) => metadata,
1166        Err(error) if error.kind() == std::io::ErrorKind::NotFound => {
1167            return Ok(ResumeDecision::Create);
1168        }
1169        Err(source) => {
1170            return Err(Error::IoPath { path, source });
1171        }
1172    };
1173
1174    if !metadata.is_file() {
1175        return Ok(ResumeDecision::Conflict(format!(
1176            "{} already exists and is not a regular file",
1177            display_path(&path)
1178        )));
1179    }
1180
1181    if metadata.len() == entry.size {
1182        let expected = entry
1183            .blake3
1184            .as_deref()
1185            .ok_or_else(|| Error::Protocol("file manifest entry is missing BLAKE3".to_string()))?;
1186        let actual = hash_file(&path).await?;
1187        return if actual == expected {
1188            Ok(ResumeDecision::Skip)
1189        } else {
1190            Ok(ResumeDecision::Conflict(format!(
1191                "{} already exists with different contents",
1192                display_path(&path)
1193            )))
1194        };
1195    }
1196
1197    if metadata.len() > entry.size {
1198        return Ok(ResumeDecision::Conflict(format!(
1199            "{} is larger than incoming file",
1200            display_path(&path)
1201        )));
1202    }
1203
1204    let verified_offset = verified_prefix_offset(&path, entry, metadata.len(), chunk_size).await?;
1205    if verified_offset > 0 {
1206        Ok(ResumeDecision::ResumeFrom(verified_offset))
1207    } else {
1208        Ok(ResumeDecision::Conflict(format!(
1209            "{} already exists and does not match a verified prefix",
1210            display_path(&path)
1211        )))
1212    }
1213}
1214
1215pub async fn send_direct_file(
1216    request: &SendRequest,
1217    identity: &NativeIdentity,
1218    mut events: impl FnMut(TransferEvent),
1219) -> Result<TransferSummary> {
1220    send_direct_file_with_control(request, identity, TransferControl::new(), &mut events).await
1221}
1222
1223pub async fn send_direct_file_with_control(
1224    request: &SendRequest,
1225    identity: &NativeIdentity,
1226    control: TransferControl,
1227    mut events: impl FnMut(TransferEvent),
1228) -> Result<TransferSummary> {
1229    ensure_rustls_provider();
1230
1231    control.check_cancelled()?;
1232    let manifest = build_manifest(request.paths()).await?;
1233    let sources = manifest_source_map(request.paths())?;
1234    events(TransferEvent::SessionStarted {
1235        direction: TransferDirection::Send,
1236        session_id: manifest.session_id,
1237        files_total: manifest.file_entries().count(),
1238        bytes_total: manifest.total_file_bytes(),
1239    });
1240    control.check_cancelled()?;
1241
1242    let (client_config, server_fingerprint) = client_config_capturing_server_fingerprint()?;
1243    let mut endpoint = quinn::Endpoint::client(SocketAddr::from(([0, 0, 0, 0], 0)))?;
1244    endpoint.set_default_client_config(client_config);
1245    let connection = endpoint
1246        .connect(request.peer().address(), "localhost")?
1247        .await?;
1248    if let Some(expected) = request.peer().expected_fingerprint() {
1249        let actual = captured_server_fingerprint(&server_fingerprint)?;
1250        if actual != expected {
1251            connection.close(1u32.into(), b"fingerprint mismatch");
1252            return Err(Error::PeerFingerprintMismatch {
1253                expected: expected.to_string(),
1254                actual,
1255            });
1256        }
1257    }
1258    let (mut send, mut recv) = connection.open_bi().await?;
1259
1260    send.write_all(DIRECT_MAGIC).await?;
1261    write_json_frame(&mut send, &DirectProtocolHello::new(identity)).await?;
1262    let protocol: DirectProtocolResponse = read_json_frame(&mut recv).await?;
1263    if !protocol.accepted {
1264        return Err(Error::Protocol(protocol.error.unwrap_or_else(|| {
1265            "receiver rejected protocol negotiation".to_string()
1266        })));
1267    }
1268    if protocol.protocol_version != Some(NATIVE_PROTOCOL_VERSION) {
1269        return Err(Error::Protocol(format!(
1270            "receiver selected unsupported protocol version {:?}",
1271            protocol.protocol_version
1272        )));
1273    }
1274
1275    write_json_frame(&mut send, &manifest).await?;
1276
1277    let plan: DirectReceivePlan = read_json_frame(&mut recv).await?;
1278    let mut summaries = Vec::new();
1279    for file_plan in plan.files {
1280        let Some(entry) = manifest
1281            .file_entries()
1282            .find(|entry| entry.relative_path == file_plan.relative_path)
1283        else {
1284            return Err(Error::Protocol(format!(
1285                "receiver requested unknown file: {}",
1286                display_path(&file_plan.relative_path)
1287            )));
1288        };
1289
1290        let Some(source_path) = sources.get(&file_plan.relative_path) else {
1291            return Err(Error::Protocol(format!(
1292                "missing source for manifest file: {}",
1293                display_path(&file_plan.relative_path)
1294            )));
1295        };
1296
1297        if file_plan.action == DirectFileAction::Skip {
1298            let blake3 = entry.blake3.clone().unwrap_or_default();
1299            events(TransferEvent::FileFinished {
1300                direction: TransferDirection::Send,
1301                file_name: display_path(&entry.relative_path),
1302                bytes: entry.size,
1303                blake3: blake3.clone(),
1304                status: TransferFileStatus::Skipped,
1305            });
1306            summaries.push(TransferFileSummary {
1307                path: source_path.clone(),
1308                relative_path: entry.relative_path.clone(),
1309                bytes: entry.size,
1310                blake3,
1311                status: TransferFileStatus::Skipped,
1312            });
1313            continue;
1314        }
1315
1316        match send_payload_file(
1317            &mut send,
1318            source_path,
1319            entry,
1320            file_plan.offset,
1321            &control,
1322            &mut events,
1323        )
1324        .await
1325        {
1326            Ok(()) => {}
1327            Err(Error::TransferCancelled) => {
1328                events(TransferEvent::SessionCancelled {
1329                    direction: TransferDirection::Send,
1330                    session_id: manifest.session_id,
1331                });
1332                connection.close(1u32.into(), b"cancelled");
1333                return Err(Error::TransferCancelled);
1334            }
1335            Err(error) => return Err(error),
1336        }
1337        let status = if file_plan.offset > 0 {
1338            TransferFileStatus::Resumed
1339        } else {
1340            TransferFileStatus::Sent
1341        };
1342        let blake3 = entry.blake3.clone().unwrap_or_default();
1343        events(TransferEvent::FileFinished {
1344            direction: TransferDirection::Send,
1345            file_name: display_path(&entry.relative_path),
1346            bytes: entry.size,
1347            blake3: blake3.clone(),
1348            status,
1349        });
1350        summaries.push(TransferFileSummary {
1351            path: source_path.clone(),
1352            relative_path: entry.relative_path.clone(),
1353            bytes: entry.size,
1354            blake3,
1355            status,
1356        });
1357    }
1358    send.finish()?;
1359
1360    let ack: DirectAck = read_json_frame(&mut recv).await?;
1361    if !ack.ok {
1362        return Err(Error::Transfer(
1363            ack.error
1364                .unwrap_or_else(|| "receiver rejected transfer".to_string()),
1365        ));
1366    }
1367
1368    connection.close(0u32.into(), b"done");
1369    endpoint.wait_idle().await;
1370
1371    let summary = summary_from_files(&manifest, summaries)?;
1372    events(TransferEvent::SessionFinished {
1373        direction: TransferDirection::Send,
1374        files_total: summary.files.len(),
1375        bytes_total: summary.bytes,
1376        blake3: summary.blake3.clone(),
1377    });
1378    Ok(summary)
1379}
1380
1381pub async fn receive_direct_file(
1382    request: &ReceiveRequest,
1383    destination: impl AsRef<Path>,
1384    identity: &NativeIdentity,
1385    events: impl FnMut(TransferEvent),
1386) -> Result<TransferSummary> {
1387    receive_direct_file_with_control(
1388        request,
1389        destination,
1390        identity,
1391        TransferControl::new(),
1392        events,
1393    )
1394    .await
1395}
1396
1397pub async fn receive_direct_file_with_control(
1398    request: &ReceiveRequest,
1399    destination: impl AsRef<Path>,
1400    identity: &NativeIdentity,
1401    control: TransferControl,
1402    mut events: impl FnMut(TransferEvent),
1403) -> Result<TransferSummary> {
1404    ensure_rustls_provider();
1405
1406    control.check_cancelled()?;
1407    let destination = destination.as_ref();
1408    tokio::fs::create_dir_all(destination)
1409        .await
1410        .map_err(|source| Error::IoPath {
1411            path: destination.to_path_buf(),
1412            source,
1413        })?;
1414
1415    let server_config =
1416        quinn::ServerConfig::with_single_cert(identity.cert_chain(), identity.private_key())?;
1417    let endpoint = quinn::Endpoint::server(server_config, request.listen())?;
1418    let incoming = endpoint.accept().await.ok_or(Error::NoIncomingConnection)?;
1419    let connection = incoming.await?;
1420    let (mut send, mut recv) = connection.accept_bi().await?;
1421
1422    let result =
1423        receive_stream_file(destination, &mut recv, &mut send, &control, &mut events).await;
1424    let ack = match &result {
1425        Ok(_) => DirectAck {
1426            ok: true,
1427            error: None,
1428        },
1429        Err(error) => DirectAck {
1430            ok: false,
1431            error: Some(error.to_string()),
1432        },
1433    };
1434    write_json_frame(&mut send, &ack).await?;
1435    send.finish()?;
1436    let _ = tokio::time::timeout(Duration::from_secs(1), connection.closed()).await;
1437    endpoint.close(0u32.into(), b"done");
1438    endpoint.wait_idle().await;
1439
1440    result
1441}
1442
1443async fn receive_stream_file(
1444    destination: &Path,
1445    recv: &mut quinn::RecvStream,
1446    send: &mut quinn::SendStream,
1447    control: &TransferControl,
1448    events: &mut impl FnMut(TransferEvent),
1449) -> Result<TransferSummary> {
1450    let mut magic = [0; DIRECT_MAGIC.len()];
1451    recv.read_exact(&mut magic).await?;
1452    if &magic != DIRECT_MAGIC {
1453        return Err(Error::Protocol("bad direct-transfer magic".to_string()));
1454    }
1455
1456    let hello: DirectProtocolHello = read_json_frame(recv).await?;
1457    let protocol = negotiate_direct_protocol(&hello);
1458    write_json_frame(send, &protocol).await?;
1459    if !protocol.accepted {
1460        return Err(Error::Protocol(protocol.error.unwrap_or_else(|| {
1461            "unsupported direct protocol version".to_string()
1462        })));
1463    }
1464
1465    let manifest: TransferManifest = read_json_frame(recv).await?;
1466    validate_manifest(&manifest)?;
1467    events(TransferEvent::SessionStarted {
1468        direction: TransferDirection::Receive,
1469        session_id: manifest.session_id,
1470        files_total: manifest.file_entries().count(),
1471        bytes_total: manifest.total_file_bytes(),
1472    });
1473    control.check_cancelled()?;
1474
1475    for entry in &manifest.entries {
1476        if entry.kind == ManifestEntryKind::Directory {
1477            let path = safe_destination_path(destination, &entry.relative_path)?;
1478            tokio::fs::create_dir_all(&path)
1479                .await
1480                .map_err(|source| Error::IoPath { path, source })?;
1481        }
1482    }
1483
1484    let mut plan = DirectReceivePlan { files: Vec::new() };
1485    for entry in manifest.file_entries() {
1486        let decision = decide_resume(destination, entry, manifest.chunk_size).await?;
1487        let (action, offset) = match decision {
1488            ResumeDecision::Create => (DirectFileAction::Receive, 0),
1489            ResumeDecision::Skip => (DirectFileAction::Skip, entry.size),
1490            ResumeDecision::ResumeFrom(offset) => (DirectFileAction::Receive, offset),
1491            ResumeDecision::Conflict(message) => return Err(Error::InvalidInput(message)),
1492        };
1493        plan.files.push(DirectFilePlan {
1494            relative_path: entry.relative_path.clone(),
1495            action,
1496            offset,
1497        });
1498    }
1499    write_json_frame(send, &plan).await?;
1500
1501    let mut summaries = Vec::new();
1502    for file_plan in &plan.files {
1503        let Some(entry) = manifest
1504            .file_entries()
1505            .find(|entry| entry.relative_path == file_plan.relative_path)
1506        else {
1507            return Err(Error::Protocol(format!(
1508                "planned file missing from manifest: {}",
1509                display_path(&file_plan.relative_path)
1510            )));
1511        };
1512
1513        let final_path = safe_destination_path(destination, &entry.relative_path)?;
1514        if file_plan.action == DirectFileAction::Skip {
1515            let blake3 = entry.blake3.clone().unwrap_or_default();
1516            events(TransferEvent::FileFinished {
1517                direction: TransferDirection::Receive,
1518                file_name: display_path(&entry.relative_path),
1519                bytes: entry.size,
1520                blake3: blake3.clone(),
1521                status: TransferFileStatus::Skipped,
1522            });
1523            summaries.push(TransferFileSummary {
1524                path: final_path,
1525                relative_path: entry.relative_path.clone(),
1526                bytes: entry.size,
1527                blake3,
1528                status: TransferFileStatus::Skipped,
1529            });
1530            continue;
1531        }
1532
1533        match receive_payload_file(recv, destination, entry, file_plan.offset, control, events)
1534            .await
1535        {
1536            Ok(()) => {}
1537            Err(Error::TransferCancelled) => {
1538                events(TransferEvent::SessionCancelled {
1539                    direction: TransferDirection::Receive,
1540                    session_id: manifest.session_id,
1541                });
1542                return Err(Error::TransferCancelled);
1543            }
1544            Err(error) => return Err(error),
1545        }
1546        let actual_hash = hash_file(&final_path).await?;
1547        let expected_hash = entry
1548            .blake3
1549            .as_deref()
1550            .ok_or_else(|| Error::Protocol("file manifest entry is missing BLAKE3".to_string()))?;
1551        if actual_hash != expected_hash {
1552            return Err(Error::Transfer(format!(
1553                "BLAKE3 hash mismatch for {}",
1554                display_path(&entry.relative_path)
1555            )));
1556        }
1557
1558        let status = if file_plan.offset > 0 {
1559            TransferFileStatus::Resumed
1560        } else {
1561            TransferFileStatus::Received
1562        };
1563        events(TransferEvent::FileFinished {
1564            direction: TransferDirection::Receive,
1565            file_name: display_path(&entry.relative_path),
1566            bytes: entry.size,
1567            blake3: actual_hash.clone(),
1568            status,
1569        });
1570        summaries.push(TransferFileSummary {
1571            path: final_path,
1572            relative_path: entry.relative_path.clone(),
1573            bytes: entry.size,
1574            blake3: actual_hash,
1575            status,
1576        });
1577    }
1578
1579    let summary = summary_from_files(&manifest, summaries)?;
1580    events(TransferEvent::SessionFinished {
1581        direction: TransferDirection::Receive,
1582        files_total: summary.files.len(),
1583        bytes_total: summary.bytes,
1584        blake3: summary.blake3.clone(),
1585    });
1586    Ok(summary)
1587}
1588
1589async fn send_payload_file(
1590    send: &mut quinn::SendStream,
1591    source_path: &Path,
1592    entry: &ManifestEntry,
1593    offset: u64,
1594    control: &TransferControl,
1595    events: &mut impl FnMut(TransferEvent),
1596) -> Result<()> {
1597    if offset > entry.size {
1598        return Err(Error::Protocol(format!(
1599            "resume offset exceeds file size for {}",
1600            display_path(&entry.relative_path)
1601        )));
1602    }
1603
1604    let header = DirectPayloadHeader {
1605        relative_path: entry.relative_path.clone(),
1606        offset,
1607        bytes: entry.size - offset,
1608    };
1609    write_json_frame(send, &header).await?;
1610    events(TransferEvent::FileStarted {
1611        direction: TransferDirection::Send,
1612        file_name: display_path(&entry.relative_path),
1613        bytes_total: entry.size,
1614        resume_offset: offset,
1615    });
1616
1617    let mut file = tokio::fs::File::open(source_path)
1618        .await
1619        .map_err(|source| Error::IoPath {
1620            path: source_path.to_path_buf(),
1621            source,
1622        })?;
1623    file.seek(SeekFrom::Start(offset))
1624        .await
1625        .map_err(|source| Error::IoPath {
1626            path: source_path.to_path_buf(),
1627            source,
1628        })?;
1629
1630    let mut buffer = vec![0; IO_BUFFER_SIZE];
1631    let mut bytes_done = offset;
1632    while bytes_done < entry.size {
1633        control.check_cancelled()?;
1634        let remaining = entry.size - bytes_done;
1635        let read_size = buffer.len().min(remaining as usize);
1636        let read = file
1637            .read(&mut buffer[..read_size])
1638            .await
1639            .map_err(|source| Error::IoPath {
1640                path: source_path.to_path_buf(),
1641                source,
1642            })?;
1643        if read == 0 {
1644            return Err(Error::Protocol(format!(
1645                "source file ended early: {}",
1646                display_path(source_path)
1647            )));
1648        }
1649        send.write_all(&buffer[..read]).await?;
1650        bytes_done += read as u64;
1651        events(TransferEvent::Progress {
1652            direction: TransferDirection::Send,
1653            file_name: display_path(&entry.relative_path),
1654            bytes_done,
1655            bytes_total: entry.size,
1656        });
1657        control.check_cancelled()?;
1658    }
1659
1660    Ok(())
1661}
1662
1663async fn receive_payload_file(
1664    recv: &mut quinn::RecvStream,
1665    destination: &Path,
1666    entry: &ManifestEntry,
1667    offset: u64,
1668    control: &TransferControl,
1669    events: &mut impl FnMut(TransferEvent),
1670) -> Result<()> {
1671    let header: DirectPayloadHeader = read_json_frame(recv).await?;
1672    if header.relative_path != entry.relative_path
1673        || header.offset != offset
1674        || header.bytes != entry.size - offset
1675    {
1676        return Err(Error::Protocol(format!(
1677            "payload header does not match receive plan for {}",
1678            display_path(&entry.relative_path)
1679        )));
1680    }
1681    events(TransferEvent::FileStarted {
1682        direction: TransferDirection::Receive,
1683        file_name: display_path(&entry.relative_path),
1684        bytes_total: entry.size,
1685        resume_offset: offset,
1686    });
1687
1688    let final_path = safe_destination_path(destination, &entry.relative_path)?;
1689    if let Some(parent) = final_path.parent() {
1690        tokio::fs::create_dir_all(parent)
1691            .await
1692            .map_err(|source| Error::IoPath {
1693                path: parent.to_path_buf(),
1694                source,
1695            })?;
1696    }
1697
1698    let write_path = if offset == 0 {
1699        temp_path_for(&final_path)
1700    } else {
1701        final_path.clone()
1702    };
1703
1704    let mut file = if offset == 0 {
1705        tokio::fs::File::create(&write_path)
1706            .await
1707            .map_err(|source| Error::IoPath {
1708                path: write_path.clone(),
1709                source,
1710            })?
1711    } else {
1712        let mut file = tokio::fs::OpenOptions::new()
1713            .write(true)
1714            .open(&write_path)
1715            .await
1716            .map_err(|source| Error::IoPath {
1717                path: write_path.clone(),
1718                source,
1719            })?;
1720        file.set_len(offset).await.map_err(|source| Error::IoPath {
1721            path: write_path.clone(),
1722            source,
1723        })?;
1724        file.seek(SeekFrom::Start(offset))
1725            .await
1726            .map_err(|source| Error::IoPath {
1727                path: write_path.clone(),
1728                source,
1729            })?;
1730        file
1731    };
1732
1733    let mut bytes_done = offset;
1734    let mut bytes_remaining = header.bytes;
1735    let mut buffer = vec![0; IO_BUFFER_SIZE];
1736    while bytes_remaining > 0 {
1737        if control.is_cancelled() {
1738            if offset == 0 {
1739                let _ = tokio::fs::remove_file(&write_path).await;
1740            }
1741            return Err(Error::TransferCancelled);
1742        }
1743        let read_size = buffer.len().min(bytes_remaining as usize);
1744        let read = match recv.read(&mut buffer[..read_size]).await {
1745            Ok(Some(read)) => read,
1746            Ok(None) => {
1747                if offset == 0 {
1748                    let _ = tokio::fs::remove_file(&write_path).await;
1749                }
1750                return Err(Error::Protocol(
1751                    "stream ended before file payload".to_string(),
1752                ));
1753            }
1754            Err(error) => {
1755                if offset == 0 {
1756                    let _ = tokio::fs::remove_file(&write_path).await;
1757                }
1758                return Err(Error::Read(error));
1759            }
1760        };
1761
1762        file.write_all(&buffer[..read])
1763            .await
1764            .map_err(|source| Error::IoPath {
1765                path: write_path.clone(),
1766                source,
1767            })?;
1768        bytes_done += read as u64;
1769        bytes_remaining -= read as u64;
1770        events(TransferEvent::Progress {
1771            direction: TransferDirection::Receive,
1772            file_name: display_path(&entry.relative_path),
1773            bytes_done,
1774            bytes_total: entry.size,
1775        });
1776        if control.is_cancelled() {
1777            if offset == 0 {
1778                let _ = tokio::fs::remove_file(&write_path).await;
1779            }
1780            return Err(Error::TransferCancelled);
1781        }
1782    }
1783
1784    file.flush().await.map_err(|source| Error::IoPath {
1785        path: write_path.clone(),
1786        source,
1787    })?;
1788    drop(file);
1789
1790    if offset == 0 {
1791        tokio::fs::rename(&write_path, &final_path)
1792            .await
1793            .map_err(|source| Error::IoPath {
1794                path: final_path,
1795                source,
1796            })?;
1797    }
1798
1799    Ok(())
1800}
1801
1802async fn write_json_frame<T: Serialize>(send: &mut quinn::SendStream, value: &T) -> Result<()> {
1803    let bytes = serde_json::to_vec(value).map_err(|error| Error::Protocol(error.to_string()))?;
1804    if bytes.len() > MAX_FRAME_LEN {
1805        return Err(Error::Protocol("frame exceeds maximum length".to_string()));
1806    }
1807    send.write_all(&(bytes.len() as u32).to_be_bytes()).await?;
1808    send.write_all(&bytes).await?;
1809    Ok(())
1810}
1811
1812async fn read_json_frame<T: for<'de> Deserialize<'de>>(recv: &mut quinn::RecvStream) -> Result<T> {
1813    let mut len = [0; 4];
1814    recv.read_exact(&mut len).await?;
1815    let len = u32::from_be_bytes(len) as usize;
1816    if len > MAX_FRAME_LEN {
1817        return Err(Error::Protocol("frame exceeds maximum length".to_string()));
1818    }
1819
1820    let mut bytes = vec![0; len];
1821    recv.read_exact(&mut bytes).await?;
1822    serde_json::from_slice(&bytes).map_err(|error| Error::Protocol(error.to_string()))
1823}
1824
1825fn negotiate_direct_protocol(hello: &DirectProtocolHello) -> DirectProtocolResponse {
1826    if hello.min_protocol_version > NATIVE_PROTOCOL_VERSION {
1827        return DirectProtocolResponse::reject(format!(
1828            "peer requires protocol version {}, local max is {}",
1829            hello.min_protocol_version, NATIVE_PROTOCOL_VERSION
1830        ));
1831    }
1832
1833    if hello.protocol_version < MIN_NATIVE_PROTOCOL_VERSION {
1834        return DirectProtocolResponse::reject(format!(
1835            "peer max protocol version {} is below local minimum {}",
1836            hello.protocol_version, MIN_NATIVE_PROTOCOL_VERSION
1837        ));
1838    }
1839
1840    DirectProtocolResponse::accept(NATIVE_PROTOCOL_VERSION.min(hello.protocol_version))
1841}
1842
/// A file or directory on the local disk paired with the path it will have
/// inside the transfer.
#[derive(Clone, Debug)]
struct ManifestSource {
    /// Location of the item on the sender's file system.
    source_path: PathBuf,
    /// Path of the item relative to the transfer root.
    relative_path: PathBuf,
}
1848
1849fn collect_manifest_sources(paths: &[PathBuf]) -> Result<Vec<ManifestSource>> {
1850    let mut sources = Vec::new();
1851    for path in paths {
1852        let metadata = std::fs::symlink_metadata(path).map_err(|source| Error::IoPath {
1853            path: path.clone(),
1854            source,
1855        })?;
1856        if metadata.file_type().is_symlink() {
1857            return Err(Error::InvalidInput(format!(
1858                "{} is a symlink; symlink transfers are not supported",
1859                display_path(path)
1860            )));
1861        }
1862
1863        let root_name = path
1864            .file_name()
1865            .ok_or_else(|| Error::InvalidInput(format!("{} has no file name", display_path(path))))?
1866            .to_os_string();
1867        let relative_path = PathBuf::from(root_name);
1868
1869        if metadata.is_dir() {
1870            sources.push(ManifestSource {
1871                source_path: path.clone(),
1872                relative_path: relative_path.clone(),
1873            });
1874            collect_dir_sources(path, &relative_path, &mut sources)?;
1875        } else if metadata.is_file() {
1876            sources.push(ManifestSource {
1877                source_path: path.clone(),
1878                relative_path,
1879            });
1880        } else {
1881            return Err(Error::InvalidInput(format!(
1882                "{} is not a regular file or directory",
1883                display_path(path)
1884            )));
1885        }
1886    }
1887
1888    Ok(sources)
1889}
1890
1891fn collect_dir_sources(
1892    dir: &Path,
1893    relative_dir: &Path,
1894    sources: &mut Vec<ManifestSource>,
1895) -> Result<()> {
1896    for entry in std::fs::read_dir(dir).map_err(|source| Error::IoPath {
1897        path: dir.to_path_buf(),
1898        source,
1899    })? {
1900        let entry = entry.map_err(|source| Error::IoPath {
1901            path: dir.to_path_buf(),
1902            source,
1903        })?;
1904        let path = entry.path();
1905        let metadata = std::fs::symlink_metadata(&path).map_err(|source| Error::IoPath {
1906            path: path.clone(),
1907            source,
1908        })?;
1909        if metadata.file_type().is_symlink() {
1910            return Err(Error::InvalidInput(format!(
1911                "{} is a symlink; symlink transfers are not supported",
1912                display_path(&path)
1913            )));
1914        }
1915
1916        let relative_path = relative_dir.join(entry.file_name());
1917        if metadata.is_dir() {
1918            sources.push(ManifestSource {
1919                source_path: path.clone(),
1920                relative_path: relative_path.clone(),
1921            });
1922            collect_dir_sources(&path, &relative_path, sources)?;
1923        } else if metadata.is_file() {
1924            sources.push(ManifestSource {
1925                source_path: path,
1926                relative_path,
1927            });
1928        }
1929    }
1930
1931    Ok(())
1932}
1933
1934fn manifest_source_map(paths: &[PathBuf]) -> Result<BTreeMap<PathBuf, PathBuf>> {
1935    Ok(collect_manifest_sources(paths)?
1936        .into_iter()
1937        .filter_map(|source| {
1938            let metadata = std::fs::metadata(&source.source_path).ok()?;
1939            metadata
1940                .is_file()
1941                .then_some((source.relative_path, source.source_path))
1942        })
1943        .collect())
1944}
1945
1946async fn build_manifest_entry(source: ManifestSource) -> Result<ManifestEntry> {
1947    validate_relative_transfer_path(&source.relative_path)?;
1948    let metadata = tokio::fs::metadata(&source.source_path)
1949        .await
1950        .map_err(|source_error| Error::IoPath {
1951            path: source.source_path.clone(),
1952            source: source_error,
1953        })?;
1954    let unix_mode = unix_mode(&metadata);
1955    let modified_unix_ms = modified_unix_ms(&metadata);
1956
1957    if metadata.is_dir() {
1958        return Ok(ManifestEntry {
1959            relative_path: source.relative_path,
1960            kind: ManifestEntryKind::Directory,
1961            size: 0,
1962            blake3: None,
1963            chunks: Vec::new(),
1964            unix_mode,
1965            modified_unix_ms,
1966        });
1967    }
1968
1969    let (blake3, chunks) = hash_file_with_chunks(&source.source_path, DEFAULT_CHUNK_SIZE).await?;
1970    Ok(ManifestEntry {
1971        relative_path: source.relative_path,
1972        kind: ManifestEntryKind::File,
1973        size: metadata.len(),
1974        blake3: Some(blake3),
1975        chunks,
1976        unix_mode,
1977        modified_unix_ms,
1978    })
1979}
1980
1981fn validate_manifest(manifest: &TransferManifest) -> Result<()> {
1982    if manifest.version != MANIFEST_VERSION {
1983        return Err(Error::Protocol(format!(
1984            "unsupported manifest version {}",
1985            manifest.version
1986        )));
1987    }
1988    if manifest.chunk_size == 0 {
1989        return Err(Error::Protocol("manifest chunk size is zero".to_string()));
1990    }
1991
1992    let mut seen = BTreeSet::new();
1993    for entry in &manifest.entries {
1994        validate_relative_transfer_path(&entry.relative_path)?;
1995        if !seen.insert(entry.relative_path.clone()) {
1996            return Err(Error::Protocol(format!(
1997                "duplicate manifest path: {}",
1998                display_path(&entry.relative_path)
1999            )));
2000        }
2001        if entry.kind == ManifestEntryKind::File && entry.blake3.is_none() {
2002            return Err(Error::Protocol(format!(
2003                "file manifest entry is missing BLAKE3: {}",
2004                display_path(&entry.relative_path)
2005            )));
2006        }
2007    }
2008
2009    Ok(())
2010}
2011
2012fn validate_relative_transfer_path(path: &Path) -> Result<()> {
2013    if path.as_os_str().is_empty() || path.is_absolute() {
2014        return Err(Error::InvalidInput(format!(
2015            "unsafe transfer path: {}",
2016            display_path(path)
2017        )));
2018    }
2019
2020    let mut normal_components = 0usize;
2021    for component in path.components() {
2022        match component {
2023            Component::Normal(value) => {
2024                let Some(name) = value.to_str() else {
2025                    return Err(Error::InvalidInput(format!(
2026                        "transfer path must be UTF-8: {}",
2027                        display_path(path)
2028                    )));
2029                };
2030                validate_path_component(name, path)?;
2031                normal_components += 1;
2032            }
2033            Component::CurDir
2034            | Component::ParentDir
2035            | Component::RootDir
2036            | Component::Prefix(_) => {
2037                return Err(Error::InvalidInput(format!(
2038                    "unsafe transfer path: {}",
2039                    display_path(path)
2040                )));
2041            }
2042        }
2043    }
2044
2045    if normal_components == 0 {
2046        return Err(Error::InvalidInput(format!(
2047            "unsafe transfer path: {}",
2048            display_path(path)
2049        )));
2050    }
2051
2052    Ok(())
2053}
2054
2055fn validate_path_component(component: &str, full_path: &Path) -> Result<()> {
2056    if component.is_empty()
2057        || component.contains('\\')
2058        || component.contains('/')
2059        || component.contains(':')
2060        || component.ends_with(' ')
2061        || component.ends_with('.')
2062        || is_windows_reserved_name(component)
2063    {
2064        return Err(Error::InvalidInput(format!(
2065            "unsafe transfer path: {}",
2066            display_path(full_path)
2067        )));
2068    }
2069
2070    Ok(())
2071}
2072
/// Reports whether `component` names a Windows reserved device.
///
/// Only the part before the first `.` is considered, with trailing spaces
/// ignored, so `con.txt` and `NUL .log` are reserved just like `CON` and
/// `NUL`. Matching is ASCII case-insensitive.
///
/// Recognised names are `CON`, `PRN`, `AUX`, `NUL`, and `COM`/`LPT` followed
/// by exactly one of the digits `1`-`9` or the superscript digits `¹`, `²`,
/// `³`, which Windows also treats as port numbers.
fn is_windows_reserved_name(component: &str) -> bool {
    let stem = component
        .split('.')
        .next()
        .unwrap_or(component)
        .trim_end_matches([' ', '.']);

    let is_plain_device = ["CON", "PRN", "AUX", "NUL"]
        .iter()
        .any(|device| stem.eq_ignore_ascii_case(device));
    if is_plain_device {
        return true;
    }

    // Split the stem into a prefix and its final character to test for a
    // port device such as `COM1` without allocating an upper-cased copy.
    let mut chars = stem.chars();
    let Some(port) = chars.next_back() else {
        return false;
    };
    let prefix = chars.as_str();

    let is_port_digit = matches!(port, '1'..='9' | '¹' | '²' | '³');
    is_port_digit && (prefix.eq_ignore_ascii_case("COM") || prefix.eq_ignore_ascii_case("LPT"))
}
2106
2107async fn hash_file(path: &Path) -> Result<String> {
2108    hash_file_with_chunks(path, DEFAULT_CHUNK_SIZE)
2109        .await
2110        .map(|(hash, _)| hash)
2111}
2112
2113async fn hash_file_with_chunks(path: &Path, chunk_size: u64) -> Result<(String, Vec<String>)> {
2114    let mut file = tokio::fs::File::open(path)
2115        .await
2116        .map_err(|source| Error::IoPath {
2117            path: path.to_path_buf(),
2118            source,
2119        })?;
2120    let mut hasher = blake3::Hasher::new();
2121    let mut chunk_hasher = blake3::Hasher::new();
2122    let mut chunk_bytes = 0u64;
2123    let mut chunks = Vec::new();
2124    let mut buffer = vec![0; IO_BUFFER_SIZE];
2125
2126    loop {
2127        let read = file
2128            .read(&mut buffer)
2129            .await
2130            .map_err(|source| Error::IoPath {
2131                path: path.to_path_buf(),
2132                source,
2133            })?;
2134        if read == 0 {
2135            break;
2136        }
2137        hasher.update(&buffer[..read]);
2138
2139        let mut remaining = &buffer[..read];
2140        while !remaining.is_empty() {
2141            let take = remaining.len().min((chunk_size - chunk_bytes) as usize);
2142            chunk_hasher.update(&remaining[..take]);
2143            chunk_bytes += take as u64;
2144            remaining = &remaining[take..];
2145            if chunk_bytes == chunk_size {
2146                chunks.push(chunk_hasher.finalize().to_hex().to_string());
2147                chunk_hasher = blake3::Hasher::new();
2148                chunk_bytes = 0;
2149            }
2150        }
2151    }
2152
2153    if chunk_bytes > 0 {
2154        chunks.push(chunk_hasher.finalize().to_hex().to_string());
2155    }
2156
2157    Ok((hasher.finalize().to_hex().to_string(), chunks))
2158}
2159
2160async fn verified_prefix_offset(
2161    path: &Path,
2162    entry: &ManifestEntry,
2163    existing_len: u64,
2164    chunk_size: u64,
2165) -> Result<u64> {
2166    if existing_len == 0 || entry.chunks.is_empty() {
2167        return Ok(0);
2168    }
2169
2170    let chunks_to_check = (existing_len / chunk_size).min(entry.chunks.len() as u64) as usize;
2171    if chunks_to_check == 0 {
2172        return Ok(0);
2173    }
2174
2175    let mut file = tokio::fs::File::open(path)
2176        .await
2177        .map_err(|source| Error::IoPath {
2178            path: path.to_path_buf(),
2179            source,
2180        })?;
2181    let mut buffer = vec![0; IO_BUFFER_SIZE];
2182    let mut verified_chunks = 0usize;
2183
2184    for expected in entry.chunks.iter().take(chunks_to_check) {
2185        let mut remaining = chunk_size;
2186        let mut hasher = blake3::Hasher::new();
2187        while remaining > 0 {
2188            let read_size = buffer.len().min(remaining as usize);
2189            file.read_exact(&mut buffer[..read_size])
2190                .await
2191                .map_err(|source| Error::IoPath {
2192                    path: path.to_path_buf(),
2193                    source,
2194                })?;
2195            hasher.update(&buffer[..read_size]);
2196            remaining -= read_size as u64;
2197        }
2198
2199        if hasher.finalize().to_hex().as_str() != expected {
2200            break;
2201        }
2202        verified_chunks += 1;
2203    }
2204
2205    Ok(verified_chunks as u64 * chunk_size)
2206}
2207
2208fn summary_from_files(
2209    manifest: &TransferManifest,
2210    files: Vec<TransferFileSummary>,
2211) -> Result<TransferSummary> {
2212    let bytes = manifest.total_file_bytes();
2213    let blake3 = if files.len() == 1 {
2214        files[0].blake3.clone()
2215    } else {
2216        manifest.digest()?
2217    };
2218    let file_name = if files.len() == 1 {
2219        display_path(&files[0].relative_path)
2220    } else {
2221        format!("{} files", files.len())
2222    };
2223    let path = files
2224        .first()
2225        .map(|file| file.path.clone())
2226        .unwrap_or_default();
2227
2228    Ok(TransferSummary {
2229        path,
2230        file_name,
2231        bytes,
2232        blake3,
2233        files,
2234    })
2235}
2236
2237fn temp_path_for(final_path: &Path) -> PathBuf {
2238    let file_name = final_path
2239        .file_name()
2240        .and_then(|name| name.to_str())
2241        .unwrap_or("transfer");
2242    final_path.with_file_name(format!(".{file_name}.{}.ferry-part", Uuid::new_v4()))
2243}
2244
2245fn modified_unix_ms(metadata: &Metadata) -> Option<i128> {
2246    system_time_unix_ms(metadata.modified().ok()?)
2247}
2248
/// Converts `time` to whole milliseconds relative to the Unix epoch.
///
/// Times before the epoch produce a negative value. The result is always
/// `Some`.
fn system_time_unix_ms(time: SystemTime) -> Option<i128> {
    let signed_ms = time.duration_since(UNIX_EPOCH).map_or_else(
        |before_epoch| -(before_epoch.duration().as_millis() as i128),
        |since_epoch| since_epoch.as_millis() as i128,
    );
    Some(signed_ms)
}
2255
/// Returns the Unix mode bits reported by the file's permissions.
#[cfg(unix)]
fn unix_mode(metadata: &Metadata) -> Option<u32> {
    use std::os::unix::fs::PermissionsExt;

    let permissions = metadata.permissions();
    Some(permissions.mode())
}

/// On non-Unix targets no mode is reported.
#[cfg(not(unix))]
fn unix_mode(_metadata: &Metadata) -> Option<u32> {
    None
}
2267
2268fn client_config_capturing_server_fingerprint()
2269-> Result<(quinn::ClientConfig, Arc<Mutex<Option<String>>>)> {
2270    let server_fingerprint = Arc::new(Mutex::new(None));
2271    let crypto = rustls::ClientConfig::builder()
2272        .dangerous()
2273        .with_custom_certificate_verifier(ServerFingerprintVerifier::new(
2274            server_fingerprint.clone(),
2275        ))
2276        .with_no_client_auth();
2277
2278    let config = quinn::ClientConfig::new(Arc::new(
2279        QuicClientConfig::try_from(crypto)
2280            .map_err(|error| Error::CryptoConfig(error.to_string()))?,
2281    ));
2282    Ok((config, server_fingerprint))
2283}
2284
2285fn captured_server_fingerprint(fingerprint: &Arc<Mutex<Option<String>>>) -> Result<String> {
2286    fingerprint
2287        .lock()
2288        .map_err(|_| Error::CryptoConfig("server fingerprint capture lock poisoned".to_string()))?
2289        .clone()
2290        .ok_or_else(|| {
2291            Error::CryptoConfig("server certificate fingerprint was not captured".to_string())
2292        })
2293}
2294
2295#[derive(Debug)]
2296struct ServerFingerprintVerifier {
2297    provider: Arc<CryptoProvider>,
2298    server_fingerprint: Arc<Mutex<Option<String>>>,
2299}
2300
2301impl ServerFingerprintVerifier {
2302    fn new(server_fingerprint: Arc<Mutex<Option<String>>>) -> Arc<Self> {
2303        Arc::new(Self {
2304            provider: Arc::new(rustls::crypto::ring::default_provider()),
2305            server_fingerprint,
2306        })
2307    }
2308}
2309
2310impl ServerCertVerifier for ServerFingerprintVerifier {
2311    fn verify_server_cert(
2312        &self,
2313        end_entity: &CertificateDer<'_>,
2314        _intermediates: &[CertificateDer<'_>],
2315        _server_name: &ServerName<'_>,
2316        _ocsp: &[u8],
2317        _now: UnixTime,
2318    ) -> std::result::Result<ServerCertVerified, rustls::Error> {
2319        let fingerprint = blake3::hash(end_entity.as_ref()).to_hex().to_string();
2320        *self.server_fingerprint.lock().map_err(|_| {
2321            rustls::Error::General("server fingerprint capture lock poisoned".to_string())
2322        })? = Some(fingerprint);
2323        Ok(ServerCertVerified::assertion())
2324    }
2325
2326    fn verify_tls12_signature(
2327        &self,
2328        message: &[u8],
2329        cert: &CertificateDer<'_>,
2330        dss: &DigitallySignedStruct,
2331    ) -> std::result::Result<HandshakeSignatureValid, rustls::Error> {
2332        verify_tls12_signature(
2333            message,
2334            cert,
2335            dss,
2336            &self.provider.signature_verification_algorithms,
2337        )
2338    }
2339
2340    fn verify_tls13_signature(
2341        &self,
2342        message: &[u8],
2343        cert: &CertificateDer<'_>,
2344        dss: &DigitallySignedStruct,
2345    ) -> std::result::Result<HandshakeSignatureValid, rustls::Error> {
2346        verify_tls13_signature(
2347            message,
2348            cert,
2349            dss,
2350            &self.provider.signature_verification_algorithms,
2351        )
2352    }
2353
2354    fn supported_verify_schemes(&self) -> Vec<SignatureScheme> {
2355        self.provider
2356            .signature_verification_algorithms
2357            .supported_schemes()
2358    }
2359}
2360
2361fn ensure_rustls_provider() {
2362    let _ = rustls::crypto::ring::default_provider().install_default();
2363}
2364
2365pub fn config_dir() -> Result<PathBuf> {
2366    let base_dirs = BaseDirs::new().ok_or(Error::ConfigDirUnavailable)?;
2367    Ok(base_dirs.config_dir().join(CONFIG_DIR_NAME))
2368}
2369
/// Picks a default device alias from the environment.
///
/// `HOSTNAME` is preferred, then `COMPUTERNAME`. A variable that is unset,
/// not valid Unicode, or blank is skipped so that a blank `HOSTNAME` no
/// longer hides a usable `COMPUTERNAME`. Falls back to `fileferry-device`
/// when neither yields a value. The chosen value is returned unmodified.
fn default_alias() -> String {
    ["HOSTNAME", "COMPUTERNAME"]
        .iter()
        .filter_map(|name| std::env::var(name).ok())
        .find(|value| !value.trim().is_empty())
        .unwrap_or_else(|| "fileferry-device".to_string())
}
2377
2378fn expand_home_path(path: &str) -> Result<PathBuf> {
2379    if path == "~" {
2380        return Ok(BaseDirs::new()
2381            .ok_or(Error::ConfigDirUnavailable)?
2382            .home_dir()
2383            .to_path_buf());
2384    }
2385
2386    if let Some(rest) = path.strip_prefix("~/") {
2387        return Ok(BaseDirs::new()
2388            .ok_or(Error::ConfigDirUnavailable)?
2389            .home_dir()
2390            .join(rest));
2391    }
2392
2393    Ok(PathBuf::from(path))
2394}
2395
2396fn normalized_fingerprint(fingerprint: String) -> Result<String> {
2397    let fingerprint = fingerprint.trim().to_ascii_lowercase();
2398    if fingerprint.len() != 64 || !fingerprint.chars().all(|char| char.is_ascii_hexdigit()) {
2399        return Err(Error::InvalidInput(
2400            "peer fingerprint must be a full 64-character hex BLAKE3 fingerprint".to_string(),
2401        ));
2402    }
2403
2404    Ok(fingerprint)
2405}
2406
2407fn write_toml_file<T: Serialize>(path: &Path, value: &T) -> Result<()> {
2408    let bytes =
2409        toml::to_string_pretty(value).map_err(|error| Error::ConfigSerialize(error.to_string()))?;
2410    if let Some(parent) = path.parent() {
2411        std::fs::create_dir_all(parent).map_err(|source| Error::IoPath {
2412            path: parent.to_path_buf(),
2413            source,
2414        })?;
2415    }
2416
2417    let temp_path = path.with_extension(format!("tmp-{}", Uuid::new_v4()));
2418    std::fs::write(&temp_path, bytes).map_err(|source| Error::IoPath {
2419        path: temp_path.clone(),
2420        source,
2421    })?;
2422    std::fs::rename(&temp_path, path).map_err(|source| Error::IoPath {
2423        path: path.to_path_buf(),
2424        source,
2425    })?;
2426    Ok(())
2427}
2428
2429fn write_private_file(path: &Path, bytes: &[u8]) -> Result<()> {
2430    std::fs::write(path, bytes).map_err(|source| Error::IoPath {
2431        path: path.to_path_buf(),
2432        source,
2433    })?;
2434
2435    #[cfg(unix)]
2436    {
2437        use std::os::unix::fs::PermissionsExt;
2438
2439        let mut permissions = std::fs::metadata(path)
2440            .map_err(|source| Error::IoPath {
2441                path: path.to_path_buf(),
2442                source,
2443            })?
2444            .permissions();
2445        permissions.set_mode(0o600);
2446        std::fs::set_permissions(path, permissions).map_err(|source| Error::IoPath {
2447            path: path.to_path_buf(),
2448            source,
2449        })?;
2450    }
2451
2452    Ok(())
2453}
2454
2455#[cfg(test)]
2456mod tests {
2457    use super::*;
2458    use std::sync::{Arc, Mutex};
2459
2460    #[test]
2461    fn direct_peer_parses_socket_address() {
2462        let peer = DirectPeer::parse("127.0.0.1:53318").expect("address parses");
2463
2464        assert_eq!(peer.address().to_string(), "127.0.0.1:53318");
2465        assert_eq!(peer.expected_fingerprint(), None);
2466    }
2467
2468    #[test]
2469    fn direct_peer_accepts_expected_fingerprint() {
2470        let fingerprint = "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef";
2471        let peer = DirectPeer::parse("127.0.0.1:53318")
2472            .expect("address parses")
2473            .with_expected_fingerprint(fingerprint)
2474            .expect("fingerprint parses");
2475
2476        assert_eq!(peer.expected_fingerprint(), Some(fingerprint));
2477    }
2478
2479    #[test]
2480    fn direct_peer_rejects_missing_port() {
2481        assert!(DirectPeer::parse("127.0.0.1").is_err());
2482    }
2483
2484    #[test]
2485    fn send_request_requires_at_least_one_path() {
2486        let peer = DirectPeer::parse("127.0.0.1:53318").expect("address parses");
2487
2488        assert!(SendRequest::new(peer, Vec::new()).is_err());
2489    }
2490
2491    #[test]
2492    fn validate_send_paths_rejects_empty_slice() {
2493        assert!(validate_send_paths(&[]).is_err());
2494    }
2495
2496    #[tokio::test]
2497    async fn manifest_walks_directory_and_serializes_entries() {
2498        let temp = tempfile::tempdir().expect("tempdir");
2499        let root = temp.path().join("bundle");
2500        let nested = root.join("nested");
2501        tokio::fs::create_dir_all(&nested).await.expect("dirs");
2502        tokio::fs::write(root.join("a.txt"), b"alpha")
2503            .await
2504            .expect("file a");
2505        tokio::fs::write(nested.join("b.txt"), b"beta")
2506            .await
2507            .expect("file b");
2508
2509        let manifest = build_manifest(&[root]).await.expect("manifest");
2510        let json = serde_json::to_string(&manifest).expect("manifest serializes");
2511        let decoded: TransferManifest = serde_json::from_str(&json).expect("manifest decodes");
2512
2513        assert_eq!(decoded.version, MANIFEST_VERSION);
2514        assert!(decoded.entries.iter().any(|entry| {
2515            entry.kind == ManifestEntryKind::Directory
2516                && entry.relative_path == Path::new("bundle/nested")
2517        }));
2518        assert!(decoded.entries.iter().any(|entry| {
2519            entry.kind == ManifestEntryKind::File
2520                && entry.relative_path == Path::new("bundle/nested/b.txt")
2521                && entry.size == 4
2522                && entry.blake3.is_some()
2523        }));
2524    }
2525
2526    #[test]
2527    fn safe_destination_path_rejects_unsafe_paths() {
2528        let dest = Path::new("/tmp/ferry-dest");
2529
2530        assert!(safe_destination_path(dest, Path::new("../escape.txt")).is_err());
2531        assert!(safe_destination_path(dest, Path::new("/absolute.txt")).is_err());
2532        assert!(safe_destination_path(dest, Path::new("nested/CON")).is_err());
2533        assert!(safe_destination_path(dest, Path::new("nested\\escape.txt")).is_err());
2534        assert!(safe_destination_path(dest, Path::new("nested/ok.txt")).is_ok());
2535    }
2536
2537    #[test]
2538    fn safe_destination_path_rejects_generated_escape_and_reserved_patterns() {
2539        let dest = Path::new("/tmp/ferry-dest");
2540        let unsafe_components = [
2541            "..",
2542            ".",
2543            "CON",
2544            "con.txt",
2545            "NUL",
2546            "COM1",
2547            "LPT9",
2548            "trailingspace ",
2549            "trailingdot.",
2550            "has:colon",
2551            "has\\backslash",
2552        ];
2553
2554        for component in unsafe_components {
2555            assert!(
2556                safe_destination_path(dest, Path::new(component)).is_err(),
2557                "{component} should be rejected as a top-level path"
2558            );
2559            if component != "." {
2560                assert!(
2561                    safe_destination_path(dest, Path::new("safe").join(component).as_path())
2562                        .is_err(),
2563                    "{component} should be rejected as a nested path"
2564                );
2565            }
2566        }
2567
2568        for component in ["alpha", "alpha-1", "alpha_1", "report.final.txt"] {
2569            let path = safe_destination_path(dest, Path::new("safe").join(component).as_path())
2570                .expect("safe generated path accepted");
2571            assert!(path.starts_with(dest));
2572        }
2573    }
2574
2575    #[tokio::test]
2576    async fn resume_decision_skips_existing_matching_file() {
2577        let temp = tempfile::tempdir().expect("tempdir");
2578        let source = temp.path().join("source.txt");
2579        let dest = temp.path().join("dest");
2580        tokio::fs::create_dir_all(&dest).await.expect("dest dir");
2581        tokio::fs::write(&source, b"same contents")
2582            .await
2583            .expect("source");
2584        tokio::fs::write(dest.join("source.txt"), b"same contents")
2585            .await
2586            .expect("dest file");
2587        let manifest = build_manifest(&[source]).await.expect("manifest");
2588        let entry = manifest.file_entries().next().expect("file entry");
2589
2590        let decision = decide_resume(&dest, entry, manifest.chunk_size)
2591            .await
2592            .expect("decision");
2593
2594        assert_eq!(decision, ResumeDecision::Skip);
2595    }
2596
2597    #[tokio::test]
2598    async fn resume_decision_returns_verified_chunk_offset() {
2599        let temp = tempfile::tempdir().expect("tempdir");
2600        let source = temp.path().join("large.bin");
2601        let dest = temp.path().join("dest");
2602        tokio::fs::create_dir_all(&dest).await.expect("dest dir");
2603        let bytes = vec![42; (DEFAULT_CHUNK_SIZE as usize * 2) + 128];
2604        tokio::fs::write(&source, &bytes).await.expect("source");
2605        tokio::fs::write(
2606            dest.join("large.bin"),
2607            &bytes[..DEFAULT_CHUNK_SIZE as usize + 99],
2608        )
2609        .await
2610        .expect("partial");
2611        let manifest = build_manifest(&[source]).await.expect("manifest");
2612        let entry = manifest.file_entries().next().expect("file entry");
2613
2614        let decision = decide_resume(&dest, entry, manifest.chunk_size)
2615            .await
2616            .expect("decision");
2617
2618        assert_eq!(decision, ResumeDecision::ResumeFrom(DEFAULT_CHUNK_SIZE));
2619    }
2620
2621    #[tokio::test]
2622    async fn resume_decision_rejects_existing_file_with_mismatched_contents() {
2623        let temp = tempfile::tempdir().expect("tempdir");
2624        let source = temp.path().join("source.txt");
2625        let dest = temp.path().join("dest");
2626        tokio::fs::create_dir_all(&dest).await.expect("dest dir");
2627        tokio::fs::write(&source, b"expected contents")
2628            .await
2629            .expect("source");
2630        tokio::fs::write(dest.join("source.txt"), b"different contents")
2631            .await
2632            .expect("conflicting dest file");
2633        let manifest = build_manifest(&[source]).await.expect("manifest");
2634        let entry = manifest.file_entries().next().expect("file entry");
2635
2636        let decision = decide_resume(&dest, entry, manifest.chunk_size)
2637            .await
2638            .expect("decision");
2639
2640        assert!(matches!(decision, ResumeDecision::Conflict(_)));
2641    }
2642
2643    #[tokio::test]
2644    async fn resume_decision_rejects_existing_file_larger_than_manifest_entry() {
2645        let temp = tempfile::tempdir().expect("tempdir");
2646        let source = temp.path().join("source.txt");
2647        let dest = temp.path().join("dest");
2648        tokio::fs::create_dir_all(&dest).await.expect("dest dir");
2649        tokio::fs::write(&source, b"small").await.expect("source");
2650        tokio::fs::write(dest.join("source.txt"), b"larger destination")
2651            .await
2652            .expect("larger dest file");
2653        let manifest = build_manifest(&[source]).await.expect("manifest");
2654        let entry = manifest.file_entries().next().expect("file entry");
2655
2656        let decision = decide_resume(&dest, entry, manifest.chunk_size)
2657            .await
2658            .expect("decision");
2659
2660        assert!(matches!(decision, ResumeDecision::Conflict(_)));
2661    }
2662
2663    #[test]
2664    fn identity_is_loaded_after_generation() {
2665        let temp = tempfile::tempdir().expect("tempdir");
2666        let generated =
2667            NativeIdentity::load_or_generate_in(temp.path()).expect("identity generated");
2668        let loaded = NativeIdentity::load_or_generate_in(temp.path()).expect("identity loaded");
2669
2670        assert_eq!(generated.fingerprint(), loaded.fingerprint());
2671    }
2672
2673    #[test]
2674    fn app_config_round_trips_toml() {
2675        let temp = tempfile::tempdir().expect("tempdir");
2676        let path = temp.path().join("config.toml");
2677        let config = AppConfig {
2678            alias: "desk".to_string(),
2679            ..AppConfig::default()
2680        };
2681
2682        config.save_to_path(&path).expect("config saved");
2683        let loaded = AppConfig::load_or_default_from(&path).expect("config loaded");
2684
2685        assert_eq!(loaded.alias, "desk");
2686        assert_eq!(loaded.listen_port, 53317);
2687        assert!(loaded.trust.require_fingerprint);
2688    }
2689
2690    #[test]
2691    fn app_config_redacts_psk_for_display_output() {
2692        let config = AppConfig {
2693            trust: TrustConfig {
2694                psk: "super-secret-psk".to_string(),
2695                ..TrustConfig::default()
2696            },
2697            ..AppConfig::default()
2698        };
2699
2700        let redacted = config.to_redacted_toml_string().expect("config serializes");
2701
2702        assert!(!redacted.contains("super-secret-psk"));
2703        assert!(redacted.contains(r#"psk = "<redacted>""#));
2704    }
2705
2706    #[test]
2707    fn daemon_config_defaults_from_app_config_and_round_trips_toml() {
2708        let temp = tempfile::tempdir().expect("tempdir");
2709        let path = temp.path().join("daemon.toml");
2710        let app_config = AppConfig {
2711            quic_port: 54444,
2712            download_dir: temp.path().join("downloads").display().to_string(),
2713            ..AppConfig::default()
2714        };
2715
2716        let defaulted =
2717            DaemonConfig::load_or_default_from(&path, &app_config).expect("daemon config");
2718        assert_eq!(
2719            defaulted.listen,
2720            SocketAddr::from(([0, 0, 0, 0], app_config.quic_port))
2721        );
2722        assert_eq!(defaulted.destination, temp.path().join("downloads"));
2723
2724        let config = DaemonConfig {
2725            listen: SocketAddr::from(([127, 0, 0, 1], 53318)),
2726            destination: temp.path().join("daemon-dest"),
2727        };
2728        config.save_to_path(&path).expect("daemon config saved");
2729        let loaded =
2730            DaemonConfig::load_or_default_from(&path, &app_config).expect("daemon config loaded");
2731
2732        assert_eq!(loaded, config);
2733    }
2734
2735    #[test]
2736    fn trust_store_load_save_and_forget_round_trip() {
2737        let temp = tempfile::tempdir().expect("tempdir");
2738        let path = temp.path().join("known_peers.toml");
2739        let fingerprint = "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef";
2740        let mut store = TrustStore::default();
2741
2742        store
2743            .trust_fingerprint(fingerprint)
2744            .expect("fingerprint trusted");
2745        store.save_to_path(&path).expect("trust store saved");
2746        let mut loaded = TrustStore::load_or_default_from(&path).expect("trust store loaded");
2747
2748        assert_eq!(loaded.records().count(), 1);
2749        assert_eq!(loaded.peers[fingerprint].trust_state, TrustState::Trusted);
2750        assert!(loaded.forget(fingerprint));
2751        assert_eq!(loaded.records().count(), 0);
2752    }
2753
2754    #[test]
2755    fn trust_store_rejects_short_fingerprint_without_discovery_lookup() {
2756        let mut store = TrustStore::default();
2757
2758        assert!(store.trust_fingerprint("9a4f2c").is_err());
2759    }
2760
2761    #[test]
2762    fn transfer_events_serialize_with_stable_event_names() {
2763        let event = TransferEvent::Progress {
2764            direction: TransferDirection::Send,
2765            file_name: "bundle/a.txt".to_string(),
2766            bytes_done: 5,
2767            bytes_total: 9,
2768        };
2769
2770        let json = serde_json::to_value(&event).expect("event serializes");
2771
2772        assert_eq!(json["event"], "progress");
2773        assert_eq!(json["direction"], "send");
2774        assert_eq!(json["file_name"], "bundle/a.txt");
2775        assert_eq!(json["bytes_done"], 5);
2776        assert_eq!(json["bytes_total"], 9);
2777    }
2778
2779    #[test]
2780    fn direct_protocol_hello_has_stable_golden_json() {
2781        let hello = DirectProtocolHello {
2782            protocol_version: 1,
2783            min_protocol_version: 1,
2784            client_fingerprint: "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"
2785                .to_string(),
2786        };
2787
2788        let json = serde_json::to_string(&hello).expect("hello serializes");
2789
2790        assert_eq!(
2791            json,
2792            r#"{"protocol_version":1,"min_protocol_version":1,"client_fingerprint":"0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"}"#
2793        );
2794    }
2795
2796    #[test]
2797    fn direct_protocol_response_has_stable_golden_json() {
2798        let response = DirectProtocolResponse::accept(1);
2799
2800        let json = serde_json::to_string(&response).expect("response serializes");
2801
2802        assert_eq!(
2803            json,
2804            r#"{"accepted":true,"protocol_version":1,"error":null}"#
2805        );
2806    }
2807
2808    #[test]
2809    fn direct_protocol_negotiates_supported_overlap() {
2810        let hello = DirectProtocolHello {
2811            protocol_version: 3,
2812            min_protocol_version: 1,
2813            client_fingerprint: "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"
2814                .to_string(),
2815        };
2816
2817        let response = negotiate_direct_protocol(&hello);
2818
2819        assert_eq!(response, DirectProtocolResponse::accept(1));
2820    }
2821
2822    #[test]
2823    fn direct_protocol_rejects_incompatible_versions() {
2824        let too_new = DirectProtocolHello {
2825            protocol_version: 3,
2826            min_protocol_version: 2,
2827            client_fingerprint: "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"
2828                .to_string(),
2829        };
2830        let too_old = DirectProtocolHello {
2831            protocol_version: 0,
2832            min_protocol_version: 0,
2833            client_fingerprint: "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"
2834                .to_string(),
2835        };
2836
2837        assert!(!negotiate_direct_protocol(&too_new).accepted);
2838        assert!(!negotiate_direct_protocol(&too_old).accepted);
2839    }
2840
2841    #[test]
2842    fn transfer_manifest_has_stable_golden_json() {
2843        let manifest = TransferManifest {
2844            version: MANIFEST_VERSION,
2845            session_id: Uuid::parse_str("00000000-0000-0000-0000-000000000001")
2846                .expect("uuid parses"),
2847            chunk_size: DEFAULT_CHUNK_SIZE,
2848            entries: vec![ManifestEntry {
2849                relative_path: PathBuf::from("bundle/a.txt"),
2850                kind: ManifestEntryKind::File,
2851                size: 5,
2852                blake3: Some(
2853                    "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef".to_string(),
2854                ),
2855                chunks: vec![
2856                    "abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789".to_string(),
2857                ],
2858                unix_mode: Some(0o100644),
2859                modified_unix_ms: Some(1_700_000_000_000),
2860            }],
2861        };
2862
2863        let json = serde_json::to_string(&manifest).expect("manifest serializes");
2864
2865        assert_eq!(
2866            json,
2867            r#"{"version":1,"session_id":"00000000-0000-0000-0000-000000000001","chunk_size":1048576,"entries":[{"relative_path":"bundle/a.txt","kind":"file","size":5,"blake3":"0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef","chunks":["abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789"],"unix_mode":33188,"modified_unix_ms":1700000000000}]}"#
2868        );
2869    }
2870
2871    #[test]
2872    fn peer_registry_merges_observations_by_fingerprint() {
2873        let mut registry = PeerRegistry::new();
2874        let fingerprint = "abcdef1234567890";
2875        let first = PeerObservation::new(
2876            fingerprint,
2877            SocketAddr::from(([192, 168, 1, 10], 53318)),
2878            100,
2879        )
2880        .with_alias("desk")
2881        .with_hostname("desk.local")
2882        .with_transport("quic");
2883        let second = PeerObservation::new(
2884            fingerprint,
2885            SocketAddr::from(([192, 168, 1, 11], 53318)),
2886            200,
2887        )
2888        .with_alias("workstation")
2889        .with_transport("quic");
2890
2891        registry.observe(first).expect("first observation");
2892        registry.observe(second).expect("second observation");
2893        let record = registry
2894            .get_by_fingerprint(fingerprint)
2895            .expect("record by fingerprint");
2896
2897        assert_eq!(registry.records().count(), 1);
2898        assert!(record.aliases.contains("desk"));
2899        assert!(record.aliases.contains("workstation"));
2900        assert!(record.hostnames.contains("desk.local"));
2901        assert_eq!(record.addresses.len(), 2);
2902        assert_eq!(record.first_seen_unix_ms, 100);
2903        assert_eq!(record.last_seen_unix_ms, 200);
2904    }
2905
2906    #[test]
2907    fn peer_registry_looks_up_unique_fingerprint_prefix_alias_and_hostname() {
2908        let mut registry = PeerRegistry::new();
2909        registry
2910            .observe(
2911                PeerObservation::new(
2912                    "abcdef1234567890",
2913                    SocketAddr::from(([192, 168, 1, 10], 53318)),
2914                    100,
2915                )
2916                .with_alias("desk")
2917                .with_hostname("desk.local"),
2918            )
2919            .expect("first observation");
2920        registry
2921            .observe(
2922                PeerObservation::new(
2923                    "123456abcdef7890",
2924                    SocketAddr::from(([192, 168, 1, 20], 53318)),
2925                    100,
2926                )
2927                .with_alias("laptop"),
2928            )
2929            .expect("second observation");
2930
2931        assert_eq!(
2932            registry
2933                .lookup("abcdef")
2934                .map(|record| record.fingerprint.as_str()),
2935            Some("abcdef1234567890")
2936        );
2937        assert_eq!(
2938            registry
2939                .lookup("desk")
2940                .map(|record| record.fingerprint.as_str()),
2941            Some("abcdef1234567890")
2942        );
2943        assert_eq!(
2944            registry
2945                .lookup("desk.local")
2946                .map(|record| record.fingerprint.as_str()),
2947            Some("abcdef1234567890")
2948        );
2949        assert!(registry.lookup("missing").is_none());
2950    }
2951
2952    #[test]
2953    fn peer_registry_rejects_ambiguous_lookup_hints() {
2954        let mut registry = PeerRegistry::new();
2955        registry
2956            .observe(
2957                PeerObservation::new(
2958                    "abcdef1234567890",
2959                    SocketAddr::from(([192, 168, 1, 10], 53318)),
2960                    100,
2961                )
2962                .with_alias("desk"),
2963            )
2964            .expect("first observation");
2965        registry
2966            .observe(
2967                PeerObservation::new(
2968                    "abcdef9999999999",
2969                    SocketAddr::from(([192, 168, 1, 11], 53318)),
2970                    100,
2971                )
2972                .with_alias("desk"),
2973            )
2974            .expect("second observation");
2975
2976        assert!(registry.lookup("abcdef").is_none());
2977        assert!(registry.lookup("desk").is_none());
2978        assert_eq!(
2979            registry
2980                .lookup("abcdef1234567890")
2981                .map(|record| record.fingerprint.as_str()),
2982            Some("abcdef1234567890")
2983        );
2984        match registry.lookup_detail("desk") {
2985            PeerLookup::Ambiguous(records) => assert_eq!(records.len(), 2),
2986            other => panic!("expected ambiguous duplicate-alias lookup, got {other:?}"),
2987        }
2988        assert!(matches!(
2989            registry.lookup_detail("missing"),
2990            PeerLookup::Missing
2991        ));
2992    }
2993
2994    #[test]
2995    fn native_announcement_builds_expected_txt_properties() {
2996        let announcement = NativeAnnouncement {
2997            alias: "Stephen MBP".to_string(),
2998            fingerprint: "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"
2999                .to_string(),
3000            quic_port: 53318,
3001            listen_port: Some(53317),
3002        };
3003
3004        let service = announcement.service_info().expect("service info");
3005
3006        assert_eq!(service.get_type(), NATIVE_SERVICE_TYPE);
3007        assert!(
3008            service
3009                .get_fullname()
3010                .starts_with("stephen-mbp-0123456789ab.")
3011        );
3012        assert_eq!(service.get_property_val_str("pv"), Some("1"));
3013        assert_eq!(service.get_property_val_str("minpv"), Some("1"));
3014        assert_eq!(
3015            service.get_property_val_str("fp"),
3016            Some("0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef")
3017        );
3018        assert_eq!(service.get_property_val_str("alias"), Some("Stephen MBP"));
3019        assert_eq!(service.get_property_val_str("transports"), Some("quic"));
3020        assert_eq!(service.get_property_val_str("quic_port"), Some("53318"));
3021        assert_eq!(service.get_property_val_str("listen_port"), Some("53317"));
3022    }
3023
3024    #[test]
3025    fn resolved_native_service_merges_into_registry() {
3026        let properties = [
3027            ("pv", "1"),
3028            ("minpv", "1"),
3029            (
3030                "fp",
3031                "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef",
3032            ),
3033            ("alias", "desk"),
3034            ("transports", "quic"),
3035            ("quic_port", "53318"),
3036        ];
3037        let service = ServiceInfo::new(
3038            NATIVE_SERVICE_TYPE,
3039            "desk-0123456789ab",
3040            "desk.local.",
3041            "192.168.1.42",
3042            53318,
3043            &properties[..],
3044        )
3045        .expect("service info")
3046        .as_resolved_service();
3047        let mut registry = PeerRegistry::new();
3048
3049        observe_resolved_service(&mut registry, &service).expect("observation");
3050
3051        let record = registry.lookup("desk").expect("record by alias");
3052        assert_eq!(
3053            record.fingerprint,
3054            "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"
3055        );
3056        assert_eq!(
3057            record.preferred_quic_address(),
3058            Some(SocketAddr::from(([192, 168, 1, 42], 53318)))
3059        );
3060    }
3061
3062    #[test]
3063    fn resolved_native_service_ignores_incompatible_protocol_ranges() {
3064        let properties = [
3065            ("pv", "3"),
3066            ("minpv", "2"),
3067            (
3068                "fp",
3069                "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef",
3070            ),
3071            ("alias", "desk"),
3072            ("transports", "quic"),
3073            ("quic_port", "53318"),
3074        ];
3075        let service = ServiceInfo::new(
3076            NATIVE_SERVICE_TYPE,
3077            "desk-0123456789ab",
3078            "desk.local.",
3079            "192.168.1.42",
3080            53318,
3081            &properties[..],
3082        )
3083        .expect("service info")
3084        .as_resolved_service();
3085        let mut registry = PeerRegistry::new();
3086
3087        observe_resolved_service(&mut registry, &service).expect("observation");
3088
3089        assert_eq!(registry.records().count(), 0);
3090    }
3091
3092    #[tokio::test]
3093    async fn direct_quic_loopback_sends_one_file() {
3094        let source_dir = tempfile::tempdir().expect("source tempdir");
3095        let dest_dir = tempfile::tempdir().expect("dest tempdir");
3096        let identity_dir = tempfile::tempdir().expect("identity tempdir");
3097        let file_path = source_dir.path().join("hello.txt");
3098        tokio::fs::write(&file_path, b"hello from ferry")
3099            .await
3100            .expect("source file written");
3101
3102        let identity =
3103            NativeIdentity::load_or_generate_in(identity_dir.path()).expect("identity generated");
3104        let recv_identity = identity.clone();
3105        let reserved_socket =
3106            std::net::UdpSocket::bind(SocketAddr::from(([127, 0, 0, 1], 0))).expect("udp bind");
3107        let recv_addr = reserved_socket.local_addr().expect("local addr");
3108        drop(reserved_socket);
3109        let receive_progress = Arc::new(Mutex::new(Vec::new()));
3110        let receive_progress_log = receive_progress.clone();
3111        let dest_path = dest_dir.path().to_path_buf();
3112        let server = tokio::spawn(async move {
3113            receive_direct_file(
3114                &ReceiveRequest::new(recv_addr),
3115                dest_path,
3116                &recv_identity,
3117                |event| {
3118                    receive_progress_log
3119                        .lock()
3120                        .expect("progress lock")
3121                        .push(event);
3122                },
3123            )
3124            .await
3125        });
3126
3127        tokio::time::sleep(Duration::from_millis(100)).await;
3128
3129        let peer = DirectPeer::parse(&recv_addr.to_string()).expect("peer parses");
3130        let send_request = SendRequest::new(peer, vec![file_path]).expect("send request is valid");
3131        let send_progress = Arc::new(Mutex::new(Vec::new()));
3132        let send_progress_log = send_progress.clone();
3133        let send_summary = send_direct_file(&send_request, &identity, |event| {
3134            send_progress_log.lock().expect("progress lock").push(event);
3135        })
3136        .await
3137        .expect("file sent");
3138        let receive_summary = server.await.expect("server joined").expect("file received");
3139
3140        let received = tokio::fs::read(dest_dir.path().join("hello.txt"))
3141            .await
3142            .expect("received file readable");
3143        assert_eq!(received, b"hello from ferry");
3144        assert_eq!(send_summary.bytes, receive_summary.bytes);
3145        assert_eq!(send_summary.blake3, receive_summary.blake3);
3146        let send_progress = send_progress.lock().expect("progress lock");
3147        let receive_progress = receive_progress.lock().expect("progress lock");
3148        assert!(matches!(
3149            send_progress.first(),
3150            Some(TransferEvent::SessionStarted {
3151                direction: TransferDirection::Send,
3152                ..
3153            })
3154        ));
3155        assert!(
3156            send_progress
3157                .iter()
3158                .any(|event| matches!(event, TransferEvent::Progress { .. }))
3159        );
3160        assert!(
3161            send_progress
3162                .iter()
3163                .any(|event| matches!(event, TransferEvent::SessionFinished { .. }))
3164        );
3165        assert!(matches!(
3166            receive_progress.first(),
3167            Some(TransferEvent::SessionStarted {
3168                direction: TransferDirection::Receive,
3169                ..
3170            })
3171        ));
3172        assert!(
3173            receive_progress
3174                .iter()
3175                .any(|event| matches!(event, TransferEvent::Progress { .. }))
3176        );
3177        assert!(
3178            receive_progress
3179                .iter()
3180                .any(|event| matches!(event, TransferEvent::SessionFinished { .. }))
3181        );
3182    }
3183
3184    #[tokio::test]
3185    async fn direct_transfer_rejects_unexpected_receiver_fingerprint_before_payload() {
3186        let source_dir = tempfile::tempdir().expect("source tempdir");
3187        let dest_dir = tempfile::tempdir().expect("dest tempdir");
3188        let sender_identity_dir = tempfile::tempdir().expect("sender identity tempdir");
3189        let receiver_identity_dir = tempfile::tempdir().expect("receiver identity tempdir");
3190        let other_identity_dir = tempfile::tempdir().expect("other identity tempdir");
3191        let file_path = source_dir.path().join("hello.txt");
3192        tokio::fs::write(&file_path, b"hello from ferry")
3193            .await
3194            .expect("source file written");
3195
3196        let sender_identity = NativeIdentity::load_or_generate_in(sender_identity_dir.path())
3197            .expect("sender identity");
3198        let receiver_identity = NativeIdentity::load_or_generate_in(receiver_identity_dir.path())
3199            .expect("receiver identity");
3200        let other_identity =
3201            NativeIdentity::load_or_generate_in(other_identity_dir.path()).expect("other identity");
3202        let reserved_socket =
3203            std::net::UdpSocket::bind(SocketAddr::from(([127, 0, 0, 1], 0))).expect("udp bind");
3204        let recv_addr = reserved_socket.local_addr().expect("local addr");
3205        drop(reserved_socket);
3206        let dest_path = dest_dir.path().to_path_buf();
3207        let server = tokio::spawn(async move {
3208            receive_direct_file(
3209                &ReceiveRequest::new(recv_addr),
3210                dest_path,
3211                &receiver_identity,
3212                |_| {},
3213            )
3214            .await
3215        });
3216
3217        tokio::time::sleep(Duration::from_millis(100)).await;
3218
3219        let peer = DirectPeer::from_address(recv_addr)
3220            .with_expected_fingerprint(other_identity.fingerprint().to_string())
3221            .expect("expected fingerprint accepted");
3222        let send_request = SendRequest::new(peer, vec![file_path]).expect("send request is valid");
3223        let error = send_direct_file(&send_request, &sender_identity, |_| {})
3224            .await
3225            .expect_err("fingerprint mismatch should reject send");
3226
3227        assert!(matches!(error, Error::PeerFingerprintMismatch { .. }));
3228        let server_error = server
3229            .await
3230            .expect("server joined")
3231            .expect_err("server closed");
3232        assert!(matches!(
3233            server_error,
3234            Error::Connection(_) | Error::NoIncomingConnection
3235        ));
3236        assert!(
3237            !tokio::fs::try_exists(dest_dir.path().join("hello.txt"))
3238                .await
3239                .expect("destination checked")
3240        );
3241    }
3242
3243    #[tokio::test]
3244    async fn direct_quic_loopback_sends_directory_and_resumes_partial_file() {
3245        let source_dir = tempfile::tempdir().expect("source tempdir");
3246        let dest_dir = tempfile::tempdir().expect("dest tempdir");
3247        let identity_dir = tempfile::tempdir().expect("identity tempdir");
3248        let bundle = source_dir.path().join("bundle");
3249        let nested = bundle.join("nested");
3250        tokio::fs::create_dir_all(&nested)
3251            .await
3252            .expect("source dirs");
3253        tokio::fs::write(nested.join("small.txt"), b"small")
3254            .await
3255            .expect("small file");
3256        let large_bytes = vec![7; (DEFAULT_CHUNK_SIZE as usize * 2) + 17];
3257        tokio::fs::write(bundle.join("large.bin"), &large_bytes)
3258            .await
3259            .expect("large file");
3260
3261        let dest_bundle = dest_dir.path().join("bundle");
3262        tokio::fs::create_dir_all(&dest_bundle)
3263            .await
3264            .expect("dest bundle");
3265        tokio::fs::write(
3266            dest_bundle.join("large.bin"),
3267            &large_bytes[..DEFAULT_CHUNK_SIZE as usize + 5],
3268        )
3269        .await
3270        .expect("partial large");
3271
3272        let identity =
3273            NativeIdentity::load_or_generate_in(identity_dir.path()).expect("identity generated");
3274        let recv_identity = identity.clone();
3275        let reserved_socket =
3276            std::net::UdpSocket::bind(SocketAddr::from(([127, 0, 0, 1], 0))).expect("udp bind");
3277        let recv_addr = reserved_socket.local_addr().expect("local addr");
3278        drop(reserved_socket);
3279        let dest_path = dest_dir.path().to_path_buf();
3280        let server = tokio::spawn(async move {
3281            receive_direct_file(
3282                &ReceiveRequest::new(recv_addr),
3283                dest_path,
3284                &recv_identity,
3285                |_| {},
3286            )
3287            .await
3288        });
3289
3290        tokio::time::sleep(Duration::from_millis(100)).await;
3291
3292        let peer = DirectPeer::parse(&recv_addr.to_string()).expect("peer parses");
3293        let send_request = SendRequest::new(peer, vec![bundle]).expect("send request is valid");
3294        let send_summary = send_direct_file(&send_request, &identity, |_| {})
3295            .await
3296            .expect("directory sent");
3297        let receive_summary = server
3298            .await
3299            .expect("server joined")
3300            .expect("directory received");
3301
3302        let received_large = tokio::fs::read(dest_bundle.join("large.bin"))
3303            .await
3304            .expect("large file readable");
3305        let received_small = tokio::fs::read(dest_bundle.join("nested/small.txt"))
3306            .await
3307            .expect("small file readable");
3308        assert_eq!(received_large, large_bytes);
3309        assert_eq!(received_small, b"small");
3310        assert_eq!(send_summary.bytes, receive_summary.bytes);
3311        assert!(
3312            receive_summary
3313                .files
3314                .iter()
3315                .any(|file| file.status == TransferFileStatus::Resumed)
3316        );
3317    }
3318
3319    #[tokio::test]
3320    async fn direct_quic_loopback_cancels_send_and_does_not_finalize_partial_file() {
3321        let source_dir = tempfile::tempdir().expect("source tempdir");
3322        let dest_dir = tempfile::tempdir().expect("dest tempdir");
3323        let identity_dir = tempfile::tempdir().expect("identity tempdir");
3324        let file_path = source_dir.path().join("payload.bin");
3325        tokio::fs::write(&file_path, vec![3; IO_BUFFER_SIZE * 64])
3326            .await
3327            .expect("source file written");
3328
3329        let identity =
3330            NativeIdentity::load_or_generate_in(identity_dir.path()).expect("identity generated");
3331        let recv_identity = identity.clone();
3332        let reserved_socket =
3333            std::net::UdpSocket::bind(SocketAddr::from(([127, 0, 0, 1], 0))).expect("udp bind");
3334        let recv_addr = reserved_socket.local_addr().expect("local addr");
3335        drop(reserved_socket);
3336
3337        let dest_path = dest_dir.path().to_path_buf();
3338        let server = tokio::spawn(async move {
3339            receive_direct_file(
3340                &ReceiveRequest::new(recv_addr),
3341                dest_path,
3342                &recv_identity,
3343                |_| {},
3344            )
3345            .await
3346        });
3347
3348        tokio::time::sleep(Duration::from_millis(100)).await;
3349
3350        let peer = DirectPeer::parse(&recv_addr.to_string()).expect("peer parses");
3351        let send_request =
3352            SendRequest::new(peer, vec![file_path.clone()]).expect("send request is valid");
3353        let send_control = TransferControl::new();
3354        let cancel_on_progress = send_control.clone();
3355        let send_events = Arc::new(Mutex::new(Vec::new()));
3356        let send_events_log = send_events.clone();
3357        let send_result =
3358            send_direct_file_with_control(&send_request, &identity, send_control, |event| {
3359                if matches!(event, TransferEvent::Progress { .. }) {
3360                    cancel_on_progress.cancel();
3361                }
3362                send_events_log.lock().expect("events lock").push(event);
3363            })
3364            .await;
3365
3366        assert!(matches!(send_result, Err(Error::TransferCancelled)));
3367
3368        let receive_result = tokio::time::timeout(Duration::from_secs(5), server)
3369            .await
3370            .expect("receiver should finish after sender cancellation")
3371            .expect("server joined");
3372        assert!(receive_result.is_err());
3373
3374        assert!(!dest_dir.path().join("payload.bin").exists());
3375        assert!(
3376            send_events
3377                .lock()
3378                .expect("events lock")
3379                .iter()
3380                .any(|event| matches!(event, TransferEvent::SessionCancelled { .. }))
3381        );
3382    }
3383}