Skip to main content

ferry_core/
lib.rs

1pub mod error;
2
3use std::collections::{BTreeMap, BTreeSet};
4use std::fs::Metadata;
5use std::io::SeekFrom;
6use std::net::SocketAddr;
7use std::path::{Component, Path, PathBuf};
8use std::sync::atomic::{AtomicBool, Ordering};
9use std::sync::{Arc, Mutex};
10use std::time::{Duration, SystemTime, UNIX_EPOCH};
11
12use directories::BaseDirs;
13use mdns_sd::{ResolvedService, ServiceDaemon, ServiceEvent, ServiceInfo};
14use quinn::crypto::rustls::QuicClientConfig;
15use rcgen::{CertifiedKey, generate_simple_self_signed};
16use rustls::client::danger::{HandshakeSignatureValid, ServerCertVerified, ServerCertVerifier};
17use rustls::crypto::{CryptoProvider, verify_tls12_signature, verify_tls13_signature};
18use rustls::pki_types::{CertificateDer, PrivateKeyDer, PrivatePkcs8KeyDer, ServerName, UnixTime};
19use rustls::{DigitallySignedStruct, SignatureScheme};
20use serde::{Deserialize, Serialize};
21use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt};
22use uuid::Uuid;
23
24pub use error::{Error, Result};
25
/// Protocol version this build announces (sent as the `pv` TXT property).
pub const NATIVE_PROTOCOL_VERSION: u16 = 1;
/// Lowest protocol version this build announces support for (sent as the `minpv` TXT property).
pub const MIN_NATIVE_PROTOCOL_VERSION: u16 = 1;
/// mDNS service type used both for registering this node and for browsing peers.
pub const NATIVE_SERVICE_TYPE: &str = "_ferry._udp.local.";

// NOTE(review): the constants below are not referenced in the visible part of
// this file; the descriptions are inferred from names and values — confirm
// against their use sites.
// Presumably the magic prefix that opens a direct-transfer stream.
const DIRECT_MAGIC: &[u8; 8] = b"FERRY01\n";
// 16 MiB; presumably the upper bound accepted for a single framed message.
const MAX_FRAME_LEN: usize = 16 * 1024 * 1024;
// 64 KiB; presumably the buffer size for file/stream copies.
const IO_BUFFER_SIZE: usize = 64 * 1024;
// Presumably the value written to `TransferManifest::version`.
const MANIFEST_VERSION: u16 = 1;
// 1 MiB; presumably the default for `TransferManifest::chunk_size`.
const DEFAULT_CHUNK_SIZE: u64 = 1024 * 1024;
// Presumably the directory name that `config_dir()` appends to the base config path.
const CONFIG_DIR_NAME: &str = "ferry";
// Presumably a domain-separation context for the PSK proof computation.
const PSK_PROOF_CONTEXT: &[u8] = b"fileferry-direct-psk-v1";
37
38#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
39pub struct DirectPeer {
40    address: SocketAddr,
41    expected_fingerprint: Option<String>,
42}
43
44impl DirectPeer {
45    pub fn parse(value: &str) -> Result<Self> {
46        Ok(Self {
47            address: value.parse()?,
48            expected_fingerprint: None,
49        })
50    }
51
52    pub const fn address(&self) -> SocketAddr {
53        self.address
54    }
55
56    pub const fn from_address(address: SocketAddr) -> Self {
57        Self {
58            address,
59            expected_fingerprint: None,
60        }
61    }
62
63    pub fn with_expected_fingerprint(mut self, fingerprint: impl Into<String>) -> Result<Self> {
64        self.expected_fingerprint = Some(normalized_fingerprint(fingerprint.into())?);
65        Ok(self)
66    }
67
68    pub fn expected_fingerprint(&self) -> Option<&str> {
69        self.expected_fingerprint.as_deref()
70    }
71}
72
73#[derive(Clone, PartialEq, Eq, Deserialize)]
74pub struct PskSecret(String);
75
76impl PskSecret {
77    pub fn new(psk: impl Into<String>) -> Result<Self> {
78        validate_psk(psk.into()).map(Self)
79    }
80
81    fn expose_secret(&self) -> &str {
82        &self.0
83    }
84}
85
86impl std::fmt::Debug for PskSecret {
87    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
88        formatter.write_str("PskSecret(<redacted>)")
89    }
90}
91
92impl Serialize for PskSecret {
93    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
94    where
95        S: serde::Serializer,
96    {
97        serializer.serialize_str("<redacted>")
98    }
99}
100
101#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
102pub struct SendRequest {
103    peer: DirectPeer,
104    paths: Vec<PathBuf>,
105    #[serde(default, skip_serializing)]
106    psk: Option<PskSecret>,
107}
108
109impl SendRequest {
110    pub fn new(peer: DirectPeer, paths: Vec<PathBuf>) -> Result<Self> {
111        if paths.is_empty() {
112            return Err(Error::InvalidInput(
113                "at least one file path is required".to_string(),
114            ));
115        }
116
117        Ok(Self {
118            peer,
119            paths,
120            psk: None,
121        })
122    }
123
124    pub const fn peer(&self) -> &DirectPeer {
125        &self.peer
126    }
127
128    pub fn paths(&self) -> &[PathBuf] {
129        &self.paths
130    }
131
132    pub fn with_psk(mut self, psk: impl Into<String>) -> Result<Self> {
133        self.psk = Some(PskSecret::new(psk)?);
134        Ok(self)
135    }
136
137    pub fn psk(&self) -> Option<&str> {
138        self.psk.as_ref().map(PskSecret::expose_secret)
139    }
140}
141
142#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
143pub struct ReceiveRequest {
144    listen: SocketAddr,
145    #[serde(default, skip_serializing)]
146    psk: Option<PskSecret>,
147}
148
149impl ReceiveRequest {
150    pub fn new(listen: SocketAddr) -> Self {
151        Self { listen, psk: None }
152    }
153
154    pub const fn listen(&self) -> SocketAddr {
155        self.listen
156    }
157
158    pub fn with_psk(mut self, psk: impl Into<String>) -> Result<Self> {
159        self.psk = Some(PskSecret::new(psk)?);
160        Ok(self)
161    }
162
163    pub fn psk(&self) -> Option<&str> {
164        self.psk.as_ref().map(PskSecret::expose_secret)
165    }
166}
167
168#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
169pub struct AppConfig {
170    pub alias: String,
171    pub listen_port: u16,
172    pub quic_port: u16,
173    pub download_dir: String,
174    pub auto_accept_known: bool,
175    pub auto_accept_unknown: bool,
176    pub discovery: Vec<String>,
177    pub max_concurrent_files: usize,
178    pub trust: TrustConfig,
179}
180
181impl Default for AppConfig {
182    fn default() -> Self {
183        Self {
184            alias: default_alias(),
185            listen_port: 53317,
186            quic_port: 53318,
187            download_dir: "~/Downloads/ferry".to_string(),
188            auto_accept_known: true,
189            auto_accept_unknown: false,
190            discovery: vec!["native".to_string()],
191            max_concurrent_files: 8,
192            trust: TrustConfig::default(),
193        }
194    }
195}
196
197impl AppConfig {
198    pub fn load_or_default() -> Result<Self> {
199        Self::load_or_default_from(config_dir()?.join("config.toml"))
200    }
201
202    pub fn load_or_default_from(path: impl AsRef<Path>) -> Result<Self> {
203        let path = path.as_ref();
204        match std::fs::read_to_string(path) {
205            Ok(contents) => {
206                toml::from_str(&contents).map_err(|error| Error::ConfigParse(error.to_string()))
207            }
208            Err(error) if error.kind() == std::io::ErrorKind::NotFound => Ok(Self::default()),
209            Err(source) => Err(Error::IoPath {
210                path: path.to_path_buf(),
211                source,
212            }),
213        }
214    }
215
216    pub fn save_default_path(&self) -> Result<()> {
217        self.save_to_path(config_dir()?.join("config.toml"))
218    }
219
220    pub fn save_to_path(&self, path: impl AsRef<Path>) -> Result<()> {
221        write_toml_file(path.as_ref(), self)
222    }
223
224    pub fn to_toml_string(&self) -> Result<String> {
225        toml::to_string_pretty(self).map_err(|error| Error::ConfigSerialize(error.to_string()))
226    }
227
228    pub fn redacted(&self) -> Self {
229        let mut config = self.clone();
230        config.trust = config.trust.redacted();
231        config
232    }
233
234    pub fn to_redacted_toml_string(&self) -> Result<String> {
235        self.redacted().to_toml_string()
236    }
237}
238
239#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
240pub struct DaemonConfig {
241    pub listen: SocketAddr,
242    pub destination: PathBuf,
243}
244
245impl DaemonConfig {
246    pub fn from_app_config(config: &AppConfig) -> Result<Self> {
247        Ok(Self {
248            listen: SocketAddr::from(([0, 0, 0, 0], config.quic_port)),
249            destination: expand_home_path(&config.download_dir)?,
250        })
251    }
252
253    pub fn load_or_default() -> Result<Self> {
254        let app_config = AppConfig::load_or_default()?;
255        Self::load_or_default_from(config_dir()?.join("daemon.toml"), &app_config)
256    }
257
258    pub fn load_or_default_from(path: impl AsRef<Path>, app_config: &AppConfig) -> Result<Self> {
259        let path = path.as_ref();
260        match std::fs::read_to_string(path) {
261            Ok(contents) => {
262                toml::from_str(&contents).map_err(|error| Error::ConfigParse(error.to_string()))
263            }
264            Err(error) if error.kind() == std::io::ErrorKind::NotFound => {
265                Self::from_app_config(app_config)
266            }
267            Err(source) => Err(Error::IoPath {
268                path: path.to_path_buf(),
269                source,
270            }),
271        }
272    }
273
274    pub fn save_default_path(&self) -> Result<()> {
275        self.save_to_path(config_dir()?.join("daemon.toml"))
276    }
277
278    pub fn save_to_path(&self, path: impl AsRef<Path>) -> Result<()> {
279        write_toml_file(path.as_ref(), self)
280    }
281
282    pub fn to_toml_string(&self) -> Result<String> {
283        toml::to_string_pretty(self).map_err(|error| Error::ConfigSerialize(error.to_string()))
284    }
285}
286
287#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
288pub struct TrustConfig {
289    pub require_fingerprint: bool,
290    pub psk: String,
291}
292
293impl Default for TrustConfig {
294    fn default() -> Self {
295        Self {
296            require_fingerprint: true,
297            psk: String::new(),
298        }
299    }
300}
301
302impl TrustConfig {
303    pub fn redacted(&self) -> Self {
304        let psk = if self.psk.is_empty() {
305            String::new()
306        } else {
307            "<redacted>".to_string()
308        };
309        Self {
310            require_fingerprint: self.require_fingerprint,
311            psk,
312        }
313    }
314}
315
316#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
317pub struct NativeIdentity {
318    cert_der: Vec<u8>,
319    key_der: Vec<u8>,
320    fingerprint: String,
321}
322
323impl NativeIdentity {
324    pub fn load_or_generate() -> Result<Self> {
325        Self::load_or_generate_in(config_dir()?)
326    }
327
328    pub fn load_or_generate_in(config_dir: impl AsRef<Path>) -> Result<Self> {
329        let config_dir = config_dir.as_ref();
330        let cert_path = config_dir.join("identity.cert.der");
331        let key_path = config_dir.join("identity.key.der");
332
333        if cert_path.exists() && key_path.exists() {
334            let cert_der = std::fs::read(&cert_path).map_err(|source| Error::IoPath {
335                path: cert_path,
336                source,
337            })?;
338            let key_der = std::fs::read(&key_path).map_err(|source| Error::IoPath {
339                path: key_path,
340                source,
341            })?;
342
343            return Ok(Self::from_parts(cert_der, key_der));
344        }
345
346        std::fs::create_dir_all(config_dir).map_err(|source| Error::IoPath {
347            path: config_dir.to_path_buf(),
348            source,
349        })?;
350
351        let identity = Self::generate()?;
352        write_private_file(&cert_path, &identity.cert_der)?;
353        write_private_file(&key_path, &identity.key_der)?;
354        Ok(identity)
355    }
356
357    pub fn generate() -> Result<Self> {
358        let CertifiedKey { cert, signing_key } =
359            generate_simple_self_signed(vec!["localhost".to_string()])?;
360        Ok(Self::from_parts(
361            cert.der().to_vec(),
362            signing_key.serialize_der(),
363        ))
364    }
365
366    pub fn fingerprint(&self) -> &str {
367        &self.fingerprint
368    }
369
370    fn cert_chain(&self) -> Vec<CertificateDer<'static>> {
371        vec![CertificateDer::from(self.cert_der.clone())]
372    }
373
374    fn private_key(&self) -> PrivateKeyDer<'static> {
375        PrivateKeyDer::from(PrivatePkcs8KeyDer::from(self.key_der.clone()))
376    }
377
378    fn from_parts(cert_der: Vec<u8>, key_der: Vec<u8>) -> Self {
379        let fingerprint = blake3::hash(&cert_der).to_hex().to_string();
380        Self {
381            cert_der,
382            key_der,
383            fingerprint,
384        }
385    }
386}
387
/// Which side of a transfer an event or progress report refers to.
///
/// Serialized in snake_case (`send`, `receive`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum TransferDirection {
    Send,
    Receive,
}
394
/// Progress snapshot for one file; carries the same fields as
/// `TransferEvent::Progress`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct TransferProgress {
    pub direction: TransferDirection,
    pub file_name: String,
    // Presumably bytes transferred so far vs. the file's total size — confirm with the emitter.
    pub bytes_done: u64,
    pub bytes_total: u64,
}
402
/// Events describing the lifecycle of a transfer session.
///
/// Serialized as an internally tagged enum: the variant name is stored in an
/// `event` field in snake_case (for example `session_started`).
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "event", rename_all = "snake_case")]
pub enum TransferEvent {
    /// A session began; carries the session id and overall totals.
    SessionStarted {
        direction: TransferDirection,
        session_id: Uuid,
        files_total: usize,
        bytes_total: u64,
    },
    /// A file began transferring.
    FileStarted {
        direction: TransferDirection,
        file_name: String,
        bytes_total: u64,
        // Presumably the byte offset the transfer resumes from (0 for a fresh file) — confirm.
        resume_offset: u64,
    },
    /// Progress report for one file; see `TransferEvent::progress`.
    Progress {
        direction: TransferDirection,
        file_name: String,
        bytes_done: u64,
        bytes_total: u64,
    },
    /// A file finished, with its size, BLAKE3 hash string and final status.
    FileFinished {
        direction: TransferDirection,
        file_name: String,
        bytes: u64,
        blake3: String,
        status: TransferFileStatus,
    },
    /// The whole session finished.
    SessionFinished {
        direction: TransferDirection,
        files_total: usize,
        bytes_total: u64,
        // NOTE(review): what this hash covers is not visible here — confirm with the emitter.
        blake3: String,
    },
    /// The session was cancelled.
    SessionCancelled {
        direction: TransferDirection,
        session_id: Uuid,
    },
}
442
443impl TransferEvent {
444    pub fn progress(&self) -> Option<TransferProgress> {
445        match self {
446            Self::Progress {
447                direction,
448                file_name,
449                bytes_done,
450                bytes_total,
451            } => Some(TransferProgress {
452                direction: *direction,
453                file_name: file_name.clone(),
454                bytes_done: *bytes_done,
455                bytes_total: *bytes_total,
456            }),
457            _ => None,
458        }
459    }
460}
461
462#[derive(Debug, Clone, Default)]
463pub struct TransferControl {
464    cancelled: Arc<AtomicBool>,
465}
466
467impl TransferControl {
468    pub fn new() -> Self {
469        Self::default()
470    }
471
472    pub fn cancel(&self) {
473        self.cancelled.store(true, Ordering::SeqCst);
474    }
475
476    pub fn is_cancelled(&self) -> bool {
477        self.cancelled.load(Ordering::SeqCst)
478    }
479
480    fn check_cancelled(&self) -> Result<()> {
481        if self.is_cancelled() {
482            Err(Error::TransferCancelled)
483        } else {
484            Ok(())
485        }
486    }
487}
488
/// Result of a completed transfer, with one [`TransferFileSummary`] per file
/// in `files`.
// NOTE(review): how the top-level `path`/`file_name`/`bytes`/`blake3` relate to
// `files` for multi-file sessions is not visible in this part of the file.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TransferSummary {
    pub path: PathBuf,
    pub file_name: String,
    pub bytes: u64,
    pub blake3: String,
    pub files: Vec<TransferFileSummary>,
}
497
/// Per-file result of a transfer.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TransferFileSummary {
    // Presumably the file's location on the local filesystem — confirm with the producer.
    pub path: PathBuf,
    // Presumably the path relative to the transfer root, matching `ManifestEntry::relative_path`.
    pub relative_path: PathBuf,
    pub bytes: u64,
    pub blake3: String,
    pub status: TransferFileStatus,
}
506
/// Final outcome recorded for a single file.
///
/// Serialized in snake_case (`sent`, `received`, `skipped`, `resumed`).
// NOTE(review): the exact conditions under which each status is assigned are
// decided outside the visible part of this file.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum TransferFileStatus {
    Sent,
    Received,
    Skipped,
    Resumed,
}
515
516#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
517pub struct TransferManifest {
518    pub version: u16,
519    pub session_id: Uuid,
520    pub chunk_size: u64,
521    pub entries: Vec<ManifestEntry>,
522}
523
524impl TransferManifest {
525    pub fn file_entries(&self) -> impl Iterator<Item = &ManifestEntry> {
526        self.entries
527            .iter()
528            .filter(|entry| entry.kind == ManifestEntryKind::File)
529    }
530
531    pub fn total_file_bytes(&self) -> u64 {
532        self.file_entries().map(|entry| entry.size).sum()
533    }
534
535    pub fn digest(&self) -> Result<String> {
536        let bytes = serde_json::to_vec(self).map_err(|error| Error::Protocol(error.to_string()))?;
537        Ok(blake3::hash(&bytes).to_hex().to_string())
538    }
539}
540
/// One file or directory listed in a [`TransferManifest`].
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ManifestEntry {
    pub relative_path: PathBuf,
    pub kind: ManifestEntryKind,
    // Summed by `TransferManifest::total_file_bytes` for file entries.
    pub size: u64,
    // Presumably the whole-file BLAKE3 hash, absent for directories — confirm.
    pub blake3: Option<String>,
    // Presumably per-chunk hashes in file order — confirm with the manifest builder.
    pub chunks: Vec<String>,
    pub unix_mode: Option<u32>,
    // Presumably milliseconds since the Unix epoch — confirm.
    pub modified_unix_ms: Option<i128>,
}
551
/// Whether a manifest entry is a file or a directory.
///
/// Serialized in snake_case (`file`, `directory`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ManifestEntryKind {
    File,
    Directory,
}
558
/// Outcome of deciding how to handle a destination file.
// NOTE(review): the code that produces these values is outside the visible
// part of the file; the variant notes below are inferred from the names.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ResumeDecision {
    // Presumably: nothing usable exists, write the file from the start.
    Create,
    // Presumably: the destination already matches, nothing to transfer.
    Skip,
    // Presumably: continue from the given byte offset.
    ResumeFrom(u64),
    // Presumably: the destination cannot be reconciled; carries a description.
    Conflict(String),
}
566
/// A single sighting of a peer at one address.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PeerObservation {
    pub fingerprint: String,
    pub alias: Option<String>,
    pub hostname: Option<String>,
    pub address: SocketAddr,
    pub transports: BTreeSet<String>,
    pub seen_unix_ms: i128,
}

