use std::collections::VecDeque;
use std::sync::Mutex;
use anyhow::Result;
use async_trait::async_trait;
use folk_protocol::RpcMessage;
#[async_trait]
pub trait WorkerHandle: Send + 'static {
fn pid(&self) -> u32;
async fn send_task(&mut self, msg: RpcMessage) -> Result<()>;
async fn recv_task(&mut self) -> Result<Option<RpcMessage>>;
async fn send_control(&mut self, msg: RpcMessage) -> Result<()>;
async fn recv_control(&mut self) -> Result<Option<RpcMessage>>;
async fn terminate(&mut self) -> Result<()>;
}
#[async_trait]
pub trait Runtime: Send + Sync + 'static {
async fn spawn(&self) -> Result<Box<dyn WorkerHandle>>;
}
pub struct MockRuntime {
responder: std::sync::Arc<dyn Fn(&RpcMessage) -> RpcMessage + Send + Sync>,
next_pid: std::sync::atomic::AtomicU32,
}
impl MockRuntime {
pub fn echo() -> Self {
Self {
responder: std::sync::Arc::new(|msg| match msg {
RpcMessage::Request { msgid, params, .. } => {
RpcMessage::response_ok(*msgid, params.clone())
},
_ => RpcMessage::notify("mock.unsupported", rmpv::Value::Nil),
}),
next_pid: std::sync::atomic::AtomicU32::new(10000),
}
}
}
#[async_trait]
impl Runtime for MockRuntime {
async fn spawn(&self) -> Result<Box<dyn WorkerHandle>> {
let pid = self
.next_pid
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
Ok(Box::new(MockWorker {
pid,
responder: self.responder.clone(),
inbound_task: Mutex::new(VecDeque::new()),
inbound_control: Mutex::new({
let mut q = VecDeque::new();
q.push_back(RpcMessage::notify(
"control.ready",
rmpv::Value::Map(vec![(
rmpv::Value::String("pid".into()),
rmpv::Value::Integer(pid.into()),
)]),
));
q
}),
terminated: false,
}))
}
}
pub struct MockWorker {
pid: u32,
responder: std::sync::Arc<dyn Fn(&RpcMessage) -> RpcMessage + Send + Sync>,
inbound_task: Mutex<VecDeque<RpcMessage>>,
inbound_control: Mutex<VecDeque<RpcMessage>>,
terminated: bool,
}
#[async_trait]
impl WorkerHandle for MockWorker {
fn pid(&self) -> u32 {
self.pid
}
async fn send_task(&mut self, msg: RpcMessage) -> Result<()> {
if self.terminated {
anyhow::bail!("worker terminated");
}
let response = (self.responder)(&msg);
self.inbound_task.lock().unwrap().push_back(response);
Ok(())
}
async fn recv_task(&mut self) -> Result<Option<RpcMessage>> {
Ok(self.inbound_task.lock().unwrap().pop_front())
}
async fn send_control(&mut self, _msg: RpcMessage) -> Result<()> {
Ok(())
}
async fn recv_control(&mut self) -> Result<Option<RpcMessage>> {
Ok(self.inbound_control.lock().unwrap().pop_front())
}
async fn terminate(&mut self) -> Result<()> {
self.terminated = true;
Ok(())
}
}