roplat 0.2.0

roplat: just a robot operation system
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
//! 环形队列旁路通讯内核
//!
//! 采用原子索引实现无锁 SPSC(单写单读)有序消息传递。
//! 核心思想:head 和 tail 单调递增,通过 `% capacity` 映射到实际槽位。
//! 写端只修改 head,读端只修改 tail,通过 Acquire/Release 保证可见性。
//!
//! 与三缓冲的区别:
//! - 三缓冲始终保持最新数据(有损),适合状态同步
//! - 环形队列保持 FIFO 有序(可选有损),适合事件流

use std::alloc::{Layout, alloc_zeroed, dealloc};
use std::ffi::c_void;
use std::sync::atomic::{AtomicUsize, Ordering};

/// 环形队列控制块
///
/// 无锁 SPSC 设计:head/tail 单调递增,实际索引 = value % capacity。
/// `head - tail` 为当前元素数量。
#[repr(C)]
pub struct RingBufferCtrl {
    head: AtomicUsize,
    tail: AtomicUsize,
    capacity: usize,
    slot_size: usize,
    data: *mut u8,
}

// Safety: SPSC 通过原子索引保证线程安全
unsafe impl Send for RingBufferCtrl {}
unsafe impl Sync for RingBufferCtrl {}

impl RingBufferCtrl {
    #[inline]
    fn slot_ptr(&self, index: usize) -> *mut u8 {
        unsafe { self.data.add((index % self.capacity) * self.slot_size) }
    }
}

// ============================================================
// C FFI
// ============================================================

/// 创建环形队列控制块
///
/// # Safety
/// - `capacity` 必须 > 0
/// - `slot_size` 必须 > 0
#[unsafe(no_mangle)]
pub unsafe extern "C" fn roplat_rb_new(capacity: usize, slot_size: usize) -> *mut RingBufferCtrl {
    assert!(capacity > 0 && slot_size > 0);
    let layout = Layout::from_size_align(capacity * slot_size, 8).unwrap();
    let data = unsafe { alloc_zeroed(layout) };
    if data.is_null() {
        return std::ptr::null_mut();
    }
    Box::into_raw(Box::new(RingBufferCtrl {
        head: AtomicUsize::new(0),
        tail: AtomicUsize::new(0),
        capacity,
        slot_size,
        data,
    }))
}

/// 释放环形队列控制块
///
/// # Safety
/// - `ctrl` 必须是 `roplat_rb_new` 返回的指针
#[unsafe(no_mangle)]
pub unsafe extern "C" fn roplat_rb_destroy(ctrl: *mut RingBufferCtrl) {
    if ctrl.is_null() {
        return;
    }
    unsafe {
        let ctrl = Box::from_raw(ctrl);
        let layout = Layout::from_size_align(ctrl.capacity * ctrl.slot_size, 8).unwrap();
        dealloc(ctrl.data, layout);
    }
}

/// 尝试推入一个元素(SPSC 写端)
///
/// 将 `src` 指向的 `slot_size` 字节拷入队列。
/// 队列满时返回 false,不阻塞。
///
/// # Safety
/// - `ctrl` 必须有效
/// - `src` 必须指向至少 `slot_size` 字节的可读内存
/// - 同一时刻只允许一个线程调用 push 系列函数
#[unsafe(no_mangle)]
pub unsafe extern "C" fn roplat_rb_try_push(ctrl: *mut RingBufferCtrl, src: *const c_void) -> bool {
    unsafe {
        let ctrl = &*ctrl;
        let head = ctrl.head.load(Ordering::Relaxed);
        let tail = ctrl.tail.load(Ordering::Acquire);

        if head.wrapping_sub(tail) >= ctrl.capacity {
            return false;
        }

        std::ptr::copy_nonoverlapping(src as *const u8, ctrl.slot_ptr(head), ctrl.slot_size);
        ctrl.head.store(head.wrapping_add(1), Ordering::Release);
        true
    }
}

