// sklears_core/streaming_lifetimes.rs

//! Explicit lifetime parameter examples for streaming operations.
//!
//! This module demonstrates advanced lifetime parameter usage in streaming
//! machine learning operations, providing clear lifetime relationships
//! and better compile-time safety.
use futures_core::{Future, Stream};
use std::pin::Pin;

9// Type aliases for complex return types
10type WindowedStream<'window, Output, E> = Pin<
11    Box<
12        dyn Stream<Item = std::result::Result<WindowedOutput<'window, Output>, E>> + Send + 'window,
13    >,
14>;
15
16type ProcessingStream<'stream, Output, E> =
17    Pin<Box<dyn Stream<Item = std::result::Result<Vec<Output>, E>> + Send + 'stream>>;
18
19type ZeroCopyStream<'stream, Input, Output, E> = Pin<
20    Box<
21        dyn Stream<Item = std::result::Result<OutputView<'stream, Input, Output>, E>>
22            + Send
23            + 'stream,
24    >,
25>;
26
27/// Streaming data processor with explicit lifetime relationships
28///
29/// This trait demonstrates advanced lifetime parameter patterns for streaming operations.
30/// The lifetime parameters are explicitly documented to clarify their relationships:
31///
32/// - `'data`: Lifetime of the input data being processed
33/// - `'config`: Lifetime of the configuration object
34/// - `'processor`: Lifetime of the processor itself
35pub trait StreamingProcessor<'data, 'config, 'processor, Input, Output>
36where
37    'data: 'processor,   // Data must live at least as long as the processor
38    'config: 'processor, // Config must live at least as long as the processor
39    Input: 'data,
40{
41    type Error: std::error::Error + Send + Sync + 'static;
42
43    /// Process a single item with explicit lifetime management
44    ///
45    /// The lifetime parameters ensure that:
46    /// - Input data lives for the duration of processing
47    /// - Config is accessible throughout the operation
48    /// - The processor maintains its state correctly
49    fn process_item<'item>(
50        &'processor self,
51        item: &'item Input,
52        config: &'config ProcessingConfig,
53    ) -> Pin<Box<dyn Future<Output = std::result::Result<Output, Self::Error>> + Send + 'item>>
54    where
55        'processor: 'item, // Processor must outlive the item processing
56        'config: 'item,    // Config must outlive the item processing
57        'data: 'item,      // Data lifetime must cover item processing
58        Input: 'item,      // Input must be valid for item processing
59        Output: 'item; // Output is tied to item processing lifetime
60
61    /// Process a stream of items with batching and explicit lifetime control
62    ///
63    /// This method shows how to handle complex lifetime relationships in streaming:
64    /// - The input stream has its own lifetime ('stream)
65    /// - The output stream is tied to multiple lifetimes
66    /// - Memory management is explicit and safe
67    fn process_stream<'stream>(
68        &'processor self,
69        input_stream: Pin<Box<dyn Stream<Item = Input> + Send + 'stream>>,
70        config: &'config ProcessingConfig,
71        batch_size: usize,
72    ) -> ProcessingStream<'stream, Output, Self::Error>
73    where
74        'processor: 'stream, // Processor must outlive stream processing
75        'config: 'stream,    // Config must outlive stream processing
76        'data: 'stream,      // Data lifetime must cover stream processing
77        Input: 'stream,      // Input items must be valid for stream duration
78        Output: 'stream; // Output items are tied to stream lifetime
79
80    /// Transform a stream with memory-efficient windowing
81    ///
82    /// Demonstrates lifetime management for windowed operations where:
83    /// - Windows maintain references to multiple input items
84    /// - Output lifetimes are carefully managed to prevent memory leaks
85    /// - Window size affects memory lifetime requirements
86    fn windowed_transform<'window>(
87        &'processor self,
88        input_stream: Pin<Box<dyn Stream<Item = &'window Input> + Send + 'window>>,
89        window_size: usize,
90        config: &'config ProcessingConfig,
91    ) -> WindowedStream<'window, Output, Self::Error>
92    where
93        'processor: 'window, // Processor must outlive windowed operation
94        'config: 'window,    // Config must outlive windowed operation
95        'data: 'window,      // Data must outlive windowed operation
96        Input: 'window,      // Input references must be valid for window duration
97        Output: 'window; // Output is tied to window lifetime
98}
99
/// Configuration for streaming processing operations.
///
/// The `Default` implementation in this module uses a buffer size of 8192,
/// a 1 MiB per-batch memory cap, memory-efficient processing enabled and a
/// 30 second operation timeout.
#[derive(Debug, Clone)]
pub struct ProcessingConfig {
    /// Buffer size for streaming operations
    pub buffer_size: usize,
    /// Maximum memory usage per batch (bytes)
    pub max_memory_per_batch: usize,
    /// Enable memory-efficient processing
    pub memory_efficient: bool,
    /// Timeout for individual operations
    pub operation_timeout: std::time::Duration,
}

