rs2_stream/state/
stream_ext.rs

1use crate::*;
2use crate::resource_manager::{get_global_resource_manager, ResourceManager};
3use async_stream::stream;
4use futures_core::Stream;
5use futures_util::pin_mut;
6use futures_util::stream::StreamExt;
7use serde::{Deserialize, Serialize};
8use std::collections::HashMap;
9use std::collections::HashSet;
10use std::pin::Pin;
11use std::sync::{Arc, Mutex};
12use std::time::{Duration, SystemTime, UNIX_EPOCH};
13use tokio::time::sleep;
14use crate::state::traits::KeyExtractor;
15use crate::state::{StateConfig, StateError, StateStorage};
16
// ---- Memory limits -------------------------------------------------------

/// Upper bound on the number of distinct keys kept in an operator's in-memory map.
const MAX_HASHMAP_KEYS: usize = 10_000;
/// Default number of items a group may hold before it is emitted.
const MAX_GROUP_SIZE: usize = 10_000;
/// Upper bound on the number of items buffered per pattern key.
const MAX_PATTERN_SIZE: usize = 1_000;
/// Base cleanup period, counted in processed items.
const CLEANUP_INTERVAL: u64 = 1_000;
/// Resource-tracking period, counted in processed items.
const RESOURCE_TRACKING_INTERVAL: u64 = 100;
/// Per-key join buffer length used when the state config does not set `max_size`.
const DEFAULT_BUFFER_SIZE: usize = 1_024;

// ---- Per-operator cleanup periods (in processed items) --------------------

