use std::marker::PhantomData;
use dvcompute_utils::simulation::arrival::*;
use dvcompute_utils::grc::Grc;
use dvcompute::simulation::simulation::*;
use dvcompute::simulation::event::*;
use dvcompute::simulation::observable::*;
use dvcompute::simulation::observable::observer::*;
use dvcompute::simulation::observable::disposable::*;
use dvcompute::simulation::stream::*;
use crate::simulation::block::*;
use crate::simulation::block::ops::*;
use crate::simulation::transact::*;
/// Marker trait for generator blocks: a `Block` whose input is a `BlockFn<T, ()>`
/// (the chain each produced `T` is fed into) and whose output is `()`.
///
/// The trait declares no items of its own; it only names this particular
/// `Input`/`Output` combination so that it can be used in `impl Trait` position.
pub trait GeneratorBlock<T>: Block<Input = BlockFn<T, ()>, Output = ()> where T: 'static {}
/// Builds a generator block driven by `observable`, assigning priority `0`
/// to every produced transact.
///
/// This is a convenience wrapper around [`observable_generator_block`].
#[inline]
pub fn observable_generator_block0<T, O>(observable: O) -> impl GeneratorBlock<Transact<T>>
where
    O: Observable<Message = Arrival<T>> + 'static,
    T: Clone + 'static,
{
    let default_priority: isize = 0;
    observable_generator_block(observable, default_priority)
}
/// Builds a generator block driven by `observable`, assigning the fixed
/// `priority` to every produced transact.
///
/// The constant is wrapped into a closure returning `return_event(priority)`
/// and handed to [`observable_generator_block_c`].
#[inline]
pub fn observable_generator_block<T, O>(observable: O, priority: isize) -> impl GeneratorBlock<Transact<T>>
where
    O: Observable<Message = Arrival<T>> + 'static,
    T: Clone + 'static,
{
    // Adapt the plain number to the "priority computation" shape expected downstream.
    let constant_priority = move || return_event(priority);
    observable_generator_block_c(observable, constant_priority)
}
/// Builds a generator block driven by `observable`, where the priority of
/// each transact is obtained from the `Event` computation that `priority`
/// returns.
///
/// * `observable` - source of `Arrival<T>` messages.
/// * `priority` - closure yielding a computation of the transact priority.
#[inline]
pub fn observable_generator_block_c<T, O, F, M>(observable: O, priority: F) -> impl GeneratorBlock<Transact<T>>
where
    O: Observable<Message = Arrival<T>> + 'static,
    F: Fn() -> M + 'static,
    M: Event<Item = isize> + 'static,
    T: Clone + 'static,
{
    ObservableGenerator {
        observable,
        priority,
        _phantom1: PhantomData,
        _phantom2: PhantomData,
    }
}
/// Builds a generator block driven by a `stream` of arrivals, assigning
/// priority `0` to every produced transact.
///
/// This is a convenience wrapper around [`stream_generator_block`].
#[inline]
pub fn stream_generator_block0<T>(stream: Stream<Arrival<T>>) -> impl GeneratorBlock<Transact<T>>
where
    T: Clone + 'static,
{
    let default_priority: isize = 0;
    stream_generator_block(stream, default_priority)
}
/// Builds a generator block driven by a `stream` of arrivals, assigning the
/// fixed `priority` to every produced transact.
///
/// The constant is wrapped into a closure returning `return_event(priority)`
/// and handed to [`stream_generator_block_c`].
#[inline]
pub fn stream_generator_block<T>(stream: Stream<Arrival<T>>, priority: isize) -> impl GeneratorBlock<Transact<T>>
where
    T: Clone + 'static,
{
    // Adapt the plain number to the "priority computation" shape expected downstream.
    let constant_priority = move || return_event(priority);
    stream_generator_block_c(stream, constant_priority)
}
/// Builds a generator block driven by a `stream` of arrivals, where the
/// priority of each transact is obtained from the `Event` computation that
/// `priority` returns.
///
/// * `stream` - source of `Arrival<T>` items.
/// * `priority` - closure yielding a computation of the transact priority.
#[inline]
pub fn stream_generator_block_c<T, F, M>(stream: Stream<Arrival<T>>, priority: F) -> impl GeneratorBlock<Transact<T>>
where
    F: Fn() -> M + 'static,
    M: Event<Item = isize> + 'static,
    T: Clone + 'static,
{
    StreamGenerator {
        stream,
        priority,
        _phantom1: PhantomData,
        _phantom2: PhantomData,
    }
}
/// State of the observable-driven generator block: the source observable
/// together with the closure that yields the priority computation.
///
/// Values of this type are constructed by `observable_generator_block_c`.
#[must_use = "computations are lazy and do nothing unless to be run"]
#[derive(Clone)]
struct ObservableGenerator<T, O, F, M> {
/// The observable whose `Arrival<T>` messages are turned into transacts.
observable: O,
/// Closure returning the computation of the transact priority.
priority: F,
// `M` and `T` are not stored in any field, so they are anchored with markers.
_phantom1: PhantomData<M>,
_phantom2: PhantomData<T>
}
impl<T, O, F, M> Block for ObservableGenerator<T, O, F, M>
where O: Observable<Message = Arrival<T>> + 'static,
F: Fn() -> M + 'static,
M: Event<Item = isize> + 'static,
T: Clone + 'static
{
type Input = BlockFn<Transact<T>, ()>;
type Output = ();
#[doc(hidden)]
fn call_block<C>(self, a: Self::Input, cont: C, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()>
where C: FnOnce(simulation::Result<Self::Output>, Grc<ProcessId>, &Point) -> simulation::Result<()> + 'static
{
let cont = ProcessBoxCont::new(cont);
self.call_block_boxed(a, cont, pid, p)
}
#[doc(hidden)]
fn call_block_boxed(self, a: Self::Input, cont: ProcessBoxCont<Self::Output>, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()> {
if is_process_cancelled(&pid, p) {
revoke_process_boxed(cont, pid, p)
} else {
let ObservableGenerator { observable, priority, _phantom1, _phantom2 } = self;
let h = {
observable.subscribe({
cons_observer(move |arrival: &Arrival<T>, p| {
let chain = a.clone();
priority()
.and_then(move |priority| {
Transact::new(arrival.clone(), priority)
.into_event()
.and_then(move |t| {
TransactId::take(t.transact_id.clone())
.and_then(move |()| {
let block = chain.next();
block.run(t)
})
.run()
})
})
.call_event(p)
})
}).call_event(p)?
};
never_process()
.finally({
h.into_event().into_process()
})
.call_process_boxed(cont, pid, p)
}
}
}
/// Marks `ObservableGenerator` as a `GeneratorBlock` producing `Transact<T>`.
///
/// The trait declares no items, so the implementation body is empty; the bounds
/// mirror those of the `Block` implementation for the same type.
impl<T, O, F, M> GeneratorBlock<Transact<T>> for ObservableGenerator<T, O, F, M>
where O: Observable<Message = Arrival<T>> + 'static,
F: Fn() -> M + 'static,
M: Event<Item = isize> + 'static,
T: Clone + 'static
{}
/// State of the stream-driven generator block: the source stream of arrivals
/// together with the closure that yields the priority computation.
///
/// Values of this type are constructed by `stream_generator_block_c`.
#[must_use = "computations are lazy and do nothing unless to be run"]
struct StreamGenerator<T, F, M> where T: 'static {
/// The stream whose `Arrival<T>` items are turned into transacts.
stream: Stream<Arrival<T>>,
/// Closure returning the computation of the transact priority.
priority: F,
// `M` is not stored in any field, so it is anchored with a marker; `T` gets one as well.
_phantom1: PhantomData<M>,
_phantom2: PhantomData<T>
}
/// `Block` implementation for the stream-driven generator.
///
/// When run, the block hands its stream, priority closure and the chain `a`
/// to `generator_stream_loop` and runs the resulting process.
impl<T, F, M> Block for StreamGenerator<T, F, M>
where
    F: Fn() -> M + 'static,
    M: Event<Item = isize> + 'static,
    T: Clone + 'static,
{
    type Input = BlockFn<Transact<T>, ()>;
    type Output = ();

    /// Boxes the continuation and forwards to `call_block_boxed`.
    #[doc(hidden)]
    fn call_block<C>(self, a: Self::Input, cont: C, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()>
    where
        C: FnOnce(simulation::Result<Self::Output>, Grc<ProcessId>, &Point) -> simulation::Result<()> + 'static,
    {
        self.call_block_boxed(a, ProcessBoxCont::new(cont), pid, p)
    }

    /// Starts the stream-consuming loop unless the process is already cancelled.
    #[doc(hidden)]
    fn call_block_boxed(self, a: Self::Input, cont: ProcessBoxCont<Self::Output>, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()> {
        // Nothing to start if the owning process has already been cancelled.
        if is_process_cancelled(&pid, p) {
            return revoke_process_boxed(cont, pid, p);
        }

        let Self { stream, priority, .. } = self;
        let consumer = generator_stream_loop(stream, priority, a);
        consumer.call_process_boxed(cont, pid, p)
    }
}
/// Builds the process that consumes `stream` one arrival at a time.
///
/// For each arrival the priority computation is evaluated, a `Transact` is
/// created from the arrival, `TransactId::take` is called on its identifier
/// and the next block of the chain `a` is run with the transact. The function
/// then recurses on the remainder of the stream.
fn generator_stream_loop<T, F, M>(stream: Stream<Arrival<T>>, priority: F, a: BlockFn<Transact<T>, ()>) -> ProcessBox<()>
where
    F: Fn() -> M + 'static,
    M: Event<Item = isize> + 'static,
    T: Clone + 'static,
{
    // The pattern is irrefutable: the stream is unwrapped into its head computation.
    let Stream::Cons(head) = stream;

    let looped = head.and_then(move |(arrival, rest)| {
        // Keep `a` itself for the recursive call; the current step works on a clone.
        let downstream = a.clone();

        let emit = priority().and_then(move |level| {
            Transact::new(arrival, level)
                .into_event()
                .and_then(move |transact| {
                    let activation = TransactId::take(transact.transact_id.clone());
                    activation
                        .and_then(move |()| downstream.next().run(transact))
                        .run()
                })
        });

        emit.into_process()
            .and_then(move |()| generator_stream_loop(rest, priority, a))
    });

    looped.into_boxed()
}
/// Marks `StreamGenerator` as a `GeneratorBlock` producing `Transact<T>`.
///
/// The trait declares no items, so the implementation body is empty; the bounds
/// mirror those of the `Block` implementation for the same type.
impl<T, F, M> GeneratorBlock<Transact<T>> for StreamGenerator<T, F, M>
where F: Fn() -> M + 'static,
M: Event<Item = isize> + 'static,
T: Clone + 'static
{}