113/// Windowed output that maintains lifetime relationships
114///
115/// This struct demonstrates how to maintain safe lifetime relationships
116/// in windowed streaming operations where the output references
117/// input data across multiple time steps.
118#[derive(Debug)]
119pub struct WindowedOutput<'window, T> {
120    /// The computed output value
121    pub value: T,
122    /// Window metadata with lifetime tied to input
123    pub window_info: WindowInfo<'window>,
124    /// Processing timestamp
125    pub timestamp: std::time::Instant,
126}
127
128/// Window information with explicit lifetime management
129#[derive(Debug)]
130pub struct WindowInfo<'window> {
131    /// Window size used for computation
132    pub size: usize,
133    /// References to the first and last items in the window
134    /// This demonstrates safe lifetime management for references
135    pub window_bounds: Option<WindowBounds<'window>>,
136    /// Window statistics
137    pub stats: WindowStats,
138}
139
/// Borrowed references to the items at either end of a window.
///
/// The items are type-erased to `dyn Debug`, so they can only be formatted,
/// not inspected.
///
/// NOTE(review): the trait objects are plain `dyn Debug` without `+ Send` or
/// `+ Sync`, so this struct (and every type that contains it) is neither
/// `Send` nor `Sync`. Confirm that is intended for outputs that are yielded
/// from `Send` streams.
#[derive(Debug)]
pub struct WindowBounds<'window> {
    /// Reference to the first item in the window
    pub first: Option<&'window dyn std::fmt::Debug>,
    /// Reference to the last item in the window
    pub last: Option<&'window dyn std::fmt::Debug>,
}

/// Statistics computed over the window.
///
/// Plain owned data with no lifetime parameters, so it can be cloned freely.
#[derive(Debug, Clone)]
pub struct WindowStats {
    /// Number of items processed
    pub count: usize,
    /// Processing duration
    pub duration: std::time::Duration,
    /// Memory usage during processing
    pub memory_used: usize,
}

160/// Advanced streaming ML pipeline with explicit lifetime management
161///
162/// This trait demonstrates complex lifetime relationships in ML pipelines
163/// where multiple processing stages have different lifetime requirements.
164pub trait StreamingMLPipeline<'data, 'model, Input, Output>
165where
166    'data: 'model, // Data must outlive the model
167    Input: 'data,
168{
169    type Model: 'model;
170    type Error: std::error::Error + Send + Sync + 'static;
171    type IntermediateOutput: 'model;
172
173    /// Multi-stage processing with explicit lifetime control
174    ///
175    /// This method shows how to chain multiple processing stages
176    /// while maintaining safe lifetime relationships throughout.
177    fn process_pipeline<'pipeline>(
178        &'model self,
179        input_stream: Pin<Box<dyn Stream<Item = Input> + Send + 'pipeline>>,
180        stages: &'pipeline [PipelineStage<'model, Self::Model>],
181    ) -> Pin<Box<dyn Stream<Item = std::result::Result<Output, Self::Error>> + Send + 'pipeline>>
182    where
183        'model: 'pipeline, // Model must outlive pipeline execution
184        'data: 'pipeline,  // Data must outlive pipeline execution
185        Input: 'pipeline,  // Input must be valid for pipeline duration
186        Output: 'pipeline, // Output is tied to pipeline lifetime
187        Self::IntermediateOutput: 'pipeline;
188
189    /// Parallel processing with lifetime-aware work distribution
190    ///
191    /// Demonstrates how to safely distribute work across multiple threads
192    /// while maintaining lifetime guarantees for shared data.
193    fn process_parallel<'parallel>(
194        &'model self,
195        input_stream: Pin<Box<dyn Stream<Item = Input> + Send + 'parallel>>,
196        worker_count: usize,
197        config: &'parallel ProcessingConfig,
198    ) -> Pin<Box<dyn Stream<Item = std::result::Result<Output, Self::Error>> + Send + 'parallel>>
199    where
200        'model: 'parallel,        // Model must outlive parallel processing
201        'data: 'parallel,         // Data must outlive parallel processing
202        Input: 'parallel + Clone, // Input must be cloneable for distribution
203        Output: 'parallel,        // Output is tied to parallel processing lifetime
204        Self::Model: Sync,        // Model must be thread-safe for parallel access
205        Self: Sync; // Pipeline must be thread-safe
206
207    /// Adaptive processing with dynamic lifetime management
208    ///
209    /// Shows how to handle dynamic lifetime requirements where
210    /// processing parameters may change based on input characteristics.
211    fn process_adaptive<'adaptive>(
212        &'model self,
213        input_stream: Pin<Box<dyn Stream<Item = Input> + Send + 'adaptive>>,
214        adaptation_fn: &'adaptive dyn Fn(&Input) -> AdaptationParams,
215    ) -> Pin<Box<dyn Stream<Item = std::result::Result<Output, Self::Error>> + Send + 'adaptive>>
216    where
217        'model: 'adaptive, // Model must outlive adaptive processing
218        'data: 'adaptive,  // Data must outlive adaptive processing
219        Input: 'adaptive,  // Input must be valid for adaptive processing
220        Output: 'adaptive; // Output is tied to adaptive processing lifetime
221}
222
223/// Pipeline stage with model lifetime management
224#[derive(Debug)]
225pub struct PipelineStage<'model, Model> {
226    /// Name of the processing stage
227    pub name: &'static str,
228    /// Model used in this stage (with lifetime bound to pipeline)
229    pub model: &'model Model,
230    /// Stage-specific configuration
231    pub config: StageConfig,
232}
233
/// Configuration for individual pipeline stages.
///
/// The `Default` implementation in this module yields an enabled stage with
/// no parameters and no memory limit.
#[derive(Debug, Clone)]
pub struct StageConfig {
    /// Enable this stage
    pub enabled: bool,
    /// Stage-specific parameters
    pub parameters: std::collections::HashMap<String, f64>,
    /// Memory limit for this stage (`None` means no limit is configured)
    pub memory_limit: Option<usize>,
}

