// momo-rs-core 0.1.0
//
// Core task scheduling and executor for the Momo runtime.
// Module documentation follows below.
//! # Momo (Core Execution Engine)
#![doc(hidden)]
//!
//! The core task scheduling and execution logic for the Momo runtime.
//!
//! This module implements the internal event loop and task model that powers 
//! the entire Momo ecosystem. While users typically interact with these APIs 
//! via the umbrella `momo` crate, this module defines the fundamental 
//! rules of execution.
//!
//! ## Execution Philosophy
//!
//! Momo is built on the principle of **Thread-Local Affinity**. Unlike multi-threaded 
//! runtimes that use work-stealing, Momo pins every task to the thread that 
//! spawned it. This allows the runtime to be completely atomic-free in its 
//! hot paths, providing superior performance and cache locality for 
//! single-threaded or performance-critical workloads.
//!
//! ## Core Primitives
//!
//! - **The Scheduler**: A cooperative batch-based scheduler that processes 
//!   tasks in 128-poll windows to ensure system responsiveness.
//! - **Injection Queues**: A mechanism for tasks to be safely awakened by 
//!   remote threads and integrated into the local execution flow.
//! - **Lifecycle Management**: Tools for starting, stopping, and yielding 
//!   execution within a task.
//!
//! ## Usage Example (via momo-rs)
//!
//! ```rust,ignore
//! #[momo::main]
//! async fn main() {
//!     // Spawning a task in the momo-rs core
//!     momo::spawn(async {
//!         println!("Running on the Momo core scheduler.");
//!     });
//! }
//! ```

use std::collections::VecDeque;
use std::sync::Arc;
use core::cell::RefCell;
use core::future::Future;
use core::pin::Pin;
use core::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

use tracing::{debug_span, Span};
use core::sync::atomic::{AtomicU64, Ordering};
use std::thread::ThreadId;
use std::sync::Mutex;
use std::collections::HashMap;
use lazy_static::lazy_static;

// Optional hook used to wake a remote thread's reactor after a task has been
// injected into that thread's queue. `None` until `set_wakeup_handler`
// installs one; read by `signal_wakeup`.
static WAKEUP_HANDLER: Mutex<Option<fn(ThreadId)>> = Mutex::new(None);

/// Internal registration for cross-thread reactor wakeups.
///
/// Used by `momo-driver` to register the reactor's wakeup mechanism with the
/// core executor. Replaces any previously installed handler. Poisoned-lock
/// state is recovered rather than propagated.
#[inline]
pub fn set_wakeup_handler(handler: fn(ThreadId)) {
    let mut slot = WAKEUP_HANDLER.lock().unwrap_or_else(|e| e.into_inner());
    *slot = Some(handler);
}

/// Invokes the registered cross-thread wakeup handler (if any) for `id`.
///
/// The handler fn pointer is copied out of the mutex before it is called, so
/// the `WAKEUP_HANDLER` lock is NOT held across the callback. (The original
/// held the guard for the whole `if let`, which would deadlock if a handler
/// ever called back into `set_wakeup_handler`.)
#[inline]
fn signal_wakeup(id: ThreadId) {
    // fn pointers are `Copy`; the guard is dropped at the end of this statement.
    let handler = *WAKEUP_HANDLER.lock().unwrap_or_else(|e| e.into_inner());
    if let Some(handler) = handler {
        handler(id);
    }
}

// Process-wide monotonically increasing task id, used only to label the
// tracing span in `spawn`. Starts at 1 so id 0 never appears.
static NEXT_TASK_ID: AtomicU64 = AtomicU64::new(1);

/// A spawned unit of work: the pinned task future plus the identity of the
/// thread that owns (and is the only one allowed to poll) it.
struct Task {
    // Thread the task was spawned on; wakeups are routed back here.
    owner_thread: ThreadId,
    // The task body. Interior mutability lets `Task::poll` take a mutable
    // borrow through a shared `Arc<Task>`.
    future: RefCell<Pin<Box<dyn Future<Output = ()>>>>,
    // Tracing span entered around every poll of this task.
    span: Span,
}

