rlink/core/
checkpoint.rs

1use std::fmt::Debug;
2
3use crate::core::runtime::{CheckpointId, OperatorId, TaskId};
4
5/// This struct provides a context in which user functions that use managed state metadata
6#[derive(Clone, Debug)]
7pub struct FunctionSnapshotContext {
8    pub operator_id: OperatorId,
9    pub task_id: TaskId,
10    pub checkpoint_id: CheckpointId,
11    pub completed_checkpoint_id: Option<CheckpointId>,
12}
13
14impl FunctionSnapshotContext {
15    pub fn new(
16        operator_id: OperatorId,
17        task_id: TaskId,
18        checkpoint_id: CheckpointId,
19        completed_checkpoint_id: Option<CheckpointId>,
20    ) -> Self {
21        FunctionSnapshotContext {
22            operator_id,
23            task_id,
24            checkpoint_id,
25            completed_checkpoint_id,
26        }
27    }
28}
29
30/// checkpoint handle
31/// identify a checkpoint resource
32/// eg: 1. checkpoint state and save to distribution file system, mark the file path as handle.
33///     2. as mq system, mark the offset as handle
34#[derive(Clone, Debug, Serialize, Deserialize)]
35pub struct CheckpointHandle {
36    pub handle: String,
37}
38
39impl Default for CheckpointHandle {
40    fn default() -> Self {
41        Self {
42            handle: "".to_string(),
43        }
44    }
45}
46
47/// descriptor a `Checkpoint`
48/// use for network communication between `Coordinator` and `Worker`
49#[derive(Debug, Clone, Serialize, Deserialize)]
50pub struct Checkpoint {
51    pub operator_id: OperatorId,
52    pub task_id: TaskId,
53    pub checkpoint_id: CheckpointId,
54    pub completed_checkpoint_id: Option<CheckpointId>,
55    pub handle: CheckpointHandle,
56}
57
58pub trait CheckpointFunction {
59    fn consult_version(
60        &mut self,
61        context: &FunctionSnapshotContext,
62        _handle: &Option<CheckpointHandle>,
63    ) -> CheckpointId {
64        context.checkpoint_id
65    }
66
67    /// trigger the method when a `operator` initialization
68    fn initialize_state(
69        &mut self,
70        _context: &FunctionSnapshotContext,
71        _handle: &Option<CheckpointHandle>,
72    ) {
73    }
74
75    /// trigger the method when the `operator` operate a `Barrier` event
76    fn snapshot_state(&mut self, _context: &FunctionSnapshotContext) -> Option<CheckpointHandle> {
77        None
78    }
79}