Skip to main content

ferry_core/
lib.rs

1pub mod error;
2
3use std::collections::{BTreeMap, BTreeSet};
4use std::fs::Metadata;
5use std::io::SeekFrom;
6use std::net::SocketAddr;
7use std::path::{Component, Path, PathBuf};
8use std::sync::atomic::{AtomicBool, Ordering};
9use std::sync::{Arc, Mutex};
10use std::time::{Duration, SystemTime, UNIX_EPOCH};
11
12use directories::BaseDirs;
13use mdns_sd::{ResolvedService, ServiceDaemon, ServiceEvent, ServiceInfo};
14use quinn::crypto::rustls::QuicClientConfig;
15use rcgen::{CertifiedKey, generate_simple_self_signed};
16use rustls::client::danger::{HandshakeSignatureValid, ServerCertVerified, ServerCertVerifier};
17use rustls::crypto::{CryptoProvider, verify_tls12_signature, verify_tls13_signature};
18use rustls::pki_types::{CertificateDer, PrivateKeyDer, PrivatePkcs8KeyDer, ServerName, UnixTime};
19use rustls::{DigitallySignedStruct, SignatureScheme};
20use serde::{Deserialize, Serialize};
21use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt};
22use uuid::Uuid;
23
24pub use error::{Error, Result};
25
/// Native protocol version spoken by this build; advertised as the `pv` property of the
/// mDNS announcement.
pub const NATIVE_PROTOCOL_VERSION: u16 = 1;
/// Lowest native protocol version advertised as supported (the `minpv` announcement property).
pub const MIN_NATIVE_PROTOCOL_VERSION: u16 = 1;
/// mDNS service type used both to register the local announcement and to browse for peers.
pub const NATIVE_SERVICE_TYPE: &str = "_ferry._udp.local.";

// NOTE(review): presumably the magic prefix of the direct-transfer stream — confirm at the use site.
const DIRECT_MAGIC: &[u8; 8] = b"FERRY01\n";
// 16 MiB. NOTE(review): presumably the upper bound for one framed message — confirm at the use site.
const MAX_FRAME_LEN: usize = 16 * 1024 * 1024;
// 64 KiB. NOTE(review): presumably the read/write buffer size for file streaming — confirm.
const IO_BUFFER_SIZE: usize = 64 * 1024;
// NOTE(review): presumably the value written to `TransferManifest::version` — confirm.
const MANIFEST_VERSION: u16 = 1;
// 1 MiB. NOTE(review): presumably the default for `TransferManifest::chunk_size` — confirm.
const DEFAULT_CHUNK_SIZE: u64 = 1024 * 1024;
// NOTE(review): presumably the directory name used by `config_dir()` — confirm.
const CONFIG_DIR_NAME: &str = "ferry";
// NOTE(review): presumably a domain-separation label for the PSK proof — confirm at the use site.
const PSK_PROOF_CONTEXT: &[u8] = b"fileferry-direct-psk-v1";
37
38#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
39pub struct DirectPeer {
40    address: SocketAddr,
41    expected_fingerprint: Option<String>,
42}
43
44impl DirectPeer {
45    pub fn parse(value: &str) -> Result<Self> {
46        Ok(Self {
47            address: value.parse()?,
48            expected_fingerprint: None,
49        })
50    }
51
52    pub const fn address(&self) -> SocketAddr {
53        self.address
54    }
55
56    pub const fn from_address(address: SocketAddr) -> Self {
57        Self {
58            address,
59            expected_fingerprint: None,
60        }
61    }
62
63    pub fn with_expected_fingerprint(mut self, fingerprint: impl Into<String>) -> Result<Self> {
64        self.expected_fingerprint = Some(normalized_fingerprint(fingerprint.into())?);
65        Ok(self)
66    }
67
68    pub fn expected_fingerprint(&self) -> Option<&str> {
69        self.expected_fingerprint.as_deref()
70    }
71}
72
73#[derive(Clone, PartialEq, Eq, Deserialize)]
74pub struct PskSecret(String);
75
76impl PskSecret {
77    pub fn new(psk: impl Into<String>) -> Result<Self> {
78        validate_psk(psk.into()).map(Self)
79    }
80
81    fn expose_secret(&self) -> &str {
82        &self.0
83    }
84}
85
86impl std::fmt::Debug for PskSecret {
87    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
88        formatter.write_str("PskSecret(<redacted>)")
89    }
90}
91
92impl Serialize for PskSecret {
93    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
94    where
95        S: serde::Serializer,
96    {
97        serializer.serialize_str("<redacted>")
98    }
99}
100
101#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
102pub struct SendRequest {
103    peer: DirectPeer,
104    paths: Vec<PathBuf>,
105    #[serde(default, skip_serializing)]
106    psk: Option<PskSecret>,
107}
108
109impl SendRequest {
110    pub fn new(peer: DirectPeer, paths: Vec<PathBuf>) -> Result<Self> {
111        if paths.is_empty() {
112            return Err(Error::InvalidInput(
113                "at least one file path is required".to_string(),
114            ));
115        }
116
117        Ok(Self {
118            peer,
119            paths,
120            psk: None,
121        })
122    }
123
124    pub const fn peer(&self) -> &DirectPeer {
125        &self.peer
126    }
127
128    pub fn paths(&self) -> &[PathBuf] {
129        &self.paths
130    }
131
132    pub fn with_psk(mut self, psk: impl Into<String>) -> Result<Self> {
133        self.psk = Some(PskSecret::new(psk)?);
134        Ok(self)
135    }
136
137    pub fn psk(&self) -> Option<&str> {
138        self.psk.as_ref().map(PskSecret::expose_secret)
139    }
140}
141
142#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
143pub struct ReceiveRequest {
144    listen: SocketAddr,
145    #[serde(default, skip_serializing)]
146    psk: Option<PskSecret>,
147}
148
149impl ReceiveRequest {
150    pub fn new(listen: SocketAddr) -> Self {
151        Self { listen, psk: None }
152    }
153
154    pub const fn listen(&self) -> SocketAddr {
155        self.listen
156    }
157
158    pub fn with_psk(mut self, psk: impl Into<String>) -> Result<Self> {
159        self.psk = Some(PskSecret::new(psk)?);
160        Ok(self)
161    }
162
163    pub fn psk(&self) -> Option<&str> {
164        self.psk.as_ref().map(PskSecret::expose_secret)
165    }
166}
167
168#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
169pub struct AppConfig {
170    pub alias: String,
171    pub listen_port: u16,
172    pub quic_port: u16,
173    pub download_dir: String,
174    pub auto_accept_known: bool,
175    pub auto_accept_unknown: bool,
176    pub discovery: Vec<String>,
177    pub max_concurrent_files: usize,
178    pub trust: TrustConfig,
179}
180
181impl Default for AppConfig {
182    fn default() -> Self {
183        Self {
184            alias: default_alias(),
185            listen_port: 53317,
186            quic_port: 53318,
187            download_dir: "~/Downloads/ferry".to_string(),
188            auto_accept_known: true,
189            auto_accept_unknown: false,
190            discovery: vec!["native".to_string()],
191            max_concurrent_files: 8,
192            trust: TrustConfig::default(),
193        }
194    }
195}
196
197impl AppConfig {
198    pub fn load_or_default() -> Result<Self> {
199        Self::load_or_default_from(config_dir()?.join("config.toml"))
200    }
201
202    pub fn load_or_default_from(path: impl AsRef<Path>) -> Result<Self> {
203        let path = path.as_ref();
204        match std::fs::read_to_string(path) {
205            Ok(contents) => {
206                toml::from_str(&contents).map_err(|error| Error::ConfigParse(error.to_string()))
207            }
208            Err(error) if error.kind() == std::io::ErrorKind::NotFound => Ok(Self::default()),
209            Err(source) => Err(Error::IoPath {
210                path: path.to_path_buf(),
211                source,
212            }),
213        }
214    }
215
216    pub fn save_default_path(&self) -> Result<()> {
217        self.save_to_path(config_dir()?.join("config.toml"))
218    }
219
220    pub fn save_to_path(&self, path: impl AsRef<Path>) -> Result<()> {
221        write_toml_file(path.as_ref(), self)
222    }
223
224    pub fn to_toml_string(&self) -> Result<String> {
225        toml::to_string_pretty(self).map_err(|error| Error::ConfigSerialize(error.to_string()))
226    }
227
228    pub fn redacted(&self) -> Self {
229        let mut config = self.clone();
230        config.trust = config.trust.redacted();
231        config
232    }
233
234    pub fn to_redacted_toml_string(&self) -> Result<String> {
235        self.redacted().to_toml_string()
236    }
237}
238
239#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
240pub struct DaemonConfig {
241    pub listen: SocketAddr,
242    pub destination: PathBuf,
243}
244
245impl DaemonConfig {
246    pub fn from_app_config(config: &AppConfig) -> Result<Self> {
247        Ok(Self {
248            listen: SocketAddr::from(([0, 0, 0, 0], config.quic_port)),
249            destination: expand_home_path(&config.download_dir)?,
250        })
251    }
252
253    pub fn load_or_default() -> Result<Self> {
254        let app_config = AppConfig::load_or_default()?;
255        Self::load_or_default_from(config_dir()?.join("daemon.toml"), &app_config)
256    }
257
258    pub fn load_or_default_from(path: impl AsRef<Path>, app_config: &AppConfig) -> Result<Self> {
259        let path = path.as_ref();
260        match std::fs::read_to_string(path) {
261            Ok(contents) => {
262                toml::from_str(&contents).map_err(|error| Error::ConfigParse(error.to_string()))
263            }
264            Err(error) if error.kind() == std::io::ErrorKind::NotFound => {
265                Self::from_app_config(app_config)
266            }
267            Err(source) => Err(Error::IoPath {
268                path: path.to_path_buf(),
269                source,
270            }),
271        }
272    }
273
274    pub fn save_default_path(&self) -> Result<()> {
275        self.save_to_path(config_dir()?.join("daemon.toml"))
276    }
277
278    pub fn save_to_path(&self, path: impl AsRef<Path>) -> Result<()> {
279        write_toml_file(path.as_ref(), self)
280    }
281
282    pub fn to_toml_string(&self) -> Result<String> {
283        toml::to_string_pretty(self).map_err(|error| Error::ConfigSerialize(error.to_string()))
284    }
285}
286
287#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
288pub struct TrustConfig {
289    pub require_fingerprint: bool,
290    pub psk: String,
291}
292
293impl Default for TrustConfig {
294    fn default() -> Self {
295        Self {
296            require_fingerprint: true,
297            psk: String::new(),
298        }
299    }
300}
301
302impl TrustConfig {
303    pub fn redacted(&self) -> Self {
304        let psk = if self.psk.is_empty() {
305            String::new()
306        } else {
307            "<redacted>".to_string()
308        };
309        Self {
310            require_fingerprint: self.require_fingerprint,
311            psk,
312        }
313    }
314}
315
316#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
317pub struct NativeIdentity {
318    cert_der: Vec<u8>,
319    key_der: Vec<u8>,
320    fingerprint: String,
321}
322
323impl NativeIdentity {
324    pub fn load_or_generate() -> Result<Self> {
325        Self::load_or_generate_in(config_dir()?)
326    }
327
328    pub fn load_or_generate_in(config_dir: impl AsRef<Path>) -> Result<Self> {
329        let config_dir = config_dir.as_ref();
330        let cert_path = config_dir.join("identity.cert.der");
331        let key_path = config_dir.join("identity.key.der");
332
333        if cert_path.exists() && key_path.exists() {
334            let cert_der = std::fs::read(&cert_path).map_err(|source| Error::IoPath {
335                path: cert_path,
336                source,
337            })?;
338            let key_der = std::fs::read(&key_path).map_err(|source| Error::IoPath {
339                path: key_path,
340                source,
341            })?;
342
343            return Ok(Self::from_parts(cert_der, key_der));
344        }
345
346        std::fs::create_dir_all(config_dir).map_err(|source| Error::IoPath {
347            path: config_dir.to_path_buf(),
348            source,
349        })?;
350
351        let identity = Self::generate()?;
352        write_private_file(&cert_path, &identity.cert_der)?;
353        write_private_file(&key_path, &identity.key_der)?;
354        Ok(identity)
355    }
356
357    pub fn generate() -> Result<Self> {
358        let CertifiedKey { cert, signing_key } =
359            generate_simple_self_signed(vec!["localhost".to_string()])?;
360        Ok(Self::from_parts(
361            cert.der().to_vec(),
362            signing_key.serialize_der(),
363        ))
364    }
365
366    pub fn fingerprint(&self) -> &str {
367        &self.fingerprint
368    }
369
370    fn cert_chain(&self) -> Vec<CertificateDer<'static>> {
371        vec![CertificateDer::from(self.cert_der.clone())]
372    }
373
374    fn private_key(&self) -> PrivateKeyDer<'static> {
375        PrivateKeyDer::from(PrivatePkcs8KeyDer::from(self.key_der.clone()))
376    }
377
378    fn from_parts(cert_der: Vec<u8>, key_der: Vec<u8>) -> Self {
379        let fingerprint = blake3::hash(&cert_der).to_hex().to_string();
380        Self {
381            cert_der,
382            key_der,
383            fingerprint,
384        }
385    }
386}
387
/// Which side of a transfer an event or progress report belongs to.
///
/// Serialized in `snake_case` (`send`, `receive`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum TransferDirection {
    Send,
    Receive,
}
394
/// Progress snapshot for a single file; carries the same fields as `TransferEvent::Progress`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct TransferProgress {
    pub direction: TransferDirection,
    pub file_name: String,
    /// Bytes handled so far.
    pub bytes_done: u64,
    /// Total bytes expected.
    pub bytes_total: u64,
}
402
403#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
404#[serde(tag = "event", rename_all = "snake_case")]
405pub enum TransferEvent {
406    SessionStarted {
407        direction: TransferDirection,
408        session_id: Uuid,
409        files_total: usize,
410        bytes_total: u64,
411    },
412    FileStarted {
413        direction: TransferDirection,
414        file_name: String,
415        bytes_total: u64,
416        resume_offset: u64,
417    },
418    Progress {
419        direction: TransferDirection,
420        file_name: String,
421        bytes_done: u64,
422        bytes_total: u64,
423    },
424    FileFinished {
425        direction: TransferDirection,
426        file_name: String,
427        bytes: u64,
428        blake3: String,
429        status: TransferFileStatus,
430    },
431    SessionFinished {
432        direction: TransferDirection,
433        files_total: usize,
434        bytes_total: u64,
435        blake3: String,
436    },
437    SessionCancelled {
438        direction: TransferDirection,
439        session_id: Uuid,
440    },
441}
442
443impl TransferEvent {
444    pub fn progress(&self) -> Option<TransferProgress> {
445        match self {
446            Self::Progress {
447                direction,
448                file_name,
449                bytes_done,
450                bytes_total,
451            } => Some(TransferProgress {
452                direction: *direction,
453                file_name: file_name.clone(),
454                bytes_done: *bytes_done,
455                bytes_total: *bytes_total,
456            }),
457            _ => None,
458        }
459    }
460}
461
462#[derive(Debug, Clone, Default)]
463pub struct TransferControl {
464    cancelled: Arc<AtomicBool>,
465}
466
467impl TransferControl {
468    pub fn new() -> Self {
469        Self::default()
470    }
471
472    pub fn cancel(&self) {
473        self.cancelled.store(true, Ordering::SeqCst);
474    }
475
476    pub fn is_cancelled(&self) -> bool {
477        self.cancelled.load(Ordering::SeqCst)
478    }
479
480    fn check_cancelled(&self) -> Result<()> {
481        if self.is_cancelled() {
482            Err(Error::TransferCancelled)
483        } else {
484            Ok(())
485        }
486    }
487}
488
/// Result of a completed transfer, with one [`TransferFileSummary`] per file in `files`.
// NOTE(review): `path`, `file_name`, `bytes` and `blake3` presumably describe the session as a
// whole (or its first/only file) — confirm where the summary is built.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TransferSummary {
    pub path: PathBuf,
    pub file_name: String,
    pub bytes: u64,
    pub blake3: String,
    pub files: Vec<TransferFileSummary>,
}
497
/// Outcome of a single file within a transfer.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TransferFileSummary {
    pub path: PathBuf,
    // NOTE(review): presumably the path relative to the transfer root, matching
    // `ManifestEntry::relative_path` — confirm.
    pub relative_path: PathBuf,
    pub bytes: u64,
    pub blake3: String,
    pub status: TransferFileStatus,
}
506
/// How an individual file ended up being handled.
///
/// Serialized in `snake_case` (`sent`, `received`, `skipped`, `resumed`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum TransferFileStatus {
    Sent,
    Received,
    Skipped,
    Resumed,
}
515
516#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
517pub struct TransferManifest {
518    pub version: u16,
519    pub session_id: Uuid,
520    pub chunk_size: u64,
521    pub entries: Vec<ManifestEntry>,
522}
523
524impl TransferManifest {
525    pub fn file_entries(&self) -> impl Iterator<Item = &ManifestEntry> {
526        self.entries
527            .iter()
528            .filter(|entry| entry.kind == ManifestEntryKind::File)
529    }
530
531    pub fn total_file_bytes(&self) -> u64 {
532        self.file_entries().map(|entry| entry.size).sum()
533    }
534
535    pub fn digest(&self) -> Result<String> {
536        let bytes = serde_json::to_vec(self).map_err(|error| Error::Protocol(error.to_string()))?;
537        Ok(blake3::hash(&bytes).to_hex().to_string())
538    }
539}
540
/// One file or directory listed in a [`TransferManifest`].
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ManifestEntry {
    pub relative_path: PathBuf,
    pub kind: ManifestEntryKind,
    /// Size in bytes; summed over file entries by `TransferManifest::total_file_bytes`.
    pub size: u64,
    // NOTE(review): presumably the hex BLAKE3 digest of the whole file, absent for
    // directories — confirm where entries are built.
    pub blake3: Option<String>,
    // NOTE(review): presumably per-chunk digests in file order — confirm.
    pub chunks: Vec<String>,
    pub unix_mode: Option<u32>,
    pub modified_unix_ms: Option<i128>,
}
551
/// Kind of a manifest entry; serialized in `snake_case` (`file`, `directory`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ManifestEntryKind {
    File,
    Directory,
}
558
/// Decision about how to treat a destination file.
// NOTE(review): the variant meanings below are inferred from their names; the code that
// produces and consumes this enum is not visible here — confirm.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ResumeDecision {
    // Presumably: nothing usable exists, write the file from scratch.
    Create,
    // Presumably: the destination is already complete.
    Skip,
    // Presumably: continue from this byte offset.
    ResumeFrom(u64),
    // Presumably: the destination cannot be reconciled; the string explains why.
    Conflict(String),
}
566
/// A single sighting of a peer: who it is, where it was seen, and when.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PeerObservation {
    pub fingerprint: String,
    pub alias: Option<String>,
    pub hostname: Option<String>,
    pub address: SocketAddr,
    pub transports: BTreeSet<String>,
    pub seen_unix_ms: i128,
}

