
spark_signals/shared/mod.rs

// ============================================================================
// spark-signals - Reactive Shared Arrays
//
// Arrays backed by shared memory (SharedArrayBuffer) that integrate with
// the reactive system. Designed for a zero-copy FFI bridge between Rust and
// TypeScript.
//
// Key features:
// - Direct pointer access to shared memory (no copying)
// - Per-index dirty tracking for sparse updates
// - Cross-platform wait mechanism (futex on Linux, ulock on macOS)
// - Full integration with reactive tracking
// ============================================================================

pub mod notify;
pub mod shared_slot_buffer;

use std::marker::PhantomData;
use std::sync::atomic::{AtomicI32, AtomicU32, Ordering};

// =============================================================================
// CROSS-PLATFORM WAIT
// =============================================================================

/// Wait for the wake flag to become non-zero.
///
/// Uses platform-specific primitives:
/// - Linux: futex_wait
/// - macOS: __ulock_wait
/// - Windows: WaitOnAddress
///
/// Returns immediately if the flag is already non-zero.
pub fn wait_for_wake(wake_flag: &AtomicI32) {
    loop {
        // Check if the flag is set
        let value = wake_flag.load(Ordering::SeqCst);
        if value != 0 {
            // Reset the flag and return
            wake_flag.store(0, Ordering::SeqCst);
            return;
        }

        // Block until notified, then re-check (wakeups may be spurious)
        platform_wait(wake_flag, 0);
    }
}

/// Wait with a timeout (in microseconds). Returns true if woken, false on timeout.
pub fn wait_for_wake_timeout(wake_flag: &AtomicI32, timeout_us: u32) -> bool {
    let value = wake_flag.load(Ordering::SeqCst);
    if value != 0 {
        wake_flag.store(0, Ordering::SeqCst);
        return true;
    }

    platform_wait_timeout(wake_flag, 0, timeout_us);

    let value = wake_flag.load(Ordering::SeqCst);
    if value != 0 {
        wake_flag.store(0, Ordering::SeqCst);
        true
    } else {
        false
    }
}

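// ----------------------------------------------------------------------------
// Usage sketch (illustrative, not part of the public API): a worker loop that
// waits with a frame-sized budget so a stalled producer cannot block it
// forever. The 16_000 us budget and the `on_wake` callback are assumptions
// made for this example, not values the module prescribes.
// ----------------------------------------------------------------------------
#[allow(dead_code)]
fn example_wait_loop(wake_flag: &AtomicI32, mut on_wake: impl FnMut()) {
    loop {
        // True only if the producer set the flag within the budget
        if wait_for_wake_timeout(wake_flag, 16_000) {
            on_wake();
        }
        // On timeout, loop and wait again; periodic housekeeping could run here
    }
}
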
#[cfg(target_os = "linux")]
fn platform_wait(flag: &AtomicI32, expected: i32) {
    unsafe {
        libc::syscall(
            libc::SYS_futex,
            flag as *const AtomicI32,
            libc::FUTEX_WAIT,
            expected,
            std::ptr::null::<libc::timespec>(),
        );
    }
}

#[cfg(target_os = "linux")]
fn platform_wait_timeout(flag: &AtomicI32, expected: i32, timeout_us: u32) {
    // FUTEX_WAIT interprets the timespec as a relative timeout
    let timeout = libc::timespec {
        tv_sec: (timeout_us / 1_000_000) as i64,
        tv_nsec: ((timeout_us % 1_000_000) * 1000) as i64,
    };
    unsafe {
        libc::syscall(
            libc::SYS_futex,
            flag as *const AtomicI32,
            libc::FUTEX_WAIT,
            expected,
            &timeout as *const libc::timespec,
        );
    }
}

#[cfg(target_os = "macos")]
fn platform_wait(flag: &AtomicI32, expected: i32) {
    // macOS uses __ulock_wait
    // UL_COMPARE_AND_WAIT = 1; a timeout of 0 means wait indefinitely
    unsafe extern "C" {
        fn __ulock_wait(operation: u32, addr: *const AtomicI32, value: u64, timeout: u32) -> i32;
    }
    unsafe {
        __ulock_wait(1, flag, expected as u64, 0);
    }
}

#[cfg(target_os = "macos")]
fn platform_wait_timeout(flag: &AtomicI32, expected: i32, timeout_us: u32) {
    unsafe extern "C" {
        fn __ulock_wait(operation: u32, addr: *const AtomicI32, value: u64, timeout: u32) -> i32;
    }
    unsafe {
        // __ulock_wait takes the timeout in microseconds
        __ulock_wait(1, flag, expected as u64, timeout_us);
    }
}

#[cfg(target_os = "windows")]
fn platform_wait(flag: &AtomicI32, expected: i32) {
    // Windows uses WaitOnAddress (exported via synchronization.lib)
    #[link(name = "synchronization")]
    unsafe extern "system" {
        fn WaitOnAddress(
            address: *const AtomicI32,
            compare_address: *const i32,
            address_size: usize,
            milliseconds: u32,
        ) -> i32;
    }
    unsafe {
        // u32::MAX is INFINITE: block until woken
        WaitOnAddress(flag, &expected, std::mem::size_of::<i32>(), u32::MAX);
    }
}

#[cfg(target_os = "windows")]
fn platform_wait_timeout(flag: &AtomicI32, expected: i32, timeout_us: u32) {
    #[link(name = "synchronization")]
    unsafe extern "system" {
        fn WaitOnAddress(
            address: *const AtomicI32,
            compare_address: *const i32,
            address_size: usize,
            milliseconds: u32,
        ) -> i32;
    }
    // Round up so sub-millisecond timeouts do not degrade to 0 (no wait)
    let timeout_ms = timeout_us.div_ceil(1000);
    unsafe {
        WaitOnAddress(flag, &expected, std::mem::size_of::<i32>(), timeout_ms);
    }
}

// Fallback for other platforms (sleep-based polling - not recommended for production)
#[cfg(not(any(target_os = "linux", target_os = "macos", target_os = "windows")))]
fn platform_wait(_flag: &AtomicI32, _expected: i32) {
    std::thread::sleep(std::time::Duration::from_micros(100));
}

#[cfg(not(any(target_os = "linux", target_os = "macos", target_os = "windows")))]
fn platform_wait_timeout(_flag: &AtomicI32, _expected: i32, timeout_us: u32) {
    std::thread::sleep(std::time::Duration::from_micros(timeout_us as u64));
}

// =============================================================================
// SHARED BUFFER CONTEXT
// =============================================================================

/// Context for a shared buffer - holds pointers into the shared memory regions.
pub struct SharedBufferContext {
    /// Base pointer to the shared memory
    pub base_ptr: *mut u8,
    /// Total size of the shared buffer in bytes
    pub size: usize,
    /// Pointer to the dirty flags (one byte per index)
    pub dirty_flags: *mut u8,
    /// Pointer to the wake flag (an AtomicI32)
    pub wake_flag: *const AtomicI32,
    /// Maximum number of elements
    pub max_elements: usize,
}

impl SharedBufferContext {
    /// Create a new context from raw pointers.
    ///
    /// # Safety
    ///
    /// - `base_ptr` must point to valid shared memory
    /// - All offsets must be within the buffer bounds
    /// - `base_ptr + wake_flag_offset` must be 4-byte aligned for `AtomicI32`
    /// - The memory must remain valid for the lifetime of this context
    pub unsafe fn new(
        base_ptr: *mut u8,
        size: usize,
        dirty_flags_offset: usize,
        wake_flag_offset: usize,
        max_elements: usize,
    ) -> Self {
        unsafe {
            Self {
                base_ptr,
                size,
                dirty_flags: base_ptr.add(dirty_flags_offset),
                wake_flag: base_ptr.add(wake_flag_offset) as *const AtomicI32,
                max_elements,
            }
        }
    }

    /// Get the wake flag reference for waiting.
    pub fn wake_flag(&self) -> &AtomicI32 {
        unsafe { &*self.wake_flag }
    }

    /// Check if an index is marked dirty.
    #[inline]
    pub fn is_dirty(&self, index: usize) -> bool {
        debug_assert!(index < self.max_elements);
        unsafe { *self.dirty_flags.add(index) != 0 }
    }

    /// Clear the dirty flag for an index.
    #[inline]
    pub fn clear_dirty(&self, index: usize) {
        debug_assert!(index < self.max_elements);
        unsafe {
            *self.dirty_flags.add(index) = 0;
        }
    }

    /// Get all dirty indices.
    pub fn dirty_indices(&self) -> Vec<usize> {
        (0..self.max_elements)
            .filter(|&i| self.is_dirty(i))
            .collect()
    }

    /// Clear all dirty flags.
    pub fn clear_all_dirty(&self) {
        unsafe {
            std::ptr::write_bytes(self.dirty_flags, 0, self.max_elements);
        }
    }
}

// Safety: the shared memory is synchronized via atomics
unsafe impl Send for SharedBufferContext {}
unsafe impl Sync for SharedBufferContext {}

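// ----------------------------------------------------------------------------
// Layout sketch (illustrative): carve a single allocation into an f32 data
// region, one dirty byte per element, and a 4-byte-aligned wake flag. The
// offsets below are assumptions made for this example; the real layout is
// fixed by whoever allocates the SharedArrayBuffer on the TypeScript side.
// ----------------------------------------------------------------------------
#[allow(dead_code)]
fn example_build_context(max_elements: usize) -> (Vec<u32>, SharedBufferContext) {
    let dirty_offset = max_elements * std::mem::size_of::<f32>();
    // Round up to 4-byte alignment for the AtomicI32 wake flag
    let wake_offset = (dirty_offset + max_elements + 3) & !3;
    let total_bytes = wake_offset + std::mem::size_of::<AtomicI32>();
    // Back the region with u32s so the base (and thus the wake flag) is 4-aligned
    let mut backing = vec![0u32; total_bytes.div_ceil(4)];
    let ctx = unsafe {
        SharedBufferContext::new(
            backing.as_mut_ptr() as *mut u8,
            total_bytes,
            dirty_offset,
            wake_offset,
            max_elements,
        )
    };
    // Return the backing Vec too: the context is only valid while it lives
    (backing, ctx)
}
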
// =============================================================================
// REACTIVE SHARED ARRAY
// =============================================================================

/// A reactive array backed by shared memory.
///
/// Reads from this array can trigger reactive subscriptions.
/// The array is read-only from Rust's perspective - writes come from the
/// TypeScript side.
///
/// # Type Parameters
///
/// - `T`: The element type (must be `Copy` for safe shared memory access)
pub struct ReactiveSharedArray<T: Copy> {
    ptr: *const T,
    len: usize,
    dirty: *const u8,
    /// Signal version for coarse-grained change detection
    version: AtomicU32,
    _marker: PhantomData<T>,
}

// Safety: the shared memory is synchronized via atomics
unsafe impl<T: Copy + Send> Send for ReactiveSharedArray<T> {}
unsafe impl<T: Copy + Sync> Sync for ReactiveSharedArray<T> {}

impl<T: Copy> ReactiveSharedArray<T> {
    /// Create a new reactive shared array.
    ///
    /// # Safety
    ///
    /// - `ptr` must point to valid shared memory with at least `len * size_of::<T>()` bytes
    /// - `dirty` must point to valid shared memory with at least `len` bytes
    /// - Both pointers must remain valid for the lifetime of this array
    pub unsafe fn new(ptr: *const T, len: usize, dirty: *const u8) -> Self {
        Self {
            ptr,
            len,
            dirty,
            version: AtomicU32::new(0),
            _marker: PhantomData,
        }
    }

    /// Create from a SharedBufferContext with a byte offset.
    ///
    /// # Safety
    ///
    /// - The offset must be properly aligned for type `T`
    /// - The region must not overlap with other mutable regions
    pub unsafe fn from_context(ctx: &SharedBufferContext, byte_offset: usize, len: usize) -> Self {
        unsafe {
            let ptr = ctx.base_ptr.add(byte_offset) as *const T;
            Self::new(ptr, len, ctx.dirty_flags)
        }
    }

    /// Get a value at the given index.
    #[inline]
    pub fn get(&self, index: usize) -> T {
        debug_assert!(index < self.len, "index out of bounds");
        unsafe { *self.ptr.add(index) }
    }

    /// Check if an index is marked dirty.
    #[inline]
    pub fn is_dirty(&self, index: usize) -> bool {
        debug_assert!(index < self.len);
        unsafe { *self.dirty.add(index) != 0 }
    }

    /// Clear the dirty flag for an index.
    #[inline]
    pub fn clear_dirty(&self, index: usize) {
        debug_assert!(index < self.len);
        unsafe {
            let dirty_ptr = self.dirty as *mut u8;
            *dirty_ptr.add(index) = 0;
        }
    }

    /// Get all dirty indices.
    pub fn dirty_indices(&self) -> Vec<usize> {
        (0..self.len).filter(|&i| self.is_dirty(i)).collect()
    }

    /// Increment the version (called when processing changes).
    pub fn bump_version(&self) {
        self.version.fetch_add(1, Ordering::SeqCst);
    }

    /// Get the current version.
    pub fn version(&self) -> u32 {
        self.version.load(Ordering::SeqCst)
    }

    pub fn len(&self) -> usize {
        self.len
    }

    pub fn is_empty(&self) -> bool {
        self.len == 0
    }

    /// Get a slice of the underlying data.
    ///
    /// # Safety
    ///
    /// The returned slice is only valid while the TypeScript side is not writing.
    /// Use only within a synchronized section.
    pub unsafe fn as_slice(&self) -> &[T] {
        unsafe { std::slice::from_raw_parts(self.ptr, self.len) }
    }

    /// Iterate over all elements.
    pub fn iter(&self) -> impl Iterator<Item = T> + '_ {
        (0..self.len).map(move |i| self.get(i))
    }
}

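// ----------------------------------------------------------------------------
// Consumer sketch (illustrative): after a wake, visit only the indices the
// producer marked dirty, then bump the coarse version once per batch so
// reactive subscribers re-run a single time. The `on_change` callback is an
// assumption made for this example.
// ----------------------------------------------------------------------------
#[allow(dead_code)]
fn example_drain_updates<T: Copy>(
    array: &ReactiveSharedArray<T>,
    mut on_change: impl FnMut(usize, T),
) {
    for index in array.dirty_indices() {
        on_change(index, array.get(index));
        array.clear_dirty(index);
    }
    // One version bump per batch, not per element
    array.bump_version();
}
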
// =============================================================================
// MUTABLE SHARED ARRAY (for output arrays that Rust writes to)
// =============================================================================

/// A mutable array backed by shared memory.
///
/// Used for output arrays where Rust writes computed results that
/// TypeScript reads.
pub struct MutableSharedArray<T: Copy> {
    ptr: *mut T,
    len: usize,
    _marker: PhantomData<T>,
}

// Safety: the shared memory is synchronized via atomics
unsafe impl<T: Copy + Send> Send for MutableSharedArray<T> {}
unsafe impl<T: Copy + Sync> Sync for MutableSharedArray<T> {}

impl<T: Copy> MutableSharedArray<T> {
    /// Create a new mutable shared array.
    ///
    /// # Safety
    ///
    /// - `ptr` must point to valid shared memory with at least `len * size_of::<T>()` bytes
    /// - The memory must remain valid for the lifetime of this array
    /// - No other code should write to this memory region
    pub unsafe fn new(ptr: *mut T, len: usize) -> Self {
        Self {
            ptr,
            len,
            _marker: PhantomData,
        }
    }

    /// Create from a SharedBufferContext with a byte offset.
    ///
    /// # Safety
    ///
    /// - The offset must be properly aligned for type `T`
    /// - The region must not overlap with other regions in the buffer
    pub unsafe fn from_context(ctx: &SharedBufferContext, byte_offset: usize, len: usize) -> Self {
        unsafe {
            let ptr = ctx.base_ptr.add(byte_offset) as *mut T;
            Self::new(ptr, len)
        }
    }

    /// Get a value at the given index.
    #[inline]
    pub fn get(&self, index: usize) -> T {
        debug_assert!(index < self.len, "index out of bounds");
        unsafe { *self.ptr.add(index) }
    }

    /// Set a value at the given index.
    #[inline]
    pub fn set(&self, index: usize, value: T) {
        debug_assert!(index < self.len, "index out of bounds");
        unsafe {
            *self.ptr.add(index) = value;
        }
    }

    pub fn len(&self) -> usize {
        self.len
    }

    pub fn is_empty(&self) -> bool {
        self.len == 0
    }

    /// Get a mutable slice of the underlying data.
    ///
    /// # Safety
    ///
    /// The returned slice is only valid while no other code is accessing this memory.
    pub unsafe fn as_mut_slice(&mut self) -> &mut [T] {
        unsafe { std::slice::from_raw_parts_mut(self.ptr, self.len) }
    }
}

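// ----------------------------------------------------------------------------
// Output sketch (illustrative): Rust derives results from a reactive input
// array and writes them for the TypeScript side to read. Doubling each
// element is a stand-in for whatever computation the caller performs.
// ----------------------------------------------------------------------------
#[allow(dead_code)]
fn example_write_outputs(input: &ReactiveSharedArray<f32>, output: &MutableSharedArray<f32>) {
    debug_assert_eq!(input.len(), output.len());
    for i in 0..input.len() {
        output.set(i, input.get(i) * 2.0);
    }
}
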
// =============================================================================
// TYPE ALIASES
// =============================================================================

/// Reactive f32 array backed by shared memory.
pub type ReactiveSharedF32Array = ReactiveSharedArray<f32>;

/// Reactive u8 array backed by shared memory.
pub type ReactiveSharedU8Array = ReactiveSharedArray<u8>;

/// Reactive i32 array backed by shared memory.
pub type ReactiveSharedI32Array = ReactiveSharedArray<i32>;

/// Reactive u32 array backed by shared memory.
pub type ReactiveSharedU32Array = ReactiveSharedArray<u32>;

/// Mutable f32 array for output data.
pub type MutableSharedF32Array = MutableSharedArray<f32>;

461// =============================================================================
462// TESTS
463// =============================================================================
464
465#[cfg(test)]
466mod tests {
467    use super::*;
468
469    #[test]
470    fn test_reactive_shared_array_basic() {
471        // Create a simple buffer
472        let mut buffer = vec![1.0f32, 2.0, 3.0, 4.0, 5.0];
473        let mut dirty = vec![0u8; 5];
474
475        let array = unsafe {
476            ReactiveSharedArray::new(buffer.as_ptr(), buffer.len(), dirty.as_ptr())
477        };
478
479        assert_eq!(array.len(), 5);
480        assert_eq!(array.get(0), 1.0);
481        assert_eq!(array.get(4), 5.0);
482
483        // Test dirty tracking
484        dirty[2] = 1;
485        assert!(!array.is_dirty(0));
486        assert!(array.is_dirty(2));
487
488        let dirty_indices = array.dirty_indices();
489        assert_eq!(dirty_indices, vec![2]);
490    }
491
492    #[test]
493    fn test_mutable_shared_array() {
494        let mut buffer = vec![0.0f32; 5];
495
496        let array = unsafe { MutableSharedArray::new(buffer.as_mut_ptr(), buffer.len()) };
497
498        array.set(0, 10.0);
499        array.set(2, 20.0);
500
501        assert_eq!(array.get(0), 10.0);
502        assert_eq!(array.get(1), 0.0);
503        assert_eq!(array.get(2), 20.0);
504    }
505
506    #[test]
507    fn test_version_tracking() {
508        let buffer = vec![1.0f32; 5];
509        let dirty = vec![0u8; 5];
510
511        let array = unsafe {
512            ReactiveSharedArray::new(buffer.as_ptr(), buffer.len(), dirty.as_ptr())
513        };
514
515        assert_eq!(array.version(), 0);
516        array.bump_version();
517        assert_eq!(array.version(), 1);
518        array.bump_version();
519        assert_eq!(array.version(), 2);
520    }
521}