pub(crate) mod attempt_batch;
pub(crate) mod state;

use crate::{
    data_file::DataFile, error::Error, uncorrupt::UncorruptAction, wal::BlockDBLog, BlockDB,
    BlockKey,
};
use state::{BlockDBUpdateState, DataFileUpdateState, FailedBatchState};
use std::sync::Arc;
use tokio::sync::RwLock;

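/// Outcome of a successfully applied [`BlockDB::batch`]: how many bytes the
/// `frees` reclaimed, and the keys assigned to the newly written blocks
/// (presumably in the same order as the `writes` that produced them; the
/// ordering is determined by `attempt_batch`).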
#[derive(Debug, Clone)]
pub struct BatchResult {
    pub freed_bytes: usize,
    pub new_block_keys: Vec<BlockKey>,
}

impl BlockDB {
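    /// Applies a batch of block `writes` and `frees` as a single unit.
    ///
    /// The batch is first staged by `attempt_batch`, then logged to the
    /// top-level WAL and to each affected data file's WAL before the
    /// in-memory bookkeeping is updated. If a data file's WAL write fails,
    /// the WALs already written for this batch are undone and their files
    /// recovered; if that unwind cannot complete, an [`Error::Corrupted`]
    /// carrying an [`UncorruptAction::BadBatchUnwind`] is returned so the
    /// rollback can be finished later.
    ///
    /// A minimal usage sketch (obtaining the `BlockDB` and the key to free
    /// is elided):
    ///
    /// ```ignore
    /// let result = db.batch(vec![b"new block".to_vec()], vec![&stale_key]).await?;
    /// assert_eq!(result.new_block_keys.len(), 1);
    /// ```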
    pub async fn batch<Bytes: AsRef<[u8]>>(
        &mut self,
        writes: Vec<Bytes>,
        frees: Vec<&BlockKey>,
    ) -> Result<BatchResult, Error> {
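        // Any failure past the first WAL write can leave partially applied
        // state behind, so errors are reported as `Error::Corrupted` together
        // with the data files this batch added and the inverse logs needed to
        // roll the data files back.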
        macro_rules! return_corrupted_error {
            ($err:expr, $data_file_ids_to_delete:expr, $data_file_inverse_log_map:expr) => {
                return Err(Error::Corrupted {
                    err: Box::new($err),
                    action: UncorruptAction::BadBatchUnwind {
                        data_file_ids_to_delete: $data_file_ids_to_delete,
                        data_file_inverse_log_map: $data_file_inverse_log_map,
                    },
                });
            };
        }

        match self.attempt_batch(writes, frees).await {
            Ok(BlockDBUpdateState {
                logs,
                freed_bytes,
                new_block_keys,
                new_data_file_map,
                data_file_log_map,
                data_file_update_map,
                data_file_inverse_log_map,
            }) => {
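                // Commit point: log the whole batch to the top-level WAL
                // first. Nothing has been applied yet, so a failure here can
                // simply propagate with `?`.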
                self.wal.write().await.log_many(&logs).await?;

                // Data files created by this batch; these are what a repair
                // pass would have to delete if the batch is unwound.
                let added_data_file_ids = logs
                    .iter()
                    .filter_map(|log| {
                        if let BlockDBLog::AddDataFile(data_file_id) = log {
                            Some(data_file_id.to_string())
                        } else {
                            // Only `AddDataFile` logs are expected here.
                            debug_assert!(
                                false,
                                "BlockDBUpdateState.logs had an invalid log {log:?}"
                            );
                            None
                        }
                    })
                    .collect::<Vec<String>>();

                let mut data_files_writer = self.data_files.write().await;
                data_files_writer.extend(new_data_file_map);

                // Data files whose WALs have already been written for this
                // batch, in case a later WAL write fails and they must be
                // undone.
                let mut logged_data_files: Vec<Arc<RwLock<DataFile>>> = vec![];

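                // Second phase: append each data file's slice of the batch to
                // that file's own WAL. On failure, undo the WALs written so
                // far and recover those files; if the unwind itself fails,
                // bail out with the `BadBatchUnwind` action so recovery can be
                // completed later.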
                for (data_file_id, data_file_logs) in data_file_log_map {
                    if let Some(data_file) = data_files_writer.get(&data_file_id) {
                        if let Err(err) =
                            data_file.write().await.wal.log_many(&data_file_logs).await
                        {
                            // Roll back every data-file WAL already written
                            // for this batch.
                            for logged_data_file in logged_data_files {
                                let mut logged_data_file_writer = logged_data_file.write().await;

                                if let Err(err) = logged_data_file_writer.wal.undo().await {
                                    return_corrupted_error!(
                                        Error::Walr(err),
                                        added_data_file_ids,
                                        data_file_inverse_log_map
                                    );
                                } else if let Err(err) = logged_data_file_writer.recover().await {
                                    return_corrupted_error!(
                                        err,
                                        added_data_file_ids,
                                        data_file_inverse_log_map
                                    );
                                }
                            }

                            // The rollback succeeded, but the top-level WAL
                            // entry and any data files added by this batch
                            // still need cleaning up, so the failure is still
                            // reported as corruption.
                            return_corrupted_error!(
                                Error::Walr(err),
                                added_data_file_ids,
                                data_file_inverse_log_map
                            );
                        }

                        logged_data_files.push(data_file.clone());
                    }
                }

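                // All WALs are durable; apply the staged in-memory state to
                // each touched data file.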
                for (
                    data_file_id,
                    DataFileUpdateState {
                        new_size,
                        new_free_bytes,
                        new_used_bytes,
                        new_data_blocks,
                        new_free_chunk_offsets,
                    },
                ) in data_file_update_map
                {
                    if let Some(data_file) = data_files_writer.get(&data_file_id) {
                        let mut data_file_writer = data_file.write().await;
                        data_file_writer.size = new_size;
                        data_file_writer.free_bytes = new_free_bytes;
                        data_file_writer.used_bytes = new_used_bytes;
                        data_file_writer.data_blocks = new_data_blocks;
                        data_file_writer.free_chunk_offsets = new_free_chunk_offsets;
                    }
                }

                Ok(BatchResult {
                    freed_bytes,
                    new_block_keys,
                })
            }
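            // `attempt_batch` failed, so no data files were added by this
            // batch (hence the empty deletion list); the inverse logs describe
            // how to undo whatever was already written to the data files.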
            Err(FailedBatchState {
                err,
                data_file_inverse_log_map,
            }) => {
                return_corrupted_error!(err, vec![], data_file_inverse_log_map);
            }
        }
    }
}