// SAFETY: the boxed future is neither `Send` nor `Sync`, so these impls are
// sound only under the runtime invariant that a `Task` is polled exclusively
// on `owner_thread` — remote wakeups route the Arc back via the injection
// queues instead of polling in place (see `wake`/`wake_by_ref`).
// NOTE(review): if the *last* `Arc<Task>` is dropped on a foreign thread, the
// future is also dropped off its owner thread — confirm that is acceptable
// for all spawned futures.
unsafe impl Send for Task {}
unsafe impl Sync for Task {}

thread_local! {
    // FIFO of tasks ready to be polled on this thread.
    static RUN_QUEUE: RefCell<VecDeque<Arc<Task>>> = RefCell::new(VecDeque::new());
    // Per-thread stop flag: set by `stop()`, read by `is_stopped()`.
    static STOP: RefCell<bool> = RefCell::new(false);
    // Cached current-thread id, avoiding repeated `thread::current()` calls.
    static THREAD_ID: ThreadId = std::thread::current().id();
}

lazy_static! {
    // Global registry mapping each executor thread to its injection queue and
    // a "remote tasks pending" flag. Remote wakeups push into it; the owner
    // thread drains it (see `Executor::drain_injection_queue`).
    static ref INJECTION_QUEUES: Mutex<HashMap<ThreadId, (Arc<Mutex<VecDeque<Arc<Task>>>>, Arc<core::sync::atomic::AtomicBool>)>> = Mutex::new(HashMap::new());
}

/// Locks the global injection-queue registry, recovering from lock poisoning
/// (a panic on one thread must not disable cross-thread wakeups for the rest).
fn get_injection_lock() -> std::sync::MutexGuard<'static, HashMap<ThreadId, (Arc<Mutex<VecDeque<Arc<Task>>>>, Arc<core::sync::atomic::AtomicBool>)>> {
    match INJECTION_QUEUES.lock() {
        Ok(guard) => guard,
        Err(poisoned) => poisoned.into_inner(),
    }
}

thread_local! {
    // Per-thread cache of this thread's entry in `INJECTION_QUEUES`, so the
    // global registry mutex is taken at most once per thread on the hot path.
    static MY_INJECTION: RefCell<Option<(Arc<Mutex<VecDeque<Arc<Task>>>>, Arc<core::sync::atomic::AtomicBool>)>> = RefCell::new(None);
}

/// Returns the calling thread's injection queue and pending flag, creating and
/// registering them in `INJECTION_QUEUES` on first use, then caching the pair
/// in `MY_INJECTION` so later calls skip the global lock.
fn get_my_injection() -> (Arc<Mutex<VecDeque<Arc<Task>>>>, Arc<core::sync::atomic::AtomicBool>) {
    MY_INJECTION.with(|slot| {
        slot.borrow_mut()
            .get_or_insert_with(|| {
                // Cache miss: look up (or create) this thread's registry entry.
                let me = THREAD_ID.with(|id| *id);
                get_injection_lock()
                    .entry(me)
                    .or_insert_with(|| {
                        (
                            Arc::new(Mutex::new(VecDeque::new())),
                            Arc::new(core::sync::atomic::AtomicBool::new(false)),
                        )
                    })
                    .clone()
            })
            .clone()
    })
}

/// Signals the `momo-rs` runtime to stop execution on the current thread.
///
/// Only sets the calling thread's flag; other executor threads are unaffected.
pub fn stop() {
    STOP.with(|flag| {
        flag.replace(true);
    });
}

/// Returns whether the runtime has been signaled to stop.
///
/// Reads the calling thread's flag as set by [`stop`].
pub fn is_stopped() -> bool {
    STOP.with(|flag| {
        let stopped = flag.borrow();
        *stopped
    })
}

