Skip to main content

momo_rs_core/
lib.rs

1//! # Momo (Core Execution Engine)
2#![doc(hidden)]
3//!
4//! The core task scheduling and execution logic for the Momo runtime.
5//!
6//! This module implements the internal event loop and task model that powers 
7//! the entire Momo ecosystem. While users typically interact with these APIs 
8//! via the umbrella `momo` crate, this module defines the fundamental 
9//! rules of execution.
10//!
11//! ## Execution Philosophy
12//!
13//! Momo is built on the principle of **Thread-Local Affinity**. Unlike multi-threaded 
14//! runtimes that use work-stealing, Momo pins every task to the thread that 
15//! spawned it. This allows the runtime to be completely atomic-free in its 
16//! hot paths, providing superior performance and cache locality for 
17//! single-threaded or performance-critical workloads.
18//!
19//! ## Core Primitives
20//!
21//! - **The Scheduler**: A cooperative batch-based scheduler that processes 
22//!   tasks in 128-poll windows to ensure system responsiveness.
23//! - **Injection Queues**: A mechanism for tasks to be safely awakened by 
24//!   remote threads and integrated into the local execution flow.
25//! - **Lifecycle Management**: Tools for starting, stopping, and yielding 
26//!   execution within a task.
27//!
28//! ## Usage Example (via momo-rs)
29//!
30//! ```rust,ignore
31//! #[momo::main]
32//! async fn main() {
33//!     // Spawning a task in the momo-rs core
34//!     momo::spawn(async {
35//!         println!("Running on the Momo core scheduler.");
36//!     });
37//! }
38//! ```
39
40use std::collections::VecDeque;
41use std::sync::Arc;
42use core::cell::RefCell;
43use core::future::Future;
44use core::pin::Pin;
45use core::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};
46
47use tracing::{debug_span, Span};
48use core::sync::atomic::{AtomicU64, Ordering};
49use std::thread::ThreadId;
50use std::sync::Mutex;
51use std::collections::HashMap;
52use lazy_static::lazy_static;
53
/// Process-wide hook used to nudge another thread's reactor.
///
/// Holds `None` until [`set_wakeup_handler`] installs a callback.
static WAKEUP_HANDLER: Mutex<Option<fn(ThreadId)>> = Mutex::new(None);

/// Internal registration for cross-thread reactor wakeups.
///
/// This is used by the `momo-driver` to register the reactor's wakeup mechanism
/// with the core executor. Calling it again replaces the previously registered
/// handler. A poisoned lock is recovered instead of propagated: the slot only
/// holds a plain function pointer, so there is no partially-updated state to
/// protect against.
#[inline]
pub fn set_wakeup_handler(handler: fn(ThreadId)) {
    let mut slot = WAKEUP_HANDLER.lock().unwrap_or_else(|e| e.into_inner());
    *slot = Some(handler);
}

