1use super::array_from_fn;
2use crate::buffer::{AtomicStats, Buffer, BufferStats, CACHE_LINE_SIZE};
3use crate::math::Transcendental;
4use core::marker::PhantomData;
5use std::fmt;
6
7#[repr(align(64))]
14pub struct FanOutBuffer<T: Transcendental, const N: usize, const CONSUMERS: usize> {
15 storage: [T; N],
16 version: usize,
17 read_versions: [usize; CONSUMERS],
18 valid: bool,
19 stats: AtomicStats,
20 _phantom: PhantomData<T>,
21}
22
23impl<T: Transcendental, const N: usize, const CONSUMERS: usize> FanOutBuffer<T, N, CONSUMERS> {
24 pub fn new() -> Self {
29 assert!(
30 CONSUMERS > 0,
31 "FanOutBuffer must have at least one consumer"
32 );
33 Self {
34 storage: array_from_fn(|_| T::ZERO),
35 version: 0,
36 read_versions: [0; CONSUMERS],
37 valid: false,
38 stats: AtomicStats::new(),
39 _phantom: PhantomData,
40 }
41 }
42
43 #[inline(always)]
45 pub fn write(&mut self, data: &[T; N]) {
46 self.storage.copy_from_slice(data);
47 self.version += 1;
48 self.valid = true;
49 self.stats.record_write();
50 self.stats.update_peak(1);
51 }
52
53 #[inline(always)]
55 pub fn try_read(&mut self, consumer_id: usize) -> Option<[T; N]> {
56 if consumer_id >= CONSUMERS {
57 return None;
58 }
59 let current_version = self.version;
60 if self.read_versions[consumer_id] == current_version || !self.valid {
61 self.stats.record_underflow();
62 return None;
63 }
64 let mut result = [T::ZERO; N];
65 result.copy_from_slice(&self.storage);
66 self.read_versions[consumer_id] = current_version;
67 self.stats.record_read();
68 Some(result)
69 }
70
71 pub fn has_new_data(&self, consumer_id: usize) -> bool {
73 consumer_id < CONSUMERS && self.version != self.read_versions[consumer_id] && self.valid
74 }
75
76 pub const fn consumer_count(&self) -> usize {
78 CONSUMERS
79 }
80 pub fn current_version(&self) -> usize {
82 self.version
83 }
84 pub fn last_read_version(&self, consumer_id: usize) -> Option<usize> {
86 if consumer_id >= CONSUMERS {
87 None
88 } else {
89 Some(self.read_versions[consumer_id])
90 }
91 }
92
93 pub fn reset(&mut self) {
95 self.valid = false;
96 self.read_versions.fill(0);
97 self.stats.reset();
98 }
99}
100
101impl<T: Transcendental, const N: usize, const CONSUMERS: usize> Buffer<T>
102 for FanOutBuffer<T, N, CONSUMERS>
103{
104 fn capacity(&self) -> usize {
105 N
106 }
107 fn len(&self) -> usize {
108 if self.valid {
109 1
110 } else {
111 0
112 }
113 }
114 fn is_empty(&self) -> bool {
115 !self.valid
116 }
117 fn is_full(&self) -> bool {
118 self.valid
119 }
120 fn as_slice(&self) -> &[T] {
121 &self.storage
122 }
123 fn as_mut_slice(&mut self) -> &mut [T] {
124 &mut self.storage
125 }
126 fn fill(&mut self, value: T) {
127 self.storage.fill(value);
128 }
129 fn copy_from(&mut self, src: &[T]) {
130 let len = src.len().min(N);
131 self.storage[..len].copy_from_slice(&src[..len]);
132 }
133 fn clear(&mut self) {
134 self.reset();
135 }
136 fn stats(&self) -> BufferStats {
137 let mut stats = self.stats.snapshot();
138 stats.fill_level = if self.valid { 1.0 } else { 0.0 };
139 stats
140 }
141 fn reset_stats(&mut self) {
142 self.stats.reset();
143 }
144}
145
146impl<T: Transcendental, const N: usize, const CONSUMERS: usize> Default
147 for FanOutBuffer<T, N, CONSUMERS>
148{
149 fn default() -> Self {
150 Self::new()
151 }
152}
153
154impl<T: Transcendental + fmt::Debug, const N: usize, const CONSUMERS: usize> fmt::Debug
155 for FanOutBuffer<T, N, CONSUMERS>
156{
157 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
158 f.debug_struct("FanOutBuffer")
159 .field("capacity", &N)
160 .field("consumers", &CONSUMERS)
161 .field("has_data", &self.valid)
162 .field("version", &self.version)
163 .field("stats", &self.stats.snapshot())
164 .field("alignment", &CACHE_LINE_SIZE)
165 .finish()
166 }
167}
168
169#[repr(align(64))]
176pub struct FanInBuffer<T: Transcendental, const N: usize, const PRODUCERS: usize> {
177 storage: [[T; N]; PRODUCERS],
178 valid: [bool; PRODUCERS],
179 write_seq: [usize; PRODUCERS],
180 read_seq: usize,
181 stats: AtomicStats,
182 _phantom: PhantomData<T>,
183}
184
185impl<T: Transcendental, const N: usize, const PRODUCERS: usize> FanInBuffer<T, N, PRODUCERS> {
186 pub fn new() -> Self {
191 assert!(PRODUCERS > 0, "FanInBuffer must have at least one producer");
192 Self {
193 storage: array_from_fn(|_| [T::ZERO; N]),
194 valid: [false; PRODUCERS],
195 write_seq: [0; PRODUCERS],
196 read_seq: 0,
197 stats: AtomicStats::new(),
198 _phantom: PhantomData,
199 }
200 }
201
202 #[inline(always)]
204 pub fn write(&mut self, producer_id: usize, data: &[T; N]) {
205 if producer_id >= PRODUCERS {
206 return;
207 }
208 self.storage[producer_id].copy_from_slice(data);
209 self.valid[producer_id] = true;
210 self.write_seq[producer_id] += 1;
211 self.stats.record_write();
212 }
213
214 #[inline(always)]
216 pub fn try_read(&mut self) -> Option<[T; N]> {
217 let mut result = [T::ZERO; N];
218 let mut any_valid = false;
219 let mut active_producers = 0;
220 let current_seq = self.read_seq;
221 for producer in 0..PRODUCERS {
222 if self.valid[producer] && self.write_seq[producer] > current_seq {
223 any_valid = true;
224 active_producers += 1;
225 for (res, &val) in result.iter_mut().zip(self.storage[producer].iter()) {
226 *res += val;
227 }
228 }
229 }
230 if any_valid {
231 self.read_seq += 1;
232 self.stats.record_read();
233 self.stats.update_peak(active_producers);
234 Some(result)
235 } else {
236 self.stats.record_underflow();
237 None
238 }
239 }
240
241 pub const fn producer_count(&self) -> usize {
243 PRODUCERS
244 }
245
246 pub fn producer_has_data(&self, producer_id: usize) -> bool {
248 if producer_id >= PRODUCERS {
249 return false;
250 }
251 self.write_seq[producer_id] > self.read_seq && self.valid[producer_id]
252 }
253
254 pub fn read_seq(&self) -> usize {
256 self.read_seq
257 }
258 pub fn write_seq(&self, producer_id: usize) -> Option<usize> {
260 if producer_id >= PRODUCERS {
261 None
262 } else {
263 Some(self.write_seq[producer_id])
264 }
265 }
266
267 pub fn reset(&mut self) {
269 self.valid.fill(false);
270 self.write_seq.fill(0);
271 self.read_seq = 0;
272 self.stats.reset();
273 }
274
275 pub fn clear_producer(&mut self, producer_id: usize) {
277 if producer_id < PRODUCERS {
278 self.valid[producer_id] = false;
279 self.write_seq[producer_id] = 0;
280 }
281 }
282}
283
284impl<T: Transcendental, const N: usize, const PRODUCERS: usize> Buffer<T>
285 for FanInBuffer<T, N, PRODUCERS>
286{
287 fn capacity(&self) -> usize {
288 N * PRODUCERS
289 }
290 fn len(&self) -> usize {
291 let mut count = 0;
292 for producer in 0..PRODUCERS {
293 if self.write_seq[producer] > self.read_seq && self.valid[producer] {
294 count += 1;
295 }
296 }
297 count
298 }
299 fn is_empty(&self) -> bool {
300 self.len() == 0
301 }
302 fn is_full(&self) -> bool {
303 self.len() == PRODUCERS
304 }
305 fn as_slice(&self) -> &[T] {
306 &self.storage[0]
307 }
308 fn as_mut_slice(&mut self) -> &mut [T] {
309 &mut self.storage[0]
310 }
311 fn fill(&mut self, value: T) {
312 for p in 0..PRODUCERS {
313 self.storage[p].fill(value);
314 }
315 }
316 fn copy_from(&mut self, src: &[T]) {
317 if PRODUCERS > 0 {
318 let len = src.len().min(N);
319 self.storage[0][..len].copy_from_slice(&src[..len]);
320 }
321 }
322 fn clear(&mut self) {
323 self.reset();
324 }
325 fn stats(&self) -> BufferStats {
326 let mut stats = self.stats.snapshot();
327 stats.fill_level = self.len() as f32 / PRODUCERS as f32;
328 stats
329 }
330 fn reset_stats(&mut self) {
331 self.stats.reset();
332 }
333}
334
335impl<T: Transcendental, const N: usize, const PRODUCERS: usize> Default
336 for FanInBuffer<T, N, PRODUCERS>
337{
338 fn default() -> Self {
339 Self::new()
340 }
341}
342
343impl<T: Transcendental + fmt::Debug, const N: usize, const PRODUCERS: usize> fmt::Debug
344 for FanInBuffer<T, N, PRODUCERS>
345{
346 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
347 let active = self.valid.iter().filter(|v| **v).count();
348 f.debug_struct("FanInBuffer")
349 .field("capacity", &(N * PRODUCERS))
350 .field("producers", &PRODUCERS)
351 .field("active_producers", &active)
352 .field("len", &self.len())
353 .field("read_seq", &self.read_seq)
354 .field("stats", &self.stats.snapshot())
355 .field("alignment", &CACHE_LINE_SIZE)
356 .finish()
357 }
358}
359
360#[cfg(test)]
361mod tests {
362 use super::*;
363
364 #[test]
365 fn test_fan_out_buffer_basic() {
366 let mut buffer = FanOutBuffer::<f32, 64, 3>::new();
367 let data = [42.0; 64];
368 buffer.write(&data);
369 for i in 0..3 {
370 let read = buffer.try_read(i).unwrap();
371 assert_eq!(read[0], 42.0);
372 }
373 }
374
375 #[test]
376 fn test_fan_in_buffer_basic() {
377 let mut buffer = FanInBuffer::<f32, 64, 2>::new();
378 buffer.write(0, &[1.0; 64]);
379 buffer.write(1, &[2.0; 64]);
380 let mixed = buffer.try_read().unwrap();
381 assert_eq!(mixed[0], 3.0);
382 }
383}