impl Task {
    fn poll(self: Arc<Self>) {
        let _enter = self.span.enter();
        let waker = unsafe { Waker::from_raw(self.clone().raw_waker()) };
        let mut cx = Context::from_waker(&waker);
        
        let mut future = self.future.borrow_mut();
        match future.as_mut().poll(&mut cx) {
            Poll::Ready(_) => {}
            Poll::Pending => {}
        }
    }

    fn raw_waker(self: Arc<Self>) -> RawWaker {
        let ptr = Arc::into_raw(self) as *const ();
        RawWaker::new(ptr, &VTABLE)
    }
}

// Waker vtable shared by every task waker. The data pointer is an
// `Arc<Task>` turned raw via `Arc::into_raw`; each vtable function restores
// or adjusts that refcount accordingly (see `clone_waker`..`drop_waker`).
static VTABLE: RawWakerVTable = RawWakerVTable::new(
    clone_waker,
    wake,
    wake_by_ref,
    drop_waker,
);

unsafe fn clone_waker(ptr: *const ()) -> RawWaker {
    let arc = Arc::from_raw(ptr as *const Task);
    let cloned = arc.clone();
    let _ = Arc::into_raw(arc);
    let new_ptr = Arc::into_raw(cloned) as *const ();
    RawWaker::new(new_ptr, &VTABLE)
}

unsafe fn wake(ptr: *const ()) {
    let arc = Arc::from_raw(ptr as *const Task);
    let owner = arc.owner_thread;
    let current = THREAD_ID.with(|id| *id);
    
    if owner == current {
        RUN_QUEUE.with(|q| {
            q.borrow_mut().push_back(arc);
        });
    } else {
        let (queue, flag) = {
            let mut queues = get_injection_lock();
            queues.entry(owner).or_insert_with(|| (Arc::new(Mutex::new(VecDeque::new())), Arc::new(core::sync::atomic::AtomicBool::new(false)))).clone()
        };
        queue.lock().unwrap_or_else(|e| e.into_inner()).push_back(arc);
        flag.store(true, Ordering::Release);
        signal_wakeup(owner);
    }
}

unsafe fn wake_by_ref(ptr: *const ()) {
    let arc = Arc::from_raw(ptr as *const Task);
    let owner = arc.owner_thread;
    let current = THREAD_ID.with(|id| *id);

    if owner == current {
        RUN_QUEUE.with(|q| {
            q.borrow_mut().push_back(arc.clone());
        });
    } else {
        let (queue, flag) = {
            let mut queues = get_injection_lock();
            queues.entry(owner).or_insert_with(|| (Arc::new(Mutex::new(VecDeque::new())), Arc::new(core::sync::atomic::AtomicBool::new(false)))).clone()
        };
        queue.lock().unwrap_or_else(|e| e.into_inner()).push_back(arc.clone());
        flag.store(true, Ordering::Release);
        signal_wakeup(owner);
    }
    let _ = Arc::into_raw(arc);
}

/// Drop handler for the task waker vtable: reclaims (and releases) the strong
/// `Arc<Task>` reference embedded in the data pointer.
unsafe fn drop_waker(ptr: *const ()) {
    // SAFETY (caller contract): `ptr` came from `Arc::into_raw` and this
    // waker owns the refcount being released here.
    let _ = Arc::from_raw(ptr as *const Task);
}

/// Spawns a new asynchronous task into the **momo-rs** engine.
///
/// The task is pinned to the current thread and will never migrate; it is
/// appended to this thread's run queue and polled by the local `Executor`.
pub fn spawn<F>(future: F)
where
    F: Future<Output = ()> + 'static,
{
    // Make sure this thread's injection queue exists before any waker could
    // try to target it from a remote thread.
    let _ = get_my_injection();

    let task_id = NEXT_TASK_ID.fetch_add(1, Ordering::Relaxed);
    let task = Arc::new(Task {
        owner_thread: THREAD_ID.with(|tid| *tid),
        future: RefCell::new(Box::pin(future)),
        span: debug_span!("task", id = task_id),
    });

    RUN_QUEUE.with(|queue| queue.borrow_mut().push_back(task));
}

