mod sys;
use std::{
cell::{Cell, RefCell},
collections::HashMap,
ffi::c_int,
mem::ManuallyDrop,
rc::{Rc, Weak},
sync::{Arc, Mutex},
time::{Duration, Instant},
};
pub type HandleType = usize;
pub const INVALID_HANDLE: HandleType = 0;
use sys::{libc::*, ndk_sys::*};
use self::sys::libc;
pub struct PlatformRunLoop {
looper: *mut ALooper,
pipes: [c_int; 2],
state: Rc<State>,
state_ptr: *const State,
running: Cell<bool>,
}
struct Timer {
scheduled: Instant,
callback: Box<dyn FnOnce()>,
}
struct State {
timer_fd: c_int,
callbacks: Arc<Mutex<Callbacks>>,
next_handle: Cell<HandleType>,
timers: RefCell<HashMap<HandleType, Timer>>,
}
type SenderCallback = Box<dyn FnOnce() + Send>;
struct Callbacks {
fd: c_int,
callbacks: Vec<SenderCallback>,
}
#[allow(unused_variables)]
impl PlatformRunLoop {
pub fn new() -> Self {
let looper = unsafe {
let mut looper = ALooper_forThread();
if looper.is_null() {
looper = ALooper_prepare(0);
}
ALooper_acquire(looper);
looper
};
let mut pipes: [c_int; 2] = [0, 2];
unsafe { pipe(pipes.as_mut_ptr()) };
let timer_fd = unsafe { timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK) };
let state = Rc::new(State {
timer_fd,
callbacks: Arc::new(Mutex::new(Callbacks {
fd: pipes[1],
callbacks: Vec::new(),
})),
next_handle: Cell::new(INVALID_HANDLE + 1),
timers: RefCell::new(HashMap::new()),
});
let state_ptr = Weak::into_raw(Rc::downgrade(&state));
unsafe {
ALooper_addFd(
looper,
pipes[0],
0,
ALOOPER_EVENT_INPUT as c_int,
Some(Self::looper_cb),
state_ptr as *mut _,
);
ALooper_addFd(
looper,
timer_fd,
0,
ALOOPER_EVENT_INPUT as c_int,
Some(Self::looper_timer_cb),
state_ptr as *mut _,
);
}
Self {
looper,
pipes,
state,
state_ptr,
running: Cell::new(false),
}
}
unsafe extern "C" fn looper_cb(
fd: ::std::ffi::c_int,
events: ::std::ffi::c_int,
data: *mut ::std::ffi::c_void,
) -> ::std::ffi::c_int {
let mut buf = [0u8; 8];
read(fd, buf.as_mut_ptr() as *mut _, buf.len());
let state = data as *const State;
let state = ManuallyDrop::new(Weak::from_raw(state));
if let Some(state) = state.upgrade() {
state.process_callbacks();
}
1
}
unsafe extern "C" fn looper_timer_cb(
fd: ::std::ffi::c_int,
events: ::std::ffi::c_int,
data: *mut ::std::ffi::c_void,
) -> ::std::ffi::c_int {
let mut buf = [0u8; 8];
read(fd, buf.as_mut_ptr() as *mut _, buf.len());
let state = data as *const State;
let state = ManuallyDrop::new(Weak::from_raw(state));
if let Some(state) = state.upgrade() {
state.process_timers();
}
1
}
pub fn poll_once(&self) {
unsafe {
ALooper_pollOnce(
4,
std::ptr::null_mut(),
std::ptr::null_mut(),
std::ptr::null_mut(),
)
};
}
pub fn unschedule(&self, handle: HandleType) {
self.state.unschedule(handle);
}
#[must_use]
pub fn schedule<F>(&self, in_time: Duration, callback: F) -> HandleType
where
F: FnOnce() + 'static,
{
self.state.schedule(in_time, callback)
}
pub fn new_sender(&self) -> PlatformRunLoopSender {
PlatformRunLoopSender {
callbacks: Arc::downgrade(&self.state.callbacks),
}
}
pub fn run(&self) {
self.running.set(true);
while self.running.get() {
let res = unsafe {
ALooper_pollOnce(
-1,
std::ptr::null_mut(),
std::ptr::null_mut(),
std::ptr::null_mut(),
)
};
if res == ALOOPER_POLL_TIMEOUT || res == ALOOPER_POLL_ERROR {
self.running.set(false);
}
}
}
pub fn stop(&self) {
self.running.set(false);
unsafe { ALooper_wake(self.looper) };
}
}
impl State {
fn process_callbacks(&self) {
let callbacks: Vec<SenderCallback> = {
let mut callbacks = self.callbacks.lock().unwrap();
callbacks.callbacks.drain(0..).collect()
};
for c in callbacks {
c()
}
}
fn get_pending_timers(&self) -> Vec<HandleType> {
let now = Instant::now();
self.timers
.borrow()
.iter()
.filter(|v| v.1.scheduled <= now)
.map(|v| *v.0)
.collect()
}
fn process_pending_timers(&self, pending: Vec<HandleType>) {
for handle in pending {
let timer = self.timers.borrow_mut().remove(&handle);
if let Some(timer) = timer {
(timer.callback)();
}
}
self.wake_up_at(self.next_timer());
}
fn process_timers(&self) {
loop {
let pending: Vec<HandleType> = self.get_pending_timers();
if pending.is_empty() {
break;
}
self.process_pending_timers(pending);
}
}
fn wake_up_at(&self, time: Instant) {
let wait_time = time.saturating_duration_since(Instant::now());
let spec = itimerspec {
it_interval: timespec {
tv_sec: 0,
tv_nsec: 0,
},
it_value: timespec {
tv_sec: wait_time.as_secs().try_into().unwrap(),
tv_nsec: wait_time.subsec_nanos().try_into().unwrap(),
},
};
unsafe {
timerfd_settime(self.timer_fd, 0, &spec as *const _, std::ptr::null_mut());
}
}
fn next_timer(&self) -> Instant {
let min = self.timers.borrow().values().map(|x| x.scheduled).min();
min.unwrap_or_else(|| Instant::now() + Duration::from_secs(60 * 60))
}
fn next_handle(&self) -> HandleType {
let r = self.next_handle.get();
self.next_handle.replace(r + 1);
r
}
pub fn schedule<F>(&self, in_time: Duration, callback: F) -> HandleType
where
F: FnOnce() + 'static,
{
let handle = self.next_handle();
self.timers.borrow_mut().insert(
handle,
Timer {
scheduled: Instant::now() + in_time,
callback: Box::new(callback),
},
);
self.wake_up_at(self.next_timer());
handle
}
pub fn unschedule(&self, handle: HandleType) {
self.timers.borrow_mut().remove(&handle);
self.wake_up_at(self.next_timer());
}
}
impl Drop for PlatformRunLoop {
fn drop(&mut self) {
unsafe {
ALooper_removeFd(self.looper, self.pipes[0]);
ALooper_removeFd(self.looper, self.state.timer_fd);
ALooper_release(self.looper);
Weak::from_raw(self.state_ptr);
close(self.pipes[0]);
close(self.pipes[1]);
}
}
}
#[derive(Clone)]
pub struct PlatformRunLoopSender {
callbacks: std::sync::Weak<Mutex<Callbacks>>,
}
#[allow(unused_variables)]
impl PlatformRunLoopSender {
pub fn send<F>(&self, callback: F) -> bool
where
F: FnOnce() + 'static + Send,
{
if let Some(callbacks) = self.callbacks.upgrade() {
let mut callbacks = callbacks.lock().unwrap();
callbacks.callbacks.push(Box::new(callback));
let buf = [0u8; 8];
unsafe {
write(callbacks.fd, buf.as_ptr() as *const _, buf.len());
}
true
} else {
false
}
}
}
pub(crate) type PlatformThreadId = usize;
pub(crate) fn get_system_thread_id() -> PlatformThreadId {
unsafe { libc::pthread_self() }
}