1use std::sync::OnceLock;
2
3use bytemuck::Zeroable;
4
5use crate::ring::{EventSize, PooledEvent, Reader, RingBuffer, Writer};
6
7pub const XS_CAPACITY: usize = 2000; pub const S_CAPACITY: usize = 1000; pub const M_CAPACITY: usize = 300; pub const L_CAPACITY: usize = 60; pub const XL_CAPACITY: usize = 15; static XS_RING: OnceLock<RingBuffer<64, XS_CAPACITY>> = OnceLock::new();
16static S_RING: OnceLock<RingBuffer<256, S_CAPACITY>> = OnceLock::new();
17static M_RING: OnceLock<RingBuffer<1024, M_CAPACITY>> = OnceLock::new();
18static L_RING: OnceLock<RingBuffer<4096, L_CAPACITY>> = OnceLock::new();
19static XL_RING: OnceLock<RingBuffer<16384, XL_CAPACITY>> = OnceLock::new();
20
/// Identifies one of the five fixed-size event pools.
///
/// The discriminants are spelled out explicitly because the id is converted
/// to and from a raw `u8` by the `From` impls in this module.
#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PoolId {
    /// Pool for payloads of at most 64 bytes.
    XS = 0,
    /// Pool for payloads of at most 256 bytes.
    S = 1,
    /// Pool for payloads of at most 1024 bytes.
    M = 2,
    /// Pool for payloads of at most 4096 bytes.
    L = 3,
    /// Pool for payloads of at most 16384 bytes.
    XL = 4,
}
30
31impl PoolId {
32 pub fn from_size(size: EventSize) -> Self {
33 match size {
34 EventSize::XS => PoolId::XS,
35 EventSize::S => PoolId::S,
36 EventSize::M => PoolId::M,
37 EventSize::L => PoolId::L,
38 EventSize::XL => PoolId::XL,
39 EventSize::XXL => panic!("XXL not supported in pools"),
40 }
41 }
42
43 pub fn max_size(&self) -> usize {
44 match self {
45 PoolId::XS => 64,
46 PoolId::S => 256,
47 PoolId::M => 1024,
48 PoolId::L => 4096,
49 PoolId::XL => 16384,
50 }
51 }
52
53 pub fn capacity(&self) -> usize {
54 match self {
55 PoolId::XS => XS_CAPACITY,
56 PoolId::S => S_CAPACITY,
57 PoolId::M => M_CAPACITY,
58 PoolId::L => L_CAPACITY,
59 PoolId::XL => XL_CAPACITY,
60 }
61 }
62}
63
64impl From<PoolId> for u8 {
65 fn from(value: PoolId) -> Self {
66 value as u8
67 }
68}
69
70impl From<u8> for PoolId {
71 fn from(value: u8) -> Self {
72 match value {
73 0 => PoolId::XS,
74 1 => PoolId::S,
75 2 => PoolId::M,
76 3 => PoolId::L,
77 4 => PoolId::XL,
78 _ => panic!("Unknown pool id: {}", value),
79 }
80 }
81}
82
83pub struct RingFactory;
85
86impl RingFactory {
87 pub fn get_writer<const TSHIRT_SIZE: usize, const RING_CAPACITY: usize>(
89 ring: &'static OnceLock<RingBuffer<TSHIRT_SIZE, RING_CAPACITY>>,
90 ) -> Writer<TSHIRT_SIZE, RING_CAPACITY> {
91 Writer::new(ring.get_or_init(RingBuffer::new))
92 }
93
94 pub fn get_reader<const TSHIRT_SIZE: usize, const RING_CAPACITY: usize>(
96 ring: &'static OnceLock<RingBuffer<TSHIRT_SIZE, RING_CAPACITY>>,
97 ) -> Reader<TSHIRT_SIZE, RING_CAPACITY> {
98 Reader::new(ring.get_or_init(RingBuffer::new))
99 }
100}
101
102pub struct EventPoolFactory;
104
105impl EventPoolFactory {
106 pub fn get_xs_writer() -> Writer<64, XS_CAPACITY> {
108 RingFactory::get_writer(&XS_RING)
109 }
110
111 pub fn get_s_writer() -> Writer<256, S_CAPACITY> {
112 RingFactory::get_writer(&S_RING)
113 }
114
115 pub fn get_m_writer() -> Writer<1024, M_CAPACITY> {
116 RingFactory::get_writer(&M_RING)
117 }
118
119 pub fn get_l_writer() -> Writer<4096, L_CAPACITY> {
120 RingFactory::get_writer(&L_RING)
121 }
122
123 pub fn get_xl_writer() -> Writer<16384, XL_CAPACITY> {
124 RingFactory::get_writer(&XL_RING)
125 }
126
127 pub fn get_xs_reader() -> Reader<64, XS_CAPACITY> {
129 RingFactory::get_reader(&XS_RING)
130 }
131
132 pub fn get_s_reader() -> Reader<256, S_CAPACITY> {
133 RingFactory::get_reader(&S_RING)
134 }
135
136 pub fn get_m_reader() -> Reader<1024, M_CAPACITY> {
137 RingFactory::get_reader(&M_RING)
138 }
139
140 pub fn get_l_reader() -> Reader<4096, L_CAPACITY> {
141 RingFactory::get_reader(&L_RING)
142 }
143
144 pub fn get_xl_reader() -> Reader<16384, XL_CAPACITY> {
145 RingFactory::get_reader(&XL_RING)
146 }
147
148 pub fn estimate_size(data_len: usize) -> EventSize {
150 match data_len {
151 0..=64 => EventSize::XS,
152 65..=256 => EventSize::S,
153 257..=1024 => EventSize::M,
154 1025..=4096 => EventSize::L,
155 4097..=16384 => EventSize::XL,
156 _ => EventSize::XXL,
157 }
158 }
159
160 pub fn estimate_pool_id(data_len: usize) -> Option<PoolId> {
161 let size = Self::estimate_size(data_len);
162 if size == EventSize::XXL {
163 None
164 } else {
165 Some(PoolId::from_size(size))
166 }
167 }
168}
169
170pub struct EventUtils;
172
173impl EventUtils {
174 pub fn create_pooled_event<const SIZE: usize>(
176 data: &[u8],
177 event_type: u32,
178 ) -> Result<PooledEvent<SIZE>, EventCreationError> {
179 if data.len() > SIZE {
180 return Err(EventCreationError::DataTooLarge {
181 data_len: data.len(),
182 max_size: SIZE,
183 });
184 }
185
186 let mut pooled = PooledEvent::<SIZE>::zeroed();
187 pooled.data[..data.len()].copy_from_slice(data);
188 pooled.len = data.len() as u32;
189 pooled.event_type = event_type;
190 Ok(pooled)
191 }
192
193 pub fn create_auto_sized_event(data: &[u8], event_type: u32) -> Result<AutoSizedEvent, EventCreationError> {
195 let size = EventPoolFactory::estimate_size(data.len());
196
197 match size {
198 EventSize::XS => {
199 let event = Self::create_pooled_event::<64>(data, event_type)?;
200 Ok(AutoSizedEvent::Xs(event))
201 }
202 EventSize::S => {
203 let event = Self::create_pooled_event::<256>(data, event_type)?;
204 Ok(AutoSizedEvent::S(event))
205 }
206 EventSize::M => {
207 let event = Self::create_pooled_event::<1024>(data, event_type)?;
208 Ok(AutoSizedEvent::M(event))
209 }
210 EventSize::L => {
211 let event = Self::create_pooled_event::<4096>(data, event_type)?;
212 Ok(AutoSizedEvent::L(event))
213 }
214 EventSize::XL => {
215 let event = Self::create_pooled_event::<16384>(data, event_type)?;
216 Ok(AutoSizedEvent::Xl(event))
217 }
218 EventSize::XXL => Err(EventCreationError::DataTooLarge {
219 data_len: data.len(),
220 max_size: 16384,
221 }),
222 }
223 }
224}
225
226#[allow(clippy::large_enum_variant)]
227#[derive(Debug, Clone)]
229pub enum AutoSizedEvent {
230 Xs(PooledEvent<64>),
231 S(PooledEvent<256>),
232 M(PooledEvent<1024>),
233 L(PooledEvent<4096>),
234 Xl(PooledEvent<16384>),
235}
236
237impl AutoSizedEvent {
238 pub fn data(&self) -> &[u8] {
239 match self {
240 AutoSizedEvent::Xs(event) => &event.data[..event.len as usize],
241 AutoSizedEvent::S(event) => &event.data[..event.len as usize],
242 AutoSizedEvent::M(event) => &event.data[..event.len as usize],
243 AutoSizedEvent::L(event) => &event.data[..event.len as usize],
244 AutoSizedEvent::Xl(event) => &event.data[..event.len as usize],
245 }
246 }
247
248 pub fn event_type(&self) -> u32 {
249 match self {
250 AutoSizedEvent::Xs(event) => event.event_type,
251 AutoSizedEvent::S(event) => event.event_type,
252 AutoSizedEvent::M(event) => event.event_type,
253 AutoSizedEvent::L(event) => event.event_type,
254 AutoSizedEvent::Xl(event) => event.event_type,
255 }
256 }
257
258 pub fn len(&self) -> u32 {
259 match self {
260 AutoSizedEvent::Xs(event) => event.len,
261 AutoSizedEvent::S(event) => event.len,
262 AutoSizedEvent::M(event) => event.len,
263 AutoSizedEvent::L(event) => event.len,
264 AutoSizedEvent::Xl(event) => event.len,
265 }
266 }
267
268 pub fn is_empty(&self) -> bool {
269 self.len() == 0
270 }
271
272 pub fn pool_id(&self) -> PoolId {
273 match self {
274 AutoSizedEvent::Xs(_) => PoolId::XS,
275 AutoSizedEvent::S(_) => PoolId::S,
276 AutoSizedEvent::M(_) => PoolId::M,
277 AutoSizedEvent::L(_) => PoolId::L,
278 AutoSizedEvent::Xl(_) => PoolId::XL,
279 }
280 }
281
282 pub fn emit_to_ring(self) -> Result<(), EmitError> {
284 match self {
285 AutoSizedEvent::Xs(event) => {
286 let mut writer = RingFactory::get_writer(&XS_RING);
287 if writer.add(event) {
288 Ok(())
289 } else {
290 Err(EmitError::RingFull(PoolId::XS))
291 }
292 }
293 AutoSizedEvent::S(event) => {
294 let mut writer = RingFactory::get_writer(&S_RING);
295 if writer.add(event) {
296 Ok(())
297 } else {
298 Err(EmitError::RingFull(PoolId::S))
299 }
300 }
301 AutoSizedEvent::M(event) => {
302 let mut writer = RingFactory::get_writer(&M_RING);
303 if writer.add(event) {
304 Ok(())
305 } else {
306 Err(EmitError::RingFull(PoolId::M))
307 }
308 }
309 AutoSizedEvent::L(event) => {
310 let mut writer = RingFactory::get_writer(&L_RING);
311 if writer.add(event) {
312 Ok(())
313 } else {
314 Err(EmitError::RingFull(PoolId::L))
315 }
316 }
317 AutoSizedEvent::Xl(event) => {
318 let mut writer = RingFactory::get_writer(&XL_RING);
319 if writer.add(event) {
320 Ok(())
321 } else {
322 Err(EmitError::RingFull(PoolId::XL))
323 }
324 }
325 }
326 }
327}
328
329#[derive(Debug, thiserror::Error)]
330pub enum EventCreationError {
331 #[error("Data too large: {data_len} bytes > max {max_size} bytes")]
332 DataTooLarge { data_len: usize, max_size: usize },
333}
334
335#[derive(Debug, thiserror::Error)]
336pub enum EmitError {
337 #[error("Ring buffer full for pool {0:?}")]
338 RingFull(PoolId),
339}
340
341#[derive(Debug, Clone)]
343pub struct PoolStats {
344 pub pool_id: PoolId,
345 pub capacity: usize,
346 pub current_backpressure: f32,
347}
348
349impl PoolStats {
350 pub fn collect_xs() -> Self {
351 let reader = RingFactory::get_reader(&XS_RING);
352 Self {
353 pool_id: PoolId::XS,
354 capacity: XS_CAPACITY,
355 current_backpressure: reader.backpressure_ratio(),
356 }
357 }
358
359 pub fn collect_s() -> Self {
360 let reader = RingFactory::get_reader(&S_RING);
361 Self {
362 pool_id: PoolId::S,
363 capacity: S_CAPACITY,
364 current_backpressure: reader.backpressure_ratio(),
365 }
366 }
367
368 pub fn collect_m() -> Self {
369 let reader = RingFactory::get_reader(&M_RING);
370 Self {
371 pool_id: PoolId::M,
372 capacity: M_CAPACITY,
373 current_backpressure: reader.backpressure_ratio(),
374 }
375 }
376
377 pub fn collect_l() -> Self {
378 let reader = RingFactory::get_reader(&L_RING);
379 Self {
380 pool_id: PoolId::L,
381 capacity: L_CAPACITY,
382 current_backpressure: reader.backpressure_ratio(),
383 }
384 }
385
386 pub fn collect_xl() -> Self {
387 let reader = RingFactory::get_reader(&XL_RING);
388 Self {
389 pool_id: PoolId::XL,
390 capacity: XL_CAPACITY,
391 current_backpressure: reader.backpressure_ratio(),
392 }
393 }
394
395 pub fn collect_all() -> Vec<Self> {
396 vec![
397 Self::collect_xs(),
398 Self::collect_s(),
399 Self::collect_m(),
400 Self::collect_l(),
401 Self::collect_xl(),
402 ]
403 }
404}
405
406impl std::fmt::Display for PoolStats {
407 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
408 write!(
409 f,
410 "Pool {:?}: capacity={}, backpressure={:.1}%",
411 self.pool_id,
412 self.capacity,
413 self.current_backpressure * 100.0
414 )
415 }
416}
417
#[cfg(test)]
mod pool_tests {
    use std::{
        sync::atomic::{AtomicU32, Ordering},
        thread,
        time::{Duration, Instant},
    };

