beamr 0.6.4

A Rust runtime with the BEAM's execution model, targeting Gleam
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
//! Dirty scheduler thread pool.
//!
//! A separate pool of OS threads for native functions that take
//! a long time (git push, cargo build). Long-running work goes
//! here so normal scheduler threads stay free and fair.
//! Pool size is configurable independently of the normal
//! scheduler thread count (per D10).

use std::sync::Mutex;
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread::JoinHandle;

use crossbeam_channel::{Receiver, Sender};

use crate::ets::OwnedTerm;
use crate::native::{ExceptionClass, NativeContinuation, NativeFn, ProcessContext, SuspendRequest};
use crate::scheduler::lock_or_recover;
use crate::term::Term;

/// Default maximum number of queued dirty jobs per pool.
///
/// [`DirtyPool::new`] passes this value as the bounded queue depth. Once the
/// queue holds that many messages, `submit` and `submit_task` fail with
/// [`DirtySubmitError::QueueFull`] instead of blocking the caller.
pub const DEFAULT_DIRTY_QUEUE_DEPTH: usize = 1024;

/// Default number of IO dirty scheduler threads.
///
/// Used as the thread count by [`DirtyPool::default_io`].
pub const DEFAULT_DIRTY_IO_THREADS: usize = 10;

/// Selects which of the two BEAM-style dirty scheduler pools is meant.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DirtySchedulerKind {
    /// The pool that runs CPU-bound dirty work.
    Cpu,
    /// The pool that runs IO-bound dirty work.
    Io,
}

/// Minimal oneshot result channel used by dirty jobs.
///
/// A thin wrapper over a `std::sync::mpsc::sync_channel` with a one-slot
/// buffer. `send` and `recv` both take `self` by value, so each half can be
/// used at most once.
pub mod oneshot {
    use std::error::Error;
    use std::fmt;
    use std::sync::mpsc;

    /// Sends a single value to the matching [`Receiver`].
    pub struct Sender<T>(mpsc::SyncSender<T>);

    /// Receives a single value from the matching [`Sender`].
    pub struct Receiver<T>(mpsc::Receiver<T>);

    /// Error returned when the oneshot receiver has been dropped.
    ///
    /// Field `0` hands the unsent value back to the caller.
    pub struct SendError<T>(pub T);

    /// Error returned when the oneshot sender has been dropped.
    #[derive(Debug, Copy, Clone, Eq, PartialEq)]
    pub struct RecvError;

    // Written by hand (not derived) so that `SendError<T>` is `Debug` for
    // every `T`; that is what lets callers `unwrap()`/`expect()` the result
    // of `send` even when the payload type itself is not `Debug`.
    impl<T> fmt::Debug for SendError<T> {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            f.debug_struct("SendError").finish_non_exhaustive()
        }
    }

    impl<T> fmt::Display for SendError<T> {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            f.write_str("sending on a oneshot channel whose receiver was dropped")
        }
    }

    impl<T> Error for SendError<T> {}

    impl fmt::Display for RecvError {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            f.write_str("receiving on a oneshot channel whose sender was dropped")
        }
    }

    impl Error for RecvError {}

    /// Creates a single-use channel.
    ///
    /// The underlying buffer holds exactly one value.
    #[must_use]
    pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
        let (sender, receiver) = mpsc::sync_channel(1);
        (Sender(sender), Receiver(receiver))
    }

    impl<T> Sender<T> {
        /// Sends the result to the receiver.
        ///
        /// # Errors
        ///
        /// Returns [`SendError`] carrying `value` back when the underlying
        /// send fails, i.e. the receiver has been dropped.
        pub fn send(self, value: T) -> Result<(), SendError<T>> {
            self.0.send(value).map_err(|error| SendError(error.0))
        }
    }

    impl<T> Receiver<T> {
        /// Blocks until the result arrives or the sender is dropped.
        ///
        /// # Errors
        ///
        /// Returns [`RecvError`] when the sender was dropped without sending.
        pub fn recv(self) -> Result<T, RecvError> {
            self.0.recv().map_err(|_| RecvError)
        }
    }
}

