1pub mod error;
2
3use std::collections::{BTreeMap, BTreeSet};
4use std::fs::Metadata;
5use std::io::SeekFrom;
6use std::net::SocketAddr;
7use std::path::{Component, Path, PathBuf};
8use std::sync::Arc;
9use std::sync::atomic::{AtomicBool, Ordering};
10use std::time::{Duration, SystemTime, UNIX_EPOCH};
11
12use directories::BaseDirs;
13use mdns_sd::{ResolvedService, ServiceDaemon, ServiceEvent, ServiceInfo};
14use quinn::crypto::rustls::QuicClientConfig;
15use rcgen::{CertifiedKey, generate_simple_self_signed};
16use rustls::client::danger::{HandshakeSignatureValid, ServerCertVerified, ServerCertVerifier};
17use rustls::crypto::{CryptoProvider, verify_tls12_signature, verify_tls13_signature};
18use rustls::pki_types::{CertificateDer, PrivateKeyDer, PrivatePkcs8KeyDer, ServerName, UnixTime};
19use rustls::{DigitallySignedStruct, SignatureScheme};
20use serde::{Deserialize, Serialize};
21use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt};
22use uuid::Uuid;
23
24pub use error::{Error, Result};
25
26pub const NATIVE_PROTOCOL_VERSION: u16 = 1;
27pub const MIN_NATIVE_PROTOCOL_VERSION: u16 = 1;
28pub const NATIVE_SERVICE_TYPE: &str = "_ferry._udp.local.";
29
30const DIRECT_MAGIC: &[u8; 8] = b"FERRY01\n";
31const MAX_FRAME_LEN: usize = 16 * 1024 * 1024;
32const IO_BUFFER_SIZE: usize = 64 * 1024;
33const MANIFEST_VERSION: u16 = 1;
34const DEFAULT_CHUNK_SIZE: u64 = 1024 * 1024;
35const CONFIG_DIR_NAME: &str = "ferry";
36
37#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
38pub struct DirectPeer {
39 address: SocketAddr,
40}
41
42impl DirectPeer {
43 pub fn parse(value: &str) -> Result<Self> {
44 Ok(Self {
45 address: value.parse()?,
46 })
47 }
48
49 pub const fn address(&self) -> SocketAddr {
50 self.address
51 }
52
53 pub const fn from_address(address: SocketAddr) -> Self {
54 Self { address }
55 }
56}
57
58#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
59pub struct SendRequest {
60 peer: DirectPeer,
61 paths: Vec<PathBuf>,
62}
63
64impl SendRequest {
65 pub fn new(peer: DirectPeer, paths: Vec<PathBuf>) -> Result<Self> {
66 if paths.is_empty() {
67 return Err(Error::InvalidInput(
68 "at least one file path is required".to_string(),
69 ));
70 }
71
72 Ok(Self { peer, paths })
73 }
74
75 pub const fn peer(&self) -> &DirectPeer {
76 &self.peer
77 }
78
79 pub fn paths(&self) -> &[PathBuf] {
80 &self.paths
81 }
82}
83
84#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
85pub struct ReceiveRequest {
86 listen: SocketAddr,
87}
88
89impl ReceiveRequest {
90 pub fn new(listen: SocketAddr) -> Self {
91 Self { listen }
92 }
93
94 pub const fn listen(&self) -> SocketAddr {
95 self.listen
96 }
97}
98
99#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
100pub struct AppConfig {
101 pub alias: String,
102 pub listen_port: u16,
103 pub quic_port: u16,
104 pub download_dir: String,
105 pub auto_accept_known: bool,
106 pub auto_accept_unknown: bool,
107 pub discovery: Vec<String>,
108 pub max_concurrent_files: usize,
109 pub trust: TrustConfig,
110}
111
112impl Default for AppConfig {
113 fn default() -> Self {
114 Self {
115 alias: default_alias(),
116 listen_port: 53317,
117 quic_port: 53318,
118 download_dir: "~/Downloads/ferry".to_string(),
119 auto_accept_known: true,
120 auto_accept_unknown: false,
121 discovery: vec!["native".to_string()],
122 max_concurrent_files: 8,
123 trust: TrustConfig::default(),
124 }
125 }
126}
127
128impl AppConfig {
129 pub fn load_or_default() -> Result<Self> {
130 Self::load_or_default_from(config_dir()?.join("config.toml"))
131 }
132
133 pub fn load_or_default_from(path: impl AsRef<Path>) -> Result<Self> {
134 let path = path.as_ref();
135 match std::fs::read_to_string(path) {
136 Ok(contents) => {
137 toml::from_str(&contents).map_err(|error| Error::ConfigParse(error.to_string()))
138 }
139 Err(error) if error.kind() == std::io::ErrorKind::NotFound => Ok(Self::default()),
140 Err(source) => Err(Error::IoPath {
141 path: path.to_path_buf(),
142 source,
143 }),
144 }
145 }
146
147 pub fn save_default_path(&self) -> Result<()> {
148 self.save_to_path(config_dir()?.join("config.toml"))
149 }
150
151 pub fn save_to_path(&self, path: impl AsRef<Path>) -> Result<()> {
152 write_toml_file(path.as_ref(), self)
153 }
154
155 pub fn to_toml_string(&self) -> Result<String> {
156 toml::to_string_pretty(self).map_err(|error| Error::ConfigSerialize(error.to_string()))
157 }
158
159 pub fn redacted(&self) -> Self {
160 let mut config = self.clone();
161 config.trust = config.trust.redacted();
162 config
163 }
164
165 pub fn to_redacted_toml_string(&self) -> Result<String> {
166 self.redacted().to_toml_string()
167 }
168}
169
170#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
171pub struct DaemonConfig {
172 pub listen: SocketAddr,
173 pub destination: PathBuf,
174}
175
176impl DaemonConfig {
177 pub fn from_app_config(config: &AppConfig) -> Result<Self> {
178 Ok(Self {
179 listen: SocketAddr::from(([0, 0, 0, 0], config.quic_port)),
180 destination: expand_home_path(&config.download_dir)?,
181 })
182 }
183
184 pub fn load_or_default() -> Result<Self> {
185 let app_config = AppConfig::load_or_default()?;
186 Self::load_or_default_from(config_dir()?.join("daemon.toml"), &app_config)
187 }
188
189 pub fn load_or_default_from(path: impl AsRef<Path>, app_config: &AppConfig) -> Result<Self> {
190 let path = path.as_ref();
191 match std::fs::read_to_string(path) {
192 Ok(contents) => {
193 toml::from_str(&contents).map_err(|error| Error::ConfigParse(error.to_string()))
194 }
195 Err(error) if error.kind() == std::io::ErrorKind::NotFound => {
196 Self::from_app_config(app_config)
197 }
198 Err(source) => Err(Error::IoPath {
199 path: path.to_path_buf(),
200 source,
201 }),
202 }
203 }
204
205 pub fn save_default_path(&self) -> Result<()> {
206 self.save_to_path(config_dir()?.join("daemon.toml"))
207 }
208
209 pub fn save_to_path(&self, path: impl AsRef<Path>) -> Result<()> {
210 write_toml_file(path.as_ref(), self)
211 }
212
213 pub fn to_toml_string(&self) -> Result<String> {
214 toml::to_string_pretty(self).map_err(|error| Error::ConfigSerialize(error.to_string()))
215 }
216}
217
218#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
219pub struct TrustConfig {
220 pub require_fingerprint: bool,
221 pub psk: String,
222}
223
224impl Default for TrustConfig {
225 fn default() -> Self {
226 Self {
227 require_fingerprint: true,
228 psk: String::new(),
229 }
230 }
231}
232
233impl TrustConfig {
234 pub fn redacted(&self) -> Self {
235 let psk = if self.psk.is_empty() {
236 String::new()
237 } else {
238 "<redacted>".to_string()
239 };
240 Self {
241 require_fingerprint: self.require_fingerprint,
242 psk,
243 }
244 }
245}
246
247#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
248pub struct NativeIdentity {
249 cert_der: Vec<u8>,
250 key_der: Vec<u8>,
251 fingerprint: String,
252}
253
254impl NativeIdentity {
255 pub fn load_or_generate() -> Result<Self> {
256 Self::load_or_generate_in(config_dir()?)
257 }
258
259 pub fn load_or_generate_in(config_dir: impl AsRef<Path>) -> Result<Self> {
260 let config_dir = config_dir.as_ref();
261 let cert_path = config_dir.join("identity.cert.der");
262 let key_path = config_dir.join("identity.key.der");
263
264 if cert_path.exists() && key_path.exists() {
265 let cert_der = std::fs::read(&cert_path).map_err(|source| Error::IoPath {
266 path: cert_path,
267 source,
268 })?;
269 let key_der = std::fs::read(&key_path).map_err(|source| Error::IoPath {
270 path: key_path,
271 source,
272 })?;
273
274 return Ok(Self::from_parts(cert_der, key_der));
275 }
276
277 std::fs::create_dir_all(config_dir).map_err(|source| Error::IoPath {
278 path: config_dir.to_path_buf(),
279 source,
280 })?;
281
282 let identity = Self::generate()?;
283 write_private_file(&cert_path, &identity.cert_der)?;
284 write_private_file(&key_path, &identity.key_der)?;
285 Ok(identity)
286 }
287
288 pub fn generate() -> Result<Self> {
289 let CertifiedKey { cert, signing_key } =
290 generate_simple_self_signed(vec!["localhost".to_string()])?;
291 Ok(Self::from_parts(
292 cert.der().to_vec(),
293 signing_key.serialize_der(),
294 ))
295 }
296
297 pub fn fingerprint(&self) -> &str {
298 &self.fingerprint
299 }
300
301 fn cert_chain(&self) -> Vec<CertificateDer<'static>> {
302 vec![CertificateDer::from(self.cert_der.clone())]
303 }
304
305 fn private_key(&self) -> PrivateKeyDer<'static> {
306 PrivateKeyDer::from(PrivatePkcs8KeyDer::from(self.key_der.clone()))
307 }
308
309 fn from_parts(cert_der: Vec<u8>, key_der: Vec<u8>) -> Self {
310 let fingerprint = blake3::hash(&cert_der).to_hex().to_string();
311 Self {
312 cert_der,
313 key_der,
314 fingerprint,
315 }
316 }
317}
318
319#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
320#[serde(rename_all = "snake_case")]
321pub enum TransferDirection {
322 Send,
323 Receive,
324}
325
326#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
327pub struct TransferProgress {
328 pub direction: TransferDirection,
329 pub file_name: String,
330 pub bytes_done: u64,
331 pub bytes_total: u64,
332}
333
334#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
335#[serde(tag = "event", rename_all = "snake_case")]
336pub enum TransferEvent {
337 SessionStarted {
338 direction: TransferDirection,
339 session_id: Uuid,
340 files_total: usize,
341 bytes_total: u64,
342 },
343 FileStarted {
344 direction: TransferDirection,
345 file_name: String,
346 bytes_total: u64,
347 resume_offset: u64,
348 },
349 Progress {
350 direction: TransferDirection,
351 file_name: String,
352 bytes_done: u64,
353 bytes_total: u64,
354 },
355 FileFinished {
356 direction: TransferDirection,
357 file_name: String,
358 bytes: u64,
359 blake3: String,
360 status: TransferFileStatus,
361 },
362 SessionFinished {
363 direction: TransferDirection,
364 files_total: usize,
365 bytes_total: u64,
366 blake3: String,
367 },
368 SessionCancelled {
369 direction: TransferDirection,
370 session_id: Uuid,
371 },
372}
373
374impl TransferEvent {
375 pub fn progress(&self) -> Option<TransferProgress> {
376 match self {
377 Self::Progress {
378 direction,
379 file_name,
380 bytes_done,
381 bytes_total,
382 } => Some(TransferProgress {
383 direction: *direction,
384 file_name: file_name.clone(),
385 bytes_done: *bytes_done,
386 bytes_total: *bytes_total,
387 }),
388 _ => None,
389 }
390 }
391}
392
393#[derive(Debug, Clone, Default)]
394pub struct TransferControl {
395 cancelled: Arc<AtomicBool>,
396}
397
398impl TransferControl {
399 pub fn new() -> Self {
400 Self::default()
401 }
402
403 pub fn cancel(&self) {
404 self.cancelled.store(true, Ordering::SeqCst);
405 }
406
407 pub fn is_cancelled(&self) -> bool {
408 self.cancelled.load(Ordering::SeqCst)
409 }
410
411 fn check_cancelled(&self) -> Result<()> {
412 if self.is_cancelled() {
413 Err(Error::TransferCancelled)
414 } else {
415 Ok(())
416 }
417 }
418}
419
420#[derive(Debug, Clone, PartialEq, Eq)]
421pub struct TransferSummary {
422 pub path: PathBuf,
423 pub file_name: String,
424 pub bytes: u64,
425 pub blake3: String,
426 pub files: Vec<TransferFileSummary>,
427}
428
429#[derive(Debug, Clone, PartialEq, Eq)]
430pub struct TransferFileSummary {
431 pub path: PathBuf,
432 pub relative_path: PathBuf,
433 pub bytes: u64,
434 pub blake3: String,
435 pub status: TransferFileStatus,
436}
437
438#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
439#[serde(rename_all = "snake_case")]
440pub enum TransferFileStatus {
441 Sent,
442 Received,
443 Skipped,
444 Resumed,
445}
446
447#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
448pub struct TransferManifest {
449 pub version: u16,
450 pub session_id: Uuid,
451 pub chunk_size: u64,
452 pub entries: Vec<ManifestEntry>,
453}
454
455impl TransferManifest {
456 pub fn file_entries(&self) -> impl Iterator<Item = &ManifestEntry> {
457 self.entries
458 .iter()
459 .filter(|entry| entry.kind == ManifestEntryKind::File)
460 }
461
462 pub fn total_file_bytes(&self) -> u64 {
463 self.file_entries().map(|entry| entry.size).sum()
464 }
465
466 pub fn digest(&self) -> Result<String> {
467 let bytes = serde_json::to_vec(self).map_err(|error| Error::Protocol(error.to_string()))?;
468 Ok(blake3::hash(&bytes).to_hex().to_string())
469 }
470}
471
472#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
473pub struct ManifestEntry {
474 pub relative_path: PathBuf,
475 pub kind: ManifestEntryKind,
476 pub size: u64,
477 pub blake3: Option<String>,
478 pub chunks: Vec<String>,
479 pub unix_mode: Option<u32>,
480 pub modified_unix_ms: Option<i128>,
481}
482
483#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
484#[serde(rename_all = "snake_case")]
485pub enum ManifestEntryKind {
486 File,
487 Directory,
488}
489
490#[derive(Debug, Clone, PartialEq, Eq)]
491pub enum ResumeDecision {
492 Create,
493 Skip,
494 ResumeFrom(u64),
495 Conflict(String),
496}
497
498#[derive(Debug, Clone, PartialEq, Eq)]
499pub struct PeerObservation {
500 pub fingerprint: String,
501 pub alias: Option<String>,
502 pub hostname: Option<String>,
503 pub address: SocketAddr,
504 pub transports: BTreeSet<String>,
505 pub seen_unix_ms: i128,
506}
507
508impl PeerObservation {
509 pub fn new(fingerprint: impl Into<String>, address: SocketAddr, seen_unix_ms: i128) -> Self {
510 Self {
511 fingerprint: fingerprint.into(),
512 alias: None,
513 hostname: None,
514 address,
515 transports: BTreeSet::new(),
516 seen_unix_ms,
517 }
518 }
519
520 pub fn with_alias(mut self, alias: impl Into<String>) -> Self {
521 self.alias = Some(alias.into());
522 self
523 }
524
525 pub fn with_hostname(mut self, hostname: impl Into<String>) -> Self {
526 self.hostname = Some(hostname.into());
527 self
528 }
529
530 pub fn with_transport(mut self, transport: impl Into<String>) -> Self {
531 self.transports.insert(transport.into());
532 self
533 }
534}
535
536#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
537pub struct PeerRecord {
538 pub fingerprint: String,
539 pub aliases: BTreeSet<String>,
540 pub hostnames: BTreeSet<String>,
541 pub addresses: BTreeSet<SocketAddr>,
542 pub transports: BTreeSet<String>,
543 pub first_seen_unix_ms: i128,
544 pub last_seen_unix_ms: i128,
545}
546
547impl PeerRecord {
548 fn from_observation(observation: PeerObservation) -> Self {
549 let mut aliases = BTreeSet::new();
550 if let Some(alias) = observation.alias {
551 aliases.insert(alias);
552 }
553
554 let mut hostnames = BTreeSet::new();
555 if let Some(hostname) = observation.hostname {
556 hostnames.insert(hostname);
557 }
558
559 let mut addresses = BTreeSet::new();
560 addresses.insert(observation.address);
561
562 Self {
563 fingerprint: observation.fingerprint,
564 aliases,
565 hostnames,
566 addresses,
567 transports: observation.transports,
568 first_seen_unix_ms: observation.seen_unix_ms,
569 last_seen_unix_ms: observation.seen_unix_ms,
570 }
571 }
572
573 fn merge(&mut self, observation: PeerObservation) {
574 if let Some(alias) = observation.alias {
575 self.aliases.insert(alias);
576 }
577 if let Some(hostname) = observation.hostname {
578 self.hostnames.insert(hostname);
579 }
580 self.addresses.insert(observation.address);
581 self.transports.extend(observation.transports);
582 self.first_seen_unix_ms = self.first_seen_unix_ms.min(observation.seen_unix_ms);
583 self.last_seen_unix_ms = self.last_seen_unix_ms.max(observation.seen_unix_ms);
584 }
585
586 pub fn preferred_quic_address(&self) -> Option<SocketAddr> {
587 if !self.transports.contains("quic") {
588 return None;
589 }
590
591 self.addresses.iter().copied().next()
592 }
593}
594
595#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
596#[serde(rename_all = "snake_case")]
597pub enum TrustState {
598 Trusted,
599 Blocked,
600}
601
602#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
603pub struct KnownPeerEntry {
604 pub fingerprint: String,
605 #[serde(default)]
606 pub aliases: BTreeSet<String>,
607 #[serde(default)]
608 pub hostnames: BTreeSet<String>,
609 #[serde(default)]
610 pub addresses: BTreeSet<SocketAddr>,
611 #[serde(default)]
612 pub transports: BTreeSet<String>,
613 pub trust_state: TrustState,
614 pub first_seen_unix_ms: Option<i128>,
615 pub last_seen_unix_ms: Option<i128>,
616}
617
618impl KnownPeerEntry {
619 pub fn trusted(fingerprint: impl Into<String>) -> Result<Self> {
620 let fingerprint = normalized_fingerprint(fingerprint.into())?;
621 Ok(Self {
622 fingerprint,
623 aliases: BTreeSet::new(),
624 hostnames: BTreeSet::new(),
625 addresses: BTreeSet::new(),
626 transports: BTreeSet::new(),
627 trust_state: TrustState::Trusted,
628 first_seen_unix_ms: None,
629 last_seen_unix_ms: None,
630 })
631 }
632
633 pub fn from_record(record: &PeerRecord, trust_state: TrustState) -> Result<Self> {
634 let fingerprint = normalized_fingerprint(record.fingerprint.clone())?;
635 Ok(Self {
636 fingerprint,
637 aliases: record.aliases.clone(),
638 hostnames: record.hostnames.clone(),
639 addresses: record.addresses.clone(),
640 transports: record.transports.clone(),
641 trust_state,
642 first_seen_unix_ms: Some(record.first_seen_unix_ms),
643 last_seen_unix_ms: Some(record.last_seen_unix_ms),
644 })
645 }
646}
647
648#[derive(Debug, Default, Clone, PartialEq, Eq, Serialize, Deserialize)]
649pub struct TrustStore {
650 #[serde(default)]
651 pub peers: BTreeMap<String, KnownPeerEntry>,
652}
653
654impl TrustStore {
655 pub fn load_or_default() -> Result<Self> {
656 Self::load_or_default_from(config_dir()?.join("known_peers.toml"))
657 }
658
659 pub fn load_or_default_from(path: impl AsRef<Path>) -> Result<Self> {
660 let path = path.as_ref();
661 match std::fs::read_to_string(path) {
662 Ok(contents) => {
663 toml::from_str(&contents).map_err(|error| Error::ConfigParse(error.to_string()))
664 }
665 Err(error) if error.kind() == std::io::ErrorKind::NotFound => Ok(Self::default()),
666 Err(source) => Err(Error::IoPath {
667 path: path.to_path_buf(),
668 source,
669 }),
670 }
671 }
672
673 pub fn save_default_path(&self) -> Result<()> {
674 self.save_to_path(config_dir()?.join("known_peers.toml"))
675 }
676
677 pub fn save_to_path(&self, path: impl AsRef<Path>) -> Result<()> {
678 write_toml_file(path.as_ref(), self)
679 }
680
681 pub fn trust_fingerprint(&mut self, fingerprint: impl Into<String>) -> Result<&KnownPeerEntry> {
682 let entry = KnownPeerEntry::trusted(fingerprint)?;
683 let fingerprint = entry.fingerprint.clone();
684 self.peers
685 .entry(fingerprint.clone())
686 .and_modify(|existing| existing.trust_state = TrustState::Trusted)
687 .or_insert(entry);
688 Ok(self
689 .peers
690 .get(&fingerprint)
691 .expect("trusted peer entry should exist"))
692 }
693
694 pub fn trust_record(&mut self, record: &PeerRecord) -> Result<&KnownPeerEntry> {
695 let entry = KnownPeerEntry::from_record(record, TrustState::Trusted)?;
696 let fingerprint = entry.fingerprint.clone();
697 self.peers
698 .entry(fingerprint.clone())
699 .and_modify(|existing| {
700 existing.aliases.extend(record.aliases.clone());
701 existing.hostnames.extend(record.hostnames.clone());
702 existing.addresses.extend(record.addresses.clone());
703 existing.transports.extend(record.transports.clone());
704 existing.first_seen_unix_ms = Some(
705 existing
706 .first_seen_unix_ms
707 .unwrap_or(record.first_seen_unix_ms)
708 .min(record.first_seen_unix_ms),
709 );
710 existing.last_seen_unix_ms = Some(
711 existing
712 .last_seen_unix_ms
713 .unwrap_or(record.last_seen_unix_ms)
714 .max(record.last_seen_unix_ms),
715 );
716 existing.trust_state = TrustState::Trusted;
717 })
718 .or_insert(entry);
719 Ok(self
720 .peers
721 .get(&fingerprint)
722 .expect("trusted peer entry should exist"))
723 }
724
725 pub fn forget(&mut self, fingerprint: &str) -> bool {
726 self.peers.remove(fingerprint).is_some()
727 }
728
729 pub fn records(&self) -> impl Iterator<Item = &KnownPeerEntry> {
730 self.peers.values()
731 }
732
733 pub fn to_toml_string(&self) -> Result<String> {
734 toml::to_string_pretty(self).map_err(|error| Error::ConfigSerialize(error.to_string()))
735 }
736}
737
738#[derive(Debug, Default, Clone, PartialEq, Eq)]
739pub struct PeerRegistry {
740 peers: BTreeMap<String, PeerRecord>,
741}
742
743#[derive(Debug, Clone, PartialEq, Eq)]
744pub enum PeerLookup<'a> {
745 Found(&'a PeerRecord),
746 Ambiguous(Vec<&'a PeerRecord>),
747 Missing,
748}
749
750impl PeerRegistry {
751 pub fn new() -> Self {
752 Self::default()
753 }
754
755 pub fn observe(&mut self, observation: PeerObservation) -> Result<&PeerRecord> {
756 if observation.fingerprint.trim().is_empty() {
757 return Err(Error::InvalidInput(
758 "peer fingerprint must not be empty".to_string(),
759 ));
760 }
761
762 let fingerprint = observation.fingerprint.clone();
763 if let Some(record) = self.peers.get_mut(&fingerprint) {
764 record.merge(observation);
765 } else {
766 self.peers.insert(
767 fingerprint.clone(),
768 PeerRecord::from_observation(observation),
769 );
770 }
771
772 Ok(self
773 .peers
774 .get(&fingerprint)
775 .expect("observed peer record should exist"))
776 }
777
778 pub fn get_by_fingerprint(&self, fingerprint: &str) -> Option<&PeerRecord> {
779 self.peers.get(fingerprint)
780 }
781
782 pub fn lookup(&self, query: &str) -> Option<&PeerRecord> {
783 match self.lookup_detail(query) {
784 PeerLookup::Found(record) => Some(record),
785 PeerLookup::Ambiguous(_) | PeerLookup::Missing => None,
786 }
787 }
788
789 pub fn lookup_detail(&self, query: &str) -> PeerLookup<'_> {
790 if let Some(record) = self.peers.get(query) {
791 return PeerLookup::Found(record);
792 }
793
794 let matches = self
795 .peers
796 .values()
797 .filter(|record| {
798 record.fingerprint.starts_with(query)
799 || record.aliases.iter().any(|alias| alias == query)
800 || record.hostnames.iter().any(|hostname| hostname == query)
801 })
802 .collect::<Vec<_>>();
803
804 match matches.as_slice() {
805 [] => PeerLookup::Missing,
806 [record] => PeerLookup::Found(record),
807 _ => PeerLookup::Ambiguous(matches),
808 }
809 }
810
811 pub fn records(&self) -> impl Iterator<Item = &PeerRecord> {
812 self.peers.values()
813 }
814}
815
816#[derive(Debug, Clone, PartialEq, Eq)]
817pub struct NativeAnnouncement {
818 pub alias: String,
819 pub fingerprint: String,
820 pub quic_port: u16,
821 pub listen_port: Option<u16>,
822}
823
824impl NativeAnnouncement {
825 pub fn from_config(config: &AppConfig, identity: &NativeIdentity) -> Self {
826 Self {
827 alias: config.alias.clone(),
828 fingerprint: identity.fingerprint().to_string(),
829 quic_port: config.quic_port,
830 listen_port: Some(config.listen_port),
831 }
832 }
833
834 fn service_info(&self) -> Result<ServiceInfo> {
835 let alias = sanitize_dns_label(&self.alias);
836 let fingerprint_prefix = self
837 .fingerprint
838 .get(..12)
839 .unwrap_or(self.fingerprint.as_str());
840 let instance = format!("{alias}-{fingerprint_prefix}");
841 let hostname = format!("{alias}.local.");
842 let properties = [
843 ("pv", NATIVE_PROTOCOL_VERSION.to_string()),
844 ("minpv", MIN_NATIVE_PROTOCOL_VERSION.to_string()),
845 ("fp", self.fingerprint.clone()),
846 ("alias", self.alias.clone()),
847 ("transports", "quic".to_string()),
848 ("quic_port", self.quic_port.to_string()),
849 (
850 "listen_port",
851 self.listen_port
852 .map(|port| port.to_string())
853 .unwrap_or_default(),
854 ),
855 ];
856
857 ServiceInfo::new(
858 NATIVE_SERVICE_TYPE,
859 &instance,
860 &hostname,
861 "",
862 self.quic_port,
863 &properties[..],
864 )
865 .map(ServiceInfo::enable_addr_auto)
866 .map_err(|error| Error::Discovery(error.to_string()))
867 }
868}
869
870pub struct NativeAnnouncer {
871 daemon: ServiceDaemon,
872 fullname: String,
873}
874
875impl NativeAnnouncer {
876 pub fn start(announcement: &NativeAnnouncement) -> Result<Self> {
877 let daemon = ServiceDaemon::new().map_err(|error| Error::Discovery(error.to_string()))?;
878 let service = announcement.service_info()?;
879 let fullname = service.get_fullname().to_string();
880 daemon
881 .register(service)
882 .map_err(|error| Error::Discovery(error.to_string()))?;
883 Ok(Self { daemon, fullname })
884 }
885}
886
887impl Drop for NativeAnnouncer {
888 fn drop(&mut self) {
889 let _ = self.daemon.unregister(&self.fullname);
890 let _ = self.daemon.shutdown();
891 }
892}
893
894pub async fn discover_native_peers(timeout: Duration) -> Result<PeerRegistry> {
895 let daemon = ServiceDaemon::new().map_err(|error| Error::Discovery(error.to_string()))?;
896 let receiver = daemon
897 .browse(NATIVE_SERVICE_TYPE)
898 .map_err(|error| Error::Discovery(error.to_string()))?;
899 let deadline = tokio::time::Instant::now() + timeout;
900 let mut registry = PeerRegistry::new();
901
902 loop {
903 let now = tokio::time::Instant::now();
904 if now >= deadline {
905 break;
906 }
907
908 let wait = deadline - now;
909 match tokio::time::timeout(wait, receiver.recv_async()).await {
910 Ok(Ok(ServiceEvent::ServiceResolved(service))) => {
911 observe_resolved_service(&mut registry, &service)?;
912 }
913 Ok(Ok(_)) => {}
914 Ok(Err(error)) => {
915 return Err(Error::Discovery(error.to_string()));
916 }
917 Err(_) => break,
918 }
919 }
920
921 daemon
922 .stop_browse(NATIVE_SERVICE_TYPE)
923 .map_err(|error| Error::Discovery(error.to_string()))?;
924 let _ = daemon.shutdown();
925 Ok(registry)
926}
927
928fn observe_resolved_service(registry: &mut PeerRegistry, service: &ResolvedService) -> Result<()> {
929 let Some(fingerprint) = service.get_property_val_str("fp") else {
930 return Ok(());
931 };
932 let fingerprint = match normalized_fingerprint(fingerprint.to_string()) {
933 Ok(fingerprint) => fingerprint,
934 Err(_) => return Ok(()),
935 };
936 let Some(highest_version) = parse_txt_u16(service, "pv") else {
937 return Ok(());
938 };
939 let Some(minimum_version) = parse_txt_u16(service, "minpv") else {
940 return Ok(());
941 };
942 if minimum_version > highest_version
943 || minimum_version > NATIVE_PROTOCOL_VERSION
944 || highest_version < MIN_NATIVE_PROTOCOL_VERSION
945 {
946 return Ok(());
947 }
948
949 let Some(quic_port) = parse_txt_u16(service, "quic_port") else {
950 return Ok(());
951 };
952 let transports = service
953 .get_property_val_str("transports")
954 .unwrap_or_default()
955 .split(',')
956 .map(str::trim)
957 .filter(|value| !value.is_empty())
958 .map(ToOwned::to_owned)
959 .collect::<Vec<_>>();
960 if !transports.iter().any(|transport| transport == "quic") {
961 return Ok(());
962 }
963
964 let alias = service.get_property_val_str("alias").map(ToOwned::to_owned);
965 let hostname = Some(service.get_hostname().trim_end_matches('.').to_string());
966 let seen_unix_ms = system_time_unix_ms(SystemTime::now()).unwrap_or_default();
967
968 for address in service.get_addresses_v4() {
969 let mut observation = PeerObservation::new(
970 fingerprint.clone(),
971 SocketAddr::from((address, quic_port)),
972 seen_unix_ms,
973 );
974 if let Some(alias) = &alias {
975 observation = observation.with_alias(alias.clone());
976 }
977 if let Some(hostname) = &hostname {
978 observation = observation.with_hostname(hostname.clone());
979 }
980 for transport in &transports {
981 observation = observation.with_transport(transport.clone());
982 }
983 registry.observe(observation)?;
984 }
985
986 Ok(())
987}
988
989fn parse_txt_u16(service: &ResolvedService, key: &str) -> Option<u16> {
990 service.get_property_val_str(key)?.parse().ok()
991}
992
993fn sanitize_dns_label(value: &str) -> String {
994 let mut label = value
995 .chars()
996 .map(|ch| {
997 if ch.is_ascii_alphanumeric() || ch == '-' {
998 ch.to_ascii_lowercase()
999 } else {
1000 '-'
1001 }
1002 })
1003 .collect::<String>();
1004 while label.contains("--") {
1005 label = label.replace("--", "-");
1006 }
1007 let label = label.trim_matches('-').to_string();
1008 if label.is_empty() {
1009 "fileferry-device".to_string()
1010 } else {
1011 label
1012 }
1013}
1014
1015#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
1016struct DirectReceivePlan {
1017 files: Vec<DirectFilePlan>,
1018}
1019
1020#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
1021struct DirectProtocolHello {
1022 protocol_version: u16,
1023 min_protocol_version: u16,
1024 client_fingerprint: String,
1025}
1026
1027impl DirectProtocolHello {
1028 fn new(identity: &NativeIdentity) -> Self {
1029 Self {
1030 protocol_version: NATIVE_PROTOCOL_VERSION,
1031 min_protocol_version: MIN_NATIVE_PROTOCOL_VERSION,
1032 client_fingerprint: identity.fingerprint().to_string(),
1033 }
1034 }
1035}
1036
1037#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
1038struct DirectProtocolResponse {
1039 accepted: bool,
1040 protocol_version: Option<u16>,
1041 error: Option<String>,
1042}
1043
1044impl DirectProtocolResponse {
1045 fn accept(protocol_version: u16) -> Self {
1046 Self {
1047 accepted: true,
1048 protocol_version: Some(protocol_version),
1049 error: None,
1050 }
1051 }
1052
1053 fn reject(error: impl Into<String>) -> Self {
1054 Self {
1055 accepted: false,
1056 protocol_version: None,
1057 error: Some(error.into()),
1058 }
1059 }
1060}
1061
1062#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
1063struct DirectFilePlan {
1064 relative_path: PathBuf,
1065 action: DirectFileAction,
1066 offset: u64,
1067}
1068
1069#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
1070#[serde(rename_all = "snake_case")]
1071enum DirectFileAction {
1072 Receive,
1073 Skip,
1074}
1075
1076#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
1077struct DirectPayloadHeader {
1078 relative_path: PathBuf,
1079 offset: u64,
1080 bytes: u64,
1081}
1082
1083#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
1084struct DirectAck {
1085 ok: bool,
1086 error: Option<String>,
1087}
1088
1089pub fn validate_send_paths(paths: &[PathBuf]) -> Result<()> {
1090 if paths.is_empty() {
1091 return Err(Error::InvalidInput(
1092 "at least one file path is required".to_string(),
1093 ));
1094 }
1095
1096 for path in paths {
1097 if path.as_os_str().is_empty() {
1098 return Err(Error::InvalidInput("empty file path".to_string()));
1099 }
1100 }
1101
1102 Ok(())
1103}
1104
1105pub fn display_path(path: &Path) -> String {
1106 path.to_string_lossy().into_owned()
1107}
1108
1109pub async fn build_manifest(paths: &[PathBuf]) -> Result<TransferManifest> {
1110 validate_send_paths(paths)?;
1111
1112 let sources = collect_manifest_sources(paths)?;
1113 let mut entries = Vec::with_capacity(sources.len());
1114 let mut seen = BTreeSet::new();
1115
1116 for source in sources {
1117 if !seen.insert(source.relative_path.clone()) {
1118 return Err(Error::InvalidInput(format!(
1119 "duplicate transfer path: {}",
1120 display_path(&source.relative_path)
1121 )));
1122 }
1123
1124 entries.push(build_manifest_entry(source).await?);
1125 }
1126
1127 Ok(TransferManifest {
1128 version: MANIFEST_VERSION,
1129 session_id: Uuid::new_v4(),
1130 chunk_size: DEFAULT_CHUNK_SIZE,
1131 entries,
1132 })
1133}
1134
1135pub fn safe_destination_path(destination: &Path, relative_path: &Path) -> Result<PathBuf> {
1136 validate_relative_transfer_path(relative_path)?;
1137 Ok(destination.join(relative_path))
1138}
1139
1140pub async fn decide_resume(
1141 destination: &Path,
1142 entry: &ManifestEntry,
1143 chunk_size: u64,
1144) -> Result<ResumeDecision> {
1145 if entry.kind != ManifestEntryKind::File {
1146 return Ok(ResumeDecision::Create);
1147 }
1148
1149 let path = safe_destination_path(destination, &entry.relative_path)?;
1150 let metadata = match tokio::fs::metadata(&path).await {
1151 Ok(metadata) => metadata,
1152 Err(error) if error.kind() == std::io::ErrorKind::NotFound => {
1153 return Ok(ResumeDecision::Create);
1154 }
1155 Err(source) => {
1156 return Err(Error::IoPath { path, source });
1157 }
1158 };
1159
1160 if !metadata.is_file() {
1161 return Ok(ResumeDecision::Conflict(format!(
1162 "{} already exists and is not a regular file",
1163 display_path(&path)
1164 )));
1165 }
1166
1167 if metadata.len() == entry.size {
1168 let expected = entry
1169 .blake3
1170 .as_deref()
1171 .ok_or_else(|| Error::Protocol("file manifest entry is missing BLAKE3".to_string()))?;
1172 let actual = hash_file(&path).await?;
1173 return if actual == expected {
1174 Ok(ResumeDecision::Skip)
1175 } else {
1176 Ok(ResumeDecision::Conflict(format!(
1177 "{} already exists with different contents",
1178 display_path(&path)
1179 )))
1180 };
1181 }
1182
1183 if metadata.len() > entry.size {
1184 return Ok(ResumeDecision::Conflict(format!(
1185 "{} is larger than incoming file",
1186 display_path(&path)
1187 )));
1188 }
1189
1190 let verified_offset = verified_prefix_offset(&path, entry, metadata.len(), chunk_size).await?;
1191 if verified_offset > 0 {
1192 Ok(ResumeDecision::ResumeFrom(verified_offset))
1193 } else {
1194 Ok(ResumeDecision::Conflict(format!(
1195 "{} already exists and does not match a verified prefix",
1196 display_path(&path)
1197 )))
1198 }
1199}
1200
1201pub async fn send_direct_file(
1202 request: &SendRequest,
1203 identity: &NativeIdentity,
1204 mut events: impl FnMut(TransferEvent),
1205) -> Result<TransferSummary> {
1206 send_direct_file_with_control(request, identity, TransferControl::new(), &mut events).await
1207}
1208
1209pub async fn send_direct_file_with_control(
1210 request: &SendRequest,
1211 identity: &NativeIdentity,
1212 control: TransferControl,
1213 mut events: impl FnMut(TransferEvent),
1214) -> Result<TransferSummary> {
1215 ensure_rustls_provider();
1216
1217 control.check_cancelled()?;
1218 let manifest = build_manifest(request.paths()).await?;
1219 let sources = manifest_source_map(request.paths())?;
1220 events(TransferEvent::SessionStarted {
1221 direction: TransferDirection::Send,
1222 session_id: manifest.session_id,
1223 files_total: manifest.file_entries().count(),
1224 bytes_total: manifest.total_file_bytes(),
1225 });
1226 control.check_cancelled()?;
1227
1228 let mut endpoint = quinn::Endpoint::client(SocketAddr::from(([0, 0, 0, 0], 0)))?;
1229 endpoint.set_default_client_config(insecure_client_config()?);
1230 let connection = endpoint
1231 .connect(request.peer().address(), "localhost")?
1232 .await?;
1233 let (mut send, mut recv) = connection.open_bi().await?;
1234
1235 send.write_all(DIRECT_MAGIC).await?;
1236 write_json_frame(&mut send, &DirectProtocolHello::new(identity)).await?;
1237 let protocol: DirectProtocolResponse = read_json_frame(&mut recv).await?;
1238 if !protocol.accepted {
1239 return Err(Error::Protocol(protocol.error.unwrap_or_else(|| {
1240 "receiver rejected protocol negotiation".to_string()
1241 })));
1242 }
1243 if protocol.protocol_version != Some(NATIVE_PROTOCOL_VERSION) {
1244 return Err(Error::Protocol(format!(
1245 "receiver selected unsupported protocol version {:?}",
1246 protocol.protocol_version
1247 )));
1248 }
1249
1250 write_json_frame(&mut send, &manifest).await?;
1251
1252 let plan: DirectReceivePlan = read_json_frame(&mut recv).await?;
1253 let mut summaries = Vec::new();
1254 for file_plan in plan.files {
1255 let Some(entry) = manifest
1256 .file_entries()
1257 .find(|entry| entry.relative_path == file_plan.relative_path)
1258 else {
1259 return Err(Error::Protocol(format!(
1260 "receiver requested unknown file: {}",
1261 display_path(&file_plan.relative_path)
1262 )));
1263 };
1264
1265 let Some(source_path) = sources.get(&file_plan.relative_path) else {
1266 return Err(Error::Protocol(format!(
1267 "missing source for manifest file: {}",
1268 display_path(&file_plan.relative_path)
1269 )));
1270 };
1271
1272 if file_plan.action == DirectFileAction::Skip {
1273 let blake3 = entry.blake3.clone().unwrap_or_default();
1274 events(TransferEvent::FileFinished {
1275 direction: TransferDirection::Send,
1276 file_name: display_path(&entry.relative_path),
1277 bytes: entry.size,
1278 blake3: blake3.clone(),
1279 status: TransferFileStatus::Skipped,
1280 });
1281 summaries.push(TransferFileSummary {
1282 path: source_path.clone(),
1283 relative_path: entry.relative_path.clone(),
1284 bytes: entry.size,
1285 blake3,
1286 status: TransferFileStatus::Skipped,
1287 });
1288 continue;
1289 }
1290
1291 match send_payload_file(
1292 &mut send,
1293 source_path,
1294 entry,
1295 file_plan.offset,
1296 &control,
1297 &mut events,
1298 )
1299 .await
1300 {
1301 Ok(()) => {}
1302 Err(Error::TransferCancelled) => {
1303 events(TransferEvent::SessionCancelled {
1304 direction: TransferDirection::Send,
1305 session_id: manifest.session_id,
1306 });
1307 connection.close(1u32.into(), b"cancelled");
1308 return Err(Error::TransferCancelled);
1309 }
1310 Err(error) => return Err(error),
1311 }
1312 let status = if file_plan.offset > 0 {
1313 TransferFileStatus::Resumed
1314 } else {
1315 TransferFileStatus::Sent
1316 };
1317 let blake3 = entry.blake3.clone().unwrap_or_default();
1318 events(TransferEvent::FileFinished {
1319 direction: TransferDirection::Send,
1320 file_name: display_path(&entry.relative_path),
1321 bytes: entry.size,
1322 blake3: blake3.clone(),
1323 status,
1324 });
1325 summaries.push(TransferFileSummary {
1326 path: source_path.clone(),
1327 relative_path: entry.relative_path.clone(),
1328 bytes: entry.size,
1329 blake3,
1330 status,
1331 });
1332 }
1333 send.finish()?;
1334
1335 let ack: DirectAck = read_json_frame(&mut recv).await?;
1336 if !ack.ok {
1337 return Err(Error::Transfer(
1338 ack.error
1339 .unwrap_or_else(|| "receiver rejected transfer".to_string()),
1340 ));
1341 }
1342
1343 connection.close(0u32.into(), b"done");
1344 endpoint.wait_idle().await;
1345
1346 let summary = summary_from_files(&manifest, summaries)?;
1347 events(TransferEvent::SessionFinished {
1348 direction: TransferDirection::Send,
1349 files_total: summary.files.len(),
1350 bytes_total: summary.bytes,
1351 blake3: summary.blake3.clone(),
1352 });
1353 Ok(summary)
1354}
1355
1356pub async fn receive_direct_file(
1357 request: &ReceiveRequest,
1358 destination: impl AsRef<Path>,
1359 identity: &NativeIdentity,
1360 events: impl FnMut(TransferEvent),
1361) -> Result<TransferSummary> {
1362 receive_direct_file_with_control(
1363 request,
1364 destination,
1365 identity,
1366 TransferControl::new(),
1367 events,
1368 )
1369 .await
1370}
1371
1372pub async fn receive_direct_file_with_control(
1373 request: &ReceiveRequest,
1374 destination: impl AsRef<Path>,
1375 identity: &NativeIdentity,
1376 control: TransferControl,
1377 mut events: impl FnMut(TransferEvent),
1378) -> Result<TransferSummary> {
1379 ensure_rustls_provider();
1380
1381 control.check_cancelled()?;
1382 let destination = destination.as_ref();
1383 tokio::fs::create_dir_all(destination)
1384 .await
1385 .map_err(|source| Error::IoPath {
1386 path: destination.to_path_buf(),
1387 source,
1388 })?;
1389
1390 let server_config =
1391 quinn::ServerConfig::with_single_cert(identity.cert_chain(), identity.private_key())?;
1392 let endpoint = quinn::Endpoint::server(server_config, request.listen())?;
1393 let incoming = endpoint.accept().await.ok_or(Error::NoIncomingConnection)?;
1394 let connection = incoming.await?;
1395 let (mut send, mut recv) = connection.accept_bi().await?;
1396
1397 let result =
1398 receive_stream_file(destination, &mut recv, &mut send, &control, &mut events).await;
1399 let ack = match &result {
1400 Ok(_) => DirectAck {
1401 ok: true,
1402 error: None,
1403 },
1404 Err(error) => DirectAck {
1405 ok: false,
1406 error: Some(error.to_string()),
1407 },
1408 };
1409 write_json_frame(&mut send, &ack).await?;
1410 send.finish()?;
1411 let _ = tokio::time::timeout(Duration::from_secs(1), connection.closed()).await;
1412 endpoint.close(0u32.into(), b"done");
1413 endpoint.wait_idle().await;
1414
1415 result
1416}
1417
1418async fn receive_stream_file(
1419 destination: &Path,
1420 recv: &mut quinn::RecvStream,
1421 send: &mut quinn::SendStream,
1422 control: &TransferControl,
1423 events: &mut impl FnMut(TransferEvent),
1424) -> Result<TransferSummary> {
1425 let mut magic = [0; DIRECT_MAGIC.len()];
1426 recv.read_exact(&mut magic).await?;
1427 if &magic != DIRECT_MAGIC {
1428 return Err(Error::Protocol("bad direct-transfer magic".to_string()));
1429 }
1430
1431 let hello: DirectProtocolHello = read_json_frame(recv).await?;
1432 let protocol = negotiate_direct_protocol(&hello);
1433 write_json_frame(send, &protocol).await?;
1434 if !protocol.accepted {
1435 return Err(Error::Protocol(protocol.error.unwrap_or_else(|| {
1436 "unsupported direct protocol version".to_string()
1437 })));
1438 }
1439
1440 let manifest: TransferManifest = read_json_frame(recv).await?;
1441 validate_manifest(&manifest)?;
1442 events(TransferEvent::SessionStarted {
1443 direction: TransferDirection::Receive,
1444 session_id: manifest.session_id,
1445 files_total: manifest.file_entries().count(),
1446 bytes_total: manifest.total_file_bytes(),
1447 });
1448 control.check_cancelled()?;
1449
1450 for entry in &manifest.entries {
1451 if entry.kind == ManifestEntryKind::Directory {
1452 let path = safe_destination_path(destination, &entry.relative_path)?;
1453 tokio::fs::create_dir_all(&path)
1454 .await
1455 .map_err(|source| Error::IoPath { path, source })?;
1456 }
1457 }
1458
1459 let mut plan = DirectReceivePlan { files: Vec::new() };
1460 for entry in manifest.file_entries() {
1461 let decision = decide_resume(destination, entry, manifest.chunk_size).await?;
1462 let (action, offset) = match decision {
1463 ResumeDecision::Create => (DirectFileAction::Receive, 0),
1464 ResumeDecision::Skip => (DirectFileAction::Skip, entry.size),
1465 ResumeDecision::ResumeFrom(offset) => (DirectFileAction::Receive, offset),
1466 ResumeDecision::Conflict(message) => return Err(Error::InvalidInput(message)),
1467 };
1468 plan.files.push(DirectFilePlan {
1469 relative_path: entry.relative_path.clone(),
1470 action,
1471 offset,
1472 });
1473 }
1474 write_json_frame(send, &plan).await?;
1475
1476 let mut summaries = Vec::new();
1477 for file_plan in &plan.files {
1478 let Some(entry) = manifest
1479 .file_entries()
1480 .find(|entry| entry.relative_path == file_plan.relative_path)
1481 else {
1482 return Err(Error::Protocol(format!(
1483 "planned file missing from manifest: {}",
1484 display_path(&file_plan.relative_path)
1485 )));
1486 };
1487
1488 let final_path = safe_destination_path(destination, &entry.relative_path)?;
1489 if file_plan.action == DirectFileAction::Skip {
1490 let blake3 = entry.blake3.clone().unwrap_or_default();
1491 events(TransferEvent::FileFinished {
1492 direction: TransferDirection::Receive,
1493 file_name: display_path(&entry.relative_path),
1494 bytes: entry.size,
1495 blake3: blake3.clone(),
1496 status: TransferFileStatus::Skipped,
1497 });
1498 summaries.push(TransferFileSummary {
1499 path: final_path,
1500 relative_path: entry.relative_path.clone(),
1501 bytes: entry.size,
1502 blake3,
1503 status: TransferFileStatus::Skipped,
1504 });
1505 continue;
1506 }
1507
1508 match receive_payload_file(recv, destination, entry, file_plan.offset, control, events)
1509 .await
1510 {
1511 Ok(()) => {}
1512 Err(Error::TransferCancelled) => {
1513 events(TransferEvent::SessionCancelled {
1514 direction: TransferDirection::Receive,
1515 session_id: manifest.session_id,
1516 });
1517 return Err(Error::TransferCancelled);
1518 }
1519 Err(error) => return Err(error),
1520 }
1521 let actual_hash = hash_file(&final_path).await?;
1522 let expected_hash = entry
1523 .blake3
1524 .as_deref()
1525 .ok_or_else(|| Error::Protocol("file manifest entry is missing BLAKE3".to_string()))?;
1526 if actual_hash != expected_hash {
1527 return Err(Error::Transfer(format!(
1528 "BLAKE3 hash mismatch for {}",
1529 display_path(&entry.relative_path)
1530 )));
1531 }
1532
1533 let status = if file_plan.offset > 0 {
1534 TransferFileStatus::Resumed
1535 } else {
1536 TransferFileStatus::Received
1537 };
1538 events(TransferEvent::FileFinished {
1539 direction: TransferDirection::Receive,
1540 file_name: display_path(&entry.relative_path),
1541 bytes: entry.size,
1542 blake3: actual_hash.clone(),
1543 status,
1544 });
1545 summaries.push(TransferFileSummary {
1546 path: final_path,
1547 relative_path: entry.relative_path.clone(),
1548 bytes: entry.size,
1549 blake3: actual_hash,
1550 status,
1551 });
1552 }
1553
1554 let summary = summary_from_files(&manifest, summaries)?;
1555 events(TransferEvent::SessionFinished {
1556 direction: TransferDirection::Receive,
1557 files_total: summary.files.len(),
1558 bytes_total: summary.bytes,
1559 blake3: summary.blake3.clone(),
1560 });
1561 Ok(summary)
1562}
1563
1564async fn send_payload_file(
1565 send: &mut quinn::SendStream,
1566 source_path: &Path,
1567 entry: &ManifestEntry,
1568 offset: u64,
1569 control: &TransferControl,
1570 events: &mut impl FnMut(TransferEvent),
1571) -> Result<()> {
1572 if offset > entry.size {
1573 return Err(Error::Protocol(format!(
1574 "resume offset exceeds file size for {}",
1575 display_path(&entry.relative_path)
1576 )));
1577 }
1578
1579 let header = DirectPayloadHeader {
1580 relative_path: entry.relative_path.clone(),
1581 offset,
1582 bytes: entry.size - offset,
1583 };
1584 write_json_frame(send, &header).await?;
1585 events(TransferEvent::FileStarted {
1586 direction: TransferDirection::Send,
1587 file_name: display_path(&entry.relative_path),
1588 bytes_total: entry.size,
1589 resume_offset: offset,
1590 });
1591
1592 let mut file = tokio::fs::File::open(source_path)
1593 .await
1594 .map_err(|source| Error::IoPath {
1595 path: source_path.to_path_buf(),
1596 source,
1597 })?;
1598 file.seek(SeekFrom::Start(offset))
1599 .await
1600 .map_err(|source| Error::IoPath {
1601 path: source_path.to_path_buf(),
1602 source,
1603 })?;
1604
1605 let mut buffer = vec![0; IO_BUFFER_SIZE];
1606 let mut bytes_done = offset;
1607 while bytes_done < entry.size {
1608 control.check_cancelled()?;
1609 let remaining = entry.size - bytes_done;
1610 let read_size = buffer.len().min(remaining as usize);
1611 let read = file
1612 .read(&mut buffer[..read_size])
1613 .await
1614 .map_err(|source| Error::IoPath {
1615 path: source_path.to_path_buf(),
1616 source,
1617 })?;
1618 if read == 0 {
1619 return Err(Error::Protocol(format!(
1620 "source file ended early: {}",
1621 display_path(source_path)
1622 )));
1623 }
1624 send.write_all(&buffer[..read]).await?;
1625 bytes_done += read as u64;
1626 events(TransferEvent::Progress {
1627 direction: TransferDirection::Send,
1628 file_name: display_path(&entry.relative_path),
1629 bytes_done,
1630 bytes_total: entry.size,
1631 });
1632 control.check_cancelled()?;
1633 }
1634
1635 Ok(())
1636}
1637
1638async fn receive_payload_file(
1639 recv: &mut quinn::RecvStream,
1640 destination: &Path,
1641 entry: &ManifestEntry,
1642 offset: u64,
1643 control: &TransferControl,
1644 events: &mut impl FnMut(TransferEvent),
1645) -> Result<()> {
1646 let header: DirectPayloadHeader = read_json_frame(recv).await?;
1647 if header.relative_path != entry.relative_path
1648 || header.offset != offset
1649 || header.bytes != entry.size - offset
1650 {
1651 return Err(Error::Protocol(format!(
1652 "payload header does not match receive plan for {}",
1653 display_path(&entry.relative_path)
1654 )));
1655 }
1656 events(TransferEvent::FileStarted {
1657 direction: TransferDirection::Receive,
1658 file_name: display_path(&entry.relative_path),
1659 bytes_total: entry.size,
1660 resume_offset: offset,
1661 });
1662
1663 let final_path = safe_destination_path(destination, &entry.relative_path)?;
1664 if let Some(parent) = final_path.parent() {
1665 tokio::fs::create_dir_all(parent)
1666 .await
1667 .map_err(|source| Error::IoPath {
1668 path: parent.to_path_buf(),
1669 source,
1670 })?;
1671 }
1672
1673 let write_path = if offset == 0 {
1674 temp_path_for(&final_path)
1675 } else {
1676 final_path.clone()
1677 };
1678
1679 let mut file = if offset == 0 {
1680 tokio::fs::File::create(&write_path)
1681 .await
1682 .map_err(|source| Error::IoPath {
1683 path: write_path.clone(),
1684 source,
1685 })?
1686 } else {
1687 let mut file = tokio::fs::OpenOptions::new()
1688 .write(true)
1689 .open(&write_path)
1690 .await
1691 .map_err(|source| Error::IoPath {
1692 path: write_path.clone(),
1693 source,
1694 })?;
1695 file.set_len(offset).await.map_err(|source| Error::IoPath {
1696 path: write_path.clone(),
1697 source,
1698 })?;
1699 file.seek(SeekFrom::Start(offset))
1700 .await
1701 .map_err(|source| Error::IoPath {
1702 path: write_path.clone(),
1703 source,
1704 })?;
1705 file
1706 };
1707
1708 let mut bytes_done = offset;
1709 let mut bytes_remaining = header.bytes;
1710 let mut buffer = vec![0; IO_BUFFER_SIZE];
1711 while bytes_remaining > 0 {
1712 if control.is_cancelled() {
1713 if offset == 0 {
1714 let _ = tokio::fs::remove_file(&write_path).await;
1715 }
1716 return Err(Error::TransferCancelled);
1717 }
1718 let read_size = buffer.len().min(bytes_remaining as usize);
1719 let read = match recv.read(&mut buffer[..read_size]).await {
1720 Ok(Some(read)) => read,
1721 Ok(None) => {
1722 if offset == 0 {
1723 let _ = tokio::fs::remove_file(&write_path).await;
1724 }
1725 return Err(Error::Protocol(
1726 "stream ended before file payload".to_string(),
1727 ));
1728 }
1729 Err(error) => {
1730 if offset == 0 {
1731 let _ = tokio::fs::remove_file(&write_path).await;
1732 }
1733 return Err(Error::Read(error));
1734 }
1735 };
1736
1737 file.write_all(&buffer[..read])
1738 .await
1739 .map_err(|source| Error::IoPath {
1740 path: write_path.clone(),
1741 source,
1742 })?;
1743 bytes_done += read as u64;
1744 bytes_remaining -= read as u64;
1745 events(TransferEvent::Progress {
1746 direction: TransferDirection::Receive,
1747 file_name: display_path(&entry.relative_path),
1748 bytes_done,
1749 bytes_total: entry.size,
1750 });
1751 if control.is_cancelled() {
1752 if offset == 0 {
1753 let _ = tokio::fs::remove_file(&write_path).await;
1754 }
1755 return Err(Error::TransferCancelled);
1756 }
1757 }
1758
1759 file.flush().await.map_err(|source| Error::IoPath {
1760 path: write_path.clone(),
1761 source,
1762 })?;
1763 drop(file);
1764
1765 if offset == 0 {
1766 tokio::fs::rename(&write_path, &final_path)
1767 .await
1768 .map_err(|source| Error::IoPath {
1769 path: final_path,
1770 source,
1771 })?;
1772 }
1773
1774 Ok(())
1775}
1776
1777async fn write_json_frame<T: Serialize>(send: &mut quinn::SendStream, value: &T) -> Result<()> {
1778 let bytes = serde_json::to_vec(value).map_err(|error| Error::Protocol(error.to_string()))?;
1779 if bytes.len() > MAX_FRAME_LEN {
1780 return Err(Error::Protocol("frame exceeds maximum length".to_string()));
1781 }
1782 send.write_all(&(bytes.len() as u32).to_be_bytes()).await?;
1783 send.write_all(&bytes).await?;
1784 Ok(())
1785}
1786
1787async fn read_json_frame<T: for<'de> Deserialize<'de>>(recv: &mut quinn::RecvStream) -> Result<T> {
1788 let mut len = [0; 4];
1789 recv.read_exact(&mut len).await?;
1790 let len = u32::from_be_bytes(len) as usize;
1791 if len > MAX_FRAME_LEN {
1792 return Err(Error::Protocol("frame exceeds maximum length".to_string()));
1793 }
1794
1795 let mut bytes = vec![0; len];
1796 recv.read_exact(&mut bytes).await?;
1797 serde_json::from_slice(&bytes).map_err(|error| Error::Protocol(error.to_string()))
1798}
1799
1800fn negotiate_direct_protocol(hello: &DirectProtocolHello) -> DirectProtocolResponse {
1801 if hello.min_protocol_version > NATIVE_PROTOCOL_VERSION {
1802 return DirectProtocolResponse::reject(format!(
1803 "peer requires protocol version {}, local max is {}",
1804 hello.min_protocol_version, NATIVE_PROTOCOL_VERSION
1805 ));
1806 }
1807
1808 if hello.protocol_version < MIN_NATIVE_PROTOCOL_VERSION {
1809 return DirectProtocolResponse::reject(format!(
1810 "peer max protocol version {} is below local minimum {}",
1811 hello.protocol_version, MIN_NATIVE_PROTOCOL_VERSION
1812 ));
1813 }
1814
1815 DirectProtocolResponse::accept(NATIVE_PROTOCOL_VERSION.min(hello.protocol_version))
1816}
1817
1818#[derive(Debug, Clone)]
1819struct ManifestSource {
1820 source_path: PathBuf,
1821 relative_path: PathBuf,
1822}
1823
1824fn collect_manifest_sources(paths: &[PathBuf]) -> Result<Vec<ManifestSource>> {
1825 let mut sources = Vec::new();
1826 for path in paths {
1827 let metadata = std::fs::symlink_metadata(path).map_err(|source| Error::IoPath {
1828 path: path.clone(),
1829 source,
1830 })?;
1831 if metadata.file_type().is_symlink() {
1832 return Err(Error::InvalidInput(format!(
1833 "{} is a symlink; symlink transfers are not supported",
1834 display_path(path)
1835 )));
1836 }
1837
1838 let root_name = path
1839 .file_name()
1840 .ok_or_else(|| Error::InvalidInput(format!("{} has no file name", display_path(path))))?
1841 .to_os_string();
1842 let relative_path = PathBuf::from(root_name);
1843
1844 if metadata.is_dir() {
1845 sources.push(ManifestSource {
1846 source_path: path.clone(),
1847 relative_path: relative_path.clone(),
1848 });
1849 collect_dir_sources(path, &relative_path, &mut sources)?;
1850 } else if metadata.is_file() {
1851 sources.push(ManifestSource {
1852 source_path: path.clone(),
1853 relative_path,
1854 });
1855 } else {
1856 return Err(Error::InvalidInput(format!(
1857 "{} is not a regular file or directory",
1858 display_path(path)
1859 )));
1860 }
1861 }
1862
1863 Ok(sources)
1864}
1865
1866fn collect_dir_sources(
1867 dir: &Path,
1868 relative_dir: &Path,
1869 sources: &mut Vec<ManifestSource>,
1870) -> Result<()> {
1871 for entry in std::fs::read_dir(dir).map_err(|source| Error::IoPath {
1872 path: dir.to_path_buf(),
1873 source,
1874 })? {
1875 let entry = entry.map_err(|source| Error::IoPath {
1876 path: dir.to_path_buf(),
1877 source,
1878 })?;
1879 let path = entry.path();
1880 let metadata = std::fs::symlink_metadata(&path).map_err(|source| Error::IoPath {
1881 path: path.clone(),
1882 source,
1883 })?;
1884 if metadata.file_type().is_symlink() {
1885 return Err(Error::InvalidInput(format!(
1886 "{} is a symlink; symlink transfers are not supported",
1887 display_path(&path)
1888 )));
1889 }
1890
1891 let relative_path = relative_dir.join(entry.file_name());
1892 if metadata.is_dir() {
1893 sources.push(ManifestSource {
1894 source_path: path.clone(),
1895 relative_path: relative_path.clone(),
1896 });
1897 collect_dir_sources(&path, &relative_path, sources)?;
1898 } else if metadata.is_file() {
1899 sources.push(ManifestSource {
1900 source_path: path,
1901 relative_path,
1902 });
1903 }
1904 }
1905
1906 Ok(())
1907}
1908
1909fn manifest_source_map(paths: &[PathBuf]) -> Result<BTreeMap<PathBuf, PathBuf>> {
1910 Ok(collect_manifest_sources(paths)?
1911 .into_iter()
1912 .filter_map(|source| {
1913 let metadata = std::fs::metadata(&source.source_path).ok()?;
1914 metadata
1915 .is_file()
1916 .then_some((source.relative_path, source.source_path))
1917 })
1918 .collect())
1919}
1920
1921async fn build_manifest_entry(source: ManifestSource) -> Result<ManifestEntry> {
1922 validate_relative_transfer_path(&source.relative_path)?;
1923 let metadata = tokio::fs::metadata(&source.source_path)
1924 .await
1925 .map_err(|source_error| Error::IoPath {
1926 path: source.source_path.clone(),
1927 source: source_error,
1928 })?;
1929 let unix_mode = unix_mode(&metadata);
1930 let modified_unix_ms = modified_unix_ms(&metadata);
1931
1932 if metadata.is_dir() {
1933 return Ok(ManifestEntry {
1934 relative_path: source.relative_path,
1935 kind: ManifestEntryKind::Directory,
1936 size: 0,
1937 blake3: None,
1938 chunks: Vec::new(),
1939 unix_mode,
1940 modified_unix_ms,
1941 });
1942 }
1943
1944 let (blake3, chunks) = hash_file_with_chunks(&source.source_path, DEFAULT_CHUNK_SIZE).await?;
1945 Ok(ManifestEntry {
1946 relative_path: source.relative_path,
1947 kind: ManifestEntryKind::File,
1948 size: metadata.len(),
1949 blake3: Some(blake3),
1950 chunks,
1951 unix_mode,
1952 modified_unix_ms,
1953 })
1954}
1955
1956fn validate_manifest(manifest: &TransferManifest) -> Result<()> {
1957 if manifest.version != MANIFEST_VERSION {
1958 return Err(Error::Protocol(format!(
1959 "unsupported manifest version {}",
1960 manifest.version
1961 )));
1962 }
1963 if manifest.chunk_size == 0 {
1964 return Err(Error::Protocol("manifest chunk size is zero".to_string()));
1965 }
1966
1967 let mut seen = BTreeSet::new();
1968 for entry in &manifest.entries {
1969 validate_relative_transfer_path(&entry.relative_path)?;
1970 if !seen.insert(entry.relative_path.clone()) {
1971 return Err(Error::Protocol(format!(
1972 "duplicate manifest path: {}",
1973 display_path(&entry.relative_path)
1974 )));
1975 }
1976 if entry.kind == ManifestEntryKind::File && entry.blake3.is_none() {
1977 return Err(Error::Protocol(format!(
1978 "file manifest entry is missing BLAKE3: {}",
1979 display_path(&entry.relative_path)
1980 )));
1981 }
1982 }
1983
1984 Ok(())
1985}
1986
1987fn validate_relative_transfer_path(path: &Path) -> Result<()> {
1988 if path.as_os_str().is_empty() || path.is_absolute() {
1989 return Err(Error::InvalidInput(format!(
1990 "unsafe transfer path: {}",
1991 display_path(path)
1992 )));
1993 }
1994
1995 let mut normal_components = 0usize;
1996 for component in path.components() {
1997 match component {
1998 Component::Normal(value) => {
1999 let Some(name) = value.to_str() else {
2000 return Err(Error::InvalidInput(format!(
2001 "transfer path must be UTF-8: {}",
2002 display_path(path)
2003 )));
2004 };
2005 validate_path_component(name, path)?;
2006 normal_components += 1;
2007 }
2008 Component::CurDir
2009 | Component::ParentDir
2010 | Component::RootDir
2011 | Component::Prefix(_) => {
2012 return Err(Error::InvalidInput(format!(
2013 "unsafe transfer path: {}",
2014 display_path(path)
2015 )));
2016 }
2017 }
2018 }
2019
2020 if normal_components == 0 {
2021 return Err(Error::InvalidInput(format!(
2022 "unsafe transfer path: {}",
2023 display_path(path)
2024 )));
2025 }
2026
2027 Ok(())
2028}
2029
2030fn validate_path_component(component: &str, full_path: &Path) -> Result<()> {
2031 if component.is_empty()
2032 || component.contains('\\')
2033 || component.contains('/')
2034 || component.contains(':')
2035 || component.ends_with(' ')
2036 || component.ends_with('.')
2037 || is_windows_reserved_name(component)
2038 {
2039 return Err(Error::InvalidInput(format!(
2040 "unsafe transfer path: {}",
2041 display_path(full_path)
2042 )));
2043 }
2044
2045 Ok(())
2046}
2047
2048fn is_windows_reserved_name(component: &str) -> bool {
2049 let stem = component
2050 .split('.')
2051 .next()
2052 .unwrap_or(component)
2053 .trim_end_matches([' ', '.'])
2054 .to_ascii_uppercase();
2055 matches!(
2056 stem.as_str(),
2057 "CON"
2058 | "PRN"
2059 | "AUX"
2060 | "NUL"
2061 | "COM1"
2062 | "COM2"
2063 | "COM3"
2064 | "COM4"
2065 | "COM5"
2066 | "COM6"
2067 | "COM7"
2068 | "COM8"
2069 | "COM9"
2070 | "LPT1"
2071 | "LPT2"
2072 | "LPT3"
2073 | "LPT4"
2074 | "LPT5"
2075 | "LPT6"
2076 | "LPT7"
2077 | "LPT8"
2078 | "LPT9"
2079 )
2080}
2081
2082async fn hash_file(path: &Path) -> Result<String> {
2083 hash_file_with_chunks(path, DEFAULT_CHUNK_SIZE)
2084 .await
2085 .map(|(hash, _)| hash)
2086}
2087
2088async fn hash_file_with_chunks(path: &Path, chunk_size: u64) -> Result<(String, Vec<String>)> {
2089 let mut file = tokio::fs::File::open(path)
2090 .await
2091 .map_err(|source| Error::IoPath {
2092 path: path.to_path_buf(),
2093 source,
2094 })?;
2095 let mut hasher = blake3::Hasher::new();
2096 let mut chunk_hasher = blake3::Hasher::new();
2097 let mut chunk_bytes = 0u64;
2098 let mut chunks = Vec::new();
2099 let mut buffer = vec![0; IO_BUFFER_SIZE];
2100
2101 loop {
2102 let read = file
2103 .read(&mut buffer)
2104 .await
2105 .map_err(|source| Error::IoPath {
2106 path: path.to_path_buf(),
2107 source,
2108 })?;
2109 if read == 0 {
2110 break;
2111 }
2112 hasher.update(&buffer[..read]);
2113
2114 let mut remaining = &buffer[..read];
2115 while !remaining.is_empty() {
2116 let take = remaining.len().min((chunk_size - chunk_bytes) as usize);
2117 chunk_hasher.update(&remaining[..take]);
2118 chunk_bytes += take as u64;
2119 remaining = &remaining[take..];
2120 if chunk_bytes == chunk_size {
2121 chunks.push(chunk_hasher.finalize().to_hex().to_string());
2122 chunk_hasher = blake3::Hasher::new();
2123 chunk_bytes = 0;
2124 }
2125 }
2126 }
2127
2128 if chunk_bytes > 0 {
2129 chunks.push(chunk_hasher.finalize().to_hex().to_string());
2130 }
2131
2132 Ok((hasher.finalize().to_hex().to_string(), chunks))
2133}
2134
2135async fn verified_prefix_offset(
2136 path: &Path,
2137 entry: &ManifestEntry,
2138 existing_len: u64,
2139 chunk_size: u64,
2140) -> Result<u64> {
2141 if existing_len == 0 || entry.chunks.is_empty() {
2142 return Ok(0);
2143 }
2144
2145 let chunks_to_check = (existing_len / chunk_size).min(entry.chunks.len() as u64) as usize;
2146 if chunks_to_check == 0 {
2147 return Ok(0);
2148 }
2149
2150 let mut file = tokio::fs::File::open(path)
2151 .await
2152 .map_err(|source| Error::IoPath {
2153 path: path.to_path_buf(),
2154 source,
2155 })?;
2156 let mut buffer = vec![0; IO_BUFFER_SIZE];
2157 let mut verified_chunks = 0usize;
2158
2159 for expected in entry.chunks.iter().take(chunks_to_check) {
2160 let mut remaining = chunk_size;
2161 let mut hasher = blake3::Hasher::new();
2162 while remaining > 0 {
2163 let read_size = buffer.len().min(remaining as usize);
2164 file.read_exact(&mut buffer[..read_size])
2165 .await
2166 .map_err(|source| Error::IoPath {
2167 path: path.to_path_buf(),
2168 source,
2169 })?;
2170 hasher.update(&buffer[..read_size]);
2171 remaining -= read_size as u64;
2172 }
2173
2174 if hasher.finalize().to_hex().as_str() != expected {
2175 break;
2176 }
2177 verified_chunks += 1;
2178 }
2179
2180 Ok(verified_chunks as u64 * chunk_size)
2181}
2182
2183fn summary_from_files(
2184 manifest: &TransferManifest,
2185 files: Vec<TransferFileSummary>,
2186) -> Result<TransferSummary> {
2187 let bytes = manifest.total_file_bytes();
2188 let blake3 = if files.len() == 1 {
2189 files[0].blake3.clone()
2190 } else {
2191 manifest.digest()?
2192 };
2193 let file_name = if files.len() == 1 {
2194 display_path(&files[0].relative_path)
2195 } else {
2196 format!("{} files", files.len())
2197 };
2198 let path = files
2199 .first()
2200 .map(|file| file.path.clone())
2201 .unwrap_or_default();
2202
2203 Ok(TransferSummary {
2204 path,
2205 file_name,
2206 bytes,
2207 blake3,
2208 files,
2209 })
2210}
2211
2212fn temp_path_for(final_path: &Path) -> PathBuf {
2213 let file_name = final_path
2214 .file_name()
2215 .and_then(|name| name.to_str())
2216 .unwrap_or("transfer");
2217 final_path.with_file_name(format!(".{file_name}.{}.ferry-part", Uuid::new_v4()))
2218}
2219
2220fn modified_unix_ms(metadata: &Metadata) -> Option<i128> {
2221 system_time_unix_ms(metadata.modified().ok()?)
2222}
2223
2224fn system_time_unix_ms(time: SystemTime) -> Option<i128> {
2225 match time.duration_since(UNIX_EPOCH) {
2226 Ok(duration) => Some(duration.as_millis() as i128),
2227 Err(error) => Some(-(error.duration().as_millis() as i128)),
2228 }
2229}
2230
2231#[cfg(unix)]
2232fn unix_mode(metadata: &Metadata) -> Option<u32> {
2233 use std::os::unix::fs::PermissionsExt;
2234
2235 Some(metadata.permissions().mode())
2236}
2237
2238#[cfg(not(unix))]
2239fn unix_mode(_metadata: &Metadata) -> Option<u32> {
2240 None
2241}
2242
2243fn insecure_client_config() -> Result<quinn::ClientConfig> {
2244 let crypto = rustls::ClientConfig::builder()
2245 .dangerous()
2246 .with_custom_certificate_verifier(SkipServerVerification::new())
2247 .with_no_client_auth();
2248
2249 Ok(quinn::ClientConfig::new(Arc::new(
2250 QuicClientConfig::try_from(crypto)
2251 .map_err(|error| Error::CryptoConfig(error.to_string()))?,
2252 )))
2253}
2254
2255#[derive(Debug)]
2256struct SkipServerVerification(Arc<CryptoProvider>);
2257
2258impl SkipServerVerification {
2259 fn new() -> Arc<Self> {
2260 Arc::new(Self(Arc::new(rustls::crypto::ring::default_provider())))
2261 }
2262}
2263
2264impl ServerCertVerifier for SkipServerVerification {
2265 fn verify_server_cert(
2266 &self,
2267 _end_entity: &CertificateDer<'_>,
2268 _intermediates: &[CertificateDer<'_>],
2269 _server_name: &ServerName<'_>,
2270 _ocsp: &[u8],
2271 _now: UnixTime,
2272 ) -> std::result::Result<ServerCertVerified, rustls::Error> {
2273 Ok(ServerCertVerified::assertion())
2274 }
2275
2276 fn verify_tls12_signature(
2277 &self,
2278 message: &[u8],
2279 cert: &CertificateDer<'_>,
2280 dss: &DigitallySignedStruct,
2281 ) -> std::result::Result<HandshakeSignatureValid, rustls::Error> {
2282 verify_tls12_signature(
2283 message,
2284 cert,
2285 dss,
2286 &self.0.signature_verification_algorithms,
2287 )
2288 }
2289
2290 fn verify_tls13_signature(
2291 &self,
2292 message: &[u8],
2293 cert: &CertificateDer<'_>,
2294 dss: &DigitallySignedStruct,
2295 ) -> std::result::Result<HandshakeSignatureValid, rustls::Error> {
2296 verify_tls13_signature(
2297 message,
2298 cert,
2299 dss,
2300 &self.0.signature_verification_algorithms,
2301 )
2302 }
2303
2304 fn supported_verify_schemes(&self) -> Vec<SignatureScheme> {
2305 self.0.signature_verification_algorithms.supported_schemes()
2306 }
2307}
2308
2309fn ensure_rustls_provider() {
2310 let _ = rustls::crypto::ring::default_provider().install_default();
2311}
2312
2313pub fn config_dir() -> Result<PathBuf> {
2314 let base_dirs = BaseDirs::new().ok_or(Error::ConfigDirUnavailable)?;
2315 Ok(base_dirs.config_dir().join(CONFIG_DIR_NAME))
2316}
2317
2318fn default_alias() -> String {
2319 std::env::var("HOSTNAME")
2320 .or_else(|_| std::env::var("COMPUTERNAME"))
2321 .ok()
2322 .filter(|value| !value.trim().is_empty())
2323 .unwrap_or_else(|| "fileferry-device".to_string())
2324}
2325
2326fn expand_home_path(path: &str) -> Result<PathBuf> {
2327 if path == "~" {
2328 return Ok(BaseDirs::new()
2329 .ok_or(Error::ConfigDirUnavailable)?
2330 .home_dir()
2331 .to_path_buf());
2332 }
2333
2334 if let Some(rest) = path.strip_prefix("~/") {
2335 return Ok(BaseDirs::new()
2336 .ok_or(Error::ConfigDirUnavailable)?
2337 .home_dir()
2338 .join(rest));
2339 }
2340
2341 Ok(PathBuf::from(path))
2342}
2343
2344fn normalized_fingerprint(fingerprint: String) -> Result<String> {
2345 let fingerprint = fingerprint.trim().to_ascii_lowercase();
2346 if fingerprint.len() != 64 || !fingerprint.chars().all(|char| char.is_ascii_hexdigit()) {
2347 return Err(Error::InvalidInput(
2348 "peer fingerprint must be a full 64-character hex BLAKE3 fingerprint".to_string(),
2349 ));
2350 }
2351
2352 Ok(fingerprint)
2353}
2354
2355fn write_toml_file<T: Serialize>(path: &Path, value: &T) -> Result<()> {
2356 let bytes =
2357 toml::to_string_pretty(value).map_err(|error| Error::ConfigSerialize(error.to_string()))?;
2358 if let Some(parent) = path.parent() {
2359 std::fs::create_dir_all(parent).map_err(|source| Error::IoPath {
2360 path: parent.to_path_buf(),
2361 source,
2362 })?;
2363 }
2364
2365 let temp_path = path.with_extension(format!("tmp-{}", Uuid::new_v4()));
2366 std::fs::write(&temp_path, bytes).map_err(|source| Error::IoPath {
2367 path: temp_path.clone(),
2368 source,
2369 })?;
2370 std::fs::rename(&temp_path, path).map_err(|source| Error::IoPath {
2371 path: path.to_path_buf(),
2372 source,
2373 })?;
2374 Ok(())
2375}
2376
2377fn write_private_file(path: &Path, bytes: &[u8]) -> Result<()> {
2378 std::fs::write(path, bytes).map_err(|source| Error::IoPath {
2379 path: path.to_path_buf(),
2380 source,
2381 })?;
2382
2383 #[cfg(unix)]
2384 {
2385 use std::os::unix::fs::PermissionsExt;
2386
2387 let mut permissions = std::fs::metadata(path)
2388 .map_err(|source| Error::IoPath {
2389 path: path.to_path_buf(),
2390 source,
2391 })?
2392 .permissions();
2393 permissions.set_mode(0o600);
2394 std::fs::set_permissions(path, permissions).map_err(|source| Error::IoPath {
2395 path: path.to_path_buf(),
2396 source,
2397 })?;
2398 }
2399
2400 Ok(())
2401}
2402
2403#[cfg(test)]
2404mod tests {
2405 use super::*;
2406 use std::sync::{Arc, Mutex};
2407
2408 #[test]
2409 fn direct_peer_parses_socket_address() {
2410 let peer = DirectPeer::parse("127.0.0.1:53318").expect("address parses");
2411
2412 assert_eq!(peer.address().to_string(), "127.0.0.1:53318");
2413 }
2414
2415 #[test]
2416 fn direct_peer_rejects_missing_port() {
2417 assert!(DirectPeer::parse("127.0.0.1").is_err());
2418 }
2419
2420 #[test]
2421 fn send_request_requires_at_least_one_path() {
2422 let peer = DirectPeer::parse("127.0.0.1:53318").expect("address parses");
2423
2424 assert!(SendRequest::new(peer, Vec::new()).is_err());
2425 }
2426
2427 #[test]
2428 fn validate_send_paths_rejects_empty_slice() {
2429 assert!(validate_send_paths(&[]).is_err());
2430 }
2431
2432 #[tokio::test]
2433 async fn manifest_walks_directory_and_serializes_entries() {
2434 let temp = tempfile::tempdir().expect("tempdir");
2435 let root = temp.path().join("bundle");
2436 let nested = root.join("nested");
2437 tokio::fs::create_dir_all(&nested).await.expect("dirs");
2438 tokio::fs::write(root.join("a.txt"), b"alpha")
2439 .await
2440 .expect("file a");
2441 tokio::fs::write(nested.join("b.txt"), b"beta")
2442 .await
2443 .expect("file b");
2444
2445 let manifest = build_manifest(&[root]).await.expect("manifest");
2446 let json = serde_json::to_string(&manifest).expect("manifest serializes");
2447 let decoded: TransferManifest = serde_json::from_str(&json).expect("manifest decodes");
2448
2449 assert_eq!(decoded.version, MANIFEST_VERSION);
2450 assert!(decoded.entries.iter().any(|entry| {
2451 entry.kind == ManifestEntryKind::Directory
2452 && entry.relative_path == Path::new("bundle/nested")
2453 }));
2454 assert!(decoded.entries.iter().any(|entry| {
2455 entry.kind == ManifestEntryKind::File
2456 && entry.relative_path == Path::new("bundle/nested/b.txt")
2457 && entry.size == 4
2458 && entry.blake3.is_some()
2459 }));
2460 }
2461
2462 #[test]
2463 fn safe_destination_path_rejects_unsafe_paths() {
2464 let dest = Path::new("/tmp/ferry-dest");
2465
2466 assert!(safe_destination_path(dest, Path::new("../escape.txt")).is_err());
2467 assert!(safe_destination_path(dest, Path::new("/absolute.txt")).is_err());
2468 assert!(safe_destination_path(dest, Path::new("nested/CON")).is_err());
2469 assert!(safe_destination_path(dest, Path::new("nested\\escape.txt")).is_err());
2470 assert!(safe_destination_path(dest, Path::new("nested/ok.txt")).is_ok());
2471 }
2472
2473 #[test]
2474 fn safe_destination_path_rejects_generated_escape_and_reserved_patterns() {
2475 let dest = Path::new("/tmp/ferry-dest");
2476 let unsafe_components = [
2477 "..",
2478 ".",
2479 "CON",
2480 "con.txt",
2481 "NUL",
2482 "COM1",
2483 "LPT9",
2484 "trailingspace ",
2485 "trailingdot.",
2486 "has:colon",
2487 "has\\backslash",
2488 ];
2489
2490 for component in unsafe_components {
2491 assert!(
2492 safe_destination_path(dest, Path::new(component)).is_err(),
2493 "{component} should be rejected as a top-level path"
2494 );
2495 if component != "." {
2496 assert!(
2497 safe_destination_path(dest, Path::new("safe").join(component).as_path())
2498 .is_err(),
2499 "{component} should be rejected as a nested path"
2500 );
2501 }
2502 }
2503
2504 for component in ["alpha", "alpha-1", "alpha_1", "report.final.txt"] {
2505 let path = safe_destination_path(dest, Path::new("safe").join(component).as_path())
2506 .expect("safe generated path accepted");
2507 assert!(path.starts_with(dest));
2508 }
2509 }
2510
2511 #[tokio::test]
2512 async fn resume_decision_skips_existing_matching_file() {
2513 let temp = tempfile::tempdir().expect("tempdir");
2514 let source = temp.path().join("source.txt");
2515 let dest = temp.path().join("dest");
2516 tokio::fs::create_dir_all(&dest).await.expect("dest dir");
2517 tokio::fs::write(&source, b"same contents")
2518 .await
2519 .expect("source");
2520 tokio::fs::write(dest.join("source.txt"), b"same contents")
2521 .await
2522 .expect("dest file");
2523 let manifest = build_manifest(&[source]).await.expect("manifest");
2524 let entry = manifest.file_entries().next().expect("file entry");
2525
2526 let decision = decide_resume(&dest, entry, manifest.chunk_size)
2527 .await
2528 .expect("decision");
2529
2530 assert_eq!(decision, ResumeDecision::Skip);
2531 }
2532
2533 #[tokio::test]
2534 async fn resume_decision_returns_verified_chunk_offset() {
2535 let temp = tempfile::tempdir().expect("tempdir");
2536 let source = temp.path().join("large.bin");
2537 let dest = temp.path().join("dest");
2538 tokio::fs::create_dir_all(&dest).await.expect("dest dir");
2539 let bytes = vec![42; (DEFAULT_CHUNK_SIZE as usize * 2) + 128];
2540 tokio::fs::write(&source, &bytes).await.expect("source");
2541 tokio::fs::write(
2542 dest.join("large.bin"),
2543 &bytes[..DEFAULT_CHUNK_SIZE as usize + 99],
2544 )
2545 .await
2546 .expect("partial");
2547 let manifest = build_manifest(&[source]).await.expect("manifest");
2548 let entry = manifest.file_entries().next().expect("file entry");
2549
2550 let decision = decide_resume(&dest, entry, manifest.chunk_size)
2551 .await
2552 .expect("decision");
2553
2554 assert_eq!(decision, ResumeDecision::ResumeFrom(DEFAULT_CHUNK_SIZE));
2555 }
2556
2557 #[tokio::test]
2558 async fn resume_decision_rejects_existing_file_with_mismatched_contents() {
2559 let temp = tempfile::tempdir().expect("tempdir");
2560 let source = temp.path().join("source.txt");
2561 let dest = temp.path().join("dest");
2562 tokio::fs::create_dir_all(&dest).await.expect("dest dir");
2563 tokio::fs::write(&source, b"expected contents")
2564 .await
2565 .expect("source");
2566 tokio::fs::write(dest.join("source.txt"), b"different contents")
2567 .await
2568 .expect("conflicting dest file");
2569 let manifest = build_manifest(&[source]).await.expect("manifest");
2570 let entry = manifest.file_entries().next().expect("file entry");
2571
2572 let decision = decide_resume(&dest, entry, manifest.chunk_size)
2573 .await
2574 .expect("decision");
2575
2576 assert!(matches!(decision, ResumeDecision::Conflict(_)));
2577 }
2578
2579 #[tokio::test]
2580 async fn resume_decision_rejects_existing_file_larger_than_manifest_entry() {
2581 let temp = tempfile::tempdir().expect("tempdir");
2582 let source = temp.path().join("source.txt");
2583 let dest = temp.path().join("dest");
2584 tokio::fs::create_dir_all(&dest).await.expect("dest dir");
2585 tokio::fs::write(&source, b"small").await.expect("source");
2586 tokio::fs::write(dest.join("source.txt"), b"larger destination")
2587 .await
2588 .expect("larger dest file");
2589 let manifest = build_manifest(&[source]).await.expect("manifest");
2590 let entry = manifest.file_entries().next().expect("file entry");
2591
2592 let decision = decide_resume(&dest, entry, manifest.chunk_size)
2593 .await
2594 .expect("decision");
2595
2596 assert!(matches!(decision, ResumeDecision::Conflict(_)));
2597 }
2598
2599 #[test]
2600 fn identity_is_loaded_after_generation() {
2601 let temp = tempfile::tempdir().expect("tempdir");
2602 let generated =
2603 NativeIdentity::load_or_generate_in(temp.path()).expect("identity generated");
2604 let loaded = NativeIdentity::load_or_generate_in(temp.path()).expect("identity loaded");
2605
2606 assert_eq!(generated.fingerprint(), loaded.fingerprint());
2607 }
2608
2609 #[test]
2610 fn app_config_round_trips_toml() {
2611 let temp = tempfile::tempdir().expect("tempdir");
2612 let path = temp.path().join("config.toml");
2613 let config = AppConfig {
2614 alias: "desk".to_string(),
2615 ..AppConfig::default()
2616 };
2617
2618 config.save_to_path(&path).expect("config saved");
2619 let loaded = AppConfig::load_or_default_from(&path).expect("config loaded");
2620
2621 assert_eq!(loaded.alias, "desk");
2622 assert_eq!(loaded.listen_port, 53317);
2623 assert!(loaded.trust.require_fingerprint);
2624 }
2625
2626 #[test]
2627 fn app_config_redacts_psk_for_display_output() {
2628 let config = AppConfig {
2629 trust: TrustConfig {
2630 psk: "super-secret-psk".to_string(),
2631 ..TrustConfig::default()
2632 },
2633 ..AppConfig::default()
2634 };
2635
2636 let redacted = config.to_redacted_toml_string().expect("config serializes");
2637
2638 assert!(!redacted.contains("super-secret-psk"));
2639 assert!(redacted.contains(r#"psk = "<redacted>""#));
2640 }
2641
2642 #[test]
2643 fn daemon_config_defaults_from_app_config_and_round_trips_toml() {
2644 let temp = tempfile::tempdir().expect("tempdir");
2645 let path = temp.path().join("daemon.toml");
2646 let app_config = AppConfig {
2647 quic_port: 54444,
2648 download_dir: temp.path().join("downloads").display().to_string(),
2649 ..AppConfig::default()
2650 };
2651
2652 let defaulted =
2653 DaemonConfig::load_or_default_from(&path, &app_config).expect("daemon config");
2654 assert_eq!(
2655 defaulted.listen,
2656 SocketAddr::from(([0, 0, 0, 0], app_config.quic_port))
2657 );
2658 assert_eq!(defaulted.destination, temp.path().join("downloads"));
2659
2660 let config = DaemonConfig {
2661 listen: SocketAddr::from(([127, 0, 0, 1], 53318)),
2662 destination: temp.path().join("daemon-dest"),
2663 };
2664 config.save_to_path(&path).expect("daemon config saved");
2665 let loaded =
2666 DaemonConfig::load_or_default_from(&path, &app_config).expect("daemon config loaded");
2667
2668 assert_eq!(loaded, config);
2669 }
2670
2671 #[test]
2672 fn trust_store_load_save_and_forget_round_trip() {
2673 let temp = tempfile::tempdir().expect("tempdir");
2674 let path = temp.path().join("known_peers.toml");
2675 let fingerprint = "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef";
2676 let mut store = TrustStore::default();
2677
2678 store
2679 .trust_fingerprint(fingerprint)
2680 .expect("fingerprint trusted");
2681 store.save_to_path(&path).expect("trust store saved");
2682 let mut loaded = TrustStore::load_or_default_from(&path).expect("trust store loaded");
2683
2684 assert_eq!(loaded.records().count(), 1);
2685 assert_eq!(loaded.peers[fingerprint].trust_state, TrustState::Trusted);
2686 assert!(loaded.forget(fingerprint));
2687 assert_eq!(loaded.records().count(), 0);
2688 }
2689
2690 #[test]
2691 fn trust_store_rejects_short_fingerprint_without_discovery_lookup() {
2692 let mut store = TrustStore::default();
2693
2694 assert!(store.trust_fingerprint("9a4f2c").is_err());
2695 }
2696
2697 #[test]
2698 fn transfer_events_serialize_with_stable_event_names() {
2699 let event = TransferEvent::Progress {
2700 direction: TransferDirection::Send,
2701 file_name: "bundle/a.txt".to_string(),
2702 bytes_done: 5,
2703 bytes_total: 9,
2704 };
2705
2706 let json = serde_json::to_value(&event).expect("event serializes");
2707
2708 assert_eq!(json["event"], "progress");
2709 assert_eq!(json["direction"], "send");
2710 assert_eq!(json["file_name"], "bundle/a.txt");
2711 assert_eq!(json["bytes_done"], 5);
2712 assert_eq!(json["bytes_total"], 9);
2713 }
2714
2715 #[test]
2716 fn direct_protocol_hello_has_stable_golden_json() {
2717 let hello = DirectProtocolHello {
2718 protocol_version: 1,
2719 min_protocol_version: 1,
2720 client_fingerprint: "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"
2721 .to_string(),
2722 };
2723
2724 let json = serde_json::to_string(&hello).expect("hello serializes");
2725
2726 assert_eq!(
2727 json,
2728 r#"{"protocol_version":1,"min_protocol_version":1,"client_fingerprint":"0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"}"#
2729 );
2730 }
2731
2732 #[test]
2733 fn direct_protocol_response_has_stable_golden_json() {
2734 let response = DirectProtocolResponse::accept(1);
2735
2736 let json = serde_json::to_string(&response).expect("response serializes");
2737
2738 assert_eq!(
2739 json,
2740 r#"{"accepted":true,"protocol_version":1,"error":null}"#
2741 );
2742 }
2743
2744 #[test]
2745 fn direct_protocol_negotiates_supported_overlap() {
2746 let hello = DirectProtocolHello {
2747 protocol_version: 3,
2748 min_protocol_version: 1,
2749 client_fingerprint: "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"
2750 .to_string(),
2751 };
2752
2753 let response = negotiate_direct_protocol(&hello);
2754
2755 assert_eq!(response, DirectProtocolResponse::accept(1));
2756 }
2757
2758 #[test]
2759 fn direct_protocol_rejects_incompatible_versions() {
2760 let too_new = DirectProtocolHello {
2761 protocol_version: 3,
2762 min_protocol_version: 2,
2763 client_fingerprint: "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"
2764 .to_string(),
2765 };
2766 let too_old = DirectProtocolHello {
2767 protocol_version: 0,
2768 min_protocol_version: 0,
2769 client_fingerprint: "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"
2770 .to_string(),
2771 };
2772
2773 assert!(!negotiate_direct_protocol(&too_new).accepted);
2774 assert!(!negotiate_direct_protocol(&too_old).accepted);
2775 }
2776
2777 #[test]
2778 fn transfer_manifest_has_stable_golden_json() {
2779 let manifest = TransferManifest {
2780 version: MANIFEST_VERSION,
2781 session_id: Uuid::parse_str("00000000-0000-0000-0000-000000000001")
2782 .expect("uuid parses"),
2783 chunk_size: DEFAULT_CHUNK_SIZE,
2784 entries: vec![ManifestEntry {
2785 relative_path: PathBuf::from("bundle/a.txt"),
2786 kind: ManifestEntryKind::File,
2787 size: 5,
2788 blake3: Some(
2789 "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef".to_string(),
2790 ),
2791 chunks: vec![
2792 "abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789".to_string(),
2793 ],
2794 unix_mode: Some(0o100644),
2795 modified_unix_ms: Some(1_700_000_000_000),
2796 }],
2797 };
2798
2799 let json = serde_json::to_string(&manifest).expect("manifest serializes");
2800
2801 assert_eq!(
2802 json,
2803 r#"{"version":1,"session_id":"00000000-0000-0000-0000-000000000001","chunk_size":1048576,"entries":[{"relative_path":"bundle/a.txt","kind":"file","size":5,"blake3":"0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef","chunks":["abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789"],"unix_mode":33188,"modified_unix_ms":1700000000000}]}"#
2804 );
2805 }
2806
2807 #[test]
2808 fn peer_registry_merges_observations_by_fingerprint() {
2809 let mut registry = PeerRegistry::new();
2810 let fingerprint = "abcdef1234567890";
2811 let first = PeerObservation::new(
2812 fingerprint,
2813 SocketAddr::from(([192, 168, 1, 10], 53318)),
2814 100,
2815 )
2816 .with_alias("desk")
2817 .with_hostname("desk.local")
2818 .with_transport("quic");
2819 let second = PeerObservation::new(
2820 fingerprint,
2821 SocketAddr::from(([192, 168, 1, 11], 53318)),
2822 200,
2823 )
2824 .with_alias("workstation")
2825 .with_transport("quic");
2826
2827 registry.observe(first).expect("first observation");
2828 registry.observe(second).expect("second observation");
2829 let record = registry
2830 .get_by_fingerprint(fingerprint)
2831 .expect("record by fingerprint");
2832
2833 assert_eq!(registry.records().count(), 1);
2834 assert!(record.aliases.contains("desk"));
2835 assert!(record.aliases.contains("workstation"));
2836 assert!(record.hostnames.contains("desk.local"));
2837 assert_eq!(record.addresses.len(), 2);
2838 assert_eq!(record.first_seen_unix_ms, 100);
2839 assert_eq!(record.last_seen_unix_ms, 200);
2840 }
2841
2842 #[test]
2843 fn peer_registry_looks_up_unique_fingerprint_prefix_alias_and_hostname() {
2844 let mut registry = PeerRegistry::new();
2845 registry
2846 .observe(
2847 PeerObservation::new(
2848 "abcdef1234567890",
2849 SocketAddr::from(([192, 168, 1, 10], 53318)),
2850 100,
2851 )
2852 .with_alias("desk")
2853 .with_hostname("desk.local"),
2854 )
2855 .expect("first observation");
2856 registry
2857 .observe(
2858 PeerObservation::new(
2859 "123456abcdef7890",
2860 SocketAddr::from(([192, 168, 1, 20], 53318)),
2861 100,
2862 )
2863 .with_alias("laptop"),
2864 )
2865 .expect("second observation");
2866
2867 assert_eq!(
2868 registry
2869 .lookup("abcdef")
2870 .map(|record| record.fingerprint.as_str()),
2871 Some("abcdef1234567890")
2872 );
2873 assert_eq!(
2874 registry
2875 .lookup("desk")
2876 .map(|record| record.fingerprint.as_str()),
2877 Some("abcdef1234567890")
2878 );
2879 assert_eq!(
2880 registry
2881 .lookup("desk.local")
2882 .map(|record| record.fingerprint.as_str()),
2883 Some("abcdef1234567890")
2884 );
2885 assert!(registry.lookup("missing").is_none());
2886 }
2887
2888 #[test]
2889 fn peer_registry_rejects_ambiguous_lookup_hints() {
2890 let mut registry = PeerRegistry::new();
2891 registry
2892 .observe(
2893 PeerObservation::new(
2894 "abcdef1234567890",
2895 SocketAddr::from(([192, 168, 1, 10], 53318)),
2896 100,
2897 )
2898 .with_alias("desk"),
2899 )
2900 .expect("first observation");
2901 registry
2902 .observe(
2903 PeerObservation::new(
2904 "abcdef9999999999",
2905 SocketAddr::from(([192, 168, 1, 11], 53318)),
2906 100,
2907 )
2908 .with_alias("desk"),
2909 )
2910 .expect("second observation");
2911
2912 assert!(registry.lookup("abcdef").is_none());
2913 assert!(registry.lookup("desk").is_none());
2914 assert_eq!(
2915 registry
2916 .lookup("abcdef1234567890")
2917 .map(|record| record.fingerprint.as_str()),
2918 Some("abcdef1234567890")
2919 );
2920 match registry.lookup_detail("desk") {
2921 PeerLookup::Ambiguous(records) => assert_eq!(records.len(), 2),
2922 other => panic!("expected ambiguous duplicate-alias lookup, got {other:?}"),
2923 }
2924 assert!(matches!(
2925 registry.lookup_detail("missing"),
2926 PeerLookup::Missing
2927 ));
2928 }
2929
2930 #[test]
2931 fn native_announcement_builds_expected_txt_properties() {
2932 let announcement = NativeAnnouncement {
2933 alias: "Stephen MBP".to_string(),
2934 fingerprint: "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"
2935 .to_string(),
2936 quic_port: 53318,
2937 listen_port: Some(53317),
2938 };
2939
2940 let service = announcement.service_info().expect("service info");
2941
2942 assert_eq!(service.get_type(), NATIVE_SERVICE_TYPE);
2943 assert!(
2944 service
2945 .get_fullname()
2946 .starts_with("stephen-mbp-0123456789ab.")
2947 );
2948 assert_eq!(service.get_property_val_str("pv"), Some("1"));
2949 assert_eq!(service.get_property_val_str("minpv"), Some("1"));
2950 assert_eq!(
2951 service.get_property_val_str("fp"),
2952 Some("0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef")
2953 );
2954 assert_eq!(service.get_property_val_str("alias"), Some("Stephen MBP"));
2955 assert_eq!(service.get_property_val_str("transports"), Some("quic"));
2956 assert_eq!(service.get_property_val_str("quic_port"), Some("53318"));
2957 assert_eq!(service.get_property_val_str("listen_port"), Some("53317"));
2958 }
2959
2960 #[test]
2961 fn resolved_native_service_merges_into_registry() {
2962 let properties = [
2963 ("pv", "1"),
2964 ("minpv", "1"),
2965 (
2966 "fp",
2967 "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef",
2968 ),
2969 ("alias", "desk"),
2970 ("transports", "quic"),
2971 ("quic_port", "53318"),
2972 ];
2973 let service = ServiceInfo::new(
2974 NATIVE_SERVICE_TYPE,
2975 "desk-0123456789ab",
2976 "desk.local.",
2977 "192.168.1.42",
2978 53318,
2979 &properties[..],
2980 )
2981 .expect("service info")
2982 .as_resolved_service();
2983 let mut registry = PeerRegistry::new();
2984
2985 observe_resolved_service(&mut registry, &service).expect("observation");
2986
2987 let record = registry.lookup("desk").expect("record by alias");
2988 assert_eq!(
2989 record.fingerprint,
2990 "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"
2991 );
2992 assert_eq!(
2993 record.preferred_quic_address(),
2994 Some(SocketAddr::from(([192, 168, 1, 42], 53318)))
2995 );
2996 }
2997
2998 #[test]
2999 fn resolved_native_service_ignores_incompatible_protocol_ranges() {
3000 let properties = [
3001 ("pv", "3"),
3002 ("minpv", "2"),
3003 (
3004 "fp",
3005 "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef",
3006 ),
3007 ("alias", "desk"),
3008 ("transports", "quic"),
3009 ("quic_port", "53318"),
3010 ];
3011 let service = ServiceInfo::new(
3012 NATIVE_SERVICE_TYPE,
3013 "desk-0123456789ab",
3014 "desk.local.",
3015 "192.168.1.42",
3016 53318,
3017 &properties[..],
3018 )
3019 .expect("service info")
3020 .as_resolved_service();
3021 let mut registry = PeerRegistry::new();
3022
3023 observe_resolved_service(&mut registry, &service).expect("observation");
3024
3025 assert_eq!(registry.records().count(), 0);
3026 }
3027
3028 #[tokio::test]
3029 async fn direct_quic_loopback_sends_one_file() {
3030 let source_dir = tempfile::tempdir().expect("source tempdir");
3031 let dest_dir = tempfile::tempdir().expect("dest tempdir");
3032 let identity_dir = tempfile::tempdir().expect("identity tempdir");
3033 let file_path = source_dir.path().join("hello.txt");
3034 tokio::fs::write(&file_path, b"hello from ferry")
3035 .await
3036 .expect("source file written");
3037
3038 let identity =
3039 NativeIdentity::load_or_generate_in(identity_dir.path()).expect("identity generated");
3040 let recv_identity = identity.clone();
3041 let reserved_socket =
3042 std::net::UdpSocket::bind(SocketAddr::from(([127, 0, 0, 1], 0))).expect("udp bind");
3043 let recv_addr = reserved_socket.local_addr().expect("local addr");
3044 drop(reserved_socket);
3045 let receive_progress = Arc::new(Mutex::new(Vec::new()));
3046 let receive_progress_log = receive_progress.clone();
3047 let dest_path = dest_dir.path().to_path_buf();
3048 let server = tokio::spawn(async move {
3049 receive_direct_file(
3050 &ReceiveRequest::new(recv_addr),
3051 dest_path,
3052 &recv_identity,
3053 |event| {
3054 receive_progress_log
3055 .lock()
3056 .expect("progress lock")
3057 .push(event);
3058 },
3059 )
3060 .await
3061 });
3062
3063 tokio::time::sleep(Duration::from_millis(100)).await;
3064
3065 let peer = DirectPeer::parse(&recv_addr.to_string()).expect("peer parses");
3066 let send_request = SendRequest::new(peer, vec![file_path]).expect("send request is valid");
3067 let send_progress = Arc::new(Mutex::new(Vec::new()));
3068 let send_progress_log = send_progress.clone();
3069 let send_summary = send_direct_file(&send_request, &identity, |event| {
3070 send_progress_log.lock().expect("progress lock").push(event);
3071 })
3072 .await
3073 .expect("file sent");
3074 let receive_summary = server.await.expect("server joined").expect("file received");
3075
3076 let received = tokio::fs::read(dest_dir.path().join("hello.txt"))
3077 .await
3078 .expect("received file readable");
3079 assert_eq!(received, b"hello from ferry");
3080 assert_eq!(send_summary.bytes, receive_summary.bytes);
3081 assert_eq!(send_summary.blake3, receive_summary.blake3);
3082 let send_progress = send_progress.lock().expect("progress lock");
3083 let receive_progress = receive_progress.lock().expect("progress lock");
3084 assert!(matches!(
3085 send_progress.first(),
3086 Some(TransferEvent::SessionStarted {
3087 direction: TransferDirection::Send,
3088 ..
3089 })
3090 ));
3091 assert!(
3092 send_progress
3093 .iter()
3094 .any(|event| matches!(event, TransferEvent::Progress { .. }))
3095 );
3096 assert!(
3097 send_progress
3098 .iter()
3099 .any(|event| matches!(event, TransferEvent::SessionFinished { .. }))
3100 );
3101 assert!(matches!(
3102 receive_progress.first(),
3103 Some(TransferEvent::SessionStarted {
3104 direction: TransferDirection::Receive,
3105 ..
3106 })
3107 ));
3108 assert!(
3109 receive_progress
3110 .iter()
3111 .any(|event| matches!(event, TransferEvent::Progress { .. }))
3112 );
3113 assert!(
3114 receive_progress
3115 .iter()
3116 .any(|event| matches!(event, TransferEvent::SessionFinished { .. }))
3117 );
3118 }
3119
3120 #[tokio::test]
3121 async fn direct_quic_loopback_sends_directory_and_resumes_partial_file() {
3122 let source_dir = tempfile::tempdir().expect("source tempdir");
3123 let dest_dir = tempfile::tempdir().expect("dest tempdir");
3124 let identity_dir = tempfile::tempdir().expect("identity tempdir");
3125 let bundle = source_dir.path().join("bundle");
3126 let nested = bundle.join("nested");
3127 tokio::fs::create_dir_all(&nested)
3128 .await
3129 .expect("source dirs");
3130 tokio::fs::write(nested.join("small.txt"), b"small")
3131 .await
3132 .expect("small file");
3133 let large_bytes = vec![7; (DEFAULT_CHUNK_SIZE as usize * 2) + 17];
3134 tokio::fs::write(bundle.join("large.bin"), &large_bytes)
3135 .await
3136 .expect("large file");
3137
3138 let dest_bundle = dest_dir.path().join("bundle");
3139 tokio::fs::create_dir_all(&dest_bundle)
3140 .await
3141 .expect("dest bundle");
3142 tokio::fs::write(
3143 dest_bundle.join("large.bin"),
3144 &large_bytes[..DEFAULT_CHUNK_SIZE as usize + 5],
3145 )
3146 .await
3147 .expect("partial large");
3148
3149 let identity =
3150 NativeIdentity::load_or_generate_in(identity_dir.path()).expect("identity generated");
3151 let recv_identity = identity.clone();
3152 let reserved_socket =
3153 std::net::UdpSocket::bind(SocketAddr::from(([127, 0, 0, 1], 0))).expect("udp bind");
3154 let recv_addr = reserved_socket.local_addr().expect("local addr");
3155 drop(reserved_socket);
3156 let dest_path = dest_dir.path().to_path_buf();
3157 let server = tokio::spawn(async move {
3158 receive_direct_file(
3159 &ReceiveRequest::new(recv_addr),
3160 dest_path,
3161 &recv_identity,
3162 |_| {},
3163 )
3164 .await
3165 });
3166
3167 tokio::time::sleep(Duration::from_millis(100)).await;
3168
3169 let peer = DirectPeer::parse(&recv_addr.to_string()).expect("peer parses");
3170 let send_request = SendRequest::new(peer, vec![bundle]).expect("send request is valid");
3171 let send_summary = send_direct_file(&send_request, &identity, |_| {})
3172 .await
3173 .expect("directory sent");
3174 let receive_summary = server
3175 .await
3176 .expect("server joined")
3177 .expect("directory received");
3178
3179 let received_large = tokio::fs::read(dest_bundle.join("large.bin"))
3180 .await
3181 .expect("large file readable");
3182 let received_small = tokio::fs::read(dest_bundle.join("nested/small.txt"))
3183 .await
3184 .expect("small file readable");
3185 assert_eq!(received_large, large_bytes);
3186 assert_eq!(received_small, b"small");
3187 assert_eq!(send_summary.bytes, receive_summary.bytes);
3188 assert!(
3189 receive_summary
3190 .files
3191 .iter()
3192 .any(|file| file.status == TransferFileStatus::Resumed)
3193 );
3194 }
3195
3196 #[tokio::test]
3197 async fn direct_quic_loopback_cancels_send_and_does_not_finalize_partial_file() {
3198 let source_dir = tempfile::tempdir().expect("source tempdir");
3199 let dest_dir = tempfile::tempdir().expect("dest tempdir");
3200 let identity_dir = tempfile::tempdir().expect("identity tempdir");
3201 let file_path = source_dir.path().join("payload.bin");
3202 tokio::fs::write(&file_path, vec![3; IO_BUFFER_SIZE * 64])
3203 .await
3204 .expect("source file written");
3205
3206 let identity =
3207 NativeIdentity::load_or_generate_in(identity_dir.path()).expect("identity generated");
3208 let recv_identity = identity.clone();
3209 let reserved_socket =
3210 std::net::UdpSocket::bind(SocketAddr::from(([127, 0, 0, 1], 0))).expect("udp bind");
3211 let recv_addr = reserved_socket.local_addr().expect("local addr");
3212 drop(reserved_socket);
3213
3214 let dest_path = dest_dir.path().to_path_buf();
3215 let server = tokio::spawn(async move {
3216 receive_direct_file(
3217 &ReceiveRequest::new(recv_addr),
3218 dest_path,
3219 &recv_identity,
3220 |_| {},
3221 )
3222 .await
3223 });
3224
3225 tokio::time::sleep(Duration::from_millis(100)).await;
3226
3227 let peer = DirectPeer::parse(&recv_addr.to_string()).expect("peer parses");
3228 let send_request =
3229 SendRequest::new(peer, vec![file_path.clone()]).expect("send request is valid");
3230 let send_control = TransferControl::new();
3231 let cancel_on_progress = send_control.clone();
3232 let send_events = Arc::new(Mutex::new(Vec::new()));
3233 let send_events_log = send_events.clone();
3234 let send_result =
3235 send_direct_file_with_control(&send_request, &identity, send_control, |event| {
3236 if matches!(event, TransferEvent::Progress { .. }) {
3237 cancel_on_progress.cancel();
3238 }
3239 send_events_log.lock().expect("events lock").push(event);
3240 })
3241 .await;
3242
3243 assert!(matches!(send_result, Err(Error::TransferCancelled)));
3244
3245 let receive_result = tokio::time::timeout(Duration::from_secs(5), server)
3246 .await
3247 .expect("receiver should finish after sender cancellation")
3248 .expect("server joined");
3249 assert!(receive_result.is_err());
3250
3251 assert!(!dest_dir.path().join("payload.bin").exists());
3252 assert!(
3253 send_events
3254 .lock()
3255 .expect("events lock")
3256 .iter()
3257 .any(|event| matches!(event, TransferEvent::SessionCancelled { .. }))
3258 );
3259 }
3260}