1use std::{
4 collections::BTreeMap,
5 io::{self, Write as _},
6 path::{Path, PathBuf},
7};
8
9use anyhow::Context as _;
10use bytes::BytesMut;
11use chrono::{DateTime, Utc};
12use flate2::{write::GzEncoder, Compression};
13use sequoia_openpgp::policy::StandardPolicy;
14use tokio::io::AsyncWriteExt;
15use tracing::{debug, info, instrument, trace};
16
17use crate::{
18 destination::Destination,
19 filesystem::{check_space, check_writeable, get_common_path},
20 package,
21 task::{Mode, Status},
22 utils::{Progress, ProgressReader},
23 zip::ZipArchive,
24};
25
/// Options of an encryption task, consumed by `encrypt`.
///
/// `T` is the type of the optional progress reporter and `U` the type of the
/// password provider handed to the signing-key lookup.
pub struct EncryptOpts<T, U> {
    /// Input paths. They are walked (via `walkdir`) to collect the regular
    /// files that go into the package.
    pub files: Vec<PathBuf>,
    /// Fingerprints of the recipients' certificates, looked up in `cert_store`.
    pub recipients: Vec<crate::openpgp::cert::Fingerprint>,
    /// Fingerprint of the signer's certificate, looked up in `cert_store`.
    pub signer: crate::openpgp::cert::Fingerprint,
    /// Store from which the signer and recipient certificates are fetched.
    pub cert_store: crate::openpgp::certstore::CertStore<'static>,
    /// Key store passed (mutably) to the signing-key lookup.
    pub key_store: crate::openpgp::keystore::KeyStore,
    /// Password provider passed to the signing-key lookup. `encrypt` requires
    /// it to be a function from a `PasswordHint` to a future resolving to a
    /// `Password`.
    pub password: U,
    /// Compression applied to the tarball before it is encrypted.
    pub compression_algorithm: package::CompressionAlgorithm,
    /// Task mode. With `Mode::Check` the inputs are verified and a
    /// `Status::Checked` is returned without writing a package.
    pub mode: Mode,
    /// Optional progress reporter; it receives the total input size, the
    /// number of bytes read from the input files and a final `finish` call.
    pub progress: Option<T>,
    /// Optional purpose, copied into the package metadata.
    pub purpose: Option<package::Purpose>,
    /// Optional transfer id, copied into the package metadata.
    pub transfer_id: Option<u32>,
    /// Timestamp stored in the package metadata and used to build the package name.
    pub timestamp: DateTime<Utc>,
    /// Optional prefix used when building the package name.
    pub prefix: Option<String>,
    /// Additional key/value pairs stored in the `extra` field of the package metadata.
    pub extra_metadata: BTreeMap<String, String>,
}
57
58impl<T: Progress, U> std::fmt::Debug for EncryptOpts<T, U> {
59 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
60 f.debug_struct("EncryptOpts")
61 .field("files", &self.files)
62 .field("recipients", &self.recipients)
63 .field("signer", &self.signer)
64 .field("compression_algorithm", &self.compression_algorithm)
65 .field("mode", &self.mode)
66 .field("purpose", &self.purpose)
67 .field("transfer_id", &self.transfer_id)
68 .field("extra_metadata", &self.extra_metadata)
69 .finish()
70 }
71}
72
73#[instrument(err(Debug, level=tracing::Level::ERROR))]
79pub async fn encrypt<T, F, Fut>(
80 mut opts: EncryptOpts<T, F>,
81 dest: Destination,
82) -> anyhow::Result<Status>
83where
84 T: Progress + Send + 'static,
85 F: Fn(crate::openpgp::types::PasswordHint) -> Fut,
86 Fut: std::future::Future<Output = crate::openpgp::types::Password>,
87{
88 trace!("Extract signing and encryption certificates");
89 let policy = StandardPolicy::new();
90 let signer_cert = opts.cert_store.get_cert_by_fingerprint(&opts.signer)?;
91 let recipients_certs = opts
92 .recipients
93 .iter()
94 .map(|fp| {
95 opts.cert_store
96 .get_cert_by_fingerprint(fp)
97 .map(|cert| cert.0)
98 })
99 .collect::<anyhow::Result<Vec<_>>>()?;
100 let keys = EncryptKeys {
101 signer: (
102 crate::openpgp::crypto::get_signing_capable_key(
103 &signer_cert.0,
104 &policy,
105 &mut opts.key_store,
106 &opts.password,
107 )
108 .await?,
109 signer_cert.0.fingerprint(),
110 ),
111 recipients: crate::openpgp::crypto::get_encryption_keys(&recipients_certs, &policy)?
112 .into_iter()
113 .zip(recipients_certs.iter().map(|cert| cert.fingerprint()))
114 .collect(),
115 };
116 let status = match dest {
117 Destination::Stdout => encrypt_stdout(opts, keys).await,
118 Destination::Local(local_opts) => encrypt_local(opts, &local_opts, keys).await,
119 Destination::Sftp(sftp_opts) => encrypt_sftp(opts, &sftp_opts, keys).await,
120 Destination::S3(s3_opts) => encrypt_s3(opts, &s3_opts, keys).await,
121 }?;
122 match &status {
123 Status::Checked {
124 destination,
125 source_size,
126 } => {
127 debug!(destination, source_size, "Checked encryption task input");
128 }
129 Status::Completed {
130 destination,
131 source_size,
132 destination_size,
133 metadata,
134 } => {
135 info!(
136 destination,
137 source_size,
138 destination_size,
139 metadata = metadata.to_json_or_debug(),
140 "Successfully created encrypted data package"
141 )
142 }
143 }
144 Ok(status)
145}
146
/// Key material used to sign and encrypt a data package.
struct EncryptKeys {
    /// Signing key obtained through the key store, together with the
    /// fingerprint of the signer's certificate.
    signer: (sequoia_keystore::Key, sequoia_openpgp::Fingerprint),
    /// Encryption keys of the recipients, each paired with the fingerprint of
    /// the certificate it was taken from.
    recipients: Vec<(
        sequoia_openpgp::packet::Key<
            sequoia_openpgp::packet::key::PublicParts,
            sequoia_openpgp::packet::key::UnspecifiedRole,
        >,
        sequoia_openpgp::Fingerprint,
    )>,
}
162
163fn get_archive_paths(files: &[PathBuf]) -> anyhow::Result<Vec<PathBuf>> {
165 let parent_folders = files
166 .iter()
167 .map(|f| {
168 f.parent()
169 .with_context(|| format!("Unable to find parent folder of {f:?}"))
170 })
171 .collect::<Result<Vec<_>, _>>()?;
172 let root_dir = get_common_path(&parent_folders)?;
173 let archive_paths = files
174 .iter()
175 .map(|f| Ok(Path::new(package::CONTENT_FOLDER).join(f.strip_prefix(&root_dir)?)))
176 .collect::<anyhow::Result<_>>()?;
177 Ok(archive_paths)
178}
179
180fn init_message<'a, W: io::Write + Send + Sync>(
183 writer: &'a mut W,
184 keys: &'a EncryptKeys,
185) -> anyhow::Result<sequoia_openpgp::serialize::stream::Message<'a>> {
186 use sequoia_openpgp::serialize::stream::{
187 Encryptor2, LiteralWriter, Message, Recipient, Signer,
188 };
189 LiteralWriter::new(
190 Signer::new(
191 Encryptor2::for_recipients(
192 Message::new(writer),
193 keys.recipients.iter().map(|(key, _)| Recipient::from(key)),
194 )
195 .build()?,
196 keys.signer.0.clone(),
197 )
198 .build()?,
199 )
200 .build()
201}
202
203fn add_checksum_file(
214 archive: &mut tar::Builder<impl io::Write>,
215 content: &[(String, String)],
216) -> anyhow::Result<()> {
217 use std::fmt::Write as _;
218 let content = content
219 .iter()
220 .fold(String::new(), |mut output, (checksum, path)| {
221 let _ = writeln!(output, "{checksum} {path}");
222 output
223 })
224 .into_bytes();
225 let mut header = tar::Header::new_gnu();
226 header.set_entry_type(tar::EntryType::file());
227 header.set_size(content.len().try_into()?);
228 header.set_mtime(Utc::now().timestamp().try_into()?);
229 header.set_mode(0o644);
230 header.set_cksum();
231 archive.append_data(&mut header, package::CHECKSUM_FILE, &content[..])?;
232 Ok(())
233}
234
/// Writes a tar archive containing the given files to `writer`, followed by a
/// checksum file listing the SHA-256 digest of every archived file.
///
/// `file_path` holds `(path on disk, path inside the archive)` pairs. If
/// `progress` is given, it is advanced by the number of bytes read from the
/// input files.
///
/// Digests are computed on a separate thread: file content is read through a
/// `crate::io::Tee`, which is given the buffer pool and the channel leading to
/// the hashing thread.
///
/// # Errors
///
/// Fails if a file cannot be opened or read, if writing to the archive fails,
/// if an archive path cannot be converted to a POSIX path, or if the
/// checksum thread fails or panics.
fn create_tarball(
    writer: &mut impl io::Write,
    file_path: &[(PathBuf, PathBuf)],
    mut progress: Option<&mut impl Progress>,
) -> anyhow::Result<()> {
    // Messages received by the checksum thread.
    enum Message {
        // A chunk of data to feed into the hasher.
        Payload(BytesMut),
        // End of the current file; carries the file's path inside the archive.
        Finalize(std::path::PathBuf),
    }

    impl crate::io::Message for Message {
        fn from_bytes(bytes: BytesMut) -> Self {
            Self::Payload(bytes)
        }
    }

    // `result_*`: messages travelling towards the checksum thread.
    // `pool_*`: buffers handed back by the checksum thread for reuse.
    let (result_tx, mut result_rx) = tokio::sync::mpsc::channel(4);
    let (pool_tx, mut pool_rx) = tokio::sync::mpsc::channel(4);
    // Pre-fill the pool with 4 buffers of 512 KiB (1 << 19 bytes) capacity each.
    for _ in 0..4 {
        pool_tx.blocking_send(BytesMut::with_capacity(1 << 19))?;
    }
    let checksums_len = file_path.len();
    let checksums_handle = std::thread::spawn(move || -> anyhow::Result<_> {
        use sequoia_openpgp::{crypto::hash::Digest as _, types::HashAlgorithm::SHA256};

        let mut checksums = Vec::with_capacity(checksums_len);
        let mut hasher = SHA256.context()?;
        // Runs until every sender of the result channel has been dropped.
        while let Some(message) = result_rx.blocking_recv() {
            match message {
                Message::Payload(payload) => {
                    hasher.update(&payload);
                    // A closed pool channel is only traced, not treated as an error.
                    if pool_tx.blocking_send(payload).is_err() {
                        trace!("Failed to send chunk back to the pool (channel closed)");
                    }
                }
                Message::Finalize(path) => {
                    let path = crate::filesystem::to_posix_path(&path)
                        .with_context(|| format!("Path contains non-UTF-8 characters: {path:?}"))?
                        .to_string();
                    // Swap in a fresh hasher for the next file and finish the current one.
                    let cs = std::mem::replace(&mut hasher, SHA256.context()?);
                    checksums.push((crate::utils::to_hex_string(&cs.into_digest()?), path));
                }
            }
        }
        Ok(checksums)
    });

    let mut archive = tar::Builder::new(writer);
    for (fs_path, archive_path) in file_path {
        let f = std::fs::File::open(fs_path)?;
        let mut header = tar::Header::new_gnu();
        header.set_metadata(&f.metadata()?);
        header.set_cksum();
        // NOTE(review): `Tee` presumably forwards the data read from `f` to the
        // checksum thread as `Message::Payload` chunks - confirm in `crate::io`.
        let mut hash_reader = crate::io::Tee::new(f, &mut pool_rx, &result_tx)?;
        if let Some(p) = progress.as_mut() {
            let mut progress_reader = ProgressReader::new(&mut hash_reader, |len| {
                p.inc(len.try_into()?);
                Ok(())
            });
            archive.append_data(&mut header, archive_path, &mut progress_reader)?;
        } else {
            archive.append_data(&mut header, archive_path, &mut hash_reader)?;
        }
        // Tell the checksum thread that this file is complete.
        result_tx.blocking_send(Message::Finalize(archive_path.clone()))?;
    }
    // Dropping the last sender ends the receive loop of the checksum thread.
    drop(result_tx);
    let checksums = checksums_handle
        .join()
        .map_err(|_| anyhow::anyhow!("checksum thread panicked"))??;
    add_checksum_file(&mut archive, &checksums)?;
    archive.finish()?;
    Ok(())
}
311
312fn add_metadata_file<S: sequoia_openpgp::crypto::Signer + Send + Sync>(
314 archive: &mut ZipArchive<impl io::Write>,
315 metadata: &package::Metadata,
316 signer_key: S,
317) -> anyhow::Result<()> {
318 let zip_file_opts = Default::default();
319 archive.add(package::METADATA_FILE, &zip_file_opts)?;
320 let metadata_json = serde_json::to_string(&metadata)?.as_bytes().to_owned();
321 archive.write_all(&metadata_json)?;
322 archive.add(package::METADATA_SIG_FILE, &zip_file_opts)?;
323 archive.write_all(&crate::openpgp::crypto::sign_detached(
324 &metadata_json,
325 signer_key,
326 crate::openpgp::types::SerializationFormat::AsciiArmored,
327 )?)?;
328 Ok(())
329}
330
331fn process_files<P: AsRef<Path>>(files: &[P]) -> anyhow::Result<(Vec<(PathBuf, PathBuf)>, u64)> {
349 let mut total_input_size = 0u64;
350 let mut unique_files = std::collections::HashSet::new();
351 for entry in files.iter().flat_map(walkdir::WalkDir::new) {
352 let entry = entry?;
353 if entry.file_type().is_file() {
354 total_input_size += entry.metadata()?.len();
355 unique_files.insert(entry.path().canonicalize()?.to_owned());
356 }
357 }
358 let files = Vec::from_iter(unique_files);
359 let archive_paths = get_archive_paths(&files)?;
360 archive_paths.iter().try_for_each(|p| {
361 p.to_str()
362 .and(Some(()))
363 .with_context(|| format!("Path contains non-UTF-8 characters: {p:?}"))
364 })?;
365 let file_archive_path = files.into_iter().zip(archive_paths).collect();
366 Ok((file_archive_path, total_input_size))
367}
368
369fn spawn_encryption_task<T: Progress + Send + 'static, F>(
371 mut opts: EncryptOpts<T, F>,
372 file_path: Vec<(PathBuf, PathBuf)>,
373 total_input_size: u64,
374 keys: EncryptKeys,
375 source: crate::io::Source,
376 sink: tokio::sync::mpsc::Sender<BytesMut>,
377) -> tokio::task::JoinHandle<anyhow::Result<(u64, package::Metadata)>> {
378 struct Message(BytesMut);
379
380 impl crate::io::Message for Message {
381 fn from_bytes(bytes: BytesMut) -> Self {
382 Self(bytes)
383 }
384 }
385
386 tokio::task::spawn_blocking(move || -> anyhow::Result<_> {
387 let writer = crate::io::ChannelWriter::new(source, sink)?;
388 let mut zip = ZipArchive::new(writer);
389 zip.add(package::DATA_FILE_ENCRYPTED, &Default::default())?;
390
391 let (result_tx, mut result_rx) = tokio::sync::mpsc::channel::<Message>(4);
392 let (pool_tx, mut pool_rx) = tokio::sync::mpsc::channel(4);
393 for _ in 0..4 {
394 pool_tx.blocking_send(BytesMut::with_capacity(1 << 19))?;
395 }
396 let checksums_handle = std::thread::spawn(move || -> anyhow::Result<_> {
397 use sequoia_openpgp::{crypto::hash::Digest as _, types::HashAlgorithm::SHA256};
398
399 let mut hasher = SHA256.context()?;
400 while let Some(message) = result_rx.blocking_recv() {
401 hasher.update(&message.0);
402 if pool_tx.blocking_send(message.0).is_err() {
403 trace!("Failed to send chunk back to the pool (channel closed)");
406 }
407 }
408 Ok(crate::utils::to_hex_string(&hasher.into_digest()?))
409 });
410
411 let mut checksum_writer = crate::io::Tee::new(&mut zip, &mut pool_rx, &result_tx)?;
412 let mut encrypted_message = init_message(&mut checksum_writer, &keys)?;
413
414 crate::io::write_parallel(&mut encrypted_message, |w| {
415 let mut progress = opts.progress.as_mut();
416 if let Some(p) = &mut progress {
417 p.set_length(total_input_size);
418 }
419 match opts.compression_algorithm {
420 package::CompressionAlgorithm::Stored => create_tarball(w, &file_path, progress),
421 package::CompressionAlgorithm::Gzip(level) => {
422 let mut enc = GzEncoder::new(
423 w,
424 Compression::new(level.unwrap_or_else(|| Compression::default().level())),
425 );
426 create_tarball(&mut enc, &file_path, progress)
427 }
428 package::CompressionAlgorithm::Zstandard(level) => {
429 let mut enc = zstd::stream::write::Encoder::new(
430 w,
431 level.unwrap_or(zstd::DEFAULT_COMPRESSION_LEVEL),
432 )?;
433 enc.multithread(2)?;
434 create_tarball(&mut enc, &file_path, progress)?;
435 enc.finish()?;
436 Ok(())
437 }
438 }
439 })?;
440
441 encrypted_message.finalize()?;
442 checksum_writer.flush_channel()?;
443 drop(result_tx);
444 let checksum = checksums_handle
445 .join()
446 .map_err(|_| anyhow::anyhow!("checksum thread in encrypt panicked"))??;
447
448 let metadata = package::Metadata {
449 sender: keys.signer.1.to_hex(),
450 recipients: keys.recipients.iter().map(|(_, fp)| fp.to_hex()).collect(),
451 checksum,
452 timestamp: opts.timestamp,
453 version: package::default_version(),
454 checksum_algorithm: Default::default(),
455 compression_algorithm: opts.compression_algorithm,
456 transfer_id: opts.transfer_id,
457 purpose: opts.purpose,
458 extra: opts.extra_metadata,
459 };
460 add_metadata_file(&mut zip, &metadata, keys.signer.0.clone())?;
461 let size = zip.finish()?;
462
463 if let Some(p) = &mut opts.progress {
464 p.finish();
465 }
466 Ok((size, metadata))
467 })
468}
469
470async fn encrypt_to_writer<T: Progress + Send + 'static, F>(
471 output: impl tokio::io::AsyncWrite,
472 opts: EncryptOpts<T, F>,
473 file_path: Vec<(PathBuf, PathBuf)>,
474 total_input_size: u64,
475 keys: EncryptKeys,
476) -> anyhow::Result<(u64, package::Metadata)> {
477 let (result_tx, mut result_rx) = tokio::sync::mpsc::channel(3);
478 let (buf_pool_tx, buf_pool_rx) = tokio::sync::mpsc::channel(3);
479 for _ in 0..3 {
480 buf_pool_tx.send(BytesMut::with_capacity(1 << 22)).await?;
481 }
482 let encrypt_task_handle = spawn_encryption_task(
483 opts,
484 file_path,
485 total_input_size,
486 keys,
487 crate::io::Source::Channel(buf_pool_rx),
488 result_tx,
489 );
490 tokio::pin!(output);
491 while let Some(chunk) = result_rx.recv().await {
492 output.write_all(&chunk).await?;
493 if buf_pool_tx.send(chunk).await.is_err() {
494 trace!("Failed to send chunk back to the pool (channel closed)");
497 }
498 }
499 encrypt_task_handle.await?
500}
501
502async fn encrypt_to_blocking_writer<T: Progress + Send + 'static, F>(
503 output: &mut impl std::io::Write,
504 opts: EncryptOpts<T, F>,
505 file_path: Vec<(PathBuf, PathBuf)>,
506 total_input_size: u64,
507 keys: EncryptKeys,
508) -> anyhow::Result<(u64, package::Metadata)> {
509 let (result_tx, mut result_rx) = tokio::sync::mpsc::channel(3);
510 let (buf_pool_tx, buf_pool_rx) = tokio::sync::mpsc::channel(3);
511 for _ in 0..3 {
512 buf_pool_tx.send(BytesMut::with_capacity(1 << 22)).await?;
513 }
514 let encrypt_task_handle = spawn_encryption_task(
515 opts,
516 file_path,
517 total_input_size,
518 keys,
519 crate::io::Source::Channel(buf_pool_rx),
520 result_tx,
521 );
522 while let Some(chunk) = result_rx.recv().await {
523 output.write_all(&chunk)?;
524 if buf_pool_tx.send(chunk).await.is_err() {
525 trace!("Failed to send chunk back to the pool (channel closed)");
528 }
529 }
530 encrypt_task_handle.await?
531}
532
533async fn encrypt_local<T: Progress + Send + 'static, F>(
535 opts: EncryptOpts<T, F>,
536 dest: &crate::destination::Local,
537 keys: EncryptKeys,
538) -> anyhow::Result<Status> {
539 trace!("Verify files to encrypt");
540 let (file_path, source_size) = process_files(&opts.files)?;
541 let output_path = dest
542 .path
543 .join(dest.package_name(&opts.timestamp, opts.prefix.as_deref()));
544 let destination = output_path.to_string_lossy().into_owned();
545 let parent = output_path
546 .parent()
547 .context("Unable to find output directory")?;
548 trace!("Verify available disk space");
549 check_space(source_size, parent, opts.mode)?;
550 trace!("Verify whether destination is writeable");
551 check_writeable(parent, opts.mode)?;
552 Ok(if let Mode::Check = opts.mode {
553 Status::Checked {
554 destination,
555 source_size,
556 }
557 } else {
558 let output = tokio::fs::File::create(&output_path).await?;
559 let mut buffered_output = tokio::io::BufWriter::with_capacity(1 << 22, output);
560 let (package_size, metadata) =
561 encrypt_to_writer(&mut buffered_output, opts, file_path, source_size, keys)
562 .await
563 .with_context(|| {
564 let err_msg = "encryption failed";
567 if let Err(msg) = std::fs::remove_file(&output_path) {
568 anyhow::anyhow!(
569 "{}. Additionally, failed to clean partial file '{}', reason: {}",
570 err_msg,
571 &destination,
572 msg
573 )
574 } else {
575 anyhow::anyhow!(err_msg)
576 }
577 })?;
578 buffered_output.flush().await?;
579 Status::Completed {
580 destination,
581 source_size,
582 destination_size: package_size,
583 metadata,
584 }
585 })
586}
587
588async fn encrypt_stdout<T: Progress + Send + 'static, F>(
590 opts: EncryptOpts<T, F>,
591 keys: EncryptKeys,
592) -> anyhow::Result<Status> {
593 trace!("Verify files to encrypt");
594 let (file_path, source_size) = process_files(&opts.files)?;
595 let destination = "stdout".to_string();
596 Ok(if let Mode::Check = opts.mode {
597 Status::Checked {
598 destination,
599 source_size,
600 }
601 } else {
602 let (package_size, metadata) =
603 encrypt_to_writer(tokio::io::stdout(), opts, file_path, source_size, keys).await?;
604 Status::Completed {
605 destination,
606 source_size,
607 destination_size: package_size,
608 metadata,
609 }
610 })
611}
612
613async fn encrypt_s3<T: Progress + Send + 'static, F>(
614 opts: EncryptOpts<T, F>,
615 dest: &crate::destination::S3,
616 keys: EncryptKeys,
617) -> anyhow::Result<Status> {
618 let object_name = package::generate_package_name(&opts.timestamp, opts.prefix.as_deref());
619 trace!("Verify files to encrypt");
620 let (file_path, source_size) = process_files(&opts.files)?;
621 let destination = [
622 dest.client().endpoint.as_str(),
623 dest.bucket(),
624 object_name.as_str(),
625 ]
626 .join("/");
627 if let Mode::Check = opts.mode {
628 return Ok(Status::Checked {
629 destination,
630 source_size,
631 });
632 }
633 let (result_tx, result_rx) = tokio::sync::mpsc::channel(3);
634 let chunk_size = crate::remote::s3::compute_chunk_size(source_size);
635 let encrypt_task_handle = spawn_encryption_task(
636 opts,
637 file_path,
638 source_size,
639 keys,
640 crate::io::Source::New(chunk_size),
641 result_tx,
642 );
643 dest.client()
644 .put_object(dest.bucket(), &object_name, result_rx)
645 .await?;
646 let (package_size, metadata) = encrypt_task_handle.await??;
647 Ok(Status::Completed {
648 destination,
649 source_size,
650 destination_size: package_size,
651 metadata,
652 })
653}
654
655async fn encrypt_sftp<T: Progress + Send + 'static, F>(
657 opts: EncryptOpts<T, F>,
658 dest: &crate::destination::Sftp,
659 keys: EncryptKeys,
660) -> anyhow::Result<Status> {
661 trace!("Verify files to encrypt");
662 let (file_path, source_size) = process_files(&opts.files)?;
663 let client = dest.client().connect()?;
664 let upload_dir = crate::remote::sftp::UploadDir::new(dest.base_path(), &client);
665 let dpkg_path = crate::remote::sftp::DpkgPath::new(
666 &upload_dir.path,
667 package::generate_package_name(&opts.timestamp, opts.prefix.as_deref()),
668 &client,
669 );
670 let destination = client.get_url(&dpkg_path.path);
671 if let Mode::Check = opts.mode {
672 return Ok(Status::Checked {
673 destination,
674 source_size,
675 });
676 }
677 upload_dir.create(None)?;
678 let mut buffered_output =
679 std::io::BufWriter::with_capacity(1 << 22, client.inner.create(&dpkg_path.tmp)?);
680 let (package_size, metadata) =
681 encrypt_to_blocking_writer(&mut buffered_output, opts, file_path, source_size, keys)
682 .await
683 .with_context(|| {
684 let err_msg = "encryption failed";
687 if let Err(msg) = upload_dir.delete() {
688 anyhow::anyhow!(
689 "{}. Additionally, failed to clean partially uploaded data \
690 at '{}', reason: {}",
691 err_msg,
692 destination,
693 msg
694 )
695 } else {
696 anyhow::anyhow!(err_msg)
697 }
698 })?;
699 buffered_output.flush()?;
700 dpkg_path.finalize()?;
701 upload_dir.finalize()?;
702 Ok(Status::Completed {
703 source_size,
704 destination_size: package_size,
705 destination,
706 metadata,
707 })
708}