rusticated 0.1.2

Fast, standard-library-shaped async platform layer for brush-async
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
use crate::boxed::Box;
use crate::cell::{Cell, RefCell};
use crate::collections::VecDeque;
use crate::future::Future;
use crate::io;
use crate::pin::Pin;
use crate::task::{Context, Poll, Waker};
use crate::time::Duration;
use alloc::sync::Arc;
use core::sync::atomic::{AtomicBool, Ordering};

use super::{timers::next_deadline, waker::task_waker};

#[cfg(target_os = "linux")]
use super::linux_epoll::Driver;

#[cfg(windows)]
use super::windows::Driver;

#[cfg(any(
    target_os = "macos",
    target_os = "freebsd",
    target_os = "openbsd",
    target_os = "netbsd"
))]
use super::bsd::Driver;

// ─── Task ────────────────────────────────────────────────────────────────────

/// A task in the per-thread run queue.
struct Task {
    /// The boxed, pinned future to drive.
    future: Pin<Box<dyn Future<Output = ()>>>,
    /// Set to `true` by the waker when the task should be re-polled.
    woken: Arc<AtomicBool>,
}

// ── Thread-local executor state ───────────────────────────────────────────────
// TASKS: per-thread run queue. DRIVER: per-thread I/O reactor.
// TASK_DEPTH: approximate queue depth (available for work-stealing hints).

thread_local! {
    static TASKS: RefCell<VecDeque<Task>> = RefCell::new(VecDeque::new());
    static DRIVER: RefCell<Option<Driver>> = RefCell::new(None);
    static TASK_DEPTH: Cell<usize> = Cell::new(0);
}

fn tasks_mut<R>(f: impl FnOnce(&mut VecDeque<Task>) -> R) -> R {
    TASKS.with(|q| f(&mut *q.borrow_mut()))
}

pub(crate) fn with_driver<R>(f: impl FnOnce(&mut Driver) -> R) -> io::Result<R> {
    DRIVER.with(|cell| -> io::Result<R> {
        let mut borrow = cell.borrow_mut();
        if borrow.is_none() {
            *borrow = Some(Driver::new()?);
        }
        match borrow.as_mut() {
            Some(d) => Ok(f(d)),
            None => Err(io::Error::other("driver init failed")),
        }
    })
}

// ─── JoinHandle ──────────────────────────────────────────────────────────────

/// Shared state between a spawned task and its [`JoinHandle`].
struct JoinState<T> {
    /// The task's return value, written on completion.
    result: Option<T>,
    /// Waker stored by the [`JoinHandle`] awaiter; woken when the task finishes.
    waker: Option<Waker>,
}

/// Internal future that drives `F` and deposits its output into [`JoinState`].
///
/// This is the actual payload stored in the executor's task queue.
struct JoinFuture<T> {
    inner: Pin<Box<dyn Future<Output = T>>>,
    state: Arc<RefCell<JoinState<T>>>,
}

// JoinFuture<T> is Unpin: Pin<Box<dyn Future>> is Unpin (Box is always Unpin),
// and Arc<RefCell<...>> is Unpin.
impl<T: 'static> Future for JoinFuture<T> {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        let this = Pin::into_inner(self);
        match this.inner.as_mut().poll(cx) {
            Poll::Ready(val) => {
                let mut state = this.state.borrow_mut();
                state.result = Some(val);
                if let Some(w) = state.waker.take() {
                    w.wake();
                }
                Poll::Ready(())
            }
            Poll::Pending => Poll::Pending,
        }
    }
}

/// A handle to a spawned task that can be awaited for its return value.
///
/// Dropping the handle does not cancel the task — it continues to run; its
/// output is simply discarded when it completes.
pub struct JoinHandle<T> {
    state: Arc<RefCell<JoinState<T>>>,
}

impl<T> Future for JoinHandle<T> {
    type Output = T;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
        let mut state = self.state.borrow_mut();
        if let Some(val) = state.result.take() {
            Poll::Ready(val)
        } else {
            state.waker = Some(cx.waker().clone());
            Poll::Pending
        }
    }
}

// ─── spawn ───────────────────────────────────────────────────────────────────

/// Submit a future to the per-thread task queue, returning a [`JoinHandle`]
/// that can be awaited to obtain the task's output.
///
/// The future is polled on the current thread by each subsequent
/// [`poll_step`] call. Multiple tasks can be in flight concurrently; they
/// are polled round-robin within each step.
///
/// Dropping the returned handle does not cancel the task.
pub fn spawn<F, T>(future: F) -> JoinHandle<T>
where
    F: Future<Output = T> + 'static,
    T: 'static,
{
    let state = Arc::new(RefCell::new(JoinState {
        result: None,
        waker: None,
    }));
    let handle = JoinHandle {
        state: Arc::clone(&state),
    };
    let wrapper = JoinFuture {
        inner: Box::pin(future),
        state,
    };
    // Mark woken=true so the task is polled on the very first poll_step.
    let woken = Arc::new(AtomicBool::new(true));
    tasks_mut(|q| {
        q.push_back(Task {
            future: Box::pin(wrapper),
            woken,
        })
    });
    handle
}

