use base64::{prelude::BASE64_STANDARD, Engine};
use crc32c::crc32c_append;
use futures::TryStream;
use std::{pin::Pin, task::Poll};
use tokio::sync::oneshot;
use crate::common::*;
/// Running checksum accumulator.
///
/// Holds the 32-bit checksum value computed over all bytes seen so far.
#[derive(Debug, Clone)]
pub(crate) struct Hasher {
    /// Current checksum value; updated in place as more data is hashed.
    state: u32,
}
impl Hasher {
fn new() -> Self {
Self { state: 0 }
}
fn update(&mut self, data: &[u8]) {
self.state = crc32c_append(self.state, data);
}
pub(crate) fn finish(self) -> u32 {
self.state
}
pub(crate) fn finish_encoded(self) -> String {
let bytes = self.finish().to_be_bytes();
BASE64_STANDARD.encode(bytes)
}
}
/// Checks `Hasher` against known checksums for four fixed 32-byte inputs:
/// all zeros, all `0xff`, ascending `0..=31`, and descending `31..=0`.
#[test]
fn crc32c_matches_gcloud() {
    /// Hash `data` in a single `update` call and compare with `expected`.
    fn assert_crc(data: &[u8], expected: u32) {
        let mut hasher = Hasher::new();
        hasher.update(data);
        assert_eq!(hasher.finish(), expected);
    }

    assert_crc(&[0u8; 32], 0x8a91_36aa);
    assert_crc(&[0xff; 32], 0x62a8_ab43);

    // Bytes 0, 1, 2, ..., 31.
    let ascending: Vec<u8> = (0u8..32).collect();
    assert_crc(&ascending, 0x46dd_794e);

    // Bytes 31, 30, 29, ..., 0.
    let descending: Vec<u8> = (0u8..32).rev().collect();
    assert_crc(&descending, 0x113f_db5c);
}
/// Stream adapter that wraps a byte-chunk stream and checksums the chunks
/// passing through it.
///
/// The accumulated [`Hasher`] is handed to the paired `oneshot::Receiver`
/// once the wrapped stream reports that it has ended.
pub(crate) struct Crc32cStream<S>
where
    S: TryStream<Error = Error> + Send + Unpin + 'static,
    S::Ok: AsRef<[u8]>,
{
    /// The wrapped stream whose successful items are hashed.
    inner: S,
    /// Running checksum of every chunk yielded so far.
    hasher: Hasher,
    /// Channel used to publish the final hasher; `None` once it has been used.
    sender: Option<oneshot::Sender<Hasher>>,
}
impl<S> Crc32cStream<S>
where
    S: TryStream<Error = Error> + Send + Unpin + 'static,
    S::Ok: AsRef<[u8]>,
{
    /// Wrap `inner` in a checksumming stream.
    ///
    /// Returns the wrapper together with the receiving half of a oneshot
    /// channel on which the final [`Hasher`] is delivered when `inner` ends.
    pub(crate) fn new(inner: S) -> (Self, oneshot::Receiver<Hasher>) {
        let (tx, rx) = oneshot::channel();
        let stream = Self {
            inner,
            hasher: Hasher::new(),
            sender: Some(tx),
        };
        (stream, rx)
    }
}
impl<S, D> Stream for Crc32cStream<S>
where
S: TryStream<Ok = D, Error = Error, Item = Result<D, Error>>
+ Send
+ Unpin
+ 'static,
D: AsRef<[u8]>,
{
type Item = Result<S::Ok, S::Error>;
fn poll_next(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
let result = <S as Stream>::poll_next(Pin::new(&mut self.inner), cx);
match result {
Poll::Ready(Some(Ok(data))) => {
self.hasher.update(data.as_ref());
Poll::Ready(Some(Ok(data)))
}
Poll::Ready(None) => {
if let Some(sender) = self.sender.take() {
if sender.send(self.hasher.clone()).is_ok() {
Poll::Ready(None)
} else {
Poll::Ready(Some(Err(format_err!(
"broken pipe forwarding checksum from Crc32Stream",
))))
}
} else {
Poll::Ready(Some(Err(format_err!(
"Crc32Stream tried to end twice",
))))
}
}
other => other,
}
}
}