mspc_ipc 0.1.3

Multi producer single consumer ring buffer for inter-process communication
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// Algorithm:
//
// Setup: the ring buffer is made of `num_entries` (must be a power of 2), each entry being `entry_size` bytes
// (rounded up to 8 bytes). We also hold `num_entries` control words (AtomicU64). Everything is initialized
// to zero.
//
// We hold a header with immutable info (first cache line), followed by two atomic indices (ridx and widx)
// in a separate cache line. Both indices are in the same cache line because both the consumer and the producer
// access them. The indices are monotonically-increasing, being masked by `(num_entries-1)` when accessing
// the arrays.
//
// No-std: the code itself uses std, for things like `anyhow`, but these are all done in the setup phase,
// when opening and mmaping files. The algorithm itself does not require any runtime or OS support like futex.
//
// Producer: when pushing, the producer checks for room (widx<ridx+num_entries), and if there is room,
// it attempts to advance widx by 1, using a CAS. If it fails, it tries again. Upon success, it owns the slot,
// and writes the control word using a CAS. The control word has a pid part that is unique to each producer,
// and the length of the data (which must be <= entry_size, and also fit in 32 bits). This CAS writing may fail --
// which means the consumer has given up on us, in which case someone else might be holding the slot. The
// pid ensures we'd be aware of it and bail out. If we succeed in writing the control word, we proceed to
// writing the entry's data (which may take time), followed by rewriting the control word, this time setting the
// FINISHED bit (only if it matches the expected value).
//
// Consumer: when popping, the consumer checks if there are entries to read (ridx<widx). If there are, it reads
// the control word and checks the finished bit. If it's set, it's safe to read the entry, clear the control word
// and advance ridx. If the finished bit is not set, it means either the producer died in the middle of writing
// the entry, or the consumer has run into a producer that's still busy writing the entry. In this case,
// the consumer should stall a little and retry (either using `sched_yield`/`nanosleep` or spinning), after which
// either the entry became finished, or the consumer gives up on this entry by clearing the control word and
// advancing ridx.
//
// The only open issue is a "sleepy producer" that started writing a large entry, hanged until the consumer gave
// up, and then woke up and continued the memcpy. In this case, it will corrupt an entry that's already taken by
// some other producer. To solve that, the stall function takes the producer's pid. It is allowed to wait for
// any duration of time (returning `Retry`), or to skip the entry while leaving it occupied (`Skip`), as well as
// clearing the entry (`Reclaim`). Note that `Reclaim` is only safe to use if the producer is dead (or if you
// can ensure it will never wake up)
//
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

use anyhow::{ensure, Result};
use memmap::{MmapMut, MmapOptions};
use std::sync::atomic::{
    AtomicU64,
    Ordering::{Relaxed, Release, SeqCst},
};
use std::{fs::OpenOptions, path::Path, slice};

/// Magic number stamped into the header to distinguish an initialized ring region
/// from a fresh (all-zeros) one.
const MAGIC: u32 = 0x2d06_9f03;
/// On-disk layout version; must match between creator and openers.
const VERSION: u32 = 1;
/// Mask for the pid field of a control word: pids occupy the low 31 bits of the
/// high half, because bit 63 is reserved for the FINISHED flag.
const PID_MASK: u32 = 0x7fff_ffff;

/// Rounds `n` up to the next multiple of `alignment`.
///
/// `alignment` must be nonzero; it does not need to be a power of two.
/// (`div_ceil` expresses the round-up directly instead of the manual
/// `(n + alignment - 1) / alignment` idiom, and cannot overflow for the
/// page-size/8-byte alignments used here unless `n` is already near `u64::MAX`.)
fn align(n: u64, alignment: u64) -> u64 {
    n.div_ceil(alignment) * alignment
}

/// Immutable ring geometry, stamped once when the region is first initialized and
/// validated on every open. Aligned to 128 bytes so it occupies its own cache
/// line(s), separate from the mutable indices that follow in `RingBufHeader`.
#[repr(C, align(128))]
struct RingBufParams {
    magic: u32,          // MAGIC once initialized; 0 in a fresh region
    version: u32,        // layout version (VERSION)
    entry_size: u64,     // bytes per entry, rounded up to a multiple of 8
    num_entries: u64,    // ring capacity; always a power of 2
    control_offset: u64, // byte offset of the control-word array from the region start
    entries_offset: u64, // byte offset of the entries array (page-aligned)
}

