1use crate::*;
2use crate::resource_manager::{get_global_resource_manager, ResourceManager};
3use async_stream::stream;
4use futures_core::Stream;
5use futures_util::pin_mut;
6use futures_util::stream::StreamExt;
7use serde::{Deserialize, Serialize};
8use std::collections::HashMap;
9use std::collections::HashSet;
10use std::pin::Pin;
11use std::sync::{Arc, Mutex};
12use std::time::{Duration, SystemTime, UNIX_EPOCH};
13use tokio::time::sleep;
14use crate::state::traits::KeyExtractor;
15use crate::state::{StateConfig, StateError, StateStorage};
16
/// Maximum number of keys the per-operator in-memory maps are trimmed to during cleanup.
const MAX_HASHMAP_KEYS: usize = 10_000;
/// Default number of items after which a group is flushed when the caller gives no size.
const MAX_GROUP_SIZE: usize = 10_000;
/// Cap on the number of items buffered per key by the pattern operator.
const MAX_PATTERN_SIZE: usize = 1_000;
/// Base cleanup interval, in processed items; several operators clean up every
/// `CLEANUP_INTERVAL * 2` items.
const CLEANUP_INTERVAL: u64 = 1000;
/// NOTE(review): not referenced in the visible part of this file — confirm it is still used.
const RESOURCE_TRACKING_INTERVAL: u64 = 100;
/// Per-key buffer size used by the join operator when `config.max_size` is `None`.
const DEFAULT_BUFFER_SIZE: usize = 1024;
24
// Operator-specific cleanup intervals, measured in processed items.
const MAP_CLEANUP_INTERVAL: u64 = 10000;
const WINDOW_CLEANUP_INTERVAL: u64 = 5000;
const JOIN_CLEANUP_INTERVAL: u64 = 500;
const THROTTLE_CLEANUP_INTERVAL: u64 = 10000;