/// Result of a native function invocation completed on a dirty scheduler thread.
///
/// Assembled by `worker_loop` once the native has returned and delivered
/// through the job's `result_sender`.
#[derive(Debug)]
pub struct DirtyResult {
    /// Native function return value or error reason.
    ///
    /// When `owned_result` is `Some`, the worker replaces the payload (on both
    /// the `Ok` and the `Err` side) with that owned copy's root term.
    pub result: Result<Term, Term>,
    /// Owns boxed/list allocations reachable from `result` until the process
    /// resumes and copies the dirty native return value onto its own heap.
    ///
    /// Filled from the context's detached result when it has one; otherwise
    /// list and boxed results are copied with `copy_term_to_ets`. Stays `None`
    /// for any other term, and also when that copy fails.
    pub owned_result: Option<OwnedTerm>,
    /// Exception class requested by the dirty native if it returned `Err`.
    ///
    /// The worker reads it from the job context whether or not the native
    /// failed.
    pub exception_class: ExceptionClass,
    /// Stacktrace requested by the dirty native if it returned `Err`.
    ///
    /// Like `exception_class`, read from the job context unconditionally.
    pub exception_stacktrace: Term,
    /// Suspend request the dirty native left on its detached context: the
    /// owning thread re-parks the process at the dirty call instruction
    /// under a NEW host-await suspension instead of applying a value.
    /// Ignored when the native returned `Err` (the exception wins, matching
    /// `call_native_entry`). The worker already clears this to `None` for
    /// `Err` results before sending.
    pub suspend: Option<SuspendRequest>,
    /// Trampoline request the dirty native left on its detached context:
    /// the owning thread sets up the closure call on resume. Must carry a
    /// continuation (returning straight to the call instruction would
    /// re-submit the dirty call).
    ///
    /// Always `None` for `Err` results. A request that `own_dirty_trampoline`
    /// rejects never reaches this field: the worker sends an
    /// `Err(Atom::BADARG)` result in its place.
    pub trampoline: Option<OwnedDirtyTrampoline>,
}

/// A dirty native's trampoline request with its terms copied into owned
/// storage, so they survive the detached context's teardown until the owning
/// thread copies them onto the resuming process heap.
///
/// Produced by `own_dirty_trampoline` on the dirty worker thread.
#[derive(Debug)]
pub struct OwnedDirtyTrampoline {
    /// The closure (fun) term to invoke, as an owned copy.
    pub fun: OwnedTerm,
    /// Arguments to pass to the closure, each copied individually and kept in
    /// the order the request listed them.
    pub args: Vec<OwnedTerm>,
    /// Continuation resumed after the closure returns. Must hold no heap
    /// terms (a detached context's terms would dangle); term-carrying
    /// continuations are rejected at the dirty worker.
    pub continuation: NativeContinuation,
}

/// Native function invocation scheduled onto a dirty scheduler thread.
///
/// `DirtyPool::submit` boxes the job into a `DirtyMessage::RunNative`; a
/// worker then calls `function` with `args` and `context` and reports the
/// outcome through `result_sender`.
pub struct DirtyJob {
    /// Process id that submitted the dirty job.
    ///
    /// The worker currently only reads this into an unused local.
    pub pid: u64,
    /// Native function to execute.
    pub function: NativeFn,
    /// Arguments passed to the native function (borrowed as a slice for the call).
    pub args: Vec<Term>,
    /// Native call context for the dirty worker.
    ///
    /// After the call the worker drains the detached result, exception class,
    /// exception stacktrace, suspend request and trampoline request from it.
    pub context: ProcessContext<'static>,
    /// Channel used to return the native result to the submitter.
    ///
    /// The worker ignores a failed send.
    pub result_sender: oneshot::Sender<DirtyResult>,
}

// SAFETY: dirty scheduler jobs are constructed for standalone native calls and
// use `ProcessContext<'static>` so they cannot borrow a scheduler-owned process.
// B-077 does not migrate process bodies to dirty threads; future wiring must keep
// that boundary by submitting only detached contexts.
// NOTE(review): the argument above covers `context` only. `args` also crosses
// the thread boundary with the job — confirm that those terms never point into
// the submitting process's heap.
unsafe impl Send for DirtyJob {}

/// A boxed, run-once closure queued as generic dirty CPU work, for runtime
/// maintenance jobs such as JIT compilation.
pub struct DirtyTask {
    task: Box<dyn FnOnce() + Send + 'static>,
}

impl DirtyTask {
    /// Wraps an owned closure so it can be handed to a dirty pool.
    pub fn new(task: impl FnOnce() + Send + 'static) -> Self {
        let boxed: Box<dyn FnOnce() + Send + 'static> = Box::new(task);
        Self { task: boxed }
    }
}

