1pub mod error;
2
3use std::collections::{BTreeMap, BTreeSet};
4use std::fs::Metadata;
5use std::io::SeekFrom;
6use std::net::SocketAddr;
7use std::path::{Component, Path, PathBuf};
8use std::sync::Arc;
9use std::time::{Duration, SystemTime, UNIX_EPOCH};
10
11use directories::BaseDirs;
12use mdns_sd::{ResolvedService, ServiceDaemon, ServiceEvent, ServiceInfo};
13use quinn::crypto::rustls::QuicClientConfig;
14use rcgen::{CertifiedKey, generate_simple_self_signed};
15use rustls::client::danger::{HandshakeSignatureValid, ServerCertVerified, ServerCertVerifier};
16use rustls::crypto::{CryptoProvider, verify_tls12_signature, verify_tls13_signature};
17use rustls::pki_types::{CertificateDer, PrivateKeyDer, PrivatePkcs8KeyDer, ServerName, UnixTime};
18use rustls::{DigitallySignedStruct, SignatureScheme};
19use serde::{Deserialize, Serialize};
20use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt};
21use uuid::Uuid;
22
23pub use error::{Error, Result};
24
/// Highest native protocol version this build speaks. It is advertised as the `pv` TXT
/// property and sent in the direct-transfer hello.
pub const NATIVE_PROTOCOL_VERSION: u16 = 1;
/// Lowest native protocol version this build accepts. It is advertised as the `minpv` TXT
/// property and sent in the direct-transfer hello.
pub const MIN_NATIVE_PROTOCOL_VERSION: u16 = 1;
/// DNS-SD service type under which native peers are registered and browsed.
pub const NATIVE_SERVICE_TYPE: &str = "_ferry._udp.local.";

