1use crate::api::events::get::{RequestEventsGet, ResponseEventsGet};
2use crate::api::types::{BotRequest, EventMessage, POLL_TIME};
3use crate::bot::Bot;
4use crate::config::CONFIG;
5use crate::error::{BotError, Result};
6use std::future::Future;
7use std::sync::Arc;
8#[cfg(test)]
9use std::sync::atomic::AtomicU32;
10use std::time::{Duration, Instant};
11use tokio::sync::Semaphore;
12use tokio::time::sleep;
13use tracing::{debug, error, info, trace, warn};
14
15impl Bot {
19 pub async fn event_listener<F, X>(&self, func: F) -> Result<()>
29 where
30 F: Fn(Bot, ResponseEventsGet) -> X,
31 X: Future<Output = Result<()>> + Send + Sync + 'static,
32 {
33 let cfg = &CONFIG.listener;
34 let (shutdown_tx, mut shutdown_rx) = tokio::sync::broadcast::channel::<()>(1);
36
37 let shutdown_tx_clone = shutdown_tx.clone();
39 tokio::spawn(async move {
40 crate::bot::net::shutdown_signal().await;
41 info!("Received stop signal, gracefully stopping event listener...");
42 let _ = shutdown_tx_clone.send(());
43 });
44
45 let mut current_backoff = cfg.empty_backoff_ms;
46 let mut consecutive_empty_polls = 0u32;
47
48 'event_loop: loop {
49 if shutdown_rx.try_recv().is_ok() {
51 info!("Processing shutdown request");
52 break 'event_loop;
53 }
54 let start_time = Instant::now();
55 debug!("Getting events with ID: {}", self.get_last_event_id());
56
57 let req = RequestEventsGet::new(self.get_last_event_id()).with_poll_time(POLL_TIME);
59
60 let res = match self.send_api_request::<RequestEventsGet>(req).await {
62 Ok(res) => res,
63 Err(e) => {
64 error!("Error getting events: {}", e);
65
66 let backoff = Duration::from_millis(current_backoff);
68 warn!("Backing off for {:?} before retrying", backoff);
69 sleep(backoff).await;
70
71 if cfg.use_exponential_backoff {
73 current_backoff = std::cmp::min(current_backoff * 2, cfg.max_backoff_ms);
74 }
75
76 continue;
77 }
78 };
79
80 if !res.events.is_empty() {
82 debug!("Received {} events", res.events.len());
83
84 current_backoff = cfg.empty_backoff_ms;
86 consecutive_empty_polls = 0;
87
88 self.process_event_batch(res, &func).await?;
90 } else {
91 debug!("No events received, continuing to wait");
92 consecutive_empty_polls += 1;
93
94 if consecutive_empty_polls > 1 {
96 let elapsed = start_time.elapsed();
98 let backoff_time = Duration::from_millis(current_backoff);
99
100 if elapsed < backoff_time {
102 let sleep_time = backoff_time - elapsed;
103 debug!("Backing off for {:?}", sleep_time);
104 sleep(sleep_time).await;
105 }
106
107 if cfg.use_exponential_backoff {
109 current_backoff = std::cmp::min(current_backoff * 2, cfg.max_backoff_ms);
110 }
111 }
112 }
113 } info!("Event listener stopped gracefully");
116 Ok(())
117 }
118
119 #[tracing::instrument(skip(self, events, func))]
122 async fn process_event_batch<F, X>(&self, events: ResponseEventsGet, func: &F) -> Result<()>
123 where
124 F: Fn(Bot, ResponseEventsGet) -> X,
125 X: Future<Output = Result<()>> + Send + Sync + 'static,
126 {
127 let cfg = &CONFIG.listener;
128 let memory_usage = if cfg.max_memory_usage > 0 {
130 events.events.len() * 1024 } else {
132 0
133 };
134
135 if cfg.max_memory_usage > 0 && memory_usage > cfg.max_memory_usage {
137 debug!("Processing events in batches due to memory constraints");
138
139 let batches = events.events.len().div_ceil(cfg.max_events_per_batch);
141 for batch_idx in 0..batches {
142 let start_idx = batch_idx * cfg.max_events_per_batch;
143 let end_idx = std::cmp::min(
144 (batch_idx + 1) * cfg.max_events_per_batch,
145 events.events.len(),
146 );
147
148 debug!(
149 "Processing batch {}/{} (events {}-{})",
150 batch_idx + 1,
151 batches,
152 start_idx,
153 end_idx - 1
154 );
155
156 let batch_events = ResponseEventsGet {
158 events: events.events[start_idx..end_idx].into(),
159 };
160
161 let last_event_id = batch_events.events[batch_events.events.len() - 1].event_id;
163
164 self.set_last_event_id(last_event_id);
166 debug!("Updated last event ID: {}", last_event_id);
167
168 if let Err(e) = func(self.clone(), batch_events).await {
170 error!("Error processing events batch: {}", e);
171 return Err(e);
172 }
173
174 sleep(Duration::from_millis(10)).await;
176 }
177 } else {
178 let last_event_id = events.events[events.events.len() - 1].event_id;
181 self.set_last_event_id(last_event_id);
182 debug!("Updated last event ID: {}", last_event_id);
183
184 if let Err(e) = func(self.clone(), events).await {
186 error!("Error processing events: {}", e);
187 return Err(e);
188 }
189 }
190
191 Ok(())
192 }
193
194 pub async fn event_listener_parallel<F, X>(&self, func: F) -> Result<()>
197 where
198 F: Fn(Bot, ResponseEventsGet) -> X + Send + Sync + Clone + 'static,
199 X: Future<Output = Result<()>> + Send + 'static,
200 {
201 let cfg = &CONFIG.listener;
202 info!("Starting parallel event listener...");
203
204 let processor = ParallelEventProcessor::new(
206 cfg.max_events_per_batch.max(1), cfg.max_events_per_batch,
208 );
209
210 let mut backoff = AdaptiveBackoff::new(
211 Duration::from_millis(cfg.empty_backoff_ms),
212 Duration::from_millis(cfg.max_backoff_ms),
213 );
214
215 let (shutdown_tx, mut shutdown_rx) = tokio::sync::broadcast::channel::<()>(1);
220
221 let shutdown_tx_clone = shutdown_tx.clone();
223 tokio::spawn(async move {
224 crate::bot::net::shutdown_signal().await;
225 info!("Received stop signal, gracefully stopping parallel event listener...");
226 let _ = shutdown_tx_clone.send(());
227 });
228
229 'event_loop: loop {
230 if shutdown_rx.try_recv().is_ok() {
232 info!("Processing shutdown request");
233 break 'event_loop;
234 }
235
236 let start_time = Instant::now();
237
238 let req = RequestEventsGet::new(self.get_last_event_id()).with_poll_time(POLL_TIME);
240
241 let res = match self.send_api_request::<RequestEventsGet>(req).await {
243 Ok(res) => res,
244 Err(e) => {
245 error!("Error getting events: {}", e);
246
247 let delay = backoff.calculate_delay(0);
249 warn!("Error occurred, backing off for {:?}", delay);
250 sleep(delay).await;
251 continue;
252 }
253 };
254
255 if !res.events.is_empty() {
257 debug!("Received {} events", res.events.len());
258
259 let last_event_id = res.events[res.events.len() - 1].event_id;
261 self.set_last_event_id(last_event_id);
262 debug!("Updated last event ID: {}", last_event_id);
263
264 let processing_start = Instant::now();
266 match processor
267 .process_events_parallel(self.clone(), res, func.clone())
268 .await
269 {
270 Ok(_) => {
271 let processing_duration = processing_start.elapsed();
272 trace!("Parallel processing completed in {:?}", processing_duration);
273
274 backoff.calculate_delay(1);
276 }
277 Err(e) => {
278 error!("Error in parallel processing: {}", e);
279 return Err(e);
280 }
281 }
282 } else {
283 debug!("No events received, applying adaptive backoff");
284
285 let delay = backoff.calculate_delay(0);
287
288 let elapsed = start_time.elapsed();
290 if elapsed < delay {
291 let sleep_time = delay - elapsed;
292 trace!("Adaptive backoff: sleeping for {:?}", sleep_time);
293 sleep(sleep_time).await;
294 }
295 }
296 } info!("Parallel event listener stopped gracefully");
299 Ok(())
300 }
301}
302
/// Fans batches of events out to a bounded number of concurrent tasks.
pub struct ParallelEventProcessor {
    // Maximum number of batches processed at the same time (semaphore permits).
    max_concurrent_batches: usize,
    // Number of events placed into each batch handed to the callback.
    batch_size: usize,
}
308
309impl ParallelEventProcessor {
310 pub fn new(max_concurrent_batches: usize, batch_size: usize) -> Self {
312 Self {
313 max_concurrent_batches,
314 batch_size,
315 }
316 }
317
318 pub async fn process_events_parallel<F, X>(
320 &self,
321 bot: Bot,
322 events: ResponseEventsGet,
323 processor: F,
324 ) -> Result<()>
325 where
326 F: Fn(Bot, ResponseEventsGet) -> X + Send + Sync + Clone + 'static,
327 X: Future<Output = Result<()>> + Send + 'static,
328 {
329 if events.events.is_empty() {
330 return Ok(());
331 }
332
333 let batches = self.create_batches(events);
334 let batch_count = batches.len();
335 let semaphore = Arc::new(Semaphore::new(self.max_concurrent_batches));
336
337 trace!("Processing {} batches in parallel", batch_count);
338
339 let futures: Vec<_> = batches
340 .into_iter()
341 .enumerate()
342 .map(|(batch_idx, batch)| {
343 let processor = processor.clone();
344 let bot = bot.clone();
345 let semaphore = semaphore.clone();
346
347 async move {
348 let _permit = semaphore.acquire().await.map_err(|e| {
349 BotError::System(format!("Failed to acquire semaphore: {e}"))
350 })?;
351
352 trace!(
353 "Processing batch {} with {} events",
354 batch_idx,
355 batch.events.len()
356 );
357
358 let start_time = Instant::now();
359 let result = processor(bot, batch).await;
360 let duration = start_time.elapsed();
361
362 if let Err(ref e) = result {
363 error!("Batch {} failed after {:?}: {}", batch_idx, duration, e);
364 } else {
365 trace!("Batch {} completed in {:?}", batch_idx, duration);
366 }
367
368 result
369 }
370 })
371 .collect();
372
373 use futures::future::join_all;
375 let results: Vec<Result<()>> = join_all(futures).await.into_iter().collect();
376
377 for (idx, result) in results.into_iter().enumerate() {
379 if let Err(e) = result {
380 return Err(BotError::System(format!(
381 "Batch {idx} processing failed: {e}"
382 )));
383 }
384 }
385
386 debug!("All {} batches processed successfully", batch_count);
387 Ok(())
388 }
389
390 fn create_batches(&self, events: ResponseEventsGet) -> Vec<ResponseEventsGet> {
392 events
393 .events
394 .chunks(self.batch_size)
395 .map(|chunk| ResponseEventsGet {
396 events: chunk.to_vec(),
397 })
398 .collect()
399 }
400}
401
/// Poll delay that grows on consecutive empty polls and resets on activity.
pub struct AdaptiveBackoff {
    // Delay that will be returned by the next `calculate_delay` call.
    current_delay: Duration,
    // Lower bound; the delay resets to this when events arrive.
    min_delay: Duration,
    // Upper bound; growth is clamped to this value.
    max_delay: Duration,
    // Number of empty polls seen since the last non-empty poll.
    consecutive_empty_polls: u32,
    // Time of the last non-empty poll (or last `reset()`); `None` until then.
    last_activity: Option<Instant>,
    // Empty polls tolerated before the delay starts growing.
    empty_poll_threshold: u32,
}
411
412impl AdaptiveBackoff {
413 pub fn new(min_delay: Duration, max_delay: Duration) -> Self {
415 Self {
416 current_delay: min_delay,
417 min_delay,
418 max_delay,
419 consecutive_empty_polls: 0,
420 last_activity: None,
421 empty_poll_threshold: 3, }
423 }
424
425 pub fn calculate_delay(&mut self, events_received: usize) -> Duration {
427 let now = Instant::now();
428
429 if events_received > 0 {
430 self.current_delay = self.min_delay;
432 self.consecutive_empty_polls = 0;
433 self.last_activity = Some(now);
434
435 trace!("Events received, reset delay to {:?}", self.current_delay);
436 } else {
437 self.consecutive_empty_polls += 1;
439
440 if self.consecutive_empty_polls > self.empty_poll_threshold {
442 self.current_delay = std::cmp::min(
443 Duration::from_millis(
444 (self.current_delay.as_millis() as u64 * 3 / 2)
445 .max(self.min_delay.as_millis() as u64),
446 ),
447 self.max_delay,
448 );
449
450 trace!(
451 "Empty poll #{}, increased delay to {:?}",
452 self.consecutive_empty_polls, self.current_delay
453 );
454 }
455
456 if let Some(last_activity) = self.last_activity {
458 let idle_time = now.duration_since(last_activity);
459 if idle_time > Duration::from_secs(60) {
460 self.current_delay = std::cmp::min(
461 self.current_delay + Duration::from_millis(100),
462 self.max_delay,
463 );
464 }
465 }
466 }
467
468 self.current_delay
469 }
470
471 pub fn current_delay(&self) -> Duration {
473 self.current_delay
474 }
475
476 pub fn reset(&mut self) {
478 self.current_delay = self.min_delay;
479 self.consecutive_empty_polls = 0;
480 self.last_activity = Some(Instant::now());
481 }
482}
483
/// Bounded FIFO buffer of events that evicts the oldest entries on overflow.
pub struct ZeroCopyEventStream {
    // Buffered events, oldest at the front.
    events: std::collections::VecDeque<EventMessage>,
    // Hard cap on the number of buffered events.
    capacity: usize,
}
489
490impl ZeroCopyEventStream {
491 pub fn new(capacity: usize) -> Self {
493 Self {
494 events: std::collections::VecDeque::with_capacity(capacity),
495 capacity,
496 }
497 }
498
499 pub fn push_events(&mut self, mut new_events: Vec<EventMessage>) {
501 if new_events.len() > self.capacity {
503 new_events.drain(..new_events.len() - self.capacity);
504 }
505
506 while self.events.len() + new_events.len() > self.capacity {
508 self.events.pop_front();
509 }
510
511 self.events.extend(new_events.drain(..));
513 }
514
515 pub fn drain_batch(&mut self, size: usize) -> Vec<EventMessage> {
517 self.events
518 .drain(..std::cmp::min(size, self.events.len()))
519 .collect()
520 }
521
522 pub fn len(&self) -> usize {
524 self.events.len()
525 }
526
527 pub fn is_empty(&self) -> bool {
529 self.events.is_empty()
530 }
531
532 pub fn peek_events(&self, count: usize) -> Vec<&EventMessage> {
534 self.events.iter().take(count).collect()
535 }
536}
537
#[cfg(test)]
mod tests {
    use super::*;
    use crate::api::events::get::ResponseEventsGet;
    use crate::api::types::{EventId, EventMessage, EventType};
    use crate::error::{BotError, Result};
    use std::sync::Arc;
    use std::sync::atomic::{AtomicUsize, Ordering};