    use super::*;

    // The rings are process-wide statics shared by every test in this module,
    // so each test tags its events with ids from its own disjoint range.
    static TEST_COUNTER: AtomicU32 = AtomicU32::new(5_000_000);

    /// Reserves a block of 10 000 event-type ids and returns its first id.
    fn next_test_id() -> u32 {
        TEST_COUNTER.fetch_add(10_000, Ordering::Relaxed)
    }

    /// Writes one event through the generic factory into the XS and M rings
    /// and verifies the payload if the reader hands back this test's event.
    #[test]
    fn test_generic_factory() {
        let base_id = next_test_id();

        let mut xs_writer = RingFactory::get_writer(&XS_RING);
        let mut xs_reader = RingFactory::get_reader(&XS_RING);

        let mut m_writer = RingFactory::get_writer(&M_RING);
        let mut m_reader = RingFactory::get_reader(&M_RING);

        xs_writer.add(EventUtils::create_pooled_event::<64>(b"xs_generic_test", base_id).unwrap());
        m_writer.add(EventUtils::create_pooled_event::<1024>(b"m_generic_test", base_id + 1).unwrap());

        // Other tests may have written to the same ring; only events carrying
        // this test's id are checked.
        match xs_reader.next() {
            Some(event) if event.event_type == base_id => {
                assert_eq!(&event.data[..event.len as usize], b"xs_generic_test");
                println!("✅ XS generic factory working");
            }
            _ => {}
        }

        match m_reader.next() {
            Some(event) if event.event_type == base_id + 1 => {
                assert_eq!(&event.data[..event.len as usize], b"m_generic_test");
                println!("✅ M generic factory working");
            }
            _ => {}
        }
    }