/// Shared-memory header: the immutable parameters fill the first cache line, and
/// the two mutable indices share a second cache line (both the consumer and the
/// producers touch both indices, so splitting them would not help). The indices
/// increase monotonically and are masked by `num_entries - 1` when indexing the
/// control/entry arrays.
#[repr(C, align(128))]
struct RingBufHeader {
    // first cache line
    params: RingBufParams,
    // second cache line
    read_idx: AtomicU64,  // next logical index the consumer will pop
    write_idx: AtomicU64, // next logical index a producer will claim
}

// Per-entry control word (one AtomicU64 per ring slot):
// +----------+-----------+-------------+
// | finished |    pid    |     len     |
// |    (1)   |    (31)   |     (32)    |
// +----------+-----------+-------------+
// A value of 0 means the slot is free; nonzero without the finished bit means a
// producer is (or was, if it died) in the middle of writing the entry.
struct ControlWord(u64);

impl ControlWord {
    /// Builds an in-flight (not-yet-finished) control word for a producer.
    /// The pid is masked to 31 bits so it can never spill into the FINISHED bit
    /// (bit 63); callers are expected to pass already-masked pids, this is a
    /// defensive guarantee of the layout invariant.
    fn new(pid: u32, len: u32) -> Self {
        Self((((pid & PID_MASK) as u64) << 32) | (len as u64))
    }
    /// Loads the current control word. SeqCst (acquire) ordering: once the consumer
    /// observes the FINISHED bit it proceeds to read the entry's data, so this load
    /// must synchronize with the producer's `mark_finished` (a Relaxed load would
    /// not guarantee the data writes are visible).
    fn load(atomic: &AtomicU64) -> Self {
        Self(atomic.load(SeqCst))
    }
    /// Attempts to claim a free (zero) slot by installing this word.
    /// Fails if the slot is still occupied (e.g. a tombstone left by the consumer).
    fn claim(&self, atomic: &AtomicU64) -> bool {
        atomic.compare_exchange(0, self.0, SeqCst, Relaxed).is_ok()
    }

    /// Payload length in bytes (low 32 bits).
    fn len(&self) -> usize {
        (self.0 as u32) as usize
    }
    /// Producer id (bits 32..62).
    fn pid(&self) -> u32 {
        ((self.0 >> 32) as u32) & PID_MASK
    }
    /// Whether the FINISHED bit (bit 63) is set, i.e. the entry is safe to read.
    fn is_finished(&self) -> bool {
        self.0 >> 63 != 0
    }
    /// Sets the FINISHED bit, but only if the atomic still holds this exact word --
    /// fails if the consumer gave up on us and cleared (or someone else re-claimed)
    /// the slot in the meantime.
    fn mark_finished(&self, atomic: &AtomicU64) -> bool {
        atomic
            .compare_exchange(self.0, (1 << 63) | self.0, SeqCst, Relaxed)
            .is_ok()
    }
}

/// Raw view over the shared region: cached pointers into the mapping plus the
/// geometry needed for masking. `_mmap` keeps the mapping alive for file-backed
/// rings and is `None` when the ring lives in a caller-provided buffer.
struct RingBuf {
    ptr: *const u8,                // start of the region (the RingBufHeader)
    control_ptr: *const AtomicU64, // start of the control-word array
    entries_ptr: *const u8,        // start of the entries array
    num_entries: u64,              // ring capacity (power of 2)
    entry_size: u64,               // bytes per entry (multiple of 8)
    _mmap: Option<MmapMut>,        // owns the mapping for file-backed rings
}

impl RingBuf {
    /// Returns the shared header at the start of the region.
    #[inline]
    fn header(&self) -> &RingBufHeader {
        // SAFETY: `ptr` points to a region at least `size_of::<RingBufHeader>()`
        // bytes long. mmap-backed regions are page-aligned, satisfying the header's
        // 128-byte alignment. NOTE(review): the `from_buffer` constructors do not
        // verify the caller's buffer alignment -- confirm callers guarantee it.
        unsafe { &*(self.ptr as *const RingBufHeader) }
    }

    /// Returns the control word for logical index `idx` (masked into the ring).
    #[inline]
    fn control_word(&self, idx: u64) -> &AtomicU64 {
        // SAFETY: the masked index is < num_entries, so the pointer stays inside
        // the control-word array.
        unsafe {
            &*self
                .control_ptr
                .add((idx & (self.num_entries - 1)) as usize)
        }
    }