    /// Minimal stand-in for `Bot` that records how often the last event ID
    /// is updated, so batching behaviour can be asserted without network I/O.
    #[derive(Clone, Default)]
    pub struct DummyBot {
        // Last event ID stored (the AtomicU32 implies EventId fits in u32).
        last_event_id: Arc<AtomicU32>,
        // Number of times `set_last_event_id` was invoked.
        set_last_event_calls: Arc<AtomicUsize>,
    }
    impl DummyBot {
        fn new() -> Self {
            Self {
                last_event_id: Arc::new(AtomicU32::new(0)),
                set_last_event_calls: Arc::new(AtomicUsize::new(0)),
            }
        }
        // Mirrors `Bot::set_last_event_id`, additionally counting calls.
        fn set_last_event_id(&self, id: EventId) {
            self.last_event_id.store(id, Ordering::SeqCst);
            self.set_last_event_calls.fetch_add(1, Ordering::SeqCst);
        }
    }

    // Builds a response with `n` events whose IDs are 0..n.
    fn make_events(n: usize) -> ResponseEventsGet {
        ResponseEventsGet {
            events: (0..n)
                .map(|i| EventMessage {
                    event_id: i as EventId,
                    event_type: EventType::None,
                })
                .collect(),
        }
    }

    // 3 events with batch size 10 -> one callback invocation, one ID update.
    #[tokio::test]
    async fn test_process_event_batch_single_batch() {
        let bot = DummyBot::new();
        let events = make_events(3);
        let call_count = Arc::new(AtomicUsize::new(0));
        let call_count2 = call_count.clone();
        let func = move |_bot: DummyBot, _ev: ResponseEventsGet| {
            let call_count2 = call_count2.clone();
            async move {
                call_count2.fetch_add(1, Ordering::SeqCst);
                Ok(())
            }
        };
        let res = Bot::process_event_batch_test(&bot, events.clone(), &func, 10).await;
        assert!(res.is_ok());
        assert_eq!(call_count.load(Ordering::SeqCst), 1);
        assert_eq!(bot.set_last_event_calls.load(Ordering::SeqCst), 1);
    }