245/// Parameters for adaptive processing
246#[derive(Debug, Clone)]
247pub struct AdaptationParams {
248    /// Batch size adaptation
249    pub batch_size: usize,
250    /// Memory allocation hint
251    pub memory_hint: usize,
252    /// Processing priority
253    pub priority: ProcessingPriority,
254}
255
/// Processing priority levels.
///
/// The derived ordering follows declaration order, so
/// `Low < Normal < High < Critical`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum ProcessingPriority {
    /// Lowest priority
    Low,
    /// Priority used by `AdaptationParams::default()`
    Normal,
    /// Above-normal priority
    High,
    /// Highest priority
    Critical,
}

265/// Zero-copy streaming operations with lifetime guarantees
266///
267/// This trait demonstrates zero-copy streaming patterns with explicit
268/// lifetime management to ensure memory safety without performance overhead.
269pub trait ZeroCopyStreaming<'data, Input, Output>
270where
271    Input: 'data,
272    Output: 'data,
273{
274    type Error: std::error::Error + Send + Sync + 'static;
275
276    /// Process data without copying, maintaining lifetime relationships
277    ///
278    /// This method shows how to process data in-place while ensuring
279    /// that all lifetime relationships are maintained correctly.
280    fn process_zero_copy<'processing>(
281        &self,
282        data: &'processing mut [Input],
283        output_buffer: &'processing mut [Output],
284    ) -> Pin<Box<dyn Future<Output = std::result::Result<usize, Self::Error>> + Send + 'processing>>
285    where
286        'data: 'processing, // Data lifetime must cover processing
287        Input: 'processing, // Input must be valid for processing duration
288        Output: 'processing; // Output buffer must be valid for processing
289
290    /// Stream processing with zero-copy views
291    ///
292    /// Demonstrates streaming with zero-copy views where the output
293    /// contains references to the input data rather than copies.
294    fn stream_zero_copy_views<'stream>(
295        &self,
296        input_stream: Pin<Box<dyn Stream<Item = &'stream [Input]> + Send + 'stream>>,
297    ) -> ZeroCopyStream<'stream, Input, Output, Self::Error>
298    where
299        'data: 'stream, // Data must outlive streaming
300        Input: 'stream, // Input slices must be valid for stream duration
301        Output: 'stream; // Output views are tied to stream lifetime
302}
303
304/// Zero-copy output view with lifetime management
305///
306/// This struct demonstrates how to create zero-copy views of processed data
307/// while maintaining safe lifetime relationships between input and output.
308#[derive(Debug)]
309pub struct OutputView<'view, Input, Output> {
310    /// Processed output data
311    pub output: Output,
312    /// Optional reference to original input (zero-copy)
313    pub input_ref: Option<&'view Input>,
314    /// Processing metadata
315    pub metadata: ViewMetadata,
316}
317
/// Metadata for zero-copy views.
///
/// Plain owned data attached to every `OutputView`.
#[derive(Debug, Clone)]
pub struct ViewMetadata {
    /// Processing time
    pub processing_time: std::time::Duration,
    /// Memory overhead (should be minimal for zero-copy)
    pub memory_overhead: usize,
    /// Whether the operation was truly zero-copy
    pub zero_copy: bool,
}

