api_gemini 0.5.0

Gemini's API for accessing large language models (LLMs).
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
//! Streaming control for fine-grained management of streaming operations.
//!
//! This module provides explicit streaming control following the "Thin Client, Rich API" principle.
//! All streaming control operations are explicit and user-triggered, not automatic behaviors.
//!
//! ## Performance Optimizations
//!
//! This implementation includes several optimizations for production use:
//! - Atomic operations for simple state management to reduce lock contention
//! - Event-driven pause timeout handling instead of polling
//! - Efficient circular buffer for paused data management
//! - Lock-free metrics updates where possible
//! - Optimized resource cleanup with structured cancellation

mod buffer;
mod operations;

use serde::{ Deserialize, Serialize };
use core::time::Duration;
use core::sync::atomic::{ AtomicU64, AtomicUsize, Ordering };

// Re-export buffer types
pub use buffer::BufferStrategy;

// Re-export operation types
pub use operations::
{
  ControllableStream,
  ControllableStreamBuilder,
  StreamingControlApi,
  StreamControlStreamBuilder,
};

#[ cfg( all( feature = "websocket_streaming", feature = "streaming_control" ) ) ]
pub use operations::ControllableWebSocketStream;

/// State of a controllable stream
#[ derive( Debug, Clone, PartialEq, Eq, Serialize, Deserialize ) ]
pub enum StreamState
{
  /// Stream is actively running
  Running,
  /// Stream is paused
  Paused,
  /// Stream has been cancelled
  Cancelled,
  /// Stream completed normally
  Completed,
  /// Stream timed out during pause
  TimedOut,
  /// Stream encountered an error
  Error,
}

impl StreamState
{
  /// Convert StreamState to u8 for atomic operations
  #[ inline ]
  pub( crate ) fn to_u8( &self ) -> u8
  {
    match self
    {
      Self::Running => 0,
      Self::Paused => 1,
      Self::Cancelled => 2,
      Self::Completed => 3,
      Self::TimedOut => 4,
      Self::Error => 5,
    }
  }

  /// Convert u8 back to StreamState from atomic operations
  #[ inline ]
  pub( crate ) fn from_u8( value : u8 ) -> Self
  {
    match value
    {
      0 => Self::Running,
      1 => Self::Paused,
      2 => Self::Cancelled,
      3 => Self::Completed,
      4 => Self::TimedOut,
      5 => Self::Error,
      _ => Self::Error, // Default to error for invalid values
    }
  }
}

/// Level of metrics collection (affects performance)
#[ derive( Debug, Clone, PartialEq, Eq ) ]
pub enum MetricsLevel
{
  /// No metrics collection (fastest)
  None,
  /// Basic metrics only (balanced)
  Basic,
  /// Full metrics collection (most detailed)
  Detailed,
}

/// Configuration for streaming control behavior with performance tuning options
#[ derive( Debug, Clone ) ]
pub struct StreamControlConfig
{
  /// Maximum buffer size for paused streams (in bytes)
  pub buffer_size : usize,
  /// Timeout for paused streams before auto-cancellation
  pub pause_timeout : Duration,
  /// Whether to automatically cleanup resources on completion
  pub auto_cleanup : bool,
  /// Maximum number of chunks to buffer during pause
  pub max_buffered_chunks : usize,
  /// Control operation timeout (how long to wait for control commands to be processed)
  pub control_operation_timeout : Duration,
  /// Buffer management strategy for better memory usage
  pub buffer_strategy : BufferStrategy,
  /// Metrics collection level (affects performance vs observability trade-off)
  pub metrics_level : MetricsLevel,
  /// Whether to use event-driven timeout handling (more efficient)
  pub event_driven_timeouts : bool,
}

impl Default for StreamControlConfig
{
  #[ inline ]
  fn default() -> Self
  {
    Self {
      buffer_size : 1024 * 1024, // 1MB default buffer
      pause_timeout : Duration::from_secs( 300 ), // 5 minutes default
      auto_cleanup : true,
      max_buffered_chunks : 100,
      control_operation_timeout : Duration::from_millis( 100 ), // Fast control response
      buffer_strategy : BufferStrategy::Circular, // More memory efficient
      metrics_level : MetricsLevel::Basic, // Balanced performance/observability
      event_driven_timeouts : true, // More efficient timeout handling
    }
  }
}

