use crate::{Actions, BlobCompressionAlgorithm, Error, IsAlreadyEncoded, JobId};
use serde::de::DeserializeOwned;
type Result<T, E = crate::error::BoxDynError> = anyhow::Result<T, E>;
pub type Worker = std::sync::Arc<dyn JobExecutor + Send + Sync>;
#[async_trait::async_trait]
pub trait JobExecutor {
async fn execute(
&self,
id: JobId,
data: Vec<u8>,
dependency_data: Option<Vec<u8>>,
actions: &mut Actions,
) -> Result<()>;
}
#[async_trait::async_trait]
pub trait JobWithNoData {
async fn execute(&self, id: JobId, actions: &mut Actions) -> Result<()>;
}
pub struct ExecutorForJobWithNoData<T>(pub T);
#[async_trait::async_trait]
pub trait JobWithNoDependencyData {
async fn execute(&self, id: JobId, data: Vec<u8>, actions: &mut Actions) -> Result<()>;
}
pub struct ExecutorForJobWithNoDependencyData<T>(pub T);
#[async_trait::async_trait]
pub trait URLFetchAndStoreJob {
fn get_url(id: &str) -> Result<reqwest::Url>;
fn build(id: &str, builder: reqwest::RequestBuilder) -> Result<reqwest::RequestBuilder>;
}
#[async_trait::async_trait]
pub trait URLFetchAndStoreJobWithData {
fn get_url(id: &str, data: Vec<u8>) -> Result<reqwest::Url>;
fn build(id: &str, builder: reqwest::RequestBuilder) -> Result<reqwest::RequestBuilder>;
}
pub struct ExecutorForURLFetchAndStoreJob<T>(pub T);
#[async_trait::async_trait]
pub trait URLPostAndStoreJob {
fn get_url(&self, id: &str, data: &[u8]) -> Result<reqwest::Url>;
fn build(
&self,
id: &str,
data: &[u8],
builder: reqwest::RequestBuilder,
) -> Result<reqwest::RequestBuilder>;
}
pub struct ExecutorForURLPostAndStoreJob<T>(pub T);
#[async_trait::async_trait]
pub trait JobWithDependencyData {
async fn execute(
&self,
id: JobId,
dependency_data: Vec<u8>,
actions: &mut Actions,
) -> Result<()>;
}
pub struct ExecutorForJobWithDependencyData<T>(pub T);
#[async_trait::async_trait]
pub trait JobWithSerializedDependencyData {
type Data;
async fn execute(
&self,
id: JobId,
dependency_data: Self::Data,
actions: &mut Actions,
) -> Result<()>;
}
pub struct ExecutorForJobWithSerializedDependencyData<T>(pub T);
#[async_trait::async_trait]
impl<T> JobExecutor for ExecutorForJobWithNoData<T>
where
T: JobWithNoData + Send + Sync,
{
#[inline]
async fn execute(
&self,
id: JobId,
_data: Vec<u8>,
_dependency_data: Option<Vec<u8>>,
actions: &mut Actions,
) -> Result<()> {
self.0.execute(id, actions).await
}
}
#[async_trait::async_trait]
impl<T> JobExecutor for ExecutorForJobWithNoDependencyData<T>
where
T: JobWithNoDependencyData + Send + Sync,
{
#[inline]
async fn execute(
&self,
id: JobId,
data: Vec<u8>,
_dependency_data: Option<Vec<u8>>,
actions: &mut Actions,
) -> Result<()> {
self.0.execute(id, data, actions).await
}
}
impl<T> URLFetchAndStoreJobWithData for T
where
T: URLFetchAndStoreJob + Send + Sync,
{
fn get_url(id: &str, _data: Vec<u8>) -> Result<reqwest::Url> {
<T as URLFetchAndStoreJob>::get_url(id)
}
fn build(id: &str, builder: reqwest::RequestBuilder) -> Result<reqwest::RequestBuilder> {
<T as URLFetchAndStoreJob>::build(id, builder)
}
}
#[async_trait::async_trait]
impl<T> JobExecutor for ExecutorForURLFetchAndStoreJob<T>
where
T: URLFetchAndStoreJobWithData + Send + Sync,
{
async fn execute(
&self,
id: JobId,
data: Vec<u8>,
_dependency_data: Option<Vec<u8>>,
actions: &mut Actions,
) -> Result<()> {
let url = T::get_url(&id, data)?;
let client = reqwest::Client::new();
let builder = client.get(url);
let builder = T::build(&id, builder)?;
let response = builder
.send()
.await
.map_err(|e| Error::JobExecution(Box::new(e)))?;
let status = response.status();
if !status.is_success() {
let headers = response.headers().clone();
let data = response
.bytes()
.await
.map_err(|e| Error::JobExecution(Box::new(e)))?
.to_vec();
let text = String::from_utf8_lossy(&data);
println!(
"Error fetching data of type {} for {:?} - {:?}: {:?}",
std::any::type_name::<T>(),
id,
headers,
text
);
return Err(Box::new(Error::URLFetchAndStoreJobError(
std::any::type_name::<T>().to_owned(),
id,
text.to_string(),
)));
}
let data = response
.bytes()
.await
.map_err(|e| Error::JobExecution(Box::new(e)))?
.to_vec();
actions.write_data((BlobCompressionAlgorithm::Zstd, data, IsAlreadyEncoded::No).into());
Ok(())
}
}
#[async_trait::async_trait]
impl<T> JobExecutor for ExecutorForURLPostAndStoreJob<T>
where
T: URLPostAndStoreJob + Send + Sync,
{
async fn execute(
&self,
id: JobId,
data: Vec<u8>,
_dependency_data: Option<Vec<u8>>,
actions: &mut Actions,
) -> Result<()> {
let url = self.0.get_url(&id, &data)?;
let client = reqwest::Client::new();
let builder = client.post(url);
let builder = self.0.build(&id, &data, builder)?;
let response = builder
.send()
.await
.map_err(|e| Error::JobExecution(Box::new(e)))?;
let status = response.status();
if !status.is_success() {
let headers = response.headers().clone();
let data = response
.bytes()
.await
.map_err(|e| Error::JobExecution(Box::new(e)))?
.to_vec();
let text = String::from_utf8_lossy(&data);
println!(
"Error fetching data of type {} for {:?} - {:?}: {:?}",
std::any::type_name::<T>(),
id,
headers,
text
);
return Err(Box::new(Error::URLFetchAndStoreJobError(
std::any::type_name::<T>().to_owned(),
id,
text.to_string(),
)));
}
let data = response
.bytes()
.await
.map_err(|e| Error::JobExecution(Box::new(e)))?
.to_vec();
actions.write_data((BlobCompressionAlgorithm::Zstd, data, IsAlreadyEncoded::No).into());
Ok(())
}
}
#[async_trait::async_trait]
impl<T> JobExecutor for ExecutorForJobWithDependencyData<T>
where
T: JobWithDependencyData + Send + Sync,
{
#[inline]
async fn execute(
&self,
id: JobId,
_data: Vec<u8>,
dependency_data: Option<Vec<u8>>,
actions: &mut Actions,
) -> Result<()> {
if let Some(data) = dependency_data {
self.0.execute(id, data, actions).await
} else {
Err(crate::error::Error::NoDependencyDataFoundForJob(
std::any::type_name::<T>().to_owned(),
)
.into())
}
}
}
#[async_trait::async_trait]
impl<T> JobExecutor for ExecutorForJobWithSerializedDependencyData<T>
where
T: JobWithSerializedDependencyData + Send + Sync,
T::Data: DeserializeOwned + Send + Sync,
{
#[inline]
async fn execute(
&self,
id: JobId,
_data: Vec<u8>,
dependency_data: Option<Vec<u8>>,
actions: &mut Actions,
) -> Result<()> {
if let Some(data) = dependency_data {
let deserialized: T::Data = serde_json::from_slice(&data)?;
self.0.execute(id, deserialized, actions).await
} else {
Err(crate::error::Error::NoDependencyDataFoundForJob(
std::any::type_name::<T>().to_owned(),
)
.into())
}
}
}