1pub mod error;
2
3use std::collections::{BTreeMap, BTreeSet};
4use std::fs::Metadata;
5use std::io::SeekFrom;
6use std::net::SocketAddr;
7use std::path::{Component, Path, PathBuf};
8use std::sync::atomic::{AtomicBool, Ordering};
9use std::sync::{Arc, Mutex};
10use std::time::{Duration, SystemTime, UNIX_EPOCH};
11
12use directories::BaseDirs;
13use mdns_sd::{ResolvedService, ServiceDaemon, ServiceEvent, ServiceInfo};
14use quinn::crypto::rustls::QuicClientConfig;
15use rcgen::{CertifiedKey, generate_simple_self_signed};
16use rustls::client::danger::{HandshakeSignatureValid, ServerCertVerified, ServerCertVerifier};
17use rustls::crypto::{CryptoProvider, verify_tls12_signature, verify_tls13_signature};
18use rustls::pki_types::{CertificateDer, PrivateKeyDer, PrivatePkcs8KeyDer, ServerName, UnixTime};
19use rustls::{DigitallySignedStruct, SignatureScheme};
20use serde::{Deserialize, Serialize};
21use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt};
22use uuid::Uuid;
23
24pub use error::{Error, Result};
25
/// Protocol version this build advertises (mDNS `pv` property and the direct hello).
pub const NATIVE_PROTOCOL_VERSION: u16 = 1;
/// Lowest protocol version this build advertises as acceptable (mDNS `minpv` property).
pub const MIN_NATIVE_PROTOCOL_VERSION: u16 = 1;
/// mDNS service type used both for announcing and for browsing native peers.
pub const NATIVE_SERVICE_TYPE: &str = "_ferry._udp.local.";

