1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
use anyhow::Result;
use std::{
ptr,
sync::atomic::{AtomicUsize, Ordering},
};
use crate::errors::BondiError;
const MAX_CONSUMERS: usize = 100;
#[derive(Debug)]
pub struct Ring<T> {
capacity: usize,
buffer: Vec<T>,
writer_idx: AtomicUsize,
consumer_idx: Vec<AtomicUsize>,
num_consumers: AtomicUsize,
}
impl<T: Clone + std::fmt::Debug> Ring<T> {
pub fn new(capacity: usize) -> Self {
Ring {
capacity,
buffer: Vec::with_capacity(capacity),
writer_idx: AtomicUsize::new(0),
consumer_idx: (0..MAX_CONSUMERS)
.into_iter()
.map(|_| AtomicUsize::new(0))
.collect(),
num_consumers: AtomicUsize::new(0),
}
}
pub fn new_consumer(&self) -> Result<usize> {
if self.num_consumers.load(Ordering::SeqCst) == MAX_CONSUMERS {
return Err(BondiError::NoReaderAvailable.into());
}
let num_consumers = self.num_consumers.fetch_add(1, Ordering::Relaxed);
Ok(num_consumers)
}
pub fn get_slowest_reader(&self) -> Result<usize> {
if self.num_consumers.load(Ordering::Relaxed) == 0 {
return Err(BondiError::NoReaderAvailable.into());
}
Ok(self
.consumer_idx
.iter()
.take(self.num_consumers.load(Ordering::Relaxed))
.map(|x| x.load(Ordering::Relaxed))
.min()
.unwrap_or(0))
}
pub fn insert(&self, item: T) {
if self.num_consumers.load(Ordering::Relaxed) > 0 {
let mut slowest_reader = self.get_slowest_reader().unwrap();
let mut writer_idx = self.writer_idx.load(Ordering::Relaxed);
while writer_idx > 0
&& writer_idx % self.capacity == slowest_reader
&& writer_idx > slowest_reader
{
std::thread::park_timeout(std::time::Duration::from_micros(1));
writer_idx = self.writer_idx.load(Ordering::Relaxed);
slowest_reader = self.get_slowest_reader().unwrap();
}
}
let writer_idx = self.writer_idx.load(Ordering::Relaxed) % self.capacity;
unsafe {
let buff = (self.buffer.as_ptr()).offset(writer_idx as isize) as _;
ptr::write(buff, item);
};
self.writer_idx.fetch_add(1, Ordering::SeqCst);
}
pub fn get(&self, idx: usize) -> T {
let consumer_idx = self.consumer_idx[idx].load(Ordering::Relaxed);
while self.writer_idx.load(Ordering::Relaxed) == consumer_idx {
std::thread::park_timeout(std::time::Duration::from_micros(1));
}
let result = unsafe {
let buff = (self.buffer.as_ptr()).offset((consumer_idx % self.capacity) as isize);
ptr::read(buff).clone()
};
self.consumer_idx[idx].fetch_add(1, Ordering::Relaxed);
result
}
}