use std::{
any::Any,
fmt::Debug,
ops::Deref,
sync::{Arc, RwLock, atomic::AtomicUsize},
};
use tokio::sync::mpsc::UnboundedSender;
use crate::term::Parser;
use super::task::{Task, TaskCmd, TaskDef, TaskId, TaskStatus};
use super::task_path::TaskPath;
pub struct KernelMessage {
pub from: TaskId,
pub command: KernelCommand,
}
pub enum KernelCommand {
Quit,
RegisterTask(
TaskId,
TaskDef,
Box<dyn FnOnce(TaskContext) -> Box<dyn Task> + Send>,
),
RemoveTask(TaskId),
TaskCmd(TaskId, TaskCmd),
TaskCmdByPath(TaskPath, TaskCmd),
SetTaskPath(TaskId, TaskPath),
Query(
KernelQuery,
tokio::sync::oneshot::Sender<KernelQueryResponse>,
),
ListenTaskUpdates,
UnlistenTaskUpdates,
TaskStarted,
TaskStopped(u32),
}
pub enum KernelQuery {
ListTasks(Option<String>),
ResolvePath(TaskPath),
GetScreen(TaskPath),
}
pub enum KernelQueryResponse {
TaskList(Vec<TaskInfo>),
ResolvedPath(Option<TaskId>),
Screen(Option<String>),
}
#[derive(Clone, Debug)]
pub struct TaskInfo {
pub id: TaskId,
pub path: Option<TaskPath>,
pub status: TaskStatus,
pub vt: Option<SharedVt>,
}
#[derive(Clone)]
pub struct SharedVt(Arc<RwLock<Parser>>);
impl SharedVt {
pub fn new(parser: Parser) -> Self {
SharedVt(Arc::new(RwLock::new(parser)))
}
}
impl Debug for SharedVt {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_tuple("SharedVt").finish()
}
}
impl Deref for SharedVt {
type Target = Arc<RwLock<Parser>>;
fn deref(&self) -> &Self::Target {
&self.0
}
}
#[derive(Clone)]
pub struct TaskContext {
next_task_id: Arc<AtomicUsize>,
sender: UnboundedSender<KernelMessage>,
pub task_id: TaskId,
}
impl TaskContext {
pub fn new(
next_task_id: Arc<AtomicUsize>,
task_id: TaskId,
sender: UnboundedSender<KernelMessage>,
) -> Self {
Self {
next_task_id,
sender,
task_id,
}
}
pub fn send(&self, command: KernelCommand) {
if let Err(_err) = self.sender.send(KernelMessage {
from: self.task_id,
command,
}) {
log::debug!(
"Failed to send kernel message (task_id: {}). Channel is closed.",
self.task_id.0,
);
}
}
pub fn send_self_custom<T: Any + Send + 'static>(&self, custom: T) {
self.send(KernelCommand::TaskCmd(self.task_id, TaskCmd::msg(custom)));
}
pub fn alloc_id(&self) -> TaskId {
TaskId(
self
.next_task_id
.fetch_add(1, std::sync::atomic::Ordering::Relaxed),
)
}
pub fn register(
&self,
def: TaskDef,
factory: Box<dyn FnOnce(TaskContext) -> Box<dyn Task> + Send>,
) -> TaskId {
let task_id = self.alloc_id();
self.register_with_id(task_id, def, factory)
}
pub fn register_with_id(
&self,
task_id: TaskId,
def: TaskDef,
factory: Box<dyn FnOnce(TaskContext) -> Box<dyn Task> + Send>,
) -> TaskId {
self.send(KernelCommand::RegisterTask(task_id, def, factory));
task_id
}
pub fn spawn_async<F, Fut>(&self, def: TaskDef, f: F) -> TaskId
where
F: FnOnce(TaskContext, tokio::sync::mpsc::UnboundedReceiver<TaskCmd>) -> Fut
+ Send
+ 'static,
Fut: std::future::Future<Output = ()> + Send + 'static,
{
let task_id = self.alloc_id();
self.spawn_async_with_id(task_id, def, f)
}
pub fn spawn_async_with_id<F, Fut>(
&self,
task_id: TaskId,
def: TaskDef,
f: F,
) -> TaskId
where
F: FnOnce(TaskContext, tokio::sync::mpsc::UnboundedReceiver<TaskCmd>) -> Fut
+ Send
+ 'static,
Fut: std::future::Future<Output = ()> + Send + 'static,
{
use super::task::ChannelTask;
self.register_with_id(
task_id,
def,
Box::new(|ctx| {
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
tokio::spawn(f(ctx, rx));
Box::new(ChannelTask::new(tx))
}),
)
}
pub fn send_to_path(&self, path: TaskPath, cmd: TaskCmd) {
self.send(KernelCommand::TaskCmdByPath(path, cmd));
}
pub fn set_task_path(&self, task_id: TaskId, path: TaskPath) {
self.send(KernelCommand::SetTaskPath(task_id, path));
}
pub fn query(
&self,
query: KernelQuery,
) -> tokio::sync::oneshot::Receiver<KernelQueryResponse> {
let (tx, rx) = tokio::sync::oneshot::channel();
self.send(KernelCommand::Query(query, tx));
rx
}
pub fn get_task_sender(&self, target_id: TaskId) -> TaskSender {
TaskSender {
task_id: target_id,
from_id: self.task_id,
sender: self.sender.clone(),
}
}
}
#[derive(Clone)]
pub struct TaskSender {
pub task_id: TaskId,
pub from_id: TaskId,
sender: UnboundedSender<KernelMessage>,
}
impl TaskSender {
pub fn send(&self, cmd: TaskCmd) {
let r = self.sender.send(KernelMessage {
from: self.from_id,
command: KernelCommand::TaskCmd(self.task_id, cmd),
});
if let Err(_err) = r {
log::debug!(
"TaskSender.send() to closed channel. from_id:{} task_id:{}",
self.from_id.0,
self.task_id.0
);
}
}
}