hft_jobs/
lib.rs

1//! A lightweight, allocation-free job system for executing type-erased closures.
2//!
3//! This crate provides the [`Job`] type, which stores closures inline inside a
4//! fixed-size buffer and executes them without heap allocation. Each job embeds
5//! its own `call`, `clone`, and `drop` functions via type-erased function
6//! pointers, enabling predictable, low-latency execution suitable for
7//! high-frequency or real-time workloads.
8//!
9//! `Job` is generic over the inline capacity `N`, the closure's return type
10//! `R`, and an optional context type `C`. The inline storage size `N` has no
11//! default; callers must choose a capacity explicitly. `R` defaults to `()`,
12//! and `C` defaults to `()`.
13//!
14//! # Example: Dispatching Jobs to a Worker Thread
15//!
16//! ```rust
17//! use std::sync::mpsc;
18//! use std::thread;
19//! use hft_jobs::Job;
20//!
21//! // Create a channel for sending jobs.
22//! let (tx, rx) = mpsc::channel::<Job<24, String>>();
23//!
24//! // A worker thread that receives and runs jobs.
25//! thread::spawn(move || {
26//!     while let Ok(job) = rx.recv() {
27//!         let s = job.run();
28//!         println!("{}", s);
29//!     }
30//! });
31//!
32//! // Send a simple job.
33//! let job = Job::<24, _>::new(|| "Hello from a job!".to_string());
34//! tx.send(job).unwrap();
35//!
36//! // A convenience macro that enqueues a logging job.
37//! macro_rules! log {
38//!     ($tx:expr, $fmt:literal $(, $arg:expr)* $(,)?) => {{
39//!         let job = Job::<24, String>::new(move || {
40//!             format!($fmt $(, $arg)*)
41//!         });
42//!         let _ = $tx.send(job);
43//!     }};
44//! }
45//!
46//! // Use the `log!` macro to enqueue a println job.
47//! log!(tx, "Logging from thread: {}", 42);
48//! ```
49//!
50//! This model provides a minimal, fast job runtime suitable for embedded
51//! systems, schedulers, executors, or lightweight logging systems.
52
53use std::{marker::PhantomData, mem, ptr};
54
/// A type-erased, fixed-size job container for storing and invoking closures.
///
/// `Job` holds an opaque buffer and a trio of function pointers that know how
/// to call, clone, and drop the stored value. This allows arbitrary closures
/// (or other callable types) to be stored in a uniform, `repr(C)` layout,
/// which makes it a convenient element type for job queues and thread pools.
///
/// The stored callable is expected to live entirely inside the `data` buffer,
/// and the function pointers must treat that buffer as their backing storage.
///
/// # Type Parameters
///
/// * `N` – The size of the internal storage buffer, in bytes. This must be
///   large enough to hold the largest closure (or callable) you intend to
///   store. There is no default; choose the inline capacity that fits your
///   workload (e.g., `64` for small captures).
/// * `R` – The return type of the stored closure. This is the value produced
///   by [`Job::run`] or [`Job::run_with_ctx`]. Use `()` for fire-and-forget
///   jobs (the default).
/// * `C` – A mutable context passed to the closure when it is run. Defaults to
///   `()` for the common zero-context case.
///
/// # Layout
///
/// * `data` – An `Align16<[u8; N]>` buffer that stores the erased closure.
///   The alignment wrapper gives the buffer 16-byte alignment; closures that
///   need a stricter alignment are rejected by [`Job::new_with_ctx`].
/// * `fn_call` – An `unsafe fn(*mut u8, &mut C) -> R` that takes a pointer
///   into `data` plus the caller's context, moves the stored callable out of
///   the buffer, invokes it, and returns its result.
/// * `fn_clone` – An `unsafe fn(*const u8, *mut u8)` that clones the stored
///   value from one buffer to another (source and destination pointers into
///   `data`-like storage).
/// * `fn_drop` – An `unsafe fn(*mut u8)` that drops the stored value in place.
///
/// The `repr(C)` attribute fixes the field order, giving `Job` a predictable
/// memory representation. Note that the function pointers use the Rust ABI,
/// so they are not callable from foreign code.
///
/// # Threading and Lifetime
///
/// A `Job` created via [`Job::new`]:
///
/// * is backed by a `FnOnce() -> R + Clone + Send + 'static` callable,
///   ensuring that the stored closure can be invoked exactly once, cloned for
///   replication, transferred across threads, and contains no non-`'static`
///   borrows;
/// * is `Clone`, allowing the same logical callable to be duplicated into
///   multiple `Job` instances (e.g., for SPMC job queues);
/// * is `Send`, so it may be moved freely to other threads for execution;
/// * is **not** `Sync`: the stored closure is only required to be `Send`, and
///   cloning a job calls the closure's `Clone` impl through `&self`, so a
///   `&Job` must not be used from several threads at once. Wrap the job in a
///   `Mutex` (or clone it before handing it out) if it has to be shared;
/// * is `'static`, meaning it holds no borrowed references tied to a shorter
///   lifetime and may safely outlive the scope in which it was created.
///
/// # Example
///
/// ```rust
/// # use hft_jobs::Job;
/// // Typically constructed via a helper like `Job::new`:
/// let job = Job::<64>::new(|| {
///     println!("Hello from a job!");
/// });
///
/// // Later, the job executor would call something like:
/// job.run();
/// ```
#[repr(C)]
pub struct Job<const N: usize, R = (), C = ()> {
    // Inline storage for the erased closure; `N` must be >= its size.
    // NOTE(review): the payload is kept in a plain `[u8; N]`, so moving a
    // `Job` copies the closure's bytes as integers. `MaybeUninit<[u8; N]>` is
    // the usual storage for this pattern because it tolerates padding bytes
    // and preserves pointer provenance — consider migrating.
    data: Align16<[u8; N]>,
    // Moves the closure out of `data` and invokes it with the given context.
    fn_call: unsafe fn(*mut u8, &mut C) -> R,
    // Clones the closure stored behind the first pointer into the second.
    fn_clone: unsafe fn(*const u8, *mut u8),
    // Drops the closure stored in `data` in place.
    fn_drop: unsafe fn(*mut u8),
    // `R` mirrors the original marker. The raw pointer removes the automatic
    // `Sync` (and `Send`) impls; `Send` is re-added explicitly below, `Sync`
    // is intentionally left out because the erased closure need not be `Sync`.
    _marker: PhantomData<(R, *const ())>,
}

