1use super::array_from_fn;
2use crate::buffer::{AtomicStats, BufferStats, SignalBuffer, CACHE_LINE_SIZE};
3use crate::math::Transcendental;
4use core::marker::PhantomData;
5use std::fmt;
6
7#[repr(align(64))]
14pub struct FanOutBuffer<T: Transcendental, const N: usize, const CONSUMERS: usize> {
15 storage: [T; N],
16 version: usize,
17 read_versions: [usize; CONSUMERS],
18 valid: bool,
19 stats: AtomicStats,
20 _phantom: PhantomData<T>,
21}
22
23impl<T: Transcendental, const N: usize, const CONSUMERS: usize> FanOutBuffer<T, N, CONSUMERS> {
24 pub fn new() -> Self {
29 assert!(
30 CONSUMERS > 0,
31 "FanOutBuffer must have at least one consumer"
32 );
33 Self {
34 storage: array_from_fn(|_| T::ZERO),
35 version: 0,
36 read_versions: [0; CONSUMERS],
37 valid: false,
38 stats: AtomicStats::new(),
39 _phantom: PhantomData,
40 }
41 }
42
43 #[inline(always)]
45 pub fn write(&mut self, data: &[T; N]) {
46 self.storage.copy_from_slice(data);
47 self.version += 1;
48 self.valid = true;
49 self.stats.record_write();
50 self.stats.update_peak(1);
51 }
52
53 #[inline(always)]
55 pub fn try_read(&mut self, consumer_id: usize) -> Option<[T; N]> {
56 if consumer_id >= CONSUMERS {
57 return None;
58 }
59 let current_version = self.version;
60 if self.read_versions[consumer_id] == current_version || !self.valid {
61 self.stats.record_underflow();
62 return None;
63 }
64 let mut result = [T::ZERO; N];
65 result.copy_from_slice(&self.storage);
66 self.read_versions[consumer_id] = current_version;
67 self.stats.record_read();
68 Some(result)
69 }
70
71 pub fn has_new_data(&self, consumer_id: usize) -> bool {
73 consumer_id < CONSUMERS && self.version != self.read_versions[consumer_id] && self.valid
74 }
75
76 pub const fn consumer_count(&self) -> usize {
78 CONSUMERS
79 }
80 pub fn current_version(&self) -> usize {
82 self.version
83 }
84 pub fn last_read_version(&self, consumer_id: usize) -> Option<usize> {
86 if consumer_id >= CONSUMERS {
87 None
88 } else {
89 Some(self.read_versions[consumer_id])
90 }
91 }
92
93 pub fn reset(&mut self) {
95 self.valid = false;
96 self.read_versions.fill(0);
97 self.stats.reset();
98 }
99}
100
101impl<T: Transcendental, const N: usize, const CONSUMERS: usize> SignalBuffer<T>
102 for FanOutBuffer<T, N, CONSUMERS>
103{
104 fn capacity(&self) -> usize {
105 N
106 }
107 fn len(&self) -> usize {
108 if self.valid {
109 1
110 } else {
111 0
112 }
113 }
114 fn is_empty(&self) -> bool {
115 !self.valid
116 }
117 fn is_full(&self) -> bool {
118 self.valid
119 }
120 fn clear(&mut self) {
121 self.reset();
122 }
123 fn stats(&self) -> BufferStats {
124 let mut stats = self.stats.snapshot();
125 stats.fill_level = if self.valid { 1.0 } else { 0.0 };
126 stats
127 }
128 fn reset_stats(&mut self) {
129 self.stats.reset();
130 }
131}
132
133impl<T: Transcendental, const N: usize, const CONSUMERS: usize> Default
134 for FanOutBuffer<T, N, CONSUMERS>
135{
136 fn default() -> Self {
137 Self::new()
138 }
139}
140
141impl<T: Transcendental + fmt::Debug, const N: usize, const CONSUMERS: usize> fmt::Debug
142 for FanOutBuffer<T, N, CONSUMERS>
143{
144 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
145 f.debug_struct("FanOutBuffer")
146 .field("capacity", &N)
147 .field("consumers", &CONSUMERS)
148 .field("has_data", &self.valid)
149 .field("version", &self.version)
150 .field("stats", &self.stats.snapshot())
151 .field("alignment", &CACHE_LINE_SIZE)
152 .finish()
153 }
154}
155
156#[repr(align(64))]
163pub struct FanInBuffer<T: Transcendental, const N: usize, const PRODUCERS: usize> {
164 storage: [[T; N]; PRODUCERS],
165 valid: [bool; PRODUCERS],
166 write_seq: [usize; PRODUCERS],
167 read_seq: usize,
168 stats: AtomicStats,
169 _phantom: PhantomData<T>,
170}
171
172impl<T: Transcendental, const N: usize, const PRODUCERS: usize> FanInBuffer<T, N, PRODUCERS> {
173 pub fn new() -> Self {
178 assert!(PRODUCERS > 0, "FanInBuffer must have at least one producer");
179 Self {
180 storage: array_from_fn(|_| [T::ZERO; N]),
181 valid: [false; PRODUCERS],
182 write_seq: [0; PRODUCERS],
183 read_seq: 0,
184 stats: AtomicStats::new(),
185 _phantom: PhantomData,
186 }
187 }
188
189 #[inline(always)]
191 pub fn write(&mut self, producer_id: usize, data: &[T; N]) {
192 if producer_id >= PRODUCERS {
193 return;
194 }
195 self.storage[producer_id].copy_from_slice(data);
196 self.valid[producer_id] = true;
197 self.write_seq[producer_id] += 1;
198 self.stats.record_write();
199 }
200
201 #[inline(always)]
203 pub fn try_read(&mut self) -> Option<[T; N]> {
204 let mut result = [T::ZERO; N];
205 let mut any_valid = false;
206 let mut active_producers = 0;
207 let current_seq = self.read_seq;
208 for producer in 0..PRODUCERS {
209 if self.valid[producer] && self.write_seq[producer] > current_seq {
210 any_valid = true;
211 active_producers += 1;
212 for (res, &val) in result.iter_mut().zip(self.storage[producer].iter()) {
213 *res += val;
214 }
215 }
216 }
217 if any_valid {
218 self.read_seq += 1;
219 self.stats.record_read();
220 self.stats.update_peak(active_producers);
221 Some(result)
222 } else {
223 self.stats.record_underflow();
224 None
225 }
226 }
227
228 pub const fn producer_count(&self) -> usize {
230 PRODUCERS
231 }
232
233 pub fn producer_has_data(&self, producer_id: usize) -> bool {
235 if producer_id >= PRODUCERS {
236 return false;
237 }
238 self.write_seq[producer_id] > self.read_seq && self.valid[producer_id]
239 }
240
241 pub fn read_seq(&self) -> usize {
243 self.read_seq
244 }
245 pub fn write_seq(&self, producer_id: usize) -> Option<usize> {
247 if producer_id >= PRODUCERS {
248 None
249 } else {
250 Some(self.write_seq[producer_id])
251 }
252 }
253
254 pub fn reset(&mut self) {
256 self.valid.fill(false);
257 self.write_seq.fill(0);
258 self.read_seq = 0;
259 self.stats.reset();
260 }
261
262 pub fn clear_producer(&mut self, producer_id: usize) {
264 if producer_id < PRODUCERS {
265 self.valid[producer_id] = false;
266 self.write_seq[producer_id] = 0;
267 }
268 }
269}
270
271impl<T: Transcendental, const N: usize, const PRODUCERS: usize> SignalBuffer<T>
272 for FanInBuffer<T, N, PRODUCERS>
273{
274 fn capacity(&self) -> usize {
275 N * PRODUCERS
276 }
277 fn len(&self) -> usize {
278 let mut count = 0;
279 for producer in 0..PRODUCERS {
280 if self.write_seq[producer] > self.read_seq && self.valid[producer] {
281 count += 1;
282 }
283 }
284 count
285 }
286 fn is_empty(&self) -> bool {
287 self.len() == 0
288 }
289 fn is_full(&self) -> bool {
290 self.len() == PRODUCERS
291 }
292 fn clear(&mut self) {
293 self.reset();
294 }
295 fn stats(&self) -> BufferStats {
296 let mut stats = self.stats.snapshot();
297 stats.fill_level = self.len() as f32 / PRODUCERS as f32;
298 stats
299 }
300 fn reset_stats(&mut self) {
301 self.stats.reset();
302 }
303}
304
305impl<T: Transcendental, const N: usize, const PRODUCERS: usize> Default
306 for FanInBuffer<T, N, PRODUCERS>
307{
308 fn default() -> Self {
309 Self::new()
310 }
311}
312
313impl<T: Transcendental + fmt::Debug, const N: usize, const PRODUCERS: usize> fmt::Debug
314 for FanInBuffer<T, N, PRODUCERS>
315{
316 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
317 let active = self.valid.iter().filter(|v| **v).count();
318 f.debug_struct("FanInBuffer")
319 .field("capacity", &(N * PRODUCERS))
320 .field("producers", &PRODUCERS)
321 .field("active_producers", &active)
322 .field("len", &self.len())
323 .field("read_seq", &self.read_seq)
324 .field("stats", &self.stats.snapshot())
325 .field("alignment", &CACHE_LINE_SIZE)
326 .finish()
327 }
328}
329
330#[cfg(test)]
331mod tests {
332 use super::*;
333
334 #[test]
335 fn test_fan_out_buffer_basic() {
336 let mut buffer = FanOutBuffer::<f32, 64, 3>::new();
337 let data = [42.0; 64];
338 buffer.write(&data);
339 for i in 0..3 {
340 let read = buffer.try_read(i).unwrap();
341 assert_eq!(read[0], 42.0);
342 }
343 }
344
345 #[test]
346 fn test_fan_in_buffer_basic() {
347 let mut buffer = FanInBuffer::<f32, 64, 2>::new();
348 buffer.write(0, &[1.0; 64]);
349 buffer.write(1, &[2.0; 64]);
350 let mixed = buffer.try_read().unwrap();
351 assert_eq!(mixed[0], 3.0);
352 }
353}