1use crate::*;
2use crate::resource_manager::{get_global_resource_manager, ResourceManager};
3use async_stream::stream;
4use futures_core::Stream;
5use futures_util::pin_mut;
6use futures_util::stream::StreamExt;
7use serde::{Deserialize, Serialize};
8use std::collections::HashMap;
9use std::collections::HashSet;
10use std::pin::Pin;
11use std::sync::{Arc, Mutex};
12use std::time::{Duration, SystemTime, UNIX_EPOCH};
13use tokio::time::sleep;
14use crate::state::traits::KeyExtractor;
15use crate::state::{StateConfig, StateError, StateStorage};
16
/// Maximum number of distinct keys a per-operator map may hold; periodic cleanup trims
/// maps back to this size with `evict_oldest_entries`.
const MAX_HASHMAP_KEYS: usize = 10_000;

/// Group-size cap used by `stateful_group_by_advanced_rs2` when the caller passes
/// `max_group_size: None`.
const MAX_GROUP_SIZE: usize = 10_000;

/// Trim threshold for per-key item buffers in `stateful_pattern_rs2`.
const MAX_PATTERN_SIZE: usize = 1_000;

/// Number of processed items between periodic cleanup (eviction) passes.
const CLEANUP_INTERVAL: u64 = 1000;

/// Base number of processed items between flushes of batched resource-tracking counters.
const RESOURCE_TRACKING_INTERVAL: u64 = 100;