/// Wrapper that raises the alignment of its contents to 16 bytes, so the
/// inline buffer of a [`Job`] can host any closure aligned to 16 or less.
#[repr(align(16))]
struct Align16<T>(pub T);

// SAFETY: a `Job` only ever contains an `F: FnOnce(&mut C) -> R + Clone + Send
// + 'static`, enforced by `Job::new_with_ctx` (and `Job::new`, which delegates
// to it), so the erased payload may be moved to another thread. `R` and `C`
// are never stored inside the job; they only appear in the signature of
// `fn_call`. No `Sync` impl is provided: `F` is not required to be `Sync`, yet
// `Clone::clone(&self)` runs `F::clone` through a shared reference.
unsafe impl<const N: usize, R, C> Send for Job<N, R, C> {}
134
135impl<const N: usize, R, C> Job<N, R, C> {
136    /// Creates a new job from a closure, storing it inline without heap
137    /// allocation.
138    ///
139    /// # Type Requirements
140    ///
141    /// The closure `F` must satisfy:
142    /// - `FnOnce() -> R` — the closure is invoked exactly once to produce `R`;
143    /// - `Clone` — needed so the job queue or scheduler can duplicate jobs if
144    ///   desired;
145    /// - `Send + 'static` — ensures the closure may be transferred across
146    ///   threads and does not borrow non-`'static` data.
147    ///
148    /// # Storage Constraints
149    ///
150    /// The closure must fit inside the inline buffer:
151    /// - `size_of::<F>() <= N`
152    /// - `align_of::<F>() <= align_of::<Align16<[u8; N]>>`
153    ///
154    /// If either condition fails, the function **panics**.
155    ///
156    /// # How it Works
157    ///
158    /// This method:
159    /// 1. Generates specialized `fn_call`, `fn_clone`, and `fn_drop` functions
160    ///    for the captured closure type `F`.
161    /// 2. Writes the closure directly into the inline buffer using
162    ///    [`ptr::write`], avoiding heap allocations.
163    /// 3. Returns a fully-constructed [`Job`] that owns the closure.
164    ///
165    /// # Safety
166    ///
167    /// Internally uses unsafe operations to:
168    /// - Reconstruct `F` from raw bytes.
169    /// - Clone `F` via raw pointers.
170    /// - Drop `F` in place.
171    ///
172    /// These operations are memory-safe *only* because:
173    /// - The size and alignment checks guarantee the buffer is valid for `F`.
174    /// - The caller cannot violate the type invariants through the public API.
175    ///
176    /// # Example (with context)
177    ///
178    /// ```
179    /// # use hft_jobs::Job;
180    /// #[derive(Default)]
181    /// struct Ctx {
182    ///     sum: u32,
183    /// }
184    ///
185    /// let job = Job::<64, u32, Ctx>::new_with_ctx(|ctx| {
186    ///     ctx.sum += 1;
187    ///     ctx.sum
188    /// });
189    ///
190    /// let mut ctx = Ctx::default();
191    /// assert_eq!(job.run_with_ctx(&mut ctx), 1);
192    /// assert_eq!(ctx.sum, 1);
193    /// ```
194    ///
195    /// F: FnOnce(&mut C) -> R + Clone + Send + 'static
196    #[inline]
197    pub fn new_with_ctx<F>(f: F) -> Self
198    where
199        F: FnOnce(&mut C) -> R + Clone + Send + 'static,
200    {
201        // Ensure (at compile time) that the closure fits into our inline storage
202        const {
203            assert!(mem::size_of::<F>() <= N);
204            assert!(mem::align_of::<F>() <= mem::align_of::<Align16<[u8; N]>>());
205        }
206
207        unsafe fn fn_call<R, C, F>(data: *mut u8, ctx: &mut C) -> R
208        where
209            F: FnOnce(&mut C) -> R,
210        {
211            unsafe {
212                let f = ptr::read(data as *const F);
213                f(ctx)
214            }
215        }
216
217        unsafe fn fn_clone<R, C, F>(src: *const u8, dst: *mut u8)
218        where
219            F: FnOnce(&mut C) -> R + Clone,
220        {
221            unsafe {
222                let f_src = &*(src as *const F);
223                let f_clone = f_src.clone();
224                ptr::write(dst as *mut F, f_clone);
225            }
226        }
227
228        unsafe fn fn_drop<R, C, F>(data: *mut u8)
229        where
230            F: FnOnce(&mut C) -> R,
231        {
232            unsafe {
233                ptr::drop_in_place(data as *mut F);
234            }
235        }
236
237        let mut job = Job {
238            fn_call: fn_call::<R, C, F>,
239            fn_clone: fn_clone::<R, C, F>,
240            fn_drop: fn_drop::<R, C, F>,
241            data: Align16([0u8; N]),
242            _marker: PhantomData,
243        };
244
245        unsafe {
246            // Place the closure into `data` without heap allocation
247            let dst = job.data.0.as_mut_ptr() as *mut F;
248            ptr::write(dst, f);
249        }
250
251        job
252    }
253
254    /// Runs the stored closure with a mutable context, consuming the job and
255    /// returning its result `R`.
256    ///
257    /// This method invokes the closure that was previously stored in the inline
258    /// buffer. Running the job consumes its captured environment and yields
259    /// a value of type `R`.
260    ///
261    /// After invocation, the job's internal `fn_drop` function pointer is replaced
262    /// with a no-op drop function, ensuring that the closure's environment is not
263    /// dropped **twice** when the `Job` itself is later dropped.
264    ///
265    /// # How it Works
266    ///
267    /// - `fn_call` reconstructs the original closure from the buffer, taking
268    ///   ownership of it.
269    /// - The closure is executed with the provided context, and its return value
270    ///   is propagated back to the caller as `R`.
271    /// - Because the closure has now been consumed, `fn_drop` is overwritten with
272    ///   a version that performs no action, preventing a double-drop.
273    ///
274    /// # Safety
275    ///
276    /// Although this method uses unsafe raw-pointer calls internally, it is safe
277    /// to use because:
278    ///
279    /// - The closure is guaranteed (via `Job::new_with_ctx`) to fit in the inline
280    ///   buffer with correct alignment.
281    /// - The closure is executed exactly once.
282    /// - The memory backing the closure is never accessed again after ownership
283    ///   is taken by `fn_call`.
284    ///
285    /// # Example
286    ///
287    /// ```rust
288    /// # use hft_jobs::Job;
289    /// #[derive(Default)]
290    /// struct Ctx {
291    ///     total: u32,
292    /// }
293    ///
294    /// let job = Job::<64, u32, Ctx>::new_with_ctx(|ctx| {
295    ///     ctx.total += 2;
296    ///     ctx.total
297    /// });
298    ///
299    /// let mut ctx = Ctx::default();
300    /// assert_eq!(job.run_with_ctx(&mut ctx), 2);
301    /// assert_eq!(ctx.total, 2);
302    /// ```
303    #[inline]
304    pub fn run_with_ctx(mut self, ctx: &mut C) -> R {
305        unsafe {
306            // Replace drop with a no-op drop (ZST panic closure) to avoid double-drop.
307            self.fn_drop = Job::<N, R, C>::default().fn_drop;
308            (self.fn_call)(self.data.0.as_ptr() as *mut u8, ctx)
309        }
310    }
311}
312
313impl<const N: usize, R, C> Default for Job<N, R, C> {
314    /// Constructs a "default" job which panics if it is ever run.
315    ///
316    /// This is mainly useful as a placeholder. Calling [`Job::run_with_ctx`] on a
317    /// default-constructed job will panic with the message
318    /// `"attempt to execute an empty job"`.
319    /// Used as a source of a no-op drop function after `run_with_ctx`.
320    #[inline]
321    fn default() -> Self {
322        // ZST closure; drop is effectively a no-op.
323        Self::new_with_ctx(|_ctx: &mut C| panic!("attempt to execute an empty job"))
324    }
325}
326
327impl<const N: usize, R, C> Clone for Job<N, R, C> {
328    /// Clones the underlying closure into a new `Job`.
329    ///
330    /// Both the original and cloned `Job` instances own independent copies of
331    /// the captured environment and can be run separately.
332    #[inline]
333    fn clone(&self) -> Self {
334        let mut new_job = Job {
335            fn_call: self.fn_call,
336            fn_clone: self.fn_clone,
337            fn_drop: self.fn_drop,
338            data: Align16([0u8; N]),
339            _marker: PhantomData,
340        };
341        unsafe {
342            (self.fn_clone)(self.data.0.as_ptr(), new_job.data.0.as_mut_ptr());
343        }
344        new_job
345    }
346}
347
348impl<const N: usize, R, C> Drop for Job<N, R, C> {
349    /// Drops the stored closure (if any) in place.
350    ///
351    /// If the job has not been run, this will drop the captured environment of
352    /// the stored closure. If the job **has** been run, [`Job::run_with_ctx`]
353    /// (or [`Job::run`] for the `C = ()` convenience impl) ensures that
354    /// `fn_drop` has been replaced so that no double-drop occurs.
355    #[inline]
356    fn drop(&mut self) {
357        unsafe {
358            (self.fn_drop)(self.data.0.as_mut_ptr());
359        }
360    }
361}
362
363/// For the common “no context” case (C = ()):
364impl<const N: usize, R> Job<N, R, ()> {
365    /// Convenience constructor for the common zero-context case.
366    ///
367    /// Equivalent to [`Job::new_with_ctx`] with `C = ()`.
368    /// Backwards-compatible constructor: closure takes no arguments.
369    #[inline]
370    pub fn new<F>(f: F) -> Self
371    where
372        F: FnOnce() -> R + Clone + Send + 'static,
373    {
374        // Wrap zero-arg closure into a context-taking closure
375        Self::new_with_ctx(move |_ctx: &mut ()| f())
376    }
377
378    /// Convenience runner for `C = ()`.
379    ///
380    /// Equivalent to [`Job::run_with_ctx`] with an empty context.
381    /// Backwards-compatible runner: no context required.
382    #[inline]
383    pub fn run(self) -> R {
384        self.run_with_ctx(&mut ())
385    }
386}
387
#[cfg(test)]
mod tests {
    //! Unit tests covering execution, cloning, drop accounting, threading,
    //! return values and context passing of [`Job`].

