s3sync 1.58.6

Reliable, flexible, and fast synchronization tool for S3.
Documentation
use anyhow::{Context, Result, anyhow};
use async_channel::{Receiver, Sender};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};

use crate::Config;
use crate::storage::Storage;
use crate::types::token::PipelineCancellationToken;
use crate::types::{S3syncObject, SyncStatistics};

pub struct Stage {
    pub config: Config,
    pub source: Option<Storage>,
    pub target: Option<Storage>,
    pub receiver: Option<Receiver<S3syncObject>>,
    pub sender: Option<Sender<S3syncObject>>,
    pub cancellation_token: PipelineCancellationToken,
    pub has_warning: Arc<AtomicBool>,
}

#[derive(Debug, Clone, PartialEq)]
pub enum SendResult {
    Success,
    Closed,
}

impl Stage {
    pub fn new(
        config: Config,
        source: Option<Storage>,
        target: Option<Storage>,
        receiver: Option<Receiver<S3syncObject>>,
        sender: Option<Sender<S3syncObject>>,
        cancellation_token: PipelineCancellationToken,
        has_warning: Arc<AtomicBool>,
    ) -> Self {
        Self {
            config,
            source,
            target,
            receiver,
            sender,
            cancellation_token,
            has_warning,
        }
    }

    pub async fn send(&self, object: S3syncObject) -> Result<SendResult> {
        let result = self
            .sender
            .as_ref()
            .unwrap()
            .send(object)
            .await
            .context("async_channel::Sender::send() failed.");

        if let Err(e) = result {
            return if !self.is_channel_closed() {
                Err(anyhow!(e))
            } else {
                Ok(SendResult::Closed)
            };
        }

        Ok(SendResult::Success)
    }

    pub fn is_channel_closed(&self) -> bool {
        self.sender.as_ref().unwrap().is_closed()
    }

    pub async fn send_stats(&self, stats: SyncStatistics) {
        let _ = self
            .target
            .as_ref()
            .unwrap()
            .get_stats_sender()
            .send(stats)
            .await;
    }

    pub fn set_warning(&self) {
        self.has_warning.store(true, Ordering::SeqCst);
    }
}