sklears_core/streaming_lifetimes.rs
//! Explicit lifetime parameter examples for streaming operations.
//!
//! This module demonstrates advanced lifetime parameter usage in streaming
//! machine learning operations, providing clear lifetime relationships
//! and better compile-time safety.
6use futures_core::{Future, Stream};
7use std::pin::Pin;
8
9// Type aliases for complex return types
/// Pinned, boxed, `Send` stream used as the return type of
/// `StreamingProcessor::windowed_transform`.
///
/// Each item is either a [`WindowedOutput`] parameterized by `'window` or an
/// error of type `E`. The stream object itself is bounded by `'window`, so it
/// cannot outlive that lifetime.
type WindowedStream<'window, Output, E> = Pin<
    Box<
        dyn Stream<Item = std::result::Result<WindowedOutput<'window, Output>, E>> + Send + 'window,
    >,
>;
15
/// Pinned, boxed, `Send` stream used as the return type of
/// `StreamingProcessor::process_stream`.
///
/// Each item is either a `Vec<Output>` or an error of type `E`. The vector is
/// presumably one batch (the method takes a `batch_size`), but the alias itself
/// does not enforce any particular length. The stream object is bounded by
/// `'stream`.
type ProcessingStream<'stream, Output, E> =
    Pin<Box<dyn Stream<Item = std::result::Result<Vec<Output>, E>> + Send + 'stream>>;
18
/// Pinned, boxed, `Send` stream used as the return type of
/// `ZeroCopyStreaming::stream_zero_copy_views`.
///
/// Each item is either an [`OutputView`] that may borrow an `Input` for
/// `'stream`, or an error of type `E`. The stream object is bounded by
/// `'stream` as well.
type ZeroCopyStream<'stream, Input, Output, E> = Pin<
    Box<
        dyn Stream<Item = std::result::Result<OutputView<'stream, Input, Output>, E>>
            + Send
            + 'stream,
    >,