/// Message carried on a dirty pool's queue and consumed by `worker_loop`.
enum DirtyMessage {
    /// Run a native function; the worker sends a `DirtyResult` back through
    /// the job's `result_sender`. Boxed by `DirtyPool::submit`.
    RunNative(Box<DirtyJob>),
    /// Run a generic closure; nothing is reported back.
    RunTask(DirtyTask),
    /// Makes the worker that receives it leave its loop. `DirtyPool::shutdown`
    /// sends one per worker thread.
    Shutdown,
}

/// Failure returned when a dirty job cannot be enqueued.
///
/// Implements [`std::fmt::Display`] and [`std::error::Error`] so it can be
/// propagated with `?` into boxed or wrapped error types.
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub enum DirtySubmitError {
    /// Submission was attempted after pool shutdown began.
    ShutDown,
    /// The bounded dirty queue is full; the normal scheduler must not block.
    QueueFull,
    /// All dirty workers disconnected from the queue.
    Disconnected,
}

impl std::fmt::Display for DirtySubmitError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Exhaustive on purpose: a new variant must pick its own message.
        let text = match self {
            Self::ShutDown => "dirty pool is shut down",
            Self::QueueFull => "dirty pool queue is full",
            Self::Disconnected => "dirty pool workers disconnected",
        };
        f.write_str(text)
    }
}

impl std::error::Error for DirtySubmitError {}

/// A bounded dirty scheduler pool backed by OS threads.
///
/// Work is handed to the workers over a bounded channel; dropping the pool
/// calls `shutdown`.
pub struct DirtyPool {
    /// Pool base name; worker threads are named `{name}-{index}`.
    name: String,
    /// Number of workers that were actually spawned, which can be lower than
    /// requested if spawning a thread failed.
    thread_count: usize,
    /// Capacity of the bounded queue (the requested depth, raised to at least 1).
    queue_depth: usize,
    /// Producer side of the queue shared by all workers.
    sender: Sender<DirtyMessage>,
    /// Set once shutdown has been requested; checked before every submission.
    shutdown: AtomicBool,
    /// Join handles of the spawned workers; drained by `shutdown`.
    threads: Mutex<Vec<JoinHandle<()>>>,
    /// OS thread names of the spawned workers, in spawn order.
    worker_names: Vec<String>,
}

impl DirtyPool {
    /// Creates a dirty pool with the default bounded queue depth.
    #[must_use]
    pub fn new(name: &str, thread_count: usize) -> Self {
        Self::with_queue_depth(name, thread_count, DEFAULT_DIRTY_QUEUE_DEPTH)
    }

    /// Creates the default CPU dirty pool.
    #[must_use]
    pub fn default_cpu() -> Self {
        Self::new("dirty-cpu", num_cpus::get())
    }

    /// Creates the default IO dirty pool.
    #[must_use]
    pub fn default_io() -> Self {
        Self::new("dirty-io", DEFAULT_DIRTY_IO_THREADS)
    }

    /// Creates a dirty pool with a configurable bounded queue depth.
    #[must_use]
    pub fn with_queue_depth(name: &str, thread_count: usize, queue_depth: usize) -> Self {
        let pool_thread_count = thread_count.max(1);
        let pool_queue_depth = queue_depth.max(1);
        let (sender, receiver) = crossbeam_channel::bounded(pool_queue_depth);
        let mut threads = Vec::with_capacity(pool_thread_count);
        let mut worker_names = Vec::with_capacity(pool_thread_count);

        for index in 0..pool_thread_count {
            let thread_name = format!("{name}-{index}");
            let receiver_for_thread = receiver.clone();
            match std::thread::Builder::new()
                .name(thread_name.clone())
                .spawn(move || worker_loop(receiver_for_thread))
            {
                Ok(handle) => {
                    worker_names.push(thread_name);
                    threads.push(handle);
                }
                Err(_error) => break,
            }
        }

        Self {
            name: name.to_owned(),
            thread_count: worker_names.len(),
            queue_depth: pool_queue_depth,
            sender,
            shutdown: AtomicBool::new(false),
            threads: Mutex::new(threads),
            worker_names,
        }
    }

    /// Enqueues a dirty job without blocking a normal scheduler thread.
    pub fn submit(&self, job: DirtyJob) -> Result<(), DirtySubmitError> {
        if self.shutdown.load(Ordering::Acquire) {
            return Err(DirtySubmitError::ShutDown);
        }
        self.sender
            .try_send(DirtyMessage::RunNative(Box::new(job)))
            .map_err(|error| match error {
                crossbeam_channel::TrySendError::Full(_) => DirtySubmitError::QueueFull,
                crossbeam_channel::TrySendError::Disconnected(_) => DirtySubmitError::Disconnected,
            })
    }