    /// Read-only view of the entry at logical index `idx` (full `entry_size` bytes).
    #[inline]
    fn entry(&self, idx: u64) -> &[u8] {
        let mask = self.num_entries - 1;
        // SAFETY: the masked index is < num_entries, so the slice stays inside the
        // entries array.
        unsafe {
            slice::from_raw_parts(
                self.entries_ptr
                    .byte_add(((idx & mask) * self.entry_size) as usize),
                self.entry_size as usize,
            )
        }
    }

    /// Mutable view of the entry at logical index `idx`.
    #[inline]
    fn entry_mut(&self, idx: u64) -> &mut [u8] {
        let mask = self.num_entries - 1;
        // SAFETY: the masked index is < num_entries. NOTE(review): handing out
        // `&mut` through `&self` relies entirely on the control-word protocol for
        // exclusivity; if two holders ever overlap (e.g. the "sleepy producer" case
        // described at the top of the file) this is UB by reference aliasing rules --
        // consider raw-pointer copies instead.
        unsafe {
            slice::from_raw_parts_mut(
                self.entries_ptr
                    .byte_add(((idx & mask) * self.entry_size) as usize) as *mut _,
                self.entry_size as usize,
            )
        }
    }
}

/// The single-consumer end of the ring. Exactly one consumer is expected per ring;
/// it is the side that creates/initializes the shared region.
pub struct SingleConsumer {
    ring: RingBuf,
}

/// What `SingleConsumer::pop`'s stall callback tells the consumer to do when it
/// finds an unfinished entry (producer still writing, or dead mid-write).
pub enum StallResult {
    Retry,                // stall (re-check the status of the entry)
    SkipAndKeepTombstone, // skip this entry, it will be "tombstoned"
    SkipAndReclaim,       // reclaim (clear) this entry (allow it to be reused) --
                          // ONLY DO THIS IF THE PRODUCER IS SURELY DEAD
}

impl SingleConsumer {
    fn _open_or_create(
        path: impl AsRef<Path>,
        entry_size: u64,
        num_entries: u64,
        truncate: bool,
    ) -> Result<Self> {
        ensure!(
            num_entries.is_power_of_two(),
            "num_entries must be a power of 2, got {num_entries}"
        );
        ensure!(
            entry_size <= u32::MAX as u64,
            "entry_size must fit in 32 bits, got {entry_size}"
        );

        let pgsz = unsafe { libc::sysconf(libc::_SC_PAGESIZE) as u64 };
        ensure!(pgsz >= 1, "_SC_PAGESIZE failed");

        let entry_size = align(entry_size, size_of::<u64>() as u64);
        let control_offset = size_of::<RingBufHeader>() as u64;
        let entries_offset = align(
            control_offset + (size_of::<ControlWord>() as u64) * num_entries,
            pgsz,
        );
        let sz = align(entries_offset + entry_size * num_entries, pgsz);

        let file = OpenOptions::new()
            .read(true)
            .write(true)
            .create(true)
            .truncate(truncate)
            .open(path)?;
        file.set_len(sz as u64)?;

        let mut mmap = unsafe { MmapOptions::new().map_mut(&file) }?;
        let header = unsafe { &mut *(mmap.as_mut_ptr() as *mut RingBufHeader) };
        if header.params.magic == 0 {
            header.params.magic = MAGIC;
            header.params.version = VERSION;
            header.params.entry_size = entry_size;
            header.params.num_entries = num_entries;
            header.params.control_offset = control_offset;
            header.params.entries_offset = entries_offset;
            header.read_idx = AtomicU64::new(0);
            header.write_idx = AtomicU64::new(0);
        }
        ensure!(
            header.params.magic == MAGIC
                && header.params.version == VERSION
                && header.params.entry_size == entry_size
        );

        Ok(Self {
            ring: RingBuf {
                ptr: mmap.as_ptr(),
                num_entries,
                entry_size,
                control_ptr: unsafe {
                    mmap.as_ptr().byte_add(control_offset as usize) as *const AtomicU64
                },
                entries_ptr: unsafe { mmap.as_ptr().byte_add(entries_offset as usize) },
                _mmap: Some(mmap),
            },
        })
    }

    pub fn create(path: impl AsRef<Path>, entry_size: u64, num_entries: u64) -> Result<Self> {
        Self::_open_or_create(path, entry_size, num_entries, true)
    }

    pub fn open_or_create(
        path: impl AsRef<Path>,
        entry_size: u64,
        num_entries: u64,
    ) -> Result<Self> {
        Self::_open_or_create(path, entry_size, num_entries, false)
    }

