1pub mod error;
2
3use std::collections::{BTreeMap, BTreeSet};
4use std::fs::Metadata;
5use std::io::SeekFrom;
6use std::net::SocketAddr;
7use std::path::{Component, Path, PathBuf};
8use std::sync::atomic::{AtomicBool, Ordering};
9use std::sync::{Arc, Mutex};
10use std::time::{Duration, SystemTime, UNIX_EPOCH};
11
12use directories::BaseDirs;
13use mdns_sd::{ResolvedService, ServiceDaemon, ServiceEvent, ServiceInfo};
14use quinn::crypto::rustls::QuicClientConfig;
15use rcgen::{CertifiedKey, generate_simple_self_signed};
16use rustls::client::danger::{HandshakeSignatureValid, ServerCertVerified, ServerCertVerifier};
17use rustls::crypto::{CryptoProvider, verify_tls12_signature, verify_tls13_signature};
18use rustls::pki_types::{CertificateDer, PrivateKeyDer, PrivatePkcs8KeyDer, ServerName, UnixTime};
19use rustls::{DigitallySignedStruct, SignatureScheme};
20use serde::{Deserialize, Serialize};
21use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt};
22use uuid::Uuid;
23
24pub use error::{Error, Result};
25
26pub const NATIVE_PROTOCOL_VERSION: u16 = 1;
27pub const MIN_NATIVE_PROTOCOL_VERSION: u16 = 1;
28pub const NATIVE_SERVICE_TYPE: &str = "_ferry._udp.local.";
29
30const DIRECT_MAGIC: &[u8; 8] = b"FERRY01\n";
31const MAX_FRAME_LEN: usize = 16 * 1024 * 1024;
32const IO_BUFFER_SIZE: usize = 64 * 1024;
33const MANIFEST_VERSION: u16 = 1;
34const DEFAULT_CHUNK_SIZE: u64 = 1024 * 1024;
35const CONFIG_DIR_NAME: &str = "ferry";
36const PSK_PROOF_CONTEXT: &[u8] = b"fileferry-direct-psk-v1";
37
38#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
39pub struct DirectPeer {
40 address: SocketAddr,
41 expected_fingerprint: Option<String>,
42}
43
44impl DirectPeer {
45 pub fn parse(value: &str) -> Result<Self> {
46 Ok(Self {
47 address: value.parse()?,
48 expected_fingerprint: None,
49 })
50 }
51
52 pub const fn address(&self) -> SocketAddr {
53 self.address
54 }
55
56 pub const fn from_address(address: SocketAddr) -> Self {
57 Self {
58 address,
59 expected_fingerprint: None,
60 }
61 }
62
63 pub fn with_expected_fingerprint(mut self, fingerprint: impl Into<String>) -> Result<Self> {
64 self.expected_fingerprint = Some(normalized_fingerprint(fingerprint.into())?);
65 Ok(self)
66 }
67
68 pub fn expected_fingerprint(&self) -> Option<&str> {
69 self.expected_fingerprint.as_deref()
70 }
71}
72
73#[derive(Clone, PartialEq, Eq, Deserialize)]
74pub struct PskSecret(String);
75
76impl PskSecret {
77 pub fn new(psk: impl Into<String>) -> Result<Self> {
78 validate_psk(psk.into()).map(Self)
79 }
80
81 fn expose_secret(&self) -> &str {
82 &self.0
83 }
84}
85
86impl std::fmt::Debug for PskSecret {
87 fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
88 formatter.write_str("PskSecret(<redacted>)")
89 }
90}
91
92impl Serialize for PskSecret {
93 fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
94 where
95 S: serde::Serializer,
96 {
97 serializer.serialize_str("<redacted>")
98 }
99}
100
101#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
102pub struct SendRequest {
103 peer: DirectPeer,
104 paths: Vec<PathBuf>,
105 #[serde(default, skip_serializing)]
106 psk: Option<PskSecret>,
107}
108
109impl SendRequest {
110 pub fn new(peer: DirectPeer, paths: Vec<PathBuf>) -> Result<Self> {
111 if paths.is_empty() {
112 return Err(Error::InvalidInput(
113 "at least one file path is required".to_string(),
114 ));
115 }
116
117 Ok(Self {
118 peer,
119 paths,
120 psk: None,
121 })
122 }
123
124 pub const fn peer(&self) -> &DirectPeer {
125 &self.peer
126 }
127
128 pub fn paths(&self) -> &[PathBuf] {
129 &self.paths
130 }
131
132 pub fn with_psk(mut self, psk: impl Into<String>) -> Result<Self> {
133 self.psk = Some(PskSecret::new(psk)?);
134 Ok(self)
135 }
136
137 pub fn psk(&self) -> Option<&str> {
138 self.psk.as_ref().map(PskSecret::expose_secret)
139 }
140}
141
142#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
143pub struct ReceiveRequest {
144 listen: SocketAddr,
145 #[serde(default, skip_serializing)]
146 psk: Option<PskSecret>,
147}
148
149impl ReceiveRequest {
150 pub fn new(listen: SocketAddr) -> Self {
151 Self { listen, psk: None }
152 }
153
154 pub const fn listen(&self) -> SocketAddr {
155 self.listen
156 }
157
158 pub fn with_psk(mut self, psk: impl Into<String>) -> Result<Self> {
159 self.psk = Some(PskSecret::new(psk)?);
160 Ok(self)
161 }
162
163 pub fn psk(&self) -> Option<&str> {
164 self.psk.as_ref().map(PskSecret::expose_secret)
165 }
166}
167
168#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
169pub struct AppConfig {
170 pub alias: String,
171 pub listen_port: u16,
172 pub quic_port: u16,
173 pub download_dir: String,
174 pub auto_accept_known: bool,
175 pub auto_accept_unknown: bool,
176 pub discovery: Vec<String>,
177 pub max_concurrent_files: usize,
178 pub trust: TrustConfig,
179}
180
181impl Default for AppConfig {
182 fn default() -> Self {
183 Self {
184 alias: default_alias(),
185 listen_port: 53317,
186 quic_port: 53318,
187 download_dir: "~/Downloads/ferry".to_string(),
188 auto_accept_known: true,
189 auto_accept_unknown: false,
190 discovery: vec!["native".to_string()],
191 max_concurrent_files: 8,
192 trust: TrustConfig::default(),
193 }
194 }
195}
196
197impl AppConfig {
198 pub fn load_or_default() -> Result<Self> {
199 Self::load_or_default_from(config_dir()?.join("config.toml"))
200 }
201
202 pub fn load_or_default_from(path: impl AsRef<Path>) -> Result<Self> {
203 let path = path.as_ref();
204 match std::fs::read_to_string(path) {
205 Ok(contents) => {
206 toml::from_str(&contents).map_err(|error| Error::ConfigParse(error.to_string()))
207 }
208 Err(error) if error.kind() == std::io::ErrorKind::NotFound => Ok(Self::default()),
209 Err(source) => Err(Error::IoPath {
210 path: path.to_path_buf(),
211 source,
212 }),
213 }
214 }
215
216 pub fn save_default_path(&self) -> Result<()> {
217 self.save_to_path(config_dir()?.join("config.toml"))
218 }
219
220 pub fn save_to_path(&self, path: impl AsRef<Path>) -> Result<()> {
221 write_toml_file(path.as_ref(), self)
222 }
223
224 pub fn to_toml_string(&self) -> Result<String> {
225 toml::to_string_pretty(self).map_err(|error| Error::ConfigSerialize(error.to_string()))
226 }
227
228 pub fn redacted(&self) -> Self {
229 let mut config = self.clone();
230 config.trust = config.trust.redacted();
231 config
232 }
233
234 pub fn to_redacted_toml_string(&self) -> Result<String> {
235 self.redacted().to_toml_string()
236 }
237}
238
239#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
240pub struct DaemonConfig {
241 pub listen: SocketAddr,
242 pub destination: PathBuf,
243}
244
245impl DaemonConfig {
246 pub fn from_app_config(config: &AppConfig) -> Result<Self> {
247 Ok(Self {
248 listen: SocketAddr::from(([0, 0, 0, 0], config.quic_port)),
249 destination: expand_home_path(&config.download_dir)?,
250 })
251 }
252
253 pub fn load_or_default() -> Result<Self> {
254 let app_config = AppConfig::load_or_default()?;
255 Self::load_or_default_from(config_dir()?.join("daemon.toml"), &app_config)
256 }
257
258 pub fn load_or_default_from(path: impl AsRef<Path>, app_config: &AppConfig) -> Result<Self> {
259 let path = path.as_ref();
260 match std::fs::read_to_string(path) {
261 Ok(contents) => {
262 toml::from_str(&contents).map_err(|error| Error::ConfigParse(error.to_string()))
263 }
264 Err(error) if error.kind() == std::io::ErrorKind::NotFound => {
265 Self::from_app_config(app_config)
266 }
267 Err(source) => Err(Error::IoPath {
268 path: path.to_path_buf(),
269 source,
270 }),
271 }
272 }
273
274 pub fn save_default_path(&self) -> Result<()> {
275 self.save_to_path(config_dir()?.join("daemon.toml"))
276 }
277
278 pub fn save_to_path(&self, path: impl AsRef<Path>) -> Result<()> {
279 write_toml_file(path.as_ref(), self)
280 }
281
282 pub fn to_toml_string(&self) -> Result<String> {
283 toml::to_string_pretty(self).map_err(|error| Error::ConfigSerialize(error.to_string()))
284 }
285}
286
287#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
288pub struct TrustConfig {
289 pub require_fingerprint: bool,
290 pub psk: String,
291}
292
293impl Default for TrustConfig {
294 fn default() -> Self {
295 Self {
296 require_fingerprint: true,
297 psk: String::new(),
298 }
299 }
300}
301
302impl TrustConfig {
303 pub fn redacted(&self) -> Self {
304 let psk = if self.psk.is_empty() {
305 String::new()
306 } else {
307 "<redacted>".to_string()
308 };
309 Self {
310 require_fingerprint: self.require_fingerprint,
311 psk,
312 }
313 }
314}
315
316#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
317pub struct NativeIdentity {
318 cert_der: Vec<u8>,
319 key_der: Vec<u8>,
320 fingerprint: String,
321}
322
323impl NativeIdentity {
324 pub fn load_or_generate() -> Result<Self> {
325 Self::load_or_generate_in(config_dir()?)
326 }
327
328 pub fn load_or_generate_in(config_dir: impl AsRef<Path>) -> Result<Self> {
329 let config_dir = config_dir.as_ref();
330 let cert_path = config_dir.join("identity.cert.der");
331 let key_path = config_dir.join("identity.key.der");
332
333 if cert_path.exists() && key_path.exists() {
334 let cert_der = std::fs::read(&cert_path).map_err(|source| Error::IoPath {
335 path: cert_path,
336 source,
337 })?;
338 let key_der = std::fs::read(&key_path).map_err(|source| Error::IoPath {
339 path: key_path,
340 source,
341 })?;
342
343 return Ok(Self::from_parts(cert_der, key_der));
344 }
345
346 std::fs::create_dir_all(config_dir).map_err(|source| Error::IoPath {
347 path: config_dir.to_path_buf(),
348 source,
349 })?;
350
351 let identity = Self::generate()?;
352 write_private_file(&cert_path, &identity.cert_der)?;
353 write_private_file(&key_path, &identity.key_der)?;
354 Ok(identity)
355 }
356
357 pub fn generate() -> Result<Self> {
358 let CertifiedKey { cert, signing_key } =
359 generate_simple_self_signed(vec!["localhost".to_string()])?;
360 Ok(Self::from_parts(
361 cert.der().to_vec(),
362 signing_key.serialize_der(),
363 ))
364 }
365
366 pub fn fingerprint(&self) -> &str {
367 &self.fingerprint
368 }
369
370 fn cert_chain(&self) -> Vec<CertificateDer<'static>> {
371 vec![CertificateDer::from(self.cert_der.clone())]
372 }
373
374 fn private_key(&self) -> PrivateKeyDer<'static> {
375 PrivateKeyDer::from(PrivatePkcs8KeyDer::from(self.key_der.clone()))
376 }
377
378 fn from_parts(cert_der: Vec<u8>, key_der: Vec<u8>) -> Self {
379 let fingerprint = blake3::hash(&cert_der).to_hex().to_string();
380 Self {
381 cert_der,
382 key_der,
383 fingerprint,
384 }
385 }
386}
387
388#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
389#[serde(rename_all = "snake_case")]
390pub enum TransferDirection {
391 Send,
392 Receive,
393}
394
395#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
396pub struct TransferProgress {
397 pub direction: TransferDirection,
398 pub file_name: String,
399 pub bytes_done: u64,
400 pub bytes_total: u64,
401}
402
403#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
404#[serde(tag = "event", rename_all = "snake_case")]
405pub enum TransferEvent {
406 SessionStarted {
407 direction: TransferDirection,
408 session_id: Uuid,
409 files_total: usize,
410 bytes_total: u64,
411 },
412 FileStarted {
413 direction: TransferDirection,
414 file_name: String,
415 bytes_total: u64,
416 resume_offset: u64,
417 },
418 Progress {
419 direction: TransferDirection,
420 file_name: String,
421 bytes_done: u64,
422 bytes_total: u64,
423 },
424 FileFinished {
425 direction: TransferDirection,
426 file_name: String,
427 bytes: u64,
428 blake3: String,
429 status: TransferFileStatus,
430 },
431 SessionFinished {
432 direction: TransferDirection,
433 files_total: usize,
434 bytes_total: u64,
435 blake3: String,
436 },
437 SessionCancelled {
438 direction: TransferDirection,
439 session_id: Uuid,
440 },
441}
442
443impl TransferEvent {
444 pub fn progress(&self) -> Option<TransferProgress> {
445 match self {
446 Self::Progress {
447 direction,
448 file_name,
449 bytes_done,
450 bytes_total,
451 } => Some(TransferProgress {
452 direction: *direction,
453 file_name: file_name.clone(),
454 bytes_done: *bytes_done,
455 bytes_total: *bytes_total,
456 }),
457 _ => None,
458 }
459 }
460}
461
462#[derive(Debug, Clone, Default)]
463pub struct TransferControl {
464 cancelled: Arc<AtomicBool>,
465}
466
467impl TransferControl {
468 pub fn new() -> Self {
469 Self::default()
470 }
471
472 pub fn cancel(&self) {
473 self.cancelled.store(true, Ordering::SeqCst);
474 }
475
476 pub fn is_cancelled(&self) -> bool {
477 self.cancelled.load(Ordering::SeqCst)
478 }
479
480 fn check_cancelled(&self) -> Result<()> {
481 if self.is_cancelled() {
482 Err(Error::TransferCancelled)
483 } else {
484 Ok(())
485 }
486 }
487}
488
489#[derive(Debug, Clone, PartialEq, Eq)]
490pub struct TransferSummary {
491 pub path: PathBuf,
492 pub file_name: String,
493 pub bytes: u64,
494 pub blake3: String,
495 pub files: Vec<TransferFileSummary>,
496}
497
498#[derive(Debug, Clone, PartialEq, Eq)]
499pub struct TransferFileSummary {
500 pub path: PathBuf,
501 pub relative_path: PathBuf,
502 pub bytes: u64,
503 pub blake3: String,
504 pub status: TransferFileStatus,
505}
506
507#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
508#[serde(rename_all = "snake_case")]
509pub enum TransferFileStatus {
510 Sent,
511 Received,
512 Skipped,
513 Resumed,
514}
515
516#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
517pub struct TransferManifest {
518 pub version: u16,
519 pub session_id: Uuid,
520 pub chunk_size: u64,
521 pub entries: Vec<ManifestEntry>,
522}
523
524impl TransferManifest {
525 pub fn file_entries(&self) -> impl Iterator<Item = &ManifestEntry> {
526 self.entries
527 .iter()
528 .filter(|entry| entry.kind == ManifestEntryKind::File)
529 }
530
531 pub fn total_file_bytes(&self) -> u64 {
532 self.file_entries().map(|entry| entry.size).sum()
533 }
534
535 pub fn digest(&self) -> Result<String> {
536 let bytes = serde_json::to_vec(self).map_err(|error| Error::Protocol(error.to_string()))?;
537 Ok(blake3::hash(&bytes).to_hex().to_string())
538 }
539}
540
541#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
542pub struct ManifestEntry {
543 pub relative_path: PathBuf,
544 pub kind: ManifestEntryKind,
545 pub size: u64,
546 pub blake3: Option<String>,
547 pub chunks: Vec<String>,
548 pub unix_mode: Option<u32>,
549 pub modified_unix_ms: Option<i128>,
550}
551
552#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
553#[serde(rename_all = "snake_case")]
554pub enum ManifestEntryKind {
555 File,
556 Directory,
557}
558
559#[derive(Debug, Clone, PartialEq, Eq)]
560pub enum ResumeDecision {
561 Create,
562 Skip,
563 ResumeFrom(u64),
564 Conflict(String),
565}
566
567#[derive(Debug, Clone, PartialEq, Eq)]
568pub struct PeerObservation {
569 pub fingerprint: String,
570 pub alias: Option<String>,
571 pub hostname: Option<String>,
572 pub address: SocketAddr,
573 pub transports: BTreeSet<String>,
574 pub seen_unix_ms: i128,
575}
576
577impl PeerObservation {
578 pub fn new(fingerprint: impl Into<String>, address: SocketAddr, seen_unix_ms: i128) -> Self {
579 Self {
580 fingerprint: fingerprint.into(),
581 alias: None,
582 hostname: None,
583 address,
584 transports: BTreeSet::new(),
585 seen_unix_ms,
586 }
587 }
588
589 pub fn with_alias(mut self, alias: impl Into<String>) -> Self {
590 self.alias = Some(alias.into());
591 self
592 }
593
594 pub fn with_hostname(mut self, hostname: impl Into<String>) -> Self {
595 self.hostname = Some(hostname.into());
596 self
597 }
598
599 pub fn with_transport(mut self, transport: impl Into<String>) -> Self {
600 self.transports.insert(transport.into());
601 self
602 }
603}
604
605#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
606pub struct PeerRecord {
607 pub fingerprint: String,
608 pub aliases: BTreeSet<String>,
609 pub hostnames: BTreeSet<String>,
610 pub addresses: BTreeSet<SocketAddr>,
611 pub transports: BTreeSet<String>,
612 pub first_seen_unix_ms: i128,
613 pub last_seen_unix_ms: i128,
614}
615
616impl PeerRecord {
617 fn from_observation(observation: PeerObservation) -> Self {
618 let mut aliases = BTreeSet::new();
619 if let Some(alias) = observation.alias {
620 aliases.insert(alias);
621 }
622
623 let mut hostnames = BTreeSet::new();
624 if let Some(hostname) = observation.hostname {
625 hostnames.insert(hostname);
626 }
627
628 let mut addresses = BTreeSet::new();
629 addresses.insert(observation.address);
630
631 Self {
632 fingerprint: observation.fingerprint,
633 aliases,
634 hostnames,
635 addresses,
636 transports: observation.transports,
637 first_seen_unix_ms: observation.seen_unix_ms,
638 last_seen_unix_ms: observation.seen_unix_ms,
639 }
640 }
641
642 fn merge(&mut self, observation: PeerObservation) {
643 if let Some(alias) = observation.alias {
644 self.aliases.insert(alias);
645 }
646 if let Some(hostname) = observation.hostname {
647 self.hostnames.insert(hostname);
648 }
649 self.addresses.insert(observation.address);
650 self.transports.extend(observation.transports);
651 self.first_seen_unix_ms = self.first_seen_unix_ms.min(observation.seen_unix_ms);
652 self.last_seen_unix_ms = self.last_seen_unix_ms.max(observation.seen_unix_ms);
653 }
654
655 pub fn preferred_quic_address(&self) -> Option<SocketAddr> {
656 if !self.transports.contains("quic") {
657 return None;
658 }
659
660 self.addresses.iter().copied().next()
661 }
662}
663
664#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
665#[serde(rename_all = "snake_case")]
666pub enum TrustState {
667 Trusted,
668 Blocked,
669}
670
671#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
672pub struct KnownPeerEntry {
673 pub fingerprint: String,
674 #[serde(default)]
675 pub aliases: BTreeSet<String>,
676 #[serde(default)]
677 pub hostnames: BTreeSet<String>,
678 #[serde(default)]
679 pub addresses: BTreeSet<SocketAddr>,
680 #[serde(default)]
681 pub transports: BTreeSet<String>,
682 pub trust_state: TrustState,
683 pub first_seen_unix_ms: Option<i128>,
684 pub last_seen_unix_ms: Option<i128>,
685}
686
687impl KnownPeerEntry {
688 pub fn trusted(fingerprint: impl Into<String>) -> Result<Self> {
689 let fingerprint = normalized_fingerprint(fingerprint.into())?;
690 Ok(Self {
691 fingerprint,
692 aliases: BTreeSet::new(),
693 hostnames: BTreeSet::new(),
694 addresses: BTreeSet::new(),
695 transports: BTreeSet::new(),
696 trust_state: TrustState::Trusted,
697 first_seen_unix_ms: None,
698 last_seen_unix_ms: None,
699 })
700 }
701
702 pub fn from_record(record: &PeerRecord, trust_state: TrustState) -> Result<Self> {
703 let fingerprint = normalized_fingerprint(record.fingerprint.clone())?;
704 Ok(Self {
705 fingerprint,
706 aliases: record.aliases.clone(),
707 hostnames: record.hostnames.clone(),
708 addresses: record.addresses.clone(),
709 transports: record.transports.clone(),
710 trust_state,
711 first_seen_unix_ms: Some(record.first_seen_unix_ms),
712 last_seen_unix_ms: Some(record.last_seen_unix_ms),
713 })
714 }
715}
716
717#[derive(Debug, Default, Clone, PartialEq, Eq, Serialize, Deserialize)]
718pub struct TrustStore {
719 #[serde(default)]
720 pub peers: BTreeMap<String, KnownPeerEntry>,
721}
722
723impl TrustStore {
724 pub fn load_or_default() -> Result<Self> {
725 Self::load_or_default_from(config_dir()?.join("known_peers.toml"))
726 }
727
728 pub fn load_or_default_from(path: impl AsRef<Path>) -> Result<Self> {
729 let path = path.as_ref();
730 match std::fs::read_to_string(path) {
731 Ok(contents) => {
732 toml::from_str(&contents).map_err(|error| Error::ConfigParse(error.to_string()))
733 }
734 Err(error) if error.kind() == std::io::ErrorKind::NotFound => Ok(Self::default()),
735 Err(source) => Err(Error::IoPath {
736 path: path.to_path_buf(),
737 source,
738 }),
739 }
740 }
741
742 pub fn save_default_path(&self) -> Result<()> {
743 self.save_to_path(config_dir()?.join("known_peers.toml"))
744 }
745
746 pub fn save_to_path(&self, path: impl AsRef<Path>) -> Result<()> {
747 write_toml_file(path.as_ref(), self)
748 }
749
750 pub fn trust_fingerprint(&mut self, fingerprint: impl Into<String>) -> Result<&KnownPeerEntry> {
751 let entry = KnownPeerEntry::trusted(fingerprint)?;
752 let fingerprint = entry.fingerprint.clone();
753 self.peers
754 .entry(fingerprint.clone())
755 .and_modify(|existing| existing.trust_state = TrustState::Trusted)
756 .or_insert(entry);
757 Ok(self
758 .peers
759 .get(&fingerprint)
760 .expect("trusted peer entry should exist"))
761 }
762
763 pub fn trust_record(&mut self, record: &PeerRecord) -> Result<&KnownPeerEntry> {
764 let entry = KnownPeerEntry::from_record(record, TrustState::Trusted)?;
765 let fingerprint = entry.fingerprint.clone();
766 self.peers
767 .entry(fingerprint.clone())
768 .and_modify(|existing| {
769 existing.aliases.extend(record.aliases.clone());
770 existing.hostnames.extend(record.hostnames.clone());
771 existing.addresses.extend(record.addresses.clone());
772 existing.transports.extend(record.transports.clone());
773 existing.first_seen_unix_ms = Some(
774 existing
775 .first_seen_unix_ms
776 .unwrap_or(record.first_seen_unix_ms)
777 .min(record.first_seen_unix_ms),
778 );
779 existing.last_seen_unix_ms = Some(
780 existing
781 .last_seen_unix_ms
782 .unwrap_or(record.last_seen_unix_ms)
783 .max(record.last_seen_unix_ms),
784 );
785 existing.trust_state = TrustState::Trusted;
786 })
787 .or_insert(entry);
788 Ok(self
789 .peers
790 .get(&fingerprint)
791 .expect("trusted peer entry should exist"))
792 }
793
794 pub fn trust_direct_address(
795 &mut self,
796 fingerprint: impl Into<String>,
797 address: SocketAddr,
798 ) -> Result<&KnownPeerEntry> {
799 let mut entry = KnownPeerEntry::trusted(fingerprint)?;
800 let seen = system_time_unix_ms(SystemTime::now()).unwrap_or_default();
801 entry.addresses.insert(address);
802 entry.transports.insert("quic".to_string());
803 entry.first_seen_unix_ms = Some(seen);
804 entry.last_seen_unix_ms = Some(seen);
805
806 let fingerprint = entry.fingerprint.clone();
807 self.peers
808 .entry(fingerprint.clone())
809 .and_modify(|existing| {
810 existing.addresses.insert(address);
811 existing.transports.insert("quic".to_string());
812 existing.first_seen_unix_ms =
813 Some(existing.first_seen_unix_ms.unwrap_or(seen).min(seen));
814 existing.last_seen_unix_ms =
815 Some(existing.last_seen_unix_ms.unwrap_or(seen).max(seen));
816 existing.trust_state = TrustState::Trusted;
817 })
818 .or_insert(entry);
819 Ok(self
820 .peers
821 .get(&fingerprint)
822 .expect("trusted peer entry should exist"))
823 }
824
825 pub fn is_trusted_fingerprint(&self, fingerprint: &str) -> bool {
826 self.peers
827 .get(fingerprint)
828 .is_some_and(|entry| entry.trust_state == TrustState::Trusted)
829 }
830
831 pub fn trusted_fingerprint_for_address(&self, address: SocketAddr) -> Option<&str> {
832 self.peers
833 .values()
834 .find(|entry| {
835 entry.trust_state == TrustState::Trusted && entry.addresses.contains(&address)
836 })
837 .map(|entry| entry.fingerprint.as_str())
838 }
839
840 pub fn forget(&mut self, fingerprint: &str) -> bool {
841 self.peers.remove(fingerprint).is_some()
842 }
843
844 pub fn records(&self) -> impl Iterator<Item = &KnownPeerEntry> {
845 self.peers.values()
846 }
847
848 pub fn to_toml_string(&self) -> Result<String> {
849 toml::to_string_pretty(self).map_err(|error| Error::ConfigSerialize(error.to_string()))
850 }
851}
852
853#[derive(Debug, Default, Clone, PartialEq, Eq)]
854pub struct PeerRegistry {
855 peers: BTreeMap<String, PeerRecord>,
856}
857
858#[derive(Debug, Clone, PartialEq, Eq)]
859pub enum PeerLookup<'a> {
860 Found(&'a PeerRecord),
861 Ambiguous(Vec<&'a PeerRecord>),
862 Missing,
863}
864
865impl PeerRegistry {
866 pub fn new() -> Self {
867 Self::default()
868 }
869
870 pub fn observe(&mut self, observation: PeerObservation) -> Result<&PeerRecord> {
871 if observation.fingerprint.trim().is_empty() {
872 return Err(Error::InvalidInput(
873 "peer fingerprint must not be empty".to_string(),
874 ));
875 }
876
877 let fingerprint = observation.fingerprint.clone();
878 if let Some(record) = self.peers.get_mut(&fingerprint) {
879 record.merge(observation);
880 } else {
881 self.peers.insert(
882 fingerprint.clone(),
883 PeerRecord::from_observation(observation),
884 );
885 }
886
887 Ok(self
888 .peers
889 .get(&fingerprint)
890 .expect("observed peer record should exist"))
891 }
892
893 pub fn get_by_fingerprint(&self, fingerprint: &str) -> Option<&PeerRecord> {
894 self.peers.get(fingerprint)
895 }
896
897 pub fn lookup(&self, query: &str) -> Option<&PeerRecord> {
898 match self.lookup_detail(query) {
899 PeerLookup::Found(record) => Some(record),
900 PeerLookup::Ambiguous(_) | PeerLookup::Missing => None,
901 }
902 }
903
904 pub fn lookup_detail(&self, query: &str) -> PeerLookup<'_> {
905 if let Some(record) = self.peers.get(query) {
906 return PeerLookup::Found(record);
907 }
908
909 let matches = self
910 .peers
911 .values()
912 .filter(|record| {
913 record.fingerprint.starts_with(query)
914 || record.aliases.iter().any(|alias| alias == query)
915 || record.hostnames.iter().any(|hostname| hostname == query)
916 })
917 .collect::<Vec<_>>();
918
919 match matches.as_slice() {
920 [] => PeerLookup::Missing,
921 [record] => PeerLookup::Found(record),
922 _ => PeerLookup::Ambiguous(matches),
923 }
924 }
925
926 pub fn records(&self) -> impl Iterator<Item = &PeerRecord> {
927 self.peers.values()
928 }
929}
930
931#[derive(Debug, Clone, PartialEq, Eq)]
932pub struct NativeAnnouncement {
933 pub alias: String,
934 pub fingerprint: String,
935 pub quic_port: u16,
936 pub listen_port: Option<u16>,
937}
938
939impl NativeAnnouncement {
940 pub fn from_config(config: &AppConfig, identity: &NativeIdentity) -> Self {
941 Self {
942 alias: config.alias.clone(),
943 fingerprint: identity.fingerprint().to_string(),
944 quic_port: config.quic_port,
945 listen_port: Some(config.listen_port),
946 }
947 }
948
949 fn service_info(&self) -> Result<ServiceInfo> {
950 let alias = sanitize_dns_label(&self.alias);
951 let fingerprint_prefix = self
952 .fingerprint
953 .get(..12)
954 .unwrap_or(self.fingerprint.as_str());
955 let instance = format!("{alias}-{fingerprint_prefix}");
956 let hostname = format!("{alias}.local.");
957 let properties = [
958 ("pv", NATIVE_PROTOCOL_VERSION.to_string()),
959 ("minpv", MIN_NATIVE_PROTOCOL_VERSION.to_string()),
960 ("fp", self.fingerprint.clone()),
961 ("alias", self.alias.clone()),
962 ("transports", "quic".to_string()),
963 ("quic_port", self.quic_port.to_string()),
964 (
965 "listen_port",
966 self.listen_port
967 .map(|port| port.to_string())
968 .unwrap_or_default(),
969 ),
970 ];
971
972 ServiceInfo::new(
973 NATIVE_SERVICE_TYPE,
974 &instance,
975 &hostname,
976 "",
977 self.quic_port,
978 &properties[..],
979 )
980 .map(ServiceInfo::enable_addr_auto)
981 .map_err(|error| Error::Discovery(error.to_string()))
982 }
983}
984
985pub struct NativeAnnouncer {
986 daemon: ServiceDaemon,
987 fullname: String,
988}
989
990impl NativeAnnouncer {
991 pub fn start(announcement: &NativeAnnouncement) -> Result<Self> {
992 let daemon = ServiceDaemon::new().map_err(|error| Error::Discovery(error.to_string()))?;
993 let service = announcement.service_info()?;
994 let fullname = service.get_fullname().to_string();
995 daemon
996 .register(service)
997 .map_err(|error| Error::Discovery(error.to_string()))?;
998 Ok(Self { daemon, fullname })
999 }
1000}
1001
1002impl Drop for NativeAnnouncer {
1003 fn drop(&mut self) {
1004 let _ = self.daemon.unregister(&self.fullname);
1005 let _ = self.daemon.shutdown();
1006 }
1007}
1008
1009pub async fn discover_native_peers(timeout: Duration) -> Result<PeerRegistry> {
1010 let daemon = ServiceDaemon::new().map_err(|error| Error::Discovery(error.to_string()))?;
1011 let receiver = daemon
1012 .browse(NATIVE_SERVICE_TYPE)
1013 .map_err(|error| Error::Discovery(error.to_string()))?;
1014 let deadline = tokio::time::Instant::now() + timeout;
1015 let mut registry = PeerRegistry::new();
1016
1017 loop {
1018 let now = tokio::time::Instant::now();
1019 if now >= deadline {
1020 break;
1021 }
1022
1023 let wait = deadline - now;
1024 match tokio::time::timeout(wait, receiver.recv_async()).await {
1025 Ok(Ok(ServiceEvent::ServiceResolved(service))) => {
1026 observe_resolved_service(&mut registry, &service)?;
1027 }
1028 Ok(Ok(_)) => {}
1029 Ok(Err(error)) => {
1030 return Err(Error::Discovery(error.to_string()));
1031 }
1032 Err(_) => break,
1033 }
1034 }
1035
1036 daemon
1037 .stop_browse(NATIVE_SERVICE_TYPE)
1038 .map_err(|error| Error::Discovery(error.to_string()))?;
1039 let _ = daemon.shutdown();
1040 Ok(registry)
1041}
1042
1043fn observe_resolved_service(registry: &mut PeerRegistry, service: &ResolvedService) -> Result<()> {
1044 let Some(fingerprint) = service.get_property_val_str("fp") else {
1045 return Ok(());
1046 };
1047 let fingerprint = match normalized_fingerprint(fingerprint.to_string()) {
1048 Ok(fingerprint) => fingerprint,
1049 Err(_) => return Ok(()),
1050 };
1051 let Some(highest_version) = parse_txt_u16(service, "pv") else {
1052 return Ok(());
1053 };
1054 let Some(minimum_version) = parse_txt_u16(service, "minpv") else {
1055 return Ok(());
1056 };
1057 if minimum_version > highest_version
1058 || minimum_version > NATIVE_PROTOCOL_VERSION
1059 || highest_version < MIN_NATIVE_PROTOCOL_VERSION
1060 {
1061 return Ok(());
1062 }
1063
1064 let Some(quic_port) = parse_txt_u16(service, "quic_port") else {
1065 return Ok(());
1066 };
1067 let transports = service
1068 .get_property_val_str("transports")
1069 .unwrap_or_default()
1070 .split(',')
1071 .map(str::trim)
1072 .filter(|value| !value.is_empty())
1073 .map(ToOwned::to_owned)
1074 .collect::<Vec<_>>();
1075 if !transports.iter().any(|transport| transport == "quic") {
1076 return Ok(());
1077 }
1078
1079 let alias = service.get_property_val_str("alias").map(ToOwned::to_owned);
1080 let hostname = Some(service.get_hostname().trim_end_matches('.').to_string());
1081 let seen_unix_ms = system_time_unix_ms(SystemTime::now()).unwrap_or_default();
1082
1083 for address in service.get_addresses_v4() {
1084 let mut observation = PeerObservation::new(
1085 fingerprint.clone(),
1086 SocketAddr::from((address, quic_port)),
1087 seen_unix_ms,
1088 );
1089 if let Some(alias) = &alias {
1090 observation = observation.with_alias(alias.clone());
1091 }
1092 if let Some(hostname) = &hostname {
1093 observation = observation.with_hostname(hostname.clone());
1094 }
1095 for transport in &transports {
1096 observation = observation.with_transport(transport.clone());
1097 }
1098 registry.observe(observation)?;
1099 }
1100
1101 Ok(())
1102}
1103
1104fn parse_txt_u16(service: &ResolvedService, key: &str) -> Option<u16> {
1105 service.get_property_val_str(key)?.parse().ok()
1106}
1107
1108fn sanitize_dns_label(value: &str) -> String {
1109 let mut label = value
1110 .chars()
1111 .map(|ch| {
1112 if ch.is_ascii_alphanumeric() || ch == '-' {
1113 ch.to_ascii_lowercase()
1114 } else {
1115 '-'
1116 }
1117 })
1118 .collect::<String>();
1119 while label.contains("--") {
1120 label = label.replace("--", "-");
1121 }
1122 let label = label.trim_matches('-').to_string();
1123 if label.is_empty() {
1124 "fileferry-device".to_string()
1125 } else {
1126 label
1127 }
1128}
1129
1130#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
1131struct DirectReceivePlan {
1132 files: Vec<DirectFilePlan>,
1133}
1134
1135#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
1136struct DirectProtocolHello {
1137 protocol_version: u16,
1138 min_protocol_version: u16,
1139 client_fingerprint: String,
1140 #[serde(default)]
1141 psk: bool,
1142}
1143
1144impl DirectProtocolHello {
1145 fn new(identity: &NativeIdentity, psk: Option<&str>) -> Self {
1146 Self {
1147 protocol_version: NATIVE_PROTOCOL_VERSION,
1148 min_protocol_version: MIN_NATIVE_PROTOCOL_VERSION,
1149 client_fingerprint: identity.fingerprint().to_string(),
1150 psk: psk.is_some(),
1151 }
1152 }
1153}
1154
1155#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
1156struct DirectProtocolResponse {
1157 accepted: bool,
1158 protocol_version: Option<u16>,
1159 #[serde(default)]
1160 psk_challenge: Option<String>,
1161 error: Option<String>,
1162}
1163
1164impl DirectProtocolResponse {
1165 fn accept(protocol_version: u16) -> Self {
1166 Self {
1167 accepted: true,
1168 protocol_version: Some(protocol_version),
1169 psk_challenge: None,
1170 error: None,
1171 }
1172 }
1173
1174 fn accept_with_psk(protocol_version: u16, challenge: impl Into<String>) -> Self {
1175 Self {
1176 accepted: true,
1177 protocol_version: Some(protocol_version),
1178 psk_challenge: Some(challenge.into()),
1179 error: None,
1180 }
1181 }
1182
1183 fn reject(error: impl Into<String>) -> Self {
1184 Self {
1185 accepted: false,
1186 protocol_version: None,
1187 psk_challenge: None,
1188 error: Some(error.into()),
1189 }
1190 }
1191}
1192
1193#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
1194struct DirectPskProof {
1195 proof: String,
1196}
1197
1198#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
1199struct DirectPskResult {
1200 accepted: bool,
1201 error: Option<String>,
1202}
1203
1204impl DirectPskResult {
1205 fn accept() -> Self {
1206 Self {
1207 accepted: true,
1208 error: None,
1209 }
1210 }
1211
1212 fn reject(error: impl Into<String>) -> Self {
1213 Self {
1214 accepted: false,
1215 error: Some(error.into()),
1216 }
1217 }
1218}
1219
1220#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
1221struct DirectFilePlan {
1222 relative_path: PathBuf,
1223 action: DirectFileAction,
1224 offset: u64,
1225}
1226
1227#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
1228#[serde(rename_all = "snake_case")]
1229enum DirectFileAction {
1230 Receive,
1231 Skip,
1232}
1233
1234#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
1235struct DirectPayloadHeader {
1236 relative_path: PathBuf,
1237 offset: u64,
1238 bytes: u64,
1239}
1240
1241#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
1242struct DirectAck {
1243 ok: bool,
1244 error: Option<String>,
1245}
1246
1247pub fn validate_send_paths(paths: &[PathBuf]) -> Result<()> {
1248 if paths.is_empty() {
1249 return Err(Error::InvalidInput(
1250 "at least one file path is required".to_string(),
1251 ));
1252 }
1253
1254 for path in paths {
1255 if path.as_os_str().is_empty() {
1256 return Err(Error::InvalidInput("empty file path".to_string()));
1257 }
1258 }
1259
1260 Ok(())
1261}
1262
1263pub fn display_path(path: &Path) -> String {
1264 path.to_string_lossy().into_owned()
1265}
1266
1267pub async fn build_manifest(paths: &[PathBuf]) -> Result<TransferManifest> {
1268 validate_send_paths(paths)?;
1269
1270 let sources = collect_manifest_sources(paths)?;
1271 let mut entries = Vec::with_capacity(sources.len());
1272 let mut seen = BTreeSet::new();
1273
1274 for source in sources {
1275 if !seen.insert(source.relative_path.clone()) {
1276 return Err(Error::InvalidInput(format!(
1277 "duplicate transfer path: {}",
1278 display_path(&source.relative_path)
1279 )));
1280 }
1281
1282 entries.push(build_manifest_entry(source).await?);
1283 }
1284
1285 Ok(TransferManifest {
1286 version: MANIFEST_VERSION,
1287 session_id: Uuid::new_v4(),
1288 chunk_size: DEFAULT_CHUNK_SIZE,
1289 entries,
1290 })
1291}
1292
1293pub fn safe_destination_path(destination: &Path, relative_path: &Path) -> Result<PathBuf> {
1294 validate_relative_transfer_path(relative_path)?;
1295 Ok(destination.join(relative_path))
1296}
1297
1298pub async fn decide_resume(
1299 destination: &Path,
1300 entry: &ManifestEntry,
1301 chunk_size: u64,
1302) -> Result<ResumeDecision> {
1303 if entry.kind != ManifestEntryKind::File {
1304 return Ok(ResumeDecision::Create);
1305 }
1306
1307 let path = safe_destination_path(destination, &entry.relative_path)?;
1308 let metadata = match tokio::fs::metadata(&path).await {
1309 Ok(metadata) => metadata,
1310 Err(error) if error.kind() == std::io::ErrorKind::NotFound => {
1311 return Ok(ResumeDecision::Create);
1312 }
1313 Err(source) => {
1314 return Err(Error::IoPath { path, source });
1315 }
1316 };
1317
1318 if !metadata.is_file() {
1319 return Ok(ResumeDecision::Conflict(format!(
1320 "{} already exists and is not a regular file",
1321 display_path(&path)
1322 )));
1323 }
1324
1325 if metadata.len() == entry.size {
1326 let expected = entry
1327 .blake3
1328 .as_deref()
1329 .ok_or_else(|| Error::Protocol("file manifest entry is missing BLAKE3".to_string()))?;
1330 let actual = hash_file(&path).await?;
1331 return if actual == expected {
1332 Ok(ResumeDecision::Skip)
1333 } else {
1334 Ok(ResumeDecision::Conflict(format!(
1335 "{} already exists with different contents",
1336 display_path(&path)
1337 )))
1338 };
1339 }
1340
1341 if metadata.len() > entry.size {
1342 return Ok(ResumeDecision::Conflict(format!(
1343 "{} is larger than incoming file",
1344 display_path(&path)
1345 )));
1346 }
1347
1348 let verified_offset = verified_prefix_offset(&path, entry, metadata.len(), chunk_size).await?;
1349 if verified_offset > 0 {
1350 Ok(ResumeDecision::ResumeFrom(verified_offset))
1351 } else {
1352 Ok(ResumeDecision::Conflict(format!(
1353 "{} already exists and does not match a verified prefix",
1354 display_path(&path)
1355 )))
1356 }
1357}
1358
1359pub async fn send_direct_file(
1360 request: &SendRequest,
1361 identity: &NativeIdentity,
1362 mut events: impl FnMut(TransferEvent),
1363) -> Result<TransferSummary> {
1364 send_direct_file_with_control(request, identity, TransferControl::new(), &mut events).await
1365}
1366
1367pub async fn send_direct_file_with_control(
1368 request: &SendRequest,
1369 identity: &NativeIdentity,
1370 control: TransferControl,
1371 mut events: impl FnMut(TransferEvent),
1372) -> Result<TransferSummary> {
1373 send_direct_file_with_control_and_trust(request, identity, control, &mut events, |_| Ok(()))
1374 .await
1375}
1376
1377pub async fn send_direct_file_with_control_and_trust(
1378 request: &SendRequest,
1379 identity: &NativeIdentity,
1380 control: TransferControl,
1381 mut events: impl FnMut(TransferEvent),
1382 mut confirm_unpinned_receiver: impl FnMut(&str) -> Result<()>,
1383) -> Result<TransferSummary> {
1384 ensure_rustls_provider();
1385
1386 control.check_cancelled()?;
1387 let manifest = build_manifest(request.paths()).await?;
1388 let sources = manifest_source_map(request.paths())?;
1389 events(TransferEvent::SessionStarted {
1390 direction: TransferDirection::Send,
1391 session_id: manifest.session_id,
1392 files_total: manifest.file_entries().count(),
1393 bytes_total: manifest.total_file_bytes(),
1394 });
1395 control.check_cancelled()?;
1396
1397 let (client_config, server_fingerprint) = client_config_capturing_server_fingerprint()?;
1398 let mut endpoint = quinn::Endpoint::client(SocketAddr::from(([0, 0, 0, 0], 0)))?;
1399 endpoint.set_default_client_config(client_config);
1400 let connection = endpoint
1401 .connect(request.peer().address(), "localhost")?
1402 .await?;
1403 let actual = captured_server_fingerprint(&server_fingerprint)?;
1404 match request.peer().expected_fingerprint() {
1405 Some(expected) if actual != expected => {
1406 connection.close(1u32.into(), b"fingerprint mismatch");
1407 return Err(Error::PeerFingerprintMismatch {
1408 expected: expected.to_string(),
1409 actual,
1410 });
1411 }
1412 Some(_) => {}
1413 None => {
1414 if let Err(error) = confirm_unpinned_receiver(&actual) {
1415 connection.close(1u32.into(), b"receiver fingerprint not trusted");
1416 return Err(error);
1417 }
1418 }
1419 }
1420 let (mut send, mut recv) = connection.open_bi().await?;
1421
1422 send.write_all(DIRECT_MAGIC).await?;
1423 write_json_frame(
1424 &mut send,
1425 &DirectProtocolHello::new(identity, request.psk()),
1426 )
1427 .await?;
1428 let protocol: DirectProtocolResponse = read_json_frame(&mut recv).await?;
1429 if !protocol.accepted {
1430 return Err(Error::Protocol(protocol.error.unwrap_or_else(|| {
1431 "receiver rejected protocol negotiation".to_string()
1432 })));
1433 }
1434 if protocol.protocol_version != Some(NATIVE_PROTOCOL_VERSION) {
1435 return Err(Error::Protocol(format!(
1436 "receiver selected unsupported protocol version {:?}",
1437 protocol.protocol_version
1438 )));
1439 }
1440 match (request.psk(), protocol.psk_challenge.as_deref()) {
1441 (Some(psk), Some(challenge)) => {
1442 let proof = DirectPskProof {
1443 proof: psk_proof(
1444 psk,
1445 challenge,
1446 identity.fingerprint(),
1447 NATIVE_PROTOCOL_VERSION,
1448 )?,
1449 };
1450 write_json_frame(&mut send, &proof).await?;
1451 let auth: DirectPskResult = read_json_frame(&mut recv).await?;
1452 if !auth.accepted {
1453 return Err(Error::Authentication(
1454 auth.error
1455 .unwrap_or_else(|| "PSK authentication rejected".to_string()),
1456 ));
1457 }
1458 }
1459 (Some(_), None) => {
1460 return Err(Error::Authentication(
1461 "receiver is not configured for PSK mode".to_string(),
1462 ));
1463 }
1464 (None, Some(_)) => {
1465 return Err(Error::Authentication(
1466 "receiver requires PSK authentication".to_string(),
1467 ));
1468 }
1469 (None, None) => {}
1470 }
1471
1472 write_json_frame(&mut send, &manifest).await?;
1473
1474 let plan: DirectReceivePlan = read_json_frame(&mut recv).await?;
1475 let mut summaries = Vec::new();
1476 for file_plan in plan.files {
1477 let Some(entry) = manifest
1478 .file_entries()
1479 .find(|entry| entry.relative_path == file_plan.relative_path)
1480 else {
1481 return Err(Error::Protocol(format!(
1482 "receiver requested unknown file: {}",
1483 display_path(&file_plan.relative_path)
1484 )));
1485 };
1486
1487 let Some(source_path) = sources.get(&file_plan.relative_path) else {
1488 return Err(Error::Protocol(format!(
1489 "missing source for manifest file: {}",
1490 display_path(&file_plan.relative_path)
1491 )));
1492 };
1493
1494 if file_plan.action == DirectFileAction::Skip {
1495 let blake3 = entry.blake3.clone().unwrap_or_default();
1496 events(TransferEvent::FileFinished {
1497 direction: TransferDirection::Send,
1498 file_name: display_path(&entry.relative_path),
1499 bytes: entry.size,
1500 blake3: blake3.clone(),
1501 status: TransferFileStatus::Skipped,
1502 });
1503 summaries.push(TransferFileSummary {
1504 path: source_path.clone(),
1505 relative_path: entry.relative_path.clone(),
1506 bytes: entry.size,
1507 blake3,
1508 status: TransferFileStatus::Skipped,
1509 });
1510 continue;
1511 }
1512
1513 match send_payload_file(
1514 &mut send,
1515 source_path,
1516 entry,
1517 file_plan.offset,
1518 &control,
1519 &mut events,
1520 )
1521 .await
1522 {
1523 Ok(()) => {}
1524 Err(Error::TransferCancelled) => {
1525 events(TransferEvent::SessionCancelled {
1526 direction: TransferDirection::Send,
1527 session_id: manifest.session_id,
1528 });
1529 connection.close(1u32.into(), b"cancelled");
1530 return Err(Error::TransferCancelled);
1531 }
1532 Err(error) => return Err(error),
1533 }
1534 let status = if file_plan.offset > 0 {
1535 TransferFileStatus::Resumed
1536 } else {
1537 TransferFileStatus::Sent
1538 };
1539 let blake3 = entry.blake3.clone().unwrap_or_default();
1540 events(TransferEvent::FileFinished {
1541 direction: TransferDirection::Send,
1542 file_name: display_path(&entry.relative_path),
1543 bytes: entry.size,
1544 blake3: blake3.clone(),
1545 status,
1546 });
1547 summaries.push(TransferFileSummary {
1548 path: source_path.clone(),
1549 relative_path: entry.relative_path.clone(),
1550 bytes: entry.size,
1551 blake3,
1552 status,
1553 });
1554 }
1555 send.finish()?;
1556
1557 let ack: DirectAck = read_json_frame(&mut recv).await?;
1558 if !ack.ok {
1559 return Err(Error::Transfer(
1560 ack.error
1561 .unwrap_or_else(|| "receiver rejected transfer".to_string()),
1562 ));
1563 }
1564
1565 connection.close(0u32.into(), b"done");
1566 endpoint.wait_idle().await;
1567
1568 let summary = summary_from_files(&manifest, summaries)?;
1569 events(TransferEvent::SessionFinished {
1570 direction: TransferDirection::Send,
1571 files_total: summary.files.len(),
1572 bytes_total: summary.bytes,
1573 blake3: summary.blake3.clone(),
1574 });
1575 Ok(summary)
1576}
1577
1578pub async fn receive_direct_file(
1579 request: &ReceiveRequest,
1580 destination: impl AsRef<Path>,
1581 identity: &NativeIdentity,
1582 events: impl FnMut(TransferEvent),
1583) -> Result<TransferSummary> {
1584 receive_direct_file_with_control(
1585 request,
1586 destination,
1587 identity,
1588 TransferControl::new(),
1589 events,
1590 )
1591 .await
1592}
1593
1594pub async fn receive_direct_file_with_control(
1595 request: &ReceiveRequest,
1596 destination: impl AsRef<Path>,
1597 identity: &NativeIdentity,
1598 control: TransferControl,
1599 mut events: impl FnMut(TransferEvent),
1600) -> Result<TransferSummary> {
1601 ensure_rustls_provider();
1602
1603 control.check_cancelled()?;
1604 let destination = destination.as_ref();
1605 tokio::fs::create_dir_all(destination)
1606 .await
1607 .map_err(|source| Error::IoPath {
1608 path: destination.to_path_buf(),
1609 source,
1610 })?;
1611
1612 let server_config =
1613 quinn::ServerConfig::with_single_cert(identity.cert_chain(), identity.private_key())?;
1614 let endpoint = quinn::Endpoint::server(server_config, request.listen())?;
1615 let incoming = endpoint.accept().await.ok_or(Error::NoIncomingConnection)?;
1616 let connection = incoming.await?;
1617 let (mut send, mut recv) = connection.accept_bi().await?;
1618
1619 let result = receive_stream_file(
1620 destination,
1621 request.psk(),
1622 &mut recv,
1623 &mut send,
1624 &control,
1625 &mut events,
1626 )
1627 .await;
1628 let ack = match &result {
1629 Ok(_) => DirectAck {
1630 ok: true,
1631 error: None,
1632 },
1633 Err(error) => DirectAck {
1634 ok: false,
1635 error: Some(error.to_string()),
1636 },
1637 };
1638 write_json_frame(&mut send, &ack).await?;
1639 send.finish()?;
1640 let _ = tokio::time::timeout(Duration::from_secs(1), connection.closed()).await;
1641 endpoint.close(0u32.into(), b"done");
1642 endpoint.wait_idle().await;
1643
1644 result
1645}
1646
1647async fn receive_stream_file(
1648 destination: &Path,
1649 psk: Option<&str>,
1650 recv: &mut quinn::RecvStream,
1651 send: &mut quinn::SendStream,
1652 control: &TransferControl,
1653 events: &mut impl FnMut(TransferEvent),
1654) -> Result<TransferSummary> {
1655 let mut magic = [0; DIRECT_MAGIC.len()];
1656 recv.read_exact(&mut magic).await?;
1657 if &magic != DIRECT_MAGIC {
1658 return Err(Error::Protocol("bad direct-transfer magic".to_string()));
1659 }
1660
1661 let hello: DirectProtocolHello = read_json_frame(recv).await?;
1662 let protocol = negotiate_direct_protocol(&hello, psk);
1663 write_json_frame(send, &protocol).await?;
1664 if !protocol.accepted {
1665 return Err(Error::Protocol(protocol.error.unwrap_or_else(|| {
1666 "unsupported direct protocol version".to_string()
1667 })));
1668 }
1669 if let (Some(psk), Some(challenge)) = (psk, protocol.psk_challenge.as_deref()) {
1670 let proof: DirectPskProof = read_json_frame(recv).await?;
1671 let expected = psk_proof(
1672 psk,
1673 challenge,
1674 &hello.client_fingerprint,
1675 protocol.protocol_version.unwrap_or(NATIVE_PROTOCOL_VERSION),
1676 )?;
1677 if proof.proof != expected {
1678 write_json_frame(send, &DirectPskResult::reject("PSK proof mismatch")).await?;
1679 return Err(Error::Authentication("PSK proof mismatch".to_string()));
1680 }
1681 write_json_frame(send, &DirectPskResult::accept()).await?;
1682 }
1683
1684 let manifest: TransferManifest = read_json_frame(recv).await?;
1685 validate_manifest(&manifest)?;
1686 events(TransferEvent::SessionStarted {
1687 direction: TransferDirection::Receive,
1688 session_id: manifest.session_id,
1689 files_total: manifest.file_entries().count(),
1690 bytes_total: manifest.total_file_bytes(),
1691 });
1692 control.check_cancelled()?;
1693
1694 for entry in &manifest.entries {
1695 if entry.kind == ManifestEntryKind::Directory {
1696 let path = safe_destination_path(destination, &entry.relative_path)?;
1697 tokio::fs::create_dir_all(&path)
1698 .await
1699 .map_err(|source| Error::IoPath { path, source })?;
1700 }
1701 }
1702
1703 let mut plan = DirectReceivePlan { files: Vec::new() };
1704 for entry in manifest.file_entries() {
1705 let decision = decide_resume(destination, entry, manifest.chunk_size).await?;
1706 let (action, offset) = match decision {
1707 ResumeDecision::Create => (DirectFileAction::Receive, 0),
1708 ResumeDecision::Skip => (DirectFileAction::Skip, entry.size),
1709 ResumeDecision::ResumeFrom(offset) => (DirectFileAction::Receive, offset),
1710 ResumeDecision::Conflict(message) => return Err(Error::InvalidInput(message)),
1711 };
1712 plan.files.push(DirectFilePlan {
1713 relative_path: entry.relative_path.clone(),
1714 action,
1715 offset,
1716 });
1717 }
1718 write_json_frame(send, &plan).await?;
1719
1720 let mut summaries = Vec::new();
1721 for file_plan in &plan.files {
1722 let Some(entry) = manifest
1723 .file_entries()
1724 .find(|entry| entry.relative_path == file_plan.relative_path)
1725 else {
1726 return Err(Error::Protocol(format!(
1727 "planned file missing from manifest: {}",
1728 display_path(&file_plan.relative_path)
1729 )));
1730 };
1731
1732 let final_path = safe_destination_path(destination, &entry.relative_path)?;
1733 if file_plan.action == DirectFileAction::Skip {
1734 let blake3 = entry.blake3.clone().unwrap_or_default();
1735 events(TransferEvent::FileFinished {
1736 direction: TransferDirection::Receive,
1737 file_name: display_path(&entry.relative_path),
1738 bytes: entry.size,
1739 blake3: blake3.clone(),
1740 status: TransferFileStatus::Skipped,
1741 });
1742 summaries.push(TransferFileSummary {
1743 path: final_path,
1744 relative_path: entry.relative_path.clone(),
1745 bytes: entry.size,
1746 blake3,
1747 status: TransferFileStatus::Skipped,
1748 });
1749 continue;
1750 }
1751
1752 match receive_payload_file(recv, destination, entry, file_plan.offset, control, events)
1753 .await
1754 {
1755 Ok(()) => {}
1756 Err(Error::TransferCancelled) => {
1757 events(TransferEvent::SessionCancelled {
1758 direction: TransferDirection::Receive,
1759 session_id: manifest.session_id,
1760 });
1761 return Err(Error::TransferCancelled);
1762 }
1763 Err(error) => return Err(error),
1764 }
1765 let actual_hash = hash_file(&final_path).await?;
1766 let expected_hash = entry
1767 .blake3
1768 .as_deref()
1769 .ok_or_else(|| Error::Protocol("file manifest entry is missing BLAKE3".to_string()))?;
1770 if actual_hash != expected_hash {
1771 return Err(Error::Transfer(format!(
1772 "BLAKE3 hash mismatch for {}",
1773 display_path(&entry.relative_path)
1774 )));
1775 }
1776
1777 let status = if file_plan.offset > 0 {
1778 TransferFileStatus::Resumed
1779 } else {
1780 TransferFileStatus::Received
1781 };
1782 events(TransferEvent::FileFinished {
1783 direction: TransferDirection::Receive,
1784 file_name: display_path(&entry.relative_path),
1785 bytes: entry.size,
1786 blake3: actual_hash.clone(),
1787 status,
1788 });
1789 summaries.push(TransferFileSummary {
1790 path: final_path,
1791 relative_path: entry.relative_path.clone(),
1792 bytes: entry.size,
1793 blake3: actual_hash,
1794 status,
1795 });
1796 }
1797
1798 let summary = summary_from_files(&manifest, summaries)?;
1799 events(TransferEvent::SessionFinished {
1800 direction: TransferDirection::Receive,
1801 files_total: summary.files.len(),
1802 bytes_total: summary.bytes,
1803 blake3: summary.blake3.clone(),
1804 });
1805 Ok(summary)
1806}
1807
1808async fn send_payload_file(
1809 send: &mut quinn::SendStream,
1810 source_path: &Path,
1811 entry: &ManifestEntry,
1812 offset: u64,
1813 control: &TransferControl,
1814 events: &mut impl FnMut(TransferEvent),
1815) -> Result<()> {
1816 if offset > entry.size {
1817 return Err(Error::Protocol(format!(
1818 "resume offset exceeds file size for {}",
1819 display_path(&entry.relative_path)
1820 )));
1821 }
1822
1823 let header = DirectPayloadHeader {
1824 relative_path: entry.relative_path.clone(),
1825 offset,
1826 bytes: entry.size - offset,
1827 };
1828 write_json_frame(send, &header).await?;
1829 events(TransferEvent::FileStarted {
1830 direction: TransferDirection::Send,
1831 file_name: display_path(&entry.relative_path),
1832 bytes_total: entry.size,
1833 resume_offset: offset,
1834 });
1835
1836 let mut file = tokio::fs::File::open(source_path)
1837 .await
1838 .map_err(|source| Error::IoPath {
1839 path: source_path.to_path_buf(),
1840 source,
1841 })?;
1842 file.seek(SeekFrom::Start(offset))
1843 .await
1844 .map_err(|source| Error::IoPath {
1845 path: source_path.to_path_buf(),
1846 source,
1847 })?;
1848
1849 let mut buffer = vec![0; IO_BUFFER_SIZE];
1850 let mut bytes_done = offset;
1851 while bytes_done < entry.size {
1852 control.check_cancelled()?;
1853 let remaining = entry.size - bytes_done;
1854 let read_size = buffer.len().min(remaining as usize);
1855 let read = file
1856 .read(&mut buffer[..read_size])
1857 .await
1858 .map_err(|source| Error::IoPath {
1859 path: source_path.to_path_buf(),
1860 source,
1861 })?;
1862 if read == 0 {
1863 return Err(Error::Protocol(format!(
1864 "source file ended early: {}",
1865 display_path(source_path)
1866 )));
1867 }
1868 send.write_all(&buffer[..read]).await?;
1869 bytes_done += read as u64;
1870 events(TransferEvent::Progress {
1871 direction: TransferDirection::Send,
1872 file_name: display_path(&entry.relative_path),
1873 bytes_done,
1874 bytes_total: entry.size,
1875 });
1876 control.check_cancelled()?;
1877 }
1878
1879 Ok(())
1880}
1881
1882async fn receive_payload_file(
1883 recv: &mut quinn::RecvStream,
1884 destination: &Path,
1885 entry: &ManifestEntry,
1886 offset: u64,
1887 control: &TransferControl,
1888 events: &mut impl FnMut(TransferEvent),
1889) -> Result<()> {
1890 let header: DirectPayloadHeader = read_json_frame(recv).await?;
1891 if header.relative_path != entry.relative_path
1892 || header.offset != offset
1893 || header.bytes != entry.size - offset
1894 {
1895 return Err(Error::Protocol(format!(
1896 "payload header does not match receive plan for {}",
1897 display_path(&entry.relative_path)
1898 )));
1899 }
1900 events(TransferEvent::FileStarted {
1901 direction: TransferDirection::Receive,
1902 file_name: display_path(&entry.relative_path),
1903 bytes_total: entry.size,
1904 resume_offset: offset,
1905 });
1906
1907 let final_path = safe_destination_path(destination, &entry.relative_path)?;
1908 if let Some(parent) = final_path.parent() {
1909 tokio::fs::create_dir_all(parent)
1910 .await
1911 .map_err(|source| Error::IoPath {
1912 path: parent.to_path_buf(),
1913 source,
1914 })?;
1915 }
1916
1917 let write_path = if offset == 0 {
1918 temp_path_for(&final_path)
1919 } else {
1920 final_path.clone()
1921 };
1922
1923 let mut file = if offset == 0 {
1924 tokio::fs::File::create(&write_path)
1925 .await
1926 .map_err(|source| Error::IoPath {
1927 path: write_path.clone(),
1928 source,
1929 })?
1930 } else {
1931 let mut file = tokio::fs::OpenOptions::new()
1932 .write(true)
1933 .open(&write_path)
1934 .await
1935 .map_err(|source| Error::IoPath {
1936 path: write_path.clone(),
1937 source,
1938 })?;
1939 file.set_len(offset).await.map_err(|source| Error::IoPath {
1940 path: write_path.clone(),
1941 source,
1942 })?;
1943 file.seek(SeekFrom::Start(offset))
1944 .await
1945 .map_err(|source| Error::IoPath {
1946 path: write_path.clone(),
1947 source,
1948 })?;
1949 file
1950 };
1951
1952 let mut bytes_done = offset;
1953 let mut bytes_remaining = header.bytes;
1954 let mut buffer = vec![0; IO_BUFFER_SIZE];
1955 while bytes_remaining > 0 {
1956 if control.is_cancelled() {
1957 if offset == 0 {
1958 let _ = tokio::fs::remove_file(&write_path).await;
1959 }
1960 return Err(Error::TransferCancelled);
1961 }
1962 let read_size = buffer.len().min(bytes_remaining as usize);
1963 let read = match recv.read(&mut buffer[..read_size]).await {
1964 Ok(Some(read)) => read,
1965 Ok(None) => {
1966 if offset == 0 {
1967 let _ = tokio::fs::remove_file(&write_path).await;
1968 }
1969 return Err(Error::Protocol(
1970 "stream ended before file payload".to_string(),
1971 ));
1972 }
1973 Err(error) => {
1974 if offset == 0 {
1975 let _ = tokio::fs::remove_file(&write_path).await;
1976 }
1977 return Err(Error::Read(error));
1978 }
1979 };
1980
1981 file.write_all(&buffer[..read])
1982 .await
1983 .map_err(|source| Error::IoPath {
1984 path: write_path.clone(),
1985 source,
1986 })?;
1987 bytes_done += read as u64;
1988 bytes_remaining -= read as u64;
1989 events(TransferEvent::Progress {
1990 direction: TransferDirection::Receive,
1991 file_name: display_path(&entry.relative_path),
1992 bytes_done,
1993 bytes_total: entry.size,
1994 });
1995 if control.is_cancelled() {
1996 if offset == 0 {
1997 let _ = tokio::fs::remove_file(&write_path).await;
1998 }
1999 return Err(Error::TransferCancelled);
2000 }
2001 }
2002
2003 file.flush().await.map_err(|source| Error::IoPath {
2004 path: write_path.clone(),
2005 source,
2006 })?;
2007 drop(file);
2008
2009 if offset == 0 {
2010 tokio::fs::rename(&write_path, &final_path)
2011 .await
2012 .map_err(|source| Error::IoPath {
2013 path: final_path,
2014 source,
2015 })?;
2016 }
2017
2018 Ok(())
2019}
2020
2021async fn write_json_frame<T: Serialize>(send: &mut quinn::SendStream, value: &T) -> Result<()> {
2022 let bytes = serde_json::to_vec(value).map_err(|error| Error::Protocol(error.to_string()))?;
2023 if bytes.len() > MAX_FRAME_LEN {
2024 return Err(Error::Protocol("frame exceeds maximum length".to_string()));
2025 }
2026 send.write_all(&(bytes.len() as u32).to_be_bytes()).await?;
2027 send.write_all(&bytes).await?;
2028 Ok(())
2029}
2030
2031async fn read_json_frame<T: for<'de> Deserialize<'de>>(recv: &mut quinn::RecvStream) -> Result<T> {
2032 let mut len = [0; 4];
2033 recv.read_exact(&mut len).await?;
2034 let len = u32::from_be_bytes(len) as usize;
2035 if len > MAX_FRAME_LEN {
2036 return Err(Error::Protocol("frame exceeds maximum length".to_string()));
2037 }
2038
2039 let mut bytes = vec![0; len];
2040 recv.read_exact(&mut bytes).await?;
2041 serde_json::from_slice(&bytes).map_err(|error| Error::Protocol(error.to_string()))
2042}
2043
2044fn negotiate_direct_protocol(
2045 hello: &DirectProtocolHello,
2046 psk: Option<&str>,
2047) -> DirectProtocolResponse {
2048 if hello.min_protocol_version > NATIVE_PROTOCOL_VERSION {
2049 return DirectProtocolResponse::reject(format!(
2050 "peer requires protocol version {}, local max is {}",
2051 hello.min_protocol_version, NATIVE_PROTOCOL_VERSION
2052 ));
2053 }
2054
2055 if hello.protocol_version < MIN_NATIVE_PROTOCOL_VERSION {
2056 return DirectProtocolResponse::reject(format!(
2057 "peer max protocol version {} is below local minimum {}",
2058 hello.protocol_version, MIN_NATIVE_PROTOCOL_VERSION
2059 ));
2060 }
2061
2062 let selected_version = NATIVE_PROTOCOL_VERSION.min(hello.protocol_version);
2063 match (psk, hello.psk) {
2064 (Some(_), false) => DirectProtocolResponse::reject("receiver requires PSK authentication"),
2065 (Some(_), true) => {
2066 DirectProtocolResponse::accept_with_psk(selected_version, Uuid::new_v4().to_string())
2067 }
2068 (None, true) => DirectProtocolResponse::reject("receiver is not configured for PSK mode"),
2069 (None, false) => DirectProtocolResponse::accept(selected_version),
2070 }
2071}
2072
2073#[derive(Debug, Clone)]
2074struct ManifestSource {
2075 source_path: PathBuf,
2076 relative_path: PathBuf,
2077}
2078
2079fn collect_manifest_sources(paths: &[PathBuf]) -> Result<Vec<ManifestSource>> {
2080 let mut sources = Vec::new();
2081 for path in paths {
2082 let metadata = std::fs::symlink_metadata(path).map_err(|source| Error::IoPath {
2083 path: path.clone(),
2084 source,
2085 })?;
2086 if metadata.file_type().is_symlink() {
2087 return Err(Error::InvalidInput(format!(
2088 "{} is a symlink; symlink transfers are not supported",
2089 display_path(path)
2090 )));
2091 }
2092
2093 let root_name = path
2094 .file_name()
2095 .ok_or_else(|| Error::InvalidInput(format!("{} has no file name", display_path(path))))?
2096 .to_os_string();
2097 let relative_path = PathBuf::from(root_name);
2098
2099 if metadata.is_dir() {
2100 sources.push(ManifestSource {
2101 source_path: path.clone(),
2102 relative_path: relative_path.clone(),
2103 });
2104 collect_dir_sources(path, &relative_path, &mut sources)?;
2105 } else if metadata.is_file() {
2106 sources.push(ManifestSource {
2107 source_path: path.clone(),
2108 relative_path,
2109 });
2110 } else {
2111 return Err(Error::InvalidInput(format!(
2112 "{} is not a regular file or directory",
2113 display_path(path)
2114 )));
2115 }
2116 }
2117
2118 Ok(sources)
2119}
2120
2121fn collect_dir_sources(
2122 dir: &Path,
2123 relative_dir: &Path,
2124 sources: &mut Vec<ManifestSource>,
2125) -> Result<()> {
2126 for entry in std::fs::read_dir(dir).map_err(|source| Error::IoPath {
2127 path: dir.to_path_buf(),
2128 source,
2129 })? {
2130 let entry = entry.map_err(|source| Error::IoPath {
2131 path: dir.to_path_buf(),
2132 source,
2133 })?;
2134 let path = entry.path();
2135 let metadata = std::fs::symlink_metadata(&path).map_err(|source| Error::IoPath {
2136 path: path.clone(),
2137 source,
2138 })?;
2139 if metadata.file_type().is_symlink() {
2140 return Err(Error::InvalidInput(format!(
2141 "{} is a symlink; symlink transfers are not supported",
2142 display_path(&path)
2143 )));
2144 }
2145
2146 let relative_path = relative_dir.join(entry.file_name());
2147 if metadata.is_dir() {
2148 sources.push(ManifestSource {
2149 source_path: path.clone(),
2150 relative_path: relative_path.clone(),
2151 });
2152 collect_dir_sources(&path, &relative_path, sources)?;
2153 } else if metadata.is_file() {
2154 sources.push(ManifestSource {
2155 source_path: path,
2156 relative_path,
2157 });
2158 }
2159 }
2160
2161 Ok(())
2162}
2163
2164fn manifest_source_map(paths: &[PathBuf]) -> Result<BTreeMap<PathBuf, PathBuf>> {
2165 Ok(collect_manifest_sources(paths)?
2166 .into_iter()
2167 .filter_map(|source| {
2168 let metadata = std::fs::metadata(&source.source_path).ok()?;
2169 metadata
2170 .is_file()
2171 .then_some((source.relative_path, source.source_path))
2172 })
2173 .collect())
2174}
2175
2176async fn build_manifest_entry(source: ManifestSource) -> Result<ManifestEntry> {
2177 validate_relative_transfer_path(&source.relative_path)?;
2178 let metadata = tokio::fs::metadata(&source.source_path)
2179 .await
2180 .map_err(|source_error| Error::IoPath {
2181 path: source.source_path.clone(),
2182 source: source_error,
2183 })?;
2184 let unix_mode = unix_mode(&metadata);
2185 let modified_unix_ms = modified_unix_ms(&metadata);
2186
2187 if metadata.is_dir() {
2188 return Ok(ManifestEntry {
2189 relative_path: source.relative_path,
2190 kind: ManifestEntryKind::Directory,
2191 size: 0,
2192 blake3: None,
2193 chunks: Vec::new(),
2194 unix_mode,
2195 modified_unix_ms,
2196 });
2197 }
2198
2199 let (blake3, chunks) = hash_file_with_chunks(&source.source_path, DEFAULT_CHUNK_SIZE).await?;
2200 Ok(ManifestEntry {
2201 relative_path: source.relative_path,
2202 kind: ManifestEntryKind::File,
2203 size: metadata.len(),
2204 blake3: Some(blake3),
2205 chunks,
2206 unix_mode,
2207 modified_unix_ms,
2208 })
2209}
2210
2211fn validate_manifest(manifest: &TransferManifest) -> Result<()> {
2212 if manifest.version != MANIFEST_VERSION {
2213 return Err(Error::Protocol(format!(
2214 "unsupported manifest version {}",
2215 manifest.version
2216 )));
2217 }
2218 if manifest.chunk_size == 0 {
2219 return Err(Error::Protocol("manifest chunk size is zero".to_string()));
2220 }
2221
2222 let mut seen = BTreeSet::new();
2223 for entry in &manifest.entries {
2224 validate_relative_transfer_path(&entry.relative_path)?;
2225 if !seen.insert(entry.relative_path.clone()) {
2226 return Err(Error::Protocol(format!(
2227 "duplicate manifest path: {}",
2228 display_path(&entry.relative_path)
2229 )));
2230 }
2231 if entry.kind == ManifestEntryKind::File && entry.blake3.is_none() {
2232 return Err(Error::Protocol(format!(
2233 "file manifest entry is missing BLAKE3: {}",
2234 display_path(&entry.relative_path)
2235 )));
2236 }
2237 }
2238
2239 Ok(())
2240}
2241
2242fn validate_relative_transfer_path(path: &Path) -> Result<()> {
2243 if path.as_os_str().is_empty() || path.is_absolute() {
2244 return Err(Error::InvalidInput(format!(
2245 "unsafe transfer path: {}",
2246 display_path(path)
2247 )));
2248 }
2249
2250 let mut normal_components = 0usize;
2251 for component in path.components() {
2252 match component {
2253 Component::Normal(value) => {
2254 let Some(name) = value.to_str() else {
2255 return Err(Error::InvalidInput(format!(
2256 "transfer path must be UTF-8: {}",
2257 display_path(path)
2258 )));
2259 };
2260 validate_path_component(name, path)?;
2261 normal_components += 1;
2262 }
2263 Component::CurDir
2264 | Component::ParentDir
2265 | Component::RootDir
2266 | Component::Prefix(_) => {
2267 return Err(Error::InvalidInput(format!(
2268 "unsafe transfer path: {}",
2269 display_path(path)
2270 )));
2271 }
2272 }
2273 }
2274
2275 if normal_components == 0 {
2276 return Err(Error::InvalidInput(format!(
2277 "unsafe transfer path: {}",
2278 display_path(path)
2279 )));
2280 }
2281
2282 Ok(())
2283}
2284
2285fn validate_path_component(component: &str, full_path: &Path) -> Result<()> {
2286 if component.is_empty()
2287 || component.contains('\\')
2288 || component.contains('/')
2289 || component.contains(':')
2290 || component.ends_with(' ')
2291 || component.ends_with('.')
2292 || is_windows_reserved_name(component)
2293 {
2294 return Err(Error::InvalidInput(format!(
2295 "unsafe transfer path: {}",
2296 display_path(full_path)
2297 )));
2298 }
2299
2300 Ok(())
2301}
2302
2303fn is_windows_reserved_name(component: &str) -> bool {
2304 let stem = component
2305 .split('.')
2306 .next()
2307 .unwrap_or(component)
2308 .trim_end_matches([' ', '.'])
2309 .to_ascii_uppercase();
2310 matches!(
2311 stem.as_str(),
2312 "CON"
2313 | "PRN"
2314 | "AUX"
2315 | "NUL"
2316 | "COM1"
2317 | "COM2"
2318 | "COM3"
2319 | "COM4"
2320 | "COM5"
2321 | "COM6"
2322 | "COM7"
2323 | "COM8"
2324 | "COM9"
2325 | "LPT1"
2326 | "LPT2"
2327 | "LPT3"
2328 | "LPT4"
2329 | "LPT5"
2330 | "LPT6"
2331 | "LPT7"
2332 | "LPT8"
2333 | "LPT9"
2334 )
2335}
2336
2337async fn hash_file(path: &Path) -> Result<String> {
2338 hash_file_with_chunks(path, DEFAULT_CHUNK_SIZE)
2339 .await
2340 .map(|(hash, _)| hash)
2341}
2342
2343async fn hash_file_with_chunks(path: &Path, chunk_size: u64) -> Result<(String, Vec<String>)> {
2344 let mut file = tokio::fs::File::open(path)
2345 .await
2346 .map_err(|source| Error::IoPath {
2347 path: path.to_path_buf(),
2348 source,
2349 })?;
2350 let mut hasher = blake3::Hasher::new();
2351 let mut chunk_hasher = blake3::Hasher::new();
2352 let mut chunk_bytes = 0u64;
2353 let mut chunks = Vec::new();
2354 let mut buffer = vec![0; IO_BUFFER_SIZE];
2355
2356 loop {
2357 let read = file
2358 .read(&mut buffer)
2359 .await
2360 .map_err(|source| Error::IoPath {
2361 path: path.to_path_buf(),
2362 source,
2363 })?;
2364 if read == 0 {
2365 break;
2366 }
2367 hasher.update(&buffer[..read]);
2368
2369 let mut remaining = &buffer[..read];
2370 while !remaining.is_empty() {
2371 let take = remaining.len().min((chunk_size - chunk_bytes) as usize);
2372 chunk_hasher.update(&remaining[..take]);
2373 chunk_bytes += take as u64;
2374 remaining = &remaining[take..];
2375 if chunk_bytes == chunk_size {
2376 chunks.push(chunk_hasher.finalize().to_hex().to_string());
2377 chunk_hasher = blake3::Hasher::new();
2378 chunk_bytes = 0;
2379 }
2380 }
2381 }
2382
2383 if chunk_bytes > 0 {
2384 chunks.push(chunk_hasher.finalize().to_hex().to_string());
2385 }
2386
2387 Ok((hasher.finalize().to_hex().to_string(), chunks))
2388}
2389
2390async fn verified_prefix_offset(
2391 path: &Path,
2392 entry: &ManifestEntry,
2393 existing_len: u64,
2394 chunk_size: u64,
2395) -> Result<u64> {
2396 if existing_len == 0 || entry.chunks.is_empty() {
2397 return Ok(0);
2398 }
2399
2400 let chunks_to_check = (existing_len / chunk_size).min(entry.chunks.len() as u64) as usize;
2401 if chunks_to_check == 0 {
2402 return Ok(0);
2403 }
2404
2405 let mut file = tokio::fs::File::open(path)
2406 .await
2407 .map_err(|source| Error::IoPath {
2408 path: path.to_path_buf(),
2409 source,
2410 })?;
2411 let mut buffer = vec![0; IO_BUFFER_SIZE];
2412 let mut verified_chunks = 0usize;
2413
2414 for expected in entry.chunks.iter().take(chunks_to_check) {
2415 let mut remaining = chunk_size;
2416 let mut hasher = blake3::Hasher::new();
2417 while remaining > 0 {
2418 let read_size = buffer.len().min(remaining as usize);
2419 file.read_exact(&mut buffer[..read_size])
2420 .await
2421 .map_err(|source| Error::IoPath {
2422 path: path.to_path_buf(),
2423 source,
2424 })?;
2425 hasher.update(&buffer[..read_size]);
2426 remaining -= read_size as u64;
2427 }
2428
2429 if hasher.finalize().to_hex().as_str() != expected {
2430 break;
2431 }
2432 verified_chunks += 1;
2433 }
2434
2435 Ok(verified_chunks as u64 * chunk_size)
2436}
2437
2438fn summary_from_files(
2439 manifest: &TransferManifest,
2440 files: Vec<TransferFileSummary>,
2441) -> Result<TransferSummary> {
2442 let bytes = manifest.total_file_bytes();
2443 let blake3 = if files.len() == 1 {
2444 files[0].blake3.clone()
2445 } else {
2446 manifest.digest()?
2447 };
2448 let file_name = if files.len() == 1 {
2449 display_path(&files[0].relative_path)
2450 } else {
2451 format!("{} files", files.len())
2452 };
2453 let path = files
2454 .first()
2455 .map(|file| file.path.clone())
2456 .unwrap_or_default();
2457
2458 Ok(TransferSummary {
2459 path,
2460 file_name,
2461 bytes,
2462 blake3,
2463 files,
2464 })
2465}
2466
2467fn temp_path_for(final_path: &Path) -> PathBuf {
2468 let file_name = final_path
2469 .file_name()
2470 .and_then(|name| name.to_str())
2471 .unwrap_or("transfer");
2472 final_path.with_file_name(format!(".{file_name}.{}.ferry-part", Uuid::new_v4()))
2473}
2474
2475fn modified_unix_ms(metadata: &Metadata) -> Option<i128> {
2476 system_time_unix_ms(metadata.modified().ok()?)
2477}
2478
2479fn system_time_unix_ms(time: SystemTime) -> Option<i128> {
2480 match time.duration_since(UNIX_EPOCH) {
2481 Ok(duration) => Some(duration.as_millis() as i128),
2482 Err(error) => Some(-(error.duration().as_millis() as i128)),
2483 }
2484}
2485
2486#[cfg(unix)]
2487fn unix_mode(metadata: &Metadata) -> Option<u32> {
2488 use std::os::unix::fs::PermissionsExt;
2489
2490 Some(metadata.permissions().mode())
2491}
2492
2493#[cfg(not(unix))]
2494fn unix_mode(_metadata: &Metadata) -> Option<u32> {
2495 None
2496}
2497
2498fn client_config_capturing_server_fingerprint()
2499-> Result<(quinn::ClientConfig, Arc<Mutex<Option<String>>>)> {
2500 let server_fingerprint = Arc::new(Mutex::new(None));
2501 let crypto = rustls::ClientConfig::builder()
2502 .dangerous()
2503 .with_custom_certificate_verifier(ServerFingerprintVerifier::new(
2504 server_fingerprint.clone(),
2505 ))
2506 .with_no_client_auth();
2507
2508 let config = quinn::ClientConfig::new(Arc::new(
2509 QuicClientConfig::try_from(crypto)
2510 .map_err(|error| Error::CryptoConfig(error.to_string()))?,
2511 ));
2512 Ok((config, server_fingerprint))
2513}
2514
2515fn captured_server_fingerprint(fingerprint: &Arc<Mutex<Option<String>>>) -> Result<String> {
2516 fingerprint
2517 .lock()
2518 .map_err(|_| Error::CryptoConfig("server fingerprint capture lock poisoned".to_string()))?
2519 .clone()
2520 .ok_or_else(|| {
2521 Error::CryptoConfig("server certificate fingerprint was not captured".to_string())
2522 })
2523}
2524
2525#[derive(Debug)]
2526struct ServerFingerprintVerifier {
2527 provider: Arc<CryptoProvider>,
2528 server_fingerprint: Arc<Mutex<Option<String>>>,
2529}
2530
2531impl ServerFingerprintVerifier {
2532 fn new(server_fingerprint: Arc<Mutex<Option<String>>>) -> Arc<Self> {
2533 Arc::new(Self {
2534 provider: Arc::new(rustls::crypto::ring::default_provider()),
2535 server_fingerprint,
2536 })
2537 }
2538}
2539
2540impl ServerCertVerifier for ServerFingerprintVerifier {
2541 fn verify_server_cert(
2542 &self,
2543 end_entity: &CertificateDer<'_>,
2544 _intermediates: &[CertificateDer<'_>],
2545 _server_name: &ServerName<'_>,
2546 _ocsp: &[u8],
2547 _now: UnixTime,
2548 ) -> std::result::Result<ServerCertVerified, rustls::Error> {
2549 let fingerprint = blake3::hash(end_entity.as_ref()).to_hex().to_string();
2550 *self.server_fingerprint.lock().map_err(|_| {
2551 rustls::Error::General("server fingerprint capture lock poisoned".to_string())
2552 })? = Some(fingerprint);
2553 Ok(ServerCertVerified::assertion())
2554 }
2555
2556 fn verify_tls12_signature(
2557 &self,
2558 message: &[u8],
2559 cert: &CertificateDer<'_>,
2560 dss: &DigitallySignedStruct,
2561 ) -> std::result::Result<HandshakeSignatureValid, rustls::Error> {
2562 verify_tls12_signature(
2563 message,
2564 cert,
2565 dss,
2566 &self.provider.signature_verification_algorithms,
2567 )
2568 }
2569
2570 fn verify_tls13_signature(
2571 &self,
2572 message: &[u8],
2573 cert: &CertificateDer<'_>,
2574 dss: &DigitallySignedStruct,
2575 ) -> std::result::Result<HandshakeSignatureValid, rustls::Error> {
2576 verify_tls13_signature(
2577 message,
2578 cert,
2579 dss,
2580 &self.provider.signature_verification_algorithms,
2581 )
2582 }
2583
2584 fn supported_verify_schemes(&self) -> Vec<SignatureScheme> {
2585 self.provider
2586 .signature_verification_algorithms
2587 .supported_schemes()
2588 }
2589}
2590
2591fn ensure_rustls_provider() {
2592 let _ = rustls::crypto::ring::default_provider().install_default();
2593}
2594
2595pub fn config_dir() -> Result<PathBuf> {
2596 let base_dirs = BaseDirs::new().ok_or(Error::ConfigDirUnavailable)?;
2597 Ok(base_dirs.config_dir().join(CONFIG_DIR_NAME))
2598}
2599
2600fn default_alias() -> String {
2601 std::env::var("HOSTNAME")
2602 .or_else(|_| std::env::var("COMPUTERNAME"))
2603 .ok()
2604 .filter(|value| !value.trim().is_empty())
2605 .unwrap_or_else(|| "fileferry-device".to_string())
2606}
2607
2608fn expand_home_path(path: &str) -> Result<PathBuf> {
2609 if path == "~" {
2610 return Ok(BaseDirs::new()
2611 .ok_or(Error::ConfigDirUnavailable)?
2612 .home_dir()
2613 .to_path_buf());
2614 }
2615
2616 if let Some(rest) = path.strip_prefix("~/") {
2617 return Ok(BaseDirs::new()
2618 .ok_or(Error::ConfigDirUnavailable)?
2619 .home_dir()
2620 .join(rest));
2621 }
2622
2623 Ok(PathBuf::from(path))
2624}
2625
2626fn normalized_fingerprint(fingerprint: String) -> Result<String> {
2627 let fingerprint = fingerprint.trim().to_ascii_lowercase();
2628 if fingerprint.len() != 64 || !fingerprint.chars().all(|char| char.is_ascii_hexdigit()) {
2629 return Err(Error::InvalidInput(
2630 "peer fingerprint must be a full 64-character hex BLAKE3 fingerprint".to_string(),
2631 ));
2632 }
2633
2634 Ok(fingerprint)
2635}
2636
2637fn validate_psk(psk: String) -> Result<String> {
2638 if psk.is_empty() {
2639 return Err(Error::InvalidInput("PSK must not be empty".to_string()));
2640 }
2641
2642 Ok(psk)
2643}
2644
2645fn psk_proof(
2646 psk: &str,
2647 challenge: &str,
2648 client_fingerprint: &str,
2649 protocol_version: u16,
2650) -> Result<String> {
2651 validate_psk(psk.to_string())?;
2652 let key = blake3::hash(psk.as_bytes());
2653 let mut message = Vec::new();
2654 message.extend_from_slice(PSK_PROOF_CONTEXT);
2655 message.push(0);
2656 message.extend_from_slice(challenge.as_bytes());
2657 message.push(0);
2658 message.extend_from_slice(client_fingerprint.as_bytes());
2659 message.push(0);
2660 message.extend_from_slice(protocol_version.to_string().as_bytes());
2661 Ok(blake3::keyed_hash(key.as_bytes(), &message)
2662 .to_hex()
2663 .to_string())
2664}
2665
2666fn write_toml_file<T: Serialize>(path: &Path, value: &T) -> Result<()> {
2667 let bytes =
2668 toml::to_string_pretty(value).map_err(|error| Error::ConfigSerialize(error.to_string()))?;
2669 if let Some(parent) = path.parent() {
2670 std::fs::create_dir_all(parent).map_err(|source| Error::IoPath {
2671 path: parent.to_path_buf(),
2672 source,
2673 })?;
2674 }
2675
2676 let temp_path = path.with_extension(format!("tmp-{}", Uuid::new_v4()));
2677 std::fs::write(&temp_path, bytes).map_err(|source| Error::IoPath {
2678 path: temp_path.clone(),
2679 source,
2680 })?;
2681 std::fs::rename(&temp_path, path).map_err(|source| Error::IoPath {
2682 path: path.to_path_buf(),
2683 source,
2684 })?;
2685 Ok(())
2686}
2687
2688fn write_private_file(path: &Path, bytes: &[u8]) -> Result<()> {
2689 std::fs::write(path, bytes).map_err(|source| Error::IoPath {
2690 path: path.to_path_buf(),
2691 source,
2692 })?;
2693
2694 #[cfg(unix)]
2695 {
2696 use std::os::unix::fs::PermissionsExt;
2697
2698 let mut permissions = std::fs::metadata(path)
2699 .map_err(|source| Error::IoPath {
2700 path: path.to_path_buf(),
2701 source,
2702 })?
2703 .permissions();
2704 permissions.set_mode(0o600);
2705 std::fs::set_permissions(path, permissions).map_err(|source| Error::IoPath {
2706 path: path.to_path_buf(),
2707 source,
2708 })?;
2709 }
2710
2711 Ok(())
2712}
2713
2714#[cfg(test)]
2715mod tests {
2716 use super::*;
2717 use std::sync::{Arc, Mutex};
2718
2719 #[test]
2720 fn direct_peer_parses_socket_address() {
2721 let peer = DirectPeer::parse("127.0.0.1:53318").expect("address parses");
2722
2723 assert_eq!(peer.address().to_string(), "127.0.0.1:53318");
2724 assert_eq!(peer.expected_fingerprint(), None);
2725 }
2726
2727 #[test]
2728 fn direct_peer_accepts_expected_fingerprint() {
2729 let fingerprint = "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef";
2730 let peer = DirectPeer::parse("127.0.0.1:53318")
2731 .expect("address parses")
2732 .with_expected_fingerprint(fingerprint)
2733 .expect("fingerprint parses");
2734
2735 assert_eq!(peer.expected_fingerprint(), Some(fingerprint));
2736 }
2737
2738 #[test]
2739 fn direct_peer_rejects_missing_port() {
2740 assert!(DirectPeer::parse("127.0.0.1").is_err());
2741 }
2742
2743 #[test]
2744 fn send_request_requires_at_least_one_path() {
2745 let peer = DirectPeer::parse("127.0.0.1:53318").expect("address parses");
2746
2747 assert!(SendRequest::new(peer, Vec::new()).is_err());
2748 }
2749
2750 #[test]
2751 fn psk_secret_is_redacted_in_debug_and_serialization() {
2752 let secret = PskSecret::new("do-not-print").expect("psk accepted");
2753
2754 assert_eq!(format!("{secret:?}"), "PskSecret(<redacted>)");
2755 assert_eq!(
2756 serde_json::to_string(&secret).expect("secret serializes"),
2757 r#""<redacted>""#
2758 );
2759
2760 let peer = DirectPeer::parse("127.0.0.1:53318").expect("address parses");
2761 let request = SendRequest::new(peer, vec![PathBuf::from("file.txt")])
2762 .expect("send request")
2763 .with_psk("do-not-print")
2764 .expect("psk accepted");
2765 assert!(!format!("{request:?}").contains("do-not-print"));
2766 }
2767
2768 #[test]
2769 fn validate_send_paths_rejects_empty_slice() {
2770 assert!(validate_send_paths(&[]).is_err());
2771 }
2772
2773 #[tokio::test]
2774 async fn manifest_walks_directory_and_serializes_entries() {
2775 let temp = tempfile::tempdir().expect("tempdir");
2776 let root = temp.path().join("bundle");
2777 let nested = root.join("nested");
2778 tokio::fs::create_dir_all(&nested).await.expect("dirs");
2779 tokio::fs::write(root.join("a.txt"), b"alpha")
2780 .await
2781 .expect("file a");
2782 tokio::fs::write(nested.join("b.txt"), b"beta")
2783 .await
2784 .expect("file b");
2785
2786 let manifest = build_manifest(&[root]).await.expect("manifest");
2787 let json = serde_json::to_string(&manifest).expect("manifest serializes");
2788 let decoded: TransferManifest = serde_json::from_str(&json).expect("manifest decodes");
2789
2790 assert_eq!(decoded.version, MANIFEST_VERSION);
2791 assert!(decoded.entries.iter().any(|entry| {
2792 entry.kind == ManifestEntryKind::Directory
2793 && entry.relative_path == Path::new("bundle/nested")
2794 }));
2795 assert!(decoded.entries.iter().any(|entry| {
2796 entry.kind == ManifestEntryKind::File
2797 && entry.relative_path == Path::new("bundle/nested/b.txt")
2798 && entry.size == 4
2799 && entry.blake3.is_some()
2800 }));
2801 }
2802
2803 #[test]
2804 fn safe_destination_path_rejects_unsafe_paths() {
2805 let dest = Path::new("/tmp/ferry-dest");
2806
2807 assert!(safe_destination_path(dest, Path::new("../escape.txt")).is_err());
2808 assert!(safe_destination_path(dest, Path::new("/absolute.txt")).is_err());
2809 assert!(safe_destination_path(dest, Path::new("nested/CON")).is_err());
2810 assert!(safe_destination_path(dest, Path::new("nested\\escape.txt")).is_err());
2811 assert!(safe_destination_path(dest, Path::new("nested/ok.txt")).is_ok());
2812 }
2813
2814 #[test]
2815 fn safe_destination_path_rejects_generated_escape_and_reserved_patterns() {
2816 let dest = Path::new("/tmp/ferry-dest");
2817 let unsafe_components = [
2818 "..",
2819 ".",
2820 "CON",
2821 "con.txt",
2822 "NUL",
2823 "COM1",
2824 "LPT9",
2825 "trailingspace ",
2826 "trailingdot.",
2827 "has:colon",
2828 "has\\backslash",
2829 ];
2830
2831 for component in unsafe_components {
2832 assert!(
2833 safe_destination_path(dest, Path::new(component)).is_err(),
2834 "{component} should be rejected as a top-level path"
2835 );
2836 if component != "." {
2837 assert!(
2838 safe_destination_path(dest, Path::new("safe").join(component).as_path())
2839 .is_err(),
2840 "{component} should be rejected as a nested path"
2841 );
2842 }
2843 }
2844
2845 for component in ["alpha", "alpha-1", "alpha_1", "report.final.txt"] {
2846 let path = safe_destination_path(dest, Path::new("safe").join(component).as_path())
2847 .expect("safe generated path accepted");
2848 assert!(path.starts_with(dest));
2849 }
2850 }
2851
2852 #[tokio::test]
2853 async fn resume_decision_skips_existing_matching_file() {
2854 let temp = tempfile::tempdir().expect("tempdir");
2855 let source = temp.path().join("source.txt");
2856 let dest = temp.path().join("dest");
2857 tokio::fs::create_dir_all(&dest).await.expect("dest dir");
2858 tokio::fs::write(&source, b"same contents")
2859 .await
2860 .expect("source");
2861 tokio::fs::write(dest.join("source.txt"), b"same contents")
2862 .await
2863 .expect("dest file");
2864 let manifest = build_manifest(&[source]).await.expect("manifest");
2865 let entry = manifest.file_entries().next().expect("file entry");
2866
2867 let decision = decide_resume(&dest, entry, manifest.chunk_size)
2868 .await
2869 .expect("decision");
2870
2871 assert_eq!(decision, ResumeDecision::Skip);
2872 }
2873
2874 #[tokio::test]
2875 async fn resume_decision_returns_verified_chunk_offset() {
2876 let temp = tempfile::tempdir().expect("tempdir");
2877 let source = temp.path().join("large.bin");
2878 let dest = temp.path().join("dest");
2879 tokio::fs::create_dir_all(&dest).await.expect("dest dir");
2880 let bytes = vec![42; (DEFAULT_CHUNK_SIZE as usize * 2) + 128];
2881 tokio::fs::write(&source, &bytes).await.expect("source");
2882 tokio::fs::write(
2883 dest.join("large.bin"),
2884 &bytes[..DEFAULT_CHUNK_SIZE as usize + 99],
2885 )
2886 .await
2887 .expect("partial");
2888 let manifest = build_manifest(&[source]).await.expect("manifest");
2889 let entry = manifest.file_entries().next().expect("file entry");
2890
2891 let decision = decide_resume(&dest, entry, manifest.chunk_size)
2892 .await
2893 .expect("decision");
2894
2895 assert_eq!(decision, ResumeDecision::ResumeFrom(DEFAULT_CHUNK_SIZE));
2896 }
2897
2898 #[tokio::test]
2899 async fn resume_decision_rejects_existing_file_with_mismatched_contents() {
2900 let temp = tempfile::tempdir().expect("tempdir");
2901 let source = temp.path().join("source.txt");
2902 let dest = temp.path().join("dest");
2903 tokio::fs::create_dir_all(&dest).await.expect("dest dir");
2904 tokio::fs::write(&source, b"expected contents")
2905 .await
2906 .expect("source");
2907 tokio::fs::write(dest.join("source.txt"), b"different contents")
2908 .await
2909 .expect("conflicting dest file");
2910 let manifest = build_manifest(&[source]).await.expect("manifest");
2911 let entry = manifest.file_entries().next().expect("file entry");
2912
2913 let decision = decide_resume(&dest, entry, manifest.chunk_size)
2914 .await
2915 .expect("decision");
2916
2917 assert!(matches!(decision, ResumeDecision::Conflict(_)));
2918 }
2919
2920 #[tokio::test]
2921 async fn resume_decision_rejects_existing_file_larger_than_manifest_entry() {
2922 let temp = tempfile::tempdir().expect("tempdir");
2923 let source = temp.path().join("source.txt");
2924 let dest = temp.path().join("dest");
2925 tokio::fs::create_dir_all(&dest).await.expect("dest dir");
2926 tokio::fs::write(&source, b"small").await.expect("source");
2927 tokio::fs::write(dest.join("source.txt"), b"larger destination")
2928 .await
2929 .expect("larger dest file");
2930 let manifest = build_manifest(&[source]).await.expect("manifest");
2931 let entry = manifest.file_entries().next().expect("file entry");
2932
2933 let decision = decide_resume(&dest, entry, manifest.chunk_size)
2934 .await
2935 .expect("decision");
2936
2937 assert!(matches!(decision, ResumeDecision::Conflict(_)));
2938 }
2939
2940 #[test]
2941 fn identity_is_loaded_after_generation() {
2942 let temp = tempfile::tempdir().expect("tempdir");
2943 let generated =
2944 NativeIdentity::load_or_generate_in(temp.path()).expect("identity generated");
2945 let loaded = NativeIdentity::load_or_generate_in(temp.path()).expect("identity loaded");
2946
2947 assert_eq!(generated.fingerprint(), loaded.fingerprint());
2948 }
2949
2950 #[test]
2951 fn app_config_round_trips_toml() {
2952 let temp = tempfile::tempdir().expect("tempdir");
2953 let path = temp.path().join("config.toml");
2954 let config = AppConfig {
2955 alias: "desk".to_string(),
2956 ..AppConfig::default()
2957 };
2958
2959 config.save_to_path(&path).expect("config saved");
2960 let loaded = AppConfig::load_or_default_from(&path).expect("config loaded");
2961
2962 assert_eq!(loaded.alias, "desk");
2963 assert_eq!(loaded.listen_port, 53317);
2964 assert!(loaded.trust.require_fingerprint);
2965 }
2966
2967 #[test]
2968 fn app_config_redacts_psk_for_display_output() {
2969 let config = AppConfig {
2970 trust: TrustConfig {
2971 psk: "super-secret-psk".to_string(),
2972 ..TrustConfig::default()
2973 },
2974 ..AppConfig::default()
2975 };
2976
2977 let redacted = config.to_redacted_toml_string().expect("config serializes");
2978
2979 assert!(!redacted.contains("super-secret-psk"));
2980 assert!(redacted.contains(r#"psk = "<redacted>""#));
2981 }
2982
2983 #[test]
2984 fn daemon_config_defaults_from_app_config_and_round_trips_toml() {
2985 let temp = tempfile::tempdir().expect("tempdir");
2986 let path = temp.path().join("daemon.toml");
2987 let app_config = AppConfig {
2988 quic_port: 54444,
2989 download_dir: temp.path().join("downloads").display().to_string(),
2990 ..AppConfig::default()
2991 };
2992
2993 let defaulted =
2994 DaemonConfig::load_or_default_from(&path, &app_config).expect("daemon config");
2995 assert_eq!(
2996 defaulted.listen,
2997 SocketAddr::from(([0, 0, 0, 0], app_config.quic_port))
2998 );
2999 assert_eq!(defaulted.destination, temp.path().join("downloads"));
3000
3001 let config = DaemonConfig {
3002 listen: SocketAddr::from(([127, 0, 0, 1], 53318)),
3003 destination: temp.path().join("daemon-dest"),
3004 };
3005 config.save_to_path(&path).expect("daemon config saved");
3006 let loaded =
3007 DaemonConfig::load_or_default_from(&path, &app_config).expect("daemon config loaded");
3008
3009 assert_eq!(loaded, config);
3010 }
3011
3012 #[test]
3013 fn trust_store_load_save_and_forget_round_trip() {
3014 let temp = tempfile::tempdir().expect("tempdir");
3015 let path = temp.path().join("known_peers.toml");
3016 let fingerprint = "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef";
3017 let mut store = TrustStore::default();
3018
3019 store
3020 .trust_fingerprint(fingerprint)
3021 .expect("fingerprint trusted");
3022 store.save_to_path(&path).expect("trust store saved");
3023 let mut loaded = TrustStore::load_or_default_from(&path).expect("trust store loaded");
3024
3025 assert_eq!(loaded.records().count(), 1);
3026 assert_eq!(loaded.peers[fingerprint].trust_state, TrustState::Trusted);
3027 assert!(loaded.forget(fingerprint));
3028 assert_eq!(loaded.records().count(), 0);
3029 }
3030
3031 #[test]
3032 fn trust_store_rejects_short_fingerprint_without_discovery_lookup() {
3033 let mut store = TrustStore::default();
3034
3035 assert!(store.trust_fingerprint("9a4f2c").is_err());
3036 }
3037
3038 #[test]
3039 fn trust_store_pins_direct_address_to_fingerprint() {
3040 let fingerprint = "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef";
3041 let address = SocketAddr::from(([192, 168, 1, 42], 53318));
3042 let mut store = TrustStore::default();
3043
3044 store
3045 .trust_direct_address(fingerprint, address)
3046 .expect("direct address trusted");
3047
3048 assert!(store.is_trusted_fingerprint(fingerprint));
3049 assert_eq!(
3050 store.trusted_fingerprint_for_address(address),
3051 Some(fingerprint)
3052 );
3053 assert!(store.peers[fingerprint].addresses.contains(&address));
3054 assert!(store.peers[fingerprint].transports.contains("quic"));
3055 assert_eq!(store.peers[fingerprint].trust_state, TrustState::Trusted);
3056 }
3057
3058 #[test]
3059 fn transfer_events_serialize_with_stable_event_names() {
3060 let event = TransferEvent::Progress {
3061 direction: TransferDirection::Send,
3062 file_name: "bundle/a.txt".to_string(),
3063 bytes_done: 5,
3064 bytes_total: 9,
3065 };
3066
3067 let json = serde_json::to_value(&event).expect("event serializes");
3068
3069 assert_eq!(json["event"], "progress");
3070 assert_eq!(json["direction"], "send");
3071 assert_eq!(json["file_name"], "bundle/a.txt");
3072 assert_eq!(json["bytes_done"], 5);
3073 assert_eq!(json["bytes_total"], 9);
3074 }
3075
3076 #[test]
3077 fn direct_protocol_hello_has_stable_golden_json() {
3078 let hello = DirectProtocolHello {
3079 protocol_version: 1,
3080 min_protocol_version: 1,
3081 client_fingerprint: "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"
3082 .to_string(),
3083 psk: false,
3084 };
3085
3086 let json = serde_json::to_string(&hello).expect("hello serializes");
3087
3088 assert_eq!(
3089 json,
3090 r#"{"protocol_version":1,"min_protocol_version":1,"client_fingerprint":"0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef","psk":false}"#
3091 );
3092 }
3093
3094 #[test]
3095 fn direct_protocol_response_has_stable_golden_json() {
3096 let response = DirectProtocolResponse::accept(1);
3097
3098 let json = serde_json::to_string(&response).expect("response serializes");
3099
3100 assert_eq!(
3101 json,
3102 r#"{"accepted":true,"protocol_version":1,"psk_challenge":null,"error":null}"#
3103 );
3104 }
3105
3106 #[test]
3107 fn direct_protocol_decodes_pre_psk_negotiation_frames() {
3108 let hello: DirectProtocolHello = serde_json::from_str(
3109 r#"{"protocol_version":1,"min_protocol_version":1,"client_fingerprint":"0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"}"#,
3110 )
3111 .expect("old hello decodes");
3112 assert!(!hello.psk);
3113
3114 let response: DirectProtocolResponse =
3115 serde_json::from_str(r#"{"accepted":true,"protocol_version":1,"error":null}"#)
3116 .expect("old response decodes");
3117 assert_eq!(response, DirectProtocolResponse::accept(1));
3118 }
3119
3120 #[test]
3121 fn direct_protocol_negotiates_supported_overlap() {
3122 let hello = DirectProtocolHello {
3123 protocol_version: 3,
3124 min_protocol_version: 1,
3125 client_fingerprint: "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"
3126 .to_string(),
3127 psk: false,
3128 };
3129
3130 let response = negotiate_direct_protocol(&hello, None);
3131
3132 assert_eq!(response, DirectProtocolResponse::accept(1));
3133 }
3134
3135 #[test]
3136 fn direct_protocol_rejects_incompatible_versions() {
3137 let too_new = DirectProtocolHello {
3138 protocol_version: 3,
3139 min_protocol_version: 2,
3140 client_fingerprint: "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"
3141 .to_string(),
3142 psk: false,
3143 };
3144 let too_old = DirectProtocolHello {
3145 protocol_version: 0,
3146 min_protocol_version: 0,
3147 client_fingerprint: "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"
3148 .to_string(),
3149 psk: false,
3150 };
3151
3152 assert!(!negotiate_direct_protocol(&too_new, None).accepted);
3153 assert!(!negotiate_direct_protocol(&too_old, None).accepted);
3154 }
3155
3156 #[test]
3157 fn direct_protocol_requires_matching_psk_mode() {
3158 let no_psk = DirectProtocolHello {
3159 protocol_version: 1,
3160 min_protocol_version: 1,
3161 client_fingerprint: "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"
3162 .to_string(),
3163 psk: false,
3164 };
3165 let with_psk = DirectProtocolHello {
3166 psk: true,
3167 ..no_psk.clone()
3168 };
3169
3170 assert!(!negotiate_direct_protocol(&no_psk, Some("secret")).accepted);
3171 assert!(!negotiate_direct_protocol(&with_psk, None).accepted);
3172
3173 let accepted = negotiate_direct_protocol(&with_psk, Some("secret"));
3174 assert!(accepted.accepted);
3175 assert!(accepted.psk_challenge.is_some());
3176 }
3177
3178 #[test]
3179 fn transfer_manifest_has_stable_golden_json() {
3180 let manifest = TransferManifest {
3181 version: MANIFEST_VERSION,
3182 session_id: Uuid::parse_str("00000000-0000-0000-0000-000000000001")
3183 .expect("uuid parses"),
3184 chunk_size: DEFAULT_CHUNK_SIZE,
3185 entries: vec![ManifestEntry {
3186 relative_path: PathBuf::from("bundle/a.txt"),
3187 kind: ManifestEntryKind::File,
3188 size: 5,
3189 blake3: Some(
3190 "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef".to_string(),
3191 ),
3192 chunks: vec![
3193 "abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789".to_string(),
3194 ],
3195 unix_mode: Some(0o100644),
3196 modified_unix_ms: Some(1_700_000_000_000),
3197 }],
3198 };
3199
3200 let json = serde_json::to_string(&manifest).expect("manifest serializes");
3201
3202 assert_eq!(
3203 json,
3204 r#"{"version":1,"session_id":"00000000-0000-0000-0000-000000000001","chunk_size":1048576,"entries":[{"relative_path":"bundle/a.txt","kind":"file","size":5,"blake3":"0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef","chunks":["abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789"],"unix_mode":33188,"modified_unix_ms":1700000000000}]}"#
3205 );
3206 }
3207
3208 #[test]
3209 fn peer_registry_merges_observations_by_fingerprint() {
3210 let mut registry = PeerRegistry::new();
3211 let fingerprint = "abcdef1234567890";
3212 let first = PeerObservation::new(
3213 fingerprint,
3214 SocketAddr::from(([192, 168, 1, 10], 53318)),
3215 100,
3216 )
3217 .with_alias("desk")
3218 .with_hostname("desk.local")
3219 .with_transport("quic");
3220 let second = PeerObservation::new(
3221 fingerprint,
3222 SocketAddr::from(([192, 168, 1, 11], 53318)),
3223 200,
3224 )
3225 .with_alias("workstation")
3226 .with_transport("quic");
3227
3228 registry.observe(first).expect("first observation");
3229 registry.observe(second).expect("second observation");
3230 let record = registry
3231 .get_by_fingerprint(fingerprint)
3232 .expect("record by fingerprint");
3233
3234 assert_eq!(registry.records().count(), 1);
3235 assert!(record.aliases.contains("desk"));
3236 assert!(record.aliases.contains("workstation"));
3237 assert!(record.hostnames.contains("desk.local"));
3238 assert_eq!(record.addresses.len(), 2);
3239 assert_eq!(record.first_seen_unix_ms, 100);
3240 assert_eq!(record.last_seen_unix_ms, 200);
3241 }
3242
3243 #[test]
3244 fn peer_registry_looks_up_unique_fingerprint_prefix_alias_and_hostname() {
3245 let mut registry = PeerRegistry::new();
3246 registry
3247 .observe(
3248 PeerObservation::new(
3249 "abcdef1234567890",
3250 SocketAddr::from(([192, 168, 1, 10], 53318)),
3251 100,
3252 )
3253 .with_alias("desk")
3254 .with_hostname("desk.local"),
3255 )
3256 .expect("first observation");
3257 registry
3258 .observe(
3259 PeerObservation::new(
3260 "123456abcdef7890",
3261 SocketAddr::from(([192, 168, 1, 20], 53318)),
3262 100,
3263 )
3264 .with_alias("laptop"),
3265 )
3266 .expect("second observation");
3267
3268 assert_eq!(
3269 registry
3270 .lookup("abcdef")
3271 .map(|record| record.fingerprint.as_str()),
3272 Some("abcdef1234567890")
3273 );
3274 assert_eq!(
3275 registry
3276 .lookup("desk")
3277 .map(|record| record.fingerprint.as_str()),
3278 Some("abcdef1234567890")
3279 );
3280 assert_eq!(
3281 registry
3282 .lookup("desk.local")
3283 .map(|record| record.fingerprint.as_str()),
3284 Some("abcdef1234567890")
3285 );
3286 assert!(registry.lookup("missing").is_none());
3287 }
3288
3289 #[test]
3290 fn peer_registry_rejects_ambiguous_lookup_hints() {
3291 let mut registry = PeerRegistry::new();
3292 registry
3293 .observe(
3294 PeerObservation::new(
3295 "abcdef1234567890",
3296 SocketAddr::from(([192, 168, 1, 10], 53318)),
3297 100,
3298 )
3299 .with_alias("desk"),
3300 )
3301 .expect("first observation");
3302 registry
3303 .observe(
3304 PeerObservation::new(
3305 "abcdef9999999999",
3306 SocketAddr::from(([192, 168, 1, 11], 53318)),
3307 100,
3308 )
3309 .with_alias("desk"),
3310 )
3311 .expect("second observation");
3312
3313 assert!(registry.lookup("abcdef").is_none());
3314 assert!(registry.lookup("desk").is_none());
3315 assert_eq!(
3316 registry
3317 .lookup("abcdef1234567890")
3318 .map(|record| record.fingerprint.as_str()),
3319 Some("abcdef1234567890")
3320 );
3321 match registry.lookup_detail("desk") {
3322 PeerLookup::Ambiguous(records) => assert_eq!(records.len(), 2),
3323 other => panic!("expected ambiguous duplicate-alias lookup, got {other:?}"),
3324 }
3325 assert!(matches!(
3326 registry.lookup_detail("missing"),
3327 PeerLookup::Missing
3328 ));
3329 }
3330
3331 #[test]
3332 fn native_announcement_builds_expected_txt_properties() {
3333 let announcement = NativeAnnouncement {
3334 alias: "Stephen MBP".to_string(),
3335 fingerprint: "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"
3336 .to_string(),
3337 quic_port: 53318,
3338 listen_port: Some(53317),
3339 };
3340
3341 let service = announcement.service_info().expect("service info");
3342
3343 assert_eq!(service.get_type(), NATIVE_SERVICE_TYPE);
3344 assert!(
3345 service
3346 .get_fullname()
3347 .starts_with("stephen-mbp-0123456789ab.")
3348 );
3349 assert_eq!(service.get_property_val_str("pv"), Some("1"));
3350 assert_eq!(service.get_property_val_str("minpv"), Some("1"));
3351 assert_eq!(
3352 service.get_property_val_str("fp"),
3353 Some("0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef")
3354 );
3355 assert_eq!(service.get_property_val_str("alias"), Some("Stephen MBP"));
3356 assert_eq!(service.get_property_val_str("transports"), Some("quic"));
3357 assert_eq!(service.get_property_val_str("quic_port"), Some("53318"));
3358 assert_eq!(service.get_property_val_str("listen_port"), Some("53317"));
3359 }
3360
3361 #[test]
3362 fn resolved_native_service_merges_into_registry() {
3363 let properties = [
3364 ("pv", "1"),
3365 ("minpv", "1"),
3366 (
3367 "fp",
3368 "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef",
3369 ),
3370 ("alias", "desk"),
3371 ("transports", "quic"),
3372 ("quic_port", "53318"),
3373 ];
3374 let service = ServiceInfo::new(
3375 NATIVE_SERVICE_TYPE,
3376 "desk-0123456789ab",
3377 "desk.local.",
3378 "192.168.1.42",
3379 53318,
3380 &properties[..],
3381 )
3382 .expect("service info")
3383 .as_resolved_service();
3384 let mut registry = PeerRegistry::new();
3385
3386 observe_resolved_service(&mut registry, &service).expect("observation");
3387
3388 let record = registry.lookup("desk").expect("record by alias");
3389 assert_eq!(
3390 record.fingerprint,
3391 "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"
3392 );
3393 assert_eq!(
3394 record.preferred_quic_address(),
3395 Some(SocketAddr::from(([192, 168, 1, 42], 53318)))
3396 );
3397 }
3398
3399 #[test]
3400 fn resolved_native_service_ignores_incompatible_protocol_ranges() {
3401 let properties = [
3402 ("pv", "3"),
3403 ("minpv", "2"),
3404 (
3405 "fp",
3406 "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef",
3407 ),
3408 ("alias", "desk"),
3409 ("transports", "quic"),
3410 ("quic_port", "53318"),
3411 ];
3412 let service = ServiceInfo::new(
3413 NATIVE_SERVICE_TYPE,
3414 "desk-0123456789ab",
3415 "desk.local.",
3416 "192.168.1.42",
3417 53318,
3418 &properties[..],
3419 )
3420 .expect("service info")
3421 .as_resolved_service();
3422 let mut registry = PeerRegistry::new();
3423
3424 observe_resolved_service(&mut registry, &service).expect("observation");
3425
3426 assert_eq!(registry.records().count(), 0);
3427 }
3428
3429 #[tokio::test]
3430 async fn direct_quic_loopback_sends_one_file() {
3431 let source_dir = tempfile::tempdir().expect("source tempdir");
3432 let dest_dir = tempfile::tempdir().expect("dest tempdir");
3433 let identity_dir = tempfile::tempdir().expect("identity tempdir");
3434 let file_path = source_dir.path().join("hello.txt");
3435 tokio::fs::write(&file_path, b"hello from ferry")
3436 .await
3437 .expect("source file written");
3438
3439 let identity =
3440 NativeIdentity::load_or_generate_in(identity_dir.path()).expect("identity generated");
3441 let recv_identity = identity.clone();
3442 let reserved_socket =
3443 std::net::UdpSocket::bind(SocketAddr::from(([127, 0, 0, 1], 0))).expect("udp bind");
3444 let recv_addr = reserved_socket.local_addr().expect("local addr");
3445 drop(reserved_socket);
3446 let receive_progress = Arc::new(Mutex::new(Vec::new()));
3447 let receive_progress_log = receive_progress.clone();
3448 let dest_path = dest_dir.path().to_path_buf();
3449 let server = tokio::spawn(async move {
3450 receive_direct_file(
3451 &ReceiveRequest::new(recv_addr),
3452 dest_path,
3453 &recv_identity,
3454 |event| {
3455 receive_progress_log
3456 .lock()
3457 .expect("progress lock")
3458 .push(event);
3459 },
3460 )
3461 .await
3462 });
3463
3464 tokio::time::sleep(Duration::from_millis(100)).await;
3465
3466 let peer = DirectPeer::parse(&recv_addr.to_string()).expect("peer parses");
3467 let send_request = SendRequest::new(peer, vec![file_path]).expect("send request is valid");
3468 let send_progress = Arc::new(Mutex::new(Vec::new()));
3469 let send_progress_log = send_progress.clone();
3470 let send_summary = send_direct_file(&send_request, &identity, |event| {
3471 send_progress_log.lock().expect("progress lock").push(event);
3472 })
3473 .await
3474 .expect("file sent");
3475 let receive_summary = server.await.expect("server joined").expect("file received");
3476
3477 let received = tokio::fs::read(dest_dir.path().join("hello.txt"))
3478 .await
3479 .expect("received file readable");
3480 assert_eq!(received, b"hello from ferry");
3481 assert_eq!(send_summary.bytes, receive_summary.bytes);
3482 assert_eq!(send_summary.blake3, receive_summary.blake3);
3483 let send_progress = send_progress.lock().expect("progress lock");
3484 let receive_progress = receive_progress.lock().expect("progress lock");
3485 assert!(matches!(
3486 send_progress.first(),
3487 Some(TransferEvent::SessionStarted {
3488 direction: TransferDirection::Send,
3489 ..
3490 })
3491 ));
3492 assert!(
3493 send_progress
3494 .iter()
3495 .any(|event| matches!(event, TransferEvent::Progress { .. }))
3496 );
3497 assert!(
3498 send_progress
3499 .iter()
3500 .any(|event| matches!(event, TransferEvent::SessionFinished { .. }))
3501 );
3502 assert!(matches!(
3503 receive_progress.first(),
3504 Some(TransferEvent::SessionStarted {
3505 direction: TransferDirection::Receive,
3506 ..
3507 })
3508 ));
3509 assert!(
3510 receive_progress
3511 .iter()
3512 .any(|event| matches!(event, TransferEvent::Progress { .. }))
3513 );
3514 assert!(
3515 receive_progress
3516 .iter()
3517 .any(|event| matches!(event, TransferEvent::SessionFinished { .. }))
3518 );
3519 }
3520
3521 #[tokio::test]
3522 async fn direct_transfer_accepts_expected_receiver_fingerprint() {
3523 let source_dir = tempfile::tempdir().expect("source tempdir");
3524 let dest_dir = tempfile::tempdir().expect("dest tempdir");
3525 let sender_identity_dir = tempfile::tempdir().expect("sender identity tempdir");
3526 let receiver_identity_dir = tempfile::tempdir().expect("receiver identity tempdir");
3527 let file_path = source_dir.path().join("hello.txt");
3528 tokio::fs::write(&file_path, b"hello from ferry")
3529 .await
3530 .expect("source file written");
3531
3532 let sender_identity = NativeIdentity::load_or_generate_in(sender_identity_dir.path())
3533 .expect("sender identity");
3534 let receiver_identity = NativeIdentity::load_or_generate_in(receiver_identity_dir.path())
3535 .expect("receiver identity");
3536 let expected_fingerprint = receiver_identity.fingerprint().to_string();
3537 let reserved_socket =
3538 std::net::UdpSocket::bind(SocketAddr::from(([127, 0, 0, 1], 0))).expect("udp bind");
3539 let recv_addr = reserved_socket.local_addr().expect("local addr");
3540 drop(reserved_socket);
3541 let dest_path = dest_dir.path().to_path_buf();
3542 let server = tokio::spawn(async move {
3543 receive_direct_file(
3544 &ReceiveRequest::new(recv_addr),
3545 dest_path,
3546 &receiver_identity,
3547 |_| {},
3548 )
3549 .await
3550 });
3551
3552 tokio::time::sleep(Duration::from_millis(100)).await;
3553
3554 let peer = DirectPeer::from_address(recv_addr)
3555 .with_expected_fingerprint(expected_fingerprint)
3556 .expect("expected fingerprint accepted");
3557 let send_request = SendRequest::new(peer, vec![file_path]).expect("send request is valid");
3558 let send_summary = send_direct_file(&send_request, &sender_identity, |_| {})
3559 .await
3560 .expect("file sent");
3561 let receive_summary = server.await.expect("server joined").expect("file received");
3562
3563 let received = tokio::fs::read(dest_dir.path().join("hello.txt"))
3564 .await
3565 .expect("received file readable");
3566 assert_eq!(received, b"hello from ferry");
3567 assert_eq!(send_summary.bytes, receive_summary.bytes);
3568 assert_eq!(send_summary.blake3, receive_summary.blake3);
3569 }
3570
3571 #[tokio::test]
3572 async fn direct_transfer_confirms_unpinned_receiver_before_payload() {
3573 let source_dir = tempfile::tempdir().expect("source tempdir");
3574 let dest_dir = tempfile::tempdir().expect("dest tempdir");
3575 let sender_identity_dir = tempfile::tempdir().expect("sender identity tempdir");
3576 let receiver_identity_dir = tempfile::tempdir().expect("receiver identity tempdir");
3577 let file_path = source_dir.path().join("hello.txt");
3578 tokio::fs::write(&file_path, b"hello from ferry")
3579 .await
3580 .expect("source file written");
3581
3582 let sender_identity = NativeIdentity::load_or_generate_in(sender_identity_dir.path())
3583 .expect("sender identity");
3584 let receiver_identity = NativeIdentity::load_or_generate_in(receiver_identity_dir.path())
3585 .expect("receiver identity");
3586 let expected_fingerprint = receiver_identity.fingerprint().to_string();
3587 let reserved_socket =
3588 std::net::UdpSocket::bind(SocketAddr::from(([127, 0, 0, 1], 0))).expect("udp bind");
3589 let recv_addr = reserved_socket.local_addr().expect("local addr");
3590 drop(reserved_socket);
3591 let dest_path = dest_dir.path().to_path_buf();
3592 let server = tokio::spawn(async move {
3593 receive_direct_file(
3594 &ReceiveRequest::new(recv_addr),
3595 dest_path,
3596 &receiver_identity,
3597 |_| {},
3598 )
3599 .await
3600 });
3601
3602 tokio::time::sleep(Duration::from_millis(100)).await;
3603
3604 let peer = DirectPeer::from_address(recv_addr);
3605 let send_request = SendRequest::new(peer, vec![file_path]).expect("send request is valid");
3606 let callback_seen = Arc::new(AtomicBool::new(false));
3607 let callback_seen_send = callback_seen.clone();
3608 let send_summary = send_direct_file_with_control_and_trust(
3609 &send_request,
3610 &sender_identity,
3611 TransferControl::new(),
3612 |_| {},
3613 |fingerprint| {
3614 callback_seen_send.store(true, Ordering::SeqCst);
3615 assert_eq!(fingerprint, expected_fingerprint);
3616 Ok(())
3617 },
3618 )
3619 .await
3620 .expect("file sent after trust callback");
3621 let receive_summary = server.await.expect("server joined").expect("file received");
3622
3623 assert!(callback_seen.load(Ordering::SeqCst));
3624 assert_eq!(send_summary.bytes, receive_summary.bytes);
3625 assert_eq!(send_summary.blake3, receive_summary.blake3);
3626 }
3627
3628 #[tokio::test]
3629 async fn direct_transfer_rejects_unexpected_receiver_fingerprint_before_payload() {
3630 let source_dir = tempfile::tempdir().expect("source tempdir");
3631 let dest_dir = tempfile::tempdir().expect("dest tempdir");
3632 let sender_identity_dir = tempfile::tempdir().expect("sender identity tempdir");
3633 let receiver_identity_dir = tempfile::tempdir().expect("receiver identity tempdir");
3634 let other_identity_dir = tempfile::tempdir().expect("other identity tempdir");
3635 let file_path = source_dir.path().join("hello.txt");
3636 tokio::fs::write(&file_path, b"hello from ferry")
3637 .await
3638 .expect("source file written");
3639
3640 let sender_identity = NativeIdentity::load_or_generate_in(sender_identity_dir.path())
3641 .expect("sender identity");
3642 let receiver_identity = NativeIdentity::load_or_generate_in(receiver_identity_dir.path())
3643 .expect("receiver identity");
3644 let other_identity =
3645 NativeIdentity::load_or_generate_in(other_identity_dir.path()).expect("other identity");
3646 let reserved_socket =
3647 std::net::UdpSocket::bind(SocketAddr::from(([127, 0, 0, 1], 0))).expect("udp bind");
3648 let recv_addr = reserved_socket.local_addr().expect("local addr");
3649 drop(reserved_socket);
3650 let dest_path = dest_dir.path().to_path_buf();
3651 let server = tokio::spawn(async move {
3652 receive_direct_file(
3653 &ReceiveRequest::new(recv_addr),
3654 dest_path,
3655 &receiver_identity,
3656 |_| {},
3657 )
3658 .await
3659 });
3660
3661 tokio::time::sleep(Duration::from_millis(100)).await;
3662
3663 let peer = DirectPeer::from_address(recv_addr)
3664 .with_expected_fingerprint(other_identity.fingerprint().to_string())
3665 .expect("expected fingerprint accepted");
3666 let send_request = SendRequest::new(peer, vec![file_path]).expect("send request is valid");
3667 let error = send_direct_file(&send_request, &sender_identity, |_| {})
3668 .await
3669 .expect_err("fingerprint mismatch should reject send");
3670
3671 assert!(matches!(error, Error::PeerFingerprintMismatch { .. }));
3672 let server_error = server
3673 .await
3674 .expect("server joined")
3675 .expect_err("server closed");
3676 assert!(matches!(
3677 server_error,
3678 Error::Connection(_) | Error::NoIncomingConnection
3679 ));
3680 assert!(
3681 !tokio::fs::try_exists(dest_dir.path().join("hello.txt"))
3682 .await
3683 .expect("destination checked")
3684 );
3685 }
3686
3687 #[tokio::test]
3688 async fn direct_transfer_accepts_matching_psk_before_payload() {
3689 let source_dir = tempfile::tempdir().expect("source tempdir");
3690 let dest_dir = tempfile::tempdir().expect("dest tempdir");
3691 let identity_dir = tempfile::tempdir().expect("identity tempdir");
3692 let file_path = source_dir.path().join("hello.txt");
3693 tokio::fs::write(&file_path, b"hello from ferry")
3694 .await
3695 .expect("source file written");
3696
3697 let identity =
3698 NativeIdentity::load_or_generate_in(identity_dir.path()).expect("identity generated");
3699 let recv_identity = identity.clone();
3700 let reserved_socket =
3701 std::net::UdpSocket::bind(SocketAddr::from(([127, 0, 0, 1], 0))).expect("udp bind");
3702 let recv_addr = reserved_socket.local_addr().expect("local addr");
3703 drop(reserved_socket);
3704 let dest_path = dest_dir.path().to_path_buf();
3705 let receive_request = ReceiveRequest::new(recv_addr)
3706 .with_psk("correct horse battery staple")
3707 .expect("receiver psk accepted");
3708 let server = tokio::spawn(async move {
3709 receive_direct_file(&receive_request, dest_path, &recv_identity, |_| {}).await
3710 });
3711
3712 tokio::time::sleep(Duration::from_millis(100)).await;
3713
3714 let peer = DirectPeer::parse(&recv_addr.to_string()).expect("peer parses");
3715 let send_request = SendRequest::new(peer, vec![file_path])
3716 .expect("send request is valid")
3717 .with_psk("correct horse battery staple")
3718 .expect("sender psk accepted");
3719 let send_summary = send_direct_file(&send_request, &identity, |_| {})
3720 .await
3721 .expect("file sent");
3722 let receive_summary = server.await.expect("server joined").expect("file received");
3723
3724 let received = tokio::fs::read(dest_dir.path().join("hello.txt"))
3725 .await
3726 .expect("received file readable");
3727 assert_eq!(received, b"hello from ferry");
3728 assert_eq!(send_summary.bytes, receive_summary.bytes);
3729 }
3730
3731 #[tokio::test]
3732 async fn direct_transfer_rejects_missing_psk_before_payload() {
3733 let source_dir = tempfile::tempdir().expect("source tempdir");
3734 let dest_dir = tempfile::tempdir().expect("dest tempdir");
3735 let identity_dir = tempfile::tempdir().expect("identity tempdir");
3736 let file_path = source_dir.path().join("hello.txt");
3737 tokio::fs::write(&file_path, b"hello from ferry")
3738 .await
3739 .expect("source file written");
3740
3741 let identity =
3742 NativeIdentity::load_or_generate_in(identity_dir.path()).expect("identity generated");
3743 let recv_identity = identity.clone();
3744 let reserved_socket =
3745 std::net::UdpSocket::bind(SocketAddr::from(([127, 0, 0, 1], 0))).expect("udp bind");
3746 let recv_addr = reserved_socket.local_addr().expect("local addr");
3747 drop(reserved_socket);
3748 let dest_path = dest_dir.path().to_path_buf();
3749 let receive_request = ReceiveRequest::new(recv_addr)
3750 .with_psk("receiver secret")
3751 .expect("receiver psk accepted");
3752 let server = tokio::spawn(async move {
3753 receive_direct_file(&receive_request, dest_path, &recv_identity, |_| {}).await
3754 });
3755
3756 tokio::time::sleep(Duration::from_millis(100)).await;
3757
3758 let peer = DirectPeer::parse(&recv_addr.to_string()).expect("peer parses");
3759 let send_request = SendRequest::new(peer, vec![file_path]).expect("send request is valid");
3760 let error = send_direct_file(&send_request, &identity, |_| {})
3761 .await
3762 .expect_err("missing psk should reject send");
3763
3764 assert!(matches!(error, Error::Protocol(_)));
3765 let _ = server.await.expect("server joined");
3766 assert!(
3767 !tokio::fs::try_exists(dest_dir.path().join("hello.txt"))
3768 .await
3769 .expect("destination checked")
3770 );
3771 }
3772
3773 #[tokio::test]
3774 async fn direct_transfer_rejects_wrong_psk_before_payload() {
3775 let source_dir = tempfile::tempdir().expect("source tempdir");
3776 let dest_dir = tempfile::tempdir().expect("dest tempdir");
3777 let identity_dir = tempfile::tempdir().expect("identity tempdir");
3778 let file_path = source_dir.path().join("hello.txt");
3779 tokio::fs::write(&file_path, b"hello from ferry")
3780 .await
3781 .expect("source file written");
3782
3783 let identity =
3784 NativeIdentity::load_or_generate_in(identity_dir.path()).expect("identity generated");
3785 let recv_identity = identity.clone();
3786 let reserved_socket =
3787 std::net::UdpSocket::bind(SocketAddr::from(([127, 0, 0, 1], 0))).expect("udp bind");
3788 let recv_addr = reserved_socket.local_addr().expect("local addr");
3789 drop(reserved_socket);
3790 let dest_path = dest_dir.path().to_path_buf();
3791 let receive_request = ReceiveRequest::new(recv_addr)
3792 .with_psk("receiver secret")
3793 .expect("receiver psk accepted");
3794 let server = tokio::spawn(async move {
3795 receive_direct_file(&receive_request, dest_path, &recv_identity, |_| {}).await
3796 });
3797
3798 tokio::time::sleep(Duration::from_millis(100)).await;
3799
3800 let peer = DirectPeer::parse(&recv_addr.to_string()).expect("peer parses");
3801 let send_request = SendRequest::new(peer, vec![file_path])
3802 .expect("send request is valid")
3803 .with_psk("wrong secret")
3804 .expect("sender psk accepted");
3805 let error = send_direct_file(&send_request, &identity, |_| {})
3806 .await
3807 .expect_err("wrong psk should reject send");
3808
3809 assert!(matches!(error, Error::Authentication(_)));
3810 assert!(!error.to_string().contains("wrong secret"));
3811 assert!(!error.to_string().contains("receiver secret"));
3812 let server_error = server
3813 .await
3814 .expect("server joined")
3815 .expect_err("server should reject wrong psk");
3816 assert!(matches!(
3817 server_error,
3818 Error::Authentication(_) | Error::Write(_)
3819 ));
3820 assert!(
3821 !tokio::fs::try_exists(dest_dir.path().join("hello.txt"))
3822 .await
3823 .expect("destination checked")
3824 );
3825 }
3826
3827 #[tokio::test]
3828 async fn direct_quic_loopback_sends_directory_and_resumes_partial_file() {
3829 let source_dir = tempfile::tempdir().expect("source tempdir");
3830 let dest_dir = tempfile::tempdir().expect("dest tempdir");
3831 let identity_dir = tempfile::tempdir().expect("identity tempdir");
3832 let bundle = source_dir.path().join("bundle");
3833 let nested = bundle.join("nested");
3834 tokio::fs::create_dir_all(&nested)
3835 .await
3836 .expect("source dirs");
3837 tokio::fs::write(nested.join("small.txt"), b"small")
3838 .await
3839 .expect("small file");
3840 let large_bytes = vec![7; (DEFAULT_CHUNK_SIZE as usize * 2) + 17];
3841 tokio::fs::write(bundle.join("large.bin"), &large_bytes)
3842 .await
3843 .expect("large file");
3844
3845 let dest_bundle = dest_dir.path().join("bundle");
3846 tokio::fs::create_dir_all(&dest_bundle)
3847 .await
3848 .expect("dest bundle");
3849 tokio::fs::write(
3850 dest_bundle.join("large.bin"),
3851 &large_bytes[..DEFAULT_CHUNK_SIZE as usize + 5],
3852 )
3853 .await
3854 .expect("partial large");
3855
3856 let identity =
3857 NativeIdentity::load_or_generate_in(identity_dir.path()).expect("identity generated");
3858 let recv_identity = identity.clone();
3859 let reserved_socket =
3860 std::net::UdpSocket::bind(SocketAddr::from(([127, 0, 0, 1], 0))).expect("udp bind");
3861 let recv_addr = reserved_socket.local_addr().expect("local addr");
3862 drop(reserved_socket);
3863 let dest_path = dest_dir.path().to_path_buf();
3864 let server = tokio::spawn(async move {
3865 receive_direct_file(
3866 &ReceiveRequest::new(recv_addr),
3867 dest_path,
3868 &recv_identity,
3869 |_| {},
3870 )
3871 .await
3872 });
3873
3874 tokio::time::sleep(Duration::from_millis(100)).await;
3875
3876 let peer = DirectPeer::parse(&recv_addr.to_string()).expect("peer parses");
3877 let send_request = SendRequest::new(peer, vec![bundle]).expect("send request is valid");
3878 let send_summary = send_direct_file(&send_request, &identity, |_| {})
3879 .await
3880 .expect("directory sent");
3881 let receive_summary = server
3882 .await
3883 .expect("server joined")
3884 .expect("directory received");
3885
3886 let received_large = tokio::fs::read(dest_bundle.join("large.bin"))
3887 .await
3888 .expect("large file readable");
3889 let received_small = tokio::fs::read(dest_bundle.join("nested/small.txt"))
3890 .await
3891 .expect("small file readable");
3892 assert_eq!(received_large, large_bytes);
3893 assert_eq!(received_small, b"small");
3894 assert_eq!(send_summary.bytes, receive_summary.bytes);
3895 assert!(
3896 receive_summary
3897 .files
3898 .iter()
3899 .any(|file| file.status == TransferFileStatus::Resumed)
3900 );
3901 }
3902
3903 #[tokio::test]
3904 async fn direct_quic_loopback_cancels_send_and_does_not_finalize_partial_file() {
3905 let source_dir = tempfile::tempdir().expect("source tempdir");
3906 let dest_dir = tempfile::tempdir().expect("dest tempdir");
3907 let identity_dir = tempfile::tempdir().expect("identity tempdir");
3908 let file_path = source_dir.path().join("payload.bin");
3909 tokio::fs::write(&file_path, vec![3; IO_BUFFER_SIZE * 64])
3910 .await
3911 .expect("source file written");
3912
3913 let identity =
3914 NativeIdentity::load_or_generate_in(identity_dir.path()).expect("identity generated");
3915 let recv_identity = identity.clone();
3916 let reserved_socket =
3917 std::net::UdpSocket::bind(SocketAddr::from(([127, 0, 0, 1], 0))).expect("udp bind");
3918 let recv_addr = reserved_socket.local_addr().expect("local addr");
3919 drop(reserved_socket);
3920
3921 let dest_path = dest_dir.path().to_path_buf();
3922 let server = tokio::spawn(async move {
3923 receive_direct_file(
3924 &ReceiveRequest::new(recv_addr),
3925 dest_path,
3926 &recv_identity,
3927 |_| {},
3928 )
3929 .await
3930 });
3931
3932 tokio::time::sleep(Duration::from_millis(100)).await;
3933
3934 let peer = DirectPeer::parse(&recv_addr.to_string()).expect("peer parses");
3935 let send_request =
3936 SendRequest::new(peer, vec![file_path.clone()]).expect("send request is valid");
3937 let send_control = TransferControl::new();
3938 let cancel_on_progress = send_control.clone();
3939 let send_events = Arc::new(Mutex::new(Vec::new()));
3940 let send_events_log = send_events.clone();
3941 let send_result =
3942 send_direct_file_with_control(&send_request, &identity, send_control, |event| {
3943 if matches!(event, TransferEvent::Progress { .. }) {
3944 cancel_on_progress.cancel();
3945 }
3946 send_events_log.lock().expect("events lock").push(event);
3947 })
3948 .await;
3949
3950 assert!(matches!(send_result, Err(Error::TransferCancelled)));
3951
3952 let receive_result = tokio::time::timeout(Duration::from_secs(5), server)
3953 .await
3954 .expect("receiver should finish after sender cancellation")
3955 .expect("server joined");
3956 assert!(receive_result.is_err());
3957
3958 assert!(!dest_dir.path().join("payload.bin").exists());
3959 assert!(
3960 send_events
3961 .lock()
3962 .expect("events lock")
3963 .iter()
3964 .any(|event| matches!(event, TransferEvent::SessionCancelled { .. }))
3965 );
3966 }
3967}