/// Invokes the registered wakeup handler (if any) for the thread `id`.
///
/// Does nothing when no handler has been registered yet.
#[inline]
fn signal_wakeup(id: ThreadId) {
    // Copy the function pointer out and release the lock *before* calling it.
    // A guard created in an `if let` scrutinee lives for the whole body, which
    // would run the handler under the lock: a handler that touched the registry
    // would self-deadlock, and every cross-thread wakeup would be serialised.
    let handler = *WAKEUP_HANDLER.lock().unwrap_or_else(|e| e.into_inner());
    if let Some(handler) = handler {
        handler(id);
    }
}
71
/// Process-wide counter handing out task ids; it starts at 1 and is shared by
/// every thread.
static NEXT_TASK_ID: AtomicU64 = AtomicU64::new(1);
73
74struct Task {
75    owner_thread: ThreadId,
76    future: RefCell<Pin<Box<dyn Future<Output = ()>>>>,
77    span: Span,
78}
79
80unsafe impl Send for Task {}
81unsafe impl Sync for Task {}
82
83thread_local! {
84    static RUN_QUEUE: RefCell<VecDeque<Arc<Task>>> = RefCell::new(VecDeque::new());
85    static STOP: RefCell<bool> = RefCell::new(false);
86    static THREAD_ID: ThreadId = std::thread::current().id();
87}
88
89lazy_static! {
90    static ref INJECTION_QUEUES: Mutex<HashMap<ThreadId, (Arc<Mutex<VecDeque<Arc<Task>>>>, Arc<core::sync::atomic::AtomicBool>)>> = Mutex::new(HashMap::new());
91}
92
93fn get_injection_lock() -> std::sync::MutexGuard<'static, HashMap<ThreadId, (Arc<Mutex<VecDeque<Arc<Task>>>>, Arc<core::sync::atomic::AtomicBool>)>> {
94    INJECTION_QUEUES.lock().unwrap_or_else(|e| e.into_inner())
95}
96
97thread_local! {
98    static MY_INJECTION: RefCell<Option<(Arc<Mutex<VecDeque<Arc<Task>>>>, Arc<core::sync::atomic::AtomicBool>)>> = RefCell::new(None);
99}
100
101fn get_my_injection() -> (Arc<Mutex<VecDeque<Arc<Task>>>>, Arc<core::sync::atomic::AtomicBool>) {
102    MY_INJECTION.with(|my| {
103        let mut my = my.borrow_mut();
104        if let Some(cached) = &*my {
105            return cached.clone();
106        }
107        let current = THREAD_ID.with(|id| *id);
108        let mut queues = get_injection_lock();
109        let pair = queues.entry(current).or_insert_with(|| (Arc::new(Mutex::new(VecDeque::new())), Arc::new(core::sync::atomic::AtomicBool::new(false)))).clone();
110        *my = Some(pair.clone());
111        pair
112    })
113}
114
115/// Signals the `momo-rs` runtime to stop execution on the current thread.
116pub fn stop() {
117    STOP.with(|s| *s.borrow_mut() = true);
118}
119
120/// Returns whether the runtime has been signaled to stop.
121pub fn is_stopped() -> bool {
122    STOP.with(|s| *s.borrow())
123}
124
125impl Task {
126    fn poll(self: Arc<Self>) {
127        let _enter = self.span.enter();
128        let waker = unsafe { Waker::from_raw(self.clone().raw_waker()) };
129        let mut cx = Context::from_waker(&waker);
130        
131        let mut future = self.future.borrow_mut();
132        match future.as_mut().poll(&mut cx) {
133            Poll::Ready(_) => {}
134            Poll::Pending => {}
135        }
136    }
137
138    fn raw_waker(self: Arc<Self>) -> RawWaker {
139        let ptr = Arc::into_raw(self) as *const ();
140        RawWaker::new(ptr, &VTABLE)
141    }
142}
143
144static VTABLE: RawWakerVTable = RawWakerVTable::new(
145    clone_waker,
146    wake,
147    wake_by_ref,
148    drop_waker,
149);
150
151unsafe fn clone_waker(ptr: *const ()) -> RawWaker {
152    let arc = Arc::from_raw(ptr as *const Task);
153    let cloned = arc.clone();
154    let _ = Arc::into_raw(arc);
155    let new_ptr = Arc::into_raw(cloned) as *const ();
156    RawWaker::new(new_ptr, &VTABLE)
157}
158
159unsafe fn wake(ptr: *const ()) {
160    let arc = Arc::from_raw(ptr as *const Task);
161    let owner = arc.owner_thread;
162    let current = THREAD_ID.with(|id| *id);
163    
164    if owner == current {
165        RUN_QUEUE.with(|q| {
166            q.borrow_mut().push_back(arc);
167        });
168    } else {
169        let (queue, flag) = {
170            let mut queues = get_injection_lock();
171            queues.entry(owner).or_insert_with(|| (Arc::new(Mutex::new(VecDeque::new())), Arc::new(core::sync::atomic::AtomicBool::new(false)))).clone()
172        };
173        queue.lock().unwrap_or_else(|e| e.into_inner()).push_back(arc);
174        flag.store(true, Ordering::Release);
175        signal_wakeup(owner);
176    }
177}
178
179unsafe fn wake_by_ref(ptr: *const ()) {
180    let arc = Arc::from_raw(ptr as *const Task);
181    let owner = arc.owner_thread;
182    let current = THREAD_ID.with(|id| *id);
183
184    if owner == current {
185        RUN_QUEUE.with(|q| {
186            q.borrow_mut().push_back(arc.clone());
187        });
188    } else {
189        let (queue, flag) = {
190            let mut queues = get_injection_lock();
191            queues.entry(owner).or_insert_with(|| (Arc::new(Mutex::new(VecDeque::new())), Arc::new(core::sync::atomic::AtomicBool::new(false)))).clone()
192        };
193        queue.lock().unwrap_or_else(|e| e.into_inner()).push_back(arc.clone());
194        flag.store(true, Ordering::Release);
195        signal_wakeup(owner);
196    }
197    let _ = Arc::into_raw(arc);
198}
199
200unsafe fn drop_waker(ptr: *const ()) {
201    drop(Arc::from_raw(ptr as *const Task));
202}
203
204/// Spawns a new asynchronous task into the **momo-rs** engine.
205///
206/// The task is pinned to the current thread and will never migrate.
207pub fn spawn<F>(future: F)
208where
209    F: Future<Output = ()> + 'static,
210{
211    let _ = get_my_injection();
212
213    let id = NEXT_TASK_ID.fetch_add(1, Ordering::Relaxed);
214    let span = debug_span!("task", id = id);
215    
216    let task = Arc::new(Task {
217        owner_thread: THREAD_ID.with(|id| *id),
218        future: RefCell::new(Box::pin(future)),
219        span,
220    });
221    
222    RUN_QUEUE.with(|q| {
223        q.borrow_mut().push_back(task);
224    });
225}
226
227/// Internal check for whether any tasks are ready to be polled.
228pub fn has_pending_tasks() -> bool {
229    let local_empty = RUN_QUEUE.with(|q| q.borrow().is_empty());
230    if !local_empty { return true; }
231
232    let (queue, flag) = get_my_injection();
233    if !flag.load(Ordering::Acquire) {
234        return false;
235    }
236
237    let remote = queue.lock().unwrap_or_else(|e| e.into_inner());
238    let res = !remote.is_empty();
239    if !res {
240        flag.store(false, Ordering::Release);
241    }
242    res
243}
244
245/// A handle to the core task scheduler for **momo-rs**.
246pub struct Executor;
247
248impl Executor {
249    /// Initialises the executor for the current thread.
250    pub fn new() -> Self {
251        let _ = get_my_injection();
252        Self
253    }
254
255    /// Primary execution method for the **momo-rs** engine.
256    ///
257    /// Drains the injection queue and polls up to 128 local tasks.
258    pub fn run_until_idle(&self) {
259        self.drain_injection_queue();
260
261        let mut processed = 0;
262        const POLL_BUDGET: usize = 128;
263
264        while processed < POLL_BUDGET {
265            let task = RUN_QUEUE.with(|q| q.borrow_mut().pop_front());
266            
267            match task {
268                Some(task) => {
269                    task.poll();
270                    processed += 1;
271                }
272                None => break,
273            }
274        }
275    }
276
277    fn drain_injection_queue(&self) {
278        let (queue, flag) = get_my_injection();
279        let mut remote = queue.lock().unwrap_or_else(|e| e.into_inner());
280        if !remote.is_empty() {
281            RUN_QUEUE.with(|q| {
282                let mut local = q.borrow_mut();
283                while let Some(task) = remote.pop_front() {
284                    local.push_back(task);
285                }
286            });
287            flag.store(false, Ordering::Release);
288        } else {
289            flag.store(false, Ordering::Release);
290        }
291    }
292}
293
/// Cooperatively yields control back to the **momo-rs** scheduler.
///
/// The returned future must be awaited (or polled) to have any effect. Its
/// first poll requests a wakeup and returns `Pending`; the next poll completes.
pub fn yield_now() -> YieldNow {
    YieldNow { yielded: false }
}

/// Future for the `yield_now` operation.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct YieldNow {
    /// `true` once the future has returned `Pending` for its one yield.
    yielded: bool,
}

impl Future for YieldNow {
    type Output = ();

    /// Returns `Pending` exactly once, after asking the waker to schedule the
    /// task again, and `Ready(())` on every later poll.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // `replace` records the yield and reports whether it already happened.
        if core::mem::replace(&mut self.yielded, true) {
            Poll::Ready(())
        } else {
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}
315}