use std::sync::Arc;
use crate::frame::MappableFrame;
use futures::{future::BoxFuture, stream::FuturesUnordered, FutureExt, StreamExt};
use tokio::sync::{mpsc, oneshot};
pub mod compose;
/// Crate-internal shorthand for `<F as MappableFrame>::Frame<X>`, i.e. the frame type of the
/// token `F` instantiated with `X`.
pub(crate) type Frame<F, X> = <F as MappableFrame>::Frame<X>;
/// A [`MappableFrame`] whose frames can additionally be viewed through a shared reference.
pub trait MappableFrameRef: MappableFrame {
/// Frame token used for the borrowed view. It is generic over a lifetime (tied to the input
/// borrow in [`MappableFrameRef::as_ref`]) and must itself implement [`MappableFrame`].
type RefFrameToken<'a>: MappableFrame;
/// Turns a borrowed `Self::Frame<X>` into a `RefFrameToken` frame holding `&X`.
///
/// NOTE(review): presumably the returned frame mirrors the layout of `input`, with each element
/// replaced by a reference to it; the signature alone does not enforce this. Confirm against
/// the implementors.
fn as_ref<X>(input: &Self::Frame<X>) -> <Self::RefFrameToken<'_> as MappableFrame>::Frame<&X>;
}
/// A [`MappableFrame`] whose elements can be transformed by an asynchronous, fallible function.
pub trait AsyncMappableFrame: MappableFrame {
/// Applies `f` to the `A` values of `input` and yields a future resolving to the resulting
/// `Self::Frame<B>`, or to an error of type `E`.
///
/// * `input` - the frame to transform; it is consumed.
/// * `f` - maps one `A` to a boxed future of `Result<B, E>`. It is taken as `Fn`, so it may be
///   invoked more than once.
///
/// How the per-element futures are scheduled (one after another or concurrently) and how an
/// error from one of them affects the others is decided by each implementor; this declaration
/// does not fix either.
fn map_frame_async<'a, A, B, E>(
input: Self::Frame<A>,
f: impl Fn(A) -> BoxFuture<'a, Result<B, E>> + Send + Sync + 'a,
) -> BoxFuture<'a, Result<Self::Frame<B>, E>>
where
E: Send + 'a,
A: Send + 'a,
B: Send + 'a;
}
/// Asynchronously expands `seed` into a tree of frames and collapses that tree into one `Out`.
///
/// Each node of the computation is a `Step`. Running a step calls `expand_frame` on its seed,
/// submits one child step per seed in the resulting frame through an mpsc channel, waits for the
/// children's results, and finally feeds the frame of results to `collapse_frame`. This function
/// owns the receiving end of that channel and drives every step from inside its own future via
/// a `FuturesUnordered`; nothing is spawned onto the runtime here.
///
/// # Parameters
/// * `seed` - seed of the root step.
/// * `expand_frame` - turns a seed into a frame of child seeds.
/// * `collapse_frame` - turns a frame of child results into the result of the parent.
///
/// # Errors
/// Returns the first `Err` surfaced by any step (coming from either callback). Steps that are
/// still in flight at that point are dropped.
///
/// # Panics
/// Panics if the root step's result channel was closed without a value being sent, or if a step
/// itself panics on one of its channel operations.
pub async fn expand_and_collapse_async<'a, Seed, Out, E, F>(
    seed: Seed,
    expand_frame: impl Fn(Seed) -> BoxFuture<'a, Result<Frame<F, Seed>, E>> + Send + Sync + 'a,
    collapse_frame: impl Fn(Frame<F, Out>) -> BoxFuture<'a, Result<Out, E>> + Send + Sync + 'a,
) -> Result<Out, E>
where
    E: Send + Sync + 'a,
    F: AsyncMappableFrame + 'a,
    Seed: Send + Sync + 'a,
    Out: Send + Sync + 'a,
    Frame<F, Seed>: Send + Sync + 'a,
    Frame<F, Out>: Send + Sync + 'a,
{
    // Both callbacks are shared by every step, so they live behind reference-counted pointers.
    let expand_frame = Arc::new(expand_frame);
    let collapse_frame = Arc::new(collapse_frame);

    // Steps announce their children on this channel. The only senders are the ones held by the
    // steps themselves, so the channel closes once the last step has gone away.
    let (step_tx, mut step_rx) = mpsc::channel(1024);
    let (root, root_result) = Step::new(seed, step_tx);

    let mut in_flight: FuturesUnordered<BoxFuture<'a, Result<(), E>>> = FuturesUnordered::new();
    in_flight.push(
        root.run::<'a, F, E>(expand_frame.clone(), collapse_frame.clone())
            .boxed(),
    );

    loop {
        tokio::select! {
            // A running step submitted a child: start driving it alongside the others.
            Some(step) = step_rx.recv() => {
                let pending = step
                    .run::<F, E>(expand_frame.clone(), collapse_frame.clone())
                    .boxed();
                in_flight.push(pending);
            }
            // A step finished: bail out on the first error, otherwise keep going.
            Some(outcome) = in_flight.next() => {
                if let Err(err) = outcome {
                    return Err(err);
                }
            }
            // Channel closed and no step left to drive: the computation is complete.
            else => break,
        }
    }

    let collapsed = root_result.await.unwrap();
    Ok(collapsed)
}
/// One unit of work in the asynchronous expand/collapse: a seed still to be expanded, together
/// with the channels needed to report its result and to submit further work.
struct Step<Seed, Out> {
/// The value handed to `expand_frame` when this step runs.
seed: Seed,
/// One-shot channel on which the collapsed result of this step is sent once it is available.
completion_sender: oneshot::Sender<Out>,
/// Sending half of the work channel; `run` clones it into every child step and uses it to
/// submit those children.
work_queue: mpsc::Sender<Self>,
}
impl<Seed: Sync + Send, Out: Sync + Send> Step<Seed, Out> {
    /// Builds a step for `seed` that submits its children on `work_queue`.
    ///
    /// Returns the step together with the receiver on which its collapsed result will arrive.
    fn new(seed: Seed, work_queue: mpsc::Sender<Self>) -> (Self, oneshot::Receiver<Out>) {
        let (completion_sender, completion_receiver) = oneshot::channel();
        (
            Step {
                seed,
                completion_sender,
                work_queue,
            },
            completion_receiver,
        )
    }

