pub mod notify;
pub mod shared_slot_buffer;

use std::marker::PhantomData;
use std::sync::atomic::{AtomicI32, AtomicU32, Ordering};

/// Block the calling thread until `wake_flag` becomes non-zero, then reset it to 0.
pub fn wait_for_wake(wake_flag: &AtomicI32) {
    loop {
        let value = wake_flag.load(Ordering::SeqCst);
        if value != 0 {
            // Consume the wake-up so the next wait blocks again.
            wake_flag.store(0, Ordering::SeqCst);
            return;
        }

        // Sleep until another thread changes the flag away from 0.
        platform_wait(wake_flag, 0);
    }
}

/// Like [`wait_for_wake`], but gives up after roughly `timeout_us` microseconds.
/// Returns `true` if a wake-up was observed (and consumed), `false` on timeout.
pub fn wait_for_wake_timeout(wake_flag: &AtomicI32, timeout_us: u32) -> bool {
    let value = wake_flag.load(Ordering::SeqCst);
    if value != 0 {
        wake_flag.store(0, Ordering::SeqCst);
        return true;
    }

    platform_wait_timeout(wake_flag, 0, timeout_us);

    // The wait may end because of a wake-up, a timeout, or a spurious return;
    // re-check the flag to decide which.
    let value = wake_flag.load(Ordering::SeqCst);
    if value != 0 {
        wake_flag.store(0, Ordering::SeqCst);
        true
    } else {
        false
    }
}

#[cfg(target_os = "linux")]
fn platform_wait(flag: &AtomicI32, expected: i32) {
    // FUTEX_WAIT sleeps only while `*flag == expected`; a null timeout waits forever.
    unsafe {
        libc::syscall(
            libc::SYS_futex,
            flag as *const AtomicI32,
            libc::FUTEX_WAIT,
            expected,
            std::ptr::null::<libc::timespec>(),
        );
    }
}

#[cfg(target_os = "linux")]
fn platform_wait_timeout(flag: &AtomicI32, expected: i32, timeout_us: u32) {
    // Field widths of `timespec` differ across libc targets, so let the cast
    // targets be inferred instead of hard-coding i64.
    let timeout = libc::timespec {
        tv_sec: (timeout_us / 1_000_000) as _,
        tv_nsec: ((timeout_us % 1_000_000) * 1000) as _,
    };
    unsafe {
        libc::syscall(
            libc::SYS_futex,
            flag as *const AtomicI32,
            libc::FUTEX_WAIT,
            expected,
            &timeout as *const libc::timespec,
        );
    }
}

#[cfg(target_os = "macos")]
fn platform_wait(flag: &AtomicI32, expected: i32) {
    // Private Darwin wait primitive; operation 1 is UL_COMPARE_AND_WAIT and a
    // timeout of 0 means "wait indefinitely".
    unsafe extern "C" {
        fn __ulock_wait(operation: u32, addr: *const AtomicI32, value: u64, timeout: u32) -> i32;
    }
    unsafe {
        __ulock_wait(1, flag, expected as u64, 0);
    }
}

#[cfg(target_os = "macos")]
fn platform_wait_timeout(flag: &AtomicI32, expected: i32, timeout_us: u32) {
    unsafe extern "C" {
        fn __ulock_wait(operation: u32, addr: *const AtomicI32, value: u64, timeout: u32) -> i32;
    }
    unsafe {
        // The __ulock_wait timeout argument is already in microseconds.
        __ulock_wait(1, flag, expected as u64, timeout_us);
    }
}

#[cfg(target_os = "windows")]
fn platform_wait(flag: &AtomicI32, expected: i32) {
    // WaitOnAddress lives in synchronization.lib (api-ms-win-core-synch-l1-2-0.dll),
    // not kernel32, so the extern block needs an explicit link attribute.
    #[link(name = "synchronization")]
    unsafe extern "system" {
        fn WaitOnAddress(
            address: *const AtomicI32,
            compare_address: *const i32,
            address_size: usize,
            milliseconds: u32,
        ) -> i32;
    }
    unsafe {
        // u32::MAX is INFINITE: block until the value at `address` changes.
        WaitOnAddress(flag, &expected, std::mem::size_of::<i32>(), u32::MAX);
    }
}

#[cfg(target_os = "windows")]
fn platform_wait_timeout(flag: &AtomicI32, expected: i32, timeout_us: u32) {
    #[link(name = "synchronization")]
    unsafe extern "system" {
        fn WaitOnAddress(
            address: *const AtomicI32,
            compare_address: *const i32,
            address_size: usize,
            milliseconds: u32,
        ) -> i32;
    }
    // Round up so sub-millisecond timeouts still block instead of returning immediately.
    let timeout_ms = timeout_us.div_ceil(1000);
    unsafe {
        WaitOnAddress(flag, &expected, std::mem::size_of::<i32>(), timeout_ms);
    }
}

#[cfg(not(any(target_os = "linux", target_os = "macos", target_os = "windows")))]
fn platform_wait(_flag: &AtomicI32, _expected: i32) {
    // Portable fallback: poll with a short sleep instead of a real futex-style wait.
    std::thread::sleep(std::time::Duration::from_micros(100));
}

#[cfg(not(any(target_os = "linux", target_os = "macos", target_os = "windows")))]
fn platform_wait_timeout(_flag: &AtomicI32, _expected: i32, timeout_us: u32) {
    std::thread::sleep(std::time::Duration::from_micros(timeout_us as u64));
}

/// View over a region of shared memory that carries per-element dirty flags and a
/// wake flag used with [`wait_for_wake`] / [`wait_for_wake_timeout`].
pub struct SharedBufferContext {
    /// Start of the shared region.
    pub base_ptr: *mut u8,
    /// Size of the region in bytes.
    pub size: usize,
    /// One dirty byte per element, `max_elements` long.
    pub dirty_flags: *mut u8,
    /// Flag set by the writer side when new data is available.
    pub wake_flag: *const AtomicI32,
    /// Number of elements covered by `dirty_flags`.
    pub max_elements: usize,
}

impl SharedBufferContext {
    /// Builds a context from a raw shared-memory mapping.
    ///
    /// # Safety
    /// `base_ptr` must point to a live mapping of at least `size` bytes, and both
    /// offsets must stay in bounds and be correctly aligned for their targets
    /// (4-byte alignment for the wake flag).
    pub unsafe fn new(
        base_ptr: *mut u8,
        size: usize,
        dirty_flags_offset: usize,
        wake_flag_offset: usize,
        max_elements: usize,
    ) -> Self {
        unsafe {
            Self {
                base_ptr,
                size,
                dirty_flags: base_ptr.add(dirty_flags_offset),
                wake_flag: base_ptr.add(wake_flag_offset) as *const AtomicI32,
                max_elements,
            }
        }
    }

    pub fn wake_flag(&self) -> &AtomicI32 {
        unsafe { &*self.wake_flag }
    }

    #[inline]
    pub fn is_dirty(&self, index: usize) -> bool {
        debug_assert!(index < self.max_elements);
        unsafe { *self.dirty_flags.add(index) != 0 }
    }

    #[inline]
    pub fn clear_dirty(&self, index: usize) {
        debug_assert!(index < self.max_elements);
        unsafe {
            *self.dirty_flags.add(index) = 0;
        }
    }

    pub fn dirty_indices(&self) -> Vec<usize> {
        (0..self.max_elements)
            .filter(|&i| self.is_dirty(i))
            .collect()
    }

    pub fn clear_all_dirty(&self) {
        unsafe {
            std::ptr::write_bytes(self.dirty_flags, 0, self.max_elements);
        }
    }
}

// Safety: the context only holds raw pointers into a shared mapping; whoever
// creates it is responsible for keeping the mapping alive and for cross-thread ordering.
unsafe impl Send for SharedBufferContext {}
unsafe impl Sync for SharedBufferContext {}

/// Read-side view of a shared array of `T` with a parallel dirty-byte array and a
/// local version counter.
pub struct ReactiveSharedArray<T: Copy> {
    ptr: *const T,
    len: usize,
    dirty: *const u8,
    version: AtomicU32,
    _marker: PhantomData<T>,
}

unsafe impl<T: Copy + Send> Send for ReactiveSharedArray<T> {}
unsafe impl<T: Copy + Sync> Sync for ReactiveSharedArray<T> {}

impl<T: Copy> ReactiveSharedArray<T> {
    /// # Safety
    /// `ptr` must point to `len` readable elements of `T`, and `dirty` to `len`
    /// readable bytes, for as long as the array is used.
    pub unsafe fn new(ptr: *const T, len: usize, dirty: *const u8) -> Self {
        Self {
            ptr,
            len,
            dirty,
            version: AtomicU32::new(0),
            _marker: PhantomData,
        }
    }

    /// # Safety
    /// `byte_offset` must be in bounds of the context's buffer and correctly
    /// aligned for `T`, with at least `len` elements available.
    pub unsafe fn from_context(ctx: &SharedBufferContext, byte_offset: usize, len: usize) -> Self {
        unsafe {
            let ptr = ctx.base_ptr.add(byte_offset) as *const T;
            Self::new(ptr, len, ctx.dirty_flags)
        }
    }

    #[inline]
    pub fn get(&self, index: usize) -> T {
        debug_assert!(index < self.len, "index out of bounds");
        unsafe { *self.ptr.add(index) }
    }

    #[inline]
    pub fn is_dirty(&self, index: usize) -> bool {
        debug_assert!(index < self.len);
        unsafe { *self.dirty.add(index) != 0 }
    }

    #[inline]
    pub fn clear_dirty(&self, index: usize) {
        debug_assert!(index < self.len);
        unsafe {
            let dirty_ptr = self.dirty as *mut u8;
            *dirty_ptr.add(index) = 0;
        }
    }

    pub fn dirty_indices(&self) -> Vec<usize> {
        (0..self.len).filter(|&i| self.is_dirty(i)).collect()
    }

    pub fn bump_version(&self) {
        self.version.fetch_add(1, Ordering::SeqCst);
    }

    pub fn version(&self) -> u32 {
        self.version.load(Ordering::SeqCst)
    }

    pub fn len(&self) -> usize {
        self.len
    }

    pub fn is_empty(&self) -> bool {
        self.len == 0
    }

    /// # Safety
    /// The writer side must not mutate the underlying memory while the returned
    /// slice is alive.
    pub unsafe fn as_slice(&self) -> &[T] {
        unsafe { std::slice::from_raw_parts(self.ptr, self.len) }
    }

    pub fn iter(&self) -> impl Iterator<Item = T> + '_ {
        (0..self.len).map(move |i| self.get(i))
    }
}

/// Write-side view of a shared array of `T`.
pub struct MutableSharedArray<T: Copy> {
    ptr: *mut T,
    len: usize,
    _marker: PhantomData<T>,
}

unsafe impl<T: Copy + Send> Send for MutableSharedArray<T> {}
unsafe impl<T: Copy + Sync> Sync for MutableSharedArray<T> {}

impl<T: Copy> MutableSharedArray<T> {
    /// # Safety
    /// `ptr` must point to `len` writable elements of `T` for as long as the
    /// array is used.
    pub unsafe fn new(ptr: *mut T, len: usize) -> Self {
        Self {
            ptr,
            len,
            _marker: PhantomData,
        }
    }

    /// # Safety
    /// `byte_offset` must be in bounds of the context's buffer and correctly
    /// aligned for `T`, with at least `len` elements available.
    pub unsafe fn from_context(ctx: &SharedBufferContext, byte_offset: usize, len: usize) -> Self {
        unsafe {
            let ptr = ctx.base_ptr.add(byte_offset) as *mut T;
            Self::new(ptr, len)
        }
    }

    #[inline]
    pub fn get(&self, index: usize) -> T {
        debug_assert!(index < self.len, "index out of bounds");
        unsafe { *self.ptr.add(index) }
    }

    #[inline]
    pub fn set(&self, index: usize, value: T) {
        debug_assert!(index < self.len, "index out of bounds");
        unsafe {
            *self.ptr.add(index) = value;
        }
    }

    pub fn len(&self) -> usize {
        self.len
    }

    pub fn is_empty(&self) -> bool {
        self.len == 0
    }

    /// # Safety
    /// No other thread may access the underlying memory while the returned slice
    /// is alive.
    pub unsafe fn as_mut_slice(&mut self) -> &mut [T] {
        unsafe { std::slice::from_raw_parts_mut(self.ptr, self.len) }
    }
}

/// Read-side `f32` array.
pub type ReactiveSharedF32Array = ReactiveSharedArray<f32>;

/// Read-side `u8` array.
pub type ReactiveSharedU8Array = ReactiveSharedArray<u8>;

/// Read-side `i32` array.
pub type ReactiveSharedI32Array = ReactiveSharedArray<i32>;

/// Read-side `u32` array.
pub type ReactiveSharedU32Array = ReactiveSharedArray<u32>;

/// Write-side `f32` array.
pub type MutableSharedF32Array = MutableSharedArray<f32>;

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_reactive_shared_array_basic() {
        let buffer = vec![1.0f32, 2.0, 3.0, 4.0, 5.0];
        let mut dirty = vec![0u8; 5];

        let array = unsafe {
            ReactiveSharedArray::new(buffer.as_ptr(), buffer.len(), dirty.as_ptr())
        };

        assert_eq!(array.len(), 5);
        assert_eq!(array.get(0), 1.0);
        assert_eq!(array.get(4), 5.0);

        dirty[2] = 1;
        assert!(!array.is_dirty(0));
        assert!(array.is_dirty(2));

        let dirty_indices = array.dirty_indices();
        assert_eq!(dirty_indices, vec![2]);
    }

    #[test]
    fn test_mutable_shared_array() {
        let mut buffer = vec![0.0f32; 5];

        let array = unsafe { MutableSharedArray::new(buffer.as_mut_ptr(), buffer.len()) };

        array.set(0, 10.0);
        array.set(2, 20.0);

        assert_eq!(array.get(0), 10.0);
        assert_eq!(array.get(1), 0.0);
        assert_eq!(array.get(2), 20.0);
    }
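
    // Additional coverage (not in the original test set): exercises
    // SharedBufferContext's dirty-flag and wake-flag accessors against a locally
    // allocated, i32-aligned buffer. The offsets below are arbitrary test values.
    #[test]
    fn test_shared_buffer_context_dirty_and_wake() {
        // 16 i32 slots = 64 bytes; dirty flags at offset 0, wake flag at offset 32.
        let mut backing = vec![0i32; 16];
        let base_ptr = backing.as_mut_ptr() as *mut u8;

        let ctx = unsafe { SharedBufferContext::new(base_ptr, 64, 0, 32, 8) };

        assert!(ctx.dirty_indices().is_empty());

        // Mark two elements dirty through the context's own pointer.
        unsafe {
            *ctx.dirty_flags.add(1) = 1;
            *ctx.dirty_flags.add(5) = 1;
        }
        assert!(ctx.is_dirty(1));
        assert!(ctx.is_dirty(5));
        assert_eq!(ctx.dirty_indices(), vec![1, 5]);

        ctx.clear_dirty(1);
        assert!(!ctx.is_dirty(1));

        ctx.clear_all_dirty();
        assert!(ctx.dirty_indices().is_empty());

        // The wake flag is just an AtomicI32 at the configured offset.
        ctx.wake_flag().store(1, Ordering::SeqCst);
        assert_eq!(ctx.wake_flag().load(Ordering::SeqCst), 1);
    }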

    #[test]
    fn test_version_tracking() {
        let buffer = vec![1.0f32; 5];
        let dirty = vec![0u8; 5];

        let array = unsafe {
            ReactiveSharedArray::new(buffer.as_ptr(), buffer.len(), dirty.as_ptr())
        };

        assert_eq!(array.version(), 0);
        array.bump_version();
        assert_eq!(array.version(), 1);
        array.bump_version();
        assert_eq!(array.version(), 2);
    }
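
    // Additional coverage (not in the original test set): checks both outcomes of
    // wait_for_wake_timeout. The 1000 us timeout is an arbitrary small value so
    // the timeout path finishes quickly.
    #[test]
    fn test_wait_for_wake_timeout_flag() {
        let flag = AtomicI32::new(1);

        // Flag already set: returns true immediately and consumes the wake-up.
        assert!(wait_for_wake_timeout(&flag, 1000));
        assert_eq!(flag.load(Ordering::SeqCst), 0);

        // Flag still zero: the call should time out and report false.
        assert!(!wait_for_wake_timeout(&flag, 1000));
    }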
}