openstack_keystone_distributed_storage/store/
state_machine.rs1use std::fs;
17use std::io;
18use std::path::PathBuf;
19use std::sync::Arc;
20
21use fjall::{Database, Keyspace, KeyspaceCreateOptions, PersistMode, Readable};
22use futures::Stream;
23use futures::TryStreamExt;
24use openraft::LogId;
25use openraft::OptionalSend;
26use openraft::RaftSnapshotBuilder;
27use openraft::SnapshotMeta;
28use openraft::StorageError;
29use openraft::StoredMembership;
30use openraft::alias::{LogIdOf, SnapshotDataOf};
31use openraft::entry::RaftEntry;
32use openraft::storage::EntryResponder;
33use openraft::storage::RaftStateMachine;
34use openraft::storage::Snapshot;
35use openraft::type_config::TypeConfigExt;
36use rand::RngExt;
37use serde::Deserialize;
38use serde::Serialize;
39
40use crate::StoreError;
41use crate::TypeConfig;
42use crate::protobuf as pb;
43
/// Key in the `meta` keyspace holding the JSON-encoded id of the last applied log entry.
const KEY_LAST_APPLIED_LOG: &[u8] = b"last_applied_log";
/// Key in the `meta` keyspace holding the JSON-encoded last membership configuration.
const KEY_LAST_MEMBERSHIP: &[u8] = b"last_membership";
46
47#[derive(Serialize, Deserialize)]
49struct SnapshotFile {
50 meta: SnapshotMeta<TypeConfig>,
51 data: Vec<(Vec<u8>, Vec<u8>)>,
52}
53
54#[derive(Clone)]
58pub struct FjallStateMachine {
59 db: Arc<Database>,
60 meta: Keyspace,
61 data: Keyspace,
62 snapshot_dir: PathBuf,
63}
64
65impl FjallStateMachine {
66 #[allow(clippy::result_large_err)]
67 pub fn new(db: Arc<Database>, snapshot_dir: PathBuf) -> Result<Self, StoreError> {
68 let meta = db.keyspace("meta", KeyspaceCreateOptions::default)?;
69 let data = db.keyspace("data", KeyspaceCreateOptions::default)?;
70
71 fs::create_dir_all(&snapshot_dir)?;
72
73 Ok(Self {
74 db,
75 snapshot_dir,
76 meta,
77 data,
78 })
79 }
80
81 pub fn data(&self) -> &Keyspace {
82 &self.data
83 }
84
85 #[allow(clippy::result_large_err)]
86 #[tracing::instrument(skip(self))]
87 fn get_meta(
88 &self,
89 ) -> Result<(Option<LogId<TypeConfig>>, StoredMembership<TypeConfig>), StoreError> {
90 let last_applied_log = self
91 .meta
92 .get(KEY_LAST_APPLIED_LOG)?
93 .map(|x| deserialize(&x))
94 .transpose()?;
95 let last_membership = self
96 .meta
97 .get(KEY_LAST_MEMBERSHIP)?
98 .map(|x| deserialize(&x))
99 .transpose()?
100 .unwrap_or_default();
101 Ok((last_applied_log, last_membership))
102 }
103}
104
105fn serialize<T: Serialize>(value: &T) -> Result<Vec<u8>, StorageError<TypeConfig>> {
106 serde_json::to_vec(value).map_err(|e| StorageError::write(TypeConfig::err_from_error(&e)))
107}
108
109fn deserialize<T: for<'de> Deserialize<'de>>(bytes: &[u8]) -> Result<T, StorageError<TypeConfig>> {
110 serde_json::from_slice(bytes).map_err(|e| StorageError::read(TypeConfig::err_from_error(&e)))
111}
112
113impl RaftSnapshotBuilder<TypeConfig> for Arc<FjallStateMachine> {
114 #[tracing::instrument(level = "trace", skip(self))]
115 async fn build_snapshot(&mut self) -> Result<Snapshot<TypeConfig>, io::Error> {
116 let (last_applied_log, last_membership) = self.get_meta()?;
118
119 let snapshot_idx: u64 = rand::rng().random_range(0..1000);
121
122 let snapshot_id = if let Some(last) = last_applied_log {
123 format!(
124 "{}-{}-{}",
125 last.committed_leader_id(),
126 last.index(),
127 snapshot_idx
128 )
129 } else {
130 format!("--{}", snapshot_idx)
131 };
132
133 let meta = SnapshotMeta {
134 last_log_id: last_applied_log,
135 last_membership,
136 snapshot_id: snapshot_id.clone(),
137 };
138
139 tracing::trace!("snapshot metadata: {:?}", meta);
140
141 let snapshot = self.db.snapshot();
143
144 let mut data_buffer = Vec::new();
146 for item in snapshot.iter(&self.data) {
147 let (key, value) = item
148 .into_inner()
149 .map_err(|e| io::Error::other(e.to_string()))?;
150 data_buffer.push((key.to_vec(), value.to_vec()));
151 }
152
153 let snapshot_file = SnapshotFile {
155 meta: meta.clone(),
156 data: data_buffer.clone(),
157 };
158
159 let file_bytes = serialize(&snapshot_file).map_err(|e| {
160 StorageError::write_snapshot(Some(meta.signature()), TypeConfig::err_from_error(&e))
161 })?;
162
163 let snapshot_path = self.snapshot_dir.join(&snapshot_id);
165 fs::write(&snapshot_path, &file_bytes).map_err(|e| {
166 StorageError::write_snapshot(Some(meta.signature()), TypeConfig::err_from_error(&e))
167 })?;
168
169 let data_bytes = serialize(&data_buffer).map_err(|e| {
171 StorageError::write_snapshot(Some(meta.signature()), TypeConfig::err_from_error(&e))
172 })?;
173 tracing::trace!("snapshot written to {:?}", snapshot_path);
174
175 Ok(Snapshot {
176 meta,
177 snapshot: data_bytes,
179 })
180 }
181}
182
183impl RaftStateMachine<TypeConfig> for Arc<FjallStateMachine> {
184 type SnapshotBuilder = Self;
185
186 #[tracing::instrument(skip(self))]
187 async fn applied_state(
188 &mut self,
189 ) -> Result<(Option<LogIdOf<TypeConfig>>, StoredMembership<TypeConfig>), io::Error> {
190 self.get_meta().map_err(|e| io::Error::other(e.to_string()))
191 }
192
193 #[tracing::instrument(skip(self))]
194 async fn get_snapshot_builder(&mut self) -> Self::SnapshotBuilder {
195 self.clone()
196 }
197
198 #[tracing::instrument(skip(self))]
199 async fn begin_receiving_snapshot(&mut self) -> Result<SnapshotDataOf<TypeConfig>, io::Error> {
200 Ok(Vec::new())
201 }
202
203 #[tracing::instrument(skip(self))]
204 async fn install_snapshot(
205 &mut self,
206 meta: &SnapshotMeta<TypeConfig>,
207 snapshot: SnapshotDataOf<TypeConfig>,
208 ) -> Result<(), io::Error> {
209 tracing::info!(
210 { snapshot_size = snapshot.len() },
211 "decoding snapshot for installation"
212 );
213
214 let snapshot_data: Vec<(Vec<u8>, Vec<u8>)> = deserialize(snapshot.as_ref())
216 .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e.to_string()))?;
217
218 let snapshot_data_clone = snapshot_data.clone();
220
221 let last_applied_bytes = meta
223 .last_log_id
224 .as_ref()
225 .map(|log_id| {
226 serialize(log_id)
227 .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e.to_string()))
228 })
229 .transpose()?;
230
231 let last_membership_bytes = serialize(&meta.last_membership)
232 .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e.to_string()))?;
233
234 let mut batch = self.db.batch();
235
236 for current in self.data.iter() {
237 if let Ok(k) = current.key() {
238 batch.remove(&self.data, k);
239 }
240 }
241
242 for (key, value) in snapshot_data {
243 batch.insert(&self.data, key, value);
244 }
245
246 if let Some(bytes) = last_applied_bytes {
247 batch.insert(&self.meta, KEY_LAST_APPLIED_LOG, bytes);
248 }
249 batch.insert(&self.meta, KEY_LAST_MEMBERSHIP, last_membership_bytes);
250
251 batch
252 .commit()
253 .map_err(|e| io::Error::other(e.to_string()))?;
254
255 self.db
256 .persist(PersistMode::SyncAll)
257 .map_err(|e| io::Error::other(e.to_string()))?;
258
259 let snapshot_file = SnapshotFile {
261 meta: meta.clone(),
262 data: snapshot_data_clone,
263 };
264 let file_bytes = serialize(&snapshot_file)
265 .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e.to_string()))?;
266
267 let snapshot_path = self.snapshot_dir.join(&meta.snapshot_id);
268 fs::write(&snapshot_path, &file_bytes)?;
269
270 Ok(())
271 }
272
273 #[tracing::instrument(skip(self))]
274 async fn get_current_snapshot(&mut self) -> Result<Option<Snapshot<TypeConfig>>, io::Error> {
275 let mut latest_snapshot_id: Option<String> = None;
277
278 for entry in fs::read_dir(&self.snapshot_dir)? {
279 let entry = entry?;
280 let path = entry.path();
281
282 if !path.is_file() {
283 continue;
284 }
285
286 if let Some(filename) = path.file_name().and_then(|n| n.to_str()) {
287 let snapshot_id = filename.to_string();
288
289 if latest_snapshot_id
291 .as_ref()
292 .is_none_or(|current| snapshot_id > *current)
293 {
294 latest_snapshot_id = Some(snapshot_id);
295 }
296 }
297 }
298
299 let Some(snapshot_id) = latest_snapshot_id else {
300 return Ok(None);
301 };
302
303 let snapshot_path = self.snapshot_dir.join(&snapshot_id);
304
305 let file_bytes = fs::read(&snapshot_path)?;
307 let snapshot_file: SnapshotFile = serde_json::from_slice(&file_bytes)
308 .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e.to_string()))?;
309
310 let data_bytes = serde_json::to_vec(&snapshot_file.data)
312 .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e.to_string()))?;
313
314 Ok(Some(Snapshot {
315 meta: snapshot_file.meta,
316 snapshot: data_bytes,
317 }))
318 }
319
320 #[tracing::instrument(skip(self, entries))]
321 async fn apply<Strm>(&mut self, mut entries: Strm) -> Result<(), io::Error>
322 where
323 Strm: Stream<Item = Result<EntryResponder<TypeConfig>, io::Error>> + Unpin + OptionalSend,
324 {
325 let mut last_applied_log = None;
326 let mut last_membership = None;
327 let mut batch = self.db.batch();
328
329 while let Some((entry, responder)) = entries.try_next().await? {
330 last_applied_log = Some(entry.log_id());
331 let response = if let Some(req) = entry.app_data {
332 batch.insert(&self.data, req.key.as_bytes(), req.value.as_bytes());
333 Some(req.value.clone())
334 } else if let Some(mem) = entry.membership {
335 last_membership = Some(StoredMembership::new(last_applied_log, mem.try_into()?));
336 None
337 } else {
338 None
339 };
340 if let Some(responder) = responder {
341 responder.send(pb::api::Response { value: response });
342 }
343 }
344 if let Some(val) = last_membership {
345 batch.insert(
346 &self.meta,
347 KEY_LAST_MEMBERSHIP,
348 serde_json::to_vec(&val).map_err(|e| io::Error::other(e.to_string()))?,
349 );
350 }
351 if let Some(val) = last_applied_log {
352 batch.insert(
353 &self.meta,
354 KEY_LAST_APPLIED_LOG,
355 serde_json::to_vec(&val).map_err(|e| io::Error::other(e.to_string()))?,
356 );
357 }
358
359 batch
360 .commit()
361 .map_err(|e| io::Error::other(e.to_string()))?;
362
363 self.db
364 .persist(PersistMode::SyncAll)
365 .map_err(|e| io::Error::other(e.to_string()))?;
366 Ok(())
367 }
368}