    // 15 events with batch size 5 -> 3 batches, 3 ID updates.
    #[tokio::test]
    async fn test_process_event_batch_multiple_batches() {
        let bot = DummyBot::new();
        let events = make_events(15);
        let call_count = Arc::new(AtomicUsize::new(0));
        let call_count2 = call_count.clone();
        let func = move |_bot: DummyBot, _ev: ResponseEventsGet| {
            let call_count2 = call_count2.clone();
            async move {
                call_count2.fetch_add(1, Ordering::SeqCst);
                Ok(())
            }
        };
        let res = Bot::process_event_batch_test(&bot, events.clone(), &func, 5).await;
        assert!(res.is_ok());
        assert_eq!(call_count.load(Ordering::SeqCst), 3);
        assert_eq!(bot.set_last_event_calls.load(Ordering::SeqCst), 3);
    }

    // A failing callback must surface as an error.
    #[tokio::test]
    async fn test_process_event_batch_callback_error() {
        let bot = DummyBot::new();
        let events = make_events(2);
        let func = |_bot: DummyBot, _ev: ResponseEventsGet| async move {
            Err(BotError::System("fail".into()))
        };
        let res = Bot::process_event_batch_test(&bot, events, &func, 10).await;
        assert!(res.is_err());
    }

    // Empty input: success, callback never invoked.
    #[tokio::test]
    async fn test_process_event_batch_empty_events() {
        let bot = DummyBot::new();
        let events = make_events(0);
        let call_count = Arc::new(AtomicUsize::new(0));
        let call_count2 = call_count.clone();
        let func = move |_bot: DummyBot, _ev: ResponseEventsGet| {
            let call_count2 = call_count2.clone();
            async move {
                call_count2.fetch_add(1, Ordering::SeqCst);
                Ok(())
            }
        };
        let res = Bot::process_event_batch_test(&bot, events, &func, 10).await;
        assert!(res.is_ok());
        assert_eq!(call_count.load(Ordering::SeqCst), 0);
    }