    /// Checks that the generic and the typed factory address the same S ring.
    #[test]
    fn test_api_equivalence() {
        let base_id = next_test_id();

        let mut generic_writer = RingFactory::get_writer(&S_RING);
        let mut typed_writer = EventPoolFactory::get_s_writer();

        let generic_event = EventUtils::create_pooled_event::<256>(b"generic_api", base_id).unwrap();
        let typed_event = EventUtils::create_pooled_event::<256>(b"typed_api", base_id + 1).unwrap();

        generic_writer.add(generic_event);
        typed_writer.add(typed_event);

        let mut generic_reader = RingFactory::get_reader(&S_RING);

        let mut found_generic = false;
        let mut found_typed = false;

        // Poll a bounded number of times; either event is enough to pass.
        for _ in 0..10 {
            if let Some(event) = generic_reader.next() {
                found_generic |= event.event_type == base_id;
                found_typed |= event.event_type == base_id + 1;
            }
        }

        assert!(found_generic || found_typed, "Should find at least one event");
        println!("✅ API equivalence working");
    }

    /// Times 1000 writes through each API against the L ring.
    #[test]
    fn test_performance_comparison() {
        let base_id = next_test_id();
        let events_count = 1000;

        let generic_timer = Instant::now();
        {
            let mut writer = RingFactory::get_writer(&L_RING);
            for i in 0..events_count {
                let payload = format!("generic_perf_{}", i);
                let event = EventUtils::create_pooled_event::<4096>(payload.as_bytes(), base_id + i).unwrap();
                writer.add(event);
            }
        }
        let generic_duration = generic_timer.elapsed();

        let typed_timer = Instant::now();
        {
            let mut writer = EventPoolFactory::get_l_writer();
            for i in 0..events_count {
                let payload = format!("typed_perf_{}", i);
                // Offset by `events_count` so the ids do not collide with the generic run.
                let event_type = base_id + events_count + i;
                let event = EventUtils::create_pooled_event::<4096>(payload.as_bytes(), event_type).unwrap();
                writer.add(event);
            }
        }
        let typed_duration = typed_timer.elapsed();

        println!(
            "Generic API: {:.2}ms for {} events",
            generic_duration.as_secs_f64() * 1000.0,
            events_count
        );
        println!(
            "Typed API: {:.2}ms for {} events",
            typed_duration.as_secs_f64() * 1000.0,
            events_count
        );

        assert!(generic_duration.as_millis() < 100, "Generic API too slow");
        assert!(typed_duration.as_millis() < 100, "Typed API too slow");

        println!("✅ Performance test passed");
    }

