Skip to main content

rusted_ring/
pool.rs

1use std::sync::OnceLock;
2
3use bytemuck::Zeroable;
4
5use crate::ring::{EventSize, PooledEvent, Reader, RingBuffer, Writer};
6
7// Pool size constants - Adjusted to stay under 1MB per ring buffer
8pub const XS_CAPACITY: usize = 2000; // 64 * 2000 = 128KB
9pub const S_CAPACITY: usize = 1000; // 256 * 1000 = 256KB
10pub const M_CAPACITY: usize = 300; // 1024 * 300 = 307KB
11pub const L_CAPACITY: usize = 60; // 4096 * 60 = 245KB
12pub const XL_CAPACITY: usize = 15; // 16384 * 15 = 245KB
13
14// Static ring buffers - no Arc, no heap allocation
15static XS_RING: OnceLock<RingBuffer<64, XS_CAPACITY>> = OnceLock::new();
16static S_RING: OnceLock<RingBuffer<256, S_CAPACITY>> = OnceLock::new();
17static M_RING: OnceLock<RingBuffer<1024, M_CAPACITY>> = OnceLock::new();
18static L_RING: OnceLock<RingBuffer<4096, L_CAPACITY>> = OnceLock::new();
19static XL_RING: OnceLock<RingBuffer<16384, XL_CAPACITY>> = OnceLock::new();
20
21#[repr(u8)]
22#[derive(Debug, Clone, Copy, PartialEq, Eq)]
23pub enum PoolId {
24    XS = 0, // 64 bytes
25    S = 1,  // 256 bytes
26    M = 2,  // 1KB
27    L = 3,  // 4KB
28    XL = 4, // 16KB
29}
30
31impl PoolId {
32    pub fn from_size(size: EventSize) -> Self {
33        match size {
34            EventSize::XS => PoolId::XS,
35            EventSize::S => PoolId::S,
36            EventSize::M => PoolId::M,
37            EventSize::L => PoolId::L,
38            EventSize::XL => PoolId::XL,
39            EventSize::XXL => panic!("XXL not supported in pools"),
40        }
41    }
42
43    pub fn max_size(&self) -> usize {
44        match self {
45            PoolId::XS => 64,
46            PoolId::S => 256,
47            PoolId::M => 1024,
48            PoolId::L => 4096,
49            PoolId::XL => 16384,
50        }
51    }
52
53    pub fn capacity(&self) -> usize {
54        match self {
55            PoolId::XS => XS_CAPACITY,
56            PoolId::S => S_CAPACITY,
57            PoolId::M => M_CAPACITY,
58            PoolId::L => L_CAPACITY,
59            PoolId::XL => XL_CAPACITY,
60        }
61    }
62}
63
64impl From<PoolId> for u8 {
65    fn from(value: PoolId) -> Self {
66        value as u8
67    }
68}
69
70impl From<u8> for PoolId {
71    fn from(value: u8) -> Self {
72        match value {
73            0 => PoolId::XS,
74            1 => PoolId::S,
75            2 => PoolId::M,
76            3 => PoolId::L,
77            4 => PoolId::XL,
78            _ => panic!("Unknown pool id: {}", value),
79        }
80    }
81}
82
83/// Generic ring buffer factory with const parameters
84pub struct RingFactory;
85
86impl RingFactory {
87    /// Generic factory method for writers
88    pub fn get_writer<const TSHIRT_SIZE: usize, const RING_CAPACITY: usize>(
89        ring: &'static OnceLock<RingBuffer<TSHIRT_SIZE, RING_CAPACITY>>,
90    ) -> Writer<TSHIRT_SIZE, RING_CAPACITY> {
91        Writer::new(ring.get_or_init(RingBuffer::new))
92    }
93
94    /// Generic factory method for readers
95    pub fn get_reader<const TSHIRT_SIZE: usize, const RING_CAPACITY: usize>(
96        ring: &'static OnceLock<RingBuffer<TSHIRT_SIZE, RING_CAPACITY>>,
97    ) -> Reader<TSHIRT_SIZE, RING_CAPACITY> {
98        Reader::new(ring.get_or_init(RingBuffer::new))
99    }
100}
101
102/// Convenience factory - provides typed access to specific pools
103pub struct EventPoolFactory;
104
105impl EventPoolFactory {
106    // Writer factory methods
107    pub fn get_xs_writer() -> Writer<64, XS_CAPACITY> {
108        RingFactory::get_writer(&XS_RING)
109    }
110
111    pub fn get_s_writer() -> Writer<256, S_CAPACITY> {
112        RingFactory::get_writer(&S_RING)
113    }
114
115    pub fn get_m_writer() -> Writer<1024, M_CAPACITY> {
116        RingFactory::get_writer(&M_RING)
117    }
118
119    pub fn get_l_writer() -> Writer<4096, L_CAPACITY> {
120        RingFactory::get_writer(&L_RING)
121    }
122
123    pub fn get_xl_writer() -> Writer<16384, XL_CAPACITY> {
124        RingFactory::get_writer(&XL_RING)
125    }
126
127    // Reader factory methods
128    pub fn get_xs_reader() -> Reader<64, XS_CAPACITY> {
129        RingFactory::get_reader(&XS_RING)
130    }
131
132    pub fn get_s_reader() -> Reader<256, S_CAPACITY> {
133        RingFactory::get_reader(&S_RING)
134    }
135
136    pub fn get_m_reader() -> Reader<1024, M_CAPACITY> {
137        RingFactory::get_reader(&M_RING)
138    }
139
140    pub fn get_l_reader() -> Reader<4096, L_CAPACITY> {
141        RingFactory::get_reader(&L_RING)
142    }
143
144    pub fn get_xl_reader() -> Reader<16384, XL_CAPACITY> {
145        RingFactory::get_reader(&XL_RING)
146    }
147
148    // Utility methods
149    pub fn estimate_size(data_len: usize) -> EventSize {
150        match data_len {
151            0..=64 => EventSize::XS,
152            65..=256 => EventSize::S,
153            257..=1024 => EventSize::M,
154            1025..=4096 => EventSize::L,
155            4097..=16384 => EventSize::XL,
156            _ => EventSize::XXL,
157        }
158    }
159
160    pub fn estimate_pool_id(data_len: usize) -> Option<PoolId> {
161        let size = Self::estimate_size(data_len);
162        if size == EventSize::XXL {
163            None
164        } else {
165            Some(PoolId::from_size(size))
166        }
167    }
168}
169
170/// Simple event creation utilities
171pub struct EventUtils;
172
173impl EventUtils {
174    /// Create pooled event from raw data
175    pub fn create_pooled_event<const SIZE: usize>(
176        data: &[u8],
177        event_type: u32,
178    ) -> Result<PooledEvent<SIZE>, EventCreationError> {
179        if data.len() > SIZE {
180            return Err(EventCreationError::DataTooLarge {
181                data_len: data.len(),
182                max_size: SIZE,
183            });
184        }
185
186        let mut pooled = PooledEvent::<SIZE>::zeroed();
187        pooled.data[..data.len()].copy_from_slice(data);
188        pooled.len = data.len() as u32;
189        pooled.event_type = event_type;
190        Ok(pooled)
191    }
192
193    /// Auto-detect size and create appropriate pooled event
194    pub fn create_auto_sized_event(data: &[u8], event_type: u32) -> Result<AutoSizedEvent, EventCreationError> {
195        let size = EventPoolFactory::estimate_size(data.len());
196
197        match size {
198            EventSize::XS => {
199                let event = Self::create_pooled_event::<64>(data, event_type)?;
200                Ok(AutoSizedEvent::Xs(event))
201            }
202            EventSize::S => {
203                let event = Self::create_pooled_event::<256>(data, event_type)?;
204                Ok(AutoSizedEvent::S(event))
205            }
206            EventSize::M => {
207                let event = Self::create_pooled_event::<1024>(data, event_type)?;
208                Ok(AutoSizedEvent::M(event))
209            }
210            EventSize::L => {
211                let event = Self::create_pooled_event::<4096>(data, event_type)?;
212                Ok(AutoSizedEvent::L(event))
213            }
214            EventSize::XL => {
215                let event = Self::create_pooled_event::<16384>(data, event_type)?;
216                Ok(AutoSizedEvent::Xl(event))
217            }
218            EventSize::XXL => Err(EventCreationError::DataTooLarge {
219                data_len: data.len(),
220                max_size: 16384,
221            }),
222        }
223    }
224}
225
226#[allow(clippy::large_enum_variant)]
227/// Auto-sized event wrapper for convenience
228#[derive(Debug, Clone)]
229pub enum AutoSizedEvent {
230    Xs(PooledEvent<64>),
231    S(PooledEvent<256>),
232    M(PooledEvent<1024>),
233    L(PooledEvent<4096>),
234    Xl(PooledEvent<16384>),
235}
236
237impl AutoSizedEvent {
238    pub fn data(&self) -> &[u8] {
239        match self {
240            AutoSizedEvent::Xs(event) => &event.data[..event.len as usize],
241            AutoSizedEvent::S(event) => &event.data[..event.len as usize],
242            AutoSizedEvent::M(event) => &event.data[..event.len as usize],
243            AutoSizedEvent::L(event) => &event.data[..event.len as usize],
244            AutoSizedEvent::Xl(event) => &event.data[..event.len as usize],
245        }
246    }
247
248    pub fn event_type(&self) -> u32 {
249        match self {
250            AutoSizedEvent::Xs(event) => event.event_type,
251            AutoSizedEvent::S(event) => event.event_type,
252            AutoSizedEvent::M(event) => event.event_type,
253            AutoSizedEvent::L(event) => event.event_type,
254            AutoSizedEvent::Xl(event) => event.event_type,
255        }
256    }
257
258    pub fn len(&self) -> u32 {
259        match self {
260            AutoSizedEvent::Xs(event) => event.len,
261            AutoSizedEvent::S(event) => event.len,
262            AutoSizedEvent::M(event) => event.len,
263            AutoSizedEvent::L(event) => event.len,
264            AutoSizedEvent::Xl(event) => event.len,
265        }
266    }
267
268    pub fn is_empty(&self) -> bool {
269        self.len() == 0
270    }
271
272    pub fn pool_id(&self) -> PoolId {
273        match self {
274            AutoSizedEvent::Xs(_) => PoolId::XS,
275            AutoSizedEvent::S(_) => PoolId::S,
276            AutoSizedEvent::M(_) => PoolId::M,
277            AutoSizedEvent::L(_) => PoolId::L,
278            AutoSizedEvent::Xl(_) => PoolId::XL,
279        }
280    }
281
282    /// Emit this event to the appropriate ring buffer
283    pub fn emit_to_ring(self) -> Result<(), EmitError> {
284        match self {
285            AutoSizedEvent::Xs(event) => {
286                let mut writer = RingFactory::get_writer(&XS_RING);
287                if writer.add(event) {
288                    Ok(())
289                } else {
290                    Err(EmitError::RingFull(PoolId::XS))
291                }
292            }
293            AutoSizedEvent::S(event) => {
294                let mut writer = RingFactory::get_writer(&S_RING);
295                if writer.add(event) {
296                    Ok(())
297                } else {
298                    Err(EmitError::RingFull(PoolId::S))
299                }
300            }
301            AutoSizedEvent::M(event) => {
302                let mut writer = RingFactory::get_writer(&M_RING);
303                if writer.add(event) {
304                    Ok(())
305                } else {
306                    Err(EmitError::RingFull(PoolId::M))
307                }
308            }
309            AutoSizedEvent::L(event) => {
310                let mut writer = RingFactory::get_writer(&L_RING);
311                if writer.add(event) {
312                    Ok(())
313                } else {
314                    Err(EmitError::RingFull(PoolId::L))
315                }
316            }
317            AutoSizedEvent::Xl(event) => {
318                let mut writer = RingFactory::get_writer(&XL_RING);
319                if writer.add(event) {
320                    Ok(())
321                } else {
322                    Err(EmitError::RingFull(PoolId::XL))
323                }
324            }
325        }
326    }
327}
328
329#[derive(Debug, thiserror::Error)]
330pub enum EventCreationError {
331    #[error("Data too large: {data_len} bytes > max {max_size} bytes")]
332    DataTooLarge { data_len: usize, max_size: usize },
333}
334
335#[derive(Debug, thiserror::Error)]
336pub enum EmitError {
337    #[error("Ring buffer full for pool {0:?}")]
338    RingFull(PoolId),
339}
340
341/// Pool statistics for monitoring
342#[derive(Debug, Clone)]
343pub struct PoolStats {
344    pub pool_id: PoolId,
345    pub capacity: usize,
346    pub current_backpressure: f32,
347}
348
349impl PoolStats {
350    pub fn collect_xs() -> Self {
351        let reader = RingFactory::get_reader(&XS_RING);
352        Self {
353            pool_id: PoolId::XS,
354            capacity: XS_CAPACITY,
355            current_backpressure: reader.backpressure_ratio(),
356        }
357    }
358
359    pub fn collect_s() -> Self {
360        let reader = RingFactory::get_reader(&S_RING);
361        Self {
362            pool_id: PoolId::S,
363            capacity: S_CAPACITY,
364            current_backpressure: reader.backpressure_ratio(),
365        }
366    }
367
368    pub fn collect_m() -> Self {
369        let reader = RingFactory::get_reader(&M_RING);
370        Self {
371            pool_id: PoolId::M,
372            capacity: M_CAPACITY,
373            current_backpressure: reader.backpressure_ratio(),
374        }
375    }
376
377    pub fn collect_l() -> Self {
378        let reader = RingFactory::get_reader(&L_RING);
379        Self {
380            pool_id: PoolId::L,
381            capacity: L_CAPACITY,
382            current_backpressure: reader.backpressure_ratio(),
383        }
384    }
385
386    pub fn collect_xl() -> Self {
387        let reader = RingFactory::get_reader(&XL_RING);
388        Self {
389            pool_id: PoolId::XL,
390            capacity: XL_CAPACITY,
391            current_backpressure: reader.backpressure_ratio(),
392        }
393    }
394
395    pub fn collect_all() -> Vec<Self> {
396        vec![
397            Self::collect_xs(),
398            Self::collect_s(),
399            Self::collect_m(),
400            Self::collect_l(),
401            Self::collect_xl(),
402        ]
403    }
404}
405
406impl std::fmt::Display for PoolStats {
407    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
408        write!(
409            f,
410            "Pool {:?}: capacity={}, backpressure={:.1}%",
411            self.pool_id,
412            self.capacity,
413            self.current_backpressure * 100.0
414        )
415    }
416}
417
418#[cfg(test)]
419mod pool_tests {
420    use std::{
421        sync::atomic::{AtomicU32, Ordering},
422        thread,
423        time::{Duration, Instant},
424    };
425
426    use super::*;
427
428    // Simple test counter
429    static TEST_COUNTER: AtomicU32 = AtomicU32::new(5000000);
430
431    fn next_test_id() -> u32 {
432        TEST_COUNTER.fetch_add(10000, Ordering::Relaxed)
433    }
434
435    /// Test generic factory with explicit const parameters
436    #[test]
437    fn test_generic_factory() {
438        let test_id = next_test_id();
439
440        // Test direct generic factory usage
441        let mut xs_writer = RingFactory::get_writer(&XS_RING);
442        let mut xs_reader = RingFactory::get_reader(&XS_RING);
443
444        let mut m_writer = RingFactory::get_writer(&M_RING);
445        let mut m_reader = RingFactory::get_reader(&M_RING);
446
447        // Write to XS pool
448        let xs_event = EventUtils::create_pooled_event::<64>(b"xs_generic_test", test_id).unwrap();
449        xs_writer.add(xs_event);
450
451        // Write to M pool
452        let m_event = EventUtils::create_pooled_event::<1024>(b"m_generic_test", test_id + 1).unwrap();
453        m_writer.add(m_event);
454
455        // Read back
456        if let Some(event) = xs_reader.next() {
457            if event.event_type == test_id {
458                assert_eq!(&event.data[..event.len as usize], b"xs_generic_test");
459                println!("✅ XS generic factory working");
460            }
461        }
462
463        if let Some(event) = m_reader.next() {
464            if event.event_type == test_id + 1 {
465                assert_eq!(&event.data[..event.len as usize], b"m_generic_test");
466                println!("✅ M generic factory working");
467            }
468        }
469    }
470
471    /// Test that both APIs work and are equivalent
472    #[test]
473    fn test_api_equivalence() {
474        let test_id = next_test_id();
475
476        // Use both APIs to write to the same pool
477        let mut generic_writer = RingFactory::get_writer(&S_RING);
478        let mut typed_writer = EventPoolFactory::get_s_writer();
479
480        // Both should write to the same ring buffer
481        let event1 = EventUtils::create_pooled_event::<256>(b"generic_api", test_id).unwrap();
482        let event2 = EventUtils::create_pooled_event::<256>(b"typed_api", test_id + 1).unwrap();
483
484        generic_writer.add(event1);
485        typed_writer.add(event2);
486
487        // Read using both APIs
488        let mut generic_reader = RingFactory::get_reader(&S_RING);
489
490        let mut found_generic = false;
491        let mut found_typed = false;
492
493        // Both readers should see both events
494        for _ in 0..10 {
495            if let Some(event) = generic_reader.next() {
496                if event.event_type == test_id {
497                    found_generic = true;
498                }
499                if event.event_type == test_id + 1 {
500                    found_typed = true;
501                }
502            }
503        }
504
505        assert!(found_generic || found_typed, "Should find at least one event");
506        println!("✅ API equivalence working");
507    }
508
509    /// Test performance of generic vs typed API
510    #[test]
511    fn test_performance_comparison() {
512        let test_id = next_test_id();
513        let events_count = 1000;
514
515        // Test generic API performance
516        let generic_start = Instant::now();
517        {
518            let mut writer = RingFactory::get_writer(&L_RING);
519            for i in 0..events_count {
520                let data = format!("generic_perf_{}", i);
521                let event = EventUtils::create_pooled_event::<4096>(data.as_bytes(), test_id + i).unwrap();
522                writer.add(event);
523            }
524        }
525        let generic_duration = generic_start.elapsed();
526
527        // Test typed API performance
528        let typed_start = Instant::now();
529        {
530            let mut writer = EventPoolFactory::get_l_writer();
531            for i in 0..events_count {
532                let data = format!("typed_perf_{}", i);
533                let event =
534                    EventUtils::create_pooled_event::<4096>(data.as_bytes(), test_id + events_count + i).unwrap();
535                writer.add(event);
536            }
537        }
538        let typed_duration = typed_start.elapsed();
539
540        println!(
541            "Generic API: {:.2}ms for {} events",
542            generic_duration.as_secs_f64() * 1000.0,
543            events_count
544        );
545        println!(
546            "Typed API: {:.2}ms for {} events",
547            typed_duration.as_secs_f64() * 1000.0,
548            events_count
549        );
550
551        // Both should be reasonably fast
552        assert!(generic_duration.as_millis() < 100, "Generic API too slow");
553        assert!(typed_duration.as_millis() < 100, "Typed API too slow");
554
555        println!("✅ Performance test passed");
556    }
557
558    /// Test custom ring buffer usage
559    #[test]
560    fn test_custom_ring_usage() {
561        // Define a custom ring buffer
562        static CUSTOM_RING: OnceLock<RingBuffer<512, 50>> = OnceLock::new();
563
564        let test_id = next_test_id();
565
566        // Use generic factory with custom ring
567        let mut writer = RingFactory::get_writer(&CUSTOM_RING);
568        let mut reader = RingFactory::get_reader(&CUSTOM_RING);
569
570        // Write custom sized event
571        let event = EventUtils::create_pooled_event::<512>(b"custom_ring_test", test_id).unwrap();
572        writer.add(event);
573
574        // Read back
575        if let Some(event) = reader.next() {
576            if event.event_type == test_id {
577                assert_eq!(&event.data[..event.len as usize], b"custom_ring_test");
578                println!("✅ Custom ring buffer working");
579            }
580        }
581    }
582
583    /// Test concurrent access with generic factory
584    #[test]
585    fn test_concurrent_generic_access() {
586        let test_id = next_test_id();
587        let events_per_thread = 50;
588
589        let mut handles = vec![];
590
591        // Spawn multiple threads using generic factory
592        for thread_id in 0..4 {
593            let handle = thread::spawn(move || {
594                let mut writer = RingFactory::get_writer(&XL_RING);
595
596                for i in 0..events_per_thread {
597                    let data = format!("thread_{}_event_{}", thread_id, i);
598                    let event =
599                        EventUtils::create_pooled_event::<16384>(data.as_bytes(), test_id + (thread_id * 1000) + i)
600                            .unwrap();
601
602                    writer.add(event);
603
604                    // Small delay to encourage interleaving
605                    thread::sleep(Duration::from_micros(1));
606                }
607
608                thread_id
609            });
610            handles.push(handle);
611        }
612
613        // Wait for all threads
614        for handle in handles {
615            let thread_id = handle.join().expect("Thread panicked");
616            println!("Thread {} completed", thread_id);
617        }
618
619        // Read back events
620        let mut reader = RingFactory::get_reader(&XL_RING);
621        let mut total_found = 0;
622        let expected_min = test_id;
623        let expected_max = test_id + (4 * 1000) + events_per_thread;
624
625        for _ in 0..events_per_thread * 4 + 50 {
626            if let Some(event) = reader.next() {
627                if event.event_type >= expected_min && event.event_type < expected_max {
628                    total_found += 1;
629                }
630            }
631        }
632
633        assert!(total_found > 0, "Should find some events from concurrent access");
634        println!(
635            "✅ Concurrent generic access: found {}/{} events",
636            total_found,
637            events_per_thread * 4
638        );
639    }
640
641    /// Test pool statistics with generic factory
642    #[test]
643    fn test_pool_stats_generic() {
644        let test_id = next_test_id();
645
646        // Write events to create backpressure
647        {
648            let mut writer = RingFactory::get_writer(&XS_RING);
649            for i in 0..100 {
650                let event = EventUtils::create_pooled_event::<64>(b"stats_test", test_id + i).unwrap();
651                writer.add(event);
652            }
653        }
654
655        // Collect stats
656        let stats = PoolStats::collect_xs();
657        assert_eq!(stats.pool_id, PoolId::XS);
658        assert_eq!(stats.capacity, XS_CAPACITY);
659        assert!(stats.current_backpressure >= 0.0);
660
661        println!("XS Pool stats: {}", stats);
662
663        // Test all pool stats
664        let all_stats = PoolStats::collect_all();
665        assert_eq!(all_stats.len(), 5);
666
667        for stat in &all_stats {
668            assert!(stat.current_backpressure >= 0.0);
669            assert!(stat.current_backpressure <= 1.1); // Allow slight overflow
670        }
671
672        println!("✅ Pool statistics working");
673    }
674
675    /// Test auto-sized events with generic factory
676    #[test]
677    fn test_auto_sized_events() {
678        let test_id = next_test_id();
679
680        // Test different sized data
681        let small_data = b"small";
682        let medium_data = vec![b'M'; 500];
683        let large_data = vec![b'L'; 2000];
684
685        // Create auto-sized events
686        let small_event = EventUtils::create_auto_sized_event(small_data, test_id).unwrap();
687        let medium_event = EventUtils::create_auto_sized_event(&medium_data, test_id + 1).unwrap();
688        let large_event = EventUtils::create_auto_sized_event(&large_data, test_id + 2).unwrap();
689
690        // Verify correct pool selection
691        assert_eq!(small_event.pool_id(), PoolId::XS);
692        assert_eq!(medium_event.pool_id(), PoolId::M);
693        assert_eq!(large_event.pool_id(), PoolId::L);
694
695        // Emit to ring buffers
696        small_event.emit_to_ring().unwrap();
697        medium_event.emit_to_ring().unwrap();
698        large_event.emit_to_ring().unwrap();
699
700        // Verify data integrity
701        let mut xs_reader = RingFactory::get_reader(&XS_RING);
702        let mut m_reader = RingFactory::get_reader(&M_RING);
703        let mut l_reader = RingFactory::get_reader(&L_RING);
704
705        if let Some(event) = xs_reader.next() {
706            if event.event_type == test_id {
707                assert_eq!(&event.data[..event.len as usize], small_data);
708            }
709        }
710
711        if let Some(event) = m_reader.next() {
712            if event.event_type == test_id + 1 {
713                assert_eq!(&event.data[..event.len as usize], &medium_data);
714            }
715        }
716
717        if let Some(event) = l_reader.next() {
718            if event.event_type == test_id + 2 {
719                assert_eq!(&event.data[..event.len as usize], &large_data);
720            }
721        }
722
723        println!("✅ Auto-sized events working");
724    }
725}