/// 强制推入一个元素,队列满时丢弃最旧数据
///
/// # Safety
/// - 同 `roplat_rb_try_push`
/// - **不可**与 `roplat_rb_try_pop` 并发调用(写端和读端需同步)
#[unsafe(no_mangle)]
pub unsafe extern "C" fn roplat_rb_force_push(ctrl: *mut RingBufferCtrl, src: *const c_void) {
    unsafe {
        let ctrl = &*ctrl;
        let head = ctrl.head.load(Ordering::Relaxed);
        let tail = ctrl.tail.load(Ordering::Acquire);

        if head.wrapping_sub(tail) >= ctrl.capacity {
            // 丢弃最旧:推进 tail
            ctrl.tail.store(tail.wrapping_add(1), Ordering::Release);
        }

        std::ptr::copy_nonoverlapping(src as *const u8, ctrl.slot_ptr(head), ctrl.slot_size);
        ctrl.head.store(head.wrapping_add(1), Ordering::Release);
    }
}

/// 尝试弹出一个元素(SPSC 读端)
///
/// 将队首元素拷贝到 `dst`。
/// 队列空时返回 false,不阻塞。
///
/// # Safety
/// - `ctrl` 必须有效
/// - `dst` 必须指向至少 `slot_size` 字节的可写内存
/// - 同一时刻只允许一个线程调用 pop 系列函数
#[unsafe(no_mangle)]
pub unsafe extern "C" fn roplat_rb_try_pop(ctrl: *mut RingBufferCtrl, dst: *mut c_void) -> bool {
    unsafe {
        let ctrl = &*ctrl;
        let tail = ctrl.tail.load(Ordering::Relaxed);
        let head = ctrl.head.load(Ordering::Acquire);

        if head == tail {
            return false;
        }

        std::ptr::copy_nonoverlapping(ctrl.slot_ptr(tail), dst as *mut u8, ctrl.slot_size);
        ctrl.tail.store(tail.wrapping_add(1), Ordering::Release);
        true
    }
}

/// 当前元素数量
///
/// # Safety
/// - `ctrl` 必须有效
#[unsafe(no_mangle)]
pub unsafe extern "C" fn roplat_rb_len(ctrl: *const RingBufferCtrl) -> usize {
    unsafe {
        let ctrl = &*ctrl;
        let head = ctrl.head.load(Ordering::Acquire);
        let tail = ctrl.tail.load(Ordering::Acquire);
        head.wrapping_sub(tail)
    }
}

/// 队列容量
///
/// # Safety
/// - `ctrl` 必须有效
#[unsafe(no_mangle)]
pub unsafe extern "C" fn roplat_rb_capacity(ctrl: *const RingBufferCtrl) -> usize {
    unsafe { (*ctrl).capacity }
}

// ============================================================
// Rust 泛型层
// ============================================================

/// 环形队列写端
pub struct RingBufferWriter<T> {
    ctrl: *mut RingBufferCtrl,
    _marker: std::marker::PhantomData<T>,
}

unsafe impl<T: Send> Send for RingBufferWriter<T> {}

impl<T> RingBufferWriter<T> {
    /// 从控制块创建写端
    ///
    /// # Safety
    /// - `ctrl` 在 Writer 生命周期内必须有效
    /// - 只允许一个 Writer 实例绑定同一个 ctrl
    pub unsafe fn new(ctrl: *mut RingBufferCtrl) -> Self {
        Self { ctrl, _marker: std::marker::PhantomData }
    }

    /// 尝试推入数据,队列满返回 false
    pub fn try_push(&mut self, data: &T) -> bool {
        unsafe { roplat_rb_try_push(self.ctrl, data as *const T as *const c_void) }
    }

    /// 强制推入,满时丢弃最旧数据
    ///
    /// **注意**:不可与 `RingBufferReader::try_pop` 并发使用
    pub fn force_push(&mut self, data: &T) {
        unsafe { roplat_rb_force_push(self.ctrl, data as *const T as *const c_void) }
    }

