// rs2_stream/state/stream_ext.rs
use crate::*;
use crate::resource_manager::{get_global_resource_manager, ResourceManager};
use async_stream::stream;
use futures_core::Stream;
use futures_util::pin_mut;
use futures_util::stream::StreamExt;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::collections::HashSet;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use tokio::time::sleep;
use crate::state::traits::KeyExtractor;
use crate::state::{StateConfig, StateError, StateStorage};

// Memory management constants
const MAX_HASHMAP_KEYS: usize = 10_000;
const MAX_GROUP_SIZE: usize = 10_000; // Max items per group
const MAX_PATTERN_SIZE: usize = 1_000; // Max items per pattern
const CLEANUP_INTERVAL: u64 = 1000; // Cleanup every 1000 items (increased from 100)
const RESOURCE_TRACKING_INTERVAL: u64 = 100; // Track resources every 100 items
const DEFAULT_BUFFER_SIZE: usize = 1024;

// LRU eviction helper
fn evict_oldest_entries<K, V>(map: &mut HashMap<K, V>, max_keys: usize)
where
    K: Clone + std::hash::Hash + Eq + std::fmt::Display + std::cmp::Ord,
    V: Clone,
{
    if map.len() > max_keys {
        let mut entries: Vec<_> = map.iter().map(|(k, _)| k.clone()).collect();
        entries.sort(); // Simple eviction strategy - could be improved with proper LRU
        let to_remove = entries.len() - max_keys;
        for key in entries.into_iter().take(to_remove) {
            map.remove(&key);
        }
    }
}

// Optimized resource tracking - batch operations
async fn track_resource_batch(
    resource_manager: &Arc<ResourceManager>,
    allocations: u64,
    deallocations: u64,
    buffer_overflows: u64,
) {
    if allocations > 0 {
        resource_manager.track_memory_allocation(allocations).await.ok();
    }
    if deallocations > 0 {
        resource_manager.track_memory_deallocation(deallocations).await;
    }
    for _ in 0..buffer_overflows {
        resource_manager.track_buffer_overflow().await.ok();
    }
}

#[derive(Serialize, Deserialize, Clone)]
struct ThrottleState {
    count: u32,
    window_start: u64, // UNIX timestamp in milliseconds
}

#[derive(Serialize, Deserialize, Clone)]
struct SessionState {
    last_activity: u64, // UNIX timestamp
    is_new_session: bool,
}

#[derive(Serialize, Deserialize, Clone)]
struct LeftItemWithTime<T> {
    item: T,
    timestamp: u64,
    key: String,
}

#[derive(Serialize, Deserialize, Clone)]
struct RightItemWithTime<U> {
    item: U,
    timestamp: u64,
    key: String,
}

/// Extension trait for adding stateful operations to streams
pub trait StatefulStreamExt<T>: Stream<Item = T> + Send + Sync + Sized + Unpin + 'static
where
    Self: 'static,
    T: Send + Sync + Clone + Serialize + for<'de> Deserialize<'de> + 'static,
{
    /// Apply a stateful map operation
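    ///
    /// Illustrative sketch (not compiled): `events`, `config`, and `key_extractor`
    /// are assumed to exist and satisfy the trait bounds; the per-key counter kept
    /// here is just one possible use of the provided [`StateAccess`].
    ///
    /// ```ignore
    /// let counted = events.stateful_map_rs2(config, key_extractor, |event, state| {
    ///     Box::pin(async move {
    ///         // Load, bump, and persist a per-key counter serialized as JSON bytes.
    ///         let count: u64 = match state.get().await {
    ///             Some(bytes) => serde_json::from_slice(&bytes).unwrap_or(0),
    ///             None => 0,
    ///         };
    ///         let bytes = serde_json::to_vec(&(count + 1)).map_err(StateError::Serialization)?;
    ///         state.set(&bytes).await.map_err(|e| StateError::Storage(e.to_string()))?;
    ///         Ok((event, count + 1))
    ///     })
    /// });
    /// ```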
    fn stateful_map_rs2<F, R>(
        self,
        config: StateConfig,
        key_extractor: impl KeyExtractor<T> + Send + Sync + 'static,
        mut f: F,
    ) -> Pin<Box<dyn Stream<Item = Result<R, StateError>> + Send>>
    where
        F: FnMut(
                T,
                StateAccess,
            ) -> std::pin::Pin<
                Box<dyn std::future::Future<Output = Result<R, StateError>> + Send>,
            > + Send
            + Sync
            + 'static,
        R: Send + Sync + 'static,
        Self: Sized,
    {
        let storage = config.create_storage_arc();
        let resource_manager = get_global_resource_manager();

        Box::pin(stream! {
            let stream = self;
            futures::pin_mut!(stream);
            let mut state: HashMap<String, ()> = HashMap::new();
            let mut item_count = 0u64;
            let mut pending_allocations = 0u64;
            let mut pending_deallocations = 0u64;
            let mut pending_buffer_overflows = 0u64;

            while let Some(item) = StreamExt::next(&mut stream).await {
                let key = key_extractor.extract_key(&item);

                // Periodic cleanup and resource tracking
                item_count += 1;
                if item_count % CLEANUP_INTERVAL == 0 {
                    let before_len = state.len();
                    evict_oldest_entries(&mut state, MAX_HASHMAP_KEYS);
                    let after_len = state.len();
                    if before_len > after_len {
                        pending_deallocations += (before_len - after_len) as u64;
                        pending_buffer_overflows += 1;
                    }
                }

                // Batch resource tracking
                if item_count % RESOURCE_TRACKING_INTERVAL == 0 {
                    track_resource_batch(&resource_manager, pending_allocations, pending_deallocations, pending_buffer_overflows).await;
                    pending_allocations = 0;
                    pending_deallocations = 0;
                    pending_buffer_overflows = 0;
                }

                let is_new_key = !state.contains_key(&key);
                state.entry(key.clone()).or_insert(());
                if is_new_key {
                    pending_allocations += 1;
                }

                let state_access = StateAccess::new(storage.clone(), key);
                match f(item, state_access).await {
                    Ok(result) => yield Ok(result),
                    Err(e) => yield Err(e),
                }
            }

            // Final resource tracking
            if pending_allocations > 0 || pending_deallocations > 0 || pending_buffer_overflows > 0 {
                track_resource_batch(&resource_manager, pending_allocations, pending_deallocations, pending_buffer_overflows).await;
            }
        })
    }

    /// Apply a stateful filter operation
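    ///
    /// Illustrative sketch (not compiled): emits an item only when its key has no
    /// stored state yet; a real filter would usually also call `state.set` to
    /// record the key. `events`, `config`, and `key_extractor` are assumed.
    ///
    /// ```ignore
    /// let unseen = events.stateful_filter_rs2(config, key_extractor, |_event, state| {
    ///     Box::pin(async move { Ok(state.get().await.is_none()) })
    /// });
    /// ```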
    fn stateful_filter_rs2<F>(
        self,
        config: StateConfig,
        key_extractor: impl KeyExtractor<T> + Send + Sync + 'static,
        mut f: F,
    ) -> Pin<Box<dyn Stream<Item = Result<T, StateError>> + Send>>
    where
        F: FnMut(
                &T,
                StateAccess,
            ) -> std::pin::Pin<
                Box<dyn std::future::Future<Output = Result<bool, StateError>> + Send>,
            > + Send
            + Sync
            + 'static,
        Self: Sized,
    {
        let storage = config.create_storage_arc();
        let resource_manager = get_global_resource_manager();

        Box::pin(stream! {
            let stream = self;
            futures::pin_mut!(stream);
            let mut seen_keys: HashSet<String> = HashSet::new();
            let mut item_count = 0u64;
            let mut pending_allocations = 0u64;

            while let Some(item) = StreamExt::next(&mut stream).await {
                let key = key_extractor.extract_key(&item);

                // Optimized cleanup - only when necessary
                item_count += 1;
                if item_count % CLEANUP_INTERVAL == 0 && seen_keys.len() > MAX_HASHMAP_KEYS {
                    // More efficient cleanup - clear all and let it rebuild
                    let old_size = seen_keys.len();
                    seen_keys.clear();
                    pending_allocations = pending_allocations.saturating_sub(old_size as u64);
                }

                // Batch resource tracking
                if item_count % RESOURCE_TRACKING_INTERVAL == 0 {
                    if pending_allocations > 0 {
                        resource_manager.track_memory_allocation(pending_allocations).await.ok();
                        pending_allocations = 0;
                    }
                }

                // Optimized key insertion - avoid double lookup
                let is_new_key = seen_keys.insert(key.clone());
                if is_new_key {
                    pending_allocations += 1;
                }

                let state_access = StateAccess::new(storage.clone(), key);
                match f(&item, state_access).await {
                    Ok(should_emit) => {
                        if should_emit {
                            yield Ok(item);
                        }
                    }
                    Err(e) => yield Err(e),
                }
            }

            // Final resource tracking
            if pending_allocations > 0 {
                resource_manager.track_memory_allocation(pending_allocations).await.ok();
            }
        })
    }

    /// Apply a stateful fold operation
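    ///
    /// Illustrative sketch (not compiled): keeps a per-key running total and emits it
    /// after every item. The `amount` field is hypothetical; `events`, `config`, and
    /// `key_extractor` are assumed.
    ///
    /// ```ignore
    /// let totals = events.stateful_fold_rs2(config, key_extractor, 0u64, |acc, event, _state| {
    ///     Box::pin(async move { Ok(acc + event.amount) })
    /// });
    /// ```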
    fn stateful_fold_rs2<F, R>(
        self,
        config: StateConfig,
        key_extractor: impl KeyExtractor<T> + Send + Sync + 'static,
        initial: R,
        mut f: F,
    ) -> Pin<Box<dyn Stream<Item = Result<R, StateError>> + Send>>
    where
        F: FnMut(
                R,
                T,
                StateAccess,
            ) -> std::pin::Pin<
                Box<dyn std::future::Future<Output = Result<R, StateError>> + Send>,
            > + Send
            + Sync
            + 'static,
        R: Send + Sync + Clone + 'static,
        Self: Sized,
    {
        let storage = config.create_storage_arc();
        let resource_manager = get_global_resource_manager();

        Box::pin(stream! {
            let stream = self;
            futures::pin_mut!(stream);
            let mut accumulators: HashMap<String, R> = HashMap::new();
            let mut item_count = 0u64;
            let mut pending_allocations = 0u64;
            let mut pending_deallocations = 0u64;
            let mut pending_buffer_overflows = 0u64;

            while let Some(item) = StreamExt::next(&mut stream).await {
                let key = key_extractor.extract_key(&item);

                // Periodic cleanup to prevent memory leaks
                item_count += 1;
                if item_count % CLEANUP_INTERVAL == 0 {
                    let before_len = accumulators.len();
                    evict_oldest_entries(&mut accumulators, MAX_HASHMAP_KEYS);
                    let after_len = accumulators.len();
                    if before_len > after_len {
                        pending_deallocations += (before_len - after_len) as u64;
                        pending_buffer_overflows += 1;
                    }
                }

                // Batch resource tracking
                if item_count % RESOURCE_TRACKING_INTERVAL == 0 {
                    track_resource_batch(&resource_manager, pending_allocations, pending_deallocations, pending_buffer_overflows).await;
                    pending_allocations = 0;
                    pending_deallocations = 0;
                    pending_buffer_overflows = 0;
                }

                let is_new_key = !accumulators.contains_key(&key);
                let acc = accumulators.entry(key.clone()).or_insert_with(|| initial.clone());
                if is_new_key {
                    pending_allocations += 1;
                }
                let state_access = StateAccess::new(storage.clone(), key);

                match f(acc.clone(), item, state_access).await {
                    Ok(new_acc) => {
                        *acc = new_acc.clone();
                        yield Ok(new_acc);
                    }
                    Err(e) => yield Err(e),
                }
            }

            // Final resource tracking
            if pending_allocations > 0 || pending_deallocations > 0 || pending_buffer_overflows > 0 {
                track_resource_batch(&resource_manager, pending_allocations, pending_deallocations, pending_buffer_overflows).await;
            }
        })
    }

    /// Apply a stateful reduce operation
    fn stateful_reduce_rs2<F, R>(
        self,
        config: StateConfig,
        key_extractor: impl KeyExtractor<T> + Send + Sync + 'static,
        initial: R,
        mut f: F,
    ) -> Pin<Box<dyn Stream<Item = Result<R, StateError>> + Send>>
    where
        F: FnMut(
                R,
                T,
                StateAccess,
            ) -> std::pin::Pin<
                Box<dyn std::future::Future<Output = Result<R, StateError>> + Send>,
            > + Send
            + Sync
            + 'static,
        R: Send + Sync + Clone + 'static,
        Self: Sized,
    {
        let storage = config.create_storage_arc();
        let resource_manager = get_global_resource_manager();

        Box::pin(stream! {
            let stream = self;
            futures::pin_mut!(stream);
            let mut accumulators: HashMap<String, R> = HashMap::new();
            let mut item_count = 0u64;
            let mut pending_allocations = 0u64;
            let mut pending_deallocations = 0u64;
            let mut pending_buffer_overflows = 0u64;

            while let Some(item) = StreamExt::next(&mut stream).await {
                let key = key_extractor.extract_key(&item);

                // Periodic cleanup to prevent memory leaks
                item_count += 1;
                if item_count % CLEANUP_INTERVAL == 0 {
                    let before_len = accumulators.len();
                    evict_oldest_entries(&mut accumulators, MAX_HASHMAP_KEYS);
                    let after_len = accumulators.len();
                    if before_len > after_len {
                        pending_deallocations += (before_len - after_len) as u64;
                        pending_buffer_overflows += 1;
                    }
                }

                // Batch resource tracking
                if item_count % RESOURCE_TRACKING_INTERVAL == 0 {
                    track_resource_batch(&resource_manager, pending_allocations, pending_deallocations, pending_buffer_overflows).await;
                    pending_allocations = 0;
                    pending_deallocations = 0;
                    pending_buffer_overflows = 0;
                }

                let is_new_key = !accumulators.contains_key(&key);
                let acc = accumulators.entry(key.clone()).or_insert_with(|| initial.clone());
                if is_new_key {
                    pending_allocations += 1;
                }
                let state_access = StateAccess::new(storage.clone(), key);

                match f(acc.clone(), item, state_access).await {
                    Ok(new_acc) => {
                        *acc = new_acc.clone();
                        yield Ok(new_acc);
                    }
                    Err(e) => yield Err(e),
                }
            }

            // Final resource tracking
            if pending_allocations > 0 || pending_deallocations > 0 || pending_buffer_overflows > 0 {
                track_resource_batch(&resource_manager, pending_allocations, pending_deallocations, pending_buffer_overflows).await;
            }
        })
    }

    /// Apply a stateful group by operation
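    ///
    /// Illustrative sketch (not compiled): with this simple variant, groups are
    /// flushed when they reach `MAX_GROUP_SIZE` or when the stream ends; use the
    /// advanced variant below for size/timeout/key-change triggers. `events`,
    /// `config`, and `key_extractor` are assumed.
    ///
    /// ```ignore
    /// let summaries = events.stateful_group_by_rs2(config, key_extractor, |key, items, _state| {
    ///     Box::pin(async move { Ok(format!("{key}: {} items", items.len())) })
    /// });
    /// ```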
    fn stateful_group_by_rs2<F, R>(
        self,
        config: StateConfig,
        key_extractor: impl KeyExtractor<T> + Send + Sync + 'static,
        f: F,
    ) -> Pin<Box<dyn Stream<Item = Result<R, StateError>> + Send>>
    where
        F: FnMut(
                String,
                Vec<T>,
                StateAccess,
            ) -> std::pin::Pin<
                Box<dyn std::future::Future<Output = Result<R, StateError>> + Send>,
            > + Send
            + Sync
            + 'static,
        R: Send + Sync + 'static,
        Self: Sized,
    {
        self.stateful_group_by_advanced_rs2(config, key_extractor, None, None, false, f)
    }

    /// Apply a stateful group by operation with advanced configuration
    fn stateful_group_by_advanced_rs2<F, R>(
        self,
        config: StateConfig,
        key_extractor: impl KeyExtractor<T> + Send + Sync + 'static,
        max_group_size: Option<usize>, // Emit when group reaches this size
        group_timeout: Option<std::time::Duration>, // Emit group after this timeout
        emit_on_key_change: bool,      // Emit previous group when key changes
        mut f: F,
    ) -> Pin<Box<dyn Stream<Item = Result<R, StateError>> + Send>>
    where
        F: FnMut(
                String,
                Vec<T>,
                StateAccess,
            ) -> std::pin::Pin<
                Box<dyn std::future::Future<Output = Result<R, StateError>> + Send>,
            > + Send
            + Sync
            + 'static,
        R: Send + Sync + 'static,
        Self: Sized,
    {
        let storage = config.create_storage_arc();
        let timeout_ms = group_timeout.map(|d| d.as_millis() as u64);
        let max_group_size = max_group_size.unwrap_or(MAX_GROUP_SIZE);
        let resource_manager = get_global_resource_manager();

        Box::pin(stream! {
            let stream = self;
            futures::pin_mut!(stream);
            let mut groups: HashMap<String, Vec<T>> = HashMap::new();
            let mut group_timestamps: HashMap<String, u64> = HashMap::new();
            let mut last_key: Option<String> = None;
            let mut item_count = 0u64;
            let mut pending_allocations = 0u64;
            let mut pending_deallocations = 0u64;
            let mut pending_buffer_overflows = 0u64;

            while let Some(item) = StreamExt::next(&mut stream).await {
                let key = key_extractor.extract_key(&item);
                let now = unix_timestamp_millis();

                // Periodic cleanup to prevent memory leaks
                item_count += 1;
                if item_count % CLEANUP_INTERVAL == 0 {
                    let before_groups = groups.len();
                    let before_timestamps = group_timestamps.len();
                    evict_oldest_entries(&mut groups, MAX_HASHMAP_KEYS);
                    evict_oldest_entries(&mut group_timestamps, MAX_HASHMAP_KEYS);
                    let after_groups = groups.len();
                    let after_timestamps = group_timestamps.len();
                    if before_groups > after_groups || before_timestamps > after_timestamps {
                        pending_deallocations += (before_groups + before_timestamps - after_groups - after_timestamps) as u64;
                        pending_buffer_overflows += 1;
                    }
                }

                // Batch resource tracking
                if item_count % RESOURCE_TRACKING_INTERVAL == 0 {
                    track_resource_batch(&resource_manager, pending_allocations, pending_deallocations, pending_buffer_overflows).await;
                    pending_allocations = 0;
                    pending_deallocations = 0;
                    pending_buffer_overflows = 0;
                }

                // Check if we need to emit the previous group due to key change
                if emit_on_key_change {
                    if let Some(ref last_key_val) = last_key {
                        if last_key_val != &key {
                            // Key changed, emit the previous group
                            if let Some(group_items) = groups.remove(last_key_val) {
                                pending_deallocations += group_items.len() as u64;
                                let state_access = StateAccess::new(storage.clone(), last_key_val.clone());
                                match f(last_key_val.clone(), group_items, state_access).await {
                                    Ok(result) => yield Ok(result),
                                    Err(e) => yield Err(e),
                                }
                            }
                            group_timestamps.remove(last_key_val);
                        }
                    }
                }

                // Optimized timeout check - only check current key instead of all groups
                if let (Some(timeout), Some(&group_start)) = (timeout_ms, group_timestamps.get(&key)) {
                    if now - group_start > timeout {
                        if let Some(group_items) = groups.remove(&key) {
                            pending_deallocations += group_items.len() as u64;
                            let state_access = StateAccess::new(storage.clone(), key.clone());
                            match f(key.clone(), group_items, state_access).await {
                                Ok(result) => yield Ok(result),
                                Err(e) => yield Err(e),
                            }
                        }
                        group_timestamps.remove(&key);
                    }
                }

                // Add item to current group
                let is_new_group = !groups.contains_key(&key);
                let group = groups.entry(key.clone()).or_insert_with(Vec::new);
                if is_new_group {
                    pending_allocations += 1;
                }
                group_timestamps.entry(key.clone()).or_insert(now);
                group.push(item);
                pending_allocations += 1;

                // Check if we should emit this group due to size limit
                if group.len() >= max_group_size {
                    if let Some(group_items) = groups.remove(&key) {
                        pending_deallocations += group_items.len() as u64;
                        let state_access = StateAccess::new(storage.clone(), key.clone());
                        match f(key.clone(), group_items, state_access).await {
                            Ok(result) => yield Ok(result),
                            Err(e) => yield Err(e),
                        }
                    }
                    group_timestamps.remove(&key);
                }

                last_key = Some(key);
            }

            // Final cleanup - check for any remaining groups that have timed out
            let now = unix_timestamp_millis();
            let mut expired_keys = Vec::new();

            if let Some(timeout) = timeout_ms {
                for (key, &group_start) in &group_timestamps {
                    if now - group_start > timeout {
                        expired_keys.push(key.clone());
                    }
                }
            }

            // Emit expired groups
            for key in expired_keys {
                let key_clone = key.clone();
                if let Some(group_items) = groups.remove(&key_clone) {
                    pending_deallocations += group_items.len() as u64;
                    let state_access = StateAccess::new(storage.clone(), key_clone.clone());
                    match f(key_clone.clone(), group_items, state_access).await {
                        Ok(result) => yield Ok(result),
                        Err(e) => yield Err(e),
                    }
                }
                group_timestamps.remove(&key_clone);
            }

            // Emit any remaining groups at stream end
            for (key, group_items) in groups {
                pending_deallocations += group_items.len() as u64;
                let state_access = StateAccess::new(storage.clone(), key.clone());
                match f(key, group_items, state_access).await {
                    Ok(result) => yield Ok(result),
                    Err(e) => yield Err(e),
                }
            }

            // Final resource tracking
            if pending_allocations > 0 || pending_deallocations > 0 || pending_buffer_overflows > 0 {
                track_resource_batch(&resource_manager, pending_allocations, pending_deallocations, pending_buffer_overflows).await;
            }
        })
    }

    /// Apply a stateful deduplication operation
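    ///
    /// Illustrative sketch (not compiled): passes each key through at most once per
    /// 60-second TTL window; `events`, `config`, and `key_extractor` are assumed.
    ///
    /// ```ignore
    /// let deduped = events.stateful_deduplicate_rs2(
    ///     config,
    ///     key_extractor,
    ///     std::time::Duration::from_secs(60),
    ///     |event| event,
    /// );
    /// ```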
    fn stateful_deduplicate_rs2<F>(
        self,
        config: StateConfig,
        key_extractor: impl KeyExtractor<T> + Send + Sync + 'static,
        ttl: std::time::Duration,
        mut f: F,
    ) -> Pin<Box<dyn Stream<Item = Result<T, StateError>> + Send>>
    where
        F: FnMut(T) -> T + Send + Sync + 'static,
        Self: Sized,
    {
        let storage = config.create_storage_arc();
        let ttl_ms = ttl.as_millis() as u64;
        let resource_manager = get_global_resource_manager();

        Box::pin(stream! {
            let stream = self;
            futures::pin_mut!(stream);
            let mut item_count = 0u64;
            let mut pending_allocations = 0u64;

            while let Some(item) = StreamExt::next(&mut stream).await {
                let key = key_extractor.extract_key(&item);
                let state_access = StateAccess::new(storage.clone(), key.clone());

                let now = unix_timestamp_millis();
                let state_bytes = match state_access.get().await {
                    Some(bytes) => bytes,
                    None => Vec::new(),
                };

                let last_seen: u64 = if state_bytes.is_empty() {
                    0
                } else {
                    match serde_json::from_slice(&state_bytes) {
                        Ok(timestamp) => timestamp,
                        Err(_) => 0,
                    }
                };

                if now - last_seen > ttl_ms {
                    // Track memory allocation for new state entry
                    pending_allocations += 1;

                    // Batch resource tracking
                    item_count += 1;
                    if item_count % RESOURCE_TRACKING_INTERVAL == 0 {
                        if pending_allocations > 0 {
                            resource_manager.track_memory_allocation(pending_allocations).await.ok();
                            pending_allocations = 0;
                        }
                    }

                    // Handle serialization error gracefully
                    match serde_json::to_vec(&now) {
                        Ok(timestamp_bytes) => {
                            if let Err(e) = state_access.set(&timestamp_bytes).await {
                                yield Err(StateError::Storage(format!("Failed to set state for deduplication: {}", e)));
                                continue;
                            }
                        }
                        Err(e) => {
                            yield Err(StateError::Serialization(e));
                            continue;
                        }
                    }

                    yield Ok(f(item));
                }
            }

            // Final resource tracking
            if pending_allocations > 0 {
                resource_manager.track_memory_allocation(pending_allocations).await.ok();
            }
        })
    }

    /// Apply a stateful throttle operation
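    ///
    /// Illustrative sketch (not compiled): allows at most 100 items per key per
    /// one-second window; excess items wait until the window rolls over. `events`,
    /// `config`, and `key_extractor` are assumed.
    ///
    /// ```ignore
    /// let throttled = events.stateful_throttle_rs2(
    ///     config,
    ///     key_extractor,
    ///     100,
    ///     std::time::Duration::from_secs(1),
    ///     |event| event,
    /// );
    /// ```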
    fn stateful_throttle_rs2<F>(
        self,
        config: StateConfig,
        key_extractor: impl KeyExtractor<T> + Send + Sync + 'static,
        rate_limit: u32,
        window_duration: std::time::Duration,
        mut f: F,
    ) -> Pin<Box<dyn Stream<Item = Result<T, StateError>> + Send>>
    where
        F: FnMut(T) -> T + Send + Sync + 'static,
        Self: Sized,
    {
        let storage = config.create_storage_arc();
        let window_ms = window_duration.as_millis() as u64;
        let resource_manager = get_global_resource_manager();

        Box::pin(stream! {
            let stream = self;
            futures::pin_mut!(stream);
            let mut item_count = 0u64;
            let mut pending_allocations = 0u64;
            let mut pending_buffer_overflows = 0u64;

            while let Some(item) = StreamExt::next(&mut stream).await {
                let key = key_extractor.extract_key(&item);
                let state_access = StateAccess::new(storage.clone(), key.clone());

                let now = unix_timestamp_millis();

                // Get current throttle state from storage
                let state_bytes = match state_access.get().await {
                    Some(bytes) => bytes,
                    None => Vec::new(),
                };

                let mut throttle_state: ThrottleState = if state_bytes.is_empty() {
                    // Track memory allocation for new throttle state
                    pending_allocations += 1;
                    ThrottleState { count: 0, window_start: now }
                } else {
                    match serde_json::from_slice(&state_bytes) {
                        Ok(state) => state,
                        Err(_) => ThrottleState { count: 0, window_start: now },
                    }
                };

                // If window expired, reset
                if now - throttle_state.window_start > window_ms {
                    throttle_state.count = 0;
                    throttle_state.window_start = now;
                }

                if throttle_state.count < rate_limit {
                    throttle_state.count += 1;

                    // Update state in storage
                    match serde_json::to_vec(&throttle_state) {
                        Ok(state_bytes) => {
                            if let Err(e) = state_access.set(&state_bytes).await {
                                yield Err(StateError::Storage(format!("Failed to set throttle state: {}", e)));
                                continue;
                            }
                        }
                        Err(e) => {
                            yield Err(StateError::Serialization(e));
                            continue;
                        }
                    }

                    yield Ok(f(item));
                } else {
                    // Track buffer overflow when rate limiting
                    pending_buffer_overflows += 1;

                    // Optimized sleep - calculate remaining time more efficiently
                    let elapsed_ms = now.saturating_sub(throttle_state.window_start);
                    let remaining = if elapsed_ms >= window_ms {
                        Duration::from_millis(0)
                    } else {
                        Duration::from_millis(window_ms - elapsed_ms)
                    };

                    // Only sleep if necessary and for a reasonable duration
                    if remaining > Duration::from_millis(0) && remaining < Duration::from_secs(1) {
                        sleep(remaining).await;
                    }

                    // After sleep, reset window and count
                    let now2 = unix_timestamp_millis();
                    throttle_state.count = 1;
                    throttle_state.window_start = now2;

                    // Update state in storage
                    match serde_json::to_vec(&throttle_state) {
                        Ok(state_bytes) => {
                            if let Err(e) = state_access.set(&state_bytes).await {
                                yield Err(StateError::Storage(format!("Failed to set throttle state: {}", e)));
                                continue;
                            }
                        }
                        Err(e) => {
                            yield Err(StateError::Serialization(e));
                            continue;
                        }
                    }

                    yield Ok(f(item));
                }

                // Batch resource tracking - less frequent for throttle
                item_count += 1;
                if item_count % (RESOURCE_TRACKING_INTERVAL * 2) == 0 {
                    if pending_allocations > 0 {
                        resource_manager.track_memory_allocation(pending_allocations).await.ok();
                        pending_allocations = 0;
                    }
                    for _ in 0..pending_buffer_overflows {
                        resource_manager.track_buffer_overflow().await.ok();
                    }
                    pending_buffer_overflows = 0;
                }
            }

            // Final resource tracking
            if pending_allocations > 0 {
                resource_manager.track_memory_allocation(pending_allocations).await.ok();
            }
            for _ in 0..pending_buffer_overflows {
                resource_manager.track_buffer_overflow().await.ok();
            }
        })
    }

    /// Apply a stateful session operation
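    ///
    /// Illustrative sketch (not compiled): the closure receives `is_new_session`,
    /// which is true when the key was inactive for longer than the timeout.
    /// `events`, `config`, `key_extractor`, and the `with_session_flag` helper are
    /// assumed.
    ///
    /// ```ignore
    /// let tagged = events.stateful_session_rs2(
    ///     config,
    ///     key_extractor,
    ///     std::time::Duration::from_secs(30 * 60),
    ///     |event, is_new_session| with_session_flag(event, is_new_session),
    /// );
    /// ```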
    fn stateful_session_rs2<F>(
        self,
        config: StateConfig,
        key_extractor: impl KeyExtractor<T> + Send + Sync + 'static,
        session_timeout: std::time::Duration,
        mut f: F,
    ) -> Pin<Box<dyn Stream<Item = Result<T, StateError>> + Send>>
    where
        F: FnMut(T, bool) -> T + Send + Sync + 'static,
        Self: Sized,
    {
        let storage = config.create_storage_arc();
        let timeout_ms = session_timeout.as_millis() as u64;
        let resource_manager = get_global_resource_manager();

        Box::pin(stream! {
            let stream = self;
            futures::pin_mut!(stream);
            let mut item_count = 0u64;
            let mut pending_allocations = 0u64;

            while let Some(item) = StreamExt::next(&mut stream).await {
                let key = key_extractor.extract_key(&item);
                let state_access = StateAccess::new(storage.clone(), key.clone());

                let now = unix_timestamp_millis();
                let state_bytes = match state_access.get().await {
                    Some(bytes) => bytes,
                    None => Vec::new(),
                };

                let mut state: SessionState = if state_bytes.is_empty() {
                    // Track memory allocation for new session state
                    pending_allocations += 1;
                    SessionState { last_activity: now, is_new_session: true }
                } else {
                    match serde_json::from_slice(&state_bytes) {
                        Ok(session_state) => session_state,
                        Err(_) => SessionState { last_activity: now, is_new_session: true },
                    }
                };

                let is_new_session = now - state.last_activity > timeout_ms;
                state.last_activity = now;
                state.is_new_session = is_new_session;

                // Handle serialization and state setting errors gracefully
                match serde_json::to_vec(&state) {
                    Ok(state_bytes) => {
                        if let Err(e) = state_access.set(&state_bytes).await {
                            yield Err(StateError::Storage(format!("Failed to set session state: {}", e)));
                            continue;
                        }
                    }
                    Err(e) => {
                        yield Err(StateError::Serialization(e));
                        continue;
                    }
                }

                // Batch resource tracking
                item_count += 1;
                if item_count % RESOURCE_TRACKING_INTERVAL == 0 {
                    if pending_allocations > 0 {
                        resource_manager.track_memory_allocation(pending_allocations).await.ok();
                        pending_allocations = 0;
                    }
                }

                yield Ok(f(item, is_new_session));
            }

            // Final resource tracking
            if pending_allocations > 0 {
                resource_manager.track_memory_allocation(pending_allocations).await.ok();
            }
        })
    }

    /// Apply a stateful pattern operation
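    ///
    /// Illustrative sketch (not compiled): inspects every `pattern_size` consecutive
    /// items per key and optionally emits a label. `events`, `config`, and
    /// `key_extractor` are assumed, and the `value` field is hypothetical.
    ///
    /// ```ignore
    /// let alerts = events.stateful_pattern_rs2(config, key_extractor, 3, |items, _state| {
    ///     Box::pin(async move {
    ///         let rising = items.windows(2).all(|w| w[1].value > w[0].value);
    ///         Ok(if rising { Some("rising".to_string()) } else { None })
    ///     })
    /// });
    /// ```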
    fn stateful_pattern_rs2<F>(
        self,
        config: StateConfig,
        key_extractor: impl KeyExtractor<T> + Send + Sync + 'static,
        pattern_size: usize,
        mut f: F,
    ) -> Pin<Box<dyn Stream<Item = Result<Option<String>, StateError>> + Send>>
    where
        F: FnMut(
                Vec<T>,
                StateAccess,
            ) -> std::pin::Pin<
                Box<dyn std::future::Future<Output = Result<Option<String>, StateError>> + Send>,
            > + Send
            + Sync
            + 'static,
        Self: Sized,
    {
        let storage = config.create_storage_arc();
        let resource_manager = get_global_resource_manager();

        Box::pin(stream! {
            let stream = self;
            futures::pin_mut!(stream);
            let mut patterns: HashMap<String, Vec<T>> = HashMap::new();
            let mut item_count = 0u64;
            let mut pending_allocations = 0u64;
            let mut pending_deallocations = 0u64;
            let mut pending_buffer_overflows = 0u64;

            while let Some(item) = StreamExt::next(&mut stream).await {
                let key = key_extractor.extract_key(&item);

                // Periodic cleanup to prevent memory leaks
                item_count += 1;
                if item_count % CLEANUP_INTERVAL == 0 {
                    let before_len = patterns.len();
                    evict_oldest_entries(&mut patterns, MAX_HASHMAP_KEYS);
                    let after_len = patterns.len();
                    if before_len > after_len {
                        pending_deallocations += (before_len - after_len) as u64;
                        pending_buffer_overflows += 1;
                    }
                }

                // Batch resource tracking
                if item_count % RESOURCE_TRACKING_INTERVAL == 0 {
                    track_resource_batch(&resource_manager, pending_allocations, pending_deallocations, pending_buffer_overflows).await;
                    pending_allocations = 0;
                    pending_deallocations = 0;
                    pending_buffer_overflows = 0;
                }

                let is_new_pattern = !patterns.contains_key(&key);
                let pattern = patterns.entry(key.clone()).or_insert_with(Vec::new);
                if is_new_pattern {
                    pending_allocations += 1;
                }
                pattern.push(item);
                pending_allocations += 1;

                // Limit pattern buffer size to prevent memory overflow
                if pattern.len() > MAX_PATTERN_SIZE {
                    let drained = pattern.len() - MAX_PATTERN_SIZE;
                    pattern.drain(0..drained);
                    pending_deallocations += drained as u64;
                    pending_buffer_overflows += 1;
                }

                if pattern.len() >= pattern_size {
                    let pattern_items = pattern.drain(..pattern_size).collect::<Vec<_>>();
                    pending_deallocations += pattern_size as u64;
                    let state_access = StateAccess::new(storage.clone(), key.clone());
                    match f(pattern_items, state_access).await {
                        Ok(result) => {
                            if let Some(pattern_str) = result {
                                yield Ok(Some(pattern_str));
                            }
                        }
                        Err(e) => yield Err(e),
                    }
                }
            }

            // Final resource tracking
            if pending_allocations > 0 || pending_deallocations > 0 || pending_buffer_overflows > 0 {
                track_resource_batch(&resource_manager, pending_allocations, pending_deallocations, pending_buffer_overflows).await;
            }
        })
    }

    /// Join two streams based on keys with time-based windows (true streaming join)
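    ///
    /// Illustrative sketch (not compiled): pairs items from two streams that share a
    /// key and arrive within the window of each other. `orders`, `payments`,
    /// `config`, and both key extractors are assumed.
    ///
    /// ```ignore
    /// let joined = orders.stateful_join_rs2(
    ///     Box::pin(payments),
    ///     config,
    ///     order_key_extractor,
    ///     payment_key_extractor,
    ///     std::time::Duration::from_secs(5),
    ///     |order, payment, _state| Box::pin(async move { Ok((order, payment)) }),
    /// );
    /// ```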
    fn stateful_join_rs2<U, F, R>(
        self,
        other: Pin<Box<dyn Stream<Item = U> + Send>>,
        config: StateConfig,
        key_extractor: impl KeyExtractor<T> + Send + Sync + 'static,
        other_key_extractor: impl KeyExtractor<U> + Send + Sync + 'static,
        window_duration: std::time::Duration,
        mut f: F,
    ) -> Pin<Box<dyn Stream<Item = Result<R, StateError>> + Send>>
    where
        F: FnMut(
                T,
                U,
                StateAccess,
            ) -> std::pin::Pin<
                Box<dyn std::future::Future<Output = Result<R, StateError>> + Send>,
            > + Send
            + Sync
            + 'static,
        U: Send + Sync + Clone + Serialize + for<'de> Deserialize<'de> + 'static,
        R: Send + Sync + 'static,
        Self: Sized,
    {
        let storage = config.create_storage_arc();
        let resource_manager = get_global_resource_manager();
        Box::pin(stream! {
            let left_stream = self;
            let right_stream = other;
            futures::pin_mut!(left_stream);
            futures::pin_mut!(right_stream);
            let mut left_buffer: HashMap<String, Vec<LeftItemWithTime<T>>> = HashMap::new();
            let mut right_buffer: HashMap<String, Vec<RightItemWithTime<U>>> = HashMap::new();
            let window_ms = window_duration.as_millis() as u64;
            let mut item_count = 0u64;
            let mut pending_allocations = 0u64;
            let mut pending_deallocations = 0u64;
            let mut pending_buffer_overflows = 0u64;

            loop {
                tokio::select! {
                    left_item = left_stream.next() => {
                        if let Some(item) = left_item {
                            let key = key_extractor.extract_key(&item);
                            let now = unix_timestamp_millis();

                            // Periodic cleanup to prevent memory leaks
                            item_count += 1;
                            if item_count % CLEANUP_INTERVAL == 0 {
                                let before_left = left_buffer.len();
                                let before_right = right_buffer.len();
                                evict_oldest_entries(&mut left_buffer, MAX_HASHMAP_KEYS);
                                evict_oldest_entries(&mut right_buffer, MAX_HASHMAP_KEYS);
                                let after_left = left_buffer.len();
                                let after_right = right_buffer.len();
                                if before_left > after_left || before_right > after_right {
                                    pending_deallocations += (before_left + before_right - after_left - after_right) as u64;
                                    pending_buffer_overflows += 1;
                                }
                            }

                            // Batch resource tracking
                            if item_count % RESOURCE_TRACKING_INTERVAL == 0 {
                                track_resource_batch(&resource_manager, pending_allocations, pending_deallocations, pending_buffer_overflows).await;
                                pending_allocations = 0;
                                pending_deallocations = 0;
                                pending_buffer_overflows = 0;
                            }

                            // Clean up old left items
                            let before = left_buffer.entry(key.clone()).or_default().len();
                            left_buffer.entry(key.clone()).or_default().retain(|x| now - x.timestamp <= window_ms);
                            let after = left_buffer.entry(key.clone()).or_default().len();
                            if before > after {
                                pending_deallocations += (before - after) as u64;
                            }

                            // Add new left item
                            let left_entry = LeftItemWithTime { item: item.clone(), timestamp: now, key: key.clone() };
                            let is_new_key = !left_buffer.contains_key(&key);
                            let left_buf = left_buffer.entry(key.clone()).or_default();
                            if is_new_key {
                                pending_allocations += 1;
                            }
                            left_buf.push(left_entry.clone());
                            pending_allocations += 1;

                            // Evict oldest if buffer is full
                            let max_size = config.max_size.unwrap_or(DEFAULT_BUFFER_SIZE);
                            if left_buf.len() > max_size {
                                let removed = left_buf.len() - max_size;
                                left_buf.drain(0..removed);
                                pending_deallocations += removed as u64;
                                pending_buffer_overflows += 1;
                            }

                            // Join with right items in window
                            if let Some(rights) = right_buffer.get(&key) {
                                for right in rights.iter().filter(|r| now - r.timestamp <= window_ms) {
                                    let state_access = StateAccess::new(storage.clone(), key.clone());
                                    match f(item.clone(), right.item.clone(), state_access).await {
                                        Ok(result) => yield Ok(result),
                                        Err(e) => yield Err(e),
                                    }
                                }
                            }
                        } else {
                            break;
                        }
                    }
                    right_item = right_stream.next() => {
                        if let Some(item) = right_item {
                            let key = other_key_extractor.extract_key(&item);
                            let now = unix_timestamp_millis();
                            // Periodic cleanup to prevent memory leaks
                            item_count += 1;
                            if item_count % CLEANUP_INTERVAL == 0 {
                                let before_left = left_buffer.len();
                                let before_right = right_buffer.len();
                                evict_oldest_entries(&mut left_buffer, MAX_HASHMAP_KEYS);
                                evict_oldest_entries(&mut right_buffer, MAX_HASHMAP_KEYS);
                                let after_left = left_buffer.len();
                                let after_right = right_buffer.len();
                                if before_left > after_left || before_right > after_right {
                                    pending_deallocations += (before_left + before_right - after_left - after_right) as u64;
                                    pending_buffer_overflows += 1;
                                }
                            }

                            // Batch resource tracking
                            if item_count % RESOURCE_TRACKING_INTERVAL == 0 {
                                track_resource_batch(&resource_manager, pending_allocations, pending_deallocations, pending_buffer_overflows).await;
                                pending_allocations = 0;
                                pending_deallocations = 0;
                                pending_buffer_overflows = 0;
                            }

                            // Clean up old right items
                            let before = right_buffer.entry(key.clone()).or_default().len();
                            right_buffer.entry(key.clone()).or_default().retain(|x| now - x.timestamp <= window_ms);
                            let after = right_buffer.entry(key.clone()).or_default().len();
                            if before > after {
                                pending_deallocations += (before - after) as u64;
                            }
                            // Clean up old left items
                            let before = left_buffer.entry(key.clone()).or_default().len();
                            left_buffer.entry(key.clone()).or_default().retain(|x| now - x.timestamp <= window_ms);
                            let after = left_buffer.entry(key.clone()).or_default().len();
                            if before > after {
                                pending_deallocations += (before - after) as u64;
                            }
                            // Add new right item
                            let right_entry = RightItemWithTime { item: item.clone(), timestamp: now, key: key.clone() };
                            let is_new_key = !right_buffer.contains_key(&key);
                            let right_buf = right_buffer.entry(key.clone()).or_default();
                            if is_new_key {
                                pending_allocations += 1;
                            }
                            right_buf.push(right_entry.clone());
                            pending_allocations += 1;
                            // Evict oldest if buffer is full
                            let max_size = config.max_size.unwrap_or(DEFAULT_BUFFER_SIZE);
                            if right_buf.len() > max_size {
                                let removed = right_buf.len() - max_size;
                                right_buf.drain(0..removed);
                                pending_deallocations += removed as u64;
                                pending_buffer_overflows += 1;
                            }
                            // Join with left items in window
                            if let Some(lefts) = left_buffer.get(&key) {
                                for left in lefts.iter().filter(|l| now - l.timestamp <= window_ms) {
                                    let state_access = StateAccess::new(storage.clone(), key.clone());
                                    match f(left.item.clone(), item.clone(), state_access).await {
                                        Ok(result) => yield Ok(result),
                                        Err(e) => yield Err(e),
                                    }
                                }
                            }
                        } else {
                            break;
                        }
                    }
                }
            }

            // Final resource tracking
            if pending_allocations > 0 || pending_deallocations > 0 || pending_buffer_overflows > 0 {
                track_resource_batch(&resource_manager, pending_allocations, pending_deallocations, pending_buffer_overflows).await;
            }
        })
    }

    /// Apply a stateful window operation (tumbling window, no partial emission)
    fn stateful_window_rs2<F, R>(
        self,
        config: StateConfig,
        key_extractor: impl KeyExtractor<T> + Send + Sync + 'static,
        window_size: usize,
        f: F,
    ) -> Pin<Box<dyn Stream<Item = Result<R, StateError>> + Send>>
    where
        F: FnMut(
                Vec<T>,
                StateAccess,
            ) -> std::pin::Pin<
                Box<dyn std::future::Future<Output = Result<R, StateError>> + Send>,
            > + Send
            + Sync
            + 'static,
        R: Send + Sync + 'static,
        Self: Sized,
    {
        self.stateful_window_rs2_advanced(config, key_extractor, window_size, None, false, f)
    }

    /// Apply a stateful window operation with sliding window support
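    ///
    /// Illustrative sketch (not compiled): a sliding window of 10 items per key that
    /// advances by 5 and emits partial windows at stream end. `events`, `config`, and
    /// `key_extractor` are assumed.
    ///
    /// ```ignore
    /// let window_sizes = events.stateful_window_rs2_advanced(
    ///     config,
    ///     key_extractor,
    ///     10,
    ///     Some(5),
    ///     true,
    ///     |items, _state| Box::pin(async move { Ok(items.len()) }),
    /// );
    /// ```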
    fn stateful_window_rs2_advanced<F, R>(
        self,
        config: StateConfig,
        key_extractor: impl KeyExtractor<T> + Send + Sync + 'static,
        window_size: usize,
        slide_size: Option<usize>, // None for tumbling, Some(n) for sliding
        emit_partial: bool,        // Whether to emit partial windows at stream end
        mut f: F,
    ) -> Pin<Box<dyn Stream<Item = Result<R, StateError>> + Send>>
    where
        F: FnMut(
                Vec<T>,
                StateAccess,
            ) -> std::pin::Pin<
                Box<dyn std::future::Future<Output = Result<R, StateError>> + Send>,
            > + Send
            + Sync
            + 'static,
        R: Send + Sync + 'static,
        Self: Sized,
    {
        let storage = config.create_storage_arc();
        let slide_size = slide_size.unwrap_or(window_size); // Default to tumbling window
        let resource_manager = get_global_resource_manager();

        Box::pin(stream! {
            let stream = self;
            futures::pin_mut!(stream);
            let mut windows: HashMap<String, Vec<T>> = HashMap::new();
            let mut item_count = 0u64;
            let mut pending_allocations = 0u64;
            let mut pending_deallocations = 0u64;

            while let Some(item) = StreamExt::next(&mut stream).await {
                let key = key_extractor.extract_key(&item);
                let is_new_window = !windows.contains_key(&key);
                let window = windows.entry(key.clone()).or_insert_with(Vec::new);
                if is_new_window {
                    pending_allocations += 1;
                }

                window.push(item);
                pending_allocations += 1;

                // Batch resource tracking
                item_count += 1;
                if item_count % RESOURCE_TRACKING_INTERVAL == 0 {
                    if pending_allocations > 0 {
                        resource_manager.track_memory_allocation(pending_allocations).await.ok();
                        pending_allocations = 0;
                    }
                    if pending_deallocations > 0 {
                        resource_manager.track_memory_deallocation(pending_deallocations).await;
                        pending_deallocations = 0;
                    }
                }

                // Emit window when it reaches the required size
                if window.len() >= window_size {
                    let window_items = if slide_size >= window_size {
                        // Tumbling window - take all items and clear the window
                        let items = window.drain(..).collect::<Vec<_>>();
                        pending_deallocations += items.len() as u64;
                        items
                    } else {
                        // Sliding window - take window_size items, keep the sliding portion
                        let items = window.drain(..window_size).collect::<Vec<_>>();
                        pending_deallocations += window_size as u64;

                        // Calculate how many items to keep for the next window
                        let keep_count = window_size.saturating_sub(slide_size);
                        if keep_count > 0 && items.len() >= slide_size {
                            // Put back the items that should remain for the sliding window
                            let to_keep = items[slide_size..].to_vec();
                            let to_keep_len = to_keep.len();
                            window.splice(0..0, to_keep);
                            pending_allocations += to_keep_len as u64;
                        }

                        items
                    };

                    let state_access = StateAccess::new(storage.clone(), key.clone());
                    match f(window_items, state_access).await {
                        Ok(result) => yield Ok(result),
                        Err(e) => yield Err(e),
                    }
                }
            }

            // Emit remaining partial windows if requested
            if emit_partial {
                for (key, window) in windows {
                    if !window.is_empty() {
                        pending_deallocations += window.len() as u64;
                        let state_access = StateAccess::new(storage.clone(), key.clone());
                        match f(window, state_access).await {
                            Ok(result) => yield Ok(result),
                            Err(e) => yield Err(e),
                        }
                    }
                }
            }

            // Final resource tracking
            if pending_allocations > 0 {
                resource_manager.track_memory_allocation(pending_allocations).await.ok();
            }
            if pending_deallocations > 0 {
                resource_manager.track_memory_deallocation(pending_deallocations).await;
            }
        })
    }
}

/// State access for managing persistent state
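///
/// Minimal sketch (not compiled): values are raw bytes, so callers typically
/// serialize with `serde_json`, as the combinators above do. The `storage` handle
/// is assumed to come from `StateConfig::create_storage_arc()`.
///
/// ```ignore
/// let access = StateAccess::new(storage.clone(), "user-42".to_string());
/// let previous: Option<Vec<u8>> = access.get().await;
/// let bytes = serde_json::to_vec(&42u64).unwrap();
/// access.set(&bytes).await.expect("state write failed");
/// ```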
#[derive(Clone)]
pub struct StateAccess {
    storage: Arc<dyn StateStorage + Send + Sync>,
    key: String,
}

impl StateAccess {
    pub fn new(storage: Arc<dyn StateStorage + Send + Sync>, key: String) -> Self {
        Self { storage, key }
    }

    pub async fn get(&self) -> Option<Vec<u8>> {
        self.storage.get(&self.key).await
    }

    pub async fn set(&self, value: &[u8]) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        self.storage.set(&self.key, value).await
    }
}

fn unix_timestamp_millis() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default()
        .as_millis() as u64
}

// Blanket implementation for all streams that meet the trait bounds
impl<T, S> StatefulStreamExt<T> for S
where
    S: Stream<Item = T> + Send + Sync + Unpin + 'static,
    T: Send + Sync + Clone + Serialize + for<'de> Deserialize<'de> + 'static,
{
}