fluxus_api/operators/
window_sorter.rs1use async_trait::async_trait;
2use fluxus_runtime::state::KeyedStateBackend;
3use fluxus_transformers::Operator;
4use fluxus_utils::{
5 models::{Record, StreamResult},
6 window::WindowConfig,
7};
8use std::{cmp::Ordering, marker::PhantomData};
9
10pub struct WindowSorter<T, F> {
12 window_config: WindowConfig,
13 f: F,
14 state: KeyedStateBackend<u64, Vec<T>>,
15 _phantom: PhantomData<T>,
16}
17
18impl<T, F> WindowSorter<T, F>
19where
20 F: FnMut(&T, &T) -> Ordering,
21{
22 pub fn new(window_config: WindowConfig, f: F) -> Self {
23 Self {
24 window_config,
25 f,
26 state: KeyedStateBackend::new(),
27 _phantom: PhantomData,
28 }
29 }
30
31 fn get_window_keys(&self, timestamp: i64) -> Vec<u64> {
32 self.window_config.window_type.get_window_keys(timestamp)
33 }
34}
35
36#[async_trait]
37impl<T, F> Operator<T, Vec<T>> for WindowSorter<T, F>
38where
39 T: Clone + Send + Sync + 'static,
40 F: FnMut(&T, &T) -> Ordering + Send + Sync,
41{
42 async fn process(&mut self, record: Record<T>) -> StreamResult<Vec<Record<Vec<T>>>> {
43 let mut results = Vec::new();
44
45 for window_key in self.get_window_keys(record.timestamp) {
46 let mut current = self.state.get(&window_key).unwrap_or_default();
47 let index = current
48 .binary_search_by(|prob| (self.f)(prob, &record.data))
49 .unwrap_or_else(|i| i);
50 current.insert(index, record.data.clone());
51
52 self.state.set(window_key, current.clone());
53 results.push(Record {
54 data: current,
55 timestamp: record.timestamp,
56 });
57 }
58
59 Ok(results)
60 }
61}
62
/// Direction in which `WindowTimestampSorter` orders records by timestamp.
///
/// Derives `PartialEq`/`Eq` so callers can compare and assert on the
/// configured order.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SortOrder {
    /// Smallest timestamp first.
    Asc,
    /// Largest timestamp first.
    Desc,
}
69
70pub struct WindowTimestampSorter<T> {
72 window_config: WindowConfig,
73 method: SortOrder,
74 state: KeyedStateBackend<u64, Vec<Record<T>>>,
75 _phantom: PhantomData<T>,
76}
77
78impl<T> WindowTimestampSorter<T> {
79 pub fn new(window_config: WindowConfig, method: SortOrder) -> Self {
80 Self {
81 window_config,
82 method,
83 state: KeyedStateBackend::new(),
84 _phantom: PhantomData,
85 }
86 }
87
88 fn get_window_keys(&self, timestamp: i64) -> Vec<u64> {
89 self.window_config.window_type.get_window_keys(timestamp)
90 }
91}
92
93#[async_trait]
94impl<T> Operator<T, Vec<T>> for WindowTimestampSorter<T>
95where
96 T: Clone + Send + Sync + 'static,
97{
98 async fn process(&mut self, record: Record<T>) -> StreamResult<Vec<Record<Vec<T>>>> {
99 let mut raw_results = Vec::new();
100 for window_key in self.get_window_keys(record.timestamp) {
101 let mut current = self.state.get(&window_key).unwrap_or_default();
102 let index = current
103 .binary_search_by(|prob| match self.method {
104 SortOrder::Asc => prob.timestamp.cmp(&record.timestamp),
105 SortOrder::Desc => record.timestamp.cmp(&prob.timestamp),
106 })
107 .unwrap_or_else(|i| i);
108 current.insert(index, record.clone());
109
110 self.state.set(window_key, current.clone());
111 raw_results.push(Record {
112 data: current,
113 timestamp: record.timestamp,
114 });
115 }
116 let results = raw_results
117 .into_iter()
118 .map(|Record { data, timestamp }| {
119 let data = data.into_iter().map(|rec| rec.data).collect();
120 Record { data, timestamp }
121 })
122 .collect();
123 Ok(results)
124 }
125}