    /// Enqueues a generic dirty task without blocking a normal scheduler thread.
    pub fn submit_task(&self, task: DirtyTask) -> Result<(), DirtySubmitError> {
        if self.shutdown.load(Ordering::Acquire) {
            return Err(DirtySubmitError::ShutDown);
        }
        self.sender
            .try_send(DirtyMessage::RunTask(task))
            .map_err(|error| match error {
                crossbeam_channel::TrySendError::Full(_) => DirtySubmitError::QueueFull,
                crossbeam_channel::TrySendError::Disconnected(_) => DirtySubmitError::Disconnected,
            })
    }

    /// Signals all dirty workers to stop and joins them.
    pub fn shutdown(&self) {
        if self.shutdown.swap(true, Ordering::AcqRel) {
            return;
        }

        let mut threads = lock_or_recover(&self.threads);
        for _ in 0..threads.len() {
            let _ = self.sender.send(DirtyMessage::Shutdown);
        }
        for handle in threads.drain(..) {
            if let Err(payload) = handle.join() {
                std::panic::resume_unwind(payload);
            }
        }
    }

    /// Number of worker threads successfully started for this pool.
    #[must_use]
    pub fn thread_count(&self) -> usize {
        self.thread_count
    }

    /// Configured bounded queue depth.
    #[must_use]
    pub fn queue_depth(&self) -> usize {
        self.queue_depth
    }

    /// Pool base name.
    #[must_use]
    pub fn name(&self) -> &str {
        &self.name
    }

    /// Names of worker OS threads in this pool.
    #[must_use]
    pub fn worker_names(&self) -> &[String] {
        &self.worker_names
    }

    /// Whether shutdown has been requested.
    #[must_use]
    pub fn is_shutdown(&self) -> bool {
        self.shutdown.load(Ordering::Acquire)
    }
}

impl Drop for DirtyPool {
    /// Stops and joins the workers when the pool goes out of scope. An
    /// earlier explicit `shutdown` call makes this a no-op.
    fn drop(&mut self) {
        Self::shutdown(self);
    }
}

/// Body of every dirty worker thread.
///
/// Receives messages until it gets `DirtyMessage::Shutdown` or `recv`
/// returns an error. Native jobs are executed and answered with a
/// `DirtyResult`; generic tasks are simply run.
fn worker_loop(receiver: Receiver<DirtyMessage>) {
    while let Ok(message) = receiver.recv() {
        match message {
            DirtyMessage::RunNative(mut job) => {
                let _pid = job.pid;
                let result = (job.function)(&job.args, &mut job.context);
                // The returned term, whichever side of the `Result` it is on.
                let raw_result = match &result {
                    Ok(value) | Err(value) => *value,
                };
                // Prefer the context's detached result; otherwise copy list and
                // boxed terms into owned storage. A failed copy leaves `None`.
                let owned_result = job.context.take_detached_result(raw_result).or_else(|| {
                    if raw_result.is_list() || raw_result.is_boxed() {
                        crate::ets::copy_term_to_ets(raw_result).ok()
                    } else {
                        None
                    }
                });
                // Point the result at the owned copy's root when there is one.
                let result = match owned_result.as_ref() {
                    Some(owned) => result.map(|_| owned.root()).map_err(|_| owned.root()),
                    None => result,
                };
                let exception_class = job.context.take_exception_class();
                let exception_stacktrace = job.context.take_exception_stacktrace();
                // Follow-up requests a dirty native is allowed to make:
                // re-suspend (host await) or trampoline a closure call. The
                // exception path wins over both, matching call_native_entry.
                let suspend = job.context.take_suspend().filter(|_| result.is_ok());
                let trampoline = match job.context.take_trampoline().filter(|_| result.is_ok()) {
                    None => None,
                    Some(request) => match own_dirty_trampoline(request) {
                        Ok(owned) => Some(owned),
                        Err(reason) => {
                            // Reject malformed requests loudly: the process
                            // raises instead of silently dropping them.
                            let _ = job.result_sender.send(DirtyResult {
                                result: Err(Term::atom(crate::atom::Atom::BADARG)),
                                owned_result: None,
                                exception_class: ExceptionClass::Error,
                                exception_stacktrace: Term::NIL,
                                suspend: None,
                                trampoline: None,
                            });
                            // The rejection message is currently not reported anywhere.
                            let _trace = reason;
                            // The job is already answered; go wait for the next message.
                            continue;
                        }
                    },
                };
                // A failed send is ignored.
                let _ = job.result_sender.send(DirtyResult {
                    result,
                    owned_result,
                    exception_class,
                    exception_stacktrace,
                    suspend,
                    trampoline,
                });
            }
            DirtyMessage::RunTask(task) => {
                (task.task)();
            }
            DirtyMessage::Shutdown => break,
        }
    }
}

/// Converts a dirty native's trampoline request into an [`OwnedDirtyTrampoline`].
///
/// The checks run in this order and the first failure is returned as a static
/// message:
/// 1. the request must carry a continuation — without one, returning to the
///    call instruction would re-submit the dirty call;
/// 2. the continuation must not report any term through `for_each_term` — a
///    detached context's terms dangle once the job is dropped;
/// 3. the closure, then each argument in turn, must copy into owned storage.
fn own_dirty_trampoline(
    request: crate::native::TrampolineRequest,
) -> Result<OwnedDirtyTrampoline, &'static str> {
    let continuation = match request.continuation {
        Some(continuation) => continuation,
        None => return Err("dirty trampoline requires a continuation"),
    };

    // A single callback invocation is enough to prove the continuation holds a term.
    let mut term_count = 0_usize;
    continuation.for_each_term(&mut |_| term_count += 1);
    if term_count > 0 {
        return Err("dirty trampoline continuation must not hold heap terms");
    }

    let fun = match own_term(request.fun) {
        Ok(owned) => owned,
        Err(_) => return Err("dirty trampoline fun copy failed"),
    };

    let mut args = Vec::with_capacity(request.args.len());
    for arg in request.args {
        match own_term(arg) {
            Ok(owned) => args.push(owned),
            Err(_) => return Err("dirty trampoline arg copy failed"),
        }
    }

    Ok(OwnedDirtyTrampoline {
        fun,
        args,
        continuation,
    })
}

/// Puts `term` into owned storage: list and boxed terms are copied with
/// `copy_term_to_ets`, every other term is wrapped as an immediate.
fn own_term(term: Term) -> Result<OwnedTerm, crate::ets::EtsError> {
    let needs_copy = term.is_list() || term.is_boxed();
    if !needs_copy {
        return Ok(OwnedTerm::immediate(term));
    }
    crate::ets::copy_term_to_ets(term)
}

#[cfg(test)]
mod tests {
    use super::{DirtyJob, DirtyPool, DirtySchedulerKind, oneshot};
    use crate::native::{ExceptionClass, ProcessContext};
    use crate::term::Term;

    /// Trivial native used as a job payload: ignores its inputs and returns 42.
    fn forty_two(_args: &[Term], _context: &mut ProcessContext) -> Result<Term, Term> {
        let answer = Term::small_int(42);
        Ok(answer)
    }

    #[test]
    fn dirty_pool_starts_named_threads_and_shuts_down_cleanly() {
        const WORKERS: usize = 4;
        let pool = DirtyPool::new("dirty-test", WORKERS);

        let expected_names: Vec<String> = (0..WORKERS)
            .map(|index| format!("dirty-test-{index}"))
            .collect();
        assert_eq!(pool.thread_count(), WORKERS);
        assert_eq!(pool.worker_names().len(), WORKERS);
        assert_eq!(pool.worker_names(), expected_names.as_slice());

        // Shutting down twice must be harmless.
        pool.shutdown();
        assert!(pool.is_shutdown());
        pool.shutdown();
    }

    #[test]
    fn dirty_pool_executes_submitted_job_and_returns_result() {
        let pool = DirtyPool::with_queue_depth("dirty-test-job", 1, 1);
        let (result_sender, result_receiver) = oneshot::channel();

        let job = DirtyJob {
            pid: 7,
            function: forty_two,
            args: Vec::new(),
            context: ProcessContext::new(),
            result_sender,
        };
        let submitted = pool.submit(job);
        assert_eq!(submitted, Ok(()));

        let outcome = result_receiver.recv().expect("dirty result");
        assert_eq!(outcome.result, Ok(Term::small_int(42)));
        assert!(outcome.owned_result.is_none());
        assert_eq!(outcome.exception_class, ExceptionClass::Error);
        assert_eq!(outcome.exception_stacktrace, Term::NIL);
        pool.shutdown();
    }

    #[test]
    fn dirty_scheduler_kind_distinguishes_cpu_and_io() {
        let (cpu, io) = (DirtySchedulerKind::Cpu, DirtySchedulerKind::Io);
        assert_eq!(cpu, DirtySchedulerKind::Cpu);
        assert_ne!(cpu, io);
    }
}