impl PeerObservation {
    /// Creates an observation with no alias, no hostname and no transports.
    pub fn new(fingerprint: impl Into<String>, address: SocketAddr, seen_unix_ms: i128) -> Self {
        let fingerprint = fingerprint.into();
        let transports = BTreeSet::new();
        Self {
            fingerprint,
            alias: None,
            hostname: None,
            address,
            transports,
            seen_unix_ms,
        }
    }

    /// Sets the alias, replacing any previous value.
    pub fn with_alias(self, alias: impl Into<String>) -> Self {
        Self {
            alias: Some(alias.into()),
            ..self
        }
    }

    /// Sets the hostname, replacing any previous value.
    pub fn with_hostname(self, hostname: impl Into<String>) -> Self {
        Self {
            hostname: Some(hostname.into()),
            ..self
        }
    }

    /// Adds a transport name; adding the same name twice keeps one copy.
    pub fn with_transport(mut self, transport: impl Into<String>) -> Self {
        let name = transport.into();
        self.transports.insert(name);
        self
    }
}
604
605#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
606pub struct PeerRecord {
607    pub fingerprint: String,
608    pub aliases: BTreeSet<String>,
609    pub hostnames: BTreeSet<String>,
610    pub addresses: BTreeSet<SocketAddr>,
611    pub transports: BTreeSet<String>,
612    pub first_seen_unix_ms: i128,
613    pub last_seen_unix_ms: i128,
614}
615
616impl PeerRecord {
617    fn from_observation(observation: PeerObservation) -> Self {
618        let mut aliases = BTreeSet::new();
619        if let Some(alias) = observation.alias {
620            aliases.insert(alias);
621        }
622
623        let mut hostnames = BTreeSet::new();
624        if let Some(hostname) = observation.hostname {
625            hostnames.insert(hostname);
626        }
627
628        let mut addresses = BTreeSet::new();
629        addresses.insert(observation.address);
630
631        Self {
632            fingerprint: observation.fingerprint,
633            aliases,
634            hostnames,
635            addresses,
636            transports: observation.transports,
637            first_seen_unix_ms: observation.seen_unix_ms,
638            last_seen_unix_ms: observation.seen_unix_ms,
639        }
640    }
641
642    fn merge(&mut self, observation: PeerObservation) {
643        if let Some(alias) = observation.alias {
644            self.aliases.insert(alias);
645        }
646        if let Some(hostname) = observation.hostname {
647            self.hostnames.insert(hostname);
648        }
649        self.addresses.insert(observation.address);
650        self.transports.extend(observation.transports);
651        self.first_seen_unix_ms = self.first_seen_unix_ms.min(observation.seen_unix_ms);
652        self.last_seen_unix_ms = self.last_seen_unix_ms.max(observation.seen_unix_ms);
653    }
654
655    pub fn preferred_quic_address(&self) -> Option<SocketAddr> {
656        if !self.transports.contains("quic") {
657            return None;
658        }
659
660        self.addresses.iter().copied().next()
661    }
662}
663
/// Trust decision stored for a known peer.
///
/// Serialized in snake_case (`trusted`, `blocked`).
// NOTE(review): nothing in the visible part of the file assigns `Blocked`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum TrustState {
    Trusted,
    Blocked,
}
670
671#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
672pub struct KnownPeerEntry {
673    pub fingerprint: String,
674    #[serde(default)]
675    pub aliases: BTreeSet<String>,
676    #[serde(default)]
677    pub hostnames: BTreeSet<String>,
678    #[serde(default)]
679    pub addresses: BTreeSet<SocketAddr>,
680    #[serde(default)]
681    pub transports: BTreeSet<String>,
682    pub trust_state: TrustState,
683    pub first_seen_unix_ms: Option<i128>,
684    pub last_seen_unix_ms: Option<i128>,
685}
686
687impl KnownPeerEntry {
688    pub fn trusted(fingerprint: impl Into<String>) -> Result<Self> {
689        let fingerprint = normalized_fingerprint(fingerprint.into())?;
690        Ok(Self {
691            fingerprint,
692            aliases: BTreeSet::new(),
693            hostnames: BTreeSet::new(),
694            addresses: BTreeSet::new(),
695            transports: BTreeSet::new(),
696            trust_state: TrustState::Trusted,
697            first_seen_unix_ms: None,
698            last_seen_unix_ms: None,
699        })
700    }
701
702    pub fn from_record(record: &PeerRecord, trust_state: TrustState) -> Result<Self> {
703        let fingerprint = normalized_fingerprint(record.fingerprint.clone())?;
704        Ok(Self {
705            fingerprint,
706            aliases: record.aliases.clone(),
707            hostnames: record.hostnames.clone(),
708            addresses: record.addresses.clone(),
709            transports: record.transports.clone(),
710            trust_state,
711            first_seen_unix_ms: Some(record.first_seen_unix_ms),
712            last_seen_unix_ms: Some(record.last_seen_unix_ms),
713        })
714    }
715}
716
717#[derive(Debug, Default, Clone, PartialEq, Eq, Serialize, Deserialize)]
718pub struct TrustStore {
719    #[serde(default)]
720    pub peers: BTreeMap<String, KnownPeerEntry>,
721}
722
723impl TrustStore {
724    pub fn load_or_default() -> Result<Self> {
725        Self::load_or_default_from(config_dir()?.join("known_peers.toml"))
726    }
727
728    pub fn load_or_default_from(path: impl AsRef<Path>) -> Result<Self> {
729        let path = path.as_ref();
730        match std::fs::read_to_string(path) {
731            Ok(contents) => {
732                toml::from_str(&contents).map_err(|error| Error::ConfigParse(error.to_string()))
733            }
734            Err(error) if error.kind() == std::io::ErrorKind::NotFound => Ok(Self::default()),
735            Err(source) => Err(Error::IoPath {
736                path: path.to_path_buf(),
737                source,
738            }),
739        }
740    }
741
742    pub fn save_default_path(&self) -> Result<()> {
743        self.save_to_path(config_dir()?.join("known_peers.toml"))
744    }
745
746    pub fn save_to_path(&self, path: impl AsRef<Path>) -> Result<()> {
747        write_toml_file(path.as_ref(), self)
748    }
749
750    pub fn trust_fingerprint(&mut self, fingerprint: impl Into<String>) -> Result<&KnownPeerEntry> {
751        let entry = KnownPeerEntry::trusted(fingerprint)?;
752        let fingerprint = entry.fingerprint.clone();
753        self.peers
754            .entry(fingerprint.clone())
755            .and_modify(|existing| existing.trust_state = TrustState::Trusted)
756            .or_insert(entry);
757        Ok(self
758            .peers
759            .get(&fingerprint)
760            .expect("trusted peer entry should exist"))
761    }
762
763    pub fn trust_record(&mut self, record: &PeerRecord) -> Result<&KnownPeerEntry> {
764        let entry = KnownPeerEntry::from_record(record, TrustState::Trusted)?;
765        let fingerprint = entry.fingerprint.clone();
766        self.peers
767            .entry(fingerprint.clone())
768            .and_modify(|existing| {
769                existing.aliases.extend(record.aliases.clone());
770                existing.hostnames.extend(record.hostnames.clone());
771                existing.addresses.extend(record.addresses.clone());
772                existing.transports.extend(record.transports.clone());
773                existing.first_seen_unix_ms = Some(
774                    existing
775                        .first_seen_unix_ms
776                        .unwrap_or(record.first_seen_unix_ms)
777                        .min(record.first_seen_unix_ms),
778                );
779                existing.last_seen_unix_ms = Some(
780                    existing
781                        .last_seen_unix_ms
782                        .unwrap_or(record.last_seen_unix_ms)
783                        .max(record.last_seen_unix_ms),
784                );
785                existing.trust_state = TrustState::Trusted;
786            })
787            .or_insert(entry);
788        Ok(self
789            .peers
790            .get(&fingerprint)
791            .expect("trusted peer entry should exist"))
792    }
793
794    pub fn forget(&mut self, fingerprint: &str) -> bool {
795        self.peers.remove(fingerprint).is_some()
796    }
797
798    pub fn records(&self) -> impl Iterator<Item = &KnownPeerEntry> {
799        self.peers.values()
800    }
801
802    pub fn to_toml_string(&self) -> Result<String> {
803        toml::to_string_pretty(self).map_err(|error| Error::ConfigSerialize(error.to_string()))
804    }
805}
806
/// In-memory collection of discovered peers, keyed by the fingerprint string
/// of each observation.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct PeerRegistry {
    peers: BTreeMap<String, PeerRecord>,
}
811
/// Result of resolving a query against a [`PeerRegistry`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PeerLookup<'a> {
    /// Exactly one record matched.
    Found(&'a PeerRecord),
    /// More than one record matched; all matches are returned.
    Ambiguous(Vec<&'a PeerRecord>),
    /// No record matched.
    Missing,
}
818
819impl PeerRegistry {
820    pub fn new() -> Self {
821        Self::default()
822    }
823
824    pub fn observe(&mut self, observation: PeerObservation) -> Result<&PeerRecord> {
825        if observation.fingerprint.trim().is_empty() {
826            return Err(Error::InvalidInput(
827                "peer fingerprint must not be empty".to_string(),
828            ));
829        }
830
831        let fingerprint = observation.fingerprint.clone();
832        if let Some(record) = self.peers.get_mut(&fingerprint) {
833            record.merge(observation);
834        } else {
835            self.peers.insert(
836                fingerprint.clone(),
837                PeerRecord::from_observation(observation),
838            );
839        }
840
841        Ok(self
842            .peers
843            .get(&fingerprint)
844            .expect("observed peer record should exist"))
845    }
846
847    pub fn get_by_fingerprint(&self, fingerprint: &str) -> Option<&PeerRecord> {
848        self.peers.get(fingerprint)
849    }
850
851    pub fn lookup(&self, query: &str) -> Option<&PeerRecord> {
852        match self.lookup_detail(query) {
853            PeerLookup::Found(record) => Some(record),
854            PeerLookup::Ambiguous(_) | PeerLookup::Missing => None,
855        }
856    }
857
858    pub fn lookup_detail(&self, query: &str) -> PeerLookup<'_> {
859        if let Some(record) = self.peers.get(query) {
860            return PeerLookup::Found(record);
861        }
862
863        let matches = self
864            .peers
865            .values()
866            .filter(|record| {
867                record.fingerprint.starts_with(query)
868                    || record.aliases.iter().any(|alias| alias == query)
869                    || record.hostnames.iter().any(|hostname| hostname == query)
870            })
871            .collect::<Vec<_>>();
872
873        match matches.as_slice() {
874            [] => PeerLookup::Missing,
875            [record] => PeerLookup::Found(record),
876            _ => PeerLookup::Ambiguous(matches),
877        }
878    }
879
880    pub fn records(&self) -> impl Iterator<Item = &PeerRecord> {
881        self.peers.values()
882    }
883}
884
885#[derive(Debug, Clone, PartialEq, Eq)]
886pub struct NativeAnnouncement {
887    pub alias: String,
888    pub fingerprint: String,
889    pub quic_port: u16,
890    pub listen_port: Option<u16>,
891}
892
893impl NativeAnnouncement {
894    pub fn from_config(config: &AppConfig, identity: &NativeIdentity) -> Self {
895        Self {
896            alias: config.alias.clone(),
897            fingerprint: identity.fingerprint().to_string(),
898            quic_port: config.quic_port,
899            listen_port: Some(config.listen_port),
900        }
901    }
902
903    fn service_info(&self) -> Result<ServiceInfo> {
904        let alias = sanitize_dns_label(&self.alias);
905        let fingerprint_prefix = self
906            .fingerprint
907            .get(..12)
908            .unwrap_or(self.fingerprint.as_str());
909        let instance = format!("{alias}-{fingerprint_prefix}");
910        let hostname = format!("{alias}.local.");
911        let properties = [
912            ("pv", NATIVE_PROTOCOL_VERSION.to_string()),
913            ("minpv", MIN_NATIVE_PROTOCOL_VERSION.to_string()),
914            ("fp", self.fingerprint.clone()),
915            ("alias", self.alias.clone()),
916            ("transports", "quic".to_string()),
917            ("quic_port", self.quic_port.to_string()),
918            (
919                "listen_port",
920                self.listen_port
921                    .map(|port| port.to_string())
922                    .unwrap_or_default(),
923            ),
924        ];
925
926        ServiceInfo::new(
927            NATIVE_SERVICE_TYPE,
928            &instance,
929            &hostname,
930            "",
931            self.quic_port,
932            &properties[..],
933        )
934        .map(ServiceInfo::enable_addr_auto)
935        .map_err(|error| Error::Discovery(error.to_string()))
936    }
937}
938
939pub struct NativeAnnouncer {
940    daemon: ServiceDaemon,
941    fullname: String,
942}
943
944impl NativeAnnouncer {
945    pub fn start(announcement: &NativeAnnouncement) -> Result<Self> {
946        let daemon = ServiceDaemon::new().map_err(|error| Error::Discovery(error.to_string()))?;
947        let service = announcement.service_info()?;
948        let fullname = service.get_fullname().to_string();
949        daemon
950            .register(service)
951            .map_err(|error| Error::Discovery(error.to_string()))?;
952        Ok(Self { daemon, fullname })
953    }
954}
955
956impl Drop for NativeAnnouncer {
957    fn drop(&mut self) {
958        let _ = self.daemon.unregister(&self.fullname);
959        let _ = self.daemon.shutdown();
960    }
961}
962
963pub async fn discover_native_peers(timeout: Duration) -> Result<PeerRegistry> {
964    let daemon = ServiceDaemon::new().map_err(|error| Error::Discovery(error.to_string()))?;
965    let receiver = daemon
966        .browse(NATIVE_SERVICE_TYPE)
967        .map_err(|error| Error::Discovery(error.to_string()))?;
968    let deadline = tokio::time::Instant::now() + timeout;
969    let mut registry = PeerRegistry::new();
970
971    loop {
972        let now = tokio::time::Instant::now();
973        if now >= deadline {
974            break;
975        }
976
977        let wait = deadline - now;
978        match tokio::time::timeout(wait, receiver.recv_async()).await {
979            Ok(Ok(ServiceEvent::ServiceResolved(service))) => {
980                observe_resolved_service(&mut registry, &service)?;
981            }
982            Ok(Ok(_)) => {}
983            Ok(Err(error)) => {
984                return Err(Error::Discovery(error.to_string()));
985            }
986            Err(_) => break,
987        }
988    }
989
990    daemon
991        .stop_browse(NATIVE_SERVICE_TYPE)
992        .map_err(|error| Error::Discovery(error.to_string()))?;
993    let _ = daemon.shutdown();
994    Ok(registry)
995}
996
997fn observe_resolved_service(registry: &mut PeerRegistry, service: &ResolvedService) -> Result<()> {
998    let Some(fingerprint) = service.get_property_val_str("fp") else {
999        return Ok(());
1000    };
1001    let fingerprint = match normalized_fingerprint(fingerprint.to_string()) {
1002        Ok(fingerprint) => fingerprint,
1003        Err(_) => return Ok(()),
1004    };
1005    let Some(highest_version) = parse_txt_u16(service, "pv") else {
1006        return Ok(());
1007    };
1008    let Some(minimum_version) = parse_txt_u16(service, "minpv") else {
1009        return Ok(());
1010    };
1011    if minimum_version > highest_version
1012        || minimum_version > NATIVE_PROTOCOL_VERSION
1013        || highest_version < MIN_NATIVE_PROTOCOL_VERSION
1014    {
1015        return Ok(());
1016    }
1017
1018    let Some(quic_port) = parse_txt_u16(service, "quic_port") else {
1019        return Ok(());
1020    };
1021    let transports = service
1022        .get_property_val_str("transports")
1023        .unwrap_or_default()
1024        .split(',')
1025        .map(str::trim)
1026        .filter(|value| !value.is_empty())
1027        .map(ToOwned::to_owned)
1028        .collect::<Vec<_>>();
1029    if !transports.iter().any(|transport| transport == "quic") {
1030        return Ok(());
1031    }
1032
1033    let alias = service.get_property_val_str("alias").map(ToOwned::to_owned);
1034    let hostname = Some(service.get_hostname().trim_end_matches('.').to_string());
1035    let seen_unix_ms = system_time_unix_ms(SystemTime::now()).unwrap_or_default();
1036
1037    for address in service.get_addresses_v4() {
1038        let mut observation = PeerObservation::new(
1039            fingerprint.clone(),
1040            SocketAddr::from((address, quic_port)),
1041            seen_unix_ms,
1042        );
1043        if let Some(alias) = &alias {
1044            observation = observation.with_alias(alias.clone());
1045        }
1046        if let Some(hostname) = &hostname {
1047            observation = observation.with_hostname(hostname.clone());
1048        }
1049        for transport in &transports {
1050            observation = observation.with_transport(transport.clone());
1051        }
1052        registry.observe(observation)?;
1053    }
1054
1055    Ok(())
1056}
1057
1058fn parse_txt_u16(service: &ResolvedService, key: &str) -> Option<u16> {
1059    service.get_property_val_str(key)?.parse().ok()
1060}
1061
/// Converts an arbitrary string into a single DNS label.
///
/// ASCII letters and digits are kept (lower-cased); every other character is
/// treated as a separator. Runs of separators collapse into one `-`, and the
/// label never starts or ends with `-`. The result is capped at 63 bytes, the
/// maximum length of a DNS label. When nothing usable remains, the fallback
/// `"fileferry-device"` is returned.
fn sanitize_dns_label(value: &str) -> String {
    // RFC 1035 limits a single label to 63 octets.
    const MAX_DNS_LABEL_LEN: usize = 63;

    // Single pass: the output is pure ASCII, so byte length equals char count.
    let mut label = String::with_capacity(value.len().min(MAX_DNS_LABEL_LEN));
    for ch in value.chars() {
        if label.len() == MAX_DNS_LABEL_LEN {
            break;
        }
        if ch.is_ascii_alphanumeric() {
            label.push(ch.to_ascii_lowercase());
        } else if !label.is_empty() && !label.ends_with('-') {
            // Skips leading separators and collapses consecutive ones.
            label.push('-');
        }
    }

    // At most one trailing separator can remain (including after the cap).
    let trimmed_len = label.trim_end_matches('-').len();
    label.truncate(trimmed_len);

    if label.is_empty() {
        "fileferry-device".to_string()
    } else {
        label
    }
}
1083
1084#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
1085struct DirectReceivePlan {
1086    files: Vec<DirectFilePlan>,
1087}
1088
1089#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
1090struct DirectProtocolHello {
1091    protocol_version: u16,
1092    min_protocol_version: u16,
1093    client_fingerprint: String,
1094    #[serde(default)]
1095    psk: bool,
1096}
1097
1098impl DirectProtocolHello {
1099    fn new(identity: &NativeIdentity, psk: Option<&str>) -> Self {
1100        Self {
1101            protocol_version: NATIVE_PROTOCOL_VERSION,
1102            min_protocol_version: MIN_NATIVE_PROTOCOL_VERSION,
1103            client_fingerprint: identity.fingerprint().to_string(),
1104            psk: psk.is_some(),
1105        }
1106    }
1107}
1108
1109#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
1110struct DirectProtocolResponse {
1111    accepted: bool,
1112    protocol_version: Option<u16>,
1113    #[serde(default)]
1114    psk_challenge: Option<String>,
1115    error: Option<String>,
1116}
1117
1118impl DirectProtocolResponse {
1119    fn accept(protocol_version: u16) -> Self {
1120        Self {
1121            accepted: true,
1122            protocol_version: Some(protocol_version),
1123            psk_challenge: None,
1124            error: None,
1125        }
1126    }
1127
1128    fn accept_with_psk(protocol_version: u16, challenge: impl Into<String>) -> Self {
1129        Self {
1130            accepted: true,
1131            protocol_version: Some(protocol_version),
1132            psk_challenge: Some(challenge.into()),
1133            error: None,
1134        }
1135    }
1136
1137    fn reject(error: impl Into<String>) -> Self {
1138        Self {
1139            accepted: false,
1140            protocol_version: None,
1141            psk_challenge: None,
1142            error: Some(error.into()),
1143        }
1144    }
1145}
1146
1147#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
1148struct DirectPskProof {
1149    proof: String,
1150}
1151
1152#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
1153struct DirectPskResult {
1154    accepted: bool,
1155    error: Option<String>,
1156}
1157
1158impl DirectPskResult {
1159    fn accept() -> Self {
1160        Self {
1161            accepted: true,
1162            error: None,
1163        }
1164    }
1165
1166    fn reject(error: impl Into<String>) -> Self {
1167        Self {
1168            accepted: false,
1169            error: Some(error.into()),
1170        }
1171    }
1172}
1173
1174#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
1175struct DirectFilePlan {
1176    relative_path: PathBuf,
1177    action: DirectFileAction,
1178    offset: u64,
1179}
1180
1181#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
1182#[serde(rename_all = "snake_case")]
1183enum DirectFileAction {
1184    Receive,
1185    Skip,
1186}
1187
1188#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
1189struct DirectPayloadHeader {
1190    relative_path: PathBuf,
1191    offset: u64,
1192    bytes: u64,
1193}
1194
1195#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
1196struct DirectAck {
1197    ok: bool,
1198    error: Option<String>,
1199}
1200
1201pub fn validate_send_paths(paths: &[PathBuf]) -> Result<()> {
1202    if paths.is_empty() {
1203        return Err(Error::InvalidInput(
1204            "at least one file path is required".to_string(),
1205        ));
1206    }
1207
1208    for path in paths {
1209        if path.as_os_str().is_empty() {
1210            return Err(Error::InvalidInput("empty file path".to_string()));
1211        }
1212    }
1213
1214    Ok(())
1215}
1216
/// Renders `path` as an owned `String`, replacing any non-UTF-8 sequences
/// lossily.
pub fn display_path(path: &Path) -> String {
    match path.to_str() {
        // Valid UTF-8: copy the text as is.
        Some(text) => text.to_owned(),
        // Otherwise fall back to the lossy conversion.
        None => path.to_string_lossy().into_owned(),
    }
}
1220
1221pub async fn build_manifest(paths: &[PathBuf]) -> Result<TransferManifest> {
1222    validate_send_paths(paths)?;
1223
1224    let sources = collect_manifest_sources(paths)?;
1225    let mut entries = Vec::with_capacity(sources.len());
1226    let mut seen = BTreeSet::new();
1227
1228    for source in sources {
1229        if !seen.insert(source.relative_path.clone()) {
1230            return Err(Error::InvalidInput(format!(
1231                "duplicate transfer path: {}",
1232                display_path(&source.relative_path)
1233            )));
1234        }
1235
1236        entries.push(build_manifest_entry(source).await?);
1237    }
1238
1239    Ok(TransferManifest {
1240        version: MANIFEST_VERSION,
1241        session_id: Uuid::new_v4(),
1242        chunk_size: DEFAULT_CHUNK_SIZE,
1243        entries,
1244    })
1245}
1246
1247pub fn safe_destination_path(destination: &Path, relative_path: &Path) -> Result<PathBuf> {
1248    validate_relative_transfer_path(relative_path)?;
1249    Ok(destination.join(relative_path))
1250}
1251
1252pub async fn decide_resume(
1253    destination: &Path,
1254    entry: &ManifestEntry,
1255    chunk_size: u64,
1256) -> Result<ResumeDecision> {
1257    if entry.kind != ManifestEntryKind::File {
1258        return Ok(ResumeDecision::Create);
1259    }
1260
1261    let path = safe_destination_path(destination, &entry.relative_path)?;
1262    let metadata = match tokio::fs::metadata(&path).await {
1263        Ok(metadata) => metadata,
1264        Err(error) if error.kind() == std::io::ErrorKind::NotFound => {
1265            return Ok(ResumeDecision::Create);
1266        }
1267        Err(source) => {
1268            return Err(Error::IoPath { path, source });
1269        }
1270    };
1271
1272    if !metadata.is_file() {
1273        return Ok(ResumeDecision::Conflict(format!(
1274            "{} already exists and is not a regular file",
1275            display_path(&path)
1276        )));
1277    }
1278
1279    if metadata.len() == entry.size {
1280        let expected = entry
1281            .blake3
1282            .as_deref()
1283            .ok_or_else(|| Error::Protocol("file manifest entry is missing BLAKE3".to_string()))?;
1284        let actual = hash_file(&path).await?;
1285        return if actual == expected {
1286            Ok(ResumeDecision::Skip)
1287        } else {
1288            Ok(ResumeDecision::Conflict(format!(
1289                "{} already exists with different contents",
1290                display_path(&path)
1291            )))
1292        };
1293    }
1294
1295    if metadata.len() > entry.size {
1296        return Ok(ResumeDecision::Conflict(format!(
1297            "{} is larger than incoming file",
1298            display_path(&path)
1299        )));
1300    }
1301
1302    let verified_offset = verified_prefix_offset(&path, entry, metadata.len(), chunk_size).await?;
1303    if verified_offset > 0 {
1304        Ok(ResumeDecision::ResumeFrom(verified_offset))
1305    } else {
1306        Ok(ResumeDecision::Conflict(format!(
1307            "{} already exists and does not match a verified prefix",
1308            display_path(&path)
1309        )))
1310    }
1311}
1312
1313pub async fn send_direct_file(
1314    request: &SendRequest,
1315    identity: &NativeIdentity,
1316    mut events: impl FnMut(TransferEvent),
1317) -> Result<TransferSummary> {
1318    send_direct_file_with_control(request, identity, TransferControl::new(), &mut events).await
1319}
1320
1321pub async fn send_direct_file_with_control(
1322    request: &SendRequest,
1323    identity: &NativeIdentity,
1324    control: TransferControl,
1325    mut events: impl FnMut(TransferEvent),
1326) -> Result<TransferSummary> {
1327    ensure_rustls_provider();
1328
1329    control.check_cancelled()?;
1330    let manifest = build_manifest(request.paths()).await?;
1331    let sources = manifest_source_map(request.paths())?;
1332    events(TransferEvent::SessionStarted {
1333        direction: TransferDirection::Send,
1334        session_id: manifest.session_id,
1335        files_total: manifest.file_entries().count(),
1336        bytes_total: manifest.total_file_bytes(),
1337    });
1338    control.check_cancelled()?;
1339
1340    let (client_config, server_fingerprint) = client_config_capturing_server_fingerprint()?;
1341    let mut endpoint = quinn::Endpoint::client(SocketAddr::from(([0, 0, 0, 0], 0)))?;
1342    endpoint.set_default_client_config(client_config);
1343    let connection = endpoint
1344        .connect(request.peer().address(), "localhost")?
1345        .await?;
1346    if let Some(expected) = request.peer().expected_fingerprint() {
1347        let actual = captured_server_fingerprint(&server_fingerprint)?;
1348        if actual != expected {
1349            connection.close(1u32.into(), b"fingerprint mismatch");
1350            return Err(Error::PeerFingerprintMismatch {
1351                expected: expected.to_string(),
1352                actual,
1353            });
1354        }
1355    }
1356    let (mut send, mut recv) = connection.open_bi().await?;
1357
1358    send.write_all(DIRECT_MAGIC).await?;
1359    write_json_frame(
1360        &mut send,
1361        &DirectProtocolHello::new(identity, request.psk()),
1362    )
1363    .await?;
1364    let protocol: DirectProtocolResponse = read_json_frame(&mut recv).await?;
1365    if !protocol.accepted {
1366        return Err(Error::Protocol(protocol.error.unwrap_or_else(|| {
1367            "receiver rejected protocol negotiation".to_string()
1368        })));
1369    }
1370    if protocol.protocol_version != Some(NATIVE_PROTOCOL_VERSION) {
1371        return Err(Error::Protocol(format!(
1372            "receiver selected unsupported protocol version {:?}",
1373            protocol.protocol_version
1374        )));
1375    }
1376    match (request.psk(), protocol.psk_challenge.as_deref()) {
1377        (Some(psk), Some(challenge)) => {
1378            let proof = DirectPskProof {
1379                proof: psk_proof(
1380                    psk,
1381                    challenge,
1382                    identity.fingerprint(),
1383                    NATIVE_PROTOCOL_VERSION,
1384                )?,
1385            };
1386            write_json_frame(&mut send, &proof).await?;
1387            let auth: DirectPskResult = read_json_frame(&mut recv).await?;
1388            if !auth.accepted {
1389                return Err(Error::Authentication(
1390                    auth.error
1391                        .unwrap_or_else(|| "PSK authentication rejected".to_string()),
1392                ));
1393            }
1394        }
1395        (Some(_), None) => {
1396            return Err(Error::Authentication(
1397                "receiver is not configured for PSK mode".to_string(),
1398            ));
1399        }
1400        (None, Some(_)) => {
1401            return Err(Error::Authentication(
1402                "receiver requires PSK authentication".to_string(),
1403            ));
1404        }
1405        (None, None) => {}
1406    }
1407
1408    write_json_frame(&mut send, &manifest).await?;
1409
1410    let plan: DirectReceivePlan = read_json_frame(&mut recv).await?;
1411    let mut summaries = Vec::new();
1412    for file_plan in plan.files {
1413        let Some(entry) = manifest
1414            .file_entries()
1415            .find(|entry| entry.relative_path == file_plan.relative_path)
1416        else {
1417            return Err(Error::Protocol(format!(
1418                "receiver requested unknown file: {}",
1419                display_path(&file_plan.relative_path)
1420            )));
1421        };
1422
1423        let Some(source_path) = sources.get(&file_plan.relative_path) else {
1424            return Err(Error::Protocol(format!(
1425                "missing source for manifest file: {}",
1426                display_path(&file_plan.relative_path)
1427            )));
1428        };
1429
1430        if file_plan.action == DirectFileAction::Skip {
1431            let blake3 = entry.blake3.clone().unwrap_or_default();
1432            events(TransferEvent::FileFinished {
1433                direction: TransferDirection::Send,
1434                file_name: display_path(&entry.relative_path),
1435                bytes: entry.size,
1436                blake3: blake3.clone(),
1437                status: TransferFileStatus::Skipped,
1438            });
1439            summaries.push(TransferFileSummary {
1440                path: source_path.clone(),
1441                relative_path: entry.relative_path.clone(),
1442                bytes: entry.size,
1443                blake3,
1444                status: TransferFileStatus::Skipped,
1445            });
1446            continue;
1447        }
1448
1449        match send_payload_file(
1450            &mut send,
1451            source_path,
1452            entry,
1453            file_plan.offset,
1454            &control,
1455            &mut events,
1456        )
1457        .await
1458        {
1459            Ok(()) => {}
1460            Err(Error::TransferCancelled) => {
1461                events(TransferEvent::SessionCancelled {
1462                    direction: TransferDirection::Send,
1463                    session_id: manifest.session_id,
1464                });
1465                connection.close(1u32.into(), b"cancelled");
1466                return Err(Error::TransferCancelled);
1467            }
1468            Err(error) => return Err(error),
1469        }
1470        let status = if file_plan.offset > 0 {
1471            TransferFileStatus::Resumed
1472        } else {
1473            TransferFileStatus::Sent
1474        };
1475        let blake3 = entry.blake3.clone().unwrap_or_default();
1476        events(TransferEvent::FileFinished {
1477            direction: TransferDirection::Send,
1478            file_name: display_path(&entry.relative_path),
1479            bytes: entry.size,
1480            blake3: blake3.clone(),
1481            status,
1482        });
1483        summaries.push(TransferFileSummary {
1484            path: source_path.clone(),
1485            relative_path: entry.relative_path.clone(),
1486            bytes: entry.size,
1487            blake3,
1488            status,
1489        });
1490    }
1491    send.finish()?;
1492
1493    let ack: DirectAck = read_json_frame(&mut recv).await?;
1494    if !ack.ok {
1495        return Err(Error::Transfer(
1496            ack.error
1497                .unwrap_or_else(|| "receiver rejected transfer".to_string()),
1498        ));
1499    }
1500
1501    connection.close(0u32.into(), b"done");
1502    endpoint.wait_idle().await;
1503
1504    let summary = summary_from_files(&manifest, summaries)?;
1505    events(TransferEvent::SessionFinished {
1506        direction: TransferDirection::Send,
1507        files_total: summary.files.len(),
1508        bytes_total: summary.bytes,
1509        blake3: summary.blake3.clone(),
1510    });
1511    Ok(summary)
1512}
1513
1514pub async fn receive_direct_file(
1515    request: &ReceiveRequest,
1516    destination: impl AsRef<Path>,
1517    identity: &NativeIdentity,
1518    events: impl FnMut(TransferEvent),
1519) -> Result<TransferSummary> {
1520    receive_direct_file_with_control(
1521        request,
1522        destination,
1523        identity,
1524        TransferControl::new(),
1525        events,
1526    )
1527    .await
1528}
1529
1530pub async fn receive_direct_file_with_control(
1531    request: &ReceiveRequest,
1532    destination: impl AsRef<Path>,
1533    identity: &NativeIdentity,
1534    control: TransferControl,
1535    mut events: impl FnMut(TransferEvent),
1536) -> Result<TransferSummary> {
1537    ensure_rustls_provider();
1538
1539    control.check_cancelled()?;
1540    let destination = destination.as_ref();
1541    tokio::fs::create_dir_all(destination)
1542        .await
1543        .map_err(|source| Error::IoPath {
1544            path: destination.to_path_buf(),
1545            source,
1546        })?;
1547
1548    let server_config =
1549        quinn::ServerConfig::with_single_cert(identity.cert_chain(), identity.private_key())?;
1550    let endpoint = quinn::Endpoint::server(server_config, request.listen())?;
1551    let incoming = endpoint.accept().await.ok_or(Error::NoIncomingConnection)?;
1552    let connection = incoming.await?;
1553    let (mut send, mut recv) = connection.accept_bi().await?;
1554
1555    let result = receive_stream_file(
1556        destination,
1557        request.psk(),
1558        &mut recv,
1559        &mut send,
1560        &control,
1561        &mut events,
1562    )
1563    .await;
1564    let ack = match &result {
1565        Ok(_) => DirectAck {
1566            ok: true,
1567            error: None,
1568        },
1569        Err(error) => DirectAck {
1570            ok: false,
1571            error: Some(error.to_string()),
1572        },
1573    };
1574    write_json_frame(&mut send, &ack).await?;
1575    send.finish()?;
1576    let _ = tokio::time::timeout(Duration::from_secs(1), connection.closed()).await;
1577    endpoint.close(0u32.into(), b"done");
1578    endpoint.wait_idle().await;
1579
1580    result
1581}
1582
1583async fn receive_stream_file(
1584    destination: &Path,
1585    psk: Option<&str>,
1586    recv: &mut quinn::RecvStream,
1587    send: &mut quinn::SendStream,
1588    control: &TransferControl,
1589    events: &mut impl FnMut(TransferEvent),
1590) -> Result<TransferSummary> {
1591    let mut magic = [0; DIRECT_MAGIC.len()];
1592    recv.read_exact(&mut magic).await?;
1593    if &magic != DIRECT_MAGIC {
1594        return Err(Error::Protocol("bad direct-transfer magic".to_string()));
1595    }
1596
1597    let hello: DirectProtocolHello = read_json_frame(recv).await?;
1598    let protocol = negotiate_direct_protocol(&hello, psk);
1599    write_json_frame(send, &protocol).await?;
1600    if !protocol.accepted {
1601        return Err(Error::Protocol(protocol.error.unwrap_or_else(|| {
1602            "unsupported direct protocol version".to_string()
1603        })));
1604    }
1605    if let (Some(psk), Some(challenge)) = (psk, protocol.psk_challenge.as_deref()) {
1606        let proof: DirectPskProof = read_json_frame(recv).await?;
1607        let expected = psk_proof(
1608            psk,
1609            challenge,
1610            &hello.client_fingerprint,
1611            protocol.protocol_version.unwrap_or(NATIVE_PROTOCOL_VERSION),
1612        )?;
1613        if proof.proof != expected {
1614            write_json_frame(send, &DirectPskResult::reject("PSK proof mismatch")).await?;
1615            return Err(Error::Authentication("PSK proof mismatch".to_string()));
1616        }
1617        write_json_frame(send, &DirectPskResult::accept()).await?;
1618    }
1619
1620    let manifest: TransferManifest = read_json_frame(recv).await?;
1621    validate_manifest(&manifest)?;
1622    events(TransferEvent::SessionStarted {
1623        direction: TransferDirection::Receive,
1624        session_id: manifest.session_id,
1625        files_total: manifest.file_entries().count(),
1626        bytes_total: manifest.total_file_bytes(),
1627    });
1628    control.check_cancelled()?;
1629
1630    for entry in &manifest.entries {
1631        if entry.kind == ManifestEntryKind::Directory {
1632            let path = safe_destination_path(destination, &entry.relative_path)?;
1633            tokio::fs::create_dir_all(&path)
1634                .await
1635                .map_err(|source| Error::IoPath { path, source })?;
1636        }
1637    }
1638
1639    let mut plan = DirectReceivePlan { files: Vec::new() };
1640    for entry in manifest.file_entries() {
1641        let decision = decide_resume(destination, entry, manifest.chunk_size).await?;
1642        let (action, offset) = match decision {
1643            ResumeDecision::Create => (DirectFileAction::Receive, 0),
1644            ResumeDecision::Skip => (DirectFileAction::Skip, entry.size),
1645            ResumeDecision::ResumeFrom(offset) => (DirectFileAction::Receive, offset),
1646            ResumeDecision::Conflict(message) => return Err(Error::InvalidInput(message)),
1647        };
1648        plan.files.push(DirectFilePlan {
1649            relative_path: entry.relative_path.clone(),
1650            action,
1651            offset,
1652        });
1653    }
1654    write_json_frame(send, &plan).await?;
1655
1656    let mut summaries = Vec::new();
1657    for file_plan in &plan.files {
1658        let Some(entry) = manifest
1659            .file_entries()
1660            .find(|entry| entry.relative_path == file_plan.relative_path)
1661        else {
1662            return Err(Error::Protocol(format!(
1663                "planned file missing from manifest: {}",
1664                display_path(&file_plan.relative_path)
1665            )));
1666        };
1667
1668        let final_path = safe_destination_path(destination, &entry.relative_path)?;
1669        if file_plan.action == DirectFileAction::Skip {
1670            let blake3 = entry.blake3.clone().unwrap_or_default();
1671            events(TransferEvent::FileFinished {
1672                direction: TransferDirection::Receive,
1673                file_name: display_path(&entry.relative_path),
1674                bytes: entry.size,
1675                blake3: blake3.clone(),
1676                status: TransferFileStatus::Skipped,
1677            });
1678            summaries.push(TransferFileSummary {
1679                path: final_path,
1680                relative_path: entry.relative_path.clone(),
1681                bytes: entry.size,
1682                blake3,
1683                status: TransferFileStatus::Skipped,
1684            });
1685            continue;
1686        }
1687
1688        match receive_payload_file(recv, destination, entry, file_plan.offset, control, events)
1689            .await
1690        {
1691            Ok(()) => {}
1692            Err(Error::TransferCancelled) => {
1693                events(TransferEvent::SessionCancelled {
1694                    direction: TransferDirection::Receive,
1695                    session_id: manifest.session_id,
1696                });
1697                return Err(Error::TransferCancelled);
1698            }
1699            Err(error) => return Err(error),
1700        }
1701        let actual_hash = hash_file(&final_path).await?;
1702        let expected_hash = entry
1703            .blake3
1704            .as_deref()
1705            .ok_or_else(|| Error::Protocol("file manifest entry is missing BLAKE3".to_string()))?;
1706        if actual_hash != expected_hash {
1707            return Err(Error::Transfer(format!(
1708                "BLAKE3 hash mismatch for {}",
1709                display_path(&entry.relative_path)
1710            )));
1711        }
1712
1713        let status = if file_plan.offset > 0 {
1714            TransferFileStatus::Resumed
1715        } else {
1716            TransferFileStatus::Received
1717        };
1718        events(TransferEvent::FileFinished {
1719            direction: TransferDirection::Receive,
1720            file_name: display_path(&entry.relative_path),
1721            bytes: entry.size,
1722            blake3: actual_hash.clone(),
1723            status,
1724        });
1725        summaries.push(TransferFileSummary {
1726            path: final_path,
1727            relative_path: entry.relative_path.clone(),
1728            bytes: entry.size,
1729            blake3: actual_hash,
1730            status,
1731        });
1732    }
1733
1734    let summary = summary_from_files(&manifest, summaries)?;
1735    events(TransferEvent::SessionFinished {
1736        direction: TransferDirection::Receive,
1737        files_total: summary.files.len(),
1738        bytes_total: summary.bytes,
1739        blake3: summary.blake3.clone(),
1740    });
1741    Ok(summary)
1742}
1743
1744async fn send_payload_file(
1745    send: &mut quinn::SendStream,
1746    source_path: &Path,
1747    entry: &ManifestEntry,
1748    offset: u64,
1749    control: &TransferControl,
1750    events: &mut impl FnMut(TransferEvent),
1751) -> Result<()> {
1752    if offset > entry.size {
1753        return Err(Error::Protocol(format!(
1754            "resume offset exceeds file size for {}",
1755            display_path(&entry.relative_path)
1756        )));
1757    }
1758
1759    let header = DirectPayloadHeader {
1760        relative_path: entry.relative_path.clone(),
1761        offset,
1762        bytes: entry.size - offset,
1763    };
1764    write_json_frame(send, &header).await?;
1765    events(TransferEvent::FileStarted {
1766        direction: TransferDirection::Send,
1767        file_name: display_path(&entry.relative_path),
1768        bytes_total: entry.size,
1769        resume_offset: offset,
1770    });
1771
1772    let mut file = tokio::fs::File::open(source_path)
1773        .await
1774        .map_err(|source| Error::IoPath {
1775            path: source_path.to_path_buf(),
1776            source,
1777        })?;
1778    file.seek(SeekFrom::Start(offset))
1779        .await
1780        .map_err(|source| Error::IoPath {
1781            path: source_path.to_path_buf(),
1782            source,
1783        })?;
1784
1785    let mut buffer = vec![0; IO_BUFFER_SIZE];
1786    let mut bytes_done = offset;
1787    while bytes_done < entry.size {
1788        control.check_cancelled()?;
1789        let remaining = entry.size - bytes_done;
1790        let read_size = buffer.len().min(remaining as usize);
1791        let read = file
1792            .read(&mut buffer[..read_size])
1793            .await
1794            .map_err(|source| Error::IoPath {
1795                path: source_path.to_path_buf(),
1796                source,
1797            })?;
1798        if read == 0 {
1799            return Err(Error::Protocol(format!(
1800                "source file ended early: {}",
1801                display_path(source_path)
1802            )));
1803        }
1804        send.write_all(&buffer[..read]).await?;
1805        bytes_done += read as u64;
1806        events(TransferEvent::Progress {
1807            direction: TransferDirection::Send,
1808            file_name: display_path(&entry.relative_path),
1809            bytes_done,
1810            bytes_total: entry.size,
1811        });
1812        control.check_cancelled()?;
1813    }
1814
1815    Ok(())
1816}
1817
1818async fn receive_payload_file(
1819    recv: &mut quinn::RecvStream,
1820    destination: &Path,
1821    entry: &ManifestEntry,
1822    offset: u64,
1823    control: &TransferControl,
1824    events: &mut impl FnMut(TransferEvent),
1825) -> Result<()> {
1826    let header: DirectPayloadHeader = read_json_frame(recv).await?;
1827    if header.relative_path != entry.relative_path
1828        || header.offset != offset
1829        || header.bytes != entry.size - offset
1830    {
1831        return Err(Error::Protocol(format!(
1832            "payload header does not match receive plan for {}",
1833            display_path(&entry.relative_path)
1834        )));
1835    }
1836    events(TransferEvent::FileStarted {
1837        direction: TransferDirection::Receive,
1838        file_name: display_path(&entry.relative_path),
1839        bytes_total: entry.size,
1840        resume_offset: offset,
1841    });
1842
1843    let final_path = safe_destination_path(destination, &entry.relative_path)?;
1844    if let Some(parent) = final_path.parent() {
1845        tokio::fs::create_dir_all(parent)
1846            .await
1847            .map_err(|source| Error::IoPath {
1848                path: parent.to_path_buf(),
1849                source,
1850            })?;
1851    }
1852
1853    let write_path = if offset == 0 {
1854        temp_path_for(&final_path)
1855    } else {
1856        final_path.clone()
1857    };
1858
1859    let mut file = if offset == 0 {
1860        tokio::fs::File::create(&write_path)
1861            .await
1862            .map_err(|source| Error::IoPath {
1863                path: write_path.clone(),
1864                source,
1865            })?
1866    } else {
1867        let mut file = tokio::fs::OpenOptions::new()
1868            .write(true)
1869            .open(&write_path)
1870            .await
1871            .map_err(|source| Error::IoPath {
1872                path: write_path.clone(),
1873                source,
1874            })?;
1875        file.set_len(offset).await.map_err(|source| Error::IoPath {
1876            path: write_path.clone(),
1877            source,
1878        })?;
1879        file.seek(SeekFrom::Start(offset))
1880            .await
1881            .map_err(|source| Error::IoPath {
1882                path: write_path.clone(),
1883                source,
1884            })?;
1885        file
1886    };
1887
1888    let mut bytes_done = offset;
1889    let mut bytes_remaining = header.bytes;
1890    let mut buffer = vec![0; IO_BUFFER_SIZE];
1891    while bytes_remaining > 0 {
1892        if control.is_cancelled() {
1893            if offset == 0 {
1894                let _ = tokio::fs::remove_file(&write_path).await;
1895            }
1896            return Err(Error::TransferCancelled);
1897        }
1898        let read_size = buffer.len().min(bytes_remaining as usize);
1899        let read = match recv.read(&mut buffer[..read_size]).await {
1900            Ok(Some(read)) => read,
1901            Ok(None) => {
1902                if offset == 0 {
1903                    let _ = tokio::fs::remove_file(&write_path).await;
1904                }
1905                return Err(Error::Protocol(
1906                    "stream ended before file payload".to_string(),
1907                ));
1908            }
1909            Err(error) => {
1910                if offset == 0 {
1911                    let _ = tokio::fs::remove_file(&write_path).await;
1912                }
1913                return Err(Error::Read(error));
1914            }
1915        };
1916
1917        file.write_all(&buffer[..read])
1918            .await
1919            .map_err(|source| Error::IoPath {
1920                path: write_path.clone(),
1921                source,
1922            })?;
1923        bytes_done += read as u64;
1924        bytes_remaining -= read as u64;
1925        events(TransferEvent::Progress {
1926            direction: TransferDirection::Receive,
1927            file_name: display_path(&entry.relative_path),
1928            bytes_done,
1929            bytes_total: entry.size,
1930        });
1931        if control.is_cancelled() {
1932            if offset == 0 {
1933                let _ = tokio::fs::remove_file(&write_path).await;
1934            }
1935            return Err(Error::TransferCancelled);
1936        }
1937    }
1938
1939    file.flush().await.map_err(|source| Error::IoPath {
1940        path: write_path.clone(),
1941        source,
1942    })?;
1943    drop(file);
1944
1945    if offset == 0 {
1946        tokio::fs::rename(&write_path, &final_path)
1947            .await
1948            .map_err(|source| Error::IoPath {
1949                path: final_path,
1950                source,
1951            })?;
1952    }
1953
1954    Ok(())
1955}
1956
1957async fn write_json_frame<T: Serialize>(send: &mut quinn::SendStream, value: &T) -> Result<()> {
1958    let bytes = serde_json::to_vec(value).map_err(|error| Error::Protocol(error.to_string()))?;
1959    if bytes.len() > MAX_FRAME_LEN {
1960        return Err(Error::Protocol("frame exceeds maximum length".to_string()));
1961    }
1962    send.write_all(&(bytes.len() as u32).to_be_bytes()).await?;
1963    send.write_all(&bytes).await?;
1964    Ok(())
1965}
1966
1967async fn read_json_frame<T: for<'de> Deserialize<'de>>(recv: &mut quinn::RecvStream) -> Result<T> {
1968    let mut len = [0; 4];
1969    recv.read_exact(&mut len).await?;
1970    let len = u32::from_be_bytes(len) as usize;
1971    if len > MAX_FRAME_LEN {
1972        return Err(Error::Protocol("frame exceeds maximum length".to_string()));
1973    }
1974
1975    let mut bytes = vec![0; len];
1976    recv.read_exact(&mut bytes).await?;
1977    serde_json::from_slice(&bytes).map_err(|error| Error::Protocol(error.to_string()))
1978}
1979
1980fn negotiate_direct_protocol(
1981    hello: &DirectProtocolHello,
1982    psk: Option<&str>,
1983) -> DirectProtocolResponse {
1984    if hello.min_protocol_version > NATIVE_PROTOCOL_VERSION {
1985        return DirectProtocolResponse::reject(format!(
1986            "peer requires protocol version {}, local max is {}",
1987            hello.min_protocol_version, NATIVE_PROTOCOL_VERSION
1988        ));
1989    }
1990
1991    if hello.protocol_version < MIN_NATIVE_PROTOCOL_VERSION {
1992        return DirectProtocolResponse::reject(format!(
1993            "peer max protocol version {} is below local minimum {}",
1994            hello.protocol_version, MIN_NATIVE_PROTOCOL_VERSION
1995        ));
1996    }
1997
1998    let selected_version = NATIVE_PROTOCOL_VERSION.min(hello.protocol_version);
1999    match (psk, hello.psk) {
2000        (Some(_), false) => DirectProtocolResponse::reject("receiver requires PSK authentication"),
2001        (Some(_), true) => {
2002            DirectProtocolResponse::accept_with_psk(selected_version, Uuid::new_v4().to_string())
2003        }
2004        (None, true) => DirectProtocolResponse::reject("receiver is not configured for PSK mode"),
2005        (None, false) => DirectProtocolResponse::accept(selected_version),
2006    }
2007}
2008
/// A filesystem item selected for transfer, paired with the path it is given
/// inside the transfer.
#[derive(Debug, Clone)]
struct ManifestSource {
    /// Location of the file or directory on the local filesystem.
    source_path: PathBuf,
    /// Path within the transfer: the root's file name, extended with the names
    /// of nested entries for items found inside a directory.
    relative_path: PathBuf,
}
2014
2015fn collect_manifest_sources(paths: &[PathBuf]) -> Result<Vec<ManifestSource>> {
2016    let mut sources = Vec::new();
2017    for path in paths {
2018        let metadata = std::fs::symlink_metadata(path).map_err(|source| Error::IoPath {
2019            path: path.clone(),
2020            source,
2021        })?;
2022        if metadata.file_type().is_symlink() {
2023            return Err(Error::InvalidInput(format!(
2024                "{} is a symlink; symlink transfers are not supported",
2025                display_path(path)
2026            )));
2027        }
2028
2029        let root_name = path
2030            .file_name()
2031            .ok_or_else(|| Error::InvalidInput(format!("{} has no file name", display_path(path))))?
2032            .to_os_string();
2033        let relative_path = PathBuf::from(root_name);
2034
2035        if metadata.is_dir() {
2036            sources.push(ManifestSource {
2037                source_path: path.clone(),
2038                relative_path: relative_path.clone(),
2039            });
2040            collect_dir_sources(path, &relative_path, &mut sources)?;
2041        } else if metadata.is_file() {
2042            sources.push(ManifestSource {
2043                source_path: path.clone(),
2044                relative_path,
2045            });
2046        } else {
2047            return Err(Error::InvalidInput(format!(
2048                "{} is not a regular file or directory",
2049                display_path(path)
2050            )));
2051        }
2052    }
2053
2054    Ok(sources)
2055}
2056
2057fn collect_dir_sources(
2058    dir: &Path,
2059    relative_dir: &Path,
2060    sources: &mut Vec<ManifestSource>,
2061) -> Result<()> {
2062    for entry in std::fs::read_dir(dir).map_err(|source| Error::IoPath {
2063        path: dir.to_path_buf(),
2064        source,
2065    })? {
2066        let entry = entry.map_err(|source| Error::IoPath {
2067            path: dir.to_path_buf(),
2068            source,
2069        })?;
2070        let path = entry.path();
2071        let metadata = std::fs::symlink_metadata(&path).map_err(|source| Error::IoPath {
2072            path: path.clone(),
2073            source,
2074        })?;
2075        if metadata.file_type().is_symlink() {
2076            return Err(Error::InvalidInput(format!(
2077                "{} is a symlink; symlink transfers are not supported",
2078                display_path(&path)
2079            )));
2080        }
2081
2082        let relative_path = relative_dir.join(entry.file_name());
2083        if metadata.is_dir() {
2084            sources.push(ManifestSource {
2085                source_path: path.clone(),
2086                relative_path: relative_path.clone(),
2087            });
2088            collect_dir_sources(&path, &relative_path, sources)?;
2089        } else if metadata.is_file() {
2090            sources.push(ManifestSource {
2091                source_path: path,
2092                relative_path,
2093            });
2094        }
2095    }
2096
2097    Ok(())
2098}
2099
2100fn manifest_source_map(paths: &[PathBuf]) -> Result<BTreeMap<PathBuf, PathBuf>> {
2101    Ok(collect_manifest_sources(paths)?
2102        .into_iter()
2103        .filter_map(|source| {
2104            let metadata = std::fs::metadata(&source.source_path).ok()?;
2105            metadata
2106                .is_file()
2107                .then_some((source.relative_path, source.source_path))
2108        })
2109        .collect())
2110}
2111
2112async fn build_manifest_entry(source: ManifestSource) -> Result<ManifestEntry> {
2113    validate_relative_transfer_path(&source.relative_path)?;
2114    let metadata = tokio::fs::metadata(&source.source_path)
2115        .await
2116        .map_err(|source_error| Error::IoPath {
2117            path: source.source_path.clone(),
2118            source: source_error,
2119        })?;
2120    let unix_mode = unix_mode(&metadata);
2121    let modified_unix_ms = modified_unix_ms(&metadata);
2122
2123    if metadata.is_dir() {
2124        return Ok(ManifestEntry {
2125            relative_path: source.relative_path,
2126            kind: ManifestEntryKind::Directory,
2127            size: 0,
2128            blake3: None,
2129            chunks: Vec::new(),
2130            unix_mode,
2131            modified_unix_ms,
2132        });
2133    }
2134
2135    let (blake3, chunks) = hash_file_with_chunks(&source.source_path, DEFAULT_CHUNK_SIZE).await?;
2136    Ok(ManifestEntry {
2137        relative_path: source.relative_path,
2138        kind: ManifestEntryKind::File,
2139        size: metadata.len(),
2140        blake3: Some(blake3),
2141        chunks,
2142        unix_mode,
2143        modified_unix_ms,
2144    })
2145}
2146
2147fn validate_manifest(manifest: &TransferManifest) -> Result<()> {
2148    if manifest.version != MANIFEST_VERSION {
2149        return Err(Error::Protocol(format!(
2150            "unsupported manifest version {}",
2151            manifest.version
2152        )));
2153    }
2154    if manifest.chunk_size == 0 {
2155        return Err(Error::Protocol("manifest chunk size is zero".to_string()));
2156    }
2157
2158    let mut seen = BTreeSet::new();
2159    for entry in &manifest.entries {
2160        validate_relative_transfer_path(&entry.relative_path)?;
2161        if !seen.insert(entry.relative_path.clone()) {
2162            return Err(Error::Protocol(format!(
2163                "duplicate manifest path: {}",
2164                display_path(&entry.relative_path)
2165            )));
2166        }
2167        if entry.kind == ManifestEntryKind::File && entry.blake3.is_none() {
2168            return Err(Error::Protocol(format!(
2169                "file manifest entry is missing BLAKE3: {}",
2170                display_path(&entry.relative_path)
2171            )));
2172        }
2173    }
2174
2175    Ok(())
2176}
2177
2178fn validate_relative_transfer_path(path: &Path) -> Result<()> {
2179    if path.as_os_str().is_empty() || path.is_absolute() {
2180        return Err(Error::InvalidInput(format!(
2181            "unsafe transfer path: {}",
2182            display_path(path)
2183        )));
2184    }
2185
2186    let mut normal_components = 0usize;
2187    for component in path.components() {
2188        match component {
2189            Component::Normal(value) => {
2190                let Some(name) = value.to_str() else {
2191                    return Err(Error::InvalidInput(format!(
2192                        "transfer path must be UTF-8: {}",
2193                        display_path(path)
2194                    )));
2195                };
2196                validate_path_component(name, path)?;
2197                normal_components += 1;
2198            }
2199            Component::CurDir
2200            | Component::ParentDir
2201            | Component::RootDir
2202            | Component::Prefix(_) => {
2203                return Err(Error::InvalidInput(format!(
2204                    "unsafe transfer path: {}",
2205                    display_path(path)
2206                )));
2207            }
2208        }
2209    }
2210
2211    if normal_components == 0 {
2212        return Err(Error::InvalidInput(format!(
2213            "unsafe transfer path: {}",
2214            display_path(path)
2215        )));
2216    }
2217
2218    Ok(())
2219}
2220
2221fn validate_path_component(component: &str, full_path: &Path) -> Result<()> {
2222    if component.is_empty()
2223        || component.contains('\\')
2224        || component.contains('/')
2225        || component.contains(':')
2226        || component.ends_with(' ')
2227        || component.ends_with('.')
2228        || is_windows_reserved_name(component)
2229    {
2230        return Err(Error::InvalidInput(format!(
2231            "unsafe transfer path: {}",
2232            display_path(full_path)
2233        )));
2234    }
2235
2236    Ok(())
2237}
2238
/// Reports whether `component` names a reserved Windows device.
///
/// Only the stem before the first `.` is considered, with trailing spaces and
/// dots removed, and the comparison ignores ASCII case. So `con.txt` and
/// `NUL .log` are both reserved.
///
/// Reserved stems are `CON`, `PRN`, `AUX`, `NUL`, and `COM` or `LPT` followed
/// by exactly one port designator: an ASCII digit `0`-`9` or one of the
/// superscript digits `¹`, `²`, `³`, which Windows also treats as device names.
fn is_windows_reserved_name(component: &str) -> bool {
    let stem = component
        .split('.')
        .next()
        .unwrap_or(component)
        .trim_end_matches([' ', '.']);

    if ["CON", "PRN", "AUX", "NUL"]
        .iter()
        .any(|device| stem.eq_ignore_ascii_case(device))
    {
        return true;
    }

    // `get` returns None when byte 3 is past the end or inside a multi-byte
    // character; neither can be a COMx/LPTx name.
    let (Some(family), Some(designator)) = (stem.get(..3), stem.get(3..)) else {
        return false;
    };
    let is_port_family =
        family.eq_ignore_ascii_case("COM") || family.eq_ignore_ascii_case("LPT");

    let mut designator = designator.chars();
    is_port_family
        && matches!(
            (designator.next(), designator.next()),
            (Some('0'..='9' | '¹' | '²' | '³'), None)
        )
}
2272
2273async fn hash_file(path: &Path) -> Result<String> {
2274    hash_file_with_chunks(path, DEFAULT_CHUNK_SIZE)
2275        .await
2276        .map(|(hash, _)| hash)
2277}
2278
2279async fn hash_file_with_chunks(path: &Path, chunk_size: u64) -> Result<(String, Vec<String>)> {
2280    let mut file = tokio::fs::File::open(path)
2281        .await
2282        .map_err(|source| Error::IoPath {
2283            path: path.to_path_buf(),
2284            source,
2285        })?;
2286    let mut hasher = blake3::Hasher::new();
2287    let mut chunk_hasher = blake3::Hasher::new();
2288    let mut chunk_bytes = 0u64;
2289    let mut chunks = Vec::new();
2290    let mut buffer = vec![0; IO_BUFFER_SIZE];
2291
2292    loop {
2293        let read = file
2294            .read(&mut buffer)
2295            .await
2296            .map_err(|source| Error::IoPath {
2297                path: path.to_path_buf(),
2298                source,
2299            })?;
2300        if read == 0 {
2301            break;
2302        }
2303        hasher.update(&buffer[..read]);
2304
2305        let mut remaining = &buffer[..read];
2306        while !remaining.is_empty() {
2307            let take = remaining.len().min((chunk_size - chunk_bytes) as usize);
2308            chunk_hasher.update(&remaining[..take]);
2309            chunk_bytes += take as u64;
2310            remaining = &remaining[take..];
2311            if chunk_bytes == chunk_size {
2312                chunks.push(chunk_hasher.finalize().to_hex().to_string());
2313                chunk_hasher = blake3::Hasher::new();
2314                chunk_bytes = 0;
2315            }
2316        }
2317    }
2318
2319    if chunk_bytes > 0 {
2320        chunks.push(chunk_hasher.finalize().to_hex().to_string());
2321    }
2322
2323    Ok((hasher.finalize().to_hex().to_string(), chunks))
2324}
2325
2326async fn verified_prefix_offset(
2327    path: &Path,
2328    entry: &ManifestEntry,
2329    existing_len: u64,
2330    chunk_size: u64,
2331) -> Result<u64> {
2332    if existing_len == 0 || entry.chunks.is_empty() {
2333        return Ok(0);
2334    }
2335
2336    let chunks_to_check = (existing_len / chunk_size).min(entry.chunks.len() as u64) as usize;
2337    if chunks_to_check == 0 {
2338        return Ok(0);
2339    }
2340
2341    let mut file = tokio::fs::File::open(path)
2342        .await
2343        .map_err(|source| Error::IoPath {
2344            path: path.to_path_buf(),
2345            source,
2346        })?;
2347    let mut buffer = vec![0; IO_BUFFER_SIZE];
2348    let mut verified_chunks = 0usize;
2349
2350    for expected in entry.chunks.iter().take(chunks_to_check) {
2351        let mut remaining = chunk_size;
2352        let mut hasher = blake3::Hasher::new();
2353        while remaining > 0 {
2354            let read_size = buffer.len().min(remaining as usize);
2355            file.read_exact(&mut buffer[..read_size])
2356                .await
2357                .map_err(|source| Error::IoPath {
2358                    path: path.to_path_buf(),
2359                    source,
2360                })?;
2361            hasher.update(&buffer[..read_size]);
2362            remaining -= read_size as u64;
2363        }
2364
2365        if hasher.finalize().to_hex().as_str() != expected {
2366            break;
2367        }
2368        verified_chunks += 1;
2369    }
2370
2371    Ok(verified_chunks as u64 * chunk_size)
2372}
2373
2374fn summary_from_files(
2375    manifest: &TransferManifest,
2376    files: Vec<TransferFileSummary>,
2377) -> Result<TransferSummary> {
2378    let bytes = manifest.total_file_bytes();
2379    let blake3 = if files.len() == 1 {
2380        files[0].blake3.clone()
2381    } else {
2382        manifest.digest()?
2383    };
2384    let file_name = if files.len() == 1 {
2385        display_path(&files[0].relative_path)
2386    } else {
2387        format!("{} files", files.len())
2388    };
2389    let path = files
2390        .first()
2391        .map(|file| file.path.clone())
2392        .unwrap_or_default();
2393
2394    Ok(TransferSummary {
2395        path,
2396        file_name,
2397        bytes,
2398        blake3,
2399        files,
2400    })
2401}
2402
2403fn temp_path_for(final_path: &Path) -> PathBuf {
2404    let file_name = final_path
2405        .file_name()
2406        .and_then(|name| name.to_str())
2407        .unwrap_or("transfer");
2408    final_path.with_file_name(format!(".{file_name}.{}.ferry-part", Uuid::new_v4()))
2409}
2410
2411fn modified_unix_ms(metadata: &Metadata) -> Option<i128> {
2412    system_time_unix_ms(metadata.modified().ok()?)
2413}
2414
/// Converts `time` to whole milliseconds relative to the Unix epoch.
///
/// Times before the epoch produce a negative value. The result is always
/// `Some`.
fn system_time_unix_ms(time: SystemTime) -> Option<i128> {
    let millis = time.duration_since(UNIX_EPOCH).map_or_else(
        |before_epoch| -(before_epoch.duration().as_millis() as i128),
        |since_epoch| since_epoch.as_millis() as i128,
    );
    Some(millis)
}
2421
/// Returns the Unix permission mode bits recorded in `metadata`.
#[cfg(unix)]
fn unix_mode(metadata: &Metadata) -> Option<u32> {
    let permissions = metadata.permissions();
    Some(std::os::unix::fs::PermissionsExt::mode(&permissions))
}