/// Per-key buffer size used by `stateful_join_rs2` when `config.max_size` is `None`.
const DEFAULT_BUFFER_SIZE: usize = 1024;
24
/// Shrinks `map` to at most `max_keys` entries.
///
/// Despite the name, a `HashMap` records no insertion order, so the entries removed are
/// the ones whose keys sort *first* under `Ord` (for `String` keys: the lexicographically
/// smallest), not the ones inserted earliest. Maps already within the limit are untouched;
/// `max_keys == 0` empties the map.
fn evict_oldest_entries<K, V>(map: &mut HashMap<K, V>, max_keys: usize)
where
    K: Clone + std::hash::Hash + Eq + std::cmp::Ord,
{
    if map.len() <= max_keys {
        return;
    }
    let to_remove = map.len() - max_keys;
    let mut victims: Vec<K> = map.keys().cloned().collect();
    if to_remove < victims.len() {
        // Partition so `victims[..to_remove]` holds the `to_remove` smallest keys
        // (O(n) on average, versus O(n log n) for a full sort), then drop the rest.
        victims.select_nth_unstable(to_remove);
        victims.truncate(to_remove);
    }
    for key in &victims {
        map.remove(key);
    }
}
40
41async fn track_resource_batch(
43 resource_manager: &Arc<ResourceManager>,
44 allocations: u64,
45 deallocations: u64,
46 buffer_overflows: u64,
47) {
48 if allocations > 0 {
49 resource_manager.track_memory_allocation(allocations).await.ok();
50 }
51 if deallocations > 0 {
52 resource_manager.track_memory_deallocation(deallocations).await;
53 }
54 for _ in 0..buffer_overflows {
55 resource_manager.track_buffer_overflow().await.ok();
56 }
57}
58
59#[derive(Serialize, Deserialize, Clone)]
60struct ThrottleState {
61 count: u32,
62 window_start: u64, }
64
65#[derive(Serialize, Deserialize, Clone)]
66struct SessionState {
67 last_activity: u64, is_new_session: bool,
69}
70
71#[derive(Serialize, Deserialize, Clone)]
72struct LeftItemWithTime<T> {
73 item: T,
74 timestamp: u64,
75 key: String,
76}
77
78#[derive(Serialize, Deserialize, Clone)]
79struct RightItemWithTime<U> {
80 item: U,
81 timestamp: u64,
82 key: String,
83}
84
85pub trait StatefulStreamExt<T>: Stream<Item = T> + Send + Sync + Sized + Unpin + 'static
87where
88 Self: 'static,
89 T: Send + Sync + Clone + Serialize + for<'de> Deserialize<'de> + 'static,
90{
91 fn stateful_map_rs2<F, R>(
93 self,
94 config: StateConfig,
95 key_extractor: impl KeyExtractor<T> + Send + Sync + 'static,
96 mut f: F,
97 ) -> Pin<Box<dyn Stream<Item = Result<R, StateError>> + Send>>
98 where
99 F: FnMut(
100 T,
101 StateAccess,
102 ) -> std::pin::Pin<
103 Box<dyn std::future::Future<Output = Result<R, StateError>> + Send>,
104 > + Send
105 + Sync
106 + 'static,
107 R: Send + Sync + 'static,
108 Self: Sized,
109 {
110 let storage = config.create_storage_arc();
111 let resource_manager = get_global_resource_manager();
112
113 Box::pin(stream! {
114 let stream = self;
115 futures::pin_mut!(stream);
116 let mut state: HashMap<String, ()> = HashMap::new();
117 let mut item_count = 0u64;
118 let mut pending_allocations = 0u64;
119 let mut pending_deallocations = 0u64;
120 let mut pending_buffer_overflows = 0u64;
121
122 while let Some(item) = StreamExt::next(&mut stream).await {
123 let key = key_extractor.extract_key(&item);
124
125 item_count += 1;
127 if item_count % CLEANUP_INTERVAL == 0 {
128 let before_len = state.len();
129 evict_oldest_entries(&mut state, MAX_HASHMAP_KEYS);
130 let after_len = state.len();
131 if before_len > after_len {
132 pending_deallocations += (before_len - after_len) as u64;
133 pending_buffer_overflows += 1;
134 }
135 }
136
137 if item_count % RESOURCE_TRACKING_INTERVAL == 0 {
139 track_resource_batch(&resource_manager, pending_allocations, pending_deallocations, pending_buffer_overflows).await;
140 pending_allocations = 0;
141 pending_deallocations = 0;
142 pending_buffer_overflows = 0;
143 }
144
145 let is_new_key = !state.contains_key(&key);
146 state.entry(key.clone()).or_insert(());
147 if is_new_key {
148 pending_allocations += 1;
149 }
150
151 let state_access = StateAccess::new(storage.clone(), key);
152 match f(item, state_access).await {
153 Ok(result) => yield Ok(result),
154 Err(e) => yield Err(e),
155 }
156 }
157
158 if pending_allocations > 0 || pending_deallocations > 0 || pending_buffer_overflows > 0 {
160 track_resource_batch(&resource_manager, pending_allocations, pending_deallocations, pending_buffer_overflows).await;
161 }
162 })
163 }
164
165 fn stateful_filter_rs2<F>(
167 self,
168 config: StateConfig,
169 key_extractor: impl KeyExtractor<T> + Send + Sync + 'static,
170 mut f: F,
171 ) -> Pin<Box<dyn Stream<Item = Result<T, StateError>> + Send>>
172 where
173 F: FnMut(
174 &T,
175 StateAccess,
176 ) -> std::pin::Pin<
177 Box<dyn std::future::Future<Output = Result<bool, StateError>> + Send>,
178 > + Send
179 + Sync
180 + 'static,
181 Self: Sized,
182 {
183 let storage = config.create_storage_arc();
184 let resource_manager = get_global_resource_manager();
185
186 Box::pin(stream! {
187 let stream = self;
188 futures::pin_mut!(stream);
189 let mut seen_keys: HashSet<String> = HashSet::new();
190 let mut item_count = 0u64;
191 let mut pending_allocations = 0u64;
192
193 while let Some(item) = StreamExt::next(&mut stream).await {
194 let key = key_extractor.extract_key(&item);
195
196 item_count += 1;
198 if item_count % CLEANUP_INTERVAL == 0 && seen_keys.len() > MAX_HASHMAP_KEYS {
199 let old_size = seen_keys.len();
201 seen_keys.clear();
202 pending_allocations = pending_allocations.saturating_sub(old_size as u64);
203 }
204
205 if item_count % RESOURCE_TRACKING_INTERVAL == 0 {
207 if pending_allocations > 0 {
208 resource_manager.track_memory_allocation(pending_allocations).await.ok();
209 pending_allocations = 0;
210 }
211 }
212
213 let is_new_key = seen_keys.insert(key.clone());
215 if is_new_key {
216 pending_allocations += 1;
217 }
218
219 let state_access = StateAccess::new(storage.clone(), key);
220 match f(&item, state_access).await {
221 Ok(should_emit) => {
222 if should_emit {
223 yield Ok(item);
224 }
225 }
226 Err(e) => yield Err(e),
227 }
228 }
229
230 if pending_allocations > 0 {
232 resource_manager.track_memory_allocation(pending_allocations).await.ok();
233 }
234 })
235 }
236
237 fn stateful_fold_rs2<F, R>(
239 self,
240 config: StateConfig,
241 key_extractor: impl KeyExtractor<T> + Send + Sync + 'static,
242 initial: R,
243 mut f: F,
244 ) -> Pin<Box<dyn Stream<Item = Result<R, StateError>> + Send>>
245 where
246 F: FnMut(
247 R,
248 T,
249 StateAccess,
250 ) -> std::pin::Pin<
251 Box<dyn std::future::Future<Output = Result<R, StateError>> + Send>,
252 > + Send
253 + Sync
254 + 'static,
255 R: Send + Sync + Clone + 'static,
256 Self: Sized,
257 {
258 let storage = config.create_storage_arc();
259 let resource_manager = get_global_resource_manager();
260
261 Box::pin(stream! {
262 let stream = self;
263 futures::pin_mut!(stream);
264 let mut accumulators: HashMap<String, R> = HashMap::new();
265 let mut item_count = 0u64;
266 let mut pending_allocations = 0u64;
267 let mut pending_deallocations = 0u64;
268 let mut pending_buffer_overflows = 0u64;
269
270 while let Some(item) = StreamExt::next(&mut stream).await {
271 let key = key_extractor.extract_key(&item);
272
273 item_count += 1;
275 if item_count % CLEANUP_INTERVAL == 0 {
276 let before_len = accumulators.len();
277 evict_oldest_entries(&mut accumulators, MAX_HASHMAP_KEYS);
278 let after_len = accumulators.len();
279 if before_len > after_len {
280 pending_deallocations += (before_len - after_len) as u64;
281 pending_buffer_overflows += 1;
282 }
283 }
284
285 if item_count % RESOURCE_TRACKING_INTERVAL == 0 {
287 track_resource_batch(&resource_manager, pending_allocations, pending_deallocations, pending_buffer_overflows).await;
288 pending_allocations = 0;
289 pending_deallocations = 0;
290 pending_buffer_overflows = 0;
291 }
292
293 let is_new_key = !accumulators.contains_key(&key);
294 let acc = accumulators.entry(key.clone()).or_insert_with(|| initial.clone());
295 if is_new_key {
296 pending_allocations += 1;
297 }
298 let state_access = StateAccess::new(storage.clone(), key);
299
300 match f(acc.clone(), item, state_access).await {
301 Ok(new_acc) => {
302 *acc = new_acc.clone();
303 yield Ok(new_acc);
304 }
305 Err(e) => yield Err(e),
306 }
307 }
308
309 if pending_allocations > 0 || pending_deallocations > 0 || pending_buffer_overflows > 0 {
311 track_resource_batch(&resource_manager, pending_allocations, pending_deallocations, pending_buffer_overflows).await;
312 }
313 })
314 }
315
316 fn stateful_reduce_rs2<F, R>(
318 self,
319 config: StateConfig,
320 key_extractor: impl KeyExtractor<T> + Send + Sync + 'static,
321 initial: R,
322 mut f: F,
323 ) -> Pin<Box<dyn Stream<Item = Result<R, StateError>> + Send>>
324 where
325 F: FnMut(
326 R,
327 T,
328 StateAccess,
329 ) -> std::pin::Pin<
330 Box<dyn std::future::Future<Output = Result<R, StateError>> + Send>,
331 > + Send
332 + Sync
333 + 'static,
334 R: Send + Sync + Clone + 'static,
335 Self: Sized,
336 {
337 let storage = config.create_storage_arc();
338 let resource_manager = get_global_resource_manager();
339
340 Box::pin(stream! {
341 let stream = self;
342 futures::pin_mut!(stream);
343 let mut accumulators: HashMap<String, R> = HashMap::new();
344 let mut item_count = 0u64;
345 let mut pending_allocations = 0u64;
346 let mut pending_deallocations = 0u64;
347 let mut pending_buffer_overflows = 0u64;
348
349 while let Some(item) = StreamExt::next(&mut stream).await {
350 let key = key_extractor.extract_key(&item);
351
352 item_count += 1;
354 if item_count % CLEANUP_INTERVAL == 0 {
355 let before_len = accumulators.len();
356 evict_oldest_entries(&mut accumulators, MAX_HASHMAP_KEYS);
357 let after_len = accumulators.len();
358 if before_len > after_len {
359 pending_deallocations += (before_len - after_len) as u64;
360 pending_buffer_overflows += 1;
361 }
362 }
363
364 if item_count % RESOURCE_TRACKING_INTERVAL == 0 {
366 track_resource_batch(&resource_manager, pending_allocations, pending_deallocations, pending_buffer_overflows).await;
367 pending_allocations = 0;
368 pending_deallocations = 0;
369 pending_buffer_overflows = 0;
370 }
371
372 let is_new_key = !accumulators.contains_key(&key);
373 let acc = accumulators.entry(key.clone()).or_insert_with(|| initial.clone());
374 if is_new_key {
375 pending_allocations += 1;
376 }
377 let state_access = StateAccess::new(storage.clone(), key);
378
379 match f(acc.clone(), item, state_access).await {
380 Ok(new_acc) => {
381 *acc = new_acc.clone();
382 yield Ok(new_acc);
383 }
384 Err(e) => yield Err(e),
385 }
386 }
387
388 if pending_allocations > 0 || pending_deallocations > 0 || pending_buffer_overflows > 0 {
390 track_resource_batch(&resource_manager, pending_allocations, pending_deallocations, pending_buffer_overflows).await;
391 }
392 })
393 }
394
395 fn stateful_group_by_rs2<F, R>(
397 self,
398 config: StateConfig,
399 key_extractor: impl KeyExtractor<T> + Send + Sync + 'static,
400 f: F,
401 ) -> Pin<Box<dyn Stream<Item = Result<R, StateError>> + Send>>
402 where
403 F: FnMut(
404 String,
405 Vec<T>,
406 StateAccess,
407 ) -> std::pin::Pin<
408 Box<dyn std::future::Future<Output = Result<R, StateError>> + Send>,
409 > + Send
410 + Sync
411 + 'static,
412 R: Send + Sync + 'static,
413 Self: Sized,
414 {
415 self.stateful_group_by_advanced_rs2(config, key_extractor, None, None, false, f)
416 }
417
418 fn stateful_group_by_advanced_rs2<F, R>(
420 self,
421 config: StateConfig,
422 key_extractor: impl KeyExtractor<T> + Send + Sync + 'static,
423 max_group_size: Option<usize>, group_timeout: Option<std::time::Duration>, emit_on_key_change: bool, mut f: F,
427 ) -> Pin<Box<dyn Stream<Item = Result<R, StateError>> + Send>>
428 where
429 F: FnMut(
430 String,
431 Vec<T>,
432 StateAccess,
433 ) -> std::pin::Pin<
434 Box<dyn std::future::Future<Output = Result<R, StateError>> + Send>,
435 > + Send
436 + Sync
437 + 'static,
438 R: Send + Sync + 'static,
439 Self: Sized,
440 {
441 let storage = config.create_storage_arc();
442 let timeout_ms = group_timeout.map(|d| d.as_millis() as u64);
443 let max_group_size = max_group_size.unwrap_or(MAX_GROUP_SIZE);
444 let resource_manager = get_global_resource_manager();
445
446 Box::pin(stream! {
447 let stream = self;
448 futures::pin_mut!(stream);
449 let mut groups: HashMap<String, Vec<T>> = HashMap::new();
450 let mut group_timestamps: HashMap<String, u64> = HashMap::new();
451 let mut last_key: Option<String> = None;
452 let mut item_count = 0u64;
453 let mut pending_allocations = 0u64;
454 let mut pending_deallocations = 0u64;
455 let mut pending_buffer_overflows = 0u64;
456
457 while let Some(item) = StreamExt::next(&mut stream).await {
458 let key = key_extractor.extract_key(&item);
459 let now = unix_timestamp_millis();
460
461 item_count += 1;
463 if item_count % CLEANUP_INTERVAL == 0 {
464 let before_groups = groups.len();
465 let before_timestamps = group_timestamps.len();
466 evict_oldest_entries(&mut groups, MAX_HASHMAP_KEYS);
467 evict_oldest_entries(&mut group_timestamps, MAX_HASHMAP_KEYS);
468 let after_groups = groups.len();
469 let after_timestamps = group_timestamps.len();
470 if before_groups > after_groups || before_timestamps > after_timestamps {
471 pending_deallocations += (before_groups + before_timestamps - after_groups - after_timestamps) as u64;
472 pending_buffer_overflows += 1;
473 }
474 }
475
476 if item_count % RESOURCE_TRACKING_INTERVAL == 0 {
478 track_resource_batch(&resource_manager, pending_allocations, pending_deallocations, pending_buffer_overflows).await;
479 pending_allocations = 0;
480 pending_deallocations = 0;
481 pending_buffer_overflows = 0;
482 }
483
484 if emit_on_key_change {
486 if let Some(ref last_key_val) = last_key {
487 if last_key_val != &key {
488 if let Some(group_items) = groups.remove(last_key_val) {
490 pending_deallocations += group_items.len() as u64;
491 let state_access = StateAccess::new(storage.clone(), last_key_val.clone());
492 match f(last_key_val.clone(), group_items, state_access).await {
493 Ok(result) => yield Ok(result),
494 Err(e) => yield Err(e),
495 }
496 }
497 group_timestamps.remove(last_key_val);
498 }
499 }
500 }
501
502 if let (Some(timeout), Some(&group_start)) = (timeout_ms, group_timestamps.get(&key)) {
504 if now - group_start > timeout {
505 if let Some(group_items) = groups.remove(&key) {
506 pending_deallocations += group_items.len() as u64;
507 let state_access = StateAccess::new(storage.clone(), key.clone());
508 match f(key.clone(), group_items, state_access).await {
509 Ok(result) => yield Ok(result),
510 Err(e) => yield Err(e),
511 }
512 }
513 group_timestamps.remove(&key);
514 }
515 }
516
517 let is_new_group = !groups.contains_key(&key);
519 let group = groups.entry(key.clone()).or_insert_with(Vec::new);
520 if is_new_group {
521 pending_allocations += 1;
522 }
523 group_timestamps.entry(key.clone()).or_insert(now);
524 group.push(item);
525 pending_allocations += 1;
526
527 if group.len() >= max_group_size {
529 if let Some(group_items) = groups.remove(&key) {
530 pending_deallocations += group_items.len() as u64;
531 let state_access = StateAccess::new(storage.clone(), key.clone());
532 match f(key.clone(), group_items, state_access).await {
533 Ok(result) => yield Ok(result),
534 Err(e) => yield Err(e),
535 }
536 }
537 group_timestamps.remove(&key);
538 }
539
540 last_key = Some(key);
541 }
542
543 let now = unix_timestamp_millis();
545 let mut expired_keys = Vec::new();
546
547 if let Some(timeout) = timeout_ms {
548 for (key, &group_start) in &group_timestamps {
549 if now - group_start > timeout {
550 expired_keys.push(key.clone());
551 }
552 }
553 }
554
555 for key in expired_keys {
557 let key_clone = key.clone();
558 if let Some(group_items) = groups.remove(&key_clone) {
559 pending_deallocations += group_items.len() as u64;
560 let state_access = StateAccess::new(storage.clone(), key_clone.clone());
561 match f(key_clone.clone(), group_items, state_access).await {
562 Ok(result) => yield Ok(result),
563 Err(e) => yield Err(e),
564 }
565 }
566 group_timestamps.remove(&key_clone);
567 }
568
569 for (key, group_items) in groups {
571 pending_deallocations += group_items.len() as u64;
572 let state_access = StateAccess::new(storage.clone(), key.clone());
573 match f(key, group_items, state_access).await {
574 Ok(result) => yield Ok(result),
575 Err(e) => yield Err(e),
576 }
577 }
578
579 if pending_allocations > 0 || pending_deallocations > 0 || pending_buffer_overflows > 0 {
581 track_resource_batch(&resource_manager, pending_allocations, pending_deallocations, pending_buffer_overflows).await;
582 }
583 })
584 }
585
586 fn stateful_deduplicate_rs2<F>(
588 self,
589 config: StateConfig,
590 key_extractor: impl KeyExtractor<T> + Send + Sync + 'static,
591 ttl: std::time::Duration,
592 mut f: F,
593 ) -> Pin<Box<dyn Stream<Item = Result<T, StateError>> + Send>>
594 where
595 F: FnMut(T) -> T + Send + Sync + 'static,
596 Self: Sized,
597 {
598 let storage = config.create_storage_arc();
599 let ttl_ms = ttl.as_millis() as u64;
600 let resource_manager = get_global_resource_manager();
601
602 Box::pin(stream! {
603 let stream = self;
604 futures::pin_mut!(stream);
605 let mut item_count = 0u64;
606 let mut pending_allocations = 0u64;
607
608 while let Some(item) = StreamExt::next(&mut stream).await {
609 let key = key_extractor.extract_key(&item);
610 let state_access = StateAccess::new(storage.clone(), key.clone());
611
612 let now = unix_timestamp_millis();
613 let state_bytes = match state_access.get().await {
614 Some(bytes) => bytes,
615 None => Vec::new(),
616 };
617
618 let last_seen: u64 = if state_bytes.is_empty() {
619 0
620 } else {
621 match serde_json::from_slice(&state_bytes) {
622 Ok(timestamp) => timestamp,
623 Err(_) => 0,
624 }
625 };
626
627 if now - last_seen > ttl_ms {
628 pending_allocations += 1;
630
631 item_count += 1;
633 if item_count % RESOURCE_TRACKING_INTERVAL == 0 {
634 if pending_allocations > 0 {
635 resource_manager.track_memory_allocation(pending_allocations).await.ok();
636 pending_allocations = 0;
637 }
638 }
639
640 match serde_json::to_vec(&now) {
642 Ok(timestamp_bytes) => {
643 if let Err(e) = state_access.set(×tamp_bytes).await {
644 yield Err(StateError::Storage(format!("Failed to set state for deduplication: {}", e)));
645 continue;
646 }
647 }
648 Err(e) => {
649 yield Err(StateError::Serialization(e));
650 continue;
651 }
652 }
653
654 yield Ok(f(item));
655 }
656 }
657
658 if pending_allocations > 0 {
660 resource_manager.track_memory_allocation(pending_allocations).await.ok();
661 }
662 })
663 }
664
665 fn stateful_throttle_rs2<F>(
667 self,
668 config: StateConfig,
669 key_extractor: impl KeyExtractor<T> + Send + Sync + 'static,
670 rate_limit: u32,
671 window_duration: std::time::Duration,
672 mut f: F,
673 ) -> Pin<Box<dyn Stream<Item = Result<T, StateError>> + Send>>
674 where
675 F: FnMut(T) -> T + Send + Sync + 'static,
676 Self: Sized,
677 {
678 let storage = config.create_storage_arc();
679 let window_ms = window_duration.as_millis() as u64;
680 let resource_manager = get_global_resource_manager();
681
682 Box::pin(stream! {
683 let stream = self;
684 futures::pin_mut!(stream);
685 let mut item_count = 0u64;
686 let mut pending_allocations = 0u64;
687 let mut pending_buffer_overflows = 0u64;
688
689 while let Some(item) = StreamExt::next(&mut stream).await {
690 let key = key_extractor.extract_key(&item);
691 let state_access = StateAccess::new(storage.clone(), key.clone());
692
693 let now = unix_timestamp_millis();
694
695 let state_bytes = match state_access.get().await {
697 Some(bytes) => bytes,
698 None => Vec::new(),
699 };
700
701 let mut throttle_state: ThrottleState = if state_bytes.is_empty() {
702 pending_allocations += 1;
704 ThrottleState { count: 0, window_start: now }
705 } else {
706 match serde_json::from_slice(&state_bytes) {
707 Ok(state) => state,
708 Err(_) => ThrottleState { count: 0, window_start: now },
709 }
710 };
711
712 if now - throttle_state.window_start > window_ms {
714 throttle_state.count = 0;
715 throttle_state.window_start = now;
716 }
717
718 if throttle_state.count < rate_limit {
719 throttle_state.count += 1;
720
721 match serde_json::to_vec(&throttle_state) {
723 Ok(state_bytes) => {
724 if let Err(e) = state_access.set(&state_bytes).await {
725 yield Err(StateError::Storage(format!("Failed to set throttle state: {}", e)));
726 continue;
727 }
728 }
729 Err(e) => {
730 yield Err(StateError::Serialization(e));
731 continue;
732 }
733 }
734
735 yield Ok(f(item));
736 } else {
737 pending_buffer_overflows += 1;
739
740 let elapsed_ms = now.saturating_sub(throttle_state.window_start);
742 let remaining = if elapsed_ms >= window_ms {
743 Duration::from_millis(0)
744 } else {
745 Duration::from_millis(window_ms - elapsed_ms)
746 };
747
748 if remaining > Duration::from_millis(0) && remaining < Duration::from_secs(1) {
750 sleep(remaining).await;
751 }
752
753 let now2 = unix_timestamp_millis();
755 throttle_state.count = 1;
756 throttle_state.window_start = now2;
757
758 match serde_json::to_vec(&throttle_state) {
760 Ok(state_bytes) => {
761 if let Err(e) = state_access.set(&state_bytes).await {
762 yield Err(StateError::Storage(format!("Failed to set throttle state: {}", e)));
763 continue;
764 }
765 }
766 Err(e) => {
767 yield Err(StateError::Serialization(e));
768 continue;
769 }
770 }
771
772 yield Ok(f(item));
773 }
774
775 item_count += 1;
777 if item_count % (RESOURCE_TRACKING_INTERVAL * 2) == 0 {
778 if pending_allocations > 0 {
779 resource_manager.track_memory_allocation(pending_allocations).await.ok();
780 pending_allocations = 0;
781 }
782 for _ in 0..pending_buffer_overflows {
783 resource_manager.track_buffer_overflow().await.ok();
784 }
785 pending_buffer_overflows = 0;
786 }
787 }
788
789 if pending_allocations > 0 {
791 resource_manager.track_memory_allocation(pending_allocations).await.ok();
792 }
793 for _ in 0..pending_buffer_overflows {
794 resource_manager.track_buffer_overflow().await.ok();
795 }
796 })
797 }
798
799 fn stateful_session_rs2<F>(
801 self,
802 config: StateConfig,
803 key_extractor: impl KeyExtractor<T> + Send + Sync + 'static,
804 session_timeout: std::time::Duration,
805 mut f: F,
806 ) -> Pin<Box<dyn Stream<Item = Result<T, StateError>> + Send>>
807 where
808 F: FnMut(T, bool) -> T + Send + Sync + 'static,
809 Self: Sized,
810 {
811 let storage = config.create_storage_arc();
812 let timeout_ms = session_timeout.as_millis() as u64;
813 let resource_manager = get_global_resource_manager();
814
815 Box::pin(stream! {
816 let stream = self;
817 futures::pin_mut!(stream);
818 let mut item_count = 0u64;
819 let mut pending_allocations = 0u64;
820
821 while let Some(item) = StreamExt::next(&mut stream).await {
822 let key = key_extractor.extract_key(&item);
823 let state_access = StateAccess::new(storage.clone(), key.clone());
824
825 let now = unix_timestamp_millis();
826 let state_bytes = match state_access.get().await {
827 Some(bytes) => bytes,
828 None => Vec::new(),
829 };
830
831 let mut state: SessionState = if state_bytes.is_empty() {
832 pending_allocations += 1;
834 SessionState { last_activity: now, is_new_session: true }
835 } else {
836 match serde_json::from_slice(&state_bytes) {
837 Ok(session_state) => session_state,
838 Err(_) => SessionState { last_activity: now, is_new_session: true },
839 }
840 };
841
842 let is_new_session = now - state.last_activity > timeout_ms;
843 state.last_activity = now;
844 state.is_new_session = is_new_session;
845
846 match serde_json::to_vec(&state) {
848 Ok(state_bytes) => {
849 if let Err(e) = state_access.set(&state_bytes).await {
850 yield Err(StateError::Storage(format!("Failed to set session state: {}", e)));
851 continue;
852 }
853 }
854 Err(e) => {
855 yield Err(StateError::Serialization(e));
856 continue;
857 }
858 }
859
860 item_count += 1;
862 if item_count % RESOURCE_TRACKING_INTERVAL == 0 {
863 if pending_allocations > 0 {
864 resource_manager.track_memory_allocation(pending_allocations).await.ok();
865 pending_allocations = 0;
866 }
867 }
868
869 yield Ok(f(item, is_new_session));
870 }
871
872 if pending_allocations > 0 {
874 resource_manager.track_memory_allocation(pending_allocations).await.ok();
875 }
876 })
877 }
878
879 fn stateful_pattern_rs2<F>(
881 self,
882 config: StateConfig,
883 key_extractor: impl KeyExtractor<T> + Send + Sync + 'static,
884 pattern_size: usize,
885 mut f: F,
886 ) -> Pin<Box<dyn Stream<Item = Result<Option<String>, StateError>> + Send>>
887 where
888 F: FnMut(
889 Vec<T>,
890 StateAccess,
891 ) -> std::pin::Pin<
892 Box<dyn std::future::Future<Output = Result<Option<String>, StateError>> + Send>,
893 > + Send
894 + Sync
895 + 'static,
896 Self: Sized,
897 {
898 let storage = config.create_storage_arc();
899 let resource_manager = get_global_resource_manager();
900
901 Box::pin(stream! {
902 let stream = self;
903 futures::pin_mut!(stream);
904 let mut patterns: HashMap<String, Vec<T>> = HashMap::new();
905 let mut item_count = 0u64;
906 let mut pending_allocations = 0u64;
907 let mut pending_deallocations = 0u64;
908 let mut pending_buffer_overflows = 0u64;
909
910 while let Some(item) = StreamExt::next(&mut stream).await {
911 let key = key_extractor.extract_key(&item);
912
913 item_count += 1;
915 if item_count % CLEANUP_INTERVAL == 0 {
916 let before_len = patterns.len();
917 evict_oldest_entries(&mut patterns, MAX_HASHMAP_KEYS);
918 let after_len = patterns.len();
919 if before_len > after_len {
920 pending_deallocations += (before_len - after_len) as u64;
921 pending_buffer_overflows += 1;
922 }
923 }
924
925 if item_count % RESOURCE_TRACKING_INTERVAL == 0 {
927 track_resource_batch(&resource_manager, pending_allocations, pending_deallocations, pending_buffer_overflows).await;
928 pending_allocations = 0;
929 pending_deallocations = 0;
930 pending_buffer_overflows = 0;
931 }
932
933 let is_new_pattern = !patterns.contains_key(&key);
934 let pattern = patterns.entry(key.clone()).or_insert_with(Vec::new);
935 if is_new_pattern {
936 pending_allocations += 1;
937 }
938 pattern.push(item);
939 pending_allocations += 1;
940
941 if pattern.len() > MAX_PATTERN_SIZE {
943 let drained = pattern.len() - MAX_PATTERN_SIZE;
944 pattern.drain(0..drained);
945 pending_deallocations += drained as u64;
946 pending_buffer_overflows += 1;
947 }
948
949 if pattern.len() >= pattern_size {
950 let pattern_items = pattern.drain(..pattern_size).collect::<Vec<_>>();
951 pending_deallocations += pattern_size as u64;
952 let state_access = StateAccess::new(storage.clone(), key.clone());
953 match f(pattern_items, state_access).await {
954 Ok(result) => {
955 if let Some(pattern_str) = result {
956 yield Ok(Some(pattern_str));
957 }
958 }
959 Err(e) => yield Err(e),
960 }
961 }
962 }
963
964 if pending_allocations > 0 || pending_deallocations > 0 || pending_buffer_overflows > 0 {
966 track_resource_batch(&resource_manager, pending_allocations, pending_deallocations, pending_buffer_overflows).await;
967 }
968 })
969 }
970
971 fn stateful_join_rs2<U, F, R>(
973 self,
974 other: Pin<Box<dyn Stream<Item = U> + Send>>,
975 config: StateConfig,
976 key_extractor: impl KeyExtractor<T> + Send + Sync + 'static,
977 other_key_extractor: impl KeyExtractor<U> + Send + Sync + 'static,
978 window_duration: std::time::Duration,
979 mut f: F,
980 ) -> Pin<Box<dyn Stream<Item = Result<R, StateError>> + Send>>
981 where
982 F: FnMut(
983 T,
984 U,
985 StateAccess,
986 ) -> std::pin::Pin<
987 Box<dyn std::future::Future<Output = Result<R, StateError>> + Send>,
988 > + Send
989 + Sync
990 + 'static,
991 U: Send + Sync + Clone + Serialize + for<'de> Deserialize<'de> + 'static,
992 R: Send + Sync + 'static,
993 Self: Sized,
994 {
995 let storage = config.create_storage_arc();
996 let resource_manager = get_global_resource_manager();
997 Box::pin(stream! {
998 let left_stream = self;
999 let right_stream = other;
1000 futures::pin_mut!(left_stream);
1001 futures::pin_mut!(right_stream);
1002 let mut left_buffer: HashMap<String, Vec<LeftItemWithTime<T>>> = HashMap::new();
1003 let mut right_buffer: HashMap<String, Vec<RightItemWithTime<U>>> = HashMap::new();
1004 let window_ms = window_duration.as_millis() as u64;
1005 let mut item_count = 0u64;
1006 let mut pending_allocations = 0u64;
1007 let mut pending_deallocations = 0u64;
1008 let mut pending_buffer_overflows = 0u64;
1009
1010 loop {
1011 tokio::select! {
1012 left_item = left_stream.next() => {
1013 if let Some(item) = left_item {
1014 let key = key_extractor.extract_key(&item);
1015 let now = unix_timestamp_millis();
1016
1017 item_count += 1;
1019 if item_count % CLEANUP_INTERVAL == 0 {
1020 let before_left = left_buffer.len();
1021 let before_right = right_buffer.len();
1022 evict_oldest_entries(&mut left_buffer, MAX_HASHMAP_KEYS);
1023 evict_oldest_entries(&mut right_buffer, MAX_HASHMAP_KEYS);
1024 let after_left = left_buffer.len();
1025 let after_right = right_buffer.len();
1026 if before_left > after_left || before_right > after_right {
1027 pending_deallocations += (before_left + before_right - after_left - after_right) as u64;
1028 pending_buffer_overflows += 1;
1029 }
1030 }
1031
1032 if item_count % RESOURCE_TRACKING_INTERVAL == 0 {
1034 track_resource_batch(&resource_manager, pending_allocations, pending_deallocations, pending_buffer_overflows).await;
1035 pending_allocations = 0;
1036 pending_deallocations = 0;
1037 pending_buffer_overflows = 0;
1038 }
1039
1040 let before = left_buffer.entry(key.clone()).or_default().len();
1042 left_buffer.entry(key.clone()).or_default().retain(|x| now - x.timestamp <= window_ms);
1043 let after = left_buffer.entry(key.clone()).or_default().len();
1044 if before > after {
1045 pending_deallocations += (before - after) as u64;
1046 }
1047
1048 let left_entry = LeftItemWithTime { item: item.clone(), timestamp: now, key: key.clone() };
1050 let is_new_key = !left_buffer.contains_key(&key);
1051 let left_buf = left_buffer.entry(key.clone()).or_default();
1052 if is_new_key {
1053 pending_allocations += 1;
1054 }
1055 left_buf.push(left_entry.clone());
1056 pending_allocations += 1;
1057
1058 let max_size = config.max_size.unwrap_or(DEFAULT_BUFFER_SIZE);
1060 if left_buf.len() > max_size {
1061 let removed = left_buf.len() - max_size;
1062 left_buf.drain(0..removed);
1063 pending_deallocations += removed as u64;
1064 pending_buffer_overflows += 1;
1065 }
1066
1067 if let Some(rights) = right_buffer.get(&key) {
1069 for right in rights.iter().filter(|r| now - r.timestamp <= window_ms) {
1070 let state_access = StateAccess::new(storage.clone(), key.clone());
1071 match f(item.clone(), right.item.clone(), state_access).await {
1072 Ok(result) => yield Ok(result),
1073 Err(e) => yield Err(e),
1074 }
1075 }
1076 }
1077 } else {
1078 break;
1079 }
1080 }
1081 right_item = right_stream.next() => {
1082 if let Some(item) = right_item {
1083 let key = other_key_extractor.extract_key(&item);
1084 let now = unix_timestamp_millis();
1085 item_count += 1;
1087 if item_count % CLEANUP_INTERVAL == 0 {
1088 let before_left = left_buffer.len();
1089 let before_right = right_buffer.len();
1090 evict_oldest_entries(&mut left_buffer, MAX_HASHMAP_KEYS);
1091 evict_oldest_entries(&mut right_buffer, MAX_HASHMAP_KEYS);
1092 let after_left = left_buffer.len();
1093 let after_right = right_buffer.len();
1094 if before_left > after_left || before_right > after_right {
1095 pending_deallocations += (before_left + before_right - after_left - after_right) as u64;
1096 pending_buffer_overflows += 1;
1097 }
1098 }
1099
1100 if item_count % RESOURCE_TRACKING_INTERVAL == 0 {
1102 track_resource_batch(&resource_manager, pending_allocations, pending_deallocations, pending_buffer_overflows).await;
1103 pending_allocations = 0;
1104 pending_deallocations = 0;
1105 pending_buffer_overflows = 0;
1106 }
1107
1108 let before = right_buffer.entry(key.clone()).or_default().len();
1110 right_buffer.entry(key.clone()).or_default().retain(|x| now - x.timestamp <= window_ms);
1111 let after = right_buffer.entry(key.clone()).or_default().len();
1112 if before > after {
1113 pending_deallocations += (before - after) as u64;
1114 }
1115 let before = left_buffer.entry(key.clone()).or_default().len();
1117 left_buffer.entry(key.clone()).or_default().retain(|x| now - x.timestamp <= window_ms);
1118 let after = left_buffer.entry(key.clone()).or_default().len();
1119 if before > after {
1120 pending_deallocations += (before - after) as u64;
1121 }
1122 let right_entry = RightItemWithTime { item: item.clone(), timestamp: now, key: key.clone() };
1124 let is_new_key = !right_buffer.contains_key(&key);
1125 let right_buf = right_buffer.entry(key.clone()).or_default();
1126 if is_new_key {
1127 pending_allocations += 1;
1128 }
1129 right_buf.push(right_entry.clone());
1130 pending_allocations += 1;
1131 let max_size = config.max_size.unwrap_or(DEFAULT_BUFFER_SIZE);
1133 if right_buf.len() > max_size {
1134 let removed = right_buf.len() - max_size;
1135 right_buf.drain(0..removed);
1136 pending_deallocations += removed as u64;
1137 pending_buffer_overflows += 1;
1138 }
1139 if let Some(lefts) = left_buffer.get(&key) {
1141 for left in lefts.iter().filter(|l| now - l.timestamp <= window_ms) {
1142 let state_access = StateAccess::new(storage.clone(), key.clone());
1143 match f(left.item.clone(), item.clone(), state_access).await {
1144 Ok(result) => yield Ok(result),
1145 Err(e) => yield Err(e),
1146 }
1147 }
1148 }
1149 } else {
1150 break;
1151 }
1152 }
1153 }
1154 }
1155
1156 if pending_allocations > 0 || pending_deallocations > 0 || pending_buffer_overflows > 0 {
1158 track_resource_batch(&resource_manager, pending_allocations, pending_deallocations, pending_buffer_overflows).await;
1159 }
1160 })
1161 }
1162
1163 fn stateful_window_rs2<F, R>(
1165 self,
1166 config: StateConfig,
1167 key_extractor: impl KeyExtractor<T> + Send + Sync + 'static,
1168 window_size: usize,
1169 f: F,
1170 ) -> Pin<Box<dyn Stream<Item = Result<R, StateError>> + Send>>
1171 where
1172 F: FnMut(
1173 Vec<T>,
1174 StateAccess,
1175 ) -> std::pin::Pin<
1176 Box<dyn std::future::Future<Output = Result<R, StateError>> + Send>,
1177 > + Send
1178 + Sync
1179 + 'static,
1180 R: Send + Sync + 'static,
1181 Self: Sized,
1182 {
1183 self.stateful_window_rs2_advanced(config, key_extractor, window_size, None, false, f)
1184 }
1185
1186 fn stateful_window_rs2_advanced<F, R>(
1188 self,
1189 config: StateConfig,
1190 key_extractor: impl KeyExtractor<T> + Send + Sync + 'static,
1191 window_size: usize,
1192 slide_size: Option<usize>, emit_partial: bool, mut f: F,
1195 ) -> Pin<Box<dyn Stream<Item = Result<R, StateError>> + Send>>
1196 where
1197 F: FnMut(
1198 Vec<T>,
1199 StateAccess,
1200 ) -> std::pin::Pin<
1201 Box<dyn std::future::Future<Output = Result<R, StateError>> + Send>,
1202 > + Send
1203 + Sync
1204 + 'static,
1205 R: Send + Sync + 'static,
1206 Self: Sized,
1207 {
1208 let storage = config.create_storage_arc();
1209 let slide_size = slide_size.unwrap_or(window_size); let resource_manager = get_global_resource_manager();
1211
1212 Box::pin(stream! {
1213 let stream = self;
1214 futures::pin_mut!(stream);
1215 let mut windows: HashMap<String, Vec<T>> = HashMap::new();
1216 let mut item_count = 0u64;
1217 let mut pending_allocations = 0u64;
1218 let mut pending_deallocations = 0u64;
1219
1220 while let Some(item) = StreamExt::next(&mut stream).await {
1221 let key = key_extractor.extract_key(&item);
1222 let is_new_window = !windows.contains_key(&key);
1223 let window = windows.entry(key.clone()).or_insert_with(Vec::new);
1224 if is_new_window {
1225 pending_allocations += 1;
1226 }
1227
1228 window.push(item);
1229 pending_allocations += 1;
1230
1231 item_count += 1;
1233 if item_count % RESOURCE_TRACKING_INTERVAL == 0 {
1234 if pending_allocations > 0 {
1235 resource_manager.track_memory_allocation(pending_allocations).await.ok();
1236 pending_allocations = 0;
1237 }
1238 if pending_deallocations > 0 {
1239 resource_manager.track_memory_deallocation(pending_deallocations).await;
1240 pending_deallocations = 0;
1241 }
1242 }
1243
1244 if window.len() >= window_size {
1246 let window_items = if slide_size >= window_size {
1247 let items = window.drain(..).collect::<Vec<_>>();
1249 pending_deallocations += items.len() as u64;
1250 items
1251 } else {
1252 let items = window.drain(..window_size).collect::<Vec<_>>();
1254 pending_deallocations += window_size as u64;
1255
1256 let keep_count = window_size.saturating_sub(slide_size);
1258 if keep_count > 0 && items.len() >= slide_size {
1259 let to_keep = items[slide_size..].to_vec();
1261 let to_keep_len = to_keep.len();
1262 window.splice(0..0, to_keep);
1263 pending_allocations += to_keep_len as u64;
1264 }
1265
1266 items
1267 };
1268
1269 let state_access = StateAccess::new(storage.clone(), key.clone());
1270 match f(window_items, state_access).await {
1271 Ok(result) => yield Ok(result),
1272 Err(e) => yield Err(e),
1273 }
1274 }
1275 }
1276
1277 if emit_partial {
1279 for (key, window) in windows {
1280 if !window.is_empty() {
1281 pending_deallocations += window.len() as u64;
1282 let state_access = StateAccess::new(storage.clone(), key.clone());
1283 match f(window, state_access).await {
1284 Ok(result) => yield Ok(result),
1285 Err(e) => yield Err(e),
1286 }
1287 }
1288 }
1289 }
1290
1291 if pending_allocations > 0 {
1293 resource_manager.track_memory_allocation(pending_allocations).await.ok();
1294 }
1295 if pending_deallocations > 0 {
1296 resource_manager.track_memory_deallocation(pending_deallocations).await;
1297 }
1298 })
1299 }
1300}
1301
1302#[derive(Clone)]
1304pub struct StateAccess {
1305 storage: Arc<dyn StateStorage + Send + Sync>,
1306 key: String,
1307}
1308
1309impl StateAccess {
1310 pub fn new(storage: Arc<dyn StateStorage + Send + Sync>, key: String) -> Self {
1311 Self { storage, key }
1312 }
1313
1314 pub async fn get(&self) -> Option<Vec<u8>> {
1315 self.storage.get(&self.key).await
1316 }
1317
1318 pub async fn set(&self, value: &[u8]) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
1319 self.storage.set(&self.key, value).await
1320 }
1321}
1322
/// Milliseconds elapsed since the Unix epoch, according to the system clock.
///
/// If the clock reports a time before the epoch, the elapsed duration is treated as zero.
/// The `u128` millisecond count is narrowed to `u64` with `as`.
fn unix_timestamp_millis() -> u64 {
    let elapsed = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch,
        Err(_) => Duration::default(),
    };
    elapsed.as_millis() as u64
}
1329
1330impl<T, S> StatefulStreamExt<T> for S
1332where
1333 S: Stream<Item = T> + Send + Sync + Unpin + 'static,
1334 T: Send + Sync + Clone + Serialize + for<'de> Deserialize<'de> + 'static,
1335{
1336}
1337