// Eight-byte magic value; presumably the direct-protocol preamble (usage is outside this view).
const DIRECT_MAGIC: &[u8; 8] = b"FERRY01\n";
// 16 MiB; presumably the upper bound for one protocol frame (usage is outside this view).
const MAX_FRAME_LEN: usize = 16 * 1024 * 1024;
// 64 KiB; presumably the buffer size for streaming file I/O (usage is outside this view).
const IO_BUFFER_SIZE: usize = 64 * 1024;
// Value written into `TransferManifest::version` by `build_manifest`.
const MANIFEST_VERSION: u16 = 1;
// 1 MiB; value written into `TransferManifest::chunk_size` by `build_manifest`.
const DEFAULT_CHUNK_SIZE: u64 = 1024 * 1024;
// Presumably the per-application directory name under the user config dir — TODO confirm.
const CONFIG_DIR_NAME: &str = "ferry";
// Presumably a domain-separation context for the PSK proof — TODO confirm against the handshake.
const PSK_PROOF_CONTEXT: &[u8] = b"fileferry-direct-psk-v1";
37
/// A peer addressed directly by socket address, optionally pinned to a fingerprint.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct DirectPeer {
    /// Socket address of the peer.
    address: SocketAddr,
    /// Fingerprint the peer is expected to present; `None` when no pin was set.
    /// When set through `with_expected_fingerprint` it is stored in normalized form.
    expected_fingerprint: Option<String>,
}
43
44impl DirectPeer {
45 pub fn parse(value: &str) -> Result<Self> {
46 Ok(Self {
47 address: value.parse()?,
48 expected_fingerprint: None,
49 })
50 }
51
52 pub const fn address(&self) -> SocketAddr {
53 self.address
54 }
55
56 pub const fn from_address(address: SocketAddr) -> Self {
57 Self {
58 address,
59 expected_fingerprint: None,
60 }
61 }
62
63 pub fn with_expected_fingerprint(mut self, fingerprint: impl Into<String>) -> Result<Self> {
64 self.expected_fingerprint = Some(normalized_fingerprint(fingerprint.into())?);
65 Ok(self)
66 }
67
68 pub fn expected_fingerprint(&self) -> Option<&str> {
69 self.expected_fingerprint.as_deref()
70 }
71}
72
/// Pre-shared key wrapper whose `Debug` and `Serialize` impls emit a redacted placeholder.
///
/// NOTE(review): the derived `Deserialize` builds the value straight from a string, so it
/// does not pass through `validate_psk` the way `PskSecret::new` does — confirm this is intended.
#[derive(Clone, PartialEq, Eq, Deserialize)]
pub struct PskSecret(String);
75
76impl PskSecret {
77 pub fn new(psk: impl Into<String>) -> Result<Self> {
78 validate_psk(psk.into()).map(Self)
79 }
80
81 fn expose_secret(&self) -> &str {
82 &self.0
83 }
84}
85
86impl std::fmt::Debug for PskSecret {
87 fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
88 formatter.write_str("PskSecret(<redacted>)")
89 }
90}
91
92impl Serialize for PskSecret {
93 fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
94 where
95 S: serde::Serializer,
96 {
97 serializer.serialize_str("<redacted>")
98 }
99}
100
/// Parameters of an outgoing transfer: the target peer, the paths to send and an optional PSK.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct SendRequest {
    /// Peer the files are sent to.
    peer: DirectPeer,
    /// Source paths; `SendRequest::new` rejects an empty list.
    paths: Vec<PathBuf>,
    /// Optional pre-shared key. Never serialized; defaults to `None` when absent on input.
    #[serde(default, skip_serializing)]
    psk: Option<PskSecret>,
}
108
109impl SendRequest {
110 pub fn new(peer: DirectPeer, paths: Vec<PathBuf>) -> Result<Self> {
111 if paths.is_empty() {
112 return Err(Error::InvalidInput(
113 "at least one file path is required".to_string(),
114 ));
115 }
116
117 Ok(Self {
118 peer,
119 paths,
120 psk: None,
121 })
122 }
123
124 pub const fn peer(&self) -> &DirectPeer {
125 &self.peer
126 }
127
128 pub fn paths(&self) -> &[PathBuf] {
129 &self.paths
130 }
131
132 pub fn with_psk(mut self, psk: impl Into<String>) -> Result<Self> {
133 self.psk = Some(PskSecret::new(psk)?);
134 Ok(self)
135 }
136
137 pub fn psk(&self) -> Option<&str> {
138 self.psk.as_ref().map(PskSecret::expose_secret)
139 }
140}
141
/// Parameters of an incoming transfer listener: the listen address and an optional PSK.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ReceiveRequest {
    /// Address to listen on.
    listen: SocketAddr,
    /// Optional pre-shared key. Never serialized; defaults to `None` when absent on input.
    #[serde(default, skip_serializing)]
    psk: Option<PskSecret>,
}
148
149impl ReceiveRequest {
150 pub fn new(listen: SocketAddr) -> Self {
151 Self { listen, psk: None }
152 }
153
154 pub const fn listen(&self) -> SocketAddr {
155 self.listen
156 }
157
158 pub fn with_psk(mut self, psk: impl Into<String>) -> Result<Self> {
159 self.psk = Some(PskSecret::new(psk)?);
160 Ok(self)
161 }
162
163 pub fn psk(&self) -> Option<&str> {
164 self.psk.as_ref().map(PskSecret::expose_secret)
165 }
166}
167
/// User-facing application configuration, loaded from and saved to `config.toml`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct AppConfig {
    /// Device alias; announced over mDNS by `NativeAnnouncement::from_config`.
    pub alias: String,
    /// Announced as the `listen_port` mDNS property; its other uses are outside this view.
    pub listen_port: u16,
    /// QUIC port: announced over mDNS and used for the default daemon listen address.
    pub quic_port: u16,
    /// Download directory as text; expanded with `expand_home_path` for the daemon destination.
    pub download_dir: String,
    /// Presumably auto-accepts transfers from known peers — consumer is outside this view.
    pub auto_accept_known: bool,
    /// Presumably auto-accepts transfers from unknown peers — consumer is outside this view.
    pub auto_accept_unknown: bool,
    /// Enabled discovery mechanisms by name; the default is `["native"]`.
    pub discovery: Vec<String>,
    /// Presumably the cap on files transferred concurrently — consumer is outside this view.
    pub max_concurrent_files: usize,
    /// Trust settings, including the optional PSK text.
    pub trust: TrustConfig,
}
180
181impl Default for AppConfig {
182 fn default() -> Self {
183 Self {
184 alias: default_alias(),
185 listen_port: 53317,
186 quic_port: 53318,
187 download_dir: "~/Downloads/ferry".to_string(),
188 auto_accept_known: true,
189 auto_accept_unknown: false,
190 discovery: vec!["native".to_string()],
191 max_concurrent_files: 8,
192 trust: TrustConfig::default(),
193 }
194 }
195}
196
197impl AppConfig {
198 pub fn load_or_default() -> Result<Self> {
199 Self::load_or_default_from(config_dir()?.join("config.toml"))
200 }
201
202 pub fn load_or_default_from(path: impl AsRef<Path>) -> Result<Self> {
203 let path = path.as_ref();
204 match std::fs::read_to_string(path) {
205 Ok(contents) => {
206 toml::from_str(&contents).map_err(|error| Error::ConfigParse(error.to_string()))
207 }
208 Err(error) if error.kind() == std::io::ErrorKind::NotFound => Ok(Self::default()),
209 Err(source) => Err(Error::IoPath {
210 path: path.to_path_buf(),
211 source,
212 }),
213 }
214 }
215
216 pub fn save_default_path(&self) -> Result<()> {
217 self.save_to_path(config_dir()?.join("config.toml"))
218 }
219
220 pub fn save_to_path(&self, path: impl AsRef<Path>) -> Result<()> {
221 write_toml_file(path.as_ref(), self)
222 }
223
224 pub fn to_toml_string(&self) -> Result<String> {
225 toml::to_string_pretty(self).map_err(|error| Error::ConfigSerialize(error.to_string()))
226 }
227
228 pub fn redacted(&self) -> Self {
229 let mut config = self.clone();
230 config.trust = config.trust.redacted();
231 config
232 }
233
234 pub fn to_redacted_toml_string(&self) -> Result<String> {
235 self.redacted().to_toml_string()
236 }
237}
238
/// Daemon settings, loaded from `daemon.toml` or derived from an `AppConfig`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct DaemonConfig {
    /// Address the daemon listens on.
    pub listen: SocketAddr,
    /// Destination directory; derived from `AppConfig::download_dir` when built from app config.
    pub destination: PathBuf,
}
244
245impl DaemonConfig {
246 pub fn from_app_config(config: &AppConfig) -> Result<Self> {
247 Ok(Self {
248 listen: SocketAddr::from(([0, 0, 0, 0], config.quic_port)),
249 destination: expand_home_path(&config.download_dir)?,
250 })
251 }
252
253 pub fn load_or_default() -> Result<Self> {
254 let app_config = AppConfig::load_or_default()?;
255 Self::load_or_default_from(config_dir()?.join("daemon.toml"), &app_config)
256 }
257
258 pub fn load_or_default_from(path: impl AsRef<Path>, app_config: &AppConfig) -> Result<Self> {
259 let path = path.as_ref();
260 match std::fs::read_to_string(path) {
261 Ok(contents) => {
262 toml::from_str(&contents).map_err(|error| Error::ConfigParse(error.to_string()))
263 }
264 Err(error) if error.kind() == std::io::ErrorKind::NotFound => {
265 Self::from_app_config(app_config)
266 }
267 Err(source) => Err(Error::IoPath {
268 path: path.to_path_buf(),
269 source,
270 }),
271 }
272 }
273
274 pub fn save_default_path(&self) -> Result<()> {
275 self.save_to_path(config_dir()?.join("daemon.toml"))
276 }
277
278 pub fn save_to_path(&self, path: impl AsRef<Path>) -> Result<()> {
279 write_toml_file(path.as_ref(), self)
280 }
281
282 pub fn to_toml_string(&self) -> Result<String> {
283 toml::to_string_pretty(self).map_err(|error| Error::ConfigSerialize(error.to_string()))
284 }
285}
286
/// Trust-related settings stored inside `AppConfig`.
///
/// NOTE(review): the derived `Debug` and `Serialize` expose `psk` as-is; use `redacted()`
/// before printing or exporting.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct TrustConfig {
    /// Presumably requires a pinned peer fingerprint before connecting — consumer not in view.
    pub require_fingerprint: bool,
    /// Pre-shared key text; the default is the empty string.
    pub psk: String,
}
292
293impl Default for TrustConfig {
294 fn default() -> Self {
295 Self {
296 require_fingerprint: true,
297 psk: String::new(),
298 }
299 }
300}
301
302impl TrustConfig {
303 pub fn redacted(&self) -> Self {
304 let psk = if self.psk.is_empty() {
305 String::new()
306 } else {
307 "<redacted>".to_string()
308 };
309 Self {
310 require_fingerprint: self.require_fingerprint,
311 psk,
312 }
313 }
314}
315
316#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
317pub struct NativeIdentity {
318 cert_der: Vec<u8>,
319 key_der: Vec<u8>,
320 fingerprint: String,
321}
322
323impl NativeIdentity {
324 pub fn load_or_generate() -> Result<Self> {
325 Self::load_or_generate_in(config_dir()?)
326 }
327
328 pub fn load_or_generate_in(config_dir: impl AsRef<Path>) -> Result<Self> {
329 let config_dir = config_dir.as_ref();
330 let cert_path = config_dir.join("identity.cert.der");
331 let key_path = config_dir.join("identity.key.der");
332
333 if cert_path.exists() && key_path.exists() {
334 let cert_der = std::fs::read(&cert_path).map_err(|source| Error::IoPath {
335 path: cert_path,
336 source,
337 })?;
338 let key_der = std::fs::read(&key_path).map_err(|source| Error::IoPath {
339 path: key_path,
340 source,
341 })?;
342
343 return Ok(Self::from_parts(cert_der, key_der));
344 }
345
346 std::fs::create_dir_all(config_dir).map_err(|source| Error::IoPath {
347 path: config_dir.to_path_buf(),
348 source,
349 })?;
350
351 let identity = Self::generate()?;
352 write_private_file(&cert_path, &identity.cert_der)?;
353 write_private_file(&key_path, &identity.key_der)?;
354 Ok(identity)
355 }
356
357 pub fn generate() -> Result<Self> {
358 let CertifiedKey { cert, signing_key } =
359 generate_simple_self_signed(vec!["localhost".to_string()])?;
360 Ok(Self::from_parts(
361 cert.der().to_vec(),
362 signing_key.serialize_der(),
363 ))
364 }
365
366 pub fn fingerprint(&self) -> &str {
367 &self.fingerprint
368 }
369
370 fn cert_chain(&self) -> Vec<CertificateDer<'static>> {
371 vec![CertificateDer::from(self.cert_der.clone())]
372 }
373
374 fn private_key(&self) -> PrivateKeyDer<'static> {
375 PrivateKeyDer::from(PrivatePkcs8KeyDer::from(self.key_der.clone()))
376 }
377
378 fn from_parts(cert_der: Vec<u8>, key_der: Vec<u8>) -> Self {
379 let fingerprint = blake3::hash(&cert_der).to_hex().to_string();
380 Self {
381 cert_der,
382 key_der,
383 fingerprint,
384 }
385 }
386}
387
/// Direction of a transfer from the local point of view; serialized in snake_case.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum TransferDirection {
    /// Local side is sending.
    Send,
    /// Local side is receiving.
    Receive,
}
394
/// Progress snapshot for one file; mirrors the fields of `TransferEvent::Progress`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct TransferProgress {
    /// Direction of the transfer.
    pub direction: TransferDirection,
    /// Name of the file being transferred.
    pub file_name: String,
    /// Bytes transferred so far.
    pub bytes_done: u64,
    /// Total bytes expected.
    pub bytes_total: u64,
}
402
/// Events describing a transfer session.
///
/// Serialized as an internally tagged enum: the variant name goes into the `event`
/// field in snake_case.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "event", rename_all = "snake_case")]
pub enum TransferEvent {
    /// A session began; carries its id and the total file and byte counts.
    SessionStarted {
        direction: TransferDirection,
        session_id: Uuid,
        files_total: usize,
        bytes_total: u64,
    },
    /// A file began; `resume_offset` is presumably the byte offset transfer resumes from.
    FileStarted {
        direction: TransferDirection,
        file_name: String,
        bytes_total: u64,
        resume_offset: u64,
    },
    /// Progress update for one file (see `TransferEvent::progress`).
    Progress {
        direction: TransferDirection,
        file_name: String,
        bytes_done: u64,
        bytes_total: u64,
    },
    /// A file finished; carries its size, a BLAKE3 digest string and the final status.
    FileFinished {
        direction: TransferDirection,
        file_name: String,
        bytes: u64,
        blake3: String,
        status: TransferFileStatus,
    },
    /// The session finished; carries totals and a BLAKE3 digest string.
    SessionFinished {
        direction: TransferDirection,
        files_total: usize,
        bytes_total: u64,
        blake3: String,
    },
    /// The session was cancelled.
    SessionCancelled {
        direction: TransferDirection,
        session_id: Uuid,
    },
}
442
443impl TransferEvent {
444 pub fn progress(&self) -> Option<TransferProgress> {
445 match self {
446 Self::Progress {
447 direction,
448 file_name,
449 bytes_done,
450 bytes_total,
451 } => Some(TransferProgress {
452 direction: *direction,
453 file_name: file_name.clone(),
454 bytes_done: *bytes_done,
455 bytes_total: *bytes_total,
456 }),
457 _ => None,
458 }
459 }
460}
461
/// Cancellation handle for a transfer.
///
/// The flag lives behind an `Arc`, so clones of a handle observe the same flag.
#[derive(Debug, Clone, Default)]
pub struct TransferControl {
    /// Set to `true` once `cancel` has been called.
    cancelled: Arc<AtomicBool>,
}
466
467impl TransferControl {
468 pub fn new() -> Self {
469 Self::default()
470 }
471
472 pub fn cancel(&self) {
473 self.cancelled.store(true, Ordering::SeqCst);
474 }
475
476 pub fn is_cancelled(&self) -> bool {
477 self.cancelled.load(Ordering::SeqCst)
478 }
479
480 fn check_cancelled(&self) -> Result<()> {
481 if self.is_cancelled() {
482 Err(Error::TransferCancelled)
483 } else {
484 Ok(())
485 }
486 }
487}
488
/// Result of a finished transfer session, with one `TransferFileSummary` per file.
///
/// NOTE(review): the exact meaning of the top-level `path`, `file_name`, `bytes` and
/// `blake3` for multi-file sessions is decided where the summary is built (not in view).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TransferSummary {
    pub path: PathBuf,
    pub file_name: String,
    pub bytes: u64,
    pub blake3: String,
    /// Per-file results.
    pub files: Vec<TransferFileSummary>,
}
497
/// Result for a single file of a transfer session.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TransferFileSummary {
    /// Path of the file; presumably the local on-disk path — TODO confirm at the build site.
    pub path: PathBuf,
    /// Path of the file relative to the transfer root.
    pub relative_path: PathBuf,
    /// Size in bytes.
    pub bytes: u64,
    /// BLAKE3 digest string of the file.
    pub blake3: String,
    /// Final status of the file.
    pub status: TransferFileStatus,
}
506
/// Final state of one file in a transfer; serialized in snake_case.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum TransferFileStatus {
    /// The file was sent.
    Sent,
    /// The file was received.
    Received,
    /// The file was skipped (presumably because the destination already had it).
    Skipped,
    /// The file was completed from a resume offset (presumably a partial earlier transfer).
    Resumed,
}
515
/// Description of everything a transfer session contains.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct TransferManifest {
    /// Manifest format version; `build_manifest` writes `MANIFEST_VERSION`.
    pub version: u16,
    /// Session identifier; `build_manifest` generates a random v4 UUID.
    pub session_id: Uuid,
    /// Chunk size in bytes; `build_manifest` writes `DEFAULT_CHUNK_SIZE`.
    pub chunk_size: u64,
    /// File and directory entries of the session.
    pub entries: Vec<ManifestEntry>,
}
523
524impl TransferManifest {
525 pub fn file_entries(&self) -> impl Iterator<Item = &ManifestEntry> {
526 self.entries
527 .iter()
528 .filter(|entry| entry.kind == ManifestEntryKind::File)
529 }
530
531 pub fn total_file_bytes(&self) -> u64 {
532 self.file_entries().map(|entry| entry.size).sum()
533 }
534
535 pub fn digest(&self) -> Result<String> {
536 let bytes = serde_json::to_vec(self).map_err(|error| Error::Protocol(error.to_string()))?;
537 Ok(blake3::hash(&bytes).to_hex().to_string())
538 }
539}
540
/// One file or directory listed in a `TransferManifest`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ManifestEntry {
    /// Path relative to the transfer root.
    pub relative_path: PathBuf,
    /// Whether the entry is a file or a directory.
    pub kind: ManifestEntryKind,
    /// Size in bytes; summed over file entries by `TransferManifest::total_file_bytes`.
    pub size: u64,
    /// Optional BLAKE3 digest string (presumably absent for directories — TODO confirm).
    pub blake3: Option<String>,
    /// Presumably per-chunk digest strings — TODO confirm against `build_manifest_entry`.
    pub chunks: Vec<String>,
    /// Optional Unix permission bits.
    pub unix_mode: Option<u32>,
    /// Optional modification time, in milliseconds relative to the Unix epoch.
    pub modified_unix_ms: Option<i128>,
}
551
/// Kind of a manifest entry; serialized in snake_case.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ManifestEntryKind {
    /// A regular file.
    File,
    /// A directory.
    Directory,
}
558
/// Outcome of comparing an incoming file with what already exists at the destination.
///
/// NOTE(review): the decision logic is outside this view; the variant notes below are
/// inferred from the names and payload types.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ResumeDecision {
    /// Presumably: nothing usable exists, write the file from scratch.
    Create,
    /// Presumably: the destination already holds the file, nothing to transfer.
    Skip,
    /// Presumably: continue from the given byte offset.
    ResumeFrom(u64),
    /// Presumably: the destination conflicts; the string describes why.
    Conflict(String),
}
566
/// A single sighting of a peer, fed into `PeerRegistry::observe`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PeerObservation {
    /// Peer fingerprint; used as the registry key.
    pub fingerprint: String,
    /// Alias the peer announced, if any.
    pub alias: Option<String>,
    /// Hostname the peer was seen under, if any.
    pub hostname: Option<String>,
    /// Address the peer was seen at.
    pub address: SocketAddr,
    /// Transport names the peer advertised.
    pub transports: BTreeSet<String>,
    /// Time of the sighting, in milliseconds relative to the Unix epoch.
    pub seen_unix_ms: i128,
}
576
577impl PeerObservation {
578 pub fn new(fingerprint: impl Into<String>, address: SocketAddr, seen_unix_ms: i128) -> Self {
579 Self {
580 fingerprint: fingerprint.into(),
581 alias: None,
582 hostname: None,
583 address,
584 transports: BTreeSet::new(),
585 seen_unix_ms,
586 }
587 }
588
589 pub fn with_alias(mut self, alias: impl Into<String>) -> Self {
590 self.alias = Some(alias.into());
591 self
592 }
593
594 pub fn with_hostname(mut self, hostname: impl Into<String>) -> Self {
595 self.hostname = Some(hostname.into());
596 self
597 }
598
599 pub fn with_transport(mut self, transport: impl Into<String>) -> Self {
600 self.transports.insert(transport.into());
601 self
602 }
603}
604
/// Everything learned about one peer, accumulated from its observations.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct PeerRecord {
    /// Peer fingerprint.
    pub fingerprint: String,
    /// All aliases seen for the peer.
    pub aliases: BTreeSet<String>,
    /// All hostnames seen for the peer.
    pub hostnames: BTreeSet<String>,
    /// All addresses seen for the peer.
    pub addresses: BTreeSet<SocketAddr>,
    /// All transport names the peer advertised.
    pub transports: BTreeSet<String>,
    /// Earliest sighting, in milliseconds relative to the Unix epoch.
    pub first_seen_unix_ms: i128,
    /// Latest sighting, in milliseconds relative to the Unix epoch.
    pub last_seen_unix_ms: i128,
}
615
616impl PeerRecord {
617 fn from_observation(observation: PeerObservation) -> Self {
618 let mut aliases = BTreeSet::new();
619 if let Some(alias) = observation.alias {
620 aliases.insert(alias);
621 }
622
623 let mut hostnames = BTreeSet::new();
624 if let Some(hostname) = observation.hostname {
625 hostnames.insert(hostname);
626 }
627
628 let mut addresses = BTreeSet::new();
629 addresses.insert(observation.address);
630
631 Self {
632 fingerprint: observation.fingerprint,
633 aliases,
634 hostnames,
635 addresses,
636 transports: observation.transports,
637 first_seen_unix_ms: observation.seen_unix_ms,
638 last_seen_unix_ms: observation.seen_unix_ms,
639 }
640 }
641
642 fn merge(&mut self, observation: PeerObservation) {
643 if let Some(alias) = observation.alias {
644 self.aliases.insert(alias);
645 }
646 if let Some(hostname) = observation.hostname {
647 self.hostnames.insert(hostname);
648 }
649 self.addresses.insert(observation.address);
650 self.transports.extend(observation.transports);
651 self.first_seen_unix_ms = self.first_seen_unix_ms.min(observation.seen_unix_ms);
652 self.last_seen_unix_ms = self.last_seen_unix_ms.max(observation.seen_unix_ms);
653 }
654
655 pub fn preferred_quic_address(&self) -> Option<SocketAddr> {
656 if !self.transports.contains("quic") {
657 return None;
658 }
659
660 self.addresses.iter().copied().next()
661 }
662}
663
/// Trust decision recorded for a known peer; serialized in snake_case.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum TrustState {
    /// The peer is trusted.
    Trusted,
    /// The peer is blocked.
    Blocked,
}
670
/// One peer stored in the `TrustStore`.
///
/// The set-valued fields default to empty when missing from the serialized form.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct KnownPeerEntry {
    /// Peer fingerprint; normalized when the entry is built through the constructors.
    pub fingerprint: String,
    /// Aliases known for the peer.
    #[serde(default)]
    pub aliases: BTreeSet<String>,
    /// Hostnames known for the peer.
    #[serde(default)]
    pub hostnames: BTreeSet<String>,
    /// Addresses known for the peer.
    #[serde(default)]
    pub addresses: BTreeSet<SocketAddr>,
    /// Transport names known for the peer.
    #[serde(default)]
    pub transports: BTreeSet<String>,
    /// Recorded trust decision.
    pub trust_state: TrustState,
    /// Earliest sighting in Unix milliseconds; `None` when trusted by fingerprint alone.
    pub first_seen_unix_ms: Option<i128>,
    /// Latest sighting in Unix milliseconds; `None` when trusted by fingerprint alone.
    pub last_seen_unix_ms: Option<i128>,
}
686
687impl KnownPeerEntry {
688 pub fn trusted(fingerprint: impl Into<String>) -> Result<Self> {
689 let fingerprint = normalized_fingerprint(fingerprint.into())?;
690 Ok(Self {
691 fingerprint,
692 aliases: BTreeSet::new(),
693 hostnames: BTreeSet::new(),
694 addresses: BTreeSet::new(),
695 transports: BTreeSet::new(),
696 trust_state: TrustState::Trusted,
697 first_seen_unix_ms: None,
698 last_seen_unix_ms: None,
699 })
700 }
701
702 pub fn from_record(record: &PeerRecord, trust_state: TrustState) -> Result<Self> {
703 let fingerprint = normalized_fingerprint(record.fingerprint.clone())?;
704 Ok(Self {
705 fingerprint,
706 aliases: record.aliases.clone(),
707 hostnames: record.hostnames.clone(),
708 addresses: record.addresses.clone(),
709 transports: record.transports.clone(),
710 trust_state,
711 first_seen_unix_ms: Some(record.first_seen_unix_ms),
712 last_seen_unix_ms: Some(record.last_seen_unix_ms),
713 })
714 }
715}
716
/// Persistent set of known peers, stored as `known_peers.toml`.
#[derive(Debug, Default, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct TrustStore {
    /// Known peers keyed by fingerprint; defaults to empty when missing from the file.
    #[serde(default)]
    pub peers: BTreeMap<String, KnownPeerEntry>,
}
722
723impl TrustStore {
724 pub fn load_or_default() -> Result<Self> {
725 Self::load_or_default_from(config_dir()?.join("known_peers.toml"))
726 }
727
728 pub fn load_or_default_from(path: impl AsRef<Path>) -> Result<Self> {
729 let path = path.as_ref();
730 match std::fs::read_to_string(path) {
731 Ok(contents) => {
732 toml::from_str(&contents).map_err(|error| Error::ConfigParse(error.to_string()))
733 }
734 Err(error) if error.kind() == std::io::ErrorKind::NotFound => Ok(Self::default()),
735 Err(source) => Err(Error::IoPath {
736 path: path.to_path_buf(),
737 source,
738 }),
739 }
740 }
741
742 pub fn save_default_path(&self) -> Result<()> {
743 self.save_to_path(config_dir()?.join("known_peers.toml"))
744 }
745
746 pub fn save_to_path(&self, path: impl AsRef<Path>) -> Result<()> {
747 write_toml_file(path.as_ref(), self)
748 }
749
750 pub fn trust_fingerprint(&mut self, fingerprint: impl Into<String>) -> Result<&KnownPeerEntry> {
751 let entry = KnownPeerEntry::trusted(fingerprint)?;
752 let fingerprint = entry.fingerprint.clone();
753 self.peers
754 .entry(fingerprint.clone())
755 .and_modify(|existing| existing.trust_state = TrustState::Trusted)
756 .or_insert(entry);
757 Ok(self
758 .peers
759 .get(&fingerprint)
760 .expect("trusted peer entry should exist"))
761 }
762
763 pub fn trust_record(&mut self, record: &PeerRecord) -> Result<&KnownPeerEntry> {
764 let entry = KnownPeerEntry::from_record(record, TrustState::Trusted)?;
765 let fingerprint = entry.fingerprint.clone();
766 self.peers
767 .entry(fingerprint.clone())
768 .and_modify(|existing| {
769 existing.aliases.extend(record.aliases.clone());
770 existing.hostnames.extend(record.hostnames.clone());
771 existing.addresses.extend(record.addresses.clone());
772 existing.transports.extend(record.transports.clone());
773 existing.first_seen_unix_ms = Some(
774 existing
775 .first_seen_unix_ms
776 .unwrap_or(record.first_seen_unix_ms)
777 .min(record.first_seen_unix_ms),
778 );
779 existing.last_seen_unix_ms = Some(
780 existing
781 .last_seen_unix_ms
782 .unwrap_or(record.last_seen_unix_ms)
783 .max(record.last_seen_unix_ms),
784 );
785 existing.trust_state = TrustState::Trusted;
786 })
787 .or_insert(entry);
788 Ok(self
789 .peers
790 .get(&fingerprint)
791 .expect("trusted peer entry should exist"))
792 }
793
794 pub fn forget(&mut self, fingerprint: &str) -> bool {
795 self.peers.remove(fingerprint).is_some()
796 }
797
798 pub fn records(&self) -> impl Iterator<Item = &KnownPeerEntry> {
799 self.peers.values()
800 }
801
802 pub fn to_toml_string(&self) -> Result<String> {
803 toml::to_string_pretty(self).map_err(|error| Error::ConfigSerialize(error.to_string()))
804 }
805}
806
/// In-memory collection of discovered peers.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct PeerRegistry {
    /// Records keyed by the fingerprint given in the observation.
    peers: BTreeMap<String, PeerRecord>,
}
811
/// Result of `PeerRegistry::lookup_detail`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PeerLookup<'a> {
    /// Exactly one record matched.
    Found(&'a PeerRecord),
    /// Several records matched; all of them are returned.
    Ambiguous(Vec<&'a PeerRecord>),
    /// No record matched.
    Missing,
}
818
819impl PeerRegistry {
820 pub fn new() -> Self {
821 Self::default()
822 }
823
824 pub fn observe(&mut self, observation: PeerObservation) -> Result<&PeerRecord> {
825 if observation.fingerprint.trim().is_empty() {
826 return Err(Error::InvalidInput(
827 "peer fingerprint must not be empty".to_string(),
828 ));
829 }
830
831 let fingerprint = observation.fingerprint.clone();
832 if let Some(record) = self.peers.get_mut(&fingerprint) {
833 record.merge(observation);
834 } else {
835 self.peers.insert(
836 fingerprint.clone(),
837 PeerRecord::from_observation(observation),
838 );
839 }
840
841 Ok(self
842 .peers
843 .get(&fingerprint)
844 .expect("observed peer record should exist"))
845 }
846
847 pub fn get_by_fingerprint(&self, fingerprint: &str) -> Option<&PeerRecord> {
848 self.peers.get(fingerprint)
849 }
850
851 pub fn lookup(&self, query: &str) -> Option<&PeerRecord> {
852 match self.lookup_detail(query) {
853 PeerLookup::Found(record) => Some(record),
854 PeerLookup::Ambiguous(_) | PeerLookup::Missing => None,
855 }
856 }
857
858 pub fn lookup_detail(&self, query: &str) -> PeerLookup<'_> {
859 if let Some(record) = self.peers.get(query) {
860 return PeerLookup::Found(record);
861 }
862
863 let matches = self
864 .peers
865 .values()
866 .filter(|record| {
867 record.fingerprint.starts_with(query)
868 || record.aliases.iter().any(|alias| alias == query)
869 || record.hostnames.iter().any(|hostname| hostname == query)
870 })
871 .collect::<Vec<_>>();
872
873 match matches.as_slice() {
874 [] => PeerLookup::Missing,
875 [record] => PeerLookup::Found(record),
876 _ => PeerLookup::Ambiguous(matches),
877 }
878 }
879
880 pub fn records(&self) -> impl Iterator<Item = &PeerRecord> {
881 self.peers.values()
882 }
883}
884
/// Data published over mDNS to announce this device.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NativeAnnouncement {
    /// Device alias; published as-is in the `alias` property and sanitized for DNS names.
    pub alias: String,
    /// Identity fingerprint; published in the `fp` property.
    pub fingerprint: String,
    /// QUIC port; used as the service port and published in `quic_port`.
    pub quic_port: u16,
    /// Optional port published in `listen_port` (empty string when `None`).
    pub listen_port: Option<u16>,
}
892
893impl NativeAnnouncement {
894 pub fn from_config(config: &AppConfig, identity: &NativeIdentity) -> Self {
895 Self {
896 alias: config.alias.clone(),
897 fingerprint: identity.fingerprint().to_string(),
898 quic_port: config.quic_port,
899 listen_port: Some(config.listen_port),
900 }
901 }
902
903 fn service_info(&self) -> Result<ServiceInfo> {
904 let alias = sanitize_dns_label(&self.alias);
905 let fingerprint_prefix = self
906 .fingerprint
907 .get(..12)
908 .unwrap_or(self.fingerprint.as_str());
909 let instance = format!("{alias}-{fingerprint_prefix}");
910 let hostname = format!("{alias}.local.");
911 let properties = [
912 ("pv", NATIVE_PROTOCOL_VERSION.to_string()),
913 ("minpv", MIN_NATIVE_PROTOCOL_VERSION.to_string()),
914 ("fp", self.fingerprint.clone()),
915 ("alias", self.alias.clone()),
916 ("transports", "quic".to_string()),
917 ("quic_port", self.quic_port.to_string()),
918 (
919 "listen_port",
920 self.listen_port
921 .map(|port| port.to_string())
922 .unwrap_or_default(),
923 ),
924 ];
925
926 ServiceInfo::new(
927 NATIVE_SERVICE_TYPE,
928 &instance,
929 &hostname,
930 "",
931 self.quic_port,
932 &properties[..],
933 )
934 .map(ServiceInfo::enable_addr_auto)
935 .map_err(|error| Error::Discovery(error.to_string()))
936 }
937}
938
/// Running mDNS announcement; the service is unregistered when the value is dropped.
pub struct NativeAnnouncer {
    /// Daemon that owns the registration.
    daemon: ServiceDaemon,
    /// Full service name, kept for unregistering on drop.
    fullname: String,
}
943
944impl NativeAnnouncer {
945 pub fn start(announcement: &NativeAnnouncement) -> Result<Self> {
946 let daemon = ServiceDaemon::new().map_err(|error| Error::Discovery(error.to_string()))?;
947 let service = announcement.service_info()?;
948 let fullname = service.get_fullname().to_string();
949 daemon
950 .register(service)
951 .map_err(|error| Error::Discovery(error.to_string()))?;
952 Ok(Self { daemon, fullname })
953 }
954}
955
956impl Drop for NativeAnnouncer {
957 fn drop(&mut self) {
958 let _ = self.daemon.unregister(&self.fullname);
959 let _ = self.daemon.shutdown();
960 }
961}
962
963pub async fn discover_native_peers(timeout: Duration) -> Result<PeerRegistry> {
964 let daemon = ServiceDaemon::new().map_err(|error| Error::Discovery(error.to_string()))?;
965 let receiver = daemon
966 .browse(NATIVE_SERVICE_TYPE)
967 .map_err(|error| Error::Discovery(error.to_string()))?;
968 let deadline = tokio::time::Instant::now() + timeout;
969 let mut registry = PeerRegistry::new();
970
971 loop {
972 let now = tokio::time::Instant::now();
973 if now >= deadline {
974 break;
975 }
976
977 let wait = deadline - now;
978 match tokio::time::timeout(wait, receiver.recv_async()).await {
979 Ok(Ok(ServiceEvent::ServiceResolved(service))) => {
980 observe_resolved_service(&mut registry, &service)?;
981 }
982 Ok(Ok(_)) => {}
983 Ok(Err(error)) => {
984 return Err(Error::Discovery(error.to_string()));
985 }
986 Err(_) => break,
987 }
988 }
989
990 daemon
991 .stop_browse(NATIVE_SERVICE_TYPE)
992 .map_err(|error| Error::Discovery(error.to_string()))?;
993 let _ = daemon.shutdown();
994 Ok(registry)
995}
996
997fn observe_resolved_service(registry: &mut PeerRegistry, service: &ResolvedService) -> Result<()> {
998 let Some(fingerprint) = service.get_property_val_str("fp") else {
999 return Ok(());
1000 };
1001 let fingerprint = match normalized_fingerprint(fingerprint.to_string()) {
1002 Ok(fingerprint) => fingerprint,
1003 Err(_) => return Ok(()),
1004 };
1005 let Some(highest_version) = parse_txt_u16(service, "pv") else {
1006 return Ok(());
1007 };
1008 let Some(minimum_version) = parse_txt_u16(service, "minpv") else {
1009 return Ok(());
1010 };
1011 if minimum_version > highest_version
1012 || minimum_version > NATIVE_PROTOCOL_VERSION
1013 || highest_version < MIN_NATIVE_PROTOCOL_VERSION
1014 {
1015 return Ok(());
1016 }
1017
1018 let Some(quic_port) = parse_txt_u16(service, "quic_port") else {
1019 return Ok(());
1020 };
1021 let transports = service
1022 .get_property_val_str("transports")
1023 .unwrap_or_default()
1024 .split(',')
1025 .map(str::trim)
1026 .filter(|value| !value.is_empty())
1027 .map(ToOwned::to_owned)
1028 .collect::<Vec<_>>();
1029 if !transports.iter().any(|transport| transport == "quic") {
1030 return Ok(());
1031 }
1032
1033 let alias = service.get_property_val_str("alias").map(ToOwned::to_owned);
1034 let hostname = Some(service.get_hostname().trim_end_matches('.').to_string());
1035 let seen_unix_ms = system_time_unix_ms(SystemTime::now()).unwrap_or_default();
1036
1037 for address in service.get_addresses_v4() {
1038 let mut observation = PeerObservation::new(
1039 fingerprint.clone(),
1040 SocketAddr::from((address, quic_port)),
1041 seen_unix_ms,
1042 );
1043 if let Some(alias) = &alias {
1044 observation = observation.with_alias(alias.clone());
1045 }
1046 if let Some(hostname) = &hostname {
1047 observation = observation.with_hostname(hostname.clone());
1048 }
1049 for transport in &transports {
1050 observation = observation.with_transport(transport.clone());
1051 }
1052 registry.observe(observation)?;
1053 }
1054
1055 Ok(())
1056}
1057
1058fn parse_txt_u16(service: &ResolvedService, key: &str) -> Option<u16> {
1059 service.get_property_val_str(key)?.parse().ok()
1060}
1061
/// Turns an arbitrary alias into a single valid DNS label.
///
/// ASCII letters and digits are kept (lowercased). Every run of other characters
/// becomes one `-`, and leading and trailing `-` are removed. The result is capped at
/// 63 bytes, the maximum length of a DNS label, and re-trimmed so it never ends in `-`.
/// When nothing usable remains, `fileferry-device` is returned.
fn sanitize_dns_label(value: &str) -> String {
    const MAX_LABEL_LEN: usize = 63;
    const FALLBACK_LABEL: &str = "fileferry-device";

    // Only ASCII is ever pushed, so byte length equals character count.
    let mut label = String::with_capacity(value.len().min(MAX_LABEL_LEN));
    for ch in value.chars() {
        if label.len() == MAX_LABEL_LEN {
            break;
        }
        if ch.is_ascii_alphanumeric() {
            label.push(ch.to_ascii_lowercase());
        } else if !label.is_empty() && !label.ends_with('-') {
            // One separator per run, and none at the very start.
            label.push('-');
        }
    }

    let trimmed_len = label.trim_end_matches('-').len();
    label.truncate(trimmed_len);

    if label.is_empty() {
        FALLBACK_LABEL.to_string()
    } else {
        label
    }
}
1083
/// Per-file plan exchanged in the direct protocol (presumably the receiver's answer to a
/// manifest — TODO confirm against the handshake code).
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
struct DirectReceivePlan {
    /// One plan per file.
    files: Vec<DirectFilePlan>,
}
1088
/// Opening message of the direct protocol, built from the local identity.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
struct DirectProtocolHello {
    /// Highest protocol version the sender speaks.
    protocol_version: u16,
    /// Lowest protocol version the sender accepts.
    min_protocol_version: u16,
    /// Fingerprint of the sender's identity.
    client_fingerprint: String,
    /// Whether the sender has a PSK configured; defaults to `false` when absent.
    #[serde(default)]
    psk: bool,
}
1097
1098impl DirectProtocolHello {
1099 fn new(identity: &NativeIdentity, psk: Option<&str>) -> Self {
1100 Self {
1101 protocol_version: NATIVE_PROTOCOL_VERSION,
1102 min_protocol_version: MIN_NATIVE_PROTOCOL_VERSION,
1103 client_fingerprint: identity.fingerprint().to_string(),
1104 psk: psk.is_some(),
1105 }
1106 }
1107}
1108
/// Reply to a `DirectProtocolHello`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
struct DirectProtocolResponse {
    /// Whether the hello was accepted.
    accepted: bool,
    /// Protocol version carried on acceptance; `None` on rejection.
    protocol_version: Option<u16>,
    /// PSK challenge string, when PSK proof is requested; defaults to `None` when absent.
    #[serde(default)]
    psk_challenge: Option<String>,
    /// Rejection reason; `None` on acceptance.
    error: Option<String>,
}
1117
1118impl DirectProtocolResponse {
1119 fn accept(protocol_version: u16) -> Self {
1120 Self {
1121 accepted: true,
1122 protocol_version: Some(protocol_version),
1123 psk_challenge: None,
1124 error: None,
1125 }
1126 }
1127
1128 fn accept_with_psk(protocol_version: u16, challenge: impl Into<String>) -> Self {
1129 Self {
1130 accepted: true,
1131 protocol_version: Some(protocol_version),
1132 psk_challenge: Some(challenge.into()),
1133 error: None,
1134 }
1135 }
1136
1137 fn reject(error: impl Into<String>) -> Self {
1138 Self {
1139 accepted: false,
1140 protocol_version: None,
1141 psk_challenge: None,
1142 error: Some(error.into()),
1143 }
1144 }
1145}
1146
/// Message carrying a PSK proof (presumably the answer to `psk_challenge` — TODO confirm).
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
struct DirectPskProof {
    /// Proof value as text.
    proof: String,
}
1151
/// Verdict on a PSK proof.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
struct DirectPskResult {
    /// Whether the proof was accepted.
    accepted: bool,
    /// Rejection reason; `None` on acceptance.
    error: Option<String>,
}
1157
1158impl DirectPskResult {
1159 fn accept() -> Self {
1160 Self {
1161 accepted: true,
1162 error: None,
1163 }
1164 }
1165
1166 fn reject(error: impl Into<String>) -> Self {
1167 Self {
1168 accepted: false,
1169 error: Some(error.into()),
1170 }
1171 }
1172}
1173
/// Plan for one file inside a `DirectReceivePlan`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
struct DirectFilePlan {
    /// Path of the file relative to the transfer root.
    relative_path: PathBuf,
    /// Whether the file is to be received or skipped.
    action: DirectFileAction,
    /// Byte offset; presumably where the transfer of this file should start — TODO confirm.
    offset: u64,
}
1180
/// What the receiver wants done with a planned file; serialized in `snake_case`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
enum DirectFileAction {
    /// The sender should transmit the file's payload.
    Receive,
    /// The sender should not transmit the file.
    Skip,
}
1187
/// Frame written by the sender immediately before a file's raw payload bytes.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
struct DirectPayloadHeader {
    /// Manifest-relative path of the file that follows.
    relative_path: PathBuf,
    /// Offset within the file at which the payload starts.
    offset: u64,
    /// Number of payload bytes that follow this header.
    bytes: u64,
}
1194
/// Final frame written by the receiver to report the outcome of the transfer.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
struct DirectAck {
    /// `true` when the receiver completed the transfer without error.
    ok: bool,
    /// Error text when `ok` is `false`; `None` otherwise.
    error: Option<String>,
}
1200
1201pub fn validate_send_paths(paths: &[PathBuf]) -> Result<()> {
1202 if paths.is_empty() {
1203 return Err(Error::InvalidInput(
1204 "at least one file path is required".to_string(),
1205 ));
1206 }
1207
1208 for path in paths {
1209 if path.as_os_str().is_empty() {
1210 return Err(Error::InvalidInput("empty file path".to_string()));
1211 }
1212 }
1213
1214 Ok(())
1215}
1216
/// Renders `path` as an owned string, converting lossily where the path is
/// not valid Unicode.
pub fn display_path(path: &Path) -> String {
    String::from(path.to_string_lossy())
}
1220
1221pub async fn build_manifest(paths: &[PathBuf]) -> Result<TransferManifest> {
1222 validate_send_paths(paths)?;
1223
1224 let sources = collect_manifest_sources(paths)?;
1225 let mut entries = Vec::with_capacity(sources.len());
1226 let mut seen = BTreeSet::new();
1227
1228 for source in sources {
1229 if !seen.insert(source.relative_path.clone()) {
1230 return Err(Error::InvalidInput(format!(
1231 "duplicate transfer path: {}",
1232 display_path(&source.relative_path)
1233 )));
1234 }
1235
1236 entries.push(build_manifest_entry(source).await?);
1237 }
1238
1239 Ok(TransferManifest {
1240 version: MANIFEST_VERSION,
1241 session_id: Uuid::new_v4(),
1242 chunk_size: DEFAULT_CHUNK_SIZE,
1243 entries,
1244 })
1245}
1246
1247pub fn safe_destination_path(destination: &Path, relative_path: &Path) -> Result<PathBuf> {
1248 validate_relative_transfer_path(relative_path)?;
1249 Ok(destination.join(relative_path))
1250}
1251
1252pub async fn decide_resume(
1253 destination: &Path,
1254 entry: &ManifestEntry,
1255 chunk_size: u64,
1256) -> Result<ResumeDecision> {
1257 if entry.kind != ManifestEntryKind::File {
1258 return Ok(ResumeDecision::Create);
1259 }
1260
1261 let path = safe_destination_path(destination, &entry.relative_path)?;
1262 let metadata = match tokio::fs::metadata(&path).await {
1263 Ok(metadata) => metadata,
1264 Err(error) if error.kind() == std::io::ErrorKind::NotFound => {
1265 return Ok(ResumeDecision::Create);
1266 }
1267 Err(source) => {
1268 return Err(Error::IoPath { path, source });
1269 }
1270 };
1271
1272 if !metadata.is_file() {
1273 return Ok(ResumeDecision::Conflict(format!(
1274 "{} already exists and is not a regular file",
1275 display_path(&path)
1276 )));
1277 }
1278
1279 if metadata.len() == entry.size {
1280 let expected = entry
1281 .blake3
1282 .as_deref()
1283 .ok_or_else(|| Error::Protocol("file manifest entry is missing BLAKE3".to_string()))?;
1284 let actual = hash_file(&path).await?;
1285 return if actual == expected {
1286 Ok(ResumeDecision::Skip)
1287 } else {
1288 Ok(ResumeDecision::Conflict(format!(
1289 "{} already exists with different contents",
1290 display_path(&path)
1291 )))
1292 };
1293 }
1294
1295 if metadata.len() > entry.size {
1296 return Ok(ResumeDecision::Conflict(format!(
1297 "{} is larger than incoming file",
1298 display_path(&path)
1299 )));
1300 }
1301
1302 let verified_offset = verified_prefix_offset(&path, entry, metadata.len(), chunk_size).await?;
1303 if verified_offset > 0 {
1304 Ok(ResumeDecision::ResumeFrom(verified_offset))
1305 } else {
1306 Ok(ResumeDecision::Conflict(format!(
1307 "{} already exists and does not match a verified prefix",
1308 display_path(&path)
1309 )))
1310 }
1311}
1312
1313pub async fn send_direct_file(
1314 request: &SendRequest,
1315 identity: &NativeIdentity,
1316 mut events: impl FnMut(TransferEvent),
1317) -> Result<TransferSummary> {
1318 send_direct_file_with_control(request, identity, TransferControl::new(), &mut events).await
1319}
1320
1321pub async fn send_direct_file_with_control(
1322 request: &SendRequest,
1323 identity: &NativeIdentity,
1324 control: TransferControl,
1325 mut events: impl FnMut(TransferEvent),
1326) -> Result<TransferSummary> {
1327 ensure_rustls_provider();
1328
1329 control.check_cancelled()?;
1330 let manifest = build_manifest(request.paths()).await?;
1331 let sources = manifest_source_map(request.paths())?;
1332 events(TransferEvent::SessionStarted {
1333 direction: TransferDirection::Send,
1334 session_id: manifest.session_id,
1335 files_total: manifest.file_entries().count(),
1336 bytes_total: manifest.total_file_bytes(),
1337 });
1338 control.check_cancelled()?;
1339
1340 let (client_config, server_fingerprint) = client_config_capturing_server_fingerprint()?;
1341 let mut endpoint = quinn::Endpoint::client(SocketAddr::from(([0, 0, 0, 0], 0)))?;
1342 endpoint.set_default_client_config(client_config);
1343 let connection = endpoint
1344 .connect(request.peer().address(), "localhost")?
1345 .await?;
1346 if let Some(expected) = request.peer().expected_fingerprint() {
1347 let actual = captured_server_fingerprint(&server_fingerprint)?;
1348 if actual != expected {
1349 connection.close(1u32.into(), b"fingerprint mismatch");
1350 return Err(Error::PeerFingerprintMismatch {
1351 expected: expected.to_string(),
1352 actual,
1353 });
1354 }
1355 }
1356 let (mut send, mut recv) = connection.open_bi().await?;
1357
1358 send.write_all(DIRECT_MAGIC).await?;
1359 write_json_frame(
1360 &mut send,
1361 &DirectProtocolHello::new(identity, request.psk()),
1362 )
1363 .await?;
1364 let protocol: DirectProtocolResponse = read_json_frame(&mut recv).await?;
1365 if !protocol.accepted {
1366 return Err(Error::Protocol(protocol.error.unwrap_or_else(|| {
1367 "receiver rejected protocol negotiation".to_string()
1368 })));
1369 }
1370 if protocol.protocol_version != Some(NATIVE_PROTOCOL_VERSION) {
1371 return Err(Error::Protocol(format!(
1372 "receiver selected unsupported protocol version {:?}",
1373 protocol.protocol_version
1374 )));
1375 }
1376 match (request.psk(), protocol.psk_challenge.as_deref()) {
1377 (Some(psk), Some(challenge)) => {
1378 let proof = DirectPskProof {
1379 proof: psk_proof(
1380 psk,
1381 challenge,
1382 identity.fingerprint(),
1383 NATIVE_PROTOCOL_VERSION,
1384 )?,
1385 };
1386 write_json_frame(&mut send, &proof).await?;
1387 let auth: DirectPskResult = read_json_frame(&mut recv).await?;
1388 if !auth.accepted {
1389 return Err(Error::Authentication(
1390 auth.error
1391 .unwrap_or_else(|| "PSK authentication rejected".to_string()),
1392 ));
1393 }
1394 }
1395 (Some(_), None) => {
1396 return Err(Error::Authentication(
1397 "receiver is not configured for PSK mode".to_string(),
1398 ));
1399 }
1400 (None, Some(_)) => {
1401 return Err(Error::Authentication(
1402 "receiver requires PSK authentication".to_string(),
1403 ));
1404 }
1405 (None, None) => {}
1406 }
1407
1408 write_json_frame(&mut send, &manifest).await?;
1409
1410 let plan: DirectReceivePlan = read_json_frame(&mut recv).await?;
1411 let mut summaries = Vec::new();
1412 for file_plan in plan.files {
1413 let Some(entry) = manifest
1414 .file_entries()
1415 .find(|entry| entry.relative_path == file_plan.relative_path)
1416 else {
1417 return Err(Error::Protocol(format!(
1418 "receiver requested unknown file: {}",
1419 display_path(&file_plan.relative_path)
1420 )));
1421 };
1422
1423 let Some(source_path) = sources.get(&file_plan.relative_path) else {
1424 return Err(Error::Protocol(format!(
1425 "missing source for manifest file: {}",
1426 display_path(&file_plan.relative_path)
1427 )));
1428 };
1429
1430 if file_plan.action == DirectFileAction::Skip {
1431 let blake3 = entry.blake3.clone().unwrap_or_default();
1432 events(TransferEvent::FileFinished {
1433 direction: TransferDirection::Send,
1434 file_name: display_path(&entry.relative_path),
1435 bytes: entry.size,
1436 blake3: blake3.clone(),
1437 status: TransferFileStatus::Skipped,
1438 });
1439 summaries.push(TransferFileSummary {
1440 path: source_path.clone(),
1441 relative_path: entry.relative_path.clone(),
1442 bytes: entry.size,
1443 blake3,
1444 status: TransferFileStatus::Skipped,
1445 });
1446 continue;
1447 }
1448
1449 match send_payload_file(
1450 &mut send,
1451 source_path,
1452 entry,
1453 file_plan.offset,
1454 &control,
1455 &mut events,
1456 )
1457 .await
1458 {
1459 Ok(()) => {}
1460 Err(Error::TransferCancelled) => {
1461 events(TransferEvent::SessionCancelled {
1462 direction: TransferDirection::Send,
1463 session_id: manifest.session_id,
1464 });
1465 connection.close(1u32.into(), b"cancelled");
1466 return Err(Error::TransferCancelled);
1467 }
1468 Err(error) => return Err(error),
1469 }
1470 let status = if file_plan.offset > 0 {
1471 TransferFileStatus::Resumed
1472 } else {
1473 TransferFileStatus::Sent
1474 };
1475 let blake3 = entry.blake3.clone().unwrap_or_default();
1476 events(TransferEvent::FileFinished {
1477 direction: TransferDirection::Send,
1478 file_name: display_path(&entry.relative_path),
1479 bytes: entry.size,
1480 blake3: blake3.clone(),
1481 status,
1482 });
1483 summaries.push(TransferFileSummary {
1484 path: source_path.clone(),
1485 relative_path: entry.relative_path.clone(),
1486 bytes: entry.size,
1487 blake3,
1488 status,
1489 });
1490 }
1491 send.finish()?;
1492
1493 let ack: DirectAck = read_json_frame(&mut recv).await?;
1494 if !ack.ok {
1495 return Err(Error::Transfer(
1496 ack.error
1497 .unwrap_or_else(|| "receiver rejected transfer".to_string()),
1498 ));
1499 }
1500
1501 connection.close(0u32.into(), b"done");
1502 endpoint.wait_idle().await;
1503
1504 let summary = summary_from_files(&manifest, summaries)?;
1505 events(TransferEvent::SessionFinished {
1506 direction: TransferDirection::Send,
1507 files_total: summary.files.len(),
1508 bytes_total: summary.bytes,
1509 blake3: summary.blake3.clone(),
1510 });
1511 Ok(summary)
1512}
1513
1514pub async fn receive_direct_file(
1515 request: &ReceiveRequest,
1516 destination: impl AsRef<Path>,
1517 identity: &NativeIdentity,
1518 events: impl FnMut(TransferEvent),
1519) -> Result<TransferSummary> {
1520 receive_direct_file_with_control(
1521 request,
1522 destination,
1523 identity,
1524 TransferControl::new(),
1525 events,
1526 )
1527 .await
1528}
1529
1530pub async fn receive_direct_file_with_control(
1531 request: &ReceiveRequest,
1532 destination: impl AsRef<Path>,
1533 identity: &NativeIdentity,
1534 control: TransferControl,
1535 mut events: impl FnMut(TransferEvent),
1536) -> Result<TransferSummary> {
1537 ensure_rustls_provider();
1538
1539 control.check_cancelled()?;
1540 let destination = destination.as_ref();
1541 tokio::fs::create_dir_all(destination)
1542 .await
1543 .map_err(|source| Error::IoPath {
1544 path: destination.to_path_buf(),
1545 source,
1546 })?;
1547
1548 let server_config =
1549 quinn::ServerConfig::with_single_cert(identity.cert_chain(), identity.private_key())?;
1550 let endpoint = quinn::Endpoint::server(server_config, request.listen())?;
1551 let incoming = endpoint.accept().await.ok_or(Error::NoIncomingConnection)?;
1552 let connection = incoming.await?;
1553 let (mut send, mut recv) = connection.accept_bi().await?;
1554
1555 let result = receive_stream_file(
1556 destination,
1557 request.psk(),
1558 &mut recv,
1559 &mut send,
1560 &control,
1561 &mut events,
1562 )
1563 .await;
1564 let ack = match &result {
1565 Ok(_) => DirectAck {
1566 ok: true,
1567 error: None,
1568 },
1569 Err(error) => DirectAck {
1570 ok: false,
1571 error: Some(error.to_string()),
1572 },
1573 };
1574 write_json_frame(&mut send, &ack).await?;
1575 send.finish()?;
1576 let _ = tokio::time::timeout(Duration::from_secs(1), connection.closed()).await;
1577 endpoint.close(0u32.into(), b"done");
1578 endpoint.wait_idle().await;
1579
1580 result
1581}
1582
1583async fn receive_stream_file(
1584 destination: &Path,
1585 psk: Option<&str>,
1586 recv: &mut quinn::RecvStream,
1587 send: &mut quinn::SendStream,
1588 control: &TransferControl,
1589 events: &mut impl FnMut(TransferEvent),
1590) -> Result<TransferSummary> {
1591 let mut magic = [0; DIRECT_MAGIC.len()];
1592 recv.read_exact(&mut magic).await?;
1593 if &magic != DIRECT_MAGIC {
1594 return Err(Error::Protocol("bad direct-transfer magic".to_string()));
1595 }
1596
1597 let hello: DirectProtocolHello = read_json_frame(recv).await?;
1598 let protocol = negotiate_direct_protocol(&hello, psk);
1599 write_json_frame(send, &protocol).await?;
1600 if !protocol.accepted {
1601 return Err(Error::Protocol(protocol.error.unwrap_or_else(|| {
1602 "unsupported direct protocol version".to_string()
1603 })));
1604 }
1605 if let (Some(psk), Some(challenge)) = (psk, protocol.psk_challenge.as_deref()) {
1606 let proof: DirectPskProof = read_json_frame(recv).await?;
1607 let expected = psk_proof(
1608 psk,
1609 challenge,
1610 &hello.client_fingerprint,
1611 protocol.protocol_version.unwrap_or(NATIVE_PROTOCOL_VERSION),
1612 )?;
1613 if proof.proof != expected {
1614 write_json_frame(send, &DirectPskResult::reject("PSK proof mismatch")).await?;
1615 return Err(Error::Authentication("PSK proof mismatch".to_string()));
1616 }
1617 write_json_frame(send, &DirectPskResult::accept()).await?;
1618 }
1619
1620 let manifest: TransferManifest = read_json_frame(recv).await?;
1621 validate_manifest(&manifest)?;
1622 events(TransferEvent::SessionStarted {
1623 direction: TransferDirection::Receive,
1624 session_id: manifest.session_id,
1625 files_total: manifest.file_entries().count(),
1626 bytes_total: manifest.total_file_bytes(),
1627 });
1628 control.check_cancelled()?;
1629
1630 for entry in &manifest.entries {
1631 if entry.kind == ManifestEntryKind::Directory {
1632 let path = safe_destination_path(destination, &entry.relative_path)?;
1633 tokio::fs::create_dir_all(&path)
1634 .await
1635 .map_err(|source| Error::IoPath { path, source })?;
1636 }
1637 }
1638
1639 let mut plan = DirectReceivePlan { files: Vec::new() };
1640 for entry in manifest.file_entries() {
1641 let decision = decide_resume(destination, entry, manifest.chunk_size).await?;
1642 let (action, offset) = match decision {
1643 ResumeDecision::Create => (DirectFileAction::Receive, 0),
1644 ResumeDecision::Skip => (DirectFileAction::Skip, entry.size),
1645 ResumeDecision::ResumeFrom(offset) => (DirectFileAction::Receive, offset),
1646 ResumeDecision::Conflict(message) => return Err(Error::InvalidInput(message)),
1647 };
1648 plan.files.push(DirectFilePlan {
1649 relative_path: entry.relative_path.clone(),
1650 action,
1651 offset,
1652 });
1653 }
1654 write_json_frame(send, &plan).await?;
1655
1656 let mut summaries = Vec::new();
1657 for file_plan in &plan.files {
1658 let Some(entry) = manifest
1659 .file_entries()
1660 .find(|entry| entry.relative_path == file_plan.relative_path)
1661 else {
1662 return Err(Error::Protocol(format!(
1663 "planned file missing from manifest: {}",
1664 display_path(&file_plan.relative_path)
1665 )));
1666 };
1667
1668 let final_path = safe_destination_path(destination, &entry.relative_path)?;
1669 if file_plan.action == DirectFileAction::Skip {
1670 let blake3 = entry.blake3.clone().unwrap_or_default();
1671 events(TransferEvent::FileFinished {
1672 direction: TransferDirection::Receive,
1673 file_name: display_path(&entry.relative_path),
1674 bytes: entry.size,
1675 blake3: blake3.clone(),
1676 status: TransferFileStatus::Skipped,
1677 });
1678 summaries.push(TransferFileSummary {
1679 path: final_path,
1680 relative_path: entry.relative_path.clone(),
1681 bytes: entry.size,
1682 blake3,
1683 status: TransferFileStatus::Skipped,
1684 });
1685 continue;
1686 }
1687
1688 match receive_payload_file(recv, destination, entry, file_plan.offset, control, events)
1689 .await
1690 {
1691 Ok(()) => {}
1692 Err(Error::TransferCancelled) => {
1693 events(TransferEvent::SessionCancelled {
1694 direction: TransferDirection::Receive,
1695 session_id: manifest.session_id,
1696 });
1697 return Err(Error::TransferCancelled);
1698 }
1699 Err(error) => return Err(error),
1700 }
1701 let actual_hash = hash_file(&final_path).await?;
1702 let expected_hash = entry
1703 .blake3
1704 .as_deref()
1705 .ok_or_else(|| Error::Protocol("file manifest entry is missing BLAKE3".to_string()))?;
1706 if actual_hash != expected_hash {
1707 return Err(Error::Transfer(format!(
1708 "BLAKE3 hash mismatch for {}",
1709 display_path(&entry.relative_path)
1710 )));
1711 }
1712
1713 let status = if file_plan.offset > 0 {
1714 TransferFileStatus::Resumed
1715 } else {
1716 TransferFileStatus::Received
1717 };
1718 events(TransferEvent::FileFinished {
1719 direction: TransferDirection::Receive,
1720 file_name: display_path(&entry.relative_path),
1721 bytes: entry.size,
1722 blake3: actual_hash.clone(),
1723 status,
1724 });
1725 summaries.push(TransferFileSummary {
1726 path: final_path,
1727 relative_path: entry.relative_path.clone(),
1728 bytes: entry.size,
1729 blake3: actual_hash,
1730 status,
1731 });
1732 }
1733
1734 let summary = summary_from_files(&manifest, summaries)?;
1735 events(TransferEvent::SessionFinished {
1736 direction: TransferDirection::Receive,
1737 files_total: summary.files.len(),
1738 bytes_total: summary.bytes,
1739 blake3: summary.blake3.clone(),
1740 });
1741 Ok(summary)
1742}
1743
1744async fn send_payload_file(
1745 send: &mut quinn::SendStream,
1746 source_path: &Path,
1747 entry: &ManifestEntry,
1748 offset: u64,
1749 control: &TransferControl,
1750 events: &mut impl FnMut(TransferEvent),
1751) -> Result<()> {
1752 if offset > entry.size {
1753 return Err(Error::Protocol(format!(
1754 "resume offset exceeds file size for {}",
1755 display_path(&entry.relative_path)
1756 )));
1757 }
1758
1759 let header = DirectPayloadHeader {
1760 relative_path: entry.relative_path.clone(),
1761 offset,
1762 bytes: entry.size - offset,
1763 };
1764 write_json_frame(send, &header).await?;
1765 events(TransferEvent::FileStarted {
1766 direction: TransferDirection::Send,
1767 file_name: display_path(&entry.relative_path),
1768 bytes_total: entry.size,
1769 resume_offset: offset,
1770 });
1771
1772 let mut file = tokio::fs::File::open(source_path)
1773 .await
1774 .map_err(|source| Error::IoPath {
1775 path: source_path.to_path_buf(),
1776 source,
1777 })?;
1778 file.seek(SeekFrom::Start(offset))
1779 .await
1780 .map_err(|source| Error::IoPath {
1781 path: source_path.to_path_buf(),
1782 source,
1783 })?;
1784
1785 let mut buffer = vec![0; IO_BUFFER_SIZE];
1786 let mut bytes_done = offset;
1787 while bytes_done < entry.size {
1788 control.check_cancelled()?;
1789 let remaining = entry.size - bytes_done;
1790 let read_size = buffer.len().min(remaining as usize);
1791 let read = file
1792 .read(&mut buffer[..read_size])
1793 .await
1794 .map_err(|source| Error::IoPath {
1795 path: source_path.to_path_buf(),
1796 source,
1797 })?;
1798 if read == 0 {
1799 return Err(Error::Protocol(format!(
1800 "source file ended early: {}",
1801 display_path(source_path)
1802 )));
1803 }
1804 send.write_all(&buffer[..read]).await?;
1805 bytes_done += read as u64;
1806 events(TransferEvent::Progress {
1807 direction: TransferDirection::Send,
1808 file_name: display_path(&entry.relative_path),
1809 bytes_done,
1810 bytes_total: entry.size,
1811 });
1812 control.check_cancelled()?;
1813 }
1814
1815 Ok(())
1816}
1817
1818async fn receive_payload_file(
1819 recv: &mut quinn::RecvStream,
1820 destination: &Path,
1821 entry: &ManifestEntry,
1822 offset: u64,
1823 control: &TransferControl,
1824 events: &mut impl FnMut(TransferEvent),
1825) -> Result<()> {
1826 let header: DirectPayloadHeader = read_json_frame(recv).await?;
1827 if header.relative_path != entry.relative_path
1828 || header.offset != offset
1829 || header.bytes != entry.size - offset
1830 {
1831 return Err(Error::Protocol(format!(
1832 "payload header does not match receive plan for {}",
1833 display_path(&entry.relative_path)
1834 )));
1835 }
1836 events(TransferEvent::FileStarted {
1837 direction: TransferDirection::Receive,
1838 file_name: display_path(&entry.relative_path),
1839 bytes_total: entry.size,
1840 resume_offset: offset,
1841 });
1842
1843 let final_path = safe_destination_path(destination, &entry.relative_path)?;
1844 if let Some(parent) = final_path.parent() {
1845 tokio::fs::create_dir_all(parent)
1846 .await
1847 .map_err(|source| Error::IoPath {
1848 path: parent.to_path_buf(),
1849 source,
1850 })?;
1851 }
1852
1853 let write_path = if offset == 0 {
1854 temp_path_for(&final_path)
1855 } else {
1856 final_path.clone()
1857 };
1858
1859 let mut file = if offset == 0 {
1860 tokio::fs::File::create(&write_path)
1861 .await
1862 .map_err(|source| Error::IoPath {
1863 path: write_path.clone(),
1864 source,
1865 })?
1866 } else {
1867 let mut file = tokio::fs::OpenOptions::new()
1868 .write(true)
1869 .open(&write_path)
1870 .await
1871 .map_err(|source| Error::IoPath {
1872 path: write_path.clone(),
1873 source,
1874 })?;
1875 file.set_len(offset).await.map_err(|source| Error::IoPath {
1876 path: write_path.clone(),
1877 source,
1878 })?;
1879 file.seek(SeekFrom::Start(offset))
1880 .await
1881 .map_err(|source| Error::IoPath {
1882 path: write_path.clone(),
1883 source,
1884 })?;
1885 file
1886 };
1887
1888 let mut bytes_done = offset;
1889 let mut bytes_remaining = header.bytes;
1890 let mut buffer = vec![0; IO_BUFFER_SIZE];
1891 while bytes_remaining > 0 {
1892 if control.is_cancelled() {
1893 if offset == 0 {
1894 let _ = tokio::fs::remove_file(&write_path).await;
1895 }
1896 return Err(Error::TransferCancelled);
1897 }
1898 let read_size = buffer.len().min(bytes_remaining as usize);
1899 let read = match recv.read(&mut buffer[..read_size]).await {
1900 Ok(Some(read)) => read,
1901 Ok(None) => {
1902 if offset == 0 {
1903 let _ = tokio::fs::remove_file(&write_path).await;
1904 }
1905 return Err(Error::Protocol(
1906 "stream ended before file payload".to_string(),
1907 ));
1908 }
1909 Err(error) => {
1910 if offset == 0 {
1911 let _ = tokio::fs::remove_file(&write_path).await;
1912 }
1913 return Err(Error::Read(error));
1914 }
1915 };
1916
1917 file.write_all(&buffer[..read])
1918 .await
1919 .map_err(|source| Error::IoPath {
1920 path: write_path.clone(),
1921 source,
1922 })?;
1923 bytes_done += read as u64;
1924 bytes_remaining -= read as u64;
1925 events(TransferEvent::Progress {
1926 direction: TransferDirection::Receive,
1927 file_name: display_path(&entry.relative_path),
1928 bytes_done,
1929 bytes_total: entry.size,
1930 });
1931 if control.is_cancelled() {
1932 if offset == 0 {
1933 let _ = tokio::fs::remove_file(&write_path).await;
1934 }
1935 return Err(Error::TransferCancelled);
1936 }
1937 }
1938
1939 file.flush().await.map_err(|source| Error::IoPath {
1940 path: write_path.clone(),
1941 source,
1942 })?;
1943 drop(file);
1944
1945 if offset == 0 {
1946 tokio::fs::rename(&write_path, &final_path)
1947 .await
1948 .map_err(|source| Error::IoPath {
1949 path: final_path,
1950 source,
1951 })?;
1952 }
1953
1954 Ok(())
1955}
1956
1957async fn write_json_frame<T: Serialize>(send: &mut quinn::SendStream, value: &T) -> Result<()> {
1958 let bytes = serde_json::to_vec(value).map_err(|error| Error::Protocol(error.to_string()))?;
1959 if bytes.len() > MAX_FRAME_LEN {
1960 return Err(Error::Protocol("frame exceeds maximum length".to_string()));
1961 }
1962 send.write_all(&(bytes.len() as u32).to_be_bytes()).await?;
1963 send.write_all(&bytes).await?;
1964 Ok(())
1965}
1966
1967async fn read_json_frame<T: for<'de> Deserialize<'de>>(recv: &mut quinn::RecvStream) -> Result<T> {
1968 let mut len = [0; 4];
1969 recv.read_exact(&mut len).await?;
1970 let len = u32::from_be_bytes(len) as usize;
1971 if len > MAX_FRAME_LEN {
1972 return Err(Error::Protocol("frame exceeds maximum length".to_string()));
1973 }
1974
1975 let mut bytes = vec![0; len];
1976 recv.read_exact(&mut bytes).await?;
1977 serde_json::from_slice(&bytes).map_err(|error| Error::Protocol(error.to_string()))
1978}
1979
1980fn negotiate_direct_protocol(
1981 hello: &DirectProtocolHello,
1982 psk: Option<&str>,
1983) -> DirectProtocolResponse {
1984 if hello.min_protocol_version > NATIVE_PROTOCOL_VERSION {
1985 return DirectProtocolResponse::reject(format!(
1986 "peer requires protocol version {}, local max is {}",
1987 hello.min_protocol_version, NATIVE_PROTOCOL_VERSION
1988 ));
1989 }
1990
1991 if hello.protocol_version < MIN_NATIVE_PROTOCOL_VERSION {
1992 return DirectProtocolResponse::reject(format!(
1993 "peer max protocol version {} is below local minimum {}",
1994 hello.protocol_version, MIN_NATIVE_PROTOCOL_VERSION
1995 ));
1996 }
1997
1998 let selected_version = NATIVE_PROTOCOL_VERSION.min(hello.protocol_version);
1999 match (psk, hello.psk) {
2000 (Some(_), false) => DirectProtocolResponse::reject("receiver requires PSK authentication"),
2001 (Some(_), true) => {
2002 DirectProtocolResponse::accept_with_psk(selected_version, Uuid::new_v4().to_string())
2003 }
2004 (None, true) => DirectProtocolResponse::reject("receiver is not configured for PSK mode"),
2005 (None, false) => DirectProtocolResponse::accept(selected_version),
2006 }
2007}
2008
/// A file or directory discovered on disk, paired with the path it will have
/// inside the transfer.
#[derive(Debug, Clone)]
struct ManifestSource {
    /// Location of the item on the local file system.
    source_path: PathBuf,
    /// Path of the item relative to the transfer root.
    relative_path: PathBuf,
}
2014
2015fn collect_manifest_sources(paths: &[PathBuf]) -> Result<Vec<ManifestSource>> {
2016 let mut sources = Vec::new();
2017 for path in paths {
2018 let metadata = std::fs::symlink_metadata(path).map_err(|source| Error::IoPath {
2019 path: path.clone(),
2020 source,
2021 })?;
2022 if metadata.file_type().is_symlink() {
2023 return Err(Error::InvalidInput(format!(
2024 "{} is a symlink; symlink transfers are not supported",
2025 display_path(path)
2026 )));
2027 }
2028
2029 let root_name = path
2030 .file_name()
2031 .ok_or_else(|| Error::InvalidInput(format!("{} has no file name", display_path(path))))?
2032 .to_os_string();
2033 let relative_path = PathBuf::from(root_name);
2034
2035 if metadata.is_dir() {
2036 sources.push(ManifestSource {
2037 source_path: path.clone(),
2038 relative_path: relative_path.clone(),
2039 });
2040 collect_dir_sources(path, &relative_path, &mut sources)?;
2041 } else if metadata.is_file() {
2042 sources.push(ManifestSource {
2043 source_path: path.clone(),
2044 relative_path,
2045 });
2046 } else {
2047 return Err(Error::InvalidInput(format!(
2048 "{} is not a regular file or directory",
2049 display_path(path)
2050 )));
2051 }
2052 }
2053
2054 Ok(sources)
2055}
2056
2057fn collect_dir_sources(
2058 dir: &Path,
2059 relative_dir: &Path,
2060 sources: &mut Vec<ManifestSource>,
2061) -> Result<()> {
2062 for entry in std::fs::read_dir(dir).map_err(|source| Error::IoPath {
2063 path: dir.to_path_buf(),
2064 source,
2065 })? {
2066 let entry = entry.map_err(|source| Error::IoPath {
2067 path: dir.to_path_buf(),
2068 source,
2069 })?;
2070 let path = entry.path();
2071 let metadata = std::fs::symlink_metadata(&path).map_err(|source| Error::IoPath {
2072 path: path.clone(),
2073 source,
2074 })?;
2075 if metadata.file_type().is_symlink() {
2076 return Err(Error::InvalidInput(format!(
2077 "{} is a symlink; symlink transfers are not supported",
2078 display_path(&path)
2079 )));
2080 }
2081
2082 let relative_path = relative_dir.join(entry.file_name());
2083 if metadata.is_dir() {
2084 sources.push(ManifestSource {
2085 source_path: path.clone(),
2086 relative_path: relative_path.clone(),
2087 });
2088 collect_dir_sources(&path, &relative_path, sources)?;
2089 } else if metadata.is_file() {
2090 sources.push(ManifestSource {
2091 source_path: path,
2092 relative_path,
2093 });
2094 }
2095 }
2096
2097 Ok(())
2098}
2099
2100fn manifest_source_map(paths: &[PathBuf]) -> Result<BTreeMap<PathBuf, PathBuf>> {
2101 Ok(collect_manifest_sources(paths)?
2102 .into_iter()
2103 .filter_map(|source| {
2104 let metadata = std::fs::metadata(&source.source_path).ok()?;
2105 metadata
2106 .is_file()
2107 .then_some((source.relative_path, source.source_path))
2108 })
2109 .collect())
2110}
2111
2112async fn build_manifest_entry(source: ManifestSource) -> Result<ManifestEntry> {
2113 validate_relative_transfer_path(&source.relative_path)?;
2114 let metadata = tokio::fs::metadata(&source.source_path)
2115 .await
2116 .map_err(|source_error| Error::IoPath {
2117 path: source.source_path.clone(),
2118 source: source_error,
2119 })?;
2120 let unix_mode = unix_mode(&metadata);
2121 let modified_unix_ms = modified_unix_ms(&metadata);
2122
2123 if metadata.is_dir() {
2124 return Ok(ManifestEntry {
2125 relative_path: source.relative_path,
2126 kind: ManifestEntryKind::Directory,
2127 size: 0,
2128 blake3: None,
2129 chunks: Vec::new(),
2130 unix_mode,
2131 modified_unix_ms,
2132 });
2133 }
2134
2135 let (blake3, chunks) = hash_file_with_chunks(&source.source_path, DEFAULT_CHUNK_SIZE).await?;
2136 Ok(ManifestEntry {
2137 relative_path: source.relative_path,
2138 kind: ManifestEntryKind::File,
2139 size: metadata.len(),
2140 blake3: Some(blake3),
2141 chunks,
2142 unix_mode,
2143 modified_unix_ms,
2144 })
2145}
2146
2147fn validate_manifest(manifest: &TransferManifest) -> Result<()> {
2148 if manifest.version != MANIFEST_VERSION {
2149 return Err(Error::Protocol(format!(
2150 "unsupported manifest version {}",
2151 manifest.version
2152 )));
2153 }
2154 if manifest.chunk_size == 0 {
2155 return Err(Error::Protocol("manifest chunk size is zero".to_string()));
2156 }
2157
2158 let mut seen = BTreeSet::new();
2159 for entry in &manifest.entries {
2160 validate_relative_transfer_path(&entry.relative_path)?;
2161 if !seen.insert(entry.relative_path.clone()) {
2162 return Err(Error::Protocol(format!(
2163 "duplicate manifest path: {}",
2164 display_path(&entry.relative_path)
2165 )));
2166 }
2167 if entry.kind == ManifestEntryKind::File && entry.blake3.is_none() {
2168 return Err(Error::Protocol(format!(
2169 "file manifest entry is missing BLAKE3: {}",
2170 display_path(&entry.relative_path)
2171 )));
2172 }
2173 }
2174
2175 Ok(())
2176}
2177
2178fn validate_relative_transfer_path(path: &Path) -> Result<()> {
2179 if path.as_os_str().is_empty() || path.is_absolute() {
2180 return Err(Error::InvalidInput(format!(
2181 "unsafe transfer path: {}",
2182 display_path(path)
2183 )));
2184 }
2185
2186 let mut normal_components = 0usize;
2187 for component in path.components() {
2188 match component {
2189 Component::Normal(value) => {
2190 let Some(name) = value.to_str() else {
2191 return Err(Error::InvalidInput(format!(
2192 "transfer path must be UTF-8: {}",
2193 display_path(path)
2194 )));
2195 };
2196 validate_path_component(name, path)?;
2197 normal_components += 1;
2198 }
2199 Component::CurDir
2200 | Component::ParentDir
2201 | Component::RootDir
2202 | Component::Prefix(_) => {
2203 return Err(Error::InvalidInput(format!(
2204 "unsafe transfer path: {}",
2205 display_path(path)
2206 )));
2207 }
2208 }
2209 }
2210
2211 if normal_components == 0 {
2212 return Err(Error::InvalidInput(format!(
2213 "unsafe transfer path: {}",
2214 display_path(path)
2215 )));
2216 }
2217
2218 Ok(())
2219}
2220
2221fn validate_path_component(component: &str, full_path: &Path) -> Result<()> {
2222 if component.is_empty()
2223 || component.contains('\\')
2224 || component.contains('/')
2225 || component.contains(':')
2226 || component.ends_with(' ')
2227 || component.ends_with('.')
2228 || is_windows_reserved_name(component)
2229 {
2230 return Err(Error::InvalidInput(format!(
2231 "unsafe transfer path: {}",
2232 display_path(full_path)
2233 )));
2234 }
2235
2236 Ok(())
2237}
2238
/// Reports whether `component` is one of the Windows device names `CON`,
/// `PRN`, `AUX`, `NUL`, `COM1`-`COM9`, or `LPT1`-`LPT9`.
///
/// Only the text before the first `.` is compared, with trailing spaces
/// removed, and the comparison ignores ASCII case.
fn is_windows_reserved_name(component: &str) -> bool {
    const RESERVED_DEVICE_NAMES: [&str; 22] = [
        "CON", "PRN", "AUX", "NUL", "COM1", "COM2", "COM3", "COM4", "COM5", "COM6", "COM7",
        "COM8", "COM9", "LPT1", "LPT2", "LPT3", "LPT4", "LPT5", "LPT6", "LPT7", "LPT8", "LPT9",
    ];

    let before_first_dot = match component.split_once('.') {
        Some((head, _rest)) => head,
        None => component,
    };
    let stem = before_first_dot.trim_end_matches([' ', '.']);

    RESERVED_DEVICE_NAMES
        .iter()
        .any(|reserved| stem.eq_ignore_ascii_case(reserved))
}
2272
2273async fn hash_file(path: &Path) -> Result<String> {
2274 hash_file_with_chunks(path, DEFAULT_CHUNK_SIZE)
2275 .await
2276 .map(|(hash, _)| hash)
2277}
2278
2279async fn hash_file_with_chunks(path: &Path, chunk_size: u64) -> Result<(String, Vec<String>)> {
2280 let mut file = tokio::fs::File::open(path)
2281 .await
2282 .map_err(|source| Error::IoPath {
2283 path: path.to_path_buf(),
2284 source,
2285 })?;
2286 let mut hasher = blake3::Hasher::new();
2287 let mut chunk_hasher = blake3::Hasher::new();
2288 let mut chunk_bytes = 0u64;
2289 let mut chunks = Vec::new();
2290 let mut buffer = vec![0; IO_BUFFER_SIZE];
2291
2292 loop {
2293 let read = file
2294 .read(&mut buffer)
2295 .await
2296 .map_err(|source| Error::IoPath {
2297 path: path.to_path_buf(),
2298 source,
2299 })?;
2300 if read == 0 {
2301 break;
2302 }
2303 hasher.update(&buffer[..read]);
2304
2305 let mut remaining = &buffer[..read];
2306 while !remaining.is_empty() {
2307 let take = remaining.len().min((chunk_size - chunk_bytes) as usize);
2308 chunk_hasher.update(&remaining[..take]);
2309 chunk_bytes += take as u64;
2310 remaining = &remaining[take..];
2311 if chunk_bytes == chunk_size {
2312 chunks.push(chunk_hasher.finalize().to_hex().to_string());
2313 chunk_hasher = blake3::Hasher::new();
2314 chunk_bytes = 0;
2315 }
2316 }
2317 }
2318
2319 if chunk_bytes > 0 {
2320 chunks.push(chunk_hasher.finalize().to_hex().to_string());
2321 }
2322
2323 Ok((hasher.finalize().to_hex().to_string(), chunks))
2324}
2325
2326async fn verified_prefix_offset(
2327 path: &Path,
2328 entry: &ManifestEntry,
2329 existing_len: u64,
2330 chunk_size: u64,
2331) -> Result<u64> {
2332 if existing_len == 0 || entry.chunks.is_empty() {
2333 return Ok(0);
2334 }
2335
2336 let chunks_to_check = (existing_len / chunk_size).min(entry.chunks.len() as u64) as usize;
2337 if chunks_to_check == 0 {
2338 return Ok(0);
2339 }
2340
2341 let mut file = tokio::fs::File::open(path)
2342 .await
2343 .map_err(|source| Error::IoPath {
2344 path: path.to_path_buf(),
2345 source,
2346 })?;
2347 let mut buffer = vec![0; IO_BUFFER_SIZE];
2348 let mut verified_chunks = 0usize;
2349
2350 for expected in entry.chunks.iter().take(chunks_to_check) {
2351 let mut remaining = chunk_size;
2352 let mut hasher = blake3::Hasher::new();
2353 while remaining > 0 {
2354 let read_size = buffer.len().min(remaining as usize);
2355 file.read_exact(&mut buffer[..read_size])
2356 .await
2357 .map_err(|source| Error::IoPath {
2358 path: path.to_path_buf(),
2359 source,
2360 })?;
2361 hasher.update(&buffer[..read_size]);
2362 remaining -= read_size as u64;
2363 }
2364
2365 if hasher.finalize().to_hex().as_str() != expected {
2366 break;
2367 }
2368 verified_chunks += 1;
2369 }
2370
2371 Ok(verified_chunks as u64 * chunk_size)
2372}
2373
2374fn summary_from_files(
2375 manifest: &TransferManifest,
2376 files: Vec<TransferFileSummary>,
2377) -> Result<TransferSummary> {
2378 let bytes = manifest.total_file_bytes();
2379 let blake3 = if files.len() == 1 {
2380 files[0].blake3.clone()
2381 } else {
2382 manifest.digest()?
2383 };
2384 let file_name = if files.len() == 1 {
2385 display_path(&files[0].relative_path)
2386 } else {
2387 format!("{} files", files.len())
2388 };
2389 let path = files
2390 .first()
2391 .map(|file| file.path.clone())
2392 .unwrap_or_default();
2393
2394 Ok(TransferSummary {
2395 path,
2396 file_name,
2397 bytes,
2398 blake3,
2399 files,
2400 })
2401}
2402
2403fn temp_path_for(final_path: &Path) -> PathBuf {
2404 let file_name = final_path
2405 .file_name()
2406 .and_then(|name| name.to_str())
2407 .unwrap_or("transfer");
2408 final_path.with_file_name(format!(".{file_name}.{}.ferry-part", Uuid::new_v4()))
2409}
2410
2411fn modified_unix_ms(metadata: &Metadata) -> Option<i128> {
2412 system_time_unix_ms(metadata.modified().ok()?)
2413}
2414
/// Converts `time` to whole milliseconds relative to the Unix epoch.
///
/// Times before the epoch produce a negative value. Sub-millisecond parts are
/// truncated toward zero. The current implementation always returns `Some`.
fn system_time_unix_ms(time: SystemTime) -> Option<i128> {
    let millis = time.duration_since(UNIX_EPOCH).map_or_else(
        |before_epoch| -(before_epoch.duration().as_millis() as i128),
        |since_epoch| since_epoch.as_millis() as i128,
    );
    Some(millis)
}
2421
/// Returns the mode bits of `metadata`'s permissions (Unix targets).
#[cfg(unix)]
fn unix_mode(metadata: &Metadata) -> Option<u32> {
    use std::os::unix::fs::PermissionsExt as _;

    let permissions = metadata.permissions();
    Some(permissions.mode())
}

