//! Long-poll event listener (`vkteams_bot/bot/longpoll.rs`).

1use crate::api::events::get::{RequestEventsGet, ResponseEventsGet};
2use crate::api::types::{BotRequest, EventMessage, POLL_TIME};
3use crate::bot::Bot;
4use crate::config::CONFIG;
5use crate::error::{BotError, Result};
6use std::future::Future;
7use std::sync::Arc;
8#[cfg(test)]
9use std::sync::atomic::AtomicU32;
10use std::time::{Duration, Instant};
11use tokio::sync::Semaphore;
12use tokio::time::sleep;
13use tracing::{debug, error, info, trace, warn};
14
15/// Listen for events and execute callback function
16/// ## Parameters
17/// - `func` - callback function with [`Result`] type [`ResponseEventsGet`] as argument
18impl Bot {
19    /// Listen for events and execute callback function
20    /// ## Parameters
21    /// - `func` - callback function with [`Result`] type and [`ResponseEventsGet`] argument
22    ///
23    /// ## Errors
24    /// - `BotError::Api` - API error when getting events
25    /// - `BotError::Network` - network error when getting events
26    /// - `BotError::Serialization` - response deserialization error
27    /// - `BotError::System` - error when executing callback function
28    pub async fn event_listener<F, X>(&self, func: F) -> Result<()>
29    where
30        F: Fn(Bot, ResponseEventsGet) -> X,
31        X: Future<Output = Result<()>> + Send + Sync + 'static,
32    {
33        let cfg = &CONFIG.listener;
34        // Create a channel to signal shutdown
35        let (shutdown_tx, mut shutdown_rx) = tokio::sync::broadcast::channel::<()>(1);
36
37        // Setup shutdown signal handler
38        let shutdown_tx_clone = shutdown_tx.clone();
39        tokio::spawn(async move {
40            crate::bot::net::shutdown_signal().await;
41            info!("Received stop signal, gracefully stopping event listener...");
42            let _ = shutdown_tx_clone.send(());
43        });
44
45        let mut current_backoff = cfg.empty_backoff_ms;
46        let mut consecutive_empty_polls = 0u32;
47
48        'event_loop: loop {
49            // Check if we received a shutdown signal
50            if shutdown_rx.try_recv().is_ok() {
51                info!("Processing shutdown request");
52                break 'event_loop;
53            }
54            let start_time = Instant::now();
55            debug!("Getting events with ID: {}", self.get_last_event_id());
56
57            // Make a request to the API
58            let req = RequestEventsGet::new(self.get_last_event_id()).with_poll_time(POLL_TIME);
59
60            // Get response, with error handling for network issues
61            let res = match self.send_api_request::<RequestEventsGet>(req).await {
62                Ok(res) => res,
63                Err(e) => {
64                    error!("Error getting events: {}", e);
65
66                    // Apply backoff before retrying
67                    let backoff = Duration::from_millis(current_backoff);
68                    warn!("Backing off for {:?} before retrying", backoff);
69                    sleep(backoff).await;
70
71                    // Increase backoff time for next failure, with maximum limit
72                    if cfg.use_exponential_backoff {
73                        current_backoff = std::cmp::min(current_backoff * 2, cfg.max_backoff_ms);
74                    }
75
76                    continue;
77                }
78            };
79
80            // Process events if we have any
81            if !res.events.is_empty() {
82                debug!("Received {} events", res.events.len());
83
84                // Reset backoff time when we get events
85                current_backoff = cfg.empty_backoff_ms;
86                consecutive_empty_polls = 0;
87
88                // Process events
89                self.process_event_batch(res, &func).await?;
90            } else {
91                debug!("No events received, continuing to wait");
92                consecutive_empty_polls += 1;
93
94                // Apply backoff when no events received
95                if consecutive_empty_polls > 1 {
96                    // Calculate how much time we should back off
97                    let elapsed = start_time.elapsed();
98                    let backoff_time = Duration::from_millis(current_backoff);
99
100                    // Only sleep if we need to wait longer than we already have
101                    if elapsed < backoff_time {
102                        let sleep_time = backoff_time - elapsed;
103                        debug!("Backing off for {:?}", sleep_time);
104                        sleep(sleep_time).await;
105                    }
106
107                    // Increase backoff time for next empty poll, with maximum limit
108                    if cfg.use_exponential_backoff {
109                        current_backoff = std::cmp::min(current_backoff * 2, cfg.max_backoff_ms);
110                    }
111                }
112            }
113        } // End of event_loop
114
115        info!("Event listener stopped gracefully");
116        Ok(())
117    }
118
119    /// Process a batch of events
120    /// Handles events in chunks to manage memory usage
121    #[tracing::instrument(skip(self, events, func))]
122    async fn process_event_batch<F, X>(&self, events: ResponseEventsGet, func: &F) -> Result<()>
123    where
124        F: Fn(Bot, ResponseEventsGet) -> X,
125        X: Future<Output = Result<()>> + Send + Sync + 'static,
126    {
127        let cfg = &CONFIG.listener;
128        // Calculate approximate memory usage of events
129        let memory_usage = if cfg.max_memory_usage > 0 {
130            events.events.len() * 1024 // Assume 1KB per event as estimate
131        } else {
132            0
133        };
134
135        // Check if we need to process events in batches to manage memory
136        if cfg.max_memory_usage > 0 && memory_usage > cfg.max_memory_usage {
137            debug!("Processing events in batches due to memory constraints");
138
139            // Process events in smaller batches
140            let batches = events.events.len().div_ceil(cfg.max_events_per_batch);
141            for batch_idx in 0..batches {
142                let start_idx = batch_idx * cfg.max_events_per_batch;
143                let end_idx = std::cmp::min(
144                    (batch_idx + 1) * cfg.max_events_per_batch,
145                    events.events.len(),
146                );
147
148                debug!(
149                    "Processing batch {}/{} (events {}-{})",
150                    batch_idx + 1,
151                    batches,
152                    start_idx,
153                    end_idx - 1
154                );
155
156                // Create a batch of events (zero-copy slice)
157                let batch_events = ResponseEventsGet {
158                    events: events.events[start_idx..end_idx].into(),
159                };
160
161                // Get the last event ID in this batch
162                let last_event_id = batch_events.events[batch_events.events.len() - 1].event_id;
163
164                // Update last event ID
165                self.set_last_event_id(last_event_id);
166                debug!("Updated last event ID: {}", last_event_id);
167
168                // Process this batch of events
169                if let Err(e) = func(self.clone(), batch_events).await {
170                    error!("Error processing events batch: {}", e);
171                    return Err(e);
172                }
173
174                // Brief pause between batches to allow GC to run
175                sleep(Duration::from_millis(10)).await;
176            }
177        } else {
178            // Process all events at once (original behavior)
179            // Update last event id
180            let last_event_id = events.events[events.events.len() - 1].event_id;
181            self.set_last_event_id(last_event_id);
182            debug!("Updated last event ID: {}", last_event_id);
183
184            // Execute callback function
185            if let Err(e) = func(self.clone(), events).await {
186                error!("Error processing events: {}", e);
187                return Err(e);
188            }
189        }
190
191        Ok(())
192    }
193
194    /// Listen for events with parallel processing
195    /// Enhanced version that processes events in parallel batches
196    pub async fn event_listener_parallel<F, X>(&self, func: F) -> Result<()>
197    where
198        F: Fn(Bot, ResponseEventsGet) -> X + Send + Sync + Clone + 'static,
199        X: Future<Output = Result<()>> + Send + 'static,
200    {
201        let cfg = &CONFIG.listener;
202        info!("Starting parallel event listener...");
203
204        // Initialize parallel processor and adaptive backoff
205        let processor = ParallelEventProcessor::new(
206            cfg.max_events_per_batch.max(1), // Use as max concurrent batches
207            cfg.max_events_per_batch,
208        );
209
210        let mut backoff = AdaptiveBackoff::new(
211            Duration::from_millis(cfg.empty_backoff_ms),
212            Duration::from_millis(cfg.max_backoff_ms),
213        );
214
215        // Initialize event stream buffer for zero-copy processing (future use)
216        // let mut event_stream = ZeroCopyEventStream::new(cfg.max_events_per_batch * 10);
217
218        // Create a channel to signal shutdown
219        let (shutdown_tx, mut shutdown_rx) = tokio::sync::broadcast::channel::<()>(1);
220
221        // Setup shutdown signal handler
222        let shutdown_tx_clone = shutdown_tx.clone();
223        tokio::spawn(async move {
224            crate::bot::net::shutdown_signal().await;
225            info!("Received stop signal, gracefully stopping parallel event listener...");
226            let _ = shutdown_tx_clone.send(());
227        });
228
229        'event_loop: loop {
230            // Check if we received a shutdown signal
231            if shutdown_rx.try_recv().is_ok() {
232                info!("Processing shutdown request");
233                break 'event_loop;
234            }
235
236            let start_time = Instant::now();
237
238            // Create request for events
239            let req = RequestEventsGet::new(self.get_last_event_id()).with_poll_time(POLL_TIME);
240
241            // Send request and handle response
242            let res = match self.send_api_request::<RequestEventsGet>(req).await {
243                Ok(res) => res,
244                Err(e) => {
245                    error!("Error getting events: {}", e);
246
247                    // Apply adaptive backoff on error
248                    let delay = backoff.calculate_delay(0);
249                    warn!("Error occurred, backing off for {:?}", delay);
250                    sleep(delay).await;
251                    continue;
252                }
253            };
254
255            // Process events if we have any
256            if !res.events.is_empty() {
257                debug!("Received {} events", res.events.len());
258
259                // Update last event ID from the most recent event
260                let last_event_id = res.events[res.events.len() - 1].event_id;
261                self.set_last_event_id(last_event_id);
262                debug!("Updated last event ID: {}", last_event_id);
263
264                // Process events in parallel
265                let processing_start = Instant::now();
266                match processor
267                    .process_events_parallel(self.clone(), res, func.clone())
268                    .await
269                {
270                    Ok(_) => {
271                        let processing_duration = processing_start.elapsed();
272                        trace!("Parallel processing completed in {:?}", processing_duration);
273
274                        // Reset backoff on successful processing
275                        backoff.calculate_delay(1);
276                    }
277                    Err(e) => {
278                        error!("Error in parallel processing: {}", e);
279                        return Err(e);
280                    }
281                }
282            } else {
283                debug!("No events received, applying adaptive backoff");
284
285                // Apply adaptive backoff for empty polls
286                let delay = backoff.calculate_delay(0);
287
288                // Only sleep if we haven't already spent enough time waiting
289                let elapsed = start_time.elapsed();
290                if elapsed < delay {
291                    let sleep_time = delay - elapsed;
292                    trace!("Adaptive backoff: sleeping for {:?}", sleep_time);
293                    sleep(sleep_time).await;
294                }
295            }
296        } // End of event_loop
297
298        info!("Parallel event listener stopped gracefully");
299        Ok(())
300    }
301}
302
/// Parallel event processor for concurrent batch processing
pub struct ParallelEventProcessor {
    // Upper bound on batches that may be in flight at once (enforced by a semaphore).
    max_concurrent_batches: usize,
    // Number of events placed in each batch.
    batch_size: usize,
}
308
309impl ParallelEventProcessor {
310    /// Create new parallel event processor
311    pub fn new(max_concurrent_batches: usize, batch_size: usize) -> Self {
312        Self {
313            max_concurrent_batches,
314            batch_size,
315        }
316    }
317
318    /// Process events in parallel batches
319    pub async fn process_events_parallel<F, X>(
320        &self,
321        bot: Bot,
322        events: ResponseEventsGet,
323        processor: F,
324    ) -> Result<()>
325    where
326        F: Fn(Bot, ResponseEventsGet) -> X + Send + Sync + Clone + 'static,
327        X: Future<Output = Result<()>> + Send + 'static,
328    {
329        if events.events.is_empty() {
330            return Ok(());
331        }
332
333        let batches = self.create_batches(events);
334        let batch_count = batches.len();
335        let semaphore = Arc::new(Semaphore::new(self.max_concurrent_batches));
336
337        trace!("Processing {} batches in parallel", batch_count);
338
339        let futures: Vec<_> = batches
340            .into_iter()
341            .enumerate()
342            .map(|(batch_idx, batch)| {
343                let processor = processor.clone();
344                let bot = bot.clone();
345                let semaphore = semaphore.clone();
346
347                async move {
348                    let _permit = semaphore.acquire().await.map_err(|e| {
349                        BotError::System(format!("Failed to acquire semaphore: {e}"))
350                    })?;
351
352                    trace!(
353                        "Processing batch {} with {} events",
354                        batch_idx,
355                        batch.events.len()
356                    );
357
358                    let start_time = Instant::now();
359                    let result = processor(bot, batch).await;
360                    let duration = start_time.elapsed();
361
362                    if let Err(ref e) = result {
363                        error!("Batch {} failed after {:?}: {}", batch_idx, duration, e);
364                    } else {
365                        trace!("Batch {} completed in {:?}", batch_idx, duration);
366                    }
367
368                    result
369                }
370            })
371            .collect();
372
373        // Wait for all batches to complete using a simpler approach
374        use futures::future::join_all;
375        let results: Vec<Result<()>> = join_all(futures).await.into_iter().collect();
376
377        // Check if any batches failed
378        for (idx, result) in results.into_iter().enumerate() {
379            if let Err(e) = result {
380                return Err(BotError::System(format!(
381                    "Batch {idx} processing failed: {e}"
382                )));
383            }
384        }
385
386        debug!("All {} batches processed successfully", batch_count);
387        Ok(())
388    }
389
390    /// Create batches from events, ensuring no events are lost
391    fn create_batches(&self, events: ResponseEventsGet) -> Vec<ResponseEventsGet> {
392        events
393            .events
394            .chunks(self.batch_size)
395            .map(|chunk| ResponseEventsGet {
396                events: chunk.to_vec(),
397            })
398            .collect()
399    }
400}
401
/// Adaptive backoff strategy that adjusts delay based on activity
pub struct AdaptiveBackoff {
    // Delay returned by the next `calculate_delay` call.
    current_delay: Duration,
    // Lower bound; `current_delay` is reset to this when events arrive.
    min_delay: Duration,
    // Upper bound; growth is capped at this value.
    max_delay: Duration,
    // Number of consecutive polls that returned no events.
    consecutive_empty_polls: u32,
    // Time of the last poll that returned events, if any.
    last_activity: Option<Instant>,
    // Empty polls tolerated before the delay starts growing.
    empty_poll_threshold: u32,
}
411
412impl AdaptiveBackoff {
413    /// Create new adaptive backoff strategy
414    pub fn new(min_delay: Duration, max_delay: Duration) -> Self {
415        Self {
416            current_delay: min_delay,
417            min_delay,
418            max_delay,
419            consecutive_empty_polls: 0,
420            last_activity: None,
421            empty_poll_threshold: 3, // Start backing off after 3 consecutive empty polls
422        }
423    }
424
425    /// Calculate delay based on events received and system load
426    pub fn calculate_delay(&mut self, events_received: usize) -> Duration {
427        let now = Instant::now();
428
429        if events_received > 0 {
430            // Reset to minimum delay when events are received
431            self.current_delay = self.min_delay;
432            self.consecutive_empty_polls = 0;
433            self.last_activity = Some(now);
434
435            trace!("Events received, reset delay to {:?}", self.current_delay);
436        } else {
437            // Exponential backoff for empty polls
438            self.consecutive_empty_polls += 1;
439
440            // Only increase backoff after several consecutive empty polls
441            if self.consecutive_empty_polls > self.empty_poll_threshold {
442                self.current_delay = std::cmp::min(
443                    Duration::from_millis(
444                        (self.current_delay.as_millis() as u64 * 3 / 2)
445                            .max(self.min_delay.as_millis() as u64),
446                    ),
447                    self.max_delay,
448                );
449
450                trace!(
451                    "Empty poll #{}, increased delay to {:?}",
452                    self.consecutive_empty_polls, self.current_delay
453                );
454            }
455
456            // If we've been idle for a long time, increase delay more aggressively
457            if let Some(last_activity) = self.last_activity {
458                let idle_time = now.duration_since(last_activity);
459                if idle_time > Duration::from_secs(60) {
460                    self.current_delay = std::cmp::min(
461                        self.current_delay + Duration::from_millis(100),
462                        self.max_delay,
463                    );
464                }
465            }
466        }
467
468        self.current_delay
469    }
470
471    /// Get current delay without modifying state
472    pub fn current_delay(&self) -> Duration {
473        self.current_delay
474    }
475
476    /// Reset backoff to minimum (useful for external triggers)
477    pub fn reset(&mut self) {
478        self.current_delay = self.min_delay;
479        self.consecutive_empty_polls = 0;
480        self.last_activity = Some(Instant::now());
481    }
482}
483
/// Zero-copy event streaming buffer
pub struct ZeroCopyEventStream {
    // FIFO buffer of pending events; oldest events sit at the front.
    events: std::collections::VecDeque<EventMessage>,
    // Maximum number of events retained; older events are evicted first.
    capacity: usize,
}
489
490impl ZeroCopyEventStream {
491    /// Create new event stream with given capacity
492    pub fn new(capacity: usize) -> Self {
493        Self {
494            events: std::collections::VecDeque::with_capacity(capacity),
495            capacity,
496        }
497    }
498
499    /// Push events efficiently by moving data
500    pub fn push_events(&mut self, mut new_events: Vec<EventMessage>) {
501        // If new events exceed capacity, take only the last capacity events
502        if new_events.len() > self.capacity {
503            new_events.drain(..new_events.len() - self.capacity);
504        }
505
506        // Ensure we don't exceed capacity by removing old events
507        while self.events.len() + new_events.len() > self.capacity {
508            self.events.pop_front();
509        }
510
511        // Move events instead of copying
512        self.events.extend(new_events.drain(..));
513    }
514
515    /// Drain a batch of events efficiently
516    pub fn drain_batch(&mut self, size: usize) -> Vec<EventMessage> {
517        self.events
518            .drain(..std::cmp::min(size, self.events.len()))
519            .collect()
520    }
521
522    /// Get current number of events
523    pub fn len(&self) -> usize {
524        self.events.len()
525    }
526
527    /// Check if empty
528    pub fn is_empty(&self) -> bool {
529        self.events.is_empty()
530    }
531
532    /// Get events without removing them
533    pub fn peek_events(&self, count: usize) -> Vec<&EventMessage> {
534        self.events.iter().take(count).collect()
535    }
536}
537
#[cfg(test)]
mod tests {
    use super::*;
    use crate::api::events::get::ResponseEventsGet;
    use crate::api::types::{EventId, EventMessage, EventType};
    use crate::error::{BotError, Result};
    use std::sync::Arc;
    use std::sync::atomic::{AtomicUsize, Ordering};