>;
26
/// Streaming data processor with explicit lifetime relationships.
///
/// Long-lived borrows are named by lifetime parameters on the trait itself,
/// while each method adds one shorter, per-call lifetime (`'item`, `'stream`,
/// `'window`) that bounds the value it returns:
///
/// - `'data`: lifetime that `Input` must be valid for (`Input: 'data`); it
///   appears only in bounds, never directly in a parameter type
/// - `'config`: lifetime of the `&ProcessingConfig` borrow handed to each method
/// - `'processor`: lifetime of the `&self` borrow taken by each method
///
/// Because `'processor` is a trait parameter rather than a per-call lifetime,
/// every method borrows `self` for the whole of `'processor`.
pub trait StreamingProcessor<'data, 'config, 'processor, Input, Output>
where
    'data: 'processor,   // `'data` outlives the `&self` borrow
    'config: 'processor, // The config borrow outlives the `&self` borrow
    Input: 'data,
{
    /// Error type reported by every processing method of this trait.
    type Error: std::error::Error + Send + Sync + 'static;

    /// Process a single item.
    ///
    /// # Parameters
    /// - `item`: the input, borrowed for the per-call lifetime `'item`
    /// - `config`: processing configuration, borrowed for `'config`
    ///
    /// # Returns
    /// A pinned, boxed, `Send` future resolving to `Output` or `Self::Error`.
    /// The future is bounded by `'item`; since `self`, `item` and `config` all
    /// outlive `'item`, the future is allowed to borrow from any of them.
    fn process_item<'item>(
        &'processor self,
        item: &'item Input,
        config: &'config ProcessingConfig,
    ) -> Pin<Box<dyn Future<Output = std::result::Result<Output, Self::Error>> + Send + 'item>>
    where
        'processor: 'item, // The `&self` borrow outlives the returned future
        'config: 'item,    // The config borrow outlives the returned future
        'data: 'item,      // Already implied by `'data: 'processor`; stated explicitly
        Input: 'item,      // `Input` holds no borrow shorter than `'item`
        Output: 'item; // `Output` holds no borrow shorter than `'item`

    /// Process a stream of owned items, yielding `Vec<Output>` values.
    ///
    /// # Parameters
    /// - `input_stream`: pinned, boxed, `Send` stream of owned `Input` values,
    ///   bounded by `'stream`
    /// - `config`: processing configuration, borrowed for `'config`
    /// - `batch_size`: requested batch size; how it maps onto the yielded
    ///   vectors is left to the implementation
    ///
    /// # Returns
    /// A `ProcessingStream` bounded by `'stream`, whose items are
    /// `Result<Vec<Output>, Self::Error>`.
    fn process_stream<'stream>(
        &'processor self,
        input_stream: Pin<Box<dyn Stream<Item = Input> + Send + 'stream>>,
        config: &'config ProcessingConfig,
        batch_size: usize,
    ) -> ProcessingStream<'stream, Output, Self::Error>
    where
        'processor: 'stream, // The `&self` borrow outlives the returned stream
        'config: 'stream,    // The config borrow outlives the returned stream
        'data: 'stream,      // Already implied by `'data: 'processor`; stated explicitly
        Input: 'stream,      // `Input` holds no borrow shorter than `'stream`
        Output: 'stream; // `Output` holds no borrow shorter than `'stream`

    /// Transform a stream of borrowed items into windowed outputs.
    ///
    /// Unlike `process_stream`, the input stream yields `&'window Input`
    /// references, so the items stay owned by the caller. The lifetime bounds
    /// keep the returned stream from outliving those borrows; they do not by
    /// themselves say anything about memory usage.
    ///
    /// # Parameters
    /// - `input_stream`: pinned, boxed, `Send` stream of `&'window Input`
    /// - `window_size`: requested window size; its exact meaning is left to the
    ///   implementation
    /// - `config`: processing configuration, borrowed for `'config`
    ///
    /// # Returns
    /// A `WindowedStream` bounded by `'window`, whose items are
    /// `Result<WindowedOutput<'window, Output>, Self::Error>`.
    fn windowed_transform<'window>(
        &'processor self,
        input_stream: Pin<Box<dyn Stream<Item = &'window Input> + Send + 'window>>,
        window_size: usize,
        config: &'config ProcessingConfig,
    ) -> WindowedStream<'window, Output, Self::Error>
    where
        'processor: 'window, // The `&self` borrow outlives the returned stream
        'config: 'window,    // The config borrow outlives the returned stream
        'data: 'window,      // Already implied by `'data: 'processor`; stated explicitly
        Input: 'window,      // Required for `&'window Input` to be well-formed
        Output: 'window; // `Output` holds no borrow shorter than `'window`
}
99
/// Configuration for streaming processing operations.
///
/// All fields are plain values, so the type derives `PartialEq`/`Eq` in
/// addition to `Debug`/`Clone`; two configurations compare equal exactly when
/// every field is equal. `Default` is implemented separately in this file.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ProcessingConfig {
    /// Buffer size for streaming operations (the unit is not specified here)
    pub buffer_size: usize,
    /// Maximum memory usage per batch (bytes)
    pub max_memory_per_batch: usize,
    /// Enable memory-efficient processing
    pub memory_efficient: bool,
    /// Timeout for individual operations
    pub operation_timeout: std::time::Duration,
}
112
/// Output of a windowed streaming step together with window metadata.
///
/// `value` is owned; only `window_info` is parameterized by `'window`, so any
/// borrow of input data lives inside that field. The derived `Debug` impl
/// requires `T: Debug`.
#[derive(Debug)]
pub struct WindowedOutput<'window, T> {
    /// The computed output value (owned)
    pub value: T,
    /// Window metadata; may hold references valid for `'window`
    pub window_info: WindowInfo<'window>,
    /// Processing timestamp (a monotonic `Instant`, not a wall-clock time)
    pub timestamp: std::time::Instant,
}
127
/// Metadata describing the window an output was computed from.
///
/// The `'window` parameter exists solely for `window_bounds`; the other fields
/// are owned values.
#[derive(Debug)]
pub struct WindowInfo<'window> {
    /// Window size used for computation
    pub size: usize,
    /// Optional references to the first and last items in the window,
    /// valid for `'window`; `None` when no bounds are recorded
    pub window_bounds: Option<WindowBounds<'window>>,
    /// Window statistics
    pub stats: WindowStats,
}
139
/// Type-erased references to the first and last items of a window.
///
/// The items are exposed only as `dyn Debug`, so consumers can format them but
/// cannot recover their concrete type.
///
/// NOTE(review): the trait objects carry no `Send`/`Sync` bound, which makes
/// this struct (and every type that contains it) neither `Send` nor `Sync`.
/// Confirm this is intended for values that travel through `Send` streams.
#[derive(Debug)]
pub struct WindowBounds<'window> {
    /// Reference to the first item in the window, if recorded
    pub first: Option<&'window dyn std::fmt::Debug>,
    /// Reference to the last item in the window, if recorded
    pub last: Option<&'window dyn std::fmt::Debug>,
}
148
/// Statistics computed over the window.
///
/// All fields are plain values, so the type derives `PartialEq`/`Eq` in
/// addition to `Debug`/`Clone`; two values compare equal exactly when every
/// field is equal.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct WindowStats {
    /// Number of items processed
    pub count: usize,
    /// Processing duration
    pub duration: std::time::Duration,
    /// Memory usage during processing (the unit is not specified here)
    pub memory_used: usize,
}
159
/// Streaming ML pipeline with explicit lifetime management.
///
/// Trait-level lifetimes:
///
/// - `'data`: lifetime that `Input` must be valid for (`Input: 'data`); it
///   appears only in bounds
/// - `'model`: lifetime of the `&self` borrow taken by every method, and the
///   bound placed on the associated `Model` and `IntermediateOutput` types
///
/// Each method adds its own per-call lifetime (`'pipeline`, `'parallel`,
/// `'adaptive`) that bounds the input stream and the returned stream.
pub trait StreamingMLPipeline<'data, 'model, Input, Output>
where
    'data: 'model, // `'data` outlives `'model`
    Input: 'data,
{
    /// Model type referenced by the pipeline stages; must be valid for `'model`.
    type Model: 'model;
    /// Error type reported by every processing method of this trait.
    type Error: std::error::Error + Send + Sync + 'static;
    /// Type of values passed between stages; must be valid for `'model`.
    /// No method signature mentions it other than in a bound.
    type IntermediateOutput: 'model;

    /// Run the input stream through a sequence of stages.
    ///
    /// # Parameters
    /// - `input_stream`: pinned, boxed, `Send` stream of owned `Input` values
    /// - `stages`: slice of stages borrowed for `'pipeline`; each stage holds a
    ///   `&'model Self::Model`
    ///
    /// # Returns
    /// A pinned, boxed, `Send` stream of `Result<Output, Self::Error>`,
    /// bounded by `'pipeline`.
    fn process_pipeline<'pipeline>(
        &'model self,
        input_stream: Pin<Box<dyn Stream<Item = Input> + Send + 'pipeline>>,
        stages: &'pipeline [PipelineStage<'model, Self::Model>],
    ) -> Pin<Box<dyn Stream<Item = std::result::Result<Output, Self::Error>> + Send + 'pipeline>>
    where
        'model: 'pipeline, // The `&self` borrow outlives the returned stream
        'data: 'pipeline,  // Already implied by `'data: 'model`; stated explicitly
        Input: 'pipeline,  // `Input` holds no borrow shorter than `'pipeline`
        Output: 'pipeline, // `Output` holds no borrow shorter than `'pipeline`
        Self::IntermediateOutput: 'pipeline;

    /// Process the input stream with a requested number of workers.
    ///
    /// The signature only establishes the preconditions for sharing: `Self`
    /// and `Self::Model` must be `Sync`, and `Input` must be `Clone`. Whether
    /// and how work is actually distributed is left to the implementation.
    ///
    /// # Parameters
    /// - `input_stream`: pinned, boxed, `Send` stream of owned `Input` values
    /// - `worker_count`: requested number of workers
    /// - `config`: processing configuration, borrowed for `'parallel`
    ///
    /// # Returns
    /// A pinned, boxed, `Send` stream of `Result<Output, Self::Error>`,
    /// bounded by `'parallel`.
    fn process_parallel<'parallel>(
        &'model self,
        input_stream: Pin<Box<dyn Stream<Item = Input> + Send + 'parallel>>,
        worker_count: usize,
        config: &'parallel ProcessingConfig,
    ) -> Pin<Box<dyn Stream<Item = std::result::Result<Output, Self::Error>> + Send + 'parallel>>
    where
        'model: 'parallel,        // The `&self` borrow outlives the returned stream
        'data: 'parallel,         // Already implied by `'data: 'model`; stated explicitly
        Input: 'parallel + Clone, // Items may be cloned, e.g. to hand them to workers
        Output: 'parallel,        // `Output` holds no borrow shorter than `'parallel`
        Self::Model: Sync,        // `&Self::Model` may be shared across threads
        Self: Sync; // `&self` may be shared across threads

    /// Process the input stream with per-item adaptation parameters.
    ///
    /// # Parameters
    /// - `input_stream`: pinned, boxed, `Send` stream of owned `Input` values
    /// - `adaptation_fn`: callback mapping a borrowed `Input` to
    ///   `AdaptationParams`, borrowed for `'adaptive`
    ///
    /// # Returns
    /// A pinned, boxed, `Send` stream of `Result<Output, Self::Error>`,
    /// bounded by `'adaptive`.
    ///
    /// NOTE(review): `adaptation_fn` is a `&dyn Fn` without a `Sync` bound, so
    /// the reference itself is not `Send`; a returned stream that has to be
    /// `Send` cannot simply capture it. Confirm how implementors are expected
    /// to use the callback.
    fn process_adaptive<'adaptive>(
        &'model self,
        input_stream: Pin<Box<dyn Stream<Item = Input> + Send + 'adaptive>>,
        adaptation_fn: &'adaptive dyn Fn(&Input) -> AdaptationParams,
    ) -> Pin<Box<dyn Stream<Item = std::result::Result<Output, Self::Error>> + Send + 'adaptive>>
    where
        'model: 'adaptive, // The `&self` borrow outlives the returned stream
        'data: 'adaptive,  // Already implied by `'data: 'model`; stated explicitly
        Input: 'adaptive,  // `Input` holds no borrow shorter than `'adaptive`
        Output: 'adaptive; // `Output` holds no borrow shorter than `'adaptive`
}
222
/// One stage of a pipeline: a name, a borrowed model and a stage configuration.
///
/// The stage does not own its model; it holds a shared reference valid for
/// `'model`. The derived `Debug` impl requires `Model: Debug`.
#[derive(Debug)]
pub struct PipelineStage<'model, Model> {
    /// Name of the processing stage (a `'static` string)
    pub name: &'static str,
    /// Model used in this stage, borrowed for `'model`
    pub model: &'model Model,
    /// Stage-specific configuration (owned)
    pub config: StageConfig,
}
233
/// Configuration for individual pipeline stages.
///
/// Derives `PartialEq` in addition to `Debug`/`Clone` so configurations can be
/// compared. `Eq` is deliberately not derived: `parameters` stores `f64`
/// values, and a `NaN` parameter makes a configuration compare unequal to
/// itself. `Default` is implemented separately in this file.
#[derive(Debug, Clone, PartialEq)]
pub struct StageConfig {
    /// Enable this stage
    pub enabled: bool,
    /// Stage-specific parameters, keyed by name
    pub parameters: std::collections::HashMap<String, f64>,
    /// Memory limit for this stage; `None` means no limit is set
    /// (the unit is not specified here)
    pub memory_limit: Option<usize>,
}
244
245/// Parameters for adaptive processing
246#[derive(Debug, Clone)]
247pub struct AdaptationParams {
248 /// Batch size adaptation
249 pub batch_size: usize,
250 /// Memory allocation hint
251 pub memory_hint: usize,
252 /// Processing priority
253 pub priority: ProcessingPriority,
254}
255
/// Processing priority levels.
///
/// The derived ordering follows declaration order, so
/// `Low < Normal < High < Critical`. `Hash` is derived so the enum can be used
/// as a map or set key. `Default` yields `Normal`, the same priority that
/// `AdaptationParams::default()` uses.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Default)]
pub enum ProcessingPriority {
    /// Lowest priority
    Low,
    /// Default priority
    #[default]
    Normal,
    /// Above-normal priority
    High,
    /// Highest priority
    Critical,
}
264
/// Streaming operations that work on caller-owned buffers and borrowed slices.
///
/// `'data` is the lifetime that both `Input` and `Output` must be valid for.
/// Each method adds a per-call lifetime (`'processing`, `'stream`) bounding the
/// borrows it receives and the value it returns.
///
/// Both methods take `&self` with an elided lifetime that is not related to the
/// per-call lifetime, so the returned future or stream cannot keep borrowing
/// `self`.
pub trait ZeroCopyStreaming<'data, Input, Output>
where
    Input: 'data,
    Output: 'data,
{
    /// Error type reported by both methods of this trait.
    type Error: std::error::Error + Send + Sync + 'static;

    /// Process `data` and write results into `output_buffer`.
    ///
    /// Both slices are borrowed mutably for `'processing`, so the caller cannot
    /// touch either of them until the returned future is dropped.
    ///
    /// # Parameters
    /// - `data`: input elements; the mutable borrow permits in-place
    ///   modification
    /// - `output_buffer`: caller-provided destination for results
    ///
    /// # Returns
    /// A pinned, boxed, `Send` future resolving to a `usize` or `Self::Error`.
    /// The `usize` is presumably the number of outputs written; the signature
    /// does not say, so verify against the implementation.
    fn process_zero_copy<'processing>(
        &self,
        data: &'processing mut [Input],
        output_buffer: &'processing mut [Output],
    ) -> Pin<Box<dyn Future<Output = std::result::Result<usize, Self::Error>> + Send + 'processing>>
    where
        'data: 'processing, // `'data` outlives the returned future
        Input: 'processing, // Already implied by `Input: 'data`; stated explicitly
        Output: 'processing; // Already implied by `Output: 'data`; stated explicitly

    /// Turn a stream of borrowed input slices into a stream of output views.
    ///
    /// # Parameters
    /// - `input_stream`: pinned, boxed, `Send` stream of `&'stream [Input]`
    ///   slices
    ///
    /// # Returns
    /// A `ZeroCopyStream` bounded by `'stream`, whose items are
    /// `Result<OutputView<'stream, Input, Output>, Self::Error>`. Each view may
    /// carry a `&'stream Input` pointing back into the caller's data.
    fn stream_zero_copy_views<'stream>(
        &self,
        input_stream: Pin<Box<dyn Stream<Item = &'stream [Input]> + Send + 'stream>>,
    ) -> ZeroCopyStream<'stream, Input, Output, Self::Error>
    where
        'data: 'stream, // `'data` outlives the returned stream
        Input: 'stream, // Required for `&'stream [Input]` to be well-formed
        Output: 'stream; // `Output` holds no borrow shorter than `'stream`
}
303
/// Processed output paired with an optional borrow of the originating input.
///
/// `output` is owned, while `input_ref` borrows a single `Input` for `'view`
/// instead of copying it. The derived `Debug` impl requires both
/// `Input: Debug` and `Output: Debug`.
#[derive(Debug)]
pub struct OutputView<'view, Input, Output> {
    /// Processed output data (owned)
    pub output: Output,
    /// Optional reference to the original input, valid for `'view`;
    /// `None` when no input reference is attached
    pub input_ref: Option<&'view Input>,
    /// Processing metadata
    pub metadata: ViewMetadata,
}
317
/// Metadata for zero-copy views.
///
/// All fields are plain values, so the type derives `PartialEq`/`Eq` in
/// addition to `Debug`/`Clone`; two values compare equal exactly when every
/// field is equal.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ViewMetadata {
    /// Processing time
    pub processing_time: std::time::Duration,
    /// Memory overhead (should be minimal for zero-copy; the unit is not
    /// specified here)
    pub memory_overhead: usize,
    /// Whether the operation was truly zero-copy
    pub zero_copy: bool,
}
328
329impl Default for ProcessingConfig {
330 fn default() -> Self {
331 Self {
332 buffer_size: 8192,
333 max_memory_per_batch: 1024 * 1024, // 1MB
334 memory_efficient: true,
335 operation_timeout: std::time::Duration::from_secs(30),
336 }
337 }
338}
339
340impl Default for StageConfig {
341 fn default() -> Self {
342 Self {
343 enabled: true,
344 parameters: std::collections::HashMap::new(),
345 memory_limit: None,
346 }
347 }
348}
349
350impl Default for AdaptationParams {
351 fn default() -> Self {
352 Self {
353 batch_size: 1000,
354 memory_hint: 1024 * 1024, // 1MB
355 priority: ProcessingPriority::Normal,
356 }
357 }
358}
359
/// Lifetime documentation utilities
///
/// The functions in this module have empty bodies; they exist only as anchors
/// for doc comments describing the lifetime conventions used by the traits in
/// the parent module.
pub mod lifetime_docs {
    /// Documents the lifetime relationships in a streaming operation.
    ///
    /// Calling this function does nothing.
    ///
    /// # Lifetime Parameters
    /// - `'data`: The lifetime of the source data being processed
    /// - `'processor`: The lifetime of the `&self` borrow of the processor
    /// - `'config`: The lifetime of borrowed configuration objects
    /// - `'output`: Conceptual name for the lifetime of produced values. No
    ///   item in this file declares a lifetime with this name; the traits in
    ///   the parent module use the per-call lifetimes `'item`, `'stream` and
    ///   `'window` in this role.
    ///
    /// # Lifetime Relationships
    /// - `'data: 'processor` - Source data must outlive the processor borrow
    /// - `'config: 'processor` - Configuration must outlive the processor borrow
    /// - `'processor: 'output` - The processor borrow must outlive the returned
    ///   future or stream
    ///
    /// # Memory Safety Guarantees
    /// - No dangling references in streaming operations
    /// - A returned future or stream cannot outlive the borrows it is bounded
    ///   by (lifetimes constrain borrows; they do not run destructors)
    /// - Zero-copy operations maintain reference validity
    ///
    /// Memory limits such as `ProcessingConfig::max_memory_per_batch` are plain
    /// configuration values; honoring them is up to each implementation and is
    /// not something the lifetime bounds enforce.
    pub fn document_streaming_lifetimes() {
        // Intentionally empty: this function only carries the documentation
        // above and has no runtime effect.
    }