/// Magic bytes written at the start of a direct-transfer stream, ahead of the JSON frames.
const DIRECT_MAGIC: &[u8; 8] = b"FERRY01\n";
/// 16 MiB. NOTE(review): presumably the cap on a single framed message; the use site is
/// outside this section — confirm.
const MAX_FRAME_LEN: usize = 16 * 1024 * 1024;
/// 64 KiB. NOTE(review): presumably the buffer size for file/stream copies — confirm at the
/// use site.
const IO_BUFFER_SIZE: usize = 64 * 1024;
/// Version number stamped into every manifest produced by `build_manifest`.
const MANIFEST_VERSION: u16 = 1;
/// Chunk size (1 MiB) recorded in every manifest produced by `build_manifest`.
const DEFAULT_CHUNK_SIZE: u64 = 1024 * 1024;
/// NOTE(review): presumably the directory name used beneath the platform config root —
/// confirm against `config_dir`.
const CONFIG_DIR_NAME: &str = "ferry";
35
/// A transfer peer identified purely by its socket address.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct DirectPeer {
    /// Address the sender connects to.
    address: SocketAddr,
}
40
41impl DirectPeer {
42 pub fn parse(value: &str) -> Result<Self> {
43 Ok(Self {
44 address: value.parse()?,
45 })
46 }
47
48 pub const fn address(&self) -> SocketAddr {
49 self.address
50 }
51
52 pub const fn from_address(address: SocketAddr) -> Self {
53 Self { address }
54 }
55}
56
/// Everything needed to start a send: the target peer and the local paths to transfer.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct SendRequest {
    /// Peer the files are sent to.
    peer: DirectPeer,
    /// Local paths to send; `SendRequest::new` rejects an empty list.
    paths: Vec<PathBuf>,
}
62
63impl SendRequest {
64 pub fn new(peer: DirectPeer, paths: Vec<PathBuf>) -> Result<Self> {
65 if paths.is_empty() {
66 return Err(Error::InvalidInput(
67 "at least one file path is required".to_string(),
68 ));
69 }
70
71 Ok(Self { peer, paths })
72 }
73
74 pub const fn peer(&self) -> &DirectPeer {
75 &self.peer
76 }
77
78 pub fn paths(&self) -> &[PathBuf] {
79 &self.paths
80 }
81}
82
/// Parameters for a receive operation.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ReceiveRequest {
    /// Socket address to listen on.
    listen: SocketAddr,
}
87
88impl ReceiveRequest {
89 pub fn new(listen: SocketAddr) -> Self {
90 Self { listen }
91 }
92
93 pub const fn listen(&self) -> SocketAddr {
94 self.listen
95 }
96}
97
98#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
99pub struct AppConfig {
100 pub alias: String,
101 pub listen_port: u16,
102 pub quic_port: u16,
103 pub download_dir: String,
104 pub auto_accept_known: bool,
105 pub auto_accept_unknown: bool,
106 pub discovery: Vec<String>,
107 pub max_concurrent_files: usize,
108 pub trust: TrustConfig,
109}
110
111impl Default for AppConfig {
112 fn default() -> Self {
113 Self {
114 alias: default_alias(),
115 listen_port: 53317,
116 quic_port: 53318,
117 download_dir: "~/Downloads/ferry".to_string(),
118 auto_accept_known: true,
119 auto_accept_unknown: false,
120 discovery: vec!["native".to_string()],
121 max_concurrent_files: 8,
122 trust: TrustConfig::default(),
123 }
124 }
125}
126
127impl AppConfig {
128 pub fn load_or_default() -> Result<Self> {
129 Self::load_or_default_from(config_dir()?.join("config.toml"))
130 }
131
132 pub fn load_or_default_from(path: impl AsRef<Path>) -> Result<Self> {
133 let path = path.as_ref();
134 match std::fs::read_to_string(path) {
135 Ok(contents) => {
136 toml::from_str(&contents).map_err(|error| Error::ConfigParse(error.to_string()))
137 }
138 Err(error) if error.kind() == std::io::ErrorKind::NotFound => Ok(Self::default()),
139 Err(source) => Err(Error::IoPath {
140 path: path.to_path_buf(),
141 source,
142 }),
143 }
144 }
145
146 pub fn save_default_path(&self) -> Result<()> {
147 self.save_to_path(config_dir()?.join("config.toml"))
148 }
149
150 pub fn save_to_path(&self, path: impl AsRef<Path>) -> Result<()> {
151 write_toml_file(path.as_ref(), self)
152 }
153
154 pub fn to_toml_string(&self) -> Result<String> {
155 toml::to_string_pretty(self).map_err(|error| Error::ConfigSerialize(error.to_string()))
156 }
157
158 pub fn redacted(&self) -> Self {
159 let mut config = self.clone();
160 config.trust = config.trust.redacted();
161 config
162 }
163
164 pub fn to_redacted_toml_string(&self) -> Result<String> {
165 self.redacted().to_toml_string()
166 }
167}
168
/// Settings for the receiving daemon, stored as TOML.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct DaemonConfig {
    /// Socket address the daemon listens on.
    pub listen: SocketAddr,
    /// Directory incoming files are written to.
    pub destination: PathBuf,
}
174
175impl DaemonConfig {
176 pub fn from_app_config(config: &AppConfig) -> Result<Self> {
177 Ok(Self {
178 listen: SocketAddr::from(([0, 0, 0, 0], config.quic_port)),
179 destination: expand_home_path(&config.download_dir)?,
180 })
181 }
182
183 pub fn load_or_default() -> Result<Self> {
184 let app_config = AppConfig::load_or_default()?;
185 Self::load_or_default_from(config_dir()?.join("daemon.toml"), &app_config)
186 }
187
188 pub fn load_or_default_from(path: impl AsRef<Path>, app_config: &AppConfig) -> Result<Self> {
189 let path = path.as_ref();
190 match std::fs::read_to_string(path) {
191 Ok(contents) => {
192 toml::from_str(&contents).map_err(|error| Error::ConfigParse(error.to_string()))
193 }
194 Err(error) if error.kind() == std::io::ErrorKind::NotFound => {
195 Self::from_app_config(app_config)
196 }
197 Err(source) => Err(Error::IoPath {
198 path: path.to_path_buf(),
199 source,
200 }),
201 }
202 }
203
204 pub fn save_default_path(&self) -> Result<()> {
205 self.save_to_path(config_dir()?.join("daemon.toml"))
206 }
207
208 pub fn save_to_path(&self, path: impl AsRef<Path>) -> Result<()> {
209 write_toml_file(path.as_ref(), self)
210 }
211
212 pub fn to_toml_string(&self) -> Result<String> {
213 toml::to_string_pretty(self).map_err(|error| Error::ConfigSerialize(error.to_string()))
214 }
215}
216
217#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
218pub struct TrustConfig {
219 pub require_fingerprint: bool,
220 pub psk: String,
221}
222
223impl Default for TrustConfig {
224 fn default() -> Self {
225 Self {
226 require_fingerprint: true,
227 psk: String::new(),
228 }
229 }
230}
231
232impl TrustConfig {
233 pub fn redacted(&self) -> Self {
234 let psk = if self.psk.is_empty() {
235 String::new()
236 } else {
237 "<redacted>".to_string()
238 };
239 Self {
240 require_fingerprint: self.require_fingerprint,
241 psk,
242 }
243 }
244}
245
/// This device's self-signed certificate, its private key, and the certificate fingerprint.
///
/// Note that the derived `Debug` and `Serialize` implementations include `key_der`, so
/// formatting or serializing this value exposes the private key bytes.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct NativeIdentity {
    /// DER-encoded certificate.
    cert_der: Vec<u8>,
    /// DER-encoded private key (handed to rustls as PKCS#8).
    key_der: Vec<u8>,
    /// Hex BLAKE3 hash of `cert_der`, as computed by `from_parts`.
    fingerprint: String,
}
252
253impl NativeIdentity {
254 pub fn load_or_generate() -> Result<Self> {
255 Self::load_or_generate_in(config_dir()?)
256 }
257
258 pub fn load_or_generate_in(config_dir: impl AsRef<Path>) -> Result<Self> {
259 let config_dir = config_dir.as_ref();
260 let cert_path = config_dir.join("identity.cert.der");
261 let key_path = config_dir.join("identity.key.der");
262
263 if cert_path.exists() && key_path.exists() {
264 let cert_der = std::fs::read(&cert_path).map_err(|source| Error::IoPath {
265 path: cert_path,
266 source,
267 })?;
268 let key_der = std::fs::read(&key_path).map_err(|source| Error::IoPath {
269 path: key_path,
270 source,
271 })?;
272
273 return Ok(Self::from_parts(cert_der, key_der));
274 }
275
276 std::fs::create_dir_all(config_dir).map_err(|source| Error::IoPath {
277 path: config_dir.to_path_buf(),
278 source,
279 })?;
280
281 let identity = Self::generate()?;
282 write_private_file(&cert_path, &identity.cert_der)?;
283 write_private_file(&key_path, &identity.key_der)?;
284 Ok(identity)
285 }
286
287 pub fn generate() -> Result<Self> {
288 let CertifiedKey { cert, signing_key } =
289 generate_simple_self_signed(vec!["localhost".to_string()])?;
290 Ok(Self::from_parts(
291 cert.der().to_vec(),
292 signing_key.serialize_der(),
293 ))
294 }
295
296 pub fn fingerprint(&self) -> &str {
297 &self.fingerprint
298 }
299
300 fn cert_chain(&self) -> Vec<CertificateDer<'static>> {
301 vec![CertificateDer::from(self.cert_der.clone())]
302 }
303
304 fn private_key(&self) -> PrivateKeyDer<'static> {
305 PrivateKeyDer::from(PrivatePkcs8KeyDer::from(self.key_der.clone()))
306 }
307
308 fn from_parts(cert_der: Vec<u8>, key_der: Vec<u8>) -> Self {
309 let fingerprint = blake3::hash(&cert_der).to_hex().to_string();
310 Self {
311 cert_der,
312 key_der,
313 fingerprint,
314 }
315 }
316}
317
/// Which side of a transfer an event or progress report refers to.
/// Serialized as `send` / `receive`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum TransferDirection {
    /// This side is sending.
    Send,
    /// This side is receiving.
    Receive,
}
324
/// Progress snapshot for one file; the payload of `TransferEvent::Progress` as a plain struct.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct TransferProgress {
    /// Side of the transfer this report refers to.
    pub direction: TransferDirection,
    /// Name of the file being transferred.
    pub file_name: String,
    /// Bytes transferred so far.
    pub bytes_done: u64,
    /// Total bytes expected.
    pub bytes_total: u64,
}
332
/// Events reported to the caller-supplied callback during a transfer.
///
/// Serialized as an internally tagged value: the `event` field holds the snake_case variant
/// name and the variant's fields sit alongside it.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "event", rename_all = "snake_case")]
pub enum TransferEvent {
    /// A session has started; carries the totals taken from the manifest.
    SessionStarted {
        direction: TransferDirection,
        session_id: Uuid,
        files_total: usize,
        bytes_total: u64,
    },
    /// A file is about to be transferred.
    /// NOTE(review): `resume_offset` is presumably the byte offset the transfer starts from —
    /// confirm at the emit site.
    FileStarted {
        direction: TransferDirection,
        file_name: String,
        bytes_total: u64,
        resume_offset: u64,
    },
    /// Progress update for a file; see [`TransferEvent::progress`].
    Progress {
        direction: TransferDirection,
        file_name: String,
        bytes_done: u64,
        bytes_total: u64,
    },
    /// A file has been handled; `status` records how.
    FileFinished {
        direction: TransferDirection,
        file_name: String,
        bytes: u64,
        blake3: String,
        status: TransferFileStatus,
    },
    /// The session has finished.
    /// NOTE(review): what `blake3` covers at session level is not visible here — confirm.
    SessionFinished {
        direction: TransferDirection,
        files_total: usize,
        bytes_total: u64,
        blake3: String,
    },
}
368
369impl TransferEvent {
370 pub fn progress(&self) -> Option<TransferProgress> {
371 match self {
372 Self::Progress {
373 direction,
374 file_name,
375 bytes_done,
376 bytes_total,
377 } => Some(TransferProgress {
378 direction: *direction,
379 file_name: file_name.clone(),
380 bytes_done: *bytes_done,
381 bytes_total: *bytes_total,
382 }),
383 _ => None,
384 }
385 }
386}
387
/// Result of a transfer session, as returned by `send_direct_file`.
///
/// NOTE(review): how the top-level `path`, `file_name`, `bytes` and `blake3` relate to the
/// per-file entries in `files` is decided where the summary is built, outside this section.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TransferSummary {
    pub path: PathBuf,
    pub file_name: String,
    pub bytes: u64,
    pub blake3: String,
    /// One entry per file handled in the session.
    pub files: Vec<TransferFileSummary>,
}
396
/// Per-file outcome within a [`TransferSummary`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TransferFileSummary {
    /// NOTE(review): presumably the local path of the file on this side — confirm.
    pub path: PathBuf,
    /// Path of the file relative to the transfer root, as used in the manifest.
    pub relative_path: PathBuf,
    /// Size of the file in bytes.
    pub bytes: u64,
    /// BLAKE3 hash of the file, hex-encoded.
    pub blake3: String,
    /// How the file was handled.
    pub status: TransferFileStatus,
}
405
/// How a single file was handled during a transfer. Serialized in snake_case.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum TransferFileStatus {
    /// The file was sent.
    Sent,
    /// The file was received.
    Received,
    /// The file was skipped.
    Skipped,
    /// The file was resumed.
    Resumed,
}
414
/// Description of everything a sender intends to transfer in one session.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct TransferManifest {
    /// Manifest format version.
    pub version: u16,
    /// Random identifier for the session (a v4 UUID when produced by `build_manifest`).
    pub session_id: Uuid,
    /// Chunk size in bytes recorded for this session.
    pub chunk_size: u64,
    /// Files and directories in the transfer.
    pub entries: Vec<ManifestEntry>,
}
422
423impl TransferManifest {
424 pub fn file_entries(&self) -> impl Iterator<Item = &ManifestEntry> {
425 self.entries
426 .iter()
427 .filter(|entry| entry.kind == ManifestEntryKind::File)
428 }
429
430 pub fn total_file_bytes(&self) -> u64 {
431 self.file_entries().map(|entry| entry.size).sum()
432 }
433
434 pub fn digest(&self) -> Result<String> {
435 let bytes = serde_json::to_vec(self).map_err(|error| Error::Protocol(error.to_string()))?;
436 Ok(blake3::hash(&bytes).to_hex().to_string())
437 }
438}
439
/// One file or directory listed in a [`TransferManifest`].
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ManifestEntry {
    /// Path relative to the transfer root; joined onto the destination directory on receipt.
    pub relative_path: PathBuf,
    /// Whether the entry is a file or a directory.
    pub kind: ManifestEntryKind,
    /// Declared size in bytes; compared against the on-disk length when deciding to resume.
    pub size: u64,
    /// Hex BLAKE3 hash of the whole file. `decide_resume` treats a file entry without it as
    /// a protocol error when it needs to compare contents.
    pub blake3: Option<String>,
    /// NOTE(review): presumably per-chunk hashes used to verify a partial file — confirm
    /// against `verified_prefix_offset`.
    pub chunks: Vec<String>,
    /// NOTE(review): presumably the Unix permission bits of the source — confirm.
    pub unix_mode: Option<u32>,
    /// NOTE(review): presumably the source modification time in milliseconds since the Unix
    /// epoch — confirm.
    pub modified_unix_ms: Option<i128>,
}
450
/// Kind of a manifest entry. Serialized as `file` / `directory`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ManifestEntryKind {
    /// A regular file.
    File,
    /// A directory.
    Directory,
}
457
/// Outcome of `decide_resume` for one manifest entry.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ResumeDecision {
    /// Nothing usable exists at the destination (or the entry is not a file): create it.
    Create,
    /// A file of the same size and BLAKE3 hash already exists: nothing to transfer.
    Skip,
    /// A shorter file exists and its prefix was verified up to this byte offset.
    ResumeFrom(u64),
    /// The destination cannot be reconciled with the entry; carries a human-readable reason.
    Conflict(String),
}
465
/// A single sighting of a peer at one address, to be folded into a [`PeerRecord`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PeerObservation {
    /// Fingerprint identifying the peer; the registry keys records by it.
    pub fingerprint: String,
    /// Alias reported by the peer, if any.
    pub alias: Option<String>,
    /// Hostname reported for the peer, if any.
    pub hostname: Option<String>,
    /// Address the peer was seen at.
    pub address: SocketAddr,
    /// Transports the peer offers (for example `quic`).
    pub transports: BTreeSet<String>,
    /// Time of the sighting in milliseconds since the Unix epoch.
    pub seen_unix_ms: i128,
}
475
476impl PeerObservation {
477 pub fn new(fingerprint: impl Into<String>, address: SocketAddr, seen_unix_ms: i128) -> Self {
478 Self {
479 fingerprint: fingerprint.into(),
480 alias: None,
481 hostname: None,
482 address,
483 transports: BTreeSet::new(),
484 seen_unix_ms,
485 }
486 }
487
488 pub fn with_alias(mut self, alias: impl Into<String>) -> Self {
489 self.alias = Some(alias.into());
490 self
491 }
492
493 pub fn with_hostname(mut self, hostname: impl Into<String>) -> Self {
494 self.hostname = Some(hostname.into());
495 self
496 }
497
498 pub fn with_transport(mut self, transport: impl Into<String>) -> Self {
499 self.transports.insert(transport.into());
500 self
501 }
502}
503
/// Everything learned about one peer, accumulated across observations.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct PeerRecord {
    /// Fingerprint identifying the peer.
    pub fingerprint: String,
    /// Every alias the peer has been seen with.
    pub aliases: BTreeSet<String>,
    /// Every hostname the peer has been seen with.
    pub hostnames: BTreeSet<String>,
    /// Every address the peer has been seen at.
    pub addresses: BTreeSet<SocketAddr>,
    /// Every transport the peer has offered.
    pub transports: BTreeSet<String>,
    /// Earliest sighting, in milliseconds since the Unix epoch.
    pub first_seen_unix_ms: i128,
    /// Latest sighting, in milliseconds since the Unix epoch.
    pub last_seen_unix_ms: i128,
}
514
515impl PeerRecord {
516 fn from_observation(observation: PeerObservation) -> Self {
517 let mut aliases = BTreeSet::new();
518 if let Some(alias) = observation.alias {
519 aliases.insert(alias);
520 }
521
522 let mut hostnames = BTreeSet::new();
523 if let Some(hostname) = observation.hostname {
524 hostnames.insert(hostname);
525 }
526
527 let mut addresses = BTreeSet::new();
528 addresses.insert(observation.address);
529
530 Self {
531 fingerprint: observation.fingerprint,
532 aliases,
533 hostnames,
534 addresses,
535 transports: observation.transports,
536 first_seen_unix_ms: observation.seen_unix_ms,
537 last_seen_unix_ms: observation.seen_unix_ms,
538 }
539 }
540
541 fn merge(&mut self, observation: PeerObservation) {
542 if let Some(alias) = observation.alias {
543 self.aliases.insert(alias);
544 }
545 if let Some(hostname) = observation.hostname {
546 self.hostnames.insert(hostname);
547 }
548 self.addresses.insert(observation.address);
549 self.transports.extend(observation.transports);
550 self.first_seen_unix_ms = self.first_seen_unix_ms.min(observation.seen_unix_ms);
551 self.last_seen_unix_ms = self.last_seen_unix_ms.max(observation.seen_unix_ms);
552 }
553
554 pub fn preferred_quic_address(&self) -> Option<SocketAddr> {
555 if !self.transports.contains("quic") {
556 return None;
557 }
558
559 self.addresses.iter().copied().next()
560 }
561}
562
/// Trust decision recorded for a known peer. Serialized as `trusted` / `blocked`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum TrustState {
    /// The peer is trusted.
    Trusted,
    /// The peer is blocked.
    Blocked,
}
569
/// A peer persisted in the trust store. The set-valued fields may be omitted from the stored
/// file and then default to empty.
///
/// NOTE(review): the timestamps are `i128`; confirm that the TOML serializer in use accepts
/// 128-bit integers, since TOML integers are 64-bit.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct KnownPeerEntry {
    /// Normalized fingerprint identifying the peer.
    pub fingerprint: String,
    /// Aliases the peer has been seen with.
    #[serde(default)]
    pub aliases: BTreeSet<String>,
    /// Hostnames the peer has been seen with.
    #[serde(default)]
    pub hostnames: BTreeSet<String>,
    /// Addresses the peer has been seen at.
    #[serde(default)]
    pub addresses: BTreeSet<SocketAddr>,
    /// Transports the peer has offered.
    #[serde(default)]
    pub transports: BTreeSet<String>,
    /// Whether the peer is trusted or blocked.
    pub trust_state: TrustState,
    /// Earliest sighting in milliseconds since the Unix epoch; `None` when the entry was
    /// created from a bare fingerprint.
    pub first_seen_unix_ms: Option<i128>,
    /// Latest sighting in milliseconds since the Unix epoch; `None` when the entry was
    /// created from a bare fingerprint.
    pub last_seen_unix_ms: Option<i128>,
}
585
586impl KnownPeerEntry {
587 pub fn trusted(fingerprint: impl Into<String>) -> Result<Self> {
588 let fingerprint = normalized_fingerprint(fingerprint.into())?;
589 Ok(Self {
590 fingerprint,
591 aliases: BTreeSet::new(),
592 hostnames: BTreeSet::new(),
593 addresses: BTreeSet::new(),
594 transports: BTreeSet::new(),
595 trust_state: TrustState::Trusted,
596 first_seen_unix_ms: None,
597 last_seen_unix_ms: None,
598 })
599 }
600
601 pub fn from_record(record: &PeerRecord, trust_state: TrustState) -> Result<Self> {
602 let fingerprint = normalized_fingerprint(record.fingerprint.clone())?;
603 Ok(Self {
604 fingerprint,
605 aliases: record.aliases.clone(),
606 hostnames: record.hostnames.clone(),
607 addresses: record.addresses.clone(),
608 transports: record.transports.clone(),
609 trust_state,
610 first_seen_unix_ms: Some(record.first_seen_unix_ms),
611 last_seen_unix_ms: Some(record.last_seen_unix_ms),
612 })
613 }
614}
615
/// Persistent set of known peers, stored as TOML.
#[derive(Debug, Default, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct TrustStore {
    /// Known peers keyed by fingerprint; defaults to empty when absent from the file.
    #[serde(default)]
    pub peers: BTreeMap<String, KnownPeerEntry>,
}
621
622impl TrustStore {
623 pub fn load_or_default() -> Result<Self> {
624 Self::load_or_default_from(config_dir()?.join("known_peers.toml"))
625 }
626
627 pub fn load_or_default_from(path: impl AsRef<Path>) -> Result<Self> {
628 let path = path.as_ref();
629 match std::fs::read_to_string(path) {
630 Ok(contents) => {
631 toml::from_str(&contents).map_err(|error| Error::ConfigParse(error.to_string()))
632 }
633 Err(error) if error.kind() == std::io::ErrorKind::NotFound => Ok(Self::default()),
634 Err(source) => Err(Error::IoPath {
635 path: path.to_path_buf(),
636 source,
637 }),
638 }
639 }
640
641 pub fn save_default_path(&self) -> Result<()> {
642 self.save_to_path(config_dir()?.join("known_peers.toml"))
643 }
644
645 pub fn save_to_path(&self, path: impl AsRef<Path>) -> Result<()> {
646 write_toml_file(path.as_ref(), self)
647 }
648
649 pub fn trust_fingerprint(&mut self, fingerprint: impl Into<String>) -> Result<&KnownPeerEntry> {
650 let entry = KnownPeerEntry::trusted(fingerprint)?;
651 let fingerprint = entry.fingerprint.clone();
652 self.peers
653 .entry(fingerprint.clone())
654 .and_modify(|existing| existing.trust_state = TrustState::Trusted)
655 .or_insert(entry);
656 Ok(self
657 .peers
658 .get(&fingerprint)
659 .expect("trusted peer entry should exist"))
660 }
661
662 pub fn trust_record(&mut self, record: &PeerRecord) -> Result<&KnownPeerEntry> {
663 let entry = KnownPeerEntry::from_record(record, TrustState::Trusted)?;
664 let fingerprint = entry.fingerprint.clone();
665 self.peers
666 .entry(fingerprint.clone())
667 .and_modify(|existing| {
668 existing.aliases.extend(record.aliases.clone());
669 existing.hostnames.extend(record.hostnames.clone());
670 existing.addresses.extend(record.addresses.clone());
671 existing.transports.extend(record.transports.clone());
672 existing.first_seen_unix_ms = Some(
673 existing
674 .first_seen_unix_ms
675 .unwrap_or(record.first_seen_unix_ms)
676 .min(record.first_seen_unix_ms),
677 );
678 existing.last_seen_unix_ms = Some(
679 existing
680 .last_seen_unix_ms
681 .unwrap_or(record.last_seen_unix_ms)
682 .max(record.last_seen_unix_ms),
683 );
684 existing.trust_state = TrustState::Trusted;
685 })
686 .or_insert(entry);
687 Ok(self
688 .peers
689 .get(&fingerprint)
690 .expect("trusted peer entry should exist"))
691 }
692
693 pub fn forget(&mut self, fingerprint: &str) -> bool {
694 self.peers.remove(fingerprint).is_some()
695 }
696
697 pub fn records(&self) -> impl Iterator<Item = &KnownPeerEntry> {
698 self.peers.values()
699 }
700
701 pub fn to_toml_string(&self) -> Result<String> {
702 toml::to_string_pretty(self).map_err(|error| Error::ConfigSerialize(error.to_string()))
703 }
704}
705
/// In-memory collection of peers seen during discovery, keyed by fingerprint.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct PeerRegistry {
    /// Records keyed by the fingerprint given in the observation.
    peers: BTreeMap<String, PeerRecord>,
}
710
/// Detailed result of `PeerRegistry::lookup_detail`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PeerLookup<'a> {
    /// Exactly one peer matched the query.
    Found(&'a PeerRecord),
    /// Several peers matched the query; all of them are listed.
    Ambiguous(Vec<&'a PeerRecord>),
    /// No peer matched the query.
    Missing,
}
717
718impl PeerRegistry {
719 pub fn new() -> Self {
720 Self::default()
721 }
722
723 pub fn observe(&mut self, observation: PeerObservation) -> Result<&PeerRecord> {
724 if observation.fingerprint.trim().is_empty() {
725 return Err(Error::InvalidInput(
726 "peer fingerprint must not be empty".to_string(),
727 ));
728 }
729
730 let fingerprint = observation.fingerprint.clone();
731 if let Some(record) = self.peers.get_mut(&fingerprint) {
732 record.merge(observation);
733 } else {
734 self.peers.insert(
735 fingerprint.clone(),
736 PeerRecord::from_observation(observation),
737 );
738 }
739
740 Ok(self
741 .peers
742 .get(&fingerprint)
743 .expect("observed peer record should exist"))
744 }
745
746 pub fn get_by_fingerprint(&self, fingerprint: &str) -> Option<&PeerRecord> {
747 self.peers.get(fingerprint)
748 }
749
750 pub fn lookup(&self, query: &str) -> Option<&PeerRecord> {
751 match self.lookup_detail(query) {
752 PeerLookup::Found(record) => Some(record),
753 PeerLookup::Ambiguous(_) | PeerLookup::Missing => None,
754 }
755 }
756
757 pub fn lookup_detail(&self, query: &str) -> PeerLookup<'_> {
758 if let Some(record) = self.peers.get(query) {
759 return PeerLookup::Found(record);
760 }
761
762 let matches = self
763 .peers
764 .values()
765 .filter(|record| {
766 record.fingerprint.starts_with(query)
767 || record.aliases.iter().any(|alias| alias == query)
768 || record.hostnames.iter().any(|hostname| hostname == query)
769 })
770 .collect::<Vec<_>>();
771
772 match matches.as_slice() {
773 [] => PeerLookup::Missing,
774 [record] => PeerLookup::Found(record),
775 _ => PeerLookup::Ambiguous(matches),
776 }
777 }
778
779 pub fn records(&self) -> impl Iterator<Item = &PeerRecord> {
780 self.peers.values()
781 }
782}
783
/// What this device advertises about itself over mDNS.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NativeAnnouncement {
    /// Device alias; published verbatim as the `alias` property and, sanitized, used in the
    /// instance name and hostname.
    pub alias: String,
    /// Identity fingerprint; published as the `fp` property.
    pub fingerprint: String,
    /// QUIC port; used as the service port and published as the `quic_port` property.
    pub quic_port: u16,
    /// Published as the `listen_port` property; an empty string is published when `None`.
    pub listen_port: Option<u16>,
}
791
792impl NativeAnnouncement {
793 pub fn from_config(config: &AppConfig, identity: &NativeIdentity) -> Self {
794 Self {
795 alias: config.alias.clone(),
796 fingerprint: identity.fingerprint().to_string(),
797 quic_port: config.quic_port,
798 listen_port: Some(config.listen_port),
799 }
800 }
801
802 fn service_info(&self) -> Result<ServiceInfo> {
803 let alias = sanitize_dns_label(&self.alias);
804 let fingerprint_prefix = self
805 .fingerprint
806 .get(..12)
807 .unwrap_or(self.fingerprint.as_str());
808 let instance = format!("{alias}-{fingerprint_prefix}");
809 let hostname = format!("{alias}.local.");
810 let properties = [
811 ("pv", NATIVE_PROTOCOL_VERSION.to_string()),
812 ("minpv", MIN_NATIVE_PROTOCOL_VERSION.to_string()),
813 ("fp", self.fingerprint.clone()),
814 ("alias", self.alias.clone()),
815 ("transports", "quic".to_string()),
816 ("quic_port", self.quic_port.to_string()),
817 (
818 "listen_port",
819 self.listen_port
820 .map(|port| port.to_string())
821 .unwrap_or_default(),
822 ),
823 ];
824
825 ServiceInfo::new(
826 NATIVE_SERVICE_TYPE,
827 &instance,
828 &hostname,
829 "",
830 self.quic_port,
831 &properties[..],
832 )
833 .map(ServiceInfo::enable_addr_auto)
834 .map_err(|error| Error::Discovery(error.to_string()))
835 }
836}
837
/// Keeps this device announced over mDNS for as long as the value is alive; dropping it
/// unregisters the service and shuts the daemon down.
pub struct NativeAnnouncer {
    /// mDNS daemon that owns the registration.
    daemon: ServiceDaemon,
    /// Full service name, kept so the registration can be withdrawn on drop.
    fullname: String,
}
842
843impl NativeAnnouncer {
844 pub fn start(announcement: &NativeAnnouncement) -> Result<Self> {
845 let daemon = ServiceDaemon::new().map_err(|error| Error::Discovery(error.to_string()))?;
846 let service = announcement.service_info()?;
847 let fullname = service.get_fullname().to_string();
848 daemon
849 .register(service)
850 .map_err(|error| Error::Discovery(error.to_string()))?;
851 Ok(Self { daemon, fullname })
852 }
853}
854
855impl Drop for NativeAnnouncer {
856 fn drop(&mut self) {
857 let _ = self.daemon.unregister(&self.fullname);
858 let _ = self.daemon.shutdown();
859 }
860}
861
862pub async fn discover_native_peers(timeout: Duration) -> Result<PeerRegistry> {
863 let daemon = ServiceDaemon::new().map_err(|error| Error::Discovery(error.to_string()))?;
864 let receiver = daemon
865 .browse(NATIVE_SERVICE_TYPE)
866 .map_err(|error| Error::Discovery(error.to_string()))?;
867 let deadline = tokio::time::Instant::now() + timeout;
868 let mut registry = PeerRegistry::new();
869
870 loop {
871 let now = tokio::time::Instant::now();
872 if now >= deadline {
873 break;
874 }
875
876 let wait = deadline - now;
877 match tokio::time::timeout(wait, receiver.recv_async()).await {
878 Ok(Ok(ServiceEvent::ServiceResolved(service))) => {
879 observe_resolved_service(&mut registry, &service)?;
880 }
881 Ok(Ok(_)) => {}
882 Ok(Err(error)) => {
883 return Err(Error::Discovery(error.to_string()));
884 }
885 Err(_) => break,
886 }
887 }
888
889 daemon
890 .stop_browse(NATIVE_SERVICE_TYPE)
891 .map_err(|error| Error::Discovery(error.to_string()))?;
892 let _ = daemon.shutdown();
893 Ok(registry)
894}
895
896fn observe_resolved_service(registry: &mut PeerRegistry, service: &ResolvedService) -> Result<()> {
897 let Some(fingerprint) = service.get_property_val_str("fp") else {
898 return Ok(());
899 };
900 let fingerprint = match normalized_fingerprint(fingerprint.to_string()) {
901 Ok(fingerprint) => fingerprint,
902 Err(_) => return Ok(()),
903 };
904 let Some(highest_version) = parse_txt_u16(service, "pv") else {
905 return Ok(());
906 };
907 let Some(minimum_version) = parse_txt_u16(service, "minpv") else {
908 return Ok(());
909 };
910 if minimum_version > highest_version
911 || minimum_version > NATIVE_PROTOCOL_VERSION
912 || highest_version < MIN_NATIVE_PROTOCOL_VERSION
913 {
914 return Ok(());
915 }
916
917 let Some(quic_port) = parse_txt_u16(service, "quic_port") else {
918 return Ok(());
919 };
920 let transports = service
921 .get_property_val_str("transports")
922 .unwrap_or_default()
923 .split(',')
924 .map(str::trim)
925 .filter(|value| !value.is_empty())
926 .map(ToOwned::to_owned)
927 .collect::<Vec<_>>();
928 if !transports.iter().any(|transport| transport == "quic") {
929 return Ok(());
930 }
931
932 let alias = service.get_property_val_str("alias").map(ToOwned::to_owned);
933 let hostname = Some(service.get_hostname().trim_end_matches('.').to_string());
934 let seen_unix_ms = system_time_unix_ms(SystemTime::now()).unwrap_or_default();
935
936 for address in service.get_addresses_v4() {
937 let mut observation = PeerObservation::new(
938 fingerprint.clone(),
939 SocketAddr::from((address, quic_port)),
940 seen_unix_ms,
941 );
942 if let Some(alias) = &alias {
943 observation = observation.with_alias(alias.clone());
944 }
945 if let Some(hostname) = &hostname {
946 observation = observation.with_hostname(hostname.clone());
947 }
948 for transport in &transports {
949 observation = observation.with_transport(transport.clone());
950 }
951 registry.observe(observation)?;
952 }
953
954 Ok(())
955}
956
957fn parse_txt_u16(service: &ResolvedService, key: &str) -> Option<u16> {
958 service.get_property_val_str(key)?.parse().ok()
959}
960
/// Converts free-form text into a valid DNS label.
///
/// ASCII letters and digits are kept (lower-cased); every run of other characters becomes a
/// single `-`; leading and trailing `-` are removed. The result is capped at 63 bytes, the
/// maximum length of a DNS label, so an overly long alias can no longer produce an invalid
/// label. Input that contains no usable character yields `fileferry-device`.
///
/// For inputs whose sanitized form is at most 63 bytes the output is identical to the
/// previous implementation; it is now built in one pass instead of by repeated
/// `contains` / `replace` scans.
fn sanitize_dns_label(value: &str) -> String {
    /// Maximum length of a single DNS label in bytes.
    const MAX_DNS_LABEL_LEN: usize = 63;

    let mut label = String::with_capacity(value.len().min(MAX_DNS_LABEL_LEN));
    for ch in value.chars() {
        // Only ASCII is ever pushed, so the byte length equals the character count.
        if label.len() == MAX_DNS_LABEL_LEN {
            break;
        }
        if ch.is_ascii_alphanumeric() {
            label.push(ch.to_ascii_lowercase());
        } else if !label.is_empty() && !label.ends_with('-') {
            // Skipping a separator at the start, or after another separator, trims the
            // front and collapses runs as we go.
            label.push('-');
        }
    }

    // At most one trailing separator can remain (including one exposed by the length cap).
    while label.ends_with('-') {
        label.pop();
    }

    if label.is_empty() {
        "fileferry-device".to_string()
    } else {
        label
    }
}
982
/// Plan the sender reads back from the receiver after sending the manifest; the sender walks
/// `files` to decide what to transfer.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
struct DirectReceivePlan {
    /// One plan item per file.
    files: Vec<DirectFilePlan>,
}
987
/// First JSON frame the sender writes after the magic bytes: its supported protocol version
/// range and its identity fingerprint.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
struct DirectProtocolHello {
    /// Highest protocol version the sender speaks.
    protocol_version: u16,
    /// Lowest protocol version the sender accepts.
    min_protocol_version: u16,
    /// Fingerprint of the sender's identity certificate.
    client_fingerprint: String,
}
994
995impl DirectProtocolHello {
996 fn new(identity: &NativeIdentity) -> Self {
997 Self {
998 protocol_version: NATIVE_PROTOCOL_VERSION,
999 min_protocol_version: MIN_NATIVE_PROTOCOL_VERSION,
1000 client_fingerprint: identity.fingerprint().to_string(),
1001 }
1002 }
1003}
1004
/// The receiver's reply to [`DirectProtocolHello`].
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
struct DirectProtocolResponse {
    /// Whether the receiver accepted the negotiation.
    accepted: bool,
    /// Protocol version the receiver selected; set on acceptance.
    protocol_version: Option<u16>,
    /// Reason for the rejection; set on rejection.
    error: Option<String>,
}
1011
1012impl DirectProtocolResponse {
1013 fn accept(protocol_version: u16) -> Self {
1014 Self {
1015 accepted: true,
1016 protocol_version: Some(protocol_version),
1017 error: None,
1018 }
1019 }
1020
1021 fn reject(error: impl Into<String>) -> Self {
1022 Self {
1023 accepted: false,
1024 protocol_version: None,
1025 error: Some(error.into()),
1026 }
1027 }
1028}
1029
/// The receiver's instruction for one file in the manifest.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
struct DirectFilePlan {
    /// Manifest-relative path identifying the file; matched against manifest file entries.
    relative_path: PathBuf,
    /// Whether the file should be transferred or skipped.
    action: DirectFileAction,
    /// NOTE(review): presumably the byte offset to start sending from — confirm at use site.
    offset: u64,
}
1036
/// What the receiver wants done with a file. Serialized as `receive` / `skip`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
enum DirectFileAction {
    /// The receiver wants the file.
    Receive,
    /// The receiver does not need the file.
    Skip,
}
1043
/// Header describing a file payload.
/// NOTE(review): it is presumably sent ahead of each file's bytes; the use site is outside
/// this section — confirm.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
struct DirectPayloadHeader {
    /// Manifest-relative path of the file the payload belongs to.
    relative_path: PathBuf,
    /// NOTE(review): presumably the byte offset at which the payload starts — confirm.
    offset: u64,
    /// NOTE(review): presumably the number of payload bytes that follow — confirm.
    bytes: u64,
}
1050
/// Acknowledgement frame carrying a success flag and an optional error message.
/// NOTE(review): when it is exchanged is not visible in this section — confirm.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
struct DirectAck {
    /// Whether the acknowledged operation succeeded.
    ok: bool,
    /// Error message accompanying a failure, if any.
    error: Option<String>,
}
1056
1057pub fn validate_send_paths(paths: &[PathBuf]) -> Result<()> {
1058 if paths.is_empty() {
1059 return Err(Error::InvalidInput(
1060 "at least one file path is required".to_string(),
1061 ));
1062 }
1063
1064 for path in paths {
1065 if path.as_os_str().is_empty() {
1066 return Err(Error::InvalidInput("empty file path".to_string()));
1067 }
1068 }
1069
1070 Ok(())
1071}
1072
/// Renders `path` as an owned string for messages, replacing any invalid UTF-8 with the
/// Unicode replacement character.
pub fn display_path(path: &Path) -> String {
    let text = path.to_string_lossy();
    String::from(text)
}
1076
1077pub async fn build_manifest(paths: &[PathBuf]) -> Result<TransferManifest> {
1078 validate_send_paths(paths)?;
1079
1080 let sources = collect_manifest_sources(paths)?;
1081 let mut entries = Vec::with_capacity(sources.len());
1082 let mut seen = BTreeSet::new();
1083
1084 for source in sources {
1085 if !seen.insert(source.relative_path.clone()) {
1086 return Err(Error::InvalidInput(format!(
1087 "duplicate transfer path: {}",
1088 display_path(&source.relative_path)
1089 )));
1090 }
1091
1092 entries.push(build_manifest_entry(source).await?);
1093 }
1094
1095 Ok(TransferManifest {
1096 version: MANIFEST_VERSION,
1097 session_id: Uuid::new_v4(),
1098 chunk_size: DEFAULT_CHUNK_SIZE,
1099 entries,
1100 })
1101}
1102
1103pub fn safe_destination_path(destination: &Path, relative_path: &Path) -> Result<PathBuf> {
1104 validate_relative_transfer_path(relative_path)?;
1105 Ok(destination.join(relative_path))
1106}
1107
1108pub async fn decide_resume(
1109 destination: &Path,
1110 entry: &ManifestEntry,
1111 chunk_size: u64,
1112) -> Result<ResumeDecision> {
1113 if entry.kind != ManifestEntryKind::File {
1114 return Ok(ResumeDecision::Create);
1115 }
1116
1117 let path = safe_destination_path(destination, &entry.relative_path)?;
1118 let metadata = match tokio::fs::metadata(&path).await {
1119 Ok(metadata) => metadata,
1120 Err(error) if error.kind() == std::io::ErrorKind::NotFound => {
1121 return Ok(ResumeDecision::Create);
1122 }
1123 Err(source) => {
1124 return Err(Error::IoPath { path, source });
1125 }
1126 };
1127
1128 if !metadata.is_file() {
1129 return Ok(ResumeDecision::Conflict(format!(
1130 "{} already exists and is not a regular file",
1131 display_path(&path)
1132 )));
1133 }
1134
1135 if metadata.len() == entry.size {
1136 let expected = entry
1137 .blake3
1138 .as_deref()
1139 .ok_or_else(|| Error::Protocol("file manifest entry is missing BLAKE3".to_string()))?;
1140 let actual = hash_file(&path).await?;
1141 return if actual == expected {
1142 Ok(ResumeDecision::Skip)
1143 } else {
1144 Ok(ResumeDecision::Conflict(format!(
1145 "{} already exists with different contents",
1146 display_path(&path)
1147 )))
1148 };
1149 }
1150
1151 if metadata.len() > entry.size {
1152 return Ok(ResumeDecision::Conflict(format!(
1153 "{} is larger than incoming file",
1154 display_path(&path)
1155 )));
1156 }
1157
1158 let verified_offset = verified_prefix_offset(&path, entry, metadata.len(), chunk_size).await?;
1159 if verified_offset > 0 {
1160 Ok(ResumeDecision::ResumeFrom(verified_offset))
1161 } else {
1162 Ok(ResumeDecision::Conflict(format!(
1163 "{} already exists and does not match a verified prefix",
1164 display_path(&path)
1165 )))
1166 }
1167}
1168
1169pub async fn send_direct_file(
1170 request: &SendRequest,
1171 identity: &NativeIdentity,
1172 mut events: impl FnMut(TransferEvent),
1173) -> Result<TransferSummary> {
1174 ensure_rustls_provider();
1175
1176 let manifest = build_manifest(request.paths()).await?;
1177 let sources = manifest_source_map(request.paths())?;
1178 events(TransferEvent::SessionStarted {
1179 direction: TransferDirection::Send,
1180 session_id: manifest.session_id,
1181 files_total: manifest.file_entries().count(),
1182 bytes_total: manifest.total_file_bytes(),
1183 });
1184
1185 let mut endpoint = quinn::Endpoint::client(SocketAddr::from(([0, 0, 0, 0], 0)))?;
1186 endpoint.set_default_client_config(insecure_client_config()?);
1187 let connection = endpoint
1188 .connect(request.peer().address(), "localhost")?
1189 .await?;
1190 let (mut send, mut recv) = connection.open_bi().await?;
1191
1192 send.write_all(DIRECT_MAGIC).await?;
1193 write_json_frame(&mut send, &DirectProtocolHello::new(identity)).await?;
1194 let protocol: DirectProtocolResponse = read_json_frame(&mut recv).await?;
1195 if !protocol.accepted {
1196 return Err(Error::Protocol(protocol.error.unwrap_or_else(|| {
1197 "receiver rejected protocol negotiation".to_string()
1198 })));
1199 }
1200 if protocol.protocol_version != Some(NATIVE_PROTOCOL_VERSION) {
1201 return Err(Error::Protocol(format!(
1202 "receiver selected unsupported protocol version {:?}",
1203 protocol.protocol_version
1204 )));
1205 }
1206
1207 write_json_frame(&mut send, &manifest).await?;
1208
1209 let plan: DirectReceivePlan = read_json_frame(&mut recv).await?;
1210 let mut summaries = Vec::new();
1211 for file_plan in plan.files {
1212 let Some(entry) = manifest
1213 .file_entries()
1214 .find(|entry| entry.relative_path == file_plan.relative_path)
1215 else {
1216 return Err(Error::Protocol(format!(
1217 "receiver requested unknown file: {}",
1218 display_path(&file_plan.relative_path)
1219 )));
1220 };
1221
1222 let Some(source_path) = sources.get(&file_plan.relative_path) else {
1223 return Err(Error::Protocol(format!(
1224 "missing source for manifest file: {}",
1225 display_path(&file_plan.relative_path)
1226 )));
1227 };
1228
1229 if file_plan.action == DirectFileAction::Skip {
1230 let blake3 = entry.blake3.clone().unwrap_or_default();
1231 events(TransferEvent::FileFinished {
1232 direction: TransferDirection::Send,
1233 file_name: display_path(&entry.relative_path),
1234 bytes: entry.size,
1235 blake3: blake3.clone(),
1236 status: TransferFileStatus::Skipped,
1237 });
1238 summaries.push(TransferFileSummary {
1239 path: source_path.clone(),
1240 relative_path: entry.relative_path.clone(),
1241 bytes: entry.size,
1242 blake3,
1243 status: TransferFileStatus::Skipped,
1244 });
1245 continue;
1246 }
1247
1248 send_payload_file(&mut send, source_path, entry, file_plan.offset, &mut events).await?;
1249 let status = if file_plan.offset > 0 {
1250 TransferFileStatus::Resumed
1251 } else {
1252 TransferFileStatus::Sent
1253 };
1254 let blake3 = entry.blake3.clone().unwrap_or_default();
1255 events(TransferEvent::FileFinished {
1256 direction: TransferDirection::Send,
1257 file_name: display_path(&entry.relative_path),
1258 bytes: entry.size,
1259 blake3: blake3.clone(),
1260 status,
1261 });
1262 summaries.push(TransferFileSummary {
1263 path: source_path.clone(),
1264 relative_path: entry.relative_path.clone(),
1265 bytes: entry.size,
1266 blake3,
1267 status,
1268 });
1269 }
1270 send.finish()?;
1271
1272 let ack: DirectAck = read_json_frame(&mut recv).await?;
1273 if !ack.ok {
1274 return Err(Error::Transfer(
1275 ack.error
1276 .unwrap_or_else(|| "receiver rejected transfer".to_string()),
1277 ));
1278 }
1279
1280 connection.close(0u32.into(), b"done");
1281 endpoint.wait_idle().await;
1282
1283 let summary = summary_from_files(&manifest, summaries)?;
1284 events(TransferEvent::SessionFinished {
1285 direction: TransferDirection::Send,
1286 files_total: summary.files.len(),
1287 bytes_total: summary.bytes,
1288 blake3: summary.blake3.clone(),
1289 });
1290 Ok(summary)
1291}
1292
1293pub async fn receive_direct_file(
1294 request: &ReceiveRequest,
1295 destination: impl AsRef<Path>,
1296 identity: &NativeIdentity,
1297 mut events: impl FnMut(TransferEvent),
1298) -> Result<TransferSummary> {
1299 ensure_rustls_provider();
1300
1301 let destination = destination.as_ref();
1302 tokio::fs::create_dir_all(destination)
1303 .await
1304 .map_err(|source| Error::IoPath {
1305 path: destination.to_path_buf(),
1306 source,
1307 })?;
1308
1309 let server_config =
1310 quinn::ServerConfig::with_single_cert(identity.cert_chain(), identity.private_key())?;
1311 let endpoint = quinn::Endpoint::server(server_config, request.listen())?;
1312 let incoming = endpoint.accept().await.ok_or(Error::NoIncomingConnection)?;
1313 let connection = incoming.await?;
1314 let (mut send, mut recv) = connection.accept_bi().await?;
1315
1316 let result = receive_stream_file(destination, &mut recv, &mut send, &mut events).await;
1317 let ack = match &result {
1318 Ok(_) => DirectAck {
1319 ok: true,
1320 error: None,
1321 },
1322 Err(error) => DirectAck {
1323 ok: false,
1324 error: Some(error.to_string()),
1325 },
1326 };
1327 write_json_frame(&mut send, &ack).await?;
1328 send.finish()?;
1329 let _ = tokio::time::timeout(Duration::from_secs(1), connection.closed()).await;
1330 endpoint.close(0u32.into(), b"done");
1331 endpoint.wait_idle().await;
1332
1333 result
1334}
1335
1336async fn receive_stream_file(
1337 destination: &Path,
1338 recv: &mut quinn::RecvStream,
1339 send: &mut quinn::SendStream,
1340 events: &mut impl FnMut(TransferEvent),
1341) -> Result<TransferSummary> {
1342 let mut magic = [0; DIRECT_MAGIC.len()];
1343 recv.read_exact(&mut magic).await?;
1344 if &magic != DIRECT_MAGIC {
1345 return Err(Error::Protocol("bad direct-transfer magic".to_string()));
1346 }
1347
1348 let hello: DirectProtocolHello = read_json_frame(recv).await?;
1349 let protocol = negotiate_direct_protocol(&hello);
1350 write_json_frame(send, &protocol).await?;
1351 if !protocol.accepted {
1352 return Err(Error::Protocol(protocol.error.unwrap_or_else(|| {
1353 "unsupported direct protocol version".to_string()
1354 })));
1355 }
1356
1357 let manifest: TransferManifest = read_json_frame(recv).await?;
1358 validate_manifest(&manifest)?;
1359 events(TransferEvent::SessionStarted {
1360 direction: TransferDirection::Receive,
1361 session_id: manifest.session_id,
1362 files_total: manifest.file_entries().count(),
1363 bytes_total: manifest.total_file_bytes(),
1364 });
1365
1366 for entry in &manifest.entries {
1367 if entry.kind == ManifestEntryKind::Directory {
1368 let path = safe_destination_path(destination, &entry.relative_path)?;
1369 tokio::fs::create_dir_all(&path)
1370 .await
1371 .map_err(|source| Error::IoPath { path, source })?;
1372 }
1373 }
1374
1375 let mut plan = DirectReceivePlan { files: Vec::new() };
1376 for entry in manifest.file_entries() {
1377 let decision = decide_resume(destination, entry, manifest.chunk_size).await?;
1378 let (action, offset) = match decision {
1379 ResumeDecision::Create => (DirectFileAction::Receive, 0),
1380 ResumeDecision::Skip => (DirectFileAction::Skip, entry.size),
1381 ResumeDecision::ResumeFrom(offset) => (DirectFileAction::Receive, offset),
1382 ResumeDecision::Conflict(message) => return Err(Error::InvalidInput(message)),
1383 };
1384 plan.files.push(DirectFilePlan {
1385 relative_path: entry.relative_path.clone(),
1386 action,
1387 offset,
1388 });
1389 }
1390 write_json_frame(send, &plan).await?;
1391
1392 let mut summaries = Vec::new();
1393 for file_plan in &plan.files {
1394 let Some(entry) = manifest
1395 .file_entries()
1396 .find(|entry| entry.relative_path == file_plan.relative_path)
1397 else {
1398 return Err(Error::Protocol(format!(
1399 "planned file missing from manifest: {}",
1400 display_path(&file_plan.relative_path)
1401 )));
1402 };
1403
1404 let final_path = safe_destination_path(destination, &entry.relative_path)?;
1405 if file_plan.action == DirectFileAction::Skip {
1406 let blake3 = entry.blake3.clone().unwrap_or_default();
1407 events(TransferEvent::FileFinished {
1408 direction: TransferDirection::Receive,
1409 file_name: display_path(&entry.relative_path),
1410 bytes: entry.size,
1411 blake3: blake3.clone(),
1412 status: TransferFileStatus::Skipped,
1413 });
1414 summaries.push(TransferFileSummary {
1415 path: final_path,
1416 relative_path: entry.relative_path.clone(),
1417 bytes: entry.size,
1418 blake3,
1419 status: TransferFileStatus::Skipped,
1420 });
1421 continue;
1422 }
1423
1424 receive_payload_file(recv, destination, entry, file_plan.offset, events).await?;
1425 let actual_hash = hash_file(&final_path).await?;
1426 let expected_hash = entry
1427 .blake3
1428 .as_deref()
1429 .ok_or_else(|| Error::Protocol("file manifest entry is missing BLAKE3".to_string()))?;
1430 if actual_hash != expected_hash {
1431 return Err(Error::Transfer(format!(
1432 "BLAKE3 hash mismatch for {}",
1433 display_path(&entry.relative_path)
1434 )));
1435 }
1436
1437 let status = if file_plan.offset > 0 {
1438 TransferFileStatus::Resumed
1439 } else {
1440 TransferFileStatus::Received
1441 };
1442 events(TransferEvent::FileFinished {
1443 direction: TransferDirection::Receive,
1444 file_name: display_path(&entry.relative_path),
1445 bytes: entry.size,
1446 blake3: actual_hash.clone(),
1447 status,
1448 });
1449 summaries.push(TransferFileSummary {
1450 path: final_path,
1451 relative_path: entry.relative_path.clone(),
1452 bytes: entry.size,
1453 blake3: actual_hash,
1454 status,
1455 });
1456 }
1457
1458 let summary = summary_from_files(&manifest, summaries)?;
1459 events(TransferEvent::SessionFinished {
1460 direction: TransferDirection::Receive,
1461 files_total: summary.files.len(),
1462 bytes_total: summary.bytes,
1463 blake3: summary.blake3.clone(),
1464 });
1465 Ok(summary)
1466}
1467
1468async fn send_payload_file(
1469 send: &mut quinn::SendStream,
1470 source_path: &Path,
1471 entry: &ManifestEntry,
1472 offset: u64,
1473 events: &mut impl FnMut(TransferEvent),
1474) -> Result<()> {
1475 if offset > entry.size {
1476 return Err(Error::Protocol(format!(
1477 "resume offset exceeds file size for {}",
1478 display_path(&entry.relative_path)
1479 )));
1480 }
1481
1482 let header = DirectPayloadHeader {
1483 relative_path: entry.relative_path.clone(),
1484 offset,
1485 bytes: entry.size - offset,
1486 };
1487 write_json_frame(send, &header).await?;
1488 events(TransferEvent::FileStarted {
1489 direction: TransferDirection::Send,
1490 file_name: display_path(&entry.relative_path),
1491 bytes_total: entry.size,
1492 resume_offset: offset,
1493 });
1494
1495 let mut file = tokio::fs::File::open(source_path)
1496 .await
1497 .map_err(|source| Error::IoPath {
1498 path: source_path.to_path_buf(),
1499 source,
1500 })?;
1501 file.seek(SeekFrom::Start(offset))
1502 .await
1503 .map_err(|source| Error::IoPath {
1504 path: source_path.to_path_buf(),
1505 source,
1506 })?;
1507
1508 let mut buffer = vec![0; IO_BUFFER_SIZE];
1509 let mut bytes_done = offset;
1510 while bytes_done < entry.size {
1511 let remaining = entry.size - bytes_done;
1512 let read_size = buffer.len().min(remaining as usize);
1513 let read = file
1514 .read(&mut buffer[..read_size])
1515 .await
1516 .map_err(|source| Error::IoPath {
1517 path: source_path.to_path_buf(),
1518 source,
1519 })?;
1520 if read == 0 {
1521 return Err(Error::Protocol(format!(
1522 "source file ended early: {}",
1523 display_path(source_path)
1524 )));
1525 }
1526 send.write_all(&buffer[..read]).await?;
1527 bytes_done += read as u64;
1528 events(TransferEvent::Progress {
1529 direction: TransferDirection::Send,
1530 file_name: display_path(&entry.relative_path),
1531 bytes_done,
1532 bytes_total: entry.size,
1533 });
1534 }
1535
1536 Ok(())
1537}
1538
1539async fn receive_payload_file(
1540 recv: &mut quinn::RecvStream,
1541 destination: &Path,
1542 entry: &ManifestEntry,
1543 offset: u64,
1544 events: &mut impl FnMut(TransferEvent),
1545) -> Result<()> {
1546 let header: DirectPayloadHeader = read_json_frame(recv).await?;
1547 if header.relative_path != entry.relative_path
1548 || header.offset != offset
1549 || header.bytes != entry.size - offset
1550 {
1551 return Err(Error::Protocol(format!(
1552 "payload header does not match receive plan for {}",
1553 display_path(&entry.relative_path)
1554 )));
1555 }
1556 events(TransferEvent::FileStarted {
1557 direction: TransferDirection::Receive,
1558 file_name: display_path(&entry.relative_path),
1559 bytes_total: entry.size,
1560 resume_offset: offset,
1561 });
1562
1563 let final_path = safe_destination_path(destination, &entry.relative_path)?;
1564 if let Some(parent) = final_path.parent() {
1565 tokio::fs::create_dir_all(parent)
1566 .await
1567 .map_err(|source| Error::IoPath {
1568 path: parent.to_path_buf(),
1569 source,
1570 })?;
1571 }
1572
1573 let write_path = if offset == 0 {
1574 temp_path_for(&final_path)
1575 } else {
1576 final_path.clone()
1577 };
1578
1579 let mut file = if offset == 0 {
1580 tokio::fs::File::create(&write_path)
1581 .await
1582 .map_err(|source| Error::IoPath {
1583 path: write_path.clone(),
1584 source,
1585 })?
1586 } else {
1587 let mut file = tokio::fs::OpenOptions::new()
1588 .write(true)
1589 .open(&write_path)
1590 .await
1591 .map_err(|source| Error::IoPath {
1592 path: write_path.clone(),
1593 source,
1594 })?;
1595 file.set_len(offset).await.map_err(|source| Error::IoPath {
1596 path: write_path.clone(),
1597 source,
1598 })?;
1599 file.seek(SeekFrom::Start(offset))
1600 .await
1601 .map_err(|source| Error::IoPath {
1602 path: write_path.clone(),
1603 source,
1604 })?;
1605 file
1606 };
1607
1608 let mut bytes_done = offset;
1609 let mut bytes_remaining = header.bytes;
1610 let mut buffer = vec![0; IO_BUFFER_SIZE];
1611 while bytes_remaining > 0 {
1612 let read_size = buffer.len().min(bytes_remaining as usize);
1613 let read = recv.read(&mut buffer[..read_size]).await?;
1614 let Some(read) = read else {
1615 if offset == 0 {
1616 let _ = tokio::fs::remove_file(&write_path).await;
1617 }
1618 return Err(Error::Protocol(
1619 "stream ended before file payload".to_string(),
1620 ));
1621 };
1622
1623 file.write_all(&buffer[..read])
1624 .await
1625 .map_err(|source| Error::IoPath {
1626 path: write_path.clone(),
1627 source,
1628 })?;
1629 bytes_done += read as u64;
1630 bytes_remaining -= read as u64;
1631 events(TransferEvent::Progress {
1632 direction: TransferDirection::Receive,
1633 file_name: display_path(&entry.relative_path),
1634 bytes_done,
1635 bytes_total: entry.size,
1636 });
1637 }
1638
1639 file.flush().await.map_err(|source| Error::IoPath {
1640 path: write_path.clone(),
1641 source,
1642 })?;
1643 drop(file);
1644
1645 if offset == 0 {
1646 tokio::fs::rename(&write_path, &final_path)
1647 .await
1648 .map_err(|source| Error::IoPath {
1649 path: final_path,
1650 source,
1651 })?;
1652 }
1653
1654 Ok(())
1655}
1656
1657async fn write_json_frame<T: Serialize>(send: &mut quinn::SendStream, value: &T) -> Result<()> {
1658 let bytes = serde_json::to_vec(value).map_err(|error| Error::Protocol(error.to_string()))?;
1659 if bytes.len() > MAX_FRAME_LEN {
1660 return Err(Error::Protocol("frame exceeds maximum length".to_string()));
1661 }
1662 send.write_all(&(bytes.len() as u32).to_be_bytes()).await?;
1663 send.write_all(&bytes).await?;
1664 Ok(())
1665}
1666
1667async fn read_json_frame<T: for<'de> Deserialize<'de>>(recv: &mut quinn::RecvStream) -> Result<T> {
1668 let mut len = [0; 4];
1669 recv.read_exact(&mut len).await?;
1670 let len = u32::from_be_bytes(len) as usize;
1671 if len > MAX_FRAME_LEN {
1672 return Err(Error::Protocol("frame exceeds maximum length".to_string()));
1673 }
1674
1675 let mut bytes = vec![0; len];
1676 recv.read_exact(&mut bytes).await?;
1677 serde_json::from_slice(&bytes).map_err(|error| Error::Protocol(error.to_string()))
1678}
1679
1680fn negotiate_direct_protocol(hello: &DirectProtocolHello) -> DirectProtocolResponse {
1681 if hello.min_protocol_version > NATIVE_PROTOCOL_VERSION {
1682 return DirectProtocolResponse::reject(format!(
1683 "peer requires protocol version {}, local max is {}",
1684 hello.min_protocol_version, NATIVE_PROTOCOL_VERSION
1685 ));
1686 }
1687
1688 if hello.protocol_version < MIN_NATIVE_PROTOCOL_VERSION {
1689 return DirectProtocolResponse::reject(format!(
1690 "peer max protocol version {} is below local minimum {}",
1691 hello.protocol_version, MIN_NATIVE_PROTOCOL_VERSION
1692 ));
1693 }
1694
1695 DirectProtocolResponse::accept(NATIVE_PROTOCOL_VERSION.min(hello.protocol_version))
1696}
1697
/// A filesystem item selected for transfer, paired with the path it is listed under.
#[derive(Debug, Clone)]
struct ManifestSource {
    /// Location of the item on the local filesystem.
    source_path: PathBuf,
    /// Path assigned to the item for the transfer: the selected root's file name, extended
    /// with the entry names walked beneath it.
    relative_path: PathBuf,
}
1703
1704fn collect_manifest_sources(paths: &[PathBuf]) -> Result<Vec<ManifestSource>> {
1705 let mut sources = Vec::new();
1706 for path in paths {
1707 let metadata = std::fs::symlink_metadata(path).map_err(|source| Error::IoPath {
1708 path: path.clone(),
1709 source,
1710 })?;
1711 if metadata.file_type().is_symlink() {
1712 return Err(Error::InvalidInput(format!(
1713 "{} is a symlink; symlink transfers are not supported",
1714 display_path(path)
1715 )));
1716 }
1717
1718 let root_name = path
1719 .file_name()
1720 .ok_or_else(|| Error::InvalidInput(format!("{} has no file name", display_path(path))))?
1721 .to_os_string();
1722 let relative_path = PathBuf::from(root_name);
1723
1724 if metadata.is_dir() {
1725 sources.push(ManifestSource {
1726 source_path: path.clone(),
1727 relative_path: relative_path.clone(),
1728 });
1729 collect_dir_sources(path, &relative_path, &mut sources)?;
1730 } else if metadata.is_file() {
1731 sources.push(ManifestSource {
1732 source_path: path.clone(),
1733 relative_path,
1734 });
1735 } else {
1736 return Err(Error::InvalidInput(format!(
1737 "{} is not a regular file or directory",
1738 display_path(path)
1739 )));
1740 }
1741 }
1742
1743 Ok(sources)
1744}
1745
1746fn collect_dir_sources(
1747 dir: &Path,
1748 relative_dir: &Path,
1749 sources: &mut Vec<ManifestSource>,
1750) -> Result<()> {
1751 for entry in std::fs::read_dir(dir).map_err(|source| Error::IoPath {
1752 path: dir.to_path_buf(),
1753 source,
1754 })? {
1755 let entry = entry.map_err(|source| Error::IoPath {
1756 path: dir.to_path_buf(),
1757 source,
1758 })?;
1759 let path = entry.path();
1760 let metadata = std::fs::symlink_metadata(&path).map_err(|source| Error::IoPath {
1761 path: path.clone(),
1762 source,
1763 })?;
1764 if metadata.file_type().is_symlink() {
1765 return Err(Error::InvalidInput(format!(
1766 "{} is a symlink; symlink transfers are not supported",
1767 display_path(&path)
1768 )));
1769 }
1770
1771 let relative_path = relative_dir.join(entry.file_name());
1772 if metadata.is_dir() {
1773 sources.push(ManifestSource {
1774 source_path: path.clone(),
1775 relative_path: relative_path.clone(),
1776 });
1777 collect_dir_sources(&path, &relative_path, sources)?;
1778 } else if metadata.is_file() {
1779 sources.push(ManifestSource {
1780 source_path: path,
1781 relative_path,
1782 });
1783 }
1784 }
1785
1786 Ok(())
1787}
1788
1789fn manifest_source_map(paths: &[PathBuf]) -> Result<BTreeMap<PathBuf, PathBuf>> {
1790 Ok(collect_manifest_sources(paths)?
1791 .into_iter()
1792 .filter_map(|source| {
1793 let metadata = std::fs::metadata(&source.source_path).ok()?;
1794 metadata
1795 .is_file()
1796 .then_some((source.relative_path, source.source_path))
1797 })
1798 .collect())
1799}
1800
1801async fn build_manifest_entry(source: ManifestSource) -> Result<ManifestEntry> {
1802 validate_relative_transfer_path(&source.relative_path)?;
1803 let metadata = tokio::fs::metadata(&source.source_path)
1804 .await
1805 .map_err(|source_error| Error::IoPath {
1806 path: source.source_path.clone(),
1807 source: source_error,
1808 })?;
1809 let unix_mode = unix_mode(&metadata);
1810 let modified_unix_ms = modified_unix_ms(&metadata);
1811
1812 if metadata.is_dir() {
1813 return Ok(ManifestEntry {
1814 relative_path: source.relative_path,
1815 kind: ManifestEntryKind::Directory,
1816 size: 0,
1817 blake3: None,
1818 chunks: Vec::new(),
1819 unix_mode,
1820 modified_unix_ms,
1821 });
1822 }
1823
1824 let (blake3, chunks) = hash_file_with_chunks(&source.source_path, DEFAULT_CHUNK_SIZE).await?;
1825 Ok(ManifestEntry {
1826 relative_path: source.relative_path,
1827 kind: ManifestEntryKind::File,
1828 size: metadata.len(),
1829 blake3: Some(blake3),
1830 chunks,
1831 unix_mode,
1832 modified_unix_ms,
1833 })
1834}
1835
1836fn validate_manifest(manifest: &TransferManifest) -> Result<()> {
1837 if manifest.version != MANIFEST_VERSION {
1838 return Err(Error::Protocol(format!(
1839 "unsupported manifest version {}",
1840 manifest.version
1841 )));
1842 }
1843 if manifest.chunk_size == 0 {
1844 return Err(Error::Protocol("manifest chunk size is zero".to_string()));
1845 }
1846
1847 let mut seen = BTreeSet::new();
1848 for entry in &manifest.entries {
1849 validate_relative_transfer_path(&entry.relative_path)?;
1850 if !seen.insert(entry.relative_path.clone()) {
1851 return Err(Error::Protocol(format!(
1852 "duplicate manifest path: {}",
1853 display_path(&entry.relative_path)
1854 )));
1855 }
1856 if entry.kind == ManifestEntryKind::File && entry.blake3.is_none() {
1857 return Err(Error::Protocol(format!(
1858 "file manifest entry is missing BLAKE3: {}",
1859 display_path(&entry.relative_path)
1860 )));
1861 }
1862 }
1863
1864 Ok(())
1865}
1866
1867fn validate_relative_transfer_path(path: &Path) -> Result<()> {
1868 if path.as_os_str().is_empty() || path.is_absolute() {
1869 return Err(Error::InvalidInput(format!(
1870 "unsafe transfer path: {}",
1871 display_path(path)
1872 )));
1873 }
1874
1875 let mut normal_components = 0usize;
1876 for component in path.components() {
1877 match component {
1878 Component::Normal(value) => {
1879 let Some(name) = value.to_str() else {
1880 return Err(Error::InvalidInput(format!(
1881 "transfer path must be UTF-8: {}",
1882 display_path(path)
1883 )));
1884 };
1885 validate_path_component(name, path)?;
1886 normal_components += 1;
1887 }
1888 Component::CurDir
1889 | Component::ParentDir
1890 | Component::RootDir
1891 | Component::Prefix(_) => {
1892 return Err(Error::InvalidInput(format!(
1893 "unsafe transfer path: {}",
1894 display_path(path)
1895 )));
1896 }
1897 }
1898 }
1899
1900 if normal_components == 0 {
1901 return Err(Error::InvalidInput(format!(
1902 "unsafe transfer path: {}",
1903 display_path(path)
1904 )));
1905 }
1906
1907 Ok(())
1908}
1909
1910fn validate_path_component(component: &str, full_path: &Path) -> Result<()> {
1911 if component.is_empty()
1912 || component.contains('\\')
1913 || component.contains('/')
1914 || component.contains(':')
1915 || component.ends_with(' ')
1916 || component.ends_with('.')
1917 || is_windows_reserved_name(component)
1918 {
1919 return Err(Error::InvalidInput(format!(
1920 "unsafe transfer path: {}",
1921 display_path(full_path)
1922 )));
1923 }
1924
1925 Ok(())
1926}
1927
/// Reports whether `component` names a reserved Windows device.
///
/// Only the part before the first `.` is considered, with trailing spaces ignored, and the
/// comparison is ASCII case-insensitive, so `nul.txt` and `Con .log` are both reserved.
///
/// Recognized names are `CON`, `PRN`, `AUX`, `NUL`, `CONIN$`, `CONOUT$`, and `COM`/`LPT`
/// followed by exactly one digit `0`-`9` or one superscript digit `¹`, `²`, `³`.
fn is_windows_reserved_name(component: &str) -> bool {
    const DEVICE_NAMES: [&str; 6] = ["CON", "PRN", "AUX", "NUL", "CONIN$", "CONOUT$"];
    const PORT_PREFIXES: [&str; 2] = ["COM", "LPT"];

    let stem = component
        .split('.')
        .next()
        .unwrap_or(component)
        .trim_end_matches([' ', '.']);

    if DEVICE_NAMES
        .iter()
        .any(|device| stem.eq_ignore_ascii_case(device))
    {
        return true;
    }

    // Port devices: a three-character prefix followed by exactly one (superscript) digit.
    let Some((suffix_start, _)) = stem.char_indices().nth(3) else {
        return false;
    };
    let (prefix, suffix) = stem.split_at(suffix_start);
    let mut suffix_chars = suffix.chars();
    let single_port_digit = matches!(
        (suffix_chars.next(), suffix_chars.next()),
        (Some('0'..='9' | '¹' | '²' | '³'), None)
    );

    single_port_digit
        && PORT_PREFIXES
            .iter()
            .any(|port| prefix.eq_ignore_ascii_case(port))
}
1961
1962async fn hash_file(path: &Path) -> Result<String> {
1963 hash_file_with_chunks(path, DEFAULT_CHUNK_SIZE)
1964 .await
1965 .map(|(hash, _)| hash)
1966}
1967
1968async fn hash_file_with_chunks(path: &Path, chunk_size: u64) -> Result<(String, Vec<String>)> {
1969 let mut file = tokio::fs::File::open(path)
1970 .await
1971 .map_err(|source| Error::IoPath {
1972 path: path.to_path_buf(),
1973 source,
1974 })?;
1975 let mut hasher = blake3::Hasher::new();
1976 let mut chunk_hasher = blake3::Hasher::new();
1977 let mut chunk_bytes = 0u64;
1978 let mut chunks = Vec::new();
1979 let mut buffer = vec![0; IO_BUFFER_SIZE];
1980
1981 loop {
1982 let read = file
1983 .read(&mut buffer)
1984 .await
1985 .map_err(|source| Error::IoPath {
1986 path: path.to_path_buf(),
1987 source,
1988 })?;
1989 if read == 0 {
1990 break;
1991 }
1992 hasher.update(&buffer[..read]);
1993
1994 let mut remaining = &buffer[..read];
1995 while !remaining.is_empty() {
1996 let take = remaining.len().min((chunk_size - chunk_bytes) as usize);
1997 chunk_hasher.update(&remaining[..take]);
1998 chunk_bytes += take as u64;
1999 remaining = &remaining[take..];
2000 if chunk_bytes == chunk_size {
2001 chunks.push(chunk_hasher.finalize().to_hex().to_string());
2002 chunk_hasher = blake3::Hasher::new();
2003 chunk_bytes = 0;
2004 }
2005 }
2006 }
2007
2008 if chunk_bytes > 0 {
2009 chunks.push(chunk_hasher.finalize().to_hex().to_string());
2010 }
2011
2012 Ok((hasher.finalize().to_hex().to_string(), chunks))
2013}
2014
2015async fn verified_prefix_offset(
2016 path: &Path,
2017 entry: &ManifestEntry,
2018 existing_len: u64,
2019 chunk_size: u64,
2020) -> Result<u64> {
2021 if existing_len == 0 || entry.chunks.is_empty() {
2022 return Ok(0);
2023 }
2024
2025 let chunks_to_check = (existing_len / chunk_size).min(entry.chunks.len() as u64) as usize;
2026 if chunks_to_check == 0 {
2027 return Ok(0);
2028 }
2029
2030 let mut file = tokio::fs::File::open(path)
2031 .await
2032 .map_err(|source| Error::IoPath {
2033 path: path.to_path_buf(),
2034 source,
2035 })?;
2036 let mut buffer = vec![0; IO_BUFFER_SIZE];
2037 let mut verified_chunks = 0usize;
2038
2039 for expected in entry.chunks.iter().take(chunks_to_check) {
2040 let mut remaining = chunk_size;
2041 let mut hasher = blake3::Hasher::new();
2042 while remaining > 0 {
2043 let read_size = buffer.len().min(remaining as usize);
2044 file.read_exact(&mut buffer[..read_size])
2045 .await
2046 .map_err(|source| Error::IoPath {
2047 path: path.to_path_buf(),
2048 source,
2049 })?;
2050 hasher.update(&buffer[..read_size]);
2051 remaining -= read_size as u64;
2052 }
2053
2054 if hasher.finalize().to_hex().as_str() != expected {
2055 break;
2056 }
2057 verified_chunks += 1;
2058 }
2059
2060 Ok(verified_chunks as u64 * chunk_size)
2061}
2062
2063fn summary_from_files(
2064 manifest: &TransferManifest,
2065 files: Vec<TransferFileSummary>,
2066) -> Result<TransferSummary> {
2067 let bytes = manifest.total_file_bytes();
2068 let blake3 = if files.len() == 1 {
2069 files[0].blake3.clone()
2070 } else {
2071 manifest.digest()?
2072 };
2073 let file_name = if files.len() == 1 {
2074 display_path(&files[0].relative_path)
2075 } else {
2076 format!("{} files", files.len())
2077 };
2078 let path = files
2079 .first()
2080 .map(|file| file.path.clone())
2081 .unwrap_or_default();
2082
2083 Ok(TransferSummary {
2084 path,
2085 file_name,
2086 bytes,
2087 blake3,
2088 files,
2089 })
2090}
2091
2092fn temp_path_for(final_path: &Path) -> PathBuf {
2093 let file_name = final_path
2094 .file_name()
2095 .and_then(|name| name.to_str())
2096 .unwrap_or("transfer");
2097 final_path.with_file_name(format!(".{file_name}.{}.ferry-part", Uuid::new_v4()))
2098}
2099
2100fn modified_unix_ms(metadata: &Metadata) -> Option<i128> {
2101 system_time_unix_ms(metadata.modified().ok()?)
2102}
2103
/// Converts `time` to signed milliseconds relative to the Unix epoch.
///
/// Times before the epoch yield a negative value. The result is always `Some`.
fn system_time_unix_ms(time: SystemTime) -> Option<i128> {
    let signed_ms = time.duration_since(UNIX_EPOCH).map_or_else(
        |before_epoch| -(before_epoch.duration().as_millis() as i128),
        |since_epoch| since_epoch.as_millis() as i128,
    );
    Some(signed_ms)
}
2110
/// Returns the mode value reported by the file's permissions (Unix targets).
#[cfg(unix)]
fn unix_mode(metadata: &Metadata) -> Option<u32> {
    let permissions = metadata.permissions();
    Some(std::os::unix::fs::PermissionsExt::mode(&permissions))
}