    // Error in the second of two batches: processing stops after 2 calls.
    #[tokio::test]
    async fn test_process_event_batch_error_in_second_batch() {
        let bot = DummyBot::new();
        let events = make_events(6);
        let call_count = Arc::new(AtomicUsize::new(0));
        let call_count2 = call_count.clone();
        let func = move |_bot: DummyBot, _ev: ResponseEventsGet| {
            let call_count2 = call_count2.clone();
            async move {
                let n = call_count2.fetch_add(1, Ordering::SeqCst);
                if n == 1 {
                    Err(BotError::System("fail".into()))
                } else {
                    Ok(())
                }
            }
        };
        let res = Bot::process_event_batch_test(&bot, events, &func, 3).await;
        assert!(res.is_err());
        assert_eq!(call_count.load(Ordering::SeqCst), 2);
    }

    // Empty input with the smallest batch size still short-circuits cleanly.
    #[tokio::test]
    async fn test_process_event_batch_empty_events_with_memory_limit() {
        let bot = DummyBot::new();
        let events = make_events(0);
        let call_count = Arc::new(AtomicUsize::new(0));
        let call_count2 = call_count.clone();
        let func = move |_bot: DummyBot, _ev: ResponseEventsGet| {
            let call_count2 = call_count2.clone();
            async move {
                call_count2.fetch_add(1, Ordering::SeqCst);
                Ok(())
            }
        };
        let res = Bot::process_event_batch_test(&bot, events, &func, 1).await;
        assert!(res.is_ok());
        assert_eq!(call_count.load(Ordering::SeqCst), 0);
    }