/// Builder for creating streaming control configuration
#[ derive( Debug, Clone ) ]
pub struct StreamControlConfigBuilder
{
  config : StreamControlConfig,
}

impl StreamControlConfigBuilder
{
  /// Create a new configuration builder
  #[ inline ]
  #[ must_use ]
  pub fn new() -> Self
  {
    Self {
      config : StreamControlConfig::default(),
    }
  }

  /// Set the buffer size for paused streams
  #[ inline ]
  #[ must_use ]
  pub fn buffer_size( mut self, size : usize ) -> Self
  {
    self.config.buffer_size = size;
    self
  }

  /// Set the pause timeout duration
  #[ inline ]
  #[ must_use ]
  pub fn pause_timeout( mut self, timeout : Duration ) -> Self
  {
    self.config.pause_timeout = timeout;
    self
  }

  /// Enable or disable automatic cleanup
  #[ inline ]
  #[ must_use ]
  pub fn auto_cleanup( mut self, enable : bool ) -> Self
  {
    self.config.auto_cleanup = enable;
    self
  }

  /// Set maximum number of chunks to buffer
  #[ inline ]
  #[ must_use ]
  pub fn max_buffered_chunks( mut self, count : usize ) -> Self
  {
    self.config.max_buffered_chunks = count;
    self
  }

  /// Set control operation timeout
  #[ inline ]
  #[ must_use ]
  pub fn control_operation_timeout( mut self, timeout : Duration ) -> Self
  {
    self.config.control_operation_timeout = timeout;
    self
  }

  /// Set buffer management strategy
  #[ inline ]
  #[ must_use ]
  pub fn buffer_strategy( mut self, strategy : BufferStrategy ) -> Self
  {
    self.config.buffer_strategy = strategy;
    self
  }

  /// Set metrics collection level
  #[ inline ]
  #[ must_use ]
  pub fn metrics_level( mut self, level : MetricsLevel ) -> Self
  {
    self.config.metrics_level = level;
    self
  }

  /// Enable or disable event-driven timeout handling
  #[ inline ]
  #[ must_use ]
  pub fn event_driven_timeouts( mut self, enable : bool ) -> Self
  {
    self.config.event_driven_timeouts = enable;
    self
  }

  /// Build the configuration with validation
  ///
  /// # Errors
  ///
  /// Returns `Error` if:
  /// - Buffer size is 0
  /// - Pause timeout is 0
  /// - Max buffered chunks is 0
  #[ inline ]
  pub fn build( self ) -> Result< StreamControlConfig, crate::error::Error >
  {
    if self.config.buffer_size == 0
    {
      return Err( crate::error::Error::ConfigurationError(
        "Buffer size must be greater than 0".to_string()
      ) );
    }

    if self.config.pause_timeout.is_zero()
    {
      return Err( crate::error::Error::ConfigurationError(
        "Pause timeout must be greater than 0".to_string()
      ) );
    }

    if self.config.max_buffered_chunks == 0
    {
      return Err( crate::error::Error::ConfigurationError(
        "Max buffered chunks must be greater than 0".to_string()
      ) );
    }

    if self.config.control_operation_timeout.is_zero()
    {
      return Err( crate::error::Error::ConfigurationError(
        "Control operation timeout must be greater than 0".to_string()
      ) );
    }

    // Validate chunked buffer strategy has reasonable chunk size
    if let BufferStrategy::Chunked { chunk_size } = self.config.buffer_strategy
    {
      if chunk_size == 0
      {
        return Err( crate::error::Error::ConfigurationError(
          "Chunked buffer strategy chunk size must be greater than 0".to_string()
        ) );
      }
      if chunk_size > self.config.buffer_size
      {
        return Err( crate::error::Error::ConfigurationError(
          "Chunked buffer strategy chunk size cannot exceed total buffer size".to_string()
        ) );
      }
    }

    Ok( self.config )
  }
}

impl StreamControlConfig
{
  /// Create a new configuration builder
  #[ inline ]
  #[ must_use ]
  pub fn builder() -> StreamControlConfigBuilder
  {
    StreamControlConfigBuilder::new()
  }
}