/// Non-Unix builds report no mode.
#[cfg(not(unix))]
fn unix_mode(_metadata: &Metadata) -> Option<u32> {
    Option::None
}
2433
2434fn client_config_capturing_server_fingerprint()
2435-> Result<(quinn::ClientConfig, Arc<Mutex<Option<String>>>)> {
2436    let server_fingerprint = Arc::new(Mutex::new(None));
2437    let crypto = rustls::ClientConfig::builder()
2438        .dangerous()
2439        .with_custom_certificate_verifier(ServerFingerprintVerifier::new(
2440            server_fingerprint.clone(),
2441        ))
2442        .with_no_client_auth();
2443
2444    let config = quinn::ClientConfig::new(Arc::new(
2445        QuicClientConfig::try_from(crypto)
2446            .map_err(|error| Error::CryptoConfig(error.to_string()))?,
2447    ));
2448    Ok((config, server_fingerprint))
2449}
2450
2451fn captured_server_fingerprint(fingerprint: &Arc<Mutex<Option<String>>>) -> Result<String> {
2452    fingerprint
2453        .lock()
2454        .map_err(|_| Error::CryptoConfig("server fingerprint capture lock poisoned".to_string()))?
2455        .clone()
2456        .ok_or_else(|| {
2457            Error::CryptoConfig("server certificate fingerprint was not captured".to_string())
2458        })
2459}
2460
/// Certificate verifier that accepts whatever end-entity certificate the
/// server presents and records its BLAKE3 fingerprint in a shared slot.
#[derive(Debug)]
struct ServerFingerprintVerifier {
    /// Supplies the signature verification algorithms used for handshake
    /// signatures.
    provider: Arc<CryptoProvider>,
    /// Shared slot that receives the hex fingerprint of the presented
    /// certificate.
    server_fingerprint: Arc<Mutex<Option<String>>>,
}
2466
2467impl ServerFingerprintVerifier {
2468    fn new(server_fingerprint: Arc<Mutex<Option<String>>>) -> Arc<Self> {
2469        Arc::new(Self {
2470            provider: Arc::new(rustls::crypto::ring::default_provider()),
2471            server_fingerprint,
2472        })
2473    }
2474}
2475
2476impl ServerCertVerifier for ServerFingerprintVerifier {
2477    fn verify_server_cert(
2478        &self,
2479        end_entity: &CertificateDer<'_>,
2480        _intermediates: &[CertificateDer<'_>],
2481        _server_name: &ServerName<'_>,
2482        _ocsp: &[u8],
2483        _now: UnixTime,
2484    ) -> std::result::Result<ServerCertVerified, rustls::Error> {
2485        let fingerprint = blake3::hash(end_entity.as_ref()).to_hex().to_string();
2486        *self.server_fingerprint.lock().map_err(|_| {
2487            rustls::Error::General("server fingerprint capture lock poisoned".to_string())
2488        })? = Some(fingerprint);
2489        Ok(ServerCertVerified::assertion())
2490    }
2491
2492    fn verify_tls12_signature(
2493        &self,
2494        message: &[u8],
2495        cert: &CertificateDer<'_>,
2496        dss: &DigitallySignedStruct,
2497    ) -> std::result::Result<HandshakeSignatureValid, rustls::Error> {
2498        verify_tls12_signature(
2499            message,
2500            cert,
2501            dss,
2502            &self.provider.signature_verification_algorithms,
2503        )
2504    }
2505
2506    fn verify_tls13_signature(
2507        &self,
2508        message: &[u8],
2509        cert: &CertificateDer<'_>,
2510        dss: &DigitallySignedStruct,
2511    ) -> std::result::Result<HandshakeSignatureValid, rustls::Error> {
2512        verify_tls13_signature(
2513            message,
2514            cert,
2515            dss,
2516            &self.provider.signature_verification_algorithms,
2517        )
2518    }
2519
2520    fn supported_verify_schemes(&self) -> Vec<SignatureScheme> {
2521        self.provider
2522            .signature_verification_algorithms
2523            .supported_schemes()
2524    }
2525}
2526
/// Tries to install the ring crypto provider as the rustls default.
///
/// The result is deliberately discarded, so a failed install is not reported.
/// NOTE(review): presumably this fails only when a default provider is already
/// installed — confirm against the rustls documentation.
fn ensure_rustls_provider() {
    let _ = rustls::crypto::ring::default_provider().install_default();
}
2530
2531pub fn config_dir() -> Result<PathBuf> {
2532    let base_dirs = BaseDirs::new().ok_or(Error::ConfigDirUnavailable)?;
2533    Ok(base_dirs.config_dir().join(CONFIG_DIR_NAME))
2534}
2535
/// Picks a default device alias from the environment.
///
/// `HOSTNAME` is tried first, then `COMPUTERNAME`. A variable that is unset,
/// not valid Unicode, or blank after trimming is skipped, so a blank
/// `HOSTNAME` no longer hides a usable `COMPUTERNAME`. The chosen value is
/// returned as-is, without trimming. When neither variable is usable the alias
/// is `fileferry-device`.
fn default_alias() -> String {
    ["HOSTNAME", "COMPUTERNAME"]
        .iter()
        .filter_map(|name| std::env::var(name).ok())
        .find(|value| !value.trim().is_empty())
        .unwrap_or_else(|| "fileferry-device".to_string())
}
2543
2544fn expand_home_path(path: &str) -> Result<PathBuf> {
2545    if path == "~" {
2546        return Ok(BaseDirs::new()
2547            .ok_or(Error::ConfigDirUnavailable)?
2548            .home_dir()
2549            .to_path_buf());
2550    }
2551
2552    if let Some(rest) = path.strip_prefix("~/") {
2553        return Ok(BaseDirs::new()
2554            .ok_or(Error::ConfigDirUnavailable)?
2555            .home_dir()
2556            .join(rest));
2557    }
2558
2559    Ok(PathBuf::from(path))
2560}
2561
2562fn normalized_fingerprint(fingerprint: String) -> Result<String> {
2563    let fingerprint = fingerprint.trim().to_ascii_lowercase();
2564    if fingerprint.len() != 64 || !fingerprint.chars().all(|char| char.is_ascii_hexdigit()) {
2565        return Err(Error::InvalidInput(
2566            "peer fingerprint must be a full 64-character hex BLAKE3 fingerprint".to_string(),
2567        ));
2568    }
2569
2570    Ok(fingerprint)
2571}
2572
2573fn validate_psk(psk: String) -> Result<String> {
2574    if psk.is_empty() {
2575        return Err(Error::InvalidInput("PSK must not be empty".to_string()));
2576    }
2577
2578    Ok(psk)
2579}
2580
2581fn psk_proof(
2582    psk: &str,
2583    challenge: &str,
2584    client_fingerprint: &str,
2585    protocol_version: u16,
2586) -> Result<String> {
2587    validate_psk(psk.to_string())?;
2588    let key = blake3::hash(psk.as_bytes());
2589    let mut message = Vec::new();
2590    message.extend_from_slice(PSK_PROOF_CONTEXT);
2591    message.push(0);
2592    message.extend_from_slice(challenge.as_bytes());
2593    message.push(0);
2594    message.extend_from_slice(client_fingerprint.as_bytes());
2595    message.push(0);
2596    message.extend_from_slice(protocol_version.to_string().as_bytes());
2597    Ok(blake3::keyed_hash(key.as_bytes(), &message)
2598        .to_hex()
2599        .to_string())
2600}
2601
2602fn write_toml_file<T: Serialize>(path: &Path, value: &T) -> Result<()> {
2603    let bytes =
2604        toml::to_string_pretty(value).map_err(|error| Error::ConfigSerialize(error.to_string()))?;
2605    if let Some(parent) = path.parent() {
2606        std::fs::create_dir_all(parent).map_err(|source| Error::IoPath {
2607            path: parent.to_path_buf(),
2608            source,
2609        })?;
2610    }
2611
2612    let temp_path = path.with_extension(format!("tmp-{}", Uuid::new_v4()));
2613    std::fs::write(&temp_path, bytes).map_err(|source| Error::IoPath {
2614        path: temp_path.clone(),
2615        source,
2616    })?;
2617    std::fs::rename(&temp_path, path).map_err(|source| Error::IoPath {
2618        path: path.to_path_buf(),
2619        source,
2620    })?;
2621    Ok(())
2622}
2623
2624fn write_private_file(path: &Path, bytes: &[u8]) -> Result<()> {
2625    std::fs::write(path, bytes).map_err(|source| Error::IoPath {
2626        path: path.to_path_buf(),
2627        source,
2628    })?;
2629
2630    #[cfg(unix)]
2631    {
2632        use std::os::unix::fs::PermissionsExt;
2633
2634        let mut permissions = std::fs::metadata(path)
2635            .map_err(|source| Error::IoPath {
2636                path: path.to_path_buf(),
2637                source,
2638            })?
2639            .permissions();
2640        permissions.set_mode(0o600);
2641        std::fs::set_permissions(path, permissions).map_err(|source| Error::IoPath {
2642            path: path.to_path_buf(),
2643            source,
2644        })?;
2645    }
2646
2647    Ok(())
2648}
2649
2650#[cfg(test)]
2651mod tests {
2652    use super::*;
2653    use std::sync::{Arc, Mutex};
2654
2655    #[test]
2656    fn direct_peer_parses_socket_address() {
2657        let peer = DirectPeer::parse("127.0.0.1:53318").expect("address parses");
2658
2659        assert_eq!(peer.address().to_string(), "127.0.0.1:53318");
2660        assert_eq!(peer.expected_fingerprint(), None);
2661    }
2662
2663    #[test]
2664    fn direct_peer_accepts_expected_fingerprint() {
2665        let fingerprint = "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef";
2666        let peer = DirectPeer::parse("127.0.0.1:53318")
2667            .expect("address parses")
2668            .with_expected_fingerprint(fingerprint)
2669            .expect("fingerprint parses");
2670
2671        assert_eq!(peer.expected_fingerprint(), Some(fingerprint));
2672    }
2673
2674    #[test]
2675    fn direct_peer_rejects_missing_port() {
2676        assert!(DirectPeer::parse("127.0.0.1").is_err());
2677    }
2678
2679    #[test]
2680    fn send_request_requires_at_least_one_path() {
2681        let peer = DirectPeer::parse("127.0.0.1:53318").expect("address parses");
2682
2683        assert!(SendRequest::new(peer, Vec::new()).is_err());
2684    }
2685
2686    #[test]
2687    fn psk_secret_is_redacted_in_debug_and_serialization() {
2688        let secret = PskSecret::new("do-not-print").expect("psk accepted");
2689
2690        assert_eq!(format!("{secret:?}"), "PskSecret(<redacted>)");
2691        assert_eq!(
2692            serde_json::to_string(&secret).expect("secret serializes"),
2693            r#""<redacted>""#
2694        );
2695
2696        let peer = DirectPeer::parse("127.0.0.1:53318").expect("address parses");
2697        let request = SendRequest::new(peer, vec![PathBuf::from("file.txt")])
2698            .expect("send request")
2699            .with_psk("do-not-print")
2700            .expect("psk accepted");
2701        assert!(!format!("{request:?}").contains("do-not-print"));
2702    }
2703
2704    #[test]
2705    fn validate_send_paths_rejects_empty_slice() {
2706        assert!(validate_send_paths(&[]).is_err());
2707    }
2708
2709    #[tokio::test]
2710    async fn manifest_walks_directory_and_serializes_entries() {
2711        let temp = tempfile::tempdir().expect("tempdir");
2712        let root = temp.path().join("bundle");
2713        let nested = root.join("nested");
2714        tokio::fs::create_dir_all(&nested).await.expect("dirs");
2715        tokio::fs::write(root.join("a.txt"), b"alpha")
2716            .await
2717            .expect("file a");
2718        tokio::fs::write(nested.join("b.txt"), b"beta")
2719            .await
2720            .expect("file b");
2721
2722        let manifest = build_manifest(&[root]).await.expect("manifest");
2723        let json = serde_json::to_string(&manifest).expect("manifest serializes");
2724        let decoded: TransferManifest = serde_json::from_str(&json).expect("manifest decodes");
2725
2726        assert_eq!(decoded.version, MANIFEST_VERSION);
2727        assert!(decoded.entries.iter().any(|entry| {
2728            entry.kind == ManifestEntryKind::Directory
2729                && entry.relative_path == Path::new("bundle/nested")
2730        }));
2731        assert!(decoded.entries.iter().any(|entry| {
2732            entry.kind == ManifestEntryKind::File
2733                && entry.relative_path == Path::new("bundle/nested/b.txt")
2734                && entry.size == 4
2735                && entry.blake3.is_some()
2736        }));
2737    }
2738
2739    #[test]
2740    fn safe_destination_path_rejects_unsafe_paths() {
2741        let dest = Path::new("/tmp/ferry-dest");
2742
2743        assert!(safe_destination_path(dest, Path::new("../escape.txt")).is_err());
2744        assert!(safe_destination_path(dest, Path::new("/absolute.txt")).is_err());
2745        assert!(safe_destination_path(dest, Path::new("nested/CON")).is_err());
2746        assert!(safe_destination_path(dest, Path::new("nested\\escape.txt")).is_err());
2747        assert!(safe_destination_path(dest, Path::new("nested/ok.txt")).is_ok());
2748    }
2749
2750    #[test]
2751    fn safe_destination_path_rejects_generated_escape_and_reserved_patterns() {
2752        let dest = Path::new("/tmp/ferry-dest");
2753        let unsafe_components = [
2754            "..",
2755            ".",
2756            "CON",
2757            "con.txt",
2758            "NUL",
2759            "COM1",
2760            "LPT9",
2761            "trailingspace ",
2762            "trailingdot.",
2763            "has:colon",
2764            "has\\backslash",
2765        ];
2766
2767        for component in unsafe_components {
2768            assert!(
2769                safe_destination_path(dest, Path::new(component)).is_err(),
2770                "{component} should be rejected as a top-level path"
2771            );
2772            if component != "." {
2773                assert!(
2774                    safe_destination_path(dest, Path::new("safe").join(component).as_path())
2775                        .is_err(),
2776                    "{component} should be rejected as a nested path"
2777                );
2778            }
2779        }
2780
2781        for component in ["alpha", "alpha-1", "alpha_1", "report.final.txt"] {
2782            let path = safe_destination_path(dest, Path::new("safe").join(component).as_path())
2783                .expect("safe generated path accepted");
2784            assert!(path.starts_with(dest));
2785        }
2786    }
2787
2788    #[tokio::test]
2789    async fn resume_decision_skips_existing_matching_file() {
2790        let temp = tempfile::tempdir().expect("tempdir");
2791        let source = temp.path().join("source.txt");
2792        let dest = temp.path().join("dest");
2793        tokio::fs::create_dir_all(&dest).await.expect("dest dir");
2794        tokio::fs::write(&source, b"same contents")
2795            .await
2796            .expect("source");
2797        tokio::fs::write(dest.join("source.txt"), b"same contents")
2798            .await
2799            .expect("dest file");
2800        let manifest = build_manifest(&[source]).await.expect("manifest");
2801        let entry = manifest.file_entries().next().expect("file entry");
2802
2803        let decision = decide_resume(&dest, entry, manifest.chunk_size)
2804            .await
2805            .expect("decision");
2806
2807        assert_eq!(decision, ResumeDecision::Skip);
2808    }
2809
2810    #[tokio::test]
2811    async fn resume_decision_returns_verified_chunk_offset() {
2812        let temp = tempfile::tempdir().expect("tempdir");
2813        let source = temp.path().join("large.bin");
2814        let dest = temp.path().join("dest");
2815        tokio::fs::create_dir_all(&dest).await.expect("dest dir");
2816        let bytes = vec![42; (DEFAULT_CHUNK_SIZE as usize * 2) + 128];
2817        tokio::fs::write(&source, &bytes).await.expect("source");
2818        tokio::fs::write(
2819            dest.join("large.bin"),
2820            &bytes[..DEFAULT_CHUNK_SIZE as usize + 99],
2821        )
2822        .await
2823        .expect("partial");
2824        let manifest = build_manifest(&[source]).await.expect("manifest");
2825        let entry = manifest.file_entries().next().expect("file entry");
2826
2827        let decision = decide_resume(&dest, entry, manifest.chunk_size)
2828            .await
2829            .expect("decision");
2830
2831        assert_eq!(decision, ResumeDecision::ResumeFrom(DEFAULT_CHUNK_SIZE));
2832    }
2833
2834    #[tokio::test]
2835    async fn resume_decision_rejects_existing_file_with_mismatched_contents() {
2836        let temp = tempfile::tempdir().expect("tempdir");
2837        let source = temp.path().join("source.txt");
2838        let dest = temp.path().join("dest");
2839        tokio::fs::create_dir_all(&dest).await.expect("dest dir");
2840        tokio::fs::write(&source, b"expected contents")
2841            .await
2842            .expect("source");
2843        tokio::fs::write(dest.join("source.txt"), b"different contents")
2844            .await
2845            .expect("conflicting dest file");
2846        let manifest = build_manifest(&[source]).await.expect("manifest");
2847        let entry = manifest.file_entries().next().expect("file entry");
2848
2849        let decision = decide_resume(&dest, entry, manifest.chunk_size)
2850            .await
2851            .expect("decision");
2852
2853        assert!(matches!(decision, ResumeDecision::Conflict(_)));
2854    }
2855
2856    #[tokio::test]
2857    async fn resume_decision_rejects_existing_file_larger_than_manifest_entry() {
2858        let temp = tempfile::tempdir().expect("tempdir");
2859        let source = temp.path().join("source.txt");
2860        let dest = temp.path().join("dest");
2861        tokio::fs::create_dir_all(&dest).await.expect("dest dir");
2862        tokio::fs::write(&source, b"small").await.expect("source");
2863        tokio::fs::write(dest.join("source.txt"), b"larger destination")
2864            .await
2865            .expect("larger dest file");
2866        let manifest = build_manifest(&[source]).await.expect("manifest");
2867        let entry = manifest.file_entries().next().expect("file entry");
2868
2869        let decision = decide_resume(&dest, entry, manifest.chunk_size)
2870            .await
2871            .expect("decision");
2872
2873        assert!(matches!(decision, ResumeDecision::Conflict(_)));
2874    }
2875
2876    #[test]
2877    fn identity_is_loaded_after_generation() {
2878        let temp = tempfile::tempdir().expect("tempdir");
2879        let generated =
2880            NativeIdentity::load_or_generate_in(temp.path()).expect("identity generated");
2881        let loaded = NativeIdentity::load_or_generate_in(temp.path()).expect("identity loaded");
2882
2883        assert_eq!(generated.fingerprint(), loaded.fingerprint());
2884    }
2885
2886    #[test]
2887    fn app_config_round_trips_toml() {
2888        let temp = tempfile::tempdir().expect("tempdir");
2889        let path = temp.path().join("config.toml");
2890        let config = AppConfig {
2891            alias: "desk".to_string(),
2892            ..AppConfig::default()
2893        };
2894
2895        config.save_to_path(&path).expect("config saved");
2896        let loaded = AppConfig::load_or_default_from(&path).expect("config loaded");
2897
2898        assert_eq!(loaded.alias, "desk");
2899        assert_eq!(loaded.listen_port, 53317);
2900        assert!(loaded.trust.require_fingerprint);
2901    }
2902
2903    #[test]
2904    fn app_config_redacts_psk_for_display_output() {
2905        let config = AppConfig {
2906            trust: TrustConfig {
2907                psk: "super-secret-psk".to_string(),
2908                ..TrustConfig::default()
2909            },
2910            ..AppConfig::default()
2911        };
2912
2913        let redacted = config.to_redacted_toml_string().expect("config serializes");
2914
2915        assert!(!redacted.contains("super-secret-psk"));
2916        assert!(redacted.contains(r#"psk = "<redacted>""#));
2917    }
2918
2919    #[test]
2920    fn daemon_config_defaults_from_app_config_and_round_trips_toml() {
2921        let temp = tempfile::tempdir().expect("tempdir");
2922        let path = temp.path().join("daemon.toml");
2923        let app_config = AppConfig {
2924            quic_port: 54444,
2925            download_dir: temp.path().join("downloads").display().to_string(),
2926            ..AppConfig::default()
2927        };
2928
2929        let defaulted =
2930            DaemonConfig::load_or_default_from(&path, &app_config).expect("daemon config");
2931        assert_eq!(
2932            defaulted.listen,
2933            SocketAddr::from(([0, 0, 0, 0], app_config.quic_port))
2934        );
2935        assert_eq!(defaulted.destination, temp.path().join("downloads"));
2936
2937        let config = DaemonConfig {
2938            listen: SocketAddr::from(([127, 0, 0, 1], 53318)),
2939            destination: temp.path().join("daemon-dest"),
2940        };
2941        config.save_to_path(&path).expect("daemon config saved");
2942        let loaded =
2943            DaemonConfig::load_or_default_from(&path, &app_config).expect("daemon config loaded");
2944
2945        assert_eq!(loaded, config);
2946    }
2947
2948    #[test]
2949    fn trust_store_load_save_and_forget_round_trip() {
2950        let temp = tempfile::tempdir().expect("tempdir");
2951        let path = temp.path().join("known_peers.toml");
2952        let fingerprint = "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef";
2953        let mut store = TrustStore::default();
2954
2955        store
2956            .trust_fingerprint(fingerprint)
2957            .expect("fingerprint trusted");
2958        store.save_to_path(&path).expect("trust store saved");
2959        let mut loaded = TrustStore::load_or_default_from(&path).expect("trust store loaded");
2960
2961        assert_eq!(loaded.records().count(), 1);
2962        assert_eq!(loaded.peers[fingerprint].trust_state, TrustState::Trusted);
2963        assert!(loaded.forget(fingerprint));
2964        assert_eq!(loaded.records().count(), 0);
2965    }
2966
2967    #[test]
2968    fn trust_store_rejects_short_fingerprint_without_discovery_lookup() {
2969        let mut store = TrustStore::default();
2970
2971        assert!(store.trust_fingerprint("9a4f2c").is_err());
2972    }
2973
2974    #[test]
2975    fn transfer_events_serialize_with_stable_event_names() {
2976        let event = TransferEvent::Progress {
2977            direction: TransferDirection::Send,
2978            file_name: "bundle/a.txt".to_string(),
2979            bytes_done: 5,
2980            bytes_total: 9,
2981        };
2982
2983        let json = serde_json::to_value(&event).expect("event serializes");
2984
2985        assert_eq!(json["event"], "progress");
2986        assert_eq!(json["direction"], "send");
2987        assert_eq!(json["file_name"], "bundle/a.txt");
2988        assert_eq!(json["bytes_done"], 5);
2989        assert_eq!(json["bytes_total"], 9);
2990    }
2991
2992    #[test]
2993    fn direct_protocol_hello_has_stable_golden_json() {
2994        let hello = DirectProtocolHello {
2995            protocol_version: 1,
2996            min_protocol_version: 1,
2997            client_fingerprint: "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"
2998                .to_string(),
2999            psk: false,
3000        };
3001
3002        let json = serde_json::to_string(&hello).expect("hello serializes");
3003
3004        assert_eq!(
3005            json,
3006            r#"{"protocol_version":1,"min_protocol_version":1,"client_fingerprint":"0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef","psk":false}"#
3007        );
3008    }
3009
3010    #[test]
3011    fn direct_protocol_response_has_stable_golden_json() {
3012        let response = DirectProtocolResponse::accept(1);
3013
3014        let json = serde_json::to_string(&response).expect("response serializes");
3015
3016        assert_eq!(
3017            json,
3018            r#"{"accepted":true,"protocol_version":1,"psk_challenge":null,"error":null}"#
3019        );
3020    }
3021
3022    #[test]
3023    fn direct_protocol_decodes_pre_psk_negotiation_frames() {
3024        let hello: DirectProtocolHello = serde_json::from_str(
3025            r#"{"protocol_version":1,"min_protocol_version":1,"client_fingerprint":"0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"}"#,
3026        )
3027        .expect("old hello decodes");
3028        assert!(!hello.psk);
3029
3030        let response: DirectProtocolResponse =
3031            serde_json::from_str(r#"{"accepted":true,"protocol_version":1,"error":null}"#)
3032                .expect("old response decodes");
3033        assert_eq!(response, DirectProtocolResponse::accept(1));
3034    }
3035
3036    #[test]
3037    fn direct_protocol_negotiates_supported_overlap() {
3038        let hello = DirectProtocolHello {
3039            protocol_version: 3,
3040            min_protocol_version: 1,
3041            client_fingerprint: "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"
3042                .to_string(),
3043            psk: false,
3044        };
3045
3046        let response = negotiate_direct_protocol(&hello, None);
3047
3048        assert_eq!(response, DirectProtocolResponse::accept(1));
3049    }
3050
3051    #[test]
3052    fn direct_protocol_rejects_incompatible_versions() {
3053        let too_new = DirectProtocolHello {
3054            protocol_version: 3,
3055            min_protocol_version: 2,
3056            client_fingerprint: "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"
3057                .to_string(),
3058            psk: false,
3059        };
3060        let too_old = DirectProtocolHello {
3061            protocol_version: 0,
3062            min_protocol_version: 0,
3063            client_fingerprint: "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"
3064                .to_string(),
3065            psk: false,
3066        };
3067
3068        assert!(!negotiate_direct_protocol(&too_new, None).accepted);
3069        assert!(!negotiate_direct_protocol(&too_old, None).accepted);
3070    }
3071
3072    #[test]
3073    fn direct_protocol_requires_matching_psk_mode() {
3074        let no_psk = DirectProtocolHello {
3075            protocol_version: 1,
3076            min_protocol_version: 1,
3077            client_fingerprint: "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"
3078                .to_string(),
3079            psk: false,
3080        };
3081        let with_psk = DirectProtocolHello {
3082            psk: true,
3083            ..no_psk.clone()
3084        };
3085
3086        assert!(!negotiate_direct_protocol(&no_psk, Some("secret")).accepted);
3087        assert!(!negotiate_direct_protocol(&with_psk, None).accepted);
3088
3089        let accepted = negotiate_direct_protocol(&with_psk, Some("secret"));
3090        assert!(accepted.accepted);
3091        assert!(accepted.psk_challenge.is_some());
3092    }
3093
3094    #[test]
3095    fn transfer_manifest_has_stable_golden_json() {
3096        let manifest = TransferManifest {
3097            version: MANIFEST_VERSION,
3098            session_id: Uuid::parse_str("00000000-0000-0000-0000-000000000001")
3099                .expect("uuid parses"),
3100            chunk_size: DEFAULT_CHUNK_SIZE,
3101            entries: vec![ManifestEntry {
3102                relative_path: PathBuf::from("bundle/a.txt"),
3103                kind: ManifestEntryKind::File,
3104                size: 5,
3105                blake3: Some(
3106                    "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef".to_string(),
3107                ),
3108                chunks: vec![
3109                    "abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789".to_string(),
3110                ],
3111                unix_mode: Some(0o100644),
3112                modified_unix_ms: Some(1_700_000_000_000),
3113            }],
3114        };
3115
3116        let json = serde_json::to_string(&manifest).expect("manifest serializes");
3117
3118        assert_eq!(
3119            json,
3120            r#"{"version":1,"session_id":"00000000-0000-0000-0000-000000000001","chunk_size":1048576,"entries":[{"relative_path":"bundle/a.txt","kind":"file","size":5,"blake3":"0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef","chunks":["abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789"],"unix_mode":33188,"modified_unix_ms":1700000000000}]}"#
3121        );
3122    }
3123
3124    #[test]
3125    fn peer_registry_merges_observations_by_fingerprint() {
3126        let mut registry = PeerRegistry::new();
3127        let fingerprint = "abcdef1234567890";
3128        let first = PeerObservation::new(
3129            fingerprint,
3130            SocketAddr::from(([192, 168, 1, 10], 53318)),
3131            100,
3132        )
3133        .with_alias("desk")
3134        .with_hostname("desk.local")
3135        .with_transport("quic");
3136        let second = PeerObservation::new(
3137            fingerprint,
3138            SocketAddr::from(([192, 168, 1, 11], 53318)),
3139            200,
3140        )
3141        .with_alias("workstation")
3142        .with_transport("quic");
3143
3144        registry.observe(first).expect("first observation");
3145        registry.observe(second).expect("second observation");
3146        let record = registry
3147            .get_by_fingerprint(fingerprint)
3148            .expect("record by fingerprint");
3149
3150        assert_eq!(registry.records().count(), 1);
3151        assert!(record.aliases.contains("desk"));
3152        assert!(record.aliases.contains("workstation"));
3153        assert!(record.hostnames.contains("desk.local"));
3154        assert_eq!(record.addresses.len(), 2);
3155        assert_eq!(record.first_seen_unix_ms, 100);
3156        assert_eq!(record.last_seen_unix_ms, 200);
3157    }
3158
3159    #[test]
3160    fn peer_registry_looks_up_unique_fingerprint_prefix_alias_and_hostname() {
3161        let mut registry = PeerRegistry::new();
3162        registry
3163            .observe(
3164                PeerObservation::new(
3165                    "abcdef1234567890",
3166                    SocketAddr::from(([192, 168, 1, 10], 53318)),
3167                    100,
3168                )
3169                .with_alias("desk")
3170                .with_hostname("desk.local"),
3171            )
3172            .expect("first observation");
3173        registry
3174            .observe(
3175                PeerObservation::new(
3176                    "123456abcdef7890",
3177                    SocketAddr::from(([192, 168, 1, 20], 53318)),
3178                    100,
3179                )
3180                .with_alias("laptop"),
3181            )
3182            .expect("second observation");
3183
3184        assert_eq!(
3185            registry
3186                .lookup("abcdef")
3187                .map(|record| record.fingerprint.as_str()),
3188            Some("abcdef1234567890")
3189        );
3190        assert_eq!(
3191            registry
3192                .lookup("desk")
3193                .map(|record| record.fingerprint.as_str()),
3194            Some("abcdef1234567890")
3195        );
3196        assert_eq!(
3197            registry
3198                .lookup("desk.local")
3199                .map(|record| record.fingerprint.as_str()),
3200            Some("abcdef1234567890")
3201        );
3202        assert!(registry.lookup("missing").is_none());
3203    }
3204
3205    #[test]
3206    fn peer_registry_rejects_ambiguous_lookup_hints() {
3207        let mut registry = PeerRegistry::new();
3208        registry
3209            .observe(
3210                PeerObservation::new(
3211                    "abcdef1234567890",
3212                    SocketAddr::from(([192, 168, 1, 10], 53318)),
3213                    100,
3214                )
3215                .with_alias("desk"),
3216            )
3217            .expect("first observation");
3218        registry
3219            .observe(
3220                PeerObservation::new(
3221                    "abcdef9999999999",
3222                    SocketAddr::from(([192, 168, 1, 11], 53318)),
3223                    100,
3224                )
3225                .with_alias("desk"),
3226            )
3227            .expect("second observation");
3228
3229        assert!(registry.lookup("abcdef").is_none());
3230        assert!(registry.lookup("desk").is_none());
3231        assert_eq!(
3232            registry
3233                .lookup("abcdef1234567890")
3234                .map(|record| record.fingerprint.as_str()),
3235            Some("abcdef1234567890")
3236        );
3237        match registry.lookup_detail("desk") {
3238            PeerLookup::Ambiguous(records) => assert_eq!(records.len(), 2),
3239            other => panic!("expected ambiguous duplicate-alias lookup, got {other:?}"),
3240        }
3241        assert!(matches!(
3242            registry.lookup_detail("missing"),
3243            PeerLookup::Missing
3244        ));
3245    }
3246
3247    #[test]
3248    fn native_announcement_builds_expected_txt_properties() {
3249        let announcement = NativeAnnouncement {
3250            alias: "Stephen MBP".to_string(),
3251            fingerprint: "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"
3252                .to_string(),
3253            quic_port: 53318,
3254            listen_port: Some(53317),
3255        };
3256
3257        let service = announcement.service_info().expect("service info");
3258
3259        assert_eq!(service.get_type(), NATIVE_SERVICE_TYPE);
3260        assert!(
3261            service
3262                .get_fullname()
3263                .starts_with("stephen-mbp-0123456789ab.")
3264        );
3265        assert_eq!(service.get_property_val_str("pv"), Some("1"));
3266        assert_eq!(service.get_property_val_str("minpv"), Some("1"));
3267        assert_eq!(
3268            service.get_property_val_str("fp"),
3269            Some("0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef")
3270        );
3271        assert_eq!(service.get_property_val_str("alias"), Some("Stephen MBP"));
3272        assert_eq!(service.get_property_val_str("transports"), Some("quic"));
3273        assert_eq!(service.get_property_val_str("quic_port"), Some("53318"));
3274        assert_eq!(service.get_property_val_str("listen_port"), Some("53317"));
3275    }
3276
3277    #[test]
3278    fn resolved_native_service_merges_into_registry() {
3279        let properties = [
3280            ("pv", "1"),
3281            ("minpv", "1"),
3282            (
3283                "fp",
3284                "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef",
3285            ),
3286            ("alias", "desk"),
3287            ("transports", "quic"),
3288            ("quic_port", "53318"),
3289        ];
3290        let service = ServiceInfo::new(
3291            NATIVE_SERVICE_TYPE,
3292            "desk-0123456789ab",
3293            "desk.local.",
3294            "192.168.1.42",
3295            53318,
3296            &properties[..],
3297        )
3298        .expect("service info")
3299        .as_resolved_service();
3300        let mut registry = PeerRegistry::new();
3301
3302        observe_resolved_service(&mut registry, &service).expect("observation");
3303
3304        let record = registry.lookup("desk").expect("record by alias");
3305        assert_eq!(
3306            record.fingerprint,
3307            "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"
3308        );
3309        assert_eq!(
3310            record.preferred_quic_address(),
3311            Some(SocketAddr::from(([192, 168, 1, 42], 53318)))
3312        );
3313    }
3314
3315    #[test]
3316    fn resolved_native_service_ignores_incompatible_protocol_ranges() {
3317        let properties = [
3318            ("pv", "3"),
3319            ("minpv", "2"),
3320            (
3321                "fp",
3322                "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef",
3323            ),
3324            ("alias", "desk"),
3325            ("transports", "quic"),
3326            ("quic_port", "53318"),
3327        ];
3328        let service = ServiceInfo::new(
3329            NATIVE_SERVICE_TYPE,
3330            "desk-0123456789ab",
3331            "desk.local.",
3332            "192.168.1.42",
3333            53318,
3334            &properties[..],
3335        )
3336        .expect("service info")
3337        .as_resolved_service();
3338        let mut registry = PeerRegistry::new();
3339
3340        observe_resolved_service(&mut registry, &service).expect("observation");
3341
3342        assert_eq!(registry.records().count(), 0);
3343    }
3344
3345    #[tokio::test]
3346    async fn direct_quic_loopback_sends_one_file() {
3347        let source_dir = tempfile::tempdir().expect("source tempdir");
3348        let dest_dir = tempfile::tempdir().expect("dest tempdir");
3349        let identity_dir = tempfile::tempdir().expect("identity tempdir");
3350        let file_path = source_dir.path().join("hello.txt");
3351        tokio::fs::write(&file_path, b"hello from ferry")
3352            .await
3353            .expect("source file written");
3354
3355        let identity =
3356            NativeIdentity::load_or_generate_in(identity_dir.path()).expect("identity generated");
3357        let recv_identity = identity.clone();
3358        let reserved_socket =
3359            std::net::UdpSocket::bind(SocketAddr::from(([127, 0, 0, 1], 0))).expect("udp bind");
3360        let recv_addr = reserved_socket.local_addr().expect("local addr");
3361        drop(reserved_socket);
3362        let receive_progress = Arc::new(Mutex::new(Vec::new()));
3363        let receive_progress_log = receive_progress.clone();
3364        let dest_path = dest_dir.path().to_path_buf();
3365        let server = tokio::spawn(async move {
3366            receive_direct_file(
3367                &ReceiveRequest::new(recv_addr),
3368                dest_path,
3369                &recv_identity,
3370                |event| {
3371                    receive_progress_log
3372                        .lock()
3373                        .expect("progress lock")
3374                        .push(event);
3375                },
3376            )
3377            .await
3378        });
3379
3380        tokio::time::sleep(Duration::from_millis(100)).await;
3381
3382        let peer = DirectPeer::parse(&recv_addr.to_string()).expect("peer parses");
3383        let send_request = SendRequest::new(peer, vec![file_path]).expect("send request is valid");
3384        let send_progress = Arc::new(Mutex::new(Vec::new()));
3385        let send_progress_log = send_progress.clone();
3386        let send_summary = send_direct_file(&send_request, &identity, |event| {
3387            send_progress_log.lock().expect("progress lock").push(event);
3388        })
3389        .await
3390        .expect("file sent");
3391        let receive_summary = server.await.expect("server joined").expect("file received");
3392
3393        let received = tokio::fs::read(dest_dir.path().join("hello.txt"))
3394            .await
3395            .expect("received file readable");
3396        assert_eq!(received, b"hello from ferry");
3397        assert_eq!(send_summary.bytes, receive_summary.bytes);
3398        assert_eq!(send_summary.blake3, receive_summary.blake3);
3399        let send_progress = send_progress.lock().expect("progress lock");
3400        let receive_progress = receive_progress.lock().expect("progress lock");
3401        assert!(matches!(
3402            send_progress.first(),
3403            Some(TransferEvent::SessionStarted {
3404                direction: TransferDirection::Send,
3405                ..
3406            })
3407        ));
3408        assert!(
3409            send_progress
3410                .iter()
3411                .any(|event| matches!(event, TransferEvent::Progress { .. }))
3412        );
3413        assert!(
3414            send_progress
3415                .iter()
3416                .any(|event| matches!(event, TransferEvent::SessionFinished { .. }))
3417        );
3418        assert!(matches!(
3419            receive_progress.first(),
3420            Some(TransferEvent::SessionStarted {
3421                direction: TransferDirection::Receive,
3422                ..
3423            })
3424        ));
3425        assert!(
3426            receive_progress
3427                .iter()
3428                .any(|event| matches!(event, TransferEvent::Progress { .. }))
3429        );
3430        assert!(
3431            receive_progress
3432                .iter()
3433                .any(|event| matches!(event, TransferEvent::SessionFinished { .. }))
3434        );
3435    }
3436
3437    #[tokio::test]
3438    async fn direct_transfer_accepts_expected_receiver_fingerprint() {
3439        let source_dir = tempfile::tempdir().expect("source tempdir");
3440        let dest_dir = tempfile::tempdir().expect("dest tempdir");
3441        let sender_identity_dir = tempfile::tempdir().expect("sender identity tempdir");
3442        let receiver_identity_dir = tempfile::tempdir().expect("receiver identity tempdir");
3443        let file_path = source_dir.path().join("hello.txt");
3444        tokio::fs::write(&file_path, b"hello from ferry")
3445            .await
3446            .expect("source file written");
3447
3448        let sender_identity = NativeIdentity::load_or_generate_in(sender_identity_dir.path())
3449            .expect("sender identity");
3450        let receiver_identity = NativeIdentity::load_or_generate_in(receiver_identity_dir.path())
3451            .expect("receiver identity");
3452        let expected_fingerprint = receiver_identity.fingerprint().to_string();
3453        let reserved_socket =
3454            std::net::UdpSocket::bind(SocketAddr::from(([127, 0, 0, 1], 0))).expect("udp bind");
3455        let recv_addr = reserved_socket.local_addr().expect("local addr");
3456        drop(reserved_socket);
3457        let dest_path = dest_dir.path().to_path_buf();
3458        let server = tokio::spawn(async move {
3459            receive_direct_file(
3460                &ReceiveRequest::new(recv_addr),
3461                dest_path,
3462                &receiver_identity,
3463                |_| {},
3464            )
3465            .await
3466        });
3467
3468        tokio::time::sleep(Duration::from_millis(100)).await;
3469
3470        let peer = DirectPeer::from_address(recv_addr)
3471            .with_expected_fingerprint(expected_fingerprint)
3472            .expect("expected fingerprint accepted");
3473        let send_request = SendRequest::new(peer, vec![file_path]).expect("send request is valid");
3474        let send_summary = send_direct_file(&send_request, &sender_identity, |_| {})
3475            .await
3476            .expect("file sent");
3477        let receive_summary = server.await.expect("server joined").expect("file received");
3478
3479        let received = tokio::fs::read(dest_dir.path().join("hello.txt"))
3480            .await
3481            .expect("received file readable");
3482        assert_eq!(received, b"hello from ferry");
3483        assert_eq!(send_summary.bytes, receive_summary.bytes);
3484        assert_eq!(send_summary.blake3, receive_summary.blake3);
3485    }
3486
3487    #[tokio::test]
3488    async fn direct_transfer_rejects_unexpected_receiver_fingerprint_before_payload() {
3489        let source_dir = tempfile::tempdir().expect("source tempdir");
3490        let dest_dir = tempfile::tempdir().expect("dest tempdir");
3491        let sender_identity_dir = tempfile::tempdir().expect("sender identity tempdir");
3492        let receiver_identity_dir = tempfile::tempdir().expect("receiver identity tempdir");
3493        let other_identity_dir = tempfile::tempdir().expect("other identity tempdir");
3494        let file_path = source_dir.path().join("hello.txt");
3495        tokio::fs::write(&file_path, b"hello from ferry")
3496            .await
3497            .expect("source file written");
3498
3499        let sender_identity = NativeIdentity::load_or_generate_in(sender_identity_dir.path())
3500            .expect("sender identity");
3501        let receiver_identity = NativeIdentity::load_or_generate_in(receiver_identity_dir.path())
3502            .expect("receiver identity");
3503        let other_identity =
3504            NativeIdentity::load_or_generate_in(other_identity_dir.path()).expect("other identity");
3505        let reserved_socket =
3506            std::net::UdpSocket::bind(SocketAddr::from(([127, 0, 0, 1], 0))).expect("udp bind");
3507        let recv_addr = reserved_socket.local_addr().expect("local addr");
3508        drop(reserved_socket);
3509        let dest_path = dest_dir.path().to_path_buf();
3510        let server = tokio::spawn(async move {
3511            receive_direct_file(
3512                &ReceiveRequest::new(recv_addr),
3513                dest_path,
3514                &receiver_identity,
3515                |_| {},
3516            )
3517            .await
3518        });
3519
3520        tokio::time::sleep(Duration::from_millis(100)).await;
3521
3522        let peer = DirectPeer::from_address(recv_addr)
3523            .with_expected_fingerprint(other_identity.fingerprint().to_string())
3524            .expect("expected fingerprint accepted");
3525        let send_request = SendRequest::new(peer, vec![file_path]).expect("send request is valid");
3526        let error = send_direct_file(&send_request, &sender_identity, |_| {})
3527            .await
3528            .expect_err("fingerprint mismatch should reject send");
3529
3530        assert!(matches!(error, Error::PeerFingerprintMismatch { .. }));
3531        let server_error = server
3532            .await
3533            .expect("server joined")
3534            .expect_err("server closed");
3535        assert!(matches!(
3536            server_error,
3537            Error::Connection(_) | Error::NoIncomingConnection
3538        ));
3539        assert!(
3540            !tokio::fs::try_exists(dest_dir.path().join("hello.txt"))
3541                .await
3542                .expect("destination checked")
3543        );
3544    }
3545
3546    #[tokio::test]
3547    async fn direct_transfer_accepts_matching_psk_before_payload() {
3548        let source_dir = tempfile::tempdir().expect("source tempdir");
3549        let dest_dir = tempfile::tempdir().expect("dest tempdir");
3550        let identity_dir = tempfile::tempdir().expect("identity tempdir");
3551        let file_path = source_dir.path().join("hello.txt");
3552        tokio::fs::write(&file_path, b"hello from ferry")
3553            .await
3554            .expect("source file written");
3555
3556        let identity =
3557            NativeIdentity::load_or_generate_in(identity_dir.path()).expect("identity generated");
3558        let recv_identity = identity.clone();
3559        let reserved_socket =
3560            std::net::UdpSocket::bind(SocketAddr::from(([127, 0, 0, 1], 0))).expect("udp bind");
3561        let recv_addr = reserved_socket.local_addr().expect("local addr");
3562        drop(reserved_socket);
3563        let dest_path = dest_dir.path().to_path_buf();
3564        let receive_request = ReceiveRequest::new(recv_addr)
3565            .with_psk("correct horse battery staple")
3566            .expect("receiver psk accepted");
3567        let server = tokio::spawn(async move {
3568            receive_direct_file(&receive_request, dest_path, &recv_identity, |_| {}).await
3569        });
3570
3571        tokio::time::sleep(Duration::from_millis(100)).await;
3572
3573        let peer = DirectPeer::parse(&recv_addr.to_string()).expect("peer parses");
3574        let send_request = SendRequest::new(peer, vec![file_path])
3575            .expect("send request is valid")
3576            .with_psk("correct horse battery staple")
3577            .expect("sender psk accepted");
3578        let send_summary = send_direct_file(&send_request, &identity, |_| {})
3579            .await
3580            .expect("file sent");
3581        let receive_summary = server.await.expect("server joined").expect("file received");
3582
3583        let received = tokio::fs::read(dest_dir.path().join("hello.txt"))
3584            .await
3585            .expect("received file readable");
3586        assert_eq!(received, b"hello from ferry");
3587        assert_eq!(send_summary.bytes, receive_summary.bytes);
3588    }
3589
3590    #[tokio::test]
3591    async fn direct_transfer_rejects_missing_psk_before_payload() {
3592        let source_dir = tempfile::tempdir().expect("source tempdir");
3593        let dest_dir = tempfile::tempdir().expect("dest tempdir");
3594        let identity_dir = tempfile::tempdir().expect("identity tempdir");
3595        let file_path = source_dir.path().join("hello.txt");
3596        tokio::fs::write(&file_path, b"hello from ferry")
3597            .await
3598            .expect("source file written");
3599
3600        let identity =
3601            NativeIdentity::load_or_generate_in(identity_dir.path()).expect("identity generated");
3602        let recv_identity = identity.clone();
3603        let reserved_socket =
3604            std::net::UdpSocket::bind(SocketAddr::from(([127, 0, 0, 1], 0))).expect("udp bind");
3605        let recv_addr = reserved_socket.local_addr().expect("local addr");
3606        drop(reserved_socket);
3607        let dest_path = dest_dir.path().to_path_buf();
3608        let receive_request = ReceiveRequest::new(recv_addr)
3609            .with_psk("receiver secret")
3610            .expect("receiver psk accepted");
3611        let server = tokio::spawn(async move {
3612            receive_direct_file(&receive_request, dest_path, &recv_identity, |_| {}).await
3613        });
3614
3615        tokio::time::sleep(Duration::from_millis(100)).await;
3616
3617        let peer = DirectPeer::parse(&recv_addr.to_string()).expect("peer parses");
3618        let send_request = SendRequest::new(peer, vec![file_path]).expect("send request is valid");
3619        let error = send_direct_file(&send_request, &identity, |_| {})
3620            .await
3621            .expect_err("missing psk should reject send");
3622
3623        assert!(matches!(error, Error::Protocol(_)));
3624        let _ = server.await.expect("server joined");
3625        assert!(
3626            !tokio::fs::try_exists(dest_dir.path().join("hello.txt"))
3627                .await
3628                .expect("destination checked")
3629        );
3630    }
3631
3632    #[tokio::test]
3633    async fn direct_transfer_rejects_wrong_psk_before_payload() {
3634        let source_dir = tempfile::tempdir().expect("source tempdir");
3635        let dest_dir = tempfile::tempdir().expect("dest tempdir");
3636        let identity_dir = tempfile::tempdir().expect("identity tempdir");
3637        let file_path = source_dir.path().join("hello.txt");
3638        tokio::fs::write(&file_path, b"hello from ferry")
3639            .await
3640            .expect("source file written");
3641
3642        let identity =
3643            NativeIdentity::load_or_generate_in(identity_dir.path()).expect("identity generated");
3644        let recv_identity = identity.clone();
3645        let reserved_socket =
3646            std::net::UdpSocket::bind(SocketAddr::from(([127, 0, 0, 1], 0))).expect("udp bind");
3647        let recv_addr = reserved_socket.local_addr().expect("local addr");
3648        drop(reserved_socket);
3649        let dest_path = dest_dir.path().to_path_buf();
3650        let receive_request = ReceiveRequest::new(recv_addr)
3651            .with_psk("receiver secret")
3652            .expect("receiver psk accepted");
3653        let server = tokio::spawn(async move {
3654            receive_direct_file(&receive_request, dest_path, &recv_identity, |_| {}).await
3655        });
3656
3657        tokio::time::sleep(Duration::from_millis(100)).await;
3658
3659        let peer = DirectPeer::parse(&recv_addr.to_string()).expect("peer parses");
3660        let send_request = SendRequest::new(peer, vec![file_path])
3661            .expect("send request is valid")
3662            .with_psk("wrong secret")
3663            .expect("sender psk accepted");
3664        let error = send_direct_file(&send_request, &identity, |_| {})
3665            .await
3666            .expect_err("wrong psk should reject send");
3667
3668        assert!(matches!(error, Error::Authentication(_)));
3669        assert!(!error.to_string().contains("wrong secret"));
3670        assert!(!error.to_string().contains("receiver secret"));
3671        let server_error = server
3672            .await
3673            .expect("server joined")
3674            .expect_err("server should reject wrong psk");
3675        assert!(matches!(
3676            server_error,
3677            Error::Authentication(_) | Error::Write(_)
3678        ));
3679        assert!(
3680            !tokio::fs::try_exists(dest_dir.path().join("hello.txt"))
3681                .await
3682                .expect("destination checked")
3683        );
3684    }
3685
3686    #[tokio::test]
3687    async fn direct_quic_loopback_sends_directory_and_resumes_partial_file() {
3688        let source_dir = tempfile::tempdir().expect("source tempdir");
3689        let dest_dir = tempfile::tempdir().expect("dest tempdir");
3690        let identity_dir = tempfile::tempdir().expect("identity tempdir");
3691        let bundle = source_dir.path().join("bundle");
3692        let nested = bundle.join("nested");
3693        tokio::fs::create_dir_all(&nested)
3694            .await
3695            .expect("source dirs");
3696        tokio::fs::write(nested.join("small.txt"), b"small")
3697            .await
3698            .expect("small file");
3699        let large_bytes = vec![7; (DEFAULT_CHUNK_SIZE as usize * 2) + 17];
3700        tokio::fs::write(bundle.join("large.bin"), &large_bytes)
3701            .await
3702            .expect("large file");
3703
3704        let dest_bundle = dest_dir.path().join("bundle");
3705        tokio::fs::create_dir_all(&dest_bundle)
3706            .await
3707            .expect("dest bundle");
3708        tokio::fs::write(
3709            dest_bundle.join("large.bin"),
3710            &large_bytes[..DEFAULT_CHUNK_SIZE as usize + 5],
3711        )
3712        .await
3713        .expect("partial large");
3714
3715        let identity =
3716            NativeIdentity::load_or_generate_in(identity_dir.path()).expect("identity generated");
3717        let recv_identity = identity.clone();
3718        let reserved_socket =
3719            std::net::UdpSocket::bind(SocketAddr::from(([127, 0, 0, 1], 0))).expect("udp bind");
3720        let recv_addr = reserved_socket.local_addr().expect("local addr");
3721        drop(reserved_socket);
3722        let dest_path = dest_dir.path().to_path_buf();
3723        let server = tokio::spawn(async move {
3724            receive_direct_file(
3725                &ReceiveRequest::new(recv_addr),
3726                dest_path,
3727                &recv_identity,
3728                |_| {},
3729            )
3730            .await
3731        });
3732
3733        tokio::time::sleep(Duration::from_millis(100)).await;
3734
3735        let peer = DirectPeer::parse(&recv_addr.to_string()).expect("peer parses");
3736        let send_request = SendRequest::new(peer, vec![bundle]).expect("send request is valid");
3737        let send_summary = send_direct_file(&send_request, &identity, |_| {})
3738            .await
3739            .expect("directory sent");
3740        let receive_summary = server
3741            .await
3742            .expect("server joined")
3743            .expect("directory received");
3744
3745        let received_large = tokio::fs::read(dest_bundle.join("large.bin"))
3746            .await
3747            .expect("large file readable");
3748        let received_small = tokio::fs::read(dest_bundle.join("nested/small.txt"))
3749            .await
3750            .expect("small file readable");
3751        assert_eq!(received_large, large_bytes);
3752        assert_eq!(received_small, b"small");
3753        assert_eq!(send_summary.bytes, receive_summary.bytes);
3754        assert!(
3755            receive_summary
3756                .files
3757                .iter()
3758                .any(|file| file.status == TransferFileStatus::Resumed)
3759        );
3760    }
3761
3762    #[tokio::test]
3763    async fn direct_quic_loopback_cancels_send_and_does_not_finalize_partial_file() {
3764        let source_dir = tempfile::tempdir().expect("source tempdir");
3765        let dest_dir = tempfile::tempdir().expect("dest tempdir");
3766        let identity_dir = tempfile::tempdir().expect("identity tempdir");
3767        let file_path = source_dir.path().join("payload.bin");
3768        tokio::fs::write(&file_path, vec![3; IO_BUFFER_SIZE * 64])
3769            .await
3770            .expect("source file written");
3771
3772        let identity =
3773            NativeIdentity::load_or_generate_in(identity_dir.path()).expect("identity generated");
3774        let recv_identity = identity.clone();
3775        let reserved_socket =
3776            std::net::UdpSocket::bind(SocketAddr::from(([127, 0, 0, 1], 0))).expect("udp bind");
3777        let recv_addr = reserved_socket.local_addr().expect("local addr");
3778        drop(reserved_socket);
3779
3780        let dest_path = dest_dir.path().to_path_buf();
3781        let server = tokio::spawn(async move {
3782            receive_direct_file(
3783                &ReceiveRequest::new(recv_addr),
3784                dest_path,
3785                &recv_identity,
3786                |_| {},
3787            )
3788            .await
3789        });
3790
3791        tokio::time::sleep(Duration::from_millis(100)).await;
3792
3793        let peer = DirectPeer::parse(&recv_addr.to_string()).expect("peer parses");
3794        let send_request =
3795            SendRequest::new(peer, vec![file_path.clone()]).expect("send request is valid");
3796        let send_control = TransferControl::new();
3797        let cancel_on_progress = send_control.clone();
3798        let send_events = Arc::new(Mutex::new(Vec::new()));
3799        let send_events_log = send_events.clone();
3800        let send_result =
3801            send_direct_file_with_control(&send_request, &identity, send_control, |event| {
3802                if matches!(event, TransferEvent::Progress { .. }) {
3803                    cancel_on_progress.cancel();
3804                }
3805                send_events_log.lock().expect("events lock").push(event);
3806            })
3807            .await;
3808
3809        assert!(matches!(send_result, Err(Error::TransferCancelled)));
3810
3811        let receive_result = tokio::time::timeout(Duration::from_secs(5), server)
3812            .await
3813            .expect("receiver should finish after sender cancellation")
3814            .expect("server joined");
3815        assert!(receive_result.is_err());
3816
3817        assert!(!dest_dir.path().join("payload.bin").exists());
3818        assert!(
3819            send_events
3820                .lock()
3821                .expect("events lock")
3822                .iter()
3823                .any(|event| matches!(event, TransferEvent::SessionCancelled { .. }))
3824        );
3825    }
3826}