    // Dummy Bot with mocks for tests: records the last event id it was given
    // and counts how many times `set_last_event_id` was invoked.
    #[derive(Clone, Default)]
    pub struct DummyBot {
        last_event_id: Arc<AtomicU32>,
        set_last_event_calls: Arc<AtomicUsize>,
    }
    impl DummyBot {
        fn new() -> Self {
            Self {
                last_event_id: Arc::new(AtomicU32::new(0)),
                set_last_event_calls: Arc::new(AtomicUsize::new(0)),
            }
        }
        fn set_last_event_id(&self, id: EventId) {
            self.last_event_id.store(id, Ordering::SeqCst);
            self.set_last_event_calls.fetch_add(1, Ordering::SeqCst);
        }
    }

    // Build a response containing `n` events with sequential ids 0..n.
    fn make_events(n: usize) -> ResponseEventsGet {
        ResponseEventsGet {
            events: (0..n)
                .map(|i| EventMessage {
                    event_id: i as EventId,
                    event_type: EventType::None,
                })
                .collect(),
        }
    }

    #[tokio::test]
    async fn test_process_event_batch_single_batch() {
        let bot = DummyBot::new();
        let events = make_events(3);
        let call_count = Arc::new(AtomicUsize::new(0));
        let call_count2 = call_count.clone();
        let func = move |_bot: DummyBot, _ev: ResponseEventsGet| {
            let call_count2 = call_count2.clone();
            async move {
                call_count2.fetch_add(1, Ordering::SeqCst);
                Ok(())
            }
        };
        // Simulate batch < max_events_per_batch: everything fits in one call.
        let res = Bot::process_event_batch_test(&bot, events.clone(), &func, 10).await;
        assert!(res.is_ok());
        assert_eq!(call_count.load(Ordering::SeqCst), 1);
        assert_eq!(bot.set_last_event_calls.load(Ordering::SeqCst), 1);
    }

