EventBatcher

Struct EventBatcher 

Source
pub struct EventBatcher<T> { /* private fields */ }
Expand description

Event batcher with time-based debouncing. Accumulates events and flushes them periodically.

Implementations§

Source§

impl<T> EventBatcher<T>

Source

pub fn new(window_ms: u64) -> Self

Creates a new batcher with the specified time window, in milliseconds.

Examples found in repository?
examples/04_reasoning_streaming.rs (line 19)
7async fn main() -> Result<()> {
8    let api_key = std::env::var("OPENAI_API_KEY")?;
9    let client = OpenAIClient::new(api_key)?;
10
11    let request = ResponseRequest::new(
12        "gpt-5",
13        vec![Message::human("Solve this problem: If a train travels 120 km in 2 hours, what is its average speed? And after that make a dynamic programming problem that can be solved in O(n) time and space complexity.")]
14    ).with_reasoning(ReasoningConfig::medium());
15
16    println!("Streaming response with reasoning (batched for network efficiency):\n");
17    
18    let mut stream = client.reason_stream(request).await?;
19    let mut batcher = EventBatcher::new(50); // 50ms batching window
20    let mut reasoning_displayed = false;
21    let start_time = Instant::now();
22    let mut total_events = 0;
23    let mut total_batches = 0;
24
25    loop {
26        tokio::select! {
27            // Receive events from stream
28            event_result = stream.next() => {
29                match event_result {
30                    Some(Ok(event)) => {
31                        total_events += 1;
32                        batcher.push(event);
33                    }
34                    Some(Err(e)) => {
35                        eprintln!("Stream error: {}", e);
36                        break;
37                    }
38                    None => {
39                        // Stream ended, flush remaining events
40                        if !batcher.is_empty() {
41                            let batch = batcher.take();
42                            total_batches += 1;
43                            display_batch(&batch, &mut reasoning_displayed)?;
44                        }
45                        break;
46                    }
47                }
48            }
49            
50            // Flush batch when timer expires
51            _ = batcher.ticker().tick() => {
52                if !batcher.is_empty() {
53                    let batch = batcher.take();
54                    total_batches += 1;
55                    display_batch(&batch, &mut reasoning_displayed)?;
56                }
57            }
58        }
59    }
60
61    let elapsed = start_time.elapsed();
62    println!("\n\nDone.");
63    println!("Stats: {} events in {} batches ({}% reduction in network calls)", 
64        total_events, 
65        total_batches,
66        if total_events > 0 { 100 - (total_batches * 100 / total_events) } else { 0 }
67    );
68    println!("Time elapsed: {:.2}s", elapsed.as_secs_f64());
69
70    Ok(())
71}
Source

pub fn push(&mut self, event: T)

Add an event to the current batch

Examples found in repository?
examples/04_reasoning_streaming.rs (line 32)
7async fn main() -> Result<()> {
8    let api_key = std::env::var("OPENAI_API_KEY")?;
9    let client = OpenAIClient::new(api_key)?;
10
11    let request = ResponseRequest::new(
12        "gpt-5",
13        vec![Message::human("Solve this problem: If a train travels 120 km in 2 hours, what is its average speed? And after that make a dynamic programming problem that can be solved in O(n) time and space complexity.")]
14    ).with_reasoning(ReasoningConfig::medium());
15
16    println!("Streaming response with reasoning (batched for network efficiency):\n");
17    
18    let mut stream = client.reason_stream(request).await?;
19    let mut batcher = EventBatcher::new(50); // 50ms batching window
20    let mut reasoning_displayed = false;
21    let start_time = Instant::now();
22    let mut total_events = 0;
23    let mut total_batches = 0;
24
25    loop {
26        tokio::select! {
27            // Receive events from stream
28            event_result = stream.next() => {
29                match event_result {
30                    Some(Ok(event)) => {
31                        total_events += 1;
32                        batcher.push(event);
33                    }
34                    Some(Err(e)) => {
35                        eprintln!("Stream error: {}", e);
36                        break;
37                    }
38                    None => {
39                        // Stream ended, flush remaining events
40                        if !batcher.is_empty() {
41                            let batch = batcher.take();
42                            total_batches += 1;
43                            display_batch(&batch, &mut reasoning_displayed)?;
44                        }
45                        break;
46                    }
47                }
48            }
49            
50            // Flush batch when timer expires
51            _ = batcher.ticker().tick() => {
52                if !batcher.is_empty() {
53                    let batch = batcher.take();
54                    total_batches += 1;
55                    display_batch(&batch, &mut reasoning_displayed)?;
56                }
57            }
58        }
59    }
60
61    let elapsed = start_time.elapsed();
62    println!("\n\nDone.");
63    println!("Stats: {} events in {} batches ({}% reduction in network calls)", 
64        total_events, 
65        total_batches,
66        if total_events > 0 { 100 - (total_batches * 100 / total_events) } else { 0 }
67    );
68    println!("Time elapsed: {:.2}s", elapsed.as_secs_f64());
69
70    Ok(())
71}
Source