    use super::*;
    use std::sync::{
        Arc,
        atomic::{AtomicUsize, Ordering},
        mpsc,
    };
    use std::thread;

    /// Helper: prove at compile time that Job is Send.
    fn assert_send<T: Send>() {}

    #[test]
    fn job_is_send() {
        assert_send::<Job<64, ()>>();
        assert_send::<Job<128, ()>>();
    }

    #[test]
    fn job_runs_closure_once() {
        let counter = Arc::new(AtomicUsize::new(0));
        let c = {
            let counter = counter.clone();
            move || {
                counter.fetch_add(1, Ordering::SeqCst);
            }
        };

        let job: Job<64, ()> = Job::new(c);
        job.run();

        assert_eq!(counter.load(Ordering::SeqCst), 1);
    }

    #[test]
    fn cloned_jobs_both_run() {
        let counter = Arc::new(AtomicUsize::new(0));
        let c = {
            let counter = counter.clone();
            move || {
                counter.fetch_add(1, Ordering::SeqCst);
            }
        };

        let job1: Job<64, ()> = Job::new(c);
        let job2 = job1.clone();

        job1.run();
        job2.run();

        assert_eq!(counter.load(Ordering::SeqCst), 2);
    }

    /// Type whose Drop increments a shared AtomicUsize.
    #[derive(Clone)]
    struct DropGuard {
        drops: Arc<AtomicUsize>,
    }

