compio-runtime 0.11.0

High-level runtime for compio
Documentation
#[cfg(target_os = "macos")]
#[test]
fn cf_run_loop() {
    use std::{
        future::Future,
        os::raw::c_void,
        sync::{Arc, Mutex},
        task::{Context, Poll},
        time::Duration,
    };

    use block2::{Block, StackBlock};
    use compio_driver::AsRawFd;
    use compio_runtime::{Runtime, event::Event};
    use core_foundation::{
        base::TCFType,
        filedescriptor::{CFFileDescriptor, CFFileDescriptorRef, kCFFileDescriptorReadCallBack},
        runloop::{CFRunLoop, CFRunLoopRef, CFRunLoopStop, kCFRunLoopDefaultMode},
        string::CFStringRef,
    };

    struct CFRunLoopRuntime {
        runtime: Runtime,
        fd_source: CFFileDescriptor,
    }

    impl CFRunLoopRuntime {
        pub fn new() -> Self {
            let runtime = Runtime::new().unwrap();

            extern "C" fn callback(
                _fdref: CFFileDescriptorRef,
                _callback_types: usize,
                _info: *mut c_void,
            ) {
            }

            let fd_source =
                CFFileDescriptor::new(runtime.as_raw_fd(), false, callback, None).unwrap();
            let source = fd_source.to_run_loop_source(0).unwrap();

            CFRunLoop::get_current().add_source(&source, unsafe { kCFRunLoopDefaultMode });

            Self { runtime, fd_source }
        }

        pub fn block_on<F: Future>(&self, future: F) -> F::Output {
            self.runtime.enter(|| {
                let waker = self.runtime.waker();
                let mut context = Context::from_waker(&waker);
                let mut future = std::pin::pin!(future);
                loop {
                    self.runtime.poll_with(Some(Duration::ZERO));

                    if let Poll::Ready(result) = future.as_mut().poll(&mut context) {
                        self.runtime.run();
                        break result;
                    }
                    let remaining_tasks = self.runtime.run();

                    let timeout = if remaining_tasks {
                        Some(Duration::ZERO)
                    } else {
                        self.runtime.current_timeout()
                    };
                    self.fd_source
                        .enable_callbacks(kCFFileDescriptorReadCallBack);
                    CFRunLoop::run_in_mode(
                        unsafe { kCFRunLoopDefaultMode },
                        timeout.unwrap_or(Duration::MAX),
                        true,
                    );
                }
            })
        }
    }

    let runtime = CFRunLoopRuntime::new();

    runtime.block_on(async {
        compio_runtime::time::sleep(Duration::from_secs(1)).await;

        let event = Event::new();
        let handle = Arc::new(Mutex::new(Some(event.handle())));
        let run_loop = CFRunLoop::get_current();
        let block = StackBlock::new(move || {
            handle.lock().unwrap().take().unwrap().notify();
            unsafe {
                CFRunLoopStop(run_loop.as_concrete_TypeRef());
            }
        });
        unsafe extern "C" {
            fn CFRunLoopPerformBlock(rl: CFRunLoopRef, mode: CFStringRef, block: &Block<dyn Fn()>);
        }
        let run_loop = CFRunLoop::get_current();
        unsafe {
            CFRunLoopPerformBlock(
                run_loop.as_concrete_TypeRef(),
                kCFRunLoopDefaultMode,
                &block,
            );
        }
        event.wait().await;
    });
}

#[cfg(windows)]
#[test]
fn message_queue() {
    use std::{
        future::Future,
        mem::MaybeUninit,
        ptr::null_mut,
        sync::Mutex,
        task::{Context, Poll},
        time::Duration,
    };

    use compio_driver::AsRawFd;
    use compio_runtime::{
        Runtime,
        event::{Event, EventHandle},
    };
    use windows_sys::Win32::{
        Foundation::{HANDLE, HWND, WAIT_FAILED},
        System::Threading::INFINITE,
        UI::WindowsAndMessaging::{
            DispatchMessageW, KillTimer, MWMO_ALERTABLE, MWMO_INPUTAVAILABLE,
            MsgWaitForMultipleObjectsEx, PM_REMOVE, PeekMessageW, QS_ALLINPUT, SetTimer,
            TranslateMessage,
        },
    };

    struct MQRuntime {
        runtime: Runtime,
    }

    impl MQRuntime {
        pub fn new() -> Self {
            Self {
                runtime: Runtime::new().unwrap(),
            }
        }

        pub fn block_on<F: Future>(&self, future: F) -> F::Output {
            self.runtime.enter(|| {
                let waker = self.runtime.waker();
                let mut context = Context::from_waker(&waker);
                let mut future = std::pin::pin!(future);
                loop {
                    self.runtime.poll_with(Some(Duration::ZERO));

                    if let Poll::Ready(result) = future.as_mut().poll(&mut context) {
                        self.runtime.run();
                        break result;
                    }
                    let remaining_tasks = self.runtime.run();

                    let timeout = if remaining_tasks {
                        Some(Duration::ZERO)
                    } else {
                        self.runtime.current_timeout()
                    };
                    let timeout = match timeout {
                        Some(timeout) => timeout.as_millis() as u32,
                        None => INFINITE,
                    };
                    let handle = self.runtime.as_raw_fd() as HANDLE;
                    let res = unsafe {
                        MsgWaitForMultipleObjectsEx(
                            1,
                            &handle,
                            timeout,
                            QS_ALLINPUT,
                            MWMO_ALERTABLE | MWMO_INPUTAVAILABLE,
                        )
                    };
                    if res == WAIT_FAILED {
                        panic!("{:?}", std::io::Error::last_os_error());
                    }

                    let mut msg = MaybeUninit::uninit();
                    let res = unsafe {
                        use std::ptr::null_mut;
                        PeekMessageW(msg.as_mut_ptr(), null_mut(), 0, 0, PM_REMOVE)
                    };
                    if res != 0 {
                        let msg = unsafe { msg.assume_init() };
                        unsafe {
                            TranslateMessage(&msg);
                            DispatchMessageW(&msg);
                        }
                    }
                }
            })
        }
    }

    let runtime = MQRuntime::new();

    runtime.block_on(async {
        compio_runtime::time::sleep(Duration::from_secs(1)).await;

        static GLOBAL_EVENT: Mutex<Option<EventHandle>> = Mutex::new(None);

        let event = Event::new();
        *GLOBAL_EVENT.lock().unwrap() = Some(event.handle());

        unsafe extern "system" fn timer_callback(hwnd: HWND, _msg: u32, id: usize, _dwtime: u32) {
            let handle = GLOBAL_EVENT.lock().unwrap().take().unwrap();
            handle.notify();
            unsafe { KillTimer(hwnd, id) };
        }

        unsafe {
            SetTimer(null_mut(), 0, 1, Some(timer_callback));
        }

        event.wait().await;
    });
}