1pub mod error;
2
3use std::collections::{BTreeMap, BTreeSet};
4use std::fs::Metadata;
5use std::io::SeekFrom;
6use std::net::SocketAddr;
7use std::path::{Component, Path, PathBuf};
8use std::sync::atomic::{AtomicBool, Ordering};
9use std::sync::{Arc, Mutex};
10use std::time::{Duration, SystemTime, UNIX_EPOCH};
11
12use directories::BaseDirs;
13use mdns_sd::{ResolvedService, ServiceDaemon, ServiceEvent, ServiceInfo};
14use quinn::crypto::rustls::QuicClientConfig;
15use rcgen::{CertifiedKey, generate_simple_self_signed};
16use rustls::client::danger::{HandshakeSignatureValid, ServerCertVerified, ServerCertVerifier};
17use rustls::crypto::{CryptoProvider, verify_tls12_signature, verify_tls13_signature};
18use rustls::pki_types::{CertificateDer, PrivateKeyDer, PrivatePkcs8KeyDer, ServerName, UnixTime};
19use rustls::{DigitallySignedStruct, SignatureScheme};
20use serde::{Deserialize, Serialize};
21use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt};
22use uuid::Uuid;
23
24pub use error::{Error, Result};
25
26pub const NATIVE_PROTOCOL_VERSION: u16 = 1;
27pub const MIN_NATIVE_PROTOCOL_VERSION: u16 = 1;
28pub const NATIVE_SERVICE_TYPE: &str = "_ferry._udp.local.";
29
30const DIRECT_MAGIC: &[u8; 8] = b"FERRY01\n";
31const MAX_FRAME_LEN: usize = 16 * 1024 * 1024;
32const IO_BUFFER_SIZE: usize = 64 * 1024;
33const MANIFEST_VERSION: u16 = 1;
34const DEFAULT_CHUNK_SIZE: u64 = 1024 * 1024;
35const CONFIG_DIR_NAME: &str = "ferry";
36
37#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
38pub struct DirectPeer {
39 address: SocketAddr,
40 expected_fingerprint: Option<String>,
41}
42
43impl DirectPeer {
44 pub fn parse(value: &str) -> Result<Self> {
45 Ok(Self {
46 address: value.parse()?,
47 expected_fingerprint: None,
48 })
49 }
50
51 pub const fn address(&self) -> SocketAddr {
52 self.address
53 }
54
55 pub const fn from_address(address: SocketAddr) -> Self {
56 Self {
57 address,
58 expected_fingerprint: None,
59 }
60 }
61
62 pub fn with_expected_fingerprint(mut self, fingerprint: impl Into<String>) -> Result<Self> {
63 self.expected_fingerprint = Some(normalized_fingerprint(fingerprint.into())?);
64 Ok(self)
65 }
66
67 pub fn expected_fingerprint(&self) -> Option<&str> {
68 self.expected_fingerprint.as_deref()
69 }
70}
71
72#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
73pub struct SendRequest {
74 peer: DirectPeer,
75 paths: Vec<PathBuf>,
76}
77
78impl SendRequest {
79 pub fn new(peer: DirectPeer, paths: Vec<PathBuf>) -> Result<Self> {
80 if paths.is_empty() {
81 return Err(Error::InvalidInput(
82 "at least one file path is required".to_string(),
83 ));
84 }
85
86 Ok(Self { peer, paths })
87 }
88
89 pub const fn peer(&self) -> &DirectPeer {
90 &self.peer
91 }
92
93 pub fn paths(&self) -> &[PathBuf] {
94 &self.paths
95 }
96}
97
98#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
99pub struct ReceiveRequest {
100 listen: SocketAddr,
101}
102
103impl ReceiveRequest {
104 pub fn new(listen: SocketAddr) -> Self {
105 Self { listen }
106 }
107
108 pub const fn listen(&self) -> SocketAddr {
109 self.listen
110 }
111}
112
113#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
114pub struct AppConfig {
115 pub alias: String,
116 pub listen_port: u16,
117 pub quic_port: u16,
118 pub download_dir: String,
119 pub auto_accept_known: bool,
120 pub auto_accept_unknown: bool,
121 pub discovery: Vec<String>,
122 pub max_concurrent_files: usize,
123 pub trust: TrustConfig,
124}
125
126impl Default for AppConfig {
127 fn default() -> Self {
128 Self {
129 alias: default_alias(),
130 listen_port: 53317,
131 quic_port: 53318,
132 download_dir: "~/Downloads/ferry".to_string(),
133 auto_accept_known: true,
134 auto_accept_unknown: false,
135 discovery: vec!["native".to_string()],
136 max_concurrent_files: 8,
137 trust: TrustConfig::default(),
138 }
139 }
140}
141
142impl AppConfig {
143 pub fn load_or_default() -> Result<Self> {
144 Self::load_or_default_from(config_dir()?.join("config.toml"))
145 }
146
147 pub fn load_or_default_from(path: impl AsRef<Path>) -> Result<Self> {
148 let path = path.as_ref();
149 match std::fs::read_to_string(path) {
150 Ok(contents) => {
151 toml::from_str(&contents).map_err(|error| Error::ConfigParse(error.to_string()))
152 }
153 Err(error) if error.kind() == std::io::ErrorKind::NotFound => Ok(Self::default()),
154 Err(source) => Err(Error::IoPath {
155 path: path.to_path_buf(),
156 source,
157 }),
158 }
159 }
160
161 pub fn save_default_path(&self) -> Result<()> {
162 self.save_to_path(config_dir()?.join("config.toml"))
163 }
164
165 pub fn save_to_path(&self, path: impl AsRef<Path>) -> Result<()> {
166 write_toml_file(path.as_ref(), self)
167 }
168
169 pub fn to_toml_string(&self) -> Result<String> {
170 toml::to_string_pretty(self).map_err(|error| Error::ConfigSerialize(error.to_string()))
171 }
172
173 pub fn redacted(&self) -> Self {
174 let mut config = self.clone();
175 config.trust = config.trust.redacted();
176 config
177 }
178
179 pub fn to_redacted_toml_string(&self) -> Result<String> {
180 self.redacted().to_toml_string()
181 }
182}
183
184#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
185pub struct DaemonConfig {
186 pub listen: SocketAddr,
187 pub destination: PathBuf,
188}
189
190impl DaemonConfig {
191 pub fn from_app_config(config: &AppConfig) -> Result<Self> {
192 Ok(Self {
193 listen: SocketAddr::from(([0, 0, 0, 0], config.quic_port)),
194 destination: expand_home_path(&config.download_dir)?,
195 })
196 }
197
198 pub fn load_or_default() -> Result<Self> {
199 let app_config = AppConfig::load_or_default()?;
200 Self::load_or_default_from(config_dir()?.join("daemon.toml"), &app_config)
201 }
202
203 pub fn load_or_default_from(path: impl AsRef<Path>, app_config: &AppConfig) -> Result<Self> {
204 let path = path.as_ref();
205 match std::fs::read_to_string(path) {
206 Ok(contents) => {
207 toml::from_str(&contents).map_err(|error| Error::ConfigParse(error.to_string()))
208 }
209 Err(error) if error.kind() == std::io::ErrorKind::NotFound => {
210 Self::from_app_config(app_config)
211 }
212 Err(source) => Err(Error::IoPath {
213 path: path.to_path_buf(),
214 source,
215 }),
216 }
217 }
218
219 pub fn save_default_path(&self) -> Result<()> {
220 self.save_to_path(config_dir()?.join("daemon.toml"))
221 }
222
223 pub fn save_to_path(&self, path: impl AsRef<Path>) -> Result<()> {
224 write_toml_file(path.as_ref(), self)
225 }
226
227 pub fn to_toml_string(&self) -> Result<String> {
228 toml::to_string_pretty(self).map_err(|error| Error::ConfigSerialize(error.to_string()))
229 }
230}
231
232#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
233pub struct TrustConfig {
234 pub require_fingerprint: bool,
235 pub psk: String,
236}
237
238impl Default for TrustConfig {
239 fn default() -> Self {
240 Self {
241 require_fingerprint: true,
242 psk: String::new(),
243 }
244 }
245}
246
247impl TrustConfig {
248 pub fn redacted(&self) -> Self {
249 let psk = if self.psk.is_empty() {
250 String::new()
251 } else {
252 "<redacted>".to_string()
253 };
254 Self {
255 require_fingerprint: self.require_fingerprint,
256 psk,
257 }
258 }
259}
260
261#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
262pub struct NativeIdentity {
263 cert_der: Vec<u8>,
264 key_der: Vec<u8>,
265 fingerprint: String,
266}
267
268impl NativeIdentity {
269 pub fn load_or_generate() -> Result<Self> {
270 Self::load_or_generate_in(config_dir()?)
271 }
272
273 pub fn load_or_generate_in(config_dir: impl AsRef<Path>) -> Result<Self> {
274 let config_dir = config_dir.as_ref();
275 let cert_path = config_dir.join("identity.cert.der");
276 let key_path = config_dir.join("identity.key.der");
277
278 if cert_path.exists() && key_path.exists() {
279 let cert_der = std::fs::read(&cert_path).map_err(|source| Error::IoPath {
280 path: cert_path,
281 source,
282 })?;
283 let key_der = std::fs::read(&key_path).map_err(|source| Error::IoPath {
284 path: key_path,
285 source,
286 })?;
287
288 return Ok(Self::from_parts(cert_der, key_der));
289 }
290
291 std::fs::create_dir_all(config_dir).map_err(|source| Error::IoPath {
292 path: config_dir.to_path_buf(),
293 source,
294 })?;
295
296 let identity = Self::generate()?;
297 write_private_file(&cert_path, &identity.cert_der)?;
298 write_private_file(&key_path, &identity.key_der)?;
299 Ok(identity)
300 }
301
302 pub fn generate() -> Result<Self> {
303 let CertifiedKey { cert, signing_key } =
304 generate_simple_self_signed(vec!["localhost".to_string()])?;
305 Ok(Self::from_parts(
306 cert.der().to_vec(),
307 signing_key.serialize_der(),
308 ))
309 }
310
311 pub fn fingerprint(&self) -> &str {
312 &self.fingerprint
313 }
314
315 fn cert_chain(&self) -> Vec<CertificateDer<'static>> {
316 vec![CertificateDer::from(self.cert_der.clone())]
317 }
318
319 fn private_key(&self) -> PrivateKeyDer<'static> {
320 PrivateKeyDer::from(PrivatePkcs8KeyDer::from(self.key_der.clone()))
321 }
322
323 fn from_parts(cert_der: Vec<u8>, key_der: Vec<u8>) -> Self {
324 let fingerprint = blake3::hash(&cert_der).to_hex().to_string();
325 Self {
326 cert_der,
327 key_der,
328 fingerprint,
329 }
330 }
331}
332
333#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
334#[serde(rename_all = "snake_case")]
335pub enum TransferDirection {
336 Send,
337 Receive,
338}
339
340#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
341pub struct TransferProgress {
342 pub direction: TransferDirection,
343 pub file_name: String,
344 pub bytes_done: u64,
345 pub bytes_total: u64,
346}
347
348#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
349#[serde(tag = "event", rename_all = "snake_case")]
350pub enum TransferEvent {
351 SessionStarted {
352 direction: TransferDirection,
353 session_id: Uuid,
354 files_total: usize,
355 bytes_total: u64,
356 },
357 FileStarted {
358 direction: TransferDirection,
359 file_name: String,
360 bytes_total: u64,
361 resume_offset: u64,
362 },
363 Progress {
364 direction: TransferDirection,
365 file_name: String,
366 bytes_done: u64,
367 bytes_total: u64,
368 },
369 FileFinished {
370 direction: TransferDirection,
371 file_name: String,
372 bytes: u64,
373 blake3: String,
374 status: TransferFileStatus,
375 },
376 SessionFinished {
377 direction: TransferDirection,
378 files_total: usize,
379 bytes_total: u64,
380 blake3: String,
381 },
382 SessionCancelled {
383 direction: TransferDirection,
384 session_id: Uuid,
385 },
386}
387
388impl TransferEvent {
389 pub fn progress(&self) -> Option<TransferProgress> {
390 match self {
391 Self::Progress {
392 direction,
393 file_name,
394 bytes_done,
395 bytes_total,
396 } => Some(TransferProgress {
397 direction: *direction,
398 file_name: file_name.clone(),
399 bytes_done: *bytes_done,
400 bytes_total: *bytes_total,
401 }),
402 _ => None,
403 }
404 }
405}
406
407#[derive(Debug, Clone, Default)]
408pub struct TransferControl {
409 cancelled: Arc<AtomicBool>,
410}
411
412impl TransferControl {
413 pub fn new() -> Self {
414 Self::default()
415 }
416
417 pub fn cancel(&self) {
418 self.cancelled.store(true, Ordering::SeqCst);
419 }
420
421 pub fn is_cancelled(&self) -> bool {
422 self.cancelled.load(Ordering::SeqCst)
423 }
424
425 fn check_cancelled(&self) -> Result<()> {
426 if self.is_cancelled() {
427 Err(Error::TransferCancelled)
428 } else {
429 Ok(())
430 }
431 }
432}
433
434#[derive(Debug, Clone, PartialEq, Eq)]
435pub struct TransferSummary {
436 pub path: PathBuf,
437 pub file_name: String,
438 pub bytes: u64,
439 pub blake3: String,
440 pub files: Vec<TransferFileSummary>,
441}
442
443#[derive(Debug, Clone, PartialEq, Eq)]
444pub struct TransferFileSummary {
445 pub path: PathBuf,
446 pub relative_path: PathBuf,
447 pub bytes: u64,
448 pub blake3: String,
449 pub status: TransferFileStatus,
450}
451
452#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
453#[serde(rename_all = "snake_case")]
454pub enum TransferFileStatus {
455 Sent,
456 Received,
457 Skipped,
458 Resumed,
459}
460
461#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
462pub struct TransferManifest {
463 pub version: u16,
464 pub session_id: Uuid,
465 pub chunk_size: u64,
466 pub entries: Vec<ManifestEntry>,
467}
468
469impl TransferManifest {
470 pub fn file_entries(&self) -> impl Iterator<Item = &ManifestEntry> {
471 self.entries
472 .iter()
473 .filter(|entry| entry.kind == ManifestEntryKind::File)
474 }
475
476 pub fn total_file_bytes(&self) -> u64 {
477 self.file_entries().map(|entry| entry.size).sum()
478 }
479
480 pub fn digest(&self) -> Result<String> {
481 let bytes = serde_json::to_vec(self).map_err(|error| Error::Protocol(error.to_string()))?;
482 Ok(blake3::hash(&bytes).to_hex().to_string())
483 }
484}
485
486#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
487pub struct ManifestEntry {
488 pub relative_path: PathBuf,
489 pub kind: ManifestEntryKind,
490 pub size: u64,
491 pub blake3: Option<String>,
492 pub chunks: Vec<String>,
493 pub unix_mode: Option<u32>,
494 pub modified_unix_ms: Option<i128>,
495}
496
497#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
498#[serde(rename_all = "snake_case")]
499pub enum ManifestEntryKind {
500 File,
501 Directory,
502}
503
504#[derive(Debug, Clone, PartialEq, Eq)]
505pub enum ResumeDecision {
506 Create,
507 Skip,
508 ResumeFrom(u64),
509 Conflict(String),
510}
511
512#[derive(Debug, Clone, PartialEq, Eq)]
513pub struct PeerObservation {
514 pub fingerprint: String,
515 pub alias: Option<String>,
516 pub hostname: Option<String>,
517 pub address: SocketAddr,
518 pub transports: BTreeSet<String>,
519 pub seen_unix_ms: i128,
520}
521
522impl PeerObservation {
523 pub fn new(fingerprint: impl Into<String>, address: SocketAddr, seen_unix_ms: i128) -> Self {
524 Self {
525 fingerprint: fingerprint.into(),
526 alias: None,
527 hostname: None,
528 address,
529 transports: BTreeSet::new(),
530 seen_unix_ms,
531 }
532 }
533
534 pub fn with_alias(mut self, alias: impl Into<String>) -> Self {
535 self.alias = Some(alias.into());
536 self
537 }
538
539 pub fn with_hostname(mut self, hostname: impl Into<String>) -> Self {
540 self.hostname = Some(hostname.into());
541 self
542 }
543
544 pub fn with_transport(mut self, transport: impl Into<String>) -> Self {
545 self.transports.insert(transport.into());
546 self
547 }
548}
549
550#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
551pub struct PeerRecord {
552 pub fingerprint: String,
553 pub aliases: BTreeSet<String>,
554 pub hostnames: BTreeSet<String>,
555 pub addresses: BTreeSet<SocketAddr>,
556 pub transports: BTreeSet<String>,
557 pub first_seen_unix_ms: i128,
558 pub last_seen_unix_ms: i128,
559}
560
561impl PeerRecord {
562 fn from_observation(observation: PeerObservation) -> Self {
563 let mut aliases = BTreeSet::new();
564 if let Some(alias) = observation.alias {
565 aliases.insert(alias);
566 }
567
568 let mut hostnames = BTreeSet::new();
569 if let Some(hostname) = observation.hostname {
570 hostnames.insert(hostname);
571 }
572
573 let mut addresses = BTreeSet::new();
574 addresses.insert(observation.address);
575
576 Self {
577 fingerprint: observation.fingerprint,
578 aliases,
579 hostnames,
580 addresses,
581 transports: observation.transports,
582 first_seen_unix_ms: observation.seen_unix_ms,
583 last_seen_unix_ms: observation.seen_unix_ms,
584 }
585 }
586
587 fn merge(&mut self, observation: PeerObservation) {
588 if let Some(alias) = observation.alias {
589 self.aliases.insert(alias);
590 }
591 if let Some(hostname) = observation.hostname {
592 self.hostnames.insert(hostname);
593 }
594 self.addresses.insert(observation.address);
595 self.transports.extend(observation.transports);
596 self.first_seen_unix_ms = self.first_seen_unix_ms.min(observation.seen_unix_ms);
597 self.last_seen_unix_ms = self.last_seen_unix_ms.max(observation.seen_unix_ms);
598 }
599
600 pub fn preferred_quic_address(&self) -> Option<SocketAddr> {
601 if !self.transports.contains("quic") {
602 return None;
603 }
604
605 self.addresses.iter().copied().next()
606 }
607}
608
609#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
610#[serde(rename_all = "snake_case")]
611pub enum TrustState {
612 Trusted,
613 Blocked,
614}
615
616#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
617pub struct KnownPeerEntry {
618 pub fingerprint: String,
619 #[serde(default)]
620 pub aliases: BTreeSet<String>,
621 #[serde(default)]
622 pub hostnames: BTreeSet<String>,
623 #[serde(default)]
624 pub addresses: BTreeSet<SocketAddr>,
625 #[serde(default)]
626 pub transports: BTreeSet<String>,
627 pub trust_state: TrustState,
628 pub first_seen_unix_ms: Option<i128>,
629 pub last_seen_unix_ms: Option<i128>,
630}
631
632impl KnownPeerEntry {
633 pub fn trusted(fingerprint: impl Into<String>) -> Result<Self> {
634 let fingerprint = normalized_fingerprint(fingerprint.into())?;
635 Ok(Self {
636 fingerprint,
637 aliases: BTreeSet::new(),
638 hostnames: BTreeSet::new(),
639 addresses: BTreeSet::new(),
640 transports: BTreeSet::new(),
641 trust_state: TrustState::Trusted,
642 first_seen_unix_ms: None,
643 last_seen_unix_ms: None,
644 })
645 }
646
647 pub fn from_record(record: &PeerRecord, trust_state: TrustState) -> Result<Self> {
648 let fingerprint = normalized_fingerprint(record.fingerprint.clone())?;
649 Ok(Self {
650 fingerprint,
651 aliases: record.aliases.clone(),
652 hostnames: record.hostnames.clone(),
653 addresses: record.addresses.clone(),
654 transports: record.transports.clone(),
655 trust_state,
656 first_seen_unix_ms: Some(record.first_seen_unix_ms),
657 last_seen_unix_ms: Some(record.last_seen_unix_ms),
658 })
659 }
660}
661
662#[derive(Debug, Default, Clone, PartialEq, Eq, Serialize, Deserialize)]
663pub struct TrustStore {
664 #[serde(default)]
665 pub peers: BTreeMap<String, KnownPeerEntry>,
666}
667
668impl TrustStore {
669 pub fn load_or_default() -> Result<Self> {
670 Self::load_or_default_from(config_dir()?.join("known_peers.toml"))
671 }
672
673 pub fn load_or_default_from(path: impl AsRef<Path>) -> Result<Self> {
674 let path = path.as_ref();
675 match std::fs::read_to_string(path) {
676 Ok(contents) => {
677 toml::from_str(&contents).map_err(|error| Error::ConfigParse(error.to_string()))
678 }
679 Err(error) if error.kind() == std::io::ErrorKind::NotFound => Ok(Self::default()),
680 Err(source) => Err(Error::IoPath {
681 path: path.to_path_buf(),
682 source,
683 }),
684 }
685 }
686
687 pub fn save_default_path(&self) -> Result<()> {
688 self.save_to_path(config_dir()?.join("known_peers.toml"))
689 }
690
691 pub fn save_to_path(&self, path: impl AsRef<Path>) -> Result<()> {
692 write_toml_file(path.as_ref(), self)
693 }
694
695 pub fn trust_fingerprint(&mut self, fingerprint: impl Into<String>) -> Result<&KnownPeerEntry> {
696 let entry = KnownPeerEntry::trusted(fingerprint)?;
697 let fingerprint = entry.fingerprint.clone();
698 self.peers
699 .entry(fingerprint.clone())
700 .and_modify(|existing| existing.trust_state = TrustState::Trusted)
701 .or_insert(entry);
702 Ok(self
703 .peers
704 .get(&fingerprint)
705 .expect("trusted peer entry should exist"))
706 }
707
708 pub fn trust_record(&mut self, record: &PeerRecord) -> Result<&KnownPeerEntry> {
709 let entry = KnownPeerEntry::from_record(record, TrustState::Trusted)?;
710 let fingerprint = entry.fingerprint.clone();
711 self.peers
712 .entry(fingerprint.clone())
713 .and_modify(|existing| {
714 existing.aliases.extend(record.aliases.clone());
715 existing.hostnames.extend(record.hostnames.clone());
716 existing.addresses.extend(record.addresses.clone());
717 existing.transports.extend(record.transports.clone());
718 existing.first_seen_unix_ms = Some(
719 existing
720 .first_seen_unix_ms
721 .unwrap_or(record.first_seen_unix_ms)
722 .min(record.first_seen_unix_ms),
723 );
724 existing.last_seen_unix_ms = Some(
725 existing
726 .last_seen_unix_ms
727 .unwrap_or(record.last_seen_unix_ms)
728 .max(record.last_seen_unix_ms),
729 );
730 existing.trust_state = TrustState::Trusted;
731 })
732 .or_insert(entry);
733 Ok(self
734 .peers
735 .get(&fingerprint)
736 .expect("trusted peer entry should exist"))
737 }
738
739 pub fn forget(&mut self, fingerprint: &str) -> bool {
740 self.peers.remove(fingerprint).is_some()
741 }
742
743 pub fn records(&self) -> impl Iterator<Item = &KnownPeerEntry> {
744 self.peers.values()
745 }
746
747 pub fn to_toml_string(&self) -> Result<String> {
748 toml::to_string_pretty(self).map_err(|error| Error::ConfigSerialize(error.to_string()))
749 }
750}
751
752#[derive(Debug, Default, Clone, PartialEq, Eq)]
753pub struct PeerRegistry {
754 peers: BTreeMap<String, PeerRecord>,
755}
756
757#[derive(Debug, Clone, PartialEq, Eq)]
758pub enum PeerLookup<'a> {
759 Found(&'a PeerRecord),
760 Ambiguous(Vec<&'a PeerRecord>),
761 Missing,
762}
763
764impl PeerRegistry {
765 pub fn new() -> Self {
766 Self::default()
767 }
768
769 pub fn observe(&mut self, observation: PeerObservation) -> Result<&PeerRecord> {
770 if observation.fingerprint.trim().is_empty() {
771 return Err(Error::InvalidInput(
772 "peer fingerprint must not be empty".to_string(),
773 ));
774 }
775
776 let fingerprint = observation.fingerprint.clone();
777 if let Some(record) = self.peers.get_mut(&fingerprint) {
778 record.merge(observation);
779 } else {
780 self.peers.insert(
781 fingerprint.clone(),
782 PeerRecord::from_observation(observation),
783 );
784 }
785
786 Ok(self
787 .peers
788 .get(&fingerprint)
789 .expect("observed peer record should exist"))
790 }
791
792 pub fn get_by_fingerprint(&self, fingerprint: &str) -> Option<&PeerRecord> {
793 self.peers.get(fingerprint)
794 }
795
796 pub fn lookup(&self, query: &str) -> Option<&PeerRecord> {
797 match self.lookup_detail(query) {
798 PeerLookup::Found(record) => Some(record),
799 PeerLookup::Ambiguous(_) | PeerLookup::Missing => None,
800 }
801 }
802
803 pub fn lookup_detail(&self, query: &str) -> PeerLookup<'_> {
804 if let Some(record) = self.peers.get(query) {
805 return PeerLookup::Found(record);
806 }
807
808 let matches = self
809 .peers
810 .values()
811 .filter(|record| {
812 record.fingerprint.starts_with(query)
813 || record.aliases.iter().any(|alias| alias == query)
814 || record.hostnames.iter().any(|hostname| hostname == query)
815 })
816 .collect::<Vec<_>>();
817
818 match matches.as_slice() {
819 [] => PeerLookup::Missing,
820 [record] => PeerLookup::Found(record),
821 _ => PeerLookup::Ambiguous(matches),
822 }
823 }
824
825 pub fn records(&self) -> impl Iterator<Item = &PeerRecord> {
826 self.peers.values()
827 }
828}
829
830#[derive(Debug, Clone, PartialEq, Eq)]
831pub struct NativeAnnouncement {
832 pub alias: String,
833 pub fingerprint: String,
834 pub quic_port: u16,
835 pub listen_port: Option<u16>,
836}
837
838impl NativeAnnouncement {
839 pub fn from_config(config: &AppConfig, identity: &NativeIdentity) -> Self {
840 Self {
841 alias: config.alias.clone(),
842 fingerprint: identity.fingerprint().to_string(),
843 quic_port: config.quic_port,
844 listen_port: Some(config.listen_port),
845 }
846 }
847
848 fn service_info(&self) -> Result<ServiceInfo> {
849 let alias = sanitize_dns_label(&self.alias);
850 let fingerprint_prefix = self
851 .fingerprint
852 .get(..12)
853 .unwrap_or(self.fingerprint.as_str());
854 let instance = format!("{alias}-{fingerprint_prefix}");
855 let hostname = format!("{alias}.local.");
856 let properties = [
857 ("pv", NATIVE_PROTOCOL_VERSION.to_string()),
858 ("minpv", MIN_NATIVE_PROTOCOL_VERSION.to_string()),
859 ("fp", self.fingerprint.clone()),
860 ("alias", self.alias.clone()),
861 ("transports", "quic".to_string()),
862 ("quic_port", self.quic_port.to_string()),
863 (
864 "listen_port",
865 self.listen_port
866 .map(|port| port.to_string())
867 .unwrap_or_default(),
868 ),
869 ];
870
871 ServiceInfo::new(
872 NATIVE_SERVICE_TYPE,
873 &instance,
874 &hostname,
875 "",
876 self.quic_port,
877 &properties[..],
878 )
879 .map(ServiceInfo::enable_addr_auto)
880 .map_err(|error| Error::Discovery(error.to_string()))
881 }
882}
883
884pub struct NativeAnnouncer {
885 daemon: ServiceDaemon,
886 fullname: String,
887}
888
889impl NativeAnnouncer {
890 pub fn start(announcement: &NativeAnnouncement) -> Result<Self> {
891 let daemon = ServiceDaemon::new().map_err(|error| Error::Discovery(error.to_string()))?;
892 let service = announcement.service_info()?;
893 let fullname = service.get_fullname().to_string();
894 daemon
895 .register(service)
896 .map_err(|error| Error::Discovery(error.to_string()))?;
897 Ok(Self { daemon, fullname })
898 }
899}
900
901impl Drop for NativeAnnouncer {
902 fn drop(&mut self) {
903 let _ = self.daemon.unregister(&self.fullname);
904 let _ = self.daemon.shutdown();
905 }
906}
907
908pub async fn discover_native_peers(timeout: Duration) -> Result<PeerRegistry> {
909 let daemon = ServiceDaemon::new().map_err(|error| Error::Discovery(error.to_string()))?;
910 let receiver = daemon
911 .browse(NATIVE_SERVICE_TYPE)
912 .map_err(|error| Error::Discovery(error.to_string()))?;
913 let deadline = tokio::time::Instant::now() + timeout;
914 let mut registry = PeerRegistry::new();
915
916 loop {
917 let now = tokio::time::Instant::now();
918 if now >= deadline {
919 break;
920 }
921
922 let wait = deadline - now;
923 match tokio::time::timeout(wait, receiver.recv_async()).await {
924 Ok(Ok(ServiceEvent::ServiceResolved(service))) => {
925 observe_resolved_service(&mut registry, &service)?;
926 }
927 Ok(Ok(_)) => {}
928 Ok(Err(error)) => {
929 return Err(Error::Discovery(error.to_string()));
930 }
931 Err(_) => break,
932 }
933 }
934
935 daemon
936 .stop_browse(NATIVE_SERVICE_TYPE)
937 .map_err(|error| Error::Discovery(error.to_string()))?;
938 let _ = daemon.shutdown();
939 Ok(registry)
940}
941
942fn observe_resolved_service(registry: &mut PeerRegistry, service: &ResolvedService) -> Result<()> {
943 let Some(fingerprint) = service.get_property_val_str("fp") else {
944 return Ok(());
945 };
946 let fingerprint = match normalized_fingerprint(fingerprint.to_string()) {
947 Ok(fingerprint) => fingerprint,
948 Err(_) => return Ok(()),
949 };
950 let Some(highest_version) = parse_txt_u16(service, "pv") else {
951 return Ok(());
952 };
953 let Some(minimum_version) = parse_txt_u16(service, "minpv") else {
954 return Ok(());
955 };
956 if minimum_version > highest_version
957 || minimum_version > NATIVE_PROTOCOL_VERSION
958 || highest_version < MIN_NATIVE_PROTOCOL_VERSION
959 {
960 return Ok(());
961 }
962
963 let Some(quic_port) = parse_txt_u16(service, "quic_port") else {
964 return Ok(());
965 };
966 let transports = service
967 .get_property_val_str("transports")
968 .unwrap_or_default()
969 .split(',')
970 .map(str::trim)
971 .filter(|value| !value.is_empty())
972 .map(ToOwned::to_owned)
973 .collect::<Vec<_>>();
974 if !transports.iter().any(|transport| transport == "quic") {
975 return Ok(());
976 }
977
978 let alias = service.get_property_val_str("alias").map(ToOwned::to_owned);
979 let hostname = Some(service.get_hostname().trim_end_matches('.').to_string());
980 let seen_unix_ms = system_time_unix_ms(SystemTime::now()).unwrap_or_default();
981
982 for address in service.get_addresses_v4() {
983 let mut observation = PeerObservation::new(
984 fingerprint.clone(),
985 SocketAddr::from((address, quic_port)),
986 seen_unix_ms,
987 );
988 if let Some(alias) = &alias {
989 observation = observation.with_alias(alias.clone());
990 }
991 if let Some(hostname) = &hostname {
992 observation = observation.with_hostname(hostname.clone());
993 }
994 for transport in &transports {
995 observation = observation.with_transport(transport.clone());
996 }
997 registry.observe(observation)?;
998 }
999
1000 Ok(())
1001}
1002
1003fn parse_txt_u16(service: &ResolvedService, key: &str) -> Option<u16> {
1004 service.get_property_val_str(key)?.parse().ok()
1005}
1006
1007fn sanitize_dns_label(value: &str) -> String {
1008 let mut label = value
1009 .chars()
1010 .map(|ch| {
1011 if ch.is_ascii_alphanumeric() || ch == '-' {
1012 ch.to_ascii_lowercase()
1013 } else {
1014 '-'
1015 }
1016 })
1017 .collect::<String>();
1018 while label.contains("--") {
1019 label = label.replace("--", "-");
1020 }
1021 let label = label.trim_matches('-').to_string();
1022 if label.is_empty() {
1023 "fileferry-device".to_string()
1024 } else {
1025 label
1026 }
1027}
1028
1029#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
1030struct DirectReceivePlan {
1031 files: Vec<DirectFilePlan>,
1032}
1033
1034#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
1035struct DirectProtocolHello {
1036 protocol_version: u16,
1037 min_protocol_version: u16,
1038 client_fingerprint: String,
1039}
1040
1041impl DirectProtocolHello {
1042 fn new(identity: &NativeIdentity) -> Self {
1043 Self {
1044 protocol_version: NATIVE_PROTOCOL_VERSION,
1045 min_protocol_version: MIN_NATIVE_PROTOCOL_VERSION,
1046 client_fingerprint: identity.fingerprint().to_string(),
1047 }
1048 }
1049}
1050
1051#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
1052struct DirectProtocolResponse {
1053 accepted: bool,
1054 protocol_version: Option<u16>,
1055 error: Option<String>,
1056}
1057
1058impl DirectProtocolResponse {
1059 fn accept(protocol_version: u16) -> Self {
1060 Self {
1061 accepted: true,
1062 protocol_version: Some(protocol_version),
1063 error: None,
1064 }
1065 }
1066
1067 fn reject(error: impl Into<String>) -> Self {
1068 Self {
1069 accepted: false,
1070 protocol_version: None,
1071 error: Some(error.into()),
1072 }
1073 }
1074}
1075
1076#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
1077struct DirectFilePlan {
1078 relative_path: PathBuf,
1079 action: DirectFileAction,
1080 offset: u64,
1081}
1082
1083#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
1084#[serde(rename_all = "snake_case")]
1085enum DirectFileAction {
1086 Receive,
1087 Skip,
1088}
1089
1090#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
1091struct DirectPayloadHeader {
1092 relative_path: PathBuf,
1093 offset: u64,
1094 bytes: u64,
1095}
1096
1097#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
1098struct DirectAck {
1099 ok: bool,
1100 error: Option<String>,
1101}
1102
1103pub fn validate_send_paths(paths: &[PathBuf]) -> Result<()> {
1104 if paths.is_empty() {
1105 return Err(Error::InvalidInput(
1106 "at least one file path is required".to_string(),
1107 ));
1108 }
1109
1110 for path in paths {
1111 if path.as_os_str().is_empty() {
1112 return Err(Error::InvalidInput("empty file path".to_string()));
1113 }
1114 }
1115
1116 Ok(())
1117}
1118
1119pub fn display_path(path: &Path) -> String {
1120 path.to_string_lossy().into_owned()
1121}
1122
1123pub async fn build_manifest(paths: &[PathBuf]) -> Result<TransferManifest> {
1124 validate_send_paths(paths)?;
1125
1126 let sources = collect_manifest_sources(paths)?;
1127 let mut entries = Vec::with_capacity(sources.len());
1128 let mut seen = BTreeSet::new();
1129
1130 for source in sources {
1131 if !seen.insert(source.relative_path.clone()) {
1132 return Err(Error::InvalidInput(format!(
1133 "duplicate transfer path: {}",
1134 display_path(&source.relative_path)
1135 )));
1136 }
1137
1138 entries.push(build_manifest_entry(source).await?);
1139 }
1140
1141 Ok(TransferManifest {
1142 version: MANIFEST_VERSION,
1143 session_id: Uuid::new_v4(),
1144 chunk_size: DEFAULT_CHUNK_SIZE,
1145 entries,
1146 })
1147}
1148
1149pub fn safe_destination_path(destination: &Path, relative_path: &Path) -> Result<PathBuf> {
1150 validate_relative_transfer_path(relative_path)?;
1151 Ok(destination.join(relative_path))
1152}
1153
1154pub async fn decide_resume(
1155 destination: &Path,
1156 entry: &ManifestEntry,
1157 chunk_size: u64,
1158) -> Result<ResumeDecision> {
1159 if entry.kind != ManifestEntryKind::File {
1160 return Ok(ResumeDecision::Create);
1161 }
1162
1163 let path = safe_destination_path(destination, &entry.relative_path)?;
1164 let metadata = match tokio::fs::metadata(&path).await {
1165 Ok(metadata) => metadata,
1166 Err(error) if error.kind() == std::io::ErrorKind::NotFound => {
1167 return Ok(ResumeDecision::Create);
1168 }
1169 Err(source) => {
1170 return Err(Error::IoPath { path, source });
1171 }
1172 };
1173
1174 if !metadata.is_file() {
1175 return Ok(ResumeDecision::Conflict(format!(
1176 "{} already exists and is not a regular file",
1177 display_path(&path)
1178 )));
1179 }
1180
1181 if metadata.len() == entry.size {
1182 let expected = entry
1183 .blake3
1184 .as_deref()
1185 .ok_or_else(|| Error::Protocol("file manifest entry is missing BLAKE3".to_string()))?;
1186 let actual = hash_file(&path).await?;
1187 return if actual == expected {
1188 Ok(ResumeDecision::Skip)
1189 } else {
1190 Ok(ResumeDecision::Conflict(format!(
1191 "{} already exists with different contents",
1192 display_path(&path)
1193 )))
1194 };
1195 }
1196
1197 if metadata.len() > entry.size {
1198 return Ok(ResumeDecision::Conflict(format!(
1199 "{} is larger than incoming file",
1200 display_path(&path)
1201 )));
1202 }
1203
1204 let verified_offset = verified_prefix_offset(&path, entry, metadata.len(), chunk_size).await?;
1205 if verified_offset > 0 {
1206 Ok(ResumeDecision::ResumeFrom(verified_offset))
1207 } else {
1208 Ok(ResumeDecision::Conflict(format!(
1209 "{} already exists and does not match a verified prefix",
1210 display_path(&path)
1211 )))
1212 }
1213}
1214
1215pub async fn send_direct_file(
1216 request: &SendRequest,
1217 identity: &NativeIdentity,
1218 mut events: impl FnMut(TransferEvent),
1219) -> Result<TransferSummary> {
1220 send_direct_file_with_control(request, identity, TransferControl::new(), &mut events).await
1221}
1222
1223pub async fn send_direct_file_with_control(
1224 request: &SendRequest,
1225 identity: &NativeIdentity,
1226 control: TransferControl,
1227 mut events: impl FnMut(TransferEvent),
1228) -> Result<TransferSummary> {
1229 ensure_rustls_provider();
1230
1231 control.check_cancelled()?;
1232 let manifest = build_manifest(request.paths()).await?;
1233 let sources = manifest_source_map(request.paths())?;
1234 events(TransferEvent::SessionStarted {
1235 direction: TransferDirection::Send,
1236 session_id: manifest.session_id,
1237 files_total: manifest.file_entries().count(),
1238 bytes_total: manifest.total_file_bytes(),
1239 });
1240 control.check_cancelled()?;
1241
1242 let (client_config, server_fingerprint) = client_config_capturing_server_fingerprint()?;
1243 let mut endpoint = quinn::Endpoint::client(SocketAddr::from(([0, 0, 0, 0], 0)))?;
1244 endpoint.set_default_client_config(client_config);
1245 let connection = endpoint
1246 .connect(request.peer().address(), "localhost")?
1247 .await?;
1248 if let Some(expected) = request.peer().expected_fingerprint() {
1249 let actual = captured_server_fingerprint(&server_fingerprint)?;
1250 if actual != expected {
1251 connection.close(1u32.into(), b"fingerprint mismatch");
1252 return Err(Error::PeerFingerprintMismatch {
1253 expected: expected.to_string(),
1254 actual,
1255 });
1256 }
1257 }
1258 let (mut send, mut recv) = connection.open_bi().await?;
1259
1260 send.write_all(DIRECT_MAGIC).await?;
1261 write_json_frame(&mut send, &DirectProtocolHello::new(identity)).await?;
1262 let protocol: DirectProtocolResponse = read_json_frame(&mut recv).await?;
1263 if !protocol.accepted {
1264 return Err(Error::Protocol(protocol.error.unwrap_or_else(|| {
1265 "receiver rejected protocol negotiation".to_string()
1266 })));
1267 }
1268 if protocol.protocol_version != Some(NATIVE_PROTOCOL_VERSION) {
1269 return Err(Error::Protocol(format!(
1270 "receiver selected unsupported protocol version {:?}",
1271 protocol.protocol_version
1272 )));
1273 }
1274
1275 write_json_frame(&mut send, &manifest).await?;
1276
1277 let plan: DirectReceivePlan = read_json_frame(&mut recv).await?;
1278 let mut summaries = Vec::new();
1279 for file_plan in plan.files {
1280 let Some(entry) = manifest
1281 .file_entries()
1282 .find(|entry| entry.relative_path == file_plan.relative_path)
1283 else {
1284 return Err(Error::Protocol(format!(
1285 "receiver requested unknown file: {}",
1286 display_path(&file_plan.relative_path)
1287 )));
1288 };
1289
1290 let Some(source_path) = sources.get(&file_plan.relative_path) else {
1291 return Err(Error::Protocol(format!(
1292 "missing source for manifest file: {}",
1293 display_path(&file_plan.relative_path)
1294 )));
1295 };
1296
1297 if file_plan.action == DirectFileAction::Skip {
1298 let blake3 = entry.blake3.clone().unwrap_or_default();
1299 events(TransferEvent::FileFinished {
1300 direction: TransferDirection::Send,
1301 file_name: display_path(&entry.relative_path),
1302 bytes: entry.size,
1303 blake3: blake3.clone(),
1304 status: TransferFileStatus::Skipped,
1305 });
1306 summaries.push(TransferFileSummary {
1307 path: source_path.clone(),
1308 relative_path: entry.relative_path.clone(),
1309 bytes: entry.size,
1310 blake3,
1311 status: TransferFileStatus::Skipped,
1312 });
1313 continue;
1314 }
1315
1316 match send_payload_file(
1317 &mut send,
1318 source_path,
1319 entry,
1320 file_plan.offset,
1321 &control,
1322 &mut events,
1323 )
1324 .await
1325 {
1326 Ok(()) => {}
1327 Err(Error::TransferCancelled) => {
1328 events(TransferEvent::SessionCancelled {
1329 direction: TransferDirection::Send,
1330 session_id: manifest.session_id,
1331 });
1332 connection.close(1u32.into(), b"cancelled");
1333 return Err(Error::TransferCancelled);
1334 }
1335 Err(error) => return Err(error),
1336 }
1337 let status = if file_plan.offset > 0 {
1338 TransferFileStatus::Resumed
1339 } else {
1340 TransferFileStatus::Sent
1341 };
1342 let blake3 = entry.blake3.clone().unwrap_or_default();
1343 events(TransferEvent::FileFinished {
1344 direction: TransferDirection::Send,
1345 file_name: display_path(&entry.relative_path),
1346 bytes: entry.size,
1347 blake3: blake3.clone(),
1348 status,
1349 });
1350 summaries.push(TransferFileSummary {
1351 path: source_path.clone(),
1352 relative_path: entry.relative_path.clone(),
1353 bytes: entry.size,
1354 blake3,
1355 status,
1356 });
1357 }
1358 send.finish()?;
1359
1360 let ack: DirectAck = read_json_frame(&mut recv).await?;
1361 if !ack.ok {
1362 return Err(Error::Transfer(
1363 ack.error
1364 .unwrap_or_else(|| "receiver rejected transfer".to_string()),
1365 ));
1366 }
1367
1368 connection.close(0u32.into(), b"done");
1369 endpoint.wait_idle().await;
1370
1371 let summary = summary_from_files(&manifest, summaries)?;
1372 events(TransferEvent::SessionFinished {
1373 direction: TransferDirection::Send,
1374 files_total: summary.files.len(),
1375 bytes_total: summary.bytes,
1376 blake3: summary.blake3.clone(),
1377 });
1378 Ok(summary)
1379}
1380
1381pub async fn receive_direct_file(
1382 request: &ReceiveRequest,
1383 destination: impl AsRef<Path>,
1384 identity: &NativeIdentity,
1385 events: impl FnMut(TransferEvent),
1386) -> Result<TransferSummary> {
1387 receive_direct_file_with_control(
1388 request,
1389 destination,
1390 identity,
1391 TransferControl::new(),
1392 events,
1393 )
1394 .await
1395}
1396
1397pub async fn receive_direct_file_with_control(
1398 request: &ReceiveRequest,
1399 destination: impl AsRef<Path>,
1400 identity: &NativeIdentity,
1401 control: TransferControl,
1402 mut events: impl FnMut(TransferEvent),
1403) -> Result<TransferSummary> {
1404 ensure_rustls_provider();
1405
1406 control.check_cancelled()?;
1407 let destination = destination.as_ref();
1408 tokio::fs::create_dir_all(destination)
1409 .await
1410 .map_err(|source| Error::IoPath {
1411 path: destination.to_path_buf(),
1412 source,
1413 })?;
1414
1415 let server_config =
1416 quinn::ServerConfig::with_single_cert(identity.cert_chain(), identity.private_key())?;
1417 let endpoint = quinn::Endpoint::server(server_config, request.listen())?;
1418 let incoming = endpoint.accept().await.ok_or(Error::NoIncomingConnection)?;
1419 let connection = incoming.await?;
1420 let (mut send, mut recv) = connection.accept_bi().await?;
1421
1422 let result =
1423 receive_stream_file(destination, &mut recv, &mut send, &control, &mut events).await;
1424 let ack = match &result {
1425 Ok(_) => DirectAck {
1426 ok: true,
1427 error: None,
1428 },
1429 Err(error) => DirectAck {
1430 ok: false,
1431 error: Some(error.to_string()),
1432 },
1433 };
1434 write_json_frame(&mut send, &ack).await?;
1435 send.finish()?;
1436 let _ = tokio::time::timeout(Duration::from_secs(1), connection.closed()).await;
1437 endpoint.close(0u32.into(), b"done");
1438 endpoint.wait_idle().await;
1439
1440 result
1441}
1442
1443async fn receive_stream_file(
1444 destination: &Path,
1445 recv: &mut quinn::RecvStream,
1446 send: &mut quinn::SendStream,
1447 control: &TransferControl,
1448 events: &mut impl FnMut(TransferEvent),
1449) -> Result<TransferSummary> {
1450 let mut magic = [0; DIRECT_MAGIC.len()];
1451 recv.read_exact(&mut magic).await?;
1452 if &magic != DIRECT_MAGIC {
1453 return Err(Error::Protocol("bad direct-transfer magic".to_string()));
1454 }
1455
1456 let hello: DirectProtocolHello = read_json_frame(recv).await?;
1457 let protocol = negotiate_direct_protocol(&hello);
1458 write_json_frame(send, &protocol).await?;
1459 if !protocol.accepted {
1460 return Err(Error::Protocol(protocol.error.unwrap_or_else(|| {
1461 "unsupported direct protocol version".to_string()
1462 })));
1463 }
1464
1465 let manifest: TransferManifest = read_json_frame(recv).await?;
1466 validate_manifest(&manifest)?;
1467 events(TransferEvent::SessionStarted {
1468 direction: TransferDirection::Receive,
1469 session_id: manifest.session_id,
1470 files_total: manifest.file_entries().count(),
1471 bytes_total: manifest.total_file_bytes(),
1472 });
1473 control.check_cancelled()?;
1474
1475 for entry in &manifest.entries {
1476 if entry.kind == ManifestEntryKind::Directory {
1477 let path = safe_destination_path(destination, &entry.relative_path)?;
1478 tokio::fs::create_dir_all(&path)
1479 .await
1480 .map_err(|source| Error::IoPath { path, source })?;
1481 }
1482 }
1483
1484 let mut plan = DirectReceivePlan { files: Vec::new() };
1485 for entry in manifest.file_entries() {
1486 let decision = decide_resume(destination, entry, manifest.chunk_size).await?;
1487 let (action, offset) = match decision {
1488 ResumeDecision::Create => (DirectFileAction::Receive, 0),
1489 ResumeDecision::Skip => (DirectFileAction::Skip, entry.size),
1490 ResumeDecision::ResumeFrom(offset) => (DirectFileAction::Receive, offset),
1491 ResumeDecision::Conflict(message) => return Err(Error::InvalidInput(message)),
1492 };
1493 plan.files.push(DirectFilePlan {
1494 relative_path: entry.relative_path.clone(),
1495 action,
1496 offset,
1497 });
1498 }
1499 write_json_frame(send, &plan).await?;
1500
1501 let mut summaries = Vec::new();
1502 for file_plan in &plan.files {
1503 let Some(entry) = manifest
1504 .file_entries()
1505 .find(|entry| entry.relative_path == file_plan.relative_path)
1506 else {
1507 return Err(Error::Protocol(format!(
1508 "planned file missing from manifest: {}",
1509 display_path(&file_plan.relative_path)
1510 )));
1511 };
1512
1513 let final_path = safe_destination_path(destination, &entry.relative_path)?;
1514 if file_plan.action == DirectFileAction::Skip {
1515 let blake3 = entry.blake3.clone().unwrap_or_default();
1516 events(TransferEvent::FileFinished {
1517 direction: TransferDirection::Receive,
1518 file_name: display_path(&entry.relative_path),
1519 bytes: entry.size,
1520 blake3: blake3.clone(),
1521 status: TransferFileStatus::Skipped,
1522 });
1523 summaries.push(TransferFileSummary {
1524 path: final_path,
1525 relative_path: entry.relative_path.clone(),
1526 bytes: entry.size,
1527 blake3,
1528 status: TransferFileStatus::Skipped,
1529 });
1530 continue;
1531 }
1532
1533 match receive_payload_file(recv, destination, entry, file_plan.offset, control, events)
1534 .await
1535 {
1536 Ok(()) => {}
1537 Err(Error::TransferCancelled) => {
1538 events(TransferEvent::SessionCancelled {
1539 direction: TransferDirection::Receive,
1540 session_id: manifest.session_id,
1541 });
1542 return Err(Error::TransferCancelled);
1543 }
1544 Err(error) => return Err(error),
1545 }
1546 let actual_hash = hash_file(&final_path).await?;
1547 let expected_hash = entry
1548 .blake3
1549 .as_deref()
1550 .ok_or_else(|| Error::Protocol("file manifest entry is missing BLAKE3".to_string()))?;
1551 if actual_hash != expected_hash {
1552 return Err(Error::Transfer(format!(
1553 "BLAKE3 hash mismatch for {}",
1554 display_path(&entry.relative_path)
1555 )));
1556 }
1557
1558 let status = if file_plan.offset > 0 {
1559 TransferFileStatus::Resumed
1560 } else {
1561 TransferFileStatus::Received
1562 };
1563 events(TransferEvent::FileFinished {
1564 direction: TransferDirection::Receive,
1565 file_name: display_path(&entry.relative_path),
1566 bytes: entry.size,
1567 blake3: actual_hash.clone(),
1568 status,
1569 });
1570 summaries.push(TransferFileSummary {
1571 path: final_path,
1572 relative_path: entry.relative_path.clone(),
1573 bytes: entry.size,
1574 blake3: actual_hash,
1575 status,
1576 });
1577 }
1578
1579 let summary = summary_from_files(&manifest, summaries)?;
1580 events(TransferEvent::SessionFinished {
1581 direction: TransferDirection::Receive,
1582 files_total: summary.files.len(),
1583 bytes_total: summary.bytes,
1584 blake3: summary.blake3.clone(),
1585 });
1586 Ok(summary)
1587}
1588
1589async fn send_payload_file(
1590 send: &mut quinn::SendStream,
1591 source_path: &Path,
1592 entry: &ManifestEntry,
1593 offset: u64,
1594 control: &TransferControl,
1595 events: &mut impl FnMut(TransferEvent),
1596) -> Result<()> {
1597 if offset > entry.size {
1598 return Err(Error::Protocol(format!(
1599 "resume offset exceeds file size for {}",
1600 display_path(&entry.relative_path)
1601 )));
1602 }
1603
1604 let header = DirectPayloadHeader {
1605 relative_path: entry.relative_path.clone(),
1606 offset,
1607 bytes: entry.size - offset,
1608 };
1609 write_json_frame(send, &header).await?;
1610 events(TransferEvent::FileStarted {
1611 direction: TransferDirection::Send,
1612 file_name: display_path(&entry.relative_path),
1613 bytes_total: entry.size,
1614 resume_offset: offset,
1615 });
1616
1617 let mut file = tokio::fs::File::open(source_path)
1618 .await
1619 .map_err(|source| Error::IoPath {
1620 path: source_path.to_path_buf(),
1621 source,
1622 })?;
1623 file.seek(SeekFrom::Start(offset))
1624 .await
1625 .map_err(|source| Error::IoPath {
1626 path: source_path.to_path_buf(),
1627 source,
1628 })?;
1629
1630 let mut buffer = vec![0; IO_BUFFER_SIZE];
1631 let mut bytes_done = offset;
1632 while bytes_done < entry.size {
1633 control.check_cancelled()?;
1634 let remaining = entry.size - bytes_done;
1635 let read_size = buffer.len().min(remaining as usize);
1636 let read = file
1637 .read(&mut buffer[..read_size])
1638 .await
1639 .map_err(|source| Error::IoPath {
1640 path: source_path.to_path_buf(),
1641 source,
1642 })?;
1643 if read == 0 {
1644 return Err(Error::Protocol(format!(
1645 "source file ended early: {}",
1646 display_path(source_path)
1647 )));
1648 }
1649 send.write_all(&buffer[..read]).await?;
1650 bytes_done += read as u64;
1651 events(TransferEvent::Progress {
1652 direction: TransferDirection::Send,
1653 file_name: display_path(&entry.relative_path),
1654 bytes_done,
1655 bytes_total: entry.size,
1656 });
1657 control.check_cancelled()?;
1658 }
1659
1660 Ok(())
1661}
1662
1663async fn receive_payload_file(
1664 recv: &mut quinn::RecvStream,
1665 destination: &Path,
1666 entry: &ManifestEntry,
1667 offset: u64,
1668 control: &TransferControl,
1669 events: &mut impl FnMut(TransferEvent),
1670) -> Result<()> {
1671 let header: DirectPayloadHeader = read_json_frame(recv).await?;
1672 if header.relative_path != entry.relative_path
1673 || header.offset != offset
1674 || header.bytes != entry.size - offset
1675 {
1676 return Err(Error::Protocol(format!(
1677 "payload header does not match receive plan for {}",
1678 display_path(&entry.relative_path)
1679 )));
1680 }
1681 events(TransferEvent::FileStarted {
1682 direction: TransferDirection::Receive,
1683 file_name: display_path(&entry.relative_path),
1684 bytes_total: entry.size,
1685 resume_offset: offset,
1686 });
1687
1688 let final_path = safe_destination_path(destination, &entry.relative_path)?;
1689 if let Some(parent) = final_path.parent() {
1690 tokio::fs::create_dir_all(parent)
1691 .await
1692 .map_err(|source| Error::IoPath {
1693 path: parent.to_path_buf(),
1694 source,
1695 })?;
1696 }
1697
1698 let write_path = if offset == 0 {
1699 temp_path_for(&final_path)
1700 } else {
1701 final_path.clone()
1702 };
1703
1704 let mut file = if offset == 0 {
1705 tokio::fs::File::create(&write_path)
1706 .await
1707 .map_err(|source| Error::IoPath {
1708 path: write_path.clone(),
1709 source,
1710 })?
1711 } else {
1712 let mut file = tokio::fs::OpenOptions::new()
1713 .write(true)
1714 .open(&write_path)
1715 .await
1716 .map_err(|source| Error::IoPath {
1717 path: write_path.clone(),
1718 source,
1719 })?;
1720 file.set_len(offset).await.map_err(|source| Error::IoPath {
1721 path: write_path.clone(),
1722 source,
1723 })?;
1724 file.seek(SeekFrom::Start(offset))
1725 .await
1726 .map_err(|source| Error::IoPath {
1727 path: write_path.clone(),
1728 source,
1729 })?;
1730 file
1731 };
1732
1733 let mut bytes_done = offset;
1734 let mut bytes_remaining = header.bytes;
1735 let mut buffer = vec![0; IO_BUFFER_SIZE];
1736 while bytes_remaining > 0 {
1737 if control.is_cancelled() {
1738 if offset == 0 {
1739 let _ = tokio::fs::remove_file(&write_path).await;
1740 }
1741 return Err(Error::TransferCancelled);
1742 }
1743 let read_size = buffer.len().min(bytes_remaining as usize);
1744 let read = match recv.read(&mut buffer[..read_size]).await {
1745 Ok(Some(read)) => read,
1746 Ok(None) => {
1747 if offset == 0 {
1748 let _ = tokio::fs::remove_file(&write_path).await;
1749 }
1750 return Err(Error::Protocol(
1751 "stream ended before file payload".to_string(),
1752 ));
1753 }
1754 Err(error) => {
1755 if offset == 0 {
1756 let _ = tokio::fs::remove_file(&write_path).await;
1757 }
1758 return Err(Error::Read(error));
1759 }
1760 };
1761
1762 file.write_all(&buffer[..read])
1763 .await
1764 .map_err(|source| Error::IoPath {
1765 path: write_path.clone(),
1766 source,
1767 })?;
1768 bytes_done += read as u64;
1769 bytes_remaining -= read as u64;
1770 events(TransferEvent::Progress {
1771 direction: TransferDirection::Receive,
1772 file_name: display_path(&entry.relative_path),
1773 bytes_done,
1774 bytes_total: entry.size,
1775 });
1776 if control.is_cancelled() {
1777 if offset == 0 {
1778 let _ = tokio::fs::remove_file(&write_path).await;
1779 }
1780 return Err(Error::TransferCancelled);
1781 }
1782 }
1783
1784 file.flush().await.map_err(|source| Error::IoPath {
1785 path: write_path.clone(),
1786 source,
1787 })?;
1788 drop(file);
1789
1790 if offset == 0 {
1791 tokio::fs::rename(&write_path, &final_path)
1792 .await
1793 .map_err(|source| Error::IoPath {
1794 path: final_path,
1795 source,
1796 })?;
1797 }
1798
1799 Ok(())
1800}
1801
1802async fn write_json_frame<T: Serialize>(send: &mut quinn::SendStream, value: &T) -> Result<()> {
1803 let bytes = serde_json::to_vec(value).map_err(|error| Error::Protocol(error.to_string()))?;
1804 if bytes.len() > MAX_FRAME_LEN {
1805 return Err(Error::Protocol("frame exceeds maximum length".to_string()));
1806 }
1807 send.write_all(&(bytes.len() as u32).to_be_bytes()).await?;
1808 send.write_all(&bytes).await?;
1809 Ok(())
1810}
1811
1812async fn read_json_frame<T: for<'de> Deserialize<'de>>(recv: &mut quinn::RecvStream) -> Result<T> {
1813 let mut len = [0; 4];
1814 recv.read_exact(&mut len).await?;
1815 let len = u32::from_be_bytes(len) as usize;
1816 if len > MAX_FRAME_LEN {
1817 return Err(Error::Protocol("frame exceeds maximum length".to_string()));
1818 }
1819
1820 let mut bytes = vec![0; len];
1821 recv.read_exact(&mut bytes).await?;
1822 serde_json::from_slice(&bytes).map_err(|error| Error::Protocol(error.to_string()))
1823}
1824
1825fn negotiate_direct_protocol(hello: &DirectProtocolHello) -> DirectProtocolResponse {
1826 if hello.min_protocol_version > NATIVE_PROTOCOL_VERSION {
1827 return DirectProtocolResponse::reject(format!(
1828 "peer requires protocol version {}, local max is {}",
1829 hello.min_protocol_version, NATIVE_PROTOCOL_VERSION
1830 ));
1831 }
1832
1833 if hello.protocol_version < MIN_NATIVE_PROTOCOL_VERSION {
1834 return DirectProtocolResponse::reject(format!(
1835 "peer max protocol version {} is below local minimum {}",
1836 hello.protocol_version, MIN_NATIVE_PROTOCOL_VERSION
1837 ));
1838 }
1839
1840 DirectProtocolResponse::accept(NATIVE_PROTOCOL_VERSION.min(hello.protocol_version))
1841}
1842
1843#[derive(Debug, Clone)]
1844struct ManifestSource {
1845 source_path: PathBuf,
1846 relative_path: PathBuf,
1847}
1848
1849fn collect_manifest_sources(paths: &[PathBuf]) -> Result<Vec<ManifestSource>> {
1850 let mut sources = Vec::new();
1851 for path in paths {
1852 let metadata = std::fs::symlink_metadata(path).map_err(|source| Error::IoPath {
1853 path: path.clone(),
1854 source,
1855 })?;
1856 if metadata.file_type().is_symlink() {
1857 return Err(Error::InvalidInput(format!(
1858 "{} is a symlink; symlink transfers are not supported",
1859 display_path(path)
1860 )));
1861 }
1862
1863 let root_name = path
1864 .file_name()
1865 .ok_or_else(|| Error::InvalidInput(format!("{} has no file name", display_path(path))))?
1866 .to_os_string();
1867 let relative_path = PathBuf::from(root_name);
1868
1869 if metadata.is_dir() {
1870 sources.push(ManifestSource {
1871 source_path: path.clone(),
1872 relative_path: relative_path.clone(),
1873 });
1874 collect_dir_sources(path, &relative_path, &mut sources)?;
1875 } else if metadata.is_file() {
1876 sources.push(ManifestSource {
1877 source_path: path.clone(),
1878 relative_path,
1879 });
1880 } else {
1881 return Err(Error::InvalidInput(format!(
1882 "{} is not a regular file or directory",
1883 display_path(path)
1884 )));
1885 }
1886 }
1887
1888 Ok(sources)
1889}
1890
1891fn collect_dir_sources(
1892 dir: &Path,
1893 relative_dir: &Path,
1894 sources: &mut Vec<ManifestSource>,
1895) -> Result<()> {
1896 for entry in std::fs::read_dir(dir).map_err(|source| Error::IoPath {
1897 path: dir.to_path_buf(),
1898 source,
1899 })? {
1900 let entry = entry.map_err(|source| Error::IoPath {
1901 path: dir.to_path_buf(),
1902 source,
1903 })?;
1904 let path = entry.path();
1905 let metadata = std::fs::symlink_metadata(&path).map_err(|source| Error::IoPath {
1906 path: path.clone(),
1907 source,
1908 })?;
1909 if metadata.file_type().is_symlink() {
1910 return Err(Error::InvalidInput(format!(
1911 "{} is a symlink; symlink transfers are not supported",
1912 display_path(&path)
1913 )));
1914 }
1915
1916 let relative_path = relative_dir.join(entry.file_name());
1917 if metadata.is_dir() {
1918 sources.push(ManifestSource {
1919 source_path: path.clone(),
1920 relative_path: relative_path.clone(),
1921 });
1922 collect_dir_sources(&path, &relative_path, sources)?;
1923 } else if metadata.is_file() {
1924 sources.push(ManifestSource {
1925 source_path: path,
1926 relative_path,
1927 });
1928 }
1929 }
1930
1931 Ok(())
1932}
1933
1934fn manifest_source_map(paths: &[PathBuf]) -> Result<BTreeMap<PathBuf, PathBuf>> {
1935 Ok(collect_manifest_sources(paths)?
1936 .into_iter()
1937 .filter_map(|source| {
1938 let metadata = std::fs::metadata(&source.source_path).ok()?;
1939 metadata
1940 .is_file()
1941 .then_some((source.relative_path, source.source_path))
1942 })
1943 .collect())
1944}
1945
1946async fn build_manifest_entry(source: ManifestSource) -> Result<ManifestEntry> {
1947 validate_relative_transfer_path(&source.relative_path)?;
1948 let metadata = tokio::fs::metadata(&source.source_path)
1949 .await
1950 .map_err(|source_error| Error::IoPath {
1951 path: source.source_path.clone(),
1952 source: source_error,
1953 })?;
1954 let unix_mode = unix_mode(&metadata);
1955 let modified_unix_ms = modified_unix_ms(&metadata);
1956
1957 if metadata.is_dir() {
1958 return Ok(ManifestEntry {
1959 relative_path: source.relative_path,
1960 kind: ManifestEntryKind::Directory,
1961 size: 0,
1962 blake3: None,
1963 chunks: Vec::new(),
1964 unix_mode,
1965 modified_unix_ms,
1966 });
1967 }
1968
1969 let (blake3, chunks) = hash_file_with_chunks(&source.source_path, DEFAULT_CHUNK_SIZE).await?;
1970 Ok(ManifestEntry {
1971 relative_path: source.relative_path,
1972 kind: ManifestEntryKind::File,
1973 size: metadata.len(),
1974 blake3: Some(blake3),
1975 chunks,
1976 unix_mode,
1977 modified_unix_ms,
1978 })
1979}
1980
1981fn validate_manifest(manifest: &TransferManifest) -> Result<()> {
1982 if manifest.version != MANIFEST_VERSION {
1983 return Err(Error::Protocol(format!(
1984 "unsupported manifest version {}",
1985 manifest.version
1986 )));
1987 }
1988 if manifest.chunk_size == 0 {
1989 return Err(Error::Protocol("manifest chunk size is zero".to_string()));
1990 }
1991
1992 let mut seen = BTreeSet::new();
1993 for entry in &manifest.entries {
1994 validate_relative_transfer_path(&entry.relative_path)?;
1995 if !seen.insert(entry.relative_path.clone()) {
1996 return Err(Error::Protocol(format!(
1997 "duplicate manifest path: {}",
1998 display_path(&entry.relative_path)
1999 )));
2000 }
2001 if entry.kind == ManifestEntryKind::File && entry.blake3.is_none() {
2002 return Err(Error::Protocol(format!(
2003 "file manifest entry is missing BLAKE3: {}",
2004 display_path(&entry.relative_path)
2005 )));
2006 }
2007 }
2008
2009 Ok(())
2010}
2011
2012fn validate_relative_transfer_path(path: &Path) -> Result<()> {
2013 if path.as_os_str().is_empty() || path.is_absolute() {
2014 return Err(Error::InvalidInput(format!(
2015 "unsafe transfer path: {}",
2016 display_path(path)
2017 )));
2018 }
2019
2020 let mut normal_components = 0usize;
2021 for component in path.components() {
2022 match component {
2023 Component::Normal(value) => {
2024 let Some(name) = value.to_str() else {
2025 return Err(Error::InvalidInput(format!(
2026 "transfer path must be UTF-8: {}",
2027 display_path(path)
2028 )));
2029 };
2030 validate_path_component(name, path)?;
2031 normal_components += 1;
2032 }
2033 Component::CurDir
2034 | Component::ParentDir
2035 | Component::RootDir
2036 | Component::Prefix(_) => {
2037 return Err(Error::InvalidInput(format!(
2038 "unsafe transfer path: {}",
2039 display_path(path)
2040 )));
2041 }
2042 }
2043 }
2044
2045 if normal_components == 0 {
2046 return Err(Error::InvalidInput(format!(
2047 "unsafe transfer path: {}",
2048 display_path(path)
2049 )));
2050 }
2051
2052 Ok(())
2053}
2054
2055fn validate_path_component(component: &str, full_path: &Path) -> Result<()> {
2056 if component.is_empty()
2057 || component.contains('\\')
2058 || component.contains('/')
2059 || component.contains(':')
2060 || component.ends_with(' ')
2061 || component.ends_with('.')
2062 || is_windows_reserved_name(component)
2063 {
2064 return Err(Error::InvalidInput(format!(
2065 "unsafe transfer path: {}",
2066 display_path(full_path)
2067 )));
2068 }
2069
2070 Ok(())
2071}
2072
2073fn is_windows_reserved_name(component: &str) -> bool {
2074 let stem = component
2075 .split('.')
2076 .next()
2077 .unwrap_or(component)
2078 .trim_end_matches([' ', '.'])
2079 .to_ascii_uppercase();
2080 matches!(
2081 stem.as_str(),
2082 "CON"
2083 | "PRN"
2084 | "AUX"
2085 | "NUL"
2086 | "COM1"
2087 | "COM2"
2088 | "COM3"
2089 | "COM4"
2090 | "COM5"
2091 | "COM6"
2092 | "COM7"
2093 | "COM8"
2094 | "COM9"
2095 | "LPT1"
2096 | "LPT2"
2097 | "LPT3"
2098 | "LPT4"
2099 | "LPT5"
2100 | "LPT6"
2101 | "LPT7"
2102 | "LPT8"
2103 | "LPT9"
2104 )
2105}
2106
2107async fn hash_file(path: &Path) -> Result<String> {
2108 hash_file_with_chunks(path, DEFAULT_CHUNK_SIZE)
2109 .await
2110 .map(|(hash, _)| hash)
2111}
2112
2113async fn hash_file_with_chunks(path: &Path, chunk_size: u64) -> Result<(String, Vec<String>)> {
2114 let mut file = tokio::fs::File::open(path)
2115 .await
2116 .map_err(|source| Error::IoPath {
2117 path: path.to_path_buf(),
2118 source,
2119 })?;
2120 let mut hasher = blake3::Hasher::new();
2121 let mut chunk_hasher = blake3::Hasher::new();
2122 let mut chunk_bytes = 0u64;
2123 let mut chunks = Vec::new();
2124 let mut buffer = vec![0; IO_BUFFER_SIZE];
2125
2126 loop {
2127 let read = file
2128 .read(&mut buffer)
2129 .await
2130 .map_err(|source| Error::IoPath {
2131 path: path.to_path_buf(),
2132 source,
2133 })?;
2134 if read == 0 {
2135 break;
2136 }
2137 hasher.update(&buffer[..read]);
2138
2139 let mut remaining = &buffer[..read];
2140 while !remaining.is_empty() {
2141 let take = remaining.len().min((chunk_size - chunk_bytes) as usize);
2142 chunk_hasher.update(&remaining[..take]);
2143 chunk_bytes += take as u64;
2144 remaining = &remaining[take..];
2145 if chunk_bytes == chunk_size {
2146 chunks.push(chunk_hasher.finalize().to_hex().to_string());
2147 chunk_hasher = blake3::Hasher::new();
2148 chunk_bytes = 0;
2149 }
2150 }
2151 }
2152
2153 if chunk_bytes > 0 {
2154 chunks.push(chunk_hasher.finalize().to_hex().to_string());
2155 }
2156
2157 Ok((hasher.finalize().to_hex().to_string(), chunks))
2158}
2159
2160async fn verified_prefix_offset(
2161 path: &Path,
2162 entry: &ManifestEntry,
2163 existing_len: u64,
2164 chunk_size: u64,
2165) -> Result<u64> {
2166 if existing_len == 0 || entry.chunks.is_empty() {
2167 return Ok(0);
2168 }
2169
2170 let chunks_to_check = (existing_len / chunk_size).min(entry.chunks.len() as u64) as usize;
2171 if chunks_to_check == 0 {
2172 return Ok(0);
2173 }
2174
2175 let mut file = tokio::fs::File::open(path)
2176 .await
2177 .map_err(|source| Error::IoPath {
2178 path: path.to_path_buf(),
2179 source,
2180 })?;
2181 let mut buffer = vec![0; IO_BUFFER_SIZE];
2182 let mut verified_chunks = 0usize;
2183
2184 for expected in entry.chunks.iter().take(chunks_to_check) {
2185 let mut remaining = chunk_size;
2186 let mut hasher = blake3::Hasher::new();
2187 while remaining > 0 {
2188 let read_size = buffer.len().min(remaining as usize);
2189 file.read_exact(&mut buffer[..read_size])
2190 .await
2191 .map_err(|source| Error::IoPath {
2192 path: path.to_path_buf(),
2193 source,
2194 })?;
2195 hasher.update(&buffer[..read_size]);
2196 remaining -= read_size as u64;
2197 }
2198
2199 if hasher.finalize().to_hex().as_str() != expected {
2200 break;
2201 }
2202 verified_chunks += 1;
2203 }
2204
2205 Ok(verified_chunks as u64 * chunk_size)
2206}
2207
2208fn summary_from_files(
2209 manifest: &TransferManifest,
2210 files: Vec<TransferFileSummary>,
2211) -> Result<TransferSummary> {
2212 let bytes = manifest.total_file_bytes();
2213 let blake3 = if files.len() == 1 {
2214 files[0].blake3.clone()
2215 } else {
2216 manifest.digest()?
2217 };
2218 let file_name = if files.len() == 1 {
2219 display_path(&files[0].relative_path)
2220 } else {
2221 format!("{} files", files.len())
2222 };
2223 let path = files
2224 .first()
2225 .map(|file| file.path.clone())
2226 .unwrap_or_default();
2227
2228 Ok(TransferSummary {
2229 path,
2230 file_name,
2231 bytes,
2232 blake3,
2233 files,
2234 })
2235}
2236
2237fn temp_path_for(final_path: &Path) -> PathBuf {
2238 let file_name = final_path
2239 .file_name()
2240 .and_then(|name| name.to_str())
2241 .unwrap_or("transfer");
2242 final_path.with_file_name(format!(".{file_name}.{}.ferry-part", Uuid::new_v4()))
2243}
2244
2245fn modified_unix_ms(metadata: &Metadata) -> Option<i128> {
2246 system_time_unix_ms(metadata.modified().ok()?)
2247}
2248
2249fn system_time_unix_ms(time: SystemTime) -> Option<i128> {
2250 match time.duration_since(UNIX_EPOCH) {
2251 Ok(duration) => Some(duration.as_millis() as i128),
2252 Err(error) => Some(-(error.duration().as_millis() as i128)),
2253 }
2254}
2255
2256#[cfg(unix)]
2257fn unix_mode(metadata: &Metadata) -> Option<u32> {
2258 use std::os::unix::fs::PermissionsExt;
2259
2260 Some(metadata.permissions().mode())
2261}
2262
2263#[cfg(not(unix))]
2264fn unix_mode(_metadata: &Metadata) -> Option<u32> {
2265 None
2266}
2267
2268fn client_config_capturing_server_fingerprint()
2269-> Result<(quinn::ClientConfig, Arc<Mutex<Option<String>>>)> {
2270 let server_fingerprint = Arc::new(Mutex::new(None));
2271 let crypto = rustls::ClientConfig::builder()
2272 .dangerous()
2273 .with_custom_certificate_verifier(ServerFingerprintVerifier::new(
2274 server_fingerprint.clone(),
2275 ))
2276 .with_no_client_auth();
2277
2278 let config = quinn::ClientConfig::new(Arc::new(
2279 QuicClientConfig::try_from(crypto)
2280 .map_err(|error| Error::CryptoConfig(error.to_string()))?,
2281 ));
2282 Ok((config, server_fingerprint))
2283}
2284
2285fn captured_server_fingerprint(fingerprint: &Arc<Mutex<Option<String>>>) -> Result<String> {
2286 fingerprint
2287 .lock()
2288 .map_err(|_| Error::CryptoConfig("server fingerprint capture lock poisoned".to_string()))?
2289 .clone()
2290 .ok_or_else(|| {
2291 Error::CryptoConfig("server certificate fingerprint was not captured".to_string())
2292 })
2293}
2294
2295#[derive(Debug)]
2296struct ServerFingerprintVerifier {
2297 provider: Arc<CryptoProvider>,
2298 server_fingerprint: Arc<Mutex<Option<String>>>,
2299}
2300
2301impl ServerFingerprintVerifier {
2302 fn new(server_fingerprint: Arc<Mutex<Option<String>>>) -> Arc<Self> {
2303 Arc::new(Self {
2304 provider: Arc::new(rustls::crypto::ring::default_provider()),
2305 server_fingerprint,
2306 })
2307 }
2308}
2309
2310impl ServerCertVerifier for ServerFingerprintVerifier {
2311 fn verify_server_cert(
2312 &self,
2313 end_entity: &CertificateDer<'_>,
2314 _intermediates: &[CertificateDer<'_>],
2315 _server_name: &ServerName<'_>,
2316 _ocsp: &[u8],
2317 _now: UnixTime,
2318 ) -> std::result::Result<ServerCertVerified, rustls::Error> {
2319 let fingerprint = blake3::hash(end_entity.as_ref()).to_hex().to_string();
2320 *self.server_fingerprint.lock().map_err(|_| {
2321 rustls::Error::General("server fingerprint capture lock poisoned".to_string())
2322 })? = Some(fingerprint);
2323 Ok(ServerCertVerified::assertion())
2324 }
2325
2326 fn verify_tls12_signature(
2327 &self,
2328 message: &[u8],
2329 cert: &CertificateDer<'_>,
2330 dss: &DigitallySignedStruct,
2331 ) -> std::result::Result<HandshakeSignatureValid, rustls::Error> {
2332 verify_tls12_signature(
2333 message,
2334 cert,
2335 dss,
2336 &self.provider.signature_verification_algorithms,
2337 )
2338 }
2339
2340 fn verify_tls13_signature(
2341 &self,
2342 message: &[u8],
2343 cert: &CertificateDer<'_>,
2344 dss: &DigitallySignedStruct,
2345 ) -> std::result::Result<HandshakeSignatureValid, rustls::Error> {
2346 verify_tls13_signature(
2347 message,
2348 cert,
2349 dss,
2350 &self.provider.signature_verification_algorithms,
2351 )
2352 }
2353
2354 fn supported_verify_schemes(&self) -> Vec<SignatureScheme> {
2355 self.provider
2356 .signature_verification_algorithms
2357 .supported_schemes()
2358 }
2359}
2360
2361fn ensure_rustls_provider() {
2362 let _ = rustls::crypto::ring::default_provider().install_default();
2363}
2364
2365pub fn config_dir() -> Result<PathBuf> {
2366 let base_dirs = BaseDirs::new().ok_or(Error::ConfigDirUnavailable)?;
2367 Ok(base_dirs.config_dir().join(CONFIG_DIR_NAME))
2368}
2369
2370fn default_alias() -> String {
2371 std::env::var("HOSTNAME")
2372 .or_else(|_| std::env::var("COMPUTERNAME"))
2373 .ok()
2374 .filter(|value| !value.trim().is_empty())
2375 .unwrap_or_else(|| "fileferry-device".to_string())
2376}
2377
2378fn expand_home_path(path: &str) -> Result<PathBuf> {
2379 if path == "~" {
2380 return Ok(BaseDirs::new()
2381 .ok_or(Error::ConfigDirUnavailable)?
2382 .home_dir()
2383 .to_path_buf());
2384 }
2385
2386 if let Some(rest) = path.strip_prefix("~/") {
2387 return Ok(BaseDirs::new()
2388 .ok_or(Error::ConfigDirUnavailable)?
2389 .home_dir()
2390 .join(rest));
2391 }
2392
2393 Ok(PathBuf::from(path))
2394}
2395
2396fn normalized_fingerprint(fingerprint: String) -> Result<String> {
2397 let fingerprint = fingerprint.trim().to_ascii_lowercase();
2398 if fingerprint.len() != 64 || !fingerprint.chars().all(|char| char.is_ascii_hexdigit()) {
2399 return Err(Error::InvalidInput(
2400 "peer fingerprint must be a full 64-character hex BLAKE3 fingerprint".to_string(),
2401 ));
2402 }
2403
2404 Ok(fingerprint)
2405}
2406
2407fn write_toml_file<T: Serialize>(path: &Path, value: &T) -> Result<()> {
2408 let bytes =
2409 toml::to_string_pretty(value).map_err(|error| Error::ConfigSerialize(error.to_string()))?;
2410 if let Some(parent) = path.parent() {
2411 std::fs::create_dir_all(parent).map_err(|source| Error::IoPath {
2412 path: parent.to_path_buf(),
2413 source,
2414 })?;
2415 }
2416
2417 let temp_path = path.with_extension(format!("tmp-{}", Uuid::new_v4()));
2418 std::fs::write(&temp_path, bytes).map_err(|source| Error::IoPath {
2419 path: temp_path.clone(),
2420 source,
2421 })?;
2422 std::fs::rename(&temp_path, path).map_err(|source| Error::IoPath {
2423 path: path.to_path_buf(),
2424 source,
2425 })?;
2426 Ok(())
2427}
2428
2429fn write_private_file(path: &Path, bytes: &[u8]) -> Result<()> {
2430 std::fs::write(path, bytes).map_err(|source| Error::IoPath {
2431 path: path.to_path_buf(),
2432 source,
2433 })?;
2434
2435 #[cfg(unix)]
2436 {
2437 use std::os::unix::fs::PermissionsExt;
2438
2439 let mut permissions = std::fs::metadata(path)
2440 .map_err(|source| Error::IoPath {
2441 path: path.to_path_buf(),
2442 source,
2443 })?
2444 .permissions();
2445 permissions.set_mode(0o600);
2446 std::fs::set_permissions(path, permissions).map_err(|source| Error::IoPath {
2447 path: path.to_path_buf(),
2448 source,
2449 })?;
2450 }
2451
2452 Ok(())
2453}
2454
2455#[cfg(test)]
2456mod tests {
2457 use super::*;
2458 use std::sync::{Arc, Mutex};
2459
2460 #[test]
2461 fn direct_peer_parses_socket_address() {
2462 let peer = DirectPeer::parse("127.0.0.1:53318").expect("address parses");
2463
2464 assert_eq!(peer.address().to_string(), "127.0.0.1:53318");
2465 assert_eq!(peer.expected_fingerprint(), None);
2466 }
2467
2468 #[test]
2469 fn direct_peer_accepts_expected_fingerprint() {
2470 let fingerprint = "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef";
2471 let peer = DirectPeer::parse("127.0.0.1:53318")
2472 .expect("address parses")
2473 .with_expected_fingerprint(fingerprint)
2474 .expect("fingerprint parses");
2475
2476 assert_eq!(peer.expected_fingerprint(), Some(fingerprint));
2477 }
2478
2479 #[test]
2480 fn direct_peer_rejects_missing_port() {
2481 assert!(DirectPeer::parse("127.0.0.1").is_err());
2482 }
2483
2484 #[test]
2485 fn send_request_requires_at_least_one_path() {
2486 let peer = DirectPeer::parse("127.0.0.1:53318").expect("address parses");
2487
2488 assert!(SendRequest::new(peer, Vec::new()).is_err());
2489 }
2490
2491 #[test]
2492 fn validate_send_paths_rejects_empty_slice() {
2493 assert!(validate_send_paths(&[]).is_err());
2494 }
2495
2496 #[tokio::test]
2497 async fn manifest_walks_directory_and_serializes_entries() {
2498 let temp = tempfile::tempdir().expect("tempdir");
2499 let root = temp.path().join("bundle");
2500 let nested = root.join("nested");
2501 tokio::fs::create_dir_all(&nested).await.expect("dirs");
2502 tokio::fs::write(root.join("a.txt"), b"alpha")
2503 .await
2504 .expect("file a");
2505 tokio::fs::write(nested.join("b.txt"), b"beta")
2506 .await
2507 .expect("file b");
2508
2509 let manifest = build_manifest(&[root]).await.expect("manifest");
2510 let json = serde_json::to_string(&manifest).expect("manifest serializes");
2511 let decoded: TransferManifest = serde_json::from_str(&json).expect("manifest decodes");
2512
2513 assert_eq!(decoded.version, MANIFEST_VERSION);
2514 assert!(decoded.entries.iter().any(|entry| {
2515 entry.kind == ManifestEntryKind::Directory
2516 && entry.relative_path == Path::new("bundle/nested")
2517 }));
2518 assert!(decoded.entries.iter().any(|entry| {
2519 entry.kind == ManifestEntryKind::File
2520 && entry.relative_path == Path::new("bundle/nested/b.txt")
2521 && entry.size == 4
2522 && entry.blake3.is_some()
2523 }));
2524 }
2525
2526 #[test]
2527 fn safe_destination_path_rejects_unsafe_paths() {
2528 let dest = Path::new("/tmp/ferry-dest");
2529
2530 assert!(safe_destination_path(dest, Path::new("../escape.txt")).is_err());
2531 assert!(safe_destination_path(dest, Path::new("/absolute.txt")).is_err());
2532 assert!(safe_destination_path(dest, Path::new("nested/CON")).is_err());
2533 assert!(safe_destination_path(dest, Path::new("nested\\escape.txt")).is_err());
2534 assert!(safe_destination_path(dest, Path::new("nested/ok.txt")).is_ok());
2535 }
2536
2537 #[test]
2538 fn safe_destination_path_rejects_generated_escape_and_reserved_patterns() {
2539 let dest = Path::new("/tmp/ferry-dest");
2540 let unsafe_components = [
2541 "..",
2542 ".",
2543 "CON",
2544 "con.txt",
2545 "NUL",
2546 "COM1",
2547 "LPT9",
2548 "trailingspace ",
2549 "trailingdot.",
2550 "has:colon",
2551 "has\\backslash",
2552 ];
2553
2554 for component in unsafe_components {
2555 assert!(
2556 safe_destination_path(dest, Path::new(component)).is_err(),
2557 "{component} should be rejected as a top-level path"
2558 );
2559 if component != "." {
2560 assert!(
2561 safe_destination_path(dest, Path::new("safe").join(component).as_path())
2562 .is_err(),
2563 "{component} should be rejected as a nested path"
2564 );
2565 }
2566 }
2567
2568 for component in ["alpha", "alpha-1", "alpha_1", "report.final.txt"] {
2569 let path = safe_destination_path(dest, Path::new("safe").join(component).as_path())
2570 .expect("safe generated path accepted");
2571 assert!(path.starts_with(dest));
2572 }
2573 }
2574
2575 #[tokio::test]
2576 async fn resume_decision_skips_existing_matching_file() {
2577 let temp = tempfile::tempdir().expect("tempdir");
2578 let source = temp.path().join("source.txt");
2579 let dest = temp.path().join("dest");
2580 tokio::fs::create_dir_all(&dest).await.expect("dest dir");
2581 tokio::fs::write(&source, b"same contents")
2582 .await
2583 .expect("source");
2584 tokio::fs::write(dest.join("source.txt"), b"same contents")
2585 .await
2586 .expect("dest file");
2587 let manifest = build_manifest(&[source]).await.expect("manifest");
2588 let entry = manifest.file_entries().next().expect("file entry");
2589
2590 let decision = decide_resume(&dest, entry, manifest.chunk_size)
2591 .await
2592 .expect("decision");
2593
2594 assert_eq!(decision, ResumeDecision::Skip);
2595 }
2596
2597 #[tokio::test]
2598 async fn resume_decision_returns_verified_chunk_offset() {
2599 let temp = tempfile::tempdir().expect("tempdir");
2600 let source = temp.path().join("large.bin");
2601 let dest = temp.path().join("dest");
2602 tokio::fs::create_dir_all(&dest).await.expect("dest dir");
2603 let bytes = vec![42; (DEFAULT_CHUNK_SIZE as usize * 2) + 128];
2604 tokio::fs::write(&source, &bytes).await.expect("source");
2605 tokio::fs::write(
2606 dest.join("large.bin"),
2607 &bytes[..DEFAULT_CHUNK_SIZE as usize + 99],
2608 )
2609 .await
2610 .expect("partial");
2611 let manifest = build_manifest(&[source]).await.expect("manifest");
2612 let entry = manifest.file_entries().next().expect("file entry");
2613
2614 let decision = decide_resume(&dest, entry, manifest.chunk_size)
2615 .await
2616 .expect("decision");
2617
2618 assert_eq!(decision, ResumeDecision::ResumeFrom(DEFAULT_CHUNK_SIZE));
2619 }
2620
2621 #[tokio::test]
2622 async fn resume_decision_rejects_existing_file_with_mismatched_contents() {
2623 let temp = tempfile::tempdir().expect("tempdir");
2624 let source = temp.path().join("source.txt");
2625 let dest = temp.path().join("dest");
2626 tokio::fs::create_dir_all(&dest).await.expect("dest dir");
2627 tokio::fs::write(&source, b"expected contents")
2628 .await
2629 .expect("source");
2630 tokio::fs::write(dest.join("source.txt"), b"different contents")
2631 .await
2632 .expect("conflicting dest file");
2633 let manifest = build_manifest(&[source]).await.expect("manifest");
2634 let entry = manifest.file_entries().next().expect("file entry");
2635
2636 let decision = decide_resume(&dest, entry, manifest.chunk_size)
2637 .await
2638 .expect("decision");
2639
2640 assert!(matches!(decision, ResumeDecision::Conflict(_)));
2641 }
2642
2643 #[tokio::test]
2644 async fn resume_decision_rejects_existing_file_larger_than_manifest_entry() {
2645 let temp = tempfile::tempdir().expect("tempdir");
2646 let source = temp.path().join("source.txt");
2647 let dest = temp.path().join("dest");
2648 tokio::fs::create_dir_all(&dest).await.expect("dest dir");
2649 tokio::fs::write(&source, b"small").await.expect("source");
2650 tokio::fs::write(dest.join("source.txt"), b"larger destination")
2651 .await
2652 .expect("larger dest file");
2653 let manifest = build_manifest(&[source]).await.expect("manifest");
2654 let entry = manifest.file_entries().next().expect("file entry");
2655
2656 let decision = decide_resume(&dest, entry, manifest.chunk_size)
2657 .await
2658 .expect("decision");
2659
2660 assert!(matches!(decision, ResumeDecision::Conflict(_)));
2661 }
2662
2663 #[test]
2664 fn identity_is_loaded_after_generation() {
2665 let temp = tempfile::tempdir().expect("tempdir");
2666 let generated =
2667 NativeIdentity::load_or_generate_in(temp.path()).expect("identity generated");
2668 let loaded = NativeIdentity::load_or_generate_in(temp.path()).expect("identity loaded");
2669
2670 assert_eq!(generated.fingerprint(), loaded.fingerprint());
2671 }
2672
2673 #[test]
2674 fn app_config_round_trips_toml() {
2675 let temp = tempfile::tempdir().expect("tempdir");
2676 let path = temp.path().join("config.toml");
2677 let config = AppConfig {
2678 alias: "desk".to_string(),
2679 ..AppConfig::default()
2680 };
2681
2682 config.save_to_path(&path).expect("config saved");
2683 let loaded = AppConfig::load_or_default_from(&path).expect("config loaded");
2684
2685 assert_eq!(loaded.alias, "desk");
2686 assert_eq!(loaded.listen_port, 53317);
2687 assert!(loaded.trust.require_fingerprint);
2688 }
2689
2690 #[test]
2691 fn app_config_redacts_psk_for_display_output() {
2692 let config = AppConfig {
2693 trust: TrustConfig {
2694 psk: "super-secret-psk".to_string(),
2695 ..TrustConfig::default()
2696 },
2697 ..AppConfig::default()
2698 };
2699
2700 let redacted = config.to_redacted_toml_string().expect("config serializes");
2701
2702 assert!(!redacted.contains("super-secret-psk"));
2703 assert!(redacted.contains(r#"psk = "<redacted>""#));
2704 }
2705
2706 #[test]
2707 fn daemon_config_defaults_from_app_config_and_round_trips_toml() {
2708 let temp = tempfile::tempdir().expect("tempdir");
2709 let path = temp.path().join("daemon.toml");
2710 let app_config = AppConfig {
2711 quic_port: 54444,
2712 download_dir: temp.path().join("downloads").display().to_string(),
2713 ..AppConfig::default()
2714 };
2715
2716 let defaulted =
2717 DaemonConfig::load_or_default_from(&path, &app_config).expect("daemon config");
2718 assert_eq!(
2719 defaulted.listen,
2720 SocketAddr::from(([0, 0, 0, 0], app_config.quic_port))
2721 );
2722 assert_eq!(defaulted.destination, temp.path().join("downloads"));
2723
2724 let config = DaemonConfig {
2725 listen: SocketAddr::from(([127, 0, 0, 1], 53318)),
2726 destination: temp.path().join("daemon-dest"),
2727 };
2728 config.save_to_path(&path).expect("daemon config saved");
2729 let loaded =
2730 DaemonConfig::load_or_default_from(&path, &app_config).expect("daemon config loaded");
2731
2732 assert_eq!(loaded, config);
2733 }
2734
2735 #[test]
2736 fn trust_store_load_save_and_forget_round_trip() {
2737 let temp = tempfile::tempdir().expect("tempdir");
2738 let path = temp.path().join("known_peers.toml");
2739 let fingerprint = "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef";
2740 let mut store = TrustStore::default();
2741
2742 store
2743 .trust_fingerprint(fingerprint)
2744 .expect("fingerprint trusted");
2745 store.save_to_path(&path).expect("trust store saved");
2746 let mut loaded = TrustStore::load_or_default_from(&path).expect("trust store loaded");
2747
2748 assert_eq!(loaded.records().count(), 1);
2749 assert_eq!(loaded.peers[fingerprint].trust_state, TrustState::Trusted);
2750 assert!(loaded.forget(fingerprint));
2751 assert_eq!(loaded.records().count(), 0);
2752 }
2753
2754 #[test]
2755 fn trust_store_rejects_short_fingerprint_without_discovery_lookup() {
2756 let mut store = TrustStore::default();
2757
2758 assert!(store.trust_fingerprint("9a4f2c").is_err());
2759 }
2760
2761 #[test]
2762 fn transfer_events_serialize_with_stable_event_names() {
2763 let event = TransferEvent::Progress {
2764 direction: TransferDirection::Send,
2765 file_name: "bundle/a.txt".to_string(),
2766 bytes_done: 5,
2767 bytes_total: 9,
2768 };
2769
2770 let json = serde_json::to_value(&event).expect("event serializes");
2771
2772 assert_eq!(json["event"], "progress");
2773 assert_eq!(json["direction"], "send");
2774 assert_eq!(json["file_name"], "bundle/a.txt");
2775 assert_eq!(json["bytes_done"], 5);
2776 assert_eq!(json["bytes_total"], 9);
2777 }
2778
2779 #[test]
2780 fn direct_protocol_hello_has_stable_golden_json() {
2781 let hello = DirectProtocolHello {
2782 protocol_version: 1,
2783 min_protocol_version: 1,
2784 client_fingerprint: "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"
2785 .to_string(),
2786 };
2787
2788 let json = serde_json::to_string(&hello).expect("hello serializes");
2789
2790 assert_eq!(
2791 json,
2792 r#"{"protocol_version":1,"min_protocol_version":1,"client_fingerprint":"0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"}"#
2793 );
2794 }
2795
2796 #[test]
2797 fn direct_protocol_response_has_stable_golden_json() {
2798 let response = DirectProtocolResponse::accept(1);
2799
2800 let json = serde_json::to_string(&response).expect("response serializes");
2801
2802 assert_eq!(
2803 json,
2804 r#"{"accepted":true,"protocol_version":1,"error":null}"#
2805 );
2806 }
2807
2808 #[test]
2809 fn direct_protocol_negotiates_supported_overlap() {
2810 let hello = DirectProtocolHello {
2811 protocol_version: 3,
2812 min_protocol_version: 1,
2813 client_fingerprint: "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"
2814 .to_string(),
2815 };
2816
2817 let response = negotiate_direct_protocol(&hello);
2818
2819 assert_eq!(response, DirectProtocolResponse::accept(1));
2820 }
2821
2822 #[test]
2823 fn direct_protocol_rejects_incompatible_versions() {
2824 let too_new = DirectProtocolHello {
2825 protocol_version: 3,
2826 min_protocol_version: 2,
2827 client_fingerprint: "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"
2828 .to_string(),
2829 };
2830 let too_old = DirectProtocolHello {
2831 protocol_version: 0,
2832 min_protocol_version: 0,
2833 client_fingerprint: "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"
2834 .to_string(),
2835 };
2836
2837 assert!(!negotiate_direct_protocol(&too_new).accepted);
2838 assert!(!negotiate_direct_protocol(&too_old).accepted);
2839 }
2840
2841 #[test]
2842 fn transfer_manifest_has_stable_golden_json() {
2843 let manifest = TransferManifest {
2844 version: MANIFEST_VERSION,
2845 session_id: Uuid::parse_str("00000000-0000-0000-0000-000000000001")
2846 .expect("uuid parses"),
2847 chunk_size: DEFAULT_CHUNK_SIZE,
2848 entries: vec![ManifestEntry {
2849 relative_path: PathBuf::from("bundle/a.txt"),
2850 kind: ManifestEntryKind::File,
2851 size: 5,
2852 blake3: Some(
2853 "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef".to_string(),
2854 ),
2855 chunks: vec![
2856 "abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789".to_string(),
2857 ],
2858 unix_mode: Some(0o100644),
2859 modified_unix_ms: Some(1_700_000_000_000),
2860 }],
2861 };
2862
2863 let json = serde_json::to_string(&manifest).expect("manifest serializes");
2864
2865 assert_eq!(
2866 json,
2867 r#"{"version":1,"session_id":"00000000-0000-0000-0000-000000000001","chunk_size":1048576,"entries":[{"relative_path":"bundle/a.txt","kind":"file","size":5,"blake3":"0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef","chunks":["abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789"],"unix_mode":33188,"modified_unix_ms":1700000000000}]}"#
2868 );
2869 }
2870
2871 #[test]
2872 fn peer_registry_merges_observations_by_fingerprint() {
2873 let mut registry = PeerRegistry::new();
2874 let fingerprint = "abcdef1234567890";
2875 let first = PeerObservation::new(
2876 fingerprint,
2877 SocketAddr::from(([192, 168, 1, 10], 53318)),
2878 100,
2879 )
2880 .with_alias("desk")
2881 .with_hostname("desk.local")
2882 .with_transport("quic");
2883 let second = PeerObservation::new(
2884 fingerprint,
2885 SocketAddr::from(([192, 168, 1, 11], 53318)),
2886 200,
2887 )
2888 .with_alias("workstation")
2889 .with_transport("quic");
2890
2891 registry.observe(first).expect("first observation");
2892 registry.observe(second).expect("second observation");
2893 let record = registry
2894 .get_by_fingerprint(fingerprint)
2895 .expect("record by fingerprint");
2896
2897 assert_eq!(registry.records().count(), 1);
2898 assert!(record.aliases.contains("desk"));
2899 assert!(record.aliases.contains("workstation"));
2900 assert!(record.hostnames.contains("desk.local"));
2901 assert_eq!(record.addresses.len(), 2);
2902 assert_eq!(record.first_seen_unix_ms, 100);
2903 assert_eq!(record.last_seen_unix_ms, 200);
2904 }
2905
2906 #[test]
2907 fn peer_registry_looks_up_unique_fingerprint_prefix_alias_and_hostname() {
2908 let mut registry = PeerRegistry::new();
2909 registry
2910 .observe(
2911 PeerObservation::new(
2912 "abcdef1234567890",
2913 SocketAddr::from(([192, 168, 1, 10], 53318)),
2914 100,
2915 )
2916 .with_alias("desk")
2917 .with_hostname("desk.local"),
2918 )
2919 .expect("first observation");
2920 registry
2921 .observe(
2922 PeerObservation::new(
2923 "123456abcdef7890",
2924 SocketAddr::from(([192, 168, 1, 20], 53318)),
2925 100,
2926 )
2927 .with_alias("laptop"),
2928 )
2929 .expect("second observation");
2930
2931 assert_eq!(
2932 registry
2933 .lookup("abcdef")
2934 .map(|record| record.fingerprint.as_str()),
2935 Some("abcdef1234567890")
2936 );
2937 assert_eq!(
2938 registry
2939 .lookup("desk")
2940 .map(|record| record.fingerprint.as_str()),
2941 Some("abcdef1234567890")
2942 );
2943 assert_eq!(
2944 registry
2945 .lookup("desk.local")
2946 .map(|record| record.fingerprint.as_str()),
2947 Some("abcdef1234567890")
2948 );
2949 assert!(registry.lookup("missing").is_none());
2950 }
2951
2952 #[test]
2953 fn peer_registry_rejects_ambiguous_lookup_hints() {
2954 let mut registry = PeerRegistry::new();
2955 registry
2956 .observe(
2957 PeerObservation::new(
2958 "abcdef1234567890",
2959 SocketAddr::from(([192, 168, 1, 10], 53318)),
2960 100,
2961 )
2962 .with_alias("desk"),
2963 )
2964 .expect("first observation");
2965 registry
2966 .observe(
2967 PeerObservation::new(
2968 "abcdef9999999999",
2969 SocketAddr::from(([192, 168, 1, 11], 53318)),
2970 100,
2971 )
2972 .with_alias("desk"),
2973 )
2974 .expect("second observation");
2975
2976 assert!(registry.lookup("abcdef").is_none());
2977 assert!(registry.lookup("desk").is_none());
2978 assert_eq!(
2979 registry
2980 .lookup("abcdef1234567890")
2981 .map(|record| record.fingerprint.as_str()),
2982 Some("abcdef1234567890")
2983 );
2984 match registry.lookup_detail("desk") {
2985 PeerLookup::Ambiguous(records) => assert_eq!(records.len(), 2),
2986 other => panic!("expected ambiguous duplicate-alias lookup, got {other:?}"),
2987 }
2988 assert!(matches!(
2989 registry.lookup_detail("missing"),
2990 PeerLookup::Missing
2991 ));
2992 }
2993
2994 #[test]
2995 fn native_announcement_builds_expected_txt_properties() {
2996 let announcement = NativeAnnouncement {
2997 alias: "Stephen MBP".to_string(),
2998 fingerprint: "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"
2999 .to_string(),
3000 quic_port: 53318,
3001 listen_port: Some(53317),
3002 };
3003
3004 let service = announcement.service_info().expect("service info");
3005
3006 assert_eq!(service.get_type(), NATIVE_SERVICE_TYPE);
3007 assert!(
3008 service
3009 .get_fullname()
3010 .starts_with("stephen-mbp-0123456789ab.")
3011 );
3012 assert_eq!(service.get_property_val_str("pv"), Some("1"));
3013 assert_eq!(service.get_property_val_str("minpv"), Some("1"));
3014 assert_eq!(
3015 service.get_property_val_str("fp"),
3016 Some("0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef")
3017 );
3018 assert_eq!(service.get_property_val_str("alias"), Some("Stephen MBP"));
3019 assert_eq!(service.get_property_val_str("transports"), Some("quic"));
3020 assert_eq!(service.get_property_val_str("quic_port"), Some("53318"));
3021 assert_eq!(service.get_property_val_str("listen_port"), Some("53317"));
3022 }
3023
3024 #[test]
3025 fn resolved_native_service_merges_into_registry() {
3026 let properties = [
3027 ("pv", "1"),
3028 ("minpv", "1"),
3029 (
3030 "fp",
3031 "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef",
3032 ),
3033 ("alias", "desk"),
3034 ("transports", "quic"),
3035 ("quic_port", "53318"),
3036 ];
3037 let service = ServiceInfo::new(
3038 NATIVE_SERVICE_TYPE,
3039 "desk-0123456789ab",
3040 "desk.local.",
3041 "192.168.1.42",
3042 53318,
3043 &properties[..],
3044 )
3045 .expect("service info")
3046 .as_resolved_service();
3047 let mut registry = PeerRegistry::new();
3048
3049 observe_resolved_service(&mut registry, &service).expect("observation");
3050
3051 let record = registry.lookup("desk").expect("record by alias");
3052 assert_eq!(
3053 record.fingerprint,
3054 "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"
3055 );
3056 assert_eq!(
3057 record.preferred_quic_address(),
3058 Some(SocketAddr::from(([192, 168, 1, 42], 53318)))
3059 );
3060 }
3061
3062 #[test]
3063 fn resolved_native_service_ignores_incompatible_protocol_ranges() {
3064 let properties = [
3065 ("pv", "3"),
3066 ("minpv", "2"),
3067 (
3068 "fp",
3069 "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef",
3070 ),
3071 ("alias", "desk"),
3072 ("transports", "quic"),
3073 ("quic_port", "53318"),
3074 ];
3075 let service = ServiceInfo::new(
3076 NATIVE_SERVICE_TYPE,
3077 "desk-0123456789ab",
3078 "desk.local.",
3079 "192.168.1.42",
3080 53318,
3081 &properties[..],
3082 )
3083 .expect("service info")
3084 .as_resolved_service();
3085 let mut registry = PeerRegistry::new();
3086
3087 observe_resolved_service(&mut registry, &service).expect("observation");
3088
3089 assert_eq!(registry.records().count(), 0);
3090 }
3091
3092 #[tokio::test]
3093 async fn direct_quic_loopback_sends_one_file() {
3094 let source_dir = tempfile::tempdir().expect("source tempdir");
3095 let dest_dir = tempfile::tempdir().expect("dest tempdir");
3096 let identity_dir = tempfile::tempdir().expect("identity tempdir");
3097 let file_path = source_dir.path().join("hello.txt");
3098 tokio::fs::write(&file_path, b"hello from ferry")
3099 .await
3100 .expect("source file written");
3101
3102 let identity =
3103 NativeIdentity::load_or_generate_in(identity_dir.path()).expect("identity generated");
3104 let recv_identity = identity.clone();
3105 let reserved_socket =
3106 std::net::UdpSocket::bind(SocketAddr::from(([127, 0, 0, 1], 0))).expect("udp bind");
3107 let recv_addr = reserved_socket.local_addr().expect("local addr");
3108 drop(reserved_socket);
3109 let receive_progress = Arc::new(Mutex::new(Vec::new()));
3110 let receive_progress_log = receive_progress.clone();
3111 let dest_path = dest_dir.path().to_path_buf();
3112 let server = tokio::spawn(async move {
3113 receive_direct_file(
3114 &ReceiveRequest::new(recv_addr),
3115 dest_path,
3116 &recv_identity,
3117 |event| {
3118 receive_progress_log
3119 .lock()
3120 .expect("progress lock")
3121 .push(event);
3122 },
3123 )
3124 .await
3125 });
3126
3127 tokio::time::sleep(Duration::from_millis(100)).await;
3128
3129 let peer = DirectPeer::parse(&recv_addr.to_string()).expect("peer parses");
3130 let send_request = SendRequest::new(peer, vec![file_path]).expect("send request is valid");
3131 let send_progress = Arc::new(Mutex::new(Vec::new()));
3132 let send_progress_log = send_progress.clone();
3133 let send_summary = send_direct_file(&send_request, &identity, |event| {
3134 send_progress_log.lock().expect("progress lock").push(event);
3135 })
3136 .await
3137 .expect("file sent");
3138 let receive_summary = server.await.expect("server joined").expect("file received");
3139
3140 let received = tokio::fs::read(dest_dir.path().join("hello.txt"))
3141 .await
3142 .expect("received file readable");
3143 assert_eq!(received, b"hello from ferry");
3144 assert_eq!(send_summary.bytes, receive_summary.bytes);
3145 assert_eq!(send_summary.blake3, receive_summary.blake3);
3146 let send_progress = send_progress.lock().expect("progress lock");
3147 let receive_progress = receive_progress.lock().expect("progress lock");
3148 assert!(matches!(
3149 send_progress.first(),
3150 Some(TransferEvent::SessionStarted {
3151 direction: TransferDirection::Send,
3152 ..
3153 })
3154 ));
3155 assert!(
3156 send_progress
3157 .iter()
3158 .any(|event| matches!(event, TransferEvent::Progress { .. }))
3159 );
3160 assert!(
3161 send_progress
3162 .iter()
3163 .any(|event| matches!(event, TransferEvent::SessionFinished { .. }))
3164 );
3165 assert!(matches!(
3166 receive_progress.first(),
3167 Some(TransferEvent::SessionStarted {
3168 direction: TransferDirection::Receive,
3169 ..
3170 })
3171 ));
3172 assert!(
3173 receive_progress
3174 .iter()
3175 .any(|event| matches!(event, TransferEvent::Progress { .. }))
3176 );
3177 assert!(
3178 receive_progress
3179 .iter()
3180 .any(|event| matches!(event, TransferEvent::SessionFinished { .. }))
3181 );
3182 }
3183
3184 #[tokio::test]
3185 async fn direct_transfer_rejects_unexpected_receiver_fingerprint_before_payload() {
3186 let source_dir = tempfile::tempdir().expect("source tempdir");
3187 let dest_dir = tempfile::tempdir().expect("dest tempdir");
3188 let sender_identity_dir = tempfile::tempdir().expect("sender identity tempdir");
3189 let receiver_identity_dir = tempfile::tempdir().expect("receiver identity tempdir");
3190 let other_identity_dir = tempfile::tempdir().expect("other identity tempdir");
3191 let file_path = source_dir.path().join("hello.txt");
3192 tokio::fs::write(&file_path, b"hello from ferry")
3193 .await
3194 .expect("source file written");
3195
3196 let sender_identity = NativeIdentity::load_or_generate_in(sender_identity_dir.path())
3197 .expect("sender identity");
3198 let receiver_identity = NativeIdentity::load_or_generate_in(receiver_identity_dir.path())
3199 .expect("receiver identity");
3200 let other_identity =
3201 NativeIdentity::load_or_generate_in(other_identity_dir.path()).expect("other identity");
3202 let reserved_socket =
3203 std::net::UdpSocket::bind(SocketAddr::from(([127, 0, 0, 1], 0))).expect("udp bind");
3204 let recv_addr = reserved_socket.local_addr().expect("local addr");
3205 drop(reserved_socket);
3206 let dest_path = dest_dir.path().to_path_buf();
3207 let server = tokio::spawn(async move {
3208 receive_direct_file(
3209 &ReceiveRequest::new(recv_addr),
3210 dest_path,
3211 &receiver_identity,
3212 |_| {},
3213 )
3214 .await
3215 });
3216
3217 tokio::time::sleep(Duration::from_millis(100)).await;
3218
3219 let peer = DirectPeer::from_address(recv_addr)
3220 .with_expected_fingerprint(other_identity.fingerprint().to_string())
3221 .expect("expected fingerprint accepted");
3222 let send_request = SendRequest::new(peer, vec![file_path]).expect("send request is valid");
3223 let error = send_direct_file(&send_request, &sender_identity, |_| {})
3224 .await
3225 .expect_err("fingerprint mismatch should reject send");
3226
3227 assert!(matches!(error, Error::PeerFingerprintMismatch { .. }));
3228 let server_error = server
3229 .await
3230 .expect("server joined")
3231 .expect_err("server closed");
3232 assert!(matches!(
3233 server_error,
3234 Error::Connection(_) | Error::NoIncomingConnection
3235 ));
3236 assert!(
3237 !tokio::fs::try_exists(dest_dir.path().join("hello.txt"))
3238 .await
3239 .expect("destination checked")
3240 );
3241 }
3242
3243 #[tokio::test]
3244 async fn direct_quic_loopback_sends_directory_and_resumes_partial_file() {
3245 let source_dir = tempfile::tempdir().expect("source tempdir");
3246 let dest_dir = tempfile::tempdir().expect("dest tempdir");
3247 let identity_dir = tempfile::tempdir().expect("identity tempdir");
3248 let bundle = source_dir.path().join("bundle");
3249 let nested = bundle.join("nested");
3250 tokio::fs::create_dir_all(&nested)
3251 .await
3252 .expect("source dirs");
3253 tokio::fs::write(nested.join("small.txt"), b"small")
3254 .await
3255 .expect("small file");
3256 let large_bytes = vec![7; (DEFAULT_CHUNK_SIZE as usize * 2) + 17];
3257 tokio::fs::write(bundle.join("large.bin"), &large_bytes)
3258 .await
3259 .expect("large file");
3260
3261 let dest_bundle = dest_dir.path().join("bundle");
3262 tokio::fs::create_dir_all(&dest_bundle)
3263 .await
3264 .expect("dest bundle");
3265 tokio::fs::write(
3266 dest_bundle.join("large.bin"),
3267 &large_bytes[..DEFAULT_CHUNK_SIZE as usize + 5],
3268 )
3269 .await
3270 .expect("partial large");
3271
3272 let identity =
3273 NativeIdentity::load_or_generate_in(identity_dir.path()).expect("identity generated");
3274 let recv_identity = identity.clone();
3275 let reserved_socket =
3276 std::net::UdpSocket::bind(SocketAddr::from(([127, 0, 0, 1], 0))).expect("udp bind");
3277 let recv_addr = reserved_socket.local_addr().expect("local addr");
3278 drop(reserved_socket);
3279 let dest_path = dest_dir.path().to_path_buf();
3280 let server = tokio::spawn(async move {
3281 receive_direct_file(
3282 &ReceiveRequest::new(recv_addr),
3283 dest_path,
3284 &recv_identity,
3285 |_| {},
3286 )
3287 .await
3288 });
3289
3290 tokio::time::sleep(Duration::from_millis(100)).await;
3291
3292 let peer = DirectPeer::parse(&recv_addr.to_string()).expect("peer parses");
3293 let send_request = SendRequest::new(peer, vec![bundle]).expect("send request is valid");
3294 let send_summary = send_direct_file(&send_request, &identity, |_| {})
3295 .await
3296 .expect("directory sent");
3297 let receive_summary = server
3298 .await
3299 .expect("server joined")
3300 .expect("directory received");
3301
3302 let received_large = tokio::fs::read(dest_bundle.join("large.bin"))
3303 .await
3304 .expect("large file readable");
3305 let received_small = tokio::fs::read(dest_bundle.join("nested/small.txt"))
3306 .await
3307 .expect("small file readable");
3308 assert_eq!(received_large, large_bytes);
3309 assert_eq!(received_small, b"small");
3310 assert_eq!(send_summary.bytes, receive_summary.bytes);
3311 assert!(
3312 receive_summary
3313 .files
3314 .iter()
3315 .any(|file| file.status == TransferFileStatus::Resumed)
3316 );
3317 }
3318
3319 #[tokio::test]
3320 async fn direct_quic_loopback_cancels_send_and_does_not_finalize_partial_file() {
3321 let source_dir = tempfile::tempdir().expect("source tempdir");
3322 let dest_dir = tempfile::tempdir().expect("dest tempdir");
3323 let identity_dir = tempfile::tempdir().expect("identity tempdir");
3324 let file_path = source_dir.path().join("payload.bin");
3325 tokio::fs::write(&file_path, vec![3; IO_BUFFER_SIZE * 64])
3326 .await
3327 .expect("source file written");
3328
3329 let identity =
3330 NativeIdentity::load_or_generate_in(identity_dir.path()).expect("identity generated");
3331 let recv_identity = identity.clone();
3332 let reserved_socket =
3333 std::net::UdpSocket::bind(SocketAddr::from(([127, 0, 0, 1], 0))).expect("udp bind");
3334 let recv_addr = reserved_socket.local_addr().expect("local addr");
3335 drop(reserved_socket);
3336
3337 let dest_path = dest_dir.path().to_path_buf();
3338 let server = tokio::spawn(async move {
3339 receive_direct_file(
3340 &ReceiveRequest::new(recv_addr),
3341 dest_path,
3342 &recv_identity,
3343 |_| {},
3344 )
3345 .await
3346 });
3347
3348 tokio::time::sleep(Duration::from_millis(100)).await;
3349
3350 let peer = DirectPeer::parse(&recv_addr.to_string()).expect("peer parses");
3351 let send_request =
3352 SendRequest::new(peer, vec![file_path.clone()]).expect("send request is valid");
3353 let send_control = TransferControl::new();
3354 let cancel_on_progress = send_control.clone();
3355 let send_events = Arc::new(Mutex::new(Vec::new()));
3356 let send_events_log = send_events.clone();
3357 let send_result =
3358 send_direct_file_with_control(&send_request, &identity, send_control, |event| {
3359 if matches!(event, TransferEvent::Progress { .. }) {
3360 cancel_on_progress.cancel();
3361 }
3362 send_events_log.lock().expect("events lock").push(event);
3363 })
3364 .await;
3365
3366 assert!(matches!(send_result, Err(Error::TransferCancelled)));
3367
3368 let receive_result = tokio::time::timeout(Duration::from_secs(5), server)
3369 .await
3370 .expect("receiver should finish after sender cancellation")
3371 .expect("server joined");
3372 assert!(receive_result.is_err());
3373
3374 assert!(!dest_dir.path().join("payload.bin").exists());
3375 assert!(
3376 send_events
3377 .lock()
3378 .expect("events lock")
3379 .iter()
3380 .any(|event| matches!(event, TransferEvent::SessionCancelled { .. }))
3381 );
3382 }
3383}