    impl Drop for DropGuard {
        fn drop(&mut self) {
            self.drops.fetch_add(1, Ordering::SeqCst);
        }
    }

    #[test]
    fn job_drop_calls_closure_drop_when_not_run() {
        let drops = Arc::new(AtomicUsize::new(0));
        {
            let guard = DropGuard {
                drops: drops.clone(),
            };
            let c = move || {
                // Do nothing; we only care about Drop.
                let _ = &guard;
            };
            let _job: Job<64, ()> = Job::new(c);
            // _job is dropped here without run()
        }

        // The closure's captured DropGuard should have been dropped exactly once.
        assert_eq!(drops.load(Ordering::SeqCst), 1);
    }

    #[test]
    fn job_run_does_not_double_drop_closure() {
        let drops = Arc::new(AtomicUsize::new(0));
        {
            let guard = DropGuard {
                drops: drops.clone(),
            };
            let c = move || {
                // When this closure is called, its captured guard will
                // be dropped at end of call.
                let _ = &guard;
            };
            let job: Job<64, ()> = Job::new(c);
            job.run();
            // After run(), Job's Drop should not try to drop the original F again.
        }

        // Exactly one drop of the captured guard.
        assert_eq!(drops.load(Ordering::SeqCst), 1);
    }

    #[test]
    #[should_panic(expected = "attempt to execute an empty job")]
    fn default_job_panics_on_run() {
        let job: Job<64, ()> = Job::default();
        job.run();
    }