/// Metrics for streaming operations with atomic updates for better performance
#[ derive( Debug ) ]
pub struct StreamMetrics
{
  /// Total number of chunks received
  pub total_chunks : AtomicU64,
  /// Current buffer size in bytes
  pub buffer_size : AtomicUsize,
  /// Total bytes received
  pub bytes_received : AtomicU64,
  /// Number of times stream was paused
  pub pause_count : AtomicU64,
  /// Number of times stream was resumed
  pub resume_count : AtomicU64,
  /// Total number of state changes
  pub state_changes : AtomicU64,
  /// Peak buffer size reached during stream lifetime
  pub peak_buffer_size : AtomicUsize,
  /// Average response time for control operations in microseconds
  pub avg_control_response_time_us : AtomicU64,
  /// Number of control operations performed
  pub control_operations : AtomicU64,
  /// Number of buffer overflows (when pause buffer reached max capacity)
  pub buffer_overflows : AtomicU64,
  /// Number of items sent through the stream
  pub items_sent : AtomicU64,
}

impl StreamMetrics
{
  /// Create new StreamMetrics with all counters initialized to zero
  pub fn new() -> Self
  {
    Self
    {
      total_chunks : AtomicU64::new( 0 ),
      buffer_size : AtomicUsize::new( 0 ),
      bytes_received : AtomicU64::new( 0 ),
      pause_count : AtomicU64::new( 0 ),
      resume_count : AtomicU64::new( 0 ),
      state_changes : AtomicU64::new( 0 ),
      peak_buffer_size : AtomicUsize::new( 0 ),
      avg_control_response_time_us : AtomicU64::new( 0 ),
      control_operations : AtomicU64::new( 0 ),
      buffer_overflows : AtomicU64::new( 0 ),
      items_sent : AtomicU64::new( 0 ),
    }
  }
}

/// Snapshot of metrics for external consumers (non-atomic version)
#[ derive( Debug, Clone ) ]
pub struct StreamMetricsSnapshot
{
  /// Total number of chunks received
  pub total_chunks : u64,
  /// Current buffer size in bytes
  pub buffer_size : usize,
  /// Total bytes received
  pub bytes_received : u64,
  /// Number of times stream was paused
  pub pause_count : u64,
  /// Number of times stream was resumed
  pub resume_count : u64,
  /// Total number of state changes
  pub state_changes : u64,
  /// Peak buffer size reached during stream lifetime
  pub peak_buffer_size : usize,
  /// Average response time for control operations in microseconds
  pub avg_control_response_time_us : u64,
  /// Number of control operations performed
  pub control_operations : u64,
  /// Number of buffer overflows
  pub buffer_overflows : u64,
  /// Number of items sent through the stream
  pub items_sent : u64,
}

impl Default for StreamMetrics
{
  fn default() -> Self
  {
    Self {
      total_chunks : AtomicU64::new( 0 ),
      buffer_size : AtomicUsize::new( 0 ),
      bytes_received : AtomicU64::new( 0 ),
      pause_count : AtomicU64::new( 0 ),
      resume_count : AtomicU64::new( 0 ),
      state_changes : AtomicU64::new( 0 ),
      peak_buffer_size : AtomicUsize::new( 0 ),
      avg_control_response_time_us : AtomicU64::new( 0 ),
      control_operations : AtomicU64::new( 0 ),
      buffer_overflows : AtomicU64::new( 0 ),
      items_sent : AtomicU64::new( 0 ),
    }
  }
}

impl StreamMetrics
{
  /// Create a snapshot of the current metrics
  pub fn snapshot( &self ) -> StreamMetricsSnapshot
  {
    StreamMetricsSnapshot {
      total_chunks : self.total_chunks.load( Ordering::Relaxed ),
      buffer_size : self.buffer_size.load( Ordering::Relaxed ),
      bytes_received : self.bytes_received.load( Ordering::Relaxed ),
      pause_count : self.pause_count.load( Ordering::Relaxed ),
      resume_count : self.resume_count.load( Ordering::Relaxed ),
      state_changes : self.state_changes.load( Ordering::Relaxed ),
      peak_buffer_size : self.peak_buffer_size.load( Ordering::Relaxed ),
      avg_control_response_time_us : self.avg_control_response_time_us.load( Ordering::Relaxed ),
      control_operations : self.control_operations.load( Ordering::Relaxed ),
      buffer_overflows : self.buffer_overflows.load( Ordering::Relaxed ),
      items_sent : self.items_sent.load( Ordering::Relaxed ),
    }
  }
}