use super::*;
use std::{
fmt::Debug,
marker::PhantomData,
};
use tokio::sync::mpsc::UnboundedReceiver;
#[derive(Debug, Clone)]
pub struct BasicWorker;
impl BasicWorker {
#[allow(missing_docs)]
pub fn new() -> Box<Self> {
Box::new(Self)
}
}
impl Worker for BasicWorker {
fn handle_response(self: Box<Self>, giveload: Vec<u8>) -> anyhow::Result<()> {
Decoder::try_from(giveload)?;
Ok(())
}
fn handle_error(self: Box<Self>, error: WorkerError, _reporter: Option<&ReporterHandle>) -> anyhow::Result<()> {
anyhow::bail!(error);
}
}
pub struct BasicRetryWorker<R> {
pub request: R,
pub retries: usize,
}
impl<R> Debug for BasicRetryWorker<R>
where
R: Debug,
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("BasicRetryWorker")
.field("request", &self.request)
.field("retries", &self.retries)
.finish()
}
}
impl<R> Clone for BasicRetryWorker<R>
where
R: Clone,
{
fn clone(&self) -> Self {
Self {
request: self.request.clone(),
retries: self.retries,
}
}
}
impl<R> BasicRetryWorker<R> {
pub fn new(request: R) -> Box<Self> {
Box::new(request.into())
}
}
impl<R, H> IntoRespondingWorker<R, H> for BasicRetryWorker<R>
where
H: 'static + Debug + Send + Sync,
R: 'static + Send + Debug + Request + Sync + SendRequestExt,
{
type Output = AnyWorker<H, R>;
fn with_handle(self, handle: H) -> AnyWorker<H, R> {
AnyWorker::<H, R>::from(self, handle)
}
}
impl<R> From<R> for BasicRetryWorker<R> {
fn from(request: R) -> Self {
Self { request, retries: 0 }
}
}
impl<R> Worker for BasicRetryWorker<R>
where
R: 'static + Debug + Send + Request + Sync,
{
fn handle_response(self: Box<Self>, giveload: Vec<u8>) -> anyhow::Result<()> {
Decoder::try_from(giveload)?;
Ok(())
}
fn handle_error(
self: Box<Self>,
mut error: WorkerError,
reporter_opt: Option<&ReporterHandle>,
) -> anyhow::Result<()> {
error!("{}", error);
if let WorkerError::Cql(ref mut cql_error) = error {
if let (Some(id), Some(reporter)) = (cql_error.try_unprepared_id(), reporter_opt) {
handle_unprepared_error(self, id, reporter).map_err(|worker| {
error!(
"Error trying to reprepare query: {:?}",
worker.request().statement_by_id(&id)
);
anyhow::anyhow!("Error trying to reprepare query!")
})
} else {
self.retry()?
.map_or_else(|| Ok(()), |_| anyhow::bail!("Basic worker consumed all retries"))
}
} else {
self.retry()?
.map_or_else(|| Ok(()), |_| anyhow::bail!("Basic worker consumed all retries"))
}
}
}
impl<R> RetryableWorker<R> for BasicRetryWorker<R>
where
R: 'static + Debug + Send + Request + Sync,
{
fn retries(&self) -> usize {
self.retries
}
fn request(&self) -> &R {
&self.request
}
fn retries_mut(&mut self) -> &mut usize {
&mut self.retries
}
}
pub struct SpawnableRespondWorker<R, I, W> {
pub(crate) inbox: I,
pub(crate) worker: W,
_req: PhantomData<fn(R) -> R>,
}
impl<R, I, W> SpawnableRespondWorker<R, I, W>
where
R: Request + Debug + Send,
W: RetryableWorker<R> + Clone,
{
pub fn new(inbox: I, worker: W) -> Box<Self> {
Box::new(Self {
inbox,
worker,
_req: PhantomData,
})
}
pub fn send_to_reporter(self: Box<Self>, reporter: &ReporterHandle) -> Result<DecodeResult<R::Marker>, RequestError>
where
Self: 'static + Sized,
W: Worker,
R: SendRequestExt,
{
Box::new(self.worker.clone()).send_to_reporter(reporter)?;
Ok(DecodeResult::new(R::Marker::new(), R::TYPE))
}
pub fn send_local(self: Box<Self>) -> Result<DecodeResult<R::Marker>, RequestError>
where
Self: 'static + Sized,
W: Worker,
R: SendRequestExt,
{
Box::new(self.worker.clone()).send_local()?;
Ok(DecodeResult::new(R::Marker::new(), R::TYPE))
}
pub fn send_global(self: Box<Self>) -> Result<DecodeResult<R::Marker>, RequestError>
where
Self: 'static + Sized,
W: Worker,
R: SendRequestExt,
{
Box::new(self.worker.clone()).send_global()?;
Ok(DecodeResult::new(R::Marker::new(), R::TYPE))
}
}
impl<R, W> SpawnableRespondWorker<R, UnboundedReceiver<Result<Decoder, WorkerError>>, W>
where
R: 'static + SendRequestExt + Clone + Debug + Send + Sync,
W: 'static + RetryableWorker<R> + Clone + Worker,
{
pub async fn get_local(&mut self) -> Result<<R::Marker as Marker>::Output, RequestError>
where
Self: Sized,
{
let marker = Box::new(self.worker.clone()).send_local()?;
Ok(marker.try_decode(
self.inbox
.recv()
.await
.ok_or_else(|| anyhow::anyhow!("No response from worker!"))??,
)?)
}
pub fn get_local_blocking(&mut self) -> Result<<R::Marker as Marker>::Output, RequestError>
where
Self: Sized,
W: Worker,
{
let marker = Box::new(self.worker.clone()).send_local()?;
Ok(marker.try_decode(
self.inbox
.blocking_recv()
.ok_or_else(|| anyhow::anyhow!("No response from worker!"))??,
)?)
}
pub async fn get_global(&mut self) -> Result<<R::Marker as Marker>::Output, RequestError>
where
Self: Sized,
W: Worker,
{
let marker = Box::new(self.worker.clone()).send_global()?;
Ok(marker.try_decode(
self.inbox
.recv()
.await
.ok_or_else(|| anyhow::anyhow!("No response from worker!"))??,
)?)
}
pub fn get_global_blocking(&mut self) -> Result<<R::Marker as Marker>::Output, RequestError>
where
Self: Sized,
W: Worker,
{
let marker = Box::new(self.worker.clone()).send_global()?;
Ok(marker.try_decode(
self.inbox
.blocking_recv()
.ok_or_else(|| anyhow::anyhow!("No response from worker!"))??,
)?)
}
}