    /// Shows that `RingFactory` works with a ring declared outside the built-in pools.
    #[test]
    fn test_custom_ring_usage() {
        static CUSTOM_RING: OnceLock<RingBuffer<512, 50>> = OnceLock::new();

        let base_id = next_test_id();

        let mut writer = RingFactory::get_writer(&CUSTOM_RING);
        let mut reader = RingFactory::get_reader(&CUSTOM_RING);

        writer.add(EventUtils::create_pooled_event::<512>(b"custom_ring_test", base_id).unwrap());

        match reader.next() {
            Some(event) if event.event_type == base_id => {
                assert_eq!(&event.data[..event.len as usize], b"custom_ring_test");
                println!("✅ Custom ring buffer working");
            }
            _ => {}
        }
    }

    /// Four threads write to the XL ring concurrently; afterwards at least one
    /// of their events must be readable.
    #[test]
    fn test_concurrent_generic_access() {
        let base_id = next_test_id();
        let events_per_thread = 50;

        // Collecting eagerly spawns every thread before the first join.
        let handles: Vec<_> = (0..4u32)
            .map(|thread_id| {
                thread::spawn(move || {
                    let mut writer = RingFactory::get_writer(&XL_RING);

                    for i in 0..events_per_thread {
                        let payload = format!("thread_{}_event_{}", thread_id, i);
                        // Each thread owns a 1000-wide id sub-range.
                        let event_type = base_id + (thread_id * 1000) + i;
                        let event =
                            EventUtils::create_pooled_event::<16384>(payload.as_bytes(), event_type).unwrap();

                        writer.add(event);

                        thread::sleep(Duration::from_micros(1));
                    }

                    thread_id
                })
            })
            .collect();

        for handle in handles {
            let finished = handle.join().expect("Thread panicked");
            println!("Thread {} completed", finished);
        }

        let mut reader = RingFactory::get_reader(&XL_RING);
        let mut total_found = 0;
        let own_ids = base_id..base_id + (4 * 1000) + events_per_thread;

        for _ in 0..events_per_thread * 4 + 50 {
            match reader.next() {
                Some(event) if own_ids.contains(&event.event_type) => total_found += 1,
                _ => {}
            }
        }

        assert!(total_found > 0, "Should find some events from concurrent access");
        println!(
            "✅ Concurrent generic access: found {}/{} events",
            total_found,
            events_per_thread * 4
        );
    }