    // Test-only mirror of `Bot::process_event_batch` that takes a `DummyBot`
    // and an explicit batch size instead of reading `CONFIG`.
    impl Bot {
        pub async fn process_event_batch_test<F, X>(
            bot: &DummyBot,
            events: ResponseEventsGet,
            func: &F,
            max_events_per_batch: usize,
        ) -> Result<()>
        where
            F: Fn(DummyBot, ResponseEventsGet) -> X,
            X: Future<Output = Result<()>> + Send + Sync + 'static,
        {
            let total = events.events.len();
            // Guard: empty input is a no-op (keeps indexing below safe).
            if total == 0 {
                return Ok(());
            }
            let batches = total.div_ceil(max_events_per_batch);
            for batch_idx in 0..batches {
                let start_idx = batch_idx * max_events_per_batch;
                let end_idx = std::cmp::min((batch_idx + 1) * max_events_per_batch, total);
                let batch_events = ResponseEventsGet {
                    events: events.events[start_idx..end_idx].to_vec(),
                };
                let last_event_id = batch_events.events[batch_events.events.len() - 1].event_id;
                bot.set_last_event_id(last_event_id);
                func(bot.clone(), batch_events).await?;
            }
            Ok(())
        }
    }

    // A fresh backoff starts at the minimum delay.
    #[test]
    fn test_adaptive_backoff_new() {
        let min_delay = Duration::from_millis(100);
        let max_delay = Duration::from_millis(5000);
        let backoff = AdaptiveBackoff::new(min_delay, max_delay);

        assert_eq!(backoff.current_delay(), min_delay);
    }