    #[tokio::test]
    async fn test_process_event_batch_multiple_batches() {
        let bot = DummyBot::new();
        let events = make_events(15);
        let call_count = Arc::new(AtomicUsize::new(0));
        let call_count2 = call_count.clone();
        let func = move |_bot: DummyBot, _ev: ResponseEventsGet| {
            let call_count2 = call_count2.clone();
            async move {
                call_count2.fetch_add(1, Ordering::SeqCst);
                Ok(())
            }
        };
        // max_events_per_batch = 5, should produce 3 batches.
        let res = Bot::process_event_batch_test(&bot, events.clone(), &func, 5).await;
        assert!(res.is_ok());
        assert_eq!(call_count.load(Ordering::SeqCst), 3);
        assert_eq!(bot.set_last_event_calls.load(Ordering::SeqCst), 3);
    }

    #[tokio::test]
    async fn test_process_event_batch_callback_error() {
        // A failing callback must surface its error to the caller.
        let bot = DummyBot::new();
        let events = make_events(2);
        let func = |_bot: DummyBot, _ev: ResponseEventsGet| async move {
            Err(BotError::System("fail".into()))
        };
        let res = Bot::process_event_batch_test(&bot, events, &func, 10).await;
        assert!(res.is_err());
    }

    #[tokio::test]
    async fn test_process_event_batch_empty_events() {
        // Should return Ok and not call callback
        let bot = DummyBot::new();
        let events = make_events(0);
        let call_count = Arc::new(AtomicUsize::new(0));
        let call_count2 = call_count.clone();
        let func = move |_bot: DummyBot, _ev: ResponseEventsGet| {
            let call_count2 = call_count2.clone();
            async move {
                call_count2.fetch_add(1, Ordering::SeqCst);
                Ok(())
            }
        };
        let res = Bot::process_event_batch_test(&bot, events, &func, 10).await;
        assert!(res.is_ok());
        assert_eq!(call_count.load(Ordering::SeqCst), 0);
    }

