use futures::{
stream::{select_all, Fuse, FusedStream, FuturesUnordered, SelectAll},
FutureExt, Stream, StreamExt,
};
use instant::Instant;
use pin_project::pin_project;
use tokio::sync::{broadcast, mpsc::UnboundedReceiver};
use tokio_stream::wrappers::UnboundedReceiverStream;
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
};
use crate::{
compat::{
sleep_until, spawn_task, Sendable, SendableFusedStream, SendableFuture, SendableStream,
SendableWrapper, Sleep,
},
ActorState, Transient,
};
type FuturesCollection<T> = FuturesUnordered<Pin<Box<dyn SendableFuture<Output = T>>>>;
enum SchedulerStatus {
Alive,
Marked,
MarkedToFinish,
}
#[allow(missing_debug_implementations)]
pub struct Scheduler<A: ActorState> {
recv: SendableWrapper<SelectAll<Fuse<ActorStream<A::Message>>>>,
queue: SendableWrapper<FuturesCollection<A::Message>>,
tasks: SendableWrapper<FuturesCollection<()>>,
outbound: Option<SendableWrapper<OutboundQueue<A::Output>>>,
stream_count: usize,
future_count: usize,
status: SchedulerStatus,
}
struct OutboundQueue<M> {
send: broadcast::Sender<SendableWrapper<M>>,
}
impl<M: Sendable + Clone> OutboundQueue<M> {
fn new(send: broadcast::Sender<SendableWrapper<M>>) -> Self {
Self { send }
}
fn send(&mut self, msg: M) {
let _ = self.send.send(SendableWrapper::new(msg));
}
}
pub(crate) struct ActorRunner<A: ActorState> {
state: A,
scheduler: Scheduler<A>,
}
impl<A: ActorState> ActorRunner<A> {
pub(crate) fn new(state: A) -> Self {
let scheduler = Scheduler::new();
Self { state, scheduler }
}
pub(crate) fn add_broadcaster(&mut self, broad: broadcast::Sender<SendableWrapper<A::Output>>) {
self.scheduler.outbound = Some(SendableWrapper::new(OutboundQueue::new(broad)));
}
pub(crate) fn add_stream(&mut self, stream: ActorStream<A::Message>) {
self.scheduler.attach_stream_inner(stream);
}
pub(crate) fn launch(self) {
spawn_task(self.run())
}
async fn run(mut self) {
self.state.start_up(&mut self.scheduler).await;
loop {
match self.scheduler.next().await {
Some(msg) => self.state.process(&mut self.scheduler, msg).await,
None => return self.close().await,
}
}
}
async fn close(self) {
let Self {
state,
mut scheduler,
} = self;
state.finalize(&mut scheduler).await;
scheduler.finalize();
}
}
impl<A: ActorState> Scheduler<A> {
fn new() -> Self {
let recv = SendableWrapper::new(select_all([]));
let queue = SendableWrapper::new(FuturesCollection::new());
let tasks = SendableWrapper::new(FuturesCollection::new());
Self {
recv,
queue,
tasks,
outbound: None,
stream_count: 0,
future_count: 0,
status: SchedulerStatus::Alive,
}
}
fn is_dead(&self) -> bool {
self.stream_count + self.future_count == 0
|| matches!(
self.status,
SchedulerStatus::Marked | SchedulerStatus::MarkedToFinish
)
}
fn finalize(self) {
if matches!(
self.status,
SchedulerStatus::Alive | SchedulerStatus::MarkedToFinish
) {
spawn_task(poll_to_completion(self.tasks.take()))
}
}
async fn next(&mut self) -> Option<A::Message> {
loop {
if self.is_dead() {
return None;
}
tokio::select! {
msg = self.recv.next(), if self.stream_count != 0 => {
match msg {
Some(msg) => return Some(msg),
None => {
self.stream_count -= 1;
}
}
},
msg = self.queue.next(), if self.future_count != 0 => {
self.future_count -= 1;
return msg
},
_ = self.tasks.next(), if !self.tasks.is_empty() => {},
else => {
return None
}
}
}
}
pub fn queue_task<F, I>(&mut self, fut: F)
where
F: Sendable + Future<Output = I>,
I: 'static + Into<A::Message>,
{
self.future_count += 1;
self.queue.push(Box::pin(fut.map(Into::into)));
}
pub fn manage_future<F, T>(&mut self, fut: F)
where
F: Sendable + Future<Output = T>,
T: Sendable,
{
self.tasks.push(Box::pin(fut.map(drop)));
}
pub fn attach_stream<S, I>(&mut self, stream: S)
where
S: SendableStream<Item = I> + FusedStream,
I: Into<A::Message>,
{
let stream = ActorStream::Secondary(Box::new(stream.map(|m| m.into())));
self.attach_stream_inner(stream)
}
fn attach_stream_inner(&mut self, stream: ActorStream<A::Message>) {
self.stream_count += 1;
self.recv.push(stream.fuse());
}
pub fn schedule<M>(&mut self, deadline: Instant, msg: M)
where
M: Sendable + Into<A::Message>,
{
self.future_count += 1;
self.queue.push(Box::pin(Timer::new(deadline, msg.into())));
}
pub fn broadcast<M>(&mut self, msg: M)
where
M: Into<A::Output>,
{
if let Some(out) = self.outbound.as_mut() {
out.send(msg.into())
}
}
}
impl<A> Scheduler<A>
where
A: ActorState<Permanence = Transient>,
{
pub fn shutdown(&mut self) {
self.status = SchedulerStatus::Marked;
}
pub fn shutdown_and_finish(&mut self) {
self.status = SchedulerStatus::MarkedToFinish;
}
}
impl<M: Sendable> From<UnboundedReceiver<M>> for ActorStream<M> {
fn from(value: UnboundedReceiver<M>) -> Self {
Self::Main(UnboundedReceiverStream::new(value))
}
}
pub(crate) enum ActorStream<M> {
Main(UnboundedReceiverStream<M>),
Secondary(Box<dyn SendableFusedStream<Item = M>>),
}
impl<M: Sendable> Stream for ActorStream<M> {
type Item = M;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match *self {
ActorStream::Main(ref mut stream) => Pin::new(stream).poll_next(cx),
ActorStream::Secondary(ref mut stream) => Pin::new(stream).poll_next(cx),
}
}
}
async fn poll_to_completion<S>(mut stream: S)
where
S: SendableFusedStream,
{
loop {
if stream.next().await.is_none() {
return;
}
}
}
#[pin_project]
#[allow(missing_debug_implementations)]
pub(crate) struct Timer<T> {
#[pin]
deadline: Sleep,
msg: Option<T>,
}
impl<T> Timer<T> {
pub(crate) fn new(deadline: Instant, msg: T) -> Self {
Self {
deadline: sleep_until(deadline),
msg: Some(msg),
}
}
}
impl<T> Future for Timer<T> {
type Output = T;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
match this.deadline.poll(cx) {
Poll::Ready(()) => Poll::Ready(this.msg.take().unwrap()),
Poll::Pending => Poll::Pending,
}
}
}