/// Internal check for whether any tasks are ready to be polled.
///
/// Returns `true` if either the local run queue or this thread's remote
/// injection queue holds a task. As a side effect, clears the injection
/// "pending" flag when the remote queue turns out to be empty.
///
/// NOTE(review): the flag is a hint, not a guarantee — `wake` pushes under
/// the queue lock *before* setting the flag, so a `false` result can be
/// momentarily stale; a subsequent call will observe the raised flag.
pub fn has_pending_tasks() -> bool {
    // Fast path: anything already queued locally?
    let local_empty = RUN_QUEUE.with(|q| q.borrow().is_empty());
    if !local_empty { return true; }

    let (queue, flag) = get_my_injection();
    // Avoid taking the injection lock unless a remote wake signaled activity.
    if !flag.load(Ordering::Acquire) {
        return false;
    }

    let remote = queue.lock().unwrap_or_else(|e| e.into_inner());
    let res = !remote.is_empty();
    if !res {
        // Reset the hint while still holding the queue lock so a concurrent
        // push + flag-set cannot interleave between this check and the store.
        flag.store(false, Ordering::Release);
    }
    res
}

/// A handle to the core task scheduler for **momo-rs**.
pub struct Executor;

impl Executor {
    /// Initialises the executor for the current thread.
    ///
    /// Eagerly registers this thread's injection queue so remote wakers can
    /// target the thread before the first task is spawned.
    pub fn new() -> Self {
        let _ = get_my_injection();
        Self
    }

    /// Primary execution method for the **momo-rs** engine.
    ///
    /// Drains the injection queue, then polls up to `POLL_BUDGET` (128) tasks
    /// from the local run queue, stopping early when the queue empties. The
    /// fixed budget keeps a busy task set from monopolising the caller's
    /// event loop (see module docs on batch scheduling).
    pub fn run_until_idle(&self) {
        self.drain_injection_queue();

        const POLL_BUDGET: usize = 128;
        for _ in 0..POLL_BUDGET {
            // Pop before polling so a task that re-wakes itself (e.g. via
            // `yield_now`) lands at the back of the queue.
            match RUN_QUEUE.with(|q| q.borrow_mut().pop_front()) {
                Some(task) => task.poll(),
                None => break,
            }
        }
    }

    /// Moves all remotely-injected tasks onto the local run queue and clears
    /// the "pending" flag. (The original stored `false` in both branches of
    /// an if/else; draining unconditionally is equivalent and simpler.)
    fn drain_injection_queue(&self) {
        let (queue, flag) = get_my_injection();
        let mut remote = queue.lock().unwrap_or_else(|e| e.into_inner());
        RUN_QUEUE.with(|q| q.borrow_mut().extend(remote.drain(..)));
        // Clear the hint while the queue lock is still held, so a concurrent
        // push + flag-set cannot be lost between the drain and this store.
        flag.store(false, Ordering::Release);
    }
}

/// `Executor::new` takes no configuration, so `Default` is equivalent
/// (addresses clippy's `new_without_default`).
impl Default for Executor {
    fn default() -> Self {
        Self::new()
    }
}

/// Cooperatively yields control back to the **momo-rs** scheduler.
///
/// The returned future is pending exactly once: the first poll re-arms the
/// task via its waker and yields; the second poll completes immediately.
pub fn yield_now() -> YieldNow {
    YieldNow { yielded: false }
}

/// Future for the `yield_now` operation.
pub struct YieldNow {
    // Whether the single mandatory yield has already happened.
    yielded: bool,
}

impl Future for YieldNow {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        match self.yielded {
            // Second (or later) poll: the yield already happened, finish now.
            true => Poll::Ready(()),
            // First poll: re-schedule ourselves, then report not-ready.
            false => {
                self.yielded = true;
                cx.waker().wake_by_ref();
                Poll::Pending
            }
        }
    }
}