block_db/batch/
mod.rs

1// Authors: Robert Lopez
2
3pub(crate) mod attempt_batch;
4pub(crate) mod state;
5
6use crate::{
7    data_file::DataFile, error::Error, uncorrupt::UncorruptAction, wal::BlockDBLog, BlockDB,
8    BlockKey,
9};
10use state::{BlockDBUpdateState, DataFileUpdateState, FailedBatchState};
11use std::sync::Arc;
12use tokio::sync::RwLock;
13
/// Outcome of a successful [`BlockDB::batch`] call.
#[derive(Debug, Clone)]
pub struct BatchResult {
    /// Total number of bytes reclaimed by the batch's frees.
    pub freed_bytes: usize,
    /// Keys for the newly written `DataBlock`s, in the same order the
    /// write payloads were supplied to `batch`.
    pub new_block_keys: Vec<BlockKey>,
}
19
20impl BlockDB {
21    /// Performs a batch of writes followed by a batch of frees,
22    /// combined into a single atomic operation.
23    ///
24    /// Returns:
25    /// - A `Vec<BlockKey>` containing the keys for the newly written `DataBlock`s,
26    ///   in the same order they were provided.
27    /// - The total number of bytes freed by the batch delete.
28    ///
29    /// ---
30    /// - **Atomic**
31    /// - **Corruptible**
32    ///
33    /// ---
34    /// Example
35    /// ```
36    /// let mut block_db = BlockDB::open("./data", None).await?;
37    ///
38    /// let block_key = block_db.write(b"Shark").await?;
39    ///
40    /// let BatchResult {
41    ///     freed_bytes,
42    ///     new_block_keys,
43    /// } = block_db
44    ///         .batch(
45    ///             vec![b"Hello", b"World"],
46    ///             vec![&block_key]
47    ///         ).await?;
48    /// ```
49    pub async fn batch<Bytes: AsRef<[u8]>>(
50        &mut self,
51        writes: Vec<Bytes>,
52        frees: Vec<&BlockKey>,
53    ) -> Result<BatchResult, Error> {
54        macro_rules! return_corrupted_error {
55            ($err:expr, $data_file_ids_to_delete:expr, $data_file_inverse_log_map:expr) => {
56                return Err(Error::Corrupted {
57                    err: Box::new($err),
58                    action: UncorruptAction::BadBatchUnwind {
59                        data_file_ids_to_delete: $data_file_ids_to_delete,
60                        data_file_inverse_log_map: $data_file_inverse_log_map,
61                    },
62                });
63            };
64        }
65
66        match self.attempt_batch(writes, frees).await {
67            Ok(BlockDBUpdateState {
68                logs,
69                freed_bytes,
70                new_block_keys,
71                new_data_file_map,
72                data_file_log_map,
73                data_file_update_map,
74                data_file_inverse_log_map,
75            }) => {
76                self.wal.write().await.log_many(&logs).await?;
77
78                let added_data_file_ids = logs
79                    .iter()
80                    .filter_map(|log| {
81                        if let BlockDBLog::AddDataFile(data_file_id) = log {
82                            Some(data_file_id.to_string())
83                        } else {
84                            debug_assert!(
85                                matches!(log, BlockDBLog::AddDataFile(_)),
86                                "BlockDBUpdateState.logs had a invalid Log {log:?}"
87                            );
88                            None
89                        }
90                    })
91                    .collect::<Vec<String>>();
92
93                let mut data_files_writer = self.data_files.write().await;
94                data_files_writer.extend(new_data_file_map);
95
96                let mut logged_data_files: Vec<Arc<RwLock<DataFile>>> = vec![];
97
98                for (data_file_id, data_file_logs) in data_file_log_map {
99                    if let Some(data_file) = data_files_writer.get(&data_file_id) {
100                        if let Err(err) =
101                            data_file.write().await.wal.log_many(&data_file_logs).await
102                        {
103                            for logged_data_file in logged_data_files {
104                                let mut logged_data_file_writer = logged_data_file.write().await;
105
106                                if let Err(err) = logged_data_file_writer.wal.undo().await {
107                                    return_corrupted_error!(
108                                        Error::Walr(err),
109                                        added_data_file_ids,
110                                        data_file_inverse_log_map
111                                    );
112                                } else if let Err(err) = logged_data_file_writer.recover().await {
113                                    return_corrupted_error!(
114                                        err,
115                                        added_data_file_ids,
116                                        data_file_inverse_log_map
117                                    );
118                                }
119                            }
120
121                            return_corrupted_error!(
122                                Error::Walr(err),
123                                added_data_file_ids,
124                                data_file_inverse_log_map
125                            );
126                        }
127
128                        logged_data_files.push(data_file.clone());
129                    }
130                }
131
132                for (
133                    data_file_id,
134                    DataFileUpdateState {
135                        new_size,
136                        new_free_bytes,
137                        new_used_bytes,
138                        new_data_blocks,
139                        new_free_chunk_offsets,
140                    },
141                ) in data_file_update_map
142                {
143                    if let Some(data_file) = data_files_writer.get(&data_file_id) {
144                        let mut data_file_writer = data_file.write().await;
145                        data_file_writer.size = new_size;
146                        data_file_writer.free_bytes = new_free_bytes;
147                        data_file_writer.used_bytes = new_used_bytes;
148                        data_file_writer.data_blocks = new_data_blocks;
149                        data_file_writer.free_chunk_offsets = new_free_chunk_offsets;
150                    }
151                }
152
153                Ok(BatchResult {
154                    freed_bytes,
155                    new_block_keys,
156                })
157            }
158            Err(FailedBatchState {
159                err,
160                data_file_inverse_log_map,
161            }) => {
162                return_corrupted_error!(err, vec![], data_file_inverse_log_map);
163            }
164        }
165    }
166}