    /// Executes this step: expand the seed, hand every child seed to the work queue, wait for
    /// the children's results, collapse them, and publish the collapsed value.
    ///
    /// # Errors
    /// Propagates any error returned by `expand_frame`, by `collapse_frame`, or by the frame
    /// mapping itself. In that case nothing is sent on the completion channel.
    ///
    /// # Panics
    /// Panics if the work queue no longer accepts steps, if a child's completion channel is
    /// closed before it delivered a value, or if nobody is listening for this step's result.
    async fn run<'a, F: AsyncMappableFrame, E: Send + Sync + 'a>(
        self,
        expand_frame: Arc<
            dyn Fn(Seed) -> BoxFuture<'a, Result<Frame<F, Seed>, E>> + Send + Sync + 'a,
        >,
        collapse_frame: Arc<
            dyn Fn(Frame<F, Out>) -> BoxFuture<'a, Result<Out, E>> + Send + Sync + 'a,
        >,
    ) -> Result<(), E> {
        let expanded = expand_frame(self.seed).await?;

        // Each child seed becomes its own step. It is only submitted here; whoever owns the
        // receiving end of the work queue is responsible for actually running it.
        let child_results = F::map_frame_async(expanded, |child_seed| {
            async {
                let (child, child_done) = Step::new(child_seed, self.work_queue.clone());
                self.work_queue.send(child).await.ok().expect("mpsc error");
                let value: Out = child_done.await.expect("oneshot recv error");
                Ok(value)
            }
            .boxed()
        })
        .await?;

        let collapsed = collapse_frame(child_results).await?;

        // `.ok()` discards the rejected payload, which is not required to implement `Debug`.
        self.completion_sender
            .send(collapsed)
            .ok()
            .expect("oneshot send failure");

        Ok(())
    }
}