enclave_runner/usercalls/
mod.rs

1/* Copyright (c) Fortanix, Inc.
2 *
3 * This Source Code Form is subject to the terms of the Mozilla Public
4 * License, v. 2.0. If a copy of the MPL was not distributed with this
5 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
6
7use std::alloc::{GlobalAlloc, Layout, System};
8use std::cell::RefCell;
9use std::collections::{HashMap, VecDeque};
10use std::io::{self, ErrorKind as IoErrorKind, Read, Result as IoResult};
11use std::pin::Pin;
12use std::ptr;
13use std::result::Result as StdResult;
14use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
15use std::sync::{Arc, LazyLock};
16use std::task::{Context, Poll, Waker};
17use std::thread::{self, JoinHandle};
18use std::time::{self, Duration};
19use std::{cmp, fmt, str};
20
21use anyhow::bail;
22use fnv::FnvHashMap;
23use futures::future::{poll_fn, Either, Future, FutureExt};
24use futures::lock::Mutex;
25use lazy_static::lazy_static;
26#[cfg(unix)]
27use libc::*;
28#[cfg(unix)]
29use nix::sys::signal;
30use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
31use tokio::runtime::Builder as RuntimeBuilder;
32use tokio::sync::{broadcast, mpsc as async_mpsc, oneshot, Semaphore};
33use tokio::sync::broadcast::error::RecvError;
34use fortanix_sgx_abi::*;
35use insecure_time::{Freq, Rdtscp};
36use ipc_queue::{DescriptorGuard, Identified, QueueEvent};
37use ipc_queue::position::WritePosition;
38use sgxs::loader::Tcs as SgxsTcs;
39
40use crate::loader::{EnclavePanic, ErasedTcs};
41use crate::tcs::{self, CoResult, ThreadResult};
42use self::abi::dispatch;
43use self::abi::ReturnValue;
44use self::abi::UsercallList;
45use self::interface::ToSgxResult;
46use self::interface::{Handler, OutputBuffer};
47
48pub(crate) mod abi;
49mod interface;
50
// Global async mutex (`futures::lock::Mutex`) that protects no data of its
// own; it exists purely as a synchronization point.
// NOTE(review): judging by the name it serializes debugger attach/toggle
// operations — the users of this lock are outside this excerpt, confirm.
lazy_static! {
    static ref DEBUGGER_TOGGLE_SYNC: Mutex<()> = Mutex::new(());
}

/// Number of nanoseconds in one second.
const NANOS_PER_SEC: u64 = 1_000_000_000;
56
57static TIME_INFO: LazyLock<Option<InsecureTimeInfo>> = LazyLock::new(|| {
58    if Rdtscp::is_supported() {
59        if let Ok(frequency) = Freq::get() {
60            return Some(InsecureTimeInfo {
61                version: 0,
62                frequency: frequency.as_u64(),
63            })
64        }
65    }
66    None
67});
68
// This is not an event in the sense that it could be passed to `send()` or
// `wait()` usercalls in enclave code. However, it's easier for the enclave
// runner implementation to lump it in with events. Also note that this constant
// is not public.
const EV_ABORT: u64 = 0b0000_0000_0001_0000;

// Queue capacities. The cancel queue is sized at twice the usercall queue.
// NOTE(review): presumably these size the usercall/return/cancel FIFO queues
// held in `FifoGuards`; the code that creates the queues is outside this
// excerpt — confirm.
const USERCALL_QUEUE_SIZE: usize = 16;
const RETURN_QUEUE_SIZE: usize = 1024;
const CANCEL_QUEUE_SIZE: usize = USERCALL_QUEUE_SIZE * 2;
78
/// Message sent over the I/O queue towards the usercall handling side.
enum UsercallSendData {
    /// Result of entering/resuming a TCS (see `Work::do_work`), together with
    /// the TCS bookkeeping and the 1024-byte buffer that was handed to
    /// `coenter`/`coreturn`.
    Sync(ThreadResult<ErasedTcs>, RunningTcs, RefCell<[u8; 1024]>),
    /// A usercall carrying an identifier, plus an optional oneshot receiver.
    Async(Identified<Usercall>, Option<oneshot::Receiver<()>>),
}
83
// This is the same as UsercallSendData except that it can't be Sync(CoResult::Return(...), ...)
/// Input of `EnclaveState::handle_usercall`.
enum UsercallHandleData {
    /// A synchronous usercall: the suspended usercall, its TCS bookkeeping and
    /// the 1024-byte buffer (converted into an `EnclavePanic` when the
    /// usercall exits with a panic).
    Sync(tcs::Usercall<ErasedTcs>, RunningTcs, RefCell<[u8; 1024]>),
    /// An asynchronous usercall: the identified usercall, an optional oneshot
    /// receiver that interrupts the usercall when it fires, and an optional
    /// channel on which `UsercallEvent::Finished` is reported.
    Async(Identified<Usercall>, Option<oneshot::Receiver<()>>, Option<async_mpsc::UnboundedSender<UsercallEvent>>),
}
89
/// Outcome of running enclave code: a pair of `u64` return values on success,
/// or the reason the enclave aborted (optionally carrying panic details).
type EnclaveResult = StdResult<(u64, u64), EnclaveAbort<Option<EnclavePanic>>>;
91
/// Wrapper exposing only the read half of `R`; its `AsyncWrite` impl always
/// fails with `BrokenPipe`.
struct ReadOnly<R>(Pin<Box<R>>);
/// Wrapper exposing only the write half of `W`; its `AsyncRead` impl always
/// fails with `BrokenPipe`.
struct WriteOnly<W>(Pin<Box<W>>);
94
/// Generates a `Pin<&mut Self>` method that delegates to the method of the
/// same name on the pinned value stored in field `0`, passing every argument
/// through unchanged and returning the inner method's result.
macro_rules! forward {
    (fn $method:ident(mut self: Pin<&mut Self> $(, $arg:ident : $arg_ty:ty)*) -> $out:ty) => {
        fn $method(mut self: Pin<&mut Self> $(, $arg: $arg_ty)*) -> $out {
            self.0.as_mut().$method($($arg),*)
        }
    }
}
102
103impl<R: std::marker::Unpin + AsyncRead> AsyncRead for ReadOnly<R> {
104    forward!(fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context, buf: &mut ReadBuf) -> Poll<tokio::io::Result<()>>);
105}
106
107impl<T> AsyncRead for WriteOnly<T> {
108    fn poll_read(self: Pin<&mut Self>, _cx: &mut Context, _buf: &mut ReadBuf) -> Poll<tokio::io::Result<()>> {
109        Poll::Ready(Err(IoErrorKind::BrokenPipe.into()))
110    }
111}
112
113impl<T> AsyncWrite for ReadOnly<T> {
114    fn poll_write(self: Pin<&mut Self>, _cx: &mut Context, _buf: &[u8]) -> Poll<tokio::io::Result<usize>> {
115        Poll::Ready(Err(IoErrorKind::BrokenPipe.into()))
116    }
117
118    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<tokio::io::Result<()>> {
119        Poll::Ready(Err(IoErrorKind::BrokenPipe.into()))
120    }
121
122    fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<tokio::io::Result<()>> {
123        Poll::Ready(Err(IoErrorKind::BrokenPipe.into()))
124    }
125}
126
127impl<W: std::marker::Unpin + AsyncWrite> AsyncWrite for WriteOnly<W> {
128    forward!(fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll<tokio::io::Result<usize>>);
129    forward!(fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<tokio::io::Result<()>>);
130    forward!(fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<tokio::io::Result<()>>);
131}
132
133struct Stdin;
134
135impl AsyncRead for Stdin {
136    fn poll_read(self: Pin<&mut Self>, cx: &mut Context, buf: &mut ReadBuf) -> Poll<tokio::io::Result<()>> {
137        const BUF_SIZE: usize = 8192;
138
139        struct AsyncStdin {
140            rx: async_mpsc::Receiver<VecDeque<u8>>,
141            buf: VecDeque<u8>,
142        }
143
144        lazy_static::lazy_static! {
145            static ref STDIN: Mutex<AsyncStdin> = {
146                let (tx, rx) = async_mpsc::channel(8);
147                thread::spawn(move || {
148                    let mut buf = [0u8; BUF_SIZE];
149                    while let Ok(len) = io::stdin().read(&mut buf) {
150                        if len == 0 {
151                            continue
152                        }
153
154                        if tx.try_send(buf[..len].to_vec().into()).is_err() {
155                            return
156                        };
157                    }
158                });
159                Mutex::new(AsyncStdin { rx, buf: VecDeque::new() })
160            };
161        }
162
163        match Pin::new(&mut STDIN.lock()).poll(cx) {
164            Poll::Ready(mut stdin) => {
165                if stdin.buf.is_empty() {
166                    let pipeerr = tokio::io::Error::new(tokio::io::ErrorKind::BrokenPipe, "broken pipe");
167                    stdin.buf = match Pin::new(&mut stdin.rx).poll_recv(cx) {
168                        Poll::Ready(Some(vec)) => vec,
169                        Poll::Ready(None) => return Poll::Ready(Err(pipeerr)),
170                        _ => return Poll::Pending,
171                    };
172                }
173                let inbuf = match stdin.buf.as_slices() {
174                    (&[], inbuf) => inbuf,
175                    (inbuf, _) => inbuf,
176                };
177                let len = cmp::min(buf.remaining(), inbuf.len());
178                buf.put_slice(&inbuf[..len]);
179                stdin.buf.drain(..len);
180                Poll::Ready(Ok(()))
181            }
182            Poll::Pending => Poll::Pending
183        }
184    }
185}
186
187pub trait AsyncStream: AsyncRead + AsyncWrite + 'static + Send + Sync {
188    fn poll_read_alloc(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<Vec<u8>>>
189    {
190        let mut v: Vec<u8> = vec![0; 8192];
191        let mut buffer = ReadBuf::new(&mut v);
192        self.poll_read(cx, &mut buffer).map(|b| b.map(|_| buffer.filled().to_vec()))
193    }
194}
195
// Blanket impl: every type meeting the bounds is an `AsyncStream` and gets the
// default `poll_read_alloc`.
impl<S: AsyncRead + AsyncWrite + Sync + Send + 'static> AsyncStream for S {}
197
/// AsyncListener lets an implementation implement a slightly modified form of `std::net::TcpListener::accept`.
pub trait AsyncListener: 'static + Send {
    /// Polls for a new incoming connection.
    ///
    /// The enclave may optionally request the local or peer addresses
    /// be returned in `local_addr` or `peer_addr`, respectively.
    /// If `local_addr` and/or `peer_addr` are not `None`, they will point to an empty `String`.
    /// On success, user-space can fill in the strings as appropriate.
    ///
    /// The enclave must not make any security decisions based on the local address received.
    ///
    /// NOTE(review): the meaning of a successful `None` result is not visible
    /// in this part of the file — confirm against the usercall handling code.
    fn poll_accept(
        self: Pin<&mut Self>,
        cx: &mut Context,
        local_addr: Option<&mut String>,
        peer_addr: Option<&mut String>,
    ) -> Poll<tokio::io::Result<Option<Box<dyn AsyncStream>>>>;
}
213
/// Wraps a stream and remembers, per operation kind, the wakers of tasks whose
/// poll returned `Pending`.
struct AsyncStreamAdapter {
    stream: Pin<Box<dyn AsyncStream>>,
    // Wakers of tasks whose read / read_alloc poll returned `Pending`.
    read_queue: VecDeque<Waker>,
    // Wakers of tasks whose write poll returned `Pending`.
    write_queue: VecDeque<Waker>,
    // Wakers of tasks whose flush poll returned `Pending`.
    flush_queue: VecDeque<Waker>,
}
220
/// Empties `queue`, waking every stored waker except those that would wake
/// the task currently being polled (the one behind `cx`).
fn notify_other_tasks(cx: &mut Context, queue: &mut VecDeque<Waker>) {
    let current = cx.waker();
    queue
        .drain(..)
        .filter(|waiting| !waiting.will_wake(current))
        .for_each(Waker::wake);
}
228
229impl AsyncStreamAdapter {
230    fn poll_read_alloc(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<tokio::io::Result<Vec<u8>>> {
231        match self.stream.as_mut().poll_read_alloc(cx) {
232            Poll::Pending => {
233                self.read_queue.push_back(cx.waker().clone());
234                Poll::Pending
235            }
236            Poll::Ready(Ok(ret)) => {
237                notify_other_tasks(cx, &mut self.read_queue);
238                Poll::Ready(Ok(ret))
239            }
240            Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
241        }
242    }
243
244    fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context, buf: &mut ReadBuf) -> Poll<tokio::io::Result<()>> {
245        match self.stream.as_mut().poll_read(cx, buf) {
246            Poll::Pending => {
247                self.read_queue.push_back(cx.waker().clone());
248                Poll::Pending
249            }
250            Poll::Ready(Ok(())) => {
251                notify_other_tasks(cx, &mut self.read_queue);
252                Poll::Ready(Ok(()))
253            }
254            Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
255        }
256    }
257
258    fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll<tokio::io::Result<usize>> {
259        match self.stream.as_mut().poll_write(cx, buf) {
260            Poll::Pending => {
261                self.write_queue.push_back(cx.waker().clone());
262                Poll::Pending
263            }
264            Poll::Ready(Ok(ret)) => {
265                notify_other_tasks(cx, &mut self.write_queue);
266                Poll::Ready(Ok(ret))
267            }
268            Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
269        }
270    }
271
272    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<tokio::io::Result<()>> {
273        match self.stream.as_mut().poll_flush(cx) {
274            Poll::Pending => {
275                self.flush_queue.push_back(cx.waker().clone());
276                Poll::Pending
277            }
278            Poll::Ready(Ok(ret)) => {
279                notify_other_tasks(cx, &mut self.flush_queue);
280                Poll::Ready(Ok(ret))
281            }
282            Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
283        }
284    }
285}
286
/// An `AsyncStreamAdapter` behind an async mutex, so that several tasks can
/// operate on the same stream through a shared reference.
struct AsyncStreamContainer {
    inner: Mutex<Pin<Box<AsyncStreamAdapter>>>,
}
290
291impl AsyncStreamContainer {
292    fn new(s: Box<dyn AsyncStream>) -> Self {
293        AsyncStreamContainer {
294            inner: Mutex::new(Box::pin(AsyncStreamAdapter {
295                stream: s.into(),
296                read_queue: VecDeque::new(),
297                write_queue: VecDeque::new(),
298                flush_queue: VecDeque::new(),
299            })),
300        }
301    }
302
303    async fn async_read(&self, buf: &mut ReadBuf<'_>) -> IoResult<()> {
304        poll_fn(|cx| {
305            let inner_ref = &mut self.inner.lock();
306            let mut inner = Pin::new(inner_ref);
307            match inner.as_mut().poll(cx) {
308                Poll::Ready(mut adapter) => adapter.as_mut().poll_read(cx, buf),
309                Poll::Pending => Poll::Pending,
310            }
311        }).await
312    }
313
314    async fn async_read_alloc(&self) -> IoResult<Vec<u8>> {
315        poll_fn(|cx| {
316            let inner_ref = &mut self.inner.lock();
317            let mut inner = Pin::new(inner_ref);
318            match inner.as_mut().poll(cx) {
319                Poll::Ready(mut adapter) => adapter.as_mut().poll_read_alloc(cx),
320                Poll::Pending => Poll::Pending,
321            }
322        }).await
323    }
324
325    async fn async_write(&self, buf: &[u8]) -> IoResult<usize> {
326        poll_fn(|cx| {
327            let inner_ref = &mut self.inner.lock();
328            let mut inner = Pin::new(inner_ref);
329            match inner.as_mut().poll(cx) {
330                Poll::Ready(mut adapter) => adapter.as_mut().poll_write(cx, buf),
331                Poll::Pending => Poll::Pending,
332            }
333        }).await
334    }
335
336    async fn async_flush(&self) -> IoResult<()> {
337        poll_fn(|cx| {
338            let inner_ref = &mut self.inner.lock();
339            let mut inner = Pin::new(inner_ref);
340            match inner.as_mut().poll(cx) {
341                Poll::Ready(mut adapter) => adapter.as_mut().poll_flush(cx),
342                Poll::Pending => Poll::Pending,
343            }
344        }).await
345    }
346}
347
/// Wraps a listener and remembers the wakers of tasks whose accept poll
/// returned `Pending`.
struct AsyncListenerAdapter {
    listener: Pin<Box<dyn AsyncListener>>,
    // Wakers of tasks whose accept poll returned `Pending`.
    accept_queue: VecDeque<Waker>,
}
352
353impl AsyncListenerAdapter {
354    fn poll_accept(
355        mut self: Pin<&mut Self>,
356        cx: &mut Context,
357        local_addr: Option<&mut String>,
358        peer_addr: Option<&mut String>
359    ) -> Poll<tokio::io::Result<Option<Box<dyn AsyncStream>>>> {
360        match self.listener.as_mut().poll_accept(cx, local_addr, peer_addr) {
361            Poll::Pending => {
362                self.accept_queue.push_back(cx.waker().clone());
363                Poll::Pending
364            }
365            Poll::Ready(Ok(ret)) => {
366                notify_other_tasks(cx, &mut self.accept_queue);
367                Poll::Ready(Ok(ret))
368            }
369            Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
370        }
371    }
372}
373
/// An `AsyncListenerAdapter` behind an async mutex, so that several tasks can
/// accept on the same listener through a shared reference.
struct AsyncListenerContainer {
    inner: Mutex<Pin<Box<AsyncListenerAdapter>>>,
}
377
378impl AsyncListenerContainer {
379    fn new(l: Box<dyn AsyncListener>) -> Self {
380        AsyncListenerContainer {
381            inner: Mutex::new(Pin::new(Box::new(AsyncListenerAdapter {
382                listener: l.into(),
383                accept_queue: VecDeque::new(),
384            }))),
385        }
386    }
387
388    async fn async_accept(&self, local_addr: Option<&mut String>, peer_addr: Option<&mut String>) -> IoResult<Option<Box<dyn AsyncStream>>> {
389        let mut local_addr_owned: Option<String> = if local_addr.is_some() { Some(String::new()) } else { None };
390        let mut peer_addr_owned: Option<String> = if peer_addr.is_some() { Some(String::new()) } else { None };
391        let res = poll_fn(|cx| {
392            let inner_ref = &mut self.inner.lock();
393            let mut inner = Pin::new(inner_ref);
394            match inner.as_mut().poll(cx) {
395                Poll::Ready(mut adapter) => adapter.as_mut().poll_accept(cx, local_addr_owned.as_mut(), peer_addr_owned.as_mut()),
396                Poll::Pending => Poll::Pending,
397            }
398        }).await;
399
400        if let Some(local_addr) = local_addr {
401            *local_addr = local_addr_owned.unwrap();
402        }
403        if let Some(peer_addr) = peer_addr {
404            *peer_addr = peer_addr_owned.unwrap();
405        }
406        res
407    }
408}
409
410impl AsyncListener for tokio::net::TcpListener {
411    fn poll_accept(
412        self: Pin<&mut Self>,
413        cx: &mut Context,
414        local_addr: Option<&mut String>,
415        peer_addr: Option<&mut String>,
416    ) -> Poll<tokio::io::Result<Option<Box<dyn AsyncStream>>>> {
417        match tokio::net::TcpListener::poll_accept(&self, cx) {
418            Poll::Ready(Ok((stream, _peer))) => {
419                if let Some(local_addr) = local_addr {
420                    *local_addr = stream.local_addr().map(|addr| addr.to_string()).unwrap_or_else(|_err| "error".to_owned());
421                }
422                if let Some(peer_addr) = peer_addr {
423                    *peer_addr = stream.peer_addr().map(|addr| addr.to_string()).unwrap_or_else(|_err| "error".to_owned());
424                }
425                Poll::Ready(Ok(Some(Box::new(stream))))
426            }
427            Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
428            Poll::Pending => Poll::Pending,
429        }
430    }
431}
432
/// What a file descriptor in the enclave's fd table refers to.
enum AsyncFileDesc {
    /// A readable and/or writable byte stream.
    Stream(AsyncStreamContainer),
    /// A listener that can accept new streams.
    Listener(AsyncListenerContainer),
}
437
438impl AsyncFileDesc {
439    fn stream(s: Box<dyn AsyncStream>) -> AsyncFileDesc {
440        AsyncFileDesc::Stream(AsyncStreamContainer::new(s))
441    }
442
443    fn listener(l: Box<dyn AsyncListener>) -> AsyncFileDesc {
444        AsyncFileDesc::Listener(AsyncListenerContainer::new(l))
445    }
446
447    fn as_stream(&self) -> IoResult<&AsyncStreamContainer> {
448        if let AsyncFileDesc::Stream(ref s) = self {
449            Ok(s)
450        } else {
451            Err(IoErrorKind::InvalidInput.into())
452        }
453    }
454
455    fn as_listener(&self) -> IoResult<&AsyncListenerContainer> {
456        if let AsyncFileDesc::Listener(ref l) = self {
457            Ok(l)
458        } else {
459            Err(IoErrorKind::InvalidInput.into())
460        }
461    }
462}
463
/// Reasons for which enclave execution stops without a regular result.
///
/// `T` is the payload describing a panic for the `Exit` variant.
#[derive(Debug)]
pub(crate) enum EnclaveAbort<T> {
    Exit {
        panic: T,
    },
    /// Secondary threads exiting due to an abort
    Secondary,
    IndefiniteWait,
    InvalidUsercall(u64),
    MainReturned,
}

impl<T> EnclaveAbort<T> {
    /// Converts the panic payload of `Exit` with `f`; every other variant is
    /// carried over unchanged and `f` is not called.
    fn map_panic<U, F: FnOnce(T) -> U>(self, f: F) -> EnclaveAbort<U> {
        match self {
            Self::Exit { panic } => {
                let mapped = f(panic);
                EnclaveAbort::Exit { panic: mapped }
            }
            Self::Secondary => EnclaveAbort::Secondary,
            Self::IndefiniteWait => EnclaveAbort::IndefiniteWait,
            Self::InvalidUsercall(nr) => EnclaveAbort::InvalidUsercall(nr),
            Self::MainReturned => EnclaveAbort::MainReturned,
        }
    }
}
487
/// Address of a TCS, stored as an integer; used as the key of the per-TCS
/// event queue map.
#[derive(Copy, Clone, Debug, Hash, Eq, PartialEq, Ord, PartialOrd)]
struct TcsAddress(usize);
490
491impl ErasedTcs {
492    fn address(&self) -> TcsAddress {
493        TcsAddress(SgxsTcs::address(self) as _)
494    }
495}
496
497impl fmt::Pointer for TcsAddress {
498    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
499        (self.0 as *const u8).fmt(f)
500    }
501}
502
/// A TCS that is registered with the enclave state and kept in
/// `EnclaveState::threads_queue`.
struct StoppedTcs {
    tcs: ErasedTcs,
}
506
/// Context handed to the usercall `Handler` for a single usercall.
struct IOHandlerInput<'tcs> {
    // `Some` for synchronous usercalls, `None` for asynchronous ones
    // (see `EnclaveState::handle_usercall`).
    tcs: Option<&'tcs mut RunningTcs>,
    enclave: Arc<EnclaveState>,
    // Channel on which `Work` items are submitted.
    work_sender: &'tcs crossbeam::channel::Sender<Work>,
}
512
/// Per-TCS record of events that were sent but not yet consumed.
struct PendingEvents {
    // The Semaphores are basically counting how many times each event set has
    // been sent. The count is decreased when an event set is consumed through
    // `take` or `wait_for` methods by calling `SemaphorePermit::forget`.
    // The array is indexed by the numeric value of the event set.
    counts: [Semaphore; Self::EV_MAX],
    // Has permits available once `abort()` was called; its permits are never
    // forgotten, so the abort condition stays observable.
    abort: Semaphore,
}
520
521impl PendingEvents {
522    // Will error if it doesn't fit in a `u64`
523    const EV_MAX_U64: u64 = (EV_USERCALLQ_NOT_FULL | EV_RETURNQ_NOT_EMPTY | EV_UNPARK | EV_CANCELQ_NOT_FULL) + 1;
524    const EV_MAX: usize = Self::EV_MAX_U64 as _;
525    // Will error if it doesn't fit in a `usize`
526    const _ERROR_IF_USIZE_TOO_SMALL: u64 = u64::MAX + (Self::EV_MAX_U64 - (Self::EV_MAX as u64));
527    const _EV_MAX_U16: u16 = Self::EV_MAX_U64 as u16;
528    // Will error if it doesn't fit in a `u16`. We should consider a different approach if we reach this limit.
529    const _ERROR_IF_TOO_BIG: u64 = u64::MAX + (Self::EV_MAX_U64 - (Self::_EV_MAX_U16 as u64));
530
531    fn new() -> Self {
532        PendingEvents {
533            counts: [
534                Semaphore::new(0), Semaphore::new(0), Semaphore::new(0), Semaphore::new(0),
535                Semaphore::new(0), Semaphore::new(0), Semaphore::new(0), Semaphore::new(0),
536                Semaphore::new(0), Semaphore::new(0), Semaphore::new(0), Semaphore::new(0),
537                Semaphore::new(0), Semaphore::new(0), Semaphore::new(0), Semaphore::new(0),
538            ],
539            abort: Semaphore::new(0),
540        }
541    }
542
543    fn take(&self, event_mask: u64) -> Option<u64> {
544        assert!(event_mask < Self::EV_MAX_U64);
545
546        if let Ok(_) = self.abort.try_acquire() {
547            return Some(EV_ABORT);
548        }
549
550        for i in (1..Self::EV_MAX).rev() {
551            let ev = i as u64;
552            if (ev & event_mask) != 0 {
553                if let Ok(permit) = self.counts[i].try_acquire() {
554                    permit.forget();
555                    return Some(ev);
556                }
557            }
558        }
559        None
560    }
561
562    async fn wait_for(&self, event_mask: u64) -> u64 {
563        assert!(event_mask < Self::EV_MAX_U64);
564
565        if let Ok(_) = self.abort.try_acquire() {
566            return EV_ABORT;
567        }
568
569        let it = std::iter::once((EV_ABORT, &self.abort))
570            .chain(self.counts.iter().enumerate().map(|(ev, sem)| (ev as u64, sem)).filter(|&(ev, _)| ev & event_mask != 0))
571            .map(|(ev, sem)| sem.acquire().map(move |permit| (ev, permit)).boxed());
572
573        let ((ev, permit), _, _) = futures::future::select_all(it).await;
574
575        // Abort should take precedence if it happens concurrently with another
576        // event. The abort semaphore may not have been selected.
577        if let Ok(_) = self.abort.try_acquire() {
578            return EV_ABORT;
579        }
580        if ev != EV_ABORT {
581            if let Ok(permit) = permit {
582                permit.forget();
583            }
584        }
585        ev
586    }
587
588    fn push(&self, event: u64) {
589        debug_assert!(event != 0 && event < Self::EV_MAX_U64);
590        let index = event as usize;
591        // add_permits() panics if the permit limit is exceeded.
592        // NOTE: [the documentation] incorrectly specifies the maximum to be
593        // `usize::MAX >> 3`, while the actual maximum is `usize::MAX >> 4`.
594        // It's possible to have multiple threads pushing the same event
595        // concurrently, hence the smaller bound.
596        //
597        // [the documentation]: https://docs.rs/tokio/0.2.22/tokio/sync/struct.Semaphore.html#method.add_permits
598        const MAX_PERMITS: usize = usize::MAX >> 5;
599        if self.counts[index].available_permits() < MAX_PERMITS {
600            self.counts[index].add_permits(1);
601        }
602    }
603
604    fn abort(&self) {
605        if self.abort.available_permits() == 0 {
606            self.abort.add_permits(usize::MAX >> 5);
607        }
608    }
609}
610
/// Bookkeeping that travels with a TCS while it executes enclave code.
struct RunningTcs {
    tcs_address: TcsAddress,
    // How the enclave was entered; converted into a `ReturnSource` when a
    // result is reported (see `EnclaveState::handle_usercall`).
    mode: EnclaveEntry,
}
615
/// Whether the enclave is run as a command or as a library, with the state
/// specific to each.
enum EnclaveKind {
    Command(Command),
    Library(Library),
}
620
/// Abort reasons collected for a command enclave.
struct PanicReason {
    // The primary abort reason, if one has been recorded.
    primary_panic_reason: Option<EnclaveAbort<EnclavePanic>>,
    // Any further abort reasons.
    other_reasons: Vec<EnclaveAbort<EnclavePanic>>,
}
625
/// State specific to command enclaves.
struct Command {
    // Collected abort reasons, guarded by an async mutex.
    panic_reason: Mutex<PanicReason>,
}
629
/// State specific to library enclaves (currently none).
struct Library {}
631
632impl EnclaveKind {
633    fn as_command(&self) -> Option<&Command> {
634        match self {
635            EnclaveKind::Command(c) => Some(c),
636            _ => None,
637        }
638    }
639
640    fn _as_library(&self) -> Option<&Library> {
641        match self {
642            EnclaveKind::Library(l) => Some(l),
643            _ => None,
644        }
645    }
646}
647
/// Descriptor guards for the usercall, return and cancel queues, plus a flag.
struct FifoGuards {
    usercall_queue: DescriptorGuard<Usercall>,
    return_queue: DescriptorGuard<Return>,
    cancel_queue: DescriptorGuard<Cancel>,
    // NOTE(review): presumably records whether the enclave has already
    // requested the async queues — the code setting it is not in this
    // excerpt, confirm.
    async_queues_called: bool,
}
654
/// State shared (via `Arc`) by everything that services a running enclave.
pub(crate) struct EnclaveState {
    kind: EnclaveKind,
    // Pending events per TCS, keyed by TCS address.
    event_queues: FnvHashMap<TcsAddress, PendingEvents>,
    // Open file descriptors; initially stdin, stdout and stderr.
    fds: Mutex<FnvHashMap<Fd, Arc<AsyncFileDesc>>>,
    // Initialized to the highest descriptor number present in `fds`.
    last_fd: AtomicUsize,
    exiting: AtomicBool,
    // User-provided usercall extension, or `UsercallExtensionDefault`.
    usercall_ext: Box<dyn UsercallExtension>,
    // TCSs handed to `EnclaveState::new` as `threads_vector`.
    threads_queue: crossbeam::queue::SegQueue<StoppedTcs>,
    forward_panics: bool,
    force_time_usercalls: bool,
    // Once set to Some, the guards should not be dropped for the lifetime of the enclave.
    fifo_guards: Mutex<Option<FifoGuards>>,
    // Sender used to deliver results of asynchronous usercalls; `None` until
    // it has been initialized.
    return_queue_tx: Mutex<Option<ipc_queue::AsyncSender<Return, QueueSynchronizer>>>,
}
669
/// A unit of enclave execution: which TCS to run and how to enter it.
struct Work {
    tcs: RunningTcs,
    entry: CoEntry,
}
674
/// How a `Work` item enters the enclave.
enum CoEntry {
    /// First entry into a TCS, with the five parameters passed to
    /// `tcs::coenter`.
    Initial(ErasedTcs, u64, u64, u64, u64, u64),
    /// Return from a usercall, with the two values passed to `coreturn`.
    Resume(tcs::Usercall<ErasedTcs>, (u64, u64)),
}
679
680impl Work {
681    fn do_work(self, io_send_queue: &mut tokio::sync::mpsc::UnboundedSender<UsercallSendData>) {
682        let buf = RefCell::new([0u8; 1024]);
683        let usercall_send_data = match self.entry {
684            CoEntry::Initial(erased_tcs, p1, p2, p3, p4, p5) => {
685                let coresult = tcs::coenter(erased_tcs, p1, p2, p3, p4, p5, Some(&buf));
686                UsercallSendData::Sync(coresult, self.tcs, buf)
687            }
688            CoEntry::Resume(usercall, coresult) => {
689                let coresult = usercall.coreturn(coresult, Some(&buf));
690                UsercallSendData::Sync(coresult, self.tcs, buf)
691            }
692        };
693        // if there is an error do nothing, as it means that the main thread has exited
694        let _ = io_send_queue.send(usercall_send_data);
695    }
696}
697
/// Lifecycle notifications for asynchronous usercalls; the `u64` in each
/// variant matches the type of a usercall identifier (see the `Finished`
/// event sent from `EnclaveState::handle_usercall`).
enum UsercallEvent {
    Started(u64, oneshot::Sender<()>),
    Finished(u64),
    Cancelled(u64, WritePosition),
}
703
/// Classifies usercalls by whether a cancellation request should be ignored.
trait IgnoreCancel {
    /// Returns `true` if a cancellation of this usercall is to be ignored.
    fn ignore_cancel(&self) -> bool;
}
707
708impl IgnoreCancel for Identified<Usercall> {
709    fn ignore_cancel(&self) -> bool {
710        self.data.0 != UsercallList::read as u64 &&
711            self.data.0 != UsercallList::read_alloc as u64 &&
712            self.data.0 != UsercallList::write as u64 &&
713            self.data.0 != UsercallList::accept_stream as u64 &&
714            self.data.0 != UsercallList::connect_stream as u64 &&
715            self.data.0 != UsercallList::wait as u64
716    }
717}
718
719impl EnclaveState {
720    fn event_queue_add_tcs(
721        event_queues: &mut FnvHashMap<TcsAddress, PendingEvents>,
722        tcs: ErasedTcs,
723    ) -> StoppedTcs {
724        if event_queues.insert(tcs.address(), PendingEvents::new()).is_some() {
725            panic!("duplicate TCS address: {:p}", tcs.address())
726        }
727        StoppedTcs {
728            tcs,
729        }
730    }
731
732    fn new(
733        kind: EnclaveKind,
734        mut event_queues: FnvHashMap<TcsAddress, PendingEvents>,
735        usercall_ext: Option<Box<dyn UsercallExtension>>,
736        threads_vector: Vec<ErasedTcs>,
737        forward_panics: bool,
738        force_time_usercalls: bool,
739    ) -> Arc<Self> {
740        let mut fds = FnvHashMap::default();
741
742        fds.insert(
743            FD_STDIN,
744            Arc::new(AsyncFileDesc::stream(Box::new(ReadOnly(
745                Box::pin(Stdin)
746            )))),
747        );
748        fds.insert(
749            FD_STDOUT,
750            Arc::new(AsyncFileDesc::stream(Box::new(WriteOnly(
751                Box::pin(tokio::io::stdout()),
752            )))),
753        );
754        fds.insert(
755            FD_STDERR,
756            Arc::new(AsyncFileDesc::stream(Box::new(WriteOnly(
757                Box::pin(tokio::io::stderr()),
758            )))),
759        );
760        let last_fd = AtomicUsize::new(fds.keys().cloned().max().unwrap() as _);
761
762        let usercall_ext = usercall_ext.unwrap_or_else(|| Box::new(UsercallExtensionDefault));
763
764        let threads_queue = crossbeam::queue::SegQueue::new();
765
766        for thread in threads_vector {
767            threads_queue.push(Self::event_queue_add_tcs(&mut event_queues, thread));
768        }
769
770        Arc::new(EnclaveState {
771            kind,
772            event_queues,
773            fds: Mutex::new(fds),
774            last_fd,
775            exiting: AtomicBool::new(false),
776            usercall_ext,
777            threads_queue,
778            forward_panics,
779            force_time_usercalls,
780            fifo_guards: Mutex::new(None),
781            return_queue_tx: Mutex::new(None),
782        })
783    }
784
    /// Executes a single usercall and delivers its outcome.
    ///
    /// * A successful synchronous usercall is turned into a `Work` item
    ///   (`CoEntry::Resume`) and sent on `work_sender`.
    /// * A successful asynchronous usercall reports `UsercallEvent::Finished`
    ///   (if an event channel was supplied) and pushes the result onto the
    ///   enclave's return queue.
    /// * Any abort is sent, together with the return source, on
    ///   `tx_return_channel`.
    ///
    /// Panics if the work item cannot be sent, if the event channel or the
    /// return queue rejects the message, if `return_queue_tx` has not been
    /// initialized, or if the notifier channel is closed without a message.
    async fn handle_usercall(
        enclave: Arc<EnclaveState>,
        work_sender: crossbeam::channel::Sender<Work>,
        tx_return_channel: tokio::sync::mpsc::UnboundedSender<(EnclaveResult, ReturnSource)>,
        mut handle_data: UsercallHandleData,
    ) {
        // Only asynchronous usercalls can carry a notifier; take it out so it
        // can be raced against the usercall below.
        let notifier_rx = match handle_data {
            UsercallHandleData::Async(_, ref mut notifier_rx, _) => notifier_rx.take(),
            _ => None,
        };
        let (parameters, mode, tcs) = match handle_data {
            UsercallHandleData::Sync(ref usercall, ref mut tcs, _) => (usercall.parameters(), tcs.mode.into(), Some(tcs)),
            UsercallHandleData::Async(ref usercall, _, _) => (usercall.data.into(), ReturnSource::AsyncUsercall, None),
        };
        let mut input = IOHandlerInput { enclave: enclave.clone(), tcs, work_sender: &work_sender };
        let handler = Handler(&mut input);
        let result = {
            let (p1, p2, p3, p4, p5) = parameters;
            match notifier_rx {
                None => dispatch(handler, p1, p2, p3, p4, p5).await.1,
                Some(notifier_rx) => {
                    // Race the usercall against the notifier: whichever
                    // completes first determines the result.
                    let a = dispatch(handler, p1, p2, p3, p4, p5).boxed_local();
                    let b = notifier_rx;
                    match futures::future::select(a, b).await {
                        Either::Left((ret, _)) => ret.1,
                        Either::Right((Ok(()), _)) => {
                            // The notifier fired first: report the usercall
                            // as interrupted.
                            let result: IoResult<usize> = Err(IoErrorKind::Interrupted.into());
                            ReturnValue::into_registers(Ok(result.to_sgx_result()))
                        },
                        Either::Right((Err(_), _)) => panic!("notifier channel closed unexpectedly"),
                    }
                },
            }
        };
        let ret = match result {
            Ok(ret) => {
                match handle_data {
                    UsercallHandleData::Sync(usercall, tcs, _) => {
                        // Resume the enclave thread with the usercall result.
                        work_sender.send(Work {
                            tcs,
                            entry: CoEntry::Resume(usercall, ret),
                        }).expect("Work sender couldn't send data to receiver");
                    }
                    UsercallHandleData::Async(usercall, _, usercall_event_tx) => {
                        if let Some(usercall_event_tx) = usercall_event_tx {
                            usercall_event_tx.send(UsercallEvent::Finished(usercall.id)).ok()
                                .expect("failed to send usercall event");
                        }
                        let return_queue_tx = enclave.return_queue_tx.lock().await.clone().expect("return_queue_tx not initialized");
                        let ret = Identified {
                            id: usercall.id,
                            data: Return(ret.0, ret.1),
                        };
                        return_queue_tx.send(ret).await.unwrap();
                    }
                }
                // Success never goes through `tx_return_channel`.
                return;
            }
            Err(EnclaveAbort::Exit { panic: true }) => {
                let panic = match handle_data {
                    UsercallHandleData::Sync(usercall, _, debug_buf) => {
                        let debug_buf = debug_buf.into_inner();
                        #[cfg(unix)] {
                            eprintln!("Attaching debugger");
                            trap_attached_debugger(usercall.tcs_address() as _, debug_buf.as_ptr()).await;
                        }
                        EnclavePanic::from(debug_buf)
                    }
                    UsercallHandleData::Async(_, _, _) => {
                        // TODO: https://github.com/fortanix/rust-sgx/issues/235#issuecomment-641811437
                        EnclavePanic::DebugStr("async exit with a panic".to_owned())
                    }
                };
                Err(EnclaveAbort::Exit{ panic: Some(panic) })
            }
            // The remaining abort reasons are passed through; only the panic
            // payload type changes from `bool` to `Option<EnclavePanic>`.
            Err(EnclaveAbort::Exit { panic: false }) => Err(EnclaveAbort::Exit{ panic: None }),
            Err(EnclaveAbort::IndefiniteWait) => Err(EnclaveAbort::IndefiniteWait),
            Err(EnclaveAbort::InvalidUsercall(n)) => Err(EnclaveAbort::InvalidUsercall(n)),
            Err(EnclaveAbort::MainReturned) => Err(EnclaveAbort::MainReturned),
            Err(EnclaveAbort::Secondary) => Err(EnclaveAbort::Secondary),
        };
        // A send error is deliberately ignored here.
        let _ = tx_return_channel.send((ret, mode));
    }
868
869    fn syscall_loop(
870        enclave: Arc<EnclaveState>,
871        mut io_queue_receive: tokio::sync::mpsc::UnboundedReceiver<UsercallSendData>,
872        io_queue_send: tokio::sync::mpsc::UnboundedSender<UsercallSendData>,
873        work_sender: crossbeam::channel::Sender<Work>,
874    ) -> EnclaveResult {
875        let (tx_return_channel, mut rx_return_channel) = tokio::sync::mpsc::unbounded_channel();
876        let enclave_clone = enclave.clone();
877        let mut rt = RuntimeBuilder::new_current_thread()
878            .enable_all()
879            .build()
880            .expect("failed to create tokio Runtime");
881        let local_set = tokio::task::LocalSet::new();
882
883        let return_future = async move {
884            while let Some((my_result, mode)) = rx_return_channel.recv().await {
885                if enclave_clone.forward_panics {
886                    if let Err(EnclaveAbort::Exit { panic: Some(panic) }) = &my_result {
887                        panic!("{}", panic);
888                    }
889                }
890
891                let res = match (my_result, mode) {
892                    (Err(EnclaveAbort::Secondary), _) |
893                    (Ok(_), ReturnSource::ExecutableNonMain) => continue,
894
895                    (e, ReturnSource::Library) |
896                    (e, ReturnSource::ExecutableMain) |
897                    (e @ Err(EnclaveAbort::Exit { panic: None }), _)
898                        => e,
899
900                    (Ok(_), ReturnSource::AsyncUsercall) |
901                    (Err(EnclaveAbort::MainReturned), ReturnSource::AsyncUsercall) => unreachable!(),
902
903                    (Err(e @ EnclaveAbort::Exit { panic: Some(_) }), _) |
904                    (Err(e @ EnclaveAbort::InvalidUsercall(_)), _) => {
905                        let e = e.map_panic(|opt| opt.unwrap());
906                        let cmd = enclave_clone.kind.as_command().unwrap();
907                        let mut cmddata = cmd.panic_reason.lock().await;
908
909                        if cmddata.primary_panic_reason.is_none() {
910                            cmddata.primary_panic_reason = Some(e)
911                        } else {
912                            cmddata.other_reasons.push(e)
913                        }
914                        Err(EnclaveAbort::Secondary)
915                    }
916
917                    (Err(e), _) => {
918                        let e = e.map_panic(|opt| opt.unwrap());
919                        let cmd = enclave_clone.kind.as_command().unwrap();
920                        let mut cmddata = cmd.panic_reason.lock().await;
921                        cmddata.other_reasons.push(e);
922                        continue;
923                    }
924                };
925                return res;
926            }
927            unreachable!();
928        };
929        let enclave_clone = enclave.clone();
930        let io_future = async move {
931            let (uqs, rqs, cqs, sync_usercall_tx) = QueueSynchronizer::new(enclave_clone.clone());
932
933            let (usercall_queue_tx, usercall_queue_rx) = ipc_queue::bounded_async(USERCALL_QUEUE_SIZE, uqs);
934            let (return_queue_tx, return_queue_rx) = ipc_queue::bounded_async(RETURN_QUEUE_SIZE, rqs);
935            let (cancel_queue_tx, cancel_queue_rx) = ipc_queue::bounded_async(CANCEL_QUEUE_SIZE, cqs);
936
937            let fifo_guards = FifoGuards {
938                usercall_queue: usercall_queue_tx.into_descriptor_guard(),
939                return_queue: return_queue_rx.into_descriptor_guard(),
940                cancel_queue: cancel_queue_tx.into_descriptor_guard(),
941                async_queues_called: false,
942            };
943
944            *enclave_clone.fifo_guards.lock().await = Some(fifo_guards);
945            *enclave_clone.return_queue_tx.lock().await = Some(return_queue_tx);
946
947            let usercall_queue_monitor = usercall_queue_rx.position_monitor();
948
949            let (usercall_event_tx, mut usercall_event_rx) = async_mpsc::unbounded_channel();
950            let usercall_event_tx_clone = usercall_event_tx.clone();
951            tokio::task::spawn_local(async move {
952                while let Ok(usercall) = usercall_queue_rx.recv().await {
953                    let notifier_rx = if usercall.ignore_cancel() {
954                        None
955                    } else {
956                        let (notifier_tx, notifier_rx) = oneshot::channel();
957                        usercall_event_tx_clone.send(UsercallEvent::Started(usercall.id, notifier_tx)).ok().expect("failed to send usercall event");
958                        Some(notifier_rx)
959                    };
960                    let _ = io_queue_send.send(UsercallSendData::Async(usercall, notifier_rx));
961                }
962            });
963
964            let usercall_event_tx_clone = usercall_event_tx.clone();
965            let usercall_queue_monitor_clone = usercall_queue_monitor.clone();
966            tokio::task::spawn_local(async move {
967                while let Ok(c) = cancel_queue_rx.recv().await {
968                    let write_position = usercall_queue_monitor_clone.write_position();
969                    let _ = usercall_event_tx_clone.send(UsercallEvent::Cancelled(c.id, write_position));
970                }
971            });
972
973            tokio::task::spawn_local(async move {
974                let mut notifiers = HashMap::new();
975                let mut cancels: HashMap<u64, WritePosition> = HashMap::new();
976                loop {
977                    match usercall_event_rx.recv().await.expect("usercall_event channel closed unexpectedly") {
978                        UsercallEvent::Started(id, notifier) => match cancels.remove(&id) {
979                            Some(_) => { let _ = notifier.send(()); },
980                            _ => { notifiers.insert(id, notifier); },
981                        },
982                        UsercallEvent::Finished(id) => { notifiers.remove(&id); },
983                        UsercallEvent::Cancelled(id, wp) => match notifiers.remove(&id) {
984                            Some(notifier) => { let _ = notifier.send(()); },
985                            None => { cancels.insert(id, wp); },
986                        },
987                    }
988                    // cleanup old cancels
989                    let read_position = usercall_queue_monitor.read_position();
990                    cancels.retain(|_id, wp|
991                        if let Some(past) = read_position.is_past(wp) {
992                            !past
993                        } else {
994                            false
995                        }
996                    );
997                }
998            });
999
1000            while let Some(work) = io_queue_receive.recv().await {
1001                let enclave_clone = enclave_clone.clone();
1002                let tx_return_channel = tx_return_channel.clone();
1003                match work {
1004                    UsercallSendData::Async(usercall, notifier_rx) => {
1005                        let usercall_event_tx = if usercall.ignore_cancel() { None } else { Some(usercall_event_tx.clone()) };
1006                        let uchd = UsercallHandleData::Async(usercall, notifier_rx, usercall_event_tx);
1007                        let fut = Self::handle_usercall(enclave_clone, work_sender.clone(), tx_return_channel, uchd);
1008                        tokio::task::spawn_local(fut);
1009                    }
1010                    UsercallSendData::Sync(CoResult::Yield(usercall), state, buf) => {
1011                        let _ = sync_usercall_tx.send(());
1012                        let uchd = UsercallHandleData::Sync(usercall, state, buf);
1013                        let fut = Self::handle_usercall(enclave_clone, work_sender.clone(), tx_return_channel, uchd);
1014                        tokio::task::spawn_local(fut);
1015                    }
1016                    UsercallSendData::Sync(CoResult::Return((tcs, v1, v2)), state, _buf) => {
1017                        let fut = async move {
1018                            let ret = match state.mode {
1019                                EnclaveEntry::Library => {
1020                                    enclave_clone.threads_queue.push(StoppedTcs { tcs });
1021                                    Ok((v1, v2))
1022                                }
1023                                EnclaveEntry::ExecutableMain => Err(EnclaveAbort::MainReturned),
1024                                EnclaveEntry::ExecutableNonMain => {
1025                                    assert_eq!(
1026                                        (v1, v2),
1027                                        (0, 0),
1028                                        "Expected enclave thread entrypoint to return zero"
1029                                    );
1030                                    // If the enclave is in the exit-state, threads are no
1031                                    // longer able to be launched
1032                                    if !enclave_clone.exiting.load(Ordering::SeqCst) {
1033                                        enclave_clone.threads_queue.push(StoppedTcs { tcs });
1034                                    }
1035                                    Ok((0, 0))
1036                                }
1037                            };
1038                            let _ = tx_return_channel.send((ret, state.mode.into()));
1039                        };
1040                        tokio::task::spawn_local(fut);
1041                    }
1042                };
1043            }
1044            unreachable!();
1045        };
1046
1047        // Note that:
1048        // - io_future will never return, its job is to spawn new futures that handle I/O.
1049        // - return_future returns in certain cases (see above) and in such cases we want to
1050        //   terminate the syscall loop.
1051        let select_fut =
1052            futures::future::select(return_future.boxed_local(), io_future.boxed_local()).map( |either| {
1053                match either {
1054                    Either::Left((x, _)) => x,
1055                    _ => unreachable!(),
1056                }
1057            });
1058
1059        local_set.block_on(&mut rt, select_fut.unit_error()).unwrap()
1060    }
1061
1062    fn run(
1063        enclave: Arc<EnclaveState>,
1064        num_of_worker_threads: usize,
1065        start_work: Work,
1066    ) -> EnclaveResult {
1067        fn create_worker_threads(
1068            num_of_worker_threads: usize,
1069            work_receiver: crossbeam::channel::Receiver<Work>,
1070            io_queue_send: tokio::sync::mpsc::UnboundedSender<UsercallSendData>,
1071        ) -> Vec<JoinHandle<()>> {
1072            let mut thread_handles = vec![];
1073            for _ in 0..num_of_worker_threads {
1074                let work_receiver = work_receiver.clone();
1075                let mut io_queue_send = io_queue_send.clone();
1076
1077                thread_handles.push(thread::spawn(move || {
1078                    while let Ok(work) = work_receiver.recv() {
1079                        work.do_work(&mut io_queue_send);
1080                    }
1081                }));
1082            }
1083            thread_handles
1084        }
1085
1086        let (io_queue_send, io_queue_receive) = tokio::sync::mpsc::unbounded_channel();
1087
1088        let (work_sender, work_receiver) = crossbeam::channel::unbounded();
1089        work_sender
1090            .send(start_work)
1091            .expect("Work sender couldn't send data to receiver");
1092
1093        let join_handlers =
1094            create_worker_threads(num_of_worker_threads, work_receiver, io_queue_send.clone());
1095        // main syscall polling loop
1096        let main_result =
1097            EnclaveState::syscall_loop(enclave.clone(), io_queue_receive, io_queue_send, work_sender);
1098
1099        for handler in join_handlers {
1100            let _ = handler.join();
1101        }
1102
1103        main_result
1104    }
1105
1106    pub(crate) fn main_entry(
1107        main: ErasedTcs,
1108        threads: Vec<ErasedTcs>,
1109        usercall_ext: Option<Box<dyn UsercallExtension>>,
1110        forward_panics: bool,
1111        force_time_usercalls: bool,
1112        cmd_args: Vec<Vec<u8>>,
1113        num_of_worker_threads: usize,
1114    ) -> StdResult<(), anyhow::Error> {
1115        assert!(num_of_worker_threads > 0, "worker_threads cannot be zero");
1116        let mut event_queues =
1117            FnvHashMap::with_capacity_and_hasher(threads.len() + 1, Default::default());
1118        let main = Self::event_queue_add_tcs(&mut event_queues, main);
1119
1120        let mut args = Vec::with_capacity(cmd_args.len());
1121        for a in cmd_args {
1122            args.push(ByteBuffer {
1123                len: a.len(),
1124                data: Box::into_raw(a.into_boxed_slice()) as *const u8,
1125            });
1126        }
1127        let argc = args.len();
1128        let argv = Box::into_raw(args.into_boxed_slice()) as *const u8;
1129
1130        let main_work = Work {
1131            tcs: RunningTcs {
1132                tcs_address: main.tcs.address(),
1133                mode: EnclaveEntry::ExecutableMain,
1134            },
1135            entry: CoEntry::Initial(main.tcs, argv as _, argc as _, 0, 0, 0),
1136        };
1137
1138        let kind = EnclaveKind::Command(Command {
1139            panic_reason: Mutex::new(PanicReason {
1140                primary_panic_reason: None,
1141                other_reasons: vec![],
1142            }),
1143        });
1144        let enclave = EnclaveState::new(kind, event_queues, usercall_ext, threads, forward_panics, force_time_usercalls);
1145
1146        let main_result = EnclaveState::run(enclave.clone(), num_of_worker_threads, main_work);
1147
1148        let main_panicking = match main_result {
1149            Err(EnclaveAbort::MainReturned)
1150            | Err(EnclaveAbort::InvalidUsercall(_))
1151            | Err(EnclaveAbort::Exit { .. }) => true,
1152            Err(EnclaveAbort::IndefiniteWait) | Err(EnclaveAbort::Secondary) | Ok(_) => false,
1153        };
1154
1155        let rt = tokio::runtime::Builder::new_current_thread()
1156            .enable_all()
1157            .build()
1158            .expect("failed to create tokio Runtime");
1159
1160        rt.block_on(async move {
1161            enclave.abort_all_threads();
1162            //clear the threads_queue
1163            while enclave.threads_queue.pop().is_some() {}
1164
1165            let cmd = enclave.kind.as_command().unwrap();
1166            let mut cmddata = cmd.panic_reason.lock().await;
1167            let main_result = match (main_panicking, cmddata.primary_panic_reason.take()) {
1168                (false, Some(reason)) => Err(reason.map_panic(Some)),
1169                // TODO: interpret other_reasons
1170                _ => main_result,
1171            };
1172            match main_result {
1173                Err(EnclaveAbort::Exit { panic: None }) => Ok(()),
1174                Err(EnclaveAbort::Exit { panic: Some(panic) }) => Err(panic.into()),
1175                Err(EnclaveAbort::IndefiniteWait) => {
1176                    bail!("All enclave threads are waiting indefinitely without possibility of wakeup")
1177                }
1178                Err(EnclaveAbort::InvalidUsercall(n)) => {
1179                    bail!("The enclave performed an invalid usercall 0x{:x}", n)
1180                }
1181                Err(EnclaveAbort::MainReturned) => bail!(
1182                    "The enclave returned from the main entrypoint in violation of the specification."
1183                ),
1184                // Should always be able to return the real exit reason
1185                Err(EnclaveAbort::Secondary) => unreachable!(),
1186                Ok(_) => Ok(()),
1187            }
1188        }.boxed_local())
1189    }
1190
1191    pub(crate) fn library(
1192        threads: Vec<ErasedTcs>,
1193        usercall_ext: Option<Box<dyn UsercallExtension>>,
1194        forward_panics: bool,
1195        force_time_usercalls: bool,
1196    ) -> Arc<Self> {
1197        let event_queues = FnvHashMap::with_capacity_and_hasher(threads.len(), Default::default());
1198
1199        let kind = EnclaveKind::Library(Library {});
1200
1201        let enclave = EnclaveState::new(kind, event_queues, usercall_ext, threads, forward_panics, force_time_usercalls);
1202        return enclave;
1203    }
1204
1205    pub(crate) fn library_entry(
1206        enclave: &Arc<Self>,
1207        p1: u64,
1208        p2: u64,
1209        p3: u64,
1210        p4: u64,
1211        p5: u64,
1212    ) -> StdResult<(u64, u64), anyhow::Error> {
1213        let thread = enclave.threads_queue.pop().expect("threads queue empty");
1214        let work = Work {
1215            tcs: RunningTcs {
1216                tcs_address: thread.tcs.address(),
1217                mode: EnclaveEntry::Library,
1218            },
1219            entry: CoEntry::Initial(thread.tcs, p1, p2, p3, p4, p5),
1220        };
1221        // As currently we are not supporting spawning threads let the number of threads be 2
1222        // one for usercall handling the other for actually running
1223        let num_of_worker_threads = 1;
1224
1225        let library_result = EnclaveState::run(enclave.clone(), num_of_worker_threads, work);
1226
1227        match library_result {
1228            Err(EnclaveAbort::Exit { panic: None }) => bail!("This thread aborted execution"),
1229            Err(EnclaveAbort::Exit { panic: Some(panic) }) => Err(panic.into()),
1230            Err(EnclaveAbort::IndefiniteWait) => {
1231                bail!("This thread is waiting indefinitely without possibility of wakeup")
1232            }
1233            Err(EnclaveAbort::InvalidUsercall(n)) => {
1234                bail!("The enclave performed an invalid usercall 0x{:x}", n)
1235            }
1236            Err(EnclaveAbort::Secondary) => {
1237                bail!("This thread exited because another thread aborted")
1238            }
1239            Err(EnclaveAbort::MainReturned) => unreachable!(),
1240            Ok(result) => Ok(result),
1241        }
1242    }
1243
1244    fn abort_all_threads(&self) {
1245        self.exiting.store(true, Ordering::SeqCst);
1246        // wake other threads
1247        for pending_events in self.event_queues.values() {
1248            pending_events.abort();
1249        }
1250    }
1251}
1252
/// The mode in which a TCS was entered.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum EnclaveEntry {
    /// Main entry point of a command-type enclave.
    ExecutableMain,
    /// Additional thread launched by a command-type enclave.
    ExecutableNonMain,
    /// Call into a library-type enclave.
    Library,
}
1259
/// Origin of a result delivered to the syscall loop: one of the
/// `EnclaveEntry` modes, or an asynchronous usercall.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ReturnSource {
    /// Result from the main thread of a command-type enclave.
    ExecutableMain,
    /// Result from a non-main thread of a command-type enclave.
    ExecutableNonMain,
    /// Result from a library-type enclave call.
    Library,
    /// Result from handling an asynchronous usercall.
    AsyncUsercall,
}
1267
1268impl From<EnclaveEntry> for ReturnSource {
1269    fn from(e: EnclaveEntry) -> Self {
1270        match e {
1271            EnclaveEntry::ExecutableMain => ReturnSource::ExecutableMain,
1272            EnclaveEntry::ExecutableNonMain => ReturnSource::ExecutableNonMain,
1273            EnclaveEntry::Library => ReturnSource::Library,
1274        }
1275    }
1276}
1277
/// Indices into the general-purpose register array of a signal context
/// (`uc_mcontext.gregs`); `handle_trap` uses `RIP` this way.
#[repr(C)]
#[allow(unused)]
enum Greg {
    R8 = 0,
    R9 = 1,
    R10 = 2,
    R11 = 3,
    R12 = 4,
    R13 = 5,
    R14 = 6,
    R15 = 7,
    RDI = 8,
    RSI = 9,
    RBP = 10,
    RBX = 11,
    RDX = 12,
    RAX = 13,
    RCX = 14,
    RSP = 15,
    RIP = 16,
    EFL = 17,
    CSGSFS = 18, /* Actually short cs, gs, fs, __pad0. */
    ERR = 19,
    TRAPNO = 20,
    OLDMASK = 21,
    CR2 = 22,
}
1305
1306#[cfg(unix)]
1307/* Here we are passing control to debugger `fixup` style by raising Sigtrap.
1308 * If there is no debugger attached, this function, would skip the `int3` instructon
1309 * and resume execution.
1310 */
1311extern "C" fn handle_trap(_signo: c_int, _info: *mut siginfo_t, context: *mut c_void) {
1312    unsafe {
1313        let context = &mut *(context as *mut ucontext_t);
1314        let rip = &mut context.uc_mcontext.gregs[Greg::RIP as usize];
1315        let inst: *const u8 = *rip as _;
1316        if *inst == 0xcc {
1317            *rip += 1;
1318        }
1319    }
1320    return;
1321}
1322
1323#[cfg(unix)]
1324/* Raising Sigtrap to allow debugger to take control.
1325 * Here, we also store tcs in rbx, so that the debugger could read it, to
1326 * set sgx state and correctly map the enclave symbols.
1327 */
1328async fn trap_attached_debugger(tcs: usize, debug_buf: *const u8) {
1329    let _g = DEBUGGER_TOGGLE_SYNC.lock().await;
1330    let hdl = self::signal::SigHandler::SigAction(handle_trap);
1331    let sig_action = signal::SigAction::new(hdl, signal::SaFlags::empty(), signal::SigSet::empty());
1332    // Synchronized
1333    unsafe {
1334        let old = signal::sigaction(signal::SIGTRAP, &sig_action).unwrap();
1335        std::arch::asm!("
1336            xchg %rbx, {0}
1337            int3
1338            xchg {0}, %rbx
1339            ",
1340            in(reg) tcs, // rbx is used internally by LLVM and cannot be used as an operand for inline asm (#84658)
1341            in("r10") debug_buf,
1342            options(nomem, nostack, att_syntax));
1343        signal::sigaction(signal::SIGTRAP, &old).unwrap();
1344    }
1345}
1346
1347/// Provides a mechanism for the enclave code to interface with an external service via a modified runner.
1348///
1349/// An implementation of `UsercallExtension` can be registered while [building](../struct.EnclaveBuilder.html#method.usercall_extension) the enclave.
1350pub trait UsercallExtension: 'static + Send + Sync + std::fmt::Debug {
1351    /// Override the connection target for connect calls by the enclave. The runner should determine the service that the enclave is trying to connect to by looking at addr.
1352    /// If `connect_stream` returns None, the default implementation of [`connect_stream`](../../fortanix_sgx_abi/struct.Usercalls.html#method.connect_stream) is used.
1353    /// The enclave may optionally request the local or peer addresses
1354    /// be returned in `local_addr` or `peer_addr`, respectively.
1355    /// If `local_addr` and/or `peer_addr` are not `None`, they will point to an empty `String`.
1356    /// On success, user-space can fill in the strings as appropriate.
1357    ///
1358    /// The enclave must not make any security decisions based on the local or
1359    /// peer address received.
1360    #[allow(unused)]
1361    fn connect_stream<'future>(
1362        &'future self,
1363        addr: &'future str,
1364        local_addr: Option<&'future mut String>,
1365        peer_addr: Option<&'future mut String>,
1366    ) -> std::pin::Pin<Box<dyn Future<Output = IoResult<Option<Box<dyn AsyncStream>>>> +'future>> {
1367        async {
1368            Ok(None)
1369        }.boxed_local()
1370    }
1371
1372    /// Override the target for bind calls by the enclave. The runner should determine the service that the enclave is trying to bind to by looking at addr.
1373    /// If `bind_stream` returns None, the default implementation of [`bind_stream`](../../fortanix_sgx_abi/struct.Usercalls.html#method.bind_stream) is used.
1374    /// The enclave may optionally request the local address be returned in `local_addr`.
1375    /// If `local_addr` is not `None`, it will point to an empty `String`.
1376    /// On success, user-space can fill in the string as appropriate.
1377    ///
1378    /// The enclave must not make any security decisions based on the local address received.
1379    #[allow(unused)]
1380    fn bind_stream<'future>(
1381        &'future self,
1382        addr: &'future str,
1383        local_addr: Option<&'future mut String>,
1384    ) -> std::pin::Pin<Box<dyn Future<Output = IoResult<Option<Box<dyn AsyncListener>>>> + 'future>> {
1385        async {
1386            Ok(None)
1387        }.boxed_local()
1388    }
1389}
1390
1391impl<T: UsercallExtension> From<T> for Box<dyn UsercallExtension> {
1392    fn from(value: T) -> Box<dyn UsercallExtension> {
1393        Box::new(value)
1394    }
1395}
1396
1397#[derive(Debug)]
1398struct UsercallExtensionDefault;
1399impl UsercallExtension for UsercallExtensionDefault {}
1400
1401impl<'tcs> IOHandlerInput<'tcs> {
1402    async fn lookup_fd(&self, fd: Fd) -> IoResult<Arc<AsyncFileDesc>> {
1403        match self.enclave.fds.lock().await.get(&fd) {
1404            Some(stream) => Ok(stream.clone()),
1405            None => Err(IoErrorKind::BrokenPipe.into()), // FIXME: Rust normally maps Unix EBADF to `Other`
1406        }
1407    }
1408
1409    async fn alloc_fd(&self, stream: AsyncFileDesc) -> Fd {
1410        let fd = (self
1411            .enclave
1412            .last_fd
1413            .fetch_add(1, Ordering::Relaxed)
1414            .checked_add(1)
1415            .expect("FD overflow")) as Fd;
1416        let prev = self.enclave.fds.lock().await.insert(fd, Arc::new(stream));
1417        debug_assert!(prev.is_none());
1418        fd
1419    }
1420
1421    #[inline(always)]
1422    fn is_exiting(&self) -> bool {
1423        self.enclave.exiting.load(Ordering::SeqCst)
1424    }
1425
1426    #[inline(always)]
1427    async fn read(&self, fd: Fd, buf: &mut ReadBuf<'_>) -> IoResult<()> {
1428        let file_desc = self.lookup_fd(fd).await?;
1429        file_desc.as_stream()?.async_read(buf).await
1430    }
1431
1432    #[inline(always)]
1433    async fn read_alloc(&self, fd: Fd, buf: &mut OutputBuffer<'tcs>) -> IoResult<()> {
1434        let file_desc = self.lookup_fd(fd).await?;
1435        let v = file_desc.as_stream()?.async_read_alloc().await?;
1436        buf.set(v);
1437        Ok(())
1438    }
1439
1440    #[inline(always)]
1441    async fn write(&self, fd: Fd, buf: &[u8]) -> IoResult<usize> {
1442        let file_desc = self.lookup_fd(fd).await?;
1443        return file_desc.as_stream()?.async_write(buf).await;
1444    }
1445
1446    #[inline(always)]
1447    async fn flush(&self, fd: Fd) -> IoResult<()> {
1448        let file_desc = self.lookup_fd(fd).await?;
1449        file_desc.as_stream()?.async_flush().await
1450    }
1451
1452    #[inline(always)]
1453    async fn close(&self, fd: Fd) {
1454        self.enclave.fds.lock().await.remove(&fd);
1455    }
1456
1457    #[inline(always)]
1458    async fn bind_stream(
1459        &self,
1460        addr: &[u8],
1461        local_addr: Option<&mut OutputBuffer<'tcs>>,
1462    ) -> IoResult<Fd> {
1463        let addr = str::from_utf8(addr).map_err(|_| IoErrorKind::ConnectionRefused)?;
1464        let mut local_addr_str = local_addr.as_ref().map(|_| String::new());
1465        if let Some(stream_ext) = self
1466            .enclave
1467            .usercall_ext
1468            .bind_stream(addr, local_addr_str.as_mut()).await?
1469        {
1470            if let Some(local_addr) = local_addr {
1471                local_addr.set(local_addr_str.unwrap().into_bytes());
1472            }
1473            return Ok(self.alloc_fd(AsyncFileDesc::listener(stream_ext)).await);
1474        }
1475
1476        let socket = tokio::net::TcpListener::bind(addr).await?;
1477        if let Some(local_addr) = local_addr {
1478            local_addr.set(socket.local_addr()?.to_string().into_bytes());
1479        }
1480        Ok(self.alloc_fd(AsyncFileDesc::listener(Box::new(socket))).await)
1481    }
1482
1483    #[inline(always)]
1484    async fn accept_stream(
1485        &self,
1486        fd: Fd,
1487        local_addr: Option<&mut OutputBuffer<'tcs>>,
1488        peer_addr: Option<&mut OutputBuffer<'tcs>>,
1489    ) -> IoResult<Fd> {
1490        let mut local_addr_str = local_addr.as_ref().map(|_| String::new());
1491        let mut peer_addr_str = peer_addr.as_ref().map(|_| String::new());
1492
1493        let file_desc = self.lookup_fd(fd).await?;
1494        let stream = file_desc.as_listener()?.async_accept(local_addr_str.as_mut(), peer_addr_str.as_mut()).await?.unwrap();
1495
1496        if let Some(local_addr) = local_addr {
1497            local_addr.set(&local_addr_str.unwrap().into_bytes()[..])
1498        }
1499        if let Some(peer_addr) = peer_addr {
1500            peer_addr.set(&peer_addr_str.unwrap().into_bytes()[..])
1501        }
1502        Ok(self.alloc_fd(AsyncFileDesc::stream(stream)).await)
1503    }
1504
1505    #[inline(always)]
1506    async fn connect_stream(
1507        &self,
1508        addr: &[u8],
1509        local_addr: Option<&mut OutputBuffer<'tcs>>,
1510        peer_addr: Option<&mut OutputBuffer<'tcs>>,
1511    ) -> IoResult<Fd> {
1512        let addr = str::from_utf8(addr).map_err(|_| IoErrorKind::ConnectionRefused)?;
1513        let mut local_addr_str = local_addr.as_ref().map(|_| String::new());
1514        let mut peer_addr_str = peer_addr.as_ref().map(|_| String::new());
1515        if let Some(stream_ext) = self.enclave.usercall_ext.connect_stream(
1516            addr,
1517            local_addr_str.as_mut(),
1518            peer_addr_str.as_mut(),
1519        ).await? {
1520            if let Some(local_addr) = local_addr {
1521                local_addr.set(local_addr_str.unwrap().into_bytes());
1522            }
1523            if let Some(peer_addr) = peer_addr {
1524                peer_addr.set(peer_addr_str.unwrap().into_bytes());
1525            }
1526            return Ok(self.alloc_fd(AsyncFileDesc::stream(stream_ext)).await);
1527        }
1528
1529        let stream = tokio::net::TcpStream::connect(addr).await?;
1530
1531        if let Some(local_addr) = local_addr {
1532            match stream.local_addr() {
1533                Ok(local) => local_addr.set(local.to_string().into_bytes()),
1534                Err(_) => local_addr.set(&b"error"[..]),
1535            }
1536        }
1537        if let Some(peer_addr) = peer_addr {
1538            match stream.peer_addr() {
1539                Ok(peer) => peer_addr.set(peer.to_string().into_bytes()),
1540                Err(_) => peer_addr.set(&b"error"[..]),
1541            }
1542        }
1543        Ok(self.alloc_fd(AsyncFileDesc::stream(Box::new(stream))).await)
1544    }
1545
1546    #[inline(always)]
1547    fn launch_thread(&self) -> IoResult<()> {
1548        // check if enclave is of type command
1549        self.enclave
1550            .kind
1551            .as_command()
1552            .ok_or(IoErrorKind::InvalidInput)?;
1553        let new_tcs = match self.enclave.threads_queue.pop() {
1554            Some(tcs) => tcs,
1555            None => {
1556                return Err(IoErrorKind::WouldBlock.into());
1557            }
1558        };
1559
1560        let ret = self.work_sender.send(Work {
1561            tcs: RunningTcs {
1562                tcs_address: new_tcs.tcs.address(),
1563                mode: EnclaveEntry::ExecutableNonMain,
1564            },
1565            entry: CoEntry::Initial(new_tcs.tcs, 0, 0, 0, 0, 0),
1566        });
1567        match ret {
1568            Ok(()) => Ok(()),
1569            Err(e) => {
1570                let entry = e.0.entry;
1571                match entry {
1572                    CoEntry::Initial(tcs, _, _ ,_, _, _) => {
1573                        self.enclave.threads_queue.push(StoppedTcs { tcs });
1574                    },
1575                    _ => unreachable!(),
1576                };
1577                Err(std::io::Error::new(
1578                    IoErrorKind::NotConnected,
1579                    "Work Sender: send error",
1580                ))
1581            }
1582        }
1583    }
1584
1585    #[inline(always)]
1586    fn exit(&mut self, panic: bool) -> EnclaveAbort<bool> {
1587        self.enclave.abort_all_threads();
1588        EnclaveAbort::Exit { panic }
1589    }
1590
1591    fn check_event_set(set: u64) -> IoResult<()> {
1592        const EV_ALL: u64 = EV_USERCALLQ_NOT_FULL | EV_RETURNQ_NOT_EMPTY | EV_UNPARK | EV_CANCELQ_NOT_FULL;
1593        if (set & !EV_ALL) != 0 {
1594            return Err(IoErrorKind::InvalidInput.into());
1595        }
1596
1597        assert!((EV_ALL | EV_ABORT) <= u8::max_value().into());
1598        assert!((EV_ALL & EV_ABORT) == 0);
1599        Ok(())
1600    }
1601
1602    #[inline(always)]
1603    async fn wait(&mut self, event_mask: u64, timeout: u64) -> IoResult<u64> {
1604        Self::check_event_set(event_mask)?;
1605
1606        let timeout = match timeout {
1607            WAIT_NO | WAIT_INDEFINITE => timeout,
1608            _ => {
1609                // tokio::time::timeout does not handle large timeout values
1610                // well which may or may not be a bug in tokio, but it seems to
1611                // work ok with timeout values smaller than 2 ^ 55 nanoseconds.
1612                // NOTE: 2 ^ 55 nanoseconds is roughly 417 days.
1613                // TODO: revisit when https://github.com/tokio-rs/tokio/issues/2667 is resolved.
1614                cmp::min(1 << 55, timeout)
1615            }
1616        };
1617
1618        // TODO: https://github.com/fortanix/rust-sgx/issues/290
1619        let tcs = self.tcs.as_mut().ok_or(io::Error::from(io::ErrorKind::Other))?;
1620
1621        let pending_events = self.enclave.event_queues.get(&tcs.tcs_address).expect("invalid tcs address");
1622
1623        let ret = match timeout {
1624            WAIT_NO => pending_events.take(event_mask),
1625            WAIT_INDEFINITE => Some(pending_events.wait_for(event_mask).await),
1626            n => tokio::time::timeout(Duration::from_nanos(n), pending_events.wait_for(event_mask)).await.ok(),
1627        };
1628
1629        if let Some(ev) = ret {
1630            if (ev & EV_ABORT) != 0 {
1631                // dispatch will make sure this is not returned to enclave
1632                return Err(IoErrorKind::Other.into());
1633            }
1634            return Ok(ev.into());
1635        }
1636        Err(if timeout == WAIT_NO { IoErrorKind::WouldBlock } else { IoErrorKind::TimedOut }.into())
1637    }
1638
1639    #[inline(always)]
1640    fn send(&self, event_set: u64, target: Option<Tcs>) -> IoResult<()> {
1641        Self::check_event_set(event_set)?;
1642
1643        if event_set == 0 {
1644            return Err(IoErrorKind::InvalidInput.into());
1645        }
1646
1647        if let Some(tcs) = target {
1648            let tcs = TcsAddress(tcs.as_ptr() as _);
1649            let pending_events = self.enclave.event_queues.get(&tcs).ok_or(IoErrorKind::InvalidInput)?;
1650            pending_events.push(event_set);
1651        } else {
1652            for pending_events in self.enclave.event_queues.values() {
1653                pending_events.push(event_set);
1654            }
1655        }
1656
1657        Ok(())
1658    }
1659
1660    #[inline(always)]
1661    fn insecure_time(&mut self) -> (u64, *const InsecureTimeInfo) {
1662        let time = time::SystemTime::now()
1663            .duration_since(time::UNIX_EPOCH)
1664            .unwrap();
1665        let t = (time.subsec_nanos() as u64) + time.as_secs() * NANOS_PER_SEC;
1666        let insecure_time_ref: Option<&'static InsecureTimeInfo> = (&*TIME_INFO).as_ref();
1667        let info = if let (Some(info), false) = (insecure_time_ref, self.enclave.force_time_usercalls) {
1668                info
1669            } else {
1670                ptr::null()
1671            };
1672        (t, info)
1673    }
1674
1675    #[inline(always)]
1676    fn alloc(&self, size: usize, alignment: usize) -> IoResult<*mut u8> {
1677        unsafe {
1678            let layout =
1679                Layout::from_size_align(size, alignment).map_err(|_| IoErrorKind::InvalidInput)?;
1680            if layout.size() == 0 {
1681                return Err(IoErrorKind::InvalidInput.into());
1682            }
1683            let ptr = System.alloc(layout);
1684            if ptr.is_null() {
1685                Err(IoErrorKind::Other.into())
1686            } else {
1687                Ok(ptr)
1688            }
1689        }
1690    }
1691
1692    #[inline(always)]
1693    fn free(&self, ptr: *mut u8, size: usize, alignment: usize) -> IoResult<()> {
1694        unsafe {
1695            let layout =
1696                Layout::from_size_align(size, alignment).map_err(|_| IoErrorKind::InvalidInput)?;
1697            if size == 0 {
1698                return Ok(());
1699            }
1700            Ok(System.dealloc(ptr, layout))
1701        }
1702    }
1703
1704    #[inline(always)]
1705    async fn async_queues(
1706        &mut self,
1707        usercall_queue: &mut FifoDescriptor<Usercall>,
1708        return_queue: &mut FifoDescriptor<Return>,
1709        cancel_queue: Option<&mut FifoDescriptor<Cancel>>,
1710    ) -> StdResult<(), EnclaveAbort<bool>> {
1711        let mut fifo_guards = self.enclave.fifo_guards.lock().await;
1712        match &mut *fifo_guards {
1713            Some(ref mut fifo_guards) if !fifo_guards.async_queues_called => {
1714                *usercall_queue = fifo_guards.usercall_queue.fifo_descriptor();
1715                *return_queue = fifo_guards.return_queue.fifo_descriptor();
1716                if let Some(cancel_queue) = cancel_queue {
1717                    *cancel_queue = fifo_guards.cancel_queue.fifo_descriptor();
1718                }
1719                fifo_guards.async_queues_called = true;
1720                Ok(())
1721            }
1722            Some(_) => {
1723                drop(fifo_guards);
1724                Err(self.exit(true))
1725            },
1726            // Enclave would not be able to call `async_queues()` before the
1727            // fifo_guards is set to Some in `fn syscall_loop`.
1728            None => unreachable!(),
1729        }
1730    }
1731}
1732
/// Selects which of the three asynchronous queues a `QueueSynchronizer`
/// is attached to.
#[derive(Copy, Clone)]
enum Queue {
    /// The usercall queue; the enclave runner never sends on it.
    Usercall,
    /// The return queue; the enclave runner never receives on it.
    Return,
    /// The cancel queue; the enclave runner never sends on it.
    Cancel,
}
1739
/// Synchronization handle for one of the asynchronous queues.
///
/// Waiting is done by receiving on a broadcast subscription; notifying is
/// done by pushing an event onto every entry of the enclave's
/// `event_queues`.
struct QueueSynchronizer {
    // which queue this synchronizer belongs to; decides which wait/notify
    // events are legal and which event bit is pushed on notify
    queue: Queue,
    // shared enclave state, used by `notify` to reach `event_queues`
    enclave: Arc<EnclaveState>,
    // the Mutex is uncontested and is used for providing interior mutability.
    subscription: Mutex<broadcast::Receiver<()>>,
    // this is only used to create a new Receiver in the Clone implementation
    subscription_maker: broadcast::Sender<()>,
}
1748
1749impl QueueSynchronizer {
1750    fn new(enclave: Arc<EnclaveState>) -> (Self, Self, Self, broadcast::Sender<()>) {
1751        // This broadcast channel is used to notify enclave-runner of any
1752        // synchronous usercalls made by the enclave for the purpose of
1753        // synchronizing access to usercall and return queues.
1754        // The size of this channel should not matter since recv() can
1755        // return RecvError::Lagged.
1756        let (tx, rx1) = broadcast::channel(1);
1757        let rx2 = tx.subscribe();
1758        let rx3 = tx.subscribe();
1759        let usercall_queue_synchronizer = QueueSynchronizer {
1760            queue: Queue::Usercall,
1761            enclave: enclave.clone(),
1762            subscription: Mutex::new(rx1),
1763            subscription_maker: tx.clone(),
1764        };
1765        let return_queue_synchronizer = QueueSynchronizer {
1766            queue: Queue::Return,
1767            enclave: enclave.clone(),
1768            subscription: Mutex::new(rx2),
1769            subscription_maker: tx.clone(),
1770        };
1771        let cancel_queue_synchronizer = QueueSynchronizer {
1772            queue: Queue::Cancel,
1773            enclave,
1774            subscription: Mutex::new(rx3),
1775            subscription_maker: tx.clone(),
1776        };
1777        (usercall_queue_synchronizer, return_queue_synchronizer, cancel_queue_synchronizer, tx)
1778    }
1779}
1780
1781impl Clone for QueueSynchronizer {
1782    fn clone(&self) -> Self {
1783        Self {
1784            queue: self.queue,
1785            enclave: self.enclave.clone(),
1786            subscription: Mutex::new(self.subscription_maker.subscribe()),
1787            subscription_maker: self.subscription_maker.clone(),
1788        }
1789    }
1790}
1791
1792impl ipc_queue::AsyncSynchronizer for QueueSynchronizer {
1793    fn wait(&self, event: QueueEvent) -> Pin<Box<dyn Future<Output = StdResult<(), ipc_queue::SynchronizationError>> + '_>> {
1794        match (self.queue, event) {
1795            (Queue::Usercall, QueueEvent::NotFull) => panic!("enclave runner should not send on the usercall queue"),
1796            (Queue::Cancel, QueueEvent::NotFull) => panic!("enclave runner should not send on the cancel queue"),
1797            (Queue::Return, QueueEvent::NotEmpty)  => panic!("enclave runner should not receive on the return queue"),
1798            _ => {}
1799        }
1800        // When userspace needs to wait on a queue, it will park the current thread (or do whatever
1801        // else is appropriate for the synchronization model currently in use by userspace).
1802        // Any synchronous usercall will wake the blocked thread (or otherwise signal that either queue is ready).
1803        async move {
1804            let mut subscription = self.subscription.lock().await;
1805            match subscription.recv().await {
1806                Ok(()) | Err(RecvError::Lagged(_)) => Ok(()),
1807                Err(RecvError::Closed) => Err(ipc_queue::SynchronizationError::ChannelClosed),
1808            }
1809        }.boxed_local()
1810    }
1811
1812    fn notify(&self, event: QueueEvent) {
1813        let ev = match (self.queue, event) {
1814            (Queue::Usercall, QueueEvent::NotEmpty) => panic!("enclave runner should not send on the usercall queue"),
1815            (Queue::Cancel, QueueEvent::NotEmpty) => panic!("enclave runner should not send on the cancel queue"),
1816            (Queue::Return, QueueEvent::NotFull) => panic!("enclave runner should not receive on the return queue"),
1817            (Queue::Usercall, QueueEvent::NotFull) => EV_USERCALLQ_NOT_FULL,
1818            (Queue::Return, QueueEvent::NotEmpty) => EV_RETURNQ_NOT_EMPTY,
1819            (Queue::Cancel, QueueEvent::NotFull) => EV_CANCELQ_NOT_FULL,
1820        };
1821        // When the enclave needs to wait on a queue, it executes the wait() usercall synchronously,
1822        // specifying EV_USERCALLQ_NOT_FULL, EV_RETURNQ_NOT_EMPTY or EV_CANCELQ_NOT_FULL in the event_mask.
1823        // Userspace will wake any or all threads waiting on the appropriate event when it is triggered.
1824        for pending_events in self.enclave.event_queues.values() {
1825            pending_events.push(ev as _);
1826        }
1827    }
1828}