/// Shrinks `map` to at most `max_keys` entries.
///
/// Nothing happens while `map.len() <= max_keys`. Otherwise the surplus entries are
/// removed starting with the keys that sort lowest under `Ord`.
///
/// Note that, despite the name, no insertion or access time is tracked: "oldest" here
/// means "smallest key", so for `String` keys eviction is lexicographic.
fn evict_oldest_entries<K, V>(map: &mut HashMap<K, V>, max_keys: usize)
where
    K: Clone + std::hash::Hash + Eq + std::cmp::Ord,
{
    let surplus = map.len().saturating_sub(max_keys);
    if surplus == 0 {
        return;
    }

    // Keys must be copied out first: the map cannot be mutated while it is borrowed
    // for iteration. Map keys are unique, so an unstable sort gives the same order.
    let mut keys: Vec<K> = map.keys().cloned().collect();
    keys.sort_unstable();

    for key in keys.into_iter().take(surplus) {
        map.remove(&key);
    }
}
46
47async fn track_resource_batch(
49 resource_manager: &Arc<ResourceManager>,
50 allocations: u64,
51 deallocations: u64,
52 buffer_overflows: u64,
53) {
54 if allocations > 0 {
55 resource_manager.track_memory_allocation(allocations).await.ok();
56 }
57 if deallocations > 0 {
58 resource_manager.track_memory_deallocation(deallocations).await;
59 }
60 for _ in 0..buffer_overflows {
61 resource_manager.track_buffer_overflow().await.ok();
62 }
63}
64
/// Per-key throttling record, stored as JSON through `StateAccess`.
#[derive(Serialize, Deserialize, Clone)]
struct ThrottleState {
    /// Number of items let through in the current window.
    count: u32,
    /// Start of the current window, in milliseconds since the Unix epoch.
    window_start: u64,
}
70
/// Per-key session record, stored as JSON through `StateAccess`.
#[derive(Serialize, Deserialize, Clone)]
struct SessionState {
    /// Time of the most recent item for the key, in milliseconds since the Unix epoch.
    last_activity: u64,
    /// Whether the most recent item was classified as starting a new session.
    is_new_session: bool,
}
76
/// A left-stream item buffered by the join operator.
#[derive(Serialize, Deserialize, Clone)]
struct LeftItemWithTime<T> {
    /// The buffered item itself.
    item: T,
    /// Arrival time in milliseconds since the Unix epoch; used to expire the entry.
    timestamp: u64,
    /// Join key extracted from `item` (stored, but not read in the visible code).
    key: String,
}
83
/// A right-stream item buffered by the join operator.
#[derive(Serialize, Deserialize, Clone)]
struct RightItemWithTime<U> {
    /// The buffered item itself.
    item: U,
    /// Arrival time in milliseconds since the Unix epoch; used to expire the entry.
    timestamp: u64,
    /// Join key extracted from `item` (stored, but not read in the visible code).
    key: String,
}
90
91pub trait StatefulStreamExt<T>: Stream<Item = T> + Send + Sync + Sized + Unpin + 'static
93where
94 Self: 'static,
95 T: Send + Sync + Clone + Serialize + for<'de> Deserialize<'de> + 'static,
96{
97 fn stateful_map_rs2<F, R>(
99 self,
100 config: StateConfig,
101 key_extractor: impl KeyExtractor<T> + Send + Sync + 'static,
102 mut f: F,
103 ) -> Pin<Box<dyn Stream<Item = Result<R, StateError>> + Send>>
104 where
105 F: FnMut(
106 T,
107 StateAccess,
108 ) -> std::pin::Pin<
109 Box<dyn std::future::Future<Output = Result<R, StateError>> + Send>,
110 > + Send
111 + Sync
112 + 'static,
113 R: Send + Sync + 'static,
114 Self: Sized,
115 {
116 let storage = config.create_storage_arc();
117
118 Box::pin(stream! {
119 let stream = self;
120 futures::pin_mut!(stream);
121 let mut seen_keys: HashSet<String> = HashSet::new();
122 let mut item_count = 0u64;
123
124 while let Some(item) = StreamExt::next(&mut stream).await {
125 let key = key_extractor.extract_key(&item);
126
127 item_count += 1;
129 if item_count % MAP_CLEANUP_INTERVAL == 0 && seen_keys.len() > MAX_HASHMAP_KEYS {
130 seen_keys.clear(); }
132
133 seen_keys.insert(key.clone());
135
136 let state_access = StateAccess::new(storage.clone(), key);
137 match f(item, state_access).await {
138 Ok(result) => yield Ok(result),
139 Err(e) => yield Err(e),
140 }
141 }
142 })
143 }
144
145 fn stateful_filter_rs2<F>(
147 self,
148 config: StateConfig,
149 key_extractor: impl KeyExtractor<T> + Send + Sync + 'static,
150 mut f: F,
151 ) -> Pin<Box<dyn Stream<Item = Result<T, StateError>> + Send>>
152 where
153 F: FnMut(
154 &T,
155 StateAccess,
156 ) -> std::pin::Pin<
157 Box<dyn std::future::Future<Output = Result<bool, StateError>> + Send>,
158 > + Send
159 + Sync
160 + 'static,
161 Self: Sized,
162 {
163 let storage = config.create_storage_arc();
164
165 Box::pin(stream! {
166 let stream = self;
167 futures::pin_mut!(stream);
168 let mut seen_keys: HashSet<String> = HashSet::new();
169 let mut item_count = 0u64;
170
171 while let Some(item) = StreamExt::next(&mut stream).await {
172 let key = key_extractor.extract_key(&item);
173
174 item_count += 1;
176 if item_count % (CLEANUP_INTERVAL * 2) == 0 && seen_keys.len() > MAX_HASHMAP_KEYS {
177 seen_keys.clear();
179 }
180
181 let is_new_key = seen_keys.insert(key.clone());
183
184 let state_access = StateAccess::new(storage.clone(), key);
185 match f(&item, state_access).await {
186 Ok(should_emit) => {
187 if should_emit {
188 yield Ok(item);
189 }
190 }
191 Err(e) => yield Err(e),
192 }
193 }
194 })
195 }
196
197 fn stateful_fold_rs2<F, R>(
199 self,
200 config: StateConfig,
201 key_extractor: impl KeyExtractor<T> + Send + Sync + 'static,
202 initial: R,
203 mut f: F,
204 ) -> Pin<Box<dyn Stream<Item = Result<R, StateError>> + Send>>
205 where
206 F: FnMut(
207 R,
208 T,
209 StateAccess,
210 ) -> std::pin::Pin<
211 Box<dyn std::future::Future<Output = Result<R, StateError>> + Send>,
212 > + Send
213 + Sync
214 + 'static,
215 R: Send + Sync + Clone + 'static,
216 Self: Sized,
217 {
218 let storage = config.create_storage_arc();
219
220 Box::pin(stream! {
221 let stream = self;
222 futures::pin_mut!(stream);
223 let mut accumulators: HashMap<String, R> = HashMap::new();
224 let mut item_count = 0u64;
225
226 while let Some(item) = StreamExt::next(&mut stream).await {
227 let key = key_extractor.extract_key(&item);
228
229 item_count += 1;
231 if item_count % (CLEANUP_INTERVAL * 2) == 0 {
232 evict_oldest_entries(&mut accumulators, MAX_HASHMAP_KEYS);
233 }
234
235 let acc = accumulators.entry(key.clone()).or_insert_with(|| initial.clone());
236 let state_access = StateAccess::new(storage.clone(), key);
237
238 match f(acc.clone(), item, state_access).await {
239 Ok(new_acc) => {
240 *acc = new_acc.clone();
241 yield Ok(new_acc);
242 }
243 Err(e) => yield Err(e),
244 }
245 }
246 })
247 }
248
249 fn stateful_reduce_rs2<F, R>(
251 self,
252 config: StateConfig,
253 key_extractor: impl KeyExtractor<T> + Send + Sync + 'static,
254 initial: R,
255 mut f: F,
256 ) -> Pin<Box<dyn Stream<Item = Result<R, StateError>> + Send>>
257 where
258 F: FnMut(
259 R,
260 T,
261 StateAccess,
262 ) -> std::pin::Pin<
263 Box<dyn std::future::Future<Output = Result<R, StateError>> + Send>,
264 > + Send
265 + Sync
266 + 'static,
267 R: Send + Sync + Clone + 'static,
268 Self: Sized,
269 {
270 let storage = config.create_storage_arc();
271
272 Box::pin(stream! {
273 let stream = self;
274 futures::pin_mut!(stream);
275 let mut accumulators: HashMap<String, R> = HashMap::new();
276 let mut item_count = 0u64;
277
278 while let Some(item) = StreamExt::next(&mut stream).await {
279 let key = key_extractor.extract_key(&item);
280
281 item_count += 1;
283 if item_count % (CLEANUP_INTERVAL * 2) == 0 {
284 evict_oldest_entries(&mut accumulators, MAX_HASHMAP_KEYS);
285 }
286
287 let acc = accumulators.entry(key.clone()).or_insert_with(|| initial.clone());
288 let state_access = StateAccess::new(storage.clone(), key);
289
290 match f(acc.clone(), item, state_access).await {
291 Ok(new_acc) => {
292 *acc = new_acc.clone();
293 yield Ok(new_acc);
294 }
295 Err(e) => yield Err(e),
296 }
297 }
298 })
299 }
300
    /// Groups items by key and passes each group to `f`.
    ///
    /// Convenience wrapper around `stateful_group_by_advanced_rs2` called with no
    /// explicit group size, no group timeout and `emit_on_key_change` set to `false`.
    fn stateful_group_by_rs2<F, R>(
        self,
        config: StateConfig,
        key_extractor: impl KeyExtractor<T> + Send + Sync + 'static,
        f: F,
    ) -> Pin<Box<dyn Stream<Item = Result<R, StateError>> + Send>>
    where
        F: FnMut(
                String,
                Vec<T>,
                StateAccess,
            ) -> std::pin::Pin<
                Box<dyn std::future::Future<Output = Result<R, StateError>> + Send>,
            > + Send
            + Sync
            + 'static,
        R: Send + Sync + 'static,
        Self: Sized,
    {
        self.stateful_group_by_advanced_rs2(config, key_extractor, None, None, false, f)
    }
323
324 fn stateful_group_by_advanced_rs2<F, R>(
326 self,
327 config: StateConfig,
328 key_extractor: impl KeyExtractor<T> + Send + Sync + 'static,
329 max_group_size: Option<usize>, group_timeout: Option<std::time::Duration>, emit_on_key_change: bool, mut f: F,
333 ) -> Pin<Box<dyn Stream<Item = Result<R, StateError>> + Send>>
334 where
335 F: FnMut(
336 String,
337 Vec<T>,
338 StateAccess,
339 ) -> std::pin::Pin<
340 Box<dyn std::future::Future<Output = Result<R, StateError>> + Send>,
341 > + Send
342 + Sync
343 + 'static,
344 R: Send + Sync + 'static,
345 Self: Sized,
346 {
347 let storage = config.create_storage_arc();
348 let timeout_ms = group_timeout.map(|d| d.as_millis() as u64);
349 let max_group_size = max_group_size.unwrap_or(MAX_GROUP_SIZE);
350
351 Box::pin(stream! {
352 let stream = self;
353 futures::pin_mut!(stream);
354 let mut groups: HashMap<String, Vec<T>> = HashMap::new();
355 let mut group_timestamps: HashMap<String, u64> = HashMap::new();
356 let mut last_key: Option<String> = None;
357 let mut item_count = 0u64;
358
359 while let Some(item) = StreamExt::next(&mut stream).await {
360 let key = key_extractor.extract_key(&item);
361 let now = unix_timestamp_millis();
362
363 item_count += 1;
365 if item_count % (CLEANUP_INTERVAL * 2) == 0 {
366 evict_oldest_entries(&mut groups, MAX_HASHMAP_KEYS);
367 evict_oldest_entries(&mut group_timestamps, MAX_HASHMAP_KEYS);
368 }
369
370 if emit_on_key_change {
372 if let Some(ref last_key_val) = last_key {
373 if last_key_val != &key {
374 if let Some(group_items) = groups.remove(last_key_val) {
376 let state_access = StateAccess::new(storage.clone(), last_key_val.clone());
377 match f(last_key_val.clone(), group_items, state_access).await {
378 Ok(result) => yield Ok(result),
379 Err(e) => yield Err(e),
380 }
381 }
382 group_timestamps.remove(last_key_val);
383 }
384 }
385 }
386
387 if let (Some(timeout), Some(&group_start)) = (timeout_ms, group_timestamps.get(&key)) {
389 if now - group_start > timeout {
390 if let Some(group_items) = groups.remove(&key) {
391 let state_access = StateAccess::new(storage.clone(), key.clone());
392 match f(key.clone(), group_items, state_access).await {
393 Ok(result) => yield Ok(result),
394 Err(e) => yield Err(e),
395 }
396 }
397 group_timestamps.remove(&key);
398 }
399 }
400
401 let group = groups.entry(key.clone()).or_insert_with(Vec::new);
403 group_timestamps.entry(key.clone()).or_insert(now);
404 group.push(item);
405
406 if group.len() >= max_group_size {
408 if let Some(group_items) = groups.remove(&key) {
409 let state_access = StateAccess::new(storage.clone(), key.clone());
410 match f(key.clone(), group_items, state_access).await {
411 Ok(result) => yield Ok(result),
412 Err(e) => yield Err(e),
413 }
414 }
415 group_timestamps.remove(&key);
416 }
417
418 last_key = Some(key);
419 }
420
421 let now = unix_timestamp_millis();
423 let mut expired_keys = Vec::new();
424
425 if let Some(timeout) = timeout_ms {
426 for (key, &group_start) in &group_timestamps {
427 if now - group_start > timeout {
428 expired_keys.push(key.clone());
429 }
430 }
431 }
432
433 for key in expired_keys {
435 let key_clone = key.clone();
436 if let Some(group_items) = groups.remove(&key_clone) {
437 let state_access = StateAccess::new(storage.clone(), key_clone.clone());
438 match f(key_clone.clone(), group_items, state_access).await {
439 Ok(result) => yield Ok(result),
440 Err(e) => yield Err(e),
441 }
442 }
443 group_timestamps.remove(&key_clone);
444 }
445
446 for (key, group_items) in groups {
448 let state_access = StateAccess::new(storage.clone(), key.clone());
449 match f(key, group_items, state_access).await {
450 Ok(result) => yield Ok(result),
451 Err(e) => yield Err(e),
452 }
453 }
454 })
455 }
456
457 fn stateful_deduplicate_rs2<F>(
459 self,
460 config: StateConfig,
461 key_extractor: impl KeyExtractor<T> + Send + Sync + 'static,
462 ttl: std::time::Duration,
463 mut f: F,
464 ) -> Pin<Box<dyn Stream<Item = Result<T, StateError>> + Send>>
465 where
466 F: FnMut(T) -> T + Send + Sync + 'static,
467 Self: Sized,
468 {
469 let storage = config.create_storage_arc();
470 let ttl_ms = ttl.as_millis() as u64;
471
472 Box::pin(stream! {
473 let stream = self;
474 futures::pin_mut!(stream);
475
476 while let Some(item) = StreamExt::next(&mut stream).await {
477 let key = key_extractor.extract_key(&item);
478 let state_access = StateAccess::new(storage.clone(), key.clone());
479
480 let now = unix_timestamp_millis();
481 let state_bytes = match state_access.get().await {
482 Some(bytes) => bytes,
483 None => Vec::new(),
484 };
485
486 let last_seen: u64 = if state_bytes.is_empty() {
487 0
488 } else {
489 match serde_json::from_slice(&state_bytes) {
490 Ok(timestamp) => timestamp,
491 Err(_) => 0,
492 }
493 };
494
495 if now - last_seen > ttl_ms {
496 match serde_json::to_vec(&now) {
498 Ok(timestamp_bytes) => {
499 if let Err(e) = state_access.set(×tamp_bytes).await {
500 yield Err(StateError::Storage(format!("Failed to set state for deduplication: {}", e)));
501 continue;
502 }
503 }
504 Err(e) => {
505 yield Err(StateError::Serialization(e));
506 continue;
507 }
508 }
509
510 yield Ok(f(item));
511 }
512 }
513 })
514 }
515
516 fn stateful_throttle_rs2<F>(
518 self,
519 config: StateConfig,
520 key_extractor: impl KeyExtractor<T> + Send + Sync + 'static,
521 rate_limit: u32,
522 window_duration: std::time::Duration,
523 mut f: F,
524 ) -> Pin<Box<dyn Stream<Item = Result<T, StateError>> + Send>>
525 where
526 F: FnMut(T) -> T + Send + Sync + 'static,
527 Self: Sized,
528 {
529 let storage = config.create_storage_arc();
530 let window_ms = window_duration.as_millis() as u64;
531
532 Box::pin(stream! {
533 let stream = self;
534 futures::pin_mut!(stream);
535
536 while let Some(item) = StreamExt::next(&mut stream).await {
537 let key = key_extractor.extract_key(&item);
538 let state_access = StateAccess::new(storage.clone(), key.clone());
539
540 let now = unix_timestamp_millis();
541
542 let state_bytes = match state_access.get().await {
544 Some(bytes) => bytes,
545 None => Vec::new(),
546 };
547
548 let mut throttle_state: ThrottleState = if state_bytes.is_empty() {
549 ThrottleState { count: 0, window_start: now }
550 } else {
551 match serde_json::from_slice(&state_bytes) {
552 Ok(state) => state,
553 Err(_) => ThrottleState { count: 0, window_start: now },
554 }
555 };
556
557 if now - throttle_state.window_start > window_ms {
559 throttle_state.count = 0;
560 throttle_state.window_start = now;
561 }
562
563 if throttle_state.count < rate_limit {
564 throttle_state.count += 1;
565
566 match serde_json::to_vec(&throttle_state) {
568 Ok(state_bytes) => {
569 if let Err(e) = state_access.set(&state_bytes).await {
570 yield Err(StateError::Storage(format!("Failed to set throttle state: {}", e)));
571 continue;
572 }
573 }
574 Err(e) => {
575 yield Err(StateError::Serialization(e));
576 continue;
577 }
578 }
579
580 yield Ok(f(item));
581 } else {
582 let elapsed_ms = now.saturating_sub(throttle_state.window_start);
584 let remaining = if elapsed_ms >= window_ms {
585 Duration::from_millis(0)
586 } else {
587 Duration::from_millis(window_ms - elapsed_ms)
588 };
589
590 if remaining > Duration::from_millis(0) && remaining < Duration::from_secs(1) {
592 sleep(remaining).await;
593 }
594
595 let now2 = unix_timestamp_millis();
597 throttle_state.count = 1;
598 throttle_state.window_start = now2;
599
600 match serde_json::to_vec(&throttle_state) {
602 Ok(state_bytes) => {
603 if let Err(e) = state_access.set(&state_bytes).await {
604 yield Err(StateError::Storage(format!("Failed to set throttle state: {}", e)));
605 continue;
606 }
607 }
608 Err(e) => {
609 yield Err(StateError::Serialization(e));
610 continue;
611 }
612 }
613
614 yield Ok(f(item));
615 }
616 }
617 })
618 }
619
620 fn stateful_session_rs2<F>(
622 self,
623 config: StateConfig,
624 key_extractor: impl KeyExtractor<T> + Send + Sync + 'static,
625 session_timeout: std::time::Duration,
626 mut f: F,
627 ) -> Pin<Box<dyn Stream<Item = Result<T, StateError>> + Send>>
628 where
629 F: FnMut(T, bool) -> T + Send + Sync + 'static,
630 Self: Sized,
631 {
632 let storage = config.create_storage_arc();
633 let timeout_ms = session_timeout.as_millis() as u64;
634
635 Box::pin(stream! {
636 let stream = self;
637 futures::pin_mut!(stream);
638
639 while let Some(item) = StreamExt::next(&mut stream).await {
640 let key = key_extractor.extract_key(&item);
641 let state_access = StateAccess::new(storage.clone(), key.clone());
642
643 let now = unix_timestamp_millis();
644 let state_bytes = match state_access.get().await {
645 Some(bytes) => bytes,
646 None => Vec::new(),
647 };
648
649 let mut state: SessionState = if state_bytes.is_empty() {
650 SessionState { last_activity: now, is_new_session: true }
651 } else {
652 match serde_json::from_slice(&state_bytes) {
653 Ok(session_state) => session_state,
654 Err(_) => SessionState { last_activity: now, is_new_session: true },
655 }
656 };
657
658 let is_new_session = now - state.last_activity > timeout_ms;
659 state.last_activity = now;
660 state.is_new_session = is_new_session;
661
662 match serde_json::to_vec(&state) {
664 Ok(state_bytes) => {
665 if let Err(e) = state_access.set(&state_bytes).await {
666 yield Err(StateError::Storage(format!("Failed to set session state: {}", e)));
667 continue;
668 }
669 }
670 Err(e) => {
671 yield Err(StateError::Serialization(e));
672 continue;
673 }
674 }
675
676 yield Ok(f(item, is_new_session));
677 }
678 })
679 }
680
681 fn stateful_pattern_rs2<F>(
683 self,
684 config: StateConfig,
685 key_extractor: impl KeyExtractor<T> + Send + Sync + 'static,
686 pattern_size: usize,
687 mut f: F,
688 ) -> Pin<Box<dyn Stream<Item = Result<Option<String>, StateError>> + Send>>
689 where
690 F: FnMut(
691 Vec<T>,
692 StateAccess,
693 ) -> std::pin::Pin<
694 Box<dyn std::future::Future<Output = Result<Option<String>, StateError>> + Send>,
695 > + Send
696 + Sync
697 + 'static,
698 Self: Sized,
699 {
700 let storage = config.create_storage_arc();
701
702 Box::pin(stream! {
703 let stream = self;
704 futures::pin_mut!(stream);
705 let mut patterns: HashMap<String, Vec<T>> = HashMap::new();
706 let mut item_count = 0u64;
707
708 while let Some(item) = StreamExt::next(&mut stream).await {
709 let key = key_extractor.extract_key(&item);
710
711 item_count += 1;
713 if item_count % (CLEANUP_INTERVAL * 2) == 0 {
714 evict_oldest_entries(&mut patterns, MAX_HASHMAP_KEYS);
715 }
716
717 let pattern = patterns.entry(key.clone()).or_insert_with(Vec::new);
718 pattern.push(item);
719
720 if pattern.len() > MAX_PATTERN_SIZE {
722 pattern.drain(0..pattern.len() - MAX_PATTERN_SIZE);
723 }
724
725 if pattern.len() >= pattern_size {
726 let pattern_items = pattern.drain(..pattern_size).collect::<Vec<_>>();
727 let state_access = StateAccess::new(storage.clone(), key.clone());
728 match f(pattern_items, state_access).await {
729 Ok(result) => {
730 if let Some(pattern_str) = result {
731 yield Ok(Some(pattern_str));
732 }
733 }
734 Err(e) => yield Err(e),
735 }
736 }
737 }
738 })
739 }
740
741 fn stateful_join_rs2<U, F, R>(
743 self,
744 other: Pin<Box<dyn Stream<Item = U> + Send>>,
745 config: StateConfig,
746 key_extractor: impl KeyExtractor<T> + Send + Sync + 'static,
747 other_key_extractor: impl KeyExtractor<U> + Send + Sync + 'static,
748 window_duration: std::time::Duration,
749 mut f: F,
750 ) -> Pin<Box<dyn Stream<Item = Result<R, StateError>> + Send>>
751 where
752 F: FnMut(
753 T,
754 U,
755 StateAccess,
756 ) -> std::pin::Pin<
757 Box<dyn std::future::Future<Output = Result<R, StateError>> + Send>,
758 > + Send
759 + Sync
760 + 'static,
761 U: Send + Sync + Clone + Serialize + for<'de> Deserialize<'de> + 'static,
762 R: Send + Sync + 'static,
763 Self: Sized,
764 {
765 let storage = config.create_storage_arc();
766 Box::pin(stream! {
767 let left_stream = self;
768 let right_stream = other;
769 futures::pin_mut!(left_stream);
770 futures::pin_mut!(right_stream);
771 let mut left_buffer: HashMap<String, Vec<LeftItemWithTime<T>>> = HashMap::new();
772 let mut right_buffer: HashMap<String, Vec<RightItemWithTime<U>>> = HashMap::new();
773 let window_ms = window_duration.as_millis() as u64;
774 let mut item_count = 0u64;
775
776 loop {
777 tokio::select! {
778 left_item = left_stream.next() => {
779 if let Some(item) = left_item {
780 let key = key_extractor.extract_key(&item);
781 let now = unix_timestamp_millis();
782
783 item_count += 1;
785 if item_count % JOIN_CLEANUP_INTERVAL == 0 {
786 evict_oldest_entries(&mut left_buffer, MAX_HASHMAP_KEYS);
787 evict_oldest_entries(&mut right_buffer, MAX_HASHMAP_KEYS);
788 }
789
790 left_buffer.entry(key.clone()).or_default().retain(|x| now - x.timestamp <= window_ms);
792
793 let left_entry = LeftItemWithTime { item: item.clone(), timestamp: now, key: key.clone() };
795 left_buffer.entry(key.clone()).or_default().push(left_entry.clone());
796
797 let max_size = config.max_size.unwrap_or(DEFAULT_BUFFER_SIZE);
799 let left_buf = left_buffer.get_mut(&key).unwrap();
800 if left_buf.len() > max_size {
801 left_buf.drain(0..left_buf.len() - max_size);
802 }
803
804 if let Some(rights) = right_buffer.get(&key) {
806 for right in rights.iter().filter(|r| now - r.timestamp <= window_ms) {
807 let state_access = StateAccess::new(storage.clone(), key.clone());
808 match f(item.clone(), right.item.clone(), state_access).await {
809 Ok(result) => yield Ok(result),
810 Err(e) => yield Err(e),
811 }
812 }
813 }
814 } else {
815 break;
816 }
817 }
818 right_item = right_stream.next() => {
819 if let Some(item) = right_item {
820 let key = other_key_extractor.extract_key(&item);
821 let now = unix_timestamp_millis();
822
823 item_count += 1;
825 if item_count % JOIN_CLEANUP_INTERVAL == 0 {
826 evict_oldest_entries(&mut left_buffer, MAX_HASHMAP_KEYS);
827 evict_oldest_entries(&mut right_buffer, MAX_HASHMAP_KEYS);
828 }
829
830 right_buffer.entry(key.clone()).or_default().retain(|x| now - x.timestamp <= window_ms);
832 left_buffer.entry(key.clone()).or_default().retain(|x| now - x.timestamp <= window_ms);
834
835 let right_entry = RightItemWithTime { item: item.clone(), timestamp: now, key: key.clone() };
837 right_buffer.entry(key.clone()).or_default().push(right_entry.clone());
838
839 let max_size = config.max_size.unwrap_or(DEFAULT_BUFFER_SIZE);
841 let right_buf = right_buffer.get_mut(&key).unwrap();
842 if right_buf.len() > max_size {
843 right_buf.drain(0..right_buf.len() - max_size);
844 }
845
846 if let Some(lefts) = left_buffer.get(&key) {
848 for left in lefts.iter().filter(|l| now - l.timestamp <= window_ms) {
849 let state_access = StateAccess::new(storage.clone(), key.clone());
850 match f(left.item.clone(), item.clone(), state_access).await {
851 Ok(result) => yield Ok(result),
852 Err(e) => yield Err(e),
853 }
854 }
855 }
856 } else {
857 break;
858 }
859 }
860 }
861 }
862 })
863 }
864
    /// Collects items per key into windows of `window_size` items and passes each to `f`.
    ///
    /// Convenience wrapper around `stateful_window_rs2_advanced` called with no explicit
    /// slide size and `emit_partial` set to `false`.
    fn stateful_window_rs2<F, R>(
        self,
        config: StateConfig,
        key_extractor: impl KeyExtractor<T> + Send + Sync + 'static,
        window_size: usize,
        f: F,
    ) -> Pin<Box<dyn Stream<Item = Result<R, StateError>> + Send>>
    where
        F: FnMut(
                Vec<T>,
                StateAccess,
            ) -> std::pin::Pin<
                Box<dyn std::future::Future<Output = Result<R, StateError>> + Send>,
            > + Send
            + Sync
            + 'static,
        R: Send + Sync + 'static,
        Self: Sized,
    {
        self.stateful_window_rs2_advanced(config, key_extractor, window_size, None, false, f)
    }
887
888 fn stateful_window_rs2_advanced<F, R>(
890 self,
891 config: StateConfig,
892 key_extractor: impl KeyExtractor<T> + Send + Sync + 'static,
893 window_size: usize,
894 slide_size: Option<usize>, emit_partial: bool, mut f: F,
897 ) -> Pin<Box<dyn Stream<Item = Result<R, StateError>> + Send>>
898 where
899 F: FnMut(
900 Vec<T>,
901 StateAccess,
902 ) -> std::pin::Pin<
903 Box<dyn std::future::Future<Output = Result<R, StateError>> + Send>,
904 > + Send
905 + Sync
906 + 'static,
907 R: Send + Sync + 'static,
908 Self: Sized,
909 {
910 let storage = config.create_storage_arc();
911 let slide_size = slide_size.unwrap_or(window_size); Box::pin(stream! {
914 let stream = self;
915 futures::pin_mut!(stream);
916 let mut windows: HashMap<String, Vec<T>> = HashMap::new();
917
918 while let Some(item) = StreamExt::next(&mut stream).await {
919 let key = key_extractor.extract_key(&item);
920 let window = windows.entry(key.clone()).or_insert_with(Vec::new);
921
922 window.push(item);
923
924 if window.len() >= window_size {
926 let window_items = if slide_size >= window_size {
927 window.drain(..).collect::<Vec<_>>()
929 } else {
930 let items = window.drain(..window_size).collect::<Vec<_>>();
932
933 let keep_count = window_size.saturating_sub(slide_size);
935 if keep_count > 0 && items.len() >= slide_size {
936 let to_keep = items[slide_size..].to_vec();
938 window.splice(0..0, to_keep);
939 }
940
941 items
942 };
943
944 let state_access = StateAccess::new(storage.clone(), key.clone());
945 match f(window_items, state_access).await {
946 Ok(result) => yield Ok(result),
947 Err(e) => yield Err(e),
948 }
949 }
950 }
951
952 if emit_partial {
954 for (key, window) in windows {
955 if !window.is_empty() {
956 let state_access = StateAccess::new(storage.clone(), key.clone());
957 match f(window, state_access).await {
958 Ok(result) => yield Ok(result),
959 Err(e) => yield Err(e),
960 }
961 }
962 }
963 }
964 })
965 }
966}
967
/// Handle to the stored state of a single key.
///
/// Pairs a shared storage backend with the key it reads and writes, so operator
/// callbacks can access "their" state without knowing the key.
#[derive(Clone)]
pub struct StateAccess {
    /// Shared storage backend.
    storage: Arc<dyn StateStorage + Send + Sync>,
    /// Key under which this handle reads and writes.
    key: String,
}
974
impl StateAccess {
    /// Creates a handle that reads and writes `key` in `storage`.
    pub fn new(storage: Arc<dyn StateStorage + Send + Sync>, key: String) -> Self {
        Self { storage, key }
    }

    /// Returns the bytes the storage reports for this handle's key, or `None` if the
    /// storage returns nothing for it.
    pub async fn get(&self) -> Option<Vec<u8>> {
        self.storage.get(&self.key).await
    }

    /// Stores `value` under this handle's key.
    ///
    /// # Errors
    ///
    /// Returns whatever error the storage backend's `set` reports.
    pub async fn set(&self, value: &[u8]) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        self.storage.set(&self.key, value).await
    }
}
988
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// If the system clock reports a time before the epoch, `0` is returned.
fn unix_timestamp_millis() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as u64,
        // Clock is set before 1970: same result as a zero duration.
        Err(_) => 0,
    }
}
995
/// Blanket implementation: every `Send + Sync + Unpin + 'static` stream whose items meet
/// the trait's item bounds gets all `StatefulStreamExt` operators through the trait's
/// default methods.
impl<T, S> StatefulStreamExt<T> for S
where
    S: Stream<Item = T> + Send + Sync + Unpin + 'static,
    T: Send + Sync + Clone + Serialize + for<'de> Deserialize<'de> + 'static,
{
}
1003