1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
use crate::api::backend::KeyedStateBackend;
use crate::api::checkpoint::{CheckpointFunction, CheckpointHandle, FunctionSnapshotContext};
use crate::api::element::Record;
use crate::api::function::{BaseReduceFunction, Context, NamedFunction, ReduceFunction};
use crate::api::properties::SystemProperties;
use crate::api::window::TWindow;
use crate::storage::keyed_state::{TWindowState, WindowState};
use crate::utils::date_time::timestamp_str;

/// Reduce operator that keeps its intermediate results in a `WindowState`.
///
/// The state is not created by the constructor; it is built in `open` and is
/// `None` until then.
pub(crate) struct WindowBaseReduceFunction {
    /// Reduce function invoked when a record is merged into the state.
    reduce: Box<dyn ReduceFunction>,

    /// Window state; `None` until `open` has populated it.
    state: Option<WindowState>,
}

impl WindowBaseReduceFunction {
    /// Wraps `reduce` in a new operator whose window state is still unset.
    ///
    /// The state stays `None` until `open` is called.
    pub fn new(reduce: Box<dyn ReduceFunction>) -> Self {
        let state = None;
        Self { reduce, state }
    }
}

impl BaseReduceFunction for WindowBaseReduceFunction {
    /// Builds the window state for this task.
    ///
    /// The state is created from the context's application id, the task's job
    /// id and task number, and the keyed state backend read from the
    /// application properties. `KeyedStateBackend::Memory` is used when
    /// `get_keyed_state_backend` does not yield a backend.
    fn open(&mut self, context: &Context) -> crate::api::Result<()> {
        let task_id = context.task_id;
        let application_id = context.application_id.clone();

        let state_mode = context
            .application_properties
            .get_keyed_state_backend()
            .unwrap_or(KeyedStateBackend::Memory);
        self.state = Some(WindowState::new(
            application_id,
            task_id.job_id(),
            task_id.task_number(),
            state_mode,
        ));
        Ok(())
    }

    /// Merges `record` into the window state under `key`, delegating the
    /// combination of values to the wrapped reduce function.
    ///
    /// # Panics
    ///
    /// Panics if `open` has not been called yet.
    fn reduce(&mut self, key: Record, record: Record) {
        let state = self
            .state
            .as_mut()
            .expect("window state is not initialized, `open` must be called first");
        // Borrow only the `reduce` field so the closure does not capture `self`
        // while `state` is mutably borrowed.
        let reduce_func = &self.reduce;
        state.merge(key, record, |val1, val2| reduce_func.reduce(val1, val2));
    }

    /// Drops every window whose `max_timestamp()` is not after
    /// `watermark_timestamp`.
    ///
    /// Returns one fresh `Record` per dropped window, with `trigger_window`
    /// set to that window, ordered by ascending `max_timestamp()`. Returns an
    /// empty vector when no window was dropped.
    ///
    /// # Panics
    ///
    /// Panics if `open` has not been called yet.
    fn drop_state(&mut self, watermark_timestamp: u64) -> Vec<Record> {
        let state = self
            .state
            .as_mut()
            .expect("window state is not initialized, `open` must be called first");

        let mut drop_windows = Vec::new();
        for window in state.windows() {
            if window.max_timestamp() <= watermark_timestamp {
                // Drop from the state first, then move the owned window into
                // the result list; no clone is needed.
                state.drop_window(&window);
                drop_windows.push(window);
            }
        }

        if drop_windows.is_empty() {
            return Vec::new();
        }

        debug!(
            "check window for drop, trigger watermark={}, drop window size={}",
            timestamp_str(watermark_timestamp),
            drop_windows.len()
        );

        // Stable sort: windows sharing a max timestamp keep their state order.
        drop_windows.sort_by_key(|w| w.max_timestamp());

        drop_windows
            .into_iter()
            .map(|drop_window| {
                let mut drop_record = Record::new();
                drop_record.trigger_window = Some(drop_window);
                drop_record
            })
            .collect()
    }

    /// Nothing to release; always succeeds.
    fn close(&mut self) -> crate::api::Result<()> {
        Ok(())
    }
}

impl NamedFunction for WindowBaseReduceFunction {
    /// Returns the fixed name of this function.
    fn name(&self) -> &str {
        const NAME: &str = "WindowBaseReduceFunction";
        NAME
    }
}

impl CheckpointFunction for WindowBaseReduceFunction {
    /// Intentionally a no-op: neither the context nor the handle is used.
    fn initialize_state(
        &mut self,
        _context: &FunctionSnapshotContext,
        _handle: &Option<CheckpointHandle>,
    ) {
    }

    /// Snapshots the windows currently held in the state as a JSON string.
    ///
    /// Always returns `Some`.
    ///
    /// # Panics
    ///
    /// Panics if the state has not been created yet, or if the windows cannot
    /// be serialized to JSON.
    fn snapshot_state(&mut self, _context: &FunctionSnapshotContext) -> Option<CheckpointHandle> {
        let window_state = self.state.as_ref().unwrap();
        let current_windows = window_state.windows();
        let handle = serde_json::to_string(&current_windows).unwrap();
        Some(CheckpointHandle { handle })
    }
}