#![allow(unsafe_code)]
use std::{
collections::VecDeque,
os::unix::io::RawFd,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
time::{Duration, Instant},
};
use gpui::{PlatformDispatcher, Priority, RunnableVariant, ThreadTaskTimings};
use parking_lot::Mutex;
// Value of the NDK's `ALOOPER_POLL_CALLBACK` poll result (see `android/looper.h`).
// Nothing in the code visible here references it, hence the `dead_code` allowance.
#[allow(dead_code)]
const ALOOPER_POLL_CALLBACK: i32 = -2;
// Event mask passed as `events` to `ALooper_addFd` when the wake pipe's read
// end is registered; corresponds to the NDK's "fd is readable" bit.
const ALOOPER_EVENT_INPUT: i32 = 1;
// Hand-written bindings for the subset of the Android NDK looper API used by
// this file. The looper type is only ever handled through raw pointers.
unsafe extern "C" {
/// Returns the looper associated with the calling thread; callers in this
/// file treat a null result as "this thread has no looper".
fn ALooper_forThread() -> *mut libc_looper_opaque;
/// Registers `fd` with `looper` so that `callback` is invoked with `data`
/// when one of `events` is signalled. This file treats a return value of
/// `1` as success.
fn ALooper_addFd(
looper: *mut libc_looper_opaque,
fd: RawFd,
ident: i32,
events: i32,
callback: Option<
unsafe extern "C" fn(fd: RawFd, events: i32, data: *mut std::ffi::c_void) -> i32,
>,
data: *mut std::ffi::c_void,
) -> i32;
/// Removes a previously registered `fd` from `looper`. The return value is
/// ignored by the callers in this file.
fn ALooper_removeFd(looper: *mut libc_looper_opaque, fd: RawFd) -> i32;
}
/// Opaque stand-in for the NDK's `ALooper` type.
///
/// It has no usable fields and is never constructed in this file; it exists
/// only so that looper handles can be typed as `*mut libc_looper_opaque`.
#[repr(C)]
struct libc_looper_opaque {
// Zero-length private field: keeps the type unconstructible from outside
// while giving it a C-compatible layout.
_priv: [u8; 0],
}
/// A type-erased, run-once unit of work that may be moved to another thread.
/// Used for the main-thread queue, the background pool and delayed tasks.
type BoxedTask = Box<dyn FnOnce() + Send + 'static>;
/// FIFO of tasks waiting to run on the looper thread, together with the
/// write end of the wake pipe used to signal that new work was queued.
struct MainQueue {
/// Pending tasks; pushed at the back and popped from the front.
tasks: VecDeque<BoxedTask>,
/// Write end of the wake pipe; one byte is written per queued task.
write_fd: RawFd,
}
/// Handle to a set of background worker threads fed through an mpsc channel.
struct ThreadPool {
/// Sending half of the task channel; the workers share the receiving half.
sender: std::sync::mpsc::Sender<BoxedTask>,
}
impl ThreadPool {
    /// Starts `threads` detached worker threads named `gpui-bg-<index>`.
    ///
    /// Every worker pulls tasks from one shared channel receiver and runs
    /// them until the channel is closed, i.e. until the pool's sender is
    /// dropped.
    ///
    /// # Panics
    ///
    /// Panics if the operating system refuses to spawn a thread.
    fn new(threads: usize) -> Self {
        let (sender, receiver) = std::sync::mpsc::channel::<BoxedTask>();
        let shared_rx = Arc::new(Mutex::new(receiver));

        for index in 0..threads {
            let worker_rx = Arc::clone(&shared_rx);
            let worker = move || loop {
                // The guard is a temporary of this statement only, so the
                // receiver lock is released before the task starts running
                // and other workers can pick up work in the meantime.
                let next = worker_rx.lock().recv();
                let Ok(job) = next else {
                    // Channel closed: no more work will ever arrive.
                    break;
                };
                job();
            };

            std::thread::Builder::new()
                .name(format!("gpui-bg-{}", index))
                .spawn(worker)
                .expect("failed to spawn background thread");
        }

        Self { sender }
    }

