// Checkpoint and keyed-state backend selection enums, the `StateValue` container,
// and the `OperatorState` trait.
use std::collections::HashMap;
use std::fmt::{Debug, Display, Formatter};

use crate::api::runtime::CheckpointId;

/// Choice of checkpoint backend, together with the parameters each backend needs.
///
/// The serde attributes below select the adjacently tagged representation: the variant
/// name is written under the `"type"` key and the variant's fields (if any) under the
/// `"param"` key.
#[derive(Clone, Serialize, Deserialize, Debug)]
#[serde(tag = "type", content = "param")]
pub enum CheckpointBackend {
    /// Memory backend; carries no parameters.
    Memory,
    /// MySql backend, described by an endpoint and an optional table.
    MySql {
        /// Endpoint string for the MySql backend.
        /// NOTE(review): the expected format (URL vs. host:port) is not visible here — confirm
        /// against the code that consumes it.
        endpoint: String,
        /// Optional table name.
        /// NOTE(review): what is used when this is `None` is decided by the consumer — confirm.
        table: Option<String>,
    },
}

impl Display for CheckpointBackend {
    /// Writes a compact, human-readable description of the backend.
    ///
    /// * `Memory` is rendered as `Memory`.
    /// * `MySql` is rendered as `MySql{endpoint=<endpoint>, table=<table:?>}`, where the
    ///   table uses its `Debug` form so that `None` and `Some("name")` stay distinguishable.
    ///
    /// # Errors
    ///
    /// Returns whatever error the underlying formatter reports.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        match self {
            CheckpointBackend::Memory => write!(f, "Memory"),
            CheckpointBackend::MySql { endpoint, table } => {
                // `{{` / `}}` are escaped literal braces; the closing brace must come once,
                // after the last field, so the rendered text is balanced.
                write!(f, "MySql{{endpoint={}, table={:?}}}", endpoint, table)
            }
        }
    }
}

/// Choice of keyed-state backend.
///
/// The serde attributes below select the adjacently tagged representation: the variant
/// name is written under the `"type"` key and any variant payload under the `"param"` key.
/// Only `Memory` is currently enabled; the other variants are commented out.
#[derive(Copy, Clone, Serialize, Deserialize, Debug)]
#[serde(tag = "type", content = "param")]
pub enum KeyedStateBackend {
    /// Memory backend; carries no parameters.
    Memory,
    // Disabled variants, each carrying a single `String`.
    // NOTE(review): enabling either one would require dropping `Copy` from the derive above,
    // because `String` is not `Copy`.
    // FsStateBackend(String),
    // RocksDBStateBackend(String),
}

impl Display for KeyedStateBackend {
    /// Writes the name of the selected keyed-state backend.
    ///
    /// # Errors
    ///
    /// Returns whatever error the underlying formatter reports.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        // Pick the label first, then emit it in one place. The match is kept exhaustive
        // (no `_` arm) so a newly added variant fails to compile until it is handled here.
        let label = match self {
            KeyedStateBackend::Memory => "Memory",
        };
        f.write_str(label)
    }
}

/// A state entry holding a list of string values.
#[derive(Clone, Debug)]
pub struct StateValue {
    /// The stored values, in the order they were supplied.
    pub values: Vec<String>,
}

impl StateValue {
    /// Builds a `StateValue` that takes ownership of `values` unchanged.
    pub fn new(values: Vec<String>) -> Self {
        Self { values }
    }
}

/// Interface for operator state that can be updated, snapshotted and loaded back.
///
/// Implementors must also implement `Debug`.
pub trait OperatorState: Debug {
    /// Updates the state with `values` for the given `checkpoint_id`.
    ///
    /// NOTE(review): whether the values replace or extend what is already held for that
    /// checkpoint depends on the implementor — confirm.
    fn update(&mut self, checkpoint_id: CheckpointId, values: Vec<String>);

    /// Takes a snapshot of the state.
    ///
    /// NOTE(review): presumably persists what earlier `update` calls recorded — confirm
    /// against the implementors.
    ///
    /// # Errors
    ///
    /// Returns a `std::io::Error` when the snapshot fails.
    fn snapshot(&mut self) -> std::io::Result<()>;

    /// Loads state for `checkpoint_id`, returned as a map from a `u16` key to a `StateValue`.
    ///
    /// NOTE(review): the meaning of the `u16` key is not visible here, nor is how "latest"
    /// relates to the supplied `checkpoint_id` — verify against the implementors.
    ///
    /// # Errors
    ///
    /// Returns a `std::io::Error` when loading fails.
    fn load_latest(&self, checkpoint_id: CheckpointId)
        -> std::io::Result<HashMap<u16, StateValue>>;
}