api_openai/
streaming_control.rs1mod private
8{
9 use std::
10 {
11 sync ::Arc,
12 time ::Instant,
13 };
14 use core::
15 {
16 sync ::atomic::{ AtomicBool, Ordering },
17 time ::Duration,
18 };
19 use serde::{ Deserialize, Serialize };
20 use tokio::{ sync::mpsc, time };
21
22 #[ derive( Debug, Clone, PartialEq, Serialize, Deserialize ) ]
24 pub enum StreamState
25 {
26 Running,
28 Paused,
30 Cancelled,
32 Completed,
34 Error( String ),
36 }
37
38 #[ derive( Debug, Clone ) ]
40 pub struct CancellationToken
41 {
42 cancelled : Arc< AtomicBool >,
44 }
45
46 impl CancellationToken
47 {
48 #[ inline ]
50 #[ must_use ]
51 pub fn new() -> Self
52 {
53 Self
54 {
55 cancelled : Arc::new( AtomicBool::new( false ) ),
56 }
57 }
58
59 #[ inline ]
61 pub fn cancel( &self )
62 {
63 self.cancelled.store( true, Ordering::SeqCst );
64 }
65
66 #[ inline ]
68 #[ must_use ]
69 pub fn is_cancelled( &self ) -> bool
70 {
71 self.cancelled.load( Ordering::SeqCst )
72 }
73
74 #[ inline ]
76 pub async fn wait_for_cancellation( &self, timeout : Duration ) -> bool
77 {
78 let start = Instant::now();
79 while start.elapsed() < timeout
80 {
81 if self.is_cancelled()
82 {
83 return true;
84 }
85 time ::sleep( Duration::from_millis( 10 ) ).await;
86 }
87 false
88 }
89 }
90
91 impl Default for CancellationToken
92 {
93 #[ inline ]
94 fn default() -> Self
95 {
96 Self::new()
97 }
98 }
99
100 #[ derive( Debug ) ]
102 pub struct StreamControl
103 {
104 state : StreamState,
106 token : CancellationToken,
108 created_at : Instant,
110 }
111
112 impl StreamControl
113 {
114 #[ inline ]
116 #[ must_use ]
117 pub fn new() -> Self
118 {
119 Self
120 {
121 state : StreamState::Running,
122 token : CancellationToken::new(),
123 created_at : Instant::now(),
124 }
125 }
126
127 #[ inline ]
129 #[ must_use ]
130 pub fn state( &self ) -> &StreamState
131 {
132 &self.state
133 }
134
135 #[ inline ]
137 #[ must_use ]
138 pub fn cancellation_token( &self ) -> &CancellationToken
139 {
140 &self.token
141 }
142
143 #[ inline ]
145 #[ must_use ]
146 pub fn elapsed( &self ) -> Duration
147 {
148 self.created_at.elapsed()
149 }
150
151 #[ inline ]
153 pub fn set_state( &mut self, state : StreamState )
154 {
155 self.state = state;
156 }
157
158 #[ inline ]
160 pub fn cancel( &mut self )
161 {
162 self.token.cancel();
163 self.state = StreamState::Cancelled;
164 }
165
166 #[ inline ]
168 #[ must_use ]
169 pub fn is_active( &self ) -> bool
170 {
171 matches!( self.state, StreamState::Running | StreamState::Paused )
172 }
173 }
174
175 impl Default for StreamControl
176 {
177 #[ inline ]
178 fn default() -> Self
179 {
180 Self::new()
181 }
182 }
183
184 #[ derive( Debug, Clone, Serialize, Deserialize ) ]
186 pub struct StreamControlConfig
187 {
188 pub max_pause_duration : Duration,
190 pub pause_buffer_size : usize,
192 pub control_timeout : Duration,
194 }
195
196 impl Default for StreamControlConfig
197 {
198 #[ inline ]
199 fn default() -> Self
200 {
201 Self
202 {
203 max_pause_duration : Duration::from_secs( 300 ), pause_buffer_size : 1024 * 1024, control_timeout : Duration::from_secs( 5 ),
206 }
207 }
208 }
209
210 #[ derive( Debug ) ]
212 pub struct StreamControlManager;
213
214 impl StreamControlManager
215 {
216 #[ inline ]
219 pub fn create_controlled_processor< T >(
220 control : StreamControl,
221 ) -> impl Fn( T ) -> Option< T >
222 where
223 T : Send + 'static,
224 {
225 move | item : T | -> Option< T >
226 {
227 if control.token.is_cancelled()
229 {
230 return None;
231 }
232
233 Some( item )
236 }
237 }
238
239 #[ inline ]
245 pub async fn with_cancellation< T, F, Fut >(
246 token : CancellationToken,
247 operation : F,
248 ) -> Result< T, &'static str >
249 where
250 F : FnOnce() -> Fut,
251 Fut : core::future::Future< Output = T >,
252 {
253 let operation_future = operation();
254
255 tokio ::select!
256 {
257 result = operation_future =>
258 {
259 if token.is_cancelled()
260 {
261 Err( "Operation was cancelled" )
262 }
263 else
264 {
265 Ok( result )
266 }
267 }
268 () = Self::wait_for_cancellation( &token ) =>
269 {
270 Err( "Operation was cancelled" )
271 }
272 }
273 }
274
275 async fn wait_for_cancellation( token : &CancellationToken )
277 {
278 while !token.is_cancelled()
279 {
280 time ::sleep( Duration::from_millis( 10 ) ).await;
281 }
282 }
283
284 #[ inline ]
286 #[ must_use ]
287 pub fn create_timeout_token( timeout : Duration ) -> CancellationToken
288 {
289 let token = CancellationToken::new();
290 let token_clone = token.clone();
291
292 tokio ::spawn( async move
293 {
294 time ::sleep( timeout ).await;
295 token_clone.cancel();
296 });
297
298 token
299 }
300
301 #[ inline ]
303 #[ must_use ]
304 pub fn combine_tokens( tokens : Vec< CancellationToken > ) -> CancellationToken
305 {
306 let combined = CancellationToken::new();
307 let combined_clone = combined.clone();
308
309 tokio ::spawn( async move
310 {
311 loop
312 {
313 if tokens.iter().any( CancellationToken::is_cancelled )
314 {
315 combined_clone.cancel();
316 break;
317 }
318 time ::sleep( Duration::from_millis( 10 ) ).await;
319 }
320 });
321
322 combined
323 }
324
325 #[ inline ]
327 #[ must_use ]
328 pub fn create_control_channel() -> ( StreamControlSender, StreamControlReceiver )
329 {
330 let ( tx, rx ) = mpsc::unbounded_channel();
331 ( StreamControlSender { sender : tx }, StreamControlReceiver { receiver : rx } )
332 }
333 }
334
335 #[ derive( Debug ) ]
337 pub struct StreamControlSender
338 {
339 sender : mpsc::UnboundedSender< StreamControlCommand >,
340 }
341
342 impl StreamControlSender
343 {
344 #[ inline ]
350 pub fn pause( &self ) -> Result< (), &'static str >
351 {
352 self.sender.send( StreamControlCommand::Pause )
353 .map_err( | _ | "Failed to send pause command" )
354 }
355
356 #[ inline ]
362 pub fn resume( &self ) -> Result< (), &'static str >
363 {
364 self.sender.send( StreamControlCommand::Resume )
365 .map_err( | _ | "Failed to send resume command" )
366 }
367
368 #[ inline ]
374 pub fn cancel( &self ) -> Result< (), &'static str >
375 {
376 self.sender.send( StreamControlCommand::Cancel )
377 .map_err( | _ | "Failed to send cancel command" )
378 }
379 }
380
381 #[ derive( Debug ) ]
383 pub struct StreamControlReceiver
384 {
385 receiver : mpsc::UnboundedReceiver< StreamControlCommand >,
386 }
387
388 impl StreamControlReceiver
389 {
390 #[ inline ]
392 pub fn try_recv( &mut self ) -> Option< StreamControlCommand >
393 {
394 self.receiver.try_recv().ok()
395 }
396
397 #[ inline ]
399 pub async fn recv( &mut self ) -> Option< StreamControlCommand >
400 {
401 self.receiver.recv().await
402 }
403 }
404
/// Command sent over a stream-control channel.
///
/// The enum has no payloads, so it also derives `Copy`, `Eq` and `Hash`:
/// commands can be passed by value, compared exhaustively and used as
/// set or map keys.
#[ derive( Debug, Clone, Copy, PartialEq, Eq, Hash ) ]
pub enum StreamControlCommand
{
  /// Sent by `StreamControlSender::pause`.
  Pause,
  /// Sent by `StreamControlSender::resume`.
  Resume,
  /// Sent by `StreamControlSender::cancel`.
  Cancel,
}
416}
417
418crate ::mod_interface!
419{
420 exposed use private::StreamState;
421 exposed use private::CancellationToken;
422 exposed use private::StreamControl;
423 exposed use private::StreamControlConfig;
424 exposed use private::StreamControlManager;
425 exposed use private::StreamControlSender;
426 exposed use private::StreamControlReceiver;
427 exposed use private::StreamControlCommand;
428}