    // An empty poll keeps the delay within [min, max].
    #[test]
    fn test_adaptive_backoff_calculate_delay_no_events() {
        let min_delay = Duration::from_millis(100);
        let max_delay = Duration::from_millis(5000);
        let mut backoff = AdaptiveBackoff::new(min_delay, max_delay);

        let calculated = backoff.calculate_delay(0);
        assert!(calculated >= min_delay);
        assert!(calculated <= max_delay);
    }

    // Receiving events resets the delay to the minimum.
    #[test]
    fn test_adaptive_backoff_calculate_delay_with_events() {
        let min_delay = Duration::from_millis(100);
        let max_delay = Duration::from_millis(5000);
        let mut backoff = AdaptiveBackoff::new(min_delay, max_delay);

        let calculated = backoff.calculate_delay(5);
        assert_eq!(calculated, min_delay);
    }

    // `reset` restores the minimum delay regardless of prior empty polls.
    #[test]
    fn test_adaptive_backoff_reset() {
        let min_delay = Duration::from_millis(100);
        let max_delay = Duration::from_millis(5000);
        let mut backoff = AdaptiveBackoff::new(min_delay, max_delay);

        backoff.calculate_delay(0);
        let after_increase = backoff.current_delay();
        assert!(after_increase >= min_delay);

        backoff.reset();
        assert_eq!(backoff.current_delay(), min_delay);
    }