    #[tokio::test]
    async fn test_process_event_batch_error_in_second_batch() {
        // Should return error only after first batch is ok
        let bot = DummyBot::new();
        let events = make_events(6);
        let call_count = Arc::new(AtomicUsize::new(0));
        let call_count2 = call_count.clone();
        let func = move |_bot: DummyBot, _ev: ResponseEventsGet| {
            let call_count2 = call_count2.clone();
            async move {
                let n = call_count2.fetch_add(1, Ordering::SeqCst);
                if n == 1 {
                    Err(BotError::System("fail".into()))
                } else {
                    Ok(())
                }
            }
        };
        // batch size 3: first batch ok, second batch returns error
        let res = Bot::process_event_batch_test(&bot, events, &func, 3).await;
        assert!(res.is_err());
        assert_eq!(call_count.load(Ordering::SeqCst), 2);
    }

    #[tokio::test]
    async fn test_process_event_batch_empty_events_with_memory_limit() {
        // Should return Ok and not call callback, even if max_memory_usage > 0
        let bot = DummyBot::new();
        let events = make_events(0);
        let call_count = Arc::new(AtomicUsize::new(0));
        let call_count2 = call_count.clone();
        let func = move |_bot: DummyBot, _ev: ResponseEventsGet| {
            let call_count2 = call_count2.clone();
            async move {
                call_count2.fetch_add(1, Ordering::SeqCst);
                Ok(())
            }
        };
        // batch size 1, but no events
        let res = Bot::process_event_batch_test(&bot, events, &func, 1).await;
        assert!(res.is_ok());
        assert_eq!(call_count.load(Ordering::SeqCst), 0);
    }