    pub fn from_buffer(
        buf: &mut [u8],
        entry_size: u64,
        num_entries: u64,
        clear: bool,
    ) -> Result<Self> {
        ensure!(
            num_entries.is_power_of_two(),
            "num_entries must be a power of 2, got {num_entries}"
        );
        ensure!(
            entry_size <= u32::MAX as u64,
            "entry_size must fit in 32 bits, got {entry_size}"
        );

        let pgsz = unsafe { libc::sysconf(libc::_SC_PAGESIZE) as u64 };
        ensure!(pgsz >= 1, "_SC_PAGESIZE failed");

        let entry_size = align(entry_size, size_of::<u64>() as u64);
        let control_offset = size_of::<RingBufHeader>() as u64;
        let entries_offset = align(
            control_offset + (size_of::<ControlWord>() as u64) * num_entries,
            pgsz,
        );

        if clear {
            buf.fill(0);
        }

        let header = unsafe { &mut *(buf.as_mut_ptr() as *mut RingBufHeader) };
        if header.params.magic == 0 {
            header.params.magic = MAGIC;
            header.params.version = VERSION;
            header.params.entry_size = entry_size;
            header.params.num_entries = num_entries;
            header.params.control_offset = control_offset;
            header.params.entries_offset = entries_offset;
            header.read_idx = AtomicU64::new(0);
            header.write_idx = AtomicU64::new(0);
        }
        ensure!(
            header.params.magic == MAGIC
                && header.params.version == VERSION
                && header.params.entry_size == entry_size
        );

        Ok(Self {
            ring: RingBuf {
                ptr: buf.as_ptr(),
                num_entries,
                entry_size,
                control_ptr: unsafe {
                    buf.as_ptr().byte_add(control_offset as usize) as *const AtomicU64
                },
                entries_ptr: unsafe { buf.as_ptr().byte_add(entries_offset as usize) },
                _mmap: None,
            },
        })
    }

    pub fn pop(&self, buf: &mut [u8], mut stall: impl FnMut(u32, usize) -> StallResult) -> bool {
        debug_assert!(buf.len() >= self.ring.entry_size as usize);
        let header = self.ring.header();
        let mut attempt = 0;
        loop {
            let ridx = header.read_idx.load(Relaxed);
            let widx = header.write_idx.load(Relaxed);
            debug_assert!(ridx <= widx, "ridx={ridx} widx={widx}");
            if ridx == widx {
                return false;
            }
            let ctrl = self.ring.control_word(ridx);
            let ctrl_word = ControlWord::load(ctrl);

            if !ctrl_word.is_finished() {
                match stall(ctrl_word.pid(), attempt) {
                    StallResult::Retry => { // keep waiting
                    }
                    StallResult::SkipAndKeepTombstone => {
                        // leave this entry occupied and move to the next one
                        header.read_idx.fetch_add(1, Release);
                    }
                    StallResult::SkipAndReclaim => {
                        // forcefully clear the entry and move to the next one -- should only be done if the caller
                        // is sure the producer is dead, otherwise the producer might wake up in the future and
                        // corrupt the entry's buffer (the memcpy part is not atomic)
                        ctrl.store(0, SeqCst);
                        header.read_idx.fetch_add(1, Release);
                    }
                }
                attempt += 1;
                continue;
            }

            let entry = self.ring.entry(ridx);
            let len = ctrl_word.len();
            debug_assert!(len <= entry.len(), "len={len} entry_size={}", entry.len());
            buf[..len].copy_from_slice(&entry[..len]);

            ctrl.store(0, SeqCst);
            header.read_idx.fetch_add(1, Relaxed);
            return true;
        }
    }
}

/// A producer handle for the ring. Many producers (threads and/or processes) may
/// push concurrently, each identified in control words by its thread id.
pub struct MultiProducer {
    ring: RingBuf,
    tid: u32, // this thread's id, masked to PID_MASK; stamped into control words
}

impl MultiProducer {
    fn gettid() -> Result<u32> {
        let tid = unsafe { libc::gettid() };
        ensure!(tid > 0, "gettid failed");
        let tid = tid as u32;
        ensure!(
            tid & PID_MASK == tid,
            "PIDs are expected to have only 24 meaningful bits"
        );
        Ok(tid & PID_MASK)
    }