    // `current_delay` reflects the construction-time minimum before any poll.
    #[test]
    fn test_adaptive_backoff_current_delay() {
        let min_delay = Duration::from_millis(50);
        let max_delay = Duration::from_millis(2000);
        let backoff = AdaptiveBackoff::new(min_delay, max_delay);

        assert_eq!(backoff.current_delay(), min_delay);
    }

    // A new stream is empty.
    #[test]
    fn test_zero_copy_event_stream_new() {
        let stream = ZeroCopyEventStream::new(100);
        assert_eq!(stream.len(), 0);
        assert!(stream.is_empty());
    }

    // Pushing below capacity keeps all events.
    #[test]
    fn test_zero_copy_event_stream_push_events() {
        let mut stream = ZeroCopyEventStream::new(10);
        let events = make_events(3);

        stream.push_events(events.events.clone());
        assert_eq!(stream.len(), 3);
        assert!(!stream.is_empty());
    }

    // Overflow keeps only the newest `capacity` events (IDs 3 and 4 of 0..5).
    #[test]
    fn test_zero_copy_event_stream_push_events_overflow() {
        let mut stream = ZeroCopyEventStream::new(2);
        let events = make_events(5);

        stream.push_events(events.events.clone());
        assert_eq!(stream.len(), 2);
        let remaining_events = stream.peek_events(2);
        assert_eq!(remaining_events.len(), 2);
        assert_eq!(remaining_events[0].event_id, 3);
        assert_eq!(remaining_events[1].event_id, 4);
    }

    // Draining removes from the front and shrinks the buffer.
    #[test]
    fn test_zero_copy_event_stream_drain_batch() {
        let mut stream = ZeroCopyEventStream::new(10);
        let events = make_events(5);

        stream.push_events(events.events.clone());
        let drained = stream.drain_batch(3);

        assert_eq!(drained.len(), 3);
        assert_eq!(stream.len(), 2);
    }

    // Draining more than available empties the buffer without panicking.
    #[test]
    fn test_zero_copy_event_stream_drain_batch_more_than_available() {
        let mut stream = ZeroCopyEventStream::new(10);
        let events = make_events(2);

        stream.push_events(events.events.clone());
        let drained = stream.drain_batch(5);

        assert_eq!(drained.len(), 2);
        assert_eq!(stream.len(), 0);
        assert!(stream.is_empty());
    }

    // Peeking borrows events in order and leaves the buffer untouched.
    #[test]
    fn test_zero_copy_event_stream_peek_events() {
        let mut stream = ZeroCopyEventStream::new(10);
        let events = make_events(5);

        stream.push_events(events.events.clone());
        let peeked = stream.peek_events(3);

        assert_eq!(peeked.len(), 3);
        assert_eq!(stream.len(), 5);
        for (i, event_ref) in peeked.iter().enumerate() {
            assert_eq!(event_ref.event_id, events.events[i].event_id);
        }
    }

    // Peeking more than available returns only what exists.
    #[test]
    fn test_zero_copy_event_stream_peek_events_more_than_available() {
        let mut stream = ZeroCopyEventStream::new(10);
        let events = make_events(2);

        stream.push_events(events.events.clone());
        let peeked = stream.peek_events(5);

        assert_eq!(peeked.len(), 2);
        assert_eq!(stream.len(), 2);
    }

    // Construction smoke test for the parallel processor.
    #[test]
    fn test_parallel_event_processor_new() {
        let _processor = ParallelEventProcessor::new(5, 10);
    }
}