use crate::{
StreamData,
endpoint::GcsEndpoint,
error::Error,
writer::{StreamWriter, WriteJob},
};
use async_std::channel::Receiver;
use async_trait::async_trait;
use futures_util::StreamExt;
#[async_trait]
impl StreamWriter for GcsEndpoint {
async fn write_stream(
&self,
path: &str,
receiver: Receiver<StreamData>,
job_and_notification: &dyn WriteJob,
) -> Result<(), Error> {
let client = self.connection();
let job_id_str = job_and_notification.get_str_job_id();
let total_file_size;
let mut received_bytes = 0;
let mut prev_percent = 0;
let first_message = receiver.recv().await?;
let (progression_sender, progression_receiver) = std::sync::mpsc::sync_channel(100);
match first_message {
StreamData::Size(file_size) => {
total_file_size = file_size;
client
.object()
.create_streamed(
self.bucket(),
receiver
.take_while(move |message| {
futures::future::ready(
!matches!(message, StreamData::Eof) || !matches!(message, StreamData::Stop),
)
})
.map(move |stream_data| match stream_data {
StreamData::Data(data) => {
received_bytes += data.len();
progression_sender.send(received_bytes)?;
Ok(data)
}
other => Err(Error::Other(format!(
"GCS writer received an unexpected message: {other:?}"
))),
}),
file_size,
path,
"application/octet-stream",
)
.await?;
}
StreamData::Eof | StreamData::Stop => {
log::warn!(target: &job_id_str, "Nothing to do in writer.");
return Ok(());
}
other => {
return Err(Error::Other(format!(
"GCS writer received an unexpected {other:?} message, Size was expected"
)));
}
}
while let Ok(received_bytes) = progression_receiver.recv() {
let percent = (received_bytes as f32 / total_file_size as f32 * 100.0) as u8;
if percent > prev_percent {
prev_percent = percent;
job_and_notification.progress(percent)?;
}
}
Ok(())
}
}
/// `write_stream` must fail with an explicit error when the first message on the channel is a
/// `Data` chunk instead of the expected `Size` announcement.
#[test]
pub fn test_gcs_writer_invalid_data_message_before_size() {
    use crate::writer::DummyWriteJob;

    let bucket_name = String::from("test_bucket");
    let destination = String::from("/path/to/destination_file.txt");
    let write_job = DummyWriteJob {};

    let (tx, rx) = async_std::channel::bounded(100);
    let endpoint = GcsEndpoint::new(&bucket_name).unwrap();

    // Queue the offending message first, then run the writer to completion.
    let outcome = async_std::task::block_on(async {
        tx.send(StreamData::Data(vec![1, 2, 3])).await.unwrap();
        endpoint.write_stream(&destination, rx, &write_job).await
    });

    assert!(outcome.is_err());

    let expected_message =
        "GCS writer received an unexpected Data([1, 2, 3]) message, Size was expected".to_string();
    let failure = outcome.as_ref().unwrap_err();
    assert_eq!(expected_message, failure.to_string());
}
/// An `Eof` received before any `Size` message means there is nothing to write: `write_stream`
/// must return successfully.
#[test]
pub fn test_gcs_writer_eof_message_before_size() {
    use crate::writer::DummyWriteJob;

    let bucket_name = String::from("test_bucket");
    let destination = String::from("/path/to/destination_file.txt");
    let write_job = DummyWriteJob {};

    let (tx, rx) = async_std::channel::bounded(100);
    let endpoint = GcsEndpoint::new(&bucket_name).unwrap();

    // Queue the end-of-stream marker first, then run the writer to completion.
    let outcome = async_std::task::block_on(async {
        tx.send(StreamData::Eof).await.unwrap();
        endpoint.write_stream(&destination, rx, &write_job).await
    });

    assert!(outcome.is_ok());
    let () = outcome.unwrap();
}
/// A `Stop` received before any `Size` message means there is nothing to write: `write_stream`
/// must return successfully.
#[test]
pub fn test_gcs_writer_stop_message_before_size() {
    use crate::writer::DummyWriteJob;

    let bucket_name = String::from("test_bucket");
    let destination = String::from("/path/to/destination_file.txt");
    let write_job = DummyWriteJob {};

    let (tx, rx) = async_std::channel::bounded(100);
    let endpoint = GcsEndpoint::new(&bucket_name).unwrap();

    // Queue the stop marker first, then run the writer to completion.
    let outcome = async_std::task::block_on(async {
        tx.send(StreamData::Stop).await.unwrap();
        endpoint.write_stream(&destination, rx, &write_job).await
    });

    assert!(outcome.is_ok());
    let () = outcome.unwrap();
}