pub fn should_flush_now(&self) -> bool

Check if it’s time to flush (non-blocking). Note: This is a simplified check. For production use, integrate with tokio::select!

Source

pub fn take(&mut self) -> Vec<T>

Take the current batch, leaving an empty one

Examples found in repository?
examples/04_reasoning_streaming.rs (line 41)
7async fn main() -> Result<()> {
8    let api_key = std::env::var("OPENAI_API_KEY")?;
9    let client = OpenAIClient::new(api_key)?;
10
11    let request = ResponseRequest::new(
12        "gpt-5",
13        vec![Message::human("Solve this problem: If a train travels 120 km in 2 hours, what is its average speed? And after that make a dynamic programming problem that can be solved in O(n) time and space complexity.")]
14    ).with_reasoning(ReasoningConfig::medium());
15
16    println!("Streaming response with reasoning (batched for network efficiency):\n");
17    
18    let mut stream = client.reason_stream(request).await?;
19    let mut batcher = EventBatcher::new(50); // 50ms batching window
20    let mut reasoning_displayed = false;
21    let start_time = Instant::now();
22    let mut total_events = 0;
23    let mut total_batches = 0;
24
25    loop {
26        tokio::select! {
27            // Receive events from stream
28            event_result = stream.next() => {
29                match event_result {
30                    Some(Ok(event)) => {
31                        total_events += 1;
32                        batcher.push(event);
33                    }
34                    Some(Err(e)) => {
35                        eprintln!("Stream error: {}", e);
36                        break;
37                    }
38                    None => {
39                        // Stream ended, flush remaining events
40                        if !batcher.is_empty() {
41                            let batch = batcher.take();
42                            total_batches += 1;
43                            display_batch(&batch, &mut reasoning_displayed)?;
44                        }
45                        break;
46                    }
47                }
48            }
49            
50            // Flush batch when timer expires
51            _ = batcher.ticker().tick() => {
52                if !batcher.is_empty() {
53                    let batch = batcher.take();
54                    total_batches += 1;
55                    display_batch(&batch, &mut reasoning_displayed)?;
56                }
57            }
58        }
59    }
60
61    let elapsed = start_time.elapsed();
62    println!("\n\nDone.");
63    println!("Stats: {} events in {} batches ({}% reduction in network calls)", 
64        total_events, 
65        total_batches,
66        if total_events > 0 { 100 - (total_batches * 100 / total_events) } else { 0 }
67    );
68    println!("Time elapsed: {:.2}s", elapsed.as_secs_f64());
69
70    Ok(())
71}
Source

pub fn len(&self) -> usize

Current batch size

Source

pub fn is_empty(&self) -> bool

Check if batch is empty

Examples found in repository?
examples/04_reasoning_streaming.rs (line 40)
7async fn main() -> Result<()> {
8    let api_key = std::env::var("OPENAI_API_KEY")?;
9    let client = OpenAIClient::new(api_key)?;
10
11    let request = ResponseRequest::new(
12        "gpt-5",
13        vec![Message::human("Solve this problem: If a train travels 120 km in 2 hours, what is its average speed? And after that make a dynamic programming problem that can be solved in O(n) time and space complexity.")]
14    ).with_reasoning(ReasoningConfig::medium());
15
16    println!("Streaming response with reasoning (batched for network efficiency):\n");
17    
18    let mut stream = client.reason_stream(request).await?;
19    let mut batcher = EventBatcher::new(50); // 50ms batching window
20    let mut reasoning_displayed = false;
21    let start_time = Instant::now();
22    let mut total_events = 0;
23    let mut total_batches = 0;
24
25    loop {
26        tokio::select! {
27            // Receive events from stream
28            event_result = stream.next() => {
29                match event_result {
30                    Some(Ok(event)) => {
31                        total_events += 1;
32                        batcher.push(event);
33                    }
34                    Some(Err(e)) => {
35                        eprintln!("Stream error: {}", e);
36                        break;
37                    }
38                    None => {
39                        // Stream ended, flush remaining events
40                        if !batcher.is_empty() {
41                            let batch = batcher.take();
42                            total_batches += 1;
43                            display_batch(&batch, &mut reasoning_displayed)?;
44                        }
45                        break;
46                    }
47                }
48            }
49            
50            // Flush batch when timer expires
51            _ = batcher.ticker().tick() => {
52                if !batcher.is_empty() {
53                    let batch = batcher.take();
54                    total_batches += 1;
55                    display_batch(&batch, &mut reasoning_displayed)?;
56                }
57            }
58        }
59    }
60
61    let elapsed = start_time.elapsed();
62    println!("\n\nDone.");
63    println!("Stats: {} events in {} batches ({}% reduction in network calls)", 
64        total_events, 
65        total_batches,
66        if total_events > 0 { 100 - (total_batches * 100 / total_events) } else { 0 }
67    );
68    println!("Time elapsed: {:.2}s", elapsed.as_secs_f64());
69
70    Ok(())
71}
Source