/// Non-Unix targets report no mode, so this always returns `None`.
#[cfg(not(unix))]
fn unix_mode(_metadata: &Metadata) -> Option<u32> {
    Option::None
}
2433
2434fn client_config_capturing_server_fingerprint()
2435-> Result<(quinn::ClientConfig, Arc<Mutex<Option<String>>>)> {
2436 let server_fingerprint = Arc::new(Mutex::new(None));
2437 let crypto = rustls::ClientConfig::builder()
2438 .dangerous()
2439 .with_custom_certificate_verifier(ServerFingerprintVerifier::new(
2440 server_fingerprint.clone(),
2441 ))
2442 .with_no_client_auth();
2443
2444 let config = quinn::ClientConfig::new(Arc::new(
2445 QuicClientConfig::try_from(crypto)
2446 .map_err(|error| Error::CryptoConfig(error.to_string()))?,
2447 ));
2448 Ok((config, server_fingerprint))
2449}
2450
2451fn captured_server_fingerprint(fingerprint: &Arc<Mutex<Option<String>>>) -> Result<String> {
2452 fingerprint
2453 .lock()
2454 .map_err(|_| Error::CryptoConfig("server fingerprint capture lock poisoned".to_string()))?
2455 .clone()
2456 .ok_or_else(|| {
2457 Error::CryptoConfig("server certificate fingerprint was not captured".to_string())
2458 })
2459}
2460
2461#[derive(Debug)]
2462struct ServerFingerprintVerifier {
2463 provider: Arc<CryptoProvider>,
2464 server_fingerprint: Arc<Mutex<Option<String>>>,
2465}
2466
2467impl ServerFingerprintVerifier {
2468 fn new(server_fingerprint: Arc<Mutex<Option<String>>>) -> Arc<Self> {
2469 Arc::new(Self {
2470 provider: Arc::new(rustls::crypto::ring::default_provider()),
2471 server_fingerprint,
2472 })
2473 }
2474}
2475
2476impl ServerCertVerifier for ServerFingerprintVerifier {
2477 fn verify_server_cert(
2478 &self,
2479 end_entity: &CertificateDer<'_>,
2480 _intermediates: &[CertificateDer<'_>],
2481 _server_name: &ServerName<'_>,
2482 _ocsp: &[u8],
2483 _now: UnixTime,
2484 ) -> std::result::Result<ServerCertVerified, rustls::Error> {
2485 let fingerprint = blake3::hash(end_entity.as_ref()).to_hex().to_string();
2486 *self.server_fingerprint.lock().map_err(|_| {
2487 rustls::Error::General("server fingerprint capture lock poisoned".to_string())
2488 })? = Some(fingerprint);
2489 Ok(ServerCertVerified::assertion())
2490 }
2491
2492 fn verify_tls12_signature(
2493 &self,
2494 message: &[u8],
2495 cert: &CertificateDer<'_>,
2496 dss: &DigitallySignedStruct,
2497 ) -> std::result::Result<HandshakeSignatureValid, rustls::Error> {
2498 verify_tls12_signature(
2499 message,
2500 cert,
2501 dss,
2502 &self.provider.signature_verification_algorithms,
2503 )
2504 }
2505
2506 fn verify_tls13_signature(
2507 &self,
2508 message: &[u8],
2509 cert: &CertificateDer<'_>,
2510 dss: &DigitallySignedStruct,
2511 ) -> std::result::Result<HandshakeSignatureValid, rustls::Error> {
2512 verify_tls13_signature(
2513 message,
2514 cert,
2515 dss,
2516 &self.provider.signature_verification_algorithms,
2517 )
2518 }
2519
2520 fn supported_verify_schemes(&self) -> Vec<SignatureScheme> {
2521 self.provider
2522 .signature_verification_algorithms
2523 .supported_schemes()
2524 }
2525}
2526
2527fn ensure_rustls_provider() {
2528 let _ = rustls::crypto::ring::default_provider().install_default();
2529}
2530
2531pub fn config_dir() -> Result<PathBuf> {
2532 let base_dirs = BaseDirs::new().ok_or(Error::ConfigDirUnavailable)?;
2533 Ok(base_dirs.config_dir().join(CONFIG_DIR_NAME))
2534}
2535
/// Picks a default device alias from the environment.
///
/// `HOSTNAME` is consulted first, then `COMPUTERNAME`; the first variable that
/// is set, valid Unicode, and not blank wins and is returned unchanged. A
/// blank `HOSTNAME` no longer hides a usable `COMPUTERNAME`. When neither
/// qualifies, `fileferry-device` is returned.
fn default_alias() -> String {
    ["HOSTNAME", "COMPUTERNAME"]
        .iter()
        .filter_map(|name| std::env::var(name).ok())
        .find(|value| !value.trim().is_empty())
        .unwrap_or_else(|| "fileferry-device".to_string())
}
2543
2544fn expand_home_path(path: &str) -> Result<PathBuf> {
2545 if path == "~" {
2546 return Ok(BaseDirs::new()
2547 .ok_or(Error::ConfigDirUnavailable)?
2548 .home_dir()
2549 .to_path_buf());
2550 }
2551
2552 if let Some(rest) = path.strip_prefix("~/") {
2553 return Ok(BaseDirs::new()
2554 .ok_or(Error::ConfigDirUnavailable)?
2555 .home_dir()
2556 .join(rest));
2557 }
2558
2559 Ok(PathBuf::from(path))
2560}
2561
2562fn normalized_fingerprint(fingerprint: String) -> Result<String> {
2563 let fingerprint = fingerprint.trim().to_ascii_lowercase();
2564 if fingerprint.len() != 64 || !fingerprint.chars().all(|char| char.is_ascii_hexdigit()) {
2565 return Err(Error::InvalidInput(
2566 "peer fingerprint must be a full 64-character hex BLAKE3 fingerprint".to_string(),
2567 ));
2568 }
2569
2570 Ok(fingerprint)
2571}
2572
2573fn validate_psk(psk: String) -> Result<String> {
2574 if psk.is_empty() {
2575 return Err(Error::InvalidInput("PSK must not be empty".to_string()));
2576 }
2577
2578 Ok(psk)
2579}
2580
2581fn psk_proof(
2582 psk: &str,
2583 challenge: &str,
2584 client_fingerprint: &str,
2585 protocol_version: u16,
2586) -> Result<String> {
2587 validate_psk(psk.to_string())?;
2588 let key = blake3::hash(psk.as_bytes());
2589 let mut message = Vec::new();
2590 message.extend_from_slice(PSK_PROOF_CONTEXT);
2591 message.push(0);
2592 message.extend_from_slice(challenge.as_bytes());
2593 message.push(0);
2594 message.extend_from_slice(client_fingerprint.as_bytes());
2595 message.push(0);
2596 message.extend_from_slice(protocol_version.to_string().as_bytes());
2597 Ok(blake3::keyed_hash(key.as_bytes(), &message)
2598 .to_hex()
2599 .to_string())
2600}
2601
2602fn write_toml_file<T: Serialize>(path: &Path, value: &T) -> Result<()> {
2603 let bytes =
2604 toml::to_string_pretty(value).map_err(|error| Error::ConfigSerialize(error.to_string()))?;
2605 if let Some(parent) = path.parent() {
2606 std::fs::create_dir_all(parent).map_err(|source| Error::IoPath {
2607 path: parent.to_path_buf(),
2608 source,
2609 })?;
2610 }
2611
2612 let temp_path = path.with_extension(format!("tmp-{}", Uuid::new_v4()));
2613 std::fs::write(&temp_path, bytes).map_err(|source| Error::IoPath {
2614 path: temp_path.clone(),
2615 source,
2616 })?;
2617 std::fs::rename(&temp_path, path).map_err(|source| Error::IoPath {
2618 path: path.to_path_buf(),
2619 source,
2620 })?;
2621 Ok(())
2622}
2623
2624fn write_private_file(path: &Path, bytes: &[u8]) -> Result<()> {
2625 std::fs::write(path, bytes).map_err(|source| Error::IoPath {
2626 path: path.to_path_buf(),
2627 source,
2628 })?;
2629
2630 #[cfg(unix)]
2631 {
2632 use std::os::unix::fs::PermissionsExt;
2633
2634 let mut permissions = std::fs::metadata(path)
2635 .map_err(|source| Error::IoPath {
2636 path: path.to_path_buf(),
2637 source,
2638 })?
2639 .permissions();
2640 permissions.set_mode(0o600);
2641 std::fs::set_permissions(path, permissions).map_err(|source| Error::IoPath {
2642 path: path.to_path_buf(),
2643 source,
2644 })?;
2645 }
2646
2647 Ok(())
2648}
2649
2650#[cfg(test)]
2651mod tests {
2652 use super::*;
2653 use std::sync::{Arc, Mutex};
2654
2655 #[test]
2656 fn direct_peer_parses_socket_address() {
2657 let peer = DirectPeer::parse("127.0.0.1:53318").expect("address parses");
2658
2659 assert_eq!(peer.address().to_string(), "127.0.0.1:53318");
2660 assert_eq!(peer.expected_fingerprint(), None);
2661 }
2662
2663 #[test]
2664 fn direct_peer_accepts_expected_fingerprint() {
2665 let fingerprint = "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef";
2666 let peer = DirectPeer::parse("127.0.0.1:53318")
2667 .expect("address parses")
2668 .with_expected_fingerprint(fingerprint)
2669 .expect("fingerprint parses");
2670
2671 assert_eq!(peer.expected_fingerprint(), Some(fingerprint));
2672 }
2673
2674 #[test]
2675 fn direct_peer_rejects_missing_port() {
2676 assert!(DirectPeer::parse("127.0.0.1").is_err());
2677 }
2678
2679 #[test]
2680 fn send_request_requires_at_least_one_path() {
2681 let peer = DirectPeer::parse("127.0.0.1:53318").expect("address parses");
2682
2683 assert!(SendRequest::new(peer, Vec::new()).is_err());
2684 }
2685
2686 #[test]
2687 fn psk_secret_is_redacted_in_debug_and_serialization() {
2688 let secret = PskSecret::new("do-not-print").expect("psk accepted");
2689
2690 assert_eq!(format!("{secret:?}"), "PskSecret(<redacted>)");
2691 assert_eq!(
2692 serde_json::to_string(&secret).expect("secret serializes"),
2693 r#""<redacted>""#
2694 );
2695
2696 let peer = DirectPeer::parse("127.0.0.1:53318").expect("address parses");
2697 let request = SendRequest::new(peer, vec![PathBuf::from("file.txt")])
2698 .expect("send request")
2699 .with_psk("do-not-print")
2700 .expect("psk accepted");
2701 assert!(!format!("{request:?}").contains("do-not-print"));
2702 }
2703
2704 #[test]
2705 fn validate_send_paths_rejects_empty_slice() {
2706 assert!(validate_send_paths(&[]).is_err());
2707 }
2708
2709 #[tokio::test]
2710 async fn manifest_walks_directory_and_serializes_entries() {
2711 let temp = tempfile::tempdir().expect("tempdir");
2712 let root = temp.path().join("bundle");
2713 let nested = root.join("nested");
2714 tokio::fs::create_dir_all(&nested).await.expect("dirs");
2715 tokio::fs::write(root.join("a.txt"), b"alpha")
2716 .await
2717 .expect("file a");
2718 tokio::fs::write(nested.join("b.txt"), b"beta")
2719 .await
2720 .expect("file b");
2721
2722 let manifest = build_manifest(&[root]).await.expect("manifest");
2723 let json = serde_json::to_string(&manifest).expect("manifest serializes");
2724 let decoded: TransferManifest = serde_json::from_str(&json).expect("manifest decodes");
2725
2726 assert_eq!(decoded.version, MANIFEST_VERSION);
2727 assert!(decoded.entries.iter().any(|entry| {
2728 entry.kind == ManifestEntryKind::Directory
2729 && entry.relative_path == Path::new("bundle/nested")
2730 }));
2731 assert!(decoded.entries.iter().any(|entry| {
2732 entry.kind == ManifestEntryKind::File
2733 && entry.relative_path == Path::new("bundle/nested/b.txt")
2734 && entry.size == 4
2735 && entry.blake3.is_some()
2736 }));
2737 }
2738
2739 #[test]
2740 fn safe_destination_path_rejects_unsafe_paths() {
2741 let dest = Path::new("/tmp/ferry-dest");
2742
2743 assert!(safe_destination_path(dest, Path::new("../escape.txt")).is_err());
2744 assert!(safe_destination_path(dest, Path::new("/absolute.txt")).is_err());
2745 assert!(safe_destination_path(dest, Path::new("nested/CON")).is_err());
2746 assert!(safe_destination_path(dest, Path::new("nested\\escape.txt")).is_err());
2747 assert!(safe_destination_path(dest, Path::new("nested/ok.txt")).is_ok());
2748 }
2749
2750 #[test]
2751 fn safe_destination_path_rejects_generated_escape_and_reserved_patterns() {
2752 let dest = Path::new("/tmp/ferry-dest");
2753 let unsafe_components = [
2754 "..",
2755 ".",
2756 "CON",
2757 "con.txt",
2758 "NUL",
2759 "COM1",
2760 "LPT9",
2761 "trailingspace ",
2762 "trailingdot.",
2763 "has:colon",
2764 "has\\backslash",
2765 ];
2766
2767 for component in unsafe_components {
2768 assert!(
2769 safe_destination_path(dest, Path::new(component)).is_err(),
2770 "{component} should be rejected as a top-level path"
2771 );
2772 if component != "." {
2773 assert!(
2774 safe_destination_path(dest, Path::new("safe").join(component).as_path())
2775 .is_err(),
2776 "{component} should be rejected as a nested path"
2777 );
2778 }
2779 }
2780
2781 for component in ["alpha", "alpha-1", "alpha_1", "report.final.txt"] {
2782 let path = safe_destination_path(dest, Path::new("safe").join(component).as_path())
2783 .expect("safe generated path accepted");
2784 assert!(path.starts_with(dest));
2785 }
2786 }
2787
2788 #[tokio::test]
2789 async fn resume_decision_skips_existing_matching_file() {
2790 let temp = tempfile::tempdir().expect("tempdir");
2791 let source = temp.path().join("source.txt");
2792 let dest = temp.path().join("dest");
2793 tokio::fs::create_dir_all(&dest).await.expect("dest dir");
2794 tokio::fs::write(&source, b"same contents")
2795 .await
2796 .expect("source");
2797 tokio::fs::write(dest.join("source.txt"), b"same contents")
2798 .await
2799 .expect("dest file");
2800 let manifest = build_manifest(&[source]).await.expect("manifest");
2801 let entry = manifest.file_entries().next().expect("file entry");
2802
2803 let decision = decide_resume(&dest, entry, manifest.chunk_size)
2804 .await
2805 .expect("decision");
2806
2807 assert_eq!(decision, ResumeDecision::Skip);
2808 }
2809
2810 #[tokio::test]
2811 async fn resume_decision_returns_verified_chunk_offset() {
2812 let temp = tempfile::tempdir().expect("tempdir");
2813 let source = temp.path().join("large.bin");
2814 let dest = temp.path().join("dest");
2815 tokio::fs::create_dir_all(&dest).await.expect("dest dir");
2816 let bytes = vec![42; (DEFAULT_CHUNK_SIZE as usize * 2) + 128];
2817 tokio::fs::write(&source, &bytes).await.expect("source");
2818 tokio::fs::write(
2819 dest.join("large.bin"),
2820 &bytes[..DEFAULT_CHUNK_SIZE as usize + 99],
2821 )
2822 .await
2823 .expect("partial");
2824 let manifest = build_manifest(&[source]).await.expect("manifest");
2825 let entry = manifest.file_entries().next().expect("file entry");
2826
2827 let decision = decide_resume(&dest, entry, manifest.chunk_size)
2828 .await
2829 .expect("decision");
2830
2831 assert_eq!(decision, ResumeDecision::ResumeFrom(DEFAULT_CHUNK_SIZE));
2832 }
2833
2834 #[tokio::test]
2835 async fn resume_decision_rejects_existing_file_with_mismatched_contents() {
2836 let temp = tempfile::tempdir().expect("tempdir");
2837 let source = temp.path().join("source.txt");
2838 let dest = temp.path().join("dest");
2839 tokio::fs::create_dir_all(&dest).await.expect("dest dir");
2840 tokio::fs::write(&source, b"expected contents")
2841 .await
2842 .expect("source");
2843 tokio::fs::write(dest.join("source.txt"), b"different contents")
2844 .await
2845 .expect("conflicting dest file");
2846 let manifest = build_manifest(&[source]).await.expect("manifest");
2847 let entry = manifest.file_entries().next().expect("file entry");
2848
2849 let decision = decide_resume(&dest, entry, manifest.chunk_size)
2850 .await
2851 .expect("decision");
2852
2853 assert!(matches!(decision, ResumeDecision::Conflict(_)));
2854 }
2855
2856 #[tokio::test]
2857 async fn resume_decision_rejects_existing_file_larger_than_manifest_entry() {
2858 let temp = tempfile::tempdir().expect("tempdir");
2859 let source = temp.path().join("source.txt");
2860 let dest = temp.path().join("dest");
2861 tokio::fs::create_dir_all(&dest).await.expect("dest dir");
2862 tokio::fs::write(&source, b"small").await.expect("source");
2863 tokio::fs::write(dest.join("source.txt"), b"larger destination")
2864 .await
2865 .expect("larger dest file");
2866 let manifest = build_manifest(&[source]).await.expect("manifest");
2867 let entry = manifest.file_entries().next().expect("file entry");
2868
2869 let decision = decide_resume(&dest, entry, manifest.chunk_size)
2870 .await
2871 .expect("decision");
2872
2873 assert!(matches!(decision, ResumeDecision::Conflict(_)));
2874 }
2875
2876 #[test]
2877 fn identity_is_loaded_after_generation() {
2878 let temp = tempfile::tempdir().expect("tempdir");
2879 let generated =
2880 NativeIdentity::load_or_generate_in(temp.path()).expect("identity generated");
2881 let loaded = NativeIdentity::load_or_generate_in(temp.path()).expect("identity loaded");
2882
2883 assert_eq!(generated.fingerprint(), loaded.fingerprint());
2884 }
2885
2886 #[test]
2887 fn app_config_round_trips_toml() {
2888 let temp = tempfile::tempdir().expect("tempdir");
2889 let path = temp.path().join("config.toml");
2890 let config = AppConfig {
2891 alias: "desk".to_string(),
2892 ..AppConfig::default()
2893 };
2894
2895 config.save_to_path(&path).expect("config saved");
2896 let loaded = AppConfig::load_or_default_from(&path).expect("config loaded");
2897
2898 assert_eq!(loaded.alias, "desk");
2899 assert_eq!(loaded.listen_port, 53317);
2900 assert!(loaded.trust.require_fingerprint);
2901 }
2902
2903 #[test]
2904 fn app_config_redacts_psk_for_display_output() {
2905 let config = AppConfig {
2906 trust: TrustConfig {
2907 psk: "super-secret-psk".to_string(),
2908 ..TrustConfig::default()
2909 },
2910 ..AppConfig::default()
2911 };
2912
2913 let redacted = config.to_redacted_toml_string().expect("config serializes");
2914
2915 assert!(!redacted.contains("super-secret-psk"));
2916 assert!(redacted.contains(r#"psk = "<redacted>""#));
2917 }
2918
2919 #[test]
2920 fn daemon_config_defaults_from_app_config_and_round_trips_toml() {
2921 let temp = tempfile::tempdir().expect("tempdir");
2922 let path = temp.path().join("daemon.toml");
2923 let app_config = AppConfig {
2924 quic_port: 54444,
2925 download_dir: temp.path().join("downloads").display().to_string(),
2926 ..AppConfig::default()
2927 };
2928
2929 let defaulted =
2930 DaemonConfig::load_or_default_from(&path, &app_config).expect("daemon config");
2931 assert_eq!(
2932 defaulted.listen,
2933 SocketAddr::from(([0, 0, 0, 0], app_config.quic_port))
2934 );
2935 assert_eq!(defaulted.destination, temp.path().join("downloads"));
2936
2937 let config = DaemonConfig {
2938 listen: SocketAddr::from(([127, 0, 0, 1], 53318)),
2939 destination: temp.path().join("daemon-dest"),
2940 };
2941 config.save_to_path(&path).expect("daemon config saved");
2942 let loaded =
2943 DaemonConfig::load_or_default_from(&path, &app_config).expect("daemon config loaded");
2944
2945 assert_eq!(loaded, config);
2946 }
2947
2948 #[test]
2949 fn trust_store_load_save_and_forget_round_trip() {
2950 let temp = tempfile::tempdir().expect("tempdir");
2951 let path = temp.path().join("known_peers.toml");
2952 let fingerprint = "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef";
2953 let mut store = TrustStore::default();
2954
2955 store
2956 .trust_fingerprint(fingerprint)
2957 .expect("fingerprint trusted");
2958 store.save_to_path(&path).expect("trust store saved");
2959 let mut loaded = TrustStore::load_or_default_from(&path).expect("trust store loaded");
2960
2961 assert_eq!(loaded.records().count(), 1);
2962 assert_eq!(loaded.peers[fingerprint].trust_state, TrustState::Trusted);
2963 assert!(loaded.forget(fingerprint));
2964 assert_eq!(loaded.records().count(), 0);
2965 }
2966
2967 #[test]
2968 fn trust_store_rejects_short_fingerprint_without_discovery_lookup() {
2969 let mut store = TrustStore::default();
2970
2971 assert!(store.trust_fingerprint("9a4f2c").is_err());
2972 }
2973
2974 #[test]
2975 fn transfer_events_serialize_with_stable_event_names() {
2976 let event = TransferEvent::Progress {
2977 direction: TransferDirection::Send,
2978 file_name: "bundle/a.txt".to_string(),
2979 bytes_done: 5,
2980 bytes_total: 9,
2981 };
2982
2983 let json = serde_json::to_value(&event).expect("event serializes");
2984
2985 assert_eq!(json["event"], "progress");
2986 assert_eq!(json["direction"], "send");
2987 assert_eq!(json["file_name"], "bundle/a.txt");
2988 assert_eq!(json["bytes_done"], 5);
2989 assert_eq!(json["bytes_total"], 9);
2990 }
2991
2992 #[test]
2993 fn direct_protocol_hello_has_stable_golden_json() {
2994 let hello = DirectProtocolHello {
2995 protocol_version: 1,
2996 min_protocol_version: 1,
2997 client_fingerprint: "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"
2998 .to_string(),
2999 psk: false,
3000 };
3001
3002 let json = serde_json::to_string(&hello).expect("hello serializes");
3003
3004 assert_eq!(
3005 json,
3006 r#"{"protocol_version":1,"min_protocol_version":1,"client_fingerprint":"0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef","psk":false}"#
3007 );
3008 }
3009
3010 #[test]
3011 fn direct_protocol_response_has_stable_golden_json() {
3012 let response = DirectProtocolResponse::accept(1);
3013
3014 let json = serde_json::to_string(&response).expect("response serializes");
3015
3016 assert_eq!(
3017 json,
3018 r#"{"accepted":true,"protocol_version":1,"psk_challenge":null,"error":null}"#
3019 );
3020 }
3021
3022 #[test]
3023 fn direct_protocol_decodes_pre_psk_negotiation_frames() {
3024 let hello: DirectProtocolHello = serde_json::from_str(
3025 r#"{"protocol_version":1,"min_protocol_version":1,"client_fingerprint":"0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"}"#,
3026 )
3027 .expect("old hello decodes");
3028 assert!(!hello.psk);
3029
3030 let response: DirectProtocolResponse =
3031 serde_json::from_str(r#"{"accepted":true,"protocol_version":1,"error":null}"#)
3032 .expect("old response decodes");
3033 assert_eq!(response, DirectProtocolResponse::accept(1));
3034 }
3035
3036 #[test]
3037 fn direct_protocol_negotiates_supported_overlap() {
3038 let hello = DirectProtocolHello {
3039 protocol_version: 3,
3040 min_protocol_version: 1,
3041 client_fingerprint: "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"
3042 .to_string(),
3043 psk: false,
3044 };
3045
3046 let response = negotiate_direct_protocol(&hello, None);
3047
3048 assert_eq!(response, DirectProtocolResponse::accept(1));
3049 }
3050
3051 #[test]
3052 fn direct_protocol_rejects_incompatible_versions() {
3053 let too_new = DirectProtocolHello {
3054 protocol_version: 3,
3055 min_protocol_version: 2,
3056 client_fingerprint: "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"
3057 .to_string(),
3058 psk: false,
3059 };
3060 let too_old = DirectProtocolHello {
3061 protocol_version: 0,
3062 min_protocol_version: 0,
3063 client_fingerprint: "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"
3064 .to_string(),
3065 psk: false,
3066 };
3067
3068 assert!(!negotiate_direct_protocol(&too_new, None).accepted);
3069 assert!(!negotiate_direct_protocol(&too_old, None).accepted);
3070 }
3071
3072 #[test]
3073 fn direct_protocol_requires_matching_psk_mode() {
3074 let no_psk = DirectProtocolHello {
3075 protocol_version: 1,
3076 min_protocol_version: 1,
3077 client_fingerprint: "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"
3078 .to_string(),
3079 psk: false,
3080 };
3081 let with_psk = DirectProtocolHello {
3082 psk: true,
3083 ..no_psk.clone()
3084 };
3085
3086 assert!(!negotiate_direct_protocol(&no_psk, Some("secret")).accepted);
3087 assert!(!negotiate_direct_protocol(&with_psk, None).accepted);
3088
3089 let accepted = negotiate_direct_protocol(&with_psk, Some("secret"));
3090 assert!(accepted.accepted);
3091 assert!(accepted.psk_challenge.is_some());
3092 }
3093
3094 #[test]
3095 fn transfer_manifest_has_stable_golden_json() {
3096 let manifest = TransferManifest {
3097 version: MANIFEST_VERSION,
3098 session_id: Uuid::parse_str("00000000-0000-0000-0000-000000000001")
3099 .expect("uuid parses"),
3100 chunk_size: DEFAULT_CHUNK_SIZE,
3101 entries: vec![ManifestEntry {
3102 relative_path: PathBuf::from("bundle/a.txt"),
3103 kind: ManifestEntryKind::File,
3104 size: 5,
3105 blake3: Some(
3106 "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef".to_string(),
3107 ),
3108 chunks: vec![
3109 "abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789".to_string(),
3110 ],
3111 unix_mode: Some(0o100644),
3112 modified_unix_ms: Some(1_700_000_000_000),
3113 }],
3114 };
3115
3116 let json = serde_json::to_string(&manifest).expect("manifest serializes");
3117
3118 assert_eq!(
3119 json,
3120 r#"{"version":1,"session_id":"00000000-0000-0000-0000-000000000001","chunk_size":1048576,"entries":[{"relative_path":"bundle/a.txt","kind":"file","size":5,"blake3":"0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef","chunks":["abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789"],"unix_mode":33188,"modified_unix_ms":1700000000000}]}"#
3121 );
3122 }
3123
3124 #[test]
3125 fn peer_registry_merges_observations_by_fingerprint() {
3126 let mut registry = PeerRegistry::new();
3127 let fingerprint = "abcdef1234567890";
3128 let first = PeerObservation::new(
3129 fingerprint,
3130 SocketAddr::from(([192, 168, 1, 10], 53318)),
3131 100,
3132 )
3133 .with_alias("desk")
3134 .with_hostname("desk.local")
3135 .with_transport("quic");
3136 let second = PeerObservation::new(
3137 fingerprint,
3138 SocketAddr::from(([192, 168, 1, 11], 53318)),
3139 200,
3140 )
3141 .with_alias("workstation")
3142 .with_transport("quic");
3143
3144 registry.observe(first).expect("first observation");
3145 registry.observe(second).expect("second observation");
3146 let record = registry
3147 .get_by_fingerprint(fingerprint)
3148 .expect("record by fingerprint");
3149
3150 assert_eq!(registry.records().count(), 1);
3151 assert!(record.aliases.contains("desk"));
3152 assert!(record.aliases.contains("workstation"));
3153 assert!(record.hostnames.contains("desk.local"));
3154 assert_eq!(record.addresses.len(), 2);
3155 assert_eq!(record.first_seen_unix_ms, 100);
3156 assert_eq!(record.last_seen_unix_ms, 200);
3157 }
3158
3159 #[test]
3160 fn peer_registry_looks_up_unique_fingerprint_prefix_alias_and_hostname() {
3161 let mut registry = PeerRegistry::new();
3162 registry
3163 .observe(
3164 PeerObservation::new(
3165 "abcdef1234567890",
3166 SocketAddr::from(([192, 168, 1, 10], 53318)),
3167 100,
3168 )
3169 .with_alias("desk")
3170 .with_hostname("desk.local"),
3171 )
3172 .expect("first observation");
3173 registry
3174 .observe(
3175 PeerObservation::new(
3176 "123456abcdef7890",
3177 SocketAddr::from(([192, 168, 1, 20], 53318)),
3178 100,
3179 )
3180 .with_alias("laptop"),
3181 )
3182 .expect("second observation");
3183
3184 assert_eq!(
3185 registry
3186 .lookup("abcdef")
3187 .map(|record| record.fingerprint.as_str()),
3188 Some("abcdef1234567890")
3189 );
3190 assert_eq!(
3191 registry
3192 .lookup("desk")
3193 .map(|record| record.fingerprint.as_str()),
3194 Some("abcdef1234567890")
3195 );
3196 assert_eq!(
3197 registry
3198 .lookup("desk.local")
3199 .map(|record| record.fingerprint.as_str()),
3200 Some("abcdef1234567890")
3201 );
3202 assert!(registry.lookup("missing").is_none());
3203 }
3204
3205 #[test]
3206 fn peer_registry_rejects_ambiguous_lookup_hints() {
3207 let mut registry = PeerRegistry::new();
3208 registry
3209 .observe(
3210 PeerObservation::new(
3211 "abcdef1234567890",
3212 SocketAddr::from(([192, 168, 1, 10], 53318)),
3213 100,
3214 )
3215 .with_alias("desk"),
3216 )
3217 .expect("first observation");
3218 registry
3219 .observe(
3220 PeerObservation::new(
3221 "abcdef9999999999",
3222 SocketAddr::from(([192, 168, 1, 11], 53318)),
3223 100,
3224 )
3225 .with_alias("desk"),
3226 )
3227 .expect("second observation");
3228
3229 assert!(registry.lookup("abcdef").is_none());
3230 assert!(registry.lookup("desk").is_none());
3231 assert_eq!(
3232 registry
3233 .lookup("abcdef1234567890")
3234 .map(|record| record.fingerprint.as_str()),
3235 Some("abcdef1234567890")
3236 );
3237 match registry.lookup_detail("desk") {
3238 PeerLookup::Ambiguous(records) => assert_eq!(records.len(), 2),
3239 other => panic!("expected ambiguous duplicate-alias lookup, got {other:?}"),
3240 }
3241 assert!(matches!(
3242 registry.lookup_detail("missing"),
3243 PeerLookup::Missing
3244 ));
3245 }
3246
/// Builds the service record for a sample announcement and checks the service
/// type, the instance-name prefix, and every advertised TXT property.
#[test]
fn native_announcement_builds_expected_txt_properties() {
    let fingerprint = "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef";
    let advertised = NativeAnnouncement {
        alias: String::from("Stephen MBP"),
        fingerprint: String::from(fingerprint),
        quic_port: 53318,
        listen_port: Some(53317),
    };

    let info = advertised.service_info().expect("service info");

    assert_eq!(info.get_type(), NATIVE_SERVICE_TYPE);
    // The instance name is expected to start with the lower-cased, hyphenated
    // alias followed by the first twelve fingerprint characters.
    let full_name = info.get_fullname();
    assert!(full_name.starts_with("stephen-mbp-0123456789ab."));

    // Every TXT key the announcement must carry, paired with its expected value.
    let expected_txt = [
        ("pv", "1"),
        ("minpv", "1"),
        ("fp", fingerprint),
        ("alias", "Stephen MBP"),
        ("transports", "quic"),
        ("quic_port", "53318"),
        ("listen_port", "53317"),
    ];
    for &(key, value) in &expected_txt {
        assert_eq!(info.get_property_val_str(key), Some(value));
    }
}
3276
/// Feeds one resolved native service into a fresh registry and checks that the
/// peer can be looked up by alias with the advertised fingerprint and QUIC
/// address.
#[test]
fn resolved_native_service_merges_into_registry() {
    let fingerprint = "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef";
    let txt = [
        ("pv", "1"),
        ("minpv", "1"),
        ("fp", fingerprint),
        ("alias", "desk"),
        ("transports", "quic"),
        ("quic_port", "53318"),
    ];
    let info = ServiceInfo::new(
        NATIVE_SERVICE_TYPE,
        "desk-0123456789ab",
        "desk.local.",
        "192.168.1.42",
        53318,
        &txt[..],
    )
    .expect("service info");
    let resolved = info.as_resolved_service();

    let mut peers = PeerRegistry::new();
    observe_resolved_service(&mut peers, &resolved).expect("observation");

    let found = peers.lookup("desk").expect("record by alias");
    assert_eq!(found.fingerprint, fingerprint);

    let expected_addr = SocketAddr::from(([192, 168, 1, 42], 53318));
    assert_eq!(found.preferred_quic_address(), Some(expected_addr));
}
3314
/// A service advertising protocol versions 2 through 3 (`minpv`/`pv`) must be
/// accepted without error yet leave the registry empty.
#[test]
fn resolved_native_service_ignores_incompatible_protocol_ranges() {
    let txt = [
        ("pv", "3"),
        ("minpv", "2"),
        (
            "fp",
            "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef",
        ),
        ("alias", "desk"),
        ("transports", "quic"),
        ("quic_port", "53318"),
    ];
    let info = ServiceInfo::new(
        NATIVE_SERVICE_TYPE,
        "desk-0123456789ab",
        "desk.local.",
        "192.168.1.42",
        53318,
        &txt[..],
    )
    .expect("service info");
    let resolved = info.as_resolved_service();

    let mut peers = PeerRegistry::new();
    // Observing an incompatible peer is not an error; it is simply skipped.
    observe_resolved_service(&mut peers, &resolved).expect("observation");

    let recorded = peers.records().count();
    assert_eq!(recorded, 0);
}
3344
3345 #[tokio::test]
3346 async fn direct_quic_loopback_sends_one_file() {
3347 let source_dir = tempfile::tempdir().expect("source tempdir");
3348 let dest_dir = tempfile::tempdir().expect("dest tempdir");
3349 let identity_dir = tempfile::tempdir().expect("identity tempdir");
3350 let file_path = source_dir.path().join("hello.txt");
3351 tokio::fs::write(&file_path, b"hello from ferry")
3352 .await
3353 .expect("source file written");
3354
3355 let identity =
3356 NativeIdentity::load_or_generate_in(identity_dir.path()).expect("identity generated");
3357 let recv_identity = identity.clone();
3358 let reserved_socket =
3359 std::net::UdpSocket::bind(SocketAddr::from(([127, 0, 0, 1], 0))).expect("udp bind");
3360 let recv_addr = reserved_socket.local_addr().expect("local addr");
3361 drop(reserved_socket);
3362 let receive_progress = Arc::new(Mutex::new(Vec::new()));
3363 let receive_progress_log = receive_progress.clone();
3364 let dest_path = dest_dir.path().to_path_buf();
3365 let server = tokio::spawn(async move {
3366 receive_direct_file(
3367 &ReceiveRequest::new(recv_addr),
3368 dest_path,
3369 &recv_identity,
3370 |event| {
3371 receive_progress_log
3372 .lock()
3373 .expect("progress lock")
3374 .push(event);
3375 },
3376 )
3377 .await
3378 });
3379
3380 tokio::time::sleep(Duration::from_millis(100)).await;
3381
3382 let peer = DirectPeer::parse(&recv_addr.to_string()).expect("peer parses");
3383 let send_request = SendRequest::new(peer, vec![file_path]).expect("send request is valid");
3384 let send_progress = Arc::new(Mutex::new(Vec::new()));
3385 let send_progress_log = send_progress.clone();
3386 let send_summary = send_direct_file(&send_request, &identity, |event| {
3387 send_progress_log.lock().expect("progress lock").push(event);
3388 })
3389 .await
3390 .expect("file sent");
3391 let receive_summary = server.await.expect("server joined").expect("file received");
3392
3393 let received = tokio::fs::read(dest_dir.path().join("hello.txt"))
3394 .await
3395 .expect("received file readable");
3396 assert_eq!(received, b"hello from ferry");
3397 assert_eq!(send_summary.bytes, receive_summary.bytes);
3398 assert_eq!(send_summary.blake3, receive_summary.blake3);
3399 let send_progress = send_progress.lock().expect("progress lock");
3400 let receive_progress = receive_progress.lock().expect("progress lock");
3401 assert!(matches!(
3402 send_progress.first(),
3403 Some(TransferEvent::SessionStarted {
3404 direction: TransferDirection::Send,
3405 ..
3406 })
3407 ));
3408 assert!(
3409 send_progress
3410 .iter()
3411 .any(|event| matches!(event, TransferEvent::Progress { .. }))
3412 );
3413 assert!(
3414 send_progress
3415 .iter()
3416 .any(|event| matches!(event, TransferEvent::SessionFinished { .. }))
3417 );
3418 assert!(matches!(
3419 receive_progress.first(),
3420 Some(TransferEvent::SessionStarted {
3421 direction: TransferDirection::Receive,
3422 ..
3423 })
3424 ));
3425 assert!(
3426 receive_progress
3427 .iter()
3428 .any(|event| matches!(event, TransferEvent::Progress { .. }))
3429 );
3430 assert!(
3431 receive_progress
3432 .iter()
3433 .any(|event| matches!(event, TransferEvent::SessionFinished { .. }))
3434 );
3435 }
3436
3437 #[tokio::test]
3438 async fn direct_transfer_accepts_expected_receiver_fingerprint() {
3439 let source_dir = tempfile::tempdir().expect("source tempdir");
3440 let dest_dir = tempfile::tempdir().expect("dest tempdir");
3441 let sender_identity_dir = tempfile::tempdir().expect("sender identity tempdir");
3442 let receiver_identity_dir = tempfile::tempdir().expect("receiver identity tempdir");
3443 let file_path = source_dir.path().join("hello.txt");
3444 tokio::fs::write(&file_path, b"hello from ferry")
3445 .await
3446 .expect("source file written");
3447
3448 let sender_identity = NativeIdentity::load_or_generate_in(sender_identity_dir.path())
3449 .expect("sender identity");
3450 let receiver_identity = NativeIdentity::load_or_generate_in(receiver_identity_dir.path())
3451 .expect("receiver identity");
3452 let expected_fingerprint = receiver_identity.fingerprint().to_string();
3453 let reserved_socket =
3454 std::net::UdpSocket::bind(SocketAddr::from(([127, 0, 0, 1], 0))).expect("udp bind");
3455 let recv_addr = reserved_socket.local_addr().expect("local addr");
3456 drop(reserved_socket);
3457 let dest_path = dest_dir.path().to_path_buf();
3458 let server = tokio::spawn(async move {
3459 receive_direct_file(
3460 &ReceiveRequest::new(recv_addr),
3461 dest_path,
3462 &receiver_identity,
3463 |_| {},
3464 )
3465 .await
3466 });
3467
3468 tokio::time::sleep(Duration::from_millis(100)).await;
3469
3470 let peer = DirectPeer::from_address(recv_addr)
3471 .with_expected_fingerprint(expected_fingerprint)
3472 .expect("expected fingerprint accepted");
3473 let send_request = SendRequest::new(peer, vec![file_path]).expect("send request is valid");
3474 let send_summary = send_direct_file(&send_request, &sender_identity, |_| {})
3475 .await
3476 .expect("file sent");
3477 let receive_summary = server.await.expect("server joined").expect("file received");
3478
3479 let received = tokio::fs::read(dest_dir.path().join("hello.txt"))
3480 .await
3481 .expect("received file readable");
3482 assert_eq!(received, b"hello from ferry");
3483 assert_eq!(send_summary.bytes, receive_summary.bytes);
3484 assert_eq!(send_summary.blake3, receive_summary.blake3);
3485 }
3486
3487 #[tokio::test]
3488 async fn direct_transfer_rejects_unexpected_receiver_fingerprint_before_payload() {
3489 let source_dir = tempfile::tempdir().expect("source tempdir");
3490 let dest_dir = tempfile::tempdir().expect("dest tempdir");
3491 let sender_identity_dir = tempfile::tempdir().expect("sender identity tempdir");
3492 let receiver_identity_dir = tempfile::tempdir().expect("receiver identity tempdir");
3493 let other_identity_dir = tempfile::tempdir().expect("other identity tempdir");
3494 let file_path = source_dir.path().join("hello.txt");
3495 tokio::fs::write(&file_path, b"hello from ferry")
3496 .await
3497 .expect("source file written");
3498
3499 let sender_identity = NativeIdentity::load_or_generate_in(sender_identity_dir.path())
3500 .expect("sender identity");
3501 let receiver_identity = NativeIdentity::load_or_generate_in(receiver_identity_dir.path())
3502 .expect("receiver identity");
3503 let other_identity =
3504 NativeIdentity::load_or_generate_in(other_identity_dir.path()).expect("other identity");
3505 let reserved_socket =
3506 std::net::UdpSocket::bind(SocketAddr::from(([127, 0, 0, 1], 0))).expect("udp bind");
3507 let recv_addr = reserved_socket.local_addr().expect("local addr");
3508 drop(reserved_socket);
3509 let dest_path = dest_dir.path().to_path_buf();
3510 let server = tokio::spawn(async move {
3511 receive_direct_file(
3512 &ReceiveRequest::new(recv_addr),
3513 dest_path,
3514 &receiver_identity,
3515 |_| {},
3516 )
3517 .await
3518 });
3519
3520 tokio::time::sleep(Duration::from_millis(100)).await;
3521
3522 let peer = DirectPeer::from_address(recv_addr)
3523 .with_expected_fingerprint(other_identity.fingerprint().to_string())
3524 .expect("expected fingerprint accepted");
3525 let send_request = SendRequest::new(peer, vec![file_path]).expect("send request is valid");
3526 let error = send_direct_file(&send_request, &sender_identity, |_| {})
3527 .await
3528 .expect_err("fingerprint mismatch should reject send");
3529
3530 assert!(matches!(error, Error::PeerFingerprintMismatch { .. }));
3531 let server_error = server
3532 .await
3533 .expect("server joined")
3534 .expect_err("server closed");
3535 assert!(matches!(
3536 server_error,
3537 Error::Connection(_) | Error::NoIncomingConnection
3538 ));
3539 assert!(
3540 !tokio::fs::try_exists(dest_dir.path().join("hello.txt"))
3541 .await
3542 .expect("destination checked")
3543 );
3544 }
3545
3546 #[tokio::test]
3547 async fn direct_transfer_accepts_matching_psk_before_payload() {
3548 let source_dir = tempfile::tempdir().expect("source tempdir");
3549 let dest_dir = tempfile::tempdir().expect("dest tempdir");
3550 let identity_dir = tempfile::tempdir().expect("identity tempdir");
3551 let file_path = source_dir.path().join("hello.txt");
3552 tokio::fs::write(&file_path, b"hello from ferry")
3553 .await
3554 .expect("source file written");
3555
3556 let identity =
3557 NativeIdentity::load_or_generate_in(identity_dir.path()).expect("identity generated");
3558 let recv_identity = identity.clone();
3559 let reserved_socket =
3560 std::net::UdpSocket::bind(SocketAddr::from(([127, 0, 0, 1], 0))).expect("udp bind");
3561 let recv_addr = reserved_socket.local_addr().expect("local addr");
3562 drop(reserved_socket);
3563 let dest_path = dest_dir.path().to_path_buf();
3564 let receive_request = ReceiveRequest::new(recv_addr)
3565 .with_psk("correct horse battery staple")
3566 .expect("receiver psk accepted");
3567 let server = tokio::spawn(async move {
3568 receive_direct_file(&receive_request, dest_path, &recv_identity, |_| {}).await
3569 });
3570
3571 tokio::time::sleep(Duration::from_millis(100)).await;
3572
3573 let peer = DirectPeer::parse(&recv_addr.to_string()).expect("peer parses");
3574 let send_request = SendRequest::new(peer, vec![file_path])
3575 .expect("send request is valid")
3576 .with_psk("correct horse battery staple")
3577 .expect("sender psk accepted");
3578 let send_summary = send_direct_file(&send_request, &identity, |_| {})
3579 .await
3580 .expect("file sent");
3581 let receive_summary = server.await.expect("server joined").expect("file received");
3582
3583 let received = tokio::fs::read(dest_dir.path().join("hello.txt"))
3584 .await
3585 .expect("received file readable");
3586 assert_eq!(received, b"hello from ferry");
3587 assert_eq!(send_summary.bytes, receive_summary.bytes);
3588 }
3589
3590 #[tokio::test]
3591 async fn direct_transfer_rejects_missing_psk_before_payload() {
3592 let source_dir = tempfile::tempdir().expect("source tempdir");
3593 let dest_dir = tempfile::tempdir().expect("dest tempdir");
3594 let identity_dir = tempfile::tempdir().expect("identity tempdir");
3595 let file_path = source_dir.path().join("hello.txt");
3596 tokio::fs::write(&file_path, b"hello from ferry")
3597 .await
3598 .expect("source file written");
3599
3600 let identity =
3601 NativeIdentity::load_or_generate_in(identity_dir.path()).expect("identity generated");
3602 let recv_identity = identity.clone();
3603 let reserved_socket =
3604 std::net::UdpSocket::bind(SocketAddr::from(([127, 0, 0, 1], 0))).expect("udp bind");
3605 let recv_addr = reserved_socket.local_addr().expect("local addr");
3606 drop(reserved_socket);
3607 let dest_path = dest_dir.path().to_path_buf();
3608 let receive_request = ReceiveRequest::new(recv_addr)
3609 .with_psk("receiver secret")
3610 .expect("receiver psk accepted");
3611 let server = tokio::spawn(async move {
3612 receive_direct_file(&receive_request, dest_path, &recv_identity, |_| {}).await
3613 });
3614
3615 tokio::time::sleep(Duration::from_millis(100)).await;
3616
3617 let peer = DirectPeer::parse(&recv_addr.to_string()).expect("peer parses");
3618 let send_request = SendRequest::new(peer, vec![file_path]).expect("send request is valid");
3619 let error = send_direct_file(&send_request, &identity, |_| {})
3620 .await
3621 .expect_err("missing psk should reject send");
3622
3623 assert!(matches!(error, Error::Protocol(_)));
3624 let _ = server.await.expect("server joined");
3625 assert!(
3626 !tokio::fs::try_exists(dest_dir.path().join("hello.txt"))
3627 .await
3628 .expect("destination checked")
3629 );
3630 }
3631
3632 #[tokio::test]
3633 async fn direct_transfer_rejects_wrong_psk_before_payload() {
3634 let source_dir = tempfile::tempdir().expect("source tempdir");
3635 let dest_dir = tempfile::tempdir().expect("dest tempdir");
3636 let identity_dir = tempfile::tempdir().expect("identity tempdir");
3637 let file_path = source_dir.path().join("hello.txt");
3638 tokio::fs::write(&file_path, b"hello from ferry")
3639 .await
3640 .expect("source file written");
3641
3642 let identity =
3643 NativeIdentity::load_or_generate_in(identity_dir.path()).expect("identity generated");
3644 let recv_identity = identity.clone();
3645 let reserved_socket =
3646 std::net::UdpSocket::bind(SocketAddr::from(([127, 0, 0, 1], 0))).expect("udp bind");
3647 let recv_addr = reserved_socket.local_addr().expect("local addr");
3648 drop(reserved_socket);
3649 let dest_path = dest_dir.path().to_path_buf();
3650 let receive_request = ReceiveRequest::new(recv_addr)
3651 .with_psk("receiver secret")
3652 .expect("receiver psk accepted");
3653 let server = tokio::spawn(async move {
3654 receive_direct_file(&receive_request, dest_path, &recv_identity, |_| {}).await
3655 });
3656
3657 tokio::time::sleep(Duration::from_millis(100)).await;
3658
3659 let peer = DirectPeer::parse(&recv_addr.to_string()).expect("peer parses");
3660 let send_request = SendRequest::new(peer, vec![file_path])
3661 .expect("send request is valid")
3662 .with_psk("wrong secret")
3663 .expect("sender psk accepted");
3664 let error = send_direct_file(&send_request, &identity, |_| {})
3665 .await
3666 .expect_err("wrong psk should reject send");
3667
3668 assert!(matches!(error, Error::Authentication(_)));
3669 assert!(!error.to_string().contains("wrong secret"));
3670 assert!(!error.to_string().contains("receiver secret"));
3671 let server_error = server
3672 .await
3673 .expect("server joined")
3674 .expect_err("server should reject wrong psk");
3675 assert!(matches!(
3676 server_error,
3677 Error::Authentication(_) | Error::Write(_)
3678 ));
3679 assert!(
3680 !tokio::fs::try_exists(dest_dir.path().join("hello.txt"))
3681 .await
3682 .expect("destination checked")
3683 );
3684 }
3685
3686 #[tokio::test]
3687 async fn direct_quic_loopback_sends_directory_and_resumes_partial_file() {
3688 let source_dir = tempfile::tempdir().expect("source tempdir");
3689 let dest_dir = tempfile::tempdir().expect("dest tempdir");
3690 let identity_dir = tempfile::tempdir().expect("identity tempdir");
3691 let bundle = source_dir.path().join("bundle");
3692 let nested = bundle.join("nested");
3693 tokio::fs::create_dir_all(&nested)
3694 .await
3695 .expect("source dirs");
3696 tokio::fs::write(nested.join("small.txt"), b"small")
3697 .await
3698 .expect("small file");
3699 let large_bytes = vec![7; (DEFAULT_CHUNK_SIZE as usize * 2) + 17];
3700 tokio::fs::write(bundle.join("large.bin"), &large_bytes)
3701 .await
3702 .expect("large file");
3703
3704 let dest_bundle = dest_dir.path().join("bundle");
3705 tokio::fs::create_dir_all(&dest_bundle)
3706 .await
3707 .expect("dest bundle");
3708 tokio::fs::write(
3709 dest_bundle.join("large.bin"),
3710 &large_bytes[..DEFAULT_CHUNK_SIZE as usize + 5],
3711 )
3712 .await
3713 .expect("partial large");
3714
3715 let identity =
3716 NativeIdentity::load_or_generate_in(identity_dir.path()).expect("identity generated");
3717 let recv_identity = identity.clone();
3718 let reserved_socket =
3719 std::net::UdpSocket::bind(SocketAddr::from(([127, 0, 0, 1], 0))).expect("udp bind");
3720 let recv_addr = reserved_socket.local_addr().expect("local addr");
3721 drop(reserved_socket);
3722 let dest_path = dest_dir.path().to_path_buf();
3723 let server = tokio::spawn(async move {
3724 receive_direct_file(
3725 &ReceiveRequest::new(recv_addr),
3726 dest_path,
3727 &recv_identity,
3728 |_| {},
3729 )
3730 .await
3731 });
3732
3733 tokio::time::sleep(Duration::from_millis(100)).await;
3734
3735 let peer = DirectPeer::parse(&recv_addr.to_string()).expect("peer parses");
3736 let send_request = SendRequest::new(peer, vec![bundle]).expect("send request is valid");
3737 let send_summary = send_direct_file(&send_request, &identity, |_| {})
3738 .await
3739 .expect("directory sent");
3740 let receive_summary = server
3741 .await
3742 .expect("server joined")
3743 .expect("directory received");
3744
3745 let received_large = tokio::fs::read(dest_bundle.join("large.bin"))
3746 .await
3747 .expect("large file readable");
3748 let received_small = tokio::fs::read(dest_bundle.join("nested/small.txt"))
3749 .await
3750 .expect("small file readable");
3751 assert_eq!(received_large, large_bytes);
3752 assert_eq!(received_small, b"small");
3753 assert_eq!(send_summary.bytes, receive_summary.bytes);
3754 assert!(
3755 receive_summary
3756 .files
3757 .iter()
3758 .any(|file| file.status == TransferFileStatus::Resumed)
3759 );
3760 }
3761
3762 #[tokio::test]
3763 async fn direct_quic_loopback_cancels_send_and_does_not_finalize_partial_file() {
3764 let source_dir = tempfile::tempdir().expect("source tempdir");
3765 let dest_dir = tempfile::tempdir().expect("dest tempdir");
3766 let identity_dir = tempfile::tempdir().expect("identity tempdir");
3767 let file_path = source_dir.path().join("payload.bin");
3768 tokio::fs::write(&file_path, vec![3; IO_BUFFER_SIZE * 64])
3769 .await
3770 .expect("source file written");
3771
3772 let identity =
3773 NativeIdentity::load_or_generate_in(identity_dir.path()).expect("identity generated");
3774 let recv_identity = identity.clone();
3775 let reserved_socket =
3776 std::net::UdpSocket::bind(SocketAddr::from(([127, 0, 0, 1], 0))).expect("udp bind");
3777 let recv_addr = reserved_socket.local_addr().expect("local addr");
3778 drop(reserved_socket);
3779
3780 let dest_path = dest_dir.path().to_path_buf();
3781 let server = tokio::spawn(async move {
3782 receive_direct_file(
3783 &ReceiveRequest::new(recv_addr),
3784 dest_path,
3785 &recv_identity,
3786 |_| {},
3787 )
3788 .await
3789 });
3790
3791 tokio::time::sleep(Duration::from_millis(100)).await;
3792
3793 let peer = DirectPeer::parse(&recv_addr.to_string()).expect("peer parses");
3794 let send_request =
3795 SendRequest::new(peer, vec![file_path.clone()]).expect("send request is valid");
3796 let send_control = TransferControl::new();
3797 let cancel_on_progress = send_control.clone();
3798 let send_events = Arc::new(Mutex::new(Vec::new()));
3799 let send_events_log = send_events.clone();
3800 let send_result =
3801 send_direct_file_with_control(&send_request, &identity, send_control, |event| {
3802 if matches!(event, TransferEvent::Progress { .. }) {
3803 cancel_on_progress.cancel();
3804 }
3805 send_events_log.lock().expect("events lock").push(event);
3806 })
3807 .await;
3808
3809 assert!(matches!(send_result, Err(Error::TransferCancelled)));
3810
3811 let receive_result = tokio::time::timeout(Duration::from_secs(5), server)
3812 .await
3813 .expect("receiver should finish after sender cancellation")
3814 .expect("server joined");
3815 assert!(receive_result.is_err());
3816
3817 assert!(!dest_dir.path().join("payload.bin").exists());
3818 assert!(
3819 send_events
3820 .lock()
3821 .expect("events lock")
3822 .iter()
3823 .any(|event| matches!(event, TransferEvent::SessionCancelled { .. }))
3824 );
3825 }
3826}