    pub fn open(path: impl AsRef<Path>) -> Result<Self> {
        let file = OpenOptions::new().read(true).write(true).open(path)?;
        let mmap = unsafe { MmapOptions::new().map_mut(&file) }?;
        let header = unsafe { &*(mmap.as_ptr() as *const RingBufHeader) };
        ensure!(header.params.magic == MAGIC && header.params.version == VERSION);

        Ok(Self {
            ring: RingBuf {
                ptr: mmap.as_ptr(),
                control_ptr: unsafe {
                    mmap.as_ptr()
                        .byte_add(header.params.control_offset as usize)
                        as *const AtomicU64
                },
                entries_ptr: unsafe {
                    mmap.as_ptr()
                        .byte_add(header.params.entries_offset as usize)
                },
                num_entries: header.params.num_entries,
                entry_size: header.params.entry_size,
                _mmap: Some(mmap),
            },
            tid: Self::gettid()?,
        })
    }

    pub fn from_buffer(buf: &mut [u8]) -> Result<Self> {
        let header = unsafe { &*(buf.as_ptr() as *const RingBufHeader) };
        ensure!(header.params.magic == MAGIC && header.params.version == VERSION);

        Ok(Self {
            ring: RingBuf {
                ptr: buf.as_ptr(),
                control_ptr: unsafe {
                    buf.as_ptr().byte_add(header.params.control_offset as usize) as *const AtomicU64
                },
                entries_ptr: unsafe {
                    buf.as_ptr().byte_add(header.params.entries_offset as usize)
                },
                num_entries: header.params.num_entries,
                entry_size: header.params.entry_size,
                _mmap: None,
            },
            tid: Self::gettid()?,
        })
    }

    pub fn push(&self, data: &[u8]) -> bool {
        debug_assert!(!data.is_empty() && data.len() <= self.ring.entry_size as usize);
        let header = self.ring.header();
        loop {
            let ridx = header.read_idx.load(Relaxed);
            let widx = header.write_idx.load(Relaxed);
            if widx >= ridx + self.ring.num_entries {
                // no room
                return false;
            }
            if header
                .write_idx
                .compare_exchange(widx, widx + 1, SeqCst, Relaxed)
                .is_err()
            {
                continue;
            }

            let ctrl = self.ring.control_word(widx);
            let ctrl_word = ControlWord::new(self.tid, data.len() as u32);
            if !ctrl_word.claim(ctrl) {
                // this entry is taken (due to another process still holding it), skip for now
                continue;
            }
            self.ring.entry_mut(widx)[..data.len()].copy_from_slice(data);

            if !ctrl_word.mark_finished(ctrl) {
                // we may have corrupted an entry now belonging to another producer during the memcpy above
                // all we can do is signal this case by overwriting the control word to `FINISHED|0` so the
                // consumer will not read anything from it
                return false;
            }

            return true;
        }
    }
}

#[test]
fn test_ring() -> Result<()> {
    // 16 producer threads push 100 entries each while the main thread drains;
    // verifies that every successful push is popped exactly once.
    let sc = SingleConsumer::create("/tmp/myring", 8, 128)?;

    let pushes = std::sync::Arc::new(AtomicU64::new(0));
    let attempts = std::sync::Arc::new(AtomicU64::new(0));

    let mut handles = vec![];
    for i in 0..16usize {
        let pushes = pushes.clone();
        let attempts = attempts.clone();
        handles.push(std::thread::spawn(move || {
            let mp = MultiProducer::open("/tmp/myring").unwrap();
            for j in i * 1000..i * 1000 + 100 {
                while !mp.push(&(j.to_ne_bytes()[..])) {
                    attempts.fetch_add(1, SeqCst);
                    std::thread::yield_now();
                }
                pushes.fetch_add(1, SeqCst);
            }
        }));
        std::thread::yield_now();
    }

    let mut res = vec![];
    loop {
        // snapshot completion *before* draining: every entry pushed before this point
        // is collected by the drain below, so breaking afterwards cannot drop entries.
        // (checking is_finished() after the drain raced: a push landing between the
        // empty drain and the check would be lost)
        let done = handles.iter().all(|h| h.is_finished());
        let mut buf = [0u8; 8];
        while sc.pop(&mut buf, |_, _| {
            std::thread::yield_now();
            StallResult::Retry
        }) {
            // from_ne_bytes replaces the previous unaligned `*(buf.as_ptr() as *const
            // usize)` read, which was UB-prone; assumes a 64-bit usize like the original
            res.push(usize::from_ne_bytes(buf));
        }
        if done {
            break;
        }
    }

    // propagate any producer-thread panic (previously the handles were never joined)
    for h in handles {
        h.join().unwrap();
    }

    println!("{:?}", res);
    assert_eq!(res.len(), pushes.load(SeqCst) as _);
    println!("{}", attempts.load(SeqCst));

    Ok(())
}