/// Non-Unix targets expose no mode value, so this is always `None`.
#[cfg(not(unix))]
fn unix_mode(_metadata: &Metadata) -> Option<u32> {
    None
}
2122
2123fn insecure_client_config() -> Result<quinn::ClientConfig> {
2124 let crypto = rustls::ClientConfig::builder()
2125 .dangerous()
2126 .with_custom_certificate_verifier(SkipServerVerification::new())
2127 .with_no_client_auth();
2128
2129 Ok(quinn::ClientConfig::new(Arc::new(
2130 QuicClientConfig::try_from(crypto)
2131 .map_err(|error| Error::CryptoConfig(error.to_string()))?,
2132 )))
2133}
2134
2135#[derive(Debug)]
2136struct SkipServerVerification(Arc<CryptoProvider>);
2137
2138impl SkipServerVerification {
2139 fn new() -> Arc<Self> {
2140 Arc::new(Self(Arc::new(rustls::crypto::ring::default_provider())))
2141 }
2142}
2143
2144impl ServerCertVerifier for SkipServerVerification {
2145 fn verify_server_cert(
2146 &self,
2147 _end_entity: &CertificateDer<'_>,
2148 _intermediates: &[CertificateDer<'_>],
2149 _server_name: &ServerName<'_>,
2150 _ocsp: &[u8],
2151 _now: UnixTime,
2152 ) -> std::result::Result<ServerCertVerified, rustls::Error> {
2153 Ok(ServerCertVerified::assertion())
2154 }
2155
2156 fn verify_tls12_signature(
2157 &self,
2158 message: &[u8],
2159 cert: &CertificateDer<'_>,
2160 dss: &DigitallySignedStruct,
2161 ) -> std::result::Result<HandshakeSignatureValid, rustls::Error> {
2162 verify_tls12_signature(
2163 message,
2164 cert,
2165 dss,
2166 &self.0.signature_verification_algorithms,
2167 )
2168 }
2169
2170 fn verify_tls13_signature(
2171 &self,
2172 message: &[u8],
2173 cert: &CertificateDer<'_>,
2174 dss: &DigitallySignedStruct,
2175 ) -> std::result::Result<HandshakeSignatureValid, rustls::Error> {
2176 verify_tls13_signature(
2177 message,
2178 cert,
2179 dss,
2180 &self.0.signature_verification_algorithms,
2181 )
2182 }
2183
2184 fn supported_verify_schemes(&self) -> Vec<SignatureScheme> {
2185 self.0.signature_verification_algorithms.supported_schemes()
2186 }
2187}
2188
2189fn ensure_rustls_provider() {
2190 let _ = rustls::crypto::ring::default_provider().install_default();
2191}
2192
2193pub fn config_dir() -> Result<PathBuf> {
2194 let base_dirs = BaseDirs::new().ok_or(Error::ConfigDirUnavailable)?;
2195 Ok(base_dirs.config_dir().join(CONFIG_DIR_NAME))
2196}
2197
/// Picks a default device alias from the environment.
///
/// `HOSTNAME` is preferred, then `COMPUTERNAME`; a variable that is unset, not valid Unicode,
/// or blank is skipped so that a blank `HOSTNAME` no longer hides a usable `COMPUTERNAME`.
/// When neither is usable the alias is `fileferry-device`. The chosen value is returned as
/// found, without trimming.
fn default_alias() -> String {
    ["HOSTNAME", "COMPUTERNAME"]
        .iter()
        .filter_map(|name| std::env::var(name).ok())
        .find(|value| !value.trim().is_empty())
        .unwrap_or_else(|| "fileferry-device".to_string())
}
2205
2206fn expand_home_path(path: &str) -> Result<PathBuf> {
2207 if path == "~" {
2208 return Ok(BaseDirs::new()
2209 .ok_or(Error::ConfigDirUnavailable)?
2210 .home_dir()
2211 .to_path_buf());
2212 }
2213
2214 if let Some(rest) = path.strip_prefix("~/") {
2215 return Ok(BaseDirs::new()
2216 .ok_or(Error::ConfigDirUnavailable)?
2217 .home_dir()
2218 .join(rest));
2219 }
2220
2221 Ok(PathBuf::from(path))
2222}
2223
2224fn normalized_fingerprint(fingerprint: String) -> Result<String> {
2225 let fingerprint = fingerprint.trim().to_ascii_lowercase();
2226 if fingerprint.len() != 64 || !fingerprint.chars().all(|char| char.is_ascii_hexdigit()) {
2227 return Err(Error::InvalidInput(
2228 "peer fingerprint must be a full 64-character hex BLAKE3 fingerprint".to_string(),
2229 ));
2230 }
2231
2232 Ok(fingerprint)
2233}
2234
2235fn write_toml_file<T: Serialize>(path: &Path, value: &T) -> Result<()> {
2236 let bytes =
2237 toml::to_string_pretty(value).map_err(|error| Error::ConfigSerialize(error.to_string()))?;
2238 if let Some(parent) = path.parent() {
2239 std::fs::create_dir_all(parent).map_err(|source| Error::IoPath {
2240 path: parent.to_path_buf(),
2241 source,
2242 })?;
2243 }
2244
2245 let temp_path = path.with_extension(format!("tmp-{}", Uuid::new_v4()));
2246 std::fs::write(&temp_path, bytes).map_err(|source| Error::IoPath {
2247 path: temp_path.clone(),
2248 source,
2249 })?;
2250 std::fs::rename(&temp_path, path).map_err(|source| Error::IoPath {
2251 path: path.to_path_buf(),
2252 source,
2253 })?;
2254 Ok(())
2255}
2256
2257fn write_private_file(path: &Path, bytes: &[u8]) -> Result<()> {
2258 std::fs::write(path, bytes).map_err(|source| Error::IoPath {
2259 path: path.to_path_buf(),
2260 source,
2261 })?;
2262
2263 #[cfg(unix)]
2264 {
2265 use std::os::unix::fs::PermissionsExt;
2266
2267 let mut permissions = std::fs::metadata(path)
2268 .map_err(|source| Error::IoPath {
2269 path: path.to_path_buf(),
2270 source,
2271 })?
2272 .permissions();
2273 permissions.set_mode(0o600);
2274 std::fs::set_permissions(path, permissions).map_err(|source| Error::IoPath {
2275 path: path.to_path_buf(),
2276 source,
2277 })?;
2278 }
2279
2280 Ok(())
2281}
2282
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::{Arc, Mutex};

    /// Well-formed 64-character hex fingerprint shared by several fixtures.
    const FULL_FINGERPRINT: &str =
        "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef";

    /// Builds `192.168.1.<host>:53318`, the address shape used by peer fixtures.
    fn lan_addr(host: u8) -> SocketAddr {
        SocketAddr::from(([192, 168, 1, host], 53318))
    }

    /// Binds an ephemeral loopback UDP port, notes its address, and releases
    /// the socket again so the address can be handed to the code under test.
    fn reserve_loopback_addr() -> SocketAddr {
        let socket =
            std::net::UdpSocket::bind(SocketAddr::from(([127, 0, 0, 1], 0))).expect("udp bind");
        let addr = socket.local_addr().expect("local addr");
        drop(socket);
        addr
    }

    /// Feeds one resolved "desk" service advertising the given protocol range
    /// into a fresh registry and returns that registry.
    fn registry_after_observing(pv: &'static str, minpv: &'static str) -> PeerRegistry {
        let properties = [
            ("pv", pv),
            ("minpv", minpv),
            ("fp", FULL_FINGERPRINT),
            ("alias", "desk"),
            ("transports", "quic"),
            ("quic_port", "53318"),
        ];
        let service = ServiceInfo::new(
            NATIVE_SERVICE_TYPE,
            "desk-0123456789ab",
            "desk.local.",
            "192.168.1.42",
            53318,
            &properties[..],
        )
        .expect("service info")
        .as_resolved_service();

        let mut registry = PeerRegistry::new();
        observe_resolved_service(&mut registry, &service).expect("observation");
        registry
    }

    #[test]
    fn direct_peer_parses_socket_address() {
        let parsed = DirectPeer::parse("127.0.0.1:53318").expect("address parses");

        assert_eq!(parsed.address().to_string(), "127.0.0.1:53318");
    }

    #[test]
    fn direct_peer_rejects_missing_port() {
        let outcome = DirectPeer::parse("127.0.0.1");

        assert!(outcome.is_err());
    }

    #[test]
    fn send_request_requires_at_least_one_path() {
        let peer = DirectPeer::parse("127.0.0.1:53318").expect("address parses");

        let request = SendRequest::new(peer, Vec::new());

        assert!(request.is_err());
    }

    #[test]
    fn validate_send_paths_rejects_empty_slice() {
        assert!(validate_send_paths(&[]).is_err());
    }

    #[tokio::test]
    async fn manifest_walks_directory_and_serializes_entries() {
        let workspace = tempfile::tempdir().expect("tempdir");
        let bundle_root = workspace.path().join("bundle");
        let nested_dir = bundle_root.join("nested");
        tokio::fs::create_dir_all(&nested_dir).await.expect("dirs");
        tokio::fs::write(bundle_root.join("a.txt"), b"alpha")
            .await
            .expect("file a");
        tokio::fs::write(nested_dir.join("b.txt"), b"beta")
            .await
            .expect("file b");

        // Round-trip through JSON so the assertions cover the serialized form.
        let manifest = build_manifest(&[bundle_root]).await.expect("manifest");
        let json = serde_json::to_string(&manifest).expect("manifest serializes");
        let decoded: TransferManifest = serde_json::from_str(&json).expect("manifest decodes");

        assert_eq!(decoded.version, MANIFEST_VERSION);
        let has_nested_directory = decoded.entries.iter().any(|entry| {
            entry.kind == ManifestEntryKind::Directory
                && entry.relative_path == Path::new("bundle/nested")
        });
        assert!(has_nested_directory);
        let has_nested_file = decoded.entries.iter().any(|entry| {
            entry.kind == ManifestEntryKind::File
                && entry.relative_path == Path::new("bundle/nested/b.txt")
                && entry.size == 4
                && entry.blake3.is_some()
        });
        assert!(has_nested_file);
    }

    #[test]
    fn safe_destination_path_rejects_unsafe_paths() {
        let root = Path::new("/tmp/ferry-dest");

        assert!(safe_destination_path(root, Path::new("../escape.txt")).is_err());
        assert!(safe_destination_path(root, Path::new("/absolute.txt")).is_err());
        assert!(safe_destination_path(root, Path::new("nested/CON")).is_err());
        assert!(safe_destination_path(root, Path::new("nested\\escape.txt")).is_err());
        assert!(safe_destination_path(root, Path::new("nested/ok.txt")).is_ok());
    }

    #[test]
    fn safe_destination_path_rejects_generated_escape_and_reserved_patterns() {
        let dest = Path::new("/tmp/ferry-dest");
        let unsafe_components = [
            "..",
            ".",
            "CON",
            "con.txt",
            "NUL",
            "COM1",
            "LPT9",
            "trailingspace ",
            "trailingdot.",
            "has:colon",
            "has\\backslash",
        ];

        for component in unsafe_components {
            assert!(
                safe_destination_path(dest, Path::new(component)).is_err(),
                "{component} should be rejected as a top-level path"
            );
            // "." is only checked as a top-level path, not below `safe/`.
            if component == "." {
                continue;
            }
            let nested = Path::new("safe").join(component);
            assert!(
                safe_destination_path(dest, nested.as_path()).is_err(),
                "{component} should be rejected as a nested path"
            );
        }

        for component in ["alpha", "alpha-1", "alpha_1", "report.final.txt"] {
            let nested = Path::new("safe").join(component);
            let resolved = safe_destination_path(dest, nested.as_path())
                .expect("safe generated path accepted");
            assert!(resolved.starts_with(dest));
        }
    }

    #[tokio::test]
    async fn resume_decision_skips_existing_matching_file() {
        let workspace = tempfile::tempdir().expect("tempdir");
        let source_file = workspace.path().join("source.txt");
        let dest_dir = workspace.path().join("dest");
        tokio::fs::create_dir_all(&dest_dir)
            .await
            .expect("dest dir");
        tokio::fs::write(&source_file, b"same contents")
            .await
            .expect("source");
        tokio::fs::write(dest_dir.join("source.txt"), b"same contents")
            .await
            .expect("dest file");

        let manifest = build_manifest(&[source_file]).await.expect("manifest");
        let entry = manifest.file_entries().next().expect("file entry");
        let decision = decide_resume(&dest_dir, entry, manifest.chunk_size)
            .await
            .expect("decision");

        assert_eq!(decision, ResumeDecision::Skip);
    }

    #[tokio::test]
    async fn resume_decision_returns_verified_chunk_offset() {
        let workspace = tempfile::tempdir().expect("tempdir");
        let source_file = workspace.path().join("large.bin");
        let dest_dir = workspace.path().join("dest");
        tokio::fs::create_dir_all(&dest_dir)
            .await
            .expect("dest dir");
        let bytes = vec![42; (DEFAULT_CHUNK_SIZE as usize * 2) + 128];
        tokio::fs::write(&source_file, &bytes).await.expect("source");
        // The destination holds one full chunk plus a 99-byte tail.
        tokio::fs::write(
            dest_dir.join("large.bin"),
            &bytes[..DEFAULT_CHUNK_SIZE as usize + 99],
        )
        .await
        .expect("partial");

        let manifest = build_manifest(&[source_file]).await.expect("manifest");
        let entry = manifest.file_entries().next().expect("file entry");
        let decision = decide_resume(&dest_dir, entry, manifest.chunk_size)
            .await
            .expect("decision");

        assert_eq!(decision, ResumeDecision::ResumeFrom(DEFAULT_CHUNK_SIZE));
    }

    #[tokio::test]
    async fn resume_decision_rejects_existing_file_with_mismatched_contents() {
        let workspace = tempfile::tempdir().expect("tempdir");
        let source_file = workspace.path().join("source.txt");
        let dest_dir = workspace.path().join("dest");
        tokio::fs::create_dir_all(&dest_dir)
            .await
            .expect("dest dir");
        tokio::fs::write(&source_file, b"expected contents")
            .await
            .expect("source");
        tokio::fs::write(dest_dir.join("source.txt"), b"different contents")
            .await
            .expect("conflicting dest file");

        let manifest = build_manifest(&[source_file]).await.expect("manifest");
        let entry = manifest.file_entries().next().expect("file entry");
        let decision = decide_resume(&dest_dir, entry, manifest.chunk_size)
            .await
            .expect("decision");

        assert!(matches!(decision, ResumeDecision::Conflict(_)));
    }

    #[tokio::test]
    async fn resume_decision_rejects_existing_file_larger_than_manifest_entry() {
        let workspace = tempfile::tempdir().expect("tempdir");
        let source_file = workspace.path().join("source.txt");
        let dest_dir = workspace.path().join("dest");
        tokio::fs::create_dir_all(&dest_dir)
            .await
            .expect("dest dir");
        tokio::fs::write(&source_file, b"small")
            .await
            .expect("source");
        tokio::fs::write(dest_dir.join("source.txt"), b"larger destination")
            .await
            .expect("larger dest file");

        let manifest = build_manifest(&[source_file]).await.expect("manifest");
        let entry = manifest.file_entries().next().expect("file entry");
        let decision = decide_resume(&dest_dir, entry, manifest.chunk_size)
            .await
            .expect("decision");

        assert!(matches!(decision, ResumeDecision::Conflict(_)));
    }

    #[test]
    fn identity_is_loaded_after_generation() {
        let identity_dir = tempfile::tempdir().expect("tempdir");

        let generated =
            NativeIdentity::load_or_generate_in(identity_dir.path()).expect("identity generated");
        let loaded =
            NativeIdentity::load_or_generate_in(identity_dir.path()).expect("identity loaded");

        assert_eq!(generated.fingerprint(), loaded.fingerprint());
    }

    #[test]
    fn app_config_round_trips_toml() {
        let workspace = tempfile::tempdir().expect("tempdir");
        let config_path = workspace.path().join("config.toml");
        let original = AppConfig {
            alias: "desk".to_string(),
            ..AppConfig::default()
        };

        original.save_to_path(&config_path).expect("config saved");
        let reloaded = AppConfig::load_or_default_from(&config_path).expect("config loaded");

        assert_eq!(reloaded.alias, "desk");
        assert_eq!(reloaded.listen_port, 53317);
        assert!(reloaded.trust.require_fingerprint);
    }

    #[test]
    fn app_config_redacts_psk_for_display_output() {
        let trust = TrustConfig {
            psk: "super-secret-psk".to_string(),
            ..TrustConfig::default()
        };
        let config = AppConfig {
            trust,
            ..AppConfig::default()
        };

        let rendered = config.to_redacted_toml_string().expect("config serializes");

        assert!(!rendered.contains("super-secret-psk"));
        assert!(rendered.contains(r#"psk = "<redacted>""#));
    }

    #[test]
    fn daemon_config_defaults_from_app_config_and_round_trips_toml() {
        let workspace = tempfile::tempdir().expect("tempdir");
        let config_path = workspace.path().join("daemon.toml");
        let downloads = workspace.path().join("downloads");
        let app_config = AppConfig {
            quic_port: 54444,
            download_dir: downloads.display().to_string(),
            ..AppConfig::default()
        };

        // No daemon.toml exists yet, so values come from the app config.
        let defaulted =
            DaemonConfig::load_or_default_from(&config_path, &app_config).expect("daemon config");
        assert_eq!(
            defaulted.listen,
            SocketAddr::from(([0, 0, 0, 0], app_config.quic_port))
        );
        assert_eq!(defaulted.destination, downloads);

        let explicit = DaemonConfig {
            listen: SocketAddr::from(([127, 0, 0, 1], 53318)),
            destination: workspace.path().join("daemon-dest"),
        };
        explicit
            .save_to_path(&config_path)
            .expect("daemon config saved");
        let reloaded = DaemonConfig::load_or_default_from(&config_path, &app_config)
            .expect("daemon config loaded");

        assert_eq!(reloaded, explicit);
    }

    #[test]
    fn trust_store_load_save_and_forget_round_trip() {
        let workspace = tempfile::tempdir().expect("tempdir");
        let store_path = workspace.path().join("known_peers.toml");
        let mut store = TrustStore::default();

        store
            .trust_fingerprint(FULL_FINGERPRINT)
            .expect("fingerprint trusted");
        store.save_to_path(&store_path).expect("trust store saved");
        let mut reloaded =
            TrustStore::load_or_default_from(&store_path).expect("trust store loaded");

        assert_eq!(reloaded.records().count(), 1);
        assert_eq!(
            reloaded.peers[FULL_FINGERPRINT].trust_state,
            TrustState::Trusted
        );
        assert!(reloaded.forget(FULL_FINGERPRINT));
        assert_eq!(reloaded.records().count(), 0);
    }

    #[test]
    fn trust_store_rejects_short_fingerprint_without_discovery_lookup() {
        let mut store = TrustStore::default();

        let outcome = store.trust_fingerprint("9a4f2c");

        assert!(outcome.is_err());
    }

    #[test]
    fn transfer_events_serialize_with_stable_event_names() {
        let progress = TransferEvent::Progress {
            direction: TransferDirection::Send,
            file_name: "bundle/a.txt".to_string(),
            bytes_done: 5,
            bytes_total: 9,
        };

        let json = serde_json::to_value(&progress).expect("event serializes");

        assert_eq!(json["event"], "progress");
        assert_eq!(json["direction"], "send");
        assert_eq!(json["file_name"], "bundle/a.txt");
        assert_eq!(json["bytes_done"], 5);
        assert_eq!(json["bytes_total"], 9);
    }

    #[test]
    fn direct_protocol_hello_has_stable_golden_json() {
        let hello = DirectProtocolHello {
            protocol_version: 1,
            min_protocol_version: 1,
            client_fingerprint: FULL_FINGERPRINT.to_string(),
        };

        let json = serde_json::to_string(&hello).expect("hello serializes");

        assert_eq!(
            json,
            r#"{"protocol_version":1,"min_protocol_version":1,"client_fingerprint":"0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"}"#
        );
    }

    #[test]
    fn direct_protocol_response_has_stable_golden_json() {
        let accepted = DirectProtocolResponse::accept(1);

        let json = serde_json::to_string(&accepted).expect("response serializes");

        assert_eq!(
            json,
            r#"{"accepted":true,"protocol_version":1,"error":null}"#
        );
    }

    #[test]
    fn direct_protocol_negotiates_supported_overlap() {
        let hello = DirectProtocolHello {
            protocol_version: 3,
            min_protocol_version: 1,
            client_fingerprint: FULL_FINGERPRINT.to_string(),
        };

        let negotiated = negotiate_direct_protocol(&hello);

        assert_eq!(negotiated, DirectProtocolResponse::accept(1));
    }

    #[test]
    fn direct_protocol_rejects_incompatible_versions() {
        let too_new = DirectProtocolHello {
            protocol_version: 3,
            min_protocol_version: 2,
            client_fingerprint: FULL_FINGERPRINT.to_string(),
        };
        let too_old = DirectProtocolHello {
            protocol_version: 0,
            min_protocol_version: 0,
            client_fingerprint: FULL_FINGERPRINT.to_string(),
        };

        assert!(!negotiate_direct_protocol(&too_new).accepted);
        assert!(!negotiate_direct_protocol(&too_old).accepted);
    }

    #[test]
    fn transfer_manifest_has_stable_golden_json() {
        let file_entry = ManifestEntry {
            relative_path: PathBuf::from("bundle/a.txt"),
            kind: ManifestEntryKind::File,
            size: 5,
            blake3: Some(FULL_FINGERPRINT.to_string()),
            chunks: vec![
                "abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789".to_string(),
            ],
            unix_mode: Some(0o100644),
            modified_unix_ms: Some(1_700_000_000_000),
        };
        let manifest = TransferManifest {
            version: MANIFEST_VERSION,
            session_id: Uuid::parse_str("00000000-0000-0000-0000-000000000001")
                .expect("uuid parses"),
            chunk_size: DEFAULT_CHUNK_SIZE,
            entries: vec![file_entry],
        };

        let json = serde_json::to_string(&manifest).expect("manifest serializes");

        assert_eq!(
            json,
            r#"{"version":1,"session_id":"00000000-0000-0000-0000-000000000001","chunk_size":1048576,"entries":[{"relative_path":"bundle/a.txt","kind":"file","size":5,"blake3":"0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef","chunks":["abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789"],"unix_mode":33188,"modified_unix_ms":1700000000000}]}"#
        );
    }

    #[test]
    fn peer_registry_merges_observations_by_fingerprint() {
        let mut registry = PeerRegistry::new();
        let fingerprint = "abcdef1234567890";
        let first = PeerObservation::new(fingerprint, lan_addr(10), 100)
            .with_alias("desk")
            .with_hostname("desk.local")
            .with_transport("quic");
        let second = PeerObservation::new(fingerprint, lan_addr(11), 200)
            .with_alias("workstation")
            .with_transport("quic");

        registry.observe(first).expect("first observation");
        registry.observe(second).expect("second observation");
        let record = registry
            .get_by_fingerprint(fingerprint)
            .expect("record by fingerprint");

        assert_eq!(registry.records().count(), 1);
        assert!(record.aliases.contains("desk"));
        assert!(record.aliases.contains("workstation"));
        assert!(record.hostnames.contains("desk.local"));
        assert_eq!(record.addresses.len(), 2);
        assert_eq!(record.first_seen_unix_ms, 100);
        assert_eq!(record.last_seen_unix_ms, 200);
    }

    #[test]
    fn peer_registry_looks_up_unique_fingerprint_prefix_alias_and_hostname() {
        let mut registry = PeerRegistry::new();
        let desk = PeerObservation::new("abcdef1234567890", lan_addr(10), 100)
            .with_alias("desk")
            .with_hostname("desk.local");
        registry.observe(desk).expect("first observation");
        let laptop =
            PeerObservation::new("123456abcdef7890", lan_addr(20), 100).with_alias("laptop");
        registry.observe(laptop).expect("second observation");

        let by_prefix = registry
            .lookup("abcdef")
            .map(|record| record.fingerprint.as_str());
        assert_eq!(by_prefix, Some("abcdef1234567890"));

        let by_alias = registry
            .lookup("desk")
            .map(|record| record.fingerprint.as_str());
        assert_eq!(by_alias, Some("abcdef1234567890"));

        let by_hostname = registry
            .lookup("desk.local")
            .map(|record| record.fingerprint.as_str());
        assert_eq!(by_hostname, Some("abcdef1234567890"));

        assert!(registry.lookup("missing").is_none());
    }

    #[test]
    fn peer_registry_rejects_ambiguous_lookup_hints() {
        let mut registry = PeerRegistry::new();
        // Two peers that share both a fingerprint prefix and an alias.
        let first = PeerObservation::new("abcdef1234567890", lan_addr(10), 100).with_alias("desk");
        registry.observe(first).expect("first observation");
        let second = PeerObservation::new("abcdef9999999999", lan_addr(11), 100).with_alias("desk");
        registry.observe(second).expect("second observation");

        assert!(registry.lookup("abcdef").is_none());
        assert!(registry.lookup("desk").is_none());

        let by_full_fingerprint = registry
            .lookup("abcdef1234567890")
            .map(|record| record.fingerprint.as_str());
        assert_eq!(by_full_fingerprint, Some("abcdef1234567890"));

        match registry.lookup_detail("desk") {
            PeerLookup::Ambiguous(records) => assert_eq!(records.len(), 2),
            other => panic!("expected ambiguous duplicate-alias lookup, got {other:?}"),
        }
        assert!(matches!(
            registry.lookup_detail("missing"),
            PeerLookup::Missing
        ));
    }

    #[test]
    fn native_announcement_builds_expected_txt_properties() {
        let announcement = NativeAnnouncement {
            alias: "Stephen MBP".to_string(),
            fingerprint: FULL_FINGERPRINT.to_string(),
            quic_port: 53318,
            listen_port: Some(53317),
        };

        let service = announcement.service_info().expect("service info");

        assert_eq!(service.get_type(), NATIVE_SERVICE_TYPE);
        let fullname = service.get_fullname();
        assert!(fullname.starts_with("stephen-mbp-0123456789ab."));
        assert_eq!(service.get_property_val_str("pv"), Some("1"));
        assert_eq!(service.get_property_val_str("minpv"), Some("1"));
        assert_eq!(
            service.get_property_val_str("fp"),
            Some(FULL_FINGERPRINT)
        );
        assert_eq!(service.get_property_val_str("alias"), Some("Stephen MBP"));
        assert_eq!(service.get_property_val_str("transports"), Some("quic"));
        assert_eq!(service.get_property_val_str("quic_port"), Some("53318"));
        assert_eq!(service.get_property_val_str("listen_port"), Some("53317"));
    }

    #[test]
    fn resolved_native_service_merges_into_registry() {
        let registry = registry_after_observing("1", "1");

        let record = registry.lookup("desk").expect("record by alias");
        assert_eq!(record.fingerprint, FULL_FINGERPRINT);
        assert_eq!(record.preferred_quic_address(), Some(lan_addr(42)));
    }

    #[test]
    fn resolved_native_service_ignores_incompatible_protocol_ranges() {
        // The advertised range 2..=3 is expected to be ignored by the registry.
        let registry = registry_after_observing("3", "2");

        assert_eq!(registry.records().count(), 0);
    }

    #[tokio::test]
    async fn direct_quic_loopback_sends_one_file() {
        let source_dir = tempfile::tempdir().expect("source tempdir");
        let dest_dir = tempfile::tempdir().expect("dest tempdir");
        let identity_dir = tempfile::tempdir().expect("identity tempdir");
        let file_path = source_dir.path().join("hello.txt");
        tokio::fs::write(&file_path, b"hello from ferry")
            .await
            .expect("source file written");

        let identity =
            NativeIdentity::load_or_generate_in(identity_dir.path()).expect("identity generated");
        let recv_identity = identity.clone();
        let recv_addr = reserve_loopback_addr();
        let receive_events = Arc::new(Mutex::new(Vec::new()));
        let receive_sink = Arc::clone(&receive_events);
        let dest_path = dest_dir.path().to_path_buf();
        let server = tokio::spawn(async move {
            receive_direct_file(
                &ReceiveRequest::new(recv_addr),
                dest_path,
                &recv_identity,
                |event| {
                    receive_sink.lock().expect("progress lock").push(event);
                },
            )
            .await
        });

        // Give the spawned receiver a moment to start before connecting.
        tokio::time::sleep(Duration::from_millis(100)).await;

        let peer = DirectPeer::parse(&recv_addr.to_string()).expect("peer parses");
        let send_request = SendRequest::new(peer, vec![file_path]).expect("send request is valid");
        let send_events = Arc::new(Mutex::new(Vec::new()));
        let send_sink = Arc::clone(&send_events);
        let send_summary = send_direct_file(&send_request, &identity, |event| {
            send_sink.lock().expect("progress lock").push(event);
        })
        .await
        .expect("file sent");
        let receive_summary = server.await.expect("server joined").expect("file received");

        let received = tokio::fs::read(dest_dir.path().join("hello.txt"))
            .await
            .expect("received file readable");
        assert_eq!(received, b"hello from ferry");
        assert_eq!(send_summary.bytes, receive_summary.bytes);
        assert_eq!(send_summary.blake3, receive_summary.blake3);

        let sent = send_events.lock().expect("progress lock");
        let received_events = receive_events.lock().expect("progress lock");
        assert!(matches!(
            sent.first(),
            Some(TransferEvent::SessionStarted {
                direction: TransferDirection::Send,
                ..
            })
        ));
        assert!(
            sent.iter()
                .any(|event| matches!(event, TransferEvent::Progress { .. }))
        );
        assert!(
            sent.iter()
                .any(|event| matches!(event, TransferEvent::SessionFinished { .. }))
        );
        assert!(matches!(
            received_events.first(),
            Some(TransferEvent::SessionStarted {
                direction: TransferDirection::Receive,
                ..
            })
        ));
        assert!(
            received_events
                .iter()
                .any(|event| matches!(event, TransferEvent::Progress { .. }))
        );
        assert!(
            received_events
                .iter()
                .any(|event| matches!(event, TransferEvent::SessionFinished { .. }))
        );
    }

    #[tokio::test]
    async fn direct_quic_loopback_sends_directory_and_resumes_partial_file() {
        let source_dir = tempfile::tempdir().expect("source tempdir");
        let dest_dir = tempfile::tempdir().expect("dest tempdir");
        let identity_dir = tempfile::tempdir().expect("identity tempdir");
        let bundle = source_dir.path().join("bundle");
        let nested = bundle.join("nested");
        tokio::fs::create_dir_all(&nested)
            .await
            .expect("source dirs");
        tokio::fs::write(nested.join("small.txt"), b"small")
            .await
            .expect("small file");
        let large_bytes = vec![7; (DEFAULT_CHUNK_SIZE as usize * 2) + 17];
        tokio::fs::write(bundle.join("large.bin"), &large_bytes)
            .await
            .expect("large file");

        // Pre-seed the destination with one full chunk plus five extra bytes.
        let dest_bundle = dest_dir.path().join("bundle");
        tokio::fs::create_dir_all(&dest_bundle)
            .await
            .expect("dest bundle");
        tokio::fs::write(
            dest_bundle.join("large.bin"),
            &large_bytes[..DEFAULT_CHUNK_SIZE as usize + 5],
        )
        .await
        .expect("partial large");

        let identity =
            NativeIdentity::load_or_generate_in(identity_dir.path()).expect("identity generated");
        let recv_identity = identity.clone();
        let recv_addr = reserve_loopback_addr();
        let dest_path = dest_dir.path().to_path_buf();
        let server = tokio::spawn(async move {
            receive_direct_file(
                &ReceiveRequest::new(recv_addr),
                dest_path,
                &recv_identity,
                |_| {},
            )
            .await
        });

        // Give the spawned receiver a moment to start before connecting.
        tokio::time::sleep(Duration::from_millis(100)).await;

        let peer = DirectPeer::parse(&recv_addr.to_string()).expect("peer parses");
        let send_request = SendRequest::new(peer, vec![bundle]).expect("send request is valid");
        let send_summary = send_direct_file(&send_request, &identity, |_| {})
            .await
            .expect("directory sent");
        let receive_summary = server
            .await
            .expect("server joined")
            .expect("directory received");

        let received_large = tokio::fs::read(dest_bundle.join("large.bin"))
            .await
            .expect("large file readable");
        let received_small = tokio::fs::read(dest_bundle.join("nested/small.txt"))
            .await
            .expect("small file readable");
        assert_eq!(received_large, large_bytes);
        assert_eq!(received_small, b"small");
        assert_eq!(send_summary.bytes, receive_summary.bytes);
        let any_resumed = receive_summary
            .files
            .iter()
            .any(|file| file.status == TransferFileStatus::Resumed);
        assert!(any_resumed);
    }
}