/// Cleanup period for the stateful map operator.
const MAP_CLEANUP_INTERVAL: u64 = 10_000;
/// Cleanup period intended for windowing operators.
const WINDOW_CLEANUP_INTERVAL: u64 = 5_000;
/// Cleanup period for the join operator.
const JOIN_CLEANUP_INTERVAL: u64 = 500;
/// Cleanup period intended for throttling operators.
const THROTTLE_CLEANUP_INTERVAL: u64 = 10_000;
30
/// Trims `map` so that it holds at most `max_keys` entries.
///
/// Despite the name, this is not an LRU policy: no insertion or access order is tracked.
/// The entries removed are the ones whose keys sort lowest under `Ord`; the `max_keys`
/// highest-sorting keys are kept. The map is left untouched when it is already within
/// the limit.
fn evict_oldest_entries<K, V>(map: &mut HashMap<K, V>, max_keys: usize)
where
    K: Clone + std::hash::Hash + Eq + std::cmp::Ord,
{
    let excess = map.len().saturating_sub(max_keys);
    if excess == 0 {
        return;
    }

    // Only the largest key that must go needs to be owned; everything at or below it is
    // removed in a single pass. Keys are unique, so an unstable sort is sufficient.
    let cutoff: K = {
        let mut keys: Vec<&K> = map.keys().collect();
        keys.sort_unstable();
        (*keys[excess - 1]).clone()
    };
    map.retain(|key, _| *key > cutoff);
}
46
47// Optimized resource tracking - batch operations
48async fn track_resource_batch(
49    resource_manager: &Arc<ResourceManager>,
50    allocations: u64,
51    deallocations: u64,
52    buffer_overflows: u64,
53) {
54    if allocations > 0 {
55        resource_manager.track_memory_allocation(allocations).await.ok();
56    }
57    if deallocations > 0 {
58        resource_manager.track_memory_deallocation(deallocations).await;
59    }
60    for _ in 0..buffer_overflows {
61        resource_manager.track_buffer_overflow().await.ok();
62    }
63}
64
65#[derive(Serialize, Deserialize, Clone)]
66struct ThrottleState {
67    count: u32,
68    window_start: u64, // UNIX timestamp in milliseconds
69}
70
71#[derive(Serialize, Deserialize, Clone)]
72struct SessionState {
73    last_activity: u64, // UNIX timestamp
74    is_new_session: bool,
75}
76
77#[derive(Serialize, Deserialize, Clone)]
78struct LeftItemWithTime<T> {
79    item: T,
80    timestamp: u64,
81    key: String,
82}
83
84#[derive(Serialize, Deserialize, Clone)]
85struct RightItemWithTime<U> {
86    item: U,
87    timestamp: u64,
88    key: String,
89}
90
91/// Extension trait for adding stateful operations to streams
92pub trait StatefulStreamExt<T>: Stream<Item = T> + Send + Sync + Sized + Unpin + 'static
93where
94    Self: 'static,
95    T: Send + Sync + Clone + Serialize + for<'de> Deserialize<'de> + 'static,
96{
97    /// Apply a stateful map operation
98    fn stateful_map_rs2<F, R>(
99        self,
100        config: StateConfig,
101        key_extractor: impl KeyExtractor<T> + Send + Sync + 'static,
102        mut f: F,
103    ) -> Pin<Box<dyn Stream<Item = Result<R, StateError>> + Send>>
104    where
105        F: FnMut(
106                T,
107                StateAccess,
108            ) -> std::pin::Pin<
109                Box<dyn std::future::Future<Output = Result<R, StateError>> + Send>,
110            > + Send
111            + Sync
112            + 'static,
113        R: Send + Sync + 'static,
114        Self: Sized,
115    {
116        let storage = config.create_storage_arc();
117
118        Box::pin(stream! {
119            let stream = self;
120            futures::pin_mut!(stream);
121            let mut seen_keys: HashSet<String> = HashSet::new();
122            let mut item_count = 0u64;
123
124            while let Some(item) = StreamExt::next(&mut stream).await {
125                let key = key_extractor.extract_key(&item);
126
127                // Minimal cleanup - only when absolutely necessary
128                item_count += 1;
129                if item_count % MAP_CLEANUP_INTERVAL == 0 && seen_keys.len() > MAX_HASHMAP_KEYS {
130                    seen_keys.clear(); // Simple and fast cleanup
131                }
132
133                // Optimized key tracking - single operation
134                seen_keys.insert(key.clone());
135
136                let state_access = StateAccess::new(storage.clone(), key);
137                match f(item, state_access).await {
138                    Ok(result) => yield Ok(result),
139                    Err(e) => yield Err(e),
140                }
141            }
142        })
143    }
144
145    /// Apply a stateful filter operation
146    fn stateful_filter_rs2<F>(
147        self,
148        config: StateConfig,
149        key_extractor: impl KeyExtractor<T> + Send + Sync + 'static,
150        mut f: F,
151    ) -> Pin<Box<dyn Stream<Item = Result<T, StateError>> + Send>>
152    where
153        F: FnMut(
154                &T,
155                StateAccess,
156            ) -> std::pin::Pin<
157                Box<dyn std::future::Future<Output = Result<bool, StateError>> + Send>,
158            > + Send
159            + Sync
160            + 'static,
161        Self: Sized,
162    {
163        let storage = config.create_storage_arc();
164
165        Box::pin(stream! {
166            let stream = self;
167            futures::pin_mut!(stream);
168            let mut seen_keys: HashSet<String> = HashSet::new();
169            let mut item_count = 0u64;
170
171            while let Some(item) = StreamExt::next(&mut stream).await {
172                let key = key_extractor.extract_key(&item);
173
174                // Optimized cleanup - only when necessary
175                item_count += 1;
176                if item_count % (CLEANUP_INTERVAL * 2) == 0 && seen_keys.len() > MAX_HASHMAP_KEYS {
177                    // More efficient cleanup - clear all and let it rebuild
178                    seen_keys.clear();
179                }
180
181                // Optimized key insertion - avoid double lookup
182                let is_new_key = seen_keys.insert(key.clone());
183
184                let state_access = StateAccess::new(storage.clone(), key);
185                match f(&item, state_access).await {
186                    Ok(should_emit) => {
187                        if should_emit {
188                            yield Ok(item);
189                        }
190                    }
191                    Err(e) => yield Err(e),
192                }
193            }
194        })
195    }
196
197    /// Apply a stateful fold operation
198    fn stateful_fold_rs2<F, R>(
199        self,
200        config: StateConfig,
201        key_extractor: impl KeyExtractor<T> + Send + Sync + 'static,
202        initial: R,
203        mut f: F,
204    ) -> Pin<Box<dyn Stream<Item = Result<R, StateError>> + Send>>
205    where
206        F: FnMut(
207                R,
208                T,
209                StateAccess,
210            ) -> std::pin::Pin<
211                Box<dyn std::future::Future<Output = Result<R, StateError>> + Send>,
212            > + Send
213            + Sync
214            + 'static,
215        R: Send + Sync + Clone + 'static,
216        Self: Sized,
217    {
218        let storage = config.create_storage_arc();
219
220        Box::pin(stream! {
221            let stream = self;
222            futures::pin_mut!(stream);
223            let mut accumulators: HashMap<String, R> = HashMap::new();
224            let mut item_count = 0u64;
225
226            while let Some(item) = StreamExt::next(&mut stream).await {
227                let key = key_extractor.extract_key(&item);
228
229                // Periodic cleanup to prevent memory leaks
230                item_count += 1;
231                if item_count % (CLEANUP_INTERVAL * 2) == 0 {
232                    evict_oldest_entries(&mut accumulators, MAX_HASHMAP_KEYS);
233                }
234
235                let acc = accumulators.entry(key.clone()).or_insert_with(|| initial.clone());
236                let state_access = StateAccess::new(storage.clone(), key);
237
238                match f(acc.clone(), item, state_access).await {
239                    Ok(new_acc) => {
240                        *acc = new_acc.clone();
241                        yield Ok(new_acc);
242                    }
243                    Err(e) => yield Err(e),
244                }
245            }
246        })
247    }
248
249    /// Apply a stateful reduce operation
250    fn stateful_reduce_rs2<F, R>(
251        self,
252        config: StateConfig,
253        key_extractor: impl KeyExtractor<T> + Send + Sync + 'static,
254        initial: R,
255        mut f: F,
256    ) -> Pin<Box<dyn Stream<Item = Result<R, StateError>> + Send>>
257    where
258        F: FnMut(
259                R,
260                T,
261                StateAccess,
262            ) -> std::pin::Pin<
263                Box<dyn std::future::Future<Output = Result<R, StateError>> + Send>,
264            > + Send
265            + Sync
266            + 'static,
267        R: Send + Sync + Clone + 'static,
268        Self: Sized,
269    {
270        let storage = config.create_storage_arc();
271
272        Box::pin(stream! {
273            let stream = self;
274            futures::pin_mut!(stream);
275            let mut accumulators: HashMap<String, R> = HashMap::new();
276            let mut item_count = 0u64;
277
278            while let Some(item) = StreamExt::next(&mut stream).await {
279                let key = key_extractor.extract_key(&item);
280
281                // Periodic cleanup to prevent memory leaks
282                item_count += 1;
283                if item_count % (CLEANUP_INTERVAL * 2) == 0 {
284                    evict_oldest_entries(&mut accumulators, MAX_HASHMAP_KEYS);
285                }
286
287                let acc = accumulators.entry(key.clone()).or_insert_with(|| initial.clone());
288                let state_access = StateAccess::new(storage.clone(), key);
289
290                match f(acc.clone(), item, state_access).await {
291                    Ok(new_acc) => {
292                        *acc = new_acc.clone();
293                        yield Ok(new_acc);
294                    }
295                    Err(e) => yield Err(e),
296                }
297            }
298        })
299    }
300
301    /// Apply a stateful group by operation
302    fn stateful_group_by_rs2<F, R>(
303        self,
304        config: StateConfig,
305        key_extractor: impl KeyExtractor<T> + Send + Sync + 'static,
306        f: F,
307    ) -> Pin<Box<dyn Stream<Item = Result<R, StateError>> + Send>>
308    where
309        F: FnMut(
310                String,
311                Vec<T>,
312                StateAccess,
313            ) -> std::pin::Pin<
314                Box<dyn std::future::Future<Output = Result<R, StateError>> + Send>,
315            > + Send
316            + Sync
317            + 'static,
318        R: Send + Sync + 'static,
319        Self: Sized,
320    {
321        self.stateful_group_by_advanced_rs2(config, key_extractor, None, None, false, f)
322    }
323
324    /// Apply a stateful group by operation with advanced configuration
325    fn stateful_group_by_advanced_rs2<F, R>(
326        self,
327        config: StateConfig,
328        key_extractor: impl KeyExtractor<T> + Send + Sync + 'static,
329        max_group_size: Option<usize>, // Emit when group reaches this size
330        group_timeout: Option<std::time::Duration>, // Emit group after this timeout
331        emit_on_key_change: bool,      // Emit previous group when key changes
332        mut f: F,
333    ) -> Pin<Box<dyn Stream<Item = Result<R, StateError>> + Send>>
334    where
335        F: FnMut(
336                String,
337                Vec<T>,
338                StateAccess,
339            ) -> std::pin::Pin<
340                Box<dyn std::future::Future<Output = Result<R, StateError>> + Send>,
341            > + Send
342            + Sync
343            + 'static,
344        R: Send + Sync + 'static,
345        Self: Sized,
346    {
347        let storage = config.create_storage_arc();
348        let timeout_ms = group_timeout.map(|d| d.as_millis() as u64);
349        let max_group_size = max_group_size.unwrap_or(MAX_GROUP_SIZE);
350
351        Box::pin(stream! {
352            let stream = self;
353            futures::pin_mut!(stream);
354            let mut groups: HashMap<String, Vec<T>> = HashMap::new();
355            let mut group_timestamps: HashMap<String, u64> = HashMap::new();
356            let mut last_key: Option<String> = None;
357            let mut item_count = 0u64;
358
359            while let Some(item) = StreamExt::next(&mut stream).await {
360                let key = key_extractor.extract_key(&item);
361                let now = unix_timestamp_millis();
362
363                // Reduced cleanup frequency for group operations
364                item_count += 1;
365                if item_count % (CLEANUP_INTERVAL * 2) == 0 {
366                    evict_oldest_entries(&mut groups, MAX_HASHMAP_KEYS);
367                    evict_oldest_entries(&mut group_timestamps, MAX_HASHMAP_KEYS);
368                }
369
370                // Check if we need to emit the previous group due to key change
371                if emit_on_key_change {
372                    if let Some(ref last_key_val) = last_key {
373                        if last_key_val != &key {
374                            // Key changed, emit the previous group
375                            if let Some(group_items) = groups.remove(last_key_val) {
376                                let state_access = StateAccess::new(storage.clone(), last_key_val.clone());
377                                match f(last_key_val.clone(), group_items, state_access).await {
378                                    Ok(result) => yield Ok(result),
379                                    Err(e) => yield Err(e),
380                                }
381                            }
382                            group_timestamps.remove(last_key_val);
383                        }
384                    }
385                }
386
387                // Optimized timeout check - only check current key instead of all groups
388                if let (Some(timeout), Some(&group_start)) = (timeout_ms, group_timestamps.get(&key)) {
389                    if now - group_start > timeout {
390                        if let Some(group_items) = groups.remove(&key) {
391                            let state_access = StateAccess::new(storage.clone(), key.clone());
392                            match f(key.clone(), group_items, state_access).await {
393                                Ok(result) => yield Ok(result),
394                                Err(e) => yield Err(e),
395                            }
396                        }
397                        group_timestamps.remove(&key);
398                    }
399                }
400
401                // Add item to current group
402                let group = groups.entry(key.clone()).or_insert_with(Vec::new);
403                group_timestamps.entry(key.clone()).or_insert(now);
404                group.push(item);
405
406                // Check if we should emit this group due to size limit
407                if group.len() >= max_group_size {
408                    if let Some(group_items) = groups.remove(&key) {
409                        let state_access = StateAccess::new(storage.clone(), key.clone());
410                        match f(key.clone(), group_items, state_access).await {
411                            Ok(result) => yield Ok(result),
412                            Err(e) => yield Err(e),
413                        }
414                    }
415                    group_timestamps.remove(&key);
416                }
417
418                last_key = Some(key);
419            }
420
421            // Final cleanup - check for any remaining groups that have timed out
422            let now = unix_timestamp_millis();
423            let mut expired_keys = Vec::new();
424
425            if let Some(timeout) = timeout_ms {
426                for (key, &group_start) in &group_timestamps {
427                    if now - group_start > timeout {
428                        expired_keys.push(key.clone());
429                    }
430                }
431            }
432
433            // Emit expired groups
434            for key in expired_keys {
435                let key_clone = key.clone();
436                if let Some(group_items) = groups.remove(&key_clone) {
437                    let state_access = StateAccess::new(storage.clone(), key_clone.clone());
438                    match f(key_clone.clone(), group_items, state_access).await {
439                        Ok(result) => yield Ok(result),
440                        Err(e) => yield Err(e),
441                    }
442                }
443                group_timestamps.remove(&key_clone);
444            }
445
446            // Emit any remaining groups at stream end
447            for (key, group_items) in groups {
448                let state_access = StateAccess::new(storage.clone(), key.clone());
449                match f(key, group_items, state_access).await {
450                    Ok(result) => yield Ok(result),
451                    Err(e) => yield Err(e),
452                }
453            }
454        })
455    }
456
457    /// Apply a stateful deduplication operation
458    fn stateful_deduplicate_rs2<F>(
459        self,
460        config: StateConfig,
461        key_extractor: impl KeyExtractor<T> + Send + Sync + 'static,
462        ttl: std::time::Duration,
463        mut f: F,
464    ) -> Pin<Box<dyn Stream<Item = Result<T, StateError>> + Send>>
465    where
466        F: FnMut(T) -> T + Send + Sync + 'static,
467        Self: Sized,
468    {
469        let storage = config.create_storage_arc();
470        let ttl_ms = ttl.as_millis() as u64;
471
472        Box::pin(stream! {
473            let stream = self;
474            futures::pin_mut!(stream);
475
476            while let Some(item) = StreamExt::next(&mut stream).await {
477                let key = key_extractor.extract_key(&item);
478                let state_access = StateAccess::new(storage.clone(), key.clone());
479
480                let now = unix_timestamp_millis();
481                let state_bytes = match state_access.get().await {
482                    Some(bytes) => bytes,
483                    None => Vec::new(),
484                };
485
486                let last_seen: u64 = if state_bytes.is_empty() {
487                    0
488                } else {
489                    match serde_json::from_slice(&state_bytes) {
490                        Ok(timestamp) => timestamp,
491                        Err(_) => 0,
492                    }
493                };
494
495                if now - last_seen > ttl_ms {
496                    // Handle serialization error gracefully
497                    match serde_json::to_vec(&now) {
498                        Ok(timestamp_bytes) => {
499                            if let Err(e) = state_access.set(&timestamp_bytes).await {
500                                yield Err(StateError::Storage(format!("Failed to set state for deduplication: {}", e)));
501                                continue;
502                            }
503                        }
504                        Err(e) => {
505                            yield Err(StateError::Serialization(e));
506                            continue;
507                        }
508                    }
509
510                    yield Ok(f(item));
511                }
512            }
513        })
514    }
515
516    /// Apply a stateful throttle operation
517    fn stateful_throttle_rs2<F>(
518        self,
519        config: StateConfig,
520        key_extractor: impl KeyExtractor<T> + Send + Sync + 'static,
521        rate_limit: u32,
522        window_duration: std::time::Duration,
523        mut f: F,
524    ) -> Pin<Box<dyn Stream<Item = Result<T, StateError>> + Send>>
525    where
526        F: FnMut(T) -> T + Send + Sync + 'static,
527        Self: Sized,
528    {
529        let storage = config.create_storage_arc();
530        let window_ms = window_duration.as_millis() as u64;
531
532        Box::pin(stream! {
533            let stream = self;
534            futures::pin_mut!(stream);
535
536            while let Some(item) = StreamExt::next(&mut stream).await {
537                let key = key_extractor.extract_key(&item);
538                let state_access = StateAccess::new(storage.clone(), key.clone());
539
540                let now = unix_timestamp_millis();
541
542                // Get current throttle state from storage
543                let state_bytes = match state_access.get().await {
544                    Some(bytes) => bytes,
545                    None => Vec::new(),
546                };
547
548                let mut throttle_state: ThrottleState = if state_bytes.is_empty() {
549                    ThrottleState { count: 0, window_start: now }
550                } else {
551                    match serde_json::from_slice(&state_bytes) {
552                        Ok(state) => state,
553                        Err(_) => ThrottleState { count: 0, window_start: now },
554                    }
555                };
556
557                // If window expired, reset
558                if now - throttle_state.window_start > window_ms {
559                    throttle_state.count = 0;
560                    throttle_state.window_start = now;
561                }
562
563                if throttle_state.count < rate_limit {
564                    throttle_state.count += 1;
565
566                    // Update state in storage
567                    match serde_json::to_vec(&throttle_state) {
568                        Ok(state_bytes) => {
569                            if let Err(e) = state_access.set(&state_bytes).await {
570                                yield Err(StateError::Storage(format!("Failed to set throttle state: {}", e)));
571                                continue;
572                            }
573                        }
574                        Err(e) => {
575                            yield Err(StateError::Serialization(e));
576                            continue;
577                        }
578                    }
579
580                    yield Ok(f(item));
581                } else {
582                    // Optimized sleep - calculate remaining time more efficiently
583                    let elapsed_ms = now.saturating_sub(throttle_state.window_start);
584                    let remaining = if elapsed_ms >= window_ms {
585                        Duration::from_millis(0)
586                    } else {
587                        Duration::from_millis(window_ms - elapsed_ms)
588                    };
589
590                    // Only sleep if necessary and for a reasonable duration
591                    if remaining > Duration::from_millis(0) && remaining < Duration::from_secs(1) {
592                        sleep(remaining).await;
593                    }
594
595                    // After sleep, reset window and count
596                    let now2 = unix_timestamp_millis();
597                    throttle_state.count = 1;
598                    throttle_state.window_start = now2;
599
600                    // Update state in storage
601                    match serde_json::to_vec(&throttle_state) {
602                        Ok(state_bytes) => {
603                            if let Err(e) = state_access.set(&state_bytes).await {
604                                yield Err(StateError::Storage(format!("Failed to set throttle state: {}", e)));
605                                continue;
606                            }
607                        }
608                        Err(e) => {
609                            yield Err(StateError::Serialization(e));
610                            continue;
611                        }
612                    }
613
614                    yield Ok(f(item));
615                }
616            }
617        })
618    }
619
620    /// Apply a stateful session operation
621    fn stateful_session_rs2<F>(
622        self,
623        config: StateConfig,
624        key_extractor: impl KeyExtractor<T> + Send + Sync + 'static,
625        session_timeout: std::time::Duration,
626        mut f: F,
627    ) -> Pin<Box<dyn Stream<Item = Result<T, StateError>> + Send>>
628    where
629        F: FnMut(T, bool) -> T + Send + Sync + 'static,
630        Self: Sized,
631    {
632        let storage = config.create_storage_arc();
633        let timeout_ms = session_timeout.as_millis() as u64;
634
635        Box::pin(stream! {
636            let stream = self;
637            futures::pin_mut!(stream);
638
639            while let Some(item) = StreamExt::next(&mut stream).await {
640                let key = key_extractor.extract_key(&item);
641                let state_access = StateAccess::new(storage.clone(), key.clone());
642
643                let now = unix_timestamp_millis();
644                let state_bytes = match state_access.get().await {
645                    Some(bytes) => bytes,
646                    None => Vec::new(),
647                };
648
649                let mut state: SessionState = if state_bytes.is_empty() {
650                    SessionState { last_activity: now, is_new_session: true }
651                } else {
652                    match serde_json::from_slice(&state_bytes) {
653                        Ok(session_state) => session_state,
654                        Err(_) => SessionState { last_activity: now, is_new_session: true },
655                    }
656                };
657
658                let is_new_session = now - state.last_activity > timeout_ms;
659                state.last_activity = now;
660                state.is_new_session = is_new_session;
661
662                // Handle serialization and state setting errors gracefully
663                match serde_json::to_vec(&state) {
664                    Ok(state_bytes) => {
665                        if let Err(e) = state_access.set(&state_bytes).await {
666                            yield Err(StateError::Storage(format!("Failed to set session state: {}", e)));
667                            continue;
668                        }
669                    }
670                    Err(e) => {
671                        yield Err(StateError::Serialization(e));
672                        continue;
673                    }
674                }
675
676                yield Ok(f(item, is_new_session));
677            }
678        })
679    }
680
681    /// Apply a stateful pattern operation
682    fn stateful_pattern_rs2<F>(
683        self,
684        config: StateConfig,
685        key_extractor: impl KeyExtractor<T> + Send + Sync + 'static,
686        pattern_size: usize,
687        mut f: F,
688    ) -> Pin<Box<dyn Stream<Item = Result<Option<String>, StateError>> + Send>>
689    where
690        F: FnMut(
691                Vec<T>,
692                StateAccess,
693            ) -> std::pin::Pin<
694                Box<dyn std::future::Future<Output = Result<Option<String>, StateError>> + Send>,
695            > + Send
696            + Sync
697            + 'static,
698        Self: Sized,
699    {
700        let storage = config.create_storage_arc();
701
702        Box::pin(stream! {
703            let stream = self;
704            futures::pin_mut!(stream);
705            let mut patterns: HashMap<String, Vec<T>> = HashMap::new();
706            let mut item_count = 0u64;
707
708            while let Some(item) = StreamExt::next(&mut stream).await {
709                let key = key_extractor.extract_key(&item);
710
711                // Periodic cleanup to prevent memory leaks
712                item_count += 1;
713                if item_count % (CLEANUP_INTERVAL * 2) == 0 {
714                    evict_oldest_entries(&mut patterns, MAX_HASHMAP_KEYS);
715                }
716
717                let pattern = patterns.entry(key.clone()).or_insert_with(Vec::new);
718                pattern.push(item);
719
720                // Limit pattern buffer size to prevent memory overflow
721                if pattern.len() > MAX_PATTERN_SIZE {
722                    pattern.drain(0..pattern.len() - MAX_PATTERN_SIZE);
723                }
724
725                if pattern.len() >= pattern_size {
726                    let pattern_items = pattern.drain(..pattern_size).collect::<Vec<_>>();
727                    let state_access = StateAccess::new(storage.clone(), key.clone());
728                    match f(pattern_items, state_access).await {
729                        Ok(result) => {
730                            if let Some(pattern_str) = result {
731                                yield Ok(Some(pattern_str));
732                            }
733                        }
734                        Err(e) => yield Err(e),
735                    }
736                }
737            }
738        })
739    }
740
741    /// Join two streams based on keys with time-based windows (true streaming join)
742    fn stateful_join_rs2<U, F, R>(
743        self,
744        other: Pin<Box<dyn Stream<Item = U> + Send>>,
745        config: StateConfig,
746        key_extractor: impl KeyExtractor<T> + Send + Sync + 'static,
747        other_key_extractor: impl KeyExtractor<U> + Send + Sync + 'static,
748        window_duration: std::time::Duration,
749        mut f: F,
750    ) -> Pin<Box<dyn Stream<Item = Result<R, StateError>> + Send>>
751    where
752        F: FnMut(
753                T,
754                U,
755                StateAccess,
756            ) -> std::pin::Pin<
757                Box<dyn std::future::Future<Output = Result<R, StateError>> + Send>,
758            > + Send
759            + Sync
760            + 'static,
761        U: Send + Sync + Clone + Serialize + for<'de> Deserialize<'de> + 'static,
762        R: Send + Sync + 'static,
763        Self: Sized,
764    {
765        let storage = config.create_storage_arc();
766        Box::pin(stream! {
767            let left_stream = self;
768            let right_stream = other;
769            futures::pin_mut!(left_stream);
770            futures::pin_mut!(right_stream);
771            let mut left_buffer: HashMap<String, Vec<LeftItemWithTime<T>>> = HashMap::new();
772            let mut right_buffer: HashMap<String, Vec<RightItemWithTime<U>>> = HashMap::new();
773            let window_ms = window_duration.as_millis() as u64;
774            let mut item_count = 0u64;
775
776            loop {
777                tokio::select! {
778                    left_item = left_stream.next() => {
779                        if let Some(item) = left_item {
780                            let key = key_extractor.extract_key(&item);
781                            let now = unix_timestamp_millis();
782
783                            // Reduced cleanup frequency for complex operations
784                            item_count += 1;
785                            if item_count % JOIN_CLEANUP_INTERVAL == 0 {
786                                evict_oldest_entries(&mut left_buffer, MAX_HASHMAP_KEYS);
787                                evict_oldest_entries(&mut right_buffer, MAX_HASHMAP_KEYS);
788                            }
789
790                            // Clean up old left items
791                            left_buffer.entry(key.clone()).or_default().retain(|x| now - x.timestamp <= window_ms);
792
793                            // Add new left item
794                            let left_entry = LeftItemWithTime { item: item.clone(), timestamp: now, key: key.clone() };
795                            left_buffer.entry(key.clone()).or_default().push(left_entry.clone());
796
797                            // Evict oldest if buffer is full
798                            let max_size = config.max_size.unwrap_or(DEFAULT_BUFFER_SIZE);
799                            let left_buf = left_buffer.get_mut(&key).unwrap();
800                            if left_buf.len() > max_size {
801                                left_buf.drain(0..left_buf.len() - max_size);
802                            }
803
804                            // Join with right items in window
805                            if let Some(rights) = right_buffer.get(&key) {
806                                for right in rights.iter().filter(|r| now - r.timestamp <= window_ms) {
807                                    let state_access = StateAccess::new(storage.clone(), key.clone());
808                                    match f(item.clone(), right.item.clone(), state_access).await {
809                                        Ok(result) => yield Ok(result),
810                                        Err(e) => yield Err(e),
811                                    }
812                                }
813                            }
814                        } else {
815                            break;
816                        }
817                    }
818                    right_item = right_stream.next() => {
819                        if let Some(item) = right_item {
820                            let key = other_key_extractor.extract_key(&item);
821                            let now = unix_timestamp_millis();
822                            
823                            // Reduced cleanup frequency for complex operations
824                            item_count += 1;
825                            if item_count % JOIN_CLEANUP_INTERVAL == 0 {
826                                evict_oldest_entries(&mut left_buffer, MAX_HASHMAP_KEYS);
827                                evict_oldest_entries(&mut right_buffer, MAX_HASHMAP_KEYS);
828                            }
829
830                            // Clean up old right items
831                            right_buffer.entry(key.clone()).or_default().retain(|x| now - x.timestamp <= window_ms);
832                            // Clean up old left items
833                            left_buffer.entry(key.clone()).or_default().retain(|x| now - x.timestamp <= window_ms);
834                            
835                            // Add new right item
836                            let right_entry = RightItemWithTime { item: item.clone(), timestamp: now, key: key.clone() };
837                            right_buffer.entry(key.clone()).or_default().push(right_entry.clone());
838                            
839                            // Evict oldest if buffer is full
840                            let max_size = config.max_size.unwrap_or(DEFAULT_BUFFER_SIZE);
841                            let right_buf = right_buffer.get_mut(&key).unwrap();
842                            if right_buf.len() > max_size {
843                                right_buf.drain(0..right_buf.len() - max_size);
844                            }
845                            
846                            // Join with left items in window
847                            if let Some(lefts) = left_buffer.get(&key) {
848                                for left in lefts.iter().filter(|l| now - l.timestamp <= window_ms) {
849                                    let state_access = StateAccess::new(storage.clone(), key.clone());
850                                    match f(left.item.clone(), item.clone(), state_access).await {
851                                        Ok(result) => yield Ok(result),
852                                        Err(e) => yield Err(e),
853                                    }
854                                }
855                            }
856                        } else {
857                            break;
858                        }
859                    }
860                }
861            }
862        })
863    }
864
865    /// Apply a stateful window operation (tumbling window, no partial emission)
866    fn stateful_window_rs2<F, R>(
867        self,
868        config: StateConfig,
869        key_extractor: impl KeyExtractor<T> + Send + Sync + 'static,
870        window_size: usize,
871        f: F,
872    ) -> Pin<Box<dyn Stream<Item = Result<R, StateError>> + Send>>
873    where
874        F: FnMut(
875                Vec<T>,
876                StateAccess,
877            ) -> std::pin::Pin<
878                Box<dyn std::future::Future<Output = Result<R, StateError>> + Send>,
879            > + Send
880            + Sync
881            + 'static,
882        R: Send + Sync + 'static,
883        Self: Sized,
884    {
885        self.stateful_window_rs2_advanced(config, key_extractor, window_size, None, false, f)
886    }
887
888    /// Apply a stateful window operation with sliding window support
889    fn stateful_window_rs2_advanced<F, R>(
890        self,
891        config: StateConfig,
892        key_extractor: impl KeyExtractor<T> + Send + Sync + 'static,
893        window_size: usize,
894        slide_size: Option<usize>, // None for tumbling, Some(n) for sliding
895        emit_partial: bool,        // Whether to emit partial windows at stream end
896        mut f: F,
897    ) -> Pin<Box<dyn Stream<Item = Result<R, StateError>> + Send>>
898    where
899        F: FnMut(
900                Vec<T>,
901                StateAccess,
902            ) -> std::pin::Pin<
903                Box<dyn std::future::Future<Output = Result<R, StateError>> + Send>,
904            > + Send
905            + Sync
906            + 'static,
907        R: Send + Sync + 'static,
908        Self: Sized,
909    {
910        let storage = config.create_storage_arc();
911        let slide_size = slide_size.unwrap_or(window_size); // Default to tumbling window
912
913        Box::pin(stream! {
914            let stream = self;
915            futures::pin_mut!(stream);
916            let mut windows: HashMap<String, Vec<T>> = HashMap::new();
917
918            while let Some(item) = StreamExt::next(&mut stream).await {
919                let key = key_extractor.extract_key(&item);
920                let window = windows.entry(key.clone()).or_insert_with(Vec::new);
921
922                window.push(item);
923
924                // Emit window when it reaches the required size
925                if window.len() >= window_size {
926                    let window_items = if slide_size >= window_size {
927                        // Tumbling window - take all items and clear the window
928                        window.drain(..).collect::<Vec<_>>()
929                    } else {
930                        // Sliding window - take window_size items, keep the sliding portion
931                        let items = window.drain(..window_size).collect::<Vec<_>>();
932
933                        // Calculate how many items to keep for the next window
934                        let keep_count = window_size.saturating_sub(slide_size);
935                        if keep_count > 0 && items.len() >= slide_size {
936                            // Put back the items that should remain for the sliding window
937                            let to_keep = items[slide_size..].to_vec();
938                            window.splice(0..0, to_keep);
939                        }
940
941                        items
942                    };
943
944                    let state_access = StateAccess::new(storage.clone(), key.clone());
945                    match f(window_items, state_access).await {
946                        Ok(result) => yield Ok(result),
947                        Err(e) => yield Err(e),
948                    }
949                }
950            }
951
952            // Emit remaining partial windows if requested
953            if emit_partial {
954                for (key, window) in windows {
955                    if !window.is_empty() {
956                        let state_access = StateAccess::new(storage.clone(), key.clone());
957                        match f(window, state_access).await {
958                            Ok(result) => yield Ok(result),
959                            Err(e) => yield Err(e),
960                        }
961                    }
962                }
963            }
964        })
965    }
966}
967
/// Handle for reading and writing the state stored under a single key.
///
/// Pairs a shared storage backend with the key it operates on. Cloning the
/// handle clones the `Arc` (not the backend itself) and the key string.
#[derive(Clone)]
pub struct StateAccess {
    /// Shared storage backend that `get` and `set` delegate to.
    storage: Arc<dyn StateStorage + Send + Sync>,
    /// Key passed to the backend on every `get` / `set` call.
    key: String,
}
974
975impl StateAccess {
976    pub fn new(storage: Arc<dyn StateStorage + Send + Sync>, key: String) -> Self {
977        Self { storage, key }
978    }
979
980    pub async fn get(&self) -> Option<Vec<u8>> {
981        self.storage.get(&self.key).await
982    }
983
984    pub async fn set(&self, value: &[u8]) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
985        self.storage.set(&self.key, value).await
986    }
987}
988
/// Returns the current system time as whole milliseconds since the Unix epoch.
///
/// If the system clock reports a time before the epoch, the elapsed duration
/// falls back to zero and `0` is returned.
fn unix_timestamp_millis() -> u64 {
    let since_epoch = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed,
        Err(_) => Duration::default(),
    };
    since_epoch.as_millis() as u64
}
995
// Blanket implementation: every stream type `S` that satisfies the bounds below
// implements `StatefulStreamExt<T>`. The impl body is intentionally empty, so
// all methods come from the trait's default implementations.
impl<T, S> StatefulStreamExt<T> for S
where
    S: Stream<Item = T> + Send + Sync + Unpin + 'static,
    T: Send + Sync + Clone + Serialize + for<'de> Deserialize<'de> + 'static,
{
}
1003