    /// Exercises `PoolStats` after writing to the XS ring.
    #[test]
    fn test_pool_stats_generic() {
        let base_id = next_test_id();

        {
            let mut writer = RingFactory::get_writer(&XS_RING);
            for offset in 0..100 {
                let event = EventUtils::create_pooled_event::<64>(b"stats_test", base_id + offset).unwrap();
                writer.add(event);
            }
        }

        let xs_stats = PoolStats::collect_xs();
        assert_eq!(xs_stats.pool_id, PoolId::XS);
        assert_eq!(xs_stats.capacity, XS_CAPACITY);
        assert!(xs_stats.current_backpressure >= 0.0);

        println!("XS Pool stats: {}", xs_stats);

        let all_stats = PoolStats::collect_all();
        assert_eq!(all_stats.len(), 5);

        for entry in all_stats.iter() {
            assert!(entry.current_backpressure >= 0.0);
            assert!(entry.current_backpressure <= 1.1);
        }

        println!("✅ Pool statistics working");
    }

    /// Checks pool selection by payload length and the emit/read round trip.
    #[test]
    fn test_auto_sized_events() {
        let base_id = next_test_id();

        let small_data = b"small";
        let medium_data = vec![b'M'; 500];
        let large_data = vec![b'L'; 2000];

        let small_event = EventUtils::create_auto_sized_event(small_data, base_id).unwrap();
        let medium_event = EventUtils::create_auto_sized_event(&medium_data, base_id + 1).unwrap();
        let large_event = EventUtils::create_auto_sized_event(&large_data, base_id + 2).unwrap();

        assert_eq!(small_event.pool_id(), PoolId::XS);
        assert_eq!(medium_event.pool_id(), PoolId::M);
        assert_eq!(large_event.pool_id(), PoolId::L);

        small_event.emit_to_ring().unwrap();
        medium_event.emit_to_ring().unwrap();
        large_event.emit_to_ring().unwrap();

        let mut xs_reader = RingFactory::get_reader(&XS_RING);
        let mut m_reader = RingFactory::get_reader(&M_RING);
        let mut l_reader = RingFactory::get_reader(&L_RING);

        match xs_reader.next() {
            Some(event) if event.event_type == base_id => {
                assert_eq!(&event.data[..event.len as usize], small_data);
            }
            _ => {}
        }

        match m_reader.next() {
            Some(event) if event.event_type == base_id + 1 => {
                assert_eq!(&event.data[..event.len as usize], &medium_data);
            }
            _ => {}
        }

        match l_reader.next() {
            Some(event) if event.event_type == base_id + 2 => {
                assert_eq!(&event.data[..event.len as usize], &large_data);
            }
            _ => {}
        }

        println!("✅ Auto-sized events working");
    }
}