Skip to main content

openstack_keystone_distributed_storage/store/
state_machine.rs

1// Licensed under the Apache License, Version 2.0 (the "License");
2// you may not use this file except in compliance with the License.
3// You may obtain a copy of the License at
4//
5//     http://www.apache.org/licenses/LICENSE-2.0
6//
7// Unless required by applicable law or agreed to in writing, software
8// distributed under the License is distributed on an "AS IS" BASIS,
9// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10// See the License for the specific language governing permissions and
11// limitations under the License.
12//
13// SPDX-License-Identifier: Apache-2.0
14//! # Fjall DB based `openraft` state machine implementation.
15
16use std::fs;
17use std::io;
18use std::path::PathBuf;
19use std::sync::Arc;
20
21use fjall::{Database, Keyspace, KeyspaceCreateOptions, PersistMode, Readable};
22use futures::Stream;
23use futures::TryStreamExt;
24use openraft::LogId;
25use openraft::OptionalSend;
26use openraft::RaftSnapshotBuilder;
27use openraft::SnapshotMeta;
28use openraft::StorageError;
29use openraft::StoredMembership;
30use openraft::alias::{LogIdOf, SnapshotDataOf};
31use openraft::entry::RaftEntry;
32use openraft::storage::EntryResponder;
33use openraft::storage::RaftStateMachine;
34use openraft::storage::Snapshot;
35use openraft::type_config::TypeConfigExt;
36use rand::RngExt;
37use serde::Deserialize;
38use serde::Serialize;
39
40use crate::StoreError;
41use crate::TypeConfig;
42use crate::protobuf as pb;
43
/// Key in the `meta` keyspace under which the JSON-encoded last applied log id is stored.
const KEY_LAST_APPLIED_LOG: &[u8] = "last_applied_log".as_bytes();
/// Key in the `meta` keyspace under which the JSON-encoded last membership is stored.
const KEY_LAST_MEMBERSHIP: &[u8] = "last_membership".as_bytes();
46
47/// Snapshot file format: metadata + data stored together.
48#[derive(Serialize, Deserialize)]
49struct SnapshotFile {
50    meta: SnapshotMeta<TypeConfig>,
51    data: Vec<(Vec<u8>, Vec<u8>)>,
52}
53
54/// State machine backed by FjallDB for full persistence.
55/// All application data is stored directly in the `data` column family.
56/// Snapshots are persisted to the `snapshot_dir` directory.
57#[derive(Clone)]
58pub struct FjallStateMachine {
59    db: Arc<Database>,
60    meta: Keyspace,
61    data: Keyspace,
62    snapshot_dir: PathBuf,
63}
64
65impl FjallStateMachine {
66    #[allow(clippy::result_large_err)]
67    pub fn new(db: Arc<Database>, snapshot_dir: PathBuf) -> Result<Self, StoreError> {
68        let meta = db.keyspace("meta", KeyspaceCreateOptions::default)?;
69        let data = db.keyspace("data", KeyspaceCreateOptions::default)?;
70
71        fs::create_dir_all(&snapshot_dir)?;
72
73        Ok(Self {
74            db,
75            snapshot_dir,
76            meta,
77            data,
78        })
79    }
80
81    pub fn data(&self) -> &Keyspace {
82        &self.data
83    }
84
85    #[allow(clippy::result_large_err)]
86    #[tracing::instrument(skip(self))]
87    fn get_meta(
88        &self,
89    ) -> Result<(Option<LogId<TypeConfig>>, StoredMembership<TypeConfig>), StoreError> {
90        let last_applied_log = self
91            .meta
92            .get(KEY_LAST_APPLIED_LOG)?
93            .map(|x| deserialize(&x))
94            .transpose()?;
95        let last_membership = self
96            .meta
97            .get(KEY_LAST_MEMBERSHIP)?
98            .map(|x| deserialize(&x))
99            .transpose()?
100            .unwrap_or_default();
101        Ok((last_applied_log, last_membership))
102    }
103}
104
105fn serialize<T: Serialize>(value: &T) -> Result<Vec<u8>, StorageError<TypeConfig>> {
106    serde_json::to_vec(value).map_err(|e| StorageError::write(TypeConfig::err_from_error(&e)))
107}
108
109fn deserialize<T: for<'de> Deserialize<'de>>(bytes: &[u8]) -> Result<T, StorageError<TypeConfig>> {
110    serde_json::from_slice(bytes).map_err(|e| StorageError::read(TypeConfig::err_from_error(&e)))
111}
112
113impl RaftSnapshotBuilder<TypeConfig> for Arc<FjallStateMachine> {
114    #[tracing::instrument(level = "trace", skip(self))]
115    async fn build_snapshot(&mut self) -> Result<Snapshot<TypeConfig>, io::Error> {
116        //// 1. Get the last applied log ID from the snapshot view
117        let (last_applied_log, last_membership) = self.get_meta()?;
118
119        // Generate a random snapshot index.
120        let snapshot_idx: u64 = rand::rng().random_range(0..1000);
121
122        let snapshot_id = if let Some(last) = last_applied_log {
123            format!(
124                "{}-{}-{}",
125                last.committed_leader_id(),
126                last.index(),
127                snapshot_idx
128            )
129        } else {
130            format!("--{}", snapshot_idx)
131        };
132
133        let meta = SnapshotMeta {
134            last_log_id: last_applied_log,
135            last_membership,
136            snapshot_id: snapshot_id.clone(),
137        };
138
139        tracing::trace!("snapshot metadata: {:?}", meta);
140
141        // 2. Capture a point-in-time view of the entire database
142        let snapshot = self.db.snapshot();
143
144        // 3. Serialize all KV pairs in the 'data' keyspace from the snapshot
145        let mut data_buffer = Vec::new();
146        for item in snapshot.iter(&self.data) {
147            let (key, value) = item
148                .into_inner()
149                .map_err(|e| io::Error::other(e.to_string()))?;
150            data_buffer.push((key.to_vec(), value.to_vec()));
151        }
152
153        // Serialize both metadata and data together
154        let snapshot_file = SnapshotFile {
155            meta: meta.clone(),
156            data: data_buffer.clone(),
157        };
158
159        let file_bytes = serialize(&snapshot_file).map_err(|e| {
160            StorageError::write_snapshot(Some(meta.signature()), TypeConfig::err_from_error(&e))
161        })?;
162
163        // Write complete snapshot to file
164        let snapshot_path = self.snapshot_dir.join(&snapshot_id);
165        fs::write(&snapshot_path, &file_bytes).map_err(|e| {
166            StorageError::write_snapshot(Some(meta.signature()), TypeConfig::err_from_error(&e))
167        })?;
168
169        // Return snapshot with data-only for backward compatibility with the data field
170        let data_bytes = serialize(&data_buffer).map_err(|e| {
171            StorageError::write_snapshot(Some(meta.signature()), TypeConfig::err_from_error(&e))
172        })?;
173        tracing::trace!("snapshot written to {:?}", snapshot_path);
174
175        Ok(Snapshot {
176            meta,
177            //snapshot: Cursor::new(data_bytes),
178            snapshot: data_bytes,
179        })
180    }
181}
182
183impl RaftStateMachine<TypeConfig> for Arc<FjallStateMachine> {
184    type SnapshotBuilder = Self;
185
186    #[tracing::instrument(skip(self))]
187    async fn applied_state(
188        &mut self,
189    ) -> Result<(Option<LogIdOf<TypeConfig>>, StoredMembership<TypeConfig>), io::Error> {
190        self.get_meta().map_err(|e| io::Error::other(e.to_string()))
191    }
192
193    #[tracing::instrument(skip(self))]
194    async fn get_snapshot_builder(&mut self) -> Self::SnapshotBuilder {
195        self.clone()
196    }
197
198    #[tracing::instrument(skip(self))]
199    async fn begin_receiving_snapshot(&mut self) -> Result<SnapshotDataOf<TypeConfig>, io::Error> {
200        Ok(Vec::new())
201    }
202
203    #[tracing::instrument(skip(self))]
204    async fn install_snapshot(
205        &mut self,
206        meta: &SnapshotMeta<TypeConfig>,
207        snapshot: SnapshotDataOf<TypeConfig>,
208    ) -> Result<(), io::Error> {
209        tracing::info!(
210            { snapshot_size = snapshot.len() },
211            "decoding snapshot for installation"
212        );
213
214        // Deserialize snapshot data
215        let snapshot_data: Vec<(Vec<u8>, Vec<u8>)> = deserialize(snapshot.as_ref())
216            .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e.to_string()))?;
217
218        // Clone data for file writing later
219        let snapshot_data_clone = snapshot_data.clone();
220
221        // Prepare metadata to restore
222        let last_applied_bytes = meta
223            .last_log_id
224            .as_ref()
225            .map(|log_id| {
226                serialize(log_id)
227                    .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e.to_string()))
228            })
229            .transpose()?;
230
231        let last_membership_bytes = serialize(&meta.last_membership)
232            .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e.to_string()))?;
233
234        let mut batch = self.db.batch();
235
236        for current in self.data.iter() {
237            if let Ok(k) = current.key() {
238                batch.remove(&self.data, k);
239            }
240        }
241
242        for (key, value) in snapshot_data {
243            batch.insert(&self.data, key, value);
244        }
245
246        if let Some(bytes) = last_applied_bytes {
247            batch.insert(&self.meta, KEY_LAST_APPLIED_LOG, bytes);
248        }
249        batch.insert(&self.meta, KEY_LAST_MEMBERSHIP, last_membership_bytes);
250
251        batch
252            .commit()
253            .map_err(|e| io::Error::other(e.to_string()))?;
254
255        self.db
256            .persist(PersistMode::SyncAll)
257            .map_err(|e| io::Error::other(e.to_string()))?;
258
259        // Write snapshot file with metadata for get_current_snapshot
260        let snapshot_file = SnapshotFile {
261            meta: meta.clone(),
262            data: snapshot_data_clone,
263        };
264        let file_bytes = serialize(&snapshot_file)
265            .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e.to_string()))?;
266
267        let snapshot_path = self.snapshot_dir.join(&meta.snapshot_id);
268        fs::write(&snapshot_path, &file_bytes)?;
269
270        Ok(())
271    }
272
273    #[tracing::instrument(skip(self))]
274    async fn get_current_snapshot(&mut self) -> Result<Option<Snapshot<TypeConfig>>, io::Error> {
275        // Find the latest snapshot file by comparing filenames lexicographically
276        let mut latest_snapshot_id: Option<String> = None;
277
278        for entry in fs::read_dir(&self.snapshot_dir)? {
279            let entry = entry?;
280            let path = entry.path();
281
282            if !path.is_file() {
283                continue;
284            }
285
286            if let Some(filename) = path.file_name().and_then(|n| n.to_str()) {
287                let snapshot_id = filename.to_string();
288
289                // Update latest if this is the first snapshot or if it's newer
290                if latest_snapshot_id
291                    .as_ref()
292                    .is_none_or(|current| snapshot_id > *current)
293                {
294                    latest_snapshot_id = Some(snapshot_id);
295                }
296            }
297        }
298
299        let Some(snapshot_id) = latest_snapshot_id else {
300            return Ok(None);
301        };
302
303        let snapshot_path = self.snapshot_dir.join(&snapshot_id);
304
305        // Read and deserialize snapshot file
306        let file_bytes = fs::read(&snapshot_path)?;
307        let snapshot_file: SnapshotFile = serde_json::from_slice(&file_bytes)
308            .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e.to_string()))?;
309
310        // Serialize data for snapshot field
311        let data_bytes = serde_json::to_vec(&snapshot_file.data)
312            .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e.to_string()))?;
313
314        Ok(Some(Snapshot {
315            meta: snapshot_file.meta,
316            snapshot: data_bytes,
317        }))
318    }
319
320    #[tracing::instrument(skip(self, entries))]
321    async fn apply<Strm>(&mut self, mut entries: Strm) -> Result<(), io::Error>
322    where
323        Strm: Stream<Item = Result<EntryResponder<TypeConfig>, io::Error>> + Unpin + OptionalSend,
324    {
325        let mut last_applied_log = None;
326        let mut last_membership = None;
327        let mut batch = self.db.batch();
328
329        while let Some((entry, responder)) = entries.try_next().await? {
330            last_applied_log = Some(entry.log_id());
331            let response = if let Some(req) = entry.app_data {
332                batch.insert(&self.data, req.key.as_bytes(), req.value.as_bytes());
333                Some(req.value.clone())
334            } else if let Some(mem) = entry.membership {
335                last_membership = Some(StoredMembership::new(last_applied_log, mem.try_into()?));
336                None
337            } else {
338                None
339            };
340            if let Some(responder) = responder {
341                responder.send(pb::api::Response { value: response });
342            }
343        }
344        if let Some(val) = last_membership {
345            batch.insert(
346                &self.meta,
347                KEY_LAST_MEMBERSHIP,
348                serde_json::to_vec(&val).map_err(|e| io::Error::other(e.to_string()))?,
349            );
350        }
351        if let Some(val) = last_applied_log {
352            batch.insert(
353                &self.meta,
354                KEY_LAST_APPLIED_LOG,
355                serde_json::to_vec(&val).map_err(|e| io::Error::other(e.to_string()))?,
356            );
357        }
358
359        batch
360            .commit()
361            .map_err(|e| io::Error::other(e.to_string()))?;
362
363        self.db
364            .persist(PersistMode::SyncAll)
365            .map_err(|e| io::Error::other(e.to_string()))?;
366        Ok(())
367    }
368}