    // Helper for testing process_event_batch with a configurable
    // max_events_per_batch parameter (the real method reads it from CONFIG).
    impl Bot {
        pub async fn process_event_batch_test<F, X>(
            bot: &DummyBot,
            events: ResponseEventsGet,
            func: &F,
            max_events_per_batch: usize,
        ) -> Result<()>
        where
            F: Fn(DummyBot, ResponseEventsGet) -> X,
            X: Future<Output = Result<()>> + Send + Sync + 'static,
        {
            // Simplified batching logic for the test
            let total = events.events.len();
            if total == 0 {
                return Ok(());
            }
            let batches = total.div_ceil(max_events_per_batch);
            for batch_idx in 0..batches {
                let start_idx = batch_idx * max_events_per_batch;
                let end_idx = std::cmp::min((batch_idx + 1) * max_events_per_batch, total);
                let batch_events = ResponseEventsGet {
                    events: events.events[start_idx..end_idx].to_vec(),
                };
                let last_event_id = batch_events.events[batch_events.events.len() - 1].event_id;
                bot.set_last_event_id(last_event_id);
                func(bot.clone(), batch_events).await?;
            }
            Ok(())
        }
    }

    // Tests for AdaptiveBackoff component
    #[test]
    fn test_adaptive_backoff_new() {
        let min_delay = Duration::from_millis(100);
        let max_delay = Duration::from_millis(5000);
        let backoff = AdaptiveBackoff::new(min_delay, max_delay);

        assert_eq!(backoff.current_delay(), min_delay);
    }

