1use crate::borrow_unchecked::borrow_unchecked;
2use ext_php_rs::{boxed::ZBox, call_user_func, prelude::*, types::ZendHashTable, zend::Function};
3use lazy_static::lazy_static;
4use std::{
5 cell::RefCell,
6 fs::File,
7 future::Future,
8 io::{self, Read, Write},
9 os::fd::{AsRawFd, FromRawFd, RawFd},
10 sync::mpsc::{channel, Receiver, Sender},
11};
12use tokio::runtime::Runtime;
13
lazy_static! {
    /// Shared Tokio runtime, built on first access.
    ///
    /// Initialization panics with "Could not allocate runtime" if
    /// `Runtime::new()` fails.
    pub static ref RUNTIME: Runtime = Runtime::new().expect("Could not allocate runtime");
}
17
18thread_local! {
19 static EVENTLOOP: RefCell<Option<EventLoop>> = RefCell::new(None);
20}
21
22#[cfg(any(target_os = "linux", target_os = "solaris"))]
23fn sys_pipe() -> io::Result<(RawFd, RawFd)> {
24 let mut pipefd = [0; 2];
25 let ret = unsafe { libc::pipe2(pipefd.as_mut_ptr(), libc::O_CLOEXEC | libc::O_NONBLOCK) };
26 if ret == -1 {
27 return Err(io::Error::last_os_error());
28 }
29 Ok((pipefd[0], pipefd[1]))
30}
31
/// Adds `FD_CLOEXEC` to the descriptor flags of `fd`, keeping the flags that are
/// already set.
///
/// # Errors
///
/// Returns the last OS error if either `fcntl` call reports failure.
#[cfg(any(target_os = "macos"))]
fn set_cloexec(fd: RawFd) -> io::Result<()> {
    use libc::fcntl;
    use libc::{FD_CLOEXEC, F_GETFD, F_SETFD};

    // SAFETY: `F_GETFD` only takes integer arguments; no memory is handed to the
    // kernel.
    let current = match unsafe { fcntl(fd, F_GETFD, 0) } {
        -1 => return Err(io::Error::last_os_error()),
        flags => flags,
    };

    // SAFETY: `F_SETFD` only takes integer arguments as well.
    match unsafe { fcntl(fd, F_SETFD, current | FD_CLOEXEC) } {
        -1 => Err(io::Error::last_os_error()),
        _ => Ok(()),
    }
}
47
/// Adds `O_NONBLOCK` to the file status flags of `fd`.
///
/// The current flags are read with `F_GETFL` first and OR-ed with `O_NONBLOCK`,
/// so status flags that are already set are preserved instead of being
/// overwritten (the same read-modify-write shape `set_cloexec` uses).
///
/// # Errors
///
/// Returns the last OS error if either `fcntl` call reports failure.
#[cfg(any(target_os = "macos"))]
fn set_nonblocking(fd: RawFd) -> io::Result<()> {
    use libc::{fcntl, F_GETFL, F_SETFL, O_NONBLOCK};

    // SAFETY: `F_GETFL` only takes integer arguments; no memory is handed to the
    // kernel.
    let flags = unsafe { fcntl(fd, F_GETFL, 0) };
    if flags == -1 {
        return Err(io::Error::last_os_error());
    }

    // SAFETY: `F_SETFL` only takes integer arguments as well.
    let ret = unsafe { fcntl(fd, F_SETFL, flags | O_NONBLOCK) };
    if ret == -1 {
        return Err(io::Error::last_os_error());
    }

    Ok(())
}
59
/// Creates a pipe and then marks both ends close-on-exec and non-blocking.
///
/// Returns `(read_end, write_end)` as raw descriptors; the caller takes ownership
/// of both and is responsible for closing them.
///
/// # Errors
///
/// Returns the last OS error if `pipe` fails, or the error from `set_cloexec` /
/// `set_nonblocking` if configuring either end fails. In the latter case both
/// descriptors are closed before returning, so nothing leaks.
#[cfg(any(target_os = "macos"))]
fn sys_pipe() -> io::Result<(RawFd, RawFd)> {
    use std::os::fd::{IntoRawFd, OwnedFd};

    let mut pipefd: [RawFd; 2] = [0; 2];
    // SAFETY: `pipefd` is a valid, writable array of two C ints, which is what
    // `pipe` fills in on success.
    let ret = unsafe { libc::pipe(pipefd.as_mut_ptr()) };

    if ret == -1 {
        return Err(io::Error::last_os_error());
    }

    // Take ownership immediately so an early `?` below closes both ends.
    // SAFETY: `pipe` succeeded, so both values are freshly opened descriptors
    // that nothing else owns.
    let ends = pipefd.map(|fd| unsafe { OwnedFd::from_raw_fd(fd) });

    for end in &ends {
        set_cloexec(end.as_raw_fd())?;
        set_nonblocking(end.as_raw_fd())?;
    }

    // Configuration succeeded: hand ownership back to the caller as raw fds.
    let [read_end, write_end] = ends;
    Ok((read_end.into_raw_fd(), write_end.into_raw_fd()))
}
75
/// Per-thread state that connects futures spawned on `RUNTIME` with suspended PHP
/// fibers.
pub struct EventLoop {
    /// Values returned by calling `get_current_suspension`, stored under the
    /// integer id assigned in `suspend_on` until `wakeup` resumes and removes them.
    fibers: ZBox<ZendHashTable>,