    /// Queues `task` for execution on one of the worker threads.
    ///
    /// If no worker is left to receive it, the task is silently dropped.
    fn dispatch(&self, task: BoxedTask) {
        self.sender.send(task).ok();
    }
}
/// A task scheduled to become runnable at a specific instant.
struct DelayedTask {
/// Earliest instant at which `tick` may release the task.
due: Instant,
/// The work to hand to the background pool once `due` has passed.
task: BoxedTask,
}
/// Task dispatcher that runs foreground work through an Android looper wake
/// pipe and background work on a thread pool.
pub struct AndroidDispatcher {
/// Main-thread task queue; a clone of this `Arc` is handed to the looper
/// callback as its user data pointer.
main_queue: Arc<Mutex<MainQueue>>,
/// Read end of the wake pipe, registered with the looper.
read_fd: RawFd,
/// Looper this dispatcher is attached to; null for headless dispatchers.
looper: *mut libc_looper_opaque,
/// Worker threads used for background and released delayed tasks.
pool: ThreadPool,
/// Pending delayed tasks, kept sorted by ascending due time.
delayed: Mutex<Vec<DelayedTask>>,
/// Set once shutdown starts; the dispatch paths that check it drop new work.
shutdown: AtomicBool,
}
// The raw `looper` pointer field suppresses the automatic `Send`/`Sync`
// implementations, so they are asserted manually here. In the code visible
// here the pointer is never dereferenced from Rust: it is only compared and
// passed back to the `ALooper_*` functions.
// NOTE(review): soundness presumably relies on `ALooper_removeFd` being
// callable from any thread — confirm against the NDK documentation.
unsafe impl Send for AndroidDispatcher {}
unsafe impl Sync for AndroidDispatcher {}
impl AndroidDispatcher {
pub fn new() -> Arc<Self> {
let (read_fd, write_fd) = create_pipe().expect("failed to create wake pipe");
let looper = unsafe { ALooper_forThread() };
assert!(
!looper.is_null(),
"AndroidDispatcher::new() must be called on the Android main thread"
);
let pool_threads = std::thread::available_parallelism()
.map(|n| n.get())
.unwrap_or(4)
.max(2);
let main_queue = Arc::new(Mutex::new(MainQueue {
tasks: VecDeque::new(),
write_fd,
}));
let dispatcher = Arc::new(Self {
main_queue: Arc::clone(&main_queue),
read_fd,
looper,
pool: ThreadPool::new(pool_threads),
delayed: Mutex::new(Vec::new()),
shutdown: AtomicBool::new(false),
});
dispatcher.register_with_looper();
log::debug!("AndroidDispatcher created (pool_threads={})", pool_threads);
dispatcher
}
pub fn new_headless() -> Arc<Self> {
let (read_fd, write_fd) = create_pipe().expect("failed to create wake pipe");
let pool_threads = std::thread::available_parallelism()
.map(|n| n.get())
.unwrap_or(2)
.max(1);
let main_queue = Arc::new(Mutex::new(MainQueue {
tasks: VecDeque::new(),
write_fd,
}));
Arc::new(Self {
main_queue,
read_fd,
looper: std::ptr::null_mut(), pool: ThreadPool::new(pool_threads),
delayed: Mutex::new(Vec::new()),
shutdown: AtomicBool::new(false),
})
}
pub fn is_main_thread(&self) -> bool {
let current = unsafe { ALooper_forThread() };
!current.is_null() && current == self.looper
}
pub fn dispatch_on_main_thread<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
if self.shutdown.load(Ordering::Relaxed) {
return;
}
let mut q = self.main_queue.lock();
q.tasks.push_back(Box::new(f));
wake_pipe(q.write_fd);
}
pub fn dispatch<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
if self.shutdown.load(Ordering::Relaxed) {
return;
}
self.pool.dispatch(Box::new(f));
}
pub fn dispatch_after<F>(&self, delay: Duration, f: F)
where
F: FnOnce() + Send + 'static,
{
if self.shutdown.load(Ordering::Relaxed) {
return;
}
let due = Instant::now() + delay;
let mut delayed = self.delayed.lock();
delayed.push(DelayedTask {
due,
task: Box::new(f),
});
delayed.sort_by_key(|d| d.due);
}
pub fn tick(&self) {
let now = Instant::now();
let mut ready: Vec<BoxedTask> = Vec::new();
{
let mut delayed = self.delayed.lock();
while delayed.first().map(|d| d.due <= now).unwrap_or(false) {
ready.push(delayed.remove(0).task);
}
}
for task in ready {
self.pool.dispatch(task);
}
}
pub fn flush_main_thread_tasks(&self) {
loop {
let task = {
let mut q = self.main_queue.lock();
q.tasks.pop_front()
};
match task {
Some(f) => f(),
None => break,
}
}
}
pub fn shutdown(&self) {
self.shutdown.store(true, Ordering::SeqCst);
self.unregister_from_looper();
}
fn register_with_looper(&self) {
let data_ptr = Arc::into_raw(Arc::clone(&self.main_queue)) as *mut std::ffi::c_void;
let ret = unsafe {
ALooper_addFd(
self.looper,
self.read_fd,
0, ALOOPER_EVENT_INPUT,
Some(looper_callback),
data_ptr,
)
};
if ret != 1 {
log::warn!(
"ALooper_addFd returned {} (expected 1); foreground dispatch may not work",
ret
);
}
}
fn unregister_from_looper(&self) {
unsafe {
ALooper_removeFd(self.looper, self.read_fd);
}
}
}
/// `Default` cannot produce a usable dispatcher: calling it always panics and
/// points the caller at `AndroidDispatcher::new()`.
impl Default for AndroidDispatcher {
/// Always panics.
///
/// # Panics
///
/// Unconditionally, with a message naming the supported constructor.
fn default() -> Self {
panic!(
"AndroidDispatcher must be constructed via `AndroidDispatcher::new()` \
on the main thread"
);
}
}
impl Drop for AndroidDispatcher {
    /// Flags the dispatcher as shut down, detaches it from the looper and
    /// closes both ends of the wake pipe.
    fn drop(&mut self) {
        self.shutdown.store(true, Ordering::SeqCst);
        self.unregister_from_looper();

        // Read end first, then write end, matching the order they were opened.
        let write_fd = self.main_queue.lock().write_fd;
        for fd in [self.read_fd, write_fd] {
            // SAFETY: both descriptors were created by `create_pipe` for this
            // dispatcher and are not closed anywhere else in this type.
            unsafe {
                libc_close(fd);
            }
        }
    }
}
/// Looper callback for the wake pipe: drains the pending wake bytes, then runs
/// every queued main-thread task.
///
/// Always returns `1`, which asks the looper to keep the registration.
///
/// # Safety
///
/// `data` must be the pointer produced by `Arc::into_raw` on an
/// `Arc<Mutex<MainQueue>>` whose strong reference is still outstanding, and
/// `fd` must be the read end of the wake pipe.
unsafe extern "C" fn looper_callback(fd: RawFd, _events: i32, data: *mut std::ffi::c_void) -> i32 {
    // SAFETY: guaranteed by the caller contract above. Borrowing through the
    // raw pointer leaves the reference count untouched, so the reference
    // handed over at registration stays alive for later invocations.
    let queue = unsafe { &*(data as *const Mutex<MainQueue>) };

    // Empty the pipe so the looper does not fire again for wake-ups that are
    // about to be handled. The read end is non-blocking, so the loop stops as
    // soon as no bytes are left (or on any error).
    let mut scratch = [0u8; 64];
    // SAFETY: `scratch` is a live, writable buffer of `scratch.len()` bytes.
    while unsafe {
        libc_read(
            fd,
            scratch.as_mut_ptr() as *mut std::ffi::c_void,
            scratch.len(),
        )
    } > 0
    {}

    loop {
        // Pop in its own statement so the queue lock is released before the
        // task runs; tasks may queue further main-thread work.
        let next = queue.lock().tasks.pop_front();
        let Some(task) = next else {
            break;
        };
        task();
    }

    1
}
/// Creates a pipe and switches both ends to non-blocking mode.
///
/// Returns `(read_fd, write_fd)`.
///
/// # Errors
///
/// Returns the OS error reported for `pipe` when the pipe cannot be created.
/// Failures to set the non-blocking flag are not reported.
fn create_pipe() -> std::io::Result<(RawFd, RawFd)> {
    let mut ends: [RawFd; 2] = [0; 2];
    // SAFETY: `ends` provides room for the two descriptors `pipe` writes.
    if unsafe { libc_pipe(ends.as_mut_ptr()) } != 0 {
        return Err(std::io::Error::last_os_error());
    }

    let [read_end, write_end] = ends;
    for fd in [read_end, write_end] {
        set_nonblocking(fd);
    }
    Ok((read_end, write_end))
}
/// Adds `O_NONBLOCK` to the file status flags of `fd`.
///
/// Best effort: if the current flags cannot be read the descriptor is left
/// untouched, and the result of the update itself is not checked.
fn set_nonblocking(fd: RawFd) {
    // SAFETY: `fcntl` with `F_GETFL` only reads the descriptor's flags.
    let current = unsafe { libc_fcntl(fd, LIBC_F_GETFL, 0) };
    if current < 0 {
        return;
    }
    // SAFETY: `fcntl` with `F_SETFL` takes a single integer flag argument.
    unsafe {
        libc_fcntl(fd, LIBC_F_SETFL, current | LIBC_O_NONBLOCK);
    }
}
/// Writes a single wake byte to `write_fd`.
///
/// The result of the write is deliberately ignored.
fn wake_pipe(write_fd: RawFd) {
    let token: u8 = 1;
    // SAFETY: `token` is a live one-byte value and exactly one byte is read
    // from it.
    unsafe {
        libc_write(
            write_fd,
            (&token as *const u8).cast::<std::ffi::c_void>(),
            1,
        );
    }
}
// Hand-copied `fcntl` constants used by `set_nonblocking`.
// NOTE(review): these are the Linux/Android values on common architectures —
// confirm before building for any other target.
// `fcntl` command: read the file status flags.
const LIBC_F_GETFL: i32 = 3;
// `fcntl` command: replace the file status flags.
const LIBC_F_SETFL: i32 = 4;
// File status flag selecting non-blocking I/O.
const LIBC_O_NONBLOCK: i32 = 0o4000;
// Hand-written declarations of the C library functions used for the wake
// pipe. They are only called through the `libc_*` wrappers in this file.
unsafe extern "C" {
/// Creates a pipe and stores the read and write descriptors in `pipefd[0]`
/// and `pipefd[1]`; returns 0 on success.
fn pipe(pipefd: *mut i32) -> i32;
/// Reads up to `count` bytes from `fd` into `buf`; returns the number of
/// bytes read, or a non-positive value when nothing was read.
fn read(fd: i32, buf: *mut std::ffi::c_void, count: usize) -> isize;
/// Writes up to `count` bytes from `buf` to `fd`.
fn write(fd: i32, buf: *const std::ffi::c_void, count: usize) -> isize;
/// Closes the file descriptor `fd`.
fn close(fd: i32) -> i32;
/// Performs the file-descriptor operation `cmd` on `fd`; variadic in C.
fn fcntl(fd: i32, cmd: i32, ...) -> i32;
}
/// Thin wrapper around the C `pipe` function.
///
/// # Safety
///
/// `fds` must point to writable storage for two `i32` values.
#[inline(always)]
unsafe fn libc_pipe(fds: *mut i32) -> i32 {
unsafe { pipe(fds) }
}
/// Thin wrapper around the C `read` function.
///
/// # Safety
///
/// `buf` must point to at least `count` writable bytes.
#[inline(always)]
unsafe fn libc_read(fd: i32, buf: *mut std::ffi::c_void, count: usize) -> isize {
unsafe { read(fd, buf, count) }
}
/// Thin wrapper around the C `write` function.
///
/// # Safety
///
/// `buf` must point to at least `count` readable bytes.
#[inline(always)]
unsafe fn libc_write(fd: i32, buf: *const std::ffi::c_void, count: usize) -> isize {
unsafe { write(fd, buf, count) }
}
/// Thin wrapper around the C `close` function.
///
/// # Safety
///
/// `fd` must not be used again by the caller after this call, since the
/// number may be reassigned to a different open file.
#[inline(always)]
unsafe fn libc_close(fd: i32) -> i32 {
unsafe { close(fd) }
}
/// Thin wrapper around the variadic C `fcntl` function that always forwards
/// exactly one `i32` argument.
///
/// # Safety
///
/// `cmd` must be a command that takes no argument or a single integer
/// argument; this file only uses it with `LIBC_F_GETFL` and `LIBC_F_SETFL`.
#[inline(always)]
unsafe fn libc_fcntl(fd: i32, cmd: i32, arg: i32) -> i32 {
unsafe { fcntl(fd, cmd, arg) }
}
impl PlatformDispatcher for AndroidDispatcher {
fn get_all_timings(&self) -> Vec<ThreadTaskTimings> {
Vec::new()
}
fn get_current_thread_timings(&self) -> ThreadTaskTimings {
ThreadTaskTimings {
thread_name: std::thread::current().name().map(|n| n.to_string()),
thread_id: std::thread::current().id(),
timings: Vec::new(),
total_pushed: 0,
}
}
fn is_main_thread(&self) -> bool {
AndroidDispatcher::is_main_thread(self)
}
fn dispatch(&self, runnable: RunnableVariant, _priority: Priority) {
self.pool.dispatch(Box::new(move || {
runnable.run();
}));
}
fn dispatch_on_main_thread(&self, runnable: RunnableVariant, _priority: Priority) {
if self.shutdown.load(Ordering::Relaxed) {
return;
}
let mut q = self.main_queue.lock();
q.tasks.push_back(Box::new(move || {
runnable.run();
}));
wake_pipe(q.write_fd);
}
fn dispatch_after(&self, duration: Duration, runnable: RunnableVariant) {
if self.shutdown.load(Ordering::Relaxed) {
return;
}
let due = Instant::now() + duration;
let mut delayed = self.delayed.lock();
delayed.push(DelayedTask {
due,
task: Box::new(move || {
runnable.run();
}),
});
delayed.sort_by_key(|d| d.due);
}
fn spawn_realtime(&self, f: Box<dyn FnOnce() + Send>) {
std::thread::Builder::new()
.name("gpui-realtime".to_string())
.spawn(move || {
f();
})
.expect("failed to spawn realtime thread");
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::{
        atomic::{AtomicUsize, Ordering},
        Arc,
    };

    /// Every task handed to the pool eventually runs.
    #[test]
    fn background_tasks_run() {
        const TASKS: usize = 10;

        let pool = ThreadPool::new(2);
        let counter = Arc::new(AtomicUsize::new(0));
        // Each task reports completion on this channel so the test waits for
        // exactly as long as needed instead of sleeping for a fixed interval.
        let (done_tx, done_rx) = std::sync::mpsc::channel::<()>();

        for _ in 0..TASKS {
            let c = Arc::clone(&counter);
            let done = done_tx.clone();
            pool.dispatch(Box::new(move || {
                c.fetch_add(1, Ordering::Relaxed);
                let _ = done.send(());
            }));
        }

        for _ in 0..TASKS {
            done_rx
                .recv_timeout(Duration::from_secs(5))
                .expect("background task did not complete in time");
        }
        assert_eq!(counter.load(Ordering::Relaxed), TASKS);
    }

    /// A task scheduled far in the future is not released by `tick`.
    #[test]
    fn delayed_tasks_not_early() {
        let (read_fd, write_fd) = create_pipe().unwrap();
        let main_queue = Arc::new(Mutex::new(MainQueue {
            tasks: VecDeque::new(),
            write_fd,
        }));
        let dispatcher = AndroidDispatcher {
            main_queue,
            read_fd,
            looper: std::ptr::null_mut(),
            pool: ThreadPool::new(1),
            delayed: Mutex::new(Vec::new()),
            shutdown: AtomicBool::new(false),
        };

        let ran = Arc::new(AtomicBool::new(false));
        let ran2 = Arc::clone(&ran);
        dispatcher.dispatch_after(Duration::from_secs(60), move || {
            ran2.store(true, Ordering::Relaxed);
        });
        dispatcher.tick();
        assert!(!ran.load(Ordering::Relaxed), "task should not run yet");

        dispatcher.shutdown.store(true, Ordering::SeqCst);
        // Skip `Drop`: this dispatcher has no looper to unregister from, and
        // the pipe is closed by hand below.
        std::mem::forget(dispatcher);
        unsafe {
            libc_close(read_fd);
            libc_close(write_fd);
        }
    }

    /// The wake pipe can be created and closed again.
    #[test]
    fn pipe_creation() {
        let (r, w) = create_pipe().expect("pipe");
        unsafe {
            libc_close(r);
            libc_close(w);
        }
    }
}