// fluxus_api/operators/window_sorter.rs

1use async_trait::async_trait;
2use fluxus_runtime::state::KeyedStateBackend;
3use fluxus_transformers::Operator;
4use fluxus_utils::{
5    models::{Record, StreamResult},
6    window::WindowConfig,
7};
8use std::{cmp::Ordering, marker::PhantomData};
9
10/// sort_by operator for windowed stream.
11pub struct WindowSorter<T, F> {
12    window_config: WindowConfig,
13    f: F,
14    state: KeyedStateBackend<u64, Vec<T>>,
15    _phantom: PhantomData<T>,
16}
17
18impl<T, F> WindowSorter<T, F>
19where
20    F: FnMut(&T, &T) -> Ordering,
21{
22    pub fn new(window_config: WindowConfig, f: F) -> Self {
23        Self {
24            window_config,
25            f,
26            state: KeyedStateBackend::new(),
27            _phantom: PhantomData,
28        }
29    }
30
31    fn get_window_keys(&self, timestamp: i64) -> Vec<u64> {
32        self.window_config.window_type.get_window_keys(timestamp)
33    }
34}
35
36#[async_trait]
37impl<T, F> Operator<T, Vec<T>> for WindowSorter<T, F>
38where
39    T: Clone + Send + Sync + 'static,
40    F: FnMut(&T, &T) -> Ordering + Send + Sync,
41{
42    async fn process(&mut self, record: Record<T>) -> StreamResult<Vec<Record<Vec<T>>>> {
43        let mut results = Vec::new();
44
45        for window_key in self.get_window_keys(record.timestamp) {
46            let mut current = self.state.get(&window_key).unwrap_or_default();
47            let index = current
48                .binary_search_by(|prob| (self.f)(prob, &record.data))
49                .unwrap_or_else(|i| i);
50            current.insert(index, record.data.clone());
51
52            self.state.set(window_key, current.clone());
53            results.push(Record {
54                data: current,
55                timestamp: record.timestamp,
56            });
57        }
58
59        Ok(results)
60    }
61}
62
/// Sort direction used by the `sort_by_ts` operator (`WindowTimestampSorter`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum SortOrder {
    /// Ascending: smallest timestamp first.
    Asc,
    /// Descending: largest timestamp first.
    Desc,
}
69
70/// sort_by_ts operator for windowed stream.
71pub struct WindowTimestampSorter<T> {
72    window_config: WindowConfig,
73    method: SortOrder,
74    state: KeyedStateBackend<u64, Vec<Record<T>>>,
75    _phantom: PhantomData<T>,
76}
77
78impl<T> WindowTimestampSorter<T> {
79    pub fn new(window_config: WindowConfig, method: SortOrder) -> Self {
80        Self {
81            window_config,
82            method,
83            state: KeyedStateBackend::new(),
84            _phantom: PhantomData,
85        }
86    }
87
88    fn get_window_keys(&self, timestamp: i64) -> Vec<u64> {
89        self.window_config.window_type.get_window_keys(timestamp)
90    }
91}
92
93#[async_trait]
94impl<T> Operator<T, Vec<T>> for WindowTimestampSorter<T>
95where
96    T: Clone + Send + Sync + 'static,
97{
98    async fn process(&mut self, record: Record<T>) -> StreamResult<Vec<Record<Vec<T>>>> {
99        let mut raw_results = Vec::new();
100        for window_key in self.get_window_keys(record.timestamp) {
101            let mut current = self.state.get(&window_key).unwrap_or_default();
102            let index = current
103                .binary_search_by(|prob| match self.method {
104                    SortOrder::Asc => prob.timestamp.cmp(&record.timestamp),
105                    SortOrder::Desc => record.timestamp.cmp(&prob.timestamp),
106                })
107                .unwrap_or_else(|i| i);
108            current.insert(index, record.clone());
109
110            self.state.set(window_key, current.clone());
111            raw_results.push(Record {
112                data: current,
113                timestamp: record.timestamp,
114            });
115        }
116        let results = raw_results
117            .into_iter()
118            .map(|Record { data, timestamp }| {
119                let data = data.into_iter().map(|rec| rec.data).collect();
120                Record { data, timestamp }
121            })
122            .collect();
123        Ok(results)
124    }
125}