api_openai/
streaming_control.rs

1//! Streaming Control Module
2//!
3//! This module provides stateless streaming control utilities for `OpenAI` API responses.
4//! Following the "Thin Client, Rich API" principle, this module offers control patterns
5//! and cancellation tokens without maintaining persistent stream state.
6
7mod private
8{
9  use std::
10  {
11    sync ::Arc,
12    time ::Instant,
13  };
14  use core::
15  {
16    sync ::atomic::{ AtomicBool, Ordering },
17    time ::Duration,
18  };
19  use serde::{ Deserialize, Serialize };
20  use tokio::{ sync::mpsc, time };
21
22  /// Stream control state for tracking operations
23  #[ derive( Debug, Clone, PartialEq, Serialize, Deserialize ) ]
24  pub enum StreamState
25  {
26    /// Stream is actively running
27    Running,
28    /// Stream is paused (buffering)
29    Paused,
30    /// Stream is cancelled
31    Cancelled,
32    /// Stream completed normally
33    Completed,
34    /// Stream encountered an error
35    Error( String ),
36  }
37
38  /// Cancellation token for controlling streaming operations
39  #[ derive( Debug, Clone ) ]
40  pub struct CancellationToken
41  {
42    /// Internal cancellation flag
43    cancelled : Arc< AtomicBool >,
44  }
45
46  impl CancellationToken
47  {
48    /// Create a new cancellation token
49    #[ inline ]
50    #[ must_use ]
51    pub fn new() -> Self
52    {
53      Self
54      {
55        cancelled : Arc::new( AtomicBool::new( false ) ),
56      }
57    }
58
59    /// Cancel the operation
60    #[ inline ]
61    pub fn cancel( &self )
62    {
63      self.cancelled.store( true, Ordering::SeqCst );
64    }
65
66    /// Check if operation is cancelled
67    #[ inline ]
68    #[ must_use ]
69    pub fn is_cancelled( &self ) -> bool
70    {
71      self.cancelled.load( Ordering::SeqCst )
72    }
73
74    /// Wait for cancellation or timeout
75    #[ inline ]
76    pub async fn wait_for_cancellation( &self, timeout : Duration ) -> bool
77    {
78      let start = Instant::now();
79      while start.elapsed() < timeout
80      {
81        if self.is_cancelled()
82        {
83          return true;
84        }
85        time ::sleep( Duration::from_millis( 10 ) ).await;
86      }
87      false
88    }
89  }
90
91  impl Default for CancellationToken
92  {
93    #[ inline ]
94    fn default() -> Self
95    {
96      Self::new()
97    }
98  }
99
100  /// Stream control handle for managing streaming operations
101  #[ derive( Debug ) ]
102  pub struct StreamControl
103  {
104    /// Current state of the stream
105    state : StreamState,
106    /// Cancellation token
107    token : CancellationToken,
108    /// Creation timestamp
109    created_at : Instant,
110  }
111
112  impl StreamControl
113  {
114    /// Create a new stream control handle
115    #[ inline ]
116    #[ must_use ]
117    pub fn new() -> Self
118    {
119      Self
120      {
121        state : StreamState::Running,
122        token : CancellationToken::new(),
123        created_at : Instant::now(),
124      }
125    }
126
127    /// Get current stream state
128    #[ inline ]
129    #[ must_use ]
130    pub fn state( &self ) -> &StreamState
131    {
132      &self.state
133    }
134
135    /// Get cancellation token
136    #[ inline ]
137    #[ must_use ]
138    pub fn cancellation_token( &self ) -> &CancellationToken
139    {
140      &self.token
141    }
142
143    /// Get elapsed time since creation
144    #[ inline ]
145    #[ must_use ]
146    pub fn elapsed( &self ) -> Duration
147    {
148      self.created_at.elapsed()
149    }
150
151    /// Update stream state
152    #[ inline ]
153    pub fn set_state( &mut self, state : StreamState )
154    {
155      self.state = state;
156    }
157
158    /// Cancel the stream
159    #[ inline ]
160    pub fn cancel( &mut self )
161    {
162      self.token.cancel();
163      self.state = StreamState::Cancelled;
164    }
165
166    /// Check if stream is active (not cancelled, completed, or errored)
167    #[ inline ]
168    #[ must_use ]
169    pub fn is_active( &self ) -> bool
170    {
171      matches!( self.state, StreamState::Running | StreamState::Paused )
172    }
173  }
174
175  impl Default for StreamControl
176  {
177    #[ inline ]
178    fn default() -> Self
179    {
180      Self::new()
181    }
182  }
183
184  /// Configuration for streaming control behavior
185  #[ derive( Debug, Clone, Serialize, Deserialize ) ]
186  pub struct StreamControlConfig
187  {
188    /// Maximum pause duration before automatic timeout
189    pub max_pause_duration : Duration,
190    /// Buffer size for paused streams
191    pub pause_buffer_size : usize,
192    /// Timeout for control operations
193    pub control_timeout : Duration,
194  }
195
196  impl Default for StreamControlConfig
197  {
198    #[ inline ]
199    fn default() -> Self
200    {
201      Self
202      {
203        max_pause_duration : Duration::from_secs( 300 ), // 5 minutes
204        pause_buffer_size : 1024 * 1024, // 1MB
205        control_timeout : Duration::from_secs( 5 ),
206      }
207    }
208  }
209
210  /// Stream control utilities
211  #[ derive( Debug ) ]
212  pub struct StreamControlManager;
213
214  impl StreamControlManager
215  {
216    /// Create a controlled stream processing function
217    /// This returns a function that can process stream items with control
218    #[ inline ]
219    pub fn create_controlled_processor< T >(
220      control : StreamControl,
221    ) -> impl Fn( T ) -> Option< T >
222    where
223      T : Send + 'static,
224    {
225      move | item : T | -> Option< T >
226      {
227        // Check cancellation before processing
228        if control.token.is_cancelled()
229        {
230          return None;
231        }
232
233        // For stateless operation, we process immediately
234        // In a real streaming scenario, this would integrate with the actual stream
235        Some( item )
236      }
237    }
238
239    /// Create a cancellable async operation
240    ///
241    /// # Errors
242    ///
243    /// Returns an error if the operation is cancelled.
244    #[ inline ]
245    pub async fn with_cancellation< T, F, Fut >(
246      token : CancellationToken,
247      operation : F,
248    ) -> Result< T, &'static str >
249    where
250      F : FnOnce() -> Fut,
251      Fut : core::future::Future< Output = T >,
252    {
253      let operation_future = operation();
254
255      tokio ::select!
256      {
257        result = operation_future =>
258        {
259          if token.is_cancelled()
260          {
261            Err( "Operation was cancelled" )
262          }
263          else
264          {
265            Ok( result )
266          }
267        }
268        () = Self::wait_for_cancellation( &token ) =>
269        {
270          Err( "Operation was cancelled" )
271        }
272      }
273    }
274
275    /// Wait for cancellation token to be triggered
276    async fn wait_for_cancellation( token : &CancellationToken )
277    {
278      while !token.is_cancelled()
279      {
280        time ::sleep( Duration::from_millis( 10 ) ).await;
281      }
282    }
283
284    /// Create a timeout-based cancellation token
285    #[ inline ]
286    #[ must_use ]
287    pub fn create_timeout_token( timeout : Duration ) -> CancellationToken
288    {
289      let token = CancellationToken::new();
290      let token_clone = token.clone();
291
292      tokio ::spawn( async move
293      {
294        time ::sleep( timeout ).await;
295        token_clone.cancel();
296      });
297
298      token
299    }
300
301    /// Combine multiple cancellation tokens (any cancellation triggers all)
302    #[ inline ]
303    #[ must_use ]
304    pub fn combine_tokens( tokens : Vec< CancellationToken > ) -> CancellationToken
305    {
306      let combined = CancellationToken::new();
307      let combined_clone = combined.clone();
308
309      tokio ::spawn( async move
310      {
311        loop
312        {
313          if tokens.iter().any( CancellationToken::is_cancelled )
314          {
315            combined_clone.cancel();
316            break;
317          }
318          time ::sleep( Duration::from_millis( 10 ) ).await;
319        }
320      });
321
322      combined
323    }
324
325    /// Create a manual control channel for external control
326    #[ inline ]
327    #[ must_use ]
328    pub fn create_control_channel() -> ( StreamControlSender, StreamControlReceiver )
329    {
330      let ( tx, rx ) = mpsc::unbounded_channel();
331      ( StreamControlSender { sender : tx }, StreamControlReceiver { receiver : rx } )
332    }
333  }
334
335  /// Sender for stream control commands
336  #[ derive( Debug ) ]
337  pub struct StreamControlSender
338  {
339    sender : mpsc::UnboundedSender< StreamControlCommand >,
340  }
341
342  impl StreamControlSender
343  {
344    /// Send a pause command
345    ///
346    /// # Errors
347    ///
348    /// Returns an error if the command cannot be sent.
349    #[ inline ]
350    pub fn pause( &self ) -> Result< (), &'static str >
351    {
352      self.sender.send( StreamControlCommand::Pause )
353        .map_err( | _ | "Failed to send pause command" )
354    }
355
356    /// Send a resume command
357    ///
358    /// # Errors
359    ///
360    /// Returns an error if the command cannot be sent.
361    #[ inline ]
362    pub fn resume( &self ) -> Result< (), &'static str >
363    {
364      self.sender.send( StreamControlCommand::Resume )
365        .map_err( | _ | "Failed to send resume command" )
366    }
367
368    /// Send a cancel command
369    ///
370    /// # Errors
371    ///
372    /// Returns an error if the command cannot be sent.
373    #[ inline ]
374    pub fn cancel( &self ) -> Result< (), &'static str >
375    {
376      self.sender.send( StreamControlCommand::Cancel )
377        .map_err( | _ | "Failed to send cancel command" )
378    }
379  }
380
381  /// Receiver for stream control commands
382  #[ derive( Debug ) ]
383  pub struct StreamControlReceiver
384  {
385    receiver : mpsc::UnboundedReceiver< StreamControlCommand >,
386  }
387
388  impl StreamControlReceiver
389  {
390    /// Try to receive a control command (non-blocking)
391    #[ inline ]
392    pub fn try_recv( &mut self ) -> Option< StreamControlCommand >
393    {
394      self.receiver.try_recv().ok()
395    }
396
397    /// Receive next control command (blocking)
398    #[ inline ]
399    pub async fn recv( &mut self ) -> Option< StreamControlCommand >
400    {
401      self.receiver.recv().await
402    }
403  }
404
405  /// Commands for controlling stream operations
406  #[ derive( Debug, Clone, PartialEq ) ]
407  pub enum StreamControlCommand
408  {
409    /// Pause the stream
410    Pause,
411    /// Resume the stream
412    Resume,
413    /// Cancel the stream
414    Cancel,
415  }
416}
417
418crate ::mod_interface!
419{
420  exposed use private::StreamState;
421  exposed use private::CancellationToken;
422  exposed use private::StreamControl;
423  exposed use private::StreamControlConfig;
424  exposed use private::StreamControlManager;
425  exposed use private::StreamControlSender;
426  exposed use private::StreamControlReceiver;
427  exposed use private::StreamControlCommand;
428}