drasi_core/interface/checkpoint_store.rs
1// Copyright 2024 The Drasi Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Checkpoint Store Trait
16//!
17//! Atomic checkpoint persistence for source sequence tracking, opaque source
18//! position bytes (for native stream resumption), and config hashing.
19//!
20//! Implementations are paired with an [`IndexBackendPlugin`](super::IndexBackendPlugin)
21//! and share the same session state. [`CheckpointStore::stage_checkpoint`] writes
22//! into the currently-active session transaction (opened by
23//! [`SessionControl::begin`](super::SessionControl)) and is committed by the
24//! session's outer commit alongside the index updates. All other methods operate
25//! outside a session transaction and commit independently.
26//!
27//! This trait lives in core (not lib) so that index plugins can implement it
28//! without taking a reverse dependency on `drasi-lib`.
29
30use std::collections::HashMap;
31
32use async_trait::async_trait;
33use bytes::Bytes;
34
35use super::IndexError;
36
37/// Per-source checkpoint data.
38///
39/// Contains the monotonic sequence number and an optional opaque position
40/// that the source interprets to seek back into its native change stream
41/// (e.g., Postgres LSN, Kafka offset, EventHub sequence number).
42#[derive(Debug, Clone, PartialEq, Eq)]
43pub struct SourceCheckpoint {
44 pub sequence: u64,
45 /// Opaque position bytes for native stream resumption.
46 /// Sources interpret these to seek back into their change stream.
47 pub source_position: Option<Bytes>,
48}
49
50impl SourceCheckpoint {
51 pub fn new(sequence: u64, source_position: Option<Bytes>) -> Self {
52 Self {
53 sequence,
54 source_position,
55 }
56 }
57}
58
59/// Atomic checkpoint persistence for source sequence tracking and config hashing.
60///
61/// # Method semantics
62///
63/// - [`stage_checkpoint`](Self::stage_checkpoint) SHOULD be called between
64/// `SessionControl::begin` and `SessionControl::commit` for persistent backends.
65/// The write is staged into the active session transaction and persisted by the
66/// outer commit. For volatile (in-memory) backends, it applies immediately.
67/// - All other methods operate independently of the session transaction and
68/// commit on their own.
69///
70/// # Source positions
71///
72/// Each source feeding a query has its own checkpoint entry. The opaque
73/// `source_position` bytes allow native stream resumption — on restart, the
74/// query reads each source's position and passes it via `resume_from`.
75#[async_trait]
76pub trait CheckpointStore: Send + Sync {
77 /// Whether this store persists checkpoints across process restarts.
78 ///
79 /// The orchestration layer uses this to decide whether to propagate
80 /// `resume_from` and `last_sequence` to sources on restart. Volatile
81 /// (in-memory) stores return `false`; persistent stores (RocksDB, Garnet)
82 /// return `true`.
83 fn is_persistent(&self) -> bool;
84
85 /// Stage a source checkpoint into the active session transaction.
86 ///
87 /// For persistent backends, must be called inside an open session (between
88 /// `SessionControl::begin` and `SessionControl::commit`). Returns an error
89 /// if no session is active.
90 ///
91 /// For volatile backends (in-memory), applies immediately.
92 async fn stage_checkpoint(
93 &self,
94 source_id: &str,
95 sequence: u64,
96 source_position: Option<&Bytes>,
97 ) -> Result<(), IndexError>;
98
99 /// Read the committed checkpoint for a single source.
100 ///
101 /// Returns `None` if no checkpoint has been written for `source_id`.
102 /// Reads committed state directly; does not require an active session.
103 async fn read_checkpoint(
104 &self,
105 source_id: &str,
106 ) -> Result<Option<SourceCheckpoint>, IndexError>;
107
108 /// Read all committed source checkpoints, keyed by source id.
109 ///
110 /// Returns an empty map if no checkpoints have been written.
111 /// Reads committed state directly; does not require an active session.
112 async fn read_all_checkpoints(&self) -> Result<HashMap<String, SourceCheckpoint>, IndexError>;
113
114 /// Delete all source checkpoints and the config hash.
115 ///
116 /// Used during auto-reset recovery and `delete_query(cleanup: true)`.
117 /// Standalone commit; not part of any outer session transaction.
118 async fn clear_checkpoints(&self) -> Result<(), IndexError>;
119
120 /// Write the query config hash.
121 ///
122 /// Used at startup to detect query configuration changes that require a
123 /// full re-bootstrap. Standalone commit.
124 async fn write_config_hash(&self, hash: u64) -> Result<(), IndexError>;
125
126 /// Read the stored config hash.
127 ///
128 /// Returns `None` if no hash has been written. Called at startup before
129 /// any session transaction begins.
130 async fn read_config_hash(&self) -> Result<Option<u64>, IndexError>;
131}