/// Internal helper: spawn a `Future<Output = ()>` and discard the handle.
///
/// Used by test `block_on` utilities and platform bootstrapping code within
/// this crate. Not part of the public API.
pub(crate) fn run<F>(future: F)
where
    F: Future<Output = ()> + 'static,
{
    let _ = spawn(future);
}

// ─── select ──────────────────────────────────────────────────────────────────

/// The output of [`select`]: whichever branch completed first.
pub enum Either<A, B> {
    /// The first (left) future completed first.
    Left(A),
    /// The second (right) future completed first.
    Right(B),
}

/// Drives two futures concurrently, resolving with whichever completes first.
///
/// Both sides are pinned on the heap. The losing future is dropped.
/// See [`select`] for construction.
pub struct Select<FA, FB> {
    a: Pin<Box<FA>>,
    b: Pin<Box<FB>>,
}

impl<FA: Future, FB: Future> Future for Select<FA, FB> {
    type Output = Either<FA::Output, FB::Output>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Poll left first; if it resolves, the right future is dropped.
        if let Poll::Ready(a) = self.a.as_mut().poll(cx) {
            return Poll::Ready(Either::Left(a));
        }
        // Poll right; if it resolves, the left future is dropped.
        if let Poll::Ready(b) = self.b.as_mut().poll(cx) {
            return Poll::Ready(Either::Right(b));
        }
        Poll::Pending
    }
}

/// Race two futures: resolve with whichever one completes first.
///
/// When both are immediately ready, the left wins (it is polled first).
/// The losing future is dropped when the [`Select`] future resolves.
///
/// # Example
///
/// ```rust,ignore
/// match std::rt::select(future_a, future_b).await {
///     std::rt::Either::Left(a)  => { /* a completed first */ }
///     std::rt::Either::Right(b) => { /* b completed first */ }
/// }
/// ```
pub fn select<FA: Future, FB: Future>(a: FA, b: FB) -> Select<FA, FB> {
    Select {
        a: Box::pin(a),
        b: Box::pin(b),
    }
}

// ─── PollStatus / poll_step ──────────────────────────────────────────────────

/// Outcome of one [`poll_step`] iteration.
///
/// The host uses this to decide whether to keep ticking and how long it may
/// safely sleep before the next tick.
#[derive(Debug, Clone, Copy)]
pub enum PollStatus {
    /// All tasks have completed and there are no pending I/O events.
    Done,
    /// Work was performed (I/O events processed or at least one task made progress).
    Ready,
    /// No work was available this iteration. The host may sleep at most
    /// `next_deadline` before the next call (or indefinitely if `None`).
    Idle {
        /// Upper bound on how long the host may wait before the next call.
        next_deadline: Option<Duration>,
    },
}

/// Drive the runtime by exactly one step.
///
/// Performs a non-blocking platform poll, then polls every queued task
/// once in FIFO order. Tasks that return [`Poll::Pending`] are re-queued;
/// completed tasks are dropped.
///
/// Returns a [`PollStatus`] describing the outcome.
pub fn poll_step() -> io::Result<PollStatus> {
    // Drive the platform reactor with a zero timeout.
    let had_events = with_driver(|d| d.poll_nonblocking())??;

    // Only poll tasks whose waker flag was set since the last step.
    // Tasks spawned during polling are picked up on the next step.
    let n = tasks_mut(|q| q.len());
    let mut made_progress = false;
    let mut remaining = 0usize;

    for _ in 0..n {
        let task = tasks_mut(|q| q.pop_front());
        let Some(mut task) = task else { break };

        if task.woken.swap(false, Ordering::AcqRel) {
            // Task was woken — give it a targeted waker and poll it.
            let waker = task_waker(Arc::clone(&task.woken));
            let mut cx = Context::from_waker(&waker);
            match task.future.as_mut().poll(&mut cx) {
                Poll::Ready(()) => {
                    made_progress = true;
                    // task dropped — not re-queued
                }
                Poll::Pending => {
                    tasks_mut(|q| q.push_back(task));
                    remaining += 1;
                }
            }
        } else {
            // Not yet woken — return to queue without polling.
            tasks_mut(|q| q.push_back(task));
            remaining += 1;
        }
    }

    TASK_DEPTH.with(|d| d.set(remaining));

    Ok(if remaining == 0 && !had_events {
        PollStatus::Done
    } else if made_progress || had_events {
        PollStatus::Ready
    } else {
        PollStatus::Idle {
            next_deadline: next_deadline(),
        }
    })
}

// ─── Tests ───────────────────────────────────────────────────────────────────

#[cfg(test)]
mod tests {
    use super::*;
    use alloc::rc::Rc;
    use core::cell::Cell;

