1pub mod error;
2
3use std::collections::{BTreeMap, BTreeSet};
4use std::fs::Metadata;
5use std::io::SeekFrom;
6use std::net::SocketAddr;
7use std::path::{Component, Path, PathBuf};
8use std::sync::atomic::{AtomicBool, Ordering};
9use std::sync::{Arc, Mutex};
10use std::time::{Duration, SystemTime, UNIX_EPOCH};
11
12use directories::BaseDirs;
13use mdns_sd::{ResolvedService, ServiceDaemon, ServiceEvent, ServiceInfo};
14use quinn::crypto::rustls::QuicClientConfig;
15use rcgen::{CertifiedKey, generate_simple_self_signed};
16use rustls::client::danger::{HandshakeSignatureValid, ServerCertVerified, ServerCertVerifier};
17use rustls::crypto::{CryptoProvider, verify_tls12_signature, verify_tls13_signature};
18use rustls::pki_types::{CertificateDer, PrivateKeyDer, PrivatePkcs8KeyDer, ServerName, UnixTime};
19use rustls::{DigitallySignedStruct, SignatureScheme};
20use serde::{Deserialize, Serialize};
21use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt};
22use uuid::Uuid;
23
24pub use error::{Error, Result};
25
26pub const NATIVE_PROTOCOL_VERSION: u16 = 1;
27pub const MIN_NATIVE_PROTOCOL_VERSION: u16 = 1;
28pub const NATIVE_SERVICE_TYPE: &str = "_ferry._udp.local.";
29
30const DIRECT_MAGIC: &[u8; 8] = b"FERRY01\n";
31const MAX_FRAME_LEN: usize = 16 * 1024 * 1024;
32const IO_BUFFER_SIZE: usize = 64 * 1024;
33const MANIFEST_VERSION: u16 = 1;
34const DEFAULT_CHUNK_SIZE: u64 = 1024 * 1024;
35const DEFAULT_MANIFEST_CONCURRENCY: usize = 8;
36const CONFIG_DIR_NAME: &str = "ferry";
37const PSK_PROOF_CONTEXT: &[u8] = b"fileferry-direct-psk-v1";
38
39#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
40pub struct DirectPeer {
41 address: SocketAddr,
42 expected_fingerprint: Option<String>,
43}
44
45impl DirectPeer {
46 pub fn parse(value: &str) -> Result<Self> {
47 Ok(Self {
48 address: value.parse()?,
49 expected_fingerprint: None,
50 })
51 }
52
53 pub const fn address(&self) -> SocketAddr {
54 self.address
55 }
56
57 pub const fn from_address(address: SocketAddr) -> Self {
58 Self {
59 address,
60 expected_fingerprint: None,
61 }
62 }
63
64 pub fn with_expected_fingerprint(mut self, fingerprint: impl Into<String>) -> Result<Self> {
65 self.expected_fingerprint = Some(normalized_fingerprint(fingerprint.into())?);
66 Ok(self)
67 }
68
69 pub fn expected_fingerprint(&self) -> Option<&str> {
70 self.expected_fingerprint.as_deref()
71 }
72}
73
74#[derive(Clone, PartialEq, Eq, Deserialize)]
75pub struct PskSecret(String);
76
77impl PskSecret {
78 pub fn new(psk: impl Into<String>) -> Result<Self> {
79 validate_psk(psk.into()).map(Self)
80 }
81
82 fn expose_secret(&self) -> &str {
83 &self.0
84 }
85}
86
87impl std::fmt::Debug for PskSecret {
88 fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
89 formatter.write_str("PskSecret(<redacted>)")
90 }
91}
92
93impl Serialize for PskSecret {
94 fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
95 where
96 S: serde::Serializer,
97 {
98 serializer.serialize_str("<redacted>")
99 }
100}
101
102#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
103pub struct SendRequest {
104 peer: DirectPeer,
105 paths: Vec<PathBuf>,
106 #[serde(default, skip_serializing)]
107 psk: Option<PskSecret>,
108}
109
110impl SendRequest {
111 pub fn new(peer: DirectPeer, paths: Vec<PathBuf>) -> Result<Self> {
112 if paths.is_empty() {
113 return Err(Error::InvalidInput(
114 "at least one file path is required".to_string(),
115 ));
116 }
117
118 Ok(Self {
119 peer,
120 paths,
121 psk: None,
122 })
123 }
124
125 pub const fn peer(&self) -> &DirectPeer {
126 &self.peer
127 }
128
129 pub fn paths(&self) -> &[PathBuf] {
130 &self.paths
131 }
132
133 pub fn with_psk(mut self, psk: impl Into<String>) -> Result<Self> {
134 self.psk = Some(PskSecret::new(psk)?);
135 Ok(self)
136 }
137
138 pub fn psk(&self) -> Option<&str> {
139 self.psk.as_ref().map(PskSecret::expose_secret)
140 }
141}
142
143#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
144pub struct ReceiveRequest {
145 listen: SocketAddr,
146 #[serde(default, skip_serializing)]
147 psk: Option<PskSecret>,
148}
149
150impl ReceiveRequest {
151 pub fn new(listen: SocketAddr) -> Self {
152 Self { listen, psk: None }
153 }
154
155 pub const fn listen(&self) -> SocketAddr {
156 self.listen
157 }
158
159 pub fn with_psk(mut self, psk: impl Into<String>) -> Result<Self> {
160 self.psk = Some(PskSecret::new(psk)?);
161 Ok(self)
162 }
163
164 pub fn psk(&self) -> Option<&str> {
165 self.psk.as_ref().map(PskSecret::expose_secret)
166 }
167}
168
169#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
170pub struct AppConfig {
171 pub alias: String,
172 pub listen_port: u16,
173 pub quic_port: u16,
174 pub download_dir: String,
175 pub auto_accept_known: bool,
176 pub auto_accept_unknown: bool,
177 pub discovery: Vec<String>,
178 pub max_concurrent_files: usize,
179 pub trust: TrustConfig,
180}
181
182impl Default for AppConfig {
183 fn default() -> Self {
184 Self {
185 alias: default_alias(),
186 listen_port: 53317,
187 quic_port: 53318,
188 download_dir: "~/Downloads/ferry".to_string(),
189 auto_accept_known: true,
190 auto_accept_unknown: false,
191 discovery: vec!["native".to_string()],
192 max_concurrent_files: 8,
193 trust: TrustConfig::default(),
194 }
195 }
196}
197
198impl AppConfig {
199 pub fn load_or_default() -> Result<Self> {
200 Self::load_or_default_from(config_dir()?.join("config.toml"))
201 }
202
203 pub fn load_or_default_from(path: impl AsRef<Path>) -> Result<Self> {
204 let path = path.as_ref();
205 match std::fs::read_to_string(path) {
206 Ok(contents) => {
207 toml::from_str(&contents).map_err(|error| Error::ConfigParse(error.to_string()))
208 }
209 Err(error) if error.kind() == std::io::ErrorKind::NotFound => Ok(Self::default()),
210 Err(source) => Err(Error::IoPath {
211 path: path.to_path_buf(),
212 source,
213 }),
214 }
215 }
216
217 pub fn save_default_path(&self) -> Result<()> {
218 self.save_to_path(config_dir()?.join("config.toml"))
219 }
220
221 pub fn save_to_path(&self, path: impl AsRef<Path>) -> Result<()> {
222 write_toml_file(path.as_ref(), self)
223 }
224
225 pub fn to_toml_string(&self) -> Result<String> {
226 toml::to_string_pretty(self).map_err(|error| Error::ConfigSerialize(error.to_string()))
227 }
228
229 pub fn redacted(&self) -> Self {
230 let mut config = self.clone();
231 config.trust = config.trust.redacted();
232 config
233 }
234
235 pub fn to_redacted_toml_string(&self) -> Result<String> {
236 self.redacted().to_toml_string()
237 }
238}
239
240#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
241pub struct DaemonConfig {
242 pub listen: SocketAddr,
243 pub destination: PathBuf,
244}
245
246impl DaemonConfig {
247 pub fn from_app_config(config: &AppConfig) -> Result<Self> {
248 Ok(Self {
249 listen: SocketAddr::from(([0, 0, 0, 0], config.quic_port)),
250 destination: expand_home_path(&config.download_dir)?,
251 })
252 }
253
254 pub fn load_or_default() -> Result<Self> {
255 let app_config = AppConfig::load_or_default()?;
256 Self::load_or_default_from(config_dir()?.join("daemon.toml"), &app_config)
257 }
258
259 pub fn load_or_default_from(path: impl AsRef<Path>, app_config: &AppConfig) -> Result<Self> {
260 let path = path.as_ref();
261 match std::fs::read_to_string(path) {
262 Ok(contents) => {
263 toml::from_str(&contents).map_err(|error| Error::ConfigParse(error.to_string()))
264 }
265 Err(error) if error.kind() == std::io::ErrorKind::NotFound => {
266 Self::from_app_config(app_config)
267 }
268 Err(source) => Err(Error::IoPath {
269 path: path.to_path_buf(),
270 source,
271 }),
272 }
273 }
274
275 pub fn save_default_path(&self) -> Result<()> {
276 self.save_to_path(config_dir()?.join("daemon.toml"))
277 }
278
279 pub fn save_to_path(&self, path: impl AsRef<Path>) -> Result<()> {
280 write_toml_file(path.as_ref(), self)
281 }
282
283 pub fn to_toml_string(&self) -> Result<String> {
284 toml::to_string_pretty(self).map_err(|error| Error::ConfigSerialize(error.to_string()))
285 }
286}
287
288#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
289pub struct TrustConfig {
290 pub require_fingerprint: bool,
291 pub psk: String,
292}
293
294impl Default for TrustConfig {
295 fn default() -> Self {
296 Self {
297 require_fingerprint: true,
298 psk: String::new(),
299 }
300 }
301}
302
303impl TrustConfig {
304 pub fn redacted(&self) -> Self {
305 let psk = if self.psk.is_empty() {
306 String::new()
307 } else {
308 "<redacted>".to_string()
309 };
310 Self {
311 require_fingerprint: self.require_fingerprint,
312 psk,
313 }
314 }
315}
316
317#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
318pub struct NativeIdentity {
319 cert_der: Vec<u8>,
320 key_der: Vec<u8>,
321 fingerprint: String,
322}
323
324impl NativeIdentity {
325 pub fn load_or_generate() -> Result<Self> {
326 Self::load_or_generate_in(config_dir()?)
327 }
328
329 pub fn load_or_generate_in(config_dir: impl AsRef<Path>) -> Result<Self> {
330 let config_dir = config_dir.as_ref();
331 let cert_path = config_dir.join("identity.cert.der");
332 let key_path = config_dir.join("identity.key.der");
333
334 if cert_path.exists() && key_path.exists() {
335 let cert_der = std::fs::read(&cert_path).map_err(|source| Error::IoPath {
336 path: cert_path,
337 source,
338 })?;
339 let key_der = std::fs::read(&key_path).map_err(|source| Error::IoPath {
340 path: key_path,
341 source,
342 })?;
343
344 return Ok(Self::from_parts(cert_der, key_der));
345 }
346
347 std::fs::create_dir_all(config_dir).map_err(|source| Error::IoPath {
348 path: config_dir.to_path_buf(),
349 source,
350 })?;
351
352 let identity = Self::generate()?;
353 write_private_file(&cert_path, &identity.cert_der)?;
354 write_private_file(&key_path, &identity.key_der)?;
355 Ok(identity)
356 }
357
358 pub fn generate() -> Result<Self> {
359 let CertifiedKey { cert, signing_key } =
360 generate_simple_self_signed(vec!["localhost".to_string()])?;
361 Ok(Self::from_parts(
362 cert.der().to_vec(),
363 signing_key.serialize_der(),
364 ))
365 }
366
367 pub fn fingerprint(&self) -> &str {
368 &self.fingerprint
369 }
370
371 fn cert_chain(&self) -> Vec<CertificateDer<'static>> {
372 vec![CertificateDer::from(self.cert_der.clone())]
373 }
374
375 fn private_key(&self) -> PrivateKeyDer<'static> {
376 PrivateKeyDer::from(PrivatePkcs8KeyDer::from(self.key_der.clone()))
377 }
378
379 fn from_parts(cert_der: Vec<u8>, key_der: Vec<u8>) -> Self {
380 let fingerprint = blake3::hash(&cert_der).to_hex().to_string();
381 Self {
382 cert_der,
383 key_der,
384 fingerprint,
385 }
386 }
387}
388
389#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
390#[serde(rename_all = "snake_case")]
391pub enum TransferDirection {
392 Send,
393 Receive,
394}
395
396#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
397pub struct TransferProgress {
398 pub direction: TransferDirection,
399 pub file_name: String,
400 pub bytes_done: u64,
401 pub bytes_total: u64,
402}
403
404#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
405#[serde(tag = "event", rename_all = "snake_case")]
406pub enum TransferEvent {
407 SessionStarted {
408 direction: TransferDirection,
409 session_id: Uuid,
410 files_total: usize,
411 bytes_total: u64,
412 },
413 FileStarted {
414 direction: TransferDirection,
415 file_name: String,
416 bytes_total: u64,
417 resume_offset: u64,
418 },
419 Progress {
420 direction: TransferDirection,
421 file_name: String,
422 bytes_done: u64,
423 bytes_total: u64,
424 },
425 FileFinished {
426 direction: TransferDirection,
427 file_name: String,
428 bytes: u64,
429 blake3: String,
430 status: TransferFileStatus,
431 },
432 SessionFinished {
433 direction: TransferDirection,
434 files_total: usize,
435 bytes_total: u64,
436 blake3: String,
437 },
438 SessionCancelled {
439 direction: TransferDirection,
440 session_id: Uuid,
441 },
442}
443
444impl TransferEvent {
445 pub fn progress(&self) -> Option<TransferProgress> {
446 match self {
447 Self::Progress {
448 direction,
449 file_name,
450 bytes_done,
451 bytes_total,
452 } => Some(TransferProgress {
453 direction: *direction,
454 file_name: file_name.clone(),
455 bytes_done: *bytes_done,
456 bytes_total: *bytes_total,
457 }),
458 _ => None,
459 }
460 }
461}
462
463#[derive(Debug, Clone, Default)]
464pub struct TransferControl {
465 cancelled: Arc<AtomicBool>,
466}
467
468impl TransferControl {
469 pub fn new() -> Self {
470 Self::default()
471 }
472
473 pub fn cancel(&self) {
474 self.cancelled.store(true, Ordering::SeqCst);
475 }
476
477 pub fn is_cancelled(&self) -> bool {
478 self.cancelled.load(Ordering::SeqCst)
479 }
480
481 fn check_cancelled(&self) -> Result<()> {
482 if self.is_cancelled() {
483 Err(Error::TransferCancelled)
484 } else {
485 Ok(())
486 }
487 }
488}
489
490#[derive(Debug, Clone, PartialEq, Eq)]
491pub struct TransferSummary {
492 pub path: PathBuf,
493 pub file_name: String,
494 pub bytes: u64,
495 pub blake3: String,
496 pub files: Vec<TransferFileSummary>,
497}
498
499#[derive(Debug, Clone, PartialEq, Eq)]
500pub struct TransferFileSummary {
501 pub path: PathBuf,
502 pub relative_path: PathBuf,
503 pub bytes: u64,
504 pub blake3: String,
505 pub status: TransferFileStatus,
506}
507
508#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
509#[serde(rename_all = "snake_case")]
510pub enum TransferFileStatus {
511 Sent,
512 Received,
513 Skipped,
514 Resumed,
515}
516
517#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
518pub struct TransferManifest {
519 pub version: u16,
520 pub session_id: Uuid,
521 pub chunk_size: u64,
522 pub entries: Vec<ManifestEntry>,
523}
524
525impl TransferManifest {
526 pub fn file_entries(&self) -> impl Iterator<Item = &ManifestEntry> {
527 self.entries
528 .iter()
529 .filter(|entry| entry.kind == ManifestEntryKind::File)
530 }
531
532 pub fn total_file_bytes(&self) -> u64 {
533 self.file_entries().map(|entry| entry.size).sum()
534 }
535
536 pub fn digest(&self) -> Result<String> {
537 let bytes = serde_json::to_vec(self).map_err(|error| Error::Protocol(error.to_string()))?;
538 Ok(blake3::hash(&bytes).to_hex().to_string())
539 }
540}
541
542#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
543pub struct ManifestEntry {
544 pub relative_path: PathBuf,
545 pub kind: ManifestEntryKind,
546 pub size: u64,
547 pub blake3: Option<String>,
548 pub chunks: Vec<String>,
549 pub unix_mode: Option<u32>,
550 pub modified_unix_ms: Option<i128>,
551}
552
553#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
554#[serde(rename_all = "snake_case")]
555pub enum ManifestEntryKind {
556 File,
557 Directory,
558}
559
560#[derive(Debug, Clone, PartialEq, Eq)]
561pub enum ResumeDecision {
562 Create,
563 Skip,
564 ResumeFrom(u64),
565 Conflict(String),
566}
567
568#[derive(Debug, Clone, PartialEq, Eq)]
569pub struct PeerObservation {
570 pub fingerprint: String,
571 pub alias: Option<String>,
572 pub hostname: Option<String>,
573 pub address: SocketAddr,
574 pub transports: BTreeSet<String>,
575 pub seen_unix_ms: i128,
576}
577
578impl PeerObservation {
579 pub fn new(fingerprint: impl Into<String>, address: SocketAddr, seen_unix_ms: i128) -> Self {
580 Self {
581 fingerprint: fingerprint.into(),
582 alias: None,
583 hostname: None,
584 address,
585 transports: BTreeSet::new(),
586 seen_unix_ms,
587 }
588 }
589
590 pub fn with_alias(mut self, alias: impl Into<String>) -> Self {
591 self.alias = Some(alias.into());
592 self
593 }
594
595 pub fn with_hostname(mut self, hostname: impl Into<String>) -> Self {
596 self.hostname = Some(hostname.into());
597 self
598 }
599
600 pub fn with_transport(mut self, transport: impl Into<String>) -> Self {
601 self.transports.insert(transport.into());
602 self
603 }
604}
605
606#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
607pub struct PeerRecord {
608 pub fingerprint: String,
609 pub aliases: BTreeSet<String>,
610 pub hostnames: BTreeSet<String>,
611 pub addresses: BTreeSet<SocketAddr>,
612 pub transports: BTreeSet<String>,
613 pub first_seen_unix_ms: i128,
614 pub last_seen_unix_ms: i128,
615}
616
617impl PeerRecord {
618 fn from_observation(observation: PeerObservation) -> Self {
619 let mut aliases = BTreeSet::new();
620 if let Some(alias) = observation.alias {
621 aliases.insert(alias);
622 }
623
624 let mut hostnames = BTreeSet::new();
625 if let Some(hostname) = observation.hostname {
626 hostnames.insert(hostname);
627 }
628
629 let mut addresses = BTreeSet::new();
630 addresses.insert(observation.address);
631
632 Self {
633 fingerprint: observation.fingerprint,
634 aliases,
635 hostnames,
636 addresses,
637 transports: observation.transports,
638 first_seen_unix_ms: observation.seen_unix_ms,
639 last_seen_unix_ms: observation.seen_unix_ms,
640 }
641 }
642
643 fn merge(&mut self, observation: PeerObservation) {
644 if let Some(alias) = observation.alias {
645 self.aliases.insert(alias);
646 }
647 if let Some(hostname) = observation.hostname {
648 self.hostnames.insert(hostname);
649 }
650 self.addresses.insert(observation.address);
651 self.transports.extend(observation.transports);
652 self.first_seen_unix_ms = self.first_seen_unix_ms.min(observation.seen_unix_ms);
653 self.last_seen_unix_ms = self.last_seen_unix_ms.max(observation.seen_unix_ms);
654 }
655
656 pub fn preferred_quic_address(&self) -> Option<SocketAddr> {
657 if !self.transports.contains("quic") {
658 return None;
659 }
660
661 self.addresses.iter().copied().next()
662 }
663}
664
665#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
666#[serde(rename_all = "snake_case")]
667pub enum TrustState {
668 Trusted,
669 Blocked,
670}
671
672#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
673pub struct KnownPeerEntry {
674 pub fingerprint: String,
675 #[serde(default)]
676 pub aliases: BTreeSet<String>,
677 #[serde(default)]
678 pub hostnames: BTreeSet<String>,
679 #[serde(default)]
680 pub addresses: BTreeSet<SocketAddr>,
681 #[serde(default)]
682 pub transports: BTreeSet<String>,
683 pub trust_state: TrustState,
684 pub first_seen_unix_ms: Option<i128>,
685 pub last_seen_unix_ms: Option<i128>,
686}
687
688impl KnownPeerEntry {
689 pub fn trusted(fingerprint: impl Into<String>) -> Result<Self> {
690 let fingerprint = normalized_fingerprint(fingerprint.into())?;
691 Ok(Self {
692 fingerprint,
693 aliases: BTreeSet::new(),
694 hostnames: BTreeSet::new(),
695 addresses: BTreeSet::new(),
696 transports: BTreeSet::new(),
697 trust_state: TrustState::Trusted,
698 first_seen_unix_ms: None,
699 last_seen_unix_ms: None,
700 })
701 }
702
703 pub fn from_record(record: &PeerRecord, trust_state: TrustState) -> Result<Self> {
704 let fingerprint = normalized_fingerprint(record.fingerprint.clone())?;
705 Ok(Self {
706 fingerprint,
707 aliases: record.aliases.clone(),
708 hostnames: record.hostnames.clone(),
709 addresses: record.addresses.clone(),
710 transports: record.transports.clone(),
711 trust_state,
712 first_seen_unix_ms: Some(record.first_seen_unix_ms),
713 last_seen_unix_ms: Some(record.last_seen_unix_ms),
714 })
715 }
716}
717
718#[derive(Debug, Default, Clone, PartialEq, Eq, Serialize, Deserialize)]
719pub struct TrustStore {
720 #[serde(default)]
721 pub peers: BTreeMap<String, KnownPeerEntry>,
722}
723
724impl TrustStore {
725 pub fn load_or_default() -> Result<Self> {
726 Self::load_or_default_from(config_dir()?.join("known_peers.toml"))
727 }
728
729 pub fn load_or_default_from(path: impl AsRef<Path>) -> Result<Self> {
730 let path = path.as_ref();
731 match std::fs::read_to_string(path) {
732 Ok(contents) => {
733 toml::from_str(&contents).map_err(|error| Error::ConfigParse(error.to_string()))
734 }
735 Err(error) if error.kind() == std::io::ErrorKind::NotFound => Ok(Self::default()),
736 Err(source) => Err(Error::IoPath {
737 path: path.to_path_buf(),
738 source,
739 }),
740 }
741 }
742
743 pub fn save_default_path(&self) -> Result<()> {
744 self.save_to_path(config_dir()?.join("known_peers.toml"))
745 }
746
747 pub fn save_to_path(&self, path: impl AsRef<Path>) -> Result<()> {
748 write_toml_file(path.as_ref(), self)
749 }
750
751 pub fn trust_fingerprint(&mut self, fingerprint: impl Into<String>) -> Result<&KnownPeerEntry> {
752 let entry = KnownPeerEntry::trusted(fingerprint)?;
753 let fingerprint = entry.fingerprint.clone();
754 self.peers
755 .entry(fingerprint.clone())
756 .and_modify(|existing| existing.trust_state = TrustState::Trusted)
757 .or_insert(entry);
758 Ok(self
759 .peers
760 .get(&fingerprint)
761 .expect("trusted peer entry should exist"))
762 }
763
764 pub fn trust_record(&mut self, record: &PeerRecord) -> Result<&KnownPeerEntry> {
765 let entry = KnownPeerEntry::from_record(record, TrustState::Trusted)?;
766 let fingerprint = entry.fingerprint.clone();
767 self.peers
768 .entry(fingerprint.clone())
769 .and_modify(|existing| {
770 existing.aliases.extend(record.aliases.clone());
771 existing.hostnames.extend(record.hostnames.clone());
772 existing.addresses.extend(record.addresses.clone());
773 existing.transports.extend(record.transports.clone());
774 existing.first_seen_unix_ms = Some(
775 existing
776 .first_seen_unix_ms
777 .unwrap_or(record.first_seen_unix_ms)
778 .min(record.first_seen_unix_ms),
779 );
780 existing.last_seen_unix_ms = Some(
781 existing
782 .last_seen_unix_ms
783 .unwrap_or(record.last_seen_unix_ms)
784 .max(record.last_seen_unix_ms),
785 );
786 existing.trust_state = TrustState::Trusted;
787 })
788 .or_insert(entry);
789 Ok(self
790 .peers
791 .get(&fingerprint)
792 .expect("trusted peer entry should exist"))
793 }
794
795 pub fn trust_direct_address(
796 &mut self,
797 fingerprint: impl Into<String>,
798 address: SocketAddr,
799 ) -> Result<&KnownPeerEntry> {
800 let mut entry = KnownPeerEntry::trusted(fingerprint)?;
801 let seen = system_time_unix_ms(SystemTime::now()).unwrap_or_default();
802 entry.addresses.insert(address);
803 entry.transports.insert("quic".to_string());
804 entry.first_seen_unix_ms = Some(seen);
805 entry.last_seen_unix_ms = Some(seen);
806
807 let fingerprint = entry.fingerprint.clone();
808 self.peers
809 .entry(fingerprint.clone())
810 .and_modify(|existing| {
811 existing.addresses.insert(address);
812 existing.transports.insert("quic".to_string());
813 existing.first_seen_unix_ms =
814 Some(existing.first_seen_unix_ms.unwrap_or(seen).min(seen));
815 existing.last_seen_unix_ms =
816 Some(existing.last_seen_unix_ms.unwrap_or(seen).max(seen));
817 existing.trust_state = TrustState::Trusted;
818 })
819 .or_insert(entry);
820 Ok(self
821 .peers
822 .get(&fingerprint)
823 .expect("trusted peer entry should exist"))
824 }
825
826 pub fn is_trusted_fingerprint(&self, fingerprint: &str) -> bool {
827 self.peers
828 .get(fingerprint)
829 .is_some_and(|entry| entry.trust_state == TrustState::Trusted)
830 }
831
832 pub fn trusted_fingerprint_for_address(&self, address: SocketAddr) -> Option<&str> {
833 self.peers
834 .values()
835 .find(|entry| {
836 entry.trust_state == TrustState::Trusted && entry.addresses.contains(&address)
837 })
838 .map(|entry| entry.fingerprint.as_str())
839 }
840
841 pub fn forget(&mut self, fingerprint: &str) -> bool {
842 self.peers.remove(fingerprint).is_some()
843 }
844
845 pub fn records(&self) -> impl Iterator<Item = &KnownPeerEntry> {
846 self.peers.values()
847 }
848
849 pub fn to_toml_string(&self) -> Result<String> {
850 toml::to_string_pretty(self).map_err(|error| Error::ConfigSerialize(error.to_string()))
851 }
852}
853
854#[derive(Debug, Default, Clone, PartialEq, Eq)]
855pub struct PeerRegistry {
856 peers: BTreeMap<String, PeerRecord>,
857}
858
859#[derive(Debug, Clone, PartialEq, Eq)]
860pub enum PeerLookup<'a> {
861 Found(&'a PeerRecord),
862 Ambiguous(Vec<&'a PeerRecord>),
863 Missing,
864}
865
866impl PeerRegistry {
867 pub fn new() -> Self {
868 Self::default()
869 }
870
871 pub fn observe(&mut self, observation: PeerObservation) -> Result<&PeerRecord> {
872 if observation.fingerprint.trim().is_empty() {
873 return Err(Error::InvalidInput(
874 "peer fingerprint must not be empty".to_string(),
875 ));
876 }
877
878 let fingerprint = observation.fingerprint.clone();
879 if let Some(record) = self.peers.get_mut(&fingerprint) {
880 record.merge(observation);
881 } else {
882 self.peers.insert(
883 fingerprint.clone(),
884 PeerRecord::from_observation(observation),
885 );
886 }
887
888 Ok(self
889 .peers
890 .get(&fingerprint)
891 .expect("observed peer record should exist"))
892 }
893
894 pub fn get_by_fingerprint(&self, fingerprint: &str) -> Option<&PeerRecord> {
895 self.peers.get(fingerprint)
896 }
897
898 pub fn lookup(&self, query: &str) -> Option<&PeerRecord> {
899 match self.lookup_detail(query) {
900 PeerLookup::Found(record) => Some(record),
901 PeerLookup::Ambiguous(_) | PeerLookup::Missing => None,
902 }
903 }
904
905 pub fn lookup_detail(&self, query: &str) -> PeerLookup<'_> {
906 if let Some(record) = self.peers.get(query) {
907 return PeerLookup::Found(record);
908 }
909
910 let matches = self
911 .peers
912 .values()
913 .filter(|record| {
914 record.fingerprint.starts_with(query)
915 || record.aliases.iter().any(|alias| alias == query)
916 || record.hostnames.iter().any(|hostname| hostname == query)
917 })
918 .collect::<Vec<_>>();
919
920 match matches.as_slice() {
921 [] => PeerLookup::Missing,
922 [record] => PeerLookup::Found(record),
923 _ => PeerLookup::Ambiguous(matches),
924 }
925 }
926
927 pub fn records(&self) -> impl Iterator<Item = &PeerRecord> {
928 self.peers.values()
929 }
930}
931
932#[derive(Debug, Clone, PartialEq, Eq)]
933pub struct NativeAnnouncement {
934 pub alias: String,
935 pub fingerprint: String,
936 pub quic_port: u16,
937 pub listen_port: Option<u16>,
938}
939
940impl NativeAnnouncement {
941 pub fn from_config(config: &AppConfig, identity: &NativeIdentity) -> Self {
942 Self {
943 alias: config.alias.clone(),
944 fingerprint: identity.fingerprint().to_string(),
945 quic_port: config.quic_port,
946 listen_port: Some(config.listen_port),
947 }
948 }
949
950 fn service_info(&self) -> Result<ServiceInfo> {
951 let alias = sanitize_dns_label(&self.alias);
952 let fingerprint_prefix = self
953 .fingerprint
954 .get(..12)
955 .unwrap_or(self.fingerprint.as_str());
956 let instance = format!("{alias}-{fingerprint_prefix}");
957 let hostname = format!("{alias}.local.");
958 let properties = [
959 ("pv", NATIVE_PROTOCOL_VERSION.to_string()),
960 ("minpv", MIN_NATIVE_PROTOCOL_VERSION.to_string()),
961 ("fp", self.fingerprint.clone()),
962 ("alias", self.alias.clone()),
963 ("transports", "quic".to_string()),
964 ("quic_port", self.quic_port.to_string()),
965 (
966 "listen_port",
967 self.listen_port
968 .map(|port| port.to_string())
969 .unwrap_or_default(),
970 ),
971 ];
972
973 ServiceInfo::new(
974 NATIVE_SERVICE_TYPE,
975 &instance,
976 &hostname,
977 "",
978 self.quic_port,
979 &properties[..],
980 )
981 .map(ServiceInfo::enable_addr_auto)
982 .map_err(|error| Error::Discovery(error.to_string()))
983 }
984}
985
986pub struct NativeAnnouncer {
987 daemon: ServiceDaemon,
988 fullname: String,
989}
990
991impl NativeAnnouncer {
992 pub fn start(announcement: &NativeAnnouncement) -> Result<Self> {
993 let daemon = ServiceDaemon::new().map_err(|error| Error::Discovery(error.to_string()))?;
994 let service = announcement.service_info()?;
995 let fullname = service.get_fullname().to_string();
996 daemon
997 .register(service)
998 .map_err(|error| Error::Discovery(error.to_string()))?;
999 Ok(Self { daemon, fullname })
1000 }
1001}
1002
1003impl Drop for NativeAnnouncer {
1004 fn drop(&mut self) {
1005 let _ = self.daemon.unregister(&self.fullname);
1006 let _ = self.daemon.shutdown();
1007 }
1008}
1009
1010pub async fn discover_native_peers(timeout: Duration) -> Result<PeerRegistry> {
1011 let daemon = ServiceDaemon::new().map_err(|error| Error::Discovery(error.to_string()))?;
1012 let receiver = daemon
1013 .browse(NATIVE_SERVICE_TYPE)
1014 .map_err(|error| Error::Discovery(error.to_string()))?;
1015 let deadline = tokio::time::Instant::now() + timeout;
1016 let mut registry = PeerRegistry::new();
1017
1018 loop {
1019 let now = tokio::time::Instant::now();
1020 if now >= deadline {
1021 break;
1022 }
1023
1024 let wait = deadline - now;
1025 match tokio::time::timeout(wait, receiver.recv_async()).await {
1026 Ok(Ok(ServiceEvent::ServiceResolved(service))) => {
1027 observe_resolved_service(&mut registry, &service)?;
1028 }
1029 Ok(Ok(_)) => {}
1030 Ok(Err(error)) => {
1031 return Err(Error::Discovery(error.to_string()));
1032 }
1033 Err(_) => break,
1034 }
1035 }
1036
1037 daemon
1038 .stop_browse(NATIVE_SERVICE_TYPE)
1039 .map_err(|error| Error::Discovery(error.to_string()))?;
1040 let _ = daemon.shutdown();
1041 Ok(registry)
1042}
1043
1044fn observe_resolved_service(registry: &mut PeerRegistry, service: &ResolvedService) -> Result<()> {
1045 let Some(fingerprint) = service.get_property_val_str("fp") else {
1046 return Ok(());
1047 };
1048 let fingerprint = match normalized_fingerprint(fingerprint.to_string()) {
1049 Ok(fingerprint) => fingerprint,
1050 Err(_) => return Ok(()),
1051 };
1052 let Some(highest_version) = parse_txt_u16(service, "pv") else {
1053 return Ok(());
1054 };
1055 let Some(minimum_version) = parse_txt_u16(service, "minpv") else {
1056 return Ok(());
1057 };
1058 if minimum_version > highest_version
1059 || minimum_version > NATIVE_PROTOCOL_VERSION
1060 || highest_version < MIN_NATIVE_PROTOCOL_VERSION
1061 {
1062 return Ok(());
1063 }
1064
1065 let Some(quic_port) = parse_txt_u16(service, "quic_port") else {
1066 return Ok(());
1067 };
1068 let transports = service
1069 .get_property_val_str("transports")
1070 .unwrap_or_default()
1071 .split(',')
1072 .map(str::trim)
1073 .filter(|value| !value.is_empty())
1074 .map(ToOwned::to_owned)
1075 .collect::<Vec<_>>();
1076 if !transports.iter().any(|transport| transport == "quic") {
1077 return Ok(());
1078 }
1079
1080 let alias = service.get_property_val_str("alias").map(ToOwned::to_owned);
1081 let hostname = Some(service.get_hostname().trim_end_matches('.').to_string());
1082 let seen_unix_ms = system_time_unix_ms(SystemTime::now()).unwrap_or_default();
1083
1084 for address in service.get_addresses_v4() {
1085 let mut observation = PeerObservation::new(
1086 fingerprint.clone(),
1087 SocketAddr::from((address, quic_port)),
1088 seen_unix_ms,
1089 );
1090 if let Some(alias) = &alias {
1091 observation = observation.with_alias(alias.clone());
1092 }
1093 if let Some(hostname) = &hostname {
1094 observation = observation.with_hostname(hostname.clone());
1095 }
1096 for transport in &transports {
1097 observation = observation.with_transport(transport.clone());
1098 }
1099 registry.observe(observation)?;
1100 }
1101
1102 Ok(())
1103}
1104
1105fn parse_txt_u16(service: &ResolvedService, key: &str) -> Option<u16> {
1106 service.get_property_val_str(key)?.parse().ok()
1107}
1108
1109fn sanitize_dns_label(value: &str) -> String {
1110 let mut label = value
1111 .chars()
1112 .map(|ch| {
1113 if ch.is_ascii_alphanumeric() || ch == '-' {
1114 ch.to_ascii_lowercase()
1115 } else {
1116 '-'
1117 }
1118 })
1119 .collect::<String>();
1120 while label.contains("--") {
1121 label = label.replace("--", "-");
1122 }
1123 let label = label.trim_matches('-').to_string();
1124 if label.is_empty() {
1125 "fileferry-device".to_string()
1126 } else {
1127 label
1128 }
1129}
1130
1131#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
1132struct DirectReceivePlan {
1133 files: Vec<DirectFilePlan>,
1134}
1135
1136#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
1137struct DirectProtocolHello {
1138 protocol_version: u16,
1139 min_protocol_version: u16,
1140 client_fingerprint: String,
1141 #[serde(default)]
1142 psk: bool,
1143}
1144
1145impl DirectProtocolHello {
1146 fn new(identity: &NativeIdentity, psk: Option<&str>) -> Self {
1147 Self {
1148 protocol_version: NATIVE_PROTOCOL_VERSION,
1149 min_protocol_version: MIN_NATIVE_PROTOCOL_VERSION,
1150 client_fingerprint: identity.fingerprint().to_string(),
1151 psk: psk.is_some(),
1152 }
1153 }
1154}
1155
1156#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
1157struct DirectProtocolResponse {
1158 accepted: bool,
1159 protocol_version: Option<u16>,
1160 #[serde(default)]
1161 psk_challenge: Option<String>,
1162 error: Option<String>,
1163}
1164
1165impl DirectProtocolResponse {
1166 fn accept(protocol_version: u16) -> Self {
1167 Self {
1168 accepted: true,
1169 protocol_version: Some(protocol_version),
1170 psk_challenge: None,
1171 error: None,
1172 }
1173 }
1174
1175 fn accept_with_psk(protocol_version: u16, challenge: impl Into<String>) -> Self {
1176 Self {
1177 accepted: true,
1178 protocol_version: Some(protocol_version),
1179 psk_challenge: Some(challenge.into()),
1180 error: None,
1181 }
1182 }
1183
1184 fn reject(error: impl Into<String>) -> Self {
1185 Self {
1186 accepted: false,
1187 protocol_version: None,
1188 psk_challenge: None,
1189 error: Some(error.into()),
1190 }
1191 }
1192}
1193
1194#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
1195struct DirectPskProof {
1196 proof: String,
1197}
1198
1199#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
1200struct DirectPskResult {
1201 accepted: bool,
1202 error: Option<String>,
1203}
1204
1205impl DirectPskResult {
1206 fn accept() -> Self {
1207 Self {
1208 accepted: true,
1209 error: None,
1210 }
1211 }
1212
1213 fn reject(error: impl Into<String>) -> Self {
1214 Self {
1215 accepted: false,
1216 error: Some(error.into()),
1217 }
1218 }
1219}
1220
1221#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
1222struct DirectFilePlan {
1223 relative_path: PathBuf,
1224 action: DirectFileAction,
1225 offset: u64,
1226}
1227
1228#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
1229#[serde(rename_all = "snake_case")]
1230enum DirectFileAction {
1231 Receive,
1232 Skip,
1233}
1234
1235#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
1236struct DirectPayloadHeader {
1237 relative_path: PathBuf,
1238 offset: u64,
1239 bytes: u64,
1240}
1241
1242#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
1243struct DirectAck {
1244 ok: bool,
1245 error: Option<String>,
1246}
1247
1248pub fn validate_send_paths(paths: &[PathBuf]) -> Result<()> {
1249 if paths.is_empty() {
1250 return Err(Error::InvalidInput(
1251 "at least one file path is required".to_string(),
1252 ));
1253 }
1254
1255 for path in paths {
1256 if path.as_os_str().is_empty() {
1257 return Err(Error::InvalidInput("empty file path".to_string()));
1258 }
1259 }
1260
1261 Ok(())
1262}
1263
1264pub fn display_path(path: &Path) -> String {
1265 path.to_string_lossy().into_owned()
1266}
1267
1268pub async fn build_manifest(paths: &[PathBuf]) -> Result<TransferManifest> {
1269 validate_send_paths(paths)?;
1270
1271 let sources = collect_manifest_sources(paths)?;
1272 let mut seen = BTreeSet::new();
1273 for source in &sources {
1274 if !seen.insert(source.relative_path.clone()) {
1275 return Err(Error::InvalidInput(format!(
1276 "duplicate transfer path: {}",
1277 display_path(&source.relative_path)
1278 )));
1279 }
1280 }
1281
1282 Ok(TransferManifest {
1283 version: MANIFEST_VERSION,
1284 session_id: Uuid::new_v4(),
1285 chunk_size: DEFAULT_CHUNK_SIZE,
1286 entries: build_manifest_entries(sources).await?,
1287 })
1288}
1289
1290async fn build_manifest_entries(sources: Vec<ManifestSource>) -> Result<Vec<ManifestEntry>> {
1291 if sources.len() <= 1 {
1292 let mut entries = Vec::with_capacity(sources.len());
1293 for source in sources {
1294 entries.push(build_manifest_entry(source).await?);
1295 }
1296 return Ok(entries);
1297 }
1298
1299 let len = sources.len();
1300 let concurrency = std::thread::available_parallelism()
1301 .map(|parallelism| parallelism.get())
1302 .unwrap_or(1)
1303 .clamp(1, DEFAULT_MANIFEST_CONCURRENCY)
1304 .min(len);
1305 let mut sources = sources.into_iter().enumerate();
1306 let mut tasks = tokio::task::JoinSet::new();
1307 let mut entries = vec![None; len];
1308
1309 for _ in 0..concurrency {
1310 if let Some((index, source)) = sources.next() {
1311 tasks.spawn(async move {
1312 build_manifest_entry(source)
1313 .await
1314 .map(|entry| (index, entry))
1315 });
1316 }
1317 }
1318
1319 while let Some(result) = tasks.join_next().await {
1320 let (index, entry) =
1321 result.map_err(|error| Error::Protocol(format!("manifest task failed: {error}")))??;
1322 entries[index] = Some(entry);
1323
1324 if let Some((index, source)) = sources.next() {
1325 tasks.spawn(async move {
1326 build_manifest_entry(source)
1327 .await
1328 .map(|entry| (index, entry))
1329 });
1330 }
1331 }
1332
1333 Ok(entries
1334 .into_iter()
1335 .map(|entry| entry.expect("manifest task should fill every entry"))
1336 .collect())
1337}
1338
1339pub fn safe_destination_path(destination: &Path, relative_path: &Path) -> Result<PathBuf> {
1340 validate_relative_transfer_path(relative_path)?;
1341 Ok(destination.join(relative_path))
1342}
1343
1344pub async fn decide_resume(
1345 destination: &Path,
1346 entry: &ManifestEntry,
1347 chunk_size: u64,
1348) -> Result<ResumeDecision> {
1349 if entry.kind != ManifestEntryKind::File {
1350 return Ok(ResumeDecision::Create);
1351 }
1352
1353 let path = safe_destination_path(destination, &entry.relative_path)?;
1354 let metadata = match tokio::fs::metadata(&path).await {
1355 Ok(metadata) => metadata,
1356 Err(error) if error.kind() == std::io::ErrorKind::NotFound => {
1357 return Ok(ResumeDecision::Create);
1358 }
1359 Err(source) => {
1360 return Err(Error::IoPath { path, source });
1361 }
1362 };
1363
1364 if !metadata.is_file() {
1365 return Ok(ResumeDecision::Conflict(format!(
1366 "{} already exists and is not a regular file",
1367 display_path(&path)
1368 )));
1369 }
1370
1371 if metadata.len() == entry.size {
1372 let expected = entry
1373 .blake3
1374 .as_deref()
1375 .ok_or_else(|| Error::Protocol("file manifest entry is missing BLAKE3".to_string()))?;
1376 let actual = hash_file(&path).await?;
1377 return if actual == expected {
1378 Ok(ResumeDecision::Skip)
1379 } else {
1380 Ok(ResumeDecision::Conflict(format!(
1381 "{} already exists with different contents",
1382 display_path(&path)
1383 )))
1384 };
1385 }
1386
1387 if metadata.len() > entry.size {
1388 return Ok(ResumeDecision::Conflict(format!(
1389 "{} is larger than incoming file",
1390 display_path(&path)
1391 )));
1392 }
1393
1394 let verified_offset = verified_prefix_offset(&path, entry, metadata.len(), chunk_size).await?;
1395 if verified_offset > 0 {
1396 Ok(ResumeDecision::ResumeFrom(verified_offset))
1397 } else {
1398 Ok(ResumeDecision::Conflict(format!(
1399 "{} already exists and does not match a verified prefix",
1400 display_path(&path)
1401 )))
1402 }
1403}
1404
1405pub async fn send_direct_file(
1406 request: &SendRequest,
1407 identity: &NativeIdentity,
1408 mut events: impl FnMut(TransferEvent),
1409) -> Result<TransferSummary> {
1410 send_direct_file_with_control(request, identity, TransferControl::new(), &mut events).await
1411}
1412
1413pub async fn send_direct_file_with_control(
1414 request: &SendRequest,
1415 identity: &NativeIdentity,
1416 control: TransferControl,
1417 mut events: impl FnMut(TransferEvent),
1418) -> Result<TransferSummary> {
1419 send_direct_file_with_control_and_trust(request, identity, control, &mut events, |_| Ok(()))
1420 .await
1421}
1422
1423pub async fn send_direct_file_with_control_and_trust(
1424 request: &SendRequest,
1425 identity: &NativeIdentity,
1426 control: TransferControl,
1427 mut events: impl FnMut(TransferEvent),
1428 mut confirm_unpinned_receiver: impl FnMut(&str) -> Result<()>,
1429) -> Result<TransferSummary> {
1430 ensure_rustls_provider();
1431
1432 control.check_cancelled()?;
1433 let manifest = build_manifest(request.paths()).await?;
1434 let sources = manifest_source_map(request.paths())?;
1435 events(TransferEvent::SessionStarted {
1436 direction: TransferDirection::Send,
1437 session_id: manifest.session_id,
1438 files_total: manifest.file_entries().count(),
1439 bytes_total: manifest.total_file_bytes(),
1440 });
1441 control.check_cancelled()?;
1442
1443 let (client_config, server_fingerprint) = client_config_capturing_server_fingerprint()?;
1444 let mut endpoint = quinn::Endpoint::client(SocketAddr::from(([0, 0, 0, 0], 0)))?;
1445 endpoint.set_default_client_config(client_config);
1446 let connection = endpoint
1447 .connect(request.peer().address(), "localhost")?
1448 .await?;
1449 let actual = captured_server_fingerprint(&server_fingerprint)?;
1450 match request.peer().expected_fingerprint() {
1451 Some(expected) if actual != expected => {
1452 connection.close(1u32.into(), b"fingerprint mismatch");
1453 return Err(Error::PeerFingerprintMismatch {
1454 expected: expected.to_string(),
1455 actual,
1456 });
1457 }
1458 Some(_) => {}
1459 None => {
1460 if let Err(error) = confirm_unpinned_receiver(&actual) {
1461 connection.close(1u32.into(), b"receiver fingerprint not trusted");
1462 return Err(error);
1463 }
1464 }
1465 }
1466 let (mut send, mut recv) = connection.open_bi().await?;
1467
1468 send.write_all(DIRECT_MAGIC).await?;
1469 write_json_frame(
1470 &mut send,
1471 &DirectProtocolHello::new(identity, request.psk()),
1472 )
1473 .await?;
1474 let protocol: DirectProtocolResponse = read_json_frame(&mut recv).await?;
1475 if !protocol.accepted {
1476 return Err(Error::Protocol(protocol.error.unwrap_or_else(|| {
1477 "receiver rejected protocol negotiation".to_string()
1478 })));
1479 }
1480 if protocol.protocol_version != Some(NATIVE_PROTOCOL_VERSION) {
1481 return Err(Error::Protocol(format!(
1482 "receiver selected unsupported protocol version {:?}",
1483 protocol.protocol_version
1484 )));
1485 }
1486 match (request.psk(), protocol.psk_challenge.as_deref()) {
1487 (Some(psk), Some(challenge)) => {
1488 let proof = DirectPskProof {
1489 proof: psk_proof(
1490 psk,
1491 challenge,
1492 identity.fingerprint(),
1493 NATIVE_PROTOCOL_VERSION,
1494 )?,
1495 };
1496 write_json_frame(&mut send, &proof).await?;
1497 let auth: DirectPskResult = read_json_frame(&mut recv).await?;
1498 if !auth.accepted {
1499 return Err(Error::Authentication(
1500 auth.error
1501 .unwrap_or_else(|| "PSK authentication rejected".to_string()),
1502 ));
1503 }
1504 }
1505 (Some(_), None) => {
1506 return Err(Error::Authentication(
1507 "receiver is not configured for PSK mode".to_string(),
1508 ));
1509 }
1510 (None, Some(_)) => {
1511 return Err(Error::Authentication(
1512 "receiver requires PSK authentication".to_string(),
1513 ));
1514 }
1515 (None, None) => {}
1516 }
1517
1518 write_json_frame(&mut send, &manifest).await?;
1519
1520 let plan: DirectReceivePlan = read_json_frame(&mut recv).await?;
1521 let mut summaries = Vec::new();
1522 for file_plan in plan.files {
1523 let Some(entry) = manifest
1524 .file_entries()
1525 .find(|entry| entry.relative_path == file_plan.relative_path)
1526 else {
1527 return Err(Error::Protocol(format!(
1528 "receiver requested unknown file: {}",
1529 display_path(&file_plan.relative_path)
1530 )));
1531 };
1532
1533 let Some(source_path) = sources.get(&file_plan.relative_path) else {
1534 return Err(Error::Protocol(format!(
1535 "missing source for manifest file: {}",
1536 display_path(&file_plan.relative_path)
1537 )));
1538 };
1539
1540 if file_plan.action == DirectFileAction::Skip {
1541 let blake3 = entry.blake3.clone().unwrap_or_default();
1542 events(TransferEvent::FileFinished {
1543 direction: TransferDirection::Send,
1544 file_name: display_path(&entry.relative_path),
1545 bytes: entry.size,
1546 blake3: blake3.clone(),
1547 status: TransferFileStatus::Skipped,
1548 });
1549 summaries.push(TransferFileSummary {
1550 path: source_path.clone(),
1551 relative_path: entry.relative_path.clone(),
1552 bytes: entry.size,
1553 blake3,
1554 status: TransferFileStatus::Skipped,
1555 });
1556 continue;
1557 }
1558
1559 match send_payload_file(
1560 &mut send,
1561 source_path,
1562 entry,
1563 file_plan.offset,
1564 &control,
1565 &mut events,
1566 )
1567 .await
1568 {
1569 Ok(()) => {}
1570 Err(Error::TransferCancelled) => {
1571 events(TransferEvent::SessionCancelled {
1572 direction: TransferDirection::Send,
1573 session_id: manifest.session_id,
1574 });
1575 connection.close(1u32.into(), b"cancelled");
1576 return Err(Error::TransferCancelled);
1577 }
1578 Err(error) => return Err(error),
1579 }
1580 let status = if file_plan.offset > 0 {
1581 TransferFileStatus::Resumed
1582 } else {
1583 TransferFileStatus::Sent
1584 };
1585 let blake3 = entry.blake3.clone().unwrap_or_default();
1586 events(TransferEvent::FileFinished {
1587 direction: TransferDirection::Send,
1588 file_name: display_path(&entry.relative_path),
1589 bytes: entry.size,
1590 blake3: blake3.clone(),
1591 status,
1592 });
1593 summaries.push(TransferFileSummary {
1594 path: source_path.clone(),
1595 relative_path: entry.relative_path.clone(),
1596 bytes: entry.size,
1597 blake3,
1598 status,
1599 });
1600 }
1601 send.finish()?;
1602
1603 let ack: DirectAck = read_json_frame(&mut recv).await?;
1604 if !ack.ok {
1605 return Err(Error::Transfer(
1606 ack.error
1607 .unwrap_or_else(|| "receiver rejected transfer".to_string()),
1608 ));
1609 }
1610
1611 connection.close(0u32.into(), b"done");
1612 endpoint.wait_idle().await;
1613
1614 let summary = summary_from_files(&manifest, summaries)?;
1615 events(TransferEvent::SessionFinished {
1616 direction: TransferDirection::Send,
1617 files_total: summary.files.len(),
1618 bytes_total: summary.bytes,
1619 blake3: summary.blake3.clone(),
1620 });
1621 Ok(summary)
1622}
1623
1624pub async fn receive_direct_file(
1625 request: &ReceiveRequest,
1626 destination: impl AsRef<Path>,
1627 identity: &NativeIdentity,
1628 events: impl FnMut(TransferEvent),
1629) -> Result<TransferSummary> {
1630 receive_direct_file_with_control(
1631 request,
1632 destination,
1633 identity,
1634 TransferControl::new(),
1635 events,
1636 )
1637 .await
1638}
1639
1640pub async fn receive_direct_file_with_control(
1641 request: &ReceiveRequest,
1642 destination: impl AsRef<Path>,
1643 identity: &NativeIdentity,
1644 control: TransferControl,
1645 mut events: impl FnMut(TransferEvent),
1646) -> Result<TransferSummary> {
1647 ensure_rustls_provider();
1648
1649 control.check_cancelled()?;
1650 let destination = destination.as_ref();
1651 tokio::fs::create_dir_all(destination)
1652 .await
1653 .map_err(|source| Error::IoPath {
1654 path: destination.to_path_buf(),
1655 source,
1656 })?;
1657
1658 let server_config =
1659 quinn::ServerConfig::with_single_cert(identity.cert_chain(), identity.private_key())?;
1660 let endpoint = quinn::Endpoint::server(server_config, request.listen())?;
1661 let incoming = endpoint.accept().await.ok_or(Error::NoIncomingConnection)?;
1662 let connection = incoming.await?;
1663 let (mut send, mut recv) = connection.accept_bi().await?;
1664
1665 let result = receive_stream_file(
1666 destination,
1667 request.psk(),
1668 &mut recv,
1669 &mut send,
1670 &control,
1671 &mut events,
1672 )
1673 .await;
1674 let ack = match &result {
1675 Ok(_) => DirectAck {
1676 ok: true,
1677 error: None,
1678 },
1679 Err(error) => DirectAck {
1680 ok: false,
1681 error: Some(error.to_string()),
1682 },
1683 };
1684 write_json_frame(&mut send, &ack).await?;
1685 send.finish()?;
1686 let _ = tokio::time::timeout(Duration::from_secs(1), connection.closed()).await;
1687 endpoint.close(0u32.into(), b"done");
1688 endpoint.wait_idle().await;
1689
1690 result
1691}
1692
1693async fn receive_stream_file(
1694 destination: &Path,
1695 psk: Option<&str>,
1696 recv: &mut quinn::RecvStream,
1697 send: &mut quinn::SendStream,
1698 control: &TransferControl,
1699 events: &mut impl FnMut(TransferEvent),
1700) -> Result<TransferSummary> {
1701 let mut magic = [0; DIRECT_MAGIC.len()];
1702 recv.read_exact(&mut magic).await?;
1703 if &magic != DIRECT_MAGIC {
1704 return Err(Error::Protocol("bad direct-transfer magic".to_string()));
1705 }
1706
1707 let hello: DirectProtocolHello = read_json_frame(recv).await?;
1708 let protocol = negotiate_direct_protocol(&hello, psk);
1709 write_json_frame(send, &protocol).await?;
1710 if !protocol.accepted {
1711 return Err(Error::Protocol(protocol.error.unwrap_or_else(|| {
1712 "unsupported direct protocol version".to_string()
1713 })));
1714 }
1715 if let (Some(psk), Some(challenge)) = (psk, protocol.psk_challenge.as_deref()) {
1716 let proof: DirectPskProof = read_json_frame(recv).await?;
1717 let expected = psk_proof(
1718 psk,
1719 challenge,
1720 &hello.client_fingerprint,
1721 protocol.protocol_version.unwrap_or(NATIVE_PROTOCOL_VERSION),
1722 )?;
1723 if proof.proof != expected {
1724 write_json_frame(send, &DirectPskResult::reject("PSK proof mismatch")).await?;
1725 return Err(Error::Authentication("PSK proof mismatch".to_string()));
1726 }
1727 write_json_frame(send, &DirectPskResult::accept()).await?;
1728 }
1729
1730 let manifest: TransferManifest = read_json_frame(recv).await?;
1731 validate_manifest(&manifest)?;
1732 events(TransferEvent::SessionStarted {
1733 direction: TransferDirection::Receive,
1734 session_id: manifest.session_id,
1735 files_total: manifest.file_entries().count(),
1736 bytes_total: manifest.total_file_bytes(),
1737 });
1738 control.check_cancelled()?;
1739
1740 for entry in &manifest.entries {
1741 if entry.kind == ManifestEntryKind::Directory {
1742 let path = safe_destination_path(destination, &entry.relative_path)?;
1743 tokio::fs::create_dir_all(&path)
1744 .await
1745 .map_err(|source| Error::IoPath { path, source })?;
1746 }
1747 }
1748
1749 let mut plan = DirectReceivePlan { files: Vec::new() };
1750 for entry in manifest.file_entries() {
1751 let decision = decide_resume(destination, entry, manifest.chunk_size).await?;
1752 let (action, offset) = match decision {
1753 ResumeDecision::Create => (DirectFileAction::Receive, 0),
1754 ResumeDecision::Skip => (DirectFileAction::Skip, entry.size),
1755 ResumeDecision::ResumeFrom(offset) => (DirectFileAction::Receive, offset),
1756 ResumeDecision::Conflict(message) => return Err(Error::InvalidInput(message)),
1757 };
1758 plan.files.push(DirectFilePlan {
1759 relative_path: entry.relative_path.clone(),
1760 action,
1761 offset,
1762 });
1763 }
1764 write_json_frame(send, &plan).await?;
1765
1766 let mut summaries = Vec::new();
1767 for file_plan in &plan.files {
1768 let Some(entry) = manifest
1769 .file_entries()
1770 .find(|entry| entry.relative_path == file_plan.relative_path)
1771 else {
1772 return Err(Error::Protocol(format!(
1773 "planned file missing from manifest: {}",
1774 display_path(&file_plan.relative_path)
1775 )));
1776 };
1777
1778 let final_path = safe_destination_path(destination, &entry.relative_path)?;
1779 if file_plan.action == DirectFileAction::Skip {
1780 let blake3 = entry.blake3.clone().unwrap_or_default();
1781 events(TransferEvent::FileFinished {
1782 direction: TransferDirection::Receive,
1783 file_name: display_path(&entry.relative_path),
1784 bytes: entry.size,
1785 blake3: blake3.clone(),
1786 status: TransferFileStatus::Skipped,
1787 });
1788 summaries.push(TransferFileSummary {
1789 path: final_path,
1790 relative_path: entry.relative_path.clone(),
1791 bytes: entry.size,
1792 blake3,
1793 status: TransferFileStatus::Skipped,
1794 });
1795 continue;
1796 }
1797
1798 match receive_payload_file(recv, destination, entry, file_plan.offset, control, events)
1799 .await
1800 {
1801 Ok(()) => {}
1802 Err(Error::TransferCancelled) => {
1803 events(TransferEvent::SessionCancelled {
1804 direction: TransferDirection::Receive,
1805 session_id: manifest.session_id,
1806 });
1807 return Err(Error::TransferCancelled);
1808 }
1809 Err(error) => return Err(error),
1810 }
1811 let actual_hash = hash_file(&final_path).await?;
1812 let expected_hash = entry
1813 .blake3
1814 .as_deref()
1815 .ok_or_else(|| Error::Protocol("file manifest entry is missing BLAKE3".to_string()))?;
1816 if actual_hash != expected_hash {
1817 return Err(Error::Transfer(format!(
1818 "BLAKE3 hash mismatch for {}",
1819 display_path(&entry.relative_path)
1820 )));
1821 }
1822
1823 let status = if file_plan.offset > 0 {
1824 TransferFileStatus::Resumed
1825 } else {
1826 TransferFileStatus::Received
1827 };
1828 events(TransferEvent::FileFinished {
1829 direction: TransferDirection::Receive,
1830 file_name: display_path(&entry.relative_path),
1831 bytes: entry.size,
1832 blake3: actual_hash.clone(),
1833 status,
1834 });
1835 summaries.push(TransferFileSummary {
1836 path: final_path,
1837 relative_path: entry.relative_path.clone(),
1838 bytes: entry.size,
1839 blake3: actual_hash,
1840 status,
1841 });
1842 }
1843
1844 let summary = summary_from_files(&manifest, summaries)?;
1845 events(TransferEvent::SessionFinished {
1846 direction: TransferDirection::Receive,
1847 files_total: summary.files.len(),
1848 bytes_total: summary.bytes,
1849 blake3: summary.blake3.clone(),
1850 });
1851 Ok(summary)
1852}
1853
1854async fn send_payload_file(
1855 send: &mut quinn::SendStream,
1856 source_path: &Path,
1857 entry: &ManifestEntry,
1858 offset: u64,
1859 control: &TransferControl,
1860 events: &mut impl FnMut(TransferEvent),
1861) -> Result<()> {
1862 if offset > entry.size {
1863 return Err(Error::Protocol(format!(
1864 "resume offset exceeds file size for {}",
1865 display_path(&entry.relative_path)
1866 )));
1867 }
1868
1869 let header = DirectPayloadHeader {
1870 relative_path: entry.relative_path.clone(),
1871 offset,
1872 bytes: entry.size - offset,
1873 };
1874 write_json_frame(send, &header).await?;
1875 events(TransferEvent::FileStarted {
1876 direction: TransferDirection::Send,
1877 file_name: display_path(&entry.relative_path),
1878 bytes_total: entry.size,
1879 resume_offset: offset,
1880 });
1881
1882 let mut file = tokio::fs::File::open(source_path)
1883 .await
1884 .map_err(|source| Error::IoPath {
1885 path: source_path.to_path_buf(),
1886 source,
1887 })?;
1888 file.seek(SeekFrom::Start(offset))
1889 .await
1890 .map_err(|source| Error::IoPath {
1891 path: source_path.to_path_buf(),
1892 source,
1893 })?;
1894
1895 let mut buffer = vec![0; IO_BUFFER_SIZE];
1896 let mut bytes_done = offset;
1897 while bytes_done < entry.size {
1898 control.check_cancelled()?;
1899 let remaining = entry.size - bytes_done;
1900 let read_size = buffer.len().min(remaining as usize);
1901 let read = file
1902 .read(&mut buffer[..read_size])
1903 .await
1904 .map_err(|source| Error::IoPath {
1905 path: source_path.to_path_buf(),
1906 source,
1907 })?;
1908 if read == 0 {
1909 return Err(Error::Protocol(format!(
1910 "source file ended early: {}",
1911 display_path(source_path)
1912 )));
1913 }
1914 send.write_all(&buffer[..read]).await?;
1915 bytes_done += read as u64;
1916 events(TransferEvent::Progress {
1917 direction: TransferDirection::Send,
1918 file_name: display_path(&entry.relative_path),
1919 bytes_done,
1920 bytes_total: entry.size,
1921 });
1922 control.check_cancelled()?;
1923 }
1924
1925 Ok(())
1926}
1927
1928async fn receive_payload_file(
1929 recv: &mut quinn::RecvStream,
1930 destination: &Path,
1931 entry: &ManifestEntry,
1932 offset: u64,
1933 control: &TransferControl,
1934 events: &mut impl FnMut(TransferEvent),
1935) -> Result<()> {
1936 let header: DirectPayloadHeader = read_json_frame(recv).await?;
1937 if header.relative_path != entry.relative_path
1938 || header.offset != offset
1939 || header.bytes != entry.size - offset
1940 {
1941 return Err(Error::Protocol(format!(
1942 "payload header does not match receive plan for {}",
1943 display_path(&entry.relative_path)
1944 )));
1945 }
1946 events(TransferEvent::FileStarted {
1947 direction: TransferDirection::Receive,
1948 file_name: display_path(&entry.relative_path),
1949 bytes_total: entry.size,
1950 resume_offset: offset,
1951 });
1952
1953 let final_path = safe_destination_path(destination, &entry.relative_path)?;
1954 if let Some(parent) = final_path.parent() {
1955 tokio::fs::create_dir_all(parent)
1956 .await
1957 .map_err(|source| Error::IoPath {
1958 path: parent.to_path_buf(),
1959 source,
1960 })?;
1961 }
1962
1963 let write_path = if offset == 0 {
1964 temp_path_for(&final_path)
1965 } else {
1966 final_path.clone()
1967 };
1968
1969 let mut file = if offset == 0 {
1970 tokio::fs::File::create(&write_path)
1971 .await
1972 .map_err(|source| Error::IoPath {
1973 path: write_path.clone(),
1974 source,
1975 })?
1976 } else {
1977 let mut file = tokio::fs::OpenOptions::new()
1978 .write(true)
1979 .open(&write_path)
1980 .await
1981 .map_err(|source| Error::IoPath {
1982 path: write_path.clone(),
1983 source,
1984 })?;
1985 file.set_len(offset).await.map_err(|source| Error::IoPath {
1986 path: write_path.clone(),
1987 source,
1988 })?;
1989 file.seek(SeekFrom::Start(offset))
1990 .await
1991 .map_err(|source| Error::IoPath {
1992 path: write_path.clone(),
1993 source,
1994 })?;
1995 file
1996 };
1997
1998 let mut bytes_done = offset;
1999 let mut bytes_remaining = header.bytes;
2000 let mut buffer = vec![0; IO_BUFFER_SIZE];
2001 while bytes_remaining > 0 {
2002 if control.is_cancelled() {
2003 if offset == 0 {
2004 let _ = tokio::fs::remove_file(&write_path).await;
2005 }
2006 return Err(Error::TransferCancelled);
2007 }
2008 let read_size = buffer.len().min(bytes_remaining as usize);
2009 let read = match recv.read(&mut buffer[..read_size]).await {
2010 Ok(Some(read)) => read,
2011 Ok(None) => {
2012 if offset == 0 {
2013 let _ = tokio::fs::remove_file(&write_path).await;
2014 }
2015 return Err(Error::Protocol(
2016 "stream ended before file payload".to_string(),
2017 ));
2018 }
2019 Err(error) => {
2020 if offset == 0 {
2021 let _ = tokio::fs::remove_file(&write_path).await;
2022 }
2023 return Err(Error::Read(error));
2024 }
2025 };
2026
2027 file.write_all(&buffer[..read])
2028 .await
2029 .map_err(|source| Error::IoPath {
2030 path: write_path.clone(),
2031 source,
2032 })?;
2033 bytes_done += read as u64;
2034 bytes_remaining -= read as u64;
2035 events(TransferEvent::Progress {
2036 direction: TransferDirection::Receive,
2037 file_name: display_path(&entry.relative_path),
2038 bytes_done,
2039 bytes_total: entry.size,
2040 });
2041 if control.is_cancelled() {
2042 if offset == 0 {
2043 let _ = tokio::fs::remove_file(&write_path).await;
2044 }
2045 return Err(Error::TransferCancelled);
2046 }
2047 }
2048
2049 file.flush().await.map_err(|source| Error::IoPath {
2050 path: write_path.clone(),
2051 source,
2052 })?;
2053 drop(file);
2054
2055 if offset == 0 {
2056 tokio::fs::rename(&write_path, &final_path)
2057 .await
2058 .map_err(|source| Error::IoPath {
2059 path: final_path,
2060 source,
2061 })?;
2062 }
2063
2064 Ok(())
2065}
2066
2067async fn write_json_frame<T: Serialize>(send: &mut quinn::SendStream, value: &T) -> Result<()> {
2068 let bytes = serde_json::to_vec(value).map_err(|error| Error::Protocol(error.to_string()))?;
2069 if bytes.len() > MAX_FRAME_LEN {
2070 return Err(Error::Protocol("frame exceeds maximum length".to_string()));
2071 }
2072 send.write_all(&(bytes.len() as u32).to_be_bytes()).await?;
2073 send.write_all(&bytes).await?;
2074 Ok(())
2075}
2076
2077async fn read_json_frame<T: for<'de> Deserialize<'de>>(recv: &mut quinn::RecvStream) -> Result<T> {
2078 let mut len = [0; 4];
2079 recv.read_exact(&mut len).await?;
2080 let len = u32::from_be_bytes(len) as usize;
2081 if len > MAX_FRAME_LEN {
2082 return Err(Error::Protocol("frame exceeds maximum length".to_string()));
2083 }
2084
2085 let mut bytes = vec![0; len];
2086 recv.read_exact(&mut bytes).await?;
2087 serde_json::from_slice(&bytes).map_err(|error| Error::Protocol(error.to_string()))
2088}
2089
2090fn negotiate_direct_protocol(
2091 hello: &DirectProtocolHello,
2092 psk: Option<&str>,
2093) -> DirectProtocolResponse {
2094 if hello.min_protocol_version > NATIVE_PROTOCOL_VERSION {
2095 return DirectProtocolResponse::reject(format!(
2096 "peer requires protocol version {}, local max is {}",
2097 hello.min_protocol_version, NATIVE_PROTOCOL_VERSION
2098 ));
2099 }
2100
2101 if hello.protocol_version < MIN_NATIVE_PROTOCOL_VERSION {
2102 return DirectProtocolResponse::reject(format!(
2103 "peer max protocol version {} is below local minimum {}",
2104 hello.protocol_version, MIN_NATIVE_PROTOCOL_VERSION
2105 ));
2106 }
2107
2108 let selected_version = NATIVE_PROTOCOL_VERSION.min(hello.protocol_version);
2109 match (psk, hello.psk) {
2110 (Some(_), false) => DirectProtocolResponse::reject("receiver requires PSK authentication"),
2111 (Some(_), true) => {
2112 DirectProtocolResponse::accept_with_psk(selected_version, Uuid::new_v4().to_string())
2113 }
2114 (None, true) => DirectProtocolResponse::reject("receiver is not configured for PSK mode"),
2115 (None, false) => DirectProtocolResponse::accept(selected_version),
2116 }
2117}
2118
2119#[derive(Debug, Clone)]
2120struct ManifestSource {
2121 source_path: PathBuf,
2122 relative_path: PathBuf,
2123}
2124
2125fn collect_manifest_sources(paths: &[PathBuf]) -> Result<Vec<ManifestSource>> {
2126 let mut sources = Vec::new();
2127 for path in paths {
2128 let metadata = std::fs::symlink_metadata(path).map_err(|source| Error::IoPath {
2129 path: path.clone(),
2130 source,
2131 })?;
2132 if metadata.file_type().is_symlink() {
2133 return Err(Error::InvalidInput(format!(
2134 "{} is a symlink; symlink transfers are not supported",
2135 display_path(path)
2136 )));
2137 }
2138
2139 let root_name = path
2140 .file_name()
2141 .ok_or_else(|| Error::InvalidInput(format!("{} has no file name", display_path(path))))?
2142 .to_os_string();
2143 let relative_path = PathBuf::from(root_name);
2144
2145 if metadata.is_dir() {
2146 sources.push(ManifestSource {
2147 source_path: path.clone(),
2148 relative_path: relative_path.clone(),
2149 });
2150 collect_dir_sources(path, &relative_path, &mut sources)?;
2151 } else if metadata.is_file() {
2152 sources.push(ManifestSource {
2153 source_path: path.clone(),
2154 relative_path,
2155 });
2156 } else {
2157 return Err(Error::InvalidInput(format!(
2158 "{} is not a regular file or directory",
2159 display_path(path)
2160 )));
2161 }
2162 }
2163
2164 Ok(sources)
2165}
2166
2167fn collect_dir_sources(
2168 dir: &Path,
2169 relative_dir: &Path,
2170 sources: &mut Vec<ManifestSource>,
2171) -> Result<()> {
2172 let mut entries = std::fs::read_dir(dir)
2173 .map_err(|source| Error::IoPath {
2174 path: dir.to_path_buf(),
2175 source,
2176 })?
2177 .collect::<std::result::Result<Vec<_>, _>>()
2178 .map_err(|source| Error::IoPath {
2179 path: dir.to_path_buf(),
2180 source,
2181 })?;
2182 entries.sort_by_key(|entry| entry.file_name());
2183
2184 for entry in entries {
2185 let path = entry.path();
2186 let metadata = std::fs::symlink_metadata(&path).map_err(|source| Error::IoPath {
2187 path: path.clone(),
2188 source,
2189 })?;
2190 if metadata.file_type().is_symlink() {
2191 return Err(Error::InvalidInput(format!(
2192 "{} is a symlink; symlink transfers are not supported",
2193 display_path(&path)
2194 )));
2195 }
2196
2197 let relative_path = relative_dir.join(entry.file_name());
2198 if metadata.is_dir() {
2199 sources.push(ManifestSource {
2200 source_path: path.clone(),
2201 relative_path: relative_path.clone(),
2202 });
2203 collect_dir_sources(&path, &relative_path, sources)?;
2204 } else if metadata.is_file() {
2205 sources.push(ManifestSource {
2206 source_path: path,
2207 relative_path,
2208 });
2209 }
2210 }
2211
2212 Ok(())
2213}
2214
2215fn manifest_source_map(paths: &[PathBuf]) -> Result<BTreeMap<PathBuf, PathBuf>> {
2216 Ok(collect_manifest_sources(paths)?
2217 .into_iter()
2218 .filter_map(|source| {
2219 let metadata = std::fs::metadata(&source.source_path).ok()?;
2220 metadata
2221 .is_file()
2222 .then_some((source.relative_path, source.source_path))
2223 })
2224 .collect())
2225}
2226
2227async fn build_manifest_entry(source: ManifestSource) -> Result<ManifestEntry> {
2228 validate_relative_transfer_path(&source.relative_path)?;
2229 let metadata = tokio::fs::metadata(&source.source_path)
2230 .await
2231 .map_err(|source_error| Error::IoPath {
2232 path: source.source_path.clone(),
2233 source: source_error,
2234 })?;
2235 let unix_mode = unix_mode(&metadata);
2236 let modified_unix_ms = modified_unix_ms(&metadata);
2237
2238 if metadata.is_dir() {
2239 return Ok(ManifestEntry {
2240 relative_path: source.relative_path,
2241 kind: ManifestEntryKind::Directory,
2242 size: 0,
2243 blake3: None,
2244 chunks: Vec::new(),
2245 unix_mode,
2246 modified_unix_ms,
2247 });
2248 }
2249
2250 let (blake3, chunks) = hash_file_with_chunks(&source.source_path, DEFAULT_CHUNK_SIZE).await?;
2251 Ok(ManifestEntry {
2252 relative_path: source.relative_path,
2253 kind: ManifestEntryKind::File,
2254 size: metadata.len(),
2255 blake3: Some(blake3),
2256 chunks,
2257 unix_mode,
2258 modified_unix_ms,
2259 })
2260}
2261
2262fn validate_manifest(manifest: &TransferManifest) -> Result<()> {
2263 if manifest.version != MANIFEST_VERSION {
2264 return Err(Error::Protocol(format!(
2265 "unsupported manifest version {}",
2266 manifest.version
2267 )));
2268 }
2269 if manifest.chunk_size == 0 {
2270 return Err(Error::Protocol("manifest chunk size is zero".to_string()));
2271 }
2272
2273 let mut seen = BTreeSet::new();
2274 for entry in &manifest.entries {
2275 validate_relative_transfer_path(&entry.relative_path)?;
2276 if !seen.insert(entry.relative_path.clone()) {
2277 return Err(Error::Protocol(format!(
2278 "duplicate manifest path: {}",
2279 display_path(&entry.relative_path)
2280 )));
2281 }
2282 if entry.kind == ManifestEntryKind::File && entry.blake3.is_none() {
2283 return Err(Error::Protocol(format!(
2284 "file manifest entry is missing BLAKE3: {}",
2285 display_path(&entry.relative_path)
2286 )));
2287 }
2288 }
2289
2290 Ok(())
2291}
2292
2293fn validate_relative_transfer_path(path: &Path) -> Result<()> {
2294 if path.as_os_str().is_empty() || path.is_absolute() {
2295 return Err(Error::InvalidInput(format!(
2296 "unsafe transfer path: {}",
2297 display_path(path)
2298 )));
2299 }
2300
2301 let mut normal_components = 0usize;
2302 for component in path.components() {
2303 match component {
2304 Component::Normal(value) => {
2305 let Some(name) = value.to_str() else {
2306 return Err(Error::InvalidInput(format!(
2307 "transfer path must be UTF-8: {}",
2308 display_path(path)
2309 )));
2310 };
2311 validate_path_component(name, path)?;
2312 normal_components += 1;
2313 }
2314 Component::CurDir
2315 | Component::ParentDir
2316 | Component::RootDir
2317 | Component::Prefix(_) => {
2318 return Err(Error::InvalidInput(format!(
2319 "unsafe transfer path: {}",
2320 display_path(path)
2321 )));
2322 }
2323 }
2324 }
2325
2326 if normal_components == 0 {
2327 return Err(Error::InvalidInput(format!(
2328 "unsafe transfer path: {}",
2329 display_path(path)
2330 )));
2331 }
2332
2333 Ok(())
2334}
2335
2336fn validate_path_component(component: &str, full_path: &Path) -> Result<()> {
2337 if component.is_empty()
2338 || component.contains('\\')
2339 || component.contains('/')
2340 || component.contains(':')
2341 || component.ends_with(' ')
2342 || component.ends_with('.')
2343 || is_windows_reserved_name(component)
2344 {
2345 return Err(Error::InvalidInput(format!(
2346 "unsafe transfer path: {}",
2347 display_path(full_path)
2348 )));
2349 }
2350
2351 Ok(())
2352}
2353
2354fn is_windows_reserved_name(component: &str) -> bool {
2355 let stem = component
2356 .split('.')
2357 .next()
2358 .unwrap_or(component)
2359 .trim_end_matches([' ', '.'])
2360 .to_ascii_uppercase();
2361 matches!(
2362 stem.as_str(),
2363 "CON"
2364 | "PRN"
2365 | "AUX"
2366 | "NUL"
2367 | "COM1"
2368 | "COM2"
2369 | "COM3"
2370 | "COM4"
2371 | "COM5"
2372 | "COM6"
2373 | "COM7"
2374 | "COM8"
2375 | "COM9"
2376 | "LPT1"
2377 | "LPT2"
2378 | "LPT3"
2379 | "LPT4"
2380 | "LPT5"
2381 | "LPT6"
2382 | "LPT7"
2383 | "LPT8"
2384 | "LPT9"
2385 )
2386}
2387
2388async fn hash_file(path: &Path) -> Result<String> {
2389 hash_file_with_chunks(path, DEFAULT_CHUNK_SIZE)
2390 .await
2391 .map(|(hash, _)| hash)
2392}
2393
2394async fn hash_file_with_chunks(path: &Path, chunk_size: u64) -> Result<(String, Vec<String>)> {
2395 let mut file = tokio::fs::File::open(path)
2396 .await
2397 .map_err(|source| Error::IoPath {
2398 path: path.to_path_buf(),
2399 source,
2400 })?;
2401 let mut hasher = blake3::Hasher::new();
2402 let mut chunk_hasher = blake3::Hasher::new();
2403 let mut chunk_bytes = 0u64;
2404 let mut chunks = Vec::new();
2405 let mut buffer = vec![0; IO_BUFFER_SIZE];
2406
2407 loop {
2408 let read = file
2409 .read(&mut buffer)
2410 .await
2411 .map_err(|source| Error::IoPath {
2412 path: path.to_path_buf(),
2413 source,
2414 })?;
2415 if read == 0 {
2416 break;
2417 }
2418 hasher.update(&buffer[..read]);
2419
2420 let mut remaining = &buffer[..read];
2421 while !remaining.is_empty() {
2422 let take = remaining.len().min((chunk_size - chunk_bytes) as usize);
2423 chunk_hasher.update(&remaining[..take]);
2424 chunk_bytes += take as u64;
2425 remaining = &remaining[take..];
2426 if chunk_bytes == chunk_size {
2427 chunks.push(chunk_hasher.finalize().to_hex().to_string());
2428 chunk_hasher = blake3::Hasher::new();
2429 chunk_bytes = 0;
2430 }
2431 }
2432 }
2433
2434 if chunk_bytes > 0 {
2435 chunks.push(chunk_hasher.finalize().to_hex().to_string());
2436 }
2437
2438 Ok((hasher.finalize().to_hex().to_string(), chunks))
2439}
2440
2441async fn verified_prefix_offset(
2442 path: &Path,
2443 entry: &ManifestEntry,
2444 existing_len: u64,
2445 chunk_size: u64,
2446) -> Result<u64> {
2447 if existing_len == 0 || entry.chunks.is_empty() {
2448 return Ok(0);
2449 }
2450
2451 let chunks_to_check = (existing_len / chunk_size).min(entry.chunks.len() as u64) as usize;
2452 if chunks_to_check == 0 {
2453 return Ok(0);
2454 }
2455
2456 let mut file = tokio::fs::File::open(path)
2457 .await
2458 .map_err(|source| Error::IoPath {
2459 path: path.to_path_buf(),
2460 source,
2461 })?;
2462 let mut buffer = vec![0; IO_BUFFER_SIZE];
2463 let mut verified_chunks = 0usize;
2464
2465 for expected in entry.chunks.iter().take(chunks_to_check) {
2466 let mut remaining = chunk_size;
2467 let mut hasher = blake3::Hasher::new();
2468 while remaining > 0 {
2469 let read_size = buffer.len().min(remaining as usize);
2470 file.read_exact(&mut buffer[..read_size])
2471 .await
2472 .map_err(|source| Error::IoPath {
2473 path: path.to_path_buf(),
2474 source,
2475 })?;
2476 hasher.update(&buffer[..read_size]);
2477 remaining -= read_size as u64;
2478 }
2479
2480 if hasher.finalize().to_hex().as_str() != expected {
2481 break;
2482 }
2483 verified_chunks += 1;
2484 }
2485
2486 Ok(verified_chunks as u64 * chunk_size)
2487}
2488
2489fn summary_from_files(
2490 manifest: &TransferManifest,
2491 files: Vec<TransferFileSummary>,
2492) -> Result<TransferSummary> {
2493 let bytes = manifest.total_file_bytes();
2494 let blake3 = if files.len() == 1 {
2495 files[0].blake3.clone()
2496 } else {
2497 manifest.digest()?
2498 };
2499 let file_name = if files.len() == 1 {
2500 display_path(&files[0].relative_path)
2501 } else {
2502 format!("{} files", files.len())
2503 };
2504 let path = files
2505 .first()
2506 .map(|file| file.path.clone())
2507 .unwrap_or_default();
2508
2509 Ok(TransferSummary {
2510 path,
2511 file_name,
2512 bytes,
2513 blake3,
2514 files,
2515 })
2516}
2517
2518fn temp_path_for(final_path: &Path) -> PathBuf {
2519 let file_name = final_path
2520 .file_name()
2521 .and_then(|name| name.to_str())
2522 .unwrap_or("transfer");
2523 final_path.with_file_name(format!(".{file_name}.{}.ferry-part", Uuid::new_v4()))
2524}
2525
2526fn modified_unix_ms(metadata: &Metadata) -> Option<i128> {
2527 system_time_unix_ms(metadata.modified().ok()?)
2528}
2529
2530fn system_time_unix_ms(time: SystemTime) -> Option<i128> {
2531 match time.duration_since(UNIX_EPOCH) {
2532 Ok(duration) => Some(duration.as_millis() as i128),
2533 Err(error) => Some(-(error.duration().as_millis() as i128)),
2534 }
2535}
2536
2537#[cfg(unix)]
2538fn unix_mode(metadata: &Metadata) -> Option<u32> {
2539 use std::os::unix::fs::PermissionsExt;
2540
2541 Some(metadata.permissions().mode())
2542}
2543
2544#[cfg(not(unix))]
2545fn unix_mode(_metadata: &Metadata) -> Option<u32> {
2546 None
2547}
2548
2549fn client_config_capturing_server_fingerprint()
2550-> Result<(quinn::ClientConfig, Arc<Mutex<Option<String>>>)> {
2551 let server_fingerprint = Arc::new(Mutex::new(None));
2552 let crypto = rustls::ClientConfig::builder()
2553 .dangerous()
2554 .with_custom_certificate_verifier(ServerFingerprintVerifier::new(
2555 server_fingerprint.clone(),
2556 ))
2557 .with_no_client_auth();
2558
2559 let config = quinn::ClientConfig::new(Arc::new(
2560 QuicClientConfig::try_from(crypto)
2561 .map_err(|error| Error::CryptoConfig(error.to_string()))?,
2562 ));
2563 Ok((config, server_fingerprint))
2564}
2565
2566fn captured_server_fingerprint(fingerprint: &Arc<Mutex<Option<String>>>) -> Result<String> {
2567 fingerprint
2568 .lock()
2569 .map_err(|_| Error::CryptoConfig("server fingerprint capture lock poisoned".to_string()))?
2570 .clone()
2571 .ok_or_else(|| {
2572 Error::CryptoConfig("server certificate fingerprint was not captured".to_string())
2573 })
2574}
2575
2576#[derive(Debug)]
2577struct ServerFingerprintVerifier {
2578 provider: Arc<CryptoProvider>,
2579 server_fingerprint: Arc<Mutex<Option<String>>>,
2580}
2581
2582impl ServerFingerprintVerifier {
2583 fn new(server_fingerprint: Arc<Mutex<Option<String>>>) -> Arc<Self> {
2584 Arc::new(Self {
2585 provider: Arc::new(rustls::crypto::ring::default_provider()),
2586 server_fingerprint,
2587 })
2588 }
2589}
2590
2591impl ServerCertVerifier for ServerFingerprintVerifier {
2592 fn verify_server_cert(
2593 &self,
2594 end_entity: &CertificateDer<'_>,
2595 _intermediates: &[CertificateDer<'_>],
2596 _server_name: &ServerName<'_>,
2597 _ocsp: &[u8],
2598 _now: UnixTime,
2599 ) -> std::result::Result<ServerCertVerified, rustls::Error> {
2600 let fingerprint = blake3::hash(end_entity.as_ref()).to_hex().to_string();
2601 *self.server_fingerprint.lock().map_err(|_| {
2602 rustls::Error::General("server fingerprint capture lock poisoned".to_string())
2603 })? = Some(fingerprint);
2604 Ok(ServerCertVerified::assertion())
2605 }
2606
2607 fn verify_tls12_signature(
2608 &self,
2609 message: &[u8],
2610 cert: &CertificateDer<'_>,
2611 dss: &DigitallySignedStruct,
2612 ) -> std::result::Result<HandshakeSignatureValid, rustls::Error> {
2613 verify_tls12_signature(
2614 message,
2615 cert,
2616 dss,
2617 &self.provider.signature_verification_algorithms,
2618 )
2619 }
2620
2621 fn verify_tls13_signature(
2622 &self,
2623 message: &[u8],
2624 cert: &CertificateDer<'_>,
2625 dss: &DigitallySignedStruct,
2626 ) -> std::result::Result<HandshakeSignatureValid, rustls::Error> {
2627 verify_tls13_signature(
2628 message,
2629 cert,
2630 dss,
2631 &self.provider.signature_verification_algorithms,
2632 )
2633 }
2634
2635 fn supported_verify_schemes(&self) -> Vec<SignatureScheme> {
2636 self.provider
2637 .signature_verification_algorithms
2638 .supported_schemes()
2639 }
2640}
2641
2642fn ensure_rustls_provider() {
2643 let _ = rustls::crypto::ring::default_provider().install_default();
2644}
2645
2646pub fn config_dir() -> Result<PathBuf> {
2647 let base_dirs = BaseDirs::new().ok_or(Error::ConfigDirUnavailable)?;
2648 Ok(base_dirs.config_dir().join(CONFIG_DIR_NAME))
2649}
2650
2651fn default_alias() -> String {
2652 std::env::var("HOSTNAME")
2653 .or_else(|_| std::env::var("COMPUTERNAME"))
2654 .ok()
2655 .filter(|value| !value.trim().is_empty())
2656 .unwrap_or_else(|| "fileferry-device".to_string())
2657}
2658
2659fn expand_home_path(path: &str) -> Result<PathBuf> {
2660 if path == "~" {
2661 return Ok(BaseDirs::new()
2662 .ok_or(Error::ConfigDirUnavailable)?
2663 .home_dir()
2664 .to_path_buf());
2665 }
2666
2667 if let Some(rest) = path.strip_prefix("~/") {
2668 return Ok(BaseDirs::new()
2669 .ok_or(Error::ConfigDirUnavailable)?
2670 .home_dir()
2671 .join(rest));
2672 }
2673
2674 Ok(PathBuf::from(path))
2675}
2676
2677fn normalized_fingerprint(fingerprint: String) -> Result<String> {
2678 let fingerprint = fingerprint.trim().to_ascii_lowercase();
2679 if fingerprint.len() != 64 || !fingerprint.chars().all(|char| char.is_ascii_hexdigit()) {
2680 return Err(Error::InvalidInput(
2681 "peer fingerprint must be a full 64-character hex BLAKE3 fingerprint".to_string(),
2682 ));
2683 }
2684
2685 Ok(fingerprint)
2686}
2687
2688fn validate_psk(psk: String) -> Result<String> {
2689 if psk.is_empty() {
2690 return Err(Error::InvalidInput("PSK must not be empty".to_string()));
2691 }
2692
2693 Ok(psk)
2694}
2695
2696fn psk_proof(
2697 psk: &str,
2698 challenge: &str,
2699 client_fingerprint: &str,
2700 protocol_version: u16,
2701) -> Result<String> {
2702 validate_psk(psk.to_string())?;
2703 let key = blake3::hash(psk.as_bytes());
2704 let mut message = Vec::new();
2705 message.extend_from_slice(PSK_PROOF_CONTEXT);
2706 message.push(0);
2707 message.extend_from_slice(challenge.as_bytes());
2708 message.push(0);
2709 message.extend_from_slice(client_fingerprint.as_bytes());
2710 message.push(0);
2711 message.extend_from_slice(protocol_version.to_string().as_bytes());
2712 Ok(blake3::keyed_hash(key.as_bytes(), &message)
2713 .to_hex()
2714 .to_string())
2715}
2716
2717fn write_toml_file<T: Serialize>(path: &Path, value: &T) -> Result<()> {
2718 let bytes =
2719 toml::to_string_pretty(value).map_err(|error| Error::ConfigSerialize(error.to_string()))?;
2720 if let Some(parent) = path.parent() {
2721 std::fs::create_dir_all(parent).map_err(|source| Error::IoPath {
2722 path: parent.to_path_buf(),
2723 source,
2724 })?;
2725 }
2726
2727 let temp_path = path.with_extension(format!("tmp-{}", Uuid::new_v4()));
2728 std::fs::write(&temp_path, bytes).map_err(|source| Error::IoPath {
2729 path: temp_path.clone(),
2730 source,
2731 })?;
2732 std::fs::rename(&temp_path, path).map_err(|source| Error::IoPath {
2733 path: path.to_path_buf(),
2734 source,
2735 })?;
2736 Ok(())
2737}
2738
2739fn write_private_file(path: &Path, bytes: &[u8]) -> Result<()> {
2740 std::fs::write(path, bytes).map_err(|source| Error::IoPath {
2741 path: path.to_path_buf(),
2742 source,
2743 })?;
2744
2745 #[cfg(unix)]
2746 {
2747 use std::os::unix::fs::PermissionsExt;
2748
2749 let mut permissions = std::fs::metadata(path)
2750 .map_err(|source| Error::IoPath {
2751 path: path.to_path_buf(),
2752 source,
2753 })?
2754 .permissions();
2755 permissions.set_mode(0o600);
2756 std::fs::set_permissions(path, permissions).map_err(|source| Error::IoPath {
2757 path: path.to_path_buf(),
2758 source,
2759 })?;
2760 }
2761
2762 Ok(())
2763}
2764
2765#[cfg(test)]
2766mod tests {
2767 use super::*;
2768 use std::sync::{Arc, Mutex};
2769
2770 #[test]
2771 fn direct_peer_parses_socket_address() {
2772 let peer = DirectPeer::parse("127.0.0.1:53318").expect("address parses");
2773
2774 assert_eq!(peer.address().to_string(), "127.0.0.1:53318");
2775 assert_eq!(peer.expected_fingerprint(), None);
2776 }
2777
2778 #[test]
2779 fn direct_peer_accepts_expected_fingerprint() {
2780 let fingerprint = "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef";
2781 let peer = DirectPeer::parse("127.0.0.1:53318")
2782 .expect("address parses")
2783 .with_expected_fingerprint(fingerprint)
2784 .expect("fingerprint parses");
2785
2786 assert_eq!(peer.expected_fingerprint(), Some(fingerprint));
2787 }
2788
2789 #[test]
2790 fn direct_peer_rejects_missing_port() {
2791 assert!(DirectPeer::parse("127.0.0.1").is_err());
2792 }
2793
2794 #[test]
2795 fn send_request_requires_at_least_one_path() {
2796 let peer = DirectPeer::parse("127.0.0.1:53318").expect("address parses");
2797
2798 assert!(SendRequest::new(peer, Vec::new()).is_err());
2799 }
2800
2801 #[test]
2802 fn psk_secret_is_redacted_in_debug_and_serialization() {
2803 let secret = PskSecret::new("do-not-print").expect("psk accepted");
2804
2805 assert_eq!(format!("{secret:?}"), "PskSecret(<redacted>)");
2806 assert_eq!(
2807 serde_json::to_string(&secret).expect("secret serializes"),
2808 r#""<redacted>""#
2809 );
2810
2811 let peer = DirectPeer::parse("127.0.0.1:53318").expect("address parses");
2812 let request = SendRequest::new(peer, vec![PathBuf::from("file.txt")])
2813 .expect("send request")
2814 .with_psk("do-not-print")
2815 .expect("psk accepted");
2816 assert!(!format!("{request:?}").contains("do-not-print"));
2817 }
2818
2819 #[test]
2820 fn validate_send_paths_rejects_empty_slice() {
2821 assert!(validate_send_paths(&[]).is_err());
2822 }
2823
2824 #[tokio::test]
2825 async fn manifest_walks_directory_and_serializes_entries() {
2826 let temp = tempfile::tempdir().expect("tempdir");
2827 let root = temp.path().join("bundle");
2828 let nested = root.join("nested");
2829 let empty = nested.join("empty");
2830 tokio::fs::create_dir_all(&empty).await.expect("dirs");
2831 tokio::fs::write(root.join("a.txt"), b"alpha")
2832 .await
2833 .expect("file a");
2834 tokio::fs::write(root.join("zero.bin"), [])
2835 .await
2836 .expect("zero file");
2837 tokio::fs::write(nested.join("b.txt"), b"beta")
2838 .await
2839 .expect("file b");
2840 tokio::fs::write(nested.join("large.bin"), vec![9; IO_BUFFER_SIZE + 17])
2841 .await
2842 .expect("large file");
2843
2844 let manifest = build_manifest(&[root]).await.expect("manifest");
2845 let json = serde_json::to_string(&manifest).expect("manifest serializes");
2846 let decoded: TransferManifest = serde_json::from_str(&json).expect("manifest decodes");
2847 let relative_paths = decoded
2848 .entries
2849 .iter()
2850 .map(|entry| entry.relative_path.clone())
2851 .collect::<Vec<_>>();
2852
2853 assert_eq!(decoded.version, MANIFEST_VERSION);
2854 assert_eq!(
2855 relative_paths,
2856 vec![
2857 PathBuf::from("bundle"),
2858 PathBuf::from("bundle/a.txt"),
2859 PathBuf::from("bundle/nested"),
2860 PathBuf::from("bundle/nested/b.txt"),
2861 PathBuf::from("bundle/nested/empty"),
2862 PathBuf::from("bundle/nested/large.bin"),
2863 PathBuf::from("bundle/zero.bin"),
2864 ]
2865 );
2866 assert!(decoded.entries.iter().any(|entry| {
2867 entry.kind == ManifestEntryKind::Directory
2868 && entry.relative_path == Path::new("bundle/nested/empty")
2869 }));
2870 assert!(decoded.entries.iter().any(|entry| {
2871 entry.kind == ManifestEntryKind::File
2872 && entry.relative_path == Path::new("bundle/nested/b.txt")
2873 && entry.size == 4
2874 && entry.blake3.is_some()
2875 }));
2876 assert!(decoded.entries.iter().any(|entry| {
2877 entry.kind == ManifestEntryKind::File
2878 && entry.relative_path == Path::new("bundle/zero.bin")
2879 && entry.size == 0
2880 && entry.blake3.is_some()
2881 }));
2882 assert!(decoded.entries.iter().any(|entry| {
2883 entry.kind == ManifestEntryKind::File
2884 && entry.relative_path == Path::new("bundle/nested/large.bin")
2885 && entry.size == IO_BUFFER_SIZE as u64 + 17
2886 && entry.blake3.is_some()
2887 }));
2888 }
2889
2890 #[test]
2891 fn safe_destination_path_rejects_unsafe_paths() {
2892 let dest = Path::new("/tmp/ferry-dest");
2893
2894 assert!(safe_destination_path(dest, Path::new("../escape.txt")).is_err());
2895 assert!(safe_destination_path(dest, Path::new("/absolute.txt")).is_err());
2896 assert!(safe_destination_path(dest, Path::new("nested/CON")).is_err());
2897 assert!(safe_destination_path(dest, Path::new("nested\\escape.txt")).is_err());
2898 assert!(safe_destination_path(dest, Path::new("nested/ok.txt")).is_ok());
2899 }
2900
2901 #[test]
2902 fn safe_destination_path_rejects_generated_escape_and_reserved_patterns() {
2903 let dest = Path::new("/tmp/ferry-dest");
2904 let unsafe_components = [
2905 "..",
2906 ".",
2907 "CON",
2908 "con.txt",
2909 "NUL",
2910 "COM1",
2911 "LPT9",
2912 "trailingspace ",
2913 "trailingdot.",
2914 "has:colon",
2915 "has\\backslash",
2916 ];
2917
2918 for component in unsafe_components {
2919 assert!(
2920 safe_destination_path(dest, Path::new(component)).is_err(),
2921 "{component} should be rejected as a top-level path"
2922 );
2923 if component != "." {
2924 assert!(
2925 safe_destination_path(dest, Path::new("safe").join(component).as_path())
2926 .is_err(),
2927 "{component} should be rejected as a nested path"
2928 );
2929 }
2930 }
2931
2932 for component in ["alpha", "alpha-1", "alpha_1", "report.final.txt"] {
2933 let path = safe_destination_path(dest, Path::new("safe").join(component).as_path())
2934 .expect("safe generated path accepted");
2935 assert!(path.starts_with(dest));
2936 }
2937 }
2938
2939 #[tokio::test]
2940 async fn resume_decision_skips_existing_matching_file() {
2941 let temp = tempfile::tempdir().expect("tempdir");
2942 let source = temp.path().join("source.txt");
2943 let dest = temp.path().join("dest");
2944 tokio::fs::create_dir_all(&dest).await.expect("dest dir");
2945 tokio::fs::write(&source, b"same contents")
2946 .await
2947 .expect("source");
2948 tokio::fs::write(dest.join("source.txt"), b"same contents")
2949 .await
2950 .expect("dest file");
2951 let manifest = build_manifest(&[source]).await.expect("manifest");
2952 let entry = manifest.file_entries().next().expect("file entry");
2953
2954 let decision = decide_resume(&dest, entry, manifest.chunk_size)
2955 .await
2956 .expect("decision");
2957
2958 assert_eq!(decision, ResumeDecision::Skip);
2959 }
2960
2961 #[tokio::test]
2962 async fn resume_decision_returns_verified_chunk_offset() {
2963 let temp = tempfile::tempdir().expect("tempdir");
2964 let source = temp.path().join("large.bin");
2965 let dest = temp.path().join("dest");
2966 tokio::fs::create_dir_all(&dest).await.expect("dest dir");
2967 let bytes = vec![42; (DEFAULT_CHUNK_SIZE as usize * 2) + 128];
2968 tokio::fs::write(&source, &bytes).await.expect("source");
2969 tokio::fs::write(
2970 dest.join("large.bin"),
2971 &bytes[..DEFAULT_CHUNK_SIZE as usize + 99],
2972 )
2973 .await
2974 .expect("partial");
2975 let manifest = build_manifest(&[source]).await.expect("manifest");
2976 let entry = manifest.file_entries().next().expect("file entry");
2977
2978 let decision = decide_resume(&dest, entry, manifest.chunk_size)
2979 .await
2980 .expect("decision");
2981
2982 assert_eq!(decision, ResumeDecision::ResumeFrom(DEFAULT_CHUNK_SIZE));
2983 }
2984
2985 #[tokio::test]
2986 async fn resume_decision_rejects_existing_file_with_mismatched_contents() {
2987 let temp = tempfile::tempdir().expect("tempdir");
2988 let source = temp.path().join("source.txt");
2989 let dest = temp.path().join("dest");
2990 tokio::fs::create_dir_all(&dest).await.expect("dest dir");
2991 tokio::fs::write(&source, b"expected contents")
2992 .await
2993 .expect("source");
2994 tokio::fs::write(dest.join("source.txt"), b"different contents")
2995 .await
2996 .expect("conflicting dest file");
2997 let manifest = build_manifest(&[source]).await.expect("manifest");
2998 let entry = manifest.file_entries().next().expect("file entry");
2999
3000 let decision = decide_resume(&dest, entry, manifest.chunk_size)
3001 .await
3002 .expect("decision");
3003
3004 assert!(matches!(decision, ResumeDecision::Conflict(_)));
3005 }
3006
3007 #[tokio::test]
3008 async fn resume_decision_rejects_existing_file_larger_than_manifest_entry() {
3009 let temp = tempfile::tempdir().expect("tempdir");
3010 let source = temp.path().join("source.txt");
3011 let dest = temp.path().join("dest");
3012 tokio::fs::create_dir_all(&dest).await.expect("dest dir");
3013 tokio::fs::write(&source, b"small").await.expect("source");
3014 tokio::fs::write(dest.join("source.txt"), b"larger destination")
3015 .await
3016 .expect("larger dest file");
3017 let manifest = build_manifest(&[source]).await.expect("manifest");
3018 let entry = manifest.file_entries().next().expect("file entry");
3019
3020 let decision = decide_resume(&dest, entry, manifest.chunk_size)
3021 .await
3022 .expect("decision");
3023
3024 assert!(matches!(decision, ResumeDecision::Conflict(_)));
3025 }
3026
3027 #[tokio::test]
3028 async fn resume_decision_handles_nested_partial_files_safely() {
3029 let temp = tempfile::tempdir().expect("tempdir");
3030 let source_root = temp.path().join("bundle");
3031 let source_nested = source_root.join("nested");
3032 let dest = temp.path().join("dest");
3033 let dest_nested = dest.join("bundle/nested");
3034 tokio::fs::create_dir_all(&source_nested)
3035 .await
3036 .expect("source nested dir");
3037 tokio::fs::create_dir_all(&dest_nested)
3038 .await
3039 .expect("dest nested dir");
3040 let bytes = (0..((DEFAULT_CHUNK_SIZE as usize * 2) + 777))
3041 .map(|index| (index % 251) as u8)
3042 .collect::<Vec<_>>();
3043 let source_file = source_nested.join("large.bin");
3044 tokio::fs::write(&source_file, &bytes)
3045 .await
3046 .expect("source");
3047 let manifest = build_manifest(&[source_root]).await.expect("manifest");
3048 let entry = manifest
3049 .file_entries()
3050 .find(|entry| entry.relative_path == Path::new("bundle/nested/large.bin"))
3051 .expect("nested file entry");
3052
3053 tokio::fs::write(
3054 dest_nested.join("large.bin"),
3055 &bytes[..DEFAULT_CHUNK_SIZE as usize + 99],
3056 )
3057 .await
3058 .expect("matching partial");
3059 let decision = decide_resume(&dest, entry, manifest.chunk_size)
3060 .await
3061 .expect("matching partial decision");
3062 assert_eq!(decision, ResumeDecision::ResumeFrom(DEFAULT_CHUNK_SIZE));
3063
3064 let mut mismatched = bytes[..DEFAULT_CHUNK_SIZE as usize + 99].to_vec();
3065 mismatched[0] ^= 0xff;
3066 tokio::fs::write(dest_nested.join("large.bin"), mismatched)
3067 .await
3068 .expect("mismatched partial");
3069 let decision = decide_resume(&dest, entry, manifest.chunk_size)
3070 .await
3071 .expect("mismatched partial decision");
3072 assert!(matches!(decision, ResumeDecision::Conflict(_)));
3073 }
3074
3075 #[tokio::test]
3076 async fn resume_decision_rejects_same_name_file_directory_conflicts() {
3077 let temp = tempfile::tempdir().expect("tempdir");
3078 let source = temp.path().join("source.txt");
3079 let dest = temp.path().join("dest");
3080 tokio::fs::create_dir_all(&dest).await.expect("dest dir");
3081 tokio::fs::write(&source, b"incoming file")
3082 .await
3083 .expect("source");
3084 tokio::fs::create_dir(dest.join("source.txt"))
3085 .await
3086 .expect("conflicting dest dir");
3087 let manifest = build_manifest(&[source]).await.expect("manifest");
3088 let entry = manifest.file_entries().next().expect("file entry");
3089
3090 let decision = decide_resume(&dest, entry, manifest.chunk_size)
3091 .await
3092 .expect("decision");
3093
3094 assert!(matches!(decision, ResumeDecision::Conflict(_)));
3095 }
3096
3097 #[test]
3098 fn identity_is_loaded_after_generation() {
3099 let temp = tempfile::tempdir().expect("tempdir");
3100 let generated =
3101 NativeIdentity::load_or_generate_in(temp.path()).expect("identity generated");
3102 let loaded = NativeIdentity::load_or_generate_in(temp.path()).expect("identity loaded");
3103
3104 assert_eq!(generated.fingerprint(), loaded.fingerprint());
3105 }
3106
3107 #[test]
3108 fn app_config_round_trips_toml() {
3109 let temp = tempfile::tempdir().expect("tempdir");
3110 let path = temp.path().join("config.toml");
3111 let config = AppConfig {
3112 alias: "desk".to_string(),
3113 ..AppConfig::default()
3114 };
3115
3116 config.save_to_path(&path).expect("config saved");
3117 let loaded = AppConfig::load_or_default_from(&path).expect("config loaded");
3118
3119 assert_eq!(loaded.alias, "desk");
3120 assert_eq!(loaded.listen_port, 53317);
3121 assert!(loaded.trust.require_fingerprint);
3122 }
3123
3124 #[test]
3125 fn app_config_redacts_psk_for_display_output() {
3126 let config = AppConfig {
3127 trust: TrustConfig {
3128 psk: "super-secret-psk".to_string(),
3129 ..TrustConfig::default()
3130 },
3131 ..AppConfig::default()
3132 };
3133
3134 let redacted = config.to_redacted_toml_string().expect("config serializes");
3135
3136 assert!(!redacted.contains("super-secret-psk"));
3137 assert!(redacted.contains(r#"psk = "<redacted>""#));
3138 }
3139
3140 #[test]
3141 fn daemon_config_defaults_from_app_config_and_round_trips_toml() {
3142 let temp = tempfile::tempdir().expect("tempdir");
3143 let path = temp.path().join("daemon.toml");
3144 let app_config = AppConfig {
3145 quic_port: 54444,
3146 download_dir: temp.path().join("downloads").display().to_string(),
3147 ..AppConfig::default()
3148 };
3149
3150 let defaulted =
3151 DaemonConfig::load_or_default_from(&path, &app_config).expect("daemon config");
3152 assert_eq!(
3153 defaulted.listen,
3154 SocketAddr::from(([0, 0, 0, 0], app_config.quic_port))
3155 );
3156 assert_eq!(defaulted.destination, temp.path().join("downloads"));
3157
3158 let config = DaemonConfig {
3159 listen: SocketAddr::from(([127, 0, 0, 1], 53318)),
3160 destination: temp.path().join("daemon-dest"),
3161 };
3162 config.save_to_path(&path).expect("daemon config saved");
3163 let loaded =
3164 DaemonConfig::load_or_default_from(&path, &app_config).expect("daemon config loaded");
3165
3166 assert_eq!(loaded, config);
3167 }
3168
3169 #[test]
3170 fn trust_store_load_save_and_forget_round_trip() {
3171 let temp = tempfile::tempdir().expect("tempdir");
3172 let path = temp.path().join("known_peers.toml");
3173 let fingerprint = "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef";
3174 let mut store = TrustStore::default();
3175
3176 store
3177 .trust_fingerprint(fingerprint)
3178 .expect("fingerprint trusted");
3179 store.save_to_path(&path).expect("trust store saved");
3180 let mut loaded = TrustStore::load_or_default_from(&path).expect("trust store loaded");
3181
3182 assert_eq!(loaded.records().count(), 1);
3183 assert_eq!(loaded.peers[fingerprint].trust_state, TrustState::Trusted);
3184 assert!(loaded.forget(fingerprint));
3185 assert_eq!(loaded.records().count(), 0);
3186 }
3187
3188 #[test]
3189 fn trust_store_rejects_short_fingerprint_without_discovery_lookup() {
3190 let mut store = TrustStore::default();
3191
3192 assert!(store.trust_fingerprint("9a4f2c").is_err());
3193 }
3194
3195 #[test]
3196 fn trust_store_pins_direct_address_to_fingerprint() {
3197 let fingerprint = "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef";
3198 let address = SocketAddr::from(([192, 168, 1, 42], 53318));
3199 let mut store = TrustStore::default();
3200
3201 store
3202 .trust_direct_address(fingerprint, address)
3203 .expect("direct address trusted");
3204
3205 assert!(store.is_trusted_fingerprint(fingerprint));
3206 assert_eq!(
3207 store.trusted_fingerprint_for_address(address),
3208 Some(fingerprint)
3209 );
3210 assert!(store.peers[fingerprint].addresses.contains(&address));
3211 assert!(store.peers[fingerprint].transports.contains("quic"));
3212 assert_eq!(store.peers[fingerprint].trust_state, TrustState::Trusted);
3213 }
3214
3215 #[test]
3216 fn transfer_events_serialize_with_stable_event_names() {
3217 let event = TransferEvent::Progress {
3218 direction: TransferDirection::Send,
3219 file_name: "bundle/a.txt".to_string(),
3220 bytes_done: 5,
3221 bytes_total: 9,
3222 };
3223
3224 let json = serde_json::to_value(&event).expect("event serializes");
3225
3226 assert_eq!(json["event"], "progress");
3227 assert_eq!(json["direction"], "send");
3228 assert_eq!(json["file_name"], "bundle/a.txt");
3229 assert_eq!(json["bytes_done"], 5);
3230 assert_eq!(json["bytes_total"], 9);
3231 }
3232
3233 #[test]
3234 fn direct_protocol_hello_has_stable_golden_json() {
3235 let hello = DirectProtocolHello {
3236 protocol_version: 1,
3237 min_protocol_version: 1,
3238 client_fingerprint: "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"
3239 .to_string(),
3240 psk: false,
3241 };
3242
3243 let json = serde_json::to_string(&hello).expect("hello serializes");
3244
3245 assert_eq!(
3246 json,
3247 r#"{"protocol_version":1,"min_protocol_version":1,"client_fingerprint":"0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef","psk":false}"#
3248 );
3249 }
3250
3251 #[test]
3252 fn direct_protocol_response_has_stable_golden_json() {
3253 let response = DirectProtocolResponse::accept(1);
3254
3255 let json = serde_json::to_string(&response).expect("response serializes");
3256
3257 assert_eq!(
3258 json,
3259 r#"{"accepted":true,"protocol_version":1,"psk_challenge":null,"error":null}"#
3260 );
3261 }
3262
3263 #[test]
3264 fn direct_protocol_decodes_pre_psk_negotiation_frames() {
3265 let hello: DirectProtocolHello = serde_json::from_str(
3266 r#"{"protocol_version":1,"min_protocol_version":1,"client_fingerprint":"0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"}"#,
3267 )
3268 .expect("old hello decodes");
3269 assert!(!hello.psk);
3270
3271 let response: DirectProtocolResponse =
3272 serde_json::from_str(r#"{"accepted":true,"protocol_version":1,"error":null}"#)
3273 .expect("old response decodes");
3274 assert_eq!(response, DirectProtocolResponse::accept(1));
3275 }
3276
3277 #[test]
3278 fn direct_protocol_negotiates_supported_overlap() {
3279 let hello = DirectProtocolHello {
3280 protocol_version: 3,
3281 min_protocol_version: 1,
3282 client_fingerprint: "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"
3283 .to_string(),
3284 psk: false,
3285 };
3286
3287 let response = negotiate_direct_protocol(&hello, None);
3288
3289 assert_eq!(response, DirectProtocolResponse::accept(1));
3290 }
3291
3292 #[test]
3293 fn direct_protocol_rejects_incompatible_versions() {
3294 let too_new = DirectProtocolHello {
3295 protocol_version: 3,
3296 min_protocol_version: 2,
3297 client_fingerprint: "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"
3298 .to_string(),
3299 psk: false,
3300 };
3301 let too_old = DirectProtocolHello {
3302 protocol_version: 0,
3303 min_protocol_version: 0,
3304 client_fingerprint: "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"
3305 .to_string(),
3306 psk: false,
3307 };
3308
3309 assert!(!negotiate_direct_protocol(&too_new, None).accepted);
3310 assert!(!negotiate_direct_protocol(&too_old, None).accepted);
3311 }
3312
3313 #[test]
3314 fn direct_protocol_requires_matching_psk_mode() {
3315 let no_psk = DirectProtocolHello {
3316 protocol_version: 1,
3317 min_protocol_version: 1,
3318 client_fingerprint: "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"
3319 .to_string(),
3320 psk: false,
3321 };
3322 let with_psk = DirectProtocolHello {
3323 psk: true,
3324 ..no_psk.clone()
3325 };
3326
3327 assert!(!negotiate_direct_protocol(&no_psk, Some("secret")).accepted);
3328 assert!(!negotiate_direct_protocol(&with_psk, None).accepted);
3329
3330 let accepted = negotiate_direct_protocol(&with_psk, Some("secret"));
3331 assert!(accepted.accepted);
3332 assert!(accepted.psk_challenge.is_some());
3333 }
3334
3335 #[test]
3336 fn transfer_manifest_has_stable_golden_json() {
3337 let manifest = TransferManifest {
3338 version: MANIFEST_VERSION,
3339 session_id: Uuid::parse_str("00000000-0000-0000-0000-000000000001")
3340 .expect("uuid parses"),
3341 chunk_size: DEFAULT_CHUNK_SIZE,
3342 entries: vec![ManifestEntry {
3343 relative_path: PathBuf::from("bundle/a.txt"),
3344 kind: ManifestEntryKind::File,
3345 size: 5,
3346 blake3: Some(
3347 "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef".to_string(),
3348 ),
3349 chunks: vec![
3350 "abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789".to_string(),
3351 ],
3352 unix_mode: Some(0o100644),
3353 modified_unix_ms: Some(1_700_000_000_000),
3354 }],
3355 };
3356
3357 let json = serde_json::to_string(&manifest).expect("manifest serializes");
3358
3359 assert_eq!(
3360 json,
3361 r#"{"version":1,"session_id":"00000000-0000-0000-0000-000000000001","chunk_size":1048576,"entries":[{"relative_path":"bundle/a.txt","kind":"file","size":5,"blake3":"0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef","chunks":["abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789"],"unix_mode":33188,"modified_unix_ms":1700000000000}]}"#
3362 );
3363 }
3364
3365 #[test]
3366 fn peer_registry_merges_observations_by_fingerprint() {
3367 let mut registry = PeerRegistry::new();
3368 let fingerprint = "abcdef1234567890";
3369 let first = PeerObservation::new(
3370 fingerprint,
3371 SocketAddr::from(([192, 168, 1, 10], 53318)),
3372 100,
3373 )
3374 .with_alias("desk")
3375 .with_hostname("desk.local")
3376 .with_transport("quic");
3377 let second = PeerObservation::new(
3378 fingerprint,
3379 SocketAddr::from(([192, 168, 1, 11], 53318)),
3380 200,
3381 )
3382 .with_alias("workstation")
3383 .with_transport("quic");
3384
3385 registry.observe(first).expect("first observation");
3386 registry.observe(second).expect("second observation");
3387 let record = registry
3388 .get_by_fingerprint(fingerprint)
3389 .expect("record by fingerprint");
3390
3391 assert_eq!(registry.records().count(), 1);
3392 assert!(record.aliases.contains("desk"));
3393 assert!(record.aliases.contains("workstation"));
3394 assert!(record.hostnames.contains("desk.local"));
3395 assert_eq!(record.addresses.len(), 2);
3396 assert_eq!(record.first_seen_unix_ms, 100);
3397 assert_eq!(record.last_seen_unix_ms, 200);
3398 }
3399
3400 #[test]
3401 fn peer_registry_looks_up_unique_fingerprint_prefix_alias_and_hostname() {
3402 let mut registry = PeerRegistry::new();
3403 registry
3404 .observe(
3405 PeerObservation::new(
3406 "abcdef1234567890",
3407 SocketAddr::from(([192, 168, 1, 10], 53318)),
3408 100,
3409 )
3410 .with_alias("desk")
3411 .with_hostname("desk.local"),
3412 )
3413 .expect("first observation");
3414 registry
3415 .observe(
3416 PeerObservation::new(
3417 "123456abcdef7890",
3418 SocketAddr::from(([192, 168, 1, 20], 53318)),
3419 100,
3420 )
3421 .with_alias("laptop"),
3422 )
3423 .expect("second observation");
3424
3425 assert_eq!(
3426 registry
3427 .lookup("abcdef")
3428 .map(|record| record.fingerprint.as_str()),
3429 Some("abcdef1234567890")
3430 );
3431 assert_eq!(
3432 registry
3433 .lookup("desk")
3434 .map(|record| record.fingerprint.as_str()),
3435 Some("abcdef1234567890")
3436 );
3437 assert_eq!(
3438 registry
3439 .lookup("desk.local")
3440 .map(|record| record.fingerprint.as_str()),
3441 Some("abcdef1234567890")
3442 );
3443 assert!(registry.lookup("missing").is_none());
3444 }
3445
3446 #[test]
3447 fn peer_registry_rejects_ambiguous_lookup_hints() {
3448 let mut registry = PeerRegistry::new();
3449 registry
3450 .observe(
3451 PeerObservation::new(
3452 "abcdef1234567890",
3453 SocketAddr::from(([192, 168, 1, 10], 53318)),
3454 100,
3455 )
3456 .with_alias("desk"),
3457 )
3458 .expect("first observation");
3459 registry
3460 .observe(
3461 PeerObservation::new(
3462 "abcdef9999999999",
3463 SocketAddr::from(([192, 168, 1, 11], 53318)),
3464 100,
3465 )
3466 .with_alias("desk"),
3467 )
3468 .expect("second observation");
3469
3470 assert!(registry.lookup("abcdef").is_none());
3471 assert!(registry.lookup("desk").is_none());
3472 assert_eq!(
3473 registry
3474 .lookup("abcdef1234567890")
3475 .map(|record| record.fingerprint.as_str()),
3476 Some("abcdef1234567890")
3477 );
3478 match registry.lookup_detail("desk") {
3479 PeerLookup::Ambiguous(records) => assert_eq!(records.len(), 2),
3480 other => panic!("expected ambiguous duplicate-alias lookup, got {other:?}"),
3481 }
3482 assert!(matches!(
3483 registry.lookup_detail("missing"),
3484 PeerLookup::Missing
3485 ));
3486 }
3487
3488 #[test]
3489 fn native_announcement_builds_expected_txt_properties() {
3490 let announcement = NativeAnnouncement {
3491 alias: "Stephen MBP".to_string(),
3492 fingerprint: "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"
3493 .to_string(),
3494 quic_port: 53318,
3495 listen_port: Some(53317),
3496 };
3497
3498 let service = announcement.service_info().expect("service info");
3499
3500 assert_eq!(service.get_type(), NATIVE_SERVICE_TYPE);
3501 assert!(
3502 service
3503 .get_fullname()
3504 .starts_with("stephen-mbp-0123456789ab.")
3505 );
3506 assert_eq!(service.get_property_val_str("pv"), Some("1"));
3507 assert_eq!(service.get_property_val_str("minpv"), Some("1"));
3508 assert_eq!(
3509 service.get_property_val_str("fp"),
3510 Some("0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef")
3511 );
3512 assert_eq!(service.get_property_val_str("alias"), Some("Stephen MBP"));
3513 assert_eq!(service.get_property_val_str("transports"), Some("quic"));
3514 assert_eq!(service.get_property_val_str("quic_port"), Some("53318"));
3515 assert_eq!(service.get_property_val_str("listen_port"), Some("53317"));
3516 }
3517
3518 #[test]
3519 fn resolved_native_service_merges_into_registry() {
3520 let properties = [
3521 ("pv", "1"),
3522 ("minpv", "1"),
3523 (
3524 "fp",
3525 "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef",
3526 ),
3527 ("alias", "desk"),
3528 ("transports", "quic"),
3529 ("quic_port", "53318"),
3530 ];
3531 let service = ServiceInfo::new(
3532 NATIVE_SERVICE_TYPE,
3533 "desk-0123456789ab",
3534 "desk.local.",
3535 "192.168.1.42",
3536 53318,
3537 &properties[..],
3538 )
3539 .expect("service info")
3540 .as_resolved_service();
3541 let mut registry = PeerRegistry::new();
3542
3543 observe_resolved_service(&mut registry, &service).expect("observation");
3544
3545 let record = registry.lookup("desk").expect("record by alias");
3546 assert_eq!(
3547 record.fingerprint,
3548 "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"
3549 );
3550 assert_eq!(
3551 record.preferred_quic_address(),
3552 Some(SocketAddr::from(([192, 168, 1, 42], 53318)))
3553 );
3554 }
3555
3556 #[test]
3557 fn resolved_native_service_ignores_incompatible_protocol_ranges() {
3558 let properties = [
3559 ("pv", "3"),
3560 ("minpv", "2"),
3561 (
3562 "fp",
3563 "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef",
3564 ),
3565 ("alias", "desk"),
3566 ("transports", "quic"),
3567 ("quic_port", "53318"),
3568 ];
3569 let service = ServiceInfo::new(
3570 NATIVE_SERVICE_TYPE,
3571 "desk-0123456789ab",
3572 "desk.local.",
3573 "192.168.1.42",
3574 53318,
3575 &properties[..],
3576 )
3577 .expect("service info")
3578 .as_resolved_service();
3579 let mut registry = PeerRegistry::new();
3580
3581 observe_resolved_service(&mut registry, &service).expect("observation");
3582
3583 assert_eq!(registry.records().count(), 0);
3584 }
3585
3586 #[tokio::test]
3587 async fn direct_quic_loopback_sends_one_file() {
3588 let source_dir = tempfile::tempdir().expect("source tempdir");
3589 let dest_dir = tempfile::tempdir().expect("dest tempdir");
3590 let identity_dir = tempfile::tempdir().expect("identity tempdir");
3591 let file_path = source_dir.path().join("hello.txt");
3592 tokio::fs::write(&file_path, b"hello from ferry")
3593 .await
3594 .expect("source file written");
3595
3596 let identity =
3597 NativeIdentity::load_or_generate_in(identity_dir.path()).expect("identity generated");
3598 let recv_identity = identity.clone();
3599 let reserved_socket =
3600 std::net::UdpSocket::bind(SocketAddr::from(([127, 0, 0, 1], 0))).expect("udp bind");
3601 let recv_addr = reserved_socket.local_addr().expect("local addr");
3602 drop(reserved_socket);
3603 let receive_progress = Arc::new(Mutex::new(Vec::new()));
3604 let receive_progress_log = receive_progress.clone();
3605 let dest_path = dest_dir.path().to_path_buf();
3606 let server = tokio::spawn(async move {
3607 receive_direct_file(
3608 &ReceiveRequest::new(recv_addr),
3609 dest_path,
3610 &recv_identity,
3611 |event| {
3612 receive_progress_log
3613 .lock()
3614 .expect("progress lock")
3615 .push(event);
3616 },
3617 )
3618 .await
3619 });
3620
3621 tokio::time::sleep(Duration::from_millis(100)).await;
3622
3623 let peer = DirectPeer::parse(&recv_addr.to_string()).expect("peer parses");
3624 let send_request = SendRequest::new(peer, vec![file_path]).expect("send request is valid");
3625 let send_progress = Arc::new(Mutex::new(Vec::new()));
3626 let send_progress_log = send_progress.clone();
3627 let send_summary = send_direct_file(&send_request, &identity, |event| {
3628 send_progress_log.lock().expect("progress lock").push(event);
3629 })
3630 .await
3631 .expect("file sent");
3632 let receive_summary = server.await.expect("server joined").expect("file received");
3633
3634 let received = tokio::fs::read(dest_dir.path().join("hello.txt"))
3635 .await
3636 .expect("received file readable");
3637 assert_eq!(received, b"hello from ferry");
3638 assert_eq!(send_summary.bytes, receive_summary.bytes);
3639 assert_eq!(send_summary.blake3, receive_summary.blake3);
3640 let send_progress = send_progress.lock().expect("progress lock");
3641 let receive_progress = receive_progress.lock().expect("progress lock");
3642 assert!(matches!(
3643 send_progress.first(),
3644 Some(TransferEvent::SessionStarted {
3645 direction: TransferDirection::Send,
3646 ..
3647 })
3648 ));
3649 assert!(
3650 send_progress
3651 .iter()
3652 .any(|event| matches!(event, TransferEvent::Progress { .. }))
3653 );
3654 assert!(
3655 send_progress
3656 .iter()
3657 .any(|event| matches!(event, TransferEvent::SessionFinished { .. }))
3658 );
3659 assert!(matches!(
3660 receive_progress.first(),
3661 Some(TransferEvent::SessionStarted {
3662 direction: TransferDirection::Receive,
3663 ..
3664 })
3665 ));
3666 assert!(
3667 receive_progress
3668 .iter()
3669 .any(|event| matches!(event, TransferEvent::Progress { .. }))
3670 );
3671 assert!(
3672 receive_progress
3673 .iter()
3674 .any(|event| matches!(event, TransferEvent::SessionFinished { .. }))
3675 );
3676 }
3677
3678 #[tokio::test]
3679 async fn direct_transfer_accepts_expected_receiver_fingerprint() {
3680 let source_dir = tempfile::tempdir().expect("source tempdir");
3681 let dest_dir = tempfile::tempdir().expect("dest tempdir");
3682 let sender_identity_dir = tempfile::tempdir().expect("sender identity tempdir");
3683 let receiver_identity_dir = tempfile::tempdir().expect("receiver identity tempdir");
3684 let file_path = source_dir.path().join("hello.txt");
3685 tokio::fs::write(&file_path, b"hello from ferry")
3686 .await
3687 .expect("source file written");
3688
3689 let sender_identity = NativeIdentity::load_or_generate_in(sender_identity_dir.path())
3690 .expect("sender identity");
3691 let receiver_identity = NativeIdentity::load_or_generate_in(receiver_identity_dir.path())
3692 .expect("receiver identity");
3693 let expected_fingerprint = receiver_identity.fingerprint().to_string();
3694 let reserved_socket =
3695 std::net::UdpSocket::bind(SocketAddr::from(([127, 0, 0, 1], 0))).expect("udp bind");
3696 let recv_addr = reserved_socket.local_addr().expect("local addr");
3697 drop(reserved_socket);
3698 let dest_path = dest_dir.path().to_path_buf();
3699 let server = tokio::spawn(async move {
3700 receive_direct_file(
3701 &ReceiveRequest::new(recv_addr),
3702 dest_path,
3703 &receiver_identity,
3704 |_| {},
3705 )
3706 .await
3707 });
3708
3709 tokio::time::sleep(Duration::from_millis(100)).await;
3710
3711 let peer = DirectPeer::from_address(recv_addr)
3712 .with_expected_fingerprint(expected_fingerprint)
3713 .expect("expected fingerprint accepted");
3714 let send_request = SendRequest::new(peer, vec![file_path]).expect("send request is valid");
3715 let send_summary = send_direct_file(&send_request, &sender_identity, |_| {})
3716 .await
3717 .expect("file sent");
3718 let receive_summary = server.await.expect("server joined").expect("file received");
3719
3720 let received = tokio::fs::read(dest_dir.path().join("hello.txt"))
3721 .await
3722 .expect("received file readable");
3723 assert_eq!(received, b"hello from ferry");
3724 assert_eq!(send_summary.bytes, receive_summary.bytes);
3725 assert_eq!(send_summary.blake3, receive_summary.blake3);
3726 }
3727
3728 #[tokio::test]
3729 async fn direct_transfer_confirms_unpinned_receiver_before_payload() {
3730 let source_dir = tempfile::tempdir().expect("source tempdir");
3731 let dest_dir = tempfile::tempdir().expect("dest tempdir");
3732 let sender_identity_dir = tempfile::tempdir().expect("sender identity tempdir");
3733 let receiver_identity_dir = tempfile::tempdir().expect("receiver identity tempdir");
3734 let file_path = source_dir.path().join("hello.txt");
3735 tokio::fs::write(&file_path, b"hello from ferry")
3736 .await
3737 .expect("source file written");
3738
3739 let sender_identity = NativeIdentity::load_or_generate_in(sender_identity_dir.path())
3740 .expect("sender identity");
3741 let receiver_identity = NativeIdentity::load_or_generate_in(receiver_identity_dir.path())
3742 .expect("receiver identity");
3743 let expected_fingerprint = receiver_identity.fingerprint().to_string();
3744 let reserved_socket =
3745 std::net::UdpSocket::bind(SocketAddr::from(([127, 0, 0, 1], 0))).expect("udp bind");
3746 let recv_addr = reserved_socket.local_addr().expect("local addr");
3747 drop(reserved_socket);
3748 let dest_path = dest_dir.path().to_path_buf();
3749 let server = tokio::spawn(async move {
3750 receive_direct_file(
3751 &ReceiveRequest::new(recv_addr),
3752 dest_path,
3753 &receiver_identity,
3754 |_| {},
3755 )
3756 .await
3757 });
3758
3759 tokio::time::sleep(Duration::from_millis(100)).await;
3760
3761 let peer = DirectPeer::from_address(recv_addr);
3762 let send_request = SendRequest::new(peer, vec![file_path]).expect("send request is valid");
3763 let callback_seen = Arc::new(AtomicBool::new(false));
3764 let callback_seen_send = callback_seen.clone();
3765 let send_summary = send_direct_file_with_control_and_trust(
3766 &send_request,
3767 &sender_identity,
3768 TransferControl::new(),
3769 |_| {},
3770 |fingerprint| {
3771 callback_seen_send.store(true, Ordering::SeqCst);
3772 assert_eq!(fingerprint, expected_fingerprint);
3773 Ok(())
3774 },
3775 )
3776 .await
3777 .expect("file sent after trust callback");
3778 let receive_summary = server.await.expect("server joined").expect("file received");
3779
3780 assert!(callback_seen.load(Ordering::SeqCst));
3781 assert_eq!(send_summary.bytes, receive_summary.bytes);
3782 assert_eq!(send_summary.blake3, receive_summary.blake3);
3783 }
3784
3785 #[tokio::test]
3786 async fn direct_transfer_rejects_unexpected_receiver_fingerprint_before_payload() {
3787 let source_dir = tempfile::tempdir().expect("source tempdir");
3788 let dest_dir = tempfile::tempdir().expect("dest tempdir");
3789 let sender_identity_dir = tempfile::tempdir().expect("sender identity tempdir");
3790 let receiver_identity_dir = tempfile::tempdir().expect("receiver identity tempdir");
3791 let other_identity_dir = tempfile::tempdir().expect("other identity tempdir");
3792 let file_path = source_dir.path().join("hello.txt");
3793 tokio::fs::write(&file_path, b"hello from ferry")
3794 .await
3795 .expect("source file written");
3796
3797 let sender_identity = NativeIdentity::load_or_generate_in(sender_identity_dir.path())
3798 .expect("sender identity");
3799 let receiver_identity = NativeIdentity::load_or_generate_in(receiver_identity_dir.path())
3800 .expect("receiver identity");
3801 let other_identity =
3802 NativeIdentity::load_or_generate_in(other_identity_dir.path()).expect("other identity");
3803 let reserved_socket =
3804 std::net::UdpSocket::bind(SocketAddr::from(([127, 0, 0, 1], 0))).expect("udp bind");
3805 let recv_addr = reserved_socket.local_addr().expect("local addr");
3806 drop(reserved_socket);
3807 let dest_path = dest_dir.path().to_path_buf();
3808 let server = tokio::spawn(async move {
3809 receive_direct_file(
3810 &ReceiveRequest::new(recv_addr),
3811 dest_path,
3812 &receiver_identity,
3813 |_| {},
3814 )
3815 .await
3816 });
3817
3818 tokio::time::sleep(Duration::from_millis(100)).await;
3819
3820 let peer = DirectPeer::from_address(recv_addr)
3821 .with_expected_fingerprint(other_identity.fingerprint().to_string())
3822 .expect("expected fingerprint accepted");
3823 let send_request = SendRequest::new(peer, vec![file_path]).expect("send request is valid");
3824 let error = send_direct_file(&send_request, &sender_identity, |_| {})
3825 .await
3826 .expect_err("fingerprint mismatch should reject send");
3827
3828 assert!(matches!(error, Error::PeerFingerprintMismatch { .. }));
3829 let server_error = server
3830 .await
3831 .expect("server joined")
3832 .expect_err("server closed");
3833 assert!(matches!(
3834 server_error,
3835 Error::Connection(_) | Error::NoIncomingConnection
3836 ));
3837 assert!(
3838 !tokio::fs::try_exists(dest_dir.path().join("hello.txt"))
3839 .await
3840 .expect("destination checked")
3841 );
3842 }
3843
3844 #[tokio::test]
3845 async fn direct_transfer_accepts_matching_psk_before_payload() {
3846 let source_dir = tempfile::tempdir().expect("source tempdir");
3847 let dest_dir = tempfile::tempdir().expect("dest tempdir");
3848 let identity_dir = tempfile::tempdir().expect("identity tempdir");
3849 let file_path = source_dir.path().join("hello.txt");
3850 tokio::fs::write(&file_path, b"hello from ferry")
3851 .await
3852 .expect("source file written");
3853
3854 let identity =
3855 NativeIdentity::load_or_generate_in(identity_dir.path()).expect("identity generated");
3856 let recv_identity = identity.clone();
3857 let reserved_socket =
3858 std::net::UdpSocket::bind(SocketAddr::from(([127, 0, 0, 1], 0))).expect("udp bind");
3859 let recv_addr = reserved_socket.local_addr().expect("local addr");
3860 drop(reserved_socket);
3861 let dest_path = dest_dir.path().to_path_buf();
3862 let receive_request = ReceiveRequest::new(recv_addr)
3863 .with_psk("correct horse battery staple")
3864 .expect("receiver psk accepted");
3865 let server = tokio::spawn(async move {
3866 receive_direct_file(&receive_request, dest_path, &recv_identity, |_| {}).await
3867 });
3868
3869 tokio::time::sleep(Duration::from_millis(100)).await;
3870
3871 let peer = DirectPeer::parse(&recv_addr.to_string()).expect("peer parses");
3872 let send_request = SendRequest::new(peer, vec![file_path])
3873 .expect("send request is valid")
3874 .with_psk("correct horse battery staple")
3875 .expect("sender psk accepted");
3876 let send_summary = send_direct_file(&send_request, &identity, |_| {})
3877 .await
3878 .expect("file sent");
3879 let receive_summary = server.await.expect("server joined").expect("file received");
3880
3881 let received = tokio::fs::read(dest_dir.path().join("hello.txt"))
3882 .await
3883 .expect("received file readable");
3884 assert_eq!(received, b"hello from ferry");
3885 assert_eq!(send_summary.bytes, receive_summary.bytes);
3886 }
3887
3888 #[tokio::test]
3889 async fn direct_transfer_rejects_missing_psk_before_payload() {
3890 let source_dir = tempfile::tempdir().expect("source tempdir");
3891 let dest_dir = tempfile::tempdir().expect("dest tempdir");
3892 let identity_dir = tempfile::tempdir().expect("identity tempdir");
3893 let file_path = source_dir.path().join("hello.txt");
3894 tokio::fs::write(&file_path, b"hello from ferry")
3895 .await
3896 .expect("source file written");
3897
3898 let identity =
3899 NativeIdentity::load_or_generate_in(identity_dir.path()).expect("identity generated");
3900 let recv_identity = identity.clone();
3901 let reserved_socket =
3902 std::net::UdpSocket::bind(SocketAddr::from(([127, 0, 0, 1], 0))).expect("udp bind");
3903 let recv_addr = reserved_socket.local_addr().expect("local addr");
3904 drop(reserved_socket);
3905 let dest_path = dest_dir.path().to_path_buf();
3906 let receive_request = ReceiveRequest::new(recv_addr)
3907 .with_psk("receiver secret")
3908 .expect("receiver psk accepted");
3909 let server = tokio::spawn(async move {
3910 receive_direct_file(&receive_request, dest_path, &recv_identity, |_| {}).await
3911 });
3912
3913 tokio::time::sleep(Duration::from_millis(100)).await;
3914
3915 let peer = DirectPeer::parse(&recv_addr.to_string()).expect("peer parses");
3916 let send_request = SendRequest::new(peer, vec![file_path]).expect("send request is valid");
3917 let error = send_direct_file(&send_request, &identity, |_| {})
3918 .await
3919 .expect_err("missing psk should reject send");
3920
3921 assert!(matches!(error, Error::Protocol(_)));
3922 let _ = server.await.expect("server joined");
3923 assert!(
3924 !tokio::fs::try_exists(dest_dir.path().join("hello.txt"))
3925 .await
3926 .expect("destination checked")
3927 );
3928 }
3929
3930 #[tokio::test]
3931 async fn direct_transfer_rejects_wrong_psk_before_payload() {
3932 let source_dir = tempfile::tempdir().expect("source tempdir");
3933 let dest_dir = tempfile::tempdir().expect("dest tempdir");
3934 let identity_dir = tempfile::tempdir().expect("identity tempdir");
3935 let file_path = source_dir.path().join("hello.txt");
3936 tokio::fs::write(&file_path, b"hello from ferry")
3937 .await
3938 .expect("source file written");
3939
3940 let identity =
3941 NativeIdentity::load_or_generate_in(identity_dir.path()).expect("identity generated");
3942 let recv_identity = identity.clone();
3943 let reserved_socket =
3944 std::net::UdpSocket::bind(SocketAddr::from(([127, 0, 0, 1], 0))).expect("udp bind");
3945 let recv_addr = reserved_socket.local_addr().expect("local addr");
3946 drop(reserved_socket);
3947 let dest_path = dest_dir.path().to_path_buf();
3948 let receive_request = ReceiveRequest::new(recv_addr)
3949 .with_psk("receiver secret")
3950 .expect("receiver psk accepted");
3951 let server = tokio::spawn(async move {
3952 receive_direct_file(&receive_request, dest_path, &recv_identity, |_| {}).await
3953 });
3954
3955 tokio::time::sleep(Duration::from_millis(100)).await;
3956
3957 let peer = DirectPeer::parse(&recv_addr.to_string()).expect("peer parses");
3958 let send_request = SendRequest::new(peer, vec![file_path])
3959 .expect("send request is valid")
3960 .with_psk("wrong secret")
3961 .expect("sender psk accepted");
3962 let error = send_direct_file(&send_request, &identity, |_| {})
3963 .await
3964 .expect_err("wrong psk should reject send");
3965
3966 assert!(matches!(error, Error::Authentication(_)));
3967 assert!(!error.to_string().contains("wrong secret"));
3968 assert!(!error.to_string().contains("receiver secret"));
3969 let server_error = server
3970 .await
3971 .expect("server joined")
3972 .expect_err("server should reject wrong psk");
3973 assert!(matches!(
3974 server_error,
3975 Error::Authentication(_) | Error::Write(_)
3976 ));
3977 assert!(
3978 !tokio::fs::try_exists(dest_dir.path().join("hello.txt"))
3979 .await
3980 .expect("destination checked")
3981 );
3982 }
3983
3984 #[tokio::test]
3985 async fn direct_quic_loopback_sends_directory_and_resumes_partial_file() {
3986 let source_dir = tempfile::tempdir().expect("source tempdir");
3987 let dest_dir = tempfile::tempdir().expect("dest tempdir");
3988 let identity_dir = tempfile::tempdir().expect("identity tempdir");
3989 let bundle = source_dir.path().join("bundle");
3990 let nested = bundle.join("nested");
3991 tokio::fs::create_dir_all(&nested)
3992 .await
3993 .expect("source dirs");
3994 tokio::fs::write(nested.join("small.txt"), b"small")
3995 .await
3996 .expect("small file");
3997 let large_bytes = vec![7; (DEFAULT_CHUNK_SIZE as usize * 2) + 17];
3998 tokio::fs::write(bundle.join("large.bin"), &large_bytes)
3999 .await
4000 .expect("large file");
4001
4002 let dest_bundle = dest_dir.path().join("bundle");
4003 tokio::fs::create_dir_all(&dest_bundle)
4004 .await
4005 .expect("dest bundle");
4006 tokio::fs::write(
4007 dest_bundle.join("large.bin"),
4008 &large_bytes[..DEFAULT_CHUNK_SIZE as usize + 5],
4009 )
4010 .await
4011 .expect("partial large");
4012
4013 let identity =
4014 NativeIdentity::load_or_generate_in(identity_dir.path()).expect("identity generated");
4015 let recv_identity = identity.clone();
4016 let reserved_socket =
4017 std::net::UdpSocket::bind(SocketAddr::from(([127, 0, 0, 1], 0))).expect("udp bind");
4018 let recv_addr = reserved_socket.local_addr().expect("local addr");
4019 drop(reserved_socket);
4020 let dest_path = dest_dir.path().to_path_buf();
4021 let server = tokio::spawn(async move {
4022 receive_direct_file(
4023 &ReceiveRequest::new(recv_addr),
4024 dest_path,
4025 &recv_identity,
4026 |_| {},
4027 )
4028 .await
4029 });
4030
4031 tokio::time::sleep(Duration::from_millis(100)).await;
4032
4033 let peer = DirectPeer::parse(&recv_addr.to_string()).expect("peer parses");
4034 let send_request = SendRequest::new(peer, vec![bundle]).expect("send request is valid");
4035 let send_summary = send_direct_file(&send_request, &identity, |_| {})
4036 .await
4037 .expect("directory sent");
4038 let receive_summary = server
4039 .await
4040 .expect("server joined")
4041 .expect("directory received");
4042
4043 let received_large = tokio::fs::read(dest_bundle.join("large.bin"))
4044 .await
4045 .expect("large file readable");
4046 let received_small = tokio::fs::read(dest_bundle.join("nested/small.txt"))
4047 .await
4048 .expect("small file readable");
4049 assert_eq!(received_large, large_bytes);
4050 assert_eq!(received_small, b"small");
4051 assert_eq!(send_summary.bytes, receive_summary.bytes);
4052 assert!(
4053 receive_summary
4054 .files
4055 .iter()
4056 .any(|file| file.status == TransferFileStatus::Resumed)
4057 );
4058 }
4059
4060 #[tokio::test]
4061 async fn direct_quic_loopback_rejects_file_blocking_incoming_directory() {
4062 let source_dir = tempfile::tempdir().expect("source tempdir");
4063 let dest_dir = tempfile::tempdir().expect("dest tempdir");
4064 let identity_dir = tempfile::tempdir().expect("identity tempdir");
4065 let bundle = source_dir.path().join("bundle");
4066 let nested = bundle.join("nested");
4067 tokio::fs::create_dir_all(&nested)
4068 .await
4069 .expect("source dirs");
4070 tokio::fs::write(nested.join("payload.txt"), b"payload")
4071 .await
4072 .expect("source file");
4073
4074 tokio::fs::create_dir_all(dest_dir.path().join("bundle"))
4075 .await
4076 .expect("dest bundle dir");
4077 let conflicting_path = dest_dir.path().join("bundle/nested");
4078 tokio::fs::write(&conflicting_path, b"do not overwrite")
4079 .await
4080 .expect("conflicting dest file");
4081
4082 let identity =
4083 NativeIdentity::load_or_generate_in(identity_dir.path()).expect("identity generated");
4084 let recv_identity = identity.clone();
4085 let reserved_socket =
4086 std::net::UdpSocket::bind(SocketAddr::from(([127, 0, 0, 1], 0))).expect("udp bind");
4087 let recv_addr = reserved_socket.local_addr().expect("local addr");
4088 drop(reserved_socket);
4089 let dest_path = dest_dir.path().to_path_buf();
4090 let server = tokio::spawn(async move {
4091 receive_direct_file(
4092 &ReceiveRequest::new(recv_addr),
4093 dest_path,
4094 &recv_identity,
4095 |_| {},
4096 )
4097 .await
4098 });
4099
4100 tokio::time::sleep(Duration::from_millis(100)).await;
4101
4102 let peer = DirectPeer::parse(&recv_addr.to_string()).expect("peer parses");
4103 let send_request = SendRequest::new(peer, vec![bundle]).expect("send request is valid");
4104 let send_result = send_direct_file(&send_request, &identity, |_| {}).await;
4105 let receive_result = server.await.expect("server joined");
4106
4107 assert!(send_result.is_err());
4108 assert!(receive_result.is_err());
4109 let conflict = tokio::fs::read(&conflicting_path)
4110 .await
4111 .expect("conflicting dest file remains readable");
4112 assert_eq!(conflict, b"do not overwrite");
4113 assert!(!conflicting_path.is_dir());
4114 }
4115
4116 #[tokio::test]
4117 async fn direct_quic_loopback_cancels_send_and_does_not_finalize_partial_file() {
4118 let source_dir = tempfile::tempdir().expect("source tempdir");
4119 let dest_dir = tempfile::tempdir().expect("dest tempdir");
4120 let identity_dir = tempfile::tempdir().expect("identity tempdir");
4121 let file_path = source_dir.path().join("payload.bin");
4122 tokio::fs::write(&file_path, vec![3; IO_BUFFER_SIZE * 64])
4123 .await
4124 .expect("source file written");
4125
4126 let identity =
4127 NativeIdentity::load_or_generate_in(identity_dir.path()).expect("identity generated");
4128 let recv_identity = identity.clone();
4129 let reserved_socket =
4130 std::net::UdpSocket::bind(SocketAddr::from(([127, 0, 0, 1], 0))).expect("udp bind");
4131 let recv_addr = reserved_socket.local_addr().expect("local addr");
4132 drop(reserved_socket);
4133
4134 let dest_path = dest_dir.path().to_path_buf();
4135 let server = tokio::spawn(async move {
4136 receive_direct_file(
4137 &ReceiveRequest::new(recv_addr),
4138 dest_path,
4139 &recv_identity,
4140 |_| {},
4141 )
4142 .await
4143 });
4144
4145 tokio::time::sleep(Duration::from_millis(100)).await;
4146
4147 let peer = DirectPeer::parse(&recv_addr.to_string()).expect("peer parses");
4148 let send_request =
4149 SendRequest::new(peer, vec![file_path.clone()]).expect("send request is valid");
4150 let send_control = TransferControl::new();
4151 let cancel_on_progress = send_control.clone();
4152 let send_events = Arc::new(Mutex::new(Vec::new()));
4153 let send_events_log = send_events.clone();
4154 let send_result =
4155 send_direct_file_with_control(&send_request, &identity, send_control, |event| {
4156 if matches!(event, TransferEvent::Progress { .. }) {
4157 cancel_on_progress.cancel();
4158 }
4159 send_events_log.lock().expect("events lock").push(event);
4160 })
4161 .await;
4162
4163 assert!(matches!(send_result, Err(Error::TransferCancelled)));
4164
4165 let receive_result = tokio::time::timeout(Duration::from_secs(5), server)
4166 .await
4167 .expect("receiver should finish after sender cancellation")
4168 .expect("server joined");
4169 assert!(receive_result.is_err());
4170
4171 assert!(!dest_dir.path().join("payload.bin").exists());
4172 assert!(
4173 send_events
4174 .lock()
4175 .expect("events lock")
4176 .iter()
4177 .any(|event| matches!(event, TransferEvent::SessionCancelled { .. }))
4178 );
4179 }
4180
4181 #[tokio::test]
4182 async fn direct_quic_loopback_cleans_partial_file_when_directory_receive_is_cancelled() {
4183 let source_dir = tempfile::tempdir().expect("source tempdir");
4184 let dest_dir = tempfile::tempdir().expect("dest tempdir");
4185 let identity_dir = tempfile::tempdir().expect("identity tempdir");
4186 let bundle = source_dir.path().join("bundle");
4187 let nested = bundle.join("nested");
4188 tokio::fs::create_dir_all(&nested)
4189 .await
4190 .expect("source dirs");
4191 tokio::fs::write(bundle.join("small.txt"), b"small")
4192 .await
4193 .expect("small file");
4194 tokio::fs::write(nested.join("payload.bin"), vec![5; IO_BUFFER_SIZE * 128])
4195 .await
4196 .expect("large source file");
4197
4198 let identity =
4199 NativeIdentity::load_or_generate_in(identity_dir.path()).expect("identity generated");
4200 let recv_identity = identity.clone();
4201 let reserved_socket =
4202 std::net::UdpSocket::bind(SocketAddr::from(([127, 0, 0, 1], 0))).expect("udp bind");
4203 let recv_addr = reserved_socket.local_addr().expect("local addr");
4204 drop(reserved_socket);
4205
4206 let receive_control = TransferControl::new();
4207 let cancel_receive = receive_control.clone();
4208 let receive_events = Arc::new(Mutex::new(Vec::new()));
4209 let receive_events_log = receive_events.clone();
4210 let dest_path = dest_dir.path().to_path_buf();
4211 let server = tokio::spawn(async move {
4212 receive_direct_file_with_control(
4213 &ReceiveRequest::new(recv_addr),
4214 dest_path,
4215 &recv_identity,
4216 receive_control,
4217 |event| {
4218 if let TransferEvent::Progress { file_name, .. } = &event
4219 && file_name == "bundle/nested/payload.bin"
4220 {
4221 cancel_receive.cancel();
4222 }
4223 receive_events_log.lock().expect("events lock").push(event);
4224 },
4225 )
4226 .await
4227 });
4228
4229 tokio::time::sleep(Duration::from_millis(100)).await;
4230
4231 let peer = DirectPeer::parse(&recv_addr.to_string()).expect("peer parses");
4232 let send_request = SendRequest::new(peer, vec![bundle]).expect("send request is valid");
4233 let send_result = send_direct_file(&send_request, &identity, |_| {}).await;
4234 let receive_result = tokio::time::timeout(Duration::from_secs(5), server)
4235 .await
4236 .expect("receiver should finish after cancellation")
4237 .expect("server joined");
4238
4239 assert!(send_result.is_err());
4240 assert!(matches!(receive_result, Err(Error::TransferCancelled)));
4241 assert!(
4242 receive_events
4243 .lock()
4244 .expect("events lock")
4245 .iter()
4246 .any(|event| matches!(event, TransferEvent::SessionCancelled { .. }))
4247 );
4248 assert!(
4249 !tokio::fs::try_exists(dest_dir.path().join("bundle/nested/payload.bin"))
4250 .await
4251 .expect("payload path checked")
4252 );
4253 let part_files = std::fs::read_dir(dest_dir.path().join("bundle/nested"))
4254 .expect("nested dest dir exists")
4255 .filter_map(|entry| entry.ok())
4256 .filter(|entry| entry.file_name().to_string_lossy().contains(".ferry-part"))
4257 .count();
4258 assert_eq!(part_files, 0);
4259 }
4260}