    #[test]
    fn test_adaptive_backoff_calculate_delay_no_events() {
        let min_delay = Duration::from_millis(100);
        let max_delay = Duration::from_millis(5000);
        let mut backoff = AdaptiveBackoff::new(min_delay, max_delay);

        let calculated = backoff.calculate_delay(0);
        assert!(calculated >= min_delay);
        assert!(calculated <= max_delay);
    }

    #[test]
    fn test_adaptive_backoff_calculate_delay_with_events() {
        let min_delay = Duration::from_millis(100);
        let max_delay = Duration::from_millis(5000);
        let mut backoff = AdaptiveBackoff::new(min_delay, max_delay);

        let calculated = backoff.calculate_delay(5);
        assert_eq!(calculated, min_delay);
    }

    #[test]
    fn test_adaptive_backoff_reset() {
        let min_delay = Duration::from_millis(100);
        let max_delay = Duration::from_millis(5000);
        let mut backoff = AdaptiveBackoff::new(min_delay, max_delay);

        // Increase delay first
        backoff.calculate_delay(0);
        let after_increase = backoff.current_delay();
        assert!(after_increase >= min_delay);

        // Reset and check
        backoff.reset();
        assert_eq!(backoff.current_delay(), min_delay);
    }

    #[test]
    fn test_adaptive_backoff_current_delay() {
        let min_delay = Duration::from_millis(50);
        let max_delay = Duration::from_millis(2000);
        let backoff = AdaptiveBackoff::new(min_delay, max_delay);

        assert_eq!(backoff.current_delay(), min_delay);
    }