    /// Spin the executor until all tasks have completed (no I/O in unit tests).
    fn drive_until_done() {
        loop {
            match poll_step().unwrap() {
                PollStatus::Done => break,
                PollStatus::Ready | PollStatus::Idle { .. } => continue,
            }
        }
    }

    // ── JoinHandle ───────────────────────────────────────────────────────────

    /// A spawned future returning a non-unit value is retrievable via its handle.
    #[test]
    fn join_handle_resolves_with_task_output() {
        let result: Rc<Cell<u32>> = Rc::new(Cell::new(0));
        let result2 = Rc::clone(&result);

        let handle = spawn(async { 42u32 });
        let _ = spawn(async move {
            result2.set(handle.await);
        });

        drive_until_done();
        assert_eq!(result.get(), 42);
    }

    /// The handle waits even when the producer task is queued after the consumer.
    #[test]
    fn join_handle_waits_for_producer() {
        let result: Rc<Cell<u32>> = Rc::new(Cell::new(0));
        let result2 = Rc::clone(&result);

        // Consumer spawned first — it will see Pending on its first poll,
        // then be correctly woken when the producer completes.
        let handle = spawn(async { 99u32 });
        let _ = spawn(async move {
            result2.set(handle.await);
        });

        drive_until_done();
        assert_eq!(result.get(), 99);
    }

    /// Dropping the JoinHandle does not prevent the task from running.
    #[test]
    fn drop_join_handle_task_still_runs() {
        let ran: Rc<Cell<bool>> = Rc::new(Cell::new(false));
        let ran2 = Rc::clone(&ran);

        // Drop the handle immediately after spawning.
        drop(spawn(async move {
            ran2.set(true);
        }));

        drive_until_done();
        assert!(ran.get(), "task should run even after handle is dropped");
    }

    /// A task spawned from inside another async task is correctly scheduled.
    #[test]
    fn nested_spawn_resolves() {
        let result: Rc<Cell<bool>> = Rc::new(Cell::new(false));
        let result2 = Rc::clone(&result);

        let _ = spawn(async move {
            let handle = spawn(async { true });
            result2.set(handle.await);
        });

        drive_until_done();
        assert!(result.get());
    }

    /// Multiple independent tasks all complete and their side-effects accumulate.
    #[test]
    fn multiple_tasks_all_run() {
        let counter: Rc<Cell<u32>> = Rc::new(Cell::new(0));
        for _ in 0..5 {
            let c = Rc::clone(&counter);
            let _ = spawn(async move {
                c.set(c.get() + 1);
            });
        }
        drive_until_done();
        assert_eq!(counter.get(), 5);
    }

    // ── select ───────────────────────────────────────────────────────────────

    /// When left is immediately ready, select resolves with Left.
    #[test]
    fn select_left_wins_when_immediately_ready() {
        let winner: Rc<Cell<u8>> = Rc::new(Cell::new(0));
        let w2 = Rc::clone(&winner);

        let _ = spawn(async move {
            let r = select(async { 1u8 }, core::future::pending::<u8>()).await;
            match r {
                Either::Left(v) => w2.set(v),
                Either::Right(_) => w2.set(99),
            }
        });

        drive_until_done();
        assert_eq!(winner.get(), 1);
    }

    /// When right is immediately ready (left never resolves), select resolves with Right.
    #[test]
    fn select_right_wins_when_left_never_resolves() {
        let winner: Rc<Cell<u8>> = Rc::new(Cell::new(0));
        let w2 = Rc::clone(&winner);

        let _ = spawn(async move {
            let r = select(core::future::pending::<u8>(), async { 2u8 }).await;
            match r {
                Either::Left(_) => w2.set(99),
                Either::Right(v) => w2.set(v),
            }
        });

        drive_until_done();
        assert_eq!(winner.get(), 2);
    }

    /// When both sides are immediately ready, left wins (it is polled first).
    #[test]
    fn select_left_wins_when_both_immediately_ready() {
        let winner: Rc<Cell<u8>> = Rc::new(Cell::new(0));
        let w2 = Rc::clone(&winner);

        let _ = spawn(async move {
            let r = select(async { 10u8 }, async { 20u8 }).await;
            match r {
                Either::Left(v) | Either::Right(v) => w2.set(v),
            }
        });

        drive_until_done();
        assert_eq!(
            winner.get(),
            10,
            "left is polled first so it wins when both ready"
        );
    }

    /// select can be composed: one arm is itself a JoinHandle.
    #[test]
    fn select_with_join_handle_arm() {
        let result: Rc<Cell<u32>> = Rc::new(Cell::new(0));
        let r2 = Rc::clone(&result);

        let _ = spawn(async move {
            let fast = spawn(async { 7u32 });
            let r = select(fast, core::future::pending::<u32>()).await;
            match r {
                Either::Left(v) => r2.set(v),
                Either::Right(v) => r2.set(v),
            }
        });

        drive_until_done();
        assert_eq!(result.get(), 7);
    }
}