impl PeerObservation {
    /// Creates an observation with no alias, no hostname and an empty transport set.
    pub fn new(fingerprint: impl Into<String>, address: SocketAddr, seen_unix_ms: i128) -> Self {
        let fingerprint = fingerprint.into();
        Self {
            fingerprint,
            address,
            seen_unix_ms,
            alias: None,
            hostname: None,
            transports: BTreeSet::new(),
        }
    }

    /// Sets the alias, replacing any previous one.
    pub fn with_alias(self, alias: impl Into<String>) -> Self {
        Self {
            alias: Some(alias.into()),
            ..self
        }
    }

    /// Sets the hostname, replacing any previous one.
    pub fn with_hostname(self, hostname: impl Into<String>) -> Self {
        Self {
            hostname: Some(hostname.into()),
            ..self
        }
    }

    /// Adds a transport name to the set; adding the same name twice has no further effect.
    pub fn with_transport(self, transport: impl Into<String>) -> Self {
        let mut observation = self;
        observation.transports.insert(transport.into());
        observation
    }
}
604
605#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
606pub struct PeerRecord {
607    pub fingerprint: String,
608    pub aliases: BTreeSet<String>,
609    pub hostnames: BTreeSet<String>,
610    pub addresses: BTreeSet<SocketAddr>,
611    pub transports: BTreeSet<String>,
612    pub first_seen_unix_ms: i128,
613    pub last_seen_unix_ms: i128,
614}
615
616impl PeerRecord {
617    fn from_observation(observation: PeerObservation) -> Self {
618        let mut aliases = BTreeSet::new();
619        if let Some(alias) = observation.alias {
620            aliases.insert(alias);
621        }
622
623        let mut hostnames = BTreeSet::new();
624        if let Some(hostname) = observation.hostname {
625            hostnames.insert(hostname);
626        }
627
628        let mut addresses = BTreeSet::new();
629        addresses.insert(observation.address);
630
631        Self {
632            fingerprint: observation.fingerprint,
633            aliases,
634            hostnames,
635            addresses,
636            transports: observation.transports,
637            first_seen_unix_ms: observation.seen_unix_ms,
638            last_seen_unix_ms: observation.seen_unix_ms,
639        }
640    }
641
642    fn merge(&mut self, observation: PeerObservation) {
643        if let Some(alias) = observation.alias {
644            self.aliases.insert(alias);
645        }
646        if let Some(hostname) = observation.hostname {
647            self.hostnames.insert(hostname);
648        }
649        self.addresses.insert(observation.address);
650        self.transports.extend(observation.transports);
651        self.first_seen_unix_ms = self.first_seen_unix_ms.min(observation.seen_unix_ms);
652        self.last_seen_unix_ms = self.last_seen_unix_ms.max(observation.seen_unix_ms);
653    }
654
655    pub fn preferred_quic_address(&self) -> Option<SocketAddr> {
656        if !self.transports.contains("quic") {
657            return None;
658        }
659
660        self.addresses.iter().copied().next()
661    }
662}
663
/// Trust decision recorded for a known peer; serialized in `snake_case` (`trusted`, `blocked`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum TrustState {
    Trusted,
    Blocked,
}
670
671#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
672pub struct KnownPeerEntry {
673    pub fingerprint: String,
674    #[serde(default)]
675    pub aliases: BTreeSet<String>,
676    #[serde(default)]
677    pub hostnames: BTreeSet<String>,
678    #[serde(default)]
679    pub addresses: BTreeSet<SocketAddr>,
680    #[serde(default)]
681    pub transports: BTreeSet<String>,
682    pub trust_state: TrustState,
683    pub first_seen_unix_ms: Option<i128>,
684    pub last_seen_unix_ms: Option<i128>,
685}
686
687impl KnownPeerEntry {
688    pub fn trusted(fingerprint: impl Into<String>) -> Result<Self> {
689        let fingerprint = normalized_fingerprint(fingerprint.into())?;
690        Ok(Self {
691            fingerprint,
692            aliases: BTreeSet::new(),
693            hostnames: BTreeSet::new(),
694            addresses: BTreeSet::new(),
695            transports: BTreeSet::new(),
696            trust_state: TrustState::Trusted,
697            first_seen_unix_ms: None,
698            last_seen_unix_ms: None,
699        })
700    }
701
702    pub fn from_record(record: &PeerRecord, trust_state: TrustState) -> Result<Self> {
703        let fingerprint = normalized_fingerprint(record.fingerprint.clone())?;
704        Ok(Self {
705            fingerprint,
706            aliases: record.aliases.clone(),
707            hostnames: record.hostnames.clone(),
708            addresses: record.addresses.clone(),
709            transports: record.transports.clone(),
710            trust_state,
711            first_seen_unix_ms: Some(record.first_seen_unix_ms),
712            last_seen_unix_ms: Some(record.last_seen_unix_ms),
713        })
714    }
715}
716
717#[derive(Debug, Default, Clone, PartialEq, Eq, Serialize, Deserialize)]
718pub struct TrustStore {
719    #[serde(default)]
720    pub peers: BTreeMap<String, KnownPeerEntry>,
721}
722
723impl TrustStore {
724    pub fn load_or_default() -> Result<Self> {
725        Self::load_or_default_from(config_dir()?.join("known_peers.toml"))
726    }
727
728    pub fn load_or_default_from(path: impl AsRef<Path>) -> Result<Self> {
729        let path = path.as_ref();
730        match std::fs::read_to_string(path) {
731            Ok(contents) => {
732                toml::from_str(&contents).map_err(|error| Error::ConfigParse(error.to_string()))
733            }
734            Err(error) if error.kind() == std::io::ErrorKind::NotFound => Ok(Self::default()),
735            Err(source) => Err(Error::IoPath {
736                path: path.to_path_buf(),
737                source,
738            }),
739        }
740    }
741
742    pub fn save_default_path(&self) -> Result<()> {
743        self.save_to_path(config_dir()?.join("known_peers.toml"))
744    }
745
746    pub fn save_to_path(&self, path: impl AsRef<Path>) -> Result<()> {
747        write_toml_file(path.as_ref(), self)
748    }
749
750    pub fn trust_fingerprint(&mut self, fingerprint: impl Into<String>) -> Result<&KnownPeerEntry> {
751        let entry = KnownPeerEntry::trusted(fingerprint)?;
752        let fingerprint = entry.fingerprint.clone();
753        self.peers
754            .entry(fingerprint.clone())
755            .and_modify(|existing| existing.trust_state = TrustState::Trusted)
756            .or_insert(entry);
757        Ok(self
758            .peers
759            .get(&fingerprint)
760            .expect("trusted peer entry should exist"))
761    }
762
763    pub fn trust_record(&mut self, record: &PeerRecord) -> Result<&KnownPeerEntry> {
764        let entry = KnownPeerEntry::from_record(record, TrustState::Trusted)?;
765        let fingerprint = entry.fingerprint.clone();
766        self.peers
767            .entry(fingerprint.clone())
768            .and_modify(|existing| {
769                existing.aliases.extend(record.aliases.clone());
770                existing.hostnames.extend(record.hostnames.clone());
771                existing.addresses.extend(record.addresses.clone());
772                existing.transports.extend(record.transports.clone());
773                existing.first_seen_unix_ms = Some(
774                    existing
775                        .first_seen_unix_ms
776                        .unwrap_or(record.first_seen_unix_ms)
777                        .min(record.first_seen_unix_ms),
778                );
779                existing.last_seen_unix_ms = Some(
780                    existing
781                        .last_seen_unix_ms
782                        .unwrap_or(record.last_seen_unix_ms)
783                        .max(record.last_seen_unix_ms),
784                );
785                existing.trust_state = TrustState::Trusted;
786            })
787            .or_insert(entry);
788        Ok(self
789            .peers
790            .get(&fingerprint)
791            .expect("trusted peer entry should exist"))
792    }
793
794    pub fn trust_direct_address(
795        &mut self,
796        fingerprint: impl Into<String>,
797        address: SocketAddr,
798    ) -> Result<&KnownPeerEntry> {
799        let mut entry = KnownPeerEntry::trusted(fingerprint)?;
800        let seen = system_time_unix_ms(SystemTime::now()).unwrap_or_default();
801        entry.addresses.insert(address);
802        entry.transports.insert("quic".to_string());
803        entry.first_seen_unix_ms = Some(seen);
804        entry.last_seen_unix_ms = Some(seen);
805
806        let fingerprint = entry.fingerprint.clone();
807        self.peers
808            .entry(fingerprint.clone())
809            .and_modify(|existing| {
810                existing.addresses.insert(address);
811                existing.transports.insert("quic".to_string());
812                existing.first_seen_unix_ms =
813                    Some(existing.first_seen_unix_ms.unwrap_or(seen).min(seen));
814                existing.last_seen_unix_ms =
815                    Some(existing.last_seen_unix_ms.unwrap_or(seen).max(seen));
816                existing.trust_state = TrustState::Trusted;
817            })
818            .or_insert(entry);
819        Ok(self
820            .peers
821            .get(&fingerprint)
822            .expect("trusted peer entry should exist"))
823    }
824
825    pub fn is_trusted_fingerprint(&self, fingerprint: &str) -> bool {
826        self.peers
827            .get(fingerprint)
828            .is_some_and(|entry| entry.trust_state == TrustState::Trusted)
829    }
830
831    pub fn trusted_fingerprint_for_address(&self, address: SocketAddr) -> Option<&str> {
832        self.peers
833            .values()
834            .find(|entry| {
835                entry.trust_state == TrustState::Trusted && entry.addresses.contains(&address)
836            })
837            .map(|entry| entry.fingerprint.as_str())
838    }
839
840    pub fn forget(&mut self, fingerprint: &str) -> bool {
841        self.peers.remove(fingerprint).is_some()
842    }
843
844    pub fn records(&self) -> impl Iterator<Item = &KnownPeerEntry> {
845        self.peers.values()
846    }
847
848    pub fn to_toml_string(&self) -> Result<String> {
849        toml::to_string_pretty(self).map_err(|error| Error::ConfigSerialize(error.to_string()))
850    }
851}
852
853#[derive(Debug, Default, Clone, PartialEq, Eq)]
854pub struct PeerRegistry {
855    peers: BTreeMap<String, PeerRecord>,
856}
857
858#[derive(Debug, Clone, PartialEq, Eq)]
859pub enum PeerLookup<'a> {
860    Found(&'a PeerRecord),
861    Ambiguous(Vec<&'a PeerRecord>),
862    Missing,
863}
864
865impl PeerRegistry {
866    pub fn new() -> Self {
867        Self::default()
868    }
869
870    pub fn observe(&mut self, observation: PeerObservation) -> Result<&PeerRecord> {
871        if observation.fingerprint.trim().is_empty() {
872            return Err(Error::InvalidInput(
873                "peer fingerprint must not be empty".to_string(),
874            ));
875        }
876
877        let fingerprint = observation.fingerprint.clone();
878        if let Some(record) = self.peers.get_mut(&fingerprint) {
879            record.merge(observation);
880        } else {
881            self.peers.insert(
882                fingerprint.clone(),
883                PeerRecord::from_observation(observation),
884            );
885        }
886
887        Ok(self
888            .peers
889            .get(&fingerprint)
890            .expect("observed peer record should exist"))
891    }
892
893    pub fn get_by_fingerprint(&self, fingerprint: &str) -> Option<&PeerRecord> {
894        self.peers.get(fingerprint)
895    }
896
897    pub fn lookup(&self, query: &str) -> Option<&PeerRecord> {
898        match self.lookup_detail(query) {
899            PeerLookup::Found(record) => Some(record),
900            PeerLookup::Ambiguous(_) | PeerLookup::Missing => None,
901        }
902    }
903
904    pub fn lookup_detail(&self, query: &str) -> PeerLookup<'_> {
905        if let Some(record) = self.peers.get(query) {
906            return PeerLookup::Found(record);
907        }
908
909        let matches = self
910            .peers
911            .values()
912            .filter(|record| {
913                record.fingerprint.starts_with(query)
914                    || record.aliases.iter().any(|alias| alias == query)
915                    || record.hostnames.iter().any(|hostname| hostname == query)
916            })
917            .collect::<Vec<_>>();
918
919        match matches.as_slice() {
920            [] => PeerLookup::Missing,
921            [record] => PeerLookup::Found(record),
922            _ => PeerLookup::Ambiguous(matches),
923        }
924    }
925
926    pub fn records(&self) -> impl Iterator<Item = &PeerRecord> {
927        self.peers.values()
928    }
929}
930
931#[derive(Debug, Clone, PartialEq, Eq)]
932pub struct NativeAnnouncement {
933    pub alias: String,
934    pub fingerprint: String,
935    pub quic_port: u16,
936    pub listen_port: Option<u16>,
937}
938
939impl NativeAnnouncement {
940    pub fn from_config(config: &AppConfig, identity: &NativeIdentity) -> Self {
941        Self {
942            alias: config.alias.clone(),
943            fingerprint: identity.fingerprint().to_string(),
944            quic_port: config.quic_port,
945            listen_port: Some(config.listen_port),
946        }
947    }
948
949    fn service_info(&self) -> Result<ServiceInfo> {
950        let alias = sanitize_dns_label(&self.alias);
951        let fingerprint_prefix = self
952            .fingerprint
953            .get(..12)
954            .unwrap_or(self.fingerprint.as_str());
955        let instance = format!("{alias}-{fingerprint_prefix}");
956        let hostname = format!("{alias}.local.");
957        let properties = [
958            ("pv", NATIVE_PROTOCOL_VERSION.to_string()),
959            ("minpv", MIN_NATIVE_PROTOCOL_VERSION.to_string()),
960            ("fp", self.fingerprint.clone()),
961            ("alias", self.alias.clone()),
962            ("transports", "quic".to_string()),
963            ("quic_port", self.quic_port.to_string()),
964            (
965                "listen_port",
966                self.listen_port
967                    .map(|port| port.to_string())
968                    .unwrap_or_default(),
969            ),
970        ];
971
972        ServiceInfo::new(
973            NATIVE_SERVICE_TYPE,
974            &instance,
975            &hostname,
976            "",
977            self.quic_port,
978            &properties[..],
979        )
980        .map(ServiceInfo::enable_addr_auto)
981        .map_err(|error| Error::Discovery(error.to_string()))
982    }
983}
984
985pub struct NativeAnnouncer {
986    daemon: ServiceDaemon,
987    fullname: String,
988}
989
990impl NativeAnnouncer {
991    pub fn start(announcement: &NativeAnnouncement) -> Result<Self> {
992        let daemon = ServiceDaemon::new().map_err(|error| Error::Discovery(error.to_string()))?;
993        let service = announcement.service_info()?;
994        let fullname = service.get_fullname().to_string();
995        daemon
996            .register(service)
997            .map_err(|error| Error::Discovery(error.to_string()))?;
998        Ok(Self { daemon, fullname })
999    }
1000}
1001
1002impl Drop for NativeAnnouncer {
1003    fn drop(&mut self) {
1004        let _ = self.daemon.unregister(&self.fullname);
1005        let _ = self.daemon.shutdown();
1006    }
1007}
1008
1009pub async fn discover_native_peers(timeout: Duration) -> Result<PeerRegistry> {
1010    let daemon = ServiceDaemon::new().map_err(|error| Error::Discovery(error.to_string()))?;
1011    let receiver = daemon
1012        .browse(NATIVE_SERVICE_TYPE)
1013        .map_err(|error| Error::Discovery(error.to_string()))?;
1014    let deadline = tokio::time::Instant::now() + timeout;
1015    let mut registry = PeerRegistry::new();
1016
1017    loop {
1018        let now = tokio::time::Instant::now();
1019        if now >= deadline {
1020            break;
1021        }
1022
1023        let wait = deadline - now;
1024        match tokio::time::timeout(wait, receiver.recv_async()).await {
1025            Ok(Ok(ServiceEvent::ServiceResolved(service))) => {
1026                observe_resolved_service(&mut registry, &service)?;
1027            }
1028            Ok(Ok(_)) => {}
1029            Ok(Err(error)) => {
1030                return Err(Error::Discovery(error.to_string()));
1031            }
1032            Err(_) => break,
1033        }
1034    }
1035
1036    daemon
1037        .stop_browse(NATIVE_SERVICE_TYPE)
1038        .map_err(|error| Error::Discovery(error.to_string()))?;
1039    let _ = daemon.shutdown();
1040    Ok(registry)
1041}
1042
1043fn observe_resolved_service(registry: &mut PeerRegistry, service: &ResolvedService) -> Result<()> {
1044    let Some(fingerprint) = service.get_property_val_str("fp") else {
1045        return Ok(());
1046    };
1047    let fingerprint = match normalized_fingerprint(fingerprint.to_string()) {
1048        Ok(fingerprint) => fingerprint,
1049        Err(_) => return Ok(()),
1050    };
1051    let Some(highest_version) = parse_txt_u16(service, "pv") else {
1052        return Ok(());
1053    };
1054    let Some(minimum_version) = parse_txt_u16(service, "minpv") else {
1055        return Ok(());
1056    };
1057    if minimum_version > highest_version
1058        || minimum_version > NATIVE_PROTOCOL_VERSION
1059        || highest_version < MIN_NATIVE_PROTOCOL_VERSION
1060    {
1061        return Ok(());
1062    }
1063
1064    let Some(quic_port) = parse_txt_u16(service, "quic_port") else {
1065        return Ok(());
1066    };
1067    let transports = service
1068        .get_property_val_str("transports")
1069        .unwrap_or_default()
1070        .split(',')
1071        .map(str::trim)
1072        .filter(|value| !value.is_empty())
1073        .map(ToOwned::to_owned)
1074        .collect::<Vec<_>>();
1075    if !transports.iter().any(|transport| transport == "quic") {
1076        return Ok(());
1077    }
1078
1079    let alias = service.get_property_val_str("alias").map(ToOwned::to_owned);
1080    let hostname = Some(service.get_hostname().trim_end_matches('.').to_string());
1081    let seen_unix_ms = system_time_unix_ms(SystemTime::now()).unwrap_or_default();
1082
1083    for address in service.get_addresses_v4() {
1084        let mut observation = PeerObservation::new(
1085            fingerprint.clone(),
1086            SocketAddr::from((address, quic_port)),
1087            seen_unix_ms,
1088        );
1089        if let Some(alias) = &alias {
1090            observation = observation.with_alias(alias.clone());
1091        }
1092        if let Some(hostname) = &hostname {
1093            observation = observation.with_hostname(hostname.clone());
1094        }
1095        for transport in &transports {
1096            observation = observation.with_transport(transport.clone());
1097        }
1098        registry.observe(observation)?;
1099    }
1100
1101    Ok(())
1102}
1103
1104fn parse_txt_u16(service: &ResolvedService, key: &str) -> Option<u16> {
1105    service.get_property_val_str(key)?.parse().ok()
1106}
1107
/// Turns an arbitrary alias into a single DNS label.
///
/// ASCII letters and digits are kept (lower-cased); every other character becomes a
/// hyphen. Runs of hyphens collapse to one, and leading/trailing hyphens are removed.
/// The result is capped at 63 bytes, the maximum DNS label size (RFC 1035). If nothing
/// usable remains, `"fileferry-device"` is returned.
fn sanitize_dns_label(value: &str) -> String {
    /// Maximum size of a single DNS label (RFC 1035, section 2.3.4).
    const MAX_DNS_LABEL_LEN: usize = 63;

    let mut label = String::with_capacity(value.len().min(MAX_DNS_LABEL_LEN));
    for ch in value.chars() {
        if label.len() == MAX_DNS_LABEL_LEN {
            break;
        }
        if ch.is_ascii_alphanumeric() {
            label.push(ch.to_ascii_lowercase());
        } else if !label.is_empty() && !label.ends_with('-') {
            // One separator per run, and never as the first character.
            label.push('-');
        }
    }

    // At most one trailing hyphen can remain (end of input or cut at the cap).
    let trimmed_len = label.trim_end_matches('-').len();
    label.truncate(trimmed_len);

    if label.is_empty() {
        "fileferry-device".to_string()
    } else {
        label
    }
}
1129
/// Receiver's reply to a transfer manifest: what it wants done with each file.
///
/// Built and written by `receive_stream_file`, and read by the sender before any
/// payload bytes are streamed.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
struct DirectReceivePlan {
    /// One plan per manifest file entry, in the order the receiver evaluated them.
    files: Vec<DirectFilePlan>,
}
1134
1135#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
1136struct DirectProtocolHello {
1137    protocol_version: u16,
1138    min_protocol_version: u16,
1139    client_fingerprint: String,
1140    #[serde(default)]
1141    psk: bool,
1142}
1143
1144impl DirectProtocolHello {
1145    fn new(identity: &NativeIdentity, psk: Option<&str>) -> Self {
1146        Self {
1147            protocol_version: NATIVE_PROTOCOL_VERSION,
1148            min_protocol_version: MIN_NATIVE_PROTOCOL_VERSION,
1149            client_fingerprint: identity.fingerprint().to_string(),
1150            psk: psk.is_some(),
1151        }
1152    }
1153}
1154
1155#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
1156struct DirectProtocolResponse {
1157    accepted: bool,
1158    protocol_version: Option<u16>,
1159    #[serde(default)]
1160    psk_challenge: Option<String>,
1161    error: Option<String>,
1162}
1163
1164impl DirectProtocolResponse {
1165    fn accept(protocol_version: u16) -> Self {
1166        Self {
1167            accepted: true,
1168            protocol_version: Some(protocol_version),
1169            psk_challenge: None,
1170            error: None,
1171        }
1172    }
1173
1174    fn accept_with_psk(protocol_version: u16, challenge: impl Into<String>) -> Self {
1175        Self {
1176            accepted: true,
1177            protocol_version: Some(protocol_version),
1178            psk_challenge: Some(challenge.into()),
1179            error: None,
1180        }
1181    }
1182
1183    fn reject(error: impl Into<String>) -> Self {
1184        Self {
1185            accepted: false,
1186            protocol_version: None,
1187            psk_challenge: None,
1188            error: Some(error.into()),
1189        }
1190    }
1191}
1192
/// Frame the sender writes in response to a PSK challenge.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
struct DirectPskProof {
    /// Output of `psk_proof` over the key, the challenge, the sender's fingerprint and
    /// the protocol version; the receiver recomputes it and compares.
    proof: String,
}
1197
1198#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
1199struct DirectPskResult {
1200    accepted: bool,
1201    error: Option<String>,
1202}
1203
1204impl DirectPskResult {
1205    fn accept() -> Self {
1206        Self {
1207            accepted: true,
1208            error: None,
1209        }
1210    }
1211
1212    fn reject(error: impl Into<String>) -> Self {
1213        Self {
1214            accepted: false,
1215            error: Some(error.into()),
1216        }
1217    }
1218}
1219
/// Receiver's decision for a single manifest file.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
struct DirectFilePlan {
    /// Path of the manifest entry this plan refers to.
    relative_path: PathBuf,
    /// Whether the file should be streamed or skipped.
    action: DirectFileAction,
    /// Byte offset streaming starts from: `0` for a new file, the verified resume
    /// offset when resuming, and the full entry size for a skipped file.
    offset: u64,
}
1226
/// What the receiver wants the sender to do with a file; serialized in `snake_case`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
enum DirectFileAction {
    /// Stream the file, from the start or from the planned resume offset.
    Receive,
    /// Do not stream the file; the receiver found an existing copy with a matching hash.
    Skip,
}
1233
/// Frame the sender writes immediately before a file's raw bytes.
///
/// The receiver rejects the payload unless all three fields match its own plan.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
struct DirectPayloadHeader {
    /// Path of the manifest entry being streamed.
    relative_path: PathBuf,
    /// Offset within the file at which the streamed bytes begin.
    offset: u64,
    /// Number of bytes that follow: the entry size minus `offset`.
    bytes: u64,
}
1240
/// Final frame the receiver writes to report the outcome of the whole transfer.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
struct DirectAck {
    /// `true` when the receiver completed the transfer successfully.
    ok: bool,
    /// Stringified receiver-side error when `ok` is `false`; `None` on success.
    error: Option<String>,
}
1246
1247pub fn validate_send_paths(paths: &[PathBuf]) -> Result<()> {
1248    if paths.is_empty() {
1249        return Err(Error::InvalidInput(
1250            "at least one file path is required".to_string(),
1251        ));
1252    }
1253
1254    for path in paths {
1255        if path.as_os_str().is_empty() {
1256            return Err(Error::InvalidInput("empty file path".to_string()));
1257        }
1258    }
1259
1260    Ok(())
1261}
1262
/// Renders `path` as an owned string, replacing invalid UTF-8 with U+FFFD.
pub fn display_path(path: &Path) -> String {
    String::from(path.to_string_lossy())
}
1266
1267pub async fn build_manifest(paths: &[PathBuf]) -> Result<TransferManifest> {
1268    validate_send_paths(paths)?;
1269
1270    let sources = collect_manifest_sources(paths)?;
1271    let mut entries = Vec::with_capacity(sources.len());
1272    let mut seen = BTreeSet::new();
1273
1274    for source in sources {
1275        if !seen.insert(source.relative_path.clone()) {
1276            return Err(Error::InvalidInput(format!(
1277                "duplicate transfer path: {}",
1278                display_path(&source.relative_path)
1279            )));
1280        }
1281
1282        entries.push(build_manifest_entry(source).await?);
1283    }
1284
1285    Ok(TransferManifest {
1286        version: MANIFEST_VERSION,
1287        session_id: Uuid::new_v4(),
1288        chunk_size: DEFAULT_CHUNK_SIZE,
1289        entries,
1290    })
1291}
1292
1293pub fn safe_destination_path(destination: &Path, relative_path: &Path) -> Result<PathBuf> {
1294    validate_relative_transfer_path(relative_path)?;
1295    Ok(destination.join(relative_path))
1296}
1297
1298pub async fn decide_resume(
1299    destination: &Path,
1300    entry: &ManifestEntry,
1301    chunk_size: u64,
1302) -> Result<ResumeDecision> {
1303    if entry.kind != ManifestEntryKind::File {
1304        return Ok(ResumeDecision::Create);
1305    }
1306
1307    let path = safe_destination_path(destination, &entry.relative_path)?;
1308    let metadata = match tokio::fs::metadata(&path).await {
1309        Ok(metadata) => metadata,
1310        Err(error) if error.kind() == std::io::ErrorKind::NotFound => {
1311            return Ok(ResumeDecision::Create);
1312        }
1313        Err(source) => {
1314            return Err(Error::IoPath { path, source });
1315        }
1316    };
1317
1318    if !metadata.is_file() {
1319        return Ok(ResumeDecision::Conflict(format!(
1320            "{} already exists and is not a regular file",
1321            display_path(&path)
1322        )));
1323    }
1324
1325    if metadata.len() == entry.size {
1326        let expected = entry
1327            .blake3
1328            .as_deref()
1329            .ok_or_else(|| Error::Protocol("file manifest entry is missing BLAKE3".to_string()))?;
1330        let actual = hash_file(&path).await?;
1331        return if actual == expected {
1332            Ok(ResumeDecision::Skip)
1333        } else {
1334            Ok(ResumeDecision::Conflict(format!(
1335                "{} already exists with different contents",
1336                display_path(&path)
1337            )))
1338        };
1339    }
1340
1341    if metadata.len() > entry.size {
1342        return Ok(ResumeDecision::Conflict(format!(
1343            "{} is larger than incoming file",
1344            display_path(&path)
1345        )));
1346    }
1347
1348    let verified_offset = verified_prefix_offset(&path, entry, metadata.len(), chunk_size).await?;
1349    if verified_offset > 0 {
1350        Ok(ResumeDecision::ResumeFrom(verified_offset))
1351    } else {
1352        Ok(ResumeDecision::Conflict(format!(
1353            "{} already exists and does not match a verified prefix",
1354            display_path(&path)
1355        )))
1356    }
1357}
1358
1359pub async fn send_direct_file(
1360    request: &SendRequest,
1361    identity: &NativeIdentity,
1362    mut events: impl FnMut(TransferEvent),
1363) -> Result<TransferSummary> {
1364    send_direct_file_with_control(request, identity, TransferControl::new(), &mut events).await
1365}
1366
1367pub async fn send_direct_file_with_control(
1368    request: &SendRequest,
1369    identity: &NativeIdentity,
1370    control: TransferControl,
1371    mut events: impl FnMut(TransferEvent),
1372) -> Result<TransferSummary> {
1373    send_direct_file_with_control_and_trust(request, identity, control, &mut events, |_| Ok(()))
1374        .await
1375}
1376
1377pub async fn send_direct_file_with_control_and_trust(
1378    request: &SendRequest,
1379    identity: &NativeIdentity,
1380    control: TransferControl,
1381    mut events: impl FnMut(TransferEvent),
1382    mut confirm_unpinned_receiver: impl FnMut(&str) -> Result<()>,
1383) -> Result<TransferSummary> {
1384    ensure_rustls_provider();
1385
1386    control.check_cancelled()?;
1387    let manifest = build_manifest(request.paths()).await?;
1388    let sources = manifest_source_map(request.paths())?;
1389    events(TransferEvent::SessionStarted {
1390        direction: TransferDirection::Send,
1391        session_id: manifest.session_id,
1392        files_total: manifest.file_entries().count(),
1393        bytes_total: manifest.total_file_bytes(),
1394    });
1395    control.check_cancelled()?;
1396
1397    let (client_config, server_fingerprint) = client_config_capturing_server_fingerprint()?;
1398    let mut endpoint = quinn::Endpoint::client(SocketAddr::from(([0, 0, 0, 0], 0)))?;
1399    endpoint.set_default_client_config(client_config);
1400    let connection = endpoint
1401        .connect(request.peer().address(), "localhost")?
1402        .await?;
1403    let actual = captured_server_fingerprint(&server_fingerprint)?;
1404    match request.peer().expected_fingerprint() {
1405        Some(expected) if actual != expected => {
1406            connection.close(1u32.into(), b"fingerprint mismatch");
1407            return Err(Error::PeerFingerprintMismatch {
1408                expected: expected.to_string(),
1409                actual,
1410            });
1411        }
1412        Some(_) => {}
1413        None => {
1414            if let Err(error) = confirm_unpinned_receiver(&actual) {
1415                connection.close(1u32.into(), b"receiver fingerprint not trusted");
1416                return Err(error);
1417            }
1418        }
1419    }
1420    let (mut send, mut recv) = connection.open_bi().await?;
1421
1422    send.write_all(DIRECT_MAGIC).await?;
1423    write_json_frame(
1424        &mut send,
1425        &DirectProtocolHello::new(identity, request.psk()),
1426    )
1427    .await?;
1428    let protocol: DirectProtocolResponse = read_json_frame(&mut recv).await?;
1429    if !protocol.accepted {
1430        return Err(Error::Protocol(protocol.error.unwrap_or_else(|| {
1431            "receiver rejected protocol negotiation".to_string()
1432        })));
1433    }
1434    if protocol.protocol_version != Some(NATIVE_PROTOCOL_VERSION) {
1435        return Err(Error::Protocol(format!(
1436            "receiver selected unsupported protocol version {:?}",
1437            protocol.protocol_version
1438        )));
1439    }
1440    match (request.psk(), protocol.psk_challenge.as_deref()) {
1441        (Some(psk), Some(challenge)) => {
1442            let proof = DirectPskProof {
1443                proof: psk_proof(
1444                    psk,
1445                    challenge,
1446                    identity.fingerprint(),
1447                    NATIVE_PROTOCOL_VERSION,
1448                )?,
1449            };
1450            write_json_frame(&mut send, &proof).await?;
1451            let auth: DirectPskResult = read_json_frame(&mut recv).await?;
1452            if !auth.accepted {
1453                return Err(Error::Authentication(
1454                    auth.error
1455                        .unwrap_or_else(|| "PSK authentication rejected".to_string()),
1456                ));
1457            }
1458        }
1459        (Some(_), None) => {
1460            return Err(Error::Authentication(
1461                "receiver is not configured for PSK mode".to_string(),
1462            ));
1463        }
1464        (None, Some(_)) => {
1465            return Err(Error::Authentication(
1466                "receiver requires PSK authentication".to_string(),
1467            ));
1468        }
1469        (None, None) => {}
1470    }
1471
1472    write_json_frame(&mut send, &manifest).await?;
1473
1474    let plan: DirectReceivePlan = read_json_frame(&mut recv).await?;
1475    let mut summaries = Vec::new();
1476    for file_plan in plan.files {
1477        let Some(entry) = manifest
1478            .file_entries()
1479            .find(|entry| entry.relative_path == file_plan.relative_path)
1480        else {
1481            return Err(Error::Protocol(format!(
1482                "receiver requested unknown file: {}",
1483                display_path(&file_plan.relative_path)
1484            )));
1485        };
1486
1487        let Some(source_path) = sources.get(&file_plan.relative_path) else {
1488            return Err(Error::Protocol(format!(
1489                "missing source for manifest file: {}",
1490                display_path(&file_plan.relative_path)
1491            )));
1492        };
1493
1494        if file_plan.action == DirectFileAction::Skip {
1495            let blake3 = entry.blake3.clone().unwrap_or_default();
1496            events(TransferEvent::FileFinished {
1497                direction: TransferDirection::Send,
1498                file_name: display_path(&entry.relative_path),
1499                bytes: entry.size,
1500                blake3: blake3.clone(),
1501                status: TransferFileStatus::Skipped,
1502            });
1503            summaries.push(TransferFileSummary {
1504                path: source_path.clone(),
1505                relative_path: entry.relative_path.clone(),
1506                bytes: entry.size,
1507                blake3,
1508                status: TransferFileStatus::Skipped,
1509            });
1510            continue;
1511        }
1512
1513        match send_payload_file(
1514            &mut send,
1515            source_path,
1516            entry,
1517            file_plan.offset,
1518            &control,
1519            &mut events,
1520        )
1521        .await
1522        {
1523            Ok(()) => {}
1524            Err(Error::TransferCancelled) => {
1525                events(TransferEvent::SessionCancelled {
1526                    direction: TransferDirection::Send,
1527                    session_id: manifest.session_id,
1528                });
1529                connection.close(1u32.into(), b"cancelled");
1530                return Err(Error::TransferCancelled);
1531            }
1532            Err(error) => return Err(error),
1533        }
1534        let status = if file_plan.offset > 0 {
1535            TransferFileStatus::Resumed
1536        } else {
1537            TransferFileStatus::Sent
1538        };
1539        let blake3 = entry.blake3.clone().unwrap_or_default();
1540        events(TransferEvent::FileFinished {
1541            direction: TransferDirection::Send,
1542            file_name: display_path(&entry.relative_path),
1543            bytes: entry.size,
1544            blake3: blake3.clone(),
1545            status,
1546        });
1547        summaries.push(TransferFileSummary {
1548            path: source_path.clone(),
1549            relative_path: entry.relative_path.clone(),
1550            bytes: entry.size,
1551            blake3,
1552            status,
1553        });
1554    }
1555    send.finish()?;
1556
1557    let ack: DirectAck = read_json_frame(&mut recv).await?;
1558    if !ack.ok {
1559        return Err(Error::Transfer(
1560            ack.error
1561                .unwrap_or_else(|| "receiver rejected transfer".to_string()),
1562        ));
1563    }
1564
1565    connection.close(0u32.into(), b"done");
1566    endpoint.wait_idle().await;
1567
1568    let summary = summary_from_files(&manifest, summaries)?;
1569    events(TransferEvent::SessionFinished {
1570        direction: TransferDirection::Send,
1571        files_total: summary.files.len(),
1572        bytes_total: summary.bytes,
1573        blake3: summary.blake3.clone(),
1574    });
1575    Ok(summary)
1576}
1577
1578pub async fn receive_direct_file(
1579    request: &ReceiveRequest,
1580    destination: impl AsRef<Path>,
1581    identity: &NativeIdentity,
1582    events: impl FnMut(TransferEvent),
1583) -> Result<TransferSummary> {
1584    receive_direct_file_with_control(
1585        request,
1586        destination,
1587        identity,
1588        TransferControl::new(),
1589        events,
1590    )
1591    .await
1592}
1593
1594pub async fn receive_direct_file_with_control(
1595    request: &ReceiveRequest,
1596    destination: impl AsRef<Path>,
1597    identity: &NativeIdentity,
1598    control: TransferControl,
1599    mut events: impl FnMut(TransferEvent),
1600) -> Result<TransferSummary> {
1601    ensure_rustls_provider();
1602
1603    control.check_cancelled()?;
1604    let destination = destination.as_ref();
1605    tokio::fs::create_dir_all(destination)
1606        .await
1607        .map_err(|source| Error::IoPath {
1608            path: destination.to_path_buf(),
1609            source,
1610        })?;
1611
1612    let server_config =
1613        quinn::ServerConfig::with_single_cert(identity.cert_chain(), identity.private_key())?;
1614    let endpoint = quinn::Endpoint::server(server_config, request.listen())?;
1615    let incoming = endpoint.accept().await.ok_or(Error::NoIncomingConnection)?;
1616    let connection = incoming.await?;
1617    let (mut send, mut recv) = connection.accept_bi().await?;
1618
1619    let result = receive_stream_file(
1620        destination,
1621        request.psk(),
1622        &mut recv,
1623        &mut send,
1624        &control,
1625        &mut events,
1626    )
1627    .await;
1628    let ack = match &result {
1629        Ok(_) => DirectAck {
1630            ok: true,
1631            error: None,
1632        },
1633        Err(error) => DirectAck {
1634            ok: false,
1635            error: Some(error.to_string()),
1636        },
1637    };
1638    write_json_frame(&mut send, &ack).await?;
1639    send.finish()?;
1640    let _ = tokio::time::timeout(Duration::from_secs(1), connection.closed()).await;
1641    endpoint.close(0u32.into(), b"done");
1642    endpoint.wait_idle().await;
1643
1644    result
1645}
1646
1647async fn receive_stream_file(
1648    destination: &Path,
1649    psk: Option<&str>,
1650    recv: &mut quinn::RecvStream,
1651    send: &mut quinn::SendStream,
1652    control: &TransferControl,
1653    events: &mut impl FnMut(TransferEvent),
1654) -> Result<TransferSummary> {
1655    let mut magic = [0; DIRECT_MAGIC.len()];
1656    recv.read_exact(&mut magic).await?;
1657    if &magic != DIRECT_MAGIC {
1658        return Err(Error::Protocol("bad direct-transfer magic".to_string()));
1659    }
1660
1661    let hello: DirectProtocolHello = read_json_frame(recv).await?;
1662    let protocol = negotiate_direct_protocol(&hello, psk);
1663    write_json_frame(send, &protocol).await?;
1664    if !protocol.accepted {
1665        return Err(Error::Protocol(protocol.error.unwrap_or_else(|| {
1666            "unsupported direct protocol version".to_string()
1667        })));
1668    }
1669    if let (Some(psk), Some(challenge)) = (psk, protocol.psk_challenge.as_deref()) {
1670        let proof: DirectPskProof = read_json_frame(recv).await?;
1671        let expected = psk_proof(
1672            psk,
1673            challenge,
1674            &hello.client_fingerprint,
1675            protocol.protocol_version.unwrap_or(NATIVE_PROTOCOL_VERSION),
1676        )?;
1677        if proof.proof != expected {
1678            write_json_frame(send, &DirectPskResult::reject("PSK proof mismatch")).await?;
1679            return Err(Error::Authentication("PSK proof mismatch".to_string()));
1680        }
1681        write_json_frame(send, &DirectPskResult::accept()).await?;
1682    }
1683
1684    let manifest: TransferManifest = read_json_frame(recv).await?;
1685    validate_manifest(&manifest)?;
1686    events(TransferEvent::SessionStarted {
1687        direction: TransferDirection::Receive,
1688        session_id: manifest.session_id,
1689        files_total: manifest.file_entries().count(),
1690        bytes_total: manifest.total_file_bytes(),
1691    });
1692    control.check_cancelled()?;
1693
1694    for entry in &manifest.entries {
1695        if entry.kind == ManifestEntryKind::Directory {
1696            let path = safe_destination_path(destination, &entry.relative_path)?;
1697            tokio::fs::create_dir_all(&path)
1698                .await
1699                .map_err(|source| Error::IoPath { path, source })?;
1700        }
1701    }
1702
1703    let mut plan = DirectReceivePlan { files: Vec::new() };
1704    for entry in manifest.file_entries() {
1705        let decision = decide_resume(destination, entry, manifest.chunk_size).await?;
1706        let (action, offset) = match decision {
1707            ResumeDecision::Create => (DirectFileAction::Receive, 0),
1708            ResumeDecision::Skip => (DirectFileAction::Skip, entry.size),
1709            ResumeDecision::ResumeFrom(offset) => (DirectFileAction::Receive, offset),
1710            ResumeDecision::Conflict(message) => return Err(Error::InvalidInput(message)),
1711        };
1712        plan.files.push(DirectFilePlan {
1713            relative_path: entry.relative_path.clone(),
1714            action,
1715            offset,
1716        });
1717    }
1718    write_json_frame(send, &plan).await?;
1719
1720    let mut summaries = Vec::new();
1721    for file_plan in &plan.files {
1722        let Some(entry) = manifest
1723            .file_entries()
1724            .find(|entry| entry.relative_path == file_plan.relative_path)
1725        else {
1726            return Err(Error::Protocol(format!(
1727                "planned file missing from manifest: {}",
1728                display_path(&file_plan.relative_path)
1729            )));
1730        };
1731
1732        let final_path = safe_destination_path(destination, &entry.relative_path)?;
1733        if file_plan.action == DirectFileAction::Skip {
1734            let blake3 = entry.blake3.clone().unwrap_or_default();
1735            events(TransferEvent::FileFinished {
1736                direction: TransferDirection::Receive,
1737                file_name: display_path(&entry.relative_path),
1738                bytes: entry.size,
1739                blake3: blake3.clone(),
1740                status: TransferFileStatus::Skipped,
1741            });
1742            summaries.push(TransferFileSummary {
1743                path: final_path,
1744                relative_path: entry.relative_path.clone(),
1745                bytes: entry.size,
1746                blake3,
1747                status: TransferFileStatus::Skipped,
1748            });
1749            continue;
1750        }
1751
1752        match receive_payload_file(recv, destination, entry, file_plan.offset, control, events)
1753            .await
1754        {
1755            Ok(()) => {}
1756            Err(Error::TransferCancelled) => {
1757                events(TransferEvent::SessionCancelled {
1758                    direction: TransferDirection::Receive,
1759                    session_id: manifest.session_id,
1760                });
1761                return Err(Error::TransferCancelled);
1762            }
1763            Err(error) => return Err(error),
1764        }
1765        let actual_hash = hash_file(&final_path).await?;
1766        let expected_hash = entry
1767            .blake3
1768            .as_deref()
1769            .ok_or_else(|| Error::Protocol("file manifest entry is missing BLAKE3".to_string()))?;
1770        if actual_hash != expected_hash {
1771            return Err(Error::Transfer(format!(
1772                "BLAKE3 hash mismatch for {}",
1773                display_path(&entry.relative_path)
1774            )));
1775        }
1776
1777        let status = if file_plan.offset > 0 {
1778            TransferFileStatus::Resumed
1779        } else {
1780            TransferFileStatus::Received
1781        };
1782        events(TransferEvent::FileFinished {
1783            direction: TransferDirection::Receive,
1784            file_name: display_path(&entry.relative_path),
1785            bytes: entry.size,
1786            blake3: actual_hash.clone(),
1787            status,
1788        });
1789        summaries.push(TransferFileSummary {
1790            path: final_path,
1791            relative_path: entry.relative_path.clone(),
1792            bytes: entry.size,
1793            blake3: actual_hash,
1794            status,
1795        });
1796    }
1797
1798    let summary = summary_from_files(&manifest, summaries)?;
1799    events(TransferEvent::SessionFinished {
1800        direction: TransferDirection::Receive,
1801        files_total: summary.files.len(),
1802        bytes_total: summary.bytes,
1803        blake3: summary.blake3.clone(),
1804    });
1805    Ok(summary)
1806}
1807
1808async fn send_payload_file(
1809    send: &mut quinn::SendStream,
1810    source_path: &Path,
1811    entry: &ManifestEntry,
1812    offset: u64,
1813    control: &TransferControl,
1814    events: &mut impl FnMut(TransferEvent),
1815) -> Result<()> {
1816    if offset > entry.size {
1817        return Err(Error::Protocol(format!(
1818            "resume offset exceeds file size for {}",
1819            display_path(&entry.relative_path)
1820        )));
1821    }
1822
1823    let header = DirectPayloadHeader {
1824        relative_path: entry.relative_path.clone(),
1825        offset,
1826        bytes: entry.size - offset,
1827    };
1828    write_json_frame(send, &header).await?;
1829    events(TransferEvent::FileStarted {
1830        direction: TransferDirection::Send,
1831        file_name: display_path(&entry.relative_path),
1832        bytes_total: entry.size,
1833        resume_offset: offset,
1834    });
1835
1836    let mut file = tokio::fs::File::open(source_path)
1837        .await
1838        .map_err(|source| Error::IoPath {
1839            path: source_path.to_path_buf(),
1840            source,
1841        })?;
1842    file.seek(SeekFrom::Start(offset))
1843        .await
1844        .map_err(|source| Error::IoPath {
1845            path: source_path.to_path_buf(),
1846            source,
1847        })?;
1848
1849    let mut buffer = vec![0; IO_BUFFER_SIZE];
1850    let mut bytes_done = offset;
1851    while bytes_done < entry.size {
1852        control.check_cancelled()?;
1853        let remaining = entry.size - bytes_done;
1854        let read_size = buffer.len().min(remaining as usize);
1855        let read = file
1856            .read(&mut buffer[..read_size])
1857            .await
1858            .map_err(|source| Error::IoPath {
1859                path: source_path.to_path_buf(),
1860                source,
1861            })?;
1862        if read == 0 {
1863            return Err(Error::Protocol(format!(
1864                "source file ended early: {}",
1865                display_path(source_path)
1866            )));
1867        }
1868        send.write_all(&buffer[..read]).await?;
1869        bytes_done += read as u64;
1870        events(TransferEvent::Progress {
1871            direction: TransferDirection::Send,
1872            file_name: display_path(&entry.relative_path),
1873            bytes_done,
1874            bytes_total: entry.size,
1875        });
1876        control.check_cancelled()?;
1877    }
1878
1879    Ok(())
1880}
1881
1882async fn receive_payload_file(
1883    recv: &mut quinn::RecvStream,
1884    destination: &Path,
1885    entry: &ManifestEntry,
1886    offset: u64,
1887    control: &TransferControl,
1888    events: &mut impl FnMut(TransferEvent),
1889) -> Result<()> {
1890    let header: DirectPayloadHeader = read_json_frame(recv).await?;
1891    if header.relative_path != entry.relative_path
1892        || header.offset != offset
1893        || header.bytes != entry.size - offset
1894    {
1895        return Err(Error::Protocol(format!(
1896            "payload header does not match receive plan for {}",
1897            display_path(&entry.relative_path)
1898        )));
1899    }
1900    events(TransferEvent::FileStarted {
1901        direction: TransferDirection::Receive,
1902        file_name: display_path(&entry.relative_path),
1903        bytes_total: entry.size,
1904        resume_offset: offset,
1905    });
1906
1907    let final_path = safe_destination_path(destination, &entry.relative_path)?;
1908    if let Some(parent) = final_path.parent() {
1909        tokio::fs::create_dir_all(parent)
1910            .await
1911            .map_err(|source| Error::IoPath {
1912                path: parent.to_path_buf(),
1913                source,
1914            })?;
1915    }
1916
1917    let write_path = if offset == 0 {
1918        temp_path_for(&final_path)
1919    } else {
1920        final_path.clone()
1921    };
1922
1923    let mut file = if offset == 0 {
1924        tokio::fs::File::create(&write_path)
1925            .await
1926            .map_err(|source| Error::IoPath {
1927                path: write_path.clone(),
1928                source,
1929            })?
1930    } else {
1931        let mut file = tokio::fs::OpenOptions::new()
1932            .write(true)
1933            .open(&write_path)
1934            .await
1935            .map_err(|source| Error::IoPath {
1936                path: write_path.clone(),
1937                source,
1938            })?;
1939        file.set_len(offset).await.map_err(|source| Error::IoPath {
1940            path: write_path.clone(),
1941            source,
1942        })?;
1943        file.seek(SeekFrom::Start(offset))
1944            .await
1945            .map_err(|source| Error::IoPath {
1946                path: write_path.clone(),
1947                source,
1948            })?;
1949        file
1950    };
1951
1952    let mut bytes_done = offset;
1953    let mut bytes_remaining = header.bytes;
1954    let mut buffer = vec![0; IO_BUFFER_SIZE];
1955    while bytes_remaining > 0 {
1956        if control.is_cancelled() {
1957            if offset == 0 {
1958                let _ = tokio::fs::remove_file(&write_path).await;
1959            }
1960            return Err(Error::TransferCancelled);
1961        }
1962        let read_size = buffer.len().min(bytes_remaining as usize);
1963        let read = match recv.read(&mut buffer[..read_size]).await {
1964            Ok(Some(read)) => read,
1965            Ok(None) => {
1966                if offset == 0 {
1967                    let _ = tokio::fs::remove_file(&write_path).await;
1968                }
1969                return Err(Error::Protocol(
1970                    "stream ended before file payload".to_string(),
1971                ));
1972            }
1973            Err(error) => {
1974                if offset == 0 {
1975                    let _ = tokio::fs::remove_file(&write_path).await;
1976                }
1977                return Err(Error::Read(error));
1978            }
1979        };
1980
1981        file.write_all(&buffer[..read])
1982            .await
1983            .map_err(|source| Error::IoPath {
1984                path: write_path.clone(),
1985                source,
1986            })?;
1987        bytes_done += read as u64;
1988        bytes_remaining -= read as u64;
1989        events(TransferEvent::Progress {
1990            direction: TransferDirection::Receive,
1991            file_name: display_path(&entry.relative_path),
1992            bytes_done,
1993            bytes_total: entry.size,
1994        });
1995        if control.is_cancelled() {
1996            if offset == 0 {
1997                let _ = tokio::fs::remove_file(&write_path).await;
1998            }
1999            return Err(Error::TransferCancelled);
2000        }
2001    }
2002
2003    file.flush().await.map_err(|source| Error::IoPath {
2004        path: write_path.clone(),
2005        source,
2006    })?;
2007    drop(file);
2008
2009    if offset == 0 {
2010        tokio::fs::rename(&write_path, &final_path)
2011            .await
2012            .map_err(|source| Error::IoPath {
2013                path: final_path,
2014                source,
2015            })?;
2016    }
2017
2018    Ok(())
2019}
2020
2021async fn write_json_frame<T: Serialize>(send: &mut quinn::SendStream, value: &T) -> Result<()> {
2022    let bytes = serde_json::to_vec(value).map_err(|error| Error::Protocol(error.to_string()))?;
2023    if bytes.len() > MAX_FRAME_LEN {
2024        return Err(Error::Protocol("frame exceeds maximum length".to_string()));
2025    }
2026    send.write_all(&(bytes.len() as u32).to_be_bytes()).await?;
2027    send.write_all(&bytes).await?;
2028    Ok(())
2029}
2030
2031async fn read_json_frame<T: for<'de> Deserialize<'de>>(recv: &mut quinn::RecvStream) -> Result<T> {
2032    let mut len = [0; 4];
2033    recv.read_exact(&mut len).await?;
2034    let len = u32::from_be_bytes(len) as usize;
2035    if len > MAX_FRAME_LEN {
2036        return Err(Error::Protocol("frame exceeds maximum length".to_string()));
2037    }
2038
2039    let mut bytes = vec![0; len];
2040    recv.read_exact(&mut bytes).await?;
2041    serde_json::from_slice(&bytes).map_err(|error| Error::Protocol(error.to_string()))
2042}
2043
2044fn negotiate_direct_protocol(
2045    hello: &DirectProtocolHello,
2046    psk: Option<&str>,
2047) -> DirectProtocolResponse {
2048    if hello.min_protocol_version > NATIVE_PROTOCOL_VERSION {
2049        return DirectProtocolResponse::reject(format!(
2050            "peer requires protocol version {}, local max is {}",
2051            hello.min_protocol_version, NATIVE_PROTOCOL_VERSION
2052        ));
2053    }
2054
2055    if hello.protocol_version < MIN_NATIVE_PROTOCOL_VERSION {
2056        return DirectProtocolResponse::reject(format!(
2057            "peer max protocol version {} is below local minimum {}",
2058            hello.protocol_version, MIN_NATIVE_PROTOCOL_VERSION
2059        ));
2060    }
2061
2062    let selected_version = NATIVE_PROTOCOL_VERSION.min(hello.protocol_version);
2063    match (psk, hello.psk) {
2064        (Some(_), false) => DirectProtocolResponse::reject("receiver requires PSK authentication"),
2065        (Some(_), true) => {
2066            DirectProtocolResponse::accept_with_psk(selected_version, Uuid::new_v4().to_string())
2067        }
2068        (None, true) => DirectProtocolResponse::reject("receiver is not configured for PSK mode"),
2069        (None, false) => DirectProtocolResponse::accept(selected_version),
2070    }
2071}
2072
/// A filesystem item selected for transfer, paired with the path it will have
/// inside the transfer.
#[derive(Debug, Clone)]
struct ManifestSource {
    /// Location of the item on the local filesystem.
    source_path: PathBuf,
    /// Path used inside the transfer; it starts with the file name of the
    /// user-selected root the item was found under.
    relative_path: PathBuf,
}
2078
2079fn collect_manifest_sources(paths: &[PathBuf]) -> Result<Vec<ManifestSource>> {
2080    let mut sources = Vec::new();
2081    for path in paths {
2082        let metadata = std::fs::symlink_metadata(path).map_err(|source| Error::IoPath {
2083            path: path.clone(),
2084            source,
2085        })?;
2086        if metadata.file_type().is_symlink() {
2087            return Err(Error::InvalidInput(format!(
2088                "{} is a symlink; symlink transfers are not supported",
2089                display_path(path)
2090            )));
2091        }
2092
2093        let root_name = path
2094            .file_name()
2095            .ok_or_else(|| Error::InvalidInput(format!("{} has no file name", display_path(path))))?
2096            .to_os_string();
2097        let relative_path = PathBuf::from(root_name);
2098
2099        if metadata.is_dir() {
2100            sources.push(ManifestSource {
2101                source_path: path.clone(),
2102                relative_path: relative_path.clone(),
2103            });
2104            collect_dir_sources(path, &relative_path, &mut sources)?;
2105        } else if metadata.is_file() {
2106            sources.push(ManifestSource {
2107                source_path: path.clone(),
2108                relative_path,
2109            });
2110        } else {
2111            return Err(Error::InvalidInput(format!(
2112                "{} is not a regular file or directory",
2113                display_path(path)
2114            )));
2115        }
2116    }
2117
2118    Ok(sources)
2119}
2120
2121fn collect_dir_sources(
2122    dir: &Path,
2123    relative_dir: &Path,
2124    sources: &mut Vec<ManifestSource>,
2125) -> Result<()> {
2126    for entry in std::fs::read_dir(dir).map_err(|source| Error::IoPath {
2127        path: dir.to_path_buf(),
2128        source,
2129    })? {
2130        let entry = entry.map_err(|source| Error::IoPath {
2131            path: dir.to_path_buf(),
2132            source,
2133        })?;
2134        let path = entry.path();
2135        let metadata = std::fs::symlink_metadata(&path).map_err(|source| Error::IoPath {
2136            path: path.clone(),
2137            source,
2138        })?;
2139        if metadata.file_type().is_symlink() {
2140            return Err(Error::InvalidInput(format!(
2141                "{} is a symlink; symlink transfers are not supported",
2142                display_path(&path)
2143            )));
2144        }
2145
2146        let relative_path = relative_dir.join(entry.file_name());
2147        if metadata.is_dir() {
2148            sources.push(ManifestSource {
2149                source_path: path.clone(),
2150                relative_path: relative_path.clone(),
2151            });
2152            collect_dir_sources(&path, &relative_path, sources)?;
2153        } else if metadata.is_file() {
2154            sources.push(ManifestSource {
2155                source_path: path,
2156                relative_path,
2157            });
2158        }
2159    }
2160
2161    Ok(())
2162}
2163
2164fn manifest_source_map(paths: &[PathBuf]) -> Result<BTreeMap<PathBuf, PathBuf>> {
2165    Ok(collect_manifest_sources(paths)?
2166        .into_iter()
2167        .filter_map(|source| {
2168            let metadata = std::fs::metadata(&source.source_path).ok()?;
2169            metadata
2170                .is_file()
2171                .then_some((source.relative_path, source.source_path))
2172        })
2173        .collect())
2174}
2175
2176async fn build_manifest_entry(source: ManifestSource) -> Result<ManifestEntry> {
2177    validate_relative_transfer_path(&source.relative_path)?;
2178    let metadata = tokio::fs::metadata(&source.source_path)
2179        .await
2180        .map_err(|source_error| Error::IoPath {
2181            path: source.source_path.clone(),
2182            source: source_error,
2183        })?;
2184    let unix_mode = unix_mode(&metadata);
2185    let modified_unix_ms = modified_unix_ms(&metadata);
2186
2187    if metadata.is_dir() {
2188        return Ok(ManifestEntry {
2189            relative_path: source.relative_path,
2190            kind: ManifestEntryKind::Directory,
2191            size: 0,
2192            blake3: None,
2193            chunks: Vec::new(),
2194            unix_mode,
2195            modified_unix_ms,
2196        });
2197    }
2198
2199    let (blake3, chunks) = hash_file_with_chunks(&source.source_path, DEFAULT_CHUNK_SIZE).await?;
2200    Ok(ManifestEntry {
2201        relative_path: source.relative_path,
2202        kind: ManifestEntryKind::File,
2203        size: metadata.len(),
2204        blake3: Some(blake3),
2205        chunks,
2206        unix_mode,
2207        modified_unix_ms,
2208    })
2209}
2210
2211fn validate_manifest(manifest: &TransferManifest) -> Result<()> {
2212    if manifest.version != MANIFEST_VERSION {
2213        return Err(Error::Protocol(format!(
2214            "unsupported manifest version {}",
2215            manifest.version
2216        )));
2217    }
2218    if manifest.chunk_size == 0 {
2219        return Err(Error::Protocol("manifest chunk size is zero".to_string()));
2220    }
2221
2222    let mut seen = BTreeSet::new();
2223    for entry in &manifest.entries {
2224        validate_relative_transfer_path(&entry.relative_path)?;
2225        if !seen.insert(entry.relative_path.clone()) {
2226            return Err(Error::Protocol(format!(
2227                "duplicate manifest path: {}",
2228                display_path(&entry.relative_path)
2229            )));
2230        }
2231        if entry.kind == ManifestEntryKind::File && entry.blake3.is_none() {
2232            return Err(Error::Protocol(format!(
2233                "file manifest entry is missing BLAKE3: {}",
2234                display_path(&entry.relative_path)
2235            )));
2236        }
2237    }
2238
2239    Ok(())
2240}
2241
2242fn validate_relative_transfer_path(path: &Path) -> Result<()> {
2243    if path.as_os_str().is_empty() || path.is_absolute() {
2244        return Err(Error::InvalidInput(format!(
2245            "unsafe transfer path: {}",
2246            display_path(path)
2247        )));
2248    }
2249
2250    let mut normal_components = 0usize;
2251    for component in path.components() {
2252        match component {
2253            Component::Normal(value) => {
2254                let Some(name) = value.to_str() else {
2255                    return Err(Error::InvalidInput(format!(
2256                        "transfer path must be UTF-8: {}",
2257                        display_path(path)
2258                    )));
2259                };
2260                validate_path_component(name, path)?;
2261                normal_components += 1;
2262            }
2263            Component::CurDir
2264            | Component::ParentDir
2265            | Component::RootDir
2266            | Component::Prefix(_) => {
2267                return Err(Error::InvalidInput(format!(
2268                    "unsafe transfer path: {}",
2269                    display_path(path)
2270                )));
2271            }
2272        }
2273    }
2274
2275    if normal_components == 0 {
2276        return Err(Error::InvalidInput(format!(
2277            "unsafe transfer path: {}",
2278            display_path(path)
2279        )));
2280    }
2281
2282    Ok(())
2283}
2284
2285fn validate_path_component(component: &str, full_path: &Path) -> Result<()> {
2286    if component.is_empty()
2287        || component.contains('\\')
2288        || component.contains('/')
2289        || component.contains(':')
2290        || component.ends_with(' ')
2291        || component.ends_with('.')
2292        || is_windows_reserved_name(component)
2293    {
2294        return Err(Error::InvalidInput(format!(
2295            "unsafe transfer path: {}",
2296            display_path(full_path)
2297        )));
2298    }
2299
2300    Ok(())
2301}
2302
/// Reports whether `component` is a reserved Windows device name.
///
/// Only the part before the first `.` is considered, with trailing spaces
/// removed, and the comparison ignores ASCII case, so `con.txt` and `NUL ` are
/// both reserved. The recognised names are `CON`, `PRN`, `AUX`, `NUL`,
/// `CONIN$`, `CONOUT$`, and `COM`/`LPT` followed by exactly one digit `0`-`9`
/// or one of the superscript digits `¹`, `²`, `³`.
fn is_windows_reserved_name(component: &str) -> bool {
    const FIXED_NAMES: [&str; 6] = ["CON", "PRN", "AUX", "NUL", "CONIN$", "CONOUT$"];

    let stem = component
        .split('.')
        .next()
        .unwrap_or(component)
        .trim_end_matches([' ', '.']);

    if FIXED_NAMES
        .iter()
        .any(|name| stem.eq_ignore_ascii_case(name))
    {
        return true;
    }

    // `get` returns `None` when byte 3 is out of range or not a character
    // boundary; neither case can be a `COMn`/`LPTn` name.
    let (Some(prefix), Some(suffix)) = (stem.get(..3), stem.get(3..)) else {
        return false;
    };
    let is_port_prefix = prefix.eq_ignore_ascii_case("COM") || prefix.eq_ignore_ascii_case("LPT");

    let mut port = suffix.chars();
    is_port_prefix
        && matches!(
            (port.next(), port.next()),
            (Some('0'..='9' | '\u{b9}' | '\u{b2}' | '\u{b3}'), None)
        )
}
2336
2337async fn hash_file(path: &Path) -> Result<String> {
2338    hash_file_with_chunks(path, DEFAULT_CHUNK_SIZE)
2339        .await
2340        .map(|(hash, _)| hash)
2341}
2342
2343async fn hash_file_with_chunks(path: &Path, chunk_size: u64) -> Result<(String, Vec<String>)> {
2344    let mut file = tokio::fs::File::open(path)
2345        .await
2346        .map_err(|source| Error::IoPath {
2347            path: path.to_path_buf(),
2348            source,
2349        })?;
2350    let mut hasher = blake3::Hasher::new();
2351    let mut chunk_hasher = blake3::Hasher::new();
2352    let mut chunk_bytes = 0u64;
2353    let mut chunks = Vec::new();
2354    let mut buffer = vec![0; IO_BUFFER_SIZE];
2355
2356    loop {
2357        let read = file
2358            .read(&mut buffer)
2359            .await
2360            .map_err(|source| Error::IoPath {
2361                path: path.to_path_buf(),
2362                source,
2363            })?;
2364        if read == 0 {
2365            break;
2366        }
2367        hasher.update(&buffer[..read]);
2368
2369        let mut remaining = &buffer[..read];
2370        while !remaining.is_empty() {
2371            let take = remaining.len().min((chunk_size - chunk_bytes) as usize);
2372            chunk_hasher.update(&remaining[..take]);
2373            chunk_bytes += take as u64;
2374            remaining = &remaining[take..];
2375            if chunk_bytes == chunk_size {
2376                chunks.push(chunk_hasher.finalize().to_hex().to_string());
2377                chunk_hasher = blake3::Hasher::new();
2378                chunk_bytes = 0;
2379            }
2380        }
2381    }
2382
2383    if chunk_bytes > 0 {
2384        chunks.push(chunk_hasher.finalize().to_hex().to_string());
2385    }
2386
2387    Ok((hasher.finalize().to_hex().to_string(), chunks))
2388}
2389
2390async fn verified_prefix_offset(
2391    path: &Path,
2392    entry: &ManifestEntry,
2393    existing_len: u64,
2394    chunk_size: u64,
2395) -> Result<u64> {
2396    if existing_len == 0 || entry.chunks.is_empty() {
2397        return Ok(0);
2398    }
2399
2400    let chunks_to_check = (existing_len / chunk_size).min(entry.chunks.len() as u64) as usize;
2401    if chunks_to_check == 0 {
2402        return Ok(0);
2403    }
2404
2405    let mut file = tokio::fs::File::open(path)
2406        .await
2407        .map_err(|source| Error::IoPath {
2408            path: path.to_path_buf(),
2409            source,
2410        })?;
2411    let mut buffer = vec![0; IO_BUFFER_SIZE];
2412    let mut verified_chunks = 0usize;
2413
2414    for expected in entry.chunks.iter().take(chunks_to_check) {
2415        let mut remaining = chunk_size;
2416        let mut hasher = blake3::Hasher::new();
2417        while remaining > 0 {
2418            let read_size = buffer.len().min(remaining as usize);
2419            file.read_exact(&mut buffer[..read_size])
2420                .await
2421                .map_err(|source| Error::IoPath {
2422                    path: path.to_path_buf(),
2423                    source,
2424                })?;
2425            hasher.update(&buffer[..read_size]);
2426            remaining -= read_size as u64;
2427        }
2428
2429        if hasher.finalize().to_hex().as_str() != expected {
2430            break;
2431        }
2432        verified_chunks += 1;
2433    }
2434
2435    Ok(verified_chunks as u64 * chunk_size)
2436}
2437
2438fn summary_from_files(
2439    manifest: &TransferManifest,
2440    files: Vec<TransferFileSummary>,
2441) -> Result<TransferSummary> {
2442    let bytes = manifest.total_file_bytes();
2443    let blake3 = if files.len() == 1 {
2444        files[0].blake3.clone()
2445    } else {
2446        manifest.digest()?
2447    };
2448    let file_name = if files.len() == 1 {
2449        display_path(&files[0].relative_path)
2450    } else {
2451        format!("{} files", files.len())
2452    };
2453    let path = files
2454        .first()
2455        .map(|file| file.path.clone())
2456        .unwrap_or_default();
2457
2458    Ok(TransferSummary {
2459        path,
2460        file_name,
2461        bytes,
2462        blake3,
2463        files,
2464    })
2465}
2466
2467fn temp_path_for(final_path: &Path) -> PathBuf {
2468    let file_name = final_path
2469        .file_name()
2470        .and_then(|name| name.to_str())
2471        .unwrap_or("transfer");
2472    final_path.with_file_name(format!(".{file_name}.{}.ferry-part", Uuid::new_v4()))
2473}
2474
2475fn modified_unix_ms(metadata: &Metadata) -> Option<i128> {
2476    system_time_unix_ms(metadata.modified().ok()?)
2477}
2478
/// Converts `time` to whole milliseconds relative to the Unix epoch.
///
/// Times before the epoch yield a negative value. The result is always `Some`.
fn system_time_unix_ms(time: SystemTime) -> Option<i128> {
    let millis = time
        .duration_since(UNIX_EPOCH)
        .map(|since_epoch| since_epoch.as_millis() as i128)
        // The error carries how far `time` lies before the epoch.
        .unwrap_or_else(|before_epoch| -(before_epoch.duration().as_millis() as i128));
    Some(millis)
}
2485
/// Returns the Unix permission mode bits recorded in `metadata`.
#[cfg(unix)]
fn unix_mode(metadata: &Metadata) -> Option<u32> {
    let permissions = metadata.permissions();
    Some(std::os::unix::fs::PermissionsExt::mode(&permissions))
}

