Skip to main content

ferry_core/
lib.rs

1pub mod error;
2
3use std::collections::{BTreeMap, BTreeSet};
4use std::fs::Metadata;
5use std::io::SeekFrom;
6use std::net::SocketAddr;
7use std::path::{Component, Path, PathBuf};
8use std::sync::Arc;
9use std::sync::atomic::{AtomicBool, Ordering};
10use std::time::{Duration, SystemTime, UNIX_EPOCH};
11
12use directories::BaseDirs;
13use mdns_sd::{ResolvedService, ServiceDaemon, ServiceEvent, ServiceInfo};
14use quinn::crypto::rustls::QuicClientConfig;
15use rcgen::{CertifiedKey, generate_simple_self_signed};
16use rustls::client::danger::{HandshakeSignatureValid, ServerCertVerified, ServerCertVerifier};
17use rustls::crypto::{CryptoProvider, verify_tls12_signature, verify_tls13_signature};
18use rustls::pki_types::{CertificateDer, PrivateKeyDer, PrivatePkcs8KeyDer, ServerName, UnixTime};
19use rustls::{DigitallySignedStruct, SignatureScheme};
20use serde::{Deserialize, Serialize};
21use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt};
22use uuid::Uuid;
23
24pub use error::{Error, Result};
25
26pub const NATIVE_PROTOCOL_VERSION: u16 = 1;
27pub const MIN_NATIVE_PROTOCOL_VERSION: u16 = 1;
28pub const NATIVE_SERVICE_TYPE: &str = "_ferry._udp.local.";
29
30const DIRECT_MAGIC: &[u8; 8] = b"FERRY01\n";
31const MAX_FRAME_LEN: usize = 16 * 1024 * 1024;
32const IO_BUFFER_SIZE: usize = 64 * 1024;
33const MANIFEST_VERSION: u16 = 1;
34const DEFAULT_CHUNK_SIZE: u64 = 1024 * 1024;
35const CONFIG_DIR_NAME: &str = "ferry";
36
37#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
38pub struct DirectPeer {
39    address: SocketAddr,
40}
41
42impl DirectPeer {
43    pub fn parse(value: &str) -> Result<Self> {
44        Ok(Self {
45            address: value.parse()?,
46        })
47    }
48
49    pub const fn address(&self) -> SocketAddr {
50        self.address
51    }
52
53    pub const fn from_address(address: SocketAddr) -> Self {
54        Self { address }
55    }
56}
57
58#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
59pub struct SendRequest {
60    peer: DirectPeer,
61    paths: Vec<PathBuf>,
62}
63
64impl SendRequest {
65    pub fn new(peer: DirectPeer, paths: Vec<PathBuf>) -> Result<Self> {
66        if paths.is_empty() {
67            return Err(Error::InvalidInput(
68                "at least one file path is required".to_string(),
69            ));
70        }
71
72        Ok(Self { peer, paths })
73    }
74
75    pub const fn peer(&self) -> &DirectPeer {
76        &self.peer
77    }
78
79    pub fn paths(&self) -> &[PathBuf] {
80        &self.paths
81    }
82}
83
84#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
85pub struct ReceiveRequest {
86    listen: SocketAddr,
87}
88
89impl ReceiveRequest {
90    pub fn new(listen: SocketAddr) -> Self {
91        Self { listen }
92    }
93
94    pub const fn listen(&self) -> SocketAddr {
95        self.listen
96    }
97}
98
99#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
100pub struct AppConfig {
101    pub alias: String,
102    pub listen_port: u16,
103    pub quic_port: u16,
104    pub download_dir: String,
105    pub auto_accept_known: bool,
106    pub auto_accept_unknown: bool,
107    pub discovery: Vec<String>,
108    pub max_concurrent_files: usize,
109    pub trust: TrustConfig,
110}
111
112impl Default for AppConfig {
113    fn default() -> Self {
114        Self {
115            alias: default_alias(),
116            listen_port: 53317,
117            quic_port: 53318,
118            download_dir: "~/Downloads/ferry".to_string(),
119            auto_accept_known: true,
120            auto_accept_unknown: false,
121            discovery: vec!["native".to_string()],
122            max_concurrent_files: 8,
123            trust: TrustConfig::default(),
124        }
125    }
126}
127
128impl AppConfig {
129    pub fn load_or_default() -> Result<Self> {
130        Self::load_or_default_from(config_dir()?.join("config.toml"))
131    }
132
133    pub fn load_or_default_from(path: impl AsRef<Path>) -> Result<Self> {
134        let path = path.as_ref();
135        match std::fs::read_to_string(path) {
136            Ok(contents) => {
137                toml::from_str(&contents).map_err(|error| Error::ConfigParse(error.to_string()))
138            }
139            Err(error) if error.kind() == std::io::ErrorKind::NotFound => Ok(Self::default()),
140            Err(source) => Err(Error::IoPath {
141                path: path.to_path_buf(),
142                source,
143            }),
144        }
145    }
146
147    pub fn save_default_path(&self) -> Result<()> {
148        self.save_to_path(config_dir()?.join("config.toml"))
149    }
150
151    pub fn save_to_path(&self, path: impl AsRef<Path>) -> Result<()> {
152        write_toml_file(path.as_ref(), self)
153    }
154
155    pub fn to_toml_string(&self) -> Result<String> {
156        toml::to_string_pretty(self).map_err(|error| Error::ConfigSerialize(error.to_string()))
157    }
158
159    pub fn redacted(&self) -> Self {
160        let mut config = self.clone();
161        config.trust = config.trust.redacted();
162        config
163    }
164
165    pub fn to_redacted_toml_string(&self) -> Result<String> {
166        self.redacted().to_toml_string()
167    }
168}
169
170#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
171pub struct DaemonConfig {
172    pub listen: SocketAddr,
173    pub destination: PathBuf,
174}
175
176impl DaemonConfig {
177    pub fn from_app_config(config: &AppConfig) -> Result<Self> {
178        Ok(Self {
179            listen: SocketAddr::from(([0, 0, 0, 0], config.quic_port)),
180            destination: expand_home_path(&config.download_dir)?,
181        })
182    }
183
184    pub fn load_or_default() -> Result<Self> {
185        let app_config = AppConfig::load_or_default()?;
186        Self::load_or_default_from(config_dir()?.join("daemon.toml"), &app_config)
187    }
188
189    pub fn load_or_default_from(path: impl AsRef<Path>, app_config: &AppConfig) -> Result<Self> {
190        let path = path.as_ref();
191        match std::fs::read_to_string(path) {
192            Ok(contents) => {
193                toml::from_str(&contents).map_err(|error| Error::ConfigParse(error.to_string()))
194            }
195            Err(error) if error.kind() == std::io::ErrorKind::NotFound => {
196                Self::from_app_config(app_config)
197            }
198            Err(source) => Err(Error::IoPath {
199                path: path.to_path_buf(),
200                source,
201            }),
202        }
203    }
204
205    pub fn save_default_path(&self) -> Result<()> {
206        self.save_to_path(config_dir()?.join("daemon.toml"))
207    }
208
209    pub fn save_to_path(&self, path: impl AsRef<Path>) -> Result<()> {
210        write_toml_file(path.as_ref(), self)
211    }
212
213    pub fn to_toml_string(&self) -> Result<String> {
214        toml::to_string_pretty(self).map_err(|error| Error::ConfigSerialize(error.to_string()))
215    }
216}
217
218#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
219pub struct TrustConfig {
220    pub require_fingerprint: bool,
221    pub psk: String,
222}
223
224impl Default for TrustConfig {
225    fn default() -> Self {
226        Self {
227            require_fingerprint: true,
228            psk: String::new(),
229        }
230    }
231}
232
233impl TrustConfig {
234    pub fn redacted(&self) -> Self {
235        let psk = if self.psk.is_empty() {
236            String::new()
237        } else {
238            "<redacted>".to_string()
239        };
240        Self {
241            require_fingerprint: self.require_fingerprint,
242            psk,
243        }
244    }
245}
246
247#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
248pub struct NativeIdentity {
249    cert_der: Vec<u8>,
250    key_der: Vec<u8>,
251    fingerprint: String,
252}
253
254impl NativeIdentity {
255    pub fn load_or_generate() -> Result<Self> {
256        Self::load_or_generate_in(config_dir()?)
257    }
258
259    pub fn load_or_generate_in(config_dir: impl AsRef<Path>) -> Result<Self> {
260        let config_dir = config_dir.as_ref();
261        let cert_path = config_dir.join("identity.cert.der");
262        let key_path = config_dir.join("identity.key.der");
263
264        if cert_path.exists() && key_path.exists() {
265            let cert_der = std::fs::read(&cert_path).map_err(|source| Error::IoPath {
266                path: cert_path,
267                source,
268            })?;
269            let key_der = std::fs::read(&key_path).map_err(|source| Error::IoPath {
270                path: key_path,
271                source,
272            })?;
273
274            return Ok(Self::from_parts(cert_der, key_der));
275        }
276
277        std::fs::create_dir_all(config_dir).map_err(|source| Error::IoPath {
278            path: config_dir.to_path_buf(),
279            source,
280        })?;
281
282        let identity = Self::generate()?;
283        write_private_file(&cert_path, &identity.cert_der)?;
284        write_private_file(&key_path, &identity.key_der)?;
285        Ok(identity)
286    }
287
288    pub fn generate() -> Result<Self> {
289        let CertifiedKey { cert, signing_key } =
290            generate_simple_self_signed(vec!["localhost".to_string()])?;
291        Ok(Self::from_parts(
292            cert.der().to_vec(),
293            signing_key.serialize_der(),
294        ))
295    }
296
297    pub fn fingerprint(&self) -> &str {
298        &self.fingerprint
299    }
300
301    fn cert_chain(&self) -> Vec<CertificateDer<'static>> {
302        vec![CertificateDer::from(self.cert_der.clone())]
303    }
304
305    fn private_key(&self) -> PrivateKeyDer<'static> {
306        PrivateKeyDer::from(PrivatePkcs8KeyDer::from(self.key_der.clone()))
307    }
308
309    fn from_parts(cert_der: Vec<u8>, key_der: Vec<u8>) -> Self {
310        let fingerprint = blake3::hash(&cert_der).to_hex().to_string();
311        Self {
312            cert_der,
313            key_der,
314            fingerprint,
315        }
316    }
317}
318
319#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
320#[serde(rename_all = "snake_case")]
321pub enum TransferDirection {
322    Send,
323    Receive,
324}
325
326#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
327pub struct TransferProgress {
328    pub direction: TransferDirection,
329    pub file_name: String,
330    pub bytes_done: u64,
331    pub bytes_total: u64,
332}
333
334#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
335#[serde(tag = "event", rename_all = "snake_case")]
336pub enum TransferEvent {
337    SessionStarted {
338        direction: TransferDirection,
339        session_id: Uuid,
340        files_total: usize,
341        bytes_total: u64,
342    },
343    FileStarted {
344        direction: TransferDirection,
345        file_name: String,
346        bytes_total: u64,
347        resume_offset: u64,
348    },
349    Progress {
350        direction: TransferDirection,
351        file_name: String,
352        bytes_done: u64,
353        bytes_total: u64,
354    },
355    FileFinished {
356        direction: TransferDirection,
357        file_name: String,
358        bytes: u64,
359        blake3: String,
360        status: TransferFileStatus,
361    },
362    SessionFinished {
363        direction: TransferDirection,
364        files_total: usize,
365        bytes_total: u64,
366        blake3: String,
367    },
368    SessionCancelled {
369        direction: TransferDirection,
370        session_id: Uuid,
371    },
372}
373
374impl TransferEvent {
375    pub fn progress(&self) -> Option<TransferProgress> {
376        match self {
377            Self::Progress {
378                direction,
379                file_name,
380                bytes_done,
381                bytes_total,
382            } => Some(TransferProgress {
383                direction: *direction,
384                file_name: file_name.clone(),
385                bytes_done: *bytes_done,
386                bytes_total: *bytes_total,
387            }),
388            _ => None,
389        }
390    }
391}
392
393#[derive(Debug, Clone, Default)]
394pub struct TransferControl {
395    cancelled: Arc<AtomicBool>,
396}
397
398impl TransferControl {
399    pub fn new() -> Self {
400        Self::default()
401    }
402
403    pub fn cancel(&self) {
404        self.cancelled.store(true, Ordering::SeqCst);
405    }
406
407    pub fn is_cancelled(&self) -> bool {
408        self.cancelled.load(Ordering::SeqCst)
409    }
410
411    fn check_cancelled(&self) -> Result<()> {
412        if self.is_cancelled() {
413            Err(Error::TransferCancelled)
414        } else {
415            Ok(())
416        }
417    }
418}
419
420#[derive(Debug, Clone, PartialEq, Eq)]
421pub struct TransferSummary {
422    pub path: PathBuf,
423    pub file_name: String,
424    pub bytes: u64,
425    pub blake3: String,
426    pub files: Vec<TransferFileSummary>,
427}
428
429#[derive(Debug, Clone, PartialEq, Eq)]
430pub struct TransferFileSummary {
431    pub path: PathBuf,
432    pub relative_path: PathBuf,
433    pub bytes: u64,
434    pub blake3: String,
435    pub status: TransferFileStatus,
436}
437
438#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
439#[serde(rename_all = "snake_case")]
440pub enum TransferFileStatus {
441    Sent,
442    Received,
443    Skipped,
444    Resumed,
445}
446
447#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
448pub struct TransferManifest {
449    pub version: u16,
450    pub session_id: Uuid,
451    pub chunk_size: u64,
452    pub entries: Vec<ManifestEntry>,
453}
454
455impl TransferManifest {
456    pub fn file_entries(&self) -> impl Iterator<Item = &ManifestEntry> {
457        self.entries
458            .iter()
459            .filter(|entry| entry.kind == ManifestEntryKind::File)
460    }
461
462    pub fn total_file_bytes(&self) -> u64 {
463        self.file_entries().map(|entry| entry.size).sum()
464    }
465
466    pub fn digest(&self) -> Result<String> {
467        let bytes = serde_json::to_vec(self).map_err(|error| Error::Protocol(error.to_string()))?;
468        Ok(blake3::hash(&bytes).to_hex().to_string())
469    }
470}
471
472#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
473pub struct ManifestEntry {
474    pub relative_path: PathBuf,
475    pub kind: ManifestEntryKind,
476    pub size: u64,
477    pub blake3: Option<String>,
478    pub chunks: Vec<String>,
479    pub unix_mode: Option<u32>,
480    pub modified_unix_ms: Option<i128>,
481}
482
483#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
484#[serde(rename_all = "snake_case")]
485pub enum ManifestEntryKind {
486    File,
487    Directory,
488}
489
490#[derive(Debug, Clone, PartialEq, Eq)]
491pub enum ResumeDecision {
492    Create,
493    Skip,
494    ResumeFrom(u64),
495    Conflict(String),
496}
497
498#[derive(Debug, Clone, PartialEq, Eq)]
499pub struct PeerObservation {
500    pub fingerprint: String,
501    pub alias: Option<String>,
502    pub hostname: Option<String>,
503    pub address: SocketAddr,
504    pub transports: BTreeSet<String>,
505    pub seen_unix_ms: i128,
506}
507
508impl PeerObservation {
509    pub fn new(fingerprint: impl Into<String>, address: SocketAddr, seen_unix_ms: i128) -> Self {
510        Self {
511            fingerprint: fingerprint.into(),
512            alias: None,
513            hostname: None,
514            address,
515            transports: BTreeSet::new(),
516            seen_unix_ms,
517        }
518    }
519
520    pub fn with_alias(mut self, alias: impl Into<String>) -> Self {
521        self.alias = Some(alias.into());
522        self
523    }
524
525    pub fn with_hostname(mut self, hostname: impl Into<String>) -> Self {
526        self.hostname = Some(hostname.into());
527        self
528    }
529
530    pub fn with_transport(mut self, transport: impl Into<String>) -> Self {
531        self.transports.insert(transport.into());
532        self
533    }
534}
535
536#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
537pub struct PeerRecord {
538    pub fingerprint: String,
539    pub aliases: BTreeSet<String>,
540    pub hostnames: BTreeSet<String>,
541    pub addresses: BTreeSet<SocketAddr>,
542    pub transports: BTreeSet<String>,
543    pub first_seen_unix_ms: i128,
544    pub last_seen_unix_ms: i128,
545}
546
547impl PeerRecord {
548    fn from_observation(observation: PeerObservation) -> Self {
549        let mut aliases = BTreeSet::new();
550        if let Some(alias) = observation.alias {
551            aliases.insert(alias);
552        }
553
554        let mut hostnames = BTreeSet::new();
555        if let Some(hostname) = observation.hostname {
556            hostnames.insert(hostname);
557        }
558
559        let mut addresses = BTreeSet::new();
560        addresses.insert(observation.address);
561
562        Self {
563            fingerprint: observation.fingerprint,
564            aliases,
565            hostnames,
566            addresses,
567            transports: observation.transports,
568            first_seen_unix_ms: observation.seen_unix_ms,
569            last_seen_unix_ms: observation.seen_unix_ms,
570        }
571    }
572
573    fn merge(&mut self, observation: PeerObservation) {
574        if let Some(alias) = observation.alias {
575            self.aliases.insert(alias);
576        }
577        if let Some(hostname) = observation.hostname {
578            self.hostnames.insert(hostname);
579        }
580        self.addresses.insert(observation.address);
581        self.transports.extend(observation.transports);
582        self.first_seen_unix_ms = self.first_seen_unix_ms.min(observation.seen_unix_ms);
583        self.last_seen_unix_ms = self.last_seen_unix_ms.max(observation.seen_unix_ms);
584    }
585
586    pub fn preferred_quic_address(&self) -> Option<SocketAddr> {
587        if !self.transports.contains("quic") {
588            return None;
589        }
590
591        self.addresses.iter().copied().next()
592    }
593}
594
595#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
596#[serde(rename_all = "snake_case")]
597pub enum TrustState {
598    Trusted,
599    Blocked,
600}
601
602#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
603pub struct KnownPeerEntry {
604    pub fingerprint: String,
605    #[serde(default)]
606    pub aliases: BTreeSet<String>,
607    #[serde(default)]
608    pub hostnames: BTreeSet<String>,
609    #[serde(default)]
610    pub addresses: BTreeSet<SocketAddr>,
611    #[serde(default)]
612    pub transports: BTreeSet<String>,
613    pub trust_state: TrustState,
614    pub first_seen_unix_ms: Option<i128>,
615    pub last_seen_unix_ms: Option<i128>,
616}
617
618impl KnownPeerEntry {
619    pub fn trusted(fingerprint: impl Into<String>) -> Result<Self> {
620        let fingerprint = normalized_fingerprint(fingerprint.into())?;
621        Ok(Self {
622            fingerprint,
623            aliases: BTreeSet::new(),
624            hostnames: BTreeSet::new(),
625            addresses: BTreeSet::new(),
626            transports: BTreeSet::new(),
627            trust_state: TrustState::Trusted,
628            first_seen_unix_ms: None,
629            last_seen_unix_ms: None,
630        })
631    }
632
633    pub fn from_record(record: &PeerRecord, trust_state: TrustState) -> Result<Self> {
634        let fingerprint = normalized_fingerprint(record.fingerprint.clone())?;
635        Ok(Self {
636            fingerprint,
637            aliases: record.aliases.clone(),
638            hostnames: record.hostnames.clone(),
639            addresses: record.addresses.clone(),
640            transports: record.transports.clone(),
641            trust_state,
642            first_seen_unix_ms: Some(record.first_seen_unix_ms),
643            last_seen_unix_ms: Some(record.last_seen_unix_ms),
644        })
645    }
646}
647
648#[derive(Debug, Default, Clone, PartialEq, Eq, Serialize, Deserialize)]
649pub struct TrustStore {
650    #[serde(default)]
651    pub peers: BTreeMap<String, KnownPeerEntry>,
652}
653
654impl TrustStore {
655    pub fn load_or_default() -> Result<Self> {
656        Self::load_or_default_from(config_dir()?.join("known_peers.toml"))
657    }
658
659    pub fn load_or_default_from(path: impl AsRef<Path>) -> Result<Self> {
660        let path = path.as_ref();
661        match std::fs::read_to_string(path) {
662            Ok(contents) => {
663                toml::from_str(&contents).map_err(|error| Error::ConfigParse(error.to_string()))
664            }
665            Err(error) if error.kind() == std::io::ErrorKind::NotFound => Ok(Self::default()),
666            Err(source) => Err(Error::IoPath {
667                path: path.to_path_buf(),
668                source,
669            }),
670        }
671    }
672
673    pub fn save_default_path(&self) -> Result<()> {
674        self.save_to_path(config_dir()?.join("known_peers.toml"))
675    }
676
677    pub fn save_to_path(&self, path: impl AsRef<Path>) -> Result<()> {
678        write_toml_file(path.as_ref(), self)
679    }
680
681    pub fn trust_fingerprint(&mut self, fingerprint: impl Into<String>) -> Result<&KnownPeerEntry> {
682        let entry = KnownPeerEntry::trusted(fingerprint)?;
683        let fingerprint = entry.fingerprint.clone();
684        self.peers
685            .entry(fingerprint.clone())
686            .and_modify(|existing| existing.trust_state = TrustState::Trusted)
687            .or_insert(entry);
688        Ok(self
689            .peers
690            .get(&fingerprint)
691            .expect("trusted peer entry should exist"))
692    }
693
694    pub fn trust_record(&mut self, record: &PeerRecord) -> Result<&KnownPeerEntry> {
695        let entry = KnownPeerEntry::from_record(record, TrustState::Trusted)?;
696        let fingerprint = entry.fingerprint.clone();
697        self.peers
698            .entry(fingerprint.clone())
699            .and_modify(|existing| {
700                existing.aliases.extend(record.aliases.clone());
701                existing.hostnames.extend(record.hostnames.clone());
702                existing.addresses.extend(record.addresses.clone());
703                existing.transports.extend(record.transports.clone());
704                existing.first_seen_unix_ms = Some(
705                    existing
706                        .first_seen_unix_ms
707                        .unwrap_or(record.first_seen_unix_ms)
708                        .min(record.first_seen_unix_ms),
709                );
710                existing.last_seen_unix_ms = Some(
711                    existing
712                        .last_seen_unix_ms
713                        .unwrap_or(record.last_seen_unix_ms)
714                        .max(record.last_seen_unix_ms),
715                );
716                existing.trust_state = TrustState::Trusted;
717            })
718            .or_insert(entry);
719        Ok(self
720            .peers
721            .get(&fingerprint)
722            .expect("trusted peer entry should exist"))
723    }
724
725    pub fn forget(&mut self, fingerprint: &str) -> bool {
726        self.peers.remove(fingerprint).is_some()
727    }
728
729    pub fn records(&self) -> impl Iterator<Item = &KnownPeerEntry> {
730        self.peers.values()
731    }
732
733    pub fn to_toml_string(&self) -> Result<String> {
734        toml::to_string_pretty(self).map_err(|error| Error::ConfigSerialize(error.to_string()))
735    }
736}
737
738#[derive(Debug, Default, Clone, PartialEq, Eq)]
739pub struct PeerRegistry {
740    peers: BTreeMap<String, PeerRecord>,
741}
742
743#[derive(Debug, Clone, PartialEq, Eq)]
744pub enum PeerLookup<'a> {
745    Found(&'a PeerRecord),
746    Ambiguous(Vec<&'a PeerRecord>),
747    Missing,
748}
749
750impl PeerRegistry {
751    pub fn new() -> Self {
752        Self::default()
753    }
754
755    pub fn observe(&mut self, observation: PeerObservation) -> Result<&PeerRecord> {
756        if observation.fingerprint.trim().is_empty() {
757            return Err(Error::InvalidInput(
758                "peer fingerprint must not be empty".to_string(),
759            ));
760        }
761
762        let fingerprint = observation.fingerprint.clone();
763        if let Some(record) = self.peers.get_mut(&fingerprint) {
764            record.merge(observation);
765        } else {
766            self.peers.insert(
767                fingerprint.clone(),
768                PeerRecord::from_observation(observation),
769            );
770        }
771
772        Ok(self
773            .peers
774            .get(&fingerprint)
775            .expect("observed peer record should exist"))
776    }
777
778    pub fn get_by_fingerprint(&self, fingerprint: &str) -> Option<&PeerRecord> {
779        self.peers.get(fingerprint)
780    }
781
782    pub fn lookup(&self, query: &str) -> Option<&PeerRecord> {
783        match self.lookup_detail(query) {
784            PeerLookup::Found(record) => Some(record),
785            PeerLookup::Ambiguous(_) | PeerLookup::Missing => None,
786        }
787    }
788
789    pub fn lookup_detail(&self, query: &str) -> PeerLookup<'_> {
790        if let Some(record) = self.peers.get(query) {
791            return PeerLookup::Found(record);
792        }
793
794        let matches = self
795            .peers
796            .values()
797            .filter(|record| {
798                record.fingerprint.starts_with(query)
799                    || record.aliases.iter().any(|alias| alias == query)
800                    || record.hostnames.iter().any(|hostname| hostname == query)
801            })
802            .collect::<Vec<_>>();
803
804        match matches.as_slice() {
805            [] => PeerLookup::Missing,
806            [record] => PeerLookup::Found(record),
807            _ => PeerLookup::Ambiguous(matches),
808        }
809    }
810
811    pub fn records(&self) -> impl Iterator<Item = &PeerRecord> {
812        self.peers.values()
813    }
814}
815
816#[derive(Debug, Clone, PartialEq, Eq)]
817pub struct NativeAnnouncement {
818    pub alias: String,
819    pub fingerprint: String,
820    pub quic_port: u16,
821    pub listen_port: Option<u16>,
822}
823
824impl NativeAnnouncement {
825    pub fn from_config(config: &AppConfig, identity: &NativeIdentity) -> Self {
826        Self {
827            alias: config.alias.clone(),
828            fingerprint: identity.fingerprint().to_string(),
829            quic_port: config.quic_port,
830            listen_port: Some(config.listen_port),
831        }
832    }
833
834    fn service_info(&self) -> Result<ServiceInfo> {
835        let alias = sanitize_dns_label(&self.alias);
836        let fingerprint_prefix = self
837            .fingerprint
838            .get(..12)
839            .unwrap_or(self.fingerprint.as_str());
840        let instance = format!("{alias}-{fingerprint_prefix}");
841        let hostname = format!("{alias}.local.");
842        let properties = [
843            ("pv", NATIVE_PROTOCOL_VERSION.to_string()),
844            ("minpv", MIN_NATIVE_PROTOCOL_VERSION.to_string()),
845            ("fp", self.fingerprint.clone()),
846            ("alias", self.alias.clone()),
847            ("transports", "quic".to_string()),
848            ("quic_port", self.quic_port.to_string()),
849            (
850                "listen_port",
851                self.listen_port
852                    .map(|port| port.to_string())
853                    .unwrap_or_default(),
854            ),
855        ];
856
857        ServiceInfo::new(
858            NATIVE_SERVICE_TYPE,
859            &instance,
860            &hostname,
861            "",
862            self.quic_port,
863            &properties[..],
864        )
865        .map(ServiceInfo::enable_addr_auto)
866        .map_err(|error| Error::Discovery(error.to_string()))
867    }
868}
869
870pub struct NativeAnnouncer {
871    daemon: ServiceDaemon,
872    fullname: String,
873}
874
875impl NativeAnnouncer {
876    pub fn start(announcement: &NativeAnnouncement) -> Result<Self> {
877        let daemon = ServiceDaemon::new().map_err(|error| Error::Discovery(error.to_string()))?;
878        let service = announcement.service_info()?;
879        let fullname = service.get_fullname().to_string();
880        daemon
881            .register(service)
882            .map_err(|error| Error::Discovery(error.to_string()))?;
883        Ok(Self { daemon, fullname })
884    }
885}
886
887impl Drop for NativeAnnouncer {
888    fn drop(&mut self) {
889        let _ = self.daemon.unregister(&self.fullname);
890        let _ = self.daemon.shutdown();
891    }
892}
893
894pub async fn discover_native_peers(timeout: Duration) -> Result<PeerRegistry> {
895    let daemon = ServiceDaemon::new().map_err(|error| Error::Discovery(error.to_string()))?;
896    let receiver = daemon
897        .browse(NATIVE_SERVICE_TYPE)
898        .map_err(|error| Error::Discovery(error.to_string()))?;
899    let deadline = tokio::time::Instant::now() + timeout;
900    let mut registry = PeerRegistry::new();
901
902    loop {
903        let now = tokio::time::Instant::now();
904        if now >= deadline {
905            break;
906        }
907
908        let wait = deadline - now;
909        match tokio::time::timeout(wait, receiver.recv_async()).await {
910            Ok(Ok(ServiceEvent::ServiceResolved(service))) => {
911                observe_resolved_service(&mut registry, &service)?;
912            }
913            Ok(Ok(_)) => {}
914            Ok(Err(error)) => {
915                return Err(Error::Discovery(error.to_string()));
916            }
917            Err(_) => break,
918        }
919    }
920
921    daemon
922        .stop_browse(NATIVE_SERVICE_TYPE)
923        .map_err(|error| Error::Discovery(error.to_string()))?;
924    let _ = daemon.shutdown();
925    Ok(registry)
926}
927
928fn observe_resolved_service(registry: &mut PeerRegistry, service: &ResolvedService) -> Result<()> {
929    let Some(fingerprint) = service.get_property_val_str("fp") else {
930        return Ok(());
931    };
932    let fingerprint = match normalized_fingerprint(fingerprint.to_string()) {
933        Ok(fingerprint) => fingerprint,
934        Err(_) => return Ok(()),
935    };
936    let Some(highest_version) = parse_txt_u16(service, "pv") else {
937        return Ok(());
938    };
939    let Some(minimum_version) = parse_txt_u16(service, "minpv") else {
940        return Ok(());
941    };
942    if minimum_version > highest_version
943        || minimum_version > NATIVE_PROTOCOL_VERSION
944        || highest_version < MIN_NATIVE_PROTOCOL_VERSION
945    {
946        return Ok(());
947    }
948
949    let Some(quic_port) = parse_txt_u16(service, "quic_port") else {
950        return Ok(());
951    };
952    let transports = service
953        .get_property_val_str("transports")
954        .unwrap_or_default()
955        .split(',')
956        .map(str::trim)
957        .filter(|value| !value.is_empty())
958        .map(ToOwned::to_owned)
959        .collect::<Vec<_>>();
960    if !transports.iter().any(|transport| transport == "quic") {
961        return Ok(());
962    }
963
964    let alias = service.get_property_val_str("alias").map(ToOwned::to_owned);
965    let hostname = Some(service.get_hostname().trim_end_matches('.').to_string());
966    let seen_unix_ms = system_time_unix_ms(SystemTime::now()).unwrap_or_default();
967
968    for address in service.get_addresses_v4() {
969        let mut observation = PeerObservation::new(
970            fingerprint.clone(),
971            SocketAddr::from((address, quic_port)),
972            seen_unix_ms,
973        );
974        if let Some(alias) = &alias {
975            observation = observation.with_alias(alias.clone());
976        }
977        if let Some(hostname) = &hostname {
978            observation = observation.with_hostname(hostname.clone());
979        }
980        for transport in &transports {
981            observation = observation.with_transport(transport.clone());
982        }
983        registry.observe(observation)?;
984    }
985
986    Ok(())
987}
988
989fn parse_txt_u16(service: &ResolvedService, key: &str) -> Option<u16> {
990    service.get_property_val_str(key)?.parse().ok()
991}
992
993fn sanitize_dns_label(value: &str) -> String {
994    let mut label = value
995        .chars()
996        .map(|ch| {
997            if ch.is_ascii_alphanumeric() || ch == '-' {
998                ch.to_ascii_lowercase()
999            } else {
1000                '-'
1001            }
1002        })
1003        .collect::<String>();
1004    while label.contains("--") {
1005        label = label.replace("--", "-");
1006    }
1007    let label = label.trim_matches('-').to_string();
1008    if label.is_empty() {
1009        "fileferry-device".to_string()
1010    } else {
1011        label
1012    }
1013}
1014
1015#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
1016struct DirectReceivePlan {
1017    files: Vec<DirectFilePlan>,
1018}
1019
1020#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
1021struct DirectProtocolHello {
1022    protocol_version: u16,
1023    min_protocol_version: u16,
1024    client_fingerprint: String,
1025}
1026
1027impl DirectProtocolHello {
1028    fn new(identity: &NativeIdentity) -> Self {
1029        Self {
1030            protocol_version: NATIVE_PROTOCOL_VERSION,
1031            min_protocol_version: MIN_NATIVE_PROTOCOL_VERSION,
1032            client_fingerprint: identity.fingerprint().to_string(),
1033        }
1034    }
1035}
1036
1037#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
1038struct DirectProtocolResponse {
1039    accepted: bool,
1040    protocol_version: Option<u16>,
1041    error: Option<String>,
1042}
1043
1044impl DirectProtocolResponse {
1045    fn accept(protocol_version: u16) -> Self {
1046        Self {
1047            accepted: true,
1048            protocol_version: Some(protocol_version),
1049            error: None,
1050        }
1051    }
1052
1053    fn reject(error: impl Into<String>) -> Self {
1054        Self {
1055            accepted: false,
1056            protocol_version: None,
1057            error: Some(error.into()),
1058        }
1059    }
1060}
1061
1062#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
1063struct DirectFilePlan {
1064    relative_path: PathBuf,
1065    action: DirectFileAction,
1066    offset: u64,
1067}
1068
1069#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
1070#[serde(rename_all = "snake_case")]
1071enum DirectFileAction {
1072    Receive,
1073    Skip,
1074}
1075
1076#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
1077struct DirectPayloadHeader {
1078    relative_path: PathBuf,
1079    offset: u64,
1080    bytes: u64,
1081}
1082
1083#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
1084struct DirectAck {
1085    ok: bool,
1086    error: Option<String>,
1087}
1088
1089pub fn validate_send_paths(paths: &[PathBuf]) -> Result<()> {
1090    if paths.is_empty() {
1091        return Err(Error::InvalidInput(
1092            "at least one file path is required".to_string(),
1093        ));
1094    }
1095
1096    for path in paths {
1097        if path.as_os_str().is_empty() {
1098            return Err(Error::InvalidInput("empty file path".to_string()));
1099        }
1100    }
1101
1102    Ok(())
1103}
1104
1105pub fn display_path(path: &Path) -> String {
1106    path.to_string_lossy().into_owned()
1107}
1108
1109pub async fn build_manifest(paths: &[PathBuf]) -> Result<TransferManifest> {
1110    validate_send_paths(paths)?;
1111
1112    let sources = collect_manifest_sources(paths)?;
1113    let mut entries = Vec::with_capacity(sources.len());
1114    let mut seen = BTreeSet::new();
1115
1116    for source in sources {
1117        if !seen.insert(source.relative_path.clone()) {
1118            return Err(Error::InvalidInput(format!(
1119                "duplicate transfer path: {}",
1120                display_path(&source.relative_path)
1121            )));
1122        }
1123
1124        entries.push(build_manifest_entry(source).await?);
1125    }
1126
1127    Ok(TransferManifest {
1128        version: MANIFEST_VERSION,
1129        session_id: Uuid::new_v4(),
1130        chunk_size: DEFAULT_CHUNK_SIZE,
1131        entries,
1132    })
1133}
1134
1135pub fn safe_destination_path(destination: &Path, relative_path: &Path) -> Result<PathBuf> {
1136    validate_relative_transfer_path(relative_path)?;
1137    Ok(destination.join(relative_path))
1138}
1139
1140pub async fn decide_resume(
1141    destination: &Path,
1142    entry: &ManifestEntry,
1143    chunk_size: u64,
1144) -> Result<ResumeDecision> {
1145    if entry.kind != ManifestEntryKind::File {
1146        return Ok(ResumeDecision::Create);
1147    }
1148
1149    let path = safe_destination_path(destination, &entry.relative_path)?;
1150    let metadata = match tokio::fs::metadata(&path).await {
1151        Ok(metadata) => metadata,
1152        Err(error) if error.kind() == std::io::ErrorKind::NotFound => {
1153            return Ok(ResumeDecision::Create);
1154        }
1155        Err(source) => {
1156            return Err(Error::IoPath { path, source });
1157        }
1158    };
1159
1160    if !metadata.is_file() {
1161        return Ok(ResumeDecision::Conflict(format!(
1162            "{} already exists and is not a regular file",
1163            display_path(&path)
1164        )));
1165    }
1166
1167    if metadata.len() == entry.size {
1168        let expected = entry
1169            .blake3
1170            .as_deref()
1171            .ok_or_else(|| Error::Protocol("file manifest entry is missing BLAKE3".to_string()))?;
1172        let actual = hash_file(&path).await?;
1173        return if actual == expected {
1174            Ok(ResumeDecision::Skip)
1175        } else {
1176            Ok(ResumeDecision::Conflict(format!(
1177                "{} already exists with different contents",
1178                display_path(&path)
1179            )))
1180        };
1181    }
1182
1183    if metadata.len() > entry.size {
1184        return Ok(ResumeDecision::Conflict(format!(
1185            "{} is larger than incoming file",
1186            display_path(&path)
1187        )));
1188    }
1189
1190    let verified_offset = verified_prefix_offset(&path, entry, metadata.len(), chunk_size).await?;
1191    if verified_offset > 0 {
1192        Ok(ResumeDecision::ResumeFrom(verified_offset))
1193    } else {
1194        Ok(ResumeDecision::Conflict(format!(
1195            "{} already exists and does not match a verified prefix",
1196            display_path(&path)
1197        )))
1198    }
1199}
1200
1201pub async fn send_direct_file(
1202    request: &SendRequest,
1203    identity: &NativeIdentity,
1204    mut events: impl FnMut(TransferEvent),
1205) -> Result<TransferSummary> {
1206    send_direct_file_with_control(request, identity, TransferControl::new(), &mut events).await
1207}
1208
1209pub async fn send_direct_file_with_control(
1210    request: &SendRequest,
1211    identity: &NativeIdentity,
1212    control: TransferControl,
1213    mut events: impl FnMut(TransferEvent),
1214) -> Result<TransferSummary> {
1215    ensure_rustls_provider();
1216
1217    control.check_cancelled()?;
1218    let manifest = build_manifest(request.paths()).await?;
1219    let sources = manifest_source_map(request.paths())?;
1220    events(TransferEvent::SessionStarted {
1221        direction: TransferDirection::Send,
1222        session_id: manifest.session_id,
1223        files_total: manifest.file_entries().count(),
1224        bytes_total: manifest.total_file_bytes(),
1225    });
1226    control.check_cancelled()?;
1227
1228    let mut endpoint = quinn::Endpoint::client(SocketAddr::from(([0, 0, 0, 0], 0)))?;
1229    endpoint.set_default_client_config(insecure_client_config()?);
1230    let connection = endpoint
1231        .connect(request.peer().address(), "localhost")?
1232        .await?;
1233    let (mut send, mut recv) = connection.open_bi().await?;
1234
1235    send.write_all(DIRECT_MAGIC).await?;
1236    write_json_frame(&mut send, &DirectProtocolHello::new(identity)).await?;
1237    let protocol: DirectProtocolResponse = read_json_frame(&mut recv).await?;
1238    if !protocol.accepted {
1239        return Err(Error::Protocol(protocol.error.unwrap_or_else(|| {
1240            "receiver rejected protocol negotiation".to_string()
1241        })));
1242    }
1243    if protocol.protocol_version != Some(NATIVE_PROTOCOL_VERSION) {
1244        return Err(Error::Protocol(format!(
1245            "receiver selected unsupported protocol version {:?}",
1246            protocol.protocol_version
1247        )));
1248    }
1249
1250    write_json_frame(&mut send, &manifest).await?;
1251
1252    let plan: DirectReceivePlan = read_json_frame(&mut recv).await?;
1253    let mut summaries = Vec::new();
1254    for file_plan in plan.files {
1255        let Some(entry) = manifest
1256            .file_entries()
1257            .find(|entry| entry.relative_path == file_plan.relative_path)
1258        else {
1259            return Err(Error::Protocol(format!(
1260                "receiver requested unknown file: {}",
1261                display_path(&file_plan.relative_path)
1262            )));
1263        };
1264
1265        let Some(source_path) = sources.get(&file_plan.relative_path) else {
1266            return Err(Error::Protocol(format!(
1267                "missing source for manifest file: {}",
1268                display_path(&file_plan.relative_path)
1269            )));
1270        };
1271
1272        if file_plan.action == DirectFileAction::Skip {
1273            let blake3 = entry.blake3.clone().unwrap_or_default();
1274            events(TransferEvent::FileFinished {
1275                direction: TransferDirection::Send,
1276                file_name: display_path(&entry.relative_path),
1277                bytes: entry.size,
1278                blake3: blake3.clone(),
1279                status: TransferFileStatus::Skipped,
1280            });
1281            summaries.push(TransferFileSummary {
1282                path: source_path.clone(),
1283                relative_path: entry.relative_path.clone(),
1284                bytes: entry.size,
1285                blake3,
1286                status: TransferFileStatus::Skipped,
1287            });
1288            continue;
1289        }
1290
1291        match send_payload_file(
1292            &mut send,
1293            source_path,
1294            entry,
1295            file_plan.offset,
1296            &control,
1297            &mut events,
1298        )
1299        .await
1300        {
1301            Ok(()) => {}
1302            Err(Error::TransferCancelled) => {
1303                events(TransferEvent::SessionCancelled {
1304                    direction: TransferDirection::Send,
1305                    session_id: manifest.session_id,
1306                });
1307                connection.close(1u32.into(), b"cancelled");
1308                return Err(Error::TransferCancelled);
1309            }
1310            Err(error) => return Err(error),
1311        }
1312        let status = if file_plan.offset > 0 {
1313            TransferFileStatus::Resumed
1314        } else {
1315            TransferFileStatus::Sent
1316        };
1317        let blake3 = entry.blake3.clone().unwrap_or_default();
1318        events(TransferEvent::FileFinished {
1319            direction: TransferDirection::Send,
1320            file_name: display_path(&entry.relative_path),
1321            bytes: entry.size,
1322            blake3: blake3.clone(),
1323            status,
1324        });
1325        summaries.push(TransferFileSummary {
1326            path: source_path.clone(),
1327            relative_path: entry.relative_path.clone(),
1328            bytes: entry.size,
1329            blake3,
1330            status,
1331        });
1332    }
1333    send.finish()?;
1334
1335    let ack: DirectAck = read_json_frame(&mut recv).await?;
1336    if !ack.ok {
1337        return Err(Error::Transfer(
1338            ack.error
1339                .unwrap_or_else(|| "receiver rejected transfer".to_string()),
1340        ));
1341    }
1342
1343    connection.close(0u32.into(), b"done");
1344    endpoint.wait_idle().await;
1345
1346    let summary = summary_from_files(&manifest, summaries)?;
1347    events(TransferEvent::SessionFinished {
1348        direction: TransferDirection::Send,
1349        files_total: summary.files.len(),
1350        bytes_total: summary.bytes,
1351        blake3: summary.blake3.clone(),
1352    });
1353    Ok(summary)
1354}
1355
1356pub async fn receive_direct_file(
1357    request: &ReceiveRequest,
1358    destination: impl AsRef<Path>,
1359    identity: &NativeIdentity,
1360    events: impl FnMut(TransferEvent),
1361) -> Result<TransferSummary> {
1362    receive_direct_file_with_control(
1363        request,
1364        destination,
1365        identity,
1366        TransferControl::new(),
1367        events,
1368    )
1369    .await
1370}
1371
1372pub async fn receive_direct_file_with_control(
1373    request: &ReceiveRequest,
1374    destination: impl AsRef<Path>,
1375    identity: &NativeIdentity,
1376    control: TransferControl,
1377    mut events: impl FnMut(TransferEvent),
1378) -> Result<TransferSummary> {
1379    ensure_rustls_provider();
1380
1381    control.check_cancelled()?;
1382    let destination = destination.as_ref();
1383    tokio::fs::create_dir_all(destination)
1384        .await
1385        .map_err(|source| Error::IoPath {
1386            path: destination.to_path_buf(),
1387            source,
1388        })?;
1389
1390    let server_config =
1391        quinn::ServerConfig::with_single_cert(identity.cert_chain(), identity.private_key())?;
1392    let endpoint = quinn::Endpoint::server(server_config, request.listen())?;
1393    let incoming = endpoint.accept().await.ok_or(Error::NoIncomingConnection)?;
1394    let connection = incoming.await?;
1395    let (mut send, mut recv) = connection.accept_bi().await?;
1396
1397    let result =
1398        receive_stream_file(destination, &mut recv, &mut send, &control, &mut events).await;
1399    let ack = match &result {
1400        Ok(_) => DirectAck {
1401            ok: true,
1402            error: None,
1403        },
1404        Err(error) => DirectAck {
1405            ok: false,
1406            error: Some(error.to_string()),
1407        },
1408    };
1409    write_json_frame(&mut send, &ack).await?;
1410    send.finish()?;
1411    let _ = tokio::time::timeout(Duration::from_secs(1), connection.closed()).await;
1412    endpoint.close(0u32.into(), b"done");
1413    endpoint.wait_idle().await;
1414
1415    result
1416}
1417
1418async fn receive_stream_file(
1419    destination: &Path,
1420    recv: &mut quinn::RecvStream,
1421    send: &mut quinn::SendStream,
1422    control: &TransferControl,
1423    events: &mut impl FnMut(TransferEvent),
1424) -> Result<TransferSummary> {
1425    let mut magic = [0; DIRECT_MAGIC.len()];
1426    recv.read_exact(&mut magic).await?;
1427    if &magic != DIRECT_MAGIC {
1428        return Err(Error::Protocol("bad direct-transfer magic".to_string()));
1429    }
1430
1431    let hello: DirectProtocolHello = read_json_frame(recv).await?;
1432    let protocol = negotiate_direct_protocol(&hello);
1433    write_json_frame(send, &protocol).await?;
1434    if !protocol.accepted {
1435        return Err(Error::Protocol(protocol.error.unwrap_or_else(|| {
1436            "unsupported direct protocol version".to_string()
1437        })));
1438    }
1439
1440    let manifest: TransferManifest = read_json_frame(recv).await?;
1441    validate_manifest(&manifest)?;
1442    events(TransferEvent::SessionStarted {
1443        direction: TransferDirection::Receive,
1444        session_id: manifest.session_id,
1445        files_total: manifest.file_entries().count(),
1446        bytes_total: manifest.total_file_bytes(),
1447    });
1448    control.check_cancelled()?;
1449
1450    for entry in &manifest.entries {
1451        if entry.kind == ManifestEntryKind::Directory {
1452            let path = safe_destination_path(destination, &entry.relative_path)?;
1453            tokio::fs::create_dir_all(&path)
1454                .await
1455                .map_err(|source| Error::IoPath { path, source })?;
1456        }
1457    }
1458
1459    let mut plan = DirectReceivePlan { files: Vec::new() };
1460    for entry in manifest.file_entries() {
1461        let decision = decide_resume(destination, entry, manifest.chunk_size).await?;
1462        let (action, offset) = match decision {
1463            ResumeDecision::Create => (DirectFileAction::Receive, 0),
1464            ResumeDecision::Skip => (DirectFileAction::Skip, entry.size),
1465            ResumeDecision::ResumeFrom(offset) => (DirectFileAction::Receive, offset),
1466            ResumeDecision::Conflict(message) => return Err(Error::InvalidInput(message)),
1467        };
1468        plan.files.push(DirectFilePlan {
1469            relative_path: entry.relative_path.clone(),
1470            action,
1471            offset,
1472        });
1473    }
1474    write_json_frame(send, &plan).await?;
1475
1476    let mut summaries = Vec::new();
1477    for file_plan in &plan.files {
1478        let Some(entry) = manifest
1479            .file_entries()
1480            .find(|entry| entry.relative_path == file_plan.relative_path)
1481        else {
1482            return Err(Error::Protocol(format!(
1483                "planned file missing from manifest: {}",
1484                display_path(&file_plan.relative_path)
1485            )));
1486        };
1487
1488        let final_path = safe_destination_path(destination, &entry.relative_path)?;
1489        if file_plan.action == DirectFileAction::Skip {
1490            let blake3 = entry.blake3.clone().unwrap_or_default();
1491            events(TransferEvent::FileFinished {
1492                direction: TransferDirection::Receive,
1493                file_name: display_path(&entry.relative_path),
1494                bytes: entry.size,
1495                blake3: blake3.clone(),
1496                status: TransferFileStatus::Skipped,
1497            });
1498            summaries.push(TransferFileSummary {
1499                path: final_path,
1500                relative_path: entry.relative_path.clone(),
1501                bytes: entry.size,
1502                blake3,
1503                status: TransferFileStatus::Skipped,
1504            });
1505            continue;
1506        }
1507
1508        match receive_payload_file(recv, destination, entry, file_plan.offset, control, events)
1509            .await
1510        {
1511            Ok(()) => {}
1512            Err(Error::TransferCancelled) => {
1513                events(TransferEvent::SessionCancelled {
1514                    direction: TransferDirection::Receive,
1515                    session_id: manifest.session_id,
1516                });
1517                return Err(Error::TransferCancelled);
1518            }
1519            Err(error) => return Err(error),
1520        }
1521        let actual_hash = hash_file(&final_path).await?;
1522        let expected_hash = entry
1523            .blake3
1524            .as_deref()
1525            .ok_or_else(|| Error::Protocol("file manifest entry is missing BLAKE3".to_string()))?;
1526        if actual_hash != expected_hash {
1527            return Err(Error::Transfer(format!(
1528                "BLAKE3 hash mismatch for {}",
1529                display_path(&entry.relative_path)
1530            )));
1531        }
1532
1533        let status = if file_plan.offset > 0 {
1534            TransferFileStatus::Resumed
1535        } else {
1536            TransferFileStatus::Received
1537        };
1538        events(TransferEvent::FileFinished {
1539            direction: TransferDirection::Receive,
1540            file_name: display_path(&entry.relative_path),
1541            bytes: entry.size,
1542            blake3: actual_hash.clone(),
1543            status,
1544        });
1545        summaries.push(TransferFileSummary {
1546            path: final_path,
1547            relative_path: entry.relative_path.clone(),
1548            bytes: entry.size,
1549            blake3: actual_hash,
1550            status,
1551        });
1552    }
1553
1554    let summary = summary_from_files(&manifest, summaries)?;
1555    events(TransferEvent::SessionFinished {
1556        direction: TransferDirection::Receive,
1557        files_total: summary.files.len(),
1558        bytes_total: summary.bytes,
1559        blake3: summary.blake3.clone(),
1560    });
1561    Ok(summary)
1562}
1563
1564async fn send_payload_file(
1565    send: &mut quinn::SendStream,
1566    source_path: &Path,
1567    entry: &ManifestEntry,
1568    offset: u64,
1569    control: &TransferControl,
1570    events: &mut impl FnMut(TransferEvent),
1571) -> Result<()> {
1572    if offset > entry.size {
1573        return Err(Error::Protocol(format!(
1574            "resume offset exceeds file size for {}",
1575            display_path(&entry.relative_path)
1576        )));
1577    }
1578
1579    let header = DirectPayloadHeader {
1580        relative_path: entry.relative_path.clone(),
1581        offset,
1582        bytes: entry.size - offset,
1583    };
1584    write_json_frame(send, &header).await?;
1585    events(TransferEvent::FileStarted {
1586        direction: TransferDirection::Send,
1587        file_name: display_path(&entry.relative_path),
1588        bytes_total: entry.size,
1589        resume_offset: offset,
1590    });
1591
1592    let mut file = tokio::fs::File::open(source_path)
1593        .await
1594        .map_err(|source| Error::IoPath {
1595            path: source_path.to_path_buf(),
1596            source,
1597        })?;
1598    file.seek(SeekFrom::Start(offset))
1599        .await
1600        .map_err(|source| Error::IoPath {
1601            path: source_path.to_path_buf(),
1602            source,
1603        })?;
1604
1605    let mut buffer = vec![0; IO_BUFFER_SIZE];
1606    let mut bytes_done = offset;
1607    while bytes_done < entry.size {
1608        control.check_cancelled()?;
1609        let remaining = entry.size - bytes_done;
1610        let read_size = buffer.len().min(remaining as usize);
1611        let read = file
1612            .read(&mut buffer[..read_size])
1613            .await
1614            .map_err(|source| Error::IoPath {
1615                path: source_path.to_path_buf(),
1616                source,
1617            })?;
1618        if read == 0 {
1619            return Err(Error::Protocol(format!(
1620                "source file ended early: {}",
1621                display_path(source_path)
1622            )));
1623        }
1624        send.write_all(&buffer[..read]).await?;
1625        bytes_done += read as u64;
1626        events(TransferEvent::Progress {
1627            direction: TransferDirection::Send,
1628            file_name: display_path(&entry.relative_path),
1629            bytes_done,
1630            bytes_total: entry.size,
1631        });
1632        control.check_cancelled()?;
1633    }
1634
1635    Ok(())
1636}
1637
1638async fn receive_payload_file(
1639    recv: &mut quinn::RecvStream,
1640    destination: &Path,
1641    entry: &ManifestEntry,
1642    offset: u64,
1643    control: &TransferControl,
1644    events: &mut impl FnMut(TransferEvent),
1645) -> Result<()> {
1646    let header: DirectPayloadHeader = read_json_frame(recv).await?;
1647    if header.relative_path != entry.relative_path
1648        || header.offset != offset
1649        || header.bytes != entry.size - offset
1650    {
1651        return Err(Error::Protocol(format!(
1652            "payload header does not match receive plan for {}",
1653            display_path(&entry.relative_path)
1654        )));
1655    }
1656    events(TransferEvent::FileStarted {
1657        direction: TransferDirection::Receive,
1658        file_name: display_path(&entry.relative_path),
1659        bytes_total: entry.size,
1660        resume_offset: offset,
1661    });
1662
1663    let final_path = safe_destination_path(destination, &entry.relative_path)?;
1664    if let Some(parent) = final_path.parent() {
1665        tokio::fs::create_dir_all(parent)
1666            .await
1667            .map_err(|source| Error::IoPath {
1668                path: parent.to_path_buf(),
1669                source,
1670            })?;
1671    }
1672
1673    let write_path = if offset == 0 {
1674        temp_path_for(&final_path)
1675    } else {
1676        final_path.clone()
1677    };
1678
1679    let mut file = if offset == 0 {
1680        tokio::fs::File::create(&write_path)
1681            .await
1682            .map_err(|source| Error::IoPath {
1683                path: write_path.clone(),
1684                source,
1685            })?
1686    } else {
1687        let mut file = tokio::fs::OpenOptions::new()
1688            .write(true)
1689            .open(&write_path)
1690            .await
1691            .map_err(|source| Error::IoPath {
1692                path: write_path.clone(),
1693                source,
1694            })?;
1695        file.set_len(offset).await.map_err(|source| Error::IoPath {
1696            path: write_path.clone(),
1697            source,
1698        })?;
1699        file.seek(SeekFrom::Start(offset))
1700            .await
1701            .map_err(|source| Error::IoPath {
1702                path: write_path.clone(),
1703                source,
1704            })?;
1705        file
1706    };
1707
1708    let mut bytes_done = offset;
1709    let mut bytes_remaining = header.bytes;
1710    let mut buffer = vec![0; IO_BUFFER_SIZE];
1711    while bytes_remaining > 0 {
1712        if control.is_cancelled() {
1713            if offset == 0 {
1714                let _ = tokio::fs::remove_file(&write_path).await;
1715            }
1716            return Err(Error::TransferCancelled);
1717        }
1718        let read_size = buffer.len().min(bytes_remaining as usize);
1719        let read = match recv.read(&mut buffer[..read_size]).await {
1720            Ok(Some(read)) => read,
1721            Ok(None) => {
1722                if offset == 0 {
1723                    let _ = tokio::fs::remove_file(&write_path).await;
1724                }
1725                return Err(Error::Protocol(
1726                    "stream ended before file payload".to_string(),
1727                ));
1728            }
1729            Err(error) => {
1730                if offset == 0 {
1731                    let _ = tokio::fs::remove_file(&write_path).await;
1732                }
1733                return Err(Error::Read(error));
1734            }
1735        };
1736
1737        file.write_all(&buffer[..read])
1738            .await
1739            .map_err(|source| Error::IoPath {
1740                path: write_path.clone(),
1741                source,
1742            })?;
1743        bytes_done += read as u64;
1744        bytes_remaining -= read as u64;
1745        events(TransferEvent::Progress {
1746            direction: TransferDirection::Receive,
1747            file_name: display_path(&entry.relative_path),
1748            bytes_done,
1749            bytes_total: entry.size,
1750        });
1751        if control.is_cancelled() {
1752            if offset == 0 {
1753                let _ = tokio::fs::remove_file(&write_path).await;
1754            }
1755            return Err(Error::TransferCancelled);
1756        }
1757    }
1758
1759    file.flush().await.map_err(|source| Error::IoPath {
1760        path: write_path.clone(),
1761        source,
1762    })?;
1763    drop(file);
1764
1765    if offset == 0 {
1766        tokio::fs::rename(&write_path, &final_path)
1767            .await
1768            .map_err(|source| Error::IoPath {
1769                path: final_path,
1770                source,
1771            })?;
1772    }
1773
1774    Ok(())
1775}
1776
1777async fn write_json_frame<T: Serialize>(send: &mut quinn::SendStream, value: &T) -> Result<()> {
1778    let bytes = serde_json::to_vec(value).map_err(|error| Error::Protocol(error.to_string()))?;
1779    if bytes.len() > MAX_FRAME_LEN {
1780        return Err(Error::Protocol("frame exceeds maximum length".to_string()));
1781    }
1782    send.write_all(&(bytes.len() as u32).to_be_bytes()).await?;
1783    send.write_all(&bytes).await?;
1784    Ok(())
1785}
1786
1787async fn read_json_frame<T: for<'de> Deserialize<'de>>(recv: &mut quinn::RecvStream) -> Result<T> {
1788    let mut len = [0; 4];
1789    recv.read_exact(&mut len).await?;
1790    let len = u32::from_be_bytes(len) as usize;
1791    if len > MAX_FRAME_LEN {
1792        return Err(Error::Protocol("frame exceeds maximum length".to_string()));
1793    }
1794
1795    let mut bytes = vec![0; len];
1796    recv.read_exact(&mut bytes).await?;
1797    serde_json::from_slice(&bytes).map_err(|error| Error::Protocol(error.to_string()))
1798}
1799
1800fn negotiate_direct_protocol(hello: &DirectProtocolHello) -> DirectProtocolResponse {
1801    if hello.min_protocol_version > NATIVE_PROTOCOL_VERSION {
1802        return DirectProtocolResponse::reject(format!(
1803            "peer requires protocol version {}, local max is {}",
1804            hello.min_protocol_version, NATIVE_PROTOCOL_VERSION
1805        ));
1806    }
1807
1808    if hello.protocol_version < MIN_NATIVE_PROTOCOL_VERSION {
1809        return DirectProtocolResponse::reject(format!(
1810            "peer max protocol version {} is below local minimum {}",
1811            hello.protocol_version, MIN_NATIVE_PROTOCOL_VERSION
1812        ));
1813    }
1814
1815    DirectProtocolResponse::accept(NATIVE_PROTOCOL_VERSION.min(hello.protocol_version))
1816}
1817
1818#[derive(Debug, Clone)]
1819struct ManifestSource {
1820    source_path: PathBuf,
1821    relative_path: PathBuf,
1822}
1823
1824fn collect_manifest_sources(paths: &[PathBuf]) -> Result<Vec<ManifestSource>> {
1825    let mut sources = Vec::new();
1826    for path in paths {
1827        let metadata = std::fs::symlink_metadata(path).map_err(|source| Error::IoPath {
1828            path: path.clone(),
1829            source,
1830        })?;
1831        if metadata.file_type().is_symlink() {
1832            return Err(Error::InvalidInput(format!(
1833                "{} is a symlink; symlink transfers are not supported",
1834                display_path(path)
1835            )));
1836        }
1837
1838        let root_name = path
1839            .file_name()
1840            .ok_or_else(|| Error::InvalidInput(format!("{} has no file name", display_path(path))))?
1841            .to_os_string();
1842        let relative_path = PathBuf::from(root_name);
1843
1844        if metadata.is_dir() {
1845            sources.push(ManifestSource {
1846                source_path: path.clone(),
1847                relative_path: relative_path.clone(),
1848            });
1849            collect_dir_sources(path, &relative_path, &mut sources)?;
1850        } else if metadata.is_file() {
1851            sources.push(ManifestSource {
1852                source_path: path.clone(),
1853                relative_path,
1854            });
1855        } else {
1856            return Err(Error::InvalidInput(format!(
1857                "{} is not a regular file or directory",
1858                display_path(path)
1859            )));
1860        }
1861    }
1862
1863    Ok(sources)
1864}
1865
1866fn collect_dir_sources(
1867    dir: &Path,
1868    relative_dir: &Path,
1869    sources: &mut Vec<ManifestSource>,
1870) -> Result<()> {
1871    for entry in std::fs::read_dir(dir).map_err(|source| Error::IoPath {
1872        path: dir.to_path_buf(),
1873        source,
1874    })? {
1875        let entry = entry.map_err(|source| Error::IoPath {
1876            path: dir.to_path_buf(),
1877            source,
1878        })?;
1879        let path = entry.path();
1880        let metadata = std::fs::symlink_metadata(&path).map_err(|source| Error::IoPath {
1881            path: path.clone(),
1882            source,
1883        })?;
1884        if metadata.file_type().is_symlink() {
1885            return Err(Error::InvalidInput(format!(
1886                "{} is a symlink; symlink transfers are not supported",
1887                display_path(&path)
1888            )));
1889        }
1890
1891        let relative_path = relative_dir.join(entry.file_name());
1892        if metadata.is_dir() {
1893            sources.push(ManifestSource {
1894                source_path: path.clone(),
1895                relative_path: relative_path.clone(),
1896            });
1897            collect_dir_sources(&path, &relative_path, sources)?;
1898        } else if metadata.is_file() {
1899            sources.push(ManifestSource {
1900                source_path: path,
1901                relative_path,
1902            });
1903        }
1904    }
1905
1906    Ok(())
1907}
1908
1909fn manifest_source_map(paths: &[PathBuf]) -> Result<BTreeMap<PathBuf, PathBuf>> {
1910    Ok(collect_manifest_sources(paths)?
1911        .into_iter()
1912        .filter_map(|source| {
1913            let metadata = std::fs::metadata(&source.source_path).ok()?;
1914            metadata
1915                .is_file()
1916                .then_some((source.relative_path, source.source_path))
1917        })
1918        .collect())
1919}
1920
1921async fn build_manifest_entry(source: ManifestSource) -> Result<ManifestEntry> {
1922    validate_relative_transfer_path(&source.relative_path)?;
1923    let metadata = tokio::fs::metadata(&source.source_path)
1924        .await
1925        .map_err(|source_error| Error::IoPath {
1926            path: source.source_path.clone(),
1927            source: source_error,
1928        })?;
1929    let unix_mode = unix_mode(&metadata);
1930    let modified_unix_ms = modified_unix_ms(&metadata);
1931
1932    if metadata.is_dir() {
1933        return Ok(ManifestEntry {
1934            relative_path: source.relative_path,
1935            kind: ManifestEntryKind::Directory,
1936            size: 0,
1937            blake3: None,
1938            chunks: Vec::new(),
1939            unix_mode,
1940            modified_unix_ms,
1941        });
1942    }
1943
1944    let (blake3, chunks) = hash_file_with_chunks(&source.source_path, DEFAULT_CHUNK_SIZE).await?;
1945    Ok(ManifestEntry {
1946        relative_path: source.relative_path,
1947        kind: ManifestEntryKind::File,
1948        size: metadata.len(),
1949        blake3: Some(blake3),
1950        chunks,
1951        unix_mode,
1952        modified_unix_ms,
1953    })
1954}
1955
1956fn validate_manifest(manifest: &TransferManifest) -> Result<()> {
1957    if manifest.version != MANIFEST_VERSION {
1958        return Err(Error::Protocol(format!(
1959            "unsupported manifest version {}",
1960            manifest.version
1961        )));
1962    }
1963    if manifest.chunk_size == 0 {
1964        return Err(Error::Protocol("manifest chunk size is zero".to_string()));
1965    }
1966
1967    let mut seen = BTreeSet::new();
1968    for entry in &manifest.entries {
1969        validate_relative_transfer_path(&entry.relative_path)?;
1970        if !seen.insert(entry.relative_path.clone()) {
1971            return Err(Error::Protocol(format!(
1972                "duplicate manifest path: {}",
1973                display_path(&entry.relative_path)
1974            )));
1975        }
1976        if entry.kind == ManifestEntryKind::File && entry.blake3.is_none() {
1977            return Err(Error::Protocol(format!(
1978                "file manifest entry is missing BLAKE3: {}",
1979                display_path(&entry.relative_path)
1980            )));
1981        }
1982    }
1983
1984    Ok(())
1985}
1986
1987fn validate_relative_transfer_path(path: &Path) -> Result<()> {
1988    if path.as_os_str().is_empty() || path.is_absolute() {
1989        return Err(Error::InvalidInput(format!(
1990            "unsafe transfer path: {}",
1991            display_path(path)
1992        )));
1993    }
1994
1995    let mut normal_components = 0usize;
1996    for component in path.components() {
1997        match component {
1998            Component::Normal(value) => {
1999                let Some(name) = value.to_str() else {
2000                    return Err(Error::InvalidInput(format!(
2001                        "transfer path must be UTF-8: {}",
2002                        display_path(path)
2003                    )));
2004                };
2005                validate_path_component(name, path)?;
2006                normal_components += 1;
2007            }
2008            Component::CurDir
2009            | Component::ParentDir
2010            | Component::RootDir
2011            | Component::Prefix(_) => {
2012                return Err(Error::InvalidInput(format!(
2013                    "unsafe transfer path: {}",
2014                    display_path(path)
2015                )));
2016            }
2017        }
2018    }
2019
2020    if normal_components == 0 {
2021        return Err(Error::InvalidInput(format!(
2022            "unsafe transfer path: {}",
2023            display_path(path)
2024        )));
2025    }
2026
2027    Ok(())
2028}
2029
2030fn validate_path_component(component: &str, full_path: &Path) -> Result<()> {
2031    if component.is_empty()
2032        || component.contains('\\')
2033        || component.contains('/')
2034        || component.contains(':')
2035        || component.ends_with(' ')
2036        || component.ends_with('.')
2037        || is_windows_reserved_name(component)
2038    {
2039        return Err(Error::InvalidInput(format!(
2040            "unsafe transfer path: {}",
2041            display_path(full_path)
2042        )));
2043    }
2044
2045    Ok(())
2046}
2047
2048fn is_windows_reserved_name(component: &str) -> bool {
2049    let stem = component
2050        .split('.')
2051        .next()
2052        .unwrap_or(component)
2053        .trim_end_matches([' ', '.'])
2054        .to_ascii_uppercase();
2055    matches!(
2056        stem.as_str(),
2057        "CON"
2058            | "PRN"
2059            | "AUX"
2060            | "NUL"
2061            | "COM1"
2062            | "COM2"
2063            | "COM3"
2064            | "COM4"
2065            | "COM5"
2066            | "COM6"
2067            | "COM7"
2068            | "COM8"
2069            | "COM9"
2070            | "LPT1"
2071            | "LPT2"
2072            | "LPT3"
2073            | "LPT4"
2074            | "LPT5"
2075            | "LPT6"
2076            | "LPT7"
2077            | "LPT8"
2078            | "LPT9"
2079    )
2080}
2081
2082async fn hash_file(path: &Path) -> Result<String> {
2083    hash_file_with_chunks(path, DEFAULT_CHUNK_SIZE)
2084        .await
2085        .map(|(hash, _)| hash)
2086}
2087
2088async fn hash_file_with_chunks(path: &Path, chunk_size: u64) -> Result<(String, Vec<String>)> {
2089    let mut file = tokio::fs::File::open(path)
2090        .await
2091        .map_err(|source| Error::IoPath {
2092            path: path.to_path_buf(),
2093            source,
2094        })?;
2095    let mut hasher = blake3::Hasher::new();
2096    let mut chunk_hasher = blake3::Hasher::new();
2097    let mut chunk_bytes = 0u64;
2098    let mut chunks = Vec::new();
2099    let mut buffer = vec![0; IO_BUFFER_SIZE];
2100
2101    loop {
2102        let read = file
2103            .read(&mut buffer)
2104            .await
2105            .map_err(|source| Error::IoPath {
2106                path: path.to_path_buf(),
2107                source,
2108            })?;
2109        if read == 0 {
2110            break;
2111        }
2112        hasher.update(&buffer[..read]);
2113
2114        let mut remaining = &buffer[..read];
2115        while !remaining.is_empty() {
2116            let take = remaining.len().min((chunk_size - chunk_bytes) as usize);
2117            chunk_hasher.update(&remaining[..take]);
2118            chunk_bytes += take as u64;
2119            remaining = &remaining[take..];
2120            if chunk_bytes == chunk_size {
2121                chunks.push(chunk_hasher.finalize().to_hex().to_string());
2122                chunk_hasher = blake3::Hasher::new();
2123                chunk_bytes = 0;
2124            }
2125        }
2126    }
2127
2128    if chunk_bytes > 0 {
2129        chunks.push(chunk_hasher.finalize().to_hex().to_string());
2130    }
2131
2132    Ok((hasher.finalize().to_hex().to_string(), chunks))
2133}
2134
2135async fn verified_prefix_offset(
2136    path: &Path,
2137    entry: &ManifestEntry,
2138    existing_len: u64,
2139    chunk_size: u64,
2140) -> Result<u64> {
2141    if existing_len == 0 || entry.chunks.is_empty() {
2142        return Ok(0);
2143    }
2144
2145    let chunks_to_check = (existing_len / chunk_size).min(entry.chunks.len() as u64) as usize;
2146    if chunks_to_check == 0 {
2147        return Ok(0);
2148    }
2149
2150    let mut file = tokio::fs::File::open(path)
2151        .await
2152        .map_err(|source| Error::IoPath {
2153            path: path.to_path_buf(),
2154            source,
2155        })?;
2156    let mut buffer = vec![0; IO_BUFFER_SIZE];
2157    let mut verified_chunks = 0usize;
2158
2159    for expected in entry.chunks.iter().take(chunks_to_check) {
2160        let mut remaining = chunk_size;
2161        let mut hasher = blake3::Hasher::new();
2162        while remaining > 0 {
2163            let read_size = buffer.len().min(remaining as usize);
2164            file.read_exact(&mut buffer[..read_size])
2165                .await
2166                .map_err(|source| Error::IoPath {
2167                    path: path.to_path_buf(),
2168                    source,
2169                })?;
2170            hasher.update(&buffer[..read_size]);
2171            remaining -= read_size as u64;
2172        }
2173
2174        if hasher.finalize().to_hex().as_str() != expected {
2175            break;
2176        }
2177        verified_chunks += 1;
2178    }
2179
2180    Ok(verified_chunks as u64 * chunk_size)
2181}
2182
2183fn summary_from_files(
2184    manifest: &TransferManifest,
2185    files: Vec<TransferFileSummary>,
2186) -> Result<TransferSummary> {
2187    let bytes = manifest.total_file_bytes();
2188    let blake3 = if files.len() == 1 {
2189        files[0].blake3.clone()
2190    } else {
2191        manifest.digest()?
2192    };
2193    let file_name = if files.len() == 1 {
2194        display_path(&files[0].relative_path)
2195    } else {
2196        format!("{} files", files.len())
2197    };
2198    let path = files
2199        .first()
2200        .map(|file| file.path.clone())
2201        .unwrap_or_default();
2202
2203    Ok(TransferSummary {
2204        path,
2205        file_name,
2206        bytes,
2207        blake3,
2208        files,
2209    })
2210}
2211
2212fn temp_path_for(final_path: &Path) -> PathBuf {
2213    let file_name = final_path
2214        .file_name()
2215        .and_then(|name| name.to_str())
2216        .unwrap_or("transfer");
2217    final_path.with_file_name(format!(".{file_name}.{}.ferry-part", Uuid::new_v4()))
2218}
2219
2220fn modified_unix_ms(metadata: &Metadata) -> Option<i128> {
2221    system_time_unix_ms(metadata.modified().ok()?)
2222}
2223
2224fn system_time_unix_ms(time: SystemTime) -> Option<i128> {
2225    match time.duration_since(UNIX_EPOCH) {
2226        Ok(duration) => Some(duration.as_millis() as i128),
2227        Err(error) => Some(-(error.duration().as_millis() as i128)),
2228    }
2229}
2230
2231#[cfg(unix)]
2232fn unix_mode(metadata: &Metadata) -> Option<u32> {
2233    use std::os::unix::fs::PermissionsExt;
2234
2235    Some(metadata.permissions().mode())
2236}
2237
2238#[cfg(not(unix))]
2239fn unix_mode(_metadata: &Metadata) -> Option<u32> {
2240    None
2241}
2242
2243fn insecure_client_config() -> Result<quinn::ClientConfig> {
2244    let crypto = rustls::ClientConfig::builder()
2245        .dangerous()
2246        .with_custom_certificate_verifier(SkipServerVerification::new())
2247        .with_no_client_auth();
2248
2249    Ok(quinn::ClientConfig::new(Arc::new(
2250        QuicClientConfig::try_from(crypto)
2251            .map_err(|error| Error::CryptoConfig(error.to_string()))?,
2252    )))
2253}
2254
2255#[derive(Debug)]
2256struct SkipServerVerification(Arc<CryptoProvider>);
2257
2258impl SkipServerVerification {
2259    fn new() -> Arc<Self> {
2260        Arc::new(Self(Arc::new(rustls::crypto::ring::default_provider())))
2261    }
2262}
2263
2264impl ServerCertVerifier for SkipServerVerification {
2265    fn verify_server_cert(
2266        &self,
2267        _end_entity: &CertificateDer<'_>,
2268        _intermediates: &[CertificateDer<'_>],
2269        _server_name: &ServerName<'_>,
2270        _ocsp: &[u8],
2271        _now: UnixTime,
2272    ) -> std::result::Result<ServerCertVerified, rustls::Error> {
2273        Ok(ServerCertVerified::assertion())
2274    }
2275
2276    fn verify_tls12_signature(
2277        &self,
2278        message: &[u8],
2279        cert: &CertificateDer<'_>,
2280        dss: &DigitallySignedStruct,
2281    ) -> std::result::Result<HandshakeSignatureValid, rustls::Error> {
2282        verify_tls12_signature(
2283            message,
2284            cert,
2285            dss,
2286            &self.0.signature_verification_algorithms,
2287        )
2288    }
2289
2290    fn verify_tls13_signature(
2291        &self,
2292        message: &[u8],
2293        cert: &CertificateDer<'_>,
2294        dss: &DigitallySignedStruct,
2295    ) -> std::result::Result<HandshakeSignatureValid, rustls::Error> {
2296        verify_tls13_signature(
2297            message,
2298            cert,
2299            dss,
2300            &self.0.signature_verification_algorithms,
2301        )
2302    }
2303
2304    fn supported_verify_schemes(&self) -> Vec<SignatureScheme> {
2305        self.0.signature_verification_algorithms.supported_schemes()
2306    }
2307}
2308
2309fn ensure_rustls_provider() {
2310    let _ = rustls::crypto::ring::default_provider().install_default();
2311}
2312
2313pub fn config_dir() -> Result<PathBuf> {
2314    let base_dirs = BaseDirs::new().ok_or(Error::ConfigDirUnavailable)?;
2315    Ok(base_dirs.config_dir().join(CONFIG_DIR_NAME))
2316}
2317
2318fn default_alias() -> String {
2319    std::env::var("HOSTNAME")
2320        .or_else(|_| std::env::var("COMPUTERNAME"))
2321        .ok()
2322        .filter(|value| !value.trim().is_empty())
2323        .unwrap_or_else(|| "fileferry-device".to_string())
2324}
2325
2326fn expand_home_path(path: &str) -> Result<PathBuf> {
2327    if path == "~" {
2328        return Ok(BaseDirs::new()
2329            .ok_or(Error::ConfigDirUnavailable)?
2330            .home_dir()
2331            .to_path_buf());
2332    }
2333
2334    if let Some(rest) = path.strip_prefix("~/") {
2335        return Ok(BaseDirs::new()
2336            .ok_or(Error::ConfigDirUnavailable)?
2337            .home_dir()
2338            .join(rest));
2339    }
2340
2341    Ok(PathBuf::from(path))
2342}
2343
2344fn normalized_fingerprint(fingerprint: String) -> Result<String> {
2345    let fingerprint = fingerprint.trim().to_ascii_lowercase();
2346    if fingerprint.len() != 64 || !fingerprint.chars().all(|char| char.is_ascii_hexdigit()) {
2347        return Err(Error::InvalidInput(
2348            "peer fingerprint must be a full 64-character hex BLAKE3 fingerprint".to_string(),
2349        ));
2350    }
2351
2352    Ok(fingerprint)
2353}
2354
2355fn write_toml_file<T: Serialize>(path: &Path, value: &T) -> Result<()> {
2356    let bytes =
2357        toml::to_string_pretty(value).map_err(|error| Error::ConfigSerialize(error.to_string()))?;
2358    if let Some(parent) = path.parent() {
2359        std::fs::create_dir_all(parent).map_err(|source| Error::IoPath {
2360            path: parent.to_path_buf(),
2361            source,
2362        })?;
2363    }
2364
2365    let temp_path = path.with_extension(format!("tmp-{}", Uuid::new_v4()));
2366    std::fs::write(&temp_path, bytes).map_err(|source| Error::IoPath {
2367        path: temp_path.clone(),
2368        source,
2369    })?;
2370    std::fs::rename(&temp_path, path).map_err(|source| Error::IoPath {
2371        path: path.to_path_buf(),
2372        source,
2373    })?;
2374    Ok(())
2375}
2376
2377fn write_private_file(path: &Path, bytes: &[u8]) -> Result<()> {
2378    std::fs::write(path, bytes).map_err(|source| Error::IoPath {
2379        path: path.to_path_buf(),
2380        source,
2381    })?;
2382
2383    #[cfg(unix)]
2384    {
2385        use std::os::unix::fs::PermissionsExt;
2386
2387        let mut permissions = std::fs::metadata(path)
2388            .map_err(|source| Error::IoPath {
2389                path: path.to_path_buf(),
2390                source,
2391            })?
2392            .permissions();
2393        permissions.set_mode(0o600);
2394        std::fs::set_permissions(path, permissions).map_err(|source| Error::IoPath {
2395            path: path.to_path_buf(),
2396            source,
2397        })?;
2398    }
2399
2400    Ok(())
2401}
2402
2403#[cfg(test)]
2404mod tests {
2405    use super::*;
2406    use std::sync::{Arc, Mutex};
2407
2408    #[test]
2409    fn direct_peer_parses_socket_address() {
2410        let peer = DirectPeer::parse("127.0.0.1:53318").expect("address parses");
2411
2412        assert_eq!(peer.address().to_string(), "127.0.0.1:53318");
2413    }
2414
2415    #[test]
2416    fn direct_peer_rejects_missing_port() {
2417        assert!(DirectPeer::parse("127.0.0.1").is_err());
2418    }
2419
2420    #[test]
2421    fn send_request_requires_at_least_one_path() {
2422        let peer = DirectPeer::parse("127.0.0.1:53318").expect("address parses");
2423
2424        assert!(SendRequest::new(peer, Vec::new()).is_err());
2425    }
2426
2427    #[test]
2428    fn validate_send_paths_rejects_empty_slice() {
2429        assert!(validate_send_paths(&[]).is_err());
2430    }
2431
2432    #[tokio::test]
2433    async fn manifest_walks_directory_and_serializes_entries() {
2434        let temp = tempfile::tempdir().expect("tempdir");
2435        let root = temp.path().join("bundle");
2436        let nested = root.join("nested");
2437        tokio::fs::create_dir_all(&nested).await.expect("dirs");
2438        tokio::fs::write(root.join("a.txt"), b"alpha")
2439            .await
2440            .expect("file a");
2441        tokio::fs::write(nested.join("b.txt"), b"beta")
2442            .await
2443            .expect("file b");
2444
2445        let manifest = build_manifest(&[root]).await.expect("manifest");
2446        let json = serde_json::to_string(&manifest).expect("manifest serializes");
2447        let decoded: TransferManifest = serde_json::from_str(&json).expect("manifest decodes");
2448
2449        assert_eq!(decoded.version, MANIFEST_VERSION);
2450        assert!(decoded.entries.iter().any(|entry| {
2451            entry.kind == ManifestEntryKind::Directory
2452                && entry.relative_path == Path::new("bundle/nested")
2453        }));
2454        assert!(decoded.entries.iter().any(|entry| {
2455            entry.kind == ManifestEntryKind::File
2456                && entry.relative_path == Path::new("bundle/nested/b.txt")
2457                && entry.size == 4
2458                && entry.blake3.is_some()
2459        }));
2460    }
2461
2462    #[test]
2463    fn safe_destination_path_rejects_unsafe_paths() {
2464        let dest = Path::new("/tmp/ferry-dest");
2465
2466        assert!(safe_destination_path(dest, Path::new("../escape.txt")).is_err());
2467        assert!(safe_destination_path(dest, Path::new("/absolute.txt")).is_err());
2468        assert!(safe_destination_path(dest, Path::new("nested/CON")).is_err());
2469        assert!(safe_destination_path(dest, Path::new("nested\\escape.txt")).is_err());
2470        assert!(safe_destination_path(dest, Path::new("nested/ok.txt")).is_ok());
2471    }
2472
2473    #[test]
2474    fn safe_destination_path_rejects_generated_escape_and_reserved_patterns() {
2475        let dest = Path::new("/tmp/ferry-dest");
2476        let unsafe_components = [
2477            "..",
2478            ".",
2479            "CON",
2480            "con.txt",
2481            "NUL",
2482            "COM1",
2483            "LPT9",
2484            "trailingspace ",
2485            "trailingdot.",
2486            "has:colon",
2487            "has\\backslash",
2488        ];
2489
2490        for component in unsafe_components {
2491            assert!(
2492                safe_destination_path(dest, Path::new(component)).is_err(),
2493                "{component} should be rejected as a top-level path"
2494            );
2495            if component != "." {
2496                assert!(
2497                    safe_destination_path(dest, Path::new("safe").join(component).as_path())
2498                        .is_err(),
2499                    "{component} should be rejected as a nested path"
2500                );
2501            }
2502        }
2503
2504        for component in ["alpha", "alpha-1", "alpha_1", "report.final.txt"] {
2505            let path = safe_destination_path(dest, Path::new("safe").join(component).as_path())
2506                .expect("safe generated path accepted");
2507            assert!(path.starts_with(dest));
2508        }
2509    }
2510
2511    #[tokio::test]
2512    async fn resume_decision_skips_existing_matching_file() {
2513        let temp = tempfile::tempdir().expect("tempdir");
2514        let source = temp.path().join("source.txt");
2515        let dest = temp.path().join("dest");
2516        tokio::fs::create_dir_all(&dest).await.expect("dest dir");
2517        tokio::fs::write(&source, b"same contents")
2518            .await
2519            .expect("source");
2520        tokio::fs::write(dest.join("source.txt"), b"same contents")
2521            .await
2522            .expect("dest file");
2523        let manifest = build_manifest(&[source]).await.expect("manifest");
2524        let entry = manifest.file_entries().next().expect("file entry");
2525
2526        let decision = decide_resume(&dest, entry, manifest.chunk_size)
2527            .await
2528            .expect("decision");
2529
2530        assert_eq!(decision, ResumeDecision::Skip);
2531    }
2532
2533    #[tokio::test]
2534    async fn resume_decision_returns_verified_chunk_offset() {
2535        let temp = tempfile::tempdir().expect("tempdir");
2536        let source = temp.path().join("large.bin");
2537        let dest = temp.path().join("dest");
2538        tokio::fs::create_dir_all(&dest).await.expect("dest dir");
2539        let bytes = vec![42; (DEFAULT_CHUNK_SIZE as usize * 2) + 128];
2540        tokio::fs::write(&source, &bytes).await.expect("source");
2541        tokio::fs::write(
2542            dest.join("large.bin"),
2543            &bytes[..DEFAULT_CHUNK_SIZE as usize + 99],
2544        )
2545        .await
2546        .expect("partial");
2547        let manifest = build_manifest(&[source]).await.expect("manifest");
2548        let entry = manifest.file_entries().next().expect("file entry");
2549
2550        let decision = decide_resume(&dest, entry, manifest.chunk_size)
2551            .await
2552            .expect("decision");
2553
2554        assert_eq!(decision, ResumeDecision::ResumeFrom(DEFAULT_CHUNK_SIZE));
2555    }
2556
2557    #[tokio::test]
2558    async fn resume_decision_rejects_existing_file_with_mismatched_contents() {
2559        let temp = tempfile::tempdir().expect("tempdir");
2560        let source = temp.path().join("source.txt");
2561        let dest = temp.path().join("dest");
2562        tokio::fs::create_dir_all(&dest).await.expect("dest dir");
2563        tokio::fs::write(&source, b"expected contents")
2564            .await
2565            .expect("source");
2566        tokio::fs::write(dest.join("source.txt"), b"different contents")
2567            .await
2568            .expect("conflicting dest file");
2569        let manifest = build_manifest(&[source]).await.expect("manifest");
2570        let entry = manifest.file_entries().next().expect("file entry");
2571
2572        let decision = decide_resume(&dest, entry, manifest.chunk_size)
2573            .await
2574            .expect("decision");
2575
2576        assert!(matches!(decision, ResumeDecision::Conflict(_)));
2577    }
2578
2579    #[tokio::test]
2580    async fn resume_decision_rejects_existing_file_larger_than_manifest_entry() {
2581        let temp = tempfile::tempdir().expect("tempdir");
2582        let source = temp.path().join("source.txt");
2583        let dest = temp.path().join("dest");
2584        tokio::fs::create_dir_all(&dest).await.expect("dest dir");
2585        tokio::fs::write(&source, b"small").await.expect("source");
2586        tokio::fs::write(dest.join("source.txt"), b"larger destination")
2587            .await
2588            .expect("larger dest file");
2589        let manifest = build_manifest(&[source]).await.expect("manifest");
2590        let entry = manifest.file_entries().next().expect("file entry");
2591
2592        let decision = decide_resume(&dest, entry, manifest.chunk_size)
2593            .await
2594            .expect("decision");
2595
2596        assert!(matches!(decision, ResumeDecision::Conflict(_)));
2597    }
2598
2599    #[test]
2600    fn identity_is_loaded_after_generation() {
2601        let temp = tempfile::tempdir().expect("tempdir");
2602        let generated =
2603            NativeIdentity::load_or_generate_in(temp.path()).expect("identity generated");
2604        let loaded = NativeIdentity::load_or_generate_in(temp.path()).expect("identity loaded");
2605
2606        assert_eq!(generated.fingerprint(), loaded.fingerprint());
2607    }
2608
2609    #[test]
2610    fn app_config_round_trips_toml() {
2611        let temp = tempfile::tempdir().expect("tempdir");
2612        let path = temp.path().join("config.toml");
2613        let config = AppConfig {
2614            alias: "desk".to_string(),
2615            ..AppConfig::default()
2616        };
2617
2618        config.save_to_path(&path).expect("config saved");
2619        let loaded = AppConfig::load_or_default_from(&path).expect("config loaded");
2620
2621        assert_eq!(loaded.alias, "desk");
2622        assert_eq!(loaded.listen_port, 53317);
2623        assert!(loaded.trust.require_fingerprint);
2624    }
2625
2626    #[test]
2627    fn app_config_redacts_psk_for_display_output() {
2628        let config = AppConfig {
2629            trust: TrustConfig {
2630                psk: "super-secret-psk".to_string(),
2631                ..TrustConfig::default()
2632            },
2633            ..AppConfig::default()
2634        };
2635
2636        let redacted = config.to_redacted_toml_string().expect("config serializes");
2637
2638        assert!(!redacted.contains("super-secret-psk"));
2639        assert!(redacted.contains(r#"psk = "<redacted>""#));
2640    }
2641
2642    #[test]
2643    fn daemon_config_defaults_from_app_config_and_round_trips_toml() {
2644        let temp = tempfile::tempdir().expect("tempdir");
2645        let path = temp.path().join("daemon.toml");
2646        let app_config = AppConfig {
2647            quic_port: 54444,
2648            download_dir: temp.path().join("downloads").display().to_string(),
2649            ..AppConfig::default()
2650        };
2651
2652        let defaulted =
2653            DaemonConfig::load_or_default_from(&path, &app_config).expect("daemon config");
2654        assert_eq!(
2655            defaulted.listen,
2656            SocketAddr::from(([0, 0, 0, 0], app_config.quic_port))
2657        );
2658        assert_eq!(defaulted.destination, temp.path().join("downloads"));
2659
2660        let config = DaemonConfig {
2661            listen: SocketAddr::from(([127, 0, 0, 1], 53318)),
2662            destination: temp.path().join("daemon-dest"),
2663        };
2664        config.save_to_path(&path).expect("daemon config saved");
2665        let loaded =
2666            DaemonConfig::load_or_default_from(&path, &app_config).expect("daemon config loaded");
2667
2668        assert_eq!(loaded, config);
2669    }
2670
2671    #[test]
2672    fn trust_store_load_save_and_forget_round_trip() {
2673        let temp = tempfile::tempdir().expect("tempdir");
2674        let path = temp.path().join("known_peers.toml");
2675        let fingerprint = "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef";
2676        let mut store = TrustStore::default();
2677
2678        store
2679            .trust_fingerprint(fingerprint)
2680            .expect("fingerprint trusted");
2681        store.save_to_path(&path).expect("trust store saved");
2682        let mut loaded = TrustStore::load_or_default_from(&path).expect("trust store loaded");
2683
2684        assert_eq!(loaded.records().count(), 1);
2685        assert_eq!(loaded.peers[fingerprint].trust_state, TrustState::Trusted);
2686        assert!(loaded.forget(fingerprint));
2687        assert_eq!(loaded.records().count(), 0);
2688    }
2689
2690    #[test]
2691    fn trust_store_rejects_short_fingerprint_without_discovery_lookup() {
2692        let mut store = TrustStore::default();
2693
2694        assert!(store.trust_fingerprint("9a4f2c").is_err());
2695    }
2696
2697    #[test]
2698    fn transfer_events_serialize_with_stable_event_names() {
2699        let event = TransferEvent::Progress {
2700            direction: TransferDirection::Send,
2701            file_name: "bundle/a.txt".to_string(),
2702            bytes_done: 5,
2703            bytes_total: 9,
2704        };
2705
2706        let json = serde_json::to_value(&event).expect("event serializes");
2707
2708        assert_eq!(json["event"], "progress");
2709        assert_eq!(json["direction"], "send");
2710        assert_eq!(json["file_name"], "bundle/a.txt");
2711        assert_eq!(json["bytes_done"], 5);
2712        assert_eq!(json["bytes_total"], 9);
2713    }
2714
2715    #[test]
2716    fn direct_protocol_hello_has_stable_golden_json() {
2717        let hello = DirectProtocolHello {
2718            protocol_version: 1,
2719            min_protocol_version: 1,
2720            client_fingerprint: "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"
2721                .to_string(),
2722        };
2723
2724        let json = serde_json::to_string(&hello).expect("hello serializes");
2725
2726        assert_eq!(
2727            json,
2728            r#"{"protocol_version":1,"min_protocol_version":1,"client_fingerprint":"0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"}"#
2729        );
2730    }
2731
2732    #[test]
2733    fn direct_protocol_response_has_stable_golden_json() {
2734        let response = DirectProtocolResponse::accept(1);
2735
2736        let json = serde_json::to_string(&response).expect("response serializes");
2737
2738        assert_eq!(
2739            json,
2740            r#"{"accepted":true,"protocol_version":1,"error":null}"#
2741        );
2742    }
2743
2744    #[test]
2745    fn direct_protocol_negotiates_supported_overlap() {
2746        let hello = DirectProtocolHello {
2747            protocol_version: 3,
2748            min_protocol_version: 1,
2749            client_fingerprint: "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"
2750                .to_string(),
2751        };
2752
2753        let response = negotiate_direct_protocol(&hello);
2754
2755        assert_eq!(response, DirectProtocolResponse::accept(1));
2756    }
2757
2758    #[test]
2759    fn direct_protocol_rejects_incompatible_versions() {
2760        let too_new = DirectProtocolHello {
2761            protocol_version: 3,
2762            min_protocol_version: 2,
2763            client_fingerprint: "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"
2764                .to_string(),
2765        };
2766        let too_old = DirectProtocolHello {
2767            protocol_version: 0,
2768            min_protocol_version: 0,
2769            client_fingerprint: "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"
2770                .to_string(),
2771        };
2772
2773        assert!(!negotiate_direct_protocol(&too_new).accepted);
2774        assert!(!negotiate_direct_protocol(&too_old).accepted);
2775    }
2776
2777    #[test]
2778    fn transfer_manifest_has_stable_golden_json() {
2779        let manifest = TransferManifest {
2780            version: MANIFEST_VERSION,
2781            session_id: Uuid::parse_str("00000000-0000-0000-0000-000000000001")
2782                .expect("uuid parses"),
2783            chunk_size: DEFAULT_CHUNK_SIZE,
2784            entries: vec![ManifestEntry {
2785                relative_path: PathBuf::from("bundle/a.txt"),
2786                kind: ManifestEntryKind::File,
2787                size: 5,
2788                blake3: Some(
2789                    "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef".to_string(),
2790                ),
2791                chunks: vec![
2792                    "abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789".to_string(),
2793                ],
2794                unix_mode: Some(0o100644),
2795                modified_unix_ms: Some(1_700_000_000_000),
2796            }],
2797        };
2798
2799        let json = serde_json::to_string(&manifest).expect("manifest serializes");
2800
2801        assert_eq!(
2802            json,
2803            r#"{"version":1,"session_id":"00000000-0000-0000-0000-000000000001","chunk_size":1048576,"entries":[{"relative_path":"bundle/a.txt","kind":"file","size":5,"blake3":"0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef","chunks":["abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789"],"unix_mode":33188,"modified_unix_ms":1700000000000}]}"#
2804        );
2805    }
2806
2807    #[test]
2808    fn peer_registry_merges_observations_by_fingerprint() {
2809        let mut registry = PeerRegistry::new();
2810        let fingerprint = "abcdef1234567890";
2811        let first = PeerObservation::new(
2812            fingerprint,
2813            SocketAddr::from(([192, 168, 1, 10], 53318)),
2814            100,
2815        )
2816        .with_alias("desk")
2817        .with_hostname("desk.local")
2818        .with_transport("quic");
2819        let second = PeerObservation::new(
2820            fingerprint,
2821            SocketAddr::from(([192, 168, 1, 11], 53318)),
2822            200,
2823        )
2824        .with_alias("workstation")
2825        .with_transport("quic");
2826
2827        registry.observe(first).expect("first observation");
2828        registry.observe(second).expect("second observation");
2829        let record = registry
2830            .get_by_fingerprint(fingerprint)
2831            .expect("record by fingerprint");
2832
2833        assert_eq!(registry.records().count(), 1);
2834        assert!(record.aliases.contains("desk"));
2835        assert!(record.aliases.contains("workstation"));
2836        assert!(record.hostnames.contains("desk.local"));
2837        assert_eq!(record.addresses.len(), 2);
2838        assert_eq!(record.first_seen_unix_ms, 100);
2839        assert_eq!(record.last_seen_unix_ms, 200);
2840    }
2841
2842    #[test]
2843    fn peer_registry_looks_up_unique_fingerprint_prefix_alias_and_hostname() {
2844        let mut registry = PeerRegistry::new();
2845        registry
2846            .observe(
2847                PeerObservation::new(
2848                    "abcdef1234567890",
2849                    SocketAddr::from(([192, 168, 1, 10], 53318)),
2850                    100,
2851                )
2852                .with_alias("desk")
2853                .with_hostname("desk.local"),
2854            )
2855            .expect("first observation");
2856        registry
2857            .observe(
2858                PeerObservation::new(
2859                    "123456abcdef7890",
2860                    SocketAddr::from(([192, 168, 1, 20], 53318)),
2861                    100,
2862                )
2863                .with_alias("laptop"),
2864            )
2865            .expect("second observation");
2866
2867        assert_eq!(
2868            registry
2869                .lookup("abcdef")
2870                .map(|record| record.fingerprint.as_str()),
2871            Some("abcdef1234567890")
2872        );
2873        assert_eq!(
2874            registry
2875                .lookup("desk")
2876                .map(|record| record.fingerprint.as_str()),
2877            Some("abcdef1234567890")
2878        );
2879        assert_eq!(
2880            registry
2881                .lookup("desk.local")
2882                .map(|record| record.fingerprint.as_str()),
2883            Some("abcdef1234567890")
2884        );
2885        assert!(registry.lookup("missing").is_none());
2886    }
2887
2888    #[test]
2889    fn peer_registry_rejects_ambiguous_lookup_hints() {
2890        let mut registry = PeerRegistry::new();
2891        registry
2892            .observe(
2893                PeerObservation::new(
2894                    "abcdef1234567890",
2895                    SocketAddr::from(([192, 168, 1, 10], 53318)),
2896                    100,
2897                )
2898                .with_alias("desk"),
2899            )
2900            .expect("first observation");
2901        registry
2902            .observe(
2903                PeerObservation::new(
2904                    "abcdef9999999999",
2905                    SocketAddr::from(([192, 168, 1, 11], 53318)),
2906                    100,
2907                )
2908                .with_alias("desk"),
2909            )
2910            .expect("second observation");
2911
2912        assert!(registry.lookup("abcdef").is_none());
2913        assert!(registry.lookup("desk").is_none());
2914        assert_eq!(
2915            registry
2916                .lookup("abcdef1234567890")
2917                .map(|record| record.fingerprint.as_str()),
2918            Some("abcdef1234567890")
2919        );
2920        match registry.lookup_detail("desk") {
2921            PeerLookup::Ambiguous(records) => assert_eq!(records.len(), 2),
2922            other => panic!("expected ambiguous duplicate-alias lookup, got {other:?}"),
2923        }
2924        assert!(matches!(
2925            registry.lookup_detail("missing"),
2926            PeerLookup::Missing
2927        ));
2928    }
2929
2930    #[test]
2931    fn native_announcement_builds_expected_txt_properties() {
2932        let announcement = NativeAnnouncement {
2933            alias: "Stephen MBP".to_string(),
2934            fingerprint: "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"
2935                .to_string(),
2936            quic_port: 53318,
2937            listen_port: Some(53317),
2938        };
2939
2940        let service = announcement.service_info().expect("service info");
2941
2942        assert_eq!(service.get_type(), NATIVE_SERVICE_TYPE);
2943        assert!(
2944            service
2945                .get_fullname()
2946                .starts_with("stephen-mbp-0123456789ab.")
2947        );
2948        assert_eq!(service.get_property_val_str("pv"), Some("1"));
2949        assert_eq!(service.get_property_val_str("minpv"), Some("1"));
2950        assert_eq!(
2951            service.get_property_val_str("fp"),
2952            Some("0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef")
2953        );
2954        assert_eq!(service.get_property_val_str("alias"), Some("Stephen MBP"));
2955        assert_eq!(service.get_property_val_str("transports"), Some("quic"));
2956        assert_eq!(service.get_property_val_str("quic_port"), Some("53318"));
2957        assert_eq!(service.get_property_val_str("listen_port"), Some("53317"));
2958    }
2959
2960    #[test]
2961    fn resolved_native_service_merges_into_registry() {
2962        let properties = [
2963            ("pv", "1"),
2964            ("minpv", "1"),
2965            (
2966                "fp",
2967                "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef",
2968            ),
2969            ("alias", "desk"),
2970            ("transports", "quic"),
2971            ("quic_port", "53318"),
2972        ];
2973        let service = ServiceInfo::new(
2974            NATIVE_SERVICE_TYPE,
2975            "desk-0123456789ab",
2976            "desk.local.",
2977            "192.168.1.42",
2978            53318,
2979            &properties[..],
2980        )
2981        .expect("service info")
2982        .as_resolved_service();
2983        let mut registry = PeerRegistry::new();
2984
2985        observe_resolved_service(&mut registry, &service).expect("observation");
2986
2987        let record = registry.lookup("desk").expect("record by alias");
2988        assert_eq!(
2989            record.fingerprint,
2990            "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"
2991        );
2992        assert_eq!(
2993            record.preferred_quic_address(),
2994            Some(SocketAddr::from(([192, 168, 1, 42], 53318)))
2995        );
2996    }
2997
2998    #[test]
2999    fn resolved_native_service_ignores_incompatible_protocol_ranges() {
3000        let properties = [
3001            ("pv", "3"),
3002            ("minpv", "2"),
3003            (
3004                "fp",
3005                "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef",
3006            ),
3007            ("alias", "desk"),
3008            ("transports", "quic"),
3009            ("quic_port", "53318"),
3010        ];
3011        let service = ServiceInfo::new(
3012            NATIVE_SERVICE_TYPE,
3013            "desk-0123456789ab",
3014            "desk.local.",
3015            "192.168.1.42",
3016            53318,
3017            &properties[..],
3018        )
3019        .expect("service info")
3020        .as_resolved_service();
3021        let mut registry = PeerRegistry::new();
3022
3023        observe_resolved_service(&mut registry, &service).expect("observation");
3024
3025        assert_eq!(registry.records().count(), 0);
3026    }
3027
3028    #[tokio::test]
3029    async fn direct_quic_loopback_sends_one_file() {
3030        let source_dir = tempfile::tempdir().expect("source tempdir");
3031        let dest_dir = tempfile::tempdir().expect("dest tempdir");
3032        let identity_dir = tempfile::tempdir().expect("identity tempdir");
3033        let file_path = source_dir.path().join("hello.txt");
3034        tokio::fs::write(&file_path, b"hello from ferry")
3035            .await
3036            .expect("source file written");
3037
3038        let identity =
3039            NativeIdentity::load_or_generate_in(identity_dir.path()).expect("identity generated");
3040        let recv_identity = identity.clone();
3041        let reserved_socket =
3042            std::net::UdpSocket::bind(SocketAddr::from(([127, 0, 0, 1], 0))).expect("udp bind");
3043        let recv_addr = reserved_socket.local_addr().expect("local addr");
3044        drop(reserved_socket);
3045        let receive_progress = Arc::new(Mutex::new(Vec::new()));
3046        let receive_progress_log = receive_progress.clone();
3047        let dest_path = dest_dir.path().to_path_buf();
3048        let server = tokio::spawn(async move {
3049            receive_direct_file(
3050                &ReceiveRequest::new(recv_addr),
3051                dest_path,
3052                &recv_identity,
3053                |event| {
3054                    receive_progress_log
3055                        .lock()
3056                        .expect("progress lock")
3057                        .push(event);
3058                },
3059            )
3060            .await
3061        });
3062
3063        tokio::time::sleep(Duration::from_millis(100)).await;
3064
3065        let peer = DirectPeer::parse(&recv_addr.to_string()).expect("peer parses");
3066        let send_request = SendRequest::new(peer, vec![file_path]).expect("send request is valid");
3067        let send_progress = Arc::new(Mutex::new(Vec::new()));
3068        let send_progress_log = send_progress.clone();
3069        let send_summary = send_direct_file(&send_request, &identity, |event| {
3070            send_progress_log.lock().expect("progress lock").push(event);
3071        })
3072        .await
3073        .expect("file sent");
3074        let receive_summary = server.await.expect("server joined").expect("file received");
3075
3076        let received = tokio::fs::read(dest_dir.path().join("hello.txt"))
3077            .await
3078            .expect("received file readable");
3079        assert_eq!(received, b"hello from ferry");
3080        assert_eq!(send_summary.bytes, receive_summary.bytes);
3081        assert_eq!(send_summary.blake3, receive_summary.blake3);
3082        let send_progress = send_progress.lock().expect("progress lock");
3083        let receive_progress = receive_progress.lock().expect("progress lock");
3084        assert!(matches!(
3085            send_progress.first(),
3086            Some(TransferEvent::SessionStarted {
3087                direction: TransferDirection::Send,
3088                ..
3089            })
3090        ));
3091        assert!(
3092            send_progress
3093                .iter()
3094                .any(|event| matches!(event, TransferEvent::Progress { .. }))
3095        );
3096        assert!(
3097            send_progress
3098                .iter()
3099                .any(|event| matches!(event, TransferEvent::SessionFinished { .. }))
3100        );
3101        assert!(matches!(
3102            receive_progress.first(),
3103            Some(TransferEvent::SessionStarted {
3104                direction: TransferDirection::Receive,
3105                ..
3106            })
3107        ));
3108        assert!(
3109            receive_progress
3110                .iter()
3111                .any(|event| matches!(event, TransferEvent::Progress { .. }))
3112        );
3113        assert!(
3114            receive_progress
3115                .iter()
3116                .any(|event| matches!(event, TransferEvent::SessionFinished { .. }))
3117        );
3118    }
3119
3120    #[tokio::test]
3121    async fn direct_quic_loopback_sends_directory_and_resumes_partial_file() {
3122        let source_dir = tempfile::tempdir().expect("source tempdir");
3123        let dest_dir = tempfile::tempdir().expect("dest tempdir");
3124        let identity_dir = tempfile::tempdir().expect("identity tempdir");
3125        let bundle = source_dir.path().join("bundle");
3126        let nested = bundle.join("nested");
3127        tokio::fs::create_dir_all(&nested)
3128            .await
3129            .expect("source dirs");
3130        tokio::fs::write(nested.join("small.txt"), b"small")
3131            .await
3132            .expect("small file");
3133        let large_bytes = vec![7; (DEFAULT_CHUNK_SIZE as usize * 2) + 17];
3134        tokio::fs::write(bundle.join("large.bin"), &large_bytes)
3135            .await
3136            .expect("large file");
3137
3138        let dest_bundle = dest_dir.path().join("bundle");
3139        tokio::fs::create_dir_all(&dest_bundle)
3140            .await
3141            .expect("dest bundle");
3142        tokio::fs::write(
3143            dest_bundle.join("large.bin"),
3144            &large_bytes[..DEFAULT_CHUNK_SIZE as usize + 5],
3145        )
3146        .await
3147        .expect("partial large");
3148
3149        let identity =
3150            NativeIdentity::load_or_generate_in(identity_dir.path()).expect("identity generated");
3151        let recv_identity = identity.clone();
3152        let reserved_socket =
3153            std::net::UdpSocket::bind(SocketAddr::from(([127, 0, 0, 1], 0))).expect("udp bind");
3154        let recv_addr = reserved_socket.local_addr().expect("local addr");
3155        drop(reserved_socket);
3156        let dest_path = dest_dir.path().to_path_buf();
3157        let server = tokio::spawn(async move {
3158            receive_direct_file(
3159                &ReceiveRequest::new(recv_addr),
3160                dest_path,
3161                &recv_identity,
3162                |_| {},
3163            )
3164            .await
3165        });
3166
3167        tokio::time::sleep(Duration::from_millis(100)).await;
3168
3169        let peer = DirectPeer::parse(&recv_addr.to_string()).expect("peer parses");
3170        let send_request = SendRequest::new(peer, vec![bundle]).expect("send request is valid");
3171        let send_summary = send_direct_file(&send_request, &identity, |_| {})
3172            .await
3173            .expect("directory sent");
3174        let receive_summary = server
3175            .await
3176            .expect("server joined")
3177            .expect("directory received");
3178
3179        let received_large = tokio::fs::read(dest_bundle.join("large.bin"))
3180            .await
3181            .expect("large file readable");
3182        let received_small = tokio::fs::read(dest_bundle.join("nested/small.txt"))
3183            .await
3184            .expect("small file readable");
3185        assert_eq!(received_large, large_bytes);
3186        assert_eq!(received_small, b"small");
3187        assert_eq!(send_summary.bytes, receive_summary.bytes);
3188        assert!(
3189            receive_summary
3190                .files
3191                .iter()
3192                .any(|file| file.status == TransferFileStatus::Resumed)
3193        );
3194    }
3195
3196    #[tokio::test]
3197    async fn direct_quic_loopback_cancels_send_and_does_not_finalize_partial_file() {
3198        let source_dir = tempfile::tempdir().expect("source tempdir");
3199        let dest_dir = tempfile::tempdir().expect("dest tempdir");
3200        let identity_dir = tempfile::tempdir().expect("identity tempdir");
3201        let file_path = source_dir.path().join("payload.bin");
3202        tokio::fs::write(&file_path, vec![3; IO_BUFFER_SIZE * 64])
3203            .await
3204            .expect("source file written");
3205
3206        let identity =
3207            NativeIdentity::load_or_generate_in(identity_dir.path()).expect("identity generated");
3208        let recv_identity = identity.clone();
3209        let reserved_socket =
3210            std::net::UdpSocket::bind(SocketAddr::from(([127, 0, 0, 1], 0))).expect("udp bind");
3211        let recv_addr = reserved_socket.local_addr().expect("local addr");
3212        drop(reserved_socket);
3213
3214        let dest_path = dest_dir.path().to_path_buf();
3215        let server = tokio::spawn(async move {
3216            receive_direct_file(
3217                &ReceiveRequest::new(recv_addr),
3218                dest_path,
3219                &recv_identity,
3220                |_| {},
3221            )
3222            .await
3223        });
3224
3225        tokio::time::sleep(Duration::from_millis(100)).await;
3226
3227        let peer = DirectPeer::parse(&recv_addr.to_string()).expect("peer parses");
3228        let send_request =
3229            SendRequest::new(peer, vec![file_path.clone()]).expect("send request is valid");
3230        let send_control = TransferControl::new();
3231        let cancel_on_progress = send_control.clone();
3232        let send_events = Arc::new(Mutex::new(Vec::new()));
3233        let send_events_log = send_events.clone();
3234        let send_result =
3235            send_direct_file_with_control(&send_request, &identity, send_control, |event| {
3236                if matches!(event, TransferEvent::Progress { .. }) {
3237                    cancel_on_progress.cancel();
3238                }
3239                send_events_log.lock().expect("events lock").push(event);
3240            })
3241            .await;
3242
3243        assert!(matches!(send_result, Err(Error::TransferCancelled)));
3244
3245        let receive_result = tokio::time::timeout(Duration::from_secs(5), server)
3246            .await
3247            .expect("receiver should finish after sender cancellation")
3248            .expect("server joined");
3249        assert!(receive_result.is_err());
3250
3251        assert!(!dest_dir.path().join("payload.bin").exists());
3252        assert!(
3253            send_events
3254                .lock()
3255                .expect("events lock")
3256                .iter()
3257                .any(|event| matches!(event, TransferEvent::SessionCancelled { .. }))
3258        );
3259    }
3260}