Skip to main content

memlink_shm/
pring.rs

1//! Multi-priority ring buffer with three-tier message queuing.
2//! Critical (20%), High (50%), Low (30%) slot distribution with FIFO ordering.
3
4use crate::buffer::{RingBuffer, RingBufferError, SlotId};
5use crate::priority::Priority;
6use std::sync::atomic::{AtomicU64, Ordering};
7
8pub struct PriorityRingBuffer {
9    critical: RingBuffer,
10    high: RingBuffer,
11    low: RingBuffer,
12    critical_count: usize,
13    high_count: usize,
14    low_count: usize,
15    critical_written: AtomicU64,
16    critical_read: AtomicU64,
17    high_written: AtomicU64,
18    high_read: AtomicU64,
19    low_written: AtomicU64,
20    low_read: AtomicU64,
21}
22
23pub type PriorityResult<T> = Result<T, RingBufferError>;
24
25impl PriorityRingBuffer {
26    pub fn new(total_slots: usize) -> Result<Self, RingBufferError> {
27        if total_slots < 3 {
28            return Err(RingBufferError::Full);
29        }
30
31        let critical_count = ((total_slots as f64 * 0.2).ceil() as usize).max(1);
32        let high_count = ((total_slots as f64 * 0.5).ceil() as usize).max(1);
33        let low_count = (total_slots - critical_count - high_count).max(1);
34
35        let critical = RingBuffer::new(critical_count.next_power_of_two())?;
36        let high = RingBuffer::new(high_count.next_power_of_two())?;
37        let low = RingBuffer::new(low_count.next_power_of_two())?;
38
39        Ok(Self {
40            critical,
41            high,
42            low,
43            critical_count,
44            high_count,
45            low_count,
46            critical_written: AtomicU64::new(0),
47            critical_read: AtomicU64::new(0),
48            high_written: AtomicU64::new(0),
49            high_read: AtomicU64::new(0),
50            low_written: AtomicU64::new(0),
51            low_read: AtomicU64::new(0),
52        })
53    }
54
55    pub fn total_slots(&self) -> usize {
56        self.critical_count + self.high_count + self.low_count
57    }
58
59    pub fn slot_counts(&self) -> (usize, usize, usize) {
60        (self.critical_count, self.high_count, self.low_count)
61    }
62
63    pub fn pending_counts(&self) -> (u64, u64, u64) {
64        let crit = self.critical_written.load(Ordering::Acquire)
65            .wrapping_sub(self.critical_read.load(Ordering::Acquire));
66        let high = self.high_written.load(Ordering::Acquire)
67            .wrapping_sub(self.high_read.load(Ordering::Acquire));
68        let low = self.low_written.load(Ordering::Acquire)
69            .wrapping_sub(self.low_read.load(Ordering::Acquire));
70        (crit, high, low)
71    }
72
73    pub fn total_pending(&self) -> u64 {
74        let (crit, high, low) = self.pending_counts();
75        crit + high + low
76    }
77
78    pub fn is_empty(&self) -> bool {
79        self.total_pending() == 0
80    }
81
82    pub fn write(&self, priority: Priority, data: &[u8]) -> PriorityResult<SlotId> {
83        match priority {
84            Priority::Critical => {
85                let pending = self.critical_written.load(Ordering::Acquire)
86                    .wrapping_sub(self.critical_read.load(Ordering::Acquire));
87                if pending >= self.critical_count as u64 {
88                    return Err(RingBufferError::Full);
89                }
90                self.critical.write_slot(crate::buffer::Priority::Normal, data)
91                    .map(|_| {
92                        self.critical_written.fetch_add(1, Ordering::Release);
93                        SlotId { index: pending, seq: pending }
94                    })
95            }
96            Priority::High => {
97                let pending = self.high_written.load(Ordering::Acquire)
98                    .wrapping_sub(self.high_read.load(Ordering::Acquire));
99                if pending >= self.high_count as u64 {
100                    return Err(RingBufferError::Full);
101                }
102                self.high.write_slot(crate::buffer::Priority::Normal, data)
103                    .map(|_| {
104                        self.high_written.fetch_add(1, Ordering::Release);
105                        SlotId { index: pending, seq: pending }
106                    })
107            }
108            Priority::Low => {
109                let pending = self.low_written.load(Ordering::Acquire)
110                    .wrapping_sub(self.low_read.load(Ordering::Acquire));
111                if pending >= self.low_count as u64 {
112                    return Err(RingBufferError::Full);
113                }
114                self.low.write_slot(crate::buffer::Priority::Normal, data)
115                    .map(|_| {
116                        self.low_written.fetch_add(1, Ordering::Release);
117                        SlotId { index: pending, seq: pending }
118                    })
119            }
120        }
121    }
122
123    pub fn read(&self) -> Option<(Priority, Vec<u8>)> {
124        if let Some(data) = self.critical.read_slot() {
125            self.critical_read.fetch_add(1, Ordering::Release);
126            return Some((Priority::Critical, data.1));
127        }
128
129        if let Some(data) = self.high.read_slot() {
130            self.high_read.fetch_add(1, Ordering::Release);
131            return Some((Priority::High, data.1));
132        }
133
134        if let Some(data) = self.low.read_slot() {
135            self.low_read.fetch_add(1, Ordering::Release);
136            return Some((Priority::Low, data.1));
137        }
138
139        None
140    }
141
142    pub fn clear(&self) {
143        self.critical_written.store(0, Ordering::Release);
144        self.critical_read.store(0, Ordering::Release);
145        self.high_written.store(0, Ordering::Release);
146        self.high_read.store(0, Ordering::Release);
147        self.low_written.store(0, Ordering::Release);
148        self.low_read.store(0, Ordering::Release);
149    }
150}