sett/
encrypt.rs

1//! Encrypt workflow
2
3use std::{
4    collections::BTreeMap,
5    io::{self, Write as _},
6    path::{Path, PathBuf},
7};
8
9use anyhow::Context as _;
10use bytes::BytesMut;
11use chrono::{DateTime, Utc};
12use flate2::{write::GzEncoder, Compression};
13use sequoia_openpgp::policy::StandardPolicy;
14use tokio::io::AsyncWriteExt;
15use tracing::{debug, info, instrument, trace};
16
17use crate::{
18    destination::Destination,
19    filesystem::{check_space, check_writeable, get_common_path},
20    package,
21    task::{Mode, Status},
22    utils::{Progress, ProgressReader},
23    zip::ZipArchive,
24};
25
26/// Options required by the encrypt workflow.
27pub struct EncryptOpts<T, U> {
28    /// Input files for encryption.
29    pub files: Vec<PathBuf>,
30    /// Public OpenPGP keys of recipients.
31    pub recipients: Vec<crate::openpgp::cert::Fingerprint>,
32    /// Private OpenPGP key for signer (used for signing data).
33    pub signer: crate::openpgp::cert::Fingerprint,
34    /// OpenPGP certificate store
35    pub cert_store: crate::openpgp::certstore::CertStore<'static>,
36    /// OpenPGP private key store
37    pub key_store: crate::openpgp::keystore::KeyStore,
38    /// Password for decrypting the signer's key.
39    pub password: U,
40    /// Algorithm used for compressing data before encryption.
41    pub compression_algorithm: package::CompressionAlgorithm,
42    /// Run the workflow or only perform a check.
43    pub mode: Mode,
44    /// Encryption progress handler.
45    pub progress: Option<T>,
46    /// Purpose, see [`package::Purpose`] for more details.
47    pub purpose: Option<package::Purpose>,
48    /// Data transfer ID, see [`package::Metadata::transfer_id`] for more details.
49    pub transfer_id: Option<u32>,
50    /// Timestamp of the data package.
51    pub timestamp: DateTime<Utc>,
52    /// Prefix for the data package name.
53    pub prefix: Option<String>,
54    /// Extra metadata fields, see [`package::Metadata::extra`] for more details.
55    pub extra_metadata: BTreeMap<String, String>,
56}
57
58impl<T: Progress, U> std::fmt::Debug for EncryptOpts<T, U> {
59    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
60        f.debug_struct("EncryptOpts")
61            .field("files", &self.files)
62            .field("recipients", &self.recipients)
63            .field("signer", &self.signer)
64            .field("compression_algorithm", &self.compression_algorithm)
65            .field("mode", &self.mode)
66            .field("purpose", &self.purpose)
67            .field("transfer_id", &self.transfer_id)
68            .field("extra_metadata", &self.extra_metadata)
69            .finish()
70    }
71}
72
73/// Creates a data package with encrypted and optionally compressed data along
74/// with checksums and metadata.
75///
76/// The data package can be stored in either the local filesystem or on a
77/// remote server.
78#[instrument(err(Debug, level=tracing::Level::ERROR))]
79pub async fn encrypt<T, F, Fut>(
80    mut opts: EncryptOpts<T, F>,
81    dest: Destination,
82) -> anyhow::Result<Status>
83where
84    T: Progress + Send + 'static,
85    F: Fn(crate::openpgp::types::PasswordHint) -> Fut,
86    Fut: std::future::Future<Output = crate::openpgp::types::Password>,
87{
88    trace!("Extract signing and encryption certificates");
89    let policy = StandardPolicy::new();
90    let signer_cert = opts.cert_store.get_cert_by_fingerprint(&opts.signer)?;
91    let recipients_certs = opts
92        .recipients
93        .iter()
94        .map(|fp| {
95            opts.cert_store
96                .get_cert_by_fingerprint(fp)
97                .map(|cert| cert.0)
98        })
99        .collect::<anyhow::Result<Vec<_>>>()?;
100    let keys = EncryptKeys {
101        signer: (
102            crate::openpgp::crypto::get_signing_capable_key(
103                &signer_cert.0,
104                &policy,
105                &mut opts.key_store,
106                &opts.password,
107            )
108            .await?,
109            signer_cert.0.fingerprint(),
110        ),
111        recipients: crate::openpgp::crypto::get_encryption_keys(&recipients_certs, &policy)?
112            .into_iter()
113            .zip(recipients_certs.iter().map(|cert| cert.fingerprint()))
114            .collect(),
115    };
116    let status = match dest {
117        Destination::Stdout => encrypt_stdout(opts, keys).await,
118        Destination::Local(local_opts) => encrypt_local(opts, &local_opts, keys).await,
119        Destination::Sftp(sftp_opts) => encrypt_sftp(opts, &sftp_opts, keys).await,
120        Destination::S3(s3_opts) => encrypt_s3(opts, &s3_opts, keys).await,
121    }?;
122    match &status {
123        Status::Checked {
124            destination,
125            source_size,
126        } => {
127            debug!(destination, source_size, "Checked encryption task input");
128        }
129        Status::Completed {
130            destination,
131            source_size,
132            destination_size,
133            metadata,
134        } => {
135            info!(
136                destination,
137                source_size,
138                destination_size,
139                metadata = metadata.to_json_or_debug(),
140                "Successfully created encrypted data package"
141            )
142        }
143    }
144    Ok(status)
145}
146
147/// A collection of keys necessary for creating a data package.
148///
149/// In addition to the key pair, we also store the fingerprint of the
150/// certificate, which is used to identify the signer and recipients
151/// in the metadata file.
152struct EncryptKeys {
153    signer: (sequoia_keystore::Key, sequoia_openpgp::Fingerprint),
154    recipients: Vec<(
155        sequoia_openpgp::packet::Key<
156            sequoia_openpgp::packet::key::PublicParts,
157            sequoia_openpgp::packet::key::UnspecifiedRole,
158        >,
159        sequoia_openpgp::Fingerprint,
160    )>,
161}
162
163/// Returns paths where the common prefix (to all paths) is stripped.
164fn get_archive_paths(files: &[PathBuf]) -> anyhow::Result<Vec<PathBuf>> {
165    let parent_folders = files
166        .iter()
167        .map(|f| {
168            f.parent()
169                .with_context(|| format!("Unable to find parent folder of {f:?}"))
170        })
171        .collect::<Result<Vec<_>, _>>()?;
172    let root_dir = get_common_path(&parent_folders)?;
173    let archive_paths = files
174        .iter()
175        .map(|f| Ok(Path::new(package::CONTENT_FOLDER).join(f.strip_prefix(&root_dir)?)))
176        .collect::<anyhow::Result<_>>()?;
177    Ok(archive_paths)
178}
179
180/// Initializes and return an OpenPGP message, which can be used for writing
181/// data into it.
182fn init_message<'a, W: io::Write + Send + Sync>(
183    writer: &'a mut W,
184    keys: &'a EncryptKeys,
185) -> anyhow::Result<sequoia_openpgp::serialize::stream::Message<'a>> {
186    use sequoia_openpgp::serialize::stream::{
187        Encryptor2, LiteralWriter, Message, Recipient, Signer,
188    };
189    LiteralWriter::new(
190        Signer::new(
191            Encryptor2::for_recipients(
192                Message::new(writer),
193                keys.recipients.iter().map(|(key, _)| Recipient::from(key)),
194            )
195            .build()?,
196            keys.signer.0.clone(),
197        )
198        .build()?,
199    )
200    .build()
201}
202
203/// Adds a file to the inner tar archive containing checksums of the individual
204/// data files.
205///
206/// The checksum file has the following format.
207///
208/// ```text
209/// <checksum1> <file 1 path inside the data package>
210/// <checksum2> <file 2 path inside the data package>
211/// ...
212/// ```
213fn add_checksum_file(
214    archive: &mut tar::Builder<impl io::Write>,
215    content: &[(String, String)],
216) -> anyhow::Result<()> {
217    use std::fmt::Write as _;
218    let content = content
219        .iter()
220        .fold(String::new(), |mut output, (checksum, path)| {
221            let _ = writeln!(output, "{checksum} {path}");
222            output
223        })
224        .into_bytes();
225    let mut header = tar::Header::new_gnu();
226    header.set_entry_type(tar::EntryType::file());
227    header.set_size(content.len().try_into()?);
228    header.set_mtime(Utc::now().timestamp().try_into()?);
229    header.set_mode(0o644);
230    header.set_cksum();
231    archive.append_data(&mut header, package::CHECKSUM_FILE, &content[..])?;
232    Ok(())
233}
234
235/// Writes files into a tarball.
236fn create_tarball(
237    writer: &mut impl io::Write,
238    file_path: &[(PathBuf, PathBuf)],
239    mut progress: Option<&mut impl Progress>,
240) -> anyhow::Result<()> {
241    enum Message {
242        Payload(BytesMut),
243        Finalize(std::path::PathBuf),
244    }
245
246    impl crate::io::Message for Message {
247        fn from_bytes(bytes: BytesMut) -> Self {
248            Self::Payload(bytes)
249        }
250    }
251
252    let (result_tx, mut result_rx) = tokio::sync::mpsc::channel(4);
253    let (pool_tx, mut pool_rx) = tokio::sync::mpsc::channel(4);
254    for _ in 0..4 {
255        pool_tx.blocking_send(BytesMut::with_capacity(1 << 19))?;
256    }
257    let checksums_len = file_path.len();
258    let checksums_handle = std::thread::spawn(move || -> anyhow::Result<_> {
259        use sequoia_openpgp::{crypto::hash::Digest as _, types::HashAlgorithm::SHA256};
260
261        let mut checksums = Vec::with_capacity(checksums_len);
262        let mut hasher = SHA256.context()?;
263        while let Some(message) = result_rx.blocking_recv() {
264            match message {
265                Message::Payload(payload) => {
266                    hasher.update(&payload);
267                    if pool_tx.blocking_send(payload).is_err() {
268                        // Under "normal" conditions this can only happen once, when all data is
269                        // read and the receiver end of this channel is closed.
270                        trace!("Failed to send chunk back to the pool (channel closed)");
271                    }
272                }
273                Message::Finalize(path) => {
274                    let path = crate::filesystem::to_posix_path(&path)
275                        .with_context(|| format!("Path contains non-UTF-8 characters: {path:?}"))?
276                        .to_string();
277                    let cs = std::mem::replace(&mut hasher, SHA256.context()?);
278                    checksums.push((crate::utils::to_hex_string(&cs.into_digest()?), path));
279                }
280            }
281        }
282        Ok(checksums)
283    });
284
285    let mut archive = tar::Builder::new(writer);
286    for (fs_path, archive_path) in file_path {
287        let f = std::fs::File::open(fs_path)?;
288        let mut header = tar::Header::new_gnu();
289        header.set_metadata(&f.metadata()?);
290        header.set_cksum();
291        let mut hash_reader = crate::io::Tee::new(f, &mut pool_rx, &result_tx)?;
292        if let Some(p) = progress.as_mut() {
293            let mut progress_reader = ProgressReader::new(&mut hash_reader, |len| {
294                p.inc(len.try_into()?);
295                Ok(())
296            });
297            archive.append_data(&mut header, archive_path, &mut progress_reader)?;
298        } else {
299            archive.append_data(&mut header, archive_path, &mut hash_reader)?;
300        }
301        result_tx.blocking_send(Message::Finalize(archive_path.clone()))?;
302    }
303    drop(result_tx);
304    let checksums = checksums_handle
305        .join()
306        .map_err(|_| anyhow::anyhow!("checksum thread panicked"))??;
307    add_checksum_file(&mut archive, &checksums)?;
308    archive.finish()?;
309    Ok(())
310}
311
312/// Adds the metadata file to the data package (outer zip file).
313fn add_metadata_file<S: sequoia_openpgp::crypto::Signer + Send + Sync>(
314    archive: &mut ZipArchive<impl io::Write>,
315    metadata: &package::Metadata,
316    signer_key: S,
317) -> anyhow::Result<()> {
318    let zip_file_opts = Default::default();
319    archive.add(package::METADATA_FILE, &zip_file_opts)?;
320    let metadata_json = serde_json::to_string(&metadata)?.as_bytes().to_owned();
321    archive.write_all(&metadata_json)?;
322    archive.add(package::METADATA_SIG_FILE, &zip_file_opts)?;
323    archive.write_all(&crate::openpgp::crypto::sign_detached(
324        &metadata_json,
325        signer_key,
326        crate::openpgp::types::SerializationFormat::AsciiArmored,
327    )?)?;
328    Ok(())
329}
330
331/// Processes input data files.
332///
333/// Returns:
334///
335/// - A vector pairs where the first element is the absolute path to the file
336///   and the second element is the path inside the data package, i.e. a path
337///   where the common prefix (for all files) is stripped.
338/// - The combined size of all files.
339///
340/// Processing steps:
341///
342/// - Walk directories and return individual files.
343/// - Normalize paths (make them absolute).
344/// - Remove duplicated paths.
345/// - Verify that archive paths contain only valid UTF-8 characters
346///   (later in the workflow it's necessary to write paths as strings).
347/// - Calculate the combined size of the input.
348fn process_files<P: AsRef<Path>>(files: &[P]) -> anyhow::Result<(Vec<(PathBuf, PathBuf)>, u64)> {
349    let mut total_input_size = 0u64;
350    let mut unique_files = std::collections::HashSet::new();
351    for entry in files.iter().flat_map(walkdir::WalkDir::new) {
352        let entry = entry?;
353        if entry.file_type().is_file() {
354            total_input_size += entry.metadata()?.len();
355            unique_files.insert(entry.path().canonicalize()?.to_owned());
356        }
357    }
358    let files = Vec::from_iter(unique_files);
359    let archive_paths = get_archive_paths(&files)?;
360    archive_paths.iter().try_for_each(|p| {
361        p.to_str()
362            .and(Some(()))
363            .with_context(|| format!("Path contains non-UTF-8 characters: {p:?}"))
364    })?;
365    let file_archive_path = files.into_iter().zip(archive_paths).collect();
366    Ok((file_archive_path, total_input_size))
367}
368
369/// Spawns a blocking task to handle the encryption and compression pipeline.
370fn spawn_encryption_task<T: Progress + Send + 'static, F>(
371    mut opts: EncryptOpts<T, F>,
372    file_path: Vec<(PathBuf, PathBuf)>,
373    total_input_size: u64,
374    keys: EncryptKeys,
375    source: crate::io::Source,
376    sink: tokio::sync::mpsc::Sender<BytesMut>,
377) -> tokio::task::JoinHandle<anyhow::Result<(u64, package::Metadata)>> {
378    struct Message(BytesMut);
379
380    impl crate::io::Message for Message {
381        fn from_bytes(bytes: BytesMut) -> Self {
382            Self(bytes)
383        }
384    }
385
386    tokio::task::spawn_blocking(move || -> anyhow::Result<_> {
387        let writer = crate::io::ChannelWriter::new(source, sink)?;
388        let mut zip = ZipArchive::new(writer);
389        zip.add(package::DATA_FILE_ENCRYPTED, &Default::default())?;
390
391        let (result_tx, mut result_rx) = tokio::sync::mpsc::channel::<Message>(4);
392        let (pool_tx, mut pool_rx) = tokio::sync::mpsc::channel(4);
393        for _ in 0..4 {
394            pool_tx.blocking_send(BytesMut::with_capacity(1 << 19))?;
395        }
396        let checksums_handle = std::thread::spawn(move || -> anyhow::Result<_> {
397            use sequoia_openpgp::{crypto::hash::Digest as _, types::HashAlgorithm::SHA256};
398
399            let mut hasher = SHA256.context()?;
400            while let Some(message) = result_rx.blocking_recv() {
401                hasher.update(&message.0);
402                if pool_tx.blocking_send(message.0).is_err() {
403                    // Under "normal" conditions this can only happen once, when all data is
404                    // read and the receiver end of this channel is closed.
405                    trace!("Failed to send chunk back to the pool (channel closed)");
406                }
407            }
408            Ok(crate::utils::to_hex_string(&hasher.into_digest()?))
409        });
410
411        let mut checksum_writer = crate::io::Tee::new(&mut zip, &mut pool_rx, &result_tx)?;
412        let mut encrypted_message = init_message(&mut checksum_writer, &keys)?;
413
414        crate::io::write_parallel(&mut encrypted_message, |w| {
415            let mut progress = opts.progress.as_mut();
416            if let Some(p) = &mut progress {
417                p.set_length(total_input_size);
418            }
419            match opts.compression_algorithm {
420                package::CompressionAlgorithm::Stored => create_tarball(w, &file_path, progress),
421                package::CompressionAlgorithm::Gzip(level) => {
422                    let mut enc = GzEncoder::new(
423                        w,
424                        Compression::new(level.unwrap_or_else(|| Compression::default().level())),
425                    );
426                    create_tarball(&mut enc, &file_path, progress)
427                }
428                package::CompressionAlgorithm::Zstandard(level) => {
429                    let mut enc = zstd::stream::write::Encoder::new(
430                        w,
431                        level.unwrap_or(zstd::DEFAULT_COMPRESSION_LEVEL),
432                    )?;
433                    enc.multithread(2)?;
434                    create_tarball(&mut enc, &file_path, progress)?;
435                    enc.finish()?;
436                    Ok(())
437                }
438            }
439        })?;
440
441        encrypted_message.finalize()?;
442        checksum_writer.flush_channel()?;
443        drop(result_tx);
444        let checksum = checksums_handle
445            .join()
446            .map_err(|_| anyhow::anyhow!("checksum thread in encrypt panicked"))??;
447
448        let metadata = package::Metadata {
449            sender: keys.signer.1.to_hex(),
450            recipients: keys.recipients.iter().map(|(_, fp)| fp.to_hex()).collect(),
451            checksum,
452            timestamp: opts.timestamp,
453            version: package::default_version(),
454            checksum_algorithm: Default::default(),
455            compression_algorithm: opts.compression_algorithm,
456            transfer_id: opts.transfer_id,
457            purpose: opts.purpose,
458            extra: opts.extra_metadata,
459        };
460        add_metadata_file(&mut zip, &metadata, keys.signer.0.clone())?;
461        let size = zip.finish()?;
462
463        if let Some(p) = &mut opts.progress {
464            p.finish();
465        }
466        Ok((size, metadata))
467    })
468}
469
470async fn encrypt_to_writer<T: Progress + Send + 'static, F>(
471    output: impl tokio::io::AsyncWrite,
472    opts: EncryptOpts<T, F>,
473    file_path: Vec<(PathBuf, PathBuf)>,
474    total_input_size: u64,
475    keys: EncryptKeys,
476) -> anyhow::Result<(u64, package::Metadata)> {
477    let (result_tx, mut result_rx) = tokio::sync::mpsc::channel(3);
478    let (buf_pool_tx, buf_pool_rx) = tokio::sync::mpsc::channel(3);
479    for _ in 0..3 {
480        buf_pool_tx.send(BytesMut::with_capacity(1 << 22)).await?;
481    }
482    let encrypt_task_handle = spawn_encryption_task(
483        opts,
484        file_path,
485        total_input_size,
486        keys,
487        crate::io::Source::Channel(buf_pool_rx),
488        result_tx,
489    );
490    tokio::pin!(output);
491    while let Some(chunk) = result_rx.recv().await {
492        output.write_all(&chunk).await?;
493        if buf_pool_tx.send(chunk).await.is_err() {
494            // Under "normal" conditions this can only happen once, when the encryption thread
495            // is done and the receiver end of this channel is closed.
496            trace!("Failed to send chunk back to the pool (channel closed)");
497        }
498    }
499    encrypt_task_handle.await?
500}
501
502async fn encrypt_to_blocking_writer<T: Progress + Send + 'static, F>(
503    output: &mut impl std::io::Write,
504    opts: EncryptOpts<T, F>,
505    file_path: Vec<(PathBuf, PathBuf)>,
506    total_input_size: u64,
507    keys: EncryptKeys,
508) -> anyhow::Result<(u64, package::Metadata)> {
509    let (result_tx, mut result_rx) = tokio::sync::mpsc::channel(3);
510    let (buf_pool_tx, buf_pool_rx) = tokio::sync::mpsc::channel(3);
511    for _ in 0..3 {
512        buf_pool_tx.send(BytesMut::with_capacity(1 << 22)).await?;
513    }
514    let encrypt_task_handle = spawn_encryption_task(
515        opts,
516        file_path,
517        total_input_size,
518        keys,
519        crate::io::Source::Channel(buf_pool_rx),
520        result_tx,
521    );
522    while let Some(chunk) = result_rx.recv().await {
523        output.write_all(&chunk)?;
524        if buf_pool_tx.send(chunk).await.is_err() {
525            // Under "normal" conditions this can only happen once, when the encryption thread
526            // is done and the receiver end of this channel is closed.
527            trace!("Failed to send chunk back to the pool (channel closed)");
528        }
529    }
530    encrypt_task_handle.await?
531}
532
533/// Encrypt to a file on disk.
534async fn encrypt_local<T: Progress + Send + 'static, F>(
535    opts: EncryptOpts<T, F>,
536    dest: &crate::destination::Local,
537    keys: EncryptKeys,
538) -> anyhow::Result<Status> {
539    trace!("Verify files to encrypt");
540    let (file_path, source_size) = process_files(&opts.files)?;
541    let output_path = dest
542        .path
543        .join(dest.package_name(&opts.timestamp, opts.prefix.as_deref()));
544    let destination = output_path.to_string_lossy().into_owned();
545    let parent = output_path
546        .parent()
547        .context("Unable to find output directory")?;
548    trace!("Verify available disk space");
549    check_space(source_size, parent, opts.mode)?;
550    trace!("Verify whether destination is writeable");
551    check_writeable(parent, opts.mode)?;
552    Ok(if let Mode::Check = opts.mode {
553        Status::Checked {
554            destination,
555            source_size,
556        }
557    } else {
558        let output = tokio::fs::File::create(&output_path).await?;
559        let mut buffered_output = tokio::io::BufWriter::with_capacity(1 << 22, output);
560        let (package_size, metadata) =
561            encrypt_to_writer(&mut buffered_output, opts, file_path, source_size, keys)
562                .await
563                .with_context(|| {
564                    // If an error occurs while creating the `.zip` file, delete
565                    // the partial file.
566                    let err_msg = "encryption failed";
567                    if let Err(msg) = std::fs::remove_file(&output_path) {
568                        anyhow::anyhow!(
569                            "{}. Additionally, failed to clean partial file '{}', reason: {}",
570                            err_msg,
571                            &destination,
572                            msg
573                        )
574                    } else {
575                        anyhow::anyhow!(err_msg)
576                    }
577                })?;
578        buffered_output.flush().await?;
579        Status::Completed {
580            destination,
581            source_size,
582            destination_size: package_size,
583            metadata,
584        }
585    })
586}
587
588/// Encrypt to standard output.
589async fn encrypt_stdout<T: Progress + Send + 'static, F>(
590    opts: EncryptOpts<T, F>,
591    keys: EncryptKeys,
592) -> anyhow::Result<Status> {
593    trace!("Verify files to encrypt");
594    let (file_path, source_size) = process_files(&opts.files)?;
595    let destination = "stdout".to_string();
596    Ok(if let Mode::Check = opts.mode {
597        Status::Checked {
598            destination,
599            source_size,
600        }
601    } else {
602        let (package_size, metadata) =
603            encrypt_to_writer(tokio::io::stdout(), opts, file_path, source_size, keys).await?;
604        Status::Completed {
605            destination,
606            source_size,
607            destination_size: package_size,
608            metadata,
609        }
610    })
611}
612
613async fn encrypt_s3<T: Progress + Send + 'static, F>(
614    opts: EncryptOpts<T, F>,
615    dest: &crate::destination::S3,
616    keys: EncryptKeys,
617) -> anyhow::Result<Status> {
618    let object_name = package::generate_package_name(&opts.timestamp, opts.prefix.as_deref());
619    trace!("Verify files to encrypt");
620    let (file_path, source_size) = process_files(&opts.files)?;
621    let destination = [
622        dest.client().endpoint.as_str(),
623        dest.bucket(),
624        object_name.as_str(),
625    ]
626    .join("/");
627    if let Mode::Check = opts.mode {
628        return Ok(Status::Checked {
629            destination,
630            source_size,
631        });
632    }
633    let (result_tx, result_rx) = tokio::sync::mpsc::channel(3);
634    let chunk_size = crate::remote::s3::compute_chunk_size(source_size);
635    let encrypt_task_handle = spawn_encryption_task(
636        opts,
637        file_path,
638        source_size,
639        keys,
640        crate::io::Source::New(chunk_size),
641        result_tx,
642    );
643    dest.client()
644        .put_object(dest.bucket(), &object_name, result_rx)
645        .await?;
646    let (package_size, metadata) = encrypt_task_handle.await??;
647    Ok(Status::Completed {
648        destination,
649        source_size,
650        destination_size: package_size,
651        metadata,
652    })
653}
654
655/// Encrypts to a file or a remote SFTP server.
656async fn encrypt_sftp<T: Progress + Send + 'static, F>(
657    opts: EncryptOpts<T, F>,
658    dest: &crate::destination::Sftp,
659    keys: EncryptKeys,
660) -> anyhow::Result<Status> {
661    trace!("Verify files to encrypt");
662    let (file_path, source_size) = process_files(&opts.files)?;
663    let client = dest.client().connect()?;
664    let upload_dir = crate::remote::sftp::UploadDir::new(dest.base_path(), &client);
665    let dpkg_path = crate::remote::sftp::DpkgPath::new(
666        &upload_dir.path,
667        package::generate_package_name(&opts.timestamp, opts.prefix.as_deref()),
668        &client,
669    );
670    let destination = client.get_url(&dpkg_path.path);
671    if let Mode::Check = opts.mode {
672        return Ok(Status::Checked {
673            destination,
674            source_size,
675        });
676    }
677    upload_dir.create(None)?;
678    let mut buffered_output =
679        std::io::BufWriter::with_capacity(1 << 22, client.inner.create(&dpkg_path.tmp)?);
680    let (package_size, metadata) =
681        encrypt_to_blocking_writer(&mut buffered_output, opts, file_path, source_size, keys)
682            .await
683            .with_context(|| {
684                // If an error occurs while creating the `.zip` file, delete
685                // the partial file on the SFTP server.
686                let err_msg = "encryption failed";
687                if let Err(msg) = upload_dir.delete() {
688                    anyhow::anyhow!(
689                        "{}. Additionally, failed to clean partially uploaded data \
690                    at '{}', reason: {}",
691                        err_msg,
692                        destination,
693                        msg
694                    )
695                } else {
696                    anyhow::anyhow!(err_msg)
697                }
698            })?;
699    buffered_output.flush()?;
700    dpkg_path.finalize()?;
701    upload_dir.finalize()?;
702    Ok(Status::Completed {
703        source_size,
704        destination_size: package_size,
705        destination,
706        metadata,
707    })
708}