use aws_sigv4::http_request::SigningError;
use aws_smithy_runtime_api::http::Headers;
use aws_smithy_types::config_bag::{Storable, StoreReplace};
use bytes::Bytes;
use std::sync::{mpsc, Arc, Mutex};
pub(crate) trait SignChunk: std::fmt::Debug {
fn chunk_signature(&mut self, chunk: &Bytes) -> Result<String, SigningError>;
fn trailer_signature(&mut self, trailing_headers: &Headers) -> Result<String, SigningError>;
}
#[derive(Clone, Debug)]
#[allow(clippy::type_complexity)]
pub struct DeferredSigner {
signer: Arc<Mutex<Option<Box<dyn SignChunk + Send + Sync>>>>,
rx: Arc<Mutex<Option<mpsc::Receiver<Box<dyn SignChunk + Send + Sync>>>>>,
}
impl Storable for DeferredSigner {
type Storer = StoreReplace<Self>;
}
impl DeferredSigner {
pub fn new() -> (Self, DeferredSignerSender) {
let (tx, rx) = mpsc::channel();
(
Self {
signer: Default::default(),
rx: Arc::new(Mutex::new(Some(rx))),
},
DeferredSignerSender { tx: Mutex::new(tx) },
)
}
pub fn empty() -> Self {
Self {
rx: Default::default(),
signer: Default::default(),
}
}
fn acquire(&self) -> Box<dyn SignChunk + Send + Sync> {
let mut rx = self.rx.lock().unwrap();
rx.take()
.and_then(|receiver| receiver.try_recv().ok())
.expect("signer should be available")
}
}
#[derive(Debug)]
pub struct DeferredSignerSender {
tx: Mutex<mpsc::Sender<Box<dyn SignChunk + Send + Sync>>>,
}
impl DeferredSignerSender {
pub(crate) fn send(
&self,
signer: Box<dyn SignChunk + Send + Sync>,
) -> Result<(), mpsc::SendError<Box<dyn SignChunk + Send + Sync>>> {
self.tx.lock().unwrap().send(signer)
}
}
impl Storable for DeferredSignerSender {
type Storer = StoreReplace<Self>;
}
impl SignChunk for DeferredSigner {
fn chunk_signature(&mut self, chunk: &Bytes) -> Result<String, SigningError> {
let mut signer = self.signer.lock().unwrap();
let signer = signer.get_or_insert_with(|| self.acquire());
signer.chunk_signature(chunk)
}
fn trailer_signature(&mut self, trailing_headers: &Headers) -> Result<String, SigningError> {
let mut signer = self.signer.lock().unwrap();
let signer = signer.get_or_insert_with(|| self.acquire());
signer.trailer_signature(trailing_headers)
}
}