pub fn ticker(&mut self) -> &mut Interval

Get reference to ticker for use in tokio::select!

Examples found in repository?
examples/04_reasoning_streaming.rs (line 51)
7async fn main() -> Result<()> {
8    let api_key = std::env::var("OPENAI_API_KEY")?;
9    let client = OpenAIClient::new(api_key)?;
10
11    let request = ResponseRequest::new(
12        "gpt-5",
13        vec![Message::human("Solve this problem: If a train travels 120 km in 2 hours, what is its average speed? And after that make a dynamic programming problem that can be solved in O(n) time and space complexity.")]
14    ).with_reasoning(ReasoningConfig::medium());
15
16    println!("Streaming response with reasoning (batched for network efficiency):\n");
17    
18    let mut stream = client.reason_stream(request).await?;
19    let mut batcher = EventBatcher::new(50); // 50ms batching window
20    let mut reasoning_displayed = false;
21    let start_time = Instant::now();
22    let mut total_events = 0;
23    let mut total_batches = 0;
24
25    loop {
26        tokio::select! {
27            // Receive events from stream
28            event_result = stream.next() => {
29                match event_result {
30                    Some(Ok(event)) => {
31                        total_events += 1;
32                        batcher.push(event);
33                    }
34                    Some(Err(e)) => {
35                        eprintln!("Stream error: {}", e);
36                        break;
37                    }
38                    None => {
39                        // Stream ended, flush remaining events
40                        if !batcher.is_empty() {
41                            let batch = batcher.take();
42                            total_batches += 1;
43                            display_batch(&batch, &mut reasoning_displayed)?;
44                        }
45                        break;
46                    }
47                }
48            }
49            
50            // Flush batch when timer expires
51            _ = batcher.ticker().tick() => {
52                if !batcher.is_empty() {
53                    let batch = batcher.take();
54                    total_batches += 1;
55                    display_batch(&batch, &mut reasoning_displayed)?;
56                }
57            }
58        }
59    }
60
61    let elapsed = start_time.elapsed();
62    println!("\n\nDone.");
63    println!("Stats: {} events in {} batches ({}% reduction in network calls)", 
64        total_events, 
65        total_batches,
66        if total_events > 0 { 100 - (total_batches * 100 / total_events) } else { 0 }
67    );
68    println!("Time elapsed: {:.2}s", elapsed.as_secs_f64());
69
70    Ok(())
71}
Source

pub fn window_ms(&self) -> u64

Get window duration

Auto Trait Implementations§

§

impl<T> Freeze for EventBatcher<T>

§

impl<T> !RefUnwindSafe for EventBatcher<T>

§

impl<T> Send for EventBatcher<T>
where T: Send,

§

impl<T> Sync for EventBatcher<T>
where T: Sync,

§

impl<T> Unpin for EventBatcher<T>
where T: Unpin,

§

impl<T> !UnwindSafe for EventBatcher<T>

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T> Instrument for T

Source§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper. Read more
Source§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper. Read more
Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T> PolicyExt for T
where T: ?Sized,

Source§

fn and<P, B, E>(self, other: P) -> And<T, P>
where T: Policy<B, E>, P: Policy<B, E>,

Create a new Policy that returns Action::Follow only if self and other return Action::Follow. Read more
Source§

fn or<P, B, E>(self, other: P) -> Or<T, P>
where T: Policy<B, E>, P: Policy<B, E>,

Create a new Policy that returns Action::Follow if either self or other returns Action::Follow. Read more
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
Source§

impl<T> WithSubscriber for T

Source§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper. Read more
Source§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper. Read more