hft_jobs/lib.rs
1//! A lightweight, allocation-free job system for executing type-erased closures.
2//!
3//! This crate provides the [`Job`] type, which stores closures inline inside a
4//! fixed-size buffer and executes them without heap allocation. Each job embeds
5//! its own `call`, `clone`, and `drop` functions via type-erased function
6//! pointers, enabling predictable, low-latency execution suitable for
7//! high-frequency or real-time workloads.
8//!
9//! `Job` is generic over the inline capacity `N`, the closure's return type
10//! `R`, and an optional context type `C`. The inline storage size `N` has no
11//! default; callers must choose a capacity explicitly. `R` defaults to `()`,
12//! and `C` defaults to `()`.
13//!
14//! # Example: Dispatching Jobs to a Worker Thread
15//!
16//! ```rust
17//! use std::sync::mpsc;
18//! use std::thread;
19//! use hft_jobs::Job;
20//!
21//! // Create a channel for sending jobs.
22//! let (tx, rx) = mpsc::channel::<Job<24, String>>();
23//!
24//! // A worker thread that receives and runs jobs.
25//! thread::spawn(move || {
26//! while let Ok(job) = rx.recv() {
27//! let s = job.run();
28//! println!("{}", s);
29//! }
30//! });
31//!
32//! // Send a simple job.
33//! let job = Job::<24, _>::new(|| "Hello from a job!".to_string());
34//! tx.send(job).unwrap();
35//!
36//! // A convenience macro that enqueues a logging job.
37//! macro_rules! log {
38//! ($tx:expr, $fmt:literal $(, $arg:expr)* $(,)?) => {{
39//! let job = Job::<24, String>::new(move || {
40//! format!($fmt $(, $arg)*)
41//! });
42//! let _ = $tx.send(job);
43//! }};
44//! }
45//!
46//! // Use the `log!` macro to enqueue a println job.
47//! log!(tx, "Logging from thread: {}", 42);
48//! ```
49//!
50//! This model provides a minimal, fast job runtime suitable for embedded
51//! systems, schedulers, executors, or lightweight logging systems.
52
53use std::{marker::PhantomData, mem, ptr};
54
55/// A type-erased, fixed-size job container for storing and invoking closures.
56///
57/// `Job` holds an opaque buffer and a trio of function pointers that know how
58/// to call, clone, and drop the stored value. This allows arbitrary closures
59/// (or other callable types) to be stored in a uniform, `repr(C)` layout,
60/// making it suitable for job queues, thread pools, and FFI boundaries.
61///
62/// The stored callable is expected to live entirely inside the `data` buffer,
63/// and the function pointers must treat that buffer as their backing storage.
64///
65/// # Type Parameters
66///
67/// * `N` – The size of the internal storage buffer, in bytes. This must be
68/// large enough to hold the largest closure (or callable) you intend to
69/// store. There is no default; choose the inline capacity that fits your
70/// workload (e.g., `64` for small captures).
71/// * `R` – The return type of the stored closure. This is the value produced
72/// by [`Job::run`] or [`Job::run_with_ctx`]. Use `()` for fire-and-forget
73/// jobs (the default).
74/// * `C` – A mutable context passed to the closure when it is run. Defaults to
75/// `()` for the common zero-context case.
76///
77/// # Layout
78///
79/// * `data` – An `Align16<[u8; N]>` buffer that stores the erased closure.
80/// The alignment wrapper ensures the buffer has at least 16-byte alignment,
81/// which is typically sufficient for most captured types.
82/// * `fn_call` – An `unsafe fn(*mut u8) -> R` that takes a pointer into
83/// `data`, invokes the stored callable, and returns its result.
84/// * `fn_clone` – An `unsafe fn(*const u8, *mut u8)` that clones the stored
85/// value from one buffer to another (source and destination pointers into
86/// `data`-like storage).
87/// * `fn_drop` – An `unsafe fn(*mut u8)` that drops the stored value in place.
88///
89/// The `repr(C)` attribute guarantees a stable field layout, which is useful if
90/// instances of `Job` are passed across FFI or need a predictable memory
91/// representation.
92///
93/// # Threading and Lifetime
94///
95/// A `Job` created via [`Job::new`]:
96///
97/// * is backed by a `FnOnce() -> R + Clone + Send + 'static` callable,
98/// ensuring that the stored closure can be invoked exactly once, cloned for
99/// replication, transferred across threads, and contains no non-`'static`
100/// borrows;
101/// * is `Clone`, allowing the same logical callable to be duplicated into
102/// multiple `Job` instances (e.g., for SPMC job queues);
103/// * is `Send`, so it may be moved freely to other threads for execution;
104/// * is `'static`, meaning it holds no borrowed references tied to a shorter
105/// lifetime and may safely outlive the scope in which it was created.
106///
107/// # Example
108///
109/// ```rust
110/// # use hft_jobs::Job;
111/// // Typically constructed via a helper like `Job::new`:
112/// let job = Job::<64>::new(|| {
113/// println!("Hello from a job!");
114/// });
115///
116/// // Later, the job executor would call something like:
117/// job.run();
118/// ```
119#[repr(C)]
120pub struct Job<const N: usize, R = (), C = ()> {
121 data: Align16<[u8; N]>, // N must be >= sizeof(biggest closure)
122 fn_call: unsafe fn(*mut u8, &mut C) -> R,
123 fn_clone: unsafe fn(*const u8, *mut u8),
124 fn_drop: unsafe fn(*mut u8),
125 _marker: PhantomData<R>,
126}
127
128#[repr(align(16))]
129struct Align16<T>(pub T);
130
131// SAFETY: Job only ever contains F: FnOnce(&mut C) -> R + Clone + Send + 'static,
132// enforced in Job::new_with_ctx / Job::new, so it is safe to move between threads.
133unsafe impl<const N: usize, R, C> Send for Job<N, R, C> {}
134
135impl<const N: usize, R, C> Job<N, R, C> {
136 /// Creates a new job from a closure, storing it inline without heap
137 /// allocation.
138 ///
139 /// # Type Requirements
140 ///
141 /// The closure `F` must satisfy:
142 /// - `FnOnce() -> R` — the closure is invoked exactly once to produce `R`;
143 /// - `Clone` — needed so the job queue or scheduler can duplicate jobs if
144 /// desired;
145 /// - `Send + 'static` — ensures the closure may be transferred across
146 /// threads and does not borrow non-`'static` data.
147 ///
148 /// # Storage Constraints
149 ///
150 /// The closure must fit inside the inline buffer:
151 /// - `size_of::<F>() <= N`
152 /// - `align_of::<F>() <= align_of::<Align16<[u8; N]>>`
153 ///
154 /// If either condition fails, the function **panics**.
155 ///
156 /// # How it Works
157 ///
158 /// This method:
159 /// 1. Generates specialized `fn_call`, `fn_clone`, and `fn_drop` functions
160 /// for the captured closure type `F`.
161 /// 2. Writes the closure directly into the inline buffer using
162 /// [`ptr::write`], avoiding heap allocations.
163 /// 3. Returns a fully-constructed [`Job`] that owns the closure.
164 ///
165 /// # Safety
166 ///
167 /// Internally uses unsafe operations to:
168 /// - Reconstruct `F` from raw bytes.
169 /// - Clone `F` via raw pointers.
170 /// - Drop `F` in place.
171 ///
172 /// These operations are memory-safe *only* because:
173 /// - The size and alignment checks guarantee the buffer is valid for `F`.
174 /// - The caller cannot violate the type invariants through the public API.
175 ///
176 /// # Example (with context)
177 ///
178 /// ```
179 /// # use hft_jobs::Job;
180 /// #[derive(Default)]
181 /// struct Ctx {
182 /// sum: u32,
183 /// }
184 ///
185 /// let job = Job::<64, u32, Ctx>::new_with_ctx(|ctx| {
186 /// ctx.sum += 1;
187 /// ctx.sum
188 /// });
189 ///
190 /// let mut ctx = Ctx::default();
191 /// assert_eq!(job.run_with_ctx(&mut ctx), 1);
192 /// assert_eq!(ctx.sum, 1);
193 /// ```
194 ///
195 /// F: FnOnce(&mut C) -> R + Clone + Send + 'static
196 #[inline]
197 pub fn new_with_ctx<F>(f: F) -> Self
198 where
199 F: FnOnce(&mut C) -> R + Clone + Send + 'static,
200 {
201 // Ensure (at compile time) that the closure fits into our inline storage
202 const {
203 assert!(mem::size_of::<F>() <= N);
204 assert!(mem::align_of::<F>() <= mem::align_of::<Align16<[u8; N]>>());
205 }
206
207 unsafe fn fn_call<R, C, F>(data: *mut u8, ctx: &mut C) -> R
208 where
209 F: FnOnce(&mut C) -> R,
210 {
211 unsafe {
212 let f = ptr::read(data as *const F);
213 f(ctx)
214 }
215 }
216
217 unsafe fn fn_clone<R, C, F>(src: *const u8, dst: *mut u8)
218 where
219 F: FnOnce(&mut C) -> R + Clone,
220 {
221 unsafe {
222 let f_src = &*(src as *const F);
223 let f_clone = f_src.clone();
224 ptr::write(dst as *mut F, f_clone);
225 }
226 }
227
228 unsafe fn fn_drop<R, C, F>(data: *mut u8)
229 where
230 F: FnOnce(&mut C) -> R,
231 {
232 unsafe {
233 ptr::drop_in_place(data as *mut F);
234 }
235 }
236
237 let mut job = Job {
238 fn_call: fn_call::<R, C, F>,
239 fn_clone: fn_clone::<R, C, F>,
240 fn_drop: fn_drop::<R, C, F>,
241 data: Align16([0u8; N]),
242 _marker: PhantomData,
243 };
244
245 unsafe {
246 // Place the closure into `data` without heap allocation
247 let dst = job.data.0.as_mut_ptr() as *mut F;
248 ptr::write(dst, f);
249 }
250
251 job
252 }
253
254 /// Runs the stored closure with a mutable context, consuming the job and
255 /// returning its result `R`.
256 ///
257 /// This method invokes the closure that was previously stored in the inline
258 /// buffer. Running the job consumes its captured environment and yields
259 /// a value of type `R`.
260 ///
261 /// After invocation, the job's internal `fn_drop` function pointer is replaced
262 /// with a no-op drop function, ensuring that the closure's environment is not
263 /// dropped **twice** when the `Job` itself is later dropped.
264 ///
265 /// # How it Works
266 ///
267 /// - `fn_call` reconstructs the original closure from the buffer, taking
268 /// ownership of it.
269 /// - The closure is executed with the provided context, and its return value
270 /// is propagated back to the caller as `R`.
271 /// - Because the closure has now been consumed, `fn_drop` is overwritten with
272 /// a version that performs no action, preventing a double-drop.
273 ///
274 /// # Safety
275 ///
276 /// Although this method uses unsafe raw-pointer calls internally, it is safe
277 /// to use because:
278 ///
279 /// - The closure is guaranteed (via `Job::new_with_ctx`) to fit in the inline
280 /// buffer with correct alignment.
281 /// - The closure is executed exactly once.
282 /// - The memory backing the closure is never accessed again after ownership
283 /// is taken by `fn_call`.
284 ///
285 /// # Example
286 ///
287 /// ```rust
288 /// # use hft_jobs::Job;
289 /// #[derive(Default)]
290 /// struct Ctx {
291 /// total: u32,
292 /// }
293 ///
294 /// let job = Job::<64, u32, Ctx>::new_with_ctx(|ctx| {
295 /// ctx.total += 2;
296 /// ctx.total
297 /// });
298 ///
299 /// let mut ctx = Ctx::default();
300 /// assert_eq!(job.run_with_ctx(&mut ctx), 2);
301 /// assert_eq!(ctx.total, 2);
302 /// ```
303 #[inline]
304 pub fn run_with_ctx(mut self, ctx: &mut C) -> R {
305 unsafe {
306 // Replace drop with a no-op drop (ZST panic closure) to avoid double-drop.
307 self.fn_drop = Job::<N, R, C>::default().fn_drop;
308 (self.fn_call)(self.data.0.as_ptr() as *mut u8, ctx)
309 }
310 }
311}
312
313impl<const N: usize, R, C> Default for Job<N, R, C> {
314 /// Constructs a "default" job which panics if it is ever run.
315 ///
316 /// This is mainly useful as a placeholder. Calling [`Job::run_with_ctx`] on a
317 /// default-constructed job will panic with the message
318 /// `"attempt to execute an empty job"`.
319 /// Used as a source of a no-op drop function after `run_with_ctx`.
320 #[inline]
321 fn default() -> Self {
322 // ZST closure; drop is effectively a no-op.
323 Self::new_with_ctx(|_ctx: &mut C| panic!("attempt to execute an empty job"))
324 }
325}
326
327impl<const N: usize, R, C> Clone for Job<N, R, C> {
328 /// Clones the underlying closure into a new `Job`.
329 ///
330 /// Both the original and cloned `Job` instances own independent copies of
331 /// the captured environment and can be run separately.
332 #[inline]
333 fn clone(&self) -> Self {
334 let mut new_job = Job {
335 fn_call: self.fn_call,
336 fn_clone: self.fn_clone,
337 fn_drop: self.fn_drop,
338 data: Align16([0u8; N]),
339 _marker: PhantomData,
340 };
341 unsafe {
342 (self.fn_clone)(self.data.0.as_ptr(), new_job.data.0.as_mut_ptr());
343 }
344 new_job
345 }
346}
347
348impl<const N: usize, R, C> Drop for Job<N, R, C> {
349 /// Drops the stored closure (if any) in place.
350 ///
351 /// If the job has not been run, this will drop the captured environment of
352 /// the stored closure. If the job **has** been run, [`Job::run_with_ctx`]
353 /// (or [`Job::run`] for the `C = ()` convenience impl) ensures that
354 /// `fn_drop` has been replaced so that no double-drop occurs.
355 #[inline]
356 fn drop(&mut self) {
357 unsafe {
358 (self.fn_drop)(self.data.0.as_mut_ptr());
359 }
360 }
361}
362
363/// For the common “no context” case (C = ()):
364impl<const N: usize, R> Job<N, R, ()> {
365 /// Convenience constructor for the common zero-context case.
366 ///
367 /// Equivalent to [`Job::new_with_ctx`] with `C = ()`.
368 /// Backwards-compatible constructor: closure takes no arguments.
369 #[inline]
370 pub fn new<F>(f: F) -> Self
371 where
372 F: FnOnce() -> R + Clone + Send + 'static,
373 {
374 // Wrap zero-arg closure into a context-taking closure
375 Self::new_with_ctx(move |_ctx: &mut ()| f())
376 }
377
378 /// Convenience runner for `C = ()`.
379 ///
380 /// Equivalent to [`Job::run_with_ctx`] with an empty context.
381 /// Backwards-compatible runner: no context required.
382 #[inline]
383 pub fn run(self) -> R {
384 self.run_with_ctx(&mut ())
385 }
386}
387
388#[cfg(test)]
389mod tests {
390 use super::*;
391 use std::sync::{
392 Arc,
393 atomic::{AtomicUsize, Ordering},
394 mpsc,
395 };
396 use std::thread;
397
398 /// Helper: prove at compile time that Job is Send.
399 fn assert_send<T: Send>() {}
400 #[test]
401 fn job_is_send() {
402 assert_send::<Job<64, ()>>();
403 assert_send::<Job<128, ()>>();
404 }
405
406 #[test]
407 fn job_runs_closure_once() {
408 let counter = Arc::new(AtomicUsize::new(0));
409 let c = {
410 let counter = counter.clone();
411 move || {
412 counter.fetch_add(1, Ordering::SeqCst);
413 }
414 };
415
416 let job: Job<64, ()> = Job::new(c);
417 job.run();
418
419 assert_eq!(counter.load(Ordering::SeqCst), 1);
420 }
421
422 #[test]
423 fn cloned_jobs_both_run() {
424 let counter = Arc::new(AtomicUsize::new(0));
425 let c = {
426 let counter = counter.clone();
427 move || {
428 counter.fetch_add(1, Ordering::SeqCst);
429 }
430 };
431
432 let job1: Job<64, ()> = Job::new(c);
433 let job2 = job1.clone();
434
435 job1.run();
436 job2.run();
437
438 assert_eq!(counter.load(Ordering::SeqCst), 2);
439 }
440
441 /// Type whose Drop increments a shared AtomicUsize.
442 #[derive(Clone)]
443 struct DropGuard {
444 drops: Arc<AtomicUsize>,
445 }
446
447 impl Drop for DropGuard {
448 fn drop(&mut self) {
449 self.drops.fetch_add(1, Ordering::SeqCst);
450 }
451 }
452
453 #[test]
454 fn job_drop_calls_closure_drop_when_not_run() {
455 let drops = Arc::new(AtomicUsize::new(0));
456 {
457 let guard = DropGuard {
458 drops: drops.clone(),
459 };
460 let c = move || {
461 // Do nothing; we only care about Drop.
462 let _ = &guard;
463 };
464 let _job: Job<64, ()> = Job::new(c);
465 // _job is dropped here without run()
466 }
467
468 // The closure's captured DropGuard should have been dropped exactly once.
469 assert_eq!(drops.load(Ordering::SeqCst), 1);
470 }
471
472 #[test]
473 fn job_run_does_not_double_drop_closure() {
474 let drops = Arc::new(AtomicUsize::new(0));
475 {
476 let guard = DropGuard {
477 drops: drops.clone(),
478 };
479 let c = move || {
480 // When this closure is called, its captured guard will
481 // be dropped at end of call.
482 let _ = &guard;
483 };
484 let job: Job<64, ()> = Job::new(c);
485 job.run();
486 // After run(), Job's Drop should not try to drop the original F again.
487 }
488
489 // Exactly one drop of the captured guard.
490 assert_eq!(drops.load(Ordering::SeqCst), 1);
491 }
492
493 #[test]
494 #[should_panic(expected = "attempt to execute an empty job")]
495 fn default_job_panics_on_run() {
496 let job: Job<64, ()> = Job::default();
497 job.run();
498 }
499
500 #[test]
501 fn job_can_be_sent_to_worker_thread_and_run() {
502 let counter = Arc::new(AtomicUsize::new(0));
503 let (tx, rx) = mpsc::channel::<Job<1024, ()>>();
504
505 // Worker thread that receives and runs jobs.
506 let worker_counter = counter.clone();
507 let worker = thread::spawn(move || {
508 while let Ok(job) = rx.recv() {
509 job.run();
510 worker_counter.fetch_add(1, Ordering::SeqCst);
511 }
512 });
513
514 // Send a few jobs.
515 for _ in 0..3 {
516 let c_counter = counter.clone();
517 let job = Job::new(move || {
518 c_counter.fetch_add(1, Ordering::SeqCst);
519 });
520 tx.send(job).unwrap();
521 }
522
523 drop(tx); // close channel
524 worker.join().unwrap();
525
526 // 3 jobs executed, and worker ran loop 3 times.
527 assert_eq!(counter.load(Ordering::SeqCst), 6);
528 }
529
530 #[test]
531 fn job_alignment_and_size_are_sufficient_for_small_closure() {
532 // Small closure capturing a couple of integers.
533 let a = 1u32;
534 let b = 2u64;
535 let c = move || {
536 let _ = a + b as u32;
537 };
538
539 let size_f = mem::size_of_val(&c);
540 let align_f = mem::align_of_val(&c);
541
542 assert!(size_f <= 64);
543 assert!(align_f <= mem::align_of::<Align16<[u8; 64]>>());
544
545 // Should not panic:
546 let job: Job<64, ()> = Job::new(c);
547 job.run();
548 }
549
550 #[test]
551 fn job_returns_value() {
552 let job: Job<64, u32> = Job::new(|| 40 + 2);
553 let result = job.run();
554 assert_eq!(result, 42);
555 }
556
557 #[test]
558 fn job_returns_owned_string() {
559 let job: Job<64, _> = Job::new(|| "hello".to_owned() + " world");
560 let result = job.run();
561 assert_eq!(result, "hello world");
562 }
563
564 #[test]
565 fn cloned_jobs_both_return_values() {
566 let job1: Job<64, u64> = Job::new(|| 10u64 * 10);
567 let job2 = job1.clone();
568
569 let r1 = job1.run();
570 let r2 = job2.run();
571
572 assert_eq!(r1, 100);
573 assert_eq!(r2, 100);
574 }
575
576 #[test]
577 fn job_with_result_can_be_sent_to_worker_thread() {
578 let (tx, rx) = mpsc::channel::<Job<128, u32>>();
579
580 // Worker thread that receives jobs and collects their results.
581 let worker = thread::spawn(move || {
582 let mut results = Vec::new();
583 while let Ok(job) = rx.recv() {
584 results.push(job.run());
585 }
586 results
587 });
588
589 // Send a few value-returning jobs.
590 for i in 0..3u32 {
591 let job = Job::new(move || i * 2);
592 tx.send(job).unwrap();
593 }
594
595 drop(tx); // close channel
596 let results = worker.join().unwrap();
597
598 assert_eq!(results.len(), 3);
599 assert!(results.contains(&0));
600 assert!(results.contains(&2));
601 assert!(results.contains(&4));
602 }
603
604 #[test]
605 fn job_runs_with_mutable_context() {
606 #[derive(Default, Clone)]
607 struct Ctx {
608 ticks: u32,
609 }
610
611 let mut ctx = Ctx::default();
612
613 let job = Job::<64, u32, Ctx>::new_with_ctx(|c| {
614 c.ticks += 1;
615 c.ticks
616 });
617
618 let job_clone = job.clone();
619
620 let first = job_clone.run_with_ctx(&mut ctx);
621 assert_eq!(first, 1);
622 assert_eq!(ctx.ticks, 1);
623
624 let second = job.run_with_ctx(&mut ctx);
625 assert_eq!(second, 2);
626 assert_eq!(ctx.ticks, 2);
627 }
628
629 #[test]
630 fn example_from_readme() {
631 // Create a simple channel for dispatching jobs to a worker
632 let (tx, rx) = mpsc::channel::<Job<64, _>>();
633
634 // Spawn the worker thread
635 thread::spawn(move || {
636 while let Ok(job) = rx.recv() {
637 let s = job.run(); // executes the closure
638 println!("{}", s);
639 }
640 });
641
642 // Send a job
643 let job = Job::<64, _>::new(|| "Hello from a job!".to_string());
644 tx.send(job).unwrap();
645
646 // A convenience macro that enqueues a logging job.
647 macro_rules! log {
648 ($tx:expr, $fmt:literal $(, $arg:expr)* $(,)?) => {{
649 let job = Job::<64, String>::new(move || {
650 format!($fmt $(, $arg)*)
651 });
652 let _ = $tx.send(job);
653 }};
654 }
655
656 // Use the `log!` macro to enqueue a logging job.
657 log!(tx, "Logging from thread: {}", 42);
658 }
659}