    /// 当前元素数量
    pub fn len(&self) -> usize {
        unsafe { roplat_rb_len(self.ctrl) }
    }

    /// 队列是否为空
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// 队列是否已满
    pub fn is_full(&self) -> bool {
        self.len() >= unsafe { roplat_rb_capacity(self.ctrl) }
    }
}

/// 环形队列读端
pub struct RingBufferReader<T> {
    ctrl: *mut RingBufferCtrl,
    _marker: std::marker::PhantomData<T>,
}

unsafe impl<T: Send> Send for RingBufferReader<T> {}

impl<T> RingBufferReader<T> {
    /// 从控制块创建读端
    ///
    /// # Safety
    /// - `ctrl` 在 Reader 生命周期内必须有效
    /// - 只允许一个 Reader 实例绑定同一个 ctrl
    pub unsafe fn new(ctrl: *mut RingBufferCtrl) -> Self {
        Self { ctrl, _marker: std::marker::PhantomData }
    }

    /// 尝试弹出一个元素,队列空返回 None
    pub fn try_pop(&mut self) -> Option<T> {
        let mut out = std::mem::MaybeUninit::<T>::uninit();
        let ok = unsafe { roplat_rb_try_pop(self.ctrl, out.as_mut_ptr() as *mut c_void) };
        if ok {
            Some(unsafe { out.assume_init() })
        } else {
            None
        }
    }

    /// 当前元素数量
    pub fn len(&self) -> usize {
        unsafe { roplat_rb_len(self.ctrl) }
    }

    /// 队列是否为空
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }
}

/// 创建一个 SPSC 环形队列通道
///
/// 返回 `(Writer, Reader)` 对。
pub fn create_ring_buffer<T>(capacity: usize) -> (RingBufferWriter<T>, RingBufferReader<T>) {
    let ctrl = unsafe { roplat_rb_new(capacity, std::mem::size_of::<T>()) };
    assert!(!ctrl.is_null(), "failed to allocate ring buffer");
    let writer = unsafe { RingBufferWriter::new(ctrl) };
    let reader = unsafe { RingBufferReader::new(ctrl) };
    (writer, reader)
}

/// 带生命周期管理的环形队列通道
///
/// 持有控制块所有权,确保正确释放。
pub struct RingBufferChannel<T> {
    ctrl: *mut RingBufferCtrl,
    _marker: std::marker::PhantomData<T>,
}

unsafe impl<T: Send> Send for RingBufferChannel<T> {}
unsafe impl<T: Send + Sync> Sync for RingBufferChannel<T> {}

impl<T> RingBufferChannel<T> {
    /// 创建带所有权管理的环形队列通道。
    pub fn new(capacity: usize) -> Self {
        let ctrl = unsafe { roplat_rb_new(capacity, std::mem::size_of::<T>()) };
        assert!(!ctrl.is_null(), "failed to allocate ring buffer");
        Self { ctrl, _marker: std::marker::PhantomData }
    }

    /// 创建写端句柄。
    pub fn writer(&self) -> RingBufferWriter<T> {
        unsafe { RingBufferWriter::new(self.ctrl) }
    }

    /// 创建读端句柄。
    pub fn reader(&self) -> RingBufferReader<T> {
        unsafe { RingBufferReader::new(self.ctrl) }
    }

    /// 返回缓冲区容量。
    pub fn capacity(&self) -> usize {
        unsafe { roplat_rb_capacity(self.ctrl) }
    }
}