/// Non-Unix variant: no mode bits are reported, so this always returns `None`.
#[cfg(not(unix))]
fn unix_mode(_metadata: &Metadata) -> Option<u32> {
    Option::None
}
2497
2498fn client_config_capturing_server_fingerprint()
2499-> Result<(quinn::ClientConfig, Arc<Mutex<Option<String>>>)> {
2500    let server_fingerprint = Arc::new(Mutex::new(None));
2501    let crypto = rustls::ClientConfig::builder()
2502        .dangerous()
2503        .with_custom_certificate_verifier(ServerFingerprintVerifier::new(
2504            server_fingerprint.clone(),
2505        ))
2506        .with_no_client_auth();
2507
2508    let config = quinn::ClientConfig::new(Arc::new(
2509        QuicClientConfig::try_from(crypto)
2510            .map_err(|error| Error::CryptoConfig(error.to_string()))?,
2511    ));
2512    Ok((config, server_fingerprint))
2513}
2514
2515fn captured_server_fingerprint(fingerprint: &Arc<Mutex<Option<String>>>) -> Result<String> {
2516    fingerprint
2517        .lock()
2518        .map_err(|_| Error::CryptoConfig("server fingerprint capture lock poisoned".to_string()))?
2519        .clone()
2520        .ok_or_else(|| {
2521            Error::CryptoConfig("server certificate fingerprint was not captured".to_string())
2522        })
2523}
2524
/// Certificate verifier that records the fingerprint of the server's
/// end-entity certificate instead of validating a certificate chain.
#[derive(Debug)]
struct ServerFingerprintVerifier {
    /// Crypto provider whose signature verification algorithms are used to
    /// check handshake signatures.
    provider: Arc<CryptoProvider>,
    /// Shared slot that receives the hex BLAKE3 hash of the server certificate.
    server_fingerprint: Arc<Mutex<Option<String>>>,
}
2530
2531impl ServerFingerprintVerifier {
2532    fn new(server_fingerprint: Arc<Mutex<Option<String>>>) -> Arc<Self> {
2533        Arc::new(Self {
2534            provider: Arc::new(rustls::crypto::ring::default_provider()),
2535            server_fingerprint,
2536        })
2537    }
2538}
2539
2540impl ServerCertVerifier for ServerFingerprintVerifier {
2541    fn verify_server_cert(
2542        &self,
2543        end_entity: &CertificateDer<'_>,
2544        _intermediates: &[CertificateDer<'_>],
2545        _server_name: &ServerName<'_>,
2546        _ocsp: &[u8],
2547        _now: UnixTime,
2548    ) -> std::result::Result<ServerCertVerified, rustls::Error> {
2549        let fingerprint = blake3::hash(end_entity.as_ref()).to_hex().to_string();
2550        *self.server_fingerprint.lock().map_err(|_| {
2551            rustls::Error::General("server fingerprint capture lock poisoned".to_string())
2552        })? = Some(fingerprint);
2553        Ok(ServerCertVerified::assertion())
2554    }
2555
2556    fn verify_tls12_signature(
2557        &self,
2558        message: &[u8],
2559        cert: &CertificateDer<'_>,
2560        dss: &DigitallySignedStruct,
2561    ) -> std::result::Result<HandshakeSignatureValid, rustls::Error> {
2562        verify_tls12_signature(
2563            message,
2564            cert,
2565            dss,
2566            &self.provider.signature_verification_algorithms,
2567        )
2568    }
2569
2570    fn verify_tls13_signature(
2571        &self,
2572        message: &[u8],
2573        cert: &CertificateDer<'_>,
2574        dss: &DigitallySignedStruct,
2575    ) -> std::result::Result<HandshakeSignatureValid, rustls::Error> {
2576        verify_tls13_signature(
2577            message,
2578            cert,
2579            dss,
2580            &self.provider.signature_verification_algorithms,
2581        )
2582    }
2583
2584    fn supported_verify_schemes(&self) -> Vec<SignatureScheme> {
2585        self.provider
2586            .signature_verification_algorithms
2587            .supported_schemes()
2588    }
2589}
2590
2591fn ensure_rustls_provider() {
2592    let _ = rustls::crypto::ring::default_provider().install_default();
2593}
2594
2595pub fn config_dir() -> Result<PathBuf> {
2596    let base_dirs = BaseDirs::new().ok_or(Error::ConfigDirUnavailable)?;
2597    Ok(base_dirs.config_dir().join(CONFIG_DIR_NAME))
2598}
2599
/// Picks a default device alias from the environment.
///
/// `HOSTNAME` is tried first, then `COMPUTERNAME`. The first variable that is
/// set to something other than whitespace wins, with surrounding whitespace
/// removed. A blank or non-Unicode value falls through to the next candidate.
/// `fileferry-device` is used when neither variable provides a name.
fn default_alias() -> String {
    ["HOSTNAME", "COMPUTERNAME"]
        .iter()
        .filter_map(|name| std::env::var(name).ok())
        .map(|value| value.trim().to_string())
        .find(|value| !value.is_empty())
        .unwrap_or_else(|| "fileferry-device".to_string())
}
2607
2608fn expand_home_path(path: &str) -> Result<PathBuf> {
2609    if path == "~" {
2610        return Ok(BaseDirs::new()
2611            .ok_or(Error::ConfigDirUnavailable)?
2612            .home_dir()
2613            .to_path_buf());
2614    }
2615
2616    if let Some(rest) = path.strip_prefix("~/") {
2617        return Ok(BaseDirs::new()
2618            .ok_or(Error::ConfigDirUnavailable)?
2619            .home_dir()
2620            .join(rest));
2621    }
2622
2623    Ok(PathBuf::from(path))
2624}
2625
2626fn normalized_fingerprint(fingerprint: String) -> Result<String> {
2627    let fingerprint = fingerprint.trim().to_ascii_lowercase();
2628    if fingerprint.len() != 64 || !fingerprint.chars().all(|char| char.is_ascii_hexdigit()) {
2629        return Err(Error::InvalidInput(
2630            "peer fingerprint must be a full 64-character hex BLAKE3 fingerprint".to_string(),
2631        ));
2632    }
2633
2634    Ok(fingerprint)
2635}
2636
2637fn validate_psk(psk: String) -> Result<String> {
2638    if psk.is_empty() {
2639        return Err(Error::InvalidInput("PSK must not be empty".to_string()));
2640    }
2641
2642    Ok(psk)
2643}
2644
2645fn psk_proof(
2646    psk: &str,
2647    challenge: &str,
2648    client_fingerprint: &str,
2649    protocol_version: u16,
2650) -> Result<String> {
2651    validate_psk(psk.to_string())?;
2652    let key = blake3::hash(psk.as_bytes());
2653    let mut message = Vec::new();
2654    message.extend_from_slice(PSK_PROOF_CONTEXT);
2655    message.push(0);
2656    message.extend_from_slice(challenge.as_bytes());
2657    message.push(0);
2658    message.extend_from_slice(client_fingerprint.as_bytes());
2659    message.push(0);
2660    message.extend_from_slice(protocol_version.to_string().as_bytes());
2661    Ok(blake3::keyed_hash(key.as_bytes(), &message)
2662        .to_hex()
2663        .to_string())
2664}
2665
2666fn write_toml_file<T: Serialize>(path: &Path, value: &T) -> Result<()> {
2667    let bytes =
2668        toml::to_string_pretty(value).map_err(|error| Error::ConfigSerialize(error.to_string()))?;
2669    if let Some(parent) = path.parent() {
2670        std::fs::create_dir_all(parent).map_err(|source| Error::IoPath {
2671            path: parent.to_path_buf(),
2672            source,
2673        })?;
2674    }
2675
2676    let temp_path = path.with_extension(format!("tmp-{}", Uuid::new_v4()));
2677    std::fs::write(&temp_path, bytes).map_err(|source| Error::IoPath {
2678        path: temp_path.clone(),
2679        source,
2680    })?;
2681    std::fs::rename(&temp_path, path).map_err(|source| Error::IoPath {
2682        path: path.to_path_buf(),
2683        source,
2684    })?;
2685    Ok(())
2686}
2687
2688fn write_private_file(path: &Path, bytes: &[u8]) -> Result<()> {
2689    std::fs::write(path, bytes).map_err(|source| Error::IoPath {
2690        path: path.to_path_buf(),
2691        source,
2692    })?;
2693
2694    #[cfg(unix)]
2695    {
2696        use std::os::unix::fs::PermissionsExt;
2697
2698        let mut permissions = std::fs::metadata(path)
2699            .map_err(|source| Error::IoPath {
2700                path: path.to_path_buf(),
2701                source,
2702            })?
2703            .permissions();
2704        permissions.set_mode(0o600);
2705        std::fs::set_permissions(path, permissions).map_err(|source| Error::IoPath {
2706            path: path.to_path_buf(),
2707            source,
2708        })?;
2709    }
2710
2711    Ok(())
2712}
2713
2714#[cfg(test)]
2715mod tests {
2716    use super::*;
2717    use std::sync::{Arc, Mutex};
2718
2719    #[test]
2720    fn direct_peer_parses_socket_address() {
2721        let peer = DirectPeer::parse("127.0.0.1:53318").expect("address parses");
2722
2723        assert_eq!(peer.address().to_string(), "127.0.0.1:53318");
2724        assert_eq!(peer.expected_fingerprint(), None);
2725    }
2726
2727    #[test]
2728    fn direct_peer_accepts_expected_fingerprint() {
2729        let fingerprint = "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef";
2730        let peer = DirectPeer::parse("127.0.0.1:53318")
2731            .expect("address parses")
2732            .with_expected_fingerprint(fingerprint)
2733            .expect("fingerprint parses");
2734
2735        assert_eq!(peer.expected_fingerprint(), Some(fingerprint));
2736    }
2737
2738    #[test]
2739    fn direct_peer_rejects_missing_port() {
2740        assert!(DirectPeer::parse("127.0.0.1").is_err());
2741    }
2742
2743    #[test]
2744    fn send_request_requires_at_least_one_path() {
2745        let peer = DirectPeer::parse("127.0.0.1:53318").expect("address parses");
2746
2747        assert!(SendRequest::new(peer, Vec::new()).is_err());
2748    }
2749
2750    #[test]
2751    fn psk_secret_is_redacted_in_debug_and_serialization() {
2752        let secret = PskSecret::new("do-not-print").expect("psk accepted");
2753
2754        assert_eq!(format!("{secret:?}"), "PskSecret(<redacted>)");
2755        assert_eq!(
2756            serde_json::to_string(&secret).expect("secret serializes"),
2757            r#""<redacted>""#
2758        );
2759
2760        let peer = DirectPeer::parse("127.0.0.1:53318").expect("address parses");
2761        let request = SendRequest::new(peer, vec![PathBuf::from("file.txt")])
2762            .expect("send request")
2763            .with_psk("do-not-print")
2764            .expect("psk accepted");
2765        assert!(!format!("{request:?}").contains("do-not-print"));
2766    }
2767
2768    #[test]
2769    fn validate_send_paths_rejects_empty_slice() {
2770        assert!(validate_send_paths(&[]).is_err());
2771    }
2772
2773    #[tokio::test]
2774    async fn manifest_walks_directory_and_serializes_entries() {
2775        let temp = tempfile::tempdir().expect("tempdir");
2776        let root = temp.path().join("bundle");
2777        let nested = root.join("nested");
2778        tokio::fs::create_dir_all(&nested).await.expect("dirs");
2779        tokio::fs::write(root.join("a.txt"), b"alpha")
2780            .await
2781            .expect("file a");
2782        tokio::fs::write(nested.join("b.txt"), b"beta")
2783            .await
2784            .expect("file b");
2785
2786        let manifest = build_manifest(&[root]).await.expect("manifest");
2787        let json = serde_json::to_string(&manifest).expect("manifest serializes");
2788        let decoded: TransferManifest = serde_json::from_str(&json).expect("manifest decodes");
2789
2790        assert_eq!(decoded.version, MANIFEST_VERSION);
2791        assert!(decoded.entries.iter().any(|entry| {
2792            entry.kind == ManifestEntryKind::Directory
2793                && entry.relative_path == Path::new("bundle/nested")
2794        }));
2795        assert!(decoded.entries.iter().any(|entry| {
2796            entry.kind == ManifestEntryKind::File
2797                && entry.relative_path == Path::new("bundle/nested/b.txt")
2798                && entry.size == 4
2799                && entry.blake3.is_some()
2800        }));
2801    }
2802
2803    #[test]
2804    fn safe_destination_path_rejects_unsafe_paths() {
2805        let dest = Path::new("/tmp/ferry-dest");
2806
2807        assert!(safe_destination_path(dest, Path::new("../escape.txt")).is_err());
2808        assert!(safe_destination_path(dest, Path::new("/absolute.txt")).is_err());
2809        assert!(safe_destination_path(dest, Path::new("nested/CON")).is_err());
2810        assert!(safe_destination_path(dest, Path::new("nested\\escape.txt")).is_err());
2811        assert!(safe_destination_path(dest, Path::new("nested/ok.txt")).is_ok());
2812    }
2813
2814    #[test]
2815    fn safe_destination_path_rejects_generated_escape_and_reserved_patterns() {
2816        let dest = Path::new("/tmp/ferry-dest");
2817        let unsafe_components = [
2818            "..",
2819            ".",
2820            "CON",
2821            "con.txt",
2822            "NUL",
2823            "COM1",
2824            "LPT9",
2825            "trailingspace ",
2826            "trailingdot.",
2827            "has:colon",
2828            "has\\backslash",
2829        ];
2830
2831        for component in unsafe_components {
2832            assert!(
2833                safe_destination_path(dest, Path::new(component)).is_err(),
2834                "{component} should be rejected as a top-level path"
2835            );
2836            if component != "." {
2837                assert!(
2838                    safe_destination_path(dest, Path::new("safe").join(component).as_path())
2839                        .is_err(),
2840                    "{component} should be rejected as a nested path"
2841                );
2842            }
2843        }
2844
2845        for component in ["alpha", "alpha-1", "alpha_1", "report.final.txt"] {
2846            let path = safe_destination_path(dest, Path::new("safe").join(component).as_path())
2847                .expect("safe generated path accepted");
2848            assert!(path.starts_with(dest));
2849        }
2850    }
2851
2852    #[tokio::test]
2853    async fn resume_decision_skips_existing_matching_file() {
2854        let temp = tempfile::tempdir().expect("tempdir");
2855        let source = temp.path().join("source.txt");
2856        let dest = temp.path().join("dest");
2857        tokio::fs::create_dir_all(&dest).await.expect("dest dir");
2858        tokio::fs::write(&source, b"same contents")
2859            .await
2860            .expect("source");
2861        tokio::fs::write(dest.join("source.txt"), b"same contents")
2862            .await
2863            .expect("dest file");
2864        let manifest = build_manifest(&[source]).await.expect("manifest");
2865        let entry = manifest.file_entries().next().expect("file entry");
2866
2867        let decision = decide_resume(&dest, entry, manifest.chunk_size)
2868            .await
2869            .expect("decision");
2870
2871        assert_eq!(decision, ResumeDecision::Skip);
2872    }
2873
2874    #[tokio::test]
2875    async fn resume_decision_returns_verified_chunk_offset() {
2876        let temp = tempfile::tempdir().expect("tempdir");
2877        let source = temp.path().join("large.bin");
2878        let dest = temp.path().join("dest");
2879        tokio::fs::create_dir_all(&dest).await.expect("dest dir");
2880        let bytes = vec![42; (DEFAULT_CHUNK_SIZE as usize * 2) + 128];
2881        tokio::fs::write(&source, &bytes).await.expect("source");
2882        tokio::fs::write(
2883            dest.join("large.bin"),
2884            &bytes[..DEFAULT_CHUNK_SIZE as usize + 99],
2885        )
2886        .await
2887        .expect("partial");
2888        let manifest = build_manifest(&[source]).await.expect("manifest");
2889        let entry = manifest.file_entries().next().expect("file entry");
2890
2891        let decision = decide_resume(&dest, entry, manifest.chunk_size)
2892            .await
2893            .expect("decision");
2894
2895        assert_eq!(decision, ResumeDecision::ResumeFrom(DEFAULT_CHUNK_SIZE));
2896    }
2897
2898    #[tokio::test]
2899    async fn resume_decision_rejects_existing_file_with_mismatched_contents() {
2900        let temp = tempfile::tempdir().expect("tempdir");
2901        let source = temp.path().join("source.txt");
2902        let dest = temp.path().join("dest");
2903        tokio::fs::create_dir_all(&dest).await.expect("dest dir");
2904        tokio::fs::write(&source, b"expected contents")
2905            .await
2906            .expect("source");
2907        tokio::fs::write(dest.join("source.txt"), b"different contents")
2908            .await
2909            .expect("conflicting dest file");
2910        let manifest = build_manifest(&[source]).await.expect("manifest");
2911        let entry = manifest.file_entries().next().expect("file entry");
2912
2913        let decision = decide_resume(&dest, entry, manifest.chunk_size)
2914            .await
2915            .expect("decision");
2916
2917        assert!(matches!(decision, ResumeDecision::Conflict(_)));
2918    }
2919
2920    #[tokio::test]
2921    async fn resume_decision_rejects_existing_file_larger_than_manifest_entry() {
2922        let temp = tempfile::tempdir().expect("tempdir");
2923        let source = temp.path().join("source.txt");
2924        let dest = temp.path().join("dest");
2925        tokio::fs::create_dir_all(&dest).await.expect("dest dir");
2926        tokio::fs::write(&source, b"small").await.expect("source");
2927        tokio::fs::write(dest.join("source.txt"), b"larger destination")
2928            .await
2929            .expect("larger dest file");
2930        let manifest = build_manifest(&[source]).await.expect("manifest");
2931        let entry = manifest.file_entries().next().expect("file entry");
2932
2933        let decision = decide_resume(&dest, entry, manifest.chunk_size)
2934            .await
2935            .expect("decision");
2936
2937        assert!(matches!(decision, ResumeDecision::Conflict(_)));
2938    }
2939
2940    #[test]
2941    fn identity_is_loaded_after_generation() {
2942        let temp = tempfile::tempdir().expect("tempdir");
2943        let generated =
2944            NativeIdentity::load_or_generate_in(temp.path()).expect("identity generated");
2945        let loaded = NativeIdentity::load_or_generate_in(temp.path()).expect("identity loaded");
2946
2947        assert_eq!(generated.fingerprint(), loaded.fingerprint());
2948    }
2949
2950    #[test]
2951    fn app_config_round_trips_toml() {
2952        let temp = tempfile::tempdir().expect("tempdir");
2953        let path = temp.path().join("config.toml");
2954        let config = AppConfig {
2955            alias: "desk".to_string(),
2956            ..AppConfig::default()
2957        };
2958
2959        config.save_to_path(&path).expect("config saved");
2960        let loaded = AppConfig::load_or_default_from(&path).expect("config loaded");
2961
2962        assert_eq!(loaded.alias, "desk");
2963        assert_eq!(loaded.listen_port, 53317);
2964        assert!(loaded.trust.require_fingerprint);
2965    }
2966
2967    #[test]
2968    fn app_config_redacts_psk_for_display_output() {
2969        let config = AppConfig {
2970            trust: TrustConfig {
2971                psk: "super-secret-psk".to_string(),
2972                ..TrustConfig::default()
2973            },
2974            ..AppConfig::default()
2975        };
2976
2977        let redacted = config.to_redacted_toml_string().expect("config serializes");
2978
2979        assert!(!redacted.contains("super-secret-psk"));
2980        assert!(redacted.contains(r#"psk = "<redacted>""#));
2981    }
2982
2983    #[test]
2984    fn daemon_config_defaults_from_app_config_and_round_trips_toml() {
2985        let temp = tempfile::tempdir().expect("tempdir");
2986        let path = temp.path().join("daemon.toml");
2987        let app_config = AppConfig {
2988            quic_port: 54444,
2989            download_dir: temp.path().join("downloads").display().to_string(),
2990            ..AppConfig::default()
2991        };
2992
2993        let defaulted =
2994            DaemonConfig::load_or_default_from(&path, &app_config).expect("daemon config");
2995        assert_eq!(
2996            defaulted.listen,
2997            SocketAddr::from(([0, 0, 0, 0], app_config.quic_port))
2998        );
2999        assert_eq!(defaulted.destination, temp.path().join("downloads"));
3000
3001        let config = DaemonConfig {
3002            listen: SocketAddr::from(([127, 0, 0, 1], 53318)),
3003            destination: temp.path().join("daemon-dest"),
3004        };
3005        config.save_to_path(&path).expect("daemon config saved");
3006        let loaded =
3007            DaemonConfig::load_or_default_from(&path, &app_config).expect("daemon config loaded");
3008
3009        assert_eq!(loaded, config);
3010    }
3011
3012    #[test]
3013    fn trust_store_load_save_and_forget_round_trip() {
3014        let temp = tempfile::tempdir().expect("tempdir");
3015        let path = temp.path().join("known_peers.toml");
3016        let fingerprint = "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef";
3017        let mut store = TrustStore::default();
3018
3019        store
3020            .trust_fingerprint(fingerprint)
3021            .expect("fingerprint trusted");
3022        store.save_to_path(&path).expect("trust store saved");
3023        let mut loaded = TrustStore::load_or_default_from(&path).expect("trust store loaded");
3024
3025        assert_eq!(loaded.records().count(), 1);
3026        assert_eq!(loaded.peers[fingerprint].trust_state, TrustState::Trusted);
3027        assert!(loaded.forget(fingerprint));
3028        assert_eq!(loaded.records().count(), 0);
3029    }
3030
3031    #[test]
3032    fn trust_store_rejects_short_fingerprint_without_discovery_lookup() {
3033        let mut store = TrustStore::default();
3034
3035        assert!(store.trust_fingerprint("9a4f2c").is_err());
3036    }
3037
3038    #[test]
3039    fn trust_store_pins_direct_address_to_fingerprint() {
3040        let fingerprint = "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef";
3041        let address = SocketAddr::from(([192, 168, 1, 42], 53318));
3042        let mut store = TrustStore::default();
3043
3044        store
3045            .trust_direct_address(fingerprint, address)
3046            .expect("direct address trusted");
3047
3048        assert!(store.is_trusted_fingerprint(fingerprint));
3049        assert_eq!(
3050            store.trusted_fingerprint_for_address(address),
3051            Some(fingerprint)
3052        );
3053        assert!(store.peers[fingerprint].addresses.contains(&address));
3054        assert!(store.peers[fingerprint].transports.contains("quic"));
3055        assert_eq!(store.peers[fingerprint].trust_state, TrustState::Trusted);
3056    }
3057
3058    #[test]
3059    fn transfer_events_serialize_with_stable_event_names() {
3060        let event = TransferEvent::Progress {
3061            direction: TransferDirection::Send,
3062            file_name: "bundle/a.txt".to_string(),
3063            bytes_done: 5,
3064            bytes_total: 9,
3065        };
3066
3067        let json = serde_json::to_value(&event).expect("event serializes");
3068
3069        assert_eq!(json["event"], "progress");
3070        assert_eq!(json["direction"], "send");
3071        assert_eq!(json["file_name"], "bundle/a.txt");
3072        assert_eq!(json["bytes_done"], 5);
3073        assert_eq!(json["bytes_total"], 9);
3074    }
3075
3076    #[test]
3077    fn direct_protocol_hello_has_stable_golden_json() {
3078        let hello = DirectProtocolHello {
3079            protocol_version: 1,
3080            min_protocol_version: 1,
3081            client_fingerprint: "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"
3082                .to_string(),
3083            psk: false,
3084        };
3085
3086        let json = serde_json::to_string(&hello).expect("hello serializes");
3087
3088        assert_eq!(
3089            json,
3090            r#"{"protocol_version":1,"min_protocol_version":1,"client_fingerprint":"0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef","psk":false}"#
3091        );
3092    }
3093
3094    #[test]
3095    fn direct_protocol_response_has_stable_golden_json() {
3096        let response = DirectProtocolResponse::accept(1);
3097
3098        let json = serde_json::to_string(&response).expect("response serializes");
3099
3100        assert_eq!(
3101            json,
3102            r#"{"accepted":true,"protocol_version":1,"psk_challenge":null,"error":null}"#
3103        );
3104    }
3105
3106    #[test]
3107    fn direct_protocol_decodes_pre_psk_negotiation_frames() {
3108        let hello: DirectProtocolHello = serde_json::from_str(
3109            r#"{"protocol_version":1,"min_protocol_version":1,"client_fingerprint":"0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"}"#,
3110        )
3111        .expect("old hello decodes");
3112        assert!(!hello.psk);
3113
3114        let response: DirectProtocolResponse =
3115            serde_json::from_str(r#"{"accepted":true,"protocol_version":1,"error":null}"#)
3116                .expect("old response decodes");
3117        assert_eq!(response, DirectProtocolResponse::accept(1));
3118    }
3119
3120    #[test]
3121    fn direct_protocol_negotiates_supported_overlap() {
3122        let hello = DirectProtocolHello {
3123            protocol_version: 3,
3124            min_protocol_version: 1,
3125            client_fingerprint: "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"
3126                .to_string(),
3127            psk: false,
3128        };
3129
3130        let response = negotiate_direct_protocol(&hello, None);
3131
3132        assert_eq!(response, DirectProtocolResponse::accept(1));
3133    }
3134
3135    #[test]
3136    fn direct_protocol_rejects_incompatible_versions() {
3137        let too_new = DirectProtocolHello {
3138            protocol_version: 3,
3139            min_protocol_version: 2,
3140            client_fingerprint: "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"
3141                .to_string(),
3142            psk: false,
3143        };
3144        let too_old = DirectProtocolHello {
3145            protocol_version: 0,
3146            min_protocol_version: 0,
3147            client_fingerprint: "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"
3148                .to_string(),
3149            psk: false,
3150        };
3151
3152        assert!(!negotiate_direct_protocol(&too_new, None).accepted);
3153        assert!(!negotiate_direct_protocol(&too_old, None).accepted);
3154    }
3155
3156    #[test]
3157    fn direct_protocol_requires_matching_psk_mode() {
3158        let no_psk = DirectProtocolHello {
3159            protocol_version: 1,
3160            min_protocol_version: 1,
3161            client_fingerprint: "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"
3162                .to_string(),
3163            psk: false,
3164        };
3165        let with_psk = DirectProtocolHello {
3166            psk: true,
3167            ..no_psk.clone()
3168        };
3169
3170        assert!(!negotiate_direct_protocol(&no_psk, Some("secret")).accepted);
3171        assert!(!negotiate_direct_protocol(&with_psk, None).accepted);
3172
3173        let accepted = negotiate_direct_protocol(&with_psk, Some("secret"));
3174        assert!(accepted.accepted);
3175        assert!(accepted.psk_challenge.is_some());
3176    }
3177
3178    #[test]
3179    fn transfer_manifest_has_stable_golden_json() {
3180        let manifest = TransferManifest {
3181            version: MANIFEST_VERSION,
3182            session_id: Uuid::parse_str("00000000-0000-0000-0000-000000000001")
3183                .expect("uuid parses"),
3184            chunk_size: DEFAULT_CHUNK_SIZE,
3185            entries: vec![ManifestEntry {
3186                relative_path: PathBuf::from("bundle/a.txt"),
3187                kind: ManifestEntryKind::File,
3188                size: 5,
3189                blake3: Some(
3190                    "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef".to_string(),
3191                ),
3192                chunks: vec![
3193                    "abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789".to_string(),
3194                ],
3195                unix_mode: Some(0o100644),
3196                modified_unix_ms: Some(1_700_000_000_000),
3197            }],
3198        };
3199
3200        let json = serde_json::to_string(&manifest).expect("manifest serializes");
3201
3202        assert_eq!(
3203            json,
3204            r#"{"version":1,"session_id":"00000000-0000-0000-0000-000000000001","chunk_size":1048576,"entries":[{"relative_path":"bundle/a.txt","kind":"file","size":5,"blake3":"0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef","chunks":["abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789"],"unix_mode":33188,"modified_unix_ms":1700000000000}]}"#
3205        );
3206    }
3207
3208    #[test]
3209    fn peer_registry_merges_observations_by_fingerprint() {
3210        let mut registry = PeerRegistry::new();
3211        let fingerprint = "abcdef1234567890";
3212        let first = PeerObservation::new(
3213            fingerprint,
3214            SocketAddr::from(([192, 168, 1, 10], 53318)),
3215            100,
3216        )
3217        .with_alias("desk")
3218        .with_hostname("desk.local")
3219        .with_transport("quic");
3220        let second = PeerObservation::new(
3221            fingerprint,
3222            SocketAddr::from(([192, 168, 1, 11], 53318)),
3223            200,
3224        )
3225        .with_alias("workstation")
3226        .with_transport("quic");
3227
3228        registry.observe(first).expect("first observation");
3229        registry.observe(second).expect("second observation");
3230        let record = registry
3231            .get_by_fingerprint(fingerprint)
3232            .expect("record by fingerprint");
3233
3234        assert_eq!(registry.records().count(), 1);
3235        assert!(record.aliases.contains("desk"));
3236        assert!(record.aliases.contains("workstation"));
3237        assert!(record.hostnames.contains("desk.local"));
3238        assert_eq!(record.addresses.len(), 2);
3239        assert_eq!(record.first_seen_unix_ms, 100);
3240        assert_eq!(record.last_seen_unix_ms, 200);
3241    }
3242
3243    #[test]
3244    fn peer_registry_looks_up_unique_fingerprint_prefix_alias_and_hostname() {
3245        let mut registry = PeerRegistry::new();
3246        registry
3247            .observe(
3248                PeerObservation::new(
3249                    "abcdef1234567890",
3250                    SocketAddr::from(([192, 168, 1, 10], 53318)),
3251                    100,
3252                )
3253                .with_alias("desk")
3254                .with_hostname("desk.local"),
3255            )
3256            .expect("first observation");
3257        registry
3258            .observe(
3259                PeerObservation::new(
3260                    "123456abcdef7890",
3261                    SocketAddr::from(([192, 168, 1, 20], 53318)),
3262                    100,
3263                )
3264                .with_alias("laptop"),
3265            )
3266            .expect("second observation");
3267
3268        assert_eq!(
3269            registry
3270                .lookup("abcdef")
3271                .map(|record| record.fingerprint.as_str()),
3272            Some("abcdef1234567890")
3273        );
3274        assert_eq!(
3275            registry
3276                .lookup("desk")
3277                .map(|record| record.fingerprint.as_str()),
3278            Some("abcdef1234567890")
3279        );
3280        assert_eq!(
3281            registry
3282                .lookup("desk.local")
3283                .map(|record| record.fingerprint.as_str()),
3284            Some("abcdef1234567890")
3285        );
3286        assert!(registry.lookup("missing").is_none());
3287    }
3288
3289    #[test]
3290    fn peer_registry_rejects_ambiguous_lookup_hints() {
3291        let mut registry = PeerRegistry::new();
3292        registry
3293            .observe(
3294                PeerObservation::new(
3295                    "abcdef1234567890",
3296                    SocketAddr::from(([192, 168, 1, 10], 53318)),
3297                    100,
3298                )
3299                .with_alias("desk"),
3300            )
3301            .expect("first observation");
3302        registry
3303            .observe(
3304                PeerObservation::new(
3305                    "abcdef9999999999",
3306                    SocketAddr::from(([192, 168, 1, 11], 53318)),
3307                    100,
3308                )
3309                .with_alias("desk"),
3310            )
3311            .expect("second observation");
3312
3313        assert!(registry.lookup("abcdef").is_none());
3314        assert!(registry.lookup("desk").is_none());
3315        assert_eq!(
3316            registry
3317                .lookup("abcdef1234567890")
3318                .map(|record| record.fingerprint.as_str()),
3319            Some("abcdef1234567890")
3320        );
3321        match registry.lookup_detail("desk") {
3322            PeerLookup::Ambiguous(records) => assert_eq!(records.len(), 2),
3323            other => panic!("expected ambiguous duplicate-alias lookup, got {other:?}"),
3324        }
3325        assert!(matches!(
3326            registry.lookup_detail("missing"),
3327            PeerLookup::Missing
3328        ));
3329    }
3330
3331    #[test]
3332    fn native_announcement_builds_expected_txt_properties() {
3333        let announcement = NativeAnnouncement {
3334            alias: "Stephen MBP".to_string(),
3335            fingerprint: "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"
3336                .to_string(),
3337            quic_port: 53318,
3338            listen_port: Some(53317),
3339        };
3340
3341        let service = announcement.service_info().expect("service info");
3342
3343        assert_eq!(service.get_type(), NATIVE_SERVICE_TYPE);
3344        assert!(
3345            service
3346                .get_fullname()
3347                .starts_with("stephen-mbp-0123456789ab.")
3348        );
3349        assert_eq!(service.get_property_val_str("pv"), Some("1"));
3350        assert_eq!(service.get_property_val_str("minpv"), Some("1"));
3351        assert_eq!(
3352            service.get_property_val_str("fp"),
3353            Some("0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef")
3354        );
3355        assert_eq!(service.get_property_val_str("alias"), Some("Stephen MBP"));
3356        assert_eq!(service.get_property_val_str("transports"), Some("quic"));
3357        assert_eq!(service.get_property_val_str("quic_port"), Some("53318"));
3358        assert_eq!(service.get_property_val_str("listen_port"), Some("53317"));
3359    }
3360
3361    #[test]
3362    fn resolved_native_service_merges_into_registry() {
3363        let properties = [
3364            ("pv", "1"),
3365            ("minpv", "1"),
3366            (
3367                "fp",
3368                "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef",
3369            ),
3370            ("alias", "desk"),
3371            ("transports", "quic"),
3372            ("quic_port", "53318"),
3373        ];
3374        let service = ServiceInfo::new(
3375            NATIVE_SERVICE_TYPE,
3376            "desk-0123456789ab",
3377            "desk.local.",
3378            "192.168.1.42",
3379            53318,
3380            &properties[..],
3381        )
3382        .expect("service info")
3383        .as_resolved_service();
3384        let mut registry = PeerRegistry::new();
3385
3386        observe_resolved_service(&mut registry, &service).expect("observation");
3387
3388        let record = registry.lookup("desk").expect("record by alias");
3389        assert_eq!(
3390            record.fingerprint,
3391            "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"
3392        );
3393        assert_eq!(
3394            record.preferred_quic_address(),
3395            Some(SocketAddr::from(([192, 168, 1, 42], 53318)))
3396        );
3397    }
3398
3399    #[test]
3400    fn resolved_native_service_ignores_incompatible_protocol_ranges() {
3401        let properties = [
3402            ("pv", "3"),
3403            ("minpv", "2"),
3404            (
3405                "fp",
3406                "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef",
3407            ),
3408            ("alias", "desk"),
3409            ("transports", "quic"),
3410            ("quic_port", "53318"),
3411        ];
3412        let service = ServiceInfo::new(
3413            NATIVE_SERVICE_TYPE,
3414            "desk-0123456789ab",
3415            "desk.local.",
3416            "192.168.1.42",
3417            53318,
3418            &properties[..],
3419        )
3420        .expect("service info")
3421        .as_resolved_service();
3422        let mut registry = PeerRegistry::new();
3423
3424        observe_resolved_service(&mut registry, &service).expect("observation");
3425
3426        assert_eq!(registry.records().count(), 0);
3427    }
3428
3429    #[tokio::test]
3430    async fn direct_quic_loopback_sends_one_file() {
3431        let source_dir = tempfile::tempdir().expect("source tempdir");
3432        let dest_dir = tempfile::tempdir().expect("dest tempdir");
3433        let identity_dir = tempfile::tempdir().expect("identity tempdir");
3434        let file_path = source_dir.path().join("hello.txt");
3435        tokio::fs::write(&file_path, b"hello from ferry")
3436            .await
3437            .expect("source file written");
3438
3439        let identity =
3440            NativeIdentity::load_or_generate_in(identity_dir.path()).expect("identity generated");
3441        let recv_identity = identity.clone();
3442        let reserved_socket =
3443            std::net::UdpSocket::bind(SocketAddr::from(([127, 0, 0, 1], 0))).expect("udp bind");
3444        let recv_addr = reserved_socket.local_addr().expect("local addr");
3445        drop(reserved_socket);
3446        let receive_progress = Arc::new(Mutex::new(Vec::new()));
3447        let receive_progress_log = receive_progress.clone();
3448        let dest_path = dest_dir.path().to_path_buf();
3449        let server = tokio::spawn(async move {
3450            receive_direct_file(
3451                &ReceiveRequest::new(recv_addr),
3452                dest_path,
3453                &recv_identity,
3454                |event| {
3455                    receive_progress_log
3456                        .lock()
3457                        .expect("progress lock")
3458                        .push(event);
3459                },
3460            )
3461            .await
3462        });
3463
3464        tokio::time::sleep(Duration::from_millis(100)).await;
3465
3466        let peer = DirectPeer::parse(&recv_addr.to_string()).expect("peer parses");
3467        let send_request = SendRequest::new(peer, vec![file_path]).expect("send request is valid");
3468        let send_progress = Arc::new(Mutex::new(Vec::new()));
3469        let send_progress_log = send_progress.clone();
3470        let send_summary = send_direct_file(&send_request, &identity, |event| {
3471            send_progress_log.lock().expect("progress lock").push(event);
3472        })
3473        .await
3474        .expect("file sent");
3475        let receive_summary = server.await.expect("server joined").expect("file received");
3476
3477        let received = tokio::fs::read(dest_dir.path().join("hello.txt"))
3478            .await
3479            .expect("received file readable");
3480        assert_eq!(received, b"hello from ferry");
3481        assert_eq!(send_summary.bytes, receive_summary.bytes);
3482        assert_eq!(send_summary.blake3, receive_summary.blake3);
3483        let send_progress = send_progress.lock().expect("progress lock");
3484        let receive_progress = receive_progress.lock().expect("progress lock");
3485        assert!(matches!(
3486            send_progress.first(),
3487            Some(TransferEvent::SessionStarted {
3488                direction: TransferDirection::Send,
3489                ..
3490            })
3491        ));
3492        assert!(
3493            send_progress
3494                .iter()
3495                .any(|event| matches!(event, TransferEvent::Progress { .. }))
3496        );
3497        assert!(
3498            send_progress
3499                .iter()
3500                .any(|event| matches!(event, TransferEvent::SessionFinished { .. }))
3501        );
3502        assert!(matches!(
3503            receive_progress.first(),
3504            Some(TransferEvent::SessionStarted {
3505                direction: TransferDirection::Receive,
3506                ..
3507            })
3508        ));
3509        assert!(
3510            receive_progress
3511                .iter()
3512                .any(|event| matches!(event, TransferEvent::Progress { .. }))
3513        );
3514        assert!(
3515            receive_progress
3516                .iter()
3517                .any(|event| matches!(event, TransferEvent::SessionFinished { .. }))
3518        );
3519    }
3520
3521    #[tokio::test]
3522    async fn direct_transfer_accepts_expected_receiver_fingerprint() {
3523        let source_dir = tempfile::tempdir().expect("source tempdir");
3524        let dest_dir = tempfile::tempdir().expect("dest tempdir");
3525        let sender_identity_dir = tempfile::tempdir().expect("sender identity tempdir");
3526        let receiver_identity_dir = tempfile::tempdir().expect("receiver identity tempdir");
3527        let file_path = source_dir.path().join("hello.txt");
3528        tokio::fs::write(&file_path, b"hello from ferry")
3529            .await
3530            .expect("source file written");
3531
3532        let sender_identity = NativeIdentity::load_or_generate_in(sender_identity_dir.path())
3533            .expect("sender identity");
3534        let receiver_identity = NativeIdentity::load_or_generate_in(receiver_identity_dir.path())
3535            .expect("receiver identity");
3536        let expected_fingerprint = receiver_identity.fingerprint().to_string();
3537        let reserved_socket =
3538            std::net::UdpSocket::bind(SocketAddr::from(([127, 0, 0, 1], 0))).expect("udp bind");
3539        let recv_addr = reserved_socket.local_addr().expect("local addr");
3540        drop(reserved_socket);
3541        let dest_path = dest_dir.path().to_path_buf();
3542        let server = tokio::spawn(async move {
3543            receive_direct_file(
3544                &ReceiveRequest::new(recv_addr),
3545                dest_path,
3546                &receiver_identity,
3547                |_| {},
3548            )
3549            .await
3550        });
3551
3552        tokio::time::sleep(Duration::from_millis(100)).await;
3553
3554        let peer = DirectPeer::from_address(recv_addr)
3555            .with_expected_fingerprint(expected_fingerprint)
3556            .expect("expected fingerprint accepted");
3557        let send_request = SendRequest::new(peer, vec![file_path]).expect("send request is valid");
3558        let send_summary = send_direct_file(&send_request, &sender_identity, |_| {})
3559            .await
3560            .expect("file sent");
3561        let receive_summary = server.await.expect("server joined").expect("file received");
3562
3563        let received = tokio::fs::read(dest_dir.path().join("hello.txt"))
3564            .await
3565            .expect("received file readable");
3566        assert_eq!(received, b"hello from ferry");
3567        assert_eq!(send_summary.bytes, receive_summary.bytes);
3568        assert_eq!(send_summary.blake3, receive_summary.blake3);
3569    }
3570
3571    #[tokio::test]
3572    async fn direct_transfer_confirms_unpinned_receiver_before_payload() {
3573        let source_dir = tempfile::tempdir().expect("source tempdir");
3574        let dest_dir = tempfile::tempdir().expect("dest tempdir");
3575        let sender_identity_dir = tempfile::tempdir().expect("sender identity tempdir");
3576        let receiver_identity_dir = tempfile::tempdir().expect("receiver identity tempdir");
3577        let file_path = source_dir.path().join("hello.txt");
3578        tokio::fs::write(&file_path, b"hello from ferry")
3579            .await
3580            .expect("source file written");
3581
3582        let sender_identity = NativeIdentity::load_or_generate_in(sender_identity_dir.path())
3583            .expect("sender identity");
3584        let receiver_identity = NativeIdentity::load_or_generate_in(receiver_identity_dir.path())
3585            .expect("receiver identity");
3586        let expected_fingerprint = receiver_identity.fingerprint().to_string();
3587        let reserved_socket =
3588            std::net::UdpSocket::bind(SocketAddr::from(([127, 0, 0, 1], 0))).expect("udp bind");
3589        let recv_addr = reserved_socket.local_addr().expect("local addr");
3590        drop(reserved_socket);
3591        let dest_path = dest_dir.path().to_path_buf();
3592        let server = tokio::spawn(async move {
3593            receive_direct_file(
3594                &ReceiveRequest::new(recv_addr),
3595                dest_path,
3596                &receiver_identity,
3597                |_| {},
3598            )
3599            .await
3600        });
3601
3602        tokio::time::sleep(Duration::from_millis(100)).await;
3603
3604        let peer = DirectPeer::from_address(recv_addr);
3605        let send_request = SendRequest::new(peer, vec![file_path]).expect("send request is valid");
3606        let callback_seen = Arc::new(AtomicBool::new(false));
3607        let callback_seen_send = callback_seen.clone();
3608        let send_summary = send_direct_file_with_control_and_trust(
3609            &send_request,
3610            &sender_identity,
3611            TransferControl::new(),
3612            |_| {},
3613            |fingerprint| {
3614                callback_seen_send.store(true, Ordering::SeqCst);
3615                assert_eq!(fingerprint, expected_fingerprint);
3616                Ok(())
3617            },
3618        )
3619        .await
3620        .expect("file sent after trust callback");
3621        let receive_summary = server.await.expect("server joined").expect("file received");
3622
3623        assert!(callback_seen.load(Ordering::SeqCst));
3624        assert_eq!(send_summary.bytes, receive_summary.bytes);
3625        assert_eq!(send_summary.blake3, receive_summary.blake3);
3626    }
3627
3628    #[tokio::test]
3629    async fn direct_transfer_rejects_unexpected_receiver_fingerprint_before_payload() {
3630        let source_dir = tempfile::tempdir().expect("source tempdir");
3631        let dest_dir = tempfile::tempdir().expect("dest tempdir");
3632        let sender_identity_dir = tempfile::tempdir().expect("sender identity tempdir");
3633        let receiver_identity_dir = tempfile::tempdir().expect("receiver identity tempdir");
3634        let other_identity_dir = tempfile::tempdir().expect("other identity tempdir");
3635        let file_path = source_dir.path().join("hello.txt");
3636        tokio::fs::write(&file_path, b"hello from ferry")
3637            .await
3638            .expect("source file written");
3639
3640        let sender_identity = NativeIdentity::load_or_generate_in(sender_identity_dir.path())
3641            .expect("sender identity");
3642        let receiver_identity = NativeIdentity::load_or_generate_in(receiver_identity_dir.path())
3643            .expect("receiver identity");
3644        let other_identity =
3645            NativeIdentity::load_or_generate_in(other_identity_dir.path()).expect("other identity");
3646        let reserved_socket =
3647            std::net::UdpSocket::bind(SocketAddr::from(([127, 0, 0, 1], 0))).expect("udp bind");
3648        let recv_addr = reserved_socket.local_addr().expect("local addr");
3649        drop(reserved_socket);
3650        let dest_path = dest_dir.path().to_path_buf();
3651        let server = tokio::spawn(async move {
3652            receive_direct_file(
3653                &ReceiveRequest::new(recv_addr),
3654                dest_path,
3655                &receiver_identity,
3656                |_| {},
3657            )
3658            .await
3659        });
3660
3661        tokio::time::sleep(Duration::from_millis(100)).await;
3662
3663        let peer = DirectPeer::from_address(recv_addr)
3664            .with_expected_fingerprint(other_identity.fingerprint().to_string())
3665            .expect("expected fingerprint accepted");
3666        let send_request = SendRequest::new(peer, vec![file_path]).expect("send request is valid");
3667        let error = send_direct_file(&send_request, &sender_identity, |_| {})
3668            .await
3669            .expect_err("fingerprint mismatch should reject send");
3670
3671        assert!(matches!(error, Error::PeerFingerprintMismatch { .. }));
3672        let server_error = server
3673            .await
3674            .expect("server joined")
3675            .expect_err("server closed");
3676        assert!(matches!(
3677            server_error,
3678            Error::Connection(_) | Error::NoIncomingConnection
3679        ));
3680        assert!(
3681            !tokio::fs::try_exists(dest_dir.path().join("hello.txt"))
3682                .await
3683                .expect("destination checked")
3684        );
3685    }
3686
3687    #[tokio::test]
3688    async fn direct_transfer_accepts_matching_psk_before_payload() {
3689        let source_dir = tempfile::tempdir().expect("source tempdir");
3690        let dest_dir = tempfile::tempdir().expect("dest tempdir");
3691        let identity_dir = tempfile::tempdir().expect("identity tempdir");
3692        let file_path = source_dir.path().join("hello.txt");
3693        tokio::fs::write(&file_path, b"hello from ferry")
3694            .await
3695            .expect("source file written");
3696
3697        let identity =
3698            NativeIdentity::load_or_generate_in(identity_dir.path()).expect("identity generated");
3699        let recv_identity = identity.clone();
3700        let reserved_socket =
3701            std::net::UdpSocket::bind(SocketAddr::from(([127, 0, 0, 1], 0))).expect("udp bind");
3702        let recv_addr = reserved_socket.local_addr().expect("local addr");
3703        drop(reserved_socket);
3704        let dest_path = dest_dir.path().to_path_buf();
3705        let receive_request = ReceiveRequest::new(recv_addr)
3706            .with_psk("correct horse battery staple")
3707            .expect("receiver psk accepted");
3708        let server = tokio::spawn(async move {
3709            receive_direct_file(&receive_request, dest_path, &recv_identity, |_| {}).await
3710        });
3711
3712        tokio::time::sleep(Duration::from_millis(100)).await;
3713
3714        let peer = DirectPeer::parse(&recv_addr.to_string()).expect("peer parses");
3715        let send_request = SendRequest::new(peer, vec![file_path])
3716            .expect("send request is valid")
3717            .with_psk("correct horse battery staple")
3718            .expect("sender psk accepted");
3719        let send_summary = send_direct_file(&send_request, &identity, |_| {})
3720            .await
3721            .expect("file sent");
3722        let receive_summary = server.await.expect("server joined").expect("file received");
3723
3724        let received = tokio::fs::read(dest_dir.path().join("hello.txt"))
3725            .await
3726            .expect("received file readable");
3727        assert_eq!(received, b"hello from ferry");
3728        assert_eq!(send_summary.bytes, receive_summary.bytes);
3729    }
3730
3731    #[tokio::test]
3732    async fn direct_transfer_rejects_missing_psk_before_payload() {
3733        let source_dir = tempfile::tempdir().expect("source tempdir");
3734        let dest_dir = tempfile::tempdir().expect("dest tempdir");
3735        let identity_dir = tempfile::tempdir().expect("identity tempdir");
3736        let file_path = source_dir.path().join("hello.txt");
3737        tokio::fs::write(&file_path, b"hello from ferry")
3738            .await
3739            .expect("source file written");
3740
3741        let identity =
3742            NativeIdentity::load_or_generate_in(identity_dir.path()).expect("identity generated");
3743        let recv_identity = identity.clone();
3744        let reserved_socket =
3745            std::net::UdpSocket::bind(SocketAddr::from(([127, 0, 0, 1], 0))).expect("udp bind");
3746        let recv_addr = reserved_socket.local_addr().expect("local addr");
3747        drop(reserved_socket);
3748        let dest_path = dest_dir.path().to_path_buf();
3749        let receive_request = ReceiveRequest::new(recv_addr)
3750            .with_psk("receiver secret")
3751            .expect("receiver psk accepted");
3752        let server = tokio::spawn(async move {
3753            receive_direct_file(&receive_request, dest_path, &recv_identity, |_| {}).await
3754        });
3755
3756        tokio::time::sleep(Duration::from_millis(100)).await;
3757
3758        let peer = DirectPeer::parse(&recv_addr.to_string()).expect("peer parses");
3759        let send_request = SendRequest::new(peer, vec![file_path]).expect("send request is valid");
3760        let error = send_direct_file(&send_request, &identity, |_| {})
3761            .await
3762            .expect_err("missing psk should reject send");
3763
3764        assert!(matches!(error, Error::Protocol(_)));
3765        let _ = server.await.expect("server joined");
3766        assert!(
3767            !tokio::fs::try_exists(dest_dir.path().join("hello.txt"))
3768                .await
3769                .expect("destination checked")
3770        );
3771    }
3772
3773    #[tokio::test]
3774    async fn direct_transfer_rejects_wrong_psk_before_payload() {
3775        let source_dir = tempfile::tempdir().expect("source tempdir");
3776        let dest_dir = tempfile::tempdir().expect("dest tempdir");
3777        let identity_dir = tempfile::tempdir().expect("identity tempdir");
3778        let file_path = source_dir.path().join("hello.txt");
3779        tokio::fs::write(&file_path, b"hello from ferry")
3780            .await
3781            .expect("source file written");
3782
3783        let identity =
3784            NativeIdentity::load_or_generate_in(identity_dir.path()).expect("identity generated");
3785        let recv_identity = identity.clone();
3786        let reserved_socket =
3787            std::net::UdpSocket::bind(SocketAddr::from(([127, 0, 0, 1], 0))).expect("udp bind");
3788        let recv_addr = reserved_socket.local_addr().expect("local addr");
3789        drop(reserved_socket);
3790        let dest_path = dest_dir.path().to_path_buf();
3791        let receive_request = ReceiveRequest::new(recv_addr)
3792            .with_psk("receiver secret")
3793            .expect("receiver psk accepted");
3794        let server = tokio::spawn(async move {
3795            receive_direct_file(&receive_request, dest_path, &recv_identity, |_| {}).await
3796        });
3797
3798        tokio::time::sleep(Duration::from_millis(100)).await;
3799
3800        let peer = DirectPeer::parse(&recv_addr.to_string()).expect("peer parses");
3801        let send_request = SendRequest::new(peer, vec![file_path])
3802            .expect("send request is valid")
3803            .with_psk("wrong secret")
3804            .expect("sender psk accepted");
3805        let error = send_direct_file(&send_request, &identity, |_| {})
3806            .await
3807            .expect_err("wrong psk should reject send");
3808
3809        assert!(matches!(error, Error::Authentication(_)));
3810        assert!(!error.to_string().contains("wrong secret"));
3811        assert!(!error.to_string().contains("receiver secret"));
3812        let server_error = server
3813            .await
3814            .expect("server joined")
3815            .expect_err("server should reject wrong psk");
3816        assert!(matches!(
3817            server_error,
3818            Error::Authentication(_) | Error::Write(_)
3819        ));
3820        assert!(
3821            !tokio::fs::try_exists(dest_dir.path().join("hello.txt"))
3822                .await
3823                .expect("destination checked")
3824        );
3825    }
3826
3827    #[tokio::test]
3828    async fn direct_quic_loopback_sends_directory_and_resumes_partial_file() {
3829        let source_dir = tempfile::tempdir().expect("source tempdir");
3830        let dest_dir = tempfile::tempdir().expect("dest tempdir");
3831        let identity_dir = tempfile::tempdir().expect("identity tempdir");
3832        let bundle = source_dir.path().join("bundle");
3833        let nested = bundle.join("nested");
3834        tokio::fs::create_dir_all(&nested)
3835            .await
3836            .expect("source dirs");
3837        tokio::fs::write(nested.join("small.txt"), b"small")
3838            .await
3839            .expect("small file");
3840        let large_bytes = vec![7; (DEFAULT_CHUNK_SIZE as usize * 2) + 17];
3841        tokio::fs::write(bundle.join("large.bin"), &large_bytes)
3842            .await
3843            .expect("large file");
3844
3845        let dest_bundle = dest_dir.path().join("bundle");
3846        tokio::fs::create_dir_all(&dest_bundle)
3847            .await
3848            .expect("dest bundle");
3849        tokio::fs::write(
3850            dest_bundle.join("large.bin"),
3851            &large_bytes[..DEFAULT_CHUNK_SIZE as usize + 5],
3852        )
3853        .await
3854        .expect("partial large");
3855
3856        let identity =
3857            NativeIdentity::load_or_generate_in(identity_dir.path()).expect("identity generated");
3858        let recv_identity = identity.clone();
3859        let reserved_socket =
3860            std::net::UdpSocket::bind(SocketAddr::from(([127, 0, 0, 1], 0))).expect("udp bind");
3861        let recv_addr = reserved_socket.local_addr().expect("local addr");
3862        drop(reserved_socket);
3863        let dest_path = dest_dir.path().to_path_buf();
3864        let server = tokio::spawn(async move {
3865            receive_direct_file(
3866                &ReceiveRequest::new(recv_addr),
3867                dest_path,
3868                &recv_identity,
3869                |_| {},
3870            )
3871            .await
3872        });
3873
3874        tokio::time::sleep(Duration::from_millis(100)).await;
3875
3876        let peer = DirectPeer::parse(&recv_addr.to_string()).expect("peer parses");
3877        let send_request = SendRequest::new(peer, vec![bundle]).expect("send request is valid");
3878        let send_summary = send_direct_file(&send_request, &identity, |_| {})
3879            .await
3880            .expect("directory sent");
3881        let receive_summary = server
3882            .await
3883            .expect("server joined")
3884            .expect("directory received");
3885
3886        let received_large = tokio::fs::read(dest_bundle.join("large.bin"))
3887            .await
3888            .expect("large file readable");
3889        let received_small = tokio::fs::read(dest_bundle.join("nested/small.txt"))
3890            .await
3891            .expect("small file readable");
3892        assert_eq!(received_large, large_bytes);
3893        assert_eq!(received_small, b"small");
3894        assert_eq!(send_summary.bytes, receive_summary.bytes);
3895        assert!(
3896            receive_summary
3897                .files
3898                .iter()
3899                .any(|file| file.status == TransferFileStatus::Resumed)
3900        );
3901    }
3902
3903    #[tokio::test]
3904    async fn direct_quic_loopback_cancels_send_and_does_not_finalize_partial_file() {
3905        let source_dir = tempfile::tempdir().expect("source tempdir");
3906        let dest_dir = tempfile::tempdir().expect("dest tempdir");
3907        let identity_dir = tempfile::tempdir().expect("identity tempdir");
3908        let file_path = source_dir.path().join("payload.bin");
3909        tokio::fs::write(&file_path, vec![3; IO_BUFFER_SIZE * 64])
3910            .await
3911            .expect("source file written");
3912
3913        let identity =
3914            NativeIdentity::load_or_generate_in(identity_dir.path()).expect("identity generated");
3915        let recv_identity = identity.clone();
3916        let reserved_socket =
3917            std::net::UdpSocket::bind(SocketAddr::from(([127, 0, 0, 1], 0))).expect("udp bind");
3918        let recv_addr = reserved_socket.local_addr().expect("local addr");
3919        drop(reserved_socket);
3920
3921        let dest_path = dest_dir.path().to_path_buf();
3922        let server = tokio::spawn(async move {
3923            receive_direct_file(
3924                &ReceiveRequest::new(recv_addr),
3925                dest_path,
3926                &recv_identity,
3927                |_| {},
3928            )
3929            .await
3930        });
3931
3932        tokio::time::sleep(Duration::from_millis(100)).await;
3933
3934        let peer = DirectPeer::parse(&recv_addr.to_string()).expect("peer parses");
3935        let send_request =
3936            SendRequest::new(peer, vec![file_path.clone()]).expect("send request is valid");
3937        let send_control = TransferControl::new();
3938        let cancel_on_progress = send_control.clone();
3939        let send_events = Arc::new(Mutex::new(Vec::new()));
3940        let send_events_log = send_events.clone();
3941        let send_result =
3942            send_direct_file_with_control(&send_request, &identity, send_control, |event| {
3943                if matches!(event, TransferEvent::Progress { .. }) {
3944                    cancel_on_progress.cancel();
3945                }
3946                send_events_log.lock().expect("events lock").push(event);
3947            })
3948            .await;
3949
3950        assert!(matches!(send_result, Err(Error::TransferCancelled)));
3951
3952        let receive_result = tokio::time::timeout(Duration::from_secs(5), server)
3953            .await
3954            .expect("receiver should finish after sender cancellation")
3955            .expect("server joined");
3956        assert!(receive_result.is_err());
3957
3958        assert!(!dest_dir.path().join("payload.bin").exists());
3959        assert!(
3960            send_events
3961                .lock()
3962                .expect("events lock")
3963                .iter()
3964                .any(|event| matches!(event, TransferEvent::SessionCancelled { .. }))
3965        );
3966    }
3967}