sim_lib_server/
coroutine.rs1use std::sync::{
2 Arc, Mutex,
3 atomic::{AtomicU8, AtomicU64, Ordering},
4};
5
6use sim_citizen_derive::non_citizen;
7use sim_kernel::{Args, ClassRef, Cx, Error, Object, Result, Symbol, Value};
8
9use crate::ServerAddress;
10
/// Process-wide source of coroutine ids.
///
/// Starts at 1; `Coroutine::new` takes the current value and advances the counter by one.
static NEXT_COROUTINE_ID: AtomicU64 = AtomicU64::new(1);
12
/// Lifecycle state of a `Coroutine`, as kept in its atomic status cell.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum CoroutineStatus {
    /// The handler is executing inside a `resume` call.
    Running,
    /// Not executing: freshly created, or the last resume ended with a yielded value.
    Suspended,
    /// Finished, failed, or cancelled; further resumes are rejected.
    Done,
}
23
24impl CoroutineStatus {
25 fn as_u8(self) -> u8 {
26 match self {
27 Self::Running => 0,
28 Self::Suspended => 1,
29 Self::Done => 2,
30 }
31 }
32
33 fn from_u8(value: u8) -> Self {
34 match value {
35 0 => Self::Running,
36 1 => Self::Suspended,
37 2 => Self::Done,
38 _ => Self::Done,
39 }
40 }
41
42 pub fn as_symbol(self) -> Symbol {
44 Symbol::new(match self {
45 Self::Running => "running",
46 Self::Suspended => "suspended",
47 Self::Done => "done",
48 })
49 }
50}
51
52#[derive(Default)]
53struct CoroutineState {
54 mailbox: Option<Value>,
55 yielded: Option<Value>,
56 cancelled: bool,
57}
58
59struct CoroutineInner {
60 id: u64,
61 address: ServerAddress,
62 handler: Value,
63 status: AtomicU8,
64 state: Mutex<CoroutineState>,
65}
66
67#[derive(Clone)]
68#[non_citizen(
69 reason = "live coroutine handle; inspect through server/Frame descriptor and coroutine ops",
70 kind = "handle"
71)]
72pub struct Coroutine {
76 inner: Arc<CoroutineInner>,
77}
78
79impl Coroutine {
80 pub fn new(address: ServerAddress, handler: Value) -> Self {
82 Self {
83 inner: Arc::new(CoroutineInner {
84 id: NEXT_COROUTINE_ID.fetch_add(1, Ordering::Relaxed),
85 address,
86 handler,
87 status: AtomicU8::new(CoroutineStatus::Suspended.as_u8()),
88 state: Mutex::new(CoroutineState::default()),
89 }),
90 }
91 }
92
93 pub fn id(&self) -> u64 {
95 self.inner.id
96 }
97
98 pub fn address(&self) -> &ServerAddress {
100 &self.inner.address
101 }
102
103 pub fn status(&self) -> CoroutineStatus {
105 CoroutineStatus::from_u8(self.inner.status.load(Ordering::Relaxed))
106 }
107
108 pub fn resume(&self, cx: &mut Cx, input: Value) -> Result<Value> {
115 if self.status() == CoroutineStatus::Done {
116 return Err(Error::Eval("coroutine is done".to_owned()));
117 }
118
119 {
120 let mut state = self
121 .inner
122 .state
123 .lock()
124 .map_err(|_| Error::PoisonedLock("coroutine"))?;
125 if state.cancelled {
126 self.inner
127 .status
128 .store(CoroutineStatus::Done.as_u8(), Ordering::Relaxed);
129 return Err(Error::Eval("coroutine is cancelled".to_owned()));
130 }
131 state.mailbox = Some(input.clone());
132 state.yielded = None;
133 }
134
135 self.inner
136 .status
137 .store(CoroutineStatus::Running.as_u8(), Ordering::Relaxed);
138
139 let handle = cx.factory().opaque(Arc::new(self.clone()))?;
140 let result = cx.call_value(self.inner.handler.clone(), Args::new(vec![handle, input]));
141
142 let yielded = {
143 let mut state = self
144 .inner
145 .state
146 .lock()
147 .map_err(|_| Error::PoisonedLock("coroutine"))?;
148 state.mailbox = None;
149 state.yielded.take()
150 };
151
152 match result {
153 Ok(value) => {
154 if let Some(yielded) = yielded {
155 self.inner
156 .status
157 .store(CoroutineStatus::Suspended.as_u8(), Ordering::Relaxed);
158 Ok(yielded)
159 } else {
160 self.inner
161 .status
162 .store(CoroutineStatus::Done.as_u8(), Ordering::Relaxed);
163 Ok(value)
164 }
165 }
166 Err(err) => {
167 self.inner
168 .status
169 .store(CoroutineStatus::Done.as_u8(), Ordering::Relaxed);
170 Err(err)
171 }
172 }
173 }
174
175 pub fn yield_value(&self, value: Value) -> Result<Value> {
179 let mut state = self
180 .inner
181 .state
182 .lock()
183 .map_err(|_| Error::PoisonedLock("coroutine"))?;
184 if self.status() != CoroutineStatus::Running {
185 return Err(Error::Eval(
186 "server/yield may only be used from a running coroutine".to_owned(),
187 ));
188 }
189 state.yielded = Some(value.clone());
190 Ok(value)
191 }
192
193 pub fn cancel(&self) -> Result<()> {
195 let mut state = self
196 .inner
197 .state
198 .lock()
199 .map_err(|_| Error::PoisonedLock("coroutine"))?;
200 state.cancelled = true;
201 state.mailbox = None;
202 state.yielded = None;
203 self.inner
204 .status
205 .store(CoroutineStatus::Done.as_u8(), Ordering::Relaxed);
206 Ok(())
207 }
208}
209
210impl Object for Coroutine {
211 fn display(&self, _cx: &mut Cx) -> Result<String> {
212 Ok("#<server-coroutine>".to_owned())
213 }
214
215 fn as_any(&self) -> &dyn std::any::Any {
216 self
217 }
218}
219
220impl sim_kernel::ObjectCompat for Coroutine {
221 fn class(&self, cx: &mut Cx) -> Result<ClassRef> {
222 cx.factory().class_stub(
223 sim_kernel::ClassId(0),
224 Symbol::qualified("server", "Coroutine"),
225 )
226 }
227 fn as_expr(&self, cx: &mut Cx) -> Result<sim_kernel::Expr> {
228 self.as_table(cx)?.object().as_expr(cx)
229 }
230 fn as_table(&self, cx: &mut Cx) -> Result<Value> {
231 let state = self
232 .inner
233 .state
234 .lock()
235 .map_err(|_| Error::PoisonedLock("coroutine"))?;
236 let mailbox = match &state.mailbox {
237 Some(mailbox) => mailbox.clone(),
238 None => cx.factory().nil()?,
239 };
240 let cancelled = state.cancelled;
241 drop(state);
242 let address = self.inner.address.as_value(cx)?;
243 cx.factory().table(vec![
244 (
245 Symbol::new("kind"),
246 cx.factory().symbol(Symbol::new("coroutine"))?,
247 ),
248 (
249 Symbol::new("id"),
250 cx.factory().string(self.inner.id.to_string())?,
251 ),
252 (
253 Symbol::new("status"),
254 cx.factory().symbol(self.status().as_symbol())?,
255 ),
256 (Symbol::new("address"), address),
257 (Symbol::new("mailbox"), mailbox),
258 (Symbol::new("cancelled"), cx.factory().bool(cancelled)?),
259 ])
260 }
261}