329impl Default for ProcessingConfig {
330    fn default() -> Self {
331        Self {
332            buffer_size: 8192,
333            max_memory_per_batch: 1024 * 1024, // 1MB
334            memory_efficient: true,
335            operation_timeout: std::time::Duration::from_secs(30),
336        }
337    }
338}
339
340impl Default for StageConfig {
341    fn default() -> Self {
342        Self {
343            enabled: true,
344            parameters: std::collections::HashMap::new(),
345            memory_limit: None,
346        }
347    }
348}
349
350impl Default for AdaptationParams {
351    fn default() -> Self {
352        Self {
353            batch_size: 1000,
354            memory_hint: 1024 * 1024, // 1MB
355            priority: ProcessingPriority::Normal,
356        }
357    }
358}
359
/// Lifetime documentation utilities.
///
/// The functions in this module have empty bodies; they exist only so that
/// their doc comments show up in the generated documentation.
pub mod lifetime_docs {
    /// Documents the lifetime relationships in a streaming operation.
    ///
    /// # Lifetime Parameters
    /// - `'data`: The lifetime of the source data being processed
    /// - `'processor`: The lifetime of the borrow of the processing pipeline
    /// - `'config`: The lifetime of configuration objects
    /// - `'output`: Shorthand used here for the per-call lifetime of a
    ///   returned future or stream (named `'item`, `'stream`, `'window`, ...
    ///   on the individual trait methods)
    ///
    /// # Lifetime Relationships
    /// - `'data: 'processor` - Source data must outlive the processor borrow
    /// - `'config: 'processor` - Configuration must outlive the processor borrow
    /// - `'processor: 'output` - Processor borrow must outlive output generation
    ///
    /// # Memory Safety Guarantees
    /// - No dangling references in streaming operations
    /// - Zero-copy operations maintain reference validity
    ///
    /// Lifetimes are a compile-time check on borrows only. Resource cleanup
    /// and per-batch memory limits are the responsibility of implementations
    /// and are not enforced by these bounds.
    pub fn document_streaming_lifetimes() {
        // Intentionally empty: this function serves as documentation for the
        // lifetime patterns used throughout the streaming operations in this
        // module.
    }

    /// Documents common lifetime anti-patterns to avoid.
    ///
    /// # Anti-patterns
    /// - Returning references with insufficient lifetime bounds
    /// - Storing references without proper lifetime constraints
    /// - Using `'static` where shorter lifetimes would suffice
    /// - Mixing owned and borrowed data without clear lifetime bounds
    pub fn document_lifetime_antipatterns() {
        // Intentionally empty: this function documents what NOT to do with
        // lifetimes in streaming machine learning operations.
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Every field of the default processing configuration is pinned,
    /// including the operation timeout.
    #[test]
    fn test_processing_config_default() {
        let config = ProcessingConfig::default();
        assert_eq!(config.buffer_size, 8192);
        assert_eq!(config.max_memory_per_batch, 1024 * 1024);
        assert!(config.memory_efficient);
        assert_eq!(
            config.operation_timeout,
            std::time::Duration::from_secs(30)
        );
    }

    /// A default stage is enabled, has no parameters and no memory limit.
    #[test]
    fn test_stage_config_default() {
        let config = StageConfig::default();
        assert!(config.enabled);
        assert!(config.parameters.is_empty());
        assert!(config.memory_limit.is_none());
    }

    /// Default adaptation parameters use normal priority.
    #[test]
    fn test_adaptation_params_default() {
        let params = AdaptationParams::default();
        assert_eq!(params.batch_size, 1000);
        assert_eq!(params.memory_hint, 1024 * 1024);
        assert_eq!(params.priority, ProcessingPriority::Normal);
    }

    /// The derived ordering ranks priorities by declaration order.
    #[test]
    fn test_processing_priority_ordering() {
        assert!(ProcessingPriority::Critical > ProcessingPriority::High);
        assert!(ProcessingPriority::High > ProcessingPriority::Normal);
        assert!(ProcessingPriority::Normal > ProcessingPriority::Low);
    }
}