Skip to main content

ferry_core/
lib.rs

1pub mod error;
2
3use std::collections::{BTreeMap, BTreeSet};
4use std::fs::Metadata;
5use std::io::SeekFrom;
6use std::net::SocketAddr;
7use std::path::{Component, Path, PathBuf};
8use std::sync::Arc;
9use std::time::{Duration, SystemTime, UNIX_EPOCH};
10
11use directories::BaseDirs;
12use mdns_sd::{ResolvedService, ServiceDaemon, ServiceEvent, ServiceInfo};
13use quinn::crypto::rustls::QuicClientConfig;
14use rcgen::{CertifiedKey, generate_simple_self_signed};
15use rustls::client::danger::{HandshakeSignatureValid, ServerCertVerified, ServerCertVerifier};
16use rustls::crypto::{CryptoProvider, verify_tls12_signature, verify_tls13_signature};
17use rustls::pki_types::{CertificateDer, PrivateKeyDer, PrivatePkcs8KeyDer, ServerName, UnixTime};
18use rustls::{DigitallySignedStruct, SignatureScheme};
19use serde::{Deserialize, Serialize};
20use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt};
21use uuid::Uuid;
22
23pub use error::{Error, Result};
24
25pub const NATIVE_PROTOCOL_VERSION: u16 = 1;
26pub const MIN_NATIVE_PROTOCOL_VERSION: u16 = 1;
27pub const NATIVE_SERVICE_TYPE: &str = "_ferry._udp.local.";
28
29const DIRECT_MAGIC: &[u8; 8] = b"FERRY01\n";
30const MAX_FRAME_LEN: usize = 16 * 1024 * 1024;
31const IO_BUFFER_SIZE: usize = 64 * 1024;
32const MANIFEST_VERSION: u16 = 1;
33const DEFAULT_CHUNK_SIZE: u64 = 1024 * 1024;
34const CONFIG_DIR_NAME: &str = "ferry";
35
36#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
37pub struct DirectPeer {
38    address: SocketAddr,
39}
40
41impl DirectPeer {
42    pub fn parse(value: &str) -> Result<Self> {
43        Ok(Self {
44            address: value.parse()?,
45        })
46    }
47
48    pub const fn address(&self) -> SocketAddr {
49        self.address
50    }
51
52    pub const fn from_address(address: SocketAddr) -> Self {
53        Self { address }
54    }
55}
56
57#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
58pub struct SendRequest {
59    peer: DirectPeer,
60    paths: Vec<PathBuf>,
61}
62
63impl SendRequest {
64    pub fn new(peer: DirectPeer, paths: Vec<PathBuf>) -> Result<Self> {
65        if paths.is_empty() {
66            return Err(Error::InvalidInput(
67                "at least one file path is required".to_string(),
68            ));
69        }
70
71        Ok(Self { peer, paths })
72    }
73
74    pub const fn peer(&self) -> &DirectPeer {
75        &self.peer
76    }
77
78    pub fn paths(&self) -> &[PathBuf] {
79        &self.paths
80    }
81}
82
83#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
84pub struct ReceiveRequest {
85    listen: SocketAddr,
86}
87
88impl ReceiveRequest {
89    pub fn new(listen: SocketAddr) -> Self {
90        Self { listen }
91    }
92
93    pub const fn listen(&self) -> SocketAddr {
94        self.listen
95    }
96}
97
98#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
99pub struct AppConfig {
100    pub alias: String,
101    pub listen_port: u16,
102    pub quic_port: u16,
103    pub download_dir: String,
104    pub auto_accept_known: bool,
105    pub auto_accept_unknown: bool,
106    pub discovery: Vec<String>,
107    pub max_concurrent_files: usize,
108    pub trust: TrustConfig,
109}
110
111impl Default for AppConfig {
112    fn default() -> Self {
113        Self {
114            alias: default_alias(),
115            listen_port: 53317,
116            quic_port: 53318,
117            download_dir: "~/Downloads/ferry".to_string(),
118            auto_accept_known: true,
119            auto_accept_unknown: false,
120            discovery: vec!["native".to_string()],
121            max_concurrent_files: 8,
122            trust: TrustConfig::default(),
123        }
124    }
125}
126
127impl AppConfig {
128    pub fn load_or_default() -> Result<Self> {
129        Self::load_or_default_from(config_dir()?.join("config.toml"))
130    }
131
132    pub fn load_or_default_from(path: impl AsRef<Path>) -> Result<Self> {
133        let path = path.as_ref();
134        match std::fs::read_to_string(path) {
135            Ok(contents) => {
136                toml::from_str(&contents).map_err(|error| Error::ConfigParse(error.to_string()))
137            }
138            Err(error) if error.kind() == std::io::ErrorKind::NotFound => Ok(Self::default()),
139            Err(source) => Err(Error::IoPath {
140                path: path.to_path_buf(),
141                source,
142            }),
143        }
144    }
145
146    pub fn save_default_path(&self) -> Result<()> {
147        self.save_to_path(config_dir()?.join("config.toml"))
148    }
149
150    pub fn save_to_path(&self, path: impl AsRef<Path>) -> Result<()> {
151        write_toml_file(path.as_ref(), self)
152    }
153
154    pub fn to_toml_string(&self) -> Result<String> {
155        toml::to_string_pretty(self).map_err(|error| Error::ConfigSerialize(error.to_string()))
156    }
157
158    pub fn redacted(&self) -> Self {
159        let mut config = self.clone();
160        config.trust = config.trust.redacted();
161        config
162    }
163
164    pub fn to_redacted_toml_string(&self) -> Result<String> {
165        self.redacted().to_toml_string()
166    }
167}
168
169#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
170pub struct DaemonConfig {
171    pub listen: SocketAddr,
172    pub destination: PathBuf,
173}
174
175impl DaemonConfig {
176    pub fn from_app_config(config: &AppConfig) -> Result<Self> {
177        Ok(Self {
178            listen: SocketAddr::from(([0, 0, 0, 0], config.quic_port)),
179            destination: expand_home_path(&config.download_dir)?,
180        })
181    }
182
183    pub fn load_or_default() -> Result<Self> {
184        let app_config = AppConfig::load_or_default()?;
185        Self::load_or_default_from(config_dir()?.join("daemon.toml"), &app_config)
186    }
187
188    pub fn load_or_default_from(path: impl AsRef<Path>, app_config: &AppConfig) -> Result<Self> {
189        let path = path.as_ref();
190        match std::fs::read_to_string(path) {
191            Ok(contents) => {
192                toml::from_str(&contents).map_err(|error| Error::ConfigParse(error.to_string()))
193            }
194            Err(error) if error.kind() == std::io::ErrorKind::NotFound => {
195                Self::from_app_config(app_config)
196            }
197            Err(source) => Err(Error::IoPath {
198                path: path.to_path_buf(),
199                source,
200            }),
201        }
202    }
203
204    pub fn save_default_path(&self) -> Result<()> {
205        self.save_to_path(config_dir()?.join("daemon.toml"))
206    }
207
208    pub fn save_to_path(&self, path: impl AsRef<Path>) -> Result<()> {
209        write_toml_file(path.as_ref(), self)
210    }
211
212    pub fn to_toml_string(&self) -> Result<String> {
213        toml::to_string_pretty(self).map_err(|error| Error::ConfigSerialize(error.to_string()))
214    }
215}
216
217#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
218pub struct TrustConfig {
219    pub require_fingerprint: bool,
220    pub psk: String,
221}
222
223impl Default for TrustConfig {
224    fn default() -> Self {
225        Self {
226            require_fingerprint: true,
227            psk: String::new(),
228        }
229    }
230}
231
232impl TrustConfig {
233    pub fn redacted(&self) -> Self {
234        let psk = if self.psk.is_empty() {
235            String::new()
236        } else {
237            "<redacted>".to_string()
238        };
239        Self {
240            require_fingerprint: self.require_fingerprint,
241            psk,
242        }
243    }
244}
245
246#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
247pub struct NativeIdentity {
248    cert_der: Vec<u8>,
249    key_der: Vec<u8>,
250    fingerprint: String,
251}
252
253impl NativeIdentity {
254    pub fn load_or_generate() -> Result<Self> {
255        Self::load_or_generate_in(config_dir()?)
256    }
257
258    pub fn load_or_generate_in(config_dir: impl AsRef<Path>) -> Result<Self> {
259        let config_dir = config_dir.as_ref();
260        let cert_path = config_dir.join("identity.cert.der");
261        let key_path = config_dir.join("identity.key.der");
262
263        if cert_path.exists() && key_path.exists() {
264            let cert_der = std::fs::read(&cert_path).map_err(|source| Error::IoPath {
265                path: cert_path,
266                source,
267            })?;
268            let key_der = std::fs::read(&key_path).map_err(|source| Error::IoPath {
269                path: key_path,
270                source,
271            })?;
272
273            return Ok(Self::from_parts(cert_der, key_der));
274        }
275
276        std::fs::create_dir_all(config_dir).map_err(|source| Error::IoPath {
277            path: config_dir.to_path_buf(),
278            source,
279        })?;
280
281        let identity = Self::generate()?;
282        write_private_file(&cert_path, &identity.cert_der)?;
283        write_private_file(&key_path, &identity.key_der)?;
284        Ok(identity)
285    }
286
287    pub fn generate() -> Result<Self> {
288        let CertifiedKey { cert, signing_key } =
289            generate_simple_self_signed(vec!["localhost".to_string()])?;
290        Ok(Self::from_parts(
291            cert.der().to_vec(),
292            signing_key.serialize_der(),
293        ))
294    }
295
296    pub fn fingerprint(&self) -> &str {
297        &self.fingerprint
298    }
299
300    fn cert_chain(&self) -> Vec<CertificateDer<'static>> {
301        vec![CertificateDer::from(self.cert_der.clone())]
302    }
303
304    fn private_key(&self) -> PrivateKeyDer<'static> {
305        PrivateKeyDer::from(PrivatePkcs8KeyDer::from(self.key_der.clone()))
306    }
307
308    fn from_parts(cert_der: Vec<u8>, key_der: Vec<u8>) -> Self {
309        let fingerprint = blake3::hash(&cert_der).to_hex().to_string();
310        Self {
311            cert_der,
312            key_der,
313            fingerprint,
314        }
315    }
316}
317
318#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
319#[serde(rename_all = "snake_case")]
320pub enum TransferDirection {
321    Send,
322    Receive,
323}
324
325#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
326pub struct TransferProgress {
327    pub direction: TransferDirection,
328    pub file_name: String,
329    pub bytes_done: u64,
330    pub bytes_total: u64,
331}
332
333#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
334#[serde(tag = "event", rename_all = "snake_case")]
335pub enum TransferEvent {
336    SessionStarted {
337        direction: TransferDirection,
338        session_id: Uuid,
339        files_total: usize,
340        bytes_total: u64,
341    },
342    FileStarted {
343        direction: TransferDirection,
344        file_name: String,
345        bytes_total: u64,
346        resume_offset: u64,
347    },
348    Progress {
349        direction: TransferDirection,
350        file_name: String,
351        bytes_done: u64,
352        bytes_total: u64,
353    },
354    FileFinished {
355        direction: TransferDirection,
356        file_name: String,
357        bytes: u64,
358        blake3: String,
359        status: TransferFileStatus,
360    },
361    SessionFinished {
362        direction: TransferDirection,
363        files_total: usize,
364        bytes_total: u64,
365        blake3: String,
366    },
367}
368
369impl TransferEvent {
370    pub fn progress(&self) -> Option<TransferProgress> {
371        match self {
372            Self::Progress {
373                direction,
374                file_name,
375                bytes_done,
376                bytes_total,
377            } => Some(TransferProgress {
378                direction: *direction,
379                file_name: file_name.clone(),
380                bytes_done: *bytes_done,
381                bytes_total: *bytes_total,
382            }),
383            _ => None,
384        }
385    }
386}
387
388#[derive(Debug, Clone, PartialEq, Eq)]
389pub struct TransferSummary {
390    pub path: PathBuf,
391    pub file_name: String,
392    pub bytes: u64,
393    pub blake3: String,
394    pub files: Vec<TransferFileSummary>,
395}
396
397#[derive(Debug, Clone, PartialEq, Eq)]
398pub struct TransferFileSummary {
399    pub path: PathBuf,
400    pub relative_path: PathBuf,
401    pub bytes: u64,
402    pub blake3: String,
403    pub status: TransferFileStatus,
404}
405
406#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
407#[serde(rename_all = "snake_case")]
408pub enum TransferFileStatus {
409    Sent,
410    Received,
411    Skipped,
412    Resumed,
413}
414
415#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
416pub struct TransferManifest {
417    pub version: u16,
418    pub session_id: Uuid,
419    pub chunk_size: u64,
420    pub entries: Vec<ManifestEntry>,
421}
422
423impl TransferManifest {
424    pub fn file_entries(&self) -> impl Iterator<Item = &ManifestEntry> {
425        self.entries
426            .iter()
427            .filter(|entry| entry.kind == ManifestEntryKind::File)
428    }
429
430    pub fn total_file_bytes(&self) -> u64 {
431        self.file_entries().map(|entry| entry.size).sum()
432    }
433
434    pub fn digest(&self) -> Result<String> {
435        let bytes = serde_json::to_vec(self).map_err(|error| Error::Protocol(error.to_string()))?;
436        Ok(blake3::hash(&bytes).to_hex().to_string())
437    }
438}
439
440#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
441pub struct ManifestEntry {
442    pub relative_path: PathBuf,
443    pub kind: ManifestEntryKind,
444    pub size: u64,
445    pub blake3: Option<String>,
446    pub chunks: Vec<String>,
447    pub unix_mode: Option<u32>,
448    pub modified_unix_ms: Option<i128>,
449}
450
451#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
452#[serde(rename_all = "snake_case")]
453pub enum ManifestEntryKind {
454    File,
455    Directory,
456}
457
458#[derive(Debug, Clone, PartialEq, Eq)]
459pub enum ResumeDecision {
460    Create,
461    Skip,
462    ResumeFrom(u64),
463    Conflict(String),
464}
465
466#[derive(Debug, Clone, PartialEq, Eq)]
467pub struct PeerObservation {
468    pub fingerprint: String,
469    pub alias: Option<String>,
470    pub hostname: Option<String>,
471    pub address: SocketAddr,
472    pub transports: BTreeSet<String>,
473    pub seen_unix_ms: i128,
474}
475
476impl PeerObservation {
477    pub fn new(fingerprint: impl Into<String>, address: SocketAddr, seen_unix_ms: i128) -> Self {
478        Self {
479            fingerprint: fingerprint.into(),
480            alias: None,
481            hostname: None,
482            address,
483            transports: BTreeSet::new(),
484            seen_unix_ms,
485        }
486    }
487
488    pub fn with_alias(mut self, alias: impl Into<String>) -> Self {
489        self.alias = Some(alias.into());
490        self
491    }
492
493    pub fn with_hostname(mut self, hostname: impl Into<String>) -> Self {
494        self.hostname = Some(hostname.into());
495        self
496    }
497
498    pub fn with_transport(mut self, transport: impl Into<String>) -> Self {
499        self.transports.insert(transport.into());
500        self
501    }
502}
503
504#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
505pub struct PeerRecord {
506    pub fingerprint: String,
507    pub aliases: BTreeSet<String>,
508    pub hostnames: BTreeSet<String>,
509    pub addresses: BTreeSet<SocketAddr>,
510    pub transports: BTreeSet<String>,
511    pub first_seen_unix_ms: i128,
512    pub last_seen_unix_ms: i128,
513}
514
515impl PeerRecord {
516    fn from_observation(observation: PeerObservation) -> Self {
517        let mut aliases = BTreeSet::new();
518        if let Some(alias) = observation.alias {
519            aliases.insert(alias);
520        }
521
522        let mut hostnames = BTreeSet::new();
523        if let Some(hostname) = observation.hostname {
524            hostnames.insert(hostname);
525        }
526
527        let mut addresses = BTreeSet::new();
528        addresses.insert(observation.address);
529
530        Self {
531            fingerprint: observation.fingerprint,
532            aliases,
533            hostnames,
534            addresses,
535            transports: observation.transports,
536            first_seen_unix_ms: observation.seen_unix_ms,
537            last_seen_unix_ms: observation.seen_unix_ms,
538        }
539    }
540
541    fn merge(&mut self, observation: PeerObservation) {
542        if let Some(alias) = observation.alias {
543            self.aliases.insert(alias);
544        }
545        if let Some(hostname) = observation.hostname {
546            self.hostnames.insert(hostname);
547        }
548        self.addresses.insert(observation.address);
549        self.transports.extend(observation.transports);
550        self.first_seen_unix_ms = self.first_seen_unix_ms.min(observation.seen_unix_ms);
551        self.last_seen_unix_ms = self.last_seen_unix_ms.max(observation.seen_unix_ms);
552    }
553
554    pub fn preferred_quic_address(&self) -> Option<SocketAddr> {
555        if !self.transports.contains("quic") {
556            return None;
557        }
558
559        self.addresses.iter().copied().next()
560    }
561}
562
563#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
564#[serde(rename_all = "snake_case")]
565pub enum TrustState {
566    Trusted,
567    Blocked,
568}
569
570#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
571pub struct KnownPeerEntry {
572    pub fingerprint: String,
573    #[serde(default)]
574    pub aliases: BTreeSet<String>,
575    #[serde(default)]
576    pub hostnames: BTreeSet<String>,
577    #[serde(default)]
578    pub addresses: BTreeSet<SocketAddr>,
579    #[serde(default)]
580    pub transports: BTreeSet<String>,
581    pub trust_state: TrustState,
582    pub first_seen_unix_ms: Option<i128>,
583    pub last_seen_unix_ms: Option<i128>,
584}
585
586impl KnownPeerEntry {
587    pub fn trusted(fingerprint: impl Into<String>) -> Result<Self> {
588        let fingerprint = normalized_fingerprint(fingerprint.into())?;
589        Ok(Self {
590            fingerprint,
591            aliases: BTreeSet::new(),
592            hostnames: BTreeSet::new(),
593            addresses: BTreeSet::new(),
594            transports: BTreeSet::new(),
595            trust_state: TrustState::Trusted,
596            first_seen_unix_ms: None,
597            last_seen_unix_ms: None,
598        })
599    }
600
601    pub fn from_record(record: &PeerRecord, trust_state: TrustState) -> Result<Self> {
602        let fingerprint = normalized_fingerprint(record.fingerprint.clone())?;
603        Ok(Self {
604            fingerprint,
605            aliases: record.aliases.clone(),
606            hostnames: record.hostnames.clone(),
607            addresses: record.addresses.clone(),
608            transports: record.transports.clone(),
609            trust_state,
610            first_seen_unix_ms: Some(record.first_seen_unix_ms),
611            last_seen_unix_ms: Some(record.last_seen_unix_ms),
612        })
613    }
614}
615
616#[derive(Debug, Default, Clone, PartialEq, Eq, Serialize, Deserialize)]
617pub struct TrustStore {
618    #[serde(default)]
619    pub peers: BTreeMap<String, KnownPeerEntry>,
620}
621
622impl TrustStore {
623    pub fn load_or_default() -> Result<Self> {
624        Self::load_or_default_from(config_dir()?.join("known_peers.toml"))
625    }
626
627    pub fn load_or_default_from(path: impl AsRef<Path>) -> Result<Self> {
628        let path = path.as_ref();
629        match std::fs::read_to_string(path) {
630            Ok(contents) => {
631                toml::from_str(&contents).map_err(|error| Error::ConfigParse(error.to_string()))
632            }
633            Err(error) if error.kind() == std::io::ErrorKind::NotFound => Ok(Self::default()),
634            Err(source) => Err(Error::IoPath {
635                path: path.to_path_buf(),
636                source,
637            }),
638        }
639    }
640
641    pub fn save_default_path(&self) -> Result<()> {
642        self.save_to_path(config_dir()?.join("known_peers.toml"))
643    }
644
645    pub fn save_to_path(&self, path: impl AsRef<Path>) -> Result<()> {
646        write_toml_file(path.as_ref(), self)
647    }
648
649    pub fn trust_fingerprint(&mut self, fingerprint: impl Into<String>) -> Result<&KnownPeerEntry> {
650        let entry = KnownPeerEntry::trusted(fingerprint)?;
651        let fingerprint = entry.fingerprint.clone();
652        self.peers
653            .entry(fingerprint.clone())
654            .and_modify(|existing| existing.trust_state = TrustState::Trusted)
655            .or_insert(entry);
656        Ok(self
657            .peers
658            .get(&fingerprint)
659            .expect("trusted peer entry should exist"))
660    }
661
662    pub fn trust_record(&mut self, record: &PeerRecord) -> Result<&KnownPeerEntry> {
663        let entry = KnownPeerEntry::from_record(record, TrustState::Trusted)?;
664        let fingerprint = entry.fingerprint.clone();
665        self.peers
666            .entry(fingerprint.clone())
667            .and_modify(|existing| {
668                existing.aliases.extend(record.aliases.clone());
669                existing.hostnames.extend(record.hostnames.clone());
670                existing.addresses.extend(record.addresses.clone());
671                existing.transports.extend(record.transports.clone());
672                existing.first_seen_unix_ms = Some(
673                    existing
674                        .first_seen_unix_ms
675                        .unwrap_or(record.first_seen_unix_ms)
676                        .min(record.first_seen_unix_ms),
677                );
678                existing.last_seen_unix_ms = Some(
679                    existing
680                        .last_seen_unix_ms
681                        .unwrap_or(record.last_seen_unix_ms)
682                        .max(record.last_seen_unix_ms),
683                );
684                existing.trust_state = TrustState::Trusted;
685            })
686            .or_insert(entry);
687        Ok(self
688            .peers
689            .get(&fingerprint)
690            .expect("trusted peer entry should exist"))
691    }
692
693    pub fn forget(&mut self, fingerprint: &str) -> bool {
694        self.peers.remove(fingerprint).is_some()
695    }
696
697    pub fn records(&self) -> impl Iterator<Item = &KnownPeerEntry> {
698        self.peers.values()
699    }
700
701    pub fn to_toml_string(&self) -> Result<String> {
702        toml::to_string_pretty(self).map_err(|error| Error::ConfigSerialize(error.to_string()))
703    }
704}
705
706#[derive(Debug, Default, Clone, PartialEq, Eq)]
707pub struct PeerRegistry {
708    peers: BTreeMap<String, PeerRecord>,
709}
710
711#[derive(Debug, Clone, PartialEq, Eq)]
712pub enum PeerLookup<'a> {
713    Found(&'a PeerRecord),
714    Ambiguous(Vec<&'a PeerRecord>),
715    Missing,
716}
717
718impl PeerRegistry {
719    pub fn new() -> Self {
720        Self::default()
721    }
722
723    pub fn observe(&mut self, observation: PeerObservation) -> Result<&PeerRecord> {
724        if observation.fingerprint.trim().is_empty() {
725            return Err(Error::InvalidInput(
726                "peer fingerprint must not be empty".to_string(),
727            ));
728        }
729
730        let fingerprint = observation.fingerprint.clone();
731        if let Some(record) = self.peers.get_mut(&fingerprint) {
732            record.merge(observation);
733        } else {
734            self.peers.insert(
735                fingerprint.clone(),
736                PeerRecord::from_observation(observation),
737            );
738        }
739
740        Ok(self
741            .peers
742            .get(&fingerprint)
743            .expect("observed peer record should exist"))
744    }
745
746    pub fn get_by_fingerprint(&self, fingerprint: &str) -> Option<&PeerRecord> {
747        self.peers.get(fingerprint)
748    }
749
750    pub fn lookup(&self, query: &str) -> Option<&PeerRecord> {
751        match self.lookup_detail(query) {
752            PeerLookup::Found(record) => Some(record),
753            PeerLookup::Ambiguous(_) | PeerLookup::Missing => None,
754        }
755    }
756
757    pub fn lookup_detail(&self, query: &str) -> PeerLookup<'_> {
758        if let Some(record) = self.peers.get(query) {
759            return PeerLookup::Found(record);
760        }
761
762        let matches = self
763            .peers
764            .values()
765            .filter(|record| {
766                record.fingerprint.starts_with(query)
767                    || record.aliases.iter().any(|alias| alias == query)
768                    || record.hostnames.iter().any(|hostname| hostname == query)
769            })
770            .collect::<Vec<_>>();
771
772        match matches.as_slice() {
773            [] => PeerLookup::Missing,
774            [record] => PeerLookup::Found(record),
775            _ => PeerLookup::Ambiguous(matches),
776        }
777    }
778
779    pub fn records(&self) -> impl Iterator<Item = &PeerRecord> {
780        self.peers.values()
781    }
782}
783
784#[derive(Debug, Clone, PartialEq, Eq)]
785pub struct NativeAnnouncement {
786    pub alias: String,
787    pub fingerprint: String,
788    pub quic_port: u16,
789    pub listen_port: Option<u16>,
790}
791
792impl NativeAnnouncement {
793    pub fn from_config(config: &AppConfig, identity: &NativeIdentity) -> Self {
794        Self {
795            alias: config.alias.clone(),
796            fingerprint: identity.fingerprint().to_string(),
797            quic_port: config.quic_port,
798            listen_port: Some(config.listen_port),
799        }
800    }
801
802    fn service_info(&self) -> Result<ServiceInfo> {
803        let alias = sanitize_dns_label(&self.alias);
804        let fingerprint_prefix = self
805            .fingerprint
806            .get(..12)
807            .unwrap_or(self.fingerprint.as_str());
808        let instance = format!("{alias}-{fingerprint_prefix}");
809        let hostname = format!("{alias}.local.");
810        let properties = [
811            ("pv", NATIVE_PROTOCOL_VERSION.to_string()),
812            ("minpv", MIN_NATIVE_PROTOCOL_VERSION.to_string()),
813            ("fp", self.fingerprint.clone()),
814            ("alias", self.alias.clone()),
815            ("transports", "quic".to_string()),
816            ("quic_port", self.quic_port.to_string()),
817            (
818                "listen_port",
819                self.listen_port
820                    .map(|port| port.to_string())
821                    .unwrap_or_default(),
822            ),
823        ];
824
825        ServiceInfo::new(
826            NATIVE_SERVICE_TYPE,
827            &instance,
828            &hostname,
829            "",
830            self.quic_port,
831            &properties[..],
832        )
833        .map(ServiceInfo::enable_addr_auto)
834        .map_err(|error| Error::Discovery(error.to_string()))
835    }
836}
837
838pub struct NativeAnnouncer {
839    daemon: ServiceDaemon,
840    fullname: String,
841}
842
843impl NativeAnnouncer {
844    pub fn start(announcement: &NativeAnnouncement) -> Result<Self> {
845        let daemon = ServiceDaemon::new().map_err(|error| Error::Discovery(error.to_string()))?;
846        let service = announcement.service_info()?;
847        let fullname = service.get_fullname().to_string();
848        daemon
849            .register(service)
850            .map_err(|error| Error::Discovery(error.to_string()))?;
851        Ok(Self { daemon, fullname })
852    }
853}
854
855impl Drop for NativeAnnouncer {
856    fn drop(&mut self) {
857        let _ = self.daemon.unregister(&self.fullname);
858        let _ = self.daemon.shutdown();
859    }
860}
861
862pub async fn discover_native_peers(timeout: Duration) -> Result<PeerRegistry> {
863    let daemon = ServiceDaemon::new().map_err(|error| Error::Discovery(error.to_string()))?;
864    let receiver = daemon
865        .browse(NATIVE_SERVICE_TYPE)
866        .map_err(|error| Error::Discovery(error.to_string()))?;
867    let deadline = tokio::time::Instant::now() + timeout;
868    let mut registry = PeerRegistry::new();
869
870    loop {
871        let now = tokio::time::Instant::now();
872        if now >= deadline {
873            break;
874        }
875
876        let wait = deadline - now;
877        match tokio::time::timeout(wait, receiver.recv_async()).await {
878            Ok(Ok(ServiceEvent::ServiceResolved(service))) => {
879                observe_resolved_service(&mut registry, &service)?;
880            }
881            Ok(Ok(_)) => {}
882            Ok(Err(error)) => {
883                return Err(Error::Discovery(error.to_string()));
884            }
885            Err(_) => break,
886        }
887    }
888
889    daemon
890        .stop_browse(NATIVE_SERVICE_TYPE)
891        .map_err(|error| Error::Discovery(error.to_string()))?;
892    let _ = daemon.shutdown();
893    Ok(registry)
894}
895
896fn observe_resolved_service(registry: &mut PeerRegistry, service: &ResolvedService) -> Result<()> {
897    let Some(fingerprint) = service.get_property_val_str("fp") else {
898        return Ok(());
899    };
900    let fingerprint = match normalized_fingerprint(fingerprint.to_string()) {
901        Ok(fingerprint) => fingerprint,
902        Err(_) => return Ok(()),
903    };
904    let Some(highest_version) = parse_txt_u16(service, "pv") else {
905        return Ok(());
906    };
907    let Some(minimum_version) = parse_txt_u16(service, "minpv") else {
908        return Ok(());
909    };
910    if minimum_version > highest_version
911        || minimum_version > NATIVE_PROTOCOL_VERSION
912        || highest_version < MIN_NATIVE_PROTOCOL_VERSION
913    {
914        return Ok(());
915    }
916
917    let Some(quic_port) = parse_txt_u16(service, "quic_port") else {
918        return Ok(());
919    };
920    let transports = service
921        .get_property_val_str("transports")
922        .unwrap_or_default()
923        .split(',')
924        .map(str::trim)
925        .filter(|value| !value.is_empty())
926        .map(ToOwned::to_owned)
927        .collect::<Vec<_>>();
928    if !transports.iter().any(|transport| transport == "quic") {
929        return Ok(());
930    }
931
932    let alias = service.get_property_val_str("alias").map(ToOwned::to_owned);
933    let hostname = Some(service.get_hostname().trim_end_matches('.').to_string());
934    let seen_unix_ms = system_time_unix_ms(SystemTime::now()).unwrap_or_default();
935
936    for address in service.get_addresses_v4() {
937        let mut observation = PeerObservation::new(
938            fingerprint.clone(),
939            SocketAddr::from((address, quic_port)),
940            seen_unix_ms,
941        );
942        if let Some(alias) = &alias {
943            observation = observation.with_alias(alias.clone());
944        }
945        if let Some(hostname) = &hostname {
946            observation = observation.with_hostname(hostname.clone());
947        }
948        for transport in &transports {
949            observation = observation.with_transport(transport.clone());
950        }
951        registry.observe(observation)?;
952    }
953
954    Ok(())
955}
956
957fn parse_txt_u16(service: &ResolvedService, key: &str) -> Option<u16> {
958    service.get_property_val_str(key)?.parse().ok()
959}
960
961fn sanitize_dns_label(value: &str) -> String {
962    let mut label = value
963        .chars()
964        .map(|ch| {
965            if ch.is_ascii_alphanumeric() || ch == '-' {
966                ch.to_ascii_lowercase()
967            } else {
968                '-'
969            }
970        })
971        .collect::<String>();
972    while label.contains("--") {
973        label = label.replace("--", "-");
974    }
975    let label = label.trim_matches('-').to_string();
976    if label.is_empty() {
977        "fileferry-device".to_string()
978    } else {
979        label
980    }
981}
982
983#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
984struct DirectReceivePlan {
985    files: Vec<DirectFilePlan>,
986}
987
988#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
989struct DirectProtocolHello {
990    protocol_version: u16,
991    min_protocol_version: u16,
992    client_fingerprint: String,
993}
994
995impl DirectProtocolHello {
996    fn new(identity: &NativeIdentity) -> Self {
997        Self {
998            protocol_version: NATIVE_PROTOCOL_VERSION,
999            min_protocol_version: MIN_NATIVE_PROTOCOL_VERSION,
1000            client_fingerprint: identity.fingerprint().to_string(),
1001        }
1002    }
1003}
1004
1005#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
1006struct DirectProtocolResponse {
1007    accepted: bool,
1008    protocol_version: Option<u16>,
1009    error: Option<String>,
1010}
1011
1012impl DirectProtocolResponse {
1013    fn accept(protocol_version: u16) -> Self {
1014        Self {
1015            accepted: true,
1016            protocol_version: Some(protocol_version),
1017            error: None,
1018        }
1019    }
1020
1021    fn reject(error: impl Into<String>) -> Self {
1022        Self {
1023            accepted: false,
1024            protocol_version: None,
1025            error: Some(error.into()),
1026        }
1027    }
1028}
1029
1030#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
1031struct DirectFilePlan {
1032    relative_path: PathBuf,
1033    action: DirectFileAction,
1034    offset: u64,
1035}
1036
1037#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
1038#[serde(rename_all = "snake_case")]
1039enum DirectFileAction {
1040    Receive,
1041    Skip,
1042}
1043
1044#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
1045struct DirectPayloadHeader {
1046    relative_path: PathBuf,
1047    offset: u64,
1048    bytes: u64,
1049}
1050
1051#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
1052struct DirectAck {
1053    ok: bool,
1054    error: Option<String>,
1055}
1056
1057pub fn validate_send_paths(paths: &[PathBuf]) -> Result<()> {
1058    if paths.is_empty() {
1059        return Err(Error::InvalidInput(
1060            "at least one file path is required".to_string(),
1061        ));
1062    }
1063
1064    for path in paths {
1065        if path.as_os_str().is_empty() {
1066            return Err(Error::InvalidInput("empty file path".to_string()));
1067        }
1068    }
1069
1070    Ok(())
1071}
1072
1073pub fn display_path(path: &Path) -> String {
1074    path.to_string_lossy().into_owned()
1075}
1076
1077pub async fn build_manifest(paths: &[PathBuf]) -> Result<TransferManifest> {
1078    validate_send_paths(paths)?;
1079
1080    let sources = collect_manifest_sources(paths)?;
1081    let mut entries = Vec::with_capacity(sources.len());
1082    let mut seen = BTreeSet::new();
1083
1084    for source in sources {
1085        if !seen.insert(source.relative_path.clone()) {
1086            return Err(Error::InvalidInput(format!(
1087                "duplicate transfer path: {}",
1088                display_path(&source.relative_path)
1089            )));
1090        }
1091
1092        entries.push(build_manifest_entry(source).await?);
1093    }
1094
1095    Ok(TransferManifest {
1096        version: MANIFEST_VERSION,
1097        session_id: Uuid::new_v4(),
1098        chunk_size: DEFAULT_CHUNK_SIZE,
1099        entries,
1100    })
1101}
1102
1103pub fn safe_destination_path(destination: &Path, relative_path: &Path) -> Result<PathBuf> {
1104    validate_relative_transfer_path(relative_path)?;
1105    Ok(destination.join(relative_path))
1106}
1107
1108pub async fn decide_resume(
1109    destination: &Path,
1110    entry: &ManifestEntry,
1111    chunk_size: u64,
1112) -> Result<ResumeDecision> {
1113    if entry.kind != ManifestEntryKind::File {
1114        return Ok(ResumeDecision::Create);
1115    }
1116
1117    let path = safe_destination_path(destination, &entry.relative_path)?;
1118    let metadata = match tokio::fs::metadata(&path).await {
1119        Ok(metadata) => metadata,
1120        Err(error) if error.kind() == std::io::ErrorKind::NotFound => {
1121            return Ok(ResumeDecision::Create);
1122        }
1123        Err(source) => {
1124            return Err(Error::IoPath { path, source });
1125        }
1126    };
1127
1128    if !metadata.is_file() {
1129        return Ok(ResumeDecision::Conflict(format!(
1130            "{} already exists and is not a regular file",
1131            display_path(&path)
1132        )));
1133    }
1134
1135    if metadata.len() == entry.size {
1136        let expected = entry
1137            .blake3
1138            .as_deref()
1139            .ok_or_else(|| Error::Protocol("file manifest entry is missing BLAKE3".to_string()))?;
1140        let actual = hash_file(&path).await?;
1141        return if actual == expected {
1142            Ok(ResumeDecision::Skip)
1143        } else {
1144            Ok(ResumeDecision::Conflict(format!(
1145                "{} already exists with different contents",
1146                display_path(&path)
1147            )))
1148        };
1149    }
1150
1151    if metadata.len() > entry.size {
1152        return Ok(ResumeDecision::Conflict(format!(
1153            "{} is larger than incoming file",
1154            display_path(&path)
1155        )));
1156    }
1157
1158    let verified_offset = verified_prefix_offset(&path, entry, metadata.len(), chunk_size).await?;
1159    if verified_offset > 0 {
1160        Ok(ResumeDecision::ResumeFrom(verified_offset))
1161    } else {
1162        Ok(ResumeDecision::Conflict(format!(
1163            "{} already exists and does not match a verified prefix",
1164            display_path(&path)
1165        )))
1166    }
1167}
1168
1169pub async fn send_direct_file(
1170    request: &SendRequest,
1171    identity: &NativeIdentity,
1172    mut events: impl FnMut(TransferEvent),
1173) -> Result<TransferSummary> {
1174    ensure_rustls_provider();
1175
1176    let manifest = build_manifest(request.paths()).await?;
1177    let sources = manifest_source_map(request.paths())?;
1178    events(TransferEvent::SessionStarted {
1179        direction: TransferDirection::Send,
1180        session_id: manifest.session_id,
1181        files_total: manifest.file_entries().count(),
1182        bytes_total: manifest.total_file_bytes(),
1183    });
1184
1185    let mut endpoint = quinn::Endpoint::client(SocketAddr::from(([0, 0, 0, 0], 0)))?;
1186    endpoint.set_default_client_config(insecure_client_config()?);
1187    let connection = endpoint
1188        .connect(request.peer().address(), "localhost")?
1189        .await?;
1190    let (mut send, mut recv) = connection.open_bi().await?;
1191
1192    send.write_all(DIRECT_MAGIC).await?;
1193    write_json_frame(&mut send, &DirectProtocolHello::new(identity)).await?;
1194    let protocol: DirectProtocolResponse = read_json_frame(&mut recv).await?;
1195    if !protocol.accepted {
1196        return Err(Error::Protocol(protocol.error.unwrap_or_else(|| {
1197            "receiver rejected protocol negotiation".to_string()
1198        })));
1199    }
1200    if protocol.protocol_version != Some(NATIVE_PROTOCOL_VERSION) {
1201        return Err(Error::Protocol(format!(
1202            "receiver selected unsupported protocol version {:?}",
1203            protocol.protocol_version
1204        )));
1205    }
1206
1207    write_json_frame(&mut send, &manifest).await?;
1208
1209    let plan: DirectReceivePlan = read_json_frame(&mut recv).await?;
1210    let mut summaries = Vec::new();
1211    for file_plan in plan.files {
1212        let Some(entry) = manifest
1213            .file_entries()
1214            .find(|entry| entry.relative_path == file_plan.relative_path)
1215        else {
1216            return Err(Error::Protocol(format!(
1217                "receiver requested unknown file: {}",
1218                display_path(&file_plan.relative_path)
1219            )));
1220        };
1221
1222        let Some(source_path) = sources.get(&file_plan.relative_path) else {
1223            return Err(Error::Protocol(format!(
1224                "missing source for manifest file: {}",
1225                display_path(&file_plan.relative_path)
1226            )));
1227        };
1228
1229        if file_plan.action == DirectFileAction::Skip {
1230            let blake3 = entry.blake3.clone().unwrap_or_default();
1231            events(TransferEvent::FileFinished {
1232                direction: TransferDirection::Send,
1233                file_name: display_path(&entry.relative_path),
1234                bytes: entry.size,
1235                blake3: blake3.clone(),
1236                status: TransferFileStatus::Skipped,
1237            });
1238            summaries.push(TransferFileSummary {
1239                path: source_path.clone(),
1240                relative_path: entry.relative_path.clone(),
1241                bytes: entry.size,
1242                blake3,
1243                status: TransferFileStatus::Skipped,
1244            });
1245            continue;
1246        }
1247
1248        send_payload_file(&mut send, source_path, entry, file_plan.offset, &mut events).await?;
1249        let status = if file_plan.offset > 0 {
1250            TransferFileStatus::Resumed
1251        } else {
1252            TransferFileStatus::Sent
1253        };
1254        let blake3 = entry.blake3.clone().unwrap_or_default();
1255        events(TransferEvent::FileFinished {
1256            direction: TransferDirection::Send,
1257            file_name: display_path(&entry.relative_path),
1258            bytes: entry.size,
1259            blake3: blake3.clone(),
1260            status,
1261        });
1262        summaries.push(TransferFileSummary {
1263            path: source_path.clone(),
1264            relative_path: entry.relative_path.clone(),
1265            bytes: entry.size,
1266            blake3,
1267            status,
1268        });
1269    }
1270    send.finish()?;
1271
1272    let ack: DirectAck = read_json_frame(&mut recv).await?;
1273    if !ack.ok {
1274        return Err(Error::Transfer(
1275            ack.error
1276                .unwrap_or_else(|| "receiver rejected transfer".to_string()),
1277        ));
1278    }
1279
1280    connection.close(0u32.into(), b"done");
1281    endpoint.wait_idle().await;
1282
1283    let summary = summary_from_files(&manifest, summaries)?;
1284    events(TransferEvent::SessionFinished {
1285        direction: TransferDirection::Send,
1286        files_total: summary.files.len(),
1287        bytes_total: summary.bytes,
1288        blake3: summary.blake3.clone(),
1289    });
1290    Ok(summary)
1291}
1292
1293pub async fn receive_direct_file(
1294    request: &ReceiveRequest,
1295    destination: impl AsRef<Path>,
1296    identity: &NativeIdentity,
1297    mut events: impl FnMut(TransferEvent),
1298) -> Result<TransferSummary> {
1299    ensure_rustls_provider();
1300
1301    let destination = destination.as_ref();
1302    tokio::fs::create_dir_all(destination)
1303        .await
1304        .map_err(|source| Error::IoPath {
1305            path: destination.to_path_buf(),
1306            source,
1307        })?;
1308
1309    let server_config =
1310        quinn::ServerConfig::with_single_cert(identity.cert_chain(), identity.private_key())?;
1311    let endpoint = quinn::Endpoint::server(server_config, request.listen())?;
1312    let incoming = endpoint.accept().await.ok_or(Error::NoIncomingConnection)?;
1313    let connection = incoming.await?;
1314    let (mut send, mut recv) = connection.accept_bi().await?;
1315
1316    let result = receive_stream_file(destination, &mut recv, &mut send, &mut events).await;
1317    let ack = match &result {
1318        Ok(_) => DirectAck {
1319            ok: true,
1320            error: None,
1321        },
1322        Err(error) => DirectAck {
1323            ok: false,
1324            error: Some(error.to_string()),
1325        },
1326    };
1327    write_json_frame(&mut send, &ack).await?;
1328    send.finish()?;
1329    let _ = tokio::time::timeout(Duration::from_secs(1), connection.closed()).await;
1330    endpoint.close(0u32.into(), b"done");
1331    endpoint.wait_idle().await;
1332
1333    result
1334}
1335
1336async fn receive_stream_file(
1337    destination: &Path,
1338    recv: &mut quinn::RecvStream,
1339    send: &mut quinn::SendStream,
1340    events: &mut impl FnMut(TransferEvent),
1341) -> Result<TransferSummary> {
1342    let mut magic = [0; DIRECT_MAGIC.len()];
1343    recv.read_exact(&mut magic).await?;
1344    if &magic != DIRECT_MAGIC {
1345        return Err(Error::Protocol("bad direct-transfer magic".to_string()));
1346    }
1347
1348    let hello: DirectProtocolHello = read_json_frame(recv).await?;
1349    let protocol = negotiate_direct_protocol(&hello);
1350    write_json_frame(send, &protocol).await?;
1351    if !protocol.accepted {
1352        return Err(Error::Protocol(protocol.error.unwrap_or_else(|| {
1353            "unsupported direct protocol version".to_string()
1354        })));
1355    }
1356
1357    let manifest: TransferManifest = read_json_frame(recv).await?;
1358    validate_manifest(&manifest)?;
1359    events(TransferEvent::SessionStarted {
1360        direction: TransferDirection::Receive,
1361        session_id: manifest.session_id,
1362        files_total: manifest.file_entries().count(),
1363        bytes_total: manifest.total_file_bytes(),
1364    });
1365
1366    for entry in &manifest.entries {
1367        if entry.kind == ManifestEntryKind::Directory {
1368            let path = safe_destination_path(destination, &entry.relative_path)?;
1369            tokio::fs::create_dir_all(&path)
1370                .await
1371                .map_err(|source| Error::IoPath { path, source })?;
1372        }
1373    }
1374
1375    let mut plan = DirectReceivePlan { files: Vec::new() };
1376    for entry in manifest.file_entries() {
1377        let decision = decide_resume(destination, entry, manifest.chunk_size).await?;
1378        let (action, offset) = match decision {
1379            ResumeDecision::Create => (DirectFileAction::Receive, 0),
1380            ResumeDecision::Skip => (DirectFileAction::Skip, entry.size),
1381            ResumeDecision::ResumeFrom(offset) => (DirectFileAction::Receive, offset),
1382            ResumeDecision::Conflict(message) => return Err(Error::InvalidInput(message)),
1383        };
1384        plan.files.push(DirectFilePlan {
1385            relative_path: entry.relative_path.clone(),
1386            action,
1387            offset,
1388        });
1389    }
1390    write_json_frame(send, &plan).await?;
1391
1392    let mut summaries = Vec::new();
1393    for file_plan in &plan.files {
1394        let Some(entry) = manifest
1395            .file_entries()
1396            .find(|entry| entry.relative_path == file_plan.relative_path)
1397        else {
1398            return Err(Error::Protocol(format!(
1399                "planned file missing from manifest: {}",
1400                display_path(&file_plan.relative_path)
1401            )));
1402        };
1403
1404        let final_path = safe_destination_path(destination, &entry.relative_path)?;
1405        if file_plan.action == DirectFileAction::Skip {
1406            let blake3 = entry.blake3.clone().unwrap_or_default();
1407            events(TransferEvent::FileFinished {
1408                direction: TransferDirection::Receive,
1409                file_name: display_path(&entry.relative_path),
1410                bytes: entry.size,
1411                blake3: blake3.clone(),
1412                status: TransferFileStatus::Skipped,
1413            });
1414            summaries.push(TransferFileSummary {
1415                path: final_path,
1416                relative_path: entry.relative_path.clone(),
1417                bytes: entry.size,
1418                blake3,
1419                status: TransferFileStatus::Skipped,
1420            });
1421            continue;
1422        }
1423
1424        receive_payload_file(recv, destination, entry, file_plan.offset, events).await?;
1425        let actual_hash = hash_file(&final_path).await?;
1426        let expected_hash = entry
1427            .blake3
1428            .as_deref()
1429            .ok_or_else(|| Error::Protocol("file manifest entry is missing BLAKE3".to_string()))?;
1430        if actual_hash != expected_hash {
1431            return Err(Error::Transfer(format!(
1432                "BLAKE3 hash mismatch for {}",
1433                display_path(&entry.relative_path)
1434            )));
1435        }
1436
1437        let status = if file_plan.offset > 0 {
1438            TransferFileStatus::Resumed
1439        } else {
1440            TransferFileStatus::Received
1441        };
1442        events(TransferEvent::FileFinished {
1443            direction: TransferDirection::Receive,
1444            file_name: display_path(&entry.relative_path),
1445            bytes: entry.size,
1446            blake3: actual_hash.clone(),
1447            status,
1448        });
1449        summaries.push(TransferFileSummary {
1450            path: final_path,
1451            relative_path: entry.relative_path.clone(),
1452            bytes: entry.size,
1453            blake3: actual_hash,
1454            status,
1455        });
1456    }
1457
1458    let summary = summary_from_files(&manifest, summaries)?;
1459    events(TransferEvent::SessionFinished {
1460        direction: TransferDirection::Receive,
1461        files_total: summary.files.len(),
1462        bytes_total: summary.bytes,
1463        blake3: summary.blake3.clone(),
1464    });
1465    Ok(summary)
1466}
1467
1468async fn send_payload_file(
1469    send: &mut quinn::SendStream,
1470    source_path: &Path,
1471    entry: &ManifestEntry,
1472    offset: u64,
1473    events: &mut impl FnMut(TransferEvent),
1474) -> Result<()> {
1475    if offset > entry.size {
1476        return Err(Error::Protocol(format!(
1477            "resume offset exceeds file size for {}",
1478            display_path(&entry.relative_path)
1479        )));
1480    }
1481
1482    let header = DirectPayloadHeader {
1483        relative_path: entry.relative_path.clone(),
1484        offset,
1485        bytes: entry.size - offset,
1486    };
1487    write_json_frame(send, &header).await?;
1488    events(TransferEvent::FileStarted {
1489        direction: TransferDirection::Send,
1490        file_name: display_path(&entry.relative_path),
1491        bytes_total: entry.size,
1492        resume_offset: offset,
1493    });
1494
1495    let mut file = tokio::fs::File::open(source_path)
1496        .await
1497        .map_err(|source| Error::IoPath {
1498            path: source_path.to_path_buf(),
1499            source,
1500        })?;
1501    file.seek(SeekFrom::Start(offset))
1502        .await
1503        .map_err(|source| Error::IoPath {
1504            path: source_path.to_path_buf(),
1505            source,
1506        })?;
1507
1508    let mut buffer = vec![0; IO_BUFFER_SIZE];
1509    let mut bytes_done = offset;
1510    while bytes_done < entry.size {
1511        let remaining = entry.size - bytes_done;
1512        let read_size = buffer.len().min(remaining as usize);
1513        let read = file
1514            .read(&mut buffer[..read_size])
1515            .await
1516            .map_err(|source| Error::IoPath {
1517                path: source_path.to_path_buf(),
1518                source,
1519            })?;
1520        if read == 0 {
1521            return Err(Error::Protocol(format!(
1522                "source file ended early: {}",
1523                display_path(source_path)
1524            )));
1525        }
1526        send.write_all(&buffer[..read]).await?;
1527        bytes_done += read as u64;
1528        events(TransferEvent::Progress {
1529            direction: TransferDirection::Send,
1530            file_name: display_path(&entry.relative_path),
1531            bytes_done,
1532            bytes_total: entry.size,
1533        });
1534    }
1535
1536    Ok(())
1537}
1538
1539async fn receive_payload_file(
1540    recv: &mut quinn::RecvStream,
1541    destination: &Path,
1542    entry: &ManifestEntry,
1543    offset: u64,
1544    events: &mut impl FnMut(TransferEvent),
1545) -> Result<()> {
1546    let header: DirectPayloadHeader = read_json_frame(recv).await?;
1547    if header.relative_path != entry.relative_path
1548        || header.offset != offset
1549        || header.bytes != entry.size - offset
1550    {
1551        return Err(Error::Protocol(format!(
1552            "payload header does not match receive plan for {}",
1553            display_path(&entry.relative_path)
1554        )));
1555    }
1556    events(TransferEvent::FileStarted {
1557        direction: TransferDirection::Receive,
1558        file_name: display_path(&entry.relative_path),
1559        bytes_total: entry.size,
1560        resume_offset: offset,
1561    });
1562
1563    let final_path = safe_destination_path(destination, &entry.relative_path)?;
1564    if let Some(parent) = final_path.parent() {
1565        tokio::fs::create_dir_all(parent)
1566            .await
1567            .map_err(|source| Error::IoPath {
1568                path: parent.to_path_buf(),
1569                source,
1570            })?;
1571    }
1572
1573    let write_path = if offset == 0 {
1574        temp_path_for(&final_path)
1575    } else {
1576        final_path.clone()
1577    };
1578
1579    let mut file = if offset == 0 {
1580        tokio::fs::File::create(&write_path)
1581            .await
1582            .map_err(|source| Error::IoPath {
1583                path: write_path.clone(),
1584                source,
1585            })?
1586    } else {
1587        let mut file = tokio::fs::OpenOptions::new()
1588            .write(true)
1589            .open(&write_path)
1590            .await
1591            .map_err(|source| Error::IoPath {
1592                path: write_path.clone(),
1593                source,
1594            })?;
1595        file.set_len(offset).await.map_err(|source| Error::IoPath {
1596            path: write_path.clone(),
1597            source,
1598        })?;
1599        file.seek(SeekFrom::Start(offset))
1600            .await
1601            .map_err(|source| Error::IoPath {
1602                path: write_path.clone(),
1603                source,
1604            })?;
1605        file
1606    };
1607
1608    let mut bytes_done = offset;
1609    let mut bytes_remaining = header.bytes;
1610    let mut buffer = vec![0; IO_BUFFER_SIZE];
1611    while bytes_remaining > 0 {
1612        let read_size = buffer.len().min(bytes_remaining as usize);
1613        let read = recv.read(&mut buffer[..read_size]).await?;
1614        let Some(read) = read else {
1615            if offset == 0 {
1616                let _ = tokio::fs::remove_file(&write_path).await;
1617            }
1618            return Err(Error::Protocol(
1619                "stream ended before file payload".to_string(),
1620            ));
1621        };
1622
1623        file.write_all(&buffer[..read])
1624            .await
1625            .map_err(|source| Error::IoPath {
1626                path: write_path.clone(),
1627                source,
1628            })?;
1629        bytes_done += read as u64;
1630        bytes_remaining -= read as u64;
1631        events(TransferEvent::Progress {
1632            direction: TransferDirection::Receive,
1633            file_name: display_path(&entry.relative_path),
1634            bytes_done,
1635            bytes_total: entry.size,
1636        });
1637    }
1638
1639    file.flush().await.map_err(|source| Error::IoPath {
1640        path: write_path.clone(),
1641        source,
1642    })?;
1643    drop(file);
1644
1645    if offset == 0 {
1646        tokio::fs::rename(&write_path, &final_path)
1647            .await
1648            .map_err(|source| Error::IoPath {
1649                path: final_path,
1650                source,
1651            })?;
1652    }
1653
1654    Ok(())
1655}
1656
1657async fn write_json_frame<T: Serialize>(send: &mut quinn::SendStream, value: &T) -> Result<()> {
1658    let bytes = serde_json::to_vec(value).map_err(|error| Error::Protocol(error.to_string()))?;
1659    if bytes.len() > MAX_FRAME_LEN {
1660        return Err(Error::Protocol("frame exceeds maximum length".to_string()));
1661    }
1662    send.write_all(&(bytes.len() as u32).to_be_bytes()).await?;
1663    send.write_all(&bytes).await?;
1664    Ok(())
1665}
1666
1667async fn read_json_frame<T: for<'de> Deserialize<'de>>(recv: &mut quinn::RecvStream) -> Result<T> {
1668    let mut len = [0; 4];
1669    recv.read_exact(&mut len).await?;
1670    let len = u32::from_be_bytes(len) as usize;
1671    if len > MAX_FRAME_LEN {
1672        return Err(Error::Protocol("frame exceeds maximum length".to_string()));
1673    }
1674
1675    let mut bytes = vec![0; len];
1676    recv.read_exact(&mut bytes).await?;
1677    serde_json::from_slice(&bytes).map_err(|error| Error::Protocol(error.to_string()))
1678}
1679
1680fn negotiate_direct_protocol(hello: &DirectProtocolHello) -> DirectProtocolResponse {
1681    if hello.min_protocol_version > NATIVE_PROTOCOL_VERSION {
1682        return DirectProtocolResponse::reject(format!(
1683            "peer requires protocol version {}, local max is {}",
1684            hello.min_protocol_version, NATIVE_PROTOCOL_VERSION
1685        ));
1686    }
1687
1688    if hello.protocol_version < MIN_NATIVE_PROTOCOL_VERSION {
1689        return DirectProtocolResponse::reject(format!(
1690            "peer max protocol version {} is below local minimum {}",
1691            hello.protocol_version, MIN_NATIVE_PROTOCOL_VERSION
1692        ));
1693    }
1694
1695    DirectProtocolResponse::accept(NATIVE_PROTOCOL_VERSION.min(hello.protocol_version))
1696}
1697
1698#[derive(Debug, Clone)]
1699struct ManifestSource {
1700    source_path: PathBuf,
1701    relative_path: PathBuf,
1702}
1703
1704fn collect_manifest_sources(paths: &[PathBuf]) -> Result<Vec<ManifestSource>> {
1705    let mut sources = Vec::new();
1706    for path in paths {
1707        let metadata = std::fs::symlink_metadata(path).map_err(|source| Error::IoPath {
1708            path: path.clone(),
1709            source,
1710        })?;
1711        if metadata.file_type().is_symlink() {
1712            return Err(Error::InvalidInput(format!(
1713                "{} is a symlink; symlink transfers are not supported",
1714                display_path(path)
1715            )));
1716        }
1717
1718        let root_name = path
1719            .file_name()
1720            .ok_or_else(|| Error::InvalidInput(format!("{} has no file name", display_path(path))))?
1721            .to_os_string();
1722        let relative_path = PathBuf::from(root_name);
1723
1724        if metadata.is_dir() {
1725            sources.push(ManifestSource {
1726                source_path: path.clone(),
1727                relative_path: relative_path.clone(),
1728            });
1729            collect_dir_sources(path, &relative_path, &mut sources)?;
1730        } else if metadata.is_file() {
1731            sources.push(ManifestSource {
1732                source_path: path.clone(),
1733                relative_path,
1734            });
1735        } else {
1736            return Err(Error::InvalidInput(format!(
1737                "{} is not a regular file or directory",
1738                display_path(path)
1739            )));
1740        }
1741    }
1742
1743    Ok(sources)
1744}
1745
1746fn collect_dir_sources(
1747    dir: &Path,
1748    relative_dir: &Path,
1749    sources: &mut Vec<ManifestSource>,
1750) -> Result<()> {
1751    for entry in std::fs::read_dir(dir).map_err(|source| Error::IoPath {
1752        path: dir.to_path_buf(),
1753        source,
1754    })? {
1755        let entry = entry.map_err(|source| Error::IoPath {
1756            path: dir.to_path_buf(),
1757            source,
1758        })?;
1759        let path = entry.path();
1760        let metadata = std::fs::symlink_metadata(&path).map_err(|source| Error::IoPath {
1761            path: path.clone(),
1762            source,
1763        })?;
1764        if metadata.file_type().is_symlink() {
1765            return Err(Error::InvalidInput(format!(
1766                "{} is a symlink; symlink transfers are not supported",
1767                display_path(&path)
1768            )));
1769        }
1770
1771        let relative_path = relative_dir.join(entry.file_name());
1772        if metadata.is_dir() {
1773            sources.push(ManifestSource {
1774                source_path: path.clone(),
1775                relative_path: relative_path.clone(),
1776            });
1777            collect_dir_sources(&path, &relative_path, sources)?;
1778        } else if metadata.is_file() {
1779            sources.push(ManifestSource {
1780                source_path: path,
1781                relative_path,
1782            });
1783        }
1784    }
1785
1786    Ok(())
1787}
1788
1789fn manifest_source_map(paths: &[PathBuf]) -> Result<BTreeMap<PathBuf, PathBuf>> {
1790    Ok(collect_manifest_sources(paths)?
1791        .into_iter()
1792        .filter_map(|source| {
1793            let metadata = std::fs::metadata(&source.source_path).ok()?;
1794            metadata
1795                .is_file()
1796                .then_some((source.relative_path, source.source_path))
1797        })
1798        .collect())
1799}
1800
1801async fn build_manifest_entry(source: ManifestSource) -> Result<ManifestEntry> {
1802    validate_relative_transfer_path(&source.relative_path)?;
1803    let metadata = tokio::fs::metadata(&source.source_path)
1804        .await
1805        .map_err(|source_error| Error::IoPath {
1806            path: source.source_path.clone(),
1807            source: source_error,
1808        })?;
1809    let unix_mode = unix_mode(&metadata);
1810    let modified_unix_ms = modified_unix_ms(&metadata);
1811
1812    if metadata.is_dir() {
1813        return Ok(ManifestEntry {
1814            relative_path: source.relative_path,
1815            kind: ManifestEntryKind::Directory,
1816            size: 0,
1817            blake3: None,
1818            chunks: Vec::new(),
1819            unix_mode,
1820            modified_unix_ms,
1821        });
1822    }
1823
1824    let (blake3, chunks) = hash_file_with_chunks(&source.source_path, DEFAULT_CHUNK_SIZE).await?;
1825    Ok(ManifestEntry {
1826        relative_path: source.relative_path,
1827        kind: ManifestEntryKind::File,
1828        size: metadata.len(),
1829        blake3: Some(blake3),
1830        chunks,
1831        unix_mode,
1832        modified_unix_ms,
1833    })
1834}
1835
1836fn validate_manifest(manifest: &TransferManifest) -> Result<()> {
1837    if manifest.version != MANIFEST_VERSION {
1838        return Err(Error::Protocol(format!(
1839            "unsupported manifest version {}",
1840            manifest.version
1841        )));
1842    }
1843    if manifest.chunk_size == 0 {
1844        return Err(Error::Protocol("manifest chunk size is zero".to_string()));
1845    }
1846
1847    let mut seen = BTreeSet::new();
1848    for entry in &manifest.entries {
1849        validate_relative_transfer_path(&entry.relative_path)?;
1850        if !seen.insert(entry.relative_path.clone()) {
1851            return Err(Error::Protocol(format!(
1852                "duplicate manifest path: {}",
1853                display_path(&entry.relative_path)
1854            )));
1855        }
1856        if entry.kind == ManifestEntryKind::File && entry.blake3.is_none() {
1857            return Err(Error::Protocol(format!(
1858                "file manifest entry is missing BLAKE3: {}",
1859                display_path(&entry.relative_path)
1860            )));
1861        }
1862    }
1863
1864    Ok(())
1865}
1866
1867fn validate_relative_transfer_path(path: &Path) -> Result<()> {
1868    if path.as_os_str().is_empty() || path.is_absolute() {
1869        return Err(Error::InvalidInput(format!(
1870            "unsafe transfer path: {}",
1871            display_path(path)
1872        )));
1873    }
1874
1875    let mut normal_components = 0usize;
1876    for component in path.components() {
1877        match component {
1878            Component::Normal(value) => {
1879                let Some(name) = value.to_str() else {
1880                    return Err(Error::InvalidInput(format!(
1881                        "transfer path must be UTF-8: {}",
1882                        display_path(path)
1883                    )));
1884                };
1885                validate_path_component(name, path)?;
1886                normal_components += 1;
1887            }
1888            Component::CurDir
1889            | Component::ParentDir
1890            | Component::RootDir
1891            | Component::Prefix(_) => {
1892                return Err(Error::InvalidInput(format!(
1893                    "unsafe transfer path: {}",
1894                    display_path(path)
1895                )));
1896            }
1897        }
1898    }
1899
1900    if normal_components == 0 {
1901        return Err(Error::InvalidInput(format!(
1902            "unsafe transfer path: {}",
1903            display_path(path)
1904        )));
1905    }
1906
1907    Ok(())
1908}
1909
1910fn validate_path_component(component: &str, full_path: &Path) -> Result<()> {
1911    if component.is_empty()
1912        || component.contains('\\')
1913        || component.contains('/')
1914        || component.contains(':')
1915        || component.ends_with(' ')
1916        || component.ends_with('.')
1917        || is_windows_reserved_name(component)
1918    {
1919        return Err(Error::InvalidInput(format!(
1920            "unsafe transfer path: {}",
1921            display_path(full_path)
1922        )));
1923    }
1924
1925    Ok(())
1926}
1927
1928fn is_windows_reserved_name(component: &str) -> bool {
1929    let stem = component
1930        .split('.')
1931        .next()
1932        .unwrap_or(component)
1933        .trim_end_matches([' ', '.'])
1934        .to_ascii_uppercase();
1935    matches!(
1936        stem.as_str(),
1937        "CON"
1938            | "PRN"
1939            | "AUX"
1940            | "NUL"
1941            | "COM1"
1942            | "COM2"
1943            | "COM3"
1944            | "COM4"
1945            | "COM5"
1946            | "COM6"
1947            | "COM7"
1948            | "COM8"
1949            | "COM9"
1950            | "LPT1"
1951            | "LPT2"
1952            | "LPT3"
1953            | "LPT4"
1954            | "LPT5"
1955            | "LPT6"
1956            | "LPT7"
1957            | "LPT8"
1958            | "LPT9"
1959    )
1960}
1961
1962async fn hash_file(path: &Path) -> Result<String> {
1963    hash_file_with_chunks(path, DEFAULT_CHUNK_SIZE)
1964        .await
1965        .map(|(hash, _)| hash)
1966}
1967
1968async fn hash_file_with_chunks(path: &Path, chunk_size: u64) -> Result<(String, Vec<String>)> {
1969    let mut file = tokio::fs::File::open(path)
1970        .await
1971        .map_err(|source| Error::IoPath {
1972            path: path.to_path_buf(),
1973            source,
1974        })?;
1975    let mut hasher = blake3::Hasher::new();
1976    let mut chunk_hasher = blake3::Hasher::new();
1977    let mut chunk_bytes = 0u64;
1978    let mut chunks = Vec::new();
1979    let mut buffer = vec![0; IO_BUFFER_SIZE];
1980
1981    loop {
1982        let read = file
1983            .read(&mut buffer)
1984            .await
1985            .map_err(|source| Error::IoPath {
1986                path: path.to_path_buf(),
1987                source,
1988            })?;
1989        if read == 0 {
1990            break;
1991        }
1992        hasher.update(&buffer[..read]);
1993
1994        let mut remaining = &buffer[..read];
1995        while !remaining.is_empty() {
1996            let take = remaining.len().min((chunk_size - chunk_bytes) as usize);
1997            chunk_hasher.update(&remaining[..take]);
1998            chunk_bytes += take as u64;
1999            remaining = &remaining[take..];
2000            if chunk_bytes == chunk_size {
2001                chunks.push(chunk_hasher.finalize().to_hex().to_string());
2002                chunk_hasher = blake3::Hasher::new();
2003                chunk_bytes = 0;
2004            }
2005        }
2006    }
2007
2008    if chunk_bytes > 0 {
2009        chunks.push(chunk_hasher.finalize().to_hex().to_string());
2010    }
2011
2012    Ok((hasher.finalize().to_hex().to_string(), chunks))
2013}
2014
2015async fn verified_prefix_offset(
2016    path: &Path,
2017    entry: &ManifestEntry,
2018    existing_len: u64,
2019    chunk_size: u64,
2020) -> Result<u64> {
2021    if existing_len == 0 || entry.chunks.is_empty() {
2022        return Ok(0);
2023    }
2024
2025    let chunks_to_check = (existing_len / chunk_size).min(entry.chunks.len() as u64) as usize;
2026    if chunks_to_check == 0 {
2027        return Ok(0);
2028    }
2029
2030    let mut file = tokio::fs::File::open(path)
2031        .await
2032        .map_err(|source| Error::IoPath {
2033            path: path.to_path_buf(),
2034            source,
2035        })?;
2036    let mut buffer = vec![0; IO_BUFFER_SIZE];
2037    let mut verified_chunks = 0usize;
2038
2039    for expected in entry.chunks.iter().take(chunks_to_check) {
2040        let mut remaining = chunk_size;
2041        let mut hasher = blake3::Hasher::new();
2042        while remaining > 0 {
2043            let read_size = buffer.len().min(remaining as usize);
2044            file.read_exact(&mut buffer[..read_size])
2045                .await
2046                .map_err(|source| Error::IoPath {
2047                    path: path.to_path_buf(),
2048                    source,
2049                })?;
2050            hasher.update(&buffer[..read_size]);
2051            remaining -= read_size as u64;
2052        }
2053
2054        if hasher.finalize().to_hex().as_str() != expected {
2055            break;
2056        }
2057        verified_chunks += 1;
2058    }
2059
2060    Ok(verified_chunks as u64 * chunk_size)
2061}
2062
2063fn summary_from_files(
2064    manifest: &TransferManifest,
2065    files: Vec<TransferFileSummary>,
2066) -> Result<TransferSummary> {
2067    let bytes = manifest.total_file_bytes();
2068    let blake3 = if files.len() == 1 {
2069        files[0].blake3.clone()
2070    } else {
2071        manifest.digest()?
2072    };
2073    let file_name = if files.len() == 1 {
2074        display_path(&files[0].relative_path)
2075    } else {
2076        format!("{} files", files.len())
2077    };
2078    let path = files
2079        .first()
2080        .map(|file| file.path.clone())
2081        .unwrap_or_default();
2082
2083    Ok(TransferSummary {
2084        path,
2085        file_name,
2086        bytes,
2087        blake3,
2088        files,
2089    })
2090}
2091
2092fn temp_path_for(final_path: &Path) -> PathBuf {
2093    let file_name = final_path
2094        .file_name()
2095        .and_then(|name| name.to_str())
2096        .unwrap_or("transfer");
2097    final_path.with_file_name(format!(".{file_name}.{}.ferry-part", Uuid::new_v4()))
2098}
2099
2100fn modified_unix_ms(metadata: &Metadata) -> Option<i128> {
2101    system_time_unix_ms(metadata.modified().ok()?)
2102}
2103
2104fn system_time_unix_ms(time: SystemTime) -> Option<i128> {
2105    match time.duration_since(UNIX_EPOCH) {
2106        Ok(duration) => Some(duration.as_millis() as i128),
2107        Err(error) => Some(-(error.duration().as_millis() as i128)),
2108    }
2109}
2110
2111#[cfg(unix)]
2112fn unix_mode(metadata: &Metadata) -> Option<u32> {
2113    use std::os::unix::fs::PermissionsExt;
2114
2115    Some(metadata.permissions().mode())
2116}
2117
2118#[cfg(not(unix))]
2119fn unix_mode(_metadata: &Metadata) -> Option<u32> {
2120    None
2121}
2122
2123fn insecure_client_config() -> Result<quinn::ClientConfig> {
2124    let crypto = rustls::ClientConfig::builder()
2125        .dangerous()
2126        .with_custom_certificate_verifier(SkipServerVerification::new())
2127        .with_no_client_auth();
2128
2129    Ok(quinn::ClientConfig::new(Arc::new(
2130        QuicClientConfig::try_from(crypto)
2131            .map_err(|error| Error::CryptoConfig(error.to_string()))?,
2132    )))
2133}
2134
2135#[derive(Debug)]
2136struct SkipServerVerification(Arc<CryptoProvider>);
2137
2138impl SkipServerVerification {
2139    fn new() -> Arc<Self> {
2140        Arc::new(Self(Arc::new(rustls::crypto::ring::default_provider())))
2141    }
2142}
2143
2144impl ServerCertVerifier for SkipServerVerification {
2145    fn verify_server_cert(
2146        &self,
2147        _end_entity: &CertificateDer<'_>,
2148        _intermediates: &[CertificateDer<'_>],
2149        _server_name: &ServerName<'_>,
2150        _ocsp: &[u8],
2151        _now: UnixTime,
2152    ) -> std::result::Result<ServerCertVerified, rustls::Error> {
2153        Ok(ServerCertVerified::assertion())
2154    }
2155
2156    fn verify_tls12_signature(
2157        &self,
2158        message: &[u8],
2159        cert: &CertificateDer<'_>,
2160        dss: &DigitallySignedStruct,
2161    ) -> std::result::Result<HandshakeSignatureValid, rustls::Error> {
2162        verify_tls12_signature(
2163            message,
2164            cert,
2165            dss,
2166            &self.0.signature_verification_algorithms,
2167        )
2168    }
2169
2170    fn verify_tls13_signature(
2171        &self,
2172        message: &[u8],
2173        cert: &CertificateDer<'_>,
2174        dss: &DigitallySignedStruct,
2175    ) -> std::result::Result<HandshakeSignatureValid, rustls::Error> {
2176        verify_tls13_signature(
2177            message,
2178            cert,
2179            dss,
2180            &self.0.signature_verification_algorithms,
2181        )
2182    }
2183
2184    fn supported_verify_schemes(&self) -> Vec<SignatureScheme> {
2185        self.0.signature_verification_algorithms.supported_schemes()
2186    }
2187}
2188
2189fn ensure_rustls_provider() {
2190    let _ = rustls::crypto::ring::default_provider().install_default();
2191}
2192
2193pub fn config_dir() -> Result<PathBuf> {
2194    let base_dirs = BaseDirs::new().ok_or(Error::ConfigDirUnavailable)?;
2195    Ok(base_dirs.config_dir().join(CONFIG_DIR_NAME))
2196}
2197
2198fn default_alias() -> String {
2199    std::env::var("HOSTNAME")
2200        .or_else(|_| std::env::var("COMPUTERNAME"))
2201        .ok()
2202        .filter(|value| !value.trim().is_empty())
2203        .unwrap_or_else(|| "fileferry-device".to_string())
2204}
2205
2206fn expand_home_path(path: &str) -> Result<PathBuf> {
2207    if path == "~" {
2208        return Ok(BaseDirs::new()
2209            .ok_or(Error::ConfigDirUnavailable)?
2210            .home_dir()
2211            .to_path_buf());
2212    }
2213
2214    if let Some(rest) = path.strip_prefix("~/") {
2215        return Ok(BaseDirs::new()
2216            .ok_or(Error::ConfigDirUnavailable)?
2217            .home_dir()
2218            .join(rest));
2219    }
2220
2221    Ok(PathBuf::from(path))
2222}
2223
2224fn normalized_fingerprint(fingerprint: String) -> Result<String> {
2225    let fingerprint = fingerprint.trim().to_ascii_lowercase();
2226    if fingerprint.len() != 64 || !fingerprint.chars().all(|char| char.is_ascii_hexdigit()) {
2227        return Err(Error::InvalidInput(
2228            "peer fingerprint must be a full 64-character hex BLAKE3 fingerprint".to_string(),
2229        ));
2230    }
2231
2232    Ok(fingerprint)
2233}
2234
2235fn write_toml_file<T: Serialize>(path: &Path, value: &T) -> Result<()> {
2236    let bytes =
2237        toml::to_string_pretty(value).map_err(|error| Error::ConfigSerialize(error.to_string()))?;
2238    if let Some(parent) = path.parent() {
2239        std::fs::create_dir_all(parent).map_err(|source| Error::IoPath {
2240            path: parent.to_path_buf(),
2241            source,
2242        })?;
2243    }
2244
2245    let temp_path = path.with_extension(format!("tmp-{}", Uuid::new_v4()));
2246    std::fs::write(&temp_path, bytes).map_err(|source| Error::IoPath {
2247        path: temp_path.clone(),
2248        source,
2249    })?;
2250    std::fs::rename(&temp_path, path).map_err(|source| Error::IoPath {
2251        path: path.to_path_buf(),
2252        source,
2253    })?;
2254    Ok(())
2255}
2256
2257fn write_private_file(path: &Path, bytes: &[u8]) -> Result<()> {
2258    std::fs::write(path, bytes).map_err(|source| Error::IoPath {
2259        path: path.to_path_buf(),
2260        source,
2261    })?;
2262
2263    #[cfg(unix)]
2264    {
2265        use std::os::unix::fs::PermissionsExt;
2266
2267        let mut permissions = std::fs::metadata(path)
2268            .map_err(|source| Error::IoPath {
2269                path: path.to_path_buf(),
2270                source,
2271            })?
2272            .permissions();
2273        permissions.set_mode(0o600);
2274        std::fs::set_permissions(path, permissions).map_err(|source| Error::IoPath {
2275            path: path.to_path_buf(),
2276            source,
2277        })?;
2278    }
2279
2280    Ok(())
2281}
2282
2283#[cfg(test)]
2284mod tests {
2285    use super::*;
2286    use std::sync::{Arc, Mutex};
2287
2288    #[test]
2289    fn direct_peer_parses_socket_address() {
2290        let peer = DirectPeer::parse("127.0.0.1:53318").expect("address parses");
2291
2292        assert_eq!(peer.address().to_string(), "127.0.0.1:53318");
2293    }
2294
2295    #[test]
2296    fn direct_peer_rejects_missing_port() {
2297        assert!(DirectPeer::parse("127.0.0.1").is_err());
2298    }
2299
2300    #[test]
2301    fn send_request_requires_at_least_one_path() {
2302        let peer = DirectPeer::parse("127.0.0.1:53318").expect("address parses");
2303
2304        assert!(SendRequest::new(peer, Vec::new()).is_err());
2305    }
2306
2307    #[test]
2308    fn validate_send_paths_rejects_empty_slice() {
2309        assert!(validate_send_paths(&[]).is_err());
2310    }
2311
2312    #[tokio::test]
2313    async fn manifest_walks_directory_and_serializes_entries() {
2314        let temp = tempfile::tempdir().expect("tempdir");
2315        let root = temp.path().join("bundle");
2316        let nested = root.join("nested");
2317        tokio::fs::create_dir_all(&nested).await.expect("dirs");
2318        tokio::fs::write(root.join("a.txt"), b"alpha")
2319            .await
2320            .expect("file a");
2321        tokio::fs::write(nested.join("b.txt"), b"beta")
2322            .await
2323            .expect("file b");
2324
2325        let manifest = build_manifest(&[root]).await.expect("manifest");
2326        let json = serde_json::to_string(&manifest).expect("manifest serializes");
2327        let decoded: TransferManifest = serde_json::from_str(&json).expect("manifest decodes");
2328
2329        assert_eq!(decoded.version, MANIFEST_VERSION);
2330        assert!(decoded.entries.iter().any(|entry| {
2331            entry.kind == ManifestEntryKind::Directory
2332                && entry.relative_path == Path::new("bundle/nested")
2333        }));
2334        assert!(decoded.entries.iter().any(|entry| {
2335            entry.kind == ManifestEntryKind::File
2336                && entry.relative_path == Path::new("bundle/nested/b.txt")
2337                && entry.size == 4
2338                && entry.blake3.is_some()
2339        }));
2340    }
2341
2342    #[test]
2343    fn safe_destination_path_rejects_unsafe_paths() {
2344        let dest = Path::new("/tmp/ferry-dest");
2345
2346        assert!(safe_destination_path(dest, Path::new("../escape.txt")).is_err());
2347        assert!(safe_destination_path(dest, Path::new("/absolute.txt")).is_err());
2348        assert!(safe_destination_path(dest, Path::new("nested/CON")).is_err());
2349        assert!(safe_destination_path(dest, Path::new("nested\\escape.txt")).is_err());
2350        assert!(safe_destination_path(dest, Path::new("nested/ok.txt")).is_ok());
2351    }
2352
2353    #[test]
2354    fn safe_destination_path_rejects_generated_escape_and_reserved_patterns() {
2355        let dest = Path::new("/tmp/ferry-dest");
2356        let unsafe_components = [
2357            "..",
2358            ".",
2359            "CON",
2360            "con.txt",
2361            "NUL",
2362            "COM1",
2363            "LPT9",
2364            "trailingspace ",
2365            "trailingdot.",
2366            "has:colon",
2367            "has\\backslash",
2368        ];
2369
2370        for component in unsafe_components {
2371            assert!(
2372                safe_destination_path(dest, Path::new(component)).is_err(),
2373                "{component} should be rejected as a top-level path"
2374            );
2375            if component != "." {
2376                assert!(
2377                    safe_destination_path(dest, Path::new("safe").join(component).as_path())
2378                        .is_err(),
2379                    "{component} should be rejected as a nested path"
2380                );
2381            }
2382        }
2383
2384        for component in ["alpha", "alpha-1", "alpha_1", "report.final.txt"] {
2385            let path = safe_destination_path(dest, Path::new("safe").join(component).as_path())
2386                .expect("safe generated path accepted");
2387            assert!(path.starts_with(dest));
2388        }
2389    }
2390
2391    #[tokio::test]
2392    async fn resume_decision_skips_existing_matching_file() {
2393        let temp = tempfile::tempdir().expect("tempdir");
2394        let source = temp.path().join("source.txt");
2395        let dest = temp.path().join("dest");
2396        tokio::fs::create_dir_all(&dest).await.expect("dest dir");
2397        tokio::fs::write(&source, b"same contents")
2398            .await
2399            .expect("source");
2400        tokio::fs::write(dest.join("source.txt"), b"same contents")
2401            .await
2402            .expect("dest file");
2403        let manifest = build_manifest(&[source]).await.expect("manifest");
2404        let entry = manifest.file_entries().next().expect("file entry");
2405
2406        let decision = decide_resume(&dest, entry, manifest.chunk_size)
2407            .await
2408            .expect("decision");
2409
2410        assert_eq!(decision, ResumeDecision::Skip);
2411    }
2412
2413    #[tokio::test]
2414    async fn resume_decision_returns_verified_chunk_offset() {
2415        let temp = tempfile::tempdir().expect("tempdir");
2416        let source = temp.path().join("large.bin");
2417        let dest = temp.path().join("dest");
2418        tokio::fs::create_dir_all(&dest).await.expect("dest dir");
2419        let bytes = vec![42; (DEFAULT_CHUNK_SIZE as usize * 2) + 128];
2420        tokio::fs::write(&source, &bytes).await.expect("source");
2421        tokio::fs::write(
2422            dest.join("large.bin"),
2423            &bytes[..DEFAULT_CHUNK_SIZE as usize + 99],
2424        )
2425        .await
2426        .expect("partial");
2427        let manifest = build_manifest(&[source]).await.expect("manifest");
2428        let entry = manifest.file_entries().next().expect("file entry");
2429
2430        let decision = decide_resume(&dest, entry, manifest.chunk_size)
2431            .await
2432            .expect("decision");
2433
2434        assert_eq!(decision, ResumeDecision::ResumeFrom(DEFAULT_CHUNK_SIZE));
2435    }
2436
2437    #[tokio::test]
2438    async fn resume_decision_rejects_existing_file_with_mismatched_contents() {
2439        let temp = tempfile::tempdir().expect("tempdir");
2440        let source = temp.path().join("source.txt");
2441        let dest = temp.path().join("dest");
2442        tokio::fs::create_dir_all(&dest).await.expect("dest dir");
2443        tokio::fs::write(&source, b"expected contents")
2444            .await
2445            .expect("source");
2446        tokio::fs::write(dest.join("source.txt"), b"different contents")
2447            .await
2448            .expect("conflicting dest file");
2449        let manifest = build_manifest(&[source]).await.expect("manifest");
2450        let entry = manifest.file_entries().next().expect("file entry");
2451
2452        let decision = decide_resume(&dest, entry, manifest.chunk_size)
2453            .await
2454            .expect("decision");
2455
2456        assert!(matches!(decision, ResumeDecision::Conflict(_)));
2457    }
2458
2459    #[tokio::test]
2460    async fn resume_decision_rejects_existing_file_larger_than_manifest_entry() {
2461        let temp = tempfile::tempdir().expect("tempdir");
2462        let source = temp.path().join("source.txt");
2463        let dest = temp.path().join("dest");
2464        tokio::fs::create_dir_all(&dest).await.expect("dest dir");
2465        tokio::fs::write(&source, b"small").await.expect("source");
2466        tokio::fs::write(dest.join("source.txt"), b"larger destination")
2467            .await
2468            .expect("larger dest file");
2469        let manifest = build_manifest(&[source]).await.expect("manifest");
2470        let entry = manifest.file_entries().next().expect("file entry");
2471
2472        let decision = decide_resume(&dest, entry, manifest.chunk_size)
2473            .await
2474            .expect("decision");
2475
2476        assert!(matches!(decision, ResumeDecision::Conflict(_)));
2477    }
2478
2479    #[test]
2480    fn identity_is_loaded_after_generation() {
2481        let temp = tempfile::tempdir().expect("tempdir");
2482        let generated =
2483            NativeIdentity::load_or_generate_in(temp.path()).expect("identity generated");
2484        let loaded = NativeIdentity::load_or_generate_in(temp.path()).expect("identity loaded");
2485
2486        assert_eq!(generated.fingerprint(), loaded.fingerprint());
2487    }
2488
2489    #[test]
2490    fn app_config_round_trips_toml() {
2491        let temp = tempfile::tempdir().expect("tempdir");
2492        let path = temp.path().join("config.toml");
2493        let config = AppConfig {
2494            alias: "desk".to_string(),
2495            ..AppConfig::default()
2496        };
2497
2498        config.save_to_path(&path).expect("config saved");
2499        let loaded = AppConfig::load_or_default_from(&path).expect("config loaded");
2500
2501        assert_eq!(loaded.alias, "desk");
2502        assert_eq!(loaded.listen_port, 53317);
2503        assert!(loaded.trust.require_fingerprint);
2504    }
2505
2506    #[test]
2507    fn app_config_redacts_psk_for_display_output() {
2508        let config = AppConfig {
2509            trust: TrustConfig {
2510                psk: "super-secret-psk".to_string(),
2511                ..TrustConfig::default()
2512            },
2513            ..AppConfig::default()
2514        };
2515
2516        let redacted = config.to_redacted_toml_string().expect("config serializes");
2517
2518        assert!(!redacted.contains("super-secret-psk"));
2519        assert!(redacted.contains(r#"psk = "<redacted>""#));
2520    }
2521
2522    #[test]
2523    fn daemon_config_defaults_from_app_config_and_round_trips_toml() {
2524        let temp = tempfile::tempdir().expect("tempdir");
2525        let path = temp.path().join("daemon.toml");
2526        let app_config = AppConfig {
2527            quic_port: 54444,
2528            download_dir: temp.path().join("downloads").display().to_string(),
2529            ..AppConfig::default()
2530        };
2531
2532        let defaulted =
2533            DaemonConfig::load_or_default_from(&path, &app_config).expect("daemon config");
2534        assert_eq!(
2535            defaulted.listen,
2536            SocketAddr::from(([0, 0, 0, 0], app_config.quic_port))
2537        );
2538        assert_eq!(defaulted.destination, temp.path().join("downloads"));
2539
2540        let config = DaemonConfig {
2541            listen: SocketAddr::from(([127, 0, 0, 1], 53318)),
2542            destination: temp.path().join("daemon-dest"),
2543        };
2544        config.save_to_path(&path).expect("daemon config saved");
2545        let loaded =
2546            DaemonConfig::load_or_default_from(&path, &app_config).expect("daemon config loaded");
2547
2548        assert_eq!(loaded, config);
2549    }
2550
2551    #[test]
2552    fn trust_store_load_save_and_forget_round_trip() {
2553        let temp = tempfile::tempdir().expect("tempdir");
2554        let path = temp.path().join("known_peers.toml");
2555        let fingerprint = "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef";
2556        let mut store = TrustStore::default();
2557
2558        store
2559            .trust_fingerprint(fingerprint)
2560            .expect("fingerprint trusted");
2561        store.save_to_path(&path).expect("trust store saved");
2562        let mut loaded = TrustStore::load_or_default_from(&path).expect("trust store loaded");
2563
2564        assert_eq!(loaded.records().count(), 1);
2565        assert_eq!(loaded.peers[fingerprint].trust_state, TrustState::Trusted);
2566        assert!(loaded.forget(fingerprint));
2567        assert_eq!(loaded.records().count(), 0);
2568    }
2569
2570    #[test]
2571    fn trust_store_rejects_short_fingerprint_without_discovery_lookup() {
2572        let mut store = TrustStore::default();
2573
2574        assert!(store.trust_fingerprint("9a4f2c").is_err());
2575    }
2576
2577    #[test]
2578    fn transfer_events_serialize_with_stable_event_names() {
2579        let event = TransferEvent::Progress {
2580            direction: TransferDirection::Send,
2581            file_name: "bundle/a.txt".to_string(),
2582            bytes_done: 5,
2583            bytes_total: 9,
2584        };
2585
2586        let json = serde_json::to_value(&event).expect("event serializes");
2587
2588        assert_eq!(json["event"], "progress");
2589        assert_eq!(json["direction"], "send");
2590        assert_eq!(json["file_name"], "bundle/a.txt");
2591        assert_eq!(json["bytes_done"], 5);
2592        assert_eq!(json["bytes_total"], 9);
2593    }
2594
2595    #[test]
2596    fn direct_protocol_hello_has_stable_golden_json() {
2597        let hello = DirectProtocolHello {
2598            protocol_version: 1,
2599            min_protocol_version: 1,
2600            client_fingerprint: "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"
2601                .to_string(),
2602        };
2603
2604        let json = serde_json::to_string(&hello).expect("hello serializes");
2605
2606        assert_eq!(
2607            json,
2608            r#"{"protocol_version":1,"min_protocol_version":1,"client_fingerprint":"0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"}"#
2609        );
2610    }
2611
2612    #[test]
2613    fn direct_protocol_response_has_stable_golden_json() {
2614        let response = DirectProtocolResponse::accept(1);
2615
2616        let json = serde_json::to_string(&response).expect("response serializes");
2617
2618        assert_eq!(
2619            json,
2620            r#"{"accepted":true,"protocol_version":1,"error":null}"#
2621        );
2622    }
2623
2624    #[test]
2625    fn direct_protocol_negotiates_supported_overlap() {
2626        let hello = DirectProtocolHello {
2627            protocol_version: 3,
2628            min_protocol_version: 1,
2629            client_fingerprint: "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"
2630                .to_string(),
2631        };
2632
2633        let response = negotiate_direct_protocol(&hello);
2634
2635        assert_eq!(response, DirectProtocolResponse::accept(1));
2636    }
2637
2638    #[test]
2639    fn direct_protocol_rejects_incompatible_versions() {
2640        let too_new = DirectProtocolHello {
2641            protocol_version: 3,
2642            min_protocol_version: 2,
2643            client_fingerprint: "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"
2644                .to_string(),
2645        };
2646        let too_old = DirectProtocolHello {
2647            protocol_version: 0,
2648            min_protocol_version: 0,
2649            client_fingerprint: "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"
2650                .to_string(),
2651        };
2652
2653        assert!(!negotiate_direct_protocol(&too_new).accepted);
2654        assert!(!negotiate_direct_protocol(&too_old).accepted);
2655    }
2656
2657    #[test]
2658    fn transfer_manifest_has_stable_golden_json() {
2659        let manifest = TransferManifest {
2660            version: MANIFEST_VERSION,
2661            session_id: Uuid::parse_str("00000000-0000-0000-0000-000000000001")
2662                .expect("uuid parses"),
2663            chunk_size: DEFAULT_CHUNK_SIZE,
2664            entries: vec![ManifestEntry {
2665                relative_path: PathBuf::from("bundle/a.txt"),
2666                kind: ManifestEntryKind::File,
2667                size: 5,
2668                blake3: Some(
2669                    "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef".to_string(),
2670                ),
2671                chunks: vec![
2672                    "abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789".to_string(),
2673                ],
2674                unix_mode: Some(0o100644),
2675                modified_unix_ms: Some(1_700_000_000_000),
2676            }],
2677        };
2678
2679        let json = serde_json::to_string(&manifest).expect("manifest serializes");
2680
2681        assert_eq!(
2682            json,
2683            r#"{"version":1,"session_id":"00000000-0000-0000-0000-000000000001","chunk_size":1048576,"entries":[{"relative_path":"bundle/a.txt","kind":"file","size":5,"blake3":"0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef","chunks":["abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789"],"unix_mode":33188,"modified_unix_ms":1700000000000}]}"#
2684        );
2685    }
2686
2687    #[test]
2688    fn peer_registry_merges_observations_by_fingerprint() {
2689        let mut registry = PeerRegistry::new();
2690        let fingerprint = "abcdef1234567890";
2691        let first = PeerObservation::new(
2692            fingerprint,
2693            SocketAddr::from(([192, 168, 1, 10], 53318)),
2694            100,
2695        )
2696        .with_alias("desk")
2697        .with_hostname("desk.local")
2698        .with_transport("quic");
2699        let second = PeerObservation::new(
2700            fingerprint,
2701            SocketAddr::from(([192, 168, 1, 11], 53318)),
2702            200,
2703        )
2704        .with_alias("workstation")
2705        .with_transport("quic");
2706
2707        registry.observe(first).expect("first observation");
2708        registry.observe(second).expect("second observation");
2709        let record = registry
2710            .get_by_fingerprint(fingerprint)
2711            .expect("record by fingerprint");
2712
2713        assert_eq!(registry.records().count(), 1);
2714        assert!(record.aliases.contains("desk"));
2715        assert!(record.aliases.contains("workstation"));
2716        assert!(record.hostnames.contains("desk.local"));
2717        assert_eq!(record.addresses.len(), 2);
2718        assert_eq!(record.first_seen_unix_ms, 100);
2719        assert_eq!(record.last_seen_unix_ms, 200);
2720    }
2721
2722    #[test]
2723    fn peer_registry_looks_up_unique_fingerprint_prefix_alias_and_hostname() {
2724        let mut registry = PeerRegistry::new();
2725        registry
2726            .observe(
2727                PeerObservation::new(
2728                    "abcdef1234567890",
2729                    SocketAddr::from(([192, 168, 1, 10], 53318)),
2730                    100,
2731                )
2732                .with_alias("desk")
2733                .with_hostname("desk.local"),
2734            )
2735            .expect("first observation");
2736        registry
2737            .observe(
2738                PeerObservation::new(
2739                    "123456abcdef7890",
2740                    SocketAddr::from(([192, 168, 1, 20], 53318)),
2741                    100,
2742                )
2743                .with_alias("laptop"),
2744            )
2745            .expect("second observation");
2746
2747        assert_eq!(
2748            registry
2749                .lookup("abcdef")
2750                .map(|record| record.fingerprint.as_str()),
2751            Some("abcdef1234567890")
2752        );
2753        assert_eq!(
2754            registry
2755                .lookup("desk")
2756                .map(|record| record.fingerprint.as_str()),
2757            Some("abcdef1234567890")
2758        );
2759        assert_eq!(
2760            registry
2761                .lookup("desk.local")
2762                .map(|record| record.fingerprint.as_str()),
2763            Some("abcdef1234567890")
2764        );
2765        assert!(registry.lookup("missing").is_none());
2766    }
2767
2768    #[test]
2769    fn peer_registry_rejects_ambiguous_lookup_hints() {
2770        let mut registry = PeerRegistry::new();
2771        registry
2772            .observe(
2773                PeerObservation::new(
2774                    "abcdef1234567890",
2775                    SocketAddr::from(([192, 168, 1, 10], 53318)),
2776                    100,
2777                )
2778                .with_alias("desk"),
2779            )
2780            .expect("first observation");
2781        registry
2782            .observe(
2783                PeerObservation::new(
2784                    "abcdef9999999999",
2785                    SocketAddr::from(([192, 168, 1, 11], 53318)),
2786                    100,
2787                )
2788                .with_alias("desk"),
2789            )
2790            .expect("second observation");
2791
2792        assert!(registry.lookup("abcdef").is_none());
2793        assert!(registry.lookup("desk").is_none());
2794        assert_eq!(
2795            registry
2796                .lookup("abcdef1234567890")
2797                .map(|record| record.fingerprint.as_str()),
2798            Some("abcdef1234567890")
2799        );
2800        match registry.lookup_detail("desk") {
2801            PeerLookup::Ambiguous(records) => assert_eq!(records.len(), 2),
2802            other => panic!("expected ambiguous duplicate-alias lookup, got {other:?}"),
2803        }
2804        assert!(matches!(
2805            registry.lookup_detail("missing"),
2806            PeerLookup::Missing
2807        ));
2808    }
2809
2810    #[test]
2811    fn native_announcement_builds_expected_txt_properties() {
2812        let announcement = NativeAnnouncement {
2813            alias: "Stephen MBP".to_string(),
2814            fingerprint: "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"
2815                .to_string(),
2816            quic_port: 53318,
2817            listen_port: Some(53317),
2818        };
2819
2820        let service = announcement.service_info().expect("service info");
2821
2822        assert_eq!(service.get_type(), NATIVE_SERVICE_TYPE);
2823        assert!(
2824            service
2825                .get_fullname()
2826                .starts_with("stephen-mbp-0123456789ab.")
2827        );
2828        assert_eq!(service.get_property_val_str("pv"), Some("1"));
2829        assert_eq!(service.get_property_val_str("minpv"), Some("1"));
2830        assert_eq!(
2831            service.get_property_val_str("fp"),
2832            Some("0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef")
2833        );
2834        assert_eq!(service.get_property_val_str("alias"), Some("Stephen MBP"));
2835        assert_eq!(service.get_property_val_str("transports"), Some("quic"));
2836        assert_eq!(service.get_property_val_str("quic_port"), Some("53318"));
2837        assert_eq!(service.get_property_val_str("listen_port"), Some("53317"));
2838    }
2839
2840    #[test]
2841    fn resolved_native_service_merges_into_registry() {
2842        let properties = [
2843            ("pv", "1"),
2844            ("minpv", "1"),
2845            (
2846                "fp",
2847                "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef",
2848            ),
2849            ("alias", "desk"),
2850            ("transports", "quic"),
2851            ("quic_port", "53318"),
2852        ];
2853        let service = ServiceInfo::new(
2854            NATIVE_SERVICE_TYPE,
2855            "desk-0123456789ab",
2856            "desk.local.",
2857            "192.168.1.42",
2858            53318,
2859            &properties[..],
2860        )
2861        .expect("service info")
2862        .as_resolved_service();
2863        let mut registry = PeerRegistry::new();
2864
2865        observe_resolved_service(&mut registry, &service).expect("observation");
2866
2867        let record = registry.lookup("desk").expect("record by alias");
2868        assert_eq!(
2869            record.fingerprint,
2870            "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"
2871        );
2872        assert_eq!(
2873            record.preferred_quic_address(),
2874            Some(SocketAddr::from(([192, 168, 1, 42], 53318)))
2875        );
2876    }
2877
2878    #[test]
2879    fn resolved_native_service_ignores_incompatible_protocol_ranges() {
2880        let properties = [
2881            ("pv", "3"),
2882            ("minpv", "2"),
2883            (
2884                "fp",
2885                "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef",
2886            ),
2887            ("alias", "desk"),
2888            ("transports", "quic"),
2889            ("quic_port", "53318"),
2890        ];
2891        let service = ServiceInfo::new(
2892            NATIVE_SERVICE_TYPE,
2893            "desk-0123456789ab",
2894            "desk.local.",
2895            "192.168.1.42",
2896            53318,
2897            &properties[..],
2898        )
2899        .expect("service info")
2900        .as_resolved_service();
2901        let mut registry = PeerRegistry::new();
2902
2903        observe_resolved_service(&mut registry, &service).expect("observation");
2904
2905        assert_eq!(registry.records().count(), 0);
2906    }
2907
2908    #[tokio::test]
2909    async fn direct_quic_loopback_sends_one_file() {
2910        let source_dir = tempfile::tempdir().expect("source tempdir");
2911        let dest_dir = tempfile::tempdir().expect("dest tempdir");
2912        let identity_dir = tempfile::tempdir().expect("identity tempdir");
2913        let file_path = source_dir.path().join("hello.txt");
2914        tokio::fs::write(&file_path, b"hello from ferry")
2915            .await
2916            .expect("source file written");
2917
2918        let identity =
2919            NativeIdentity::load_or_generate_in(identity_dir.path()).expect("identity generated");
2920        let recv_identity = identity.clone();
2921        let reserved_socket =
2922            std::net::UdpSocket::bind(SocketAddr::from(([127, 0, 0, 1], 0))).expect("udp bind");
2923        let recv_addr = reserved_socket.local_addr().expect("local addr");
2924        drop(reserved_socket);
2925        let receive_progress = Arc::new(Mutex::new(Vec::new()));
2926        let receive_progress_log = receive_progress.clone();
2927        let dest_path = dest_dir.path().to_path_buf();
2928        let server = tokio::spawn(async move {
2929            receive_direct_file(
2930                &ReceiveRequest::new(recv_addr),
2931                dest_path,
2932                &recv_identity,
2933                |event| {
2934                    receive_progress_log
2935                        .lock()
2936                        .expect("progress lock")
2937                        .push(event);
2938                },
2939            )
2940            .await
2941        });
2942
2943        tokio::time::sleep(Duration::from_millis(100)).await;
2944
2945        let peer = DirectPeer::parse(&recv_addr.to_string()).expect("peer parses");
2946        let send_request = SendRequest::new(peer, vec![file_path]).expect("send request is valid");
2947        let send_progress = Arc::new(Mutex::new(Vec::new()));
2948        let send_progress_log = send_progress.clone();
2949        let send_summary = send_direct_file(&send_request, &identity, |event| {
2950            send_progress_log.lock().expect("progress lock").push(event);
2951        })
2952        .await
2953        .expect("file sent");
2954        let receive_summary = server.await.expect("server joined").expect("file received");
2955
2956        let received = tokio::fs::read(dest_dir.path().join("hello.txt"))
2957            .await
2958            .expect("received file readable");
2959        assert_eq!(received, b"hello from ferry");
2960        assert_eq!(send_summary.bytes, receive_summary.bytes);
2961        assert_eq!(send_summary.blake3, receive_summary.blake3);
2962        let send_progress = send_progress.lock().expect("progress lock");
2963        let receive_progress = receive_progress.lock().expect("progress lock");
2964        assert!(matches!(
2965            send_progress.first(),
2966            Some(TransferEvent::SessionStarted {
2967                direction: TransferDirection::Send,
2968                ..
2969            })
2970        ));
2971        assert!(
2972            send_progress
2973                .iter()
2974                .any(|event| matches!(event, TransferEvent::Progress { .. }))
2975        );
2976        assert!(
2977            send_progress
2978                .iter()
2979                .any(|event| matches!(event, TransferEvent::SessionFinished { .. }))
2980        );
2981        assert!(matches!(
2982            receive_progress.first(),
2983            Some(TransferEvent::SessionStarted {
2984                direction: TransferDirection::Receive,
2985                ..
2986            })
2987        ));
2988        assert!(
2989            receive_progress
2990                .iter()
2991                .any(|event| matches!(event, TransferEvent::Progress { .. }))
2992        );
2993        assert!(
2994            receive_progress
2995                .iter()
2996                .any(|event| matches!(event, TransferEvent::SessionFinished { .. }))
2997        );
2998    }
2999
3000    #[tokio::test]
3001    async fn direct_quic_loopback_sends_directory_and_resumes_partial_file() {
3002        let source_dir = tempfile::tempdir().expect("source tempdir");
3003        let dest_dir = tempfile::tempdir().expect("dest tempdir");
3004        let identity_dir = tempfile::tempdir().expect("identity tempdir");
3005        let bundle = source_dir.path().join("bundle");
3006        let nested = bundle.join("nested");
3007        tokio::fs::create_dir_all(&nested)
3008            .await
3009            .expect("source dirs");
3010        tokio::fs::write(nested.join("small.txt"), b"small")
3011            .await
3012            .expect("small file");
3013        let large_bytes = vec![7; (DEFAULT_CHUNK_SIZE as usize * 2) + 17];
3014        tokio::fs::write(bundle.join("large.bin"), &large_bytes)
3015            .await
3016            .expect("large file");
3017
3018        let dest_bundle = dest_dir.path().join("bundle");
3019        tokio::fs::create_dir_all(&dest_bundle)
3020            .await
3021            .expect("dest bundle");
3022        tokio::fs::write(
3023            dest_bundle.join("large.bin"),
3024            &large_bytes[..DEFAULT_CHUNK_SIZE as usize + 5],
3025        )
3026        .await
3027        .expect("partial large");
3028
3029        let identity =
3030            NativeIdentity::load_or_generate_in(identity_dir.path()).expect("identity generated");
3031        let recv_identity = identity.clone();
3032        let reserved_socket =
3033            std::net::UdpSocket::bind(SocketAddr::from(([127, 0, 0, 1], 0))).expect("udp bind");
3034        let recv_addr = reserved_socket.local_addr().expect("local addr");
3035        drop(reserved_socket);
3036        let dest_path = dest_dir.path().to_path_buf();
3037        let server = tokio::spawn(async move {
3038            receive_direct_file(
3039                &ReceiveRequest::new(recv_addr),
3040                dest_path,
3041                &recv_identity,
3042                |_| {},
3043            )
3044            .await
3045        });
3046
3047        tokio::time::sleep(Duration::from_millis(100)).await;
3048
3049        let peer = DirectPeer::parse(&recv_addr.to_string()).expect("peer parses");
3050        let send_request = SendRequest::new(peer, vec![bundle]).expect("send request is valid");
3051        let send_summary = send_direct_file(&send_request, &identity, |_| {})
3052            .await
3053            .expect("directory sent");
3054        let receive_summary = server
3055            .await
3056            .expect("server joined")
3057            .expect("directory received");
3058
3059        let received_large = tokio::fs::read(dest_bundle.join("large.bin"))
3060            .await
3061            .expect("large file readable");
3062        let received_small = tokio::fs::read(dest_bundle.join("nested/small.txt"))
3063            .await
3064            .expect("small file readable");
3065        assert_eq!(received_large, large_bytes);
3066        assert_eq!(received_small, b"small");
3067        assert_eq!(send_summary.bytes, receive_summary.bytes);
3068        assert!(
3069            receive_summary
3070                .files
3071                .iter()
3072                .any(|file| file.status == TransferFileStatus::Resumed)
3073        );
3074    }
3075}