    // Tests for ZeroCopyEventStream component
    #[test]
    fn test_zero_copy_event_stream_new() {
        let stream = ZeroCopyEventStream::new(100);
        assert_eq!(stream.len(), 0);
        assert!(stream.is_empty());
    }

    #[test]
    fn test_zero_copy_event_stream_push_events() {
        let mut stream = ZeroCopyEventStream::new(10);
        let events = make_events(3);

        stream.push_events(events.events.clone());
        assert_eq!(stream.len(), 3);
        assert!(!stream.is_empty());
    }

    #[test]
    fn test_zero_copy_event_stream_push_events_overflow() {
        let mut stream = ZeroCopyEventStream::new(2);
        let events = make_events(5);

        stream.push_events(events.events.clone());
        assert_eq!(stream.len(), 2); // Should be capped at capacity

        // Verify we get the last 2 events (most recent)
        let remaining_events = stream.peek_events(2);
        assert_eq!(remaining_events.len(), 2);
        assert_eq!(remaining_events[0].event_id, 3); // Last 2 events should be 3 and 4
        assert_eq!(remaining_events[1].event_id, 4);
    }

    #[test]
    fn test_zero_copy_event_stream_drain_batch() {
        let mut stream = ZeroCopyEventStream::new(10);
        let events = make_events(5);

        stream.push_events(events.events.clone());
        let drained = stream.drain_batch(3);

        assert_eq!(drained.len(), 3);
        assert_eq!(stream.len(), 2); // 5 - 3 = 2 remaining
    }