    /// Cloned into every spawned task; the task sends its fiber id when the
    /// future has finished.
    sender: Sender<u64>,
    /// Drained by `wakeup` to learn which fiber ids are ready.
    receiver: Receiver<u64>,

    /// Write end of the notification pipe; each finished task writes one byte.
    notify_sender: File,
    /// Read end of the notification pipe; `init` returns its descriptor and
    /// `wakeup` reads one byte from it.
    notify_receiver: File,

    /// Handle to `\Revolt\EventLoop::getSuspension`.
    get_current_suspension: Function,

    /// One-byte buffer that `wakeup` reads the notification byte into.
    dummy: [u8; 1],
}
89
90impl EventLoop {
91 pub fn init() -> PhpResult<u64> {
92 EVENTLOOP.with_borrow_mut(|e| {
93 Ok(match e {
94 None => e.insert(Self::new()?),
95 Some(ev) => ev,
96 }
97 .notify_receiver
98 .as_raw_fd() as u64)
99 })
100 }
101
102 pub fn suspend_on<T: Send + 'static, F: Future<Output = T> + Send + 'static>(future: F) -> T {
103 let (future, get_current_suspension) = EVENTLOOP.with_borrow_mut(move |c| {
119 let c = c.as_mut().unwrap();
120 let idx = c.fibers.len() as u64;
121 c.fibers
122 .insert_at_index(idx, call_user_func!(c.get_current_suspension).unwrap())
123 .unwrap();
124
125 let sender = c.sender.clone();
126 let mut notifier = c.notify_sender.try_clone().unwrap();
127
128 (
129 RUNTIME.spawn(async move {
130 let res = future.await;
131 sender.send(idx).unwrap();
132 notifier.write_all(&[0]).unwrap();
133 res
134 }),
135 unsafe { borrow_unchecked(&c.get_current_suspension) },
136 )
137 });
138
139 call_user_func!(get_current_suspension)
141 .unwrap()
142 .try_call_method("suspend", vec![])
143 .unwrap();
144
145 return RUNTIME.block_on(future).unwrap();
148 }
149
150 pub fn wakeup() -> PhpResult<()> {
151 EVENTLOOP.with_borrow_mut(|c| {
152 let c = c.as_mut().unwrap();
153
154 c.notify_receiver.read_exact(&mut c.dummy).unwrap();
155
156 for fiber_id in c.receiver.try_iter() {
157 if let Some(fiber) = c.fibers.get_index_mut(fiber_id) {
158 fiber
159 .object_mut()
160 .unwrap()
161 .try_call_method("resume", vec![])?;
162 c.fibers.remove_index(fiber_id);
163 }
164 }
165 Ok(())
166 })
167 }
168
169 pub fn shutdown() {
170 EVENTLOOP.set(None)
171 }
172
173 pub fn new() -> PhpResult<Self> {
174 let (sender, receiver) = channel();
175 let (notify_receiver, notify_sender) =
176 sys_pipe().map_err(|err| format!("Could not create pipe: {}", err))?;
177
178 if !call_user_func!(
179 Function::try_from_function("class_exists").unwrap(),
180 "\\Revolt\\EventLoop"
181 )?
182 .bool()
183 .unwrap_or(false)
184 {
185 return Err(format!("\\Revolt\\EventLoop does not exist!").into());
186 }
187 if !call_user_func!(
188 Function::try_from_function("interface_exists").unwrap(),
189 "\\Revolt\\EventLoop\\Suspension"
190 )?
191 .bool()
192 .unwrap_or(false)
193 {
194 return Err(format!("\\Revolt\\EventLoop\\Suspension does not exist!").into());
195 }
196
197 Ok(Self {
198 fibers: ZendHashTable::new(),
199 sender: sender,
200 receiver: receiver,
201 notify_sender: unsafe { File::from_raw_fd(notify_sender) },
202 notify_receiver: unsafe { File::from_raw_fd(notify_receiver) },
203 dummy: [0; 1],
204 get_current_suspension: Function::try_from_method(
205 "\\Revolt\\EventLoop",
206 "getSuspension",
207 )
208 .ok_or("\\Revolt\\EventLoop::getSuspension does not exist")?,
209 })
210 }
211}