use std::{
fmt::Debug,
ops::{Deref, DerefMut},
};
use either::Either::{self, *};
use ufotofu::{BufferedProducer, BulkProducer, Producer};
use crate::{mutex::WriteGuard, Mutex};
#[derive(Debug)]
pub struct State<P, ProducerFinal, ProducerErr>(Mutex<MutexState<P, ProducerFinal, ProducerErr>>);
impl<P, ProducerFinal, ProducerErr> State<P, ProducerFinal, ProducerErr> {
pub fn new(producer: P) -> Self {
State(Mutex::new(MutexState {
p: producer,
last: None,
}))
}
}
#[derive(Debug)]
struct MutexState<P, ProducerFinal, ProducerErr> {
p: P,
last: Option<Result<ProducerFinal, ProducerErr>>,
}
#[derive(Debug, Clone)]
pub struct SharedProducer<R, P, ProducerFinal, ProducerErr>
where
R: Deref<Target = State<P, ProducerFinal, ProducerErr>> + Clone,
{
state_ref: R,
}
impl<R, P, ProducerFinal, ProducerErr> SharedProducer<R, P, ProducerFinal, ProducerErr>
where
R: Deref<Target = State<P, ProducerFinal, ProducerErr>> + Clone,
{
pub fn new(state_ref: R) -> Self {
Self { state_ref }
}
pub async fn access_producer(&self) -> SharedProducerAccess<P, ProducerFinal, ProducerErr> {
SharedProducerAccess(self.state_ref.deref().0.write().await)
}
}
#[derive(Debug)]
pub struct SharedProducerAccess<'shared_producer, P, ProducerFinal, ProducerErr>(
WriteGuard<'shared_producer, MutexState<P, ProducerFinal, ProducerErr>>,
);
impl<P, ProducerFinal, ProducerErr> Producer
for SharedProducerAccess<'_, P, ProducerFinal, ProducerErr>
where
P: Producer<Final = ProducerFinal, Error = ProducerErr>,
ProducerFinal: Clone,
ProducerErr: Clone,
{
type Item = P::Item;
type Final = P::Final;
type Error = P::Error;
async fn produce(&mut self) -> Result<Either<Self::Item, Self::Final>, Self::Error> {
let inner_state = self.0.deref_mut();
match inner_state.last.as_ref() {
Some(Ok(fin)) => Ok(Right(fin.clone())),
Some(Err(err)) => Err(err.clone()),
None => match inner_state.p.produce().await {
Ok(Left(item)) => Ok(Left(item)),
Ok(Right(fin)) => {
inner_state.last = Some(Ok(fin.clone()));
Ok(Right(fin))
}
Err(err) => {
inner_state.last = Some(Err(err.clone()));
Err(err)
}
},
}
}
}
impl<P, ProducerFinal, ProducerErr> BufferedProducer
for SharedProducerAccess<'_, P, ProducerFinal, ProducerErr>
where
P: BufferedProducer<Final = ProducerFinal, Error = ProducerErr>,
ProducerFinal: Clone,
ProducerErr: Clone,
{
async fn slurp(&mut self) -> Result<(), Self::Error> {
let inner_state = self.0.deref_mut();
match inner_state.last.as_ref() {
Some(Ok(_fin)) => Ok(()), Some(Err(err)) => Err(err.clone()),
None => match inner_state.p.slurp().await {
Ok(()) => Ok(()),
Err(err) => {
inner_state.last = Some(Err(err.clone()));
Err(err)
}
},
}
}
}
impl<P, ProducerFinal, ProducerErr> BulkProducer
for SharedProducerAccess<'_, P, ProducerFinal, ProducerErr>
where
P: BulkProducer<Final = ProducerFinal, Error = ProducerErr>,
ProducerFinal: Clone,
ProducerErr: Clone,
{
async fn expose_items<'a>(
&'a mut self,
) -> Result<Either<&'a [Self::Item], Self::Final>, Self::Error>
where
Self::Item: 'a,
{
let inner_state = self.0.deref_mut();
match inner_state.last.as_ref() {
Some(Ok(fin)) => Ok(Right(fin.clone())),
Some(Err(err)) => Err(err.clone()),
None => match inner_state.p.expose_items().await {
Ok(Left(items)) => Ok(Left(items)),
Ok(Right(fin)) => {
inner_state.last = Some(Ok(fin.clone()));
Ok(Right(fin))
}
Err(err) => {
inner_state.last = Some(Err(err.clone()));
Err(err)
}
},
}
}
async fn consider_produced(&mut self, amount: usize) -> Result<(), Self::Error> {
let inner_state = self.0.deref_mut();
match inner_state.last.as_ref() {
Some(Ok(_fin)) => Ok(()), Some(Err(err)) => Err(err.clone()),
None => match inner_state.p.consider_produced(amount).await {
Ok(()) => Ok(()),
Err(err) => {
inner_state.last = Some(Err(err.clone()));
Err(err)
}
},
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use core::time::Duration;
use smol::{block_on, Timer};
use ufotofu::{
producer::{TestProducer, TestProducerBuilder},
Producer,
};
#[test]
fn test_shared_producer() {
let underlying_p: TestProducer<u8, (), i16> =
TestProducerBuilder::new(vec![1, 2, 3].into(), Err(-17)).build();
let state = State::new(underlying_p);
let shared1 = SharedProducer::new(&state);
let shared2 = shared1.clone();
let read_some_items1 = async {
{
let mut p_handle = shared1.access_producer().await;
Timer::after(Duration::from_millis(50)).await; assert_eq!(Ok(Left(1)), p_handle.produce().await);
}
Timer::after(Duration::from_millis(50)).await;
{
let mut p_handle = shared1.access_producer().await;
assert_eq!(Ok(Left(3)), p_handle.produce().await);
assert_eq!(Err(-17), p_handle.produce().await);
}
};
let read_some_items2 = async {
Timer::after(Duration::from_millis(10)).await;
{
let mut p_handle = shared2.access_producer().await;
assert_eq!(Ok(Left(2)), p_handle.produce().await);
}
Timer::after(Duration::from_millis(50)).await;
let mut p_handle = shared2.access_producer().await;
assert_eq!(Err(-17), p_handle.produce().await); };
block_on(futures::future::join(read_some_items1, read_some_items2));
}
}