pub struct EventBatcher<T> { /* private fields */ }
Expand description
Event batcher with time-based debouncing. Accumulates events and flushes them periodically.
Implementations§
Source§
impl<T> EventBatcher<T>
impl<T> EventBatcher<T>
Source§
pub fn new(window_ms: u64) -> Self
pub fn new(window_ms: u64) -> Self
Create a new batcher with specified time window (milliseconds)
Examples found in repository?
examples/04_reasoning_streaming.rs (line 19)
7async fn main() -> Result<()> {
8 let api_key = std::env::var("OPENAI_API_KEY")?;
9 let client = OpenAIClient::new(api_key)?;
10
11 let request = ResponseRequest::new(
12 "gpt-5",
13 vec![Message::human("Solve this problem: If a train travels 120 km in 2 hours, what is its average speed? And after that make a dynamic programming problem that can be solved in O(n) time and space complexity.")]
14 ).with_reasoning(ReasoningConfig::medium());
15
16 println!("Streaming response with reasoning (batched for network efficiency):\n");
17
18 let mut stream = client.reason_stream(request).await?;
19 let mut batcher = EventBatcher::new(50); // 50ms batching window
20 let mut reasoning_displayed = false;
21 let start_time = Instant::now();
22 let mut total_events = 0;
23 let mut total_batches = 0;
24
25 loop {
26 tokio::select! {
27 // Receive events from stream
28 event_result = stream.next() => {
29 match event_result {
30 Some(Ok(event)) => {
31 total_events += 1;
32 batcher.push(event);
33 }
34 Some(Err(e)) => {
35 eprintln!("Stream error: {}", e);
36 break;
37 }
38 None => {
39 // Stream ended, flush remaining events
40 if !batcher.is_empty() {
41 let batch = batcher.take();
42 total_batches += 1;
43 display_batch(&batch, &mut reasoning_displayed)?;
44 }
45 break;
46 }
47 }
48 }
49
50 // Flush batch when timer expires
51 _ = batcher.ticker().tick() => {
52 if !batcher.is_empty() {
53 let batch = batcher.take();
54 total_batches += 1;
55 display_batch(&batch, &mut reasoning_displayed)?;
56 }
57 }
58 }
59 }
60
61 let elapsed = start_time.elapsed();
62 println!("\n\nDone.");
63 println!("Stats: {} events in {} batches ({}% reduction in network calls)",
64 total_events,
65 total_batches,
66 if total_events > 0 { 100 - (total_batches * 100 / total_events) } else { 0 }
67 );
68 println!("Time elapsed: {:.2}s", elapsed.as_secs_f64());
69
70 Ok(())
71}
Source§
pub fn push(&mut self, event: T)
pub fn push(&mut self, event: T)
Add an event to the current batch
Examples found in repository?
examples/04_reasoning_streaming.rs (line 32)
7async fn main() -> Result<()> {
8 let api_key = std::env::var("OPENAI_API_KEY")?;
9 let client = OpenAIClient::new(api_key)?;
10
11 let request = ResponseRequest::new(
12 "gpt-5",
13 vec![Message::human("Solve this problem: If a train travels 120 km in 2 hours, what is its average speed? And after that make a dynamic programming problem that can be solved in O(n) time and space complexity.")]
14 ).with_reasoning(ReasoningConfig::medium());
15
16 println!("Streaming response with reasoning (batched for network efficiency):\n");
17
18 let mut stream = client.reason_stream(request).await?;
19 let mut batcher = EventBatcher::new(50); // 50ms batching window
20 let mut reasoning_displayed = false;
21 let start_time = Instant::now();
22 let mut total_events = 0;
23 let mut total_batches = 0;
24
25 loop {
26 tokio::select! {
27 // Receive events from stream
28 event_result = stream.next() => {
29 match event_result {
30 Some(Ok(event)) => {
31 total_events += 1;
32 batcher.push(event);
33 }
34 Some(Err(e)) => {
35 eprintln!("Stream error: {}", e);
36 break;
37 }
38 None => {
39 // Stream ended, flush remaining events
40 if !batcher.is_empty() {
41 let batch = batcher.take();
42 total_batches += 1;
43 display_batch(&batch, &mut reasoning_displayed)?;
44 }
45 break;
46 }
47 }
48 }
49
50 // Flush batch when timer expires
51 _ = batcher.ticker().tick() => {
52 if !batcher.is_empty() {
53 let batch = batcher.take();
54 total_batches += 1;
55 display_batch(&batch, &mut reasoning_displayed)?;
56 }
57 }
58 }
59 }
60
61 let elapsed = start_time.elapsed();
62 println!("\n\nDone.");
63 println!("Stats: {} events in {} batches ({}% reduction in network calls)",
64 total_events,
65 total_batches,
66 if total_events > 0 { 100 - (total_batches * 100 / total_events) } else { 0 }
67 );
68 println!("Time elapsed: {:.2}s", elapsed.as_secs_f64());
69
70 Ok(())
71}
Source§
pub fn should_flush_now(&self) -> bool
pub fn should_flush_now(&self) -> bool
Check if it’s time to flush (non-blocking). Note: this is a simplified check; for production use, integrate with tokio::select!.
Sourcepub fn take(&mut self) -> Vec<T>
pub fn take(&mut self) -> Vec<T>
Take the current batch, leaving an empty one
Examples found in repository?
examples/04_reasoning_streaming.rs (line 41)
7async fn main() -> Result<()> {
8 let api_key = std::env::var("OPENAI_API_KEY")?;
9 let client = OpenAIClient::new(api_key)?;
10
11 let request = ResponseRequest::new(
12 "gpt-5",
13 vec![Message::human("Solve this problem: If a train travels 120 km in 2 hours, what is its average speed? And after that make a dynamic programming problem that can be solved in O(n) time and space complexity.")]
14 ).with_reasoning(ReasoningConfig::medium());
15
16 println!("Streaming response with reasoning (batched for network efficiency):\n");
17
18 let mut stream = client.reason_stream(request).await?;
19 let mut batcher = EventBatcher::new(50); // 50ms batching window
20 let mut reasoning_displayed = false;
21 let start_time = Instant::now();
22 let mut total_events = 0;
23 let mut total_batches = 0;
24
25 loop {
26 tokio::select! {
27 // Receive events from stream
28 event_result = stream.next() => {
29 match event_result {
30 Some(Ok(event)) => {
31 total_events += 1;
32 batcher.push(event);
33 }
34 Some(Err(e)) => {
35 eprintln!("Stream error: {}", e);
36 break;
37 }
38 None => {
39 // Stream ended, flush remaining events
40 if !batcher.is_empty() {
41 let batch = batcher.take();
42 total_batches += 1;
43 display_batch(&batch, &mut reasoning_displayed)?;
44 }
45 break;
46 }
47 }
48 }
49
50 // Flush batch when timer expires
51 _ = batcher.ticker().tick() => {
52 if !batcher.is_empty() {
53 let batch = batcher.take();
54 total_batches += 1;
55 display_batch(&batch, &mut reasoning_displayed)?;
56 }
57 }
58 }
59 }
60
61 let elapsed = start_time.elapsed();
62 println!("\n\nDone.");
63 println!("Stats: {} events in {} batches ({}% reduction in network calls)",
64 total_events,
65 total_batches,
66 if total_events > 0 { 100 - (total_batches * 100 / total_events) } else { 0 }
67 );
68 println!("Time elapsed: {:.2}s", elapsed.as_secs_f64());
69
70 Ok(())
71}
Source§
pub fn is_empty(&self) -> bool
pub fn is_empty(&self) -> bool
Check if batch is empty
Examples found in repository?
examples/04_reasoning_streaming.rs (line 40)
7async fn main() -> Result<()> {
8 let api_key = std::env::var("OPENAI_API_KEY")?;
9 let client = OpenAIClient::new(api_key)?;
10
11 let request = ResponseRequest::new(
12 "gpt-5",
13 vec![Message::human("Solve this problem: If a train travels 120 km in 2 hours, what is its average speed? And after that make a dynamic programming problem that can be solved in O(n) time and space complexity.")]
14 ).with_reasoning(ReasoningConfig::medium());
15
16 println!("Streaming response with reasoning (batched for network efficiency):\n");
17
18 let mut stream = client.reason_stream(request).await?;
19 let mut batcher = EventBatcher::new(50); // 50ms batching window
20 let mut reasoning_displayed = false;
21 let start_time = Instant::now();
22 let mut total_events = 0;
23 let mut total_batches = 0;
24
25 loop {
26 tokio::select! {
27 // Receive events from stream
28 event_result = stream.next() => {
29 match event_result {
30 Some(Ok(event)) => {
31 total_events += 1;
32 batcher.push(event);
33 }
34 Some(Err(e)) => {
35 eprintln!("Stream error: {}", e);
36 break;
37 }
38 None => {
39 // Stream ended, flush remaining events
40 if !batcher.is_empty() {
41 let batch = batcher.take();
42 total_batches += 1;
43 display_batch(&batch, &mut reasoning_displayed)?;
44 }
45 break;
46 }
47 }
48 }
49
50 // Flush batch when timer expires
51 _ = batcher.ticker().tick() => {
52 if !batcher.is_empty() {
53 let batch = batcher.take();
54 total_batches += 1;
55 display_batch(&batch, &mut reasoning_displayed)?;
56 }
57 }
58 }
59 }
60
61 let elapsed = start_time.elapsed();
62 println!("\n\nDone.");
63 println!("Stats: {} events in {} batches ({}% reduction in network calls)",
64 total_events,
65 total_batches,
66 if total_events > 0 { 100 - (total_batches * 100 / total_events) } else { 0 }
67 );
68 println!("Time elapsed: {:.2}s", elapsed.as_secs_f64());
69
70 Ok(())
71}
Source§
pub fn ticker(&mut self) -> &mut Interval
pub fn ticker(&mut self) -> &mut Interval
Get reference to ticker for use in tokio::select!
Examples found in repository?
examples/04_reasoning_streaming.rs (line 51)
7async fn main() -> Result<()> {
8 let api_key = std::env::var("OPENAI_API_KEY")?;
9 let client = OpenAIClient::new(api_key)?;
10
11 let request = ResponseRequest::new(
12 "gpt-5",
13 vec![Message::human("Solve this problem: If a train travels 120 km in 2 hours, what is its average speed? And after that make a dynamic programming problem that can be solved in O(n) time and space complexity.")]
14 ).with_reasoning(ReasoningConfig::medium());
15
16 println!("Streaming response with reasoning (batched for network efficiency):\n");
17
18 let mut stream = client.reason_stream(request).await?;
19 let mut batcher = EventBatcher::new(50); // 50ms batching window
20 let mut reasoning_displayed = false;
21 let start_time = Instant::now();
22 let mut total_events = 0;
23 let mut total_batches = 0;
24
25 loop {
26 tokio::select! {
27 // Receive events from stream
28 event_result = stream.next() => {
29 match event_result {
30 Some(Ok(event)) => {
31 total_events += 1;
32 batcher.push(event);
33 }
34 Some(Err(e)) => {
35 eprintln!("Stream error: {}", e);
36 break;
37 }
38 None => {
39 // Stream ended, flush remaining events
40 if !batcher.is_empty() {
41 let batch = batcher.take();
42 total_batches += 1;
43 display_batch(&batch, &mut reasoning_displayed)?;
44 }
45 break;
46 }
47 }
48 }
49
50 // Flush batch when timer expires
51 _ = batcher.ticker().tick() => {
52 if !batcher.is_empty() {
53 let batch = batcher.take();
54 total_batches += 1;
55 display_batch(&batch, &mut reasoning_displayed)?;
56 }
57 }
58 }
59 }
60
61 let elapsed = start_time.elapsed();
62 println!("\n\nDone.");
63 println!("Stats: {} events in {} batches ({}% reduction in network calls)",
64 total_events,
65 total_batches,
66 if total_events > 0 { 100 - (total_batches * 100 / total_events) } else { 0 }
67 );
68 println!("Time elapsed: {:.2}s", elapsed.as_secs_f64());
69
70 Ok(())
71}
Auto Trait Implementations§
impl<T> Freeze for EventBatcher<T>
impl<T> !RefUnwindSafe for EventBatcher<T>
impl<T> Send for EventBatcher<T>
where
    T: Send,
impl<T> Sync for EventBatcher<T>
where
    T: Sync,
impl<T> Unpin for EventBatcher<T>
where
    T: Unpin,
impl<T> !UnwindSafe for EventBatcher<T>
Blanket Implementations§
Source§
impl<T> BorrowMut<T> for T
where
    T: ?Sized,
impl<T> BorrowMut<T> for T
where
    T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Mutably borrows from an owned value. Read more