Skip to main content

sim_lib_server/
coroutine.rs

1use std::sync::{
2    Arc, Mutex,
3    atomic::{AtomicU8, AtomicU64, Ordering},
4};
5
6use sim_citizen_derive::non_citizen;
7use sim_kernel::{Args, ClassRef, Cx, Error, Object, Result, Symbol, Value};
8
9use crate::ServerAddress;
10
/// Process-wide counter that hands out coroutine ids; the first id issued is 1.
static NEXT_COROUTINE_ID: AtomicU64 = AtomicU64::new(1);
12
/// The phase of its lifecycle a [`Coroutine`] is currently in.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum CoroutineStatus {
    /// A resume is in progress and the handler has not returned yet.
    Running,
    /// Parked: freshly created, or yielded during its last resume.
    Suspended,
    /// Terminal: the handler returned without yielding, returned an error,
    /// or the coroutine was cancelled.
    Done,
}
23
24impl CoroutineStatus {
25    fn as_u8(self) -> u8 {
26        match self {
27            Self::Running => 0,
28            Self::Suspended => 1,
29            Self::Done => 2,
30        }
31    }
32
33    fn from_u8(value: u8) -> Self {
34        match value {
35            0 => Self::Running,
36            1 => Self::Suspended,
37            2 => Self::Done,
38            _ => Self::Done,
39        }
40    }
41
42    /// Returns the status as its `running`/`suspended`/`done` symbol.
43    pub fn as_symbol(self) -> Symbol {
44        Symbol::new(match self {
45            Self::Running => "running",
46            Self::Suspended => "suspended",
47            Self::Done => "done",
48        })
49    }
50}
51
52#[derive(Default)]
53struct CoroutineState {
54    mailbox: Option<Value>,
55    yielded: Option<Value>,
56    cancelled: bool,
57}
58
59struct CoroutineInner {
60    id: u64,
61    address: ServerAddress,
62    handler: Value,
63    status: AtomicU8,
64    state: Mutex<CoroutineState>,
65}
66
67#[derive(Clone)]
68#[non_citizen(
69    reason = "live coroutine handle; inspect through server/Frame descriptor and coroutine ops",
70    kind = "handle"
71)]
72/// Live, resumable coroutine bound to a server address and handler value.
73///
74/// Cloning shares the same underlying state across handles.
75pub struct Coroutine {
76    inner: Arc<CoroutineInner>,
77}
78
79impl Coroutine {
80    /// Creates a suspended coroutine for `address` driven by `handler`.
81    pub fn new(address: ServerAddress, handler: Value) -> Self {
82        Self {
83            inner: Arc::new(CoroutineInner {
84                id: NEXT_COROUTINE_ID.fetch_add(1, Ordering::Relaxed),
85                address,
86                handler,
87                status: AtomicU8::new(CoroutineStatus::Suspended.as_u8()),
88                state: Mutex::new(CoroutineState::default()),
89            }),
90        }
91    }
92
93    /// Returns the unique, process-local coroutine id.
94    pub fn id(&self) -> u64 {
95        self.inner.id
96    }
97
98    /// Returns the server address this coroutine is bound to.
99    pub fn address(&self) -> &ServerAddress {
100        &self.inner.address
101    }
102
103    /// Returns the current lifecycle status.
104    pub fn status(&self) -> CoroutineStatus {
105        CoroutineStatus::from_u8(self.inner.status.load(Ordering::Relaxed))
106    }
107
108    /// Resumes the coroutine, delivering `input` and running until it yields or
109    /// finishes.
110    ///
111    /// Returns the yielded value if the handler yielded, otherwise the
112    /// handler's final result. Errors if the coroutine is already done or has
113    /// been cancelled.
114    pub fn resume(&self, cx: &mut Cx, input: Value) -> Result<Value> {
115        if self.status() == CoroutineStatus::Done {
116            return Err(Error::Eval("coroutine is done".to_owned()));
117        }
118
119        {
120            let mut state = self
121                .inner
122                .state
123                .lock()
124                .map_err(|_| Error::PoisonedLock("coroutine"))?;
125            if state.cancelled {
126                self.inner
127                    .status
128                    .store(CoroutineStatus::Done.as_u8(), Ordering::Relaxed);
129                return Err(Error::Eval("coroutine is cancelled".to_owned()));
130            }
131            state.mailbox = Some(input.clone());
132            state.yielded = None;
133        }
134
135        self.inner
136            .status
137            .store(CoroutineStatus::Running.as_u8(), Ordering::Relaxed);
138
139        let handle = cx.factory().opaque(Arc::new(self.clone()))?;
140        let result = cx.call_value(self.inner.handler.clone(), Args::new(vec![handle, input]));
141
142        let yielded = {
143            let mut state = self
144                .inner
145                .state
146                .lock()
147                .map_err(|_| Error::PoisonedLock("coroutine"))?;
148            state.mailbox = None;
149            state.yielded.take()
150        };
151
152        match result {
153            Ok(value) => {
154                if let Some(yielded) = yielded {
155                    self.inner
156                        .status
157                        .store(CoroutineStatus::Suspended.as_u8(), Ordering::Relaxed);
158                    Ok(yielded)
159                } else {
160                    self.inner
161                        .status
162                        .store(CoroutineStatus::Done.as_u8(), Ordering::Relaxed);
163                    Ok(value)
164                }
165            }
166            Err(err) => {
167                self.inner
168                    .status
169                    .store(CoroutineStatus::Done.as_u8(), Ordering::Relaxed);
170                Err(err)
171            }
172        }
173    }
174
175    /// Records `value` as the coroutine's yield and returns it.
176    ///
177    /// Only valid from a running coroutine; errors otherwise.
178    pub fn yield_value(&self, value: Value) -> Result<Value> {
179        let mut state = self
180            .inner
181            .state
182            .lock()
183            .map_err(|_| Error::PoisonedLock("coroutine"))?;
184        if self.status() != CoroutineStatus::Running {
185            return Err(Error::Eval(
186                "server/yield may only be used from a running coroutine".to_owned(),
187            ));
188        }
189        state.yielded = Some(value.clone());
190        Ok(value)
191    }
192
193    /// Cancels the coroutine, clearing its state and marking it done.
194    pub fn cancel(&self) -> Result<()> {
195        let mut state = self
196            .inner
197            .state
198            .lock()
199            .map_err(|_| Error::PoisonedLock("coroutine"))?;
200        state.cancelled = true;
201        state.mailbox = None;
202        state.yielded = None;
203        self.inner
204            .status
205            .store(CoroutineStatus::Done.as_u8(), Ordering::Relaxed);
206        Ok(())
207    }
208}
209
210impl Object for Coroutine {
211    fn display(&self, _cx: &mut Cx) -> Result<String> {
212        Ok("#<server-coroutine>".to_owned())
213    }
214
215    fn as_any(&self) -> &dyn std::any::Any {
216        self
217    }
218}
219
220impl sim_kernel::ObjectCompat for Coroutine {
221    fn class(&self, cx: &mut Cx) -> Result<ClassRef> {
222        cx.factory().class_stub(
223            sim_kernel::ClassId(0),
224            Symbol::qualified("server", "Coroutine"),
225        )
226    }
227    fn as_expr(&self, cx: &mut Cx) -> Result<sim_kernel::Expr> {
228        self.as_table(cx)?.object().as_expr(cx)
229    }
230    fn as_table(&self, cx: &mut Cx) -> Result<Value> {
231        let state = self
232            .inner
233            .state
234            .lock()
235            .map_err(|_| Error::PoisonedLock("coroutine"))?;
236        let mailbox = match &state.mailbox {
237            Some(mailbox) => mailbox.clone(),
238            None => cx.factory().nil()?,
239        };
240        let cancelled = state.cancelled;
241        drop(state);
242        let address = self.inner.address.as_value(cx)?;
243        cx.factory().table(vec![
244            (
245                Symbol::new("kind"),
246                cx.factory().symbol(Symbol::new("coroutine"))?,
247            ),
248            (
249                Symbol::new("id"),
250                cx.factory().string(self.inner.id.to_string())?,
251            ),
252            (
253                Symbol::new("status"),
254                cx.factory().symbol(self.status().as_symbol())?,
255            ),
256            (Symbol::new("address"), address),
257            (Symbol::new("mailbox"), mailbox),
258            (Symbol::new("cancelled"), cx.factory().bool(cancelled)?),
259        ])
260    }
261}