    #[test]
    fn job_can_be_sent_to_worker_thread_and_run() {
        let counter = Arc::new(AtomicUsize::new(0));
        let (tx, rx) = mpsc::channel::<Job<1024, ()>>();

        // Worker thread that receives and runs jobs.
        let worker_counter = counter.clone();
        let worker = thread::spawn(move || {
            while let Ok(job) = rx.recv() {
                job.run();
                worker_counter.fetch_add(1, Ordering::SeqCst);
            }
        });

        // Send a few jobs.
        for _ in 0..3 {
            let c_counter = counter.clone();
            let job = Job::new(move || {
                c_counter.fetch_add(1, Ordering::SeqCst);
            });
            tx.send(job).unwrap();
        }

        drop(tx); // close channel
        worker.join().unwrap();

        // 3 jobs executed, and worker ran loop 3 times.
        assert_eq!(counter.load(Ordering::SeqCst), 6);
    }

    #[test]
    fn job_alignment_and_size_are_sufficient_for_small_closure() {
        // Small closure capturing a couple of integers.
        let a = 1u32;
        let b = 2u64;
        let c = move || {
            let _ = a + b as u32;
        };

        let size_f = mem::size_of_val(&c);
        let align_f = mem::align_of_val(&c);

        assert!(size_f <= 64);
        assert!(align_f <= mem::align_of::<Align16<[u8; 64]>>());

        // Fits, so this must compile and run:
        let job: Job<64, ()> = Job::new(c);
        job.run();
    }