impl<T> Drop for RingBufferChannel<T> {
    fn drop(&mut self) {
        unsafe { roplat_rb_destroy(self.ctrl) }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_basic_push_pop() {
        let (mut writer, mut reader) = create_ring_buffer::<i32>(4);

        assert!(reader.try_pop().is_none());

        assert!(writer.try_push(&10));
        assert!(writer.try_push(&20));
        assert!(writer.try_push(&30));

        assert_eq!(reader.try_pop(), Some(10));
        assert_eq!(reader.try_pop(), Some(20));
        assert_eq!(reader.try_pop(), Some(30));
        assert!(reader.try_pop().is_none());
    }

    #[test]
    fn test_full_buffer() {
        let (mut writer, mut reader) = create_ring_buffer::<u64>(2);

        assert!(writer.try_push(&1));
        assert!(writer.try_push(&2));
        assert!(!writer.try_push(&3)); //
        assert_eq!(reader.try_pop(), Some(1));
        assert!(writer.try_push(&3)); // 有空位了
        assert_eq!(reader.try_pop(), Some(2));
        assert_eq!(reader.try_pop(), Some(3));
    }

    #[test]
    fn test_force_push_overwrites() {
        let (mut writer, mut reader) = create_ring_buffer::<i32>(2);

        writer.force_push(&1);
        writer.force_push(&2);
        writer.force_push(&3); // 满时丢弃最旧(1)

        assert_eq!(reader.try_pop(), Some(2));
        assert_eq!(reader.try_pop(), Some(3));
        assert!(reader.try_pop().is_none());
    }

    #[test]
    fn test_len_and_capacity() {
        let (mut writer, mut reader) = create_ring_buffer::<f64>(8);

        assert_eq!(writer.len(), 0);
        assert!(writer.is_empty());
        assert!(!writer.is_full());

        for i in 0..8 {
            writer.try_push(&(i as f64));
        }
        assert_eq!(writer.len(), 8);
        assert!(writer.is_full());

        reader.try_pop();
        assert_eq!(reader.len(), 7);
    }

    #[test]
    #[allow(clippy::approx_constant)] // 3.14 / 2.72 are arbitrary payload values, not PI/e
    fn test_repr_c_struct() {
        #[derive(Clone, Debug, PartialEq)]
        #[repr(C)]
        struct Packet {
            id: u64,
            value: f64,
        }

        let (mut writer, mut reader) = create_ring_buffer::<Packet>(4);

        writer.try_push(&Packet { id: 1, value: 3.14 });
        writer.try_push(&Packet { id: 2, value: 2.72 });

        let p1 = reader.try_pop().unwrap();
        assert_eq!(p1.id, 1);
        assert_eq!(p1.value, 3.14);

        let p2 = reader.try_pop().unwrap();
        assert_eq!(p2.id, 2);
        assert_eq!(p2.value, 2.72);
    }

    #[test]
    fn test_wraparound() {
        let (mut writer, mut reader) = create_ring_buffer::<u32>(3);

        // 填满再清空多次,触发索引回绕
        for round in 0..10 {
            for i in 0..3 {
                assert!(writer.try_push(&(round * 10 + i)));
            }
            for i in 0..3 {
                assert_eq!(reader.try_pop(), Some(round * 10 + i));
            }
        }
    }

    #[test]
    fn test_spsc_threading() {
        use std::thread;

        let (mut writer, mut reader) = create_ring_buffer::<u64>(64);

        let count = 10_000u64;

        let producer = thread::spawn(move || {
            for i in 0..count {
                while !writer.try_push(&i) {
                    std::hint::spin_loop();
                }
            }
        });

        let consumer = thread::spawn(move || {
            let mut received = Vec::with_capacity(count as usize);
            while received.len() < count as usize {
                if let Some(v) = reader.try_pop() {
                    received.push(v);
                } else {
                    std::hint::spin_loop();
                }
            }
            received
        });

        producer.join().unwrap();
        let received = consumer.join().unwrap();

        // 验证有序且完整
        assert_eq!(received.len(), count as usize);
        for (i, &v) in received.iter().enumerate() {
            assert_eq!(v, i as u64);
        }
    }

    #[test]
    fn test_channel_lifecycle() {
        let channel = RingBufferChannel::<i32>::new(4);
        assert_eq!(channel.capacity(), 4);

        let mut writer = channel.writer();
        let mut reader = channel.reader();

        writer.try_push(&42);
        assert_eq!(reader.try_pop(), Some(42));
        // channel drop 时释放控制块
    }
}