    /// Documents common lifetime anti-patterns to avoid.
    ///
    /// Calling this function does nothing.
    ///
    /// # Anti-patterns
    /// - Returning references with insufficient lifetime bounds
    /// - Storing references without proper lifetime constraints
    /// - Using `'static` where shorter lifetimes would suffice
    /// - Mixing owned and borrowed data without clear lifetime bounds
    pub fn document_lifetime_antipatterns() {
        // Intentionally empty: this function only carries the documentation
        // above and has no runtime effect.
    }
}
400
/// Unit tests pinning the `Default` values and the priority ordering.
#[cfg(test)]
mod tests {
    use super::*;

    /// Every field of `ProcessingConfig::default()` is pinned, including the
    /// operation timeout.
    #[test]
    fn test_processing_config_default() {
        let config = ProcessingConfig::default();
        assert_eq!(config.buffer_size, 8192);
        assert_eq!(config.max_memory_per_batch, 1024 * 1024);
        assert!(config.memory_efficient);
        assert_eq!(
            config.operation_timeout,
            std::time::Duration::from_secs(30)
        );
    }

    /// A default stage is enabled, has no parameters and no memory limit.
    #[test]
    fn test_stage_config_default() {
        let config = StageConfig::default();
        assert!(config.enabled);
        assert!(config.parameters.is_empty());
        assert!(config.memory_limit.is_none());
    }

    /// Default adaptation parameters: 1000-item batches, 1 MiB hint, `Normal`.
    #[test]
    fn test_adaptation_params_default() {
        let params = AdaptationParams::default();
        assert_eq!(params.batch_size, 1000);
        assert_eq!(params.memory_hint, 1024 * 1024);
        assert_eq!(params.priority, ProcessingPriority::Normal);
    }

    /// Priorities order by declaration: `Low < Normal < High < Critical`.
    #[test]
    fn test_processing_priority_ordering() {
        assert!(ProcessingPriority::Critical > ProcessingPriority::High);
        assert!(ProcessingPriority::High > ProcessingPriority::Normal);
        assert!(ProcessingPriority::Normal > ProcessingPriority::Low);

        // Sorting must agree with the pairwise comparisons above.
        let mut levels = vec![
            ProcessingPriority::High,
            ProcessingPriority::Low,
            ProcessingPriority::Critical,
            ProcessingPriority::Normal,
        ];
        levels.sort();
        assert_eq!(
            levels,
            vec![
                ProcessingPriority::Low,
                ProcessingPriority::Normal,
                ProcessingPriority::High,
                ProcessingPriority::Critical,
            ]
        );
    }
}
436}