1use crate::buffer::{RingBuffer, RingBufferError, SlotId};
5use crate::priority::Priority;
6use std::sync::atomic::{AtomicU64, Ordering};
7
8pub struct PriorityRingBuffer {
9 critical: RingBuffer,
10 high: RingBuffer,
11 low: RingBuffer,
12 critical_count: usize,
13 high_count: usize,
14 low_count: usize,
15 critical_written: AtomicU64,
16 critical_read: AtomicU64,
17 high_written: AtomicU64,
18 high_read: AtomicU64,
19 low_written: AtomicU64,
20 low_read: AtomicU64,
21}
22
/// Result alias whose error type is fixed to [`RingBufferError`]; it is the return type
/// of [`PriorityRingBuffer::write`].
pub type PriorityResult<T> = Result<T, RingBufferError>;
24
25impl PriorityRingBuffer {
26 pub fn new(total_slots: usize) -> Result<Self, RingBufferError> {
27 if total_slots < 3 {
28 return Err(RingBufferError::Full);
29 }
30
31 let critical_count = ((total_slots as f64 * 0.2).ceil() as usize).max(1);
32 let high_count = ((total_slots as f64 * 0.5).ceil() as usize).max(1);
33 let low_count = (total_slots - critical_count - high_count).max(1);
34
35 let critical = RingBuffer::new(critical_count.next_power_of_two())?;
36 let high = RingBuffer::new(high_count.next_power_of_two())?;
37 let low = RingBuffer::new(low_count.next_power_of_two())?;
38
39 Ok(Self {
40 critical,
41 high,
42 low,
43 critical_count,
44 high_count,
45 low_count,
46 critical_written: AtomicU64::new(0),
47 critical_read: AtomicU64::new(0),
48 high_written: AtomicU64::new(0),
49 high_read: AtomicU64::new(0),
50 low_written: AtomicU64::new(0),
51 low_read: AtomicU64::new(0),
52 })
53 }
54
55 pub fn total_slots(&self) -> usize {
56 self.critical_count + self.high_count + self.low_count
57 }
58
59 pub fn slot_counts(&self) -> (usize, usize, usize) {
60 (self.critical_count, self.high_count, self.low_count)
61 }
62
63 pub fn pending_counts(&self) -> (u64, u64, u64) {
64 let crit = self.critical_written.load(Ordering::Acquire)
65 .wrapping_sub(self.critical_read.load(Ordering::Acquire));
66 let high = self.high_written.load(Ordering::Acquire)
67 .wrapping_sub(self.high_read.load(Ordering::Acquire));
68 let low = self.low_written.load(Ordering::Acquire)
69 .wrapping_sub(self.low_read.load(Ordering::Acquire));
70 (crit, high, low)
71 }
72
73 pub fn total_pending(&self) -> u64 {
74 let (crit, high, low) = self.pending_counts();
75 crit + high + low
76 }
77
78 pub fn is_empty(&self) -> bool {
79 self.total_pending() == 0
80 }
81
82 pub fn write(&self, priority: Priority, data: &[u8]) -> PriorityResult<SlotId> {
83 match priority {
84 Priority::Critical => {
85 let pending = self.critical_written.load(Ordering::Acquire)
86 .wrapping_sub(self.critical_read.load(Ordering::Acquire));
87 if pending >= self.critical_count as u64 {
88 return Err(RingBufferError::Full);
89 }
90 self.critical.write_slot(crate::buffer::Priority::Normal, data)
91 .map(|_| {
92 self.critical_written.fetch_add(1, Ordering::Release);
93 SlotId { index: pending, seq: pending }
94 })
95 }
96 Priority::High => {
97 let pending = self.high_written.load(Ordering::Acquire)
98 .wrapping_sub(self.high_read.load(Ordering::Acquire));
99 if pending >= self.high_count as u64 {
100 return Err(RingBufferError::Full);
101 }
102 self.high.write_slot(crate::buffer::Priority::Normal, data)
103 .map(|_| {
104 self.high_written.fetch_add(1, Ordering::Release);
105 SlotId { index: pending, seq: pending }
106 })
107 }
108 Priority::Low => {
109 let pending = self.low_written.load(Ordering::Acquire)
110 .wrapping_sub(self.low_read.load(Ordering::Acquire));
111 if pending >= self.low_count as u64 {
112 return Err(RingBufferError::Full);
113 }
114 self.low.write_slot(crate::buffer::Priority::Normal, data)
115 .map(|_| {
116 self.low_written.fetch_add(1, Ordering::Release);
117 SlotId { index: pending, seq: pending }
118 })
119 }
120 }
121 }
122
123 pub fn read(&self) -> Option<(Priority, Vec<u8>)> {
124 if let Some(data) = self.critical.read_slot() {
125 self.critical_read.fetch_add(1, Ordering::Release);
126 return Some((Priority::Critical, data.1));
127 }
128
129 if let Some(data) = self.high.read_slot() {
130 self.high_read.fetch_add(1, Ordering::Release);
131 return Some((Priority::High, data.1));
132 }
133
134 if let Some(data) = self.low.read_slot() {
135 self.low_read.fetch_add(1, Ordering::Release);
136 return Some((Priority::Low, data.1));
137 }
138
139 None
140 }
141
142 pub fn clear(&self) {
143 self.critical_written.store(0, Ordering::Release);
144 self.critical_read.store(0, Ordering::Release);
145 self.high_written.store(0, Ordering::Release);
146 self.high_read.store(0, Ordering::Release);
147 self.low_written.store(0, Ordering::Release);
148 self.low_read.store(0, Ordering::Release);
149 }
150}