1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106
use crate::api::backend::KeyedStateBackend;
use crate::api::checkpoint::{CheckpointFunction, CheckpointHandle, FunctionSnapshotContext};
use crate::api::element::Record;
use crate::api::function::{BaseReduceFunction, Context, NamedFunction, ReduceFunction};
use crate::api::properties::SystemProperties;
use crate::api::window::TWindow;
use crate::storage::keyed_state::{TWindowState, WindowState};
use crate::utils::date_time::timestamp_str;

/// Panic message used whenever the window state is accessed before `open` created it.
const STATE_NOT_OPENED: &str =
    "WindowBaseReduceFunction state is not initialized; `open` must be called first";

/// Reduce function that merges incoming records into per-window keyed state
/// and emits a trigger record for every window that a watermark has passed.
pub(crate) struct WindowBaseReduceFunction {
    /// User supplied reduce logic applied when two values are merged.
    reduce: Box<dyn ReduceFunction>,
    /// Window state; `None` until `open` has been called.
    state: Option<WindowState>,
}

impl WindowBaseReduceFunction {
    /// Wraps `reduce`. The window state is left unset until `open` runs.
    pub fn new(reduce: Box<dyn ReduceFunction>) -> Self {
        WindowBaseReduceFunction {
            reduce,
            state: None,
        }
    }
}

impl BaseReduceFunction for WindowBaseReduceFunction {
    /// Creates the window state for this task.
    ///
    /// The keyed state backend is read from the application properties and
    /// falls back to `KeyedStateBackend::Memory` when none can be obtained.
    fn open(&mut self, context: &Context) -> crate::api::Result<()> {
        let task_id = context.task_id;
        let application_id = context.application_id.clone();

        let state_mode = context
            .application_properties
            .get_keyed_state_backend()
            .unwrap_or(KeyedStateBackend::Memory);

        self.state = Some(WindowState::new(
            application_id,
            task_id.job_id(),
            task_id.task_number(),
            state_mode,
        ));

        Ok(())
    }

    /// Merges `record` into the state under `key`, combining values with the
    /// wrapped reduce function.
    ///
    /// # Panics
    ///
    /// Panics if `open` has not been called.
    fn reduce(&mut self, key: Record, record: Record) {
        // Borrow the two fields separately so the closure can use `reduce`
        // while `state` is mutably borrowed.
        let state = self.state.as_mut().expect(STATE_NOT_OPENED);
        let reduce_func = &self.reduce;

        state.merge(key, record, |val1, val2| reduce_func.reduce(val1, val2));
    }

    /// Drops every window whose `max_timestamp()` is not greater than
    /// `watermark_timestamp` and returns one record per dropped window.
    ///
    /// Each returned record is a fresh `Record::new()` whose `trigger_window`
    /// is set to the dropped window. Records are ordered by ascending
    /// `max_timestamp()`. An empty vector is returned when nothing was dropped.
    ///
    /// # Panics
    ///
    /// Panics if `open` has not been called.
    fn drop_state(&mut self, watermark_timestamp: u64) -> Vec<Record> {
        let state = self.state.as_mut().expect(STATE_NOT_OPENED);

        let mut drop_windows = Vec::new();
        for window in state.windows() {
            if window.max_timestamp() <= watermark_timestamp {
                drop_windows.push(window.clone());
                state.drop_window(&window);
            }
        }

        // Nothing expired: skip logging and sorting entirely.
        if drop_windows.is_empty() {
            return Vec::new();
        }

        debug!(
            "check window for drop, trigger watermark={}, drop window size={}",
            timestamp_str(watermark_timestamp),
            drop_windows.len()
        );

        // Emit the triggers oldest-window-first.
        drop_windows.sort_by_key(|w| w.max_timestamp());

        drop_windows
            .into_iter()
            .map(|drop_window| {
                let mut drop_record = Record::new();
                drop_record.trigger_window = Some(drop_window);
                drop_record
            })
            .collect()
    }

    /// No resources are released here; always returns `Ok(())`.
    fn close(&mut self) -> crate::api::Result<()> {
        Ok(())
    }
}

impl NamedFunction for WindowBaseReduceFunction {
    /// Returns the fixed name of this function.
    fn name(&self) -> &str {
        "WindowBaseReduceFunction"
    }
}

impl CheckpointFunction for WindowBaseReduceFunction {
    /// Intentionally a no-op: the supplied handle is ignored.
    // NOTE(review): the handle produced by `snapshot_state` is not read back
    // here — confirm whether restoring windows from a checkpoint is expected.
    fn initialize_state(
        &mut self,
        _context: &FunctionSnapshotContext,
        _handle: &Option<CheckpointHandle>,
    ) {
    }

    /// Serializes the value returned by the state's `windows()` to JSON and
    /// wraps it in a `CheckpointHandle`.
    ///
    /// # Panics
    ///
    /// Panics if `open` has not been called or if JSON serialization fails.
    fn snapshot_state(&mut self, _context: &FunctionSnapshotContext) -> Option<CheckpointHandle> {
        let windows = self.state.as_ref().expect(STATE_NOT_OPENED).windows();

        let handle = serde_json::to_string(&windows)
            .expect("failed to serialize window state for checkpoint");

        Some(CheckpointHandle { handle })
    }
}