    #[test]
    fn test_zero_copy_event_stream_drain_batch_more_than_available() {
        let mut stream = ZeroCopyEventStream::new(10);
        let events = make_events(2);

        stream.push_events(events.events.clone());
        let drained = stream.drain_batch(5);

        assert_eq!(drained.len(), 2); // Only 2 available
        assert_eq!(stream.len(), 0);
        assert!(stream.is_empty());
    }

    #[test]
    fn test_zero_copy_event_stream_peek_events() {
        let mut stream = ZeroCopyEventStream::new(10);
        let events = make_events(5);

        stream.push_events(events.events.clone());
        let peeked = stream.peek_events(3);

        assert_eq!(peeked.len(), 3);
        assert_eq!(stream.len(), 5); // Stream unchanged after peek

        // Check that peeked events match the first 3 events
        for (i, event_ref) in peeked.iter().enumerate() {
            assert_eq!(event_ref.event_id, events.events[i].event_id);
        }
    }

    #[test]
    fn test_zero_copy_event_stream_peek_events_more_than_available() {
        let mut stream = ZeroCopyEventStream::new(10);
        let events = make_events(2);

        stream.push_events(events.events.clone());
        let peeked = stream.peek_events(5);

        assert_eq!(peeked.len(), 2); // Only 2 available
        assert_eq!(stream.len(), 2); // Stream unchanged
    }

    // Tests for ParallelEventProcessor component
    #[test]
    fn test_parallel_event_processor_new() {
        let _processor = ParallelEventProcessor::new(5, 10);
        // Can't directly test internal fields, but constructor should not panic
    }
}