php_tokio/
event_loop.rs

1use crate::borrow_unchecked::borrow_unchecked;
2use ext_php_rs::{boxed::ZBox, call_user_func, prelude::*, types::ZendHashTable, zend::Function};
3use lazy_static::lazy_static;
4use std::{
5    cell::RefCell,
6    fs::File,
7    future::Future,
8    io::{self, Read, Write},
9    os::fd::{AsRawFd, FromRawFd, RawFd},
10    sync::mpsc::{channel, Receiver, Sender},
11};
12use tokio::runtime::Runtime;
13
14lazy_static! {
15    pub static ref RUNTIME: Runtime = Runtime::new().expect("Could not allocate runtime");
16}
17
18thread_local! {
19    static EVENTLOOP: RefCell<Option<EventLoop>> = RefCell::new(None);
20}
21
22#[cfg(any(target_os = "linux", target_os = "solaris"))]
23fn sys_pipe() -> io::Result<(RawFd, RawFd)> {
24    let mut pipefd = [0; 2];
25    let ret = unsafe { libc::pipe2(pipefd.as_mut_ptr(), libc::O_CLOEXEC | libc::O_NONBLOCK) };
26    if ret == -1 {
27        return Err(io::Error::last_os_error());
28    }
29    Ok((pipefd[0], pipefd[1]))
30}
31
32#[cfg(any(target_os = "macos"))]
33fn set_cloexec(fd: RawFd) -> io::Result<()> {
34    use libc::{F_SETFD, FD_CLOEXEC, F_GETFD};
35    use libc::fcntl;
36
37    let flags = unsafe { fcntl(fd, F_GETFD, 0) };
38    if flags == -1 {
39        return Err(io::Error::last_os_error());
40    }
41    let ret = unsafe { fcntl(fd, F_SETFD, flags | FD_CLOEXEC) };
42    if ret == -1 {
43        return Err(io::Error::last_os_error());
44    }
45    Ok(())
46}
47
48#[cfg(any(target_os = "macos"))]
49fn set_nonblocking(fd: RawFd) -> io::Result<()> {
50    use libc::{fcntl, F_SETFL, O_NONBLOCK};
51
52    let ret = unsafe { fcntl(fd, F_SETFL, O_NONBLOCK) };
53    if ret == -1 {
54        return Err(io::Error::last_os_error());
55    }
56
57    Ok(())
58}
59
60#[cfg(any(target_os = "macos"))]
61fn sys_pipe() -> io::Result<(RawFd, RawFd)> {
62    let mut pipefd = [0; 2];
63    let ret = unsafe { libc::pipe(pipefd.as_mut_ptr()) };
64
65    if ret == -1 {
66        return Err(io::Error::last_os_error());
67    }
68
69    for fd in &pipefd {
70        set_cloexec(*fd)?;
71        set_nonblocking(*fd)?;
72    }
73    Ok((pipefd[0], pipefd[1]))
74}
75
76pub struct EventLoop {
77    fibers: ZBox<ZendHashTable>,
78
79    sender: Sender<u64>,
80    receiver: Receiver<u64>,
81
82    notify_sender: File,
83    notify_receiver: File,
84
85    get_current_suspension: Function,
86
87    dummy: [u8; 1],
88}
89
90impl EventLoop {
91    pub fn init() -> PhpResult<u64> {
92        EVENTLOOP.with_borrow_mut(|e| {
93            Ok(match e {
94                None => e.insert(Self::new()?),
95                Some(ev) => ev,
96            }
97            .notify_receiver
98            .as_raw_fd() as u64)
99        })
100    }
101
102    pub fn suspend_on<T: Send + 'static, F: Future<Output = T> + Send + 'static>(future: F) -> T {
103        // What's going on here? Unsafe borrows???
104        // NO: this is actually 100% safe, and here's why.
105        //
106        // Rust thinks we're Sending the Future to another thread (tokio's event loop),
107        // where it may be used even after its lifetime expires in the main (PHP) thread.
108        //
109        // In reality, the Future is only used by Tokio until the result is ready.
110        //
111        // Rust does not understand that when we suspend the current fiber in suspend_on,
112        // we basically keep alive the the entire stack,
113        // including the Rust stack and the Future on it, until the result of the future is ready.
114        //
115        // Once the result of the Future is ready, tokio doesn't need it anymore,
116        // the suspend_on function is resumed, and we safely drop the Future upon exiting.
117        //
118        let (future, get_current_suspension) = EVENTLOOP.with_borrow_mut(move |c| {
119            let c = c.as_mut().unwrap();
120            let idx = c.fibers.len() as u64;
121            c.fibers
122                .insert_at_index(idx, call_user_func!(c.get_current_suspension).unwrap())
123                .unwrap();
124
125            let sender = c.sender.clone();
126            let mut notifier = c.notify_sender.try_clone().unwrap();
127
128            (
129                RUNTIME.spawn(async move {
130                    let res = future.await;
131                    sender.send(idx).unwrap();
132                    notifier.write_all(&[0]).unwrap();
133                    res
134                }),
135                unsafe { borrow_unchecked(&c.get_current_suspension) },
136            )
137        });
138
139        // We suspend the fiber here, the Rust stack is kept alive until the result is ready.
140        call_user_func!(get_current_suspension)
141            .unwrap()
142            .try_call_method("suspend", vec![])
143            .unwrap();
144
145        // We've resumed, the `future` is already resolved and is not used by the tokio thread, it's safe to drop it.
146
147        return RUNTIME.block_on(future).unwrap();
148    }
149
150    pub fn wakeup() -> PhpResult<()> {
151        EVENTLOOP.with_borrow_mut(|c| {
152            let c = c.as_mut().unwrap();
153
154            c.notify_receiver.read_exact(&mut c.dummy).unwrap();
155
156            for fiber_id in c.receiver.try_iter() {
157                if let Some(fiber) = c.fibers.get_index_mut(fiber_id) {
158                    fiber
159                        .object_mut()
160                        .unwrap()
161                        .try_call_method("resume", vec![])?;
162                    c.fibers.remove_index(fiber_id);
163                }
164            }
165            Ok(())
166        })
167    }
168
169    pub fn shutdown() {
170        EVENTLOOP.set(None)
171    }
172
173    pub fn new() -> PhpResult<Self> {
174        let (sender, receiver) = channel();
175        let (notify_receiver, notify_sender) =
176            sys_pipe().map_err(|err| format!("Could not create pipe: {}", err))?;
177
178        if !call_user_func!(
179            Function::try_from_function("class_exists").unwrap(),
180            "\\Revolt\\EventLoop"
181        )?
182        .bool()
183        .unwrap_or(false)
184        {
185            return Err(format!("\\Revolt\\EventLoop does not exist!").into());
186        }
187        if !call_user_func!(
188            Function::try_from_function("interface_exists").unwrap(),
189            "\\Revolt\\EventLoop\\Suspension"
190        )?
191        .bool()
192        .unwrap_or(false)
193        {
194            return Err(format!("\\Revolt\\EventLoop\\Suspension does not exist!").into());
195        }
196
197        Ok(Self {
198            fibers: ZendHashTable::new(),
199            sender: sender,
200            receiver: receiver,
201            notify_sender: unsafe { File::from_raw_fd(notify_sender) },
202            notify_receiver: unsafe { File::from_raw_fd(notify_receiver) },
203            dummy: [0; 1],
204            get_current_suspension: Function::try_from_method(
205                "\\Revolt\\EventLoop",
206                "getSuspension",
207            )
208            .ok_or("\\Revolt\\EventLoop::getSuspension does not exist")?,
209        })
210    }
211}