1use std::collections::VecDeque;
12use std::sync::Mutex;
13
14use anyhow::Result;
15use async_trait::async_trait;
16use folk_protocol::RpcMessage;
17
18#[async_trait]
24pub trait WorkerHandle: Send + 'static {
25 fn pid(&self) -> u32;
27
28 async fn send_task(&mut self, msg: RpcMessage) -> Result<()>;
30
31 async fn recv_task(&mut self) -> Result<Option<RpcMessage>>;
33
34 async fn send_control(&mut self, msg: RpcMessage) -> Result<()>;
36
37 async fn recv_control(&mut self) -> Result<Option<RpcMessage>>;
39
40 async fn terminate(&mut self) -> Result<()>;
43}
44
45#[async_trait]
47pub trait Runtime: Send + Sync + 'static {
48 async fn spawn(&self) -> Result<Box<dyn WorkerHandle>>;
54}
55
56pub struct MockRuntime {
61 responder: std::sync::Arc<dyn Fn(&RpcMessage) -> RpcMessage + Send + Sync>,
62 next_pid: std::sync::atomic::AtomicU32,
63}
64
65impl MockRuntime {
66 pub fn echo() -> Self {
68 Self {
69 responder: std::sync::Arc::new(|msg| match msg {
70 RpcMessage::Request { msgid, params, .. } => {
71 RpcMessage::response_ok(*msgid, params.clone())
72 },
73 _ => RpcMessage::notify("mock.unsupported", rmpv::Value::Nil),
74 }),
75 next_pid: std::sync::atomic::AtomicU32::new(10000),
76 }
77 }
78}
79
80#[async_trait]
81impl Runtime for MockRuntime {
82 async fn spawn(&self) -> Result<Box<dyn WorkerHandle>> {
83 let pid = self
84 .next_pid
85 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
86 Ok(Box::new(MockWorker {
87 pid,
88 responder: self.responder.clone(),
89 inbound_task: Mutex::new(VecDeque::new()),
90 inbound_control: Mutex::new({
91 let mut q = VecDeque::new();
92 q.push_back(RpcMessage::notify(
94 "control.ready",
95 rmpv::Value::Map(vec![(
96 rmpv::Value::String("pid".into()),
97 rmpv::Value::Integer(pid.into()),
98 )]),
99 ));
100 q
101 }),
102 terminated: false,
103 }))
104 }
105}
106
107pub struct MockWorker {
109 pid: u32,
110 responder: std::sync::Arc<dyn Fn(&RpcMessage) -> RpcMessage + Send + Sync>,
111 inbound_task: Mutex<VecDeque<RpcMessage>>,
112 inbound_control: Mutex<VecDeque<RpcMessage>>,
113 terminated: bool,
114}
115
116#[async_trait]
117impl WorkerHandle for MockWorker {
118 fn pid(&self) -> u32 {
119 self.pid
120 }
121
122 async fn send_task(&mut self, msg: RpcMessage) -> Result<()> {
123 if self.terminated {
124 anyhow::bail!("worker terminated");
125 }
126 let response = (self.responder)(&msg);
127 self.inbound_task.lock().unwrap().push_back(response);
128 Ok(())
129 }
130
131 async fn recv_task(&mut self) -> Result<Option<RpcMessage>> {
132 Ok(self.inbound_task.lock().unwrap().pop_front())
133 }
134
135 async fn send_control(&mut self, _msg: RpcMessage) -> Result<()> {
136 Ok(())
137 }
138
139 async fn recv_control(&mut self) -> Result<Option<RpcMessage>> {
140 Ok(self.inbound_control.lock().unwrap().pop_front())
141 }
142
143 async fn terminate(&mut self) -> Result<()> {
144 self.terminated = true;
145 Ok(())
146 }
147}