use crate::task::dyn_task::DynStateMutation;
use crate::{Constraint, ConstraitType, TaskId};
use futures::stream::FuturesUnordered;
use std::any::TypeId;
use std::sync::Arc;
use tokio::sync::mpsc;
use tokio::task::{JoinError, JoinHandle};
use tokio_stream::StreamExt;
/// List of spawned tasks that are currently being tracked.
///
/// The type parameters are named in the same positional order as in
/// [`SpawnedTask`] and [`TaskWaiter`] (`Frntend`, `Bkend`, `Md`), so a given
/// name refers to the same role everywhere in this file.
pub(crate) struct TaskList<Frntend, Bkend, Md> {
    /// Tracked tasks. Ordering is not stable: completed entries are removed
    /// with `swap_remove`.
    pub inner: Vec<SpawnedTask<Frntend, Bkend, Md>>,
}
/// Bookkeeping record for one spawned task held in a [`TaskList`].
pub(crate) struct SpawnedTask<Frntend, Bkend, Md> {
    /// `TypeId` associated with the task; compared against the `type_id`
    /// passed to `TaskList::handle_constraint`.
    pub(crate) type_id: TypeId,
    /// Static name copied into every [`TaskOutcome`].
    // NOTE(review): presumably the Rust type name of the task - confirm at the spawn site.
    pub(crate) type_name: &'static str,
    /// Shared string copied (by `Arc` clone) into every [`TaskOutcome`].
    // NOTE(review): presumably the `Debug` rendering of the task - confirm at the spawn site.
    pub(crate) type_debug: Arc<String>,
    /// Handle through which the task's output is awaited, and through which
    /// the task can be aborted.
    pub(crate) receiver: TaskWaiter<Frntend, Bkend, Md>,
    /// Identifier copied into every [`TaskOutcome`] produced for this task.
    pub(crate) task_id: TaskId,
    /// Metadata values checked when a `BlockMatchingMetatdata` constraint is
    /// applied.
    pub(crate) metadata: Vec<Md>,
}
/// Borrowed description of a task together with its optional constraint.
///
/// The string and constraint fields borrow for `'a`; nothing is owned apart
/// from the `Copy` identifiers.
#[derive(Debug, Clone)]
pub struct TaskInformation<'a, Cstrnt> {
    /// `TypeId` associated with the task.
    pub type_id: TypeId,
    /// Static name associated with the task.
    pub type_name: &'static str,
    /// Borrowed string associated with the task.
    // NOTE(review): presumably the task's `Debug` output - confirm with the producer.
    pub type_debug: &'a str,
    /// Borrowed optional constraint attached to the task.
    pub constraint: &'a Option<Constraint<Cstrnt>>,
}
/// The two ways the output of a spawned task can be awaited.
pub(crate) enum TaskWaiter<Frntend, Bkend, Md> {
    /// A task whose join handle resolves to a single state mutation.
    Future(JoinHandle<DynStateMutation<Frntend, Bkend, Md>>),
    /// A task that delivers any number of state mutations over a channel.
    Stream {
        /// Receiving half of the channel carrying the mutations.
        receiver: mpsc::Receiver<DynStateMutation<Frntend, Bkend, Md>>,
        /// Join handle awaited once the channel is closed, and used to abort
        /// the task.
        // NOTE(review): presumably the handle of the task feeding `receiver` - confirm at the spawn site.
        join_handle: JoinHandle<()>,
    },
}
impl<Frntend, Bkend, Md> TaskWaiter<Frntend, Bkend, Md> {
    /// Requests that the underlying tokio task be aborted.
    ///
    /// Both variants abort through their `JoinHandle`. For the `Stream`
    /// variant the channel receiver is left untouched.
    fn kill(&mut self) {
        match self {
            TaskWaiter::Future(handle) => handle.abort(),
            // `JoinHandle::abort` aborts the task directly; there is no need
            // to create an intermediate `AbortHandle` first.
            TaskWaiter::Stream { join_handle, .. } => join_handle.abort(),
        }
    }
}
/// Result of waiting on the tasks in a `TaskList`.
///
/// Every variant carries the identifying fields of the task that produced it
/// (`type_id`, `type_name`, `type_debug`, `task_id`).
pub enum TaskOutcome<Frntend, Bkend, Md> {
    /// A stream task's channel was closed and its join handle resolved
    /// without reporting a panic.
    StreamFinished {
        type_id: TypeId,
        type_name: &'static str,
        type_debug: Arc<String>,
        task_id: TaskId,
    },
    /// A stream task's channel was closed and its join handle resolved with
    /// a `JoinError` for which `is_panic()` is true.
    StreamPanicked {
        /// The join error reported by tokio.
        error: JoinError,
        type_id: TypeId,
        type_name: &'static str,
        type_debug: Arc<String>,
        task_id: TaskId,
    },
    /// A future task's join handle resolved with a `JoinError`.
    ///
    /// Note: the producer does not check `is_panic()` for this variant, so
    /// any `JoinError` ends up here.
    TaskPanicked {
        /// The join error reported by tokio.
        error: JoinError,
        type_id: TypeId,
        type_name: &'static str,
        type_debug: Arc<String>,
        task_id: TaskId,
    },
    /// A task (future or stream) produced a state mutation.
    MutationReceived {
        /// The mutation produced by the task.
        mutation: DynStateMutation<Frntend, Bkend, Md>,
        type_id: TypeId,
        type_name: &'static str,
        type_debug: Arc<String>,
        task_id: TaskId,
    },
}
impl<Bkend, Frntend, Md: PartialEq> TaskList<Frntend, Bkend, Md> {
pub(crate) fn new() -> Self {
Self { inner: vec![] }
}
pub(crate) async fn get_next_response(&mut self) -> Option<TaskOutcome<Frntend, Bkend, Md>> {
let task_completed = self
.inner
.iter_mut()
.enumerate()
.map(|(idx, task)| async move {
match task.receiver {
TaskWaiter::Future(ref mut receiver) => match receiver.await {
Ok(mutation) => (
Some(idx),
TaskOutcome::MutationReceived {
mutation,
type_id: task.type_id,
type_debug: task.type_debug.clone(),
task_id: task.task_id,
type_name: task.type_name,
},
),
Err(error) => (
Some(idx),
TaskOutcome::TaskPanicked {
type_id: task.type_id,
type_name: task.type_name,
type_debug: task.type_debug.clone(),
task_id: task.task_id,
error,
},
),
},
TaskWaiter::Stream {
ref mut receiver,
ref mut join_handle,
} => {
if let Some(mutation) = receiver.recv().await {
return (
None,
TaskOutcome::MutationReceived {
mutation,
type_id: task.type_id,
type_name: task.type_name,
task_id: task.task_id,
type_debug: task.type_debug.clone(),
},
);
};
match join_handle.await {
Err(error) if error.is_panic() => (
Some(idx),
TaskOutcome::StreamPanicked {
error,
type_id: task.type_id,
type_name: task.type_name,
type_debug: task.type_debug.clone(),
task_id: task.task_id,
},
),
_ => (
Some(idx),
TaskOutcome::StreamFinished {
type_id: task.type_id,
type_name: task.type_name,
type_debug: task.type_debug.clone(),
task_id: task.task_id,
},
),
}
}
}
})
.collect::<FuturesUnordered<_>>()
.next()
.await;
let (maybe_completed_idx, outcome) = task_completed?;
if let Some(completed_idx) = maybe_completed_idx {
self.inner.swap_remove(completed_idx);
};
Some(outcome)
}
pub(crate) fn push(&mut self, task: SpawnedTask<Frntend, Bkend, Md>) {
self.inner.push(task)
}
pub(crate) fn handle_constraint(&mut self, constraint: Constraint<Md>, type_id: TypeId) {
let task_doesnt_match_constraint = |task: &SpawnedTask<_, _, _>| task.type_id != type_id;
let task_doesnt_match_metadata =
|task: &SpawnedTask<_, _, _>, constraint| !task.metadata.contains(constraint);
match constraint.constraint_type {
ConstraitType::BlockMatchingMetatdata(metadata) => self
.inner
.retain(|task| task_doesnt_match_metadata(task, &metadata)),
ConstraitType::BlockSameType => {
self.inner.retain(task_doesnt_match_constraint);
}
ConstraitType::KillSameType => self.inner.retain_mut(|task| {
if !task_doesnt_match_constraint(task) {
task.receiver.kill();
return false;
}
true
}),
}
}
}