    #[test]
    fn job_returns_value() {
        let job: Job<64, u32> = Job::new(|| 40 + 2);
        let result = job.run();
        assert_eq!(result, 42);
    }

    #[test]
    fn job_returns_owned_string() {
        let job: Job<64, _> = Job::new(|| "hello".to_owned() + " world");
        let result = job.run();
        assert_eq!(result, "hello world");
    }

    #[test]
    fn cloned_jobs_both_return_values() {
        let job1: Job<64, u64> = Job::new(|| 10u64 * 10);
        let job2 = job1.clone();

        let r1 = job1.run();
        let r2 = job2.run();

        assert_eq!(r1, 100);
        assert_eq!(r2, 100);
    }

    #[test]
    fn job_with_result_can_be_sent_to_worker_thread() {
        let (tx, rx) = mpsc::channel::<Job<128, u32>>();

        // Worker thread that receives jobs and collects their results.
        let worker = thread::spawn(move || {
            let mut results = Vec::new();
            while let Ok(job) = rx.recv() {
                results.push(job.run());
            }
            results
        });

        // Send a few value-returning jobs.
        for i in 0..3u32 {
            let job = Job::new(move || i * 2);
            tx.send(job).unwrap();
        }

        drop(tx); // close channel
        let results = worker.join().unwrap();

        assert_eq!(results.len(), 3);
        assert!(results.contains(&0));
        assert!(results.contains(&2));
        assert!(results.contains(&4));
    }

    #[test]
    fn job_runs_with_mutable_context() {
        #[derive(Default, Clone)]
        struct Ctx {
            ticks: u32,
        }

        let mut ctx = Ctx::default();

        let job = Job::<64, u32, Ctx>::new_with_ctx(|c| {
            c.ticks += 1;
            c.ticks
        });

        let job_clone = job.clone();

        let first = job_clone.run_with_ctx(&mut ctx);
        assert_eq!(first, 1);
        assert_eq!(ctx.ticks, 1);

        let second = job.run_with_ctx(&mut ctx);
        assert_eq!(second, 2);
        assert_eq!(ctx.ticks, 2);
    }

    #[test]
    fn example_from_readme() {
        // Create a simple channel for dispatching jobs to a worker
        let (tx, rx) = mpsc::channel::<Job<64, String>>();

        // Spawn the worker thread. It hands back everything it printed so the
        // test can verify that the jobs really ran; a detached worker could
        // still be waiting for its first job when the test returns.
        let worker = thread::spawn(move || {
            let mut printed = Vec::new();
            while let Ok(job) = rx.recv() {
                let s = job.run(); // executes the closure
                println!("{}", s);
                printed.push(s);
            }
            printed
        });

        // Send a job
        let job = Job::<64, _>::new(|| "Hello from a job!".to_string());
        tx.send(job).unwrap();

        // A convenience macro that enqueues a logging job.
        macro_rules! log {
            ($tx:expr, $fmt:literal $(, $arg:expr)* $(,)?) => {{
                let job = Job::<64, String>::new(move || {
                    format!($fmt $(, $arg)*)
                });
                let _ = $tx.send(job);
            }};
        }

        // Use the `log!` macro to enqueue a logging job.
        log!(tx, "Logging from thread: {}", 42);

        // Close the channel so the worker loop ends, then check its output.
        drop(tx);
        let printed = worker.join().unwrap();
        assert_eq